Medial Code Documentation
concurrentqueue.h
1
2// Provides a C++11 implementation of a multi-producer, multi-consumer lock-free queue.
3// An overview, including benchmark results, is provided here:
4// http://moodycamel.com/blog/2014/a-fast-general-purpose-lock-free-queue-for-c++
5// The full design is also described in excruciating detail at:
6// http://moodycamel.com/blog/2014/detailed-design-of-a-lock-free-queue
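//
// Illustrative usage sketch (documentation only, not part of the header): the
// basic tokenless API declared further below. The include path is an
// assumption and may differ in your project layout.
//
//   #include "concurrentqueue.h"
//
//   void basic_usage() {
//     dmlc::moodycamel::ConcurrentQueue<int> q;
//     q.enqueue(25);        // may allocate; fails only on allocation failure
//     int item;
//     if (q.try_dequeue(item)) {
//       // item == 25 in the single-threaded case; under concurrency some
//       // previously enqueued value from one of the producers is returned
//     }
//   }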
7
8// Simplified BSD license:
9// Copyright (c) 2013-2016, Cameron Desrochers.
10// All rights reserved.
11//
12// Redistribution and use in source and binary forms, with or without modification,
13// are permitted provided that the following conditions are met:
14//
15// - Redistributions of source code must retain the above copyright notice, this list of
16// conditions and the following disclaimer.
17// - Redistributions in binary form must reproduce the above copyright notice, this list of
18// conditions and the following disclaimer in the documentation and/or other materials
19// provided with the distribution.
20//
21// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
22// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
23// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
24// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
26// OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
27// HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
28// TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
29// EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30#ifndef DMLC_CONCURRENTQUEUE_H_
31#define DMLC_CONCURRENTQUEUE_H_
32#pragma once
33
34#if defined(__GNUC__)
35// Disable -Wconversion warnings (spuriously triggered when Traits::size_t and
36// Traits::index_t are set to < 32 bits, causing integer promotion, causing warnings
37// upon assigning any computed values)
38#pragma GCC diagnostic push
39#pragma GCC diagnostic ignored "-Wconversion"
40
41#ifdef MCDBGQ_USE_RELACY
42#pragma GCC diagnostic ignored "-Wint-to-pointer-cast"
43#endif
44#endif
45
46#if defined(_WIN32) || defined(__WINDOWS__) || defined(__WIN32__) || defined(_WIN64)
47#include <windows.h> // for GetCurrentThreadId()
48#endif
49
50#if defined(__APPLE__)
51#include "TargetConditionals.h"
52#endif
53
54#ifdef MCDBGQ_USE_RELACY
55#include "relacy/relacy_std.hpp"
56#include "relacy_shims.h"
57// We only use malloc/free anyway, and the delete macro messes up `= delete` method declarations.
58// We'll override the default trait malloc ourselves without a macro.
59#undef new
60#undef delete
61#undef malloc
62#undef free
63#else
64#include <atomic> // Requires C++11. Sorry VS2010.
65#include <cassert>
66#endif
67#include <cstddef> // for max_align_t
68#include <cstdint>
69#include <cstdlib>
70#include <type_traits>
71#include <algorithm>
72#include <utility>
73#include <limits>
74#include <climits> // for CHAR_BIT
75#include <array>
76#include <thread> // partly for __WINPTHREADS_VERSION if on MinGW-w64 w/ POSIX threading
77
78namespace dmlc {
79
80// Platform-specific definitions of a numeric thread ID type and an invalid value
81namespace moodycamel { namespace details {
82template<typename thread_id_t> struct thread_id_converter {
83 typedef thread_id_t thread_id_numeric_size_t;
84 typedef thread_id_t thread_id_hash_t;
85 static thread_id_hash_t prehash(thread_id_t const& x) { return x; }
86};
87} }
88#if defined(MCDBGQ_USE_RELACY)
89namespace moodycamel { namespace details {
90 typedef std::uint32_t thread_id_t;
91 static const thread_id_t invalid_thread_id = 0xFFFFFFFFU;
92 static const thread_id_t invalid_thread_id2 = 0xFFFFFFFEU;
93 static inline thread_id_t thread_id() { return rl::thread_index(); }
94} }
95#elif defined(_WIN32) || defined(__WINDOWS__) || defined(__WIN32__)
96// No sense pulling in windows.h in a header, we'll manually declare the function
97// we use and rely on backwards-compatibility for this not to break
98extern "C" __declspec(dllimport) unsigned long __stdcall GetCurrentThreadId(void);
99namespace moodycamel { namespace details {
100 static_assert(sizeof(unsigned long) == sizeof(std::uint32_t), "Expected size of unsigned long to be 32 bits on Windows");
101 typedef std::uint32_t thread_id_t;
102 static const thread_id_t invalid_thread_id = 0; // See http://blogs.msdn.com/b/oldnewthing/archive/2004/02/23/78395.aspx
103 static const thread_id_t invalid_thread_id2 = 0xFFFFFFFFU; // Not technically guaranteed to be invalid, but is never used in practice. Note that all Win32 thread IDs are presently multiples of 4.
104 static inline thread_id_t thread_id() { return static_cast<thread_id_t>(::GetCurrentThreadId()); }
105} }
106#elif defined(__arm__) || defined(_M_ARM) || defined(__aarch64__) || (defined(__APPLE__) && TARGET_OS_IPHONE)
107namespace moodycamel { namespace details {
108 static_assert(sizeof(std::thread::id) == 4 || sizeof(std::thread::id) == 8, "std::thread::id is expected to be either 4 or 8 bytes");
109
110 typedef std::thread::id thread_id_t;
111 static const thread_id_t invalid_thread_id; // Default ctor creates invalid ID
112
113 // Note we don't define an invalid_thread_id2 since std::thread::id doesn't have one; it's
114 // only used if MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED is defined anyway, which it won't
115 // be.
116 static inline thread_id_t thread_id() { return std::this_thread::get_id(); }
117
118 template<std::size_t> struct thread_id_size { };
119 template<> struct thread_id_size<4> { typedef std::uint32_t numeric_t; };
120 template<> struct thread_id_size<8> { typedef std::uint64_t numeric_t; };
121
122 template<> struct thread_id_converter<thread_id_t> {
123 typedef thread_id_size<sizeof(thread_id_t)>::numeric_t thread_id_numeric_size_t;
124#ifndef __APPLE__
125 typedef std::size_t thread_id_hash_t;
126#else
127 typedef thread_id_numeric_size_t thread_id_hash_t;
128#endif
129
130 static thread_id_hash_t prehash(thread_id_t const& x)
131 {
132#ifndef __APPLE__
133 return std::hash<std::thread::id>()(x);
134#else
135 return *reinterpret_cast<thread_id_hash_t const*>(&x);
136#endif
137 }
138 };
139} }
140#else
141// Use a nice trick from this answer: http://stackoverflow.com/a/8438730/21475
142// In order to get a numeric thread ID in a platform-independent way, we use a thread-local
143// static variable's address as a thread identifier :-)
144#if defined(__GNUC__) || defined(__INTEL_COMPILER)
145#define MOODYCAMEL_THREADLOCAL __thread
146#elif defined(_MSC_VER)
147#define MOODYCAMEL_THREADLOCAL __declspec(thread)
148#else
149// Assume C++11 compliant compiler
150#define MOODYCAMEL_THREADLOCAL thread_local
151#endif
152namespace moodycamel { namespace details {
153typedef std::uintptr_t thread_id_t;
154static const thread_id_t invalid_thread_id = 0; // Address can't be nullptr
155static const thread_id_t invalid_thread_id2 = 1; // Member accesses off a null pointer are also generally invalid. Plus it's not aligned.
156static inline thread_id_t thread_id() { static MOODYCAMEL_THREADLOCAL int x; return reinterpret_cast<thread_id_t>(&x); }
157} }
158#endif
159
160// Exceptions
161#ifndef MOODYCAMEL_EXCEPTIONS_ENABLED
162#if (defined(_MSC_VER) && defined(_CPPUNWIND)) || (defined(__GNUC__) && defined(__EXCEPTIONS)) || (!defined(_MSC_VER) && !defined(__GNUC__))
163#define MOODYCAMEL_EXCEPTIONS_ENABLED
164#endif
165#endif
166#ifdef MOODYCAMEL_EXCEPTIONS_ENABLED
167#define MOODYCAMEL_TRY try
168#define MOODYCAMEL_CATCH(...) catch(__VA_ARGS__)
169#define MOODYCAMEL_RETHROW throw
170#define MOODYCAMEL_THROW(expr) throw (expr)
171#else
172#define MOODYCAMEL_TRY if (true)
173#define MOODYCAMEL_CATCH(...) else if (false)
174#define MOODYCAMEL_RETHROW
175#define MOODYCAMEL_THROW(expr)
176#endif
177
178#ifndef MOODYCAMEL_NOEXCEPT
179#if !defined(MOODYCAMEL_EXCEPTIONS_ENABLED)
180#define MOODYCAMEL_NOEXCEPT
181#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) true
182#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) true
183#elif defined(_MSC_VER) && defined(_NOEXCEPT) && _MSC_VER < 1800
184// VS2012's std::is_nothrow_[move_]constructible is broken and returns true when it shouldn't :-(
185// We have to assume *all* non-trivial constructors may throw on VS2012!
186#define MOODYCAMEL_NOEXCEPT _NOEXCEPT
187#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) (std::is_rvalue_reference<valueType>::value && std::is_move_constructible<type>::value ? std::is_trivially_move_constructible<type>::value : std::is_trivially_copy_constructible<type>::value)
188#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) ((std::is_rvalue_reference<valueType>::value && std::is_move_assignable<type>::value ? std::is_trivially_move_assignable<type>::value || std::is_nothrow_move_assignable<type>::value : std::is_trivially_copy_assignable<type>::value || std::is_nothrow_copy_assignable<type>::value) && MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr))
189#elif defined(_MSC_VER) && defined(_NOEXCEPT) && _MSC_VER < 1900
190#define MOODYCAMEL_NOEXCEPT _NOEXCEPT
191#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) (std::is_rvalue_reference<valueType>::value && std::is_move_constructible<type>::value ? std::is_trivially_move_constructible<type>::value || std::is_nothrow_move_constructible<type>::value : std::is_trivially_copy_constructible<type>::value || std::is_nothrow_copy_constructible<type>::value)
192#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) ((std::is_rvalue_reference<valueType>::value && std::is_move_assignable<type>::value ? std::is_trivially_move_assignable<type>::value || std::is_nothrow_move_assignable<type>::value : std::is_trivially_copy_assignable<type>::value || std::is_nothrow_copy_assignable<type>::value) && MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr))
193#else
194#define MOODYCAMEL_NOEXCEPT noexcept
195#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) noexcept(expr)
196#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) noexcept(expr)
197#endif
198#endif
199
200#ifndef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
201#ifdef MCDBGQ_USE_RELACY
202#define MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
203#else
204// VS2013 doesn't support `thread_local`, and MinGW-w64 w/ POSIX threading has a crippling bug: http://sourceforge.net/p/mingw-w64/bugs/445
205// g++ <=4.7 doesn't support thread_local either.
206// Finally, iOS/ARM doesn't have support for it either, and g++/ARM allows it to compile but it's unconfirmed to actually work
207#if (!defined(_MSC_VER) || _MSC_VER >= 1900) && (!defined(__MINGW32__) && !defined(__MINGW64__) || !defined(__WINPTHREADS_VERSION)) && (!defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)) && (!defined(__APPLE__) || !TARGET_OS_IPHONE) && !defined(__arm__) && !defined(_M_ARM) && !defined(__aarch64__)
208// Assume `thread_local` is fully supported in all other C++11 compilers/platforms
209//#define MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED // always disabled for now since several users report having problems with it on
210#endif
211#endif
212#endif
213
214// VS2012 doesn't support deleted functions.
215// In this case, we declare the function normally but don't define it. A link error will be generated if the function is called.
216#ifndef MOODYCAMEL_DELETE_FUNCTION
217#if defined(_MSC_VER) && _MSC_VER < 1800
218#define MOODYCAMEL_DELETE_FUNCTION
219#else
220#define MOODYCAMEL_DELETE_FUNCTION = delete
221#endif
222#endif
223
224// Compiler-specific likely/unlikely hints
225namespace moodycamel { namespace details {
226#if defined(__GNUC__)
227inline bool likely(bool x) { return __builtin_expect((x), true); }
228inline bool unlikely(bool x) { return __builtin_expect((x), false); }
229#else
230inline bool likely(bool x) { return x; }
231 inline bool unlikely(bool x) { return x; }
232#endif
233} }
234
235#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
236#include "internal/concurrentqueue_internal_debug.h"
237#endif
238
239namespace moodycamel {
240namespace details {
241template<typename T>
242struct const_numeric_max {
243 static_assert(std::is_integral<T>::value, "const_numeric_max can only be used with integers");
244 static const T value = std::numeric_limits<T>::is_signed
245 ? (static_cast<T>(1) << (sizeof(T) * CHAR_BIT - 1)) - static_cast<T>(1)
246 : static_cast<T>(-1);
247};
248
249#if defined(__GLIBCXX__)
250typedef ::max_align_t std_max_align_t; // libstdc++ forgot to add it to std:: for a while
251#else
252typedef std::max_align_t std_max_align_t; // Others (e.g. MSVC) insist it can *only* be accessed via std::
253#endif
254
255// Some platforms have incorrectly set max_align_t to a type with <8 bytes alignment even while supporting
256// 8-byte aligned scalar values (*cough* 32-bit iOS). Work around this with our own union. See issue #64.
257typedef union {
258 std_max_align_t x;
259 long long y;
260 void* z;
261} max_align_t;
262}
263
264// Default traits for the ConcurrentQueue. To change some of the
265// traits without re-implementing all of them, inherit from this
266// struct and shadow the declarations you wish to be different;
267// since the traits are used as a template type parameter, the
268// shadowed declarations will be used where defined, and the defaults
269// otherwise.
270struct ConcurrentQueueDefaultTraits
271{
272 // General-purpose size type. std::size_t is strongly recommended.
273 typedef std::size_t size_t;
274
275 // The type used for the enqueue and dequeue indices. Must be at least as
276 // large as size_t. Should be significantly larger than the number of elements
277 // you expect to hold at once, especially if you have a high turnover rate;
278 // for example, on 32-bit x86, if you expect to have over a hundred million
279 // elements or pump several million elements through your queue in a very
280 // short space of time, using a 32-bit type *may* trigger a race condition.
281 // A 64-bit int type is recommended in that case, and in practice will
282 // prevent a race condition no matter the usage of the queue. Note that
283 // whether the queue is lock-free with a 64-bit int type depends on whether
284 // std::atomic<std::uint64_t> is lock-free, which is platform-specific.
285 typedef std::size_t index_t;
286
287 // Internally, all elements are enqueued and dequeued from multi-element
288 // blocks; this is the smallest controllable unit. If you expect few elements
289 // but many producers, a smaller block size should be favoured. For few producers
290 // and/or many elements, a larger block size is preferred. A sane default
291 // is provided. Must be a power of 2.
292 static const size_t BLOCK_SIZE = 32;
293
294 // For explicit producers (i.e. when using a producer token), the block is
295 // checked for being empty by iterating through a list of flags, one per element.
296 // For large block sizes, this is too inefficient, and switching to an atomic
297 // counter-based approach is faster. The switch is made for block sizes strictly
298 // larger than this threshold.
299 static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = 32;
300
301 // How many full blocks can be expected for a single explicit producer? This should
302 // reflect that number's maximum for optimal performance. Must be a power of 2.
303 static const size_t EXPLICIT_INITIAL_INDEX_SIZE = 32;
304
305 // How many full blocks can be expected for a single implicit producer? This should
306 // reflect that number's maximum for optimal performance. Must be a power of 2.
307 static const size_t IMPLICIT_INITIAL_INDEX_SIZE = 32;
308
309 // The initial size of the hash table mapping thread IDs to implicit producers.
310 // Note that the hash is resized every time it becomes half full.
311 // Must be a power of two, and either 0 or at least 1. If 0, implicit production
312 // (using the enqueue methods without an explicit producer token) is disabled.
313 static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = 32;
314
315 // Controls the number of items that an explicit consumer (i.e. one with a token)
316 // must consume before it causes all consumers to rotate and move on to the next
317 // internal queue.
318 static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = 256;
319
320 // The maximum number of elements (inclusive) that can be enqueued to a sub-queue.
321 // Enqueue operations that would cause this limit to be surpassed will fail. Note
322 // that this limit is enforced at the block level (for performance reasons), i.e.
323 // it's rounded up to the nearest block size.
324 static const size_t MAX_SUBQUEUE_SIZE = details::const_numeric_max<size_t>::value;
325
326
327#ifndef MCDBGQ_USE_RELACY
328 // Memory allocation can be customized if needed.
329 // malloc should return nullptr on failure, and handle alignment like std::malloc.
330#if defined(malloc) || defined(free)
331 // Gah, this is 2015, stop defining macros that break standard code already!
332 // Work around malloc/free being special macros:
333 static inline void* WORKAROUND_malloc(size_t size) { return malloc(size); }
334 static inline void WORKAROUND_free(void* ptr) { return free(ptr); }
335 static inline void* (malloc)(size_t size) { return WORKAROUND_malloc(size); }
336 static inline void (free)(void* ptr) { return WORKAROUND_free(ptr); }
337#else
338 static inline void* malloc(size_t size) { return std::malloc(size); }
339 static inline void free(void* ptr) { return std::free(ptr); }
340#endif
341#else
342 // Debug versions when running under the Relacy race detector (ignore
343 // these in user code)
344 static inline void* malloc(size_t size) { return rl::rl_malloc(size, $); }
345 static inline void free(void* ptr) { return rl::rl_free(ptr, $); }
346#endif
347};
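
// Illustrative sketch (not used by the queue itself): customizing traits by
// inheriting from ConcurrentQueueDefaultTraits and shadowing the members you
// want to change, as described in the comment preceding the struct above.
// The name and the particular values below are arbitrary examples.
struct ExampleLargeBlockTraits : public ConcurrentQueueDefaultTraits
{
  // Larger blocks mean fewer allocations when a producer enqueues many items.
  static const size_t BLOCK_SIZE = 256;
  // 64-bit indices sidestep the (unlikely) index wrap-around race on 32-bit
  // platforms, at the cost of lock-freedom where 64-bit atomics aren't lock-free.
  typedef std::uint64_t index_t;
};
// The traits type is then passed as the second template parameter, e.g.
//   dmlc::moodycamel::ConcurrentQueue<int, ExampleLargeBlockTraits> q;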
348
349
350// When producing or consuming many elements, the most efficient way is to:
351// 1) Use one of the bulk-operation methods of the queue with a token
352// 2) Failing that, use the bulk-operation methods without a token
353// 3) Failing that, create a token and use that with the single-item methods
354// 4) Failing that, use the single-parameter methods of the queue
355// Having said that, don't create tokens willy-nilly -- ideally there should be
356// a maximum of one token per thread (of each kind).
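//
// Illustrative sketch (shown as a comment; the tokens and bulk methods are
// declared further below). This is option 1) above: per-thread tokens combined
// with the bulk operations. The buffer size of 64 is arbitrary.
//
//   void bulk_with_tokens(dmlc::moodycamel::ConcurrentQueue<int>& q) {
//     dmlc::moodycamel::ProducerToken ptok(q);   // reuse across calls on this thread
//     dmlc::moodycamel::ConsumerToken ctok(q);
//
//     int out[64];
//     for (int i = 0; i != 64; ++i) out[i] = i;
//     q.enqueue_bulk(ptok, out, 64);                       // one call, 64 items
//
//     int in[64];
//     std::size_t n = q.try_dequeue_bulk(ctok, in, 64);    // up to 64 items
//     (void)n;  // number actually dequeued (may be fewer than 64)
//   }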
357struct ProducerToken;
358struct ConsumerToken;
359
360template<typename T, typename Traits> class ConcurrentQueue;
361template<typename T, typename Traits> class BlockingConcurrentQueue;
362class ConcurrentQueueTests;
363
364
365namespace details
366{
367struct ConcurrentQueueProducerTypelessBase
368{
369 ConcurrentQueueProducerTypelessBase* next;
370 std::atomic<bool> inactive;
371 ProducerToken* token;
372
373 ConcurrentQueueProducerTypelessBase()
374 : next(nullptr), inactive(false), token(nullptr)
375 {
376 }
377};
378
379template<bool use32> struct _hash_32_or_64 {
380 static inline std::uint32_t hash(std::uint32_t h)
381 {
382 // MurmurHash3 finalizer -- see https://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp
383 // Since the thread ID is already unique, all we really want to do is propagate that
384 // uniqueness evenly across all the bits, so that we can use a subset of the bits while
385 // reducing collisions significantly
386 h ^= h >> 16;
387 h *= 0x85ebca6b;
388 h ^= h >> 13;
389 h *= 0xc2b2ae35;
390 return h ^ (h >> 16);
391 }
392};
393template<> struct _hash_32_or_64<1> {
394 static inline std::uint64_t hash(std::uint64_t h)
395 {
396 h ^= h >> 33;
397 h *= 0xff51afd7ed558ccd;
398 h ^= h >> 33;
399 h *= 0xc4ceb9fe1a85ec53;
400 return h ^ (h >> 33);
401 }
402};
403template<std::size_t size> struct hash_32_or_64 : public _hash_32_or_64<(size > 4)> { };
404
405static inline size_t hash_thread_id(thread_id_t id)
406{
407 static_assert(sizeof(thread_id_t) <= 8, "Expected a platform where thread IDs are at most 64-bit values");
408 return static_cast<size_t>(hash_32_or_64<sizeof(thread_id_converter<thread_id_t>::thread_id_hash_t)>::hash(
409 thread_id_converter<thread_id_t>::prehash(id)));
410}
411
412template<typename T>
413static inline bool circular_less_than(T a, T b)
414{
415#ifdef _MSC_VER
416 #pragma warning(push)
417#pragma warning(disable: 4554)
418#endif
419 static_assert(std::is_integral<T>::value && !std::numeric_limits<T>::is_signed, "circular_less_than is intended to be used only with unsigned integer types");
420 return static_cast<T>(a - b) > static_cast<T>(static_cast<T>(1) << static_cast<T>(sizeof(T) * CHAR_BIT - 1));
421#ifdef _MSC_VER
422#pragma warning(pop)
423#endif
424}
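
// Worked example (illustrative; not used by the queue itself): with
// std::uint8_t the half-range threshold is 128, so indices compare correctly
// even after the index counter wraps around.
inline void circular_less_than_example()
{
  assert((circular_less_than<std::uint8_t>(250, 5)));   // 250 - 5 wraps to 245 > 128, so 5 is "ahead" of 250
  assert(!(circular_less_than<std::uint8_t>(5, 250)));  // 5 - 250 wraps to 11, which is not > 128
}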
425
426template<typename U>
427static inline char* align_for(char* ptr)
428{
429 const std::size_t alignment = std::alignment_of<U>::value;
430 return ptr + (alignment - (reinterpret_cast<std::uintptr_t>(ptr) % alignment)) % alignment;
431}
432
433template<typename T>
434static inline T ceil_to_pow_2(T x)
435{
436 static_assert(std::is_integral<T>::value && !std::numeric_limits<T>::is_signed, "ceil_to_pow_2 is intended to be used only with unsigned integer types");
437
438 // Adapted from http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
439 --x;
440 x |= x >> 1;
441 x |= x >> 2;
442 x |= x >> 4;
443 for (std::size_t i = 1; i < sizeof(T); i <<= 1) {
444 x |= x >> (i << 3);
445 }
446 ++x;
447 return x;
448}
449
450template<typename T>
451static inline void swap_relaxed(std::atomic<T>& left, std::atomic<T>& right)
452{
453 T temp = std::move(left.load(std::memory_order_relaxed));
454 left.store(std::move(right.load(std::memory_order_relaxed)), std::memory_order_relaxed);
455 right.store(std::move(temp), std::memory_order_relaxed);
456}
457
458template<typename T>
459static inline T const& nomove(T const& x)
460{
461 return x;
462}
463
464template<bool Enable>
465struct nomove_if
466{
467 template<typename T>
468 static inline T const& eval(T const& x)
469 {
470 return x;
471 }
472};
473
474template<>
475struct nomove_if<false>
476{
477 template<typename U>
478 static inline auto eval(U&& x)
479 -> decltype(std::forward<U>(x))
480 {
481 return std::forward<U>(x);
482 }
483};
484
485template<typename It>
486static inline auto deref_noexcept(It& it) MOODYCAMEL_NOEXCEPT -> decltype(*it)
487{
488 return *it;
489}
490
491#if defined(__clang__) || !defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)
492template<typename T> struct is_trivially_destructible : std::is_trivially_destructible<T> { };
493#else
494template<typename T> struct is_trivially_destructible : std::has_trivial_destructor<T> { };
495#endif
496
497#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
498#ifdef MCDBGQ_USE_RELACY
499 typedef RelacyThreadExitListener ThreadExitListener;
500 typedef RelacyThreadExitNotifier ThreadExitNotifier;
501#else
502 struct ThreadExitListener
503 {
504 typedef void (*callback_t)(void*);
505 callback_t callback;
506 void* userData;
507
508 ThreadExitListener* next; // reserved for use by the ThreadExitNotifier
509 };
510
511
512 class ThreadExitNotifier
513 {
514 public:
515 static void subscribe(ThreadExitListener* listener)
516 {
517 auto& tlsInst = instance();
518 listener->next = tlsInst.tail;
519 tlsInst.tail = listener;
520 }
521
522 static void unsubscribe(ThreadExitListener* listener)
523 {
524 auto& tlsInst = instance();
525 ThreadExitListener** prev = &tlsInst.tail;
526 for (auto ptr = tlsInst.tail; ptr != nullptr; ptr = ptr->next) {
527 if (ptr == listener) {
528 *prev = ptr->next;
529 break;
530 }
531 prev = &ptr->next;
532 }
533 }
534
535 private:
536 ThreadExitNotifier() : tail(nullptr) { }
537 ThreadExitNotifier(ThreadExitNotifier const&) MOODYCAMEL_DELETE_FUNCTION;
538 ThreadExitNotifier& operator=(ThreadExitNotifier const&) MOODYCAMEL_DELETE_FUNCTION;
539
540 ~ThreadExitNotifier()
541 {
542 // This thread is about to exit, let everyone know!
543 assert(this == &instance() && "If this assert fails, you likely have a buggy compiler! Change the preprocessor conditions such that MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED is no longer defined.");
544 for (auto ptr = tail; ptr != nullptr; ptr = ptr->next) {
545 ptr->callback(ptr->userData);
546 }
547 }
548
549 // Thread-local
550 static inline ThreadExitNotifier& instance()
551 {
552 static thread_local ThreadExitNotifier notifier;
553 return notifier;
554 }
555
556 private:
557 ThreadExitListener* tail;
558 };
559#endif
560#endif
561
562template<typename T> struct static_is_lock_free_num { enum { value = 0 }; };
563template<> struct static_is_lock_free_num<signed char> { enum { value = ATOMIC_CHAR_LOCK_FREE }; };
564template<> struct static_is_lock_free_num<short> { enum { value = ATOMIC_SHORT_LOCK_FREE }; };
565template<> struct static_is_lock_free_num<int> { enum { value = ATOMIC_INT_LOCK_FREE }; };
566template<> struct static_is_lock_free_num<long> { enum { value = ATOMIC_LONG_LOCK_FREE }; };
567template<> struct static_is_lock_free_num<long long> { enum { value = ATOMIC_LLONG_LOCK_FREE }; };
568template<typename T> struct static_is_lock_free : static_is_lock_free_num<typename std::make_signed<T>::type> { };
569template<> struct static_is_lock_free<bool> { enum { value = ATOMIC_BOOL_LOCK_FREE }; };
570template<typename U> struct static_is_lock_free<U*> { enum { value = ATOMIC_POINTER_LOCK_FREE }; };
571}
572
573
574struct ProducerToken
575{
576 template<typename T, typename Traits>
577 explicit ProducerToken(ConcurrentQueue<T, Traits>& queue);
578
579 template<typename T, typename Traits>
580 explicit ProducerToken(BlockingConcurrentQueue<T, Traits>& queue);
581
582 ProducerToken(ProducerToken&& other) MOODYCAMEL_NOEXCEPT
583 : producer(other.producer)
584 {
585 other.producer = nullptr;
586 if (producer != nullptr) {
587 producer->token = this;
588 }
589 }
590
591 inline ProducerToken& operator=(ProducerToken&& other) MOODYCAMEL_NOEXCEPT
592 {
593 swap(other);
594 return *this;
595 }
596
597 void swap(ProducerToken& other) MOODYCAMEL_NOEXCEPT
598 {
599 std::swap(producer, other.producer);
600 if (producer != nullptr) {
601 producer->token = this;
602 }
603 if (other.producer != nullptr) {
604 other.producer->token = &other;
605 }
606 }
607
608 // A token is always valid unless:
609 // 1) Memory allocation failed during construction
610 // 2) It was moved via the move constructor
611 // (Note: assignment does a swap, leaving both potentially valid)
612 // 3) The associated queue was destroyed
613 // Note that if valid() returns true, that only indicates
614 // that the token is valid for use with a specific queue,
615 // but not which one; that's up to the user to track.
616 inline bool valid() const { return producer != nullptr; }
617
618 ~ProducerToken()
619 {
620 if (producer != nullptr) {
621 producer->token = nullptr;
622 producer->inactive.store(true, std::memory_order_release);
623 }
624 }
625
626 // Disable copying and assignment
627 ProducerToken(ProducerToken const&) MOODYCAMEL_DELETE_FUNCTION;
628 ProducerToken& operator=(ProducerToken const&) MOODYCAMEL_DELETE_FUNCTION;
629
630 private:
631 template<typename T, typename Traits> friend class ConcurrentQueue;
632 friend class ConcurrentQueueTests;
633
634 protected:
635 details::ConcurrentQueueProducerTypelessBase* producer;
636};
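
// Illustrative sketch (shown as a comment; ConcurrentQueue is only defined
// further below): creating a producer token, checking valid(), and enqueueing
// through it. One token per producing thread is the intended pattern.
//
//   void producer_with_token(dmlc::moodycamel::ConcurrentQueue<int>& q) {
//     dmlc::moodycamel::ProducerToken ptok(q);
//     if (!ptok.valid()) {
//       // registering the producer required an allocation that failed;
//       // fall back to tokenless enqueue or report the error
//       return;
//     }
//     q.enqueue(ptok, 42);
//   }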
637
638
639struct ConsumerToken
640{
641 template<typename T, typename Traits>
642 explicit ConsumerToken(ConcurrentQueue<T, Traits>& q);
643
644 template<typename T, typename Traits>
645 explicit ConsumerToken(BlockingConcurrentQueue<T, Traits>& q);
646
647 ConsumerToken(ConsumerToken&& other) MOODYCAMEL_NOEXCEPT
648 : initialOffset(other.initialOffset), lastKnownGlobalOffset(other.lastKnownGlobalOffset), itemsConsumedFromCurrent(other.itemsConsumedFromCurrent), currentProducer(other.currentProducer), desiredProducer(other.desiredProducer)
649 {
650 }
651
652 inline ConsumerToken& operator=(ConsumerToken&& other) MOODYCAMEL_NOEXCEPT
653 {
654 swap(other);
655 return *this;
656 }
657
658 void swap(ConsumerToken& other) MOODYCAMEL_NOEXCEPT
659 {
660 std::swap(initialOffset, other.initialOffset);
661 std::swap(lastKnownGlobalOffset, other.lastKnownGlobalOffset);
662 std::swap(itemsConsumedFromCurrent, other.itemsConsumedFromCurrent);
663 std::swap(currentProducer, other.currentProducer);
664 std::swap(desiredProducer, other.desiredProducer);
665 }
666
667 // Disable copying and assignment
668 ConsumerToken(ConsumerToken const&) MOODYCAMEL_DELETE_FUNCTION;
669 ConsumerToken& operator=(ConsumerToken const&) MOODYCAMEL_DELETE_FUNCTION;
670
671 private:
672 template<typename T, typename Traits> friend class ConcurrentQueue;
673 friend class ConcurrentQueueTests;
674
675 private: // but shared with ConcurrentQueue
676 std::uint32_t initialOffset;
677 std::uint32_t lastKnownGlobalOffset;
678 std::uint32_t itemsConsumedFromCurrent;
679 details::ConcurrentQueueProducerTypelessBase* currentProducer;
680 details::ConcurrentQueueProducerTypelessBase* desiredProducer;
681};
682
683// Need to forward-declare this swap because it's in a namespace.
684// See http://stackoverflow.com/questions/4492062/why-does-a-c-friend-class-need-a-forward-declaration-only-in-other-namespaces
685template<typename T, typename Traits>
686inline void swap(typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& a, typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& b) MOODYCAMEL_NOEXCEPT;
687
688
689template<typename T, typename Traits = ConcurrentQueueDefaultTraits>
690class ConcurrentQueue {
691 public:
692 typedef ::dmlc::moodycamel::ProducerToken producer_token_t;
693 typedef ::dmlc::moodycamel::ConsumerToken consumer_token_t;
694
695 typedef typename Traits::index_t index_t;
696 typedef typename Traits::size_t size_t;
697
698 static const size_t BLOCK_SIZE = static_cast<size_t>(Traits::BLOCK_SIZE);
699 static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = static_cast<size_t>(Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD);
700 static const size_t EXPLICIT_INITIAL_INDEX_SIZE = static_cast<size_t>(Traits::EXPLICIT_INITIAL_INDEX_SIZE);
701 static const size_t IMPLICIT_INITIAL_INDEX_SIZE = static_cast<size_t>(Traits::IMPLICIT_INITIAL_INDEX_SIZE);
702 static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = static_cast<size_t>(Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE);
703 static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = static_cast<std::uint32_t>(Traits::EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE);
704#ifdef _MSC_VER
705 #pragma warning(push)
706#pragma warning(disable: 4307) // + integral constant overflow (that's what the ternary expression is for!)
707#pragma warning(disable: 4309) // static_cast: Truncation of constant value
708#endif
709 static const size_t MAX_SUBQUEUE_SIZE = (details::const_numeric_max<size_t>::value -
710 static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) <
711 BLOCK_SIZE) ? details::const_numeric_max<size_t>::value
712 : (
713 (static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) +
714 (BLOCK_SIZE - 1)) / BLOCK_SIZE * BLOCK_SIZE);
715#ifdef _MSC_VER
716#pragma warning(pop)
717#endif
718
719 static_assert(!std::numeric_limits<size_t>::is_signed && std::is_integral<size_t>::value,
720 "Traits::size_t must be an unsigned integral type");
721 static_assert(!std::numeric_limits<index_t>::is_signed && std::is_integral<index_t>::value,
722 "Traits::index_t must be an unsigned integral type");
723 static_assert(sizeof(index_t) >= sizeof(size_t),
724 "Traits::index_t must be at least as wide as Traits::size_t");
725 static_assert((BLOCK_SIZE > 1) && !(BLOCK_SIZE & (BLOCK_SIZE - 1)),
726 "Traits::BLOCK_SIZE must be a power of 2 (and at least 2)");
727 static_assert((EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD > 1) &&
728 !(EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD &
729 (EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD - 1)),
730 "Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD must be a power of 2 (and greater than 1)");
731 static_assert((EXPLICIT_INITIAL_INDEX_SIZE > 1) &&
732 !(EXPLICIT_INITIAL_INDEX_SIZE & (EXPLICIT_INITIAL_INDEX_SIZE - 1)),
733 "Traits::EXPLICIT_INITIAL_INDEX_SIZE must be a power of 2 (and greater than 1)");
734 static_assert((IMPLICIT_INITIAL_INDEX_SIZE > 1) &&
735 !(IMPLICIT_INITIAL_INDEX_SIZE & (IMPLICIT_INITIAL_INDEX_SIZE - 1)),
736 "Traits::IMPLICIT_INITIAL_INDEX_SIZE must be a power of 2 (and greater than 1)");
737 static_assert((INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) ||
738 !(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE & (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE - 1)),
739 "Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE must be a power of 2");
740 static_assert(
741 INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0 || INITIAL_IMPLICIT_PRODUCER_HASH_SIZE >= 1,
742 "Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE must be at least 1 (or 0 to disable implicit enqueueing)");
743
744 public:
745 // Creates a queue with at least `capacity` element slots; note that the
746 // actual number of elements that can be inserted without additional memory
747 // allocation depends on the number of producers and the block size (e.g. if
748 // the block size is equal to `capacity`, only a single block will be allocated
749 // up-front, which means only a single producer will be able to enqueue elements
750 // without an extra allocation -- blocks aren't shared between producers).
751 // This method is not thread safe -- it is up to the user to ensure that the
752 // queue is fully constructed before it starts being used by other threads (this
753 // includes making the memory effects of construction visible, possibly with a
754 // memory barrier).
755 explicit ConcurrentQueue(size_t capacity = 6 * BLOCK_SIZE)
756 : producerListTail(nullptr), producerCount(0), initialBlockPoolIndex(0), nextExplicitConsumerId(
757 0), globalExplicitConsumerOffset(0) {
758 implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
759 populate_initial_implicit_producer_hash();
760 populate_initial_block_list(
761 capacity / BLOCK_SIZE + ((capacity & (BLOCK_SIZE - 1)) == 0 ? 0 : 1));
762
763#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
764 // Track all the producers using a fully-resolved typed list for
765 // each kind; this makes it possible to debug them starting from
766 // the root queue object (otherwise wacky casts are needed that
767 // don't compile in the debugger's expression evaluator).
768 explicitProducers.store(nullptr, std::memory_order_relaxed);
769 implicitProducers.store(nullptr, std::memory_order_relaxed);
770#endif
771 }
772
773 // Computes the correct amount of pre-allocated blocks for you based
774 // on the minimum number of elements you want available at any given
775 // time, and the maximum concurrent number of each type of producer.
776 ConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers, size_t maxImplicitProducers)
777 : producerListTail(nullptr), producerCount(0), initialBlockPoolIndex(0), nextExplicitConsumerId(
778 0), globalExplicitConsumerOffset(0) {
779 implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
780 populate_initial_implicit_producer_hash();
781 size_t blocks =
782 (((minCapacity + BLOCK_SIZE - 1) / BLOCK_SIZE) - 1) * (maxExplicitProducers + 1) +
783 2 * (maxExplicitProducers + maxImplicitProducers);
784 populate_initial_block_list(blocks);
785
786#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
787 explicitProducers.store(nullptr, std::memory_order_relaxed);
788 implicitProducers.store(nullptr, std::memory_order_relaxed);
789#endif
790 }
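
  // Illustrative sketch (documentation only, shown as a comment since we are
  // inside the class definition): the two ways of pre-sizing the queue
  // described above. The numbers are arbitrary.
  //
  //   // At least 1024 element slots up-front, rounded up to whole blocks; how
  //   // much can be enqueued without further allocation also depends on how
  //   // many producers end up sharing those blocks.
  //   dmlc::moodycamel::ConcurrentQueue<int> q1(1024);
  //
  //   // Pre-allocate for a minimum capacity of 1024 elements, at most 4
  //   // explicit (token-based) producers and 8 implicit (tokenless) producers.
  //   dmlc::moodycamel::ConcurrentQueue<int> q2(1024, 4, 8);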
791
792 // Note: The queue should not be accessed concurrently while it's
793 // being deleted. It's up to the user to synchronize this.
794 // This method is not thread safe.
795 ~ConcurrentQueue() {
796 // Destroy producers
797 auto ptr = producerListTail.load(std::memory_order_relaxed);
798 while (ptr != nullptr) {
799 auto next = ptr->next_prod();
800 if (ptr->token != nullptr) {
801 ptr->token->producer = nullptr;
802 }
803 destroy(ptr);
804 ptr = next;
805 }
806
807 // Destroy implicit producer hash tables
808 if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE != 0) {
809 auto hash = implicitProducerHash.load(std::memory_order_relaxed);
810 while (hash != nullptr) {
811 auto prev = hash->prev;
812 if (prev !=
813 nullptr) { // The last hash is part of this object and was not allocated dynamically
814 for (size_t i = 0; i != hash->capacity; ++i) {
815 hash->entries[i].~ImplicitProducerKVP();
816 }
817 hash->~ImplicitProducerHash();
818 (Traits::free)(hash);
819 }
820 hash = prev;
821 }
822 }
823
824 // Destroy global free list
825 auto block = freeList.head_unsafe();
826 while (block != nullptr) {
827 auto next = block->freeListNext.load(std::memory_order_relaxed);
828 if (block->dynamicallyAllocated) {
829 destroy(block);
830 }
831 block = next;
832 }
833
834 // Destroy initial free list
835 destroy_array(initialBlockPool, initialBlockPoolSize);
836 }
837
838 // Disable copying and copy assignment
839 ConcurrentQueue(ConcurrentQueue const &) MOODYCAMEL_DELETE_FUNCTION;
840
841 ConcurrentQueue &operator=(ConcurrentQueue const &) MOODYCAMEL_DELETE_FUNCTION;
842
843 // Moving is supported, but note that it is *not* a thread-safe operation.
844 // Nobody can use the queue while it's being moved, and the memory effects
845 // of that move must be propagated to other threads before they can use it.
846 // Note: When a queue is moved, its tokens are still valid but can only be
847 // used with the destination queue (i.e. semantically they are moved along
848 // with the queue itself).
849 ConcurrentQueue(ConcurrentQueue &&other) MOODYCAMEL_NOEXCEPT
850 : producerListTail(other.producerListTail.load(std::memory_order_relaxed)), producerCount(
851 other.producerCount.load(std::memory_order_relaxed)), initialBlockPoolIndex(
852 other.initialBlockPoolIndex.load(std::memory_order_relaxed)), initialBlockPool(
853 other.initialBlockPool), initialBlockPoolSize(other.initialBlockPoolSize), freeList(
854 std::move(other.freeList)), nextExplicitConsumerId(
855 other.nextExplicitConsumerId.load(std::memory_order_relaxed)), globalExplicitConsumerOffset(
856 other.globalExplicitConsumerOffset.load(std::memory_order_relaxed)) {
857 // Move the other one into this, and leave the other one as an empty queue
858 implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
859 populate_initial_implicit_producer_hash();
860 swap_implicit_producer_hashes(other);
861
862 other.producerListTail.store(nullptr, std::memory_order_relaxed);
863 other.producerCount.store(0, std::memory_order_relaxed);
864 other.nextExplicitConsumerId.store(0, std::memory_order_relaxed);
865 other.globalExplicitConsumerOffset.store(0, std::memory_order_relaxed);
866
867#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
868 explicitProducers.store(other.explicitProducers.load(std::memory_order_relaxed), std::memory_order_relaxed);
869 other.explicitProducers.store(nullptr, std::memory_order_relaxed);
870 implicitProducers.store(other.implicitProducers.load(std::memory_order_relaxed), std::memory_order_relaxed);
871 other.implicitProducers.store(nullptr, std::memory_order_relaxed);
872#endif
873
874 other.initialBlockPoolIndex.store(0, std::memory_order_relaxed);
875 other.initialBlockPoolSize = 0;
876 other.initialBlockPool = nullptr;
877
878 reown_producers();
879 }
880
881 inline ConcurrentQueue &operator=(ConcurrentQueue &&other) MOODYCAMEL_NOEXCEPT {
882 return swap_internal(other);
883 }
884
885 // Swaps this queue's state with the other's. Not thread-safe.
886 // Swapping two queues does not invalidate their tokens, however
887 // the tokens that were created for one queue must be used with
888 // only the swapped queue (i.e. the tokens are tied to the
889 // queue's movable state, not the object itself).
890 inline void swap(ConcurrentQueue &other) MOODYCAMEL_NOEXCEPT {
891 swap_internal(other);
892 }
893
894 private:
895 ConcurrentQueue &swap_internal(ConcurrentQueue &other) {
896 if (this == &other) {
897 return *this;
898 }
899
900 details::swap_relaxed(producerListTail, other.producerListTail);
901 details::swap_relaxed(producerCount, other.producerCount);
902 details::swap_relaxed(initialBlockPoolIndex, other.initialBlockPoolIndex);
903 std::swap(initialBlockPool, other.initialBlockPool);
904 std::swap(initialBlockPoolSize, other.initialBlockPoolSize);
905 freeList.swap(other.freeList);
906 details::swap_relaxed(nextExplicitConsumerId, other.nextExplicitConsumerId);
907 details::swap_relaxed(globalExplicitConsumerOffset, other.globalExplicitConsumerOffset);
908
909 swap_implicit_producer_hashes(other);
910
911 reown_producers();
912 other.reown_producers();
913
914#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
915 details::swap_relaxed(explicitProducers, other.explicitProducers);
916 details::swap_relaxed(implicitProducers, other.implicitProducers);
917#endif
918
919 return *this;
920 }
921
922 public:
923 // Enqueues a single item (by copying it).
924 // Allocates memory if required. Only fails if memory allocation fails (or implicit
925 // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
926 // or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
927 // Thread-safe.
928 inline bool enqueue(T const &item) {
929 if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
930 return inner_enqueue<CanAlloc>(item);
931 }
932
933 // Enqueues a single item (by moving it, if possible).
934 // Allocates memory if required. Only fails if memory allocation fails (or implicit
935 // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
936 // or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
937 // Thread-safe.
938 inline bool enqueue(T &&item) {
939 if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
940 return inner_enqueue<CanAlloc>(std::move(item));
941 }
942
943 // Enqueues a single item (by copying it) using an explicit producer token.
944 // Allocates memory if required. Only fails if memory allocation fails (or
945 // Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
946 // Thread-safe.
947 inline bool enqueue(producer_token_t const &token, T const &item) {
948 return inner_enqueue<CanAlloc>(token, item);
949 }
950
951 // Enqueues a single item (by moving it, if possible) using an explicit producer token.
952 // Allocates memory if required. Only fails if memory allocation fails (or
953 // Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
954 // Thread-safe.
955 inline bool enqueue(producer_token_t const &token, T &&item) {
956 return inner_enqueue<CanAlloc>(token, std::move(item));
957 }
958
959 // Enqueues several items.
960 // Allocates memory if required. Only fails if memory allocation fails (or
961 // implicit production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
962 // is 0, or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
963 // Note: Use std::make_move_iterator if the elements should be moved instead of copied.
964 // Thread-safe.
965 template<typename It>
966 bool enqueue_bulk(It itemFirst, size_t count) {
967 if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
968 return inner_enqueue_bulk<CanAlloc>(itemFirst, count);
969 }
970
971 // Enqueues several items using an explicit producer token.
972 // Allocates memory if required. Only fails if memory allocation fails
973 // (or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
974 // Note: Use std::make_move_iterator if the elements should be moved
975 // instead of copied.
976 // Thread-safe.
977 template<typename It>
978 bool enqueue_bulk(producer_token_t const &token, It itemFirst, size_t count) {
979 return inner_enqueue_bulk<CanAlloc>(token, itemFirst, count);
980 }
981
982 // Enqueues a single item (by copying it).
983 // Does not allocate memory. Fails if not enough room to enqueue (or implicit
984 // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
985 // is 0).
986 // Thread-safe.
987 inline bool try_enqueue(T const &item) {
988 if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
989 return inner_enqueue<CannotAlloc>(item);
990 }
991
992 // Enqueues a single item (by moving it, if possible).
993 // Does not allocate memory (except for one-time implicit producer).
994 // Fails if not enough room to enqueue (or implicit production is
995 // disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
996 // Thread-safe.
997 inline bool try_enqueue(T &&item) {
998 if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
999 return inner_enqueue<CannotAlloc>(std::move(item));
1000 }
1001
1002 // Enqueues a single item (by copying it) using an explicit producer token.
1003 // Does not allocate memory. Fails if not enough room to enqueue.
1004 // Thread-safe.
1005 inline bool try_enqueue(producer_token_t const &token, T const &item) {
1006 return inner_enqueue<CannotAlloc>(token, item);
1007 }
1008
1009 // Enqueues a single item (by moving it, if possible) using an explicit producer token.
1010 // Does not allocate memory. Fails if not enough room to enqueue.
1011 // Thread-safe.
1012 inline bool try_enqueue(producer_token_t const &token, T &&item) {
1013 return inner_enqueue<CannotAlloc>(token, std::move(item));
1014 }
1015
1016 // Enqueues several items.
1017 // Does not allocate memory (except for one-time implicit producer).
1018 // Fails if not enough room to enqueue (or implicit production is
1019 // disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
1020 // Note: Use std::make_move_iterator if the elements should be moved
1021 // instead of copied.
1022 // Thread-safe.
1023 template<typename It>
1024 bool try_enqueue_bulk(It itemFirst, size_t count) {
1025 if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
1026 return inner_enqueue_bulk<CannotAlloc>(itemFirst, count);
1027 }
1028
1029 // Enqueues several items using an explicit producer token.
1030 // Does not allocate memory. Fails if not enough room to enqueue.
1031 // Note: Use std::make_move_iterator if the elements should be moved
1032 // instead of copied.
1033 // Thread-safe.
1034 template<typename It>
1035 bool try_enqueue_bulk(producer_token_t const &token, It itemFirst, size_t count) {
1036 return inner_enqueue_bulk<CannotAlloc>(token, itemFirst, count);
1037 }
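
  // Illustrative sketch (documentation only, shown as a comment since we are
  // inside the class definition): the difference between the enqueue and
  // try_enqueue families. The capacity of 64 is arbitrary.
  //
  //   dmlc::moodycamel::ConcurrentQueue<int> q(64);
  //   q.enqueue(1);      // allocates more space if needed; fails only on
  //                      // allocation failure (or the limits noted above)
  //   q.try_enqueue(2);  // never allocates a new block (aside from the
  //                      // one-time implicit producer); returns false once
  //                      // the existing space is exhausted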
1038
1039
1040 // Attempts to dequeue from the queue.
1041 // Returns false if all producer streams appeared empty at the time they
1042 // were checked (so, the queue is likely but not guaranteed to be empty).
1043 // Never allocates. Thread-safe.
1044 template<typename U>
1045 bool try_dequeue(U &item) {
1046 // Instead of simply trying each producer in turn (which could cause needless contention on the first
1047 // producer), we score them heuristically.
1048 size_t nonEmptyCount = 0;
1049 ProducerBase *best = nullptr;
1050 size_t bestSize = 0;
1051 for (auto ptr = producerListTail.load(std::memory_order_acquire);
1052 nonEmptyCount < 3 && ptr != nullptr; ptr = ptr->next_prod()) {
1053 auto size = ptr->size_approx();
1054 if (size > 0) {
1055 if (size > bestSize) {
1056 bestSize = size;
1057 best = ptr;
1058 }
1059 ++nonEmptyCount;
1060 }
1061 }
1062
1063 // If there was at least one non-empty queue but it appears empty at the time
1064 // we try to dequeue from it, we need to make sure every queue's been tried
1065 if (nonEmptyCount > 0) {
1066 if (details::likely(best->dequeue(item))) {
1067 return true;
1068 }
1069 for (auto ptr = producerListTail.load(std::memory_order_acquire);
1070 ptr != nullptr; ptr = ptr->next_prod()) {
1071 if (ptr != best && ptr->dequeue(item)) {
1072 return true;
1073 }
1074 }
1075 }
1076 return false;
1077 }
1078
1079 // Attempts to dequeue from the queue.
1080 // Returns false if all producer streams appeared empty at the time they
1081 // were checked (so, the queue is likely but not guaranteed to be empty).
1082 // This differs from the try_dequeue(item) method in that this one does
1083 // not attempt to reduce contention by interleaving the order that producer
1084 // streams are dequeued from. So, using this method can reduce overall throughput
1085 // under contention, but will give more predictable results in single-threaded
1086 // consumer scenarios. This is mostly only useful for internal unit tests.
1087 // Never allocates. Thread-safe.
1088 template<typename U>
1089 bool try_dequeue_non_interleaved(U &item) {
1090 for (auto ptr = producerListTail.load(std::memory_order_acquire);
1091 ptr != nullptr; ptr = ptr->next_prod()) {
1092 if (ptr->dequeue(item)) {
1093 return true;
1094 }
1095 }
1096 return false;
1097 }
1098
1099 // Attempts to dequeue from the queue using an explicit consumer token.
1100 // Returns false if all producer streams appeared empty at the time they
1101 // were checked (so, the queue is likely but not guaranteed to be empty).
1102 // Never allocates. Thread-safe.
1103 template<typename U>
1104 bool try_dequeue(consumer_token_t &token, U &item) {
1105 // The idea is roughly as follows:
1106 // Every 256 items from one producer, make everyone rotate (increase the global offset) -> this means the highest efficiency consumer dictates the rotation speed of everyone else, more or less
1107 // If you see that the global offset has changed, you must reset your consumption counter and move to your designated place
1108 // If there are no items where you're supposed to be, keep moving until you find a producer with some items
1109 // If the global offset has not changed but you've run out of items to consume, move over from your current position until you find a producer with something in it
1110
1111 if (token.desiredProducer == nullptr || token.lastKnownGlobalOffset !=
1112 globalExplicitConsumerOffset.load(
1113 std::memory_order_relaxed)) {
1114 if (!update_current_producer_after_rotation(token)) {
1115 return false;
1116 }
1117 }
1118
1119 // If there was at least one non-empty queue but it appears empty at the time
1120 // we try to dequeue from it, we need to make sure every queue's been tried
1121 if (static_cast<ProducerBase *>(token.currentProducer)->dequeue(item)) {
1122 if (++token.itemsConsumedFromCurrent == EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE) {
1123 globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed);
1124 }
1125 return true;
1126 }
1127
1128 auto tail = producerListTail.load(std::memory_order_acquire);
1129 auto ptr = static_cast<ProducerBase *>(token.currentProducer)->next_prod();
1130 if (ptr == nullptr) {
1131 ptr = tail;
1132 }
1133 while (ptr != static_cast<ProducerBase *>(token.currentProducer)) {
1134 if (ptr->dequeue(item)) {
1135 token.currentProducer = ptr;
1136 token.itemsConsumedFromCurrent = 1;
1137 return true;
1138 }
1139 ptr = ptr->next_prod();
1140 if (ptr == nullptr) {
1141 ptr = tail;
1142 }
1143 }
1144 return false;
1145 }
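
  // Illustrative sketch (documentation only, shown as a comment since we are
  // inside the class definition): a typical consumer loop with a token, which
  // lets the rotation scheme described above spread consumers across the
  // producers' sub-queues. process() is a placeholder for user code.
  //
  //   void drain(dmlc::moodycamel::ConcurrentQueue<int>& q) {
  //     dmlc::moodycamel::ConsumerToken ctok(q);
  //     int item;
  //     while (q.try_dequeue(ctok, item)) {
  //       process(item);
  //     }
  //   }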
1146
1147 // Attempts to dequeue several elements from the queue.
1148 // Returns the number of items actually dequeued.
1149 // Returns 0 if all producer streams appeared empty at the time they
1150 // were checked (so, the queue is likely but not guaranteed to be empty).
1151 // Never allocates. Thread-safe.
1152 template<typename It>
1153 size_t try_dequeue_bulk(It itemFirst, size_t max) {
1154 size_t count = 0;
1155 for (auto ptr = producerListTail.load(std::memory_order_acquire);
1156 ptr != nullptr; ptr = ptr->next_prod()) {
1157 count += ptr->dequeue_bulk(itemFirst, max - count);
1158 if (count == max) {
1159 break;
1160 }
1161 }
1162 return count;
1163 }
1164
1165 // Attempts to dequeue several elements from the queue using an explicit consumer token.
1166 // Returns the number of items actually dequeued.
1167 // Returns 0 if all producer streams appeared empty at the time they
1168 // were checked (so, the queue is likely but not guaranteed to be empty).
1169 // Never allocates. Thread-safe.
1170 template<typename It>
1171 size_t try_dequeue_bulk(consumer_token_t &token, It itemFirst, size_t max) {
1172 if (token.desiredProducer == nullptr || token.lastKnownGlobalOffset !=
1173 globalExplicitConsumerOffset.load(
1174 std::memory_order_relaxed)) {
1175 if (!update_current_producer_after_rotation(token)) {
1176 return 0;
1177 }
1178 }
1179
1180 size_t count = static_cast<ProducerBase *>(token.currentProducer)->dequeue_bulk(itemFirst, max);
1181 if (count == max) {
1182 if ((token.itemsConsumedFromCurrent += static_cast<std::uint32_t>(max)) >=
1183 EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE) {
1184 globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed);
1185 }
1186 return max;
1187 }
1188 token.itemsConsumedFromCurrent += static_cast<std::uint32_t>(count);
1189 max -= count;
1190
1191 auto tail = producerListTail.load(std::memory_order_acquire);
1192 auto ptr = static_cast<ProducerBase *>(token.currentProducer)->next_prod();
1193 if (ptr == nullptr) {
1194 ptr = tail;
1195 }
1196 while (ptr != static_cast<ProducerBase *>(token.currentProducer)) {
1197 auto dequeued = ptr->dequeue_bulk(itemFirst, max);
1198 count += dequeued;
1199 if (dequeued != 0) {
1200 token.currentProducer = ptr;
1201 token.itemsConsumedFromCurrent = static_cast<std::uint32_t>(dequeued);
1202 }
1203 if (dequeued == max) {
1204 break;
1205 }
1206 max -= dequeued;
1207 ptr = ptr->next_prod();
1208 if (ptr == nullptr) {
1209 ptr = tail;
1210 }
1211 }
1212 return count;
1213 }
1214
1215
1216 // Attempts to dequeue from a specific producer's inner queue.
1217 // If you happen to know which producer you want to dequeue from, this
1218 // is significantly faster than using the general-case try_dequeue methods.
1219 // Returns false if the producer's queue appeared empty at the time it
1220 // was checked (so, the queue is likely but not guaranteed to be empty).
1221 // Never allocates. Thread-safe.
1222 template<typename U>
1223 inline bool try_dequeue_from_producer(producer_token_t const &producer, U &item) {
1224 return static_cast<ExplicitProducer *>(producer.producer)->dequeue(item);
1225 }
1226
1227 // Attempts to dequeue several elements from a specific producer's inner queue.
1228 // Returns the number of items actually dequeued.
1229 // If you happen to know which producer you want to dequeue from, this
1230 // is significantly faster than using the general-case try_dequeue methods.
1231 // Returns 0 if the producer's queue appeared empty at the time it
1232 // was checked (so, the queue is likely but not guaranteed to be empty).
1233 // Never allocates. Thread-safe.
1234 template<typename It>
1235 inline size_t
1236 try_dequeue_bulk_from_producer(producer_token_t const &producer, It itemFirst, size_t max) {
1237 return static_cast<ExplicitProducer *>(producer.producer)->dequeue_bulk(itemFirst, max);
1238 }
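
  // Illustrative sketch (documentation only, shown as a comment since we are
  // inside the class definition): draining one known producer directly via its
  // token, which skips the search over all producers.
  //
  //   void drain_one(dmlc::moodycamel::ConcurrentQueue<int>& q,
  //                  dmlc::moodycamel::ProducerToken const& ptok) {
  //     int item;
  //     while (q.try_dequeue_from_producer(ptok, item)) {
  //       // only items enqueued through ptok are seen here
  //     }
  //   }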
1239
1240
1241 // Returns an estimate of the total number of elements currently in the queue. This
1242 // estimate is only accurate if the queue has completely stabilized before it is called
1243 // (i.e. all enqueue and dequeue operations have completed and their memory effects are
1244 // visible on the calling thread, and no further operations start while this method is
1245 // being called).
1246 // Thread-safe.
1247 size_t size_approx() const {
1248 size_t size = 0;
1249 for (auto ptr = producerListTail.load(std::memory_order_acquire);
1250 ptr != nullptr; ptr = ptr->next_prod()) {
1251 size += ptr->size_approx();
1252 }
1253 return size;
1254 }
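
  // Illustrative sketch (documentation only, shown as a comment): size_approx()
  // is exact only once the queue has stabilized, e.g. after all worker threads
  // have been joined (join_all_workers() is a hypothetical synchronization point).
  //
  //   join_all_workers();
  //   std::size_t remaining = q.size_approx();  // now an accurate count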
1255
1256
1257 // Returns true if the underlying atomic variables used by
1258 // the queue are lock-free (they should be on most platforms).
1259 // Thread-safe.
1260 static bool is_lock_free() {
1261 return
1262 details::static_is_lock_free<bool>::value == 2 &&
1263 details::static_is_lock_free<size_t>::value == 2 &&
1264 details::static_is_lock_free<std::uint32_t>::value == 2 &&
1265 details::static_is_lock_free<index_t>::value == 2 &&
1266 details::static_is_lock_free<void *>::value == 2 &&
1267 details::static_is_lock_free<typename details::thread_id_converter<details::thread_id_t>::thread_id_numeric_size_t>::value ==
1268 2;
1269 }
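
  // Illustrative sketch (documentation only, shown as a comment): checking at
  // startup whether the queue is fully lock-free on the target platform. The
  // queue still works when this returns false; the underlying atomics may just
  // use locks internally.
  //
  //   if (!dmlc::moodycamel::ConcurrentQueue<int>::is_lock_free()) {
  //     // log it, or choose a different queue implementation
  //   }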
1270
1271
1272 private:
1273 friend struct ProducerToken;
1274 friend struct ConsumerToken;
1275 friend struct ExplicitProducer;
1276
1277 friend class ConcurrentQueueTests;
1278
1279 enum AllocationMode {
1280 CanAlloc, CannotAlloc
1281 };
1282
1283
1284 //////////////////////////////////
1285 // Queue methods
1286 //////////////////////////////////
1287
1288 template<AllocationMode canAlloc, typename U>
1289 inline bool inner_enqueue(producer_token_t const &token, U &&element) {
1290 return static_cast<ExplicitProducer *>(token.producer)->ConcurrentQueue::ExplicitProducer::template enqueue<canAlloc>(
1291 std::forward<U>(element));
1292 }
1293
1294 template<AllocationMode canAlloc, typename U>
1295 inline bool inner_enqueue(U &&element) {
1296 auto producer = get_or_add_implicit_producer();
1297 return producer == nullptr ? false
1298 : producer->ConcurrentQueue::ImplicitProducer::template enqueue<canAlloc>(
1299 std::forward<U>(element));
1300 }
1301
1302 template<AllocationMode canAlloc, typename It>
1303 inline bool inner_enqueue_bulk(producer_token_t const &token, It itemFirst, size_t count) {
1304 return static_cast<ExplicitProducer *>(token.producer)->ConcurrentQueue::ExplicitProducer::template enqueue_bulk<canAlloc>(
1305 itemFirst, count);
1306 }
1307
1308 template<AllocationMode canAlloc, typename It>
1309 inline bool inner_enqueue_bulk(It itemFirst, size_t count) {
1310 auto producer = get_or_add_implicit_producer();
1311 return producer == nullptr ? false
1312 : producer->ConcurrentQueue::ImplicitProducer::template enqueue_bulk<canAlloc>(
1313 itemFirst, count);
1314 }
1315
1316 inline bool update_current_producer_after_rotation(consumer_token_t &token) {
1317 // Ah, there's been a rotation, figure out where we should be!
1318 auto tail = producerListTail.load(std::memory_order_acquire);
1319 if (token.desiredProducer == nullptr && tail == nullptr) {
1320 return false;
1321 }
1322 auto prodCount = producerCount.load(std::memory_order_relaxed);
1323 auto globalOffset = globalExplicitConsumerOffset.load(std::memory_order_relaxed);
1324 if (details::unlikely(token.desiredProducer == nullptr)) {
1325 // Aha, first time we're dequeueing anything.
1326 // Figure out our local position
1327 // Note: offset is from start, not end, but we're traversing from end -- subtract from count first
1328 std::uint32_t offset = prodCount - 1 - (token.initialOffset % prodCount);
1329 token.desiredProducer = tail;
1330 for (std::uint32_t i = 0; i != offset; ++i) {
1331 token.desiredProducer = static_cast<ProducerBase *>(token.desiredProducer)->next_prod();
1332 if (token.desiredProducer == nullptr) {
1333 token.desiredProducer = tail;
1334 }
1335 }
1336 }
1337
1338 std::uint32_t delta = globalOffset - token.lastKnownGlobalOffset;
1339 if (delta >= prodCount) {
1340 delta = delta % prodCount;
1341 }
1342 for (std::uint32_t i = 0; i != delta; ++i) {
1343 token.desiredProducer = static_cast<ProducerBase *>(token.desiredProducer)->next_prod();
1344 if (token.desiredProducer == nullptr) {
1345 token.desiredProducer = tail;
1346 }
1347 }
1348
1349 token.lastKnownGlobalOffset = globalOffset;
1350 token.currentProducer = token.desiredProducer;
1351 token.itemsConsumedFromCurrent = 0;
1352 return true;
1353 }
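// [Worked example; not part of the original source.] With prodCount == 3 and
// token.initialOffset == 4, the starting offset is 3 - 1 - (4 % 3) == 1, so the
// token begins one hop past the producer-list tail. If the global offset later
// advances by delta == 5, delta is first reduced modulo prodCount to 2 and the
// desired producer is rotated two more hops, wrapping back to the tail whenever
// next_prod() returns nullptr.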
1354
1355
1356 //////////////////////////////////
1357 // Free list
1358 //////////////////////////////////
1359
1360 template<typename N>
1361 struct FreeListNode {
1362 FreeListNode()
1363 : freeListRefs(0), freeListNext(nullptr) {}
1364
1365 std::atomic<std::uint32_t> freeListRefs;
1366 std::atomic<N *> freeListNext;
1367 };
1368
1369 // A simple CAS-based lock-free free list. Not the fastest thing in the world under heavy contention, but
1370 // simple and correct (assuming nodes are never freed until after the free list is destroyed), and fairly
1371 // speedy under low contention.
1372 template<typename N> // N must inherit FreeListNode or have the same fields (and initialization of them)
1373 struct FreeList {
1374 FreeList()
1375 : freeListHead(nullptr) {}
1376
1377 FreeList(FreeList &&other)
1378 : freeListHead(other.freeListHead.load(std::memory_order_relaxed)) {
1379 other.freeListHead.store(nullptr, std::memory_order_relaxed);
1380 }
1381
1382 void swap(FreeList &other) { details::swap_relaxed(freeListHead, other.freeListHead); }
1383
1384 FreeList(FreeList const &) MOODYCAMEL_DELETE_FUNCTION;
1385
1386 FreeList &operator=(FreeList const &) MOODYCAMEL_DELETE_FUNCTION;
1387
1388 inline void add(N *node) {
1389#if MCDBGQ_NOLOCKFREE_FREELIST
1390 debug::DebugLock lock(mutex);
1391#endif
1392 // We know that the should-be-on-freelist bit is 0 at this point, so it's safe to
1393 // set it using a fetch_add
1394 if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST, std::memory_order_acq_rel) == 0) {
1395 // Oh look! We were the last ones referencing this node, and we know
1396 // we want to add it to the free list, so let's do it!
1397 add_knowing_refcount_is_zero(node);
1398 }
1399 }
1400
1401 inline N *try_get() {
1402#if MCDBGQ_NOLOCKFREE_FREELIST
1403 debug::DebugLock lock(mutex);
1404#endif
1405 auto head = freeListHead.load(std::memory_order_acquire);
1406 while (head != nullptr) {
1407 auto prevHead = head;
1408 auto refs = head->freeListRefs.load(std::memory_order_relaxed);
1409 if ((refs & REFS_MASK) == 0 ||
1410 !head->freeListRefs.compare_exchange_strong(refs, refs + 1, std::memory_order_acquire,
1411 std::memory_order_relaxed)) {
1412 head = freeListHead.load(std::memory_order_acquire);
1413 continue;
1414 }
1415
1416 // Good, reference count has been incremented (it wasn't at zero), which means we can read the
1417 // next and not worry about it changing between now and the time we do the CAS
1418 auto next = head->freeListNext.load(std::memory_order_relaxed);
1419 if (freeListHead.compare_exchange_strong(head, next, std::memory_order_acquire,
1420 std::memory_order_relaxed)) {
1421 // Yay, got the node. This means it was on the list, which means shouldBeOnFreeList must be false no
1422 // matter the refcount (because nobody else knows it's been taken off yet, it can't have been put back on).
1423 assert((head->freeListRefs.load(std::memory_order_relaxed) & SHOULD_BE_ON_FREELIST) == 0);
1424
1425 // Decrease refcount twice, once for our ref, and once for the list's ref
1426 head->freeListRefs.fetch_sub(2, std::memory_order_release);
1427 return head;
1428 }
1429
1430 // OK, the head must have changed on us, but we still need to decrease the refcount we increased.
1431 // Note that we don't need to release any memory effects, but we do need to ensure that the reference
1432 // count decrement happens-after the CAS on the head.
1433 refs = prevHead->freeListRefs.fetch_sub(1, std::memory_order_acq_rel);
1434 if (refs == SHOULD_BE_ON_FREELIST + 1) {
1435 add_knowing_refcount_is_zero(prevHead);
1436 }
1437 }
1438
1439 return nullptr;
1440 }
1441
1442 // Useful for traversing the list when there's no contention (e.g. to destroy remaining nodes)
1443 N *head_unsafe() const { return freeListHead.load(std::memory_order_relaxed); }
1444
1445 private:
1446 inline void add_knowing_refcount_is_zero(N *node) {
1447 // Since the refcount is zero, and nobody can increase it once it's zero (except us, and we run
1448 // only one copy of this method per node at a time, i.e. the single thread case), then we know
1449 // we can safely change the next pointer of the node; however, once the refcount is back above
1450 // zero, then other threads could increase it (happens under heavy contention, when the refcount
1451 // goes to zero in between a load and a refcount increment of a node in try_get, then back up to
1452 // something non-zero, then the refcount increment is done by the other thread) -- so, if the CAS
1453 // to add the node to the actual list fails, decrease the refcount and leave the add operation to
1454 // the next thread who puts the refcount back at zero (which could be us, hence the loop).
1455 auto head = freeListHead.load(std::memory_order_relaxed);
1456 while (true) {
1457 node->freeListNext.store(head, std::memory_order_relaxed);
1458 node->freeListRefs.store(1, std::memory_order_release);
1459 if (!freeListHead.compare_exchange_strong(head, node, std::memory_order_release,
1460 std::memory_order_relaxed)) {
1461 // Hmm, the add failed, but we can only try again when the refcount goes back to zero
1462 if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST - 1, std::memory_order_release) ==
1463 1) {
1464 continue;
1465 }
1466 }
1467 return;
1468 }
1469 }
1470
1471 private:
1472 // Implemented like a stack, but where node order doesn't matter (nodes are inserted out of order under contention)
1473 std::atomic<N *> freeListHead;
1474
1475 static const std::uint32_t REFS_MASK = 0x7FFFFFFF;
1476 static const std::uint32_t SHOULD_BE_ON_FREELIST = 0x80000000;
1477
1478#if MCDBGQ_NOLOCKFREE_FREELIST
1479 debug::DebugMutex mutex;
1480#endif
1481 };
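// [Conceptual sketch; not part of the original source.] The free list works with any
// node type that provides freeListRefs/freeListNext (as Block does below). Using a
// hypothetical Node type, the intended lifecycle looks like:
//
//   struct Node : FreeListNode<Node> { int payload; };
//   FreeList<Node> freeList;
//   Node* n = new Node();
//   freeList.add(n);                     // hand the node over for reuse
//   Node* reused = freeList.try_get();   // nullptr if the list appeared empty
//   // The list never frees nodes itself; they must outlive it and be freed by the owner.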
1482
1483
1484 //////////////////////////////////
1485 // Block
1486 //////////////////////////////////
1487
1488 enum InnerQueueContext {
1489 implicit_context = 0, explicit_context = 1
1490 };
1491
1492 struct Block {
1493 Block()
1494 : next(nullptr), elementsCompletelyDequeued(0), freeListRefs(0), freeListNext(nullptr)
1495 , shouldBeOnFreeList(false), dynamicallyAllocated(true) {
1496#if MCDBGQ_TRACKMEM
1497 owner = nullptr;
1498#endif
1499 }
1500
1501 template<InnerQueueContext context>
1502 inline bool is_empty() const {
1503 if (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
1504 // Check flags
1505 for (size_t i = 0; i < BLOCK_SIZE; ++i) {
1506 if (!emptyFlags[i].load(std::memory_order_relaxed)) {
1507 return false;
1508 }
1509 }
1510
1511 // Aha, empty; make sure we have all other memory effects that happened before the empty flags were set
1512 std::atomic_thread_fence(std::memory_order_acquire);
1513 return true;
1514 } else {
1515 // Check counter
1516 if (elementsCompletelyDequeued.load(std::memory_order_relaxed) == BLOCK_SIZE) {
1517 std::atomic_thread_fence(std::memory_order_acquire);
1518 return true;
1519 }
1520 assert(elementsCompletelyDequeued.load(std::memory_order_relaxed) <= BLOCK_SIZE);
1521 return false;
1522 }
1523 }
1524
1525 // Returns true if the block is now empty (does not apply in explicit context)
1526 template<InnerQueueContext context>
1527 inline bool set_empty(index_t i) {
1528 if (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
1529 // Set flag
1530 assert(!emptyFlags[BLOCK_SIZE - 1 -
1531 static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].load(
1532 std::memory_order_relaxed));
1533 emptyFlags[BLOCK_SIZE - 1 -
1534 static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].store(true,
1535 std::memory_order_release);
1536 return false;
1537 } else {
1538 // Increment counter
1539 auto prevVal = elementsCompletelyDequeued.fetch_add(1, std::memory_order_release);
1540 assert(prevVal < BLOCK_SIZE);
1541 return prevVal == BLOCK_SIZE - 1;
1542 }
1543 }
1544
1545 // Sets multiple contiguous item statuses to 'empty' (assumes no wrapping and count > 0).
1546 // Returns true if the block is now empty (does not apply in explicit context).
1547 template<InnerQueueContext context>
1548 inline bool set_many_empty(index_t i, size_t count) {
1549 if (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
1550 // Set flags
1551 std::atomic_thread_fence(std::memory_order_release);
1552 i = BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1)) - count +
1553 1;
1554 for (size_t j = 0; j != count; ++j) {
1555 assert(!emptyFlags[i + j].load(std::memory_order_relaxed));
1556 emptyFlags[i + j].store(true, std::memory_order_relaxed);
1557 }
1558 return false;
1559 } else {
1560 // Increment counter
1561 auto prevVal = elementsCompletelyDequeued.fetch_add(count, std::memory_order_release);
1562 assert(prevVal + count <= BLOCK_SIZE);
1563 return prevVal + count == BLOCK_SIZE;
1564 }
1565 }
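// [Worked example; not part of the original source.] With BLOCK_SIZE == 32 (purely
// for illustration), the empty flags are stored in reverse order: index i maps to
// flag slot 31 - (i & 31). So set_many_empty(i == 35, count == 3) starts at slot
// 31 - 3 - 3 + 1 == 26 and marks slots 26, 27 and 28, which correspond to indices
// 37, 36 and 35 respectively.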
1566
1567 template<InnerQueueContext context>
1568 inline void set_all_empty() {
1569 if (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
1570 // Set all flags
1571 for (size_t i = 0; i != BLOCK_SIZE; ++i) {
1572 emptyFlags[i].store(true, std::memory_order_relaxed);
1573 }
1574 } else {
1575 // Reset counter
1576 elementsCompletelyDequeued.store(BLOCK_SIZE, std::memory_order_relaxed);
1577 }
1578 }
1579
1580 template<InnerQueueContext context>
1581 inline void reset_empty() {
1582 if (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
1583 // Reset flags
1584 for (size_t i = 0; i != BLOCK_SIZE; ++i) {
1585 emptyFlags[i].store(false, std::memory_order_relaxed);
1586 }
1587 } else {
1588 // Reset counter
1589 elementsCompletelyDequeued.store(0, std::memory_order_relaxed);
1590 }
1591 }
1592
1593 inline T *operator[](index_t idx) MOODYCAMEL_NOEXCEPT {
1594 return static_cast<T *>(static_cast<void *>(elements)) +
1595 static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1));
1596 }
1597
1598 inline T const *operator[](index_t idx) const MOODYCAMEL_NOEXCEPT {
1599 return static_cast<T const *>(static_cast<void const *>(elements)) +
1600 static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1));
1601 }
1602
1603 private:
1604 // IMPORTANT: This must be the first member in Block, so that if T depends on the alignment of
1605 // addresses returned by malloc, that alignment will be preserved. Apparently clang actually
1606 // generates code that uses this assumption for AVX instructions in some cases. Ideally, we
1607 // should also align Block to the alignment of T in case it's higher than malloc's 16-byte
1608 // alignment, but this is hard to do in a cross-platform way. Assert for this case:
1609 static_assert(std::alignment_of<T>::value <= std::alignment_of<details::max_align_t>::value,
1610 "The queue does not support super-aligned types at this time");
1611 // Additionally, we need the alignment of Block itself to be a multiple of max_align_t since
1612 // otherwise the appropriate padding will not be added at the end of Block in order to make
1613 // arrays of Blocks all be properly aligned (not just the first one). We use a union to force
1614 // this.
1615 union {
1616 char elements[sizeof(T) * BLOCK_SIZE];
1617 details::max_align_t dummy;
1618 };
1619 public:
1620 Block *next;
1621 std::atomic<size_t> elementsCompletelyDequeued;
1622 std::atomic<bool> emptyFlags[
1623 BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD ? BLOCK_SIZE : 1];
1624 public:
1625 std::atomic<std::uint32_t> freeListRefs;
1626 std::atomic<Block *> freeListNext;
1627 std::atomic<bool> shouldBeOnFreeList;
1628 bool dynamicallyAllocated; // Perhaps a better name for this would be 'isNotPartOfInitialBlockPool'
1629
1630#if MCDBGQ_TRACKMEM
1631 void* owner;
1632#endif
1633 };
1634
1635 static_assert(std::alignment_of<Block>::value >= std::alignment_of<details::max_align_t>::value,
1636 "Internal error: Blocks must be at least as aligned as the type they are wrapping");
1637
1638
1639#if MCDBGQ_TRACKMEM
1640 public:
1641 struct MemStats;
1642 private:
1643#endif
1644
1645 //////////////////////////////////
1646 // Producer base
1647 //////////////////////////////////
1648
1649 struct ProducerBase : public details::ConcurrentQueueProducerTypelessBase {
1650 ProducerBase(ConcurrentQueue *parent_, bool isExplicit_)
1651 :
1652 tailIndex(0), headIndex(0), dequeueOptimisticCount(0), dequeueOvercommit(0), tailBlock(
1653 nullptr), isExplicit(isExplicit_), parent(parent_) {
1654 }
1655
1656 virtual ~ProducerBase() {};
1657
1658 template<typename U>
1659 inline bool dequeue(U &element) {
1660 if (isExplicit) {
1661 return static_cast<ExplicitProducer *>(this)->dequeue(element);
1662 } else {
1663 return static_cast<ImplicitProducer *>(this)->dequeue(element);
1664 }
1665 }
1666
1667 template<typename It>
1668 inline size_t dequeue_bulk(It &itemFirst, size_t max) {
1669 if (isExplicit) {
1670 return static_cast<ExplicitProducer *>(this)->dequeue_bulk(itemFirst, max);
1671 } else {
1672 return static_cast<ImplicitProducer *>(this)->dequeue_bulk(itemFirst, max);
1673 }
1674 }
1675
1676 inline ProducerBase *next_prod() const { return static_cast<ProducerBase *>(next); }
1677
1678 inline size_t size_approx() const {
1679 auto tail = tailIndex.load(std::memory_order_relaxed);
1680 auto head = headIndex.load(std::memory_order_relaxed);
1681 return details::circular_less_than(head, tail) ? static_cast<size_t>(tail - head) : 0;
1682 }
1683
1684 inline index_t getTail() const { return tailIndex.load(std::memory_order_relaxed); }
1685
1686 protected:
1687 std::atomic<index_t> tailIndex; // Where to enqueue to next
1688 std::atomic<index_t> headIndex; // Where to dequeue from next
1689
1690 std::atomic<index_t> dequeueOptimisticCount;
1691 std::atomic<index_t> dequeueOvercommit;
1692
1693 Block *tailBlock;
1694
1695 public:
1696 bool isExplicit;
1697 ConcurrentQueue *parent;
1698
1699 protected:
1700#if MCDBGQ_TRACKMEM
1701 friend struct MemStats;
1702#endif
1703 };
1704
1705
1706 //////////////////////////////////
1707 // Explicit queue
1708 //////////////////////////////////
1709
1710 struct ExplicitProducer : public ProducerBase {
1711 explicit ExplicitProducer(ConcurrentQueue *parent)
1712 :
1713 ProducerBase(parent, true), blockIndex(nullptr), pr_blockIndexSlotsUsed(0), pr_blockIndexSize(
1714 EXPLICIT_INITIAL_INDEX_SIZE >> 1), pr_blockIndexFront(0), pr_blockIndexEntries(nullptr)
1715 , pr_blockIndexRaw(nullptr) {
1716 size_t poolBasedIndexSize = details::ceil_to_pow_2(parent->initialBlockPoolSize) >> 1;
1717 if (poolBasedIndexSize > pr_blockIndexSize) {
1718 pr_blockIndexSize = poolBasedIndexSize;
1719 }
1720
1721 new_block_index(
1722 0); // This creates an index with double the number of current entries, i.e. EXPLICIT_INITIAL_INDEX_SIZE
1723 }
1724
1725 ~ExplicitProducer() {
1726 // Destruct any elements not yet dequeued.
1727 // Since we're in the destructor, we can assume all elements
1728 // are either completely dequeued or completely not (no halfways).
1729 if (this->tailBlock != nullptr) { // Note this means there must be a block index too
1730 // First find the block that's partially dequeued, if any
1731 Block *halfDequeuedBlock = nullptr;
1732 if ((this->headIndex.load(std::memory_order_relaxed) &
1733 static_cast<index_t>(BLOCK_SIZE - 1)) != 0) {
1734 // The head's not on a block boundary, meaning a block somewhere is partially dequeued
1735 // (or the head block is the tail block and was fully dequeued, but the head/tail are still not on a boundary)
1736 size_t i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) & (pr_blockIndexSize - 1);
1737 while (details::circular_less_than<index_t>(pr_blockIndexEntries[i].base + BLOCK_SIZE,
1738 this->headIndex.load(
1739 std::memory_order_relaxed))) {
1740 i = (i + 1) & (pr_blockIndexSize - 1);
1741 }
1742 assert(details::circular_less_than<index_t>(pr_blockIndexEntries[i].base,
1743 this->headIndex.load(
1744 std::memory_order_relaxed)));
1745 halfDequeuedBlock = pr_blockIndexEntries[i].block;
1746 }
1747
1748 // Start at the head block (note the first line in the loop gives us the head from the tail on the first iteration)
1749 auto block = this->tailBlock;
1750 do {
1751 block = block->next;
1752 if (block->template is_empty<explicit_context>()) {
1753 continue;
1754 }
1755
1756 size_t i = 0; // Offset into block
1757 if (block == halfDequeuedBlock) {
1758 i = static_cast<size_t>(this->headIndex.load(std::memory_order_relaxed) &
1759 static_cast<index_t>(BLOCK_SIZE - 1));
1760 }
1761
1762 // Walk through all the items in the block; if this is the tail block, we need to stop when we reach the tail index
1763 auto lastValidIndex = (this->tailIndex.load(std::memory_order_relaxed) &
1764 static_cast<index_t>(BLOCK_SIZE - 1)) == 0 ? BLOCK_SIZE
1765 : static_cast<size_t>(
1766 this->tailIndex.load(std::memory_order_relaxed) &
1767 static_cast<index_t>(BLOCK_SIZE - 1));
1768 while (i != BLOCK_SIZE && (block != this->tailBlock || i != lastValidIndex)) {
1769 (*block)[i++]->~T();
1770 }
1771 } while (block != this->tailBlock);
1772 }
1773
1774 // Destroy all blocks that we own
1775 if (this->tailBlock != nullptr) {
1776 auto block = this->tailBlock;
1777 do {
1778 auto nextBlock = block->next;
1779 if (block->dynamicallyAllocated) {
1780 destroy(block);
1781 } else {
1782 this->parent->add_block_to_free_list(block);
1783 }
1784 block = nextBlock;
1785 } while (block != this->tailBlock);
1786 }
1787
1788 // Destroy the block indices
1789 auto header = static_cast<BlockIndexHeader *>(pr_blockIndexRaw);
1790 while (header != nullptr) {
1791 auto prev = static_cast<BlockIndexHeader *>(header->prev);
1792 header->~BlockIndexHeader();
1793 (Traits::free)(header);
1794 header = prev;
1795 }
1796 }
1797
1798 template<AllocationMode allocMode, typename U>
1799 inline bool enqueue(U &&element) {
1800 index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);
1801 index_t newTailIndex = 1 + currentTailIndex;
1802 if ((currentTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
1803 // We reached the end of a block, start a new one
1804 auto startBlock = this->tailBlock;
1805 auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed;
1806 if (this->tailBlock != nullptr &&
1807 this->tailBlock->next->template is_empty<explicit_context>()) {
1808 // We can re-use the block ahead of us, it's empty!
1809 this->tailBlock = this->tailBlock->next;
1810 this->tailBlock->template reset_empty<explicit_context>();
1811
1812 // We'll put the block on the block index (guaranteed to be room since we're conceptually removing the
1813 // last block from it first -- except instead of removing then adding, we can just overwrite).
1814 // Note that there must be a valid block index here, since even if allocation failed in the ctor,
1815 // it would have been re-attempted when adding the first block to the queue; since there is such
1816 // a block, a block index must have been successfully allocated.
1817 } else {
1818 // Whatever head value we see here is >= the last value we saw here (relatively),
1819 // and <= its current value. Since we have the most recent tail, the head must be
1820 // <= to it.
1821 auto head = this->headIndex.load(std::memory_order_relaxed);
1822 assert(!details::circular_less_than<index_t>(currentTailIndex, head));
1823 if (!details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE)
1824 || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value &&
1825 (MAX_SUBQUEUE_SIZE == 0 ||
1826 MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head))) {
1827 // We can't enqueue in another block because there's not enough leeway -- the
1828 // tail could surpass the head by the time the block fills up! (Or we'll exceed
1829 // the size limit, if the second part of the condition was true.)
1830 return false;
1831 }
1832 // We're going to need a new block; check that the block index has room
1833 if (pr_blockIndexRaw == nullptr || pr_blockIndexSlotsUsed == pr_blockIndexSize) {
1834 // Hmm, the circular block index is already full -- we'll need
1835 // to allocate a new index. Note pr_blockIndexRaw can only be nullptr if
1836 // the initial allocation failed in the constructor.
1837
1838 if (allocMode == CannotAlloc || !new_block_index(pr_blockIndexSlotsUsed)) {
1839 return false;
1840 }
1841 }
1842
1843 // Insert a new block in the circular linked list
1844 auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
1845 if (newBlock == nullptr) {
1846 return false;
1847 }
1848#if MCDBGQ_TRACKMEM
1849 newBlock->owner = this;
1850#endif
1851 newBlock->template reset_empty<explicit_context>();
1852 if (this->tailBlock == nullptr) {
1853 newBlock->next = newBlock;
1854 } else {
1855 newBlock->next = this->tailBlock->next;
1856 this->tailBlock->next = newBlock;
1857 }
1858 this->tailBlock = newBlock;
1859 ++pr_blockIndexSlotsUsed;
1860 }
1861
1862 if (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new(nullptr) T(std::forward<U>(element)))) {
1863 // The constructor may throw. We want the element not to appear in the queue in
1864 // that case (without corrupting the queue):
1865 MOODYCAMEL_TRY {
1866 new((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
1867 }
1868 MOODYCAMEL_CATCH (...) {
1869 // Revert change to the current block, but leave the new block available
1870 // for next time
1871 pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
1872 this->tailBlock = startBlock == nullptr ? this->tailBlock : startBlock;
1873 MOODYCAMEL_RETHROW;
1874 }
1875 } else {
1876 (void) startBlock;
1877 (void) originalBlockIndexSlotsUsed;
1878 }
1879
1880 // Add block to block index
1881 auto &entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
1882 entry.base = currentTailIndex;
1883 entry.block = this->tailBlock;
1884 blockIndex.load(std::memory_order_relaxed)->front.store(pr_blockIndexFront,
1885 std::memory_order_release);
1886 pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
1887
1888 if (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new(nullptr) T(std::forward<U>(element)))) {
1889 this->tailIndex.store(newTailIndex, std::memory_order_release);
1890 return true;
1891 }
1892 }
1893
1894 // Enqueue
1895 new((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
1896
1897 this->tailIndex.store(newTailIndex, std::memory_order_release);
1898 return true;
1899 }
1900
1901 template<typename U>
1902 bool dequeue(U &element) {
1903 auto tail = this->tailIndex.load(std::memory_order_relaxed);
1904 auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
1905 if (details::circular_less_than<index_t>(
1906 this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit, tail)) {
1907 // Might be something to dequeue, let's give it a try
1908
1909 // Note that this if is purely for performance purposes in the common case when the queue is
1910 // empty and the values are eventually consistent -- we may enter here spuriously.
1911
1912 // Note that whatever the values of overcommit and tail are, they are not going to change (unless we
1913 // change them) and must be the same value at this point (inside the if) as when the if condition was
1914 // evaluated.
1915
1916 // We insert an acquire fence here to synchronize-with the release upon incrementing dequeueOvercommit below.
1917 // This ensures that whatever value got loaded into overcommit, the load of dequeueOptimisticCount in
1918 // the fetch_add below will result in a value at least as recent as that (and therefore at least as large).
1919 // Note that I believe a compiler (signal) fence here would be sufficient due to the nature of fetch_add (all
1920 // read-modify-write operations are guaranteed to work on the latest value in the modification order), but
1921 // unfortunately that can't be shown to be correct using only the C++11 standard.
1922 // See http://stackoverflow.com/questions/18223161/what-are-the-c11-memory-ordering-guarantees-in-this-corner-case
1923 std::atomic_thread_fence(std::memory_order_acquire);
1924
1925 // Increment optimistic counter, then check if it went over the boundary
1926 auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(1, std::memory_order_relaxed);
1927
1928 // Note that since dequeueOvercommit must be <= dequeueOptimisticCount (because dequeueOvercommit is only ever
1929 // incremented after dequeueOptimisticCount -- this is enforced in the `else` block below), and since we now
1930 // have a version of dequeueOptimisticCount that is at least as recent as overcommit (due to the release upon
1931 // incrementing dequeueOvercommit and the acquire above that synchronizes with it), overcommit <= myDequeueCount.
1932 assert(overcommit <= myDequeueCount);
1933
1934 // Note that we reload tail here in case it changed; it will be the same value as before or greater, since
1935 // this load is sequenced after (happens after) the earlier load above. This is supported by read-read
1936 // coherency (as defined in the standard), explained here: http://en.cppreference.com/w/cpp/atomic/memory_order
1937 tail = this->tailIndex.load(std::memory_order_acquire);
1938 if (details::likely(
1939 details::circular_less_than<index_t>(myDequeueCount - overcommit, tail))) {
1940 // Guaranteed to be at least one element to dequeue!
1941
1942 // Get the index. Note that since there's guaranteed to be at least one element, this
1943 // will never exceed tail. We need to do an acquire-release fence here since it's possible
1944 // that whatever condition got us to this point was for an earlier enqueued element (that
1945 // we already see the memory effects for), but that by the time we increment somebody else
1946 // has incremented it, and we need to see the memory effects for *that* element, which
1947 // in such a case is necessarily visible on the thread that incremented it in the first
1948 // place with the more current condition (they must have acquired a tail that is at least
1949 // as recent).
1950 auto index = this->headIndex.fetch_add(1, std::memory_order_acq_rel);
1951
1952
1953 // Determine which block the element is in
1954
1955 auto localBlockIndex = blockIndex.load(std::memory_order_acquire);
1956 auto localBlockIndexHead = localBlockIndex->front.load(std::memory_order_acquire);
1957
1958 // We need to be careful here about subtracting and dividing because of index wrap-around.
1959 // When an index wraps, we need to preserve the sign of the offset when dividing it by the
1960 // block size (in order to get a correct signed block count offset in all cases):
1961 auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
1962 auto blockBaseIndex = index & ~static_cast<index_t>(BLOCK_SIZE - 1);
1963 auto offset = static_cast<size_t>(
1964 static_cast<typename std::make_signed<index_t>::type>(blockBaseIndex - headBase) /
1965 BLOCK_SIZE);
1966 auto block = localBlockIndex->entries[(localBlockIndexHead + offset) &
1967 (localBlockIndex->size - 1)].block;
1968
1969 // Dequeue
1970 auto &el = *((*block)[index]);
1971 if (!MOODYCAMEL_NOEXCEPT_ASSIGN(T, T &&, element = std::move(el))) {
1972 // Make sure the element is still fully dequeued and destroyed even if the assignment
1973 // throws
1974 struct Guard {
1975 Block *block;
1976 index_t index;
1977
1978 ~Guard() {
1979 (*block)[index]->~T();
1980 block->template set_empty<explicit_context>(index);
1981 }
1982 } guard = {block, index};
1983
1984 element = std::move(el);
1985 } else {
1986 element = std::move(el);
1987 el.~T();
1988 block->template set_empty<explicit_context>(index);
1989 }
1990
1991 return true;
1992 } else {
1993 // Wasn't anything to dequeue after all; make the effective dequeue count eventually consistent
1994 this->dequeueOvercommit.fetch_add(1,
1995 std::memory_order_release); // Release so that the fetch_add on dequeueOptimisticCount is guaranteed to happen before this write
1996 }
1997 }
1998
1999 return false;
2000 }
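// [Worked example; not part of the original source.] Suppose tail == 10,
// dequeueOptimisticCount == 9 and dequeueOvercommit == 0, and two threads both pass
// the fast check (9 - 0 < 10). Their fetch_add calls return myDequeueCount == 9 and
// 10 respectively. The first thread sees 9 < 10 and claims a headIndex slot; the
// second sees that 10 < 10 fails and bumps dequeueOvercommit instead, keeping the
// optimistic counter eventually consistent with the number of elements actually taken.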
2001
2002 template<AllocationMode allocMode, typename It>
2003 bool enqueue_bulk(It itemFirst, size_t count) {
2004 // First, we need to make sure we have enough room to enqueue all of the elements;
2005 // this means pre-allocating blocks and putting them in the block index (but only if
2006 // all the allocations succeeded).
2007 index_t startTailIndex = this->tailIndex.load(std::memory_order_relaxed);
2008 auto startBlock = this->tailBlock;
2009 auto originalBlockIndexFront = pr_blockIndexFront;
2010 auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed;
2011
2012 Block *firstAllocatedBlock = nullptr;
2013
2014 // Figure out how many blocks we'll need to allocate, and do so
2015 size_t blockBaseDiff =
2016 ((startTailIndex + count - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1)) -
2017 ((startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1));
2018 index_t currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
2019 if (blockBaseDiff > 0) {
2020 // Allocate as many blocks as possible from ahead
2021 while (blockBaseDiff > 0 && this->tailBlock != nullptr &&
2022 this->tailBlock->next != firstAllocatedBlock &&
2023 this->tailBlock->next->template is_empty<explicit_context>()) {
2024 blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
2025 currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2026
2027 this->tailBlock = this->tailBlock->next;
2028 firstAllocatedBlock =
2029 firstAllocatedBlock == nullptr ? this->tailBlock : firstAllocatedBlock;
2030
2031 auto &entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
2032 entry.base = currentTailIndex;
2033 entry.block = this->tailBlock;
2034 pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
2035 }
2036
2037 // Now allocate as many blocks as necessary from the block pool
2038 while (blockBaseDiff > 0) {
2039 blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
2040 currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2041
2042 auto head = this->headIndex.load(std::memory_order_relaxed);
2043 assert(!details::circular_less_than<index_t>(currentTailIndex, head));
2044 bool full = !details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) ||
2045 (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value &&
2046 (MAX_SUBQUEUE_SIZE == 0 ||
2047 MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head));
2048 if (pr_blockIndexRaw == nullptr || pr_blockIndexSlotsUsed == pr_blockIndexSize || full) {
2049 if (allocMode == CannotAlloc || full || !new_block_index(originalBlockIndexSlotsUsed)) {
2050 // Failed to allocate, undo changes (but keep injected blocks)
2051 pr_blockIndexFront = originalBlockIndexFront;
2052 pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2053 this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
2054 return false;
2055 }
2056
2057 // pr_blockIndexFront is updated inside new_block_index, so we need to
2058 // update our fallback value too (since we keep the new index even if we
2059 // later fail)
2060 originalBlockIndexFront = originalBlockIndexSlotsUsed;
2061 }
2062
2063 // Insert a new block in the circular linked list
2064 auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
2065 if (newBlock == nullptr) {
2066 pr_blockIndexFront = originalBlockIndexFront;
2067 pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2068 this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
2069 return false;
2070 }
2071
2072#if MCDBGQ_TRACKMEM
2073 newBlock->owner = this;
2074#endif
2075 newBlock->template set_all_empty<explicit_context>();
2076 if (this->tailBlock == nullptr) {
2077 newBlock->next = newBlock;
2078 } else {
2079 newBlock->next = this->tailBlock->next;
2080 this->tailBlock->next = newBlock;
2081 }
2082 this->tailBlock = newBlock;
2083 firstAllocatedBlock =
2084 firstAllocatedBlock == nullptr ? this->tailBlock : firstAllocatedBlock;
2085
2086 ++pr_blockIndexSlotsUsed;
2087
2088 auto &entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
2089 entry.base = currentTailIndex;
2090 entry.block = this->tailBlock;
2091 pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
2092 }
2093
2094 // Excellent, all allocations succeeded. Reset each block's emptiness before we fill them up, and
2095 // publish the new block index front
2096 auto block = firstAllocatedBlock;
2097 while (true) {
2098 block->template reset_empty<explicit_context>();
2099 if (block == this->tailBlock) {
2100 break;
2101 }
2102 block = block->next;
2103 }
2104
2105 if (MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst),
2106 new(nullptr) T(details::deref_noexcept(itemFirst)))) {
2107 blockIndex.load(std::memory_order_relaxed)->front.store(
2108 (pr_blockIndexFront - 1) & (pr_blockIndexSize - 1), std::memory_order_release);
2109 }
2110 }
2111
2112 // Enqueue, one block at a time
2113 index_t newTailIndex = startTailIndex + static_cast<index_t>(count);
2114 currentTailIndex = startTailIndex;
2115 auto endBlock = this->tailBlock;
2116 this->tailBlock = startBlock;
2117 assert((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 ||
2118 firstAllocatedBlock != nullptr || count == 0);
2119 if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 &&
2120 firstAllocatedBlock != nullptr) {
2121 this->tailBlock = firstAllocatedBlock;
2122 }
2123 while (true) {
2124 auto stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) +
2125 static_cast<index_t>(BLOCK_SIZE);
2126 if (details::circular_less_than<index_t>(newTailIndex, stopIndex)) {
2127 stopIndex = newTailIndex;
2128 }
2129 if (MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst),
2130 new(nullptr) T(details::deref_noexcept(itemFirst)))) {
2131 while (currentTailIndex != stopIndex) {
2132 new((*this->tailBlock)[currentTailIndex++]) T(*itemFirst++);
2133 }
2134 } else {
2135 MOODYCAMEL_TRY {
2136 while (currentTailIndex != stopIndex) {
2137 // Must use copy constructor even if move constructor is available
2138 // because we may have to revert if there's an exception.
2139 // Sorry about the horrible templated next line, but it was the only way
2140 // to disable moving *at compile time*, which is important because a type
2141 // may only define a (noexcept) move constructor, and so calls to the
2142 // cctor will not compile, even if they are in an if branch that will never
2143 // be executed
2144 new((*this->tailBlock)[currentTailIndex]) T(
2145 details::nomove_if<(bool) !MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst),
2146 new(nullptr) T(
2147 details::deref_noexcept(
2148 itemFirst)))>::eval(
2149 *itemFirst));
2150 ++currentTailIndex;
2151 ++itemFirst;
2152 }
2153 }
2154 MOODYCAMEL_CATCH (...) {
2155 // Oh dear, an exception's been thrown -- destroy the elements that
2156 // were enqueued so far and revert the entire bulk operation (we'll keep
2157 // any allocated blocks in our linked list for later, though).
2158 auto constructedStopIndex = currentTailIndex;
2159 auto lastBlockEnqueued = this->tailBlock;
2160
2161 pr_blockIndexFront = originalBlockIndexFront;
2162 pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2163 this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
2164
2165 if (!details::is_trivially_destructible<T>::value) {
2166 auto block = startBlock;
2167 if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
2168 block = firstAllocatedBlock;
2169 }
2170 currentTailIndex = startTailIndex;
2171 while (true) {
2172 stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) +
2173 static_cast<index_t>(BLOCK_SIZE);
2174 if (details::circular_less_than<index_t>(constructedStopIndex, stopIndex)) {
2175 stopIndex = constructedStopIndex;
2176 }
2177 while (currentTailIndex != stopIndex) {
2178 (*block)[currentTailIndex++]->~T();
2179 }
2180 if (block == lastBlockEnqueued) {
2181 break;
2182 }
2183 block = block->next;
2184 }
2185 }
2186 MOODYCAMEL_RETHROW;
2187 }
2188 }
2189
2190 if (this->tailBlock == endBlock) {
2191 assert(currentTailIndex == newTailIndex);
2192 break;
2193 }
2194 this->tailBlock = this->tailBlock->next;
2195 }
2196
2197 if (!MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst),
2198 new(nullptr) T(details::deref_noexcept(itemFirst))) &&
2199 firstAllocatedBlock != nullptr) {
2200 blockIndex.load(std::memory_order_relaxed)->front.store(
2201 (pr_blockIndexFront - 1) & (pr_blockIndexSize - 1), std::memory_order_release);
2202 }
2203
2204 this->tailIndex.store(newTailIndex, std::memory_order_release);
2205 return true;
2206 }
2207
2208 template<typename It>
2209 size_t dequeue_bulk(It &itemFirst, size_t max) {
2210 auto tail = this->tailIndex.load(std::memory_order_relaxed);
2211 auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
2212 auto desiredCount = static_cast<size_t>(tail - (this->dequeueOptimisticCount.load(
2213 std::memory_order_relaxed) - overcommit));
2214 if (details::circular_less_than<size_t>(0, desiredCount)) {
2215 desiredCount = desiredCount < max ? desiredCount : max;
2216 std::atomic_thread_fence(std::memory_order_acquire);
2217
2218 auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(desiredCount,
2219 std::memory_order_relaxed);
2220 assert(overcommit <= myDequeueCount);
2221
2222 tail = this->tailIndex.load(std::memory_order_acquire);
2223 auto actualCount = static_cast<size_t>(tail - (myDequeueCount - overcommit));
2224 if (details::circular_less_than<size_t>(0, actualCount)) {
2225 actualCount = desiredCount < actualCount ? desiredCount : actualCount;
2226 if (actualCount < desiredCount) {
2227 this->dequeueOvercommit.fetch_add(desiredCount - actualCount,
2228 std::memory_order_release);
2229 }
2230
2231 // Get the first index. Note that since there's guaranteed to be at least actualCount elements, this
2232 // will never exceed tail.
2233 auto firstIndex = this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel);
2234
2235 // Determine which block the first element is in
2236 auto localBlockIndex = blockIndex.load(std::memory_order_acquire);
2237 auto localBlockIndexHead = localBlockIndex->front.load(std::memory_order_acquire);
2238
2239 auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
2240 auto firstBlockBaseIndex = firstIndex & ~static_cast<index_t>(BLOCK_SIZE - 1);
2241 auto offset = static_cast<size_t>(
2242 static_cast<typename std::make_signed<index_t>::type>(firstBlockBaseIndex - headBase) /
2243 BLOCK_SIZE);
2244 auto indexIndex = (localBlockIndexHead + offset) & (localBlockIndex->size - 1);
2245
2246 // Iterate the blocks and dequeue
2247 auto index = firstIndex;
2248 do {
2249 auto firstIndexInBlock = index;
2250 auto endIndex =
2251 (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2252 endIndex = details::circular_less_than<index_t>(
2253 firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex +
2254 static_cast<index_t>(actualCount)
2255 : endIndex;
2256 auto block = localBlockIndex->entries[indexIndex].block;
2257 if (MOODYCAMEL_NOEXCEPT_ASSIGN(T, T &&, details::deref_noexcept(itemFirst) = std::move(
2258 (*(*block)[index])))) {
2259 while (index != endIndex) {
2260 auto &el = *((*block)[index]);
2261 *itemFirst++ = std::move(el);
2262 el.~T();
2263 ++index;
2264 }
2265 } else {
2266 MOODYCAMEL_TRY {
2267 while (index != endIndex) {
2268 auto &el = *((*block)[index]);
2269 *itemFirst = std::move(el);
2270 ++itemFirst;
2271 el.~T();
2272 ++index;
2273 }
2274 }
2275 MOODYCAMEL_CATCH (...) {
2276 // It's too late to revert the dequeue, but we can make sure that all
2277 // the dequeued objects are properly destroyed and the block index
2278 // (and empty count) are properly updated before we propagate the exception
2279 do {
2280 block = localBlockIndex->entries[indexIndex].block;
2281 while (index != endIndex) {
2282 (*block)[index++]->~T();
2283 }
2284 block->template set_many_empty<explicit_context>(
2285 firstIndexInBlock, static_cast<size_t>(endIndex - firstIndexInBlock));
2286 indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);
2287
2288 firstIndexInBlock = index;
2289 endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) +
2290 static_cast<index_t>(BLOCK_SIZE);
2291 endIndex = details::circular_less_than<index_t>(
2292 firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex +
2293 static_cast<index_t>(actualCount)
2294 : endIndex;
2295 } while (index != firstIndex + actualCount);
2296
2297 MOODYCAMEL_RETHROW;
2298 }
2299 }
2300 block->template set_many_empty<explicit_context>(
2301 firstIndexInBlock, static_cast<size_t>(endIndex - firstIndexInBlock));
2302 indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);
2303 } while (index != firstIndex + actualCount);
2304
2305 return actualCount;
2306 } else {
2307 // Wasn't anything to dequeue after all; make the effective dequeue count eventually consistent
2308 this->dequeueOvercommit.fetch_add(desiredCount, std::memory_order_release);
2309 }
2310 }
2311
2312 return 0;
2313 }
2314
2315 private:
2316 struct BlockIndexEntry {
2317 index_t base;
2318 Block *block;
2319 };
2320
2321 struct BlockIndexHeader {
2322 size_t size;
2323 std::atomic<size_t> front; // Current slot (not next, like pr_blockIndexFront)
2324 BlockIndexEntry *entries;
2325 void *prev;
2326 };
2327
2328
2329 bool new_block_index(size_t numberOfFilledSlotsToExpose) {
2330 auto prevBlockSizeMask = pr_blockIndexSize - 1;
2331
2332 // Create the new block
2333 pr_blockIndexSize <<= 1;
2334 auto newRawPtr = static_cast<char *>((Traits::malloc)(
2335 sizeof(BlockIndexHeader) + std::alignment_of<BlockIndexEntry>::value - 1 +
2336 sizeof(BlockIndexEntry) * pr_blockIndexSize));
2337 if (newRawPtr == nullptr) {
2338 pr_blockIndexSize >>= 1; // Reset to allow graceful retry
2339 return false;
2340 }
2341
2342 auto newBlockIndexEntries = reinterpret_cast<BlockIndexEntry *>(details::align_for<BlockIndexEntry>(
2343 newRawPtr + sizeof(BlockIndexHeader)));
2344
2345 // Copy in all the old indices, if any
2346 size_t j = 0;
2347 if (pr_blockIndexSlotsUsed != 0) {
2348 auto i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) & prevBlockSizeMask;
2349 do {
2350 newBlockIndexEntries[j++] = pr_blockIndexEntries[i];
2351 i = (i + 1) & prevBlockSizeMask;
2352 } while (i != pr_blockIndexFront);
2353 }
2354
2355 // Update everything
2356 auto header = new(newRawPtr) BlockIndexHeader;
2357 header->size = pr_blockIndexSize;
2358 header->front.store(numberOfFilledSlotsToExpose - 1, std::memory_order_relaxed);
2359 header->entries = newBlockIndexEntries;
2360 header->prev = pr_blockIndexRaw; // we link the new block to the old one so we can free it later
2361
2362 pr_blockIndexFront = j;
2363 pr_blockIndexEntries = newBlockIndexEntries;
2364 pr_blockIndexRaw = newRawPtr;
2365 blockIndex.store(header, std::memory_order_release);
2366
2367 return true;
2368 }
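// [Worked example; not part of the original source.] Each call doubles the index:
// if pr_blockIndexSize was 8 with all 8 slots used, the new header holds 16 entries,
// the 8 live entries are copied in order into slots 0..7, pr_blockIndexFront becomes
// 8, and the old header remains reachable through header->prev so the destructor can
// free the whole chain later.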
2369
2370 private:
2371 std::atomic<BlockIndexHeader *> blockIndex;
2372
2373 // To be used by the producer only -- consumers must use the ones referenced by blockIndex
2374 size_t pr_blockIndexSlotsUsed;
2375 size_t pr_blockIndexSize;
2376 size_t pr_blockIndexFront; // Next slot (not current)
2377 BlockIndexEntry *pr_blockIndexEntries;
2378 void *pr_blockIndexRaw;
2379
2380#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
2381 public:
2382 ExplicitProducer* nextExplicitProducer;
2383 private:
2384#endif
2385
2386#if MCDBGQ_TRACKMEM
2387 friend struct MemStats;
2388#endif
2389 };
2390
2391
2392 //////////////////////////////////
2393 // Implicit queue
2394 //////////////////////////////////
2395
2396 struct ImplicitProducer : public ProducerBase {
2397 ImplicitProducer(ConcurrentQueue *parent)
2398 :
2399 ProducerBase(parent, false), nextBlockIndexCapacity(IMPLICIT_INITIAL_INDEX_SIZE), blockIndex(
2400 nullptr) {
2401 new_block_index();
2402 }
2403
2404 ~ImplicitProducer() {
2405 // Note that since we're in the destructor we can assume that all enqueue/dequeue operations
2406 // completed already; this means that all undequeued elements are placed contiguously across
2407 // contiguous blocks, and that only the first and last remaining blocks can be only partially
2408 // empty (all other remaining blocks must be completely full).
2409
2410#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
2411 // Unregister ourselves for thread termination notification
2412 if (!this->inactive.load(std::memory_order_relaxed)) {
2413 details::ThreadExitNotifier::unsubscribe(&threadExitListener);
2414 }
2415#endif
2416
2417 // Destroy all remaining elements!
2418 auto tail = this->tailIndex.load(std::memory_order_relaxed);
2419 auto index = this->headIndex.load(std::memory_order_relaxed);
2420 Block *block = nullptr;
2421 assert(index == tail || details::circular_less_than(index, tail));
2422 bool forceFreeLastBlock =
2423 index != tail; // If we enter the loop, then the last (tail) block will not be freed
2424 while (index != tail) {
2425 if ((index & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 || block == nullptr) {
2426 if (block != nullptr) {
2427 // Free the old block
2428 this->parent->add_block_to_free_list(block);
2429 }
2430
2431 block = get_block_index_entry_for_index(index)->value.load(std::memory_order_relaxed);
2432 }
2433
2434 ((*block)[index])->~T();
2435 ++index;
2436 }
2437 // Even if the queue is empty, there's still one block that's not on the free list
2438 // (unless the head index reached the end of it, in which case the tail will be poised
2439 // to create a new block).
2440 if (this->tailBlock != nullptr &&
2441 (forceFreeLastBlock || (tail & static_cast<index_t>(BLOCK_SIZE - 1)) != 0)) {
2442 this->parent->add_block_to_free_list(this->tailBlock);
2443 }
2444
2445 // Destroy block index
2446 auto localBlockIndex = blockIndex.load(std::memory_order_relaxed);
2447 if (localBlockIndex != nullptr) {
2448 for (size_t i = 0; i != localBlockIndex->capacity; ++i) {
2449 localBlockIndex->index[i]->~BlockIndexEntry();
2450 }
2451 do {
2452 auto prev = localBlockIndex->prev;
2453 localBlockIndex->~BlockIndexHeader();
2454 (Traits::free)(localBlockIndex);
2455 localBlockIndex = prev;
2456 } while (localBlockIndex != nullptr);
2457 }
2458 }
2459
2460 template<AllocationMode allocMode, typename U>
2461 inline bool enqueue(U &&element) {
2462 index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);
2463 index_t newTailIndex = 1 + currentTailIndex;
2464 if ((currentTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
2465 // We reached the end of a block, start a new one
2466 auto head = this->headIndex.load(std::memory_order_relaxed);
2467 assert(!details::circular_less_than<index_t>(currentTailIndex, head));
2468 if (!details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) ||
2469 (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value &&
2470 (MAX_SUBQUEUE_SIZE == 0 ||
2471 MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head))) {
2472 return false;
2473 }
2474#if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2475 debug::DebugLock lock(mutex);
2476#endif
2477 // Find out where we'll be inserting this block in the block index
2478 BlockIndexEntry *idxEntry;
2479 if (!insert_block_index_entry<allocMode>(idxEntry, currentTailIndex)) {
2480 return false;
2481 }
2482
2483 // Get ahold of a new block
2484 auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
2485 if (newBlock == nullptr) {
2486 rewind_block_index_tail();
2487 idxEntry->value.store(nullptr, std::memory_order_relaxed);
2488 return false;
2489 }
2490#if MCDBGQ_TRACKMEM
2491 newBlock->owner = this;
2492#endif
2493 newBlock->template reset_empty<implicit_context>();
2494
2495 if (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new(nullptr) T(std::forward<U>(element)))) {
2496 // May throw, try to insert now before we publish the fact that we have this new block
2497 MOODYCAMEL_TRY {
2498 new((*newBlock)[currentTailIndex]) T(std::forward<U>(element));
2499 }
2500 MOODYCAMEL_CATCH (...) {
2501 rewind_block_index_tail();
2502 idxEntry->value.store(nullptr, std::memory_order_relaxed);
2503 this->parent->add_block_to_free_list(newBlock);
2504 MOODYCAMEL_RETHROW;
2505 }
2506 }
2507
2508 // Insert the new block into the index
2509 idxEntry->value.store(newBlock, std::memory_order_relaxed);
2510
2511 this->tailBlock = newBlock;
2512
2513 if (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new(nullptr) T(std::forward<U>(element)))) {
2514 this->tailIndex.store(newTailIndex, std::memory_order_release);
2515 return true;
2516 }
2517 }
2518
2519 // Enqueue
2520 new((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
2521
2522 this->tailIndex.store(newTailIndex, std::memory_order_release);
2523 return true;
2524 }
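// [Illustrative sketch; not part of the original header.] The implicit producer is
// what backs the tokenless public API: each enqueueing thread lazily gets its own
// ImplicitProducer, so no token is required at the call site:
//
//   ConcurrentQueue<int> q;
//   q.enqueue(42);                  // routed through this thread's implicit producer
//   int item;
//   bool ok = q.try_dequeue(item);  // in the single-threaded case, ok == true and item == 42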
2525
2526 template<typename U>
2527 bool dequeue(U &element) {
2528 // See ExplicitProducer::dequeue for rationale and explanation
2529 index_t tail = this->tailIndex.load(std::memory_order_relaxed);
2530 index_t overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
2531 if (details::circular_less_than<index_t>(
2532 this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit, tail)) {
2533 std::atomic_thread_fence(std::memory_order_acquire);
2534
2535 index_t myDequeueCount = this->dequeueOptimisticCount.fetch_add(1,
2536 std::memory_order_relaxed);
2537 assert(overcommit <= myDequeueCount);
2538 tail = this->tailIndex.load(std::memory_order_acquire);
2539 if (details::likely(
2540 details::circular_less_than<index_t>(myDequeueCount - overcommit, tail))) {
2541 index_t index = this->headIndex.fetch_add(1, std::memory_order_acq_rel);
2542
2543 // Determine which block the element is in
2544 auto entry = get_block_index_entry_for_index(index);
2545
2546 // Dequeue
2547 auto block = entry->value.load(std::memory_order_relaxed);
2548 auto &el = *((*block)[index]);
2549
2550 if (!MOODYCAMEL_NOEXCEPT_ASSIGN(T, T &&, element = std::move(el))) {
2551#if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2552 // Note: Acquiring the mutex with every dequeue instead of only when a block
2553 // is released is very sub-optimal, but it is, after all, purely debug code.
2554 debug::DebugLock lock(producer->mutex);
2555#endif
2556 struct Guard {
2557 Block *block;
2558 index_t index;
2559 BlockIndexEntry *entry;
2560 ConcurrentQueue *parent;
2561
2562 ~Guard() {
2563 (*block)[index]->~T();
2564 if (block->template set_empty<implicit_context>(index)) {
2565 entry->value.store(nullptr, std::memory_order_relaxed);
2566 parent->add_block_to_free_list(block);
2567 }
2568 }
2569 } guard = {block, index, entry, this->parent};
2570
2571 element = std::move(el);
2572 } else {
2573 element = std::move(el);
2574 el.~T();
2575
2576 if (block->template set_empty<implicit_context>(index)) {
2577 {
2578#if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2579 debug::DebugLock lock(mutex);
2580#endif
2581 // Add the block back into the global free pool (and remove from block index)
2582 entry->value.store(nullptr, std::memory_order_relaxed);
2583 }
2584 this->parent->add_block_to_free_list(block); // releases the above store
2585 }
2586 }
2587
2588 return true;
2589 } else {
2590 this->dequeueOvercommit.fetch_add(1, std::memory_order_release);
2591 }
2592 }
2593
2594 return false;
2595 }
2596
2597 template<AllocationMode allocMode, typename It>
2598 bool enqueue_bulk(It itemFirst, size_t count) {
2599 // First, we need to make sure we have enough room to enqueue all of the elements;
2600 // this means pre-allocating blocks and putting them in the block index (but only if
2601 // all the allocations succeeded).
2602
2603 // Note that the tailBlock we start off with may not be owned by us any more;
2604 // this happens if it was filled up exactly to the top (setting tailIndex to
2605 // the first index of the next block which is not yet allocated), then dequeued
2606 // completely (putting it on the free list) before we enqueue again.
2607
2608 index_t startTailIndex = this->tailIndex.load(std::memory_order_relaxed);
2609 auto startBlock = this->tailBlock;
2610 Block *firstAllocatedBlock = nullptr;
2611 auto endBlock = this->tailBlock;
2612
2613 // Figure out how many blocks we'll need to allocate, and do so
2614 size_t blockBaseDiff =
2615 ((startTailIndex + count - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1)) -
2616 ((startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1));
2617 index_t currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
2618 if (blockBaseDiff > 0) {
2619#if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2620 debug::DebugLock lock(mutex);
2621#endif
2622 do {
2623 blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
2624 currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2625
2626 // Find out where we'll be inserting this block in the block index
2627 BlockIndexEntry *idxEntry = nullptr; // initialization here unnecessary but compiler can't always tell
2628 Block *newBlock;
2629 bool indexInserted = false;
2630 auto head = this->headIndex.load(std::memory_order_relaxed);
2631 assert(!details::circular_less_than<index_t>(currentTailIndex, head));
2632 bool full = !details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) ||
2633 (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value &&
2634 (MAX_SUBQUEUE_SIZE == 0 ||
2635 MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head));
2636 if (full ||
2637 !(indexInserted = insert_block_index_entry<allocMode>(idxEntry, currentTailIndex)) ||
2638 (newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>()) ==
2639 nullptr) {
2640 // Index allocation or block allocation failed; revert any other allocations
2641 // and index insertions done so far for this operation
2642 if (indexInserted) {
2643 rewind_block_index_tail();
2644 idxEntry->value.store(nullptr, std::memory_order_relaxed);
2645 }
2646 currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
2647 for (auto block = firstAllocatedBlock; block != nullptr; block = block->next) {
2648 currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2649 idxEntry = get_block_index_entry_for_index(currentTailIndex);
2650 idxEntry->value.store(nullptr, std::memory_order_relaxed);
2651 rewind_block_index_tail();
2652 }
2653 this->parent->add_blocks_to_free_list(firstAllocatedBlock);
2654 this->tailBlock = startBlock;
2655
2656 return false;
2657 }
2658
2659#if MCDBGQ_TRACKMEM
2660 newBlock->owner = this;
2661#endif
2662 newBlock->template reset_empty<implicit_context>();
2663 newBlock->next = nullptr;
2664
2665 // Insert the new block into the index
2666 idxEntry->value.store(newBlock, std::memory_order_relaxed);
2667
2668 // Store the chain of blocks so that we can undo if later allocations fail,
2669 // and so that we can find the blocks when we do the actual enqueueing
2670 if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 ||
2671 firstAllocatedBlock != nullptr) {
2672 assert(this->tailBlock != nullptr);
2673 this->tailBlock->next = newBlock;
2674 }
2675 this->tailBlock = newBlock;
2676 endBlock = newBlock;
2677 firstAllocatedBlock = firstAllocatedBlock == nullptr ? newBlock : firstAllocatedBlock;
2678 } while (blockBaseDiff > 0);
2679 }
2680
2681 // Enqueue, one block at a time
2682 index_t newTailIndex = startTailIndex + static_cast<index_t>(count);
2683 currentTailIndex = startTailIndex;
2684 this->tailBlock = startBlock;
2685 assert((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 ||
2686 firstAllocatedBlock != nullptr || count == 0);
2687 if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 &&
2688 firstAllocatedBlock != nullptr) {
2689 this->tailBlock = firstAllocatedBlock;
2690 }
2691 while (true) {
2692 auto stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) +
2693 static_cast<index_t>(BLOCK_SIZE);
2694 if (details::circular_less_than<index_t>(newTailIndex, stopIndex)) {
2695 stopIndex = newTailIndex;
2696 }
2697 if (MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst),
2698 new(nullptr) T(details::deref_noexcept(itemFirst)))) {
2699 while (currentTailIndex != stopIndex) {
2700 new((*this->tailBlock)[currentTailIndex++]) T(*itemFirst++);
2701 }
2702 } else {
2703 MOODYCAMEL_TRY {
2704 while (currentTailIndex != stopIndex) {
2705                new((*this->tailBlock)[currentTailIndex]) T(
2706                    details::nomove_if<
2707                        (bool) !MOODYCAMEL_NOEXCEPT_CTOR(
2708                            T, decltype(*itemFirst),
2709                            new(nullptr) T(details::deref_noexcept(itemFirst)))>::eval(
2710                        *itemFirst));
2711 ++currentTailIndex;
2712 ++itemFirst;
2713 }
2714 }
2715 MOODYCAMEL_CATCH (...) {
2716 auto constructedStopIndex = currentTailIndex;
2717 auto lastBlockEnqueued = this->tailBlock;
2718
2719 if (!details::is_trivially_destructible<T>::value) {
2720 auto block = startBlock;
2721 if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
2722 block = firstAllocatedBlock;
2723 }
2724 currentTailIndex = startTailIndex;
2725 while (true) {
2726 stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) +
2727 static_cast<index_t>(BLOCK_SIZE);
2728 if (details::circular_less_than<index_t>(constructedStopIndex, stopIndex)) {
2729 stopIndex = constructedStopIndex;
2730 }
2731 while (currentTailIndex != stopIndex) {
2732 (*block)[currentTailIndex++]->~T();
2733 }
2734 if (block == lastBlockEnqueued) {
2735 break;
2736 }
2737 block = block->next;
2738 }
2739 }
2740
2741 currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
2742 for (auto block = firstAllocatedBlock; block != nullptr; block = block->next) {
2743 currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2744 auto idxEntry = get_block_index_entry_for_index(currentTailIndex);
2745 idxEntry->value.store(nullptr, std::memory_order_relaxed);
2746 rewind_block_index_tail();
2747 }
2748 this->parent->add_blocks_to_free_list(firstAllocatedBlock);
2749 this->tailBlock = startBlock;
2750 MOODYCAMEL_RETHROW;
2751 }
2752 }
2753
2754 if (this->tailBlock == endBlock) {
2755 assert(currentTailIndex == newTailIndex);
2756 break;
2757 }
2758 this->tailBlock = this->tailBlock->next;
2759 }
2760 this->tailIndex.store(newTailIndex, std::memory_order_release);
2761 return true;
2762 }
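      // Usage sketch (illustrative, assuming the public ConcurrentQueue API declared in
      // this header): a tokenless bulk enqueue such as
      //
      //   dmlc::moodycamel::ConcurrentQueue<int> q;
      //   std::vector<int> items = {1, 2, 3};
      //   q.enqueue_bulk(items.begin(), items.size());
      //
      // is routed to the calling thread's ImplicitProducer and ends up in the method above.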
2763
2764 template<typename It>
2765 size_t dequeue_bulk(It &itemFirst, size_t max) {
2766 auto tail = this->tailIndex.load(std::memory_order_relaxed);
2767 auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
2768 auto desiredCount = static_cast<size_t>(tail - (this->dequeueOptimisticCount.load(
2769 std::memory_order_relaxed) - overcommit));
2770 if (details::circular_less_than<size_t>(0, desiredCount)) {
2771 desiredCount = desiredCount < max ? desiredCount : max;
2772 std::atomic_thread_fence(std::memory_order_acquire);
2773
2774 auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(desiredCount,
2775 std::memory_order_relaxed);
2776 assert(overcommit <= myDequeueCount);
2777
2778 tail = this->tailIndex.load(std::memory_order_acquire);
2779 auto actualCount = static_cast<size_t>(tail - (myDequeueCount - overcommit));
2780 if (details::circular_less_than<size_t>(0, actualCount)) {
2781 actualCount = desiredCount < actualCount ? desiredCount : actualCount;
2782 if (actualCount < desiredCount) {
2783 this->dequeueOvercommit.fetch_add(desiredCount - actualCount,
2784 std::memory_order_release);
2785 }
2786
2787        // Get the first index. Note that since there are guaranteed to be at least
2788        // actualCount elements, this will never exceed tail.
2789 auto firstIndex = this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel);
2790
2791 // Iterate the blocks and dequeue
2792 auto index = firstIndex;
2793 BlockIndexHeader *localBlockIndex;
2794 auto indexIndex = get_block_index_index_for_index(index, localBlockIndex);
2795 do {
2796 auto blockStartIndex = index;
2797 auto endIndex =
2798 (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2799            endIndex = details::circular_less_than<index_t>(
2800                           firstIndex + static_cast<index_t>(actualCount), endIndex)
2801                           ? firstIndex + static_cast<index_t>(actualCount)
2802                           : endIndex;
2803
2804 auto entry = localBlockIndex->index[indexIndex];
2805 auto block = entry->value.load(std::memory_order_relaxed);
2806 if (MOODYCAMEL_NOEXCEPT_ASSIGN(T, T &&, details::deref_noexcept(itemFirst) = std::move(
2807 (*(*block)[index])))) {
2808 while (index != endIndex) {
2809 auto &el = *((*block)[index]);
2810 *itemFirst++ = std::move(el);
2811 el.~T();
2812 ++index;
2813 }
2814 } else {
2815 MOODYCAMEL_TRY {
2816 while (index != endIndex) {
2817 auto &el = *((*block)[index]);
2818 *itemFirst = std::move(el);
2819 ++itemFirst;
2820 el.~T();
2821 ++index;
2822 }
2823 }
2824 MOODYCAMEL_CATCH (...) {
2825 do {
2826 entry = localBlockIndex->index[indexIndex];
2827 block = entry->value.load(std::memory_order_relaxed);
2828 while (index != endIndex) {
2829 (*block)[index++]->~T();
2830 }
2831
2832 if (block->template set_many_empty<implicit_context>(
2833 blockStartIndex, static_cast<size_t>(endIndex - blockStartIndex))) {
2834#if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2835 debug::DebugLock lock(mutex);
2836#endif
2837 entry->value.store(nullptr, std::memory_order_relaxed);
2838 this->parent->add_block_to_free_list(block);
2839 }
2840 indexIndex = (indexIndex + 1) & (localBlockIndex->capacity - 1);
2841
2842 blockStartIndex = index;
2843 endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) +
2844 static_cast<index_t>(BLOCK_SIZE);
2845                endIndex = details::circular_less_than<index_t>(
2846                               firstIndex + static_cast<index_t>(actualCount), endIndex)
2847                               ? firstIndex + static_cast<index_t>(actualCount)
2848                               : endIndex;
2849 } while (index != firstIndex + actualCount);
2850
2851 MOODYCAMEL_RETHROW;
2852 }
2853 }
2854 if (block->template set_many_empty<implicit_context>(
2855 blockStartIndex, static_cast<size_t>(endIndex - blockStartIndex))) {
2856 {
2857#if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2858 debug::DebugLock lock(mutex);
2859#endif
2860 // Note that the set_many_empty above did a release, meaning that anybody who acquires the block
2861 // we're about to free can use it safely since our writes (and reads!) will have happened-before then.
2862 entry->value.store(nullptr, std::memory_order_relaxed);
2863 }
2864 this->parent->add_block_to_free_list(block); // releases the above store
2865 }
2866 indexIndex = (indexIndex + 1) & (localBlockIndex->capacity - 1);
2867 } while (index != firstIndex + actualCount);
2868
2869 return actualCount;
2870 } else {
2871 this->dequeueOvercommit.fetch_add(desiredCount, std::memory_order_release);
2872 }
2873 }
2874
2875 return 0;
2876 }
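      // Consumer-side counterpart (illustrative): a tokenless bulk dequeue such as
      //
      //   int buf[64];
      //   size_t n = q.try_dequeue_bulk(buf, 64);
      //
      // visits the registered producers in turn and, for an implicit producer, calls the
      // method above; the return value is the number of elements actually moved out.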
2877
2878 private:
2879 // The block size must be > 1, so any number with the low bit set is an invalid block base index
2880 static const index_t INVALID_BLOCK_BASE = 1;
2881
2882 struct BlockIndexEntry {
2883 std::atomic<index_t> key;
2884 std::atomic<Block *> value;
2885 };
2886
2887 struct BlockIndexHeader {
2888 size_t capacity;
2889 std::atomic<size_t> tail;
2890 BlockIndexEntry *entries;
2891 BlockIndexEntry **index;
2892 BlockIndexHeader *prev;
2893 };
2894
2895 template<AllocationMode allocMode>
2896 inline bool insert_block_index_entry(BlockIndexEntry *&idxEntry, index_t blockStartIndex) {
2897 auto localBlockIndex = blockIndex.load(
2898 std::memory_order_relaxed); // We're the only writer thread, relaxed is OK
2899 if (localBlockIndex == nullptr) {
2900 return false; // this can happen if new_block_index failed in the constructor
2901 }
2902 auto newTail = (localBlockIndex->tail.load(std::memory_order_relaxed) + 1) &
2903 (localBlockIndex->capacity - 1);
2904 idxEntry = localBlockIndex->index[newTail];
2905 if (idxEntry->key.load(std::memory_order_relaxed) == INVALID_BLOCK_BASE ||
2906 idxEntry->value.load(std::memory_order_relaxed) == nullptr) {
2907
2908 idxEntry->key.store(blockStartIndex, std::memory_order_relaxed);
2909 localBlockIndex->tail.store(newTail, std::memory_order_release);
2910 return true;
2911 }
2912
2913 // No room in the old block index, try to allocate another one!
2914 if (allocMode == CannotAlloc || !new_block_index()) {
2915 return false;
2916 }
2917 localBlockIndex = blockIndex.load(std::memory_order_relaxed);
2918 newTail = (localBlockIndex->tail.load(std::memory_order_relaxed) + 1) &
2919 (localBlockIndex->capacity - 1);
2920 idxEntry = localBlockIndex->index[newTail];
2921 assert(idxEntry->key.load(std::memory_order_relaxed) == INVALID_BLOCK_BASE);
2922 idxEntry->key.store(blockStartIndex, std::memory_order_relaxed);
2923 localBlockIndex->tail.store(newTail, std::memory_order_release);
2924 return true;
2925 }
2926
2927 inline void rewind_block_index_tail() {
2928 auto localBlockIndex = blockIndex.load(std::memory_order_relaxed);
2929 localBlockIndex->tail.store((localBlockIndex->tail.load(std::memory_order_relaxed) - 1) &
2930 (localBlockIndex->capacity - 1), std::memory_order_relaxed);
2931 }
2932
2933 inline BlockIndexEntry *get_block_index_entry_for_index(index_t index) const {
2934 BlockIndexHeader *localBlockIndex;
2935 auto idx = get_block_index_index_for_index(index, localBlockIndex);
2936 return localBlockIndex->index[idx];
2937 }
2938
2939 inline size_t
2940 get_block_index_index_for_index(index_t index, BlockIndexHeader *&localBlockIndex) const {
2941#if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2942 debug::DebugLock lock(mutex);
2943#endif
2944 index &= ~static_cast<index_t>(BLOCK_SIZE - 1);
2945 localBlockIndex = blockIndex.load(std::memory_order_acquire);
2946 auto tail = localBlockIndex->tail.load(std::memory_order_acquire);
2947 auto tailBase = localBlockIndex->index[tail]->key.load(std::memory_order_relaxed);
2948 assert(tailBase != INVALID_BLOCK_BASE);
2949 // Note: Must use division instead of shift because the index may wrap around, causing a negative
2950 // offset, whose negativity we want to preserve
2951 auto offset = static_cast<size_t>(
2952 static_cast<typename std::make_signed<index_t>::type>(index - tailBase) / BLOCK_SIZE);
2953 size_t idx = (tail + offset) & (localBlockIndex->capacity - 1);
2954 assert(localBlockIndex->index[idx]->key.load(std::memory_order_relaxed) == index &&
2955 localBlockIndex->index[idx]->value.load(std::memory_order_relaxed) != nullptr);
2956 return idx;
2957 }
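    // Example of the signed-offset arithmetic above (assuming BLOCK_SIZE == 32): if the
    // tail entry's key is 96 and we look up block base 32, then (32 - 96) / 32 == -2, so
    // idx steps two slots backwards from tail (modulo capacity) to the entry keyed 32.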
2958
2959 bool new_block_index() {
2960 auto prev = blockIndex.load(std::memory_order_relaxed);
2961 size_t prevCapacity = prev == nullptr ? 0 : prev->capacity;
2962 auto entryCount = prev == nullptr ? nextBlockIndexCapacity : prevCapacity;
2963 auto raw = static_cast<char *>((Traits::malloc)(
2964 sizeof(BlockIndexHeader) +
2965 std::alignment_of<BlockIndexEntry>::value - 1 + sizeof(BlockIndexEntry) * entryCount +
2966 std::alignment_of<BlockIndexEntry *>::value - 1 +
2967 sizeof(BlockIndexEntry * ) * nextBlockIndexCapacity));
2968 if (raw == nullptr) {
2969 return false;
2970 }
2971
2972 auto header = new(raw) BlockIndexHeader;
2973 auto entries = reinterpret_cast<BlockIndexEntry *>(details::align_for<BlockIndexEntry>(
2974 raw + sizeof(BlockIndexHeader)));
2975 auto index = reinterpret_cast<BlockIndexEntry **>(details::align_for<BlockIndexEntry *>(
2976 reinterpret_cast<char *>(entries) + sizeof(BlockIndexEntry) * entryCount));
2977 if (prev != nullptr) {
2978 auto prevTail = prev->tail.load(std::memory_order_relaxed);
2979 auto prevPos = prevTail;
2980 size_t i = 0;
2981 do {
2982 prevPos = (prevPos + 1) & (prev->capacity - 1);
2983 index[i++] = prev->index[prevPos];
2984 } while (prevPos != prevTail);
2985 assert(i == prevCapacity);
2986 }
2987 for (size_t i = 0; i != entryCount; ++i) {
2988 new(entries + i) BlockIndexEntry;
2989 entries[i].key.store(INVALID_BLOCK_BASE, std::memory_order_relaxed);
2990 index[prevCapacity + i] = entries + i;
2991 }
2992 header->prev = prev;
2993 header->entries = entries;
2994 header->index = index;
2995 header->capacity = nextBlockIndexCapacity;
2996 header->tail.store((prevCapacity - 1) & (nextBlockIndexCapacity - 1),
2997 std::memory_order_relaxed);
2998
2999 blockIndex.store(header, std::memory_order_release);
3000
3001 nextBlockIndexCapacity <<= 1;
3002
3003 return true;
3004 }
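    // Growth note: each successful call doubles nextBlockIndexCapacity; the new circular
    // index copies the pointers to all previous entries up front, so lookups only ever
    // consult the newest header, while older headers remain chained via prev until the
    // producer is destroyed.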
3005
3006 private:
3007 size_t nextBlockIndexCapacity;
3008 std::atomic<BlockIndexHeader *> blockIndex;
3009
3010#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3011 public:
3012 details::ThreadExitListener threadExitListener;
3013 private:
3014#endif
3015
3016#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
3017 public:
3018 ImplicitProducer* nextImplicitProducer;
3019 private:
3020#endif
3021
3022#if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
3023 mutable debug::DebugMutex mutex;
3024#endif
3025#if MCDBGQ_TRACKMEM
3026 friend struct MemStats;
3027#endif
3028 };
3029
3030
3031  //////////////////////////////////
3032  // Block pool manipulation
3033  //////////////////////////////////
3034
3035 void populate_initial_block_list(size_t blockCount) {
3036 initialBlockPoolSize = blockCount;
3037 if (initialBlockPoolSize == 0) {
3038 initialBlockPool = nullptr;
3039 return;
3040 }
3041
3042 initialBlockPool = create_array<Block>(blockCount);
3043 if (initialBlockPool == nullptr) {
3044 initialBlockPoolSize = 0;
3045 }
3046 for (size_t i = 0; i < initialBlockPoolSize; ++i) {
3047 initialBlockPool[i].dynamicallyAllocated = false;
3048 }
3049 }
3050
3051 inline Block *try_get_block_from_initial_pool() {
3052 if (initialBlockPoolIndex.load(std::memory_order_relaxed) >= initialBlockPoolSize) {
3053 return nullptr;
3054 }
3055
3056 auto index = initialBlockPoolIndex.fetch_add(1, std::memory_order_relaxed);
3057
3058 return index < initialBlockPoolSize ? (initialBlockPool + index) : nullptr;
3059 }
3060
3061 inline void add_block_to_free_list(Block *block) {
3062#if MCDBGQ_TRACKMEM
3063 block->owner = nullptr;
3064#endif
3065 freeList.add(block);
3066 }
3067
3068 inline void add_blocks_to_free_list(Block *block) {
3069 while (block != nullptr) {
3070 auto next = block->next;
3071 add_block_to_free_list(block);
3072 block = next;
3073 }
3074 }
3075
3076 inline Block *try_get_block_from_free_list() {
3077 return freeList.try_get();
3078 }
3079
3080 // Gets a free block from one of the memory pools, or allocates a new one (if applicable)
3081 template<AllocationMode canAlloc>
3082 Block *requisition_block() {
3083 auto block = try_get_block_from_initial_pool();
3084 if (block != nullptr) {
3085 return block;
3086 }
3087
3088 block = try_get_block_from_free_list();
3089 if (block != nullptr) {
3090 return block;
3091 }
3092
3093 if (canAlloc == CanAlloc) {
3094 return create<Block>();
3095 }
3096
3097 return nullptr;
3098 }
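  // In other words, blocks come from three tiers: the statically sized initial pool, then
  // the shared lock-free free list of recycled blocks, and only then (and only when
  // canAlloc == CanAlloc) a fresh heap allocation via create<Block>().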
3099
3100
3101#if MCDBGQ_TRACKMEM
3102 public:
3103 struct MemStats {
3104 size_t allocatedBlocks;
3105 size_t usedBlocks;
3106 size_t freeBlocks;
3107 size_t ownedBlocksExplicit;
3108 size_t ownedBlocksImplicit;
3109 size_t implicitProducers;
3110 size_t explicitProducers;
3111 size_t elementsEnqueued;
3112 size_t blockClassBytes;
3113 size_t queueClassBytes;
3114 size_t implicitBlockIndexBytes;
3115 size_t explicitBlockIndexBytes;
3116
3117 friend class ConcurrentQueue;
3118
3119 private:
3120 static MemStats getFor(ConcurrentQueue* q)
3121 {
3122 MemStats stats = { 0 };
3123
3124 stats.elementsEnqueued = q->size_approx();
3125
3126 auto block = q->freeList.head_unsafe();
3127 while (block != nullptr) {
3128 ++stats.allocatedBlocks;
3129 ++stats.freeBlocks;
3130 block = block->freeListNext.load(std::memory_order_relaxed);
3131 }
3132
3133 for (auto ptr = q->producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
3134 bool implicit = dynamic_cast<ImplicitProducer*>(ptr) != nullptr;
3135 stats.implicitProducers += implicit ? 1 : 0;
3136 stats.explicitProducers += implicit ? 0 : 1;
3137
3138 if (implicit) {
3139 auto prod = static_cast<ImplicitProducer*>(ptr);
3140 stats.queueClassBytes += sizeof(ImplicitProducer);
3141 auto head = prod->headIndex.load(std::memory_order_relaxed);
3142 auto tail = prod->tailIndex.load(std::memory_order_relaxed);
3143 auto hash = prod->blockIndex.load(std::memory_order_relaxed);
3144 if (hash != nullptr) {
3145 for (size_t i = 0; i != hash->capacity; ++i) {
3146 if (hash->index[i]->key.load(std::memory_order_relaxed) != ImplicitProducer::INVALID_BLOCK_BASE && hash->index[i]->value.load(std::memory_order_relaxed) != nullptr) {
3147 ++stats.allocatedBlocks;
3148 ++stats.ownedBlocksImplicit;
3149 }
3150 }
3151 stats.implicitBlockIndexBytes += hash->capacity * sizeof(typename ImplicitProducer::BlockIndexEntry);
3152 for (; hash != nullptr; hash = hash->prev) {
3153 stats.implicitBlockIndexBytes += sizeof(typename ImplicitProducer::BlockIndexHeader) + hash->capacity * sizeof(typename ImplicitProducer::BlockIndexEntry*);
3154 }
3155 }
3156 for (; details::circular_less_than<index_t>(head, tail); head += BLOCK_SIZE) {
3157 //auto block = prod->get_block_index_entry_for_index(head);
3158 ++stats.usedBlocks;
3159 }
3160 }
3161 else {
3162 auto prod = static_cast<ExplicitProducer*>(ptr);
3163 stats.queueClassBytes += sizeof(ExplicitProducer);
3164 auto tailBlock = prod->tailBlock;
3165 bool wasNonEmpty = false;
3166 if (tailBlock != nullptr) {
3167 auto block = tailBlock;
3168 do {
3169 ++stats.allocatedBlocks;
3170 if (!block->template is_empty<explicit_context>() || wasNonEmpty) {
3171 ++stats.usedBlocks;
3172 wasNonEmpty = wasNonEmpty || block != tailBlock;
3173 }
3174 ++stats.ownedBlocksExplicit;
3175 block = block->next;
3176 } while (block != tailBlock);
3177 }
3178 auto index = prod->blockIndex.load(std::memory_order_relaxed);
3179 while (index != nullptr) {
3180 stats.explicitBlockIndexBytes += sizeof(typename ExplicitProducer::BlockIndexHeader) + index->size * sizeof(typename ExplicitProducer::BlockIndexEntry);
3181 index = static_cast<typename ExplicitProducer::BlockIndexHeader*>(index->prev);
3182 }
3183 }
3184 }
3185
3186 auto freeOnInitialPool = q->initialBlockPoolIndex.load(std::memory_order_relaxed) >= q->initialBlockPoolSize ? 0 : q->initialBlockPoolSize - q->initialBlockPoolIndex.load(std::memory_order_relaxed);
3187 stats.allocatedBlocks += freeOnInitialPool;
3188 stats.freeBlocks += freeOnInitialPool;
3189
3190 stats.blockClassBytes = sizeof(Block) * stats.allocatedBlocks;
3191 stats.queueClassBytes += sizeof(ConcurrentQueue);
3192
3193 return stats;
3194 }
3195 };
3196
3197 // For debugging only. Not thread-safe.
3198 MemStats getMemStats()
3199 {
3200 return MemStats::getFor(this);
3201 }
3202 private:
3203 friend struct MemStats;
3204#endif
3205
3206
3207  //////////////////////////////////
3208  // Producer list manipulation
3209  //////////////////////////////////
3210
3211 ProducerBase *recycle_or_create_producer(bool isExplicit) {
3212 bool recycled;
3213 return recycle_or_create_producer(isExplicit, recycled);
3214 }
3215
3216 ProducerBase *recycle_or_create_producer(bool isExplicit, bool &recycled) {
3217#if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3218 debug::DebugLock lock(implicitProdMutex);
3219#endif
3220 // Try to re-use one first
3221 for (auto ptr = producerListTail.load(std::memory_order_acquire);
3222 ptr != nullptr; ptr = ptr->next_prod()) {
3223 if (ptr->inactive.load(std::memory_order_relaxed) && ptr->isExplicit == isExplicit) {
3224 bool expected = true;
3225 if (ptr->inactive.compare_exchange_strong(expected, /* desired */ false,
3226 std::memory_order_acquire,
3227 std::memory_order_relaxed)) {
3228 // We caught one! It's been marked as activated, the caller can have it
3229 recycled = true;
3230 return ptr;
3231 }
3232 }
3233 }
3234
3235 recycled = false;
3236 return add_producer(isExplicit ? static_cast<ProducerBase *>(create<ExplicitProducer>(this))
3237 : create<ImplicitProducer>(this));
3238 }
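  // Recycling note: a producer becomes eligible for re-use here once its `inactive` flag is
  // set (for an explicit producer when its ProducerToken is destroyed; for an implicit
  // producer when its owning thread exits, where thread-local support is available), so a
  // long-lived queue does not accumulate one producer per short-lived thread or token.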
3239
3240 ProducerBase *add_producer(ProducerBase *producer) {
3241 // Handle failed memory allocation
3242 if (producer == nullptr) {
3243 return nullptr;
3244 }
3245
3246 producerCount.fetch_add(1, std::memory_order_relaxed);
3247
3248 // Add it to the lock-free list
3249 auto prevTail = producerListTail.load(std::memory_order_relaxed);
3250 do {
3251 producer->next = prevTail;
3252 } while (!producerListTail.compare_exchange_weak(prevTail, producer, std::memory_order_release,
3253 std::memory_order_relaxed));
3254
3255#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
3256 if (producer->isExplicit) {
3257 auto prevTailExplicit = explicitProducers.load(std::memory_order_relaxed);
3258 do {
3259 static_cast<ExplicitProducer*>(producer)->nextExplicitProducer = prevTailExplicit;
3260 } while (!explicitProducers.compare_exchange_weak(prevTailExplicit, static_cast<ExplicitProducer*>(producer), std::memory_order_release, std::memory_order_relaxed));
3261 }
3262 else {
3263 auto prevTailImplicit = implicitProducers.load(std::memory_order_relaxed);
3264 do {
3265 static_cast<ImplicitProducer*>(producer)->nextImplicitProducer = prevTailImplicit;
3266 } while (!implicitProducers.compare_exchange_weak(prevTailImplicit, static_cast<ImplicitProducer*>(producer), std::memory_order_release, std::memory_order_relaxed));
3267 }
3268#endif
3269
3270 return producer;
3271 }
3272
3273 void reown_producers() {
3274 // After another instance is moved-into/swapped-with this one, all the
3275 // producers we stole still think their parents are the other queue.
3276 // So fix them up!
3277 for (auto ptr = producerListTail.load(std::memory_order_relaxed);
3278 ptr != nullptr; ptr = ptr->next_prod()) {
3279 ptr->parent = this;
3280 }
3281 }
3282
3283
3284  //////////////////////////////////
3285  // Implicit producer hash
3286  //////////////////////////////////
3287
3288 struct ImplicitProducerKVP {
3289 std::atomic<details::thread_id_t> key;
3290 ImplicitProducer *value; // No need for atomicity since it's only read by the thread that sets it in the first place
3291
3292 ImplicitProducerKVP()
3293 : value(nullptr) {}
3294
3295 ImplicitProducerKVP(ImplicitProducerKVP &&other) MOODYCAMEL_NOEXCEPT {
3296 key.store(other.key.load(std::memory_order_relaxed), std::memory_order_relaxed);
3297 value = other.value;
3298 }
3299
3300 inline ImplicitProducerKVP &operator=(ImplicitProducerKVP &&other) MOODYCAMEL_NOEXCEPT {
3301 swap(other);
3302 return *this;
3303 }
3304
3305 inline void swap(ImplicitProducerKVP &other) MOODYCAMEL_NOEXCEPT {
3306 if (this != &other) {
3307 details::swap_relaxed(key, other.key);
3308 std::swap(value, other.value);
3309 }
3310 }
3311 };
3312
3313 template<typename XT, typename XTraits>
3314 friend void moodycamel::swap(typename ConcurrentQueue<XT, XTraits>::ImplicitProducerKVP &,
3315 typename ConcurrentQueue<XT, XTraits>::ImplicitProducerKVP &) MOODYCAMEL_NOEXCEPT;
3316
3317 struct ImplicitProducerHash {
3318 size_t capacity;
3319 ImplicitProducerKVP *entries;
3320 ImplicitProducerHash *prev;
3321 };
3322
3323 inline void populate_initial_implicit_producer_hash() {
3324 if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return;
3325
3326 implicitProducerHashCount.store(0, std::memory_order_relaxed);
3327 auto hash = &initialImplicitProducerHash;
3328 hash->capacity = INITIAL_IMPLICIT_PRODUCER_HASH_SIZE;
3329 hash->entries = &initialImplicitProducerHashEntries[0];
3330 for (size_t i = 0; i != INITIAL_IMPLICIT_PRODUCER_HASH_SIZE; ++i) {
3331 initialImplicitProducerHashEntries[i].key.store(details::invalid_thread_id,
3332 std::memory_order_relaxed);
3333 }
3334 hash->prev = nullptr;
3335 implicitProducerHash.store(hash, std::memory_order_relaxed);
3336 }
3337
3338 void swap_implicit_producer_hashes(ConcurrentQueue &other) {
3339 if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return;
3340
3341 // Swap (assumes our implicit producer hash is initialized)
3342 initialImplicitProducerHashEntries.swap(other.initialImplicitProducerHashEntries);
3343 initialImplicitProducerHash.entries = &initialImplicitProducerHashEntries[0];
3344 other.initialImplicitProducerHash.entries = &other.initialImplicitProducerHashEntries[0];
3345
3346 details::swap_relaxed(implicitProducerHashCount, other.implicitProducerHashCount);
3347
3348 details::swap_relaxed(implicitProducerHash, other.implicitProducerHash);
3349 if (implicitProducerHash.load(std::memory_order_relaxed) ==
3350 &other.initialImplicitProducerHash) {
3351 implicitProducerHash.store(&initialImplicitProducerHash, std::memory_order_relaxed);
3352 } else {
3353 ImplicitProducerHash *hash;
3354 for (hash = implicitProducerHash.load(std::memory_order_relaxed);
3355 hash->prev != &other.initialImplicitProducerHash; hash = hash->prev) {
3356 continue;
3357 }
3358 hash->prev = &initialImplicitProducerHash;
3359 }
3360 if (other.implicitProducerHash.load(std::memory_order_relaxed) ==
3361 &initialImplicitProducerHash) {
3362 other.implicitProducerHash.store(&other.initialImplicitProducerHash,
3363 std::memory_order_relaxed);
3364 } else {
3365 ImplicitProducerHash *hash;
3366 for (hash = other.implicitProducerHash.load(std::memory_order_relaxed);
3367 hash->prev != &initialImplicitProducerHash; hash = hash->prev) {
3368 continue;
3369 }
3370 hash->prev = &other.initialImplicitProducerHash;
3371 }
3372 }
3373
3374 // Only fails (returns nullptr) if memory allocation fails
3375 ImplicitProducer *get_or_add_implicit_producer() {
3376 // Note that since the data is essentially thread-local (key is thread ID),
3377 // there's a reduced need for fences (memory ordering is already consistent
3378 // for any individual thread), except for the current table itself.
3379
3380 // Start by looking for the thread ID in the current and all previous hash tables.
3381 // If it's not found, it must not be in there yet, since this same thread would
3382 // have added it previously to one of the tables that we traversed.
3383
3384 // Code and algorithm adapted from http://preshing.com/20130605/the-worlds-simplest-lock-free-hash-table
3385
3386#if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3387 debug::DebugLock lock(implicitProdMutex);
3388#endif
3389
3390 auto id = details::thread_id();
3391 auto hashedId = details::hash_thread_id(id);
3392
3393 auto mainHash = implicitProducerHash.load(std::memory_order_acquire);
3394 for (auto hash = mainHash; hash != nullptr; hash = hash->prev) {
3395 // Look for the id in this hash
3396 auto index = hashedId;
3397 while (true) { // Not an infinite loop because at least one slot is free in the hash table
3398 index &= hash->capacity - 1;
3399
3400 auto probedKey = hash->entries[index].key.load(std::memory_order_relaxed);
3401 if (probedKey == id) {
3402 // Found it! If we had to search several hashes deep, though, we should lazily add it
3403 // to the current main hash table to avoid the extended search next time.
3404 // Note there's guaranteed to be room in the current hash table since every subsequent
3405 // table implicitly reserves space for all previous tables (there's only one
3406 // implicitProducerHashCount).
3407 auto value = hash->entries[index].value;
3408 if (hash != mainHash) {
3409 index = hashedId;
3410 while (true) {
3411 index &= mainHash->capacity - 1;
3412 probedKey = mainHash->entries[index].key.load(std::memory_order_relaxed);
3413 auto empty = details::invalid_thread_id;
3414#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3415 auto reusable = details::invalid_thread_id2;
3416 if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_relaxed, std::memory_order_relaxed)) ||
3417 (probedKey == reusable && mainHash->entries[index].key.compare_exchange_strong(reusable, id, std::memory_order_acquire, std::memory_order_acquire))) {
3418#else
3419 if ((probedKey == empty &&
3420 mainHash->entries[index].key.compare_exchange_strong(empty, id,
3421 std::memory_order_relaxed,
3422 std::memory_order_relaxed))) {
3423#endif
3424 mainHash->entries[index].value = value;
3425 break;
3426 }
3427 ++index;
3428 }
3429 }
3430
3431 return value;
3432 }
3433 if (probedKey == details::invalid_thread_id) {
3434 break; // Not in this hash table
3435 }
3436 ++index;
3437 }
3438 }
3439
3440 // Insert!
3441 auto newCount = 1 + implicitProducerHashCount.fetch_add(1, std::memory_order_relaxed);
3442 while (true) {
3443 if (newCount >= (mainHash->capacity >> 1) &&
3444 !implicitProducerHashResizeInProgress.test_and_set(std::memory_order_acquire)) {
3445 // We've acquired the resize lock, try to allocate a bigger hash table.
3446 // Note the acquire fence synchronizes with the release fence at the end of this block, and hence when
3447 // we reload implicitProducerHash it must be the most recent version (it only gets changed within this
3448 // locked block).
3449 mainHash = implicitProducerHash.load(std::memory_order_acquire);
3450 if (newCount >= (mainHash->capacity >> 1)) {
3451 auto newCapacity = mainHash->capacity << 1;
3452 while (newCount >= (newCapacity >> 1)) {
3453 newCapacity <<= 1;
3454 }
3455 auto raw = static_cast<char *>((Traits::malloc)(
3456 sizeof(ImplicitProducerHash) + std::alignment_of<ImplicitProducerKVP>::value - 1 +
3457 sizeof(ImplicitProducerKVP) * newCapacity));
3458 if (raw == nullptr) {
3459 // Allocation failed
3460 implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
3461 implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
3462 return nullptr;
3463 }
3464
3465 auto newHash = new(raw) ImplicitProducerHash;
3466 newHash->capacity = newCapacity;
3467 newHash->entries = reinterpret_cast<ImplicitProducerKVP *>(details::align_for<ImplicitProducerKVP>(
3468 raw + sizeof(ImplicitProducerHash)));
3469 for (size_t i = 0; i != newCapacity; ++i) {
3470 new(newHash->entries + i) ImplicitProducerKVP;
3471 newHash->entries[i].key.store(details::invalid_thread_id, std::memory_order_relaxed);
3472 }
3473 newHash->prev = mainHash;
3474 implicitProducerHash.store(newHash, std::memory_order_release);
3475 implicitProducerHashResizeInProgress.clear(std::memory_order_release);
3476 mainHash = newHash;
3477 } else {
3478 implicitProducerHashResizeInProgress.clear(std::memory_order_release);
3479 }
3480 }
3481
3482 // If it's < three-quarters full, add to the old one anyway so that we don't have to wait for the next table
3483 // to finish being allocated by another thread (and if we just finished allocating above, the condition will
3484 // always be true)
3485 if (newCount < (mainHash->capacity >> 1) + (mainHash->capacity >> 2)) {
3486 bool recycled;
3487 auto producer = static_cast<ImplicitProducer *>(recycle_or_create_producer(false,
3488 recycled));
3489 if (producer == nullptr) {
3490 implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
3491 return nullptr;
3492 }
3493 if (recycled) {
3494 implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
3495 }
3496
3497#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3498 producer->threadExitListener.callback = &ConcurrentQueue::implicit_producer_thread_exited_callback;
3499 producer->threadExitListener.userData = producer;
3500 details::ThreadExitNotifier::subscribe(&producer->threadExitListener);
3501#endif
3502
3503 auto index = hashedId;
3504 while (true) {
3505 index &= mainHash->capacity - 1;
3506 auto probedKey = mainHash->entries[index].key.load(std::memory_order_relaxed);
3507
3508 auto empty = details::invalid_thread_id;
3509#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3510 auto reusable = details::invalid_thread_id2;
3511 if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_relaxed, std::memory_order_relaxed)) ||
3512 (probedKey == reusable && mainHash->entries[index].key.compare_exchange_strong(reusable, id, std::memory_order_acquire, std::memory_order_acquire))) {
3513#else
3514 if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty, id,
3515 std::memory_order_relaxed,
3516 std::memory_order_relaxed))) {
3517#endif
3518 mainHash->entries[index].value = producer;
3519 break;
3520 }
3521 ++index;
3522 }
3523 return producer;
3524 }
3525
3526 // Hmm, the old hash is quite full and somebody else is busy allocating a new one.
3527 // We need to wait for the allocating thread to finish (if it succeeds, we add, if not,
3528 // we try to allocate ourselves).
3529 mainHash = implicitProducerHash.load(std::memory_order_acquire);
3530 }
3531 }
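  // Sizing example for the hash above: a resize is attempted once the registration count
  // reaches half the current capacity, while inserts are still accepted up to
  // three-quarters full. E.g. with a 32-entry table, the 16th registration triggers
  // allocation of a 64-entry table, but entries can keep landing in the old table until
  // 24 are in use.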
3532
3533#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3534 void implicit_producer_thread_exited(ImplicitProducer* producer)
3535 {
3536 // Remove from thread exit listeners
3537 details::ThreadExitNotifier::unsubscribe(&producer->threadExitListener);
3538
3539 // Remove from hash
3540#if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3541 debug::DebugLock lock(implicitProdMutex);
3542#endif
3543 auto hash = implicitProducerHash.load(std::memory_order_acquire);
3544 assert(hash != nullptr); // The thread exit listener is only registered if we were added to a hash in the first place
3545 auto id = details::thread_id();
3546 auto hashedId = details::hash_thread_id(id);
3547 details::thread_id_t probedKey;
3548
3549 // We need to traverse all the hashes just in case other threads aren't on the current one yet and are
3550 // trying to add an entry thinking there's a free slot (because they reused a producer)
3551 for (; hash != nullptr; hash = hash->prev) {
3552 auto index = hashedId;
3553 do {
3554 index &= hash->capacity - 1;
3555 probedKey = hash->entries[index].key.load(std::memory_order_relaxed);
3556 if (probedKey == id) {
3557 hash->entries[index].key.store(details::invalid_thread_id2, std::memory_order_release);
3558 break;
3559 }
3560 ++index;
3561 } while (probedKey != details::invalid_thread_id); // Can happen if the hash has changed but we weren't put back in it yet, or if we weren't added to this hash in the first place
3562 }
3563
3564    // Mark this producer's sub-queue as recyclable
3565 producer->inactive.store(true, std::memory_order_release);
3566 }
3567
3568 static void implicit_producer_thread_exited_callback(void* userData)
3569 {
3570 auto producer = static_cast<ImplicitProducer*>(userData);
3571 auto queue = producer->parent;
3572 queue->implicit_producer_thread_exited(producer);
3573 }
3574#endif
3575
3576  //////////////////////////////////
3577  // Utility functions
3578  //////////////////////////////////
3579
3580 template<typename U>
3581 static inline U *create_array(size_t count) {
3582 assert(count > 0);
3583 auto p = static_cast<U *>((Traits::malloc)(sizeof(U) * count));
3584 if (p == nullptr) {
3585 return nullptr;
3586 }
3587
3588 for (size_t i = 0; i != count; ++i) {
3589 new(p + i) U();
3590 }
3591 return p;
3592 }
3593
3594 template<typename U>
3595 static inline void destroy_array(U *p, size_t count) {
3596 if (p != nullptr) {
3597 assert(count > 0);
3598 for (size_t i = count; i != 0;) {
3599 (p + --i)->~U();
3600 }
3601 (Traits::free)(p);
3602 }
3603 }
3604
3605 template<typename U>
3606 static inline U *create() {
3607 auto p = (Traits::malloc)(sizeof(U));
3608 return p != nullptr ? new(p) U : nullptr;
3609 }
3610
3611 template<typename U, typename A1>
3612 static inline U *create(A1 &&a1) {
3613 auto p = (Traits::malloc)(sizeof(U));
3614 return p != nullptr ? new(p) U(std::forward<A1>(a1)) : nullptr;
3615 }
3616
3617 template<typename U>
3618 static inline void destroy(U *p) {
3619 if (p != nullptr) {
3620 p->~U();
3621 }
3622 (Traits::free)(p);
3623 }
3624
3625 private:
3626 std::atomic<ProducerBase *> producerListTail;
3627 std::atomic<std::uint32_t> producerCount;
3628
3629 std::atomic<size_t> initialBlockPoolIndex;
3630 Block *initialBlockPool;
3631 size_t initialBlockPoolSize;
3632
3633#if !MCDBGQ_USEDEBUGFREELIST
3634 FreeList<Block> freeList;
3635#else
3636 debug::DebugFreeList<Block> freeList;
3637#endif
3638
3639 std::atomic<ImplicitProducerHash *> implicitProducerHash;
3640 std::atomic<size_t> implicitProducerHashCount; // Number of slots logically used
3641 ImplicitProducerHash initialImplicitProducerHash;
3642 std::array<ImplicitProducerKVP, INITIAL_IMPLICIT_PRODUCER_HASH_SIZE> initialImplicitProducerHashEntries;
3643 std::atomic_flag implicitProducerHashResizeInProgress;
3644
3645 std::atomic<std::uint32_t> nextExplicitConsumerId;
3646 std::atomic<std::uint32_t> globalExplicitConsumerOffset;
3647
3648#if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3649 debug::DebugMutex implicitProdMutex;
3650#endif
3651
3652#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
3653 std::atomic<ExplicitProducer*> explicitProducers;
3654 std::atomic<ImplicitProducer*> implicitProducers;
3655#endif
3656};
3657
3658
3659template<typename T, typename Traits>
3660ProducerToken::ProducerToken(ConcurrentQueue<T, Traits> &queue)
3661 : producer(queue.recycle_or_create_producer(true)) {
3662 if (producer != nullptr) {
3663 producer->token = this;
3664 }
3665}
3666
3667template<typename T, typename Traits>
3668ProducerToken::ProducerToken(BlockingConcurrentQueue<T, Traits> &queue)
3669 : producer(
3670 reinterpret_cast<ConcurrentQueue<T, Traits> *>(&queue)->recycle_or_create_producer(true)) {
3671 if (producer != nullptr) {
3672 producer->token = this;
3673 }
3674}
3675
3676template<typename T, typename Traits>
3677ConsumerToken::ConsumerToken(ConcurrentQueue<T, Traits> &queue)
3678 : itemsConsumedFromCurrent(0), currentProducer(nullptr), desiredProducer(nullptr) {
3679 initialOffset = queue.nextExplicitConsumerId.fetch_add(1, std::memory_order_release);
3680 lastKnownGlobalOffset = -1;
3681}
3682
3683template<typename T, typename Traits>
3684ConsumerToken::ConsumerToken(BlockingConcurrentQueue<T, Traits> &queue)
3685 : itemsConsumedFromCurrent(0), currentProducer(nullptr), desiredProducer(nullptr) {
3686 initialOffset = reinterpret_cast<ConcurrentQueue <T, Traits> *>(&queue)->nextExplicitConsumerId.fetch_add(
3687 1, std::memory_order_release);
3688 lastKnownGlobalOffset = -1;
3689}
3690
3691template<typename T, typename Traits>
3692inline void swap(ConcurrentQueue<T, Traits> &a, ConcurrentQueue<T, Traits> &b) MOODYCAMEL_NOEXCEPT {
3693 a.swap(b);
3694}
3695
3696inline void swap(ProducerToken &a, ProducerToken &b) MOODYCAMEL_NOEXCEPT {
3697 a.swap(b);
3698}
3699
3700inline void swap(ConsumerToken &a, ConsumerToken &b) MOODYCAMEL_NOEXCEPT {
3701 a.swap(b);
3702}
3703
3704template<typename T, typename Traits>
3705inline void swap(typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP &a,
3706 typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP &b) MOODYCAMEL_NOEXCEPT {
3707 a.swap(b);
3708}
3709
3710}  // namespace moodycamel
3711
3712} // namespace dmlc
3713
3714#if defined(__GNUC__)
3715#pragma GCC diagnostic pop
3716#endif
3717
3718#endif // DMLC_CONCURRENTQUEUE_H_
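// Illustrative end-to-end usage of the queue declared above (a sketch under the assumption
// that this header is included as dmlc/concurrentqueue.h with default traits):
//
//   #include <dmlc/concurrentqueue.h>
//
//   dmlc::moodycamel::ConcurrentQueue<int> q;
//   q.enqueue(25);                                  // implicit producer for this thread
//
//   dmlc::moodycamel::ProducerToken tok(q);
//   q.enqueue(tok, 26);                             // explicit producer owned by the token
//
//   int item;
//   if (q.try_dequeue(item)) {
//     // item is 25 or 26; FIFO order is only guaranteed per producer
//   }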