4#ifndef XGBOOST_COMMON_THREADING_UTILS_H_
5#define XGBOOST_COMMON_THREADING_UTILS_H_
23inline int32_t omp_get_thread_limit() __GOMP_NOTHROW {
return 1; }
28#if defined(_OPENMP) && defined(_MSC_VER)
// Fallback for OpenMP-on-MSVC builds, whose runtime does not expose
// omp_get_thread_limit(): report an effectively unlimited thread cap.
inline int32_t omp_get_thread_limit() {
  constexpr int32_t kUnlimited = std::numeric_limits<int32_t>::max();
  return kUnlimited;
}
// Range1d: a half-open index range [begin_, end_). This doc-generator
// extract is truncated: the constructor's closing brace and the body of
// begin() are missing; fused leading numbers are original source lines.
41 Range1d(
size_t begin,
size_t end): begin_(begin), end_(end) {
// Accessor for the start of the range (body lost in this extract).
45 size_t begin()
const {
// BlockedSpace2d constructor: builds a blocked 2D iteration space. For
// each i in [0, dim1), the second dimension — whose length is
// getter_size_dim2(i) — is split into consecutive blocks of at most
// grain_size elements. Extract truncated: closing braces are missing.
90 template <
typename Getter>
91 BlockedSpace2d(std::size_t dim1, Getter&& getter_size_dim2, std::size_t grain_size) {
// The per-row size getter must return an integral type.
92 static_assert(std::is_integral_v<std::invoke_result_t<Getter, std::size_t>>);
93 for (std::size_t i = 0; i < dim1; ++i) {
94 std::size_t size = getter_size_dim2(i);
// Ceiling division: number of grain_size-sized blocks covering `size`.
96 std::size_t n_blocks = size / grain_size + !!(size % grain_size);
97 for (std::size_t iblock = 0; iblock < n_blocks; ++iblock) {
98 std::size_t begin = iblock * grain_size;
// Last block of a row may be shorter than grain_size.
99 std::size_t end = std::min(begin + grain_size, size);
100 AddBlock(i, begin, end);
// Total number of blocks in the flattened 2D space (extract truncated:
// the closing brace was lost by the doc generator).
106 [[nodiscard]] std::size_t Size()
const {
107 return ranges_.size();
// Maps a flat block index back to its first-dimension (row) index.
111 [[nodiscard]] std::size_t GetFirstDimension(std::size_t i)
const {
112 CHECK_LT(i, first_dimension_.size());
113 return first_dimension_[i];
// Returns block i's [begin, end) range along the second dimension.
// (The `return` statement is missing from this extract.)
117 [[nodiscard]]
Range1d GetRange(std::size_t i)
const {
118 CHECK_LT(i, ranges_.size());
// Records one block: its first-dimension index and its second-dimension
// [begin, end) range, stored in two parallel vectors.
130 void AddBlock(std::size_t first_dim, std::size_t begin, std::size_t end) {
131 first_dimension_.push_back(first_dim);
132 ranges_.emplace_back(begin, end);
// Parallel arrays: ranges_[k] is the dim-2 range of block k and
// first_dimension_[k] its dim-1 index (see AddBlock).
135 std::vector<Range1d> ranges_;
136 std::vector<std::size_t> first_dimension_;
// Runs func over every block of a BlockedSpace2d using n_threads OpenMP
// threads; each thread processes one contiguous chunk of blocks. This
// extract is truncated: the exception guard and closing braces are lost.
141template <
typename Func>
142void ParallelFor2d(
const BlockedSpace2d& space,
int n_threads, Func&& func) {
// func must be callable as void(first_dim_index, Range1d).
143 static_assert(std::is_void_v<std::invoke_result_t<Func, std::size_t, Range1d>>);
144 std::size_t n_blocks_in_space = space.Size();
145 CHECK_GE(n_threads, 1);
148#pragma omp parallel num_threads(n_threads)
151 std::size_t tid = omp_get_thread_num();
// Ceiling division spreads blocks as evenly as possible across threads.
// NOTE(review): "chunck" is a long-standing typo for "chunk".
152 std::size_t chunck_size = n_blocks_in_space / n_threads + !!(n_blocks_in_space % n_threads);
154 std::size_t begin = chunck_size * tid;
// Last thread's chunk may be short (or empty) when blocks don't divide evenly.
155 std::size_t end = std::min(begin + chunck_size, n_blocks_in_space);
156 for (
auto i = begin; i < end; i++) {
157 func(space.GetFirstDimension(i), space.GetRange(i));
177 Sched static Dyn(
size_t n = 0) {
return Sched{kDynamic, n}; }
178 Sched static Static(
size_t n = 0) {
return Sched{kStatic, n}; }
179 Sched static Guided() {
return Sched{kGuided}; }
// OpenMP parallel-for over [0, size) with a runtime-selected schedule
// (static / dynamic / guided, optionally with an explicit chunk size).
// This extract is heavily truncated: the #if/#else guarding the two
// OmpInd aliases, the loop bodies (calls to fn), `break`s, and closing
// braces were all dropped by the doc generator.
182template <
typename Index,
typename Func>
183void ParallelFor(
Index size, int32_t n_threads,
Sched sched, Func fn) {
// Two alternative loop-index aliases — presumably the branches of a
// compiler-specific #if (MSVC's OpenMP requires a signed index) — TODO
// confirm against the full header.
186 using OmpInd = std::conditional_t<std::is_signed<Index>::value,
Index,
omp_ulong>;
188 using OmpInd =
Index;
190 OmpInd length =
static_cast<OmpInd
>(size);
191 CHECK_GE(n_threads, 1);
// Dispatch on the requested schedule; each case differs only in the
// schedule() clause attached to the pragma.
194 switch (sched.sched) {
196#pragma omp parallel for num_threads(n_threads)
197 for (OmpInd i = 0; i < length; ++i) {
202 case Sched::kDynamic: {
// chunk == 0 means "let the OpenMP runtime choose the chunk size".
203 if (sched.chunk == 0) {
204#pragma omp parallel for num_threads(n_threads) schedule(dynamic)
205 for (OmpInd i = 0; i < length; ++i) {
209#pragma omp parallel for num_threads(n_threads) schedule(dynamic, sched.chunk)
210 for (OmpInd i = 0; i < length; ++i) {
216 case Sched::kStatic: {
217 if (sched.chunk == 0) {
218#pragma omp parallel for num_threads(n_threads) schedule(static)
219 for (OmpInd i = 0; i < length; ++i) {
223#pragma omp parallel for num_threads(n_threads) schedule(static, sched.chunk)
224 for (OmpInd i = 0; i < length; ++i) {
230 case Sched::kGuided: {
231#pragma omp parallel for num_threads(n_threads) schedule(guided)
232 for (OmpInd i = 0; i < length; ++i) {
// Convenience overload: ParallelFor using the default static schedule.
// (Closing brace lost in this extract.)
241template <
typename Index,
typename Func>
242void ParallelFor(Index size, int32_t n_threads, Func fn) {
243 ParallelFor(size, n_threads, Sched::Static(), fn);
// Queries the OpenMP runtime's thread limit and validates it is at least
// one. (The `return limit;` line and closing brace are missing from this
// extract.)
246inline std::int32_t OmpGetThreadLimit() {
247 std::int32_t limit = omp_get_thread_limit();
248 CHECK_GE(limit, 1) <<
"Invalid thread limit for OpenMP.";
// MemStackAllocator<T, MaxStackSize>: C-style array that uses in-object
// (stack) storage while the requested size fits in MaxStackSize, and
// heap storage via std::malloc otherwise (see the class brief below in
// the Doxygen text). This extract is heavily truncated: the class
// header, the stack-buffer branch body, the (size, init) constructor's
// signature, and the destructor's signature were all dropped, so lines
// from two constructors and the destructor appear interleaved.
269template <typename T,
std::
size_t MaxStackSize>
272 explicit MemStackAllocator(
size_t required_size) : required_size_(required_size) {
// Fits on the stack: use the in-object buffer (branch body lost here).
273 if (MaxStackSize >= required_size_) {
// Too large: heap-allocate raw storage for required_size_ elements.
276 ptr_ =
reinterpret_cast<T*
>(std::malloc(required_size_ *
sizeof(T)));
// Allocation failure is reported by exception, not a null pointer.
279 throw std::bad_alloc{};
// From the (size, init) constructor: value-fill the whole buffer.
283 std::fill_n(ptr_, required_size_, init);
// From the destructor: memory is freed only when it was heap-allocated.
287 if (required_size_ > MaxStackSize) {
291 T& operator[](
size_t i) {
return ptr_[i]; }
292 T
const& operator[](
size_t i)
const {
return ptr_[i]; }
294 auto data()
const {
return ptr_; }
295 auto data() {
return ptr_; }
296 std::size_t size()
const {
return required_size_; }
298 auto cbegin()
const {
return data(); }
299 auto cend()
const {
return data() + size(); }
// Number of elements requested at construction.
303 size_t required_size_;
// In-object buffer used when required_size_ fits within MaxStackSize.
304 T stack_mem_[MaxStackSize];
OMP Exception class: catches, saves, and rethrows exceptions from OMP blocks.
Definition common.h:53
void Rethrow()
Should be called from the main thread to rethrow any saved exception.
Definition common.h:84
void Run(Function f, Parameters... params)
Parallel OMP blocks should be placed within Run to save exception.
Definition common.h:65
Definition threading_utils.h:74
Optionally compressed gradient index.
Definition hist_util.h:210
A C-style array with in-stack allocation. As long as the array is smaller than MaxStackSize,...
Definition threading_utils.h:270
Definition threading_utils.h:39
Defines console logging options for xgboost. Used to enforce unified print behavior.
Copyright 2017-2023, XGBoost Contributors.
Definition span.h:77
std::int32_t OmpGetNumThreads(std::int32_t n_threads)
Get the number of available threads based on n_threads specified by users.
Definition threading_utils.cc:93
std::int32_t GetCfsCPUCount() noexcept
Get thread limit from CFS.
Definition threading_utils.cc:75
std::int32_t constexpr DefaultMaxThreads()
Constant that can be used for initializing static thread local memory.
Definition threading_utils.h:310
dmlc::omp_ulong omp_ulong
define unsigned long for openmp loop
Definition base.h:322
header to handle OpenMP compatibility issues
OpenMP schedule.
Definition threading_utils.h:167
Defines some common utility functions.