threading_utils.h
#ifndef XGBOOST_COMMON_THREADING_UTILS_H_
#define XGBOOST_COMMON_THREADING_UTILS_H_

#include <dmlc/common.h>
#include <dmlc/omp.h>

#include <algorithm>    // for min
#include <cstddef>      // for size_t
#include <cstdint>      // for int32_t
#include <cstdlib>      // for malloc, free
#include <functional>   // for function
#include <new>          // for bad_alloc
#include <type_traits>  // for is_signed, conditional_t, is_integral_v, invoke_result_t
#include <vector>       // for vector

#include "xgboost/logging.h"

#if !defined(_OPENMP)
extern "C" {
inline int32_t omp_get_thread_limit() __GOMP_NOTHROW { return 1; }  // NOLINT
}
#endif  // !defined(_OPENMP)

// MSVC doesn't implement the thread limit.
#if defined(_OPENMP) && defined(_MSC_VER)
#include <limits>

extern "C" {
inline int32_t omp_get_thread_limit() { return std::numeric_limits<int32_t>::max(); }  // NOLINT
}
#endif  // defined(_MSC_VER)
namespace xgboost::common {
// Represents a simple range of indices [begin, end).
// Inspired by tbb::blocked_range.
class Range1d {
 public:
  Range1d(size_t begin, size_t end) : begin_(begin), end_(end) {
    CHECK_LT(begin, end);
  }

  size_t begin() const {  // NOLINT
    return begin_;
  }

  size_t end() const {  // NOLINT
    return end_;
  }

 private:
  size_t begin_;
  size_t end_;
};

// Splits a 2D space into balanced blocks.
// The implementation is inspired by tbb::blocked_range2d. However, TBB only provides an
// (n x m) 2D range (a matrix) split into blocks. Example:
// [ 1,2,3 ]
// [ 4,5,6 ]
// [ 7,8,9 ]
// This class, in contrast, can handle 'rows' of different sizes. Example:
// [ 1,2 ]
// [ 3,4,5,6 ]
// [ 7,8,9 ]
// With grain_size = 2 it produces the following blocks:
// [1,2], [3,4], [5,6], [7,8], [9]
// The class helps to process data from several (usually unbalanced) tree nodes in parallel.
// Nested parallelism (over nodes and over the data within each node) improves CPU
// utilization. A usage sketch follows the class definition.
class BlockedSpace2d {
 public:
  // Example of a space:
  // [ 1,2 ]
  // [ 3,4,5,6 ]
  // [ 7,8,9 ]
  // BlockedSpace2d creates the following blocks (tasks) if grain_size = 2:
  // 1st block: first_dimension = 0, range of indices in the 'row' = [0,2) (covers values [1,2])
  // 2nd block: first_dimension = 1, range of indices in the 'row' = [0,2) (covers values [3,4])
  // 3rd block: first_dimension = 1, range of indices in the 'row' = [2,4) (covers values [5,6])
  // 4th block: first_dimension = 2, range of indices in the 'row' = [0,2) (covers values [7,8])
  // 5th block: first_dimension = 2, range of indices in the 'row' = [2,3) (covers value [9])
  // Arguments:
  // dim1             - size of the first dimension of the space
  // getter_size_dim2 - functor returning the size of the second dimension for a given 'row' index
  // grain_size       - maximum size of the produced blocks
  template <typename Getter>
  BlockedSpace2d(std::size_t dim1, Getter&& getter_size_dim2, std::size_t grain_size) {
    static_assert(std::is_integral_v<std::invoke_result_t<Getter, std::size_t>>);
    for (std::size_t i = 0; i < dim1; ++i) {
      std::size_t size = getter_size_dim2(i);
      // Each row (second dimension) is divided into n_blocks.
      std::size_t n_blocks = size / grain_size + !!(size % grain_size);
      for (std::size_t iblock = 0; iblock < n_blocks; ++iblock) {
        std::size_t begin = iblock * grain_size;
        std::size_t end = std::min(begin + grain_size, size);
        AddBlock(i, begin, end);
      }
    }
  }

  // Number of blocks (tasks) in the space.
  [[nodiscard]] std::size_t Size() const {
    return ranges_.size();
  }

  // Get the index of the first dimension of the i-th block (task).
  [[nodiscard]] std::size_t GetFirstDimension(std::size_t i) const {
    CHECK_LT(i, first_dimension_.size());
    return first_dimension_[i];
  }

  // Get the range of indices for the second dimension of the i-th block (task).
  [[nodiscard]] Range1d GetRange(std::size_t i) const {
    CHECK_LT(i, ranges_.size());
    return ranges_[i];
  }

 private:
  void AddBlock(std::size_t first_dim, std::size_t begin, std::size_t end) {
    first_dimension_.push_back(first_dim);
    ranges_.emplace_back(begin, end);
  }

  std::vector<Range1d> ranges_;
  std::vector<std::size_t> first_dimension_;
};
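// Usage sketch (illustrative only, not part of the original header): enumerate the blocks
// produced for the unbalanced example above. `row_sizes` is a hypothetical name; assumes
// <iostream> and <vector> are available.
//
//   std::vector<std::size_t> row_sizes{2, 4, 3};
//   BlockedSpace2d space{row_sizes.size(),
//                        [&](std::size_t i) { return row_sizes[i]; },
//                        /*grain_size=*/2};
//   for (std::size_t b = 0; b < space.Size(); ++b) {
//     Range1d r = space.GetRange(b);
//     // Prints: (0,[0,2)) (1,[0,2)) (1,[2,4)) (2,[0,2)) (2,[2,3))
//     std::cout << "(" << space.GetFirstDimension(b) << ",[" << r.begin() << ","
//               << r.end() << ")) ";
//   }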
// Wrapper to implement nested parallelism with a simple `omp parallel for`.
template <typename Func>
void ParallelFor2d(const BlockedSpace2d& space, int n_threads, Func&& func) {
  static_assert(std::is_void_v<std::invoke_result_t<Func, std::size_t, Range1d>>);
  std::size_t n_blocks_in_space = space.Size();
  CHECK_GE(n_threads, 1);

  dmlc::OMPException exc;
#pragma omp parallel num_threads(n_threads)
  {
    exc.Run([&]() {
      std::size_t tid = omp_get_thread_num();
      std::size_t chunk_size = n_blocks_in_space / n_threads + !!(n_blocks_in_space % n_threads);

      std::size_t begin = chunk_size * tid;
      std::size_t end = std::min(begin + chunk_size, n_blocks_in_space);
      for (auto i = begin; i < end; i++) {
        func(space.GetFirstDimension(i), space.GetRange(i));
      }
    });
  }
  exc.Rethrow();
}
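// Usage sketch (illustrative only): nested parallelism over the blocks of a BlockedSpace2d.
// `row_sizes` and `ProcessElement` are hypothetical names.
//
//   std::vector<std::size_t> row_sizes{2, 4, 3};
//   BlockedSpace2d space{row_sizes.size(),
//                        [&](std::size_t i) { return row_sizes[i]; },
//                        /*grain_size=*/2};
//   ParallelFor2d(space, /*n_threads=*/4, [&](std::size_t node, Range1d range) {
//     for (std::size_t i = range.begin(); i < range.end(); ++i) {
//       ProcessElement(node, i);  // hypothetical per-element work
//     }
//   });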
/**
 * \brief OpenMP schedule.
 */
struct Sched {
  enum {
    kAuto,
    kDynamic,
    kStatic,
    kGuided,
  } sched;
  size_t chunk{0};

  Sched static Auto() { return Sched{kAuto}; }
  Sched static Dyn(size_t n = 0) { return Sched{kDynamic, n}; }
  Sched static Static(size_t n = 0) { return Sched{kStatic, n}; }
  Sched static Guided() { return Sched{kGuided}; }
};
template <typename Index, typename Func>
void ParallelFor(Index size, int32_t n_threads, Sched sched, Func fn) {
#if defined(_MSC_VER)
  // msvc doesn't support unsigned integer as openmp index.
  using OmpInd = std::conditional_t<std::is_signed<Index>::value, Index, omp_ulong>;
#else
  using OmpInd = Index;
#endif
  OmpInd length = static_cast<OmpInd>(size);
  CHECK_GE(n_threads, 1);

  dmlc::OMPException exc;
  switch (sched.sched) {
    case Sched::kAuto: {
#pragma omp parallel for num_threads(n_threads)
      for (OmpInd i = 0; i < length; ++i) {
        exc.Run(fn, i);
      }
      break;
    }
    case Sched::kDynamic: {
      if (sched.chunk == 0) {
#pragma omp parallel for num_threads(n_threads) schedule(dynamic)
        for (OmpInd i = 0; i < length; ++i) {
          exc.Run(fn, i);
        }
      } else {
#pragma omp parallel for num_threads(n_threads) schedule(dynamic, sched.chunk)
        for (OmpInd i = 0; i < length; ++i) {
          exc.Run(fn, i);
        }
      }
      break;
    }
    case Sched::kStatic: {
      if (sched.chunk == 0) {
#pragma omp parallel for num_threads(n_threads) schedule(static)
        for (OmpInd i = 0; i < length; ++i) {
          exc.Run(fn, i);
        }
      } else {
#pragma omp parallel for num_threads(n_threads) schedule(static, sched.chunk)
        for (OmpInd i = 0; i < length; ++i) {
          exc.Run(fn, i);
        }
      }
      break;
    }
    case Sched::kGuided: {
#pragma omp parallel for num_threads(n_threads) schedule(guided)
      for (OmpInd i = 0; i < length; ++i) {
        exc.Run(fn, i);
      }
      break;
    }
  }
  exc.Rethrow();
}
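// Usage sketch (illustrative only): fill a buffer in parallel with an explicit OpenMP
// schedule. `n`, `n_threads`, `out`, and `Compute` are hypothetical; dynamic scheduling
// with a chunk of 64 is an arbitrary choice for the sketch.
//
//   std::vector<double> out(n);
//   ParallelFor(out.size(), n_threads, Sched::Dyn(64), [&](std::size_t i) {
//     out[i] = Compute(i);
//   });
//   // The overload below, without a Sched argument, defaults to Sched::Static():
//   ParallelFor(out.size(), n_threads, [&](std::size_t i) { out[i] = Compute(i); });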
template <typename Index, typename Func>
void ParallelFor(Index size, int32_t n_threads, Func fn) {
  ParallelFor(size, n_threads, Sched::Static(), fn);
}
inline std::int32_t OmpGetThreadLimit() {
  std::int32_t limit = omp_get_thread_limit();
  CHECK_GE(limit, 1) << "Invalid thread limit for OpenMP.";
  return limit;
}

/**
 * \brief Get thread limit from CFS (the Linux Completely Fair Scheduler).
 */
std::int32_t GetCfsCPUCount() noexcept;

/**
 * \brief Get the number of available threads based on n_threads specified by users.
 */
std::int32_t OmpGetNumThreads(std::int32_t n_threads);
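// Usage sketch (illustrative only): resolve a user-supplied thread count into the number of
// threads actually used before entering a parallel region. `user_n_threads` and `n` are
// hypothetical; the exact resolution rules are implemented in threading_utils.cc.
//
//   std::int32_t n_threads = OmpGetNumThreads(user_n_threads);
//   ParallelFor(n, n_threads, [&](std::size_t i) { /* per-index work */ });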
/**
 * \brief A C-style array with in-stack allocation. As long as the array is smaller than
 *        MaxStackSize, it is allocated on the stack; otherwise the storage is obtained
 *        from the heap via malloc.
 */
template <typename T, std::size_t MaxStackSize>
class MemStackAllocator {
 public:
  explicit MemStackAllocator(size_t required_size) : required_size_(required_size) {
    if (MaxStackSize >= required_size_) {
      ptr_ = stack_mem_;
    } else {
      ptr_ = reinterpret_cast<T*>(std::malloc(required_size_ * sizeof(T)));
    }
    if (!ptr_) {
      throw std::bad_alloc{};
    }
  }
  MemStackAllocator(size_t required_size, T init) : MemStackAllocator{required_size} {
    std::fill_n(ptr_, required_size_, init);
  }

  ~MemStackAllocator() {
    if (required_size_ > MaxStackSize) {
      std::free(ptr_);
    }
  }
  T& operator[](size_t i) { return ptr_[i]; }
  T const& operator[](size_t i) const { return ptr_[i]; }

  auto data() const { return ptr_; }                    // NOLINT
  auto data() { return ptr_; }                          // NOLINT
  std::size_t size() const { return required_size_; }   // NOLINT

  auto cbegin() const { return data(); }          // NOLINT
  auto cend() const { return data() + size(); }   // NOLINT

 private:
  T* ptr_ = nullptr;
  size_t required_size_;
  T stack_mem_[MaxStackSize];
};
/**
 * \brief Constant that can be used for initializing static thread local memory.
 */
std::int32_t constexpr DefaultMaxThreads() { return 128; }
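// Usage sketch (illustrative only): a per-thread accumulation buffer that stays on the stack
// as long as no more than DefaultMaxThreads() threads are used, falling back to the heap
// otherwise. `n` and `n_threads` are hypothetical.
//
//   MemStackAllocator<double, DefaultMaxThreads()> partial_sums(n_threads, 0.0);
//   ParallelFor(n, n_threads, [&](std::size_t i) {
//     partial_sums[omp_get_thread_num()] += 1.0;  // each thread writes only its own slot
//   });
//   double total = 0.0;
//   for (std::size_t t = 0; t < partial_sums.size(); ++t) { total += partial_sums[t]; }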
}  // namespace xgboost::common

#endif  // XGBOOST_COMMON_THREADING_UTILS_H_