7#ifndef DMLC_CONCURRENCY_H_
8#define DMLC_CONCURRENCY_H_
17#include <condition_variable>
// Default constructor: initializes lock_ from ATOMIC_FLAG_INIT.
// The surrounding clang pragmas save the diagnostic state, silence
// -Wbraced-scalar-init for this initializer, and restore the state afterwards.
// NOTE(review): the constructor's closing brace is not visible in this excerpt.
33#pragma clang diagnostic push
34#pragma clang diagnostic ignored "-Wbraced-scalar-init"
36 Spinlock() : lock_(ATOMIC_FLAG_INIT) {
39#pragma clang diagnostic pop
// Destructor: explicitly defaulted (compiler-generated).
42 ~Spinlock() =
default;
// Acquires the lock. Declared non-throwing; defined out of line as
// Spinlock::lock(), which spins on lock_.test_and_set().
46 inline void lock() noexcept(true);
// Releases the lock. Declared non-throwing; defined out of line as
// Spinlock::unlock(), which clears lock_.
50 inline
void unlock() noexcept(true);
// Flag backing the spinlock: set by lock() via test_and_set(), reset by
// unlock() via clear().
53 std::atomic_flag lock_;
// Selector used as a template argument of ConcurrentBlockingQueue to choose
// its mode; kFIFO is that template's default.
// NOTE(review): the enumerator list is not visible in this excerpt.
61enum class ConcurrentQueueType {
// Blocking queue whose operations take mutex_ and whose consumers wait on cv_.
// When `type` is kFIFO the elements live in fifo_queue_; the other visible
// code path keeps Entry objects in priority_queue_ as a heap.
// NOTE(review): the start of the template header, the Entry struct header, and
// the declarations of mutex_ and nwait_consumer_ are not visible in this excerpt.
72 ConcurrentQueueType type = ConcurrentQueueType::kFIFO>
73class ConcurrentBlockingQueue {
// Initializes exit_now_ to false and nwait_consumer_ to 0 (see the
// out-of-line definition).
75 ConcurrentBlockingQueue();
// Destructor: explicitly defaulted.
76 ~ConcurrentBlockingQueue() =
default;
// Appends `e`. `priority` (default 0) is only read on the heap code path; the
// kFIFO path ignores it.
88 void Push(E&& e,
int priority = 0);
// Like Push, but in kFIFO mode the element is placed at the front of the
// deque. The visible heap-path code is the same as Push's.
101 template <
typename E>
102 void PushFront(E&& e,
int priority = 0);
// Sets exit_now_ so that the wait predicates in Pop become true.
117 void SignalForKill();
// Orders entries by `priority`. std::push_heap/std::pop_heap are called
// without a custom comparator, so this operator defines the heap order and
// the entry with the largest priority is popped first.
128 inline bool operator<(
const Entry &b)
const {
129 return priority < b.priority;
// Waited on in Pop; Push/PushFront call notify_one() on it.
134 std::condition_variable cv_;
// Shutdown flag: stored as true by SignalForKill, read by Pop.
135 std::atomic<bool> exit_now_;
// Heap storage used when `type` is not kFIFO.
138 std::vector<Entry> priority_queue_;
// Storage used when `type` is kFIFO.
140 std::deque<T> fifo_queue_;
// Acquire the spinlock: keep calling test_and_set() with acquire ordering
// until it returns false, i.e. until this caller is the one that set the flag.
// NOTE(review): the loop body and closing braces are not visible in this excerpt.
147inline void Spinlock::lock() noexcept(true) {
148 while (lock_.test_and_set(std::memory_order_acquire)) {
// Release the spinlock: clear the flag with release ordering so a spinning
// lock() call can succeed.
152inline void Spinlock::unlock() noexcept(true) {
153 lock_.clear(std::memory_order_release);
// Constructs an empty queue: the shutdown flag starts false and the
// waiting-consumer counter starts at 0. The containers are default-constructed.
156template <
typename T, ConcurrentQueueType type>
157ConcurrentBlockingQueue<T, type>::ConcurrentBlockingQueue()
158 : exit_now_{false}, nwait_consumer_{0} {}
// Adds an element to the queue and wakes one waiting consumer if any are
// recorded in nwait_consumer_.
//   e        - element to enqueue; E with cv/ref qualifiers stripped is checked
//              by the static_assert (the type it is compared against is not
//              visible in this excerpt; presumably T).
//   priority - stored in the Entry on the heap path; unused on the kFIFO path.
// NOTE(review): several lines of this function (inner template header, the
// declaration of `notify`, the else-branch opener and `Entry entry`, closing
// braces) are not visible in this excerpt.
160template <
typename T, ConcurrentQueueType type>
162void ConcurrentBlockingQueue<T, type>::Push(E&& e,
int priority) {
163 static_assert(std::is_same<
typename std::remove_cv<
164 typename std::remove_reference<E>::type>::type,
166 "Types must match.");
// Queue state is only touched while holding mutex_.
169 std::lock_guard<std::mutex> lock{mutex_};
170 if (type == ConcurrentQueueType::kFIFO) {
// FIFO path: construct the element at the back, preserving e's value category.
171 fifo_queue_.emplace_back(std::forward<E>(e));
// Only notify when at least one consumer is recorded as waiting.
172 notify = nwait_consumer_ != 0;
// Heap path: fill an Entry, append it, then restore the heap property.
// NOTE(review): std::move(e) moves from the argument even if the caller passed
// an lvalue, whereas the FIFO path uses std::forward<E>(e) — confirm intended.
175 entry.data = std::move(e);
176 entry.priority = priority;
177 priority_queue_.push_back(std::move(entry));
178 std::push_heap(priority_queue_.begin(), priority_queue_.end());
179 notify = nwait_consumer_ != 0;
182 if (notify) cv_.notify_one();
// Adds an element and wakes one waiting consumer if any are recorded in
// nwait_consumer_. In kFIFO mode the element goes to the FRONT of the deque.
// The visible heap-path lines are the same as in Push (push_back + push_heap),
// so on that path ordering is still decided by `priority`.
//   e        - element to enqueue; checked by the static_assert after stripping
//              cv/ref qualifiers (compared-against type not visible; presumably T).
//   priority - stored in the Entry on the heap path; unused on the kFIFO path.
// NOTE(review): several lines of this function (inner template header, the
// declaration of `notify`, the else-branch opener and `Entry entry`, closing
// braces) are not visible in this excerpt.
185template <
typename T, ConcurrentQueueType type>
187void ConcurrentBlockingQueue<T, type>::PushFront(E&& e,
int priority) {
188 static_assert(std::is_same<
typename std::remove_cv<
189 typename std::remove_reference<E>::type>::type,
191 "Types must match.");
// Queue state is only touched while holding mutex_.
194 std::lock_guard<std::mutex> lock{mutex_};
195 if (type == ConcurrentQueueType::kFIFO) {
// FIFO path: construct the element at the front, preserving e's value category.
196 fifo_queue_.emplace_front(std::forward<E>(e));
// Only notify when at least one consumer is recorded as waiting.
197 notify = nwait_consumer_ != 0;
// Heap path: fill an Entry, append it, then restore the heap property.
// NOTE(review): std::move(e) moves from the argument even if the caller passed
// an lvalue, whereas the FIFO path uses std::forward<E>(e) — confirm intended.
200 entry.data = std::move(e);
201 entry.priority = priority;
202 priority_queue_.push_back(std::move(entry));
203 std::push_heap(priority_queue_.begin(), priority_queue_.end());
204 notify = nwait_consumer_ != 0;
207 if (notify) cv_.notify_one();
// Blocks until an element is available or exit_now_ is set, then (unless
// exiting) moves one element into *rv and removes it from the queue.
//   rv - destination for the popped element; written only when exit_now_ is
//        false after the wait.
// NOTE(review): the return statements and any nwait_consumer_ bookkeeping are
// not visible in this excerpt, so the meaning of the bool result is not
// documented here.
210template <
typename T, ConcurrentQueueType type>
211bool ConcurrentBlockingQueue<T, type>::Pop(T* rv) {
// unique_lock (not lock_guard) because cv_.wait must be able to release and
// re-acquire the mutex.
212 std::unique_lock<std::mutex> lock{mutex_};
213 if (type == ConcurrentQueueType::kFIFO) {
// FIFO path: sleep until the deque has an element or shutdown was requested.
215 cv_.wait(lock, [
this] {
216 return !fifo_queue_.empty() || exit_now_.load();
// Shutdown wins over pending data: nothing is popped once exit_now_ is set.
219 if (!exit_now_.load()) {
220 *rv = std::move(fifo_queue_.front());
221 fifo_queue_.pop_front();
// Heap path: sleep until the heap has an element or shutdown was requested.
228 cv_.wait(lock, [
this] {
229 return !priority_queue_.empty() || exit_now_.load();
232 if (!exit_now_.load()) {
// pop_heap moves the top entry to the back; take its data, then drop it.
233 std::pop_heap(priority_queue_.begin(), priority_queue_.end());
234 *rv = std::move(priority_queue_.back().data);
235 priority_queue_.pop_back();
// Requests shutdown: sets exit_now_ to true while holding mutex_, which makes
// the wait predicates in Pop evaluate to true.
// NOTE(review): the remainder of the function (e.g. any wake-up of waiting
// consumers) is not visible in this excerpt.
243template <
typename T, ConcurrentQueueType type>
244void ConcurrentBlockingQueue<T, type>::SignalForKill() {
246 std::lock_guard<std::mutex> lock{mutex_};
247 exit_now_.store(
true);
// Returns the current number of queued elements, read under mutex_: the deque
// size in kFIFO mode, otherwise the heap vector size.
252template <
typename T, ConcurrentQueueType type>
253size_t ConcurrentBlockingQueue<T, type>::Size() {
254 std::lock_guard<std::mutex> lock{mutex_};
255 if (type == ConcurrentQueueType::kFIFO) {
256 return fifo_queue_.size();
258 return priority_queue_.size();
defines configuration macros
#define DISALLOW_COPY_AND_ASSIGN(T)
Disable copy constructor and assignment operator.
Definition base.h:202
bool operator<(const value_t lhs, const value_t rhs) noexcept
comparison operator for JSON types
Definition json.hpp:2889
namespace for dmlc
Definition array_view.h:12