Medial Code Documentation
Loading...
Searching...
No Matches
blockingconcurrentqueue.h
1
2// Provides an efficient blocking version of moodycamel::ConcurrentQueue.
3// ©2015-2016 Cameron Desrochers. Distributed under the terms of the simplified
4// BSD license, available at the top of concurrentqueue.h.
5// Uses Jeff Preshing's semaphore implementation (under the terms of its
6// separate zlib license, embedded below).
7
8#ifndef DMLC_BLOCKINGCONCURRENTQUEUE_H_
9#define DMLC_BLOCKINGCONCURRENTQUEUE_H_
10
11#pragma once
12
13#include "concurrentqueue.h"
14#include <type_traits>
15#include <cerrno>
16#include <memory>
17#include <chrono>
18#include <ctime>
19
20#if defined(_WIN32)
21// Avoid including windows.h in a header; we only need a handful of
22// items, so we'll redeclare them here (this is relatively safe since
23// the API generally has to remain stable between Windows versions).
24// I know this is an ugly hack but it still beats polluting the global
25// namespace with thousands of generic names or adding a .cpp for nothing.
26extern "C" {
27 struct _SECURITY_ATTRIBUTES;
28 __declspec(dllimport) void* __stdcall CreateSemaphoreW(_SECURITY_ATTRIBUTES* lpSemaphoreAttributes, long lInitialCount, long lMaximumCount, const wchar_t* lpName);
29 __declspec(dllimport) int __stdcall CloseHandle(void* hObject);
30 __declspec(dllimport) unsigned long __stdcall WaitForSingleObject(void* hHandle, unsigned long dwMilliseconds);
31 __declspec(dllimport) int __stdcall ReleaseSemaphore(void* hSemaphore, long lReleaseCount, long* lpPreviousCount);
32}
33#elif defined(__MACH__)
34#include <mach/mach.h>
35#elif defined(__unix__)
36#include <semaphore.h>
37#endif
38
39namespace dmlc {
40
41namespace moodycamel
42{
43namespace details
44{
45 // Code in the mpmc_sema namespace below is an adaptation of Jeff Preshing's
46 // portable + lightweight semaphore implementations, originally from
47 // https://github.com/preshing/cpp11-on-multicore/blob/master/common/sema.h
48 // LICENSE:
49 // Copyright (c) 2015 Jeff Preshing
50 //
51 // This software is provided 'as-is', without any express or implied
52 // warranty. In no event will the authors be held liable for any damages
53 // arising from the use of this software.
54 //
55 // Permission is granted to anyone to use this software for any purpose,
56 // including commercial applications, and to alter it and redistribute it
57 // freely, subject to the following restrictions:
58 //
59 // 1. The origin of this software must not be misrepresented; you must not
60 // claim that you wrote the original software. If you use this software
61 // in a product, an acknowledgement in the product documentation would be
62 // appreciated but is not required.
63 // 2. Altered source versions must be plainly marked as such, and must not be
64 // misrepresented as being the original software.
65 // 3. This notice may not be removed or altered from any source distribution.
66 namespace mpmc_sema
67 {
68#if defined(_WIN32)
69 class Semaphore
70 {
71 private:
72 void* m_hSema;
73
74 Semaphore(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
75 Semaphore& operator=(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
76
77 public:
78 Semaphore(int initialCount = 0)
79 {
80 assert(initialCount >= 0);
81 const long maxLong = 0x7fffffff;
82 m_hSema = CreateSemaphoreW(nullptr, initialCount, maxLong, nullptr);
83 }
84
85 ~Semaphore()
86 {
87 CloseHandle(m_hSema);
88 }
89
90 void wait()
91 {
92 const unsigned long infinite = 0xffffffff;
93 WaitForSingleObject(m_hSema, infinite);
94 }
95
96 bool try_wait()
97 {
98 const unsigned long RC_WAIT_TIMEOUT = 0x00000102;
99 return WaitForSingleObject(m_hSema, 0) != RC_WAIT_TIMEOUT;
100 }
101
102 bool timed_wait(std::uint64_t usecs)
103 {
104 const unsigned long RC_WAIT_TIMEOUT = 0x00000102;
105 return WaitForSingleObject(m_hSema, (unsigned long)(usecs / 1000)) != RC_WAIT_TIMEOUT;
106 }
107
108 void signal(int count = 1)
109 {
110 ReleaseSemaphore(m_hSema, count, nullptr);
111 }
112 };
113#elif defined(__MACH__)
114 //---------------------------------------------------------
115 // Semaphore (Apple iOS and OSX)
116 // Can't use POSIX semaphores due to http://lists.apple.com/archives/darwin-kernel/2009/Apr/msg00010.html
117 //---------------------------------------------------------
118 class Semaphore
119 {
120 private:
121 semaphore_t m_sema;
122
123 Semaphore(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
124 Semaphore& operator=(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
125
126 public:
127 Semaphore(int initialCount = 0)
128 {
129 assert(initialCount >= 0);
130 semaphore_create(mach_task_self(), &m_sema, SYNC_POLICY_FIFO, initialCount);
131 }
132
133 ~Semaphore()
134 {
135 semaphore_destroy(mach_task_self(), m_sema);
136 }
137
138 void wait()
139 {
140 semaphore_wait(m_sema);
141 }
142
143 bool try_wait()
144 {
145 return timed_wait(0);
146 }
147
148 bool timed_wait(std::uint64_t timeout_usecs)
149 {
150 mach_timespec_t ts;
151 ts.tv_sec = static_cast<unsigned int>(timeout_usecs / 1000000);
152 ts.tv_nsec = (timeout_usecs % 1000000) * 1000;
153
154 // added in OSX 10.10: https://developer.apple.com/library/prerelease/mac/documentation/General/Reference/APIDiffsMacOSX10_10SeedDiff/modules/Darwin.html
155 kern_return_t rc = semaphore_timedwait(m_sema, ts);
156
157 return rc != KERN_OPERATION_TIMED_OUT;
158 }
159
160 void signal()
161 {
162 semaphore_signal(m_sema);
163 }
164
165 void signal(int count)
166 {
167 while (count-- > 0)
168 {
169 semaphore_signal(m_sema);
170 }
171 }
172 };
173#elif defined(__unix__)
174 //---------------------------------------------------------
175 // Semaphore (POSIX, Linux)
176 //---------------------------------------------------------
177 class Semaphore
178 {
179 private:
180 sem_t m_sema;
181
182 Semaphore(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
183 Semaphore& operator=(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
184
185 public:
186 Semaphore(int initialCount = 0)
187 {
188 assert(initialCount >= 0);
189 sem_init(&m_sema, 0, initialCount);
190 }
191
192 ~Semaphore()
193 {
194 sem_destroy(&m_sema);
195 }
196
197 void wait()
198 {
199 // http://stackoverflow.com/questions/2013181/gdb-causes-sem-wait-to-fail-with-eintr-error
200 int rc;
201 do {
202 rc = sem_wait(&m_sema);
203 } while (rc == -1 && errno == EINTR);
204 }
205
206 bool try_wait()
207 {
208 int rc;
209 do {
210 rc = sem_trywait(&m_sema);
211 } while (rc == -1 && errno == EINTR);
212 return !(rc == -1 && errno == EAGAIN);
213 }
214
215 bool timed_wait(std::uint64_t usecs)
216 {
217 struct timespec ts;
218 const int usecs_in_1_sec = 1000000;
219 const int nsecs_in_1_sec = 1000000000;
220 clock_gettime(CLOCK_REALTIME, &ts);
221 ts.tv_sec += usecs / usecs_in_1_sec;
222 ts.tv_nsec += (usecs % usecs_in_1_sec) * 1000;
223 // sem_timedwait bombs if you have more than 1e9 in tv_nsec
224 // so we have to clean things up before passing it in
225 if (ts.tv_nsec >= nsecs_in_1_sec) {
226 ts.tv_nsec -= nsecs_in_1_sec;
227 ++ts.tv_sec;
228 }
229
230 int rc;
231 do {
232 rc = sem_timedwait(&m_sema, &ts);
233 } while (rc == -1 && errno == EINTR);
234 return !(rc == -1 && errno == ETIMEDOUT);
235 }
236
237 void signal()
238 {
239 sem_post(&m_sema);
240 }
241
242 void signal(int count)
243 {
244 while (count-- > 0)
245 {
246 sem_post(&m_sema);
247 }
248 }
249 };
250#else
251#error Unsupported platform! (No semaphore wrapper available)
252#endif
253
254 //---------------------------------------------------------
255 // LightweightSemaphore
256 //---------------------------------------------------------
257 class LightweightSemaphore
258 {
259 public:
260 typedef std::make_signed<std::size_t>::type ssize_t;
261
262 private:
263 std::atomic<ssize_t> m_count;
264 Semaphore m_sema;
265
266 bool waitWithPartialSpinning(std::int64_t timeout_usecs = -1)
267 {
268 ssize_t oldCount;
269 // Is there a better way to set the initial spin count?
270 // If we lower it to 1000, testBenaphore becomes 15x slower on my Core i7-5930K Windows PC,
271 // as threads start hitting the kernel semaphore.
272 int spin = 10000;
273 while (--spin >= 0)
274 {
275 oldCount = m_count.load(std::memory_order_relaxed);
276 if ((oldCount > 0) && m_count.compare_exchange_strong(oldCount, oldCount - 1, std::memory_order_acquire, std::memory_order_relaxed))
277 return true;
278 std::atomic_signal_fence(std::memory_order_acquire); // Prevent the compiler from collapsing the loop.
279 }
280 oldCount = m_count.fetch_sub(1, std::memory_order_acquire);
281 if (oldCount > 0)
282 return true;
283 if (timeout_usecs < 0)
284 {
285 m_sema.wait();
286 return true;
287 }
288 if (m_sema.timed_wait((std::uint64_t)timeout_usecs))
289 return true;
290 // At this point, we've timed out waiting for the semaphore, but the
291 // count is still decremented indicating we may still be waiting on
292 // it. So we have to re-adjust the count, but only if the semaphore
293 // wasn't signaled enough times for us too since then. If it was, we
294 // need to release the semaphore too.
295 while (true)
296 {
297 oldCount = m_count.load(std::memory_order_acquire);
298 if (oldCount >= 0 && m_sema.try_wait())
299 return true;
300 if (oldCount < 0 && m_count.compare_exchange_strong(oldCount, oldCount + 1, std::memory_order_relaxed, std::memory_order_relaxed))
301 return false;
302 }
303 }
304
305 ssize_t waitManyWithPartialSpinning(ssize_t max, std::int64_t timeout_usecs = -1)
306 {
307 assert(max > 0);
308 ssize_t oldCount;
309 int spin = 10000;
310 while (--spin >= 0)
311 {
312 oldCount = m_count.load(std::memory_order_relaxed);
313 if (oldCount > 0)
314 {
315 ssize_t newCount = oldCount > max ? oldCount - max : 0;
316 if (m_count.compare_exchange_strong(oldCount, newCount, std::memory_order_acquire, std::memory_order_relaxed))
317 return oldCount - newCount;
318 }
319 std::atomic_signal_fence(std::memory_order_acquire);
320 }
321 oldCount = m_count.fetch_sub(1, std::memory_order_acquire);
322 if (oldCount <= 0)
323 {
324 if (timeout_usecs < 0)
325 m_sema.wait();
326 else if (!m_sema.timed_wait((std::uint64_t)timeout_usecs))
327 {
328 while (true)
329 {
330 oldCount = m_count.load(std::memory_order_acquire);
331 if (oldCount >= 0 && m_sema.try_wait())
332 break;
333 if (oldCount < 0 && m_count.compare_exchange_strong(oldCount, oldCount + 1, std::memory_order_relaxed, std::memory_order_relaxed))
334 return 0;
335 }
336 }
337 }
338 if (max > 1)
339 return 1 + tryWaitMany(max - 1);
340 return 1;
341 }
342
343 public:
344 LightweightSemaphore(ssize_t initialCount = 0) : m_count(initialCount)
345 {
346 assert(initialCount >= 0);
347 }
348
349 bool tryWait()
350 {
351 ssize_t oldCount = m_count.load(std::memory_order_relaxed);
352 while (oldCount > 0)
353 {
354 if (m_count.compare_exchange_weak(oldCount, oldCount - 1, std::memory_order_acquire, std::memory_order_relaxed))
355 return true;
356 }
357 return false;
358 }
359
360 void wait()
361 {
362 if (!tryWait())
363 waitWithPartialSpinning();
364 }
365
366 bool wait(std::int64_t timeout_usecs)
367 {
368 return tryWait() || waitWithPartialSpinning(timeout_usecs);
369 }
370
371 // Acquires between 0 and (greedily) max, inclusive
372 ssize_t tryWaitMany(ssize_t max)
373 {
374 assert(max >= 0);
375 ssize_t oldCount = m_count.load(std::memory_order_relaxed);
376 while (oldCount > 0)
377 {
378 ssize_t newCount = oldCount > max ? oldCount - max : 0;
379 if (m_count.compare_exchange_weak(oldCount, newCount, std::memory_order_acquire, std::memory_order_relaxed))
380 return oldCount - newCount;
381 }
382 return 0;
383 }
384
385 // Acquires at least one, and (greedily) at most max
386 ssize_t waitMany(ssize_t max, std::int64_t timeout_usecs)
387 {
388 assert(max >= 0);
389 ssize_t result = tryWaitMany(max);
390 if (result == 0 && max > 0)
391 result = waitManyWithPartialSpinning(max, timeout_usecs);
392 return result;
393 }
394
395 ssize_t waitMany(ssize_t max)
396 {
397 ssize_t result = waitMany(max, -1);
398 assert(result > 0);
399 return result;
400 }
401
402 void signal(ssize_t count = 1)
403 {
404 assert(count >= 0);
405 ssize_t oldCount = m_count.fetch_add(count, std::memory_order_release);
406 ssize_t toRelease = -oldCount < count ? -oldCount : count;
407 if (toRelease > 0)
408 {
409 m_sema.signal((int)toRelease);
410 }
411 }
412
413 ssize_t availableApprox() const
414 {
415 ssize_t count = m_count.load(std::memory_order_relaxed);
416 return count > 0 ? count : 0;
417 }
418 };
419 } // end namespace mpmc_sema
420} // end namespace details
421
422
423// This is a blocking version of the queue. It has an almost identical interface to
424// the normal non-blocking version, with the addition of various wait_dequeue() methods
425// and the removal of producer-specific dequeue methods.
426template<typename T, typename Traits = ConcurrentQueueDefaultTraits>
427class BlockingConcurrentQueue
428{
429private:
430 typedef ::dmlc::moodycamel::ConcurrentQueue<T, Traits> ConcurrentQueue;
431 typedef details::mpmc_sema::LightweightSemaphore LightweightSemaphore;
432
433public:
434 typedef typename ConcurrentQueue::producer_token_t producer_token_t;
435 typedef typename ConcurrentQueue::consumer_token_t consumer_token_t;
436
437 typedef typename ConcurrentQueue::index_t index_t;
438 typedef typename ConcurrentQueue::size_t size_t;
439 typedef typename std::make_signed<size_t>::type ssize_t;
440
441 static const size_t BLOCK_SIZE = ConcurrentQueue::BLOCK_SIZE;
442 static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = ConcurrentQueue::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD;
443 static const size_t EXPLICIT_INITIAL_INDEX_SIZE = ConcurrentQueue::EXPLICIT_INITIAL_INDEX_SIZE;
444 static const size_t IMPLICIT_INITIAL_INDEX_SIZE = ConcurrentQueue::IMPLICIT_INITIAL_INDEX_SIZE;
445 static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = ConcurrentQueue::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE;
446 static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = ConcurrentQueue::EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE;
447 static const size_t MAX_SUBQUEUE_SIZE = ConcurrentQueue::MAX_SUBQUEUE_SIZE;
448
449public:
450 // Creates a queue with at least `capacity` element slots; note that the
451 // actual number of elements that can be inserted without additional memory
452 // allocation depends on the number of producers and the block size (e.g. if
453 // the block size is equal to `capacity`, only a single block will be allocated
454 // up-front, which means only a single producer will be able to enqueue elements
455 // without an extra allocation -- blocks aren't shared between producers).
456 // This method is not thread safe -- it is up to the user to ensure that the
457 // queue is fully constructed before it starts being used by other threads (this
458 // includes making the memory effects of construction visible, possibly with a
459 // memory barrier).
460 explicit BlockingConcurrentQueue(size_t capacity = 6 * BLOCK_SIZE)
461 : inner(capacity), sema(create<LightweightSemaphore>(), &BlockingConcurrentQueue::template destroy<LightweightSemaphore>)
462 {
463 assert(reinterpret_cast<ConcurrentQueue*>((BlockingConcurrentQueue*)1) == &((BlockingConcurrentQueue*)1)->inner && "BlockingConcurrentQueue must have ConcurrentQueue as its first member");
464 if (!sema) {
465 MOODYCAMEL_THROW(std::bad_alloc());
466 }
467 }
468
469 BlockingConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers, size_t maxImplicitProducers)
470 : inner(minCapacity, maxExplicitProducers, maxImplicitProducers), sema(create<LightweightSemaphore>(), &BlockingConcurrentQueue::template destroy<LightweightSemaphore>)
471 {
472 assert(reinterpret_cast<ConcurrentQueue*>((BlockingConcurrentQueue*)1) == &((BlockingConcurrentQueue*)1)->inner && "BlockingConcurrentQueue must have ConcurrentQueue as its first member");
473 if (!sema) {
474 MOODYCAMEL_THROW(std::bad_alloc());
475 }
476 }
477
478 // Disable copying and copy assignment
479 BlockingConcurrentQueue(BlockingConcurrentQueue const&) MOODYCAMEL_DELETE_FUNCTION;
480 BlockingConcurrentQueue& operator=(BlockingConcurrentQueue const&) MOODYCAMEL_DELETE_FUNCTION;
481
482 // Moving is supported, but note that it is *not* a thread-safe operation.
483 // Nobody can use the queue while it's being moved, and the memory effects
484 // of that move must be propagated to other threads before they can use it.
485 // Note: When a queue is moved, its tokens are still valid but can only be
486 // used with the destination queue (i.e. semantically they are moved along
487 // with the queue itself).
488 BlockingConcurrentQueue(BlockingConcurrentQueue&& other) MOODYCAMEL_NOEXCEPT
489 : inner(std::move(other.inner)), sema(std::move(other.sema))
490 { }
491
492 inline BlockingConcurrentQueue& operator=(BlockingConcurrentQueue&& other) MOODYCAMEL_NOEXCEPT
493 {
494 return swap_internal(other);
495 }
496
497 // Swaps this queue's state with the other's. Not thread-safe.
498 // Swapping two queues does not invalidate their tokens, however
499 // the tokens that were created for one queue must be used with
500 // only the swapped queue (i.e. the tokens are tied to the
501 // queue's movable state, not the object itself).
502 inline void swap(BlockingConcurrentQueue& other) MOODYCAMEL_NOEXCEPT
503 {
504 swap_internal(other);
505 }
506
507private:
508 BlockingConcurrentQueue& swap_internal(BlockingConcurrentQueue& other)
509 {
510 if (this == &other) {
511 return *this;
512 }
513
514 inner.swap(other.inner);
515 sema.swap(other.sema);
516 return *this;
517 }
518
519public:
520 // Enqueues a single item (by copying it).
521 // Allocates memory if required. Only fails if memory allocation fails (or implicit
522 // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
523 // or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
524 // Thread-safe.
525 inline bool enqueue(T const& item)
526 {
527 if (details::likely(inner.enqueue(item))) {
528 sema->signal();
529 return true;
530 }
531 return false;
532 }
533
534 // Enqueues a single item (by moving it, if possible).
535 // Allocates memory if required. Only fails if memory allocation fails (or implicit
536 // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
537 // or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
538 // Thread-safe.
539 inline bool enqueue(T&& item)
540 {
541 if (details::likely(inner.enqueue(std::move(item)))) {
542 sema->signal();
543 return true;
544 }
545 return false;
546 }
547
548 // Enqueues a single item (by copying it) using an explicit producer token.
549 // Allocates memory if required. Only fails if memory allocation fails (or
550 // Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
551 // Thread-safe.
552 inline bool enqueue(producer_token_t const& token, T const& item)
553 {
554 if (details::likely(inner.enqueue(token, item))) {
555 sema->signal();
556 return true;
557 }
558 return false;
559 }
560
561 // Enqueues a single item (by moving it, if possible) using an explicit producer token.
562 // Allocates memory if required. Only fails if memory allocation fails (or
563 // Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
564 // Thread-safe.
565 inline bool enqueue(producer_token_t const& token, T&& item)
566 {
567 if (details::likely(inner.enqueue(token, std::move(item)))) {
568 sema->signal();
569 return true;
570 }
571 return false;
572 }
573
574 // Enqueues several items.
575 // Allocates memory if required. Only fails if memory allocation fails (or
576 // implicit production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
577 // is 0, or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
578 // Note: Use std::make_move_iterator if the elements should be moved instead of copied.
579 // Thread-safe.
580 template<typename It>
581 inline bool enqueue_bulk(It itemFirst, size_t count)
582 {
583 if (details::likely(inner.enqueue_bulk(std::forward<It>(itemFirst), count))) {
584 sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
585 return true;
586 }
587 return false;
588 }
589
590 // Enqueues several items using an explicit producer token.
591 // Allocates memory if required. Only fails if memory allocation fails
592 // (or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
593 // Note: Use std::make_move_iterator if the elements should be moved
594 // instead of copied.
595 // Thread-safe.
596 template<typename It>
597 inline bool enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
598 {
599 if (details::likely(inner.enqueue_bulk(token, std::forward<It>(itemFirst), count))) {
600 sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
601 return true;
602 }
603 return false;
604 }
605
606 // Enqueues a single item (by copying it).
607 // Does not allocate memory. Fails if not enough room to enqueue (or implicit
608 // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
609 // is 0).
610 // Thread-safe.
611 inline bool try_enqueue(T const& item)
612 {
613 if (inner.try_enqueue(item)) {
614 sema->signal();
615 return true;
616 }
617 return false;
618 }
619
620 // Enqueues a single item (by moving it, if possible).
621 // Does not allocate memory (except for one-time implicit producer).
622 // Fails if not enough room to enqueue (or implicit production is
623 // disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
624 // Thread-safe.
625 inline bool try_enqueue(T&& item)
626 {
627 if (inner.try_enqueue(std::move(item))) {
628 sema->signal();
629 return true;
630 }
631 return false;
632 }
633
634 // Enqueues a single item (by copying it) using an explicit producer token.
635 // Does not allocate memory. Fails if not enough room to enqueue.
636 // Thread-safe.
637 inline bool try_enqueue(producer_token_t const& token, T const& item)
638 {
639 if (inner.try_enqueue(token, item)) {
640 sema->signal();
641 return true;
642 }
643 return false;
644 }
645
646 // Enqueues a single item (by moving it, if possible) using an explicit producer token.
647 // Does not allocate memory. Fails if not enough room to enqueue.
648 // Thread-safe.
649 inline bool try_enqueue(producer_token_t const& token, T&& item)
650 {
651 if (inner.try_enqueue(token, std::move(item))) {
652 sema->signal();
653 return true;
654 }
655 return false;
656 }
657
658 // Enqueues several items.
659 // Does not allocate memory (except for one-time implicit producer).
660 // Fails if not enough room to enqueue (or implicit production is
661 // disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
662 // Note: Use std::make_move_iterator if the elements should be moved
663 // instead of copied.
664 // Thread-safe.
665 template<typename It>
666 inline bool try_enqueue_bulk(It itemFirst, size_t count)
667 {
668 if (inner.try_enqueue_bulk(std::forward<It>(itemFirst), count)) {
669 sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
670 return true;
671 }
672 return false;
673 }
674
675 // Enqueues several items using an explicit producer token.
676 // Does not allocate memory. Fails if not enough room to enqueue.
677 // Note: Use std::make_move_iterator if the elements should be moved
678 // instead of copied.
679 // Thread-safe.
680 template<typename It>
681 inline bool try_enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
682 {
683 if (inner.try_enqueue_bulk(token, std::forward<It>(itemFirst), count)) {
684 sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
685 return true;
686 }
687 return false;
688 }
689
690
691 // Attempts to dequeue from the queue.
692 // Returns false if all producer streams appeared empty at the time they
693 // were checked (so, the queue is likely but not guaranteed to be empty).
694 // Never allocates. Thread-safe.
695 template<typename U>
696 inline bool try_dequeue(U& item)
697 {
698 if (sema->tryWait()) {
699 while (!inner.try_dequeue(item)) {
700 continue;
701 }
702 return true;
703 }
704 return false;
705 }
706
707 // Attempts to dequeue from the queue using an explicit consumer token.
708 // Returns false if all producer streams appeared empty at the time they
709 // were checked (so, the queue is likely but not guaranteed to be empty).
710 // Never allocates. Thread-safe.
711 template<typename U>
712 inline bool try_dequeue(consumer_token_t& token, U& item)
713 {
714 if (sema->tryWait()) {
715 while (!inner.try_dequeue(token, item)) {
716 continue;
717 }
718 return true;
719 }
720 return false;
721 }
722
723 // Attempts to dequeue several elements from the queue.
724 // Returns the number of items actually dequeued.
725 // Returns 0 if all producer streams appeared empty at the time they
726 // were checked (so, the queue is likely but not guaranteed to be empty).
727 // Never allocates. Thread-safe.
728 template<typename It>
729 inline size_t try_dequeue_bulk(It itemFirst, size_t max)
730 {
731 size_t count = 0;
732 max = (size_t)sema->tryWaitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
733 while (count != max) {
734 count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
735 }
736 return count;
737 }
738
739 // Attempts to dequeue several elements from the queue using an explicit consumer token.
740 // Returns the number of items actually dequeued.
741 // Returns 0 if all producer streams appeared empty at the time they
742 // were checked (so, the queue is likely but not guaranteed to be empty).
743 // Never allocates. Thread-safe.
744 template<typename It>
745 inline size_t try_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max)
746 {
747 size_t count = 0;
748 max = (size_t)sema->tryWaitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
749 while (count != max) {
750 count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
751 }
752 return count;
753 }
754
755
756
757 // Blocks the current thread until there's something to dequeue, then
758 // dequeues it.
759 // Never allocates. Thread-safe.
760 template<typename U>
761 inline void wait_dequeue(U& item)
762 {
763 sema->wait();
764 while (!inner.try_dequeue(item)) {
765 continue;
766 }
767 }
768
769 // Blocks the current thread until either there's something to dequeue
770 // or the timeout (specified in microseconds) expires. Returns false
771 // without setting `item` if the timeout expires, otherwise assigns
772 // to `item` and returns true.
773 // Using a negative timeout indicates an indefinite timeout,
774 // and is thus functionally equivalent to calling wait_dequeue.
775 // Never allocates. Thread-safe.
776 template<typename U>
777 inline bool wait_dequeue_timed(U& item, std::int64_t timeout_usecs)
778 {
779 if (!sema->wait(timeout_usecs)) {
780 return false;
781 }
782 while (!inner.try_dequeue(item)) {
783 continue;
784 }
785 return true;
786 }
787
788 // Blocks the current thread until either there's something to dequeue
789 // or the timeout expires. Returns false without setting `item` if the
790 // timeout expires, otherwise assigns to `item` and returns true.
791 // Never allocates. Thread-safe.
792 template<typename U, typename Rep, typename Period>
793 inline bool wait_dequeue_timed(U& item, std::chrono::duration<Rep, Period> const& timeout)
794 {
795 return wait_dequeue_timed(item, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
796 }
797
798 // Blocks the current thread until there's something to dequeue, then
799 // dequeues it using an explicit consumer token.
800 // Never allocates. Thread-safe.
801 template<typename U>
802 inline void wait_dequeue(consumer_token_t& token, U& item)
803 {
804 sema->wait();
805 while (!inner.try_dequeue(token, item)) {
806 continue;
807 }
808 }
809
810 // Blocks the current thread until either there's something to dequeue
811 // or the timeout (specified in microseconds) expires. Returns false
812 // without setting `item` if the timeout expires, otherwise assigns
813 // to `item` and returns true.
814 // Using a negative timeout indicates an indefinite timeout,
815 // and is thus functionally equivalent to calling wait_dequeue.
816 // Never allocates. Thread-safe.
817 template<typename U>
818 inline bool wait_dequeue_timed(consumer_token_t& token, U& item, std::int64_t timeout_usecs)
819 {
820 if (!sema->wait(timeout_usecs)) {
821 return false;
822 }
823 while (!inner.try_dequeue(token, item)) {
824 continue;
825 }
826 return true;
827 }
828
829 // Blocks the current thread until either there's something to dequeue
830 // or the timeout expires. Returns false without setting `item` if the
831 // timeout expires, otherwise assigns to `item` and returns true.
832 // Never allocates. Thread-safe.
833 template<typename U, typename Rep, typename Period>
834 inline bool wait_dequeue_timed(consumer_token_t& token, U& item, std::chrono::duration<Rep, Period> const& timeout)
835 {
836 return wait_dequeue_timed(token, item, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
837 }
838
839 // Attempts to dequeue several elements from the queue.
840 // Returns the number of items actually dequeued, which will
841 // always be at least one (this method blocks until the queue
842 // is non-empty) and at most max.
843 // Never allocates. Thread-safe.
844 template<typename It>
845 inline size_t wait_dequeue_bulk(It itemFirst, size_t max)
846 {
847 size_t count = 0;
848 max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
849 while (count != max) {
850 count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
851 }
852 return count;
853 }
854
855 // Attempts to dequeue several elements from the queue.
856 // Returns the number of items actually dequeued, which can
857 // be 0 if the timeout expires while waiting for elements,
858 // and at most max.
859 // Using a negative timeout indicates an indefinite timeout,
860 // and is thus functionally equivalent to calling wait_dequeue_bulk.
861 // Never allocates. Thread-safe.
862 template<typename It>
863 inline size_t wait_dequeue_bulk_timed(It itemFirst, size_t max, std::int64_t timeout_usecs)
864 {
865 size_t count = 0;
866 max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max, timeout_usecs);
867 while (count != max) {
868 count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
869 }
870 return count;
871 }
872
873 // Attempts to dequeue several elements from the queue.
874 // Returns the number of items actually dequeued, which can
875 // be 0 if the timeout expires while waiting for elements,
876 // and at most max.
877 // Never allocates. Thread-safe.
878 template<typename It, typename Rep, typename Period>
879 inline size_t wait_dequeue_bulk_timed(It itemFirst, size_t max, std::chrono::duration<Rep, Period> const& timeout)
880 {
881 return wait_dequeue_bulk_timed<It&>(itemFirst, max, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
882 }
883
884 // Attempts to dequeue several elements from the queue using an explicit consumer token.
885 // Returns the number of items actually dequeued, which will
886 // always be at least one (this method blocks until the queue
887 // is non-empty) and at most max.
888 // Never allocates. Thread-safe.
889 template<typename It>
890 inline size_t wait_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max)
891 {
892 size_t count = 0;
893 max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
894 while (count != max) {
895 count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
896 }
897 return count;
898 }
899
900 // Attempts to dequeue several elements from the queue using an explicit consumer token.
901 // Returns the number of items actually dequeued, which can
902 // be 0 if the timeout expires while waiting for elements,
903 // and at most max.
904 // Using a negative timeout indicates an indefinite timeout,
905 // and is thus functionally equivalent to calling wait_dequeue_bulk.
906 // Never allocates. Thread-safe.
907 template<typename It>
908 inline size_t wait_dequeue_bulk_timed(consumer_token_t& token, It itemFirst, size_t max, std::int64_t timeout_usecs)
909 {
910 size_t count = 0;
911 max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max, timeout_usecs);
912 while (count != max) {
913 count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
914 }
915 return count;
916 }
917
918 // Attempts to dequeue several elements from the queue using an explicit consumer token.
919 // Returns the number of items actually dequeued, which can
920 // be 0 if the timeout expires while waiting for elements,
921 // and at most max.
922 // Never allocates. Thread-safe.
923 template<typename It, typename Rep, typename Period>
924 inline size_t wait_dequeue_bulk_timed(consumer_token_t& token, It itemFirst, size_t max, std::chrono::duration<Rep, Period> const& timeout)
925 {
926 return wait_dequeue_bulk_timed<It&>(token, itemFirst, max, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
927 }
928
929
930 // Returns an estimate of the total number of elements currently in the queue. This
931 // estimate is only accurate if the queue has completely stabilized before it is called
932 // (i.e. all enqueue and dequeue operations have completed and their memory effects are
933 // visible on the calling thread, and no further operations start while this method is
934 // being called).
935 // Thread-safe.
936 inline size_t size_approx() const
937 {
938 return (size_t)sema->availableApprox();
939 }
940
941
942 // Returns true if the underlying atomic variables used by
943 // the queue are lock-free (they should be on most platforms).
944 // Thread-safe.
945 static bool is_lock_free()
946 {
947 return ConcurrentQueue::is_lock_free();
948 }
949
950
951private:
952 template<typename U>
953 static inline U* create()
954 {
955 auto p = (Traits::malloc)(sizeof(U));
956 return p != nullptr ? new (p) U : nullptr;
957 }
958
959 template<typename U, typename A1>
960 static inline U* create(A1&& a1)
961 {
962 auto p = (Traits::malloc)(sizeof(U));
963 return p != nullptr ? new (p) U(std::forward<A1>(a1)) : nullptr;
964 }
965
966 template<typename U>
967 static inline void destroy(U* p)
968 {
969 if (p != nullptr) {
970 p->~U();
971 }
972 (Traits::free)(p);
973 }
974
975private:
976 ConcurrentQueue inner;
977 std::unique_ptr<LightweightSemaphore, void (*)(LightweightSemaphore*)> sema;
978};
979
980
981template<typename T, typename Traits>
982inline void swap(BlockingConcurrentQueue<T, Traits>& a, BlockingConcurrentQueue<T, Traits>& b) MOODYCAMEL_NOEXCEPT
983{
984 a.swap(b);
985}
986
987} // end namespace moodycamel
988} // namespace dmlc
989
990#endif // DMLC_BLOCKINGCONCURRENTQUEUE_H_
namespace for dmlc
Definition array_view.h:12
unsigned index_t
this defines the unsigned integer type that can normally be used to store feature index
Definition data.h:32