diff --git a/external/atomic_queue/include/atomic_queue/atomic_queue.h b/external/atomic_queue/include/atomic_queue/atomic_queue.h
index 21dc029bf..94d7a5681 100644
--- a/external/atomic_queue/include/atomic_queue/atomic_queue.h
+++ b/external/atomic_queue/include/atomic_queue/atomic_queue.h
@@ -29,12 +29,14 @@ namespace details {
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 
 template<size_t elements_per_cache_line> struct GetCacheLineIndexBits { static int constexpr value = 0; };
-template<> struct GetCacheLineIndexBits<64> { static int constexpr value = 6; };
-template<> struct GetCacheLineIndexBits<32> { static int constexpr value = 5; };
-template<> struct GetCacheLineIndexBits<16> { static int constexpr value = 4; };
-template<> struct GetCacheLineIndexBits< 8> { static int constexpr value = 3; };
-template<> struct GetCacheLineIndexBits< 4> { static int constexpr value = 2; };
-template<> struct GetCacheLineIndexBits< 2> { static int constexpr value = 1; };
+template<> struct GetCacheLineIndexBits<256> { static int constexpr value = 8; };
+template<> struct GetCacheLineIndexBits<128> { static int constexpr value = 7; };
+template<> struct GetCacheLineIndexBits< 64> { static int constexpr value = 6; };
+template<> struct GetCacheLineIndexBits< 32> { static int constexpr value = 5; };
+template<> struct GetCacheLineIndexBits< 16> { static int constexpr value = 4; };
+template<> struct GetCacheLineIndexBits<  8> { static int constexpr value = 3; };
+template<> struct GetCacheLineIndexBits<  4> { static int constexpr value = 2; };
+template<> struct GetCacheLineIndexBits<  2> { static int constexpr value = 1; };
 
 template<bool minimize_contention, unsigned array_size, size_t elements_per_cache_line>
 struct GetIndexShuffleBits {
@@ -54,14 +56,11 @@ struct GetIndexShuffleBits {
 // minimizes contention. This is done by swapping the lowest order N bits (which are the index of
 // the element within the cache line) with the next N bits (which are the index of the cache line)
 // of the element index.
-template<unsigned BITS>
-constexpr unsigned remap_index_with_mix(unsigned index, unsigned mix) {
-    return index ^ mix ^ (mix << BITS);
-}
-
 template<unsigned BITS>
 constexpr unsigned remap_index(unsigned index) noexcept {
-    return remap_index_with_mix(index, (index ^ (index >> BITS)) & ((1u << BITS) - 1));
+    unsigned constexpr mix_mask{(1u << BITS) - 1};
+    unsigned const mix{(index ^ (index >> BITS)) & mix_mask};
+    return index ^ mix ^ (mix << BITS);
 }
 
 template<>
@@ -76,8 +75,8 @@ constexpr T& map(T* elements, unsigned index) noexcept {
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 
-// Implement a "bit-twiddling hack" for finding the next power of 2
-// in either 32 bits or 64 bits in C++11 compatible constexpr functions
+// Implement a "bit-twiddling hack" for finding the next power of 2 in either 32 bits or 64 bits
+// in C++11 compatible constexpr functions. The library no longer maintains C++11 compatibility.
 
 // "Runtime" version for 32 bits
 // --a;
@@ -89,22 +88,22 @@ constexpr T& map(T* elements, unsigned index) noexcept {
 // ++a;
 
 template<class T>
-constexpr T decrement(T x) {
+constexpr T decrement(T x) noexcept {
     return x - 1;
 }
 
 template<class T>
-constexpr T increment(T x) {
+constexpr T increment(T x) noexcept {
     return x + 1;
 }
 
 template<class T>
-constexpr T or_equal(T x, unsigned u) {
-    return (x | x >> u);
+constexpr T or_equal(T x, unsigned u) noexcept {
+    return x | x >> u;
 }
 
 template<class T, class... Args>
-constexpr T or_equal(T x, unsigned u, Args... rest) {
+constexpr T or_equal(T x, unsigned u, Args... rest) noexcept {
     return or_equal(or_equal(x, u), rest...);
 }
 
@@ -118,6 +117,16 @@ constexpr uint64_t round_up_to_power_of_2(uint64_t a) noexcept {
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 
+template<class T>
+constexpr T nil() noexcept {
+#if __cpp_lib_atomic_is_always_lock_free // Better compile-time error message requires C++17.
+    static_assert(std::atomic<T>::is_always_lock_free, "Queue element type T is not atomic. Use AtomicQueue2/AtomicQueueB2 for such element types.");
+#endif
+    return {};
+}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
 } // namespace details
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -131,7 +140,7 @@ class AtomicQueueCommon {
 
     // The special member functions are not thread-safe.
 
-    AtomicQueueCommon() = default;
+    AtomicQueueCommon() noexcept = default;
 
     AtomicQueueCommon(AtomicQueueCommon const& b) noexcept
         : head_(b.head_.load(X))
@@ -156,9 +165,9 @@ class AtomicQueueCommon {
     static T do_pop_atomic(std::atomic<T>& q_element) noexcept {
         if(Derived::spsc_) {
             for(;;) {
-                T element = q_element.load(X);
+                T element = q_element.load(A);
                 if(ATOMIC_QUEUE_LIKELY(element != NIL)) {
-                    q_element.store(NIL, R);
+                    q_element.store(NIL, X);
                     return element;
                 }
                 if(Derived::maximize_throughput_)
@@ -167,7 +176,7 @@ class AtomicQueueCommon {
         }
         else {
             for(;;) {
-                T element = q_element.exchange(NIL, R); // (2) The store to wait for.
+                T element = q_element.exchange(NIL, A); // (2) The store to wait for.
                 if(ATOMIC_QUEUE_LIKELY(element != NIL))
                     return element;
                 // Do speculative loads while busy-waiting to avoid broadcasting RFO messages.
@@ -188,7 +197,7 @@ class AtomicQueueCommon {
             q_element.store(element, R);
         }
         else {
-            for(T expected = NIL; ATOMIC_QUEUE_UNLIKELY(!q_element.compare_exchange_strong(expected, element, R, X)); expected = NIL) {
+            for(T expected = NIL; ATOMIC_QUEUE_UNLIKELY(!q_element.compare_exchange_weak(expected, element, R, X)); expected = NIL) {
                 do
                     spin_loop_pause(); // (1) Wait for store (2) to complete.
                 while(Derived::maximize_throughput_ && q_element.load(X) != NIL);
@@ -211,7 +220,7 @@ class AtomicQueueCommon {
         else {
             for(;;) {
                 unsigned char expected = STORED;
-                if(ATOMIC_QUEUE_LIKELY(state.compare_exchange_strong(expected, LOADING, A, X))) {
+                if(ATOMIC_QUEUE_LIKELY(state.compare_exchange_weak(expected, LOADING, A, X))) {
                     T element{std::move(q_element)};
                     state.store(EMPTY, R);
                     return element;
@@ -236,7 +245,7 @@ class AtomicQueueCommon {
         else {
             for(;;) {
                 unsigned char expected = EMPTY;
-                if(ATOMIC_QUEUE_LIKELY(state.compare_exchange_strong(expected, STORING, A, X))) {
+                if(ATOMIC_QUEUE_LIKELY(state.compare_exchange_weak(expected, STORING, A, X))) {
                     q_element = std::forward<U>(element);
                     state.store(STORED, R);
                     return;
@@ -262,7 +271,7 @@ class AtomicQueueCommon {
             do {
                 if(static_cast<int>(head - tail_.load(X)) >= static_cast<int>(static_cast<Derived&>(*this).size_))
                     return false;
-            } while(ATOMIC_QUEUE_UNLIKELY(!head_.compare_exchange_strong(head, head + 1, A, X))); // This loop is not FIFO.
+            } while(ATOMIC_QUEUE_UNLIKELY(!head_.compare_exchange_weak(head, head + 1, X, X))); // This loop is not FIFO.
         }
 
         static_cast<Derived&>(*this).do_push(std::forward<T>(element), head);
@@ -281,7 +290,7 @@ class AtomicQueueCommon {
             do {
                 if(static_cast<int>(head_.load(X) - tail) <= 0)
                     return false;
-            } while(ATOMIC_QUEUE_UNLIKELY(!tail_.compare_exchange_strong(tail, tail + 1, A, X))); // This loop is not FIFO.
+            } while(ATOMIC_QUEUE_UNLIKELY(!tail_.compare_exchange_weak(tail, tail + 1, X, X))); // This loop is not FIFO.
         }
 
         element = static_cast<Derived&>(*this).do_pop(tail);
@@ -296,20 +305,20 @@ class AtomicQueueCommon {
             head_.store(head + 1, X);
         }
         else {
-            constexpr auto memory_order = Derived::total_order_ ? std::memory_order_seq_cst : std::memory_order_acquire;
+            constexpr auto memory_order = Derived::total_order_ ? std::memory_order_seq_cst : std::memory_order_relaxed;
             head = head_.fetch_add(1, memory_order); // FIFO and total order on Intel regardless, as of 2019.
         }
         static_cast<Derived&>(*this).do_push(std::forward<T>(element), head);
     }
 
-    Derived pop() noexcept {
+    auto pop() noexcept {
         unsigned tail;
         if(Derived::spsc_) {
             tail = tail_.load(X);
             tail_.store(tail + 1, X);
         }
        else {
-            constexpr auto memory_order = Derived::total_order_ ? std::memory_order_seq_cst : std::memory_order_acquire;
+            constexpr auto memory_order = Derived::total_order_ ? std::memory_order_seq_cst : std::memory_order_relaxed;
             tail = tail_.fetch_add(1, memory_order); // FIFO and total order on Intel regardless, as of 2019.
         }
         return static_cast<Derived&>(*this).do_pop(tail);
@@ -335,7 +344,7 @@ class AtomicQueueCommon {
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 
-template<class T, unsigned SIZE, T NIL = T{}, bool MINIMIZE_CONTENTION = true, bool MAXIMIZE_THROUGHPUT = true, bool TOTAL_ORDER = false, bool SPSC = false>
+template<class T, unsigned SIZE, T NIL = details::nil<T>(), bool MINIMIZE_CONTENTION = true, bool MAXIMIZE_THROUGHPUT = true, bool TOTAL_ORDER = false, bool SPSC = false>
 class AtomicQueue : public AtomicQueueCommon<AtomicQueue<T, SIZE, NIL, MINIMIZE_CONTENTION, MAXIMIZE_THROUGHPUT, TOTAL_ORDER, SPSC>> {
     using Base = AtomicQueueCommon<AtomicQueue<T, SIZE, NIL, MINIMIZE_CONTENTION, MAXIMIZE_THROUGHPUT, TOTAL_ORDER, SPSC>>;
     friend Base;
@@ -362,8 +371,8 @@ class AtomicQueue : public AtomicQueueCommon<AtomicQueue<T, SIZE, NIL, MINIMIZE_CONTENTION, MAXIMIZE_THROUGHPUT, TOTAL_ORDER, SPSC>> {
 
     AtomicQueue() noexcept {
-        assert(std::atomic<T>{NIL}.is_lock_free()); // This queue is for atomic elements only. AtomicQueue2 is for non-atomic ones.
-        if(T{} != NIL)
+        assert(std::atomic<T>{NIL}.is_lock_free()); // Queue element type T is not atomic. Use AtomicQueue2/AtomicQueueB2 for such element types.
+        if(details::nil<T>() != NIL)
             for(auto& element : elements_)
                 element.store(NIL, X);
     }
@@ -403,16 +412,17 @@ class AtomicQueue2 : public AtomicQueueCommon<AtomicQueue2<T, SIZE, MINIMIZE_CONTENTION, MAXIMIZE_THROUGHPUT, TOTAL_ORDER, SPSC>> {
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 
-template<class T, class A = std::allocator<T>, T NIL = T{}, bool MAXIMIZE_THROUGHPUT = true, bool TOTAL_ORDER = false, bool SPSC = false>
-class AtomicQueueB : public AtomicQueueCommon<AtomicQueueB<T, A, NIL, MAXIMIZE_THROUGHPUT, TOTAL_ORDER, SPSC>>,
-                     private std::allocator_traits<A>::template rebind_alloc<std::atomic<T>> {
+template<class T, class A = std::allocator<T>, T NIL = details::nil<T>(), bool MAXIMIZE_THROUGHPUT = true, bool TOTAL_ORDER = false, bool SPSC = false>
+class AtomicQueueB : private std::allocator_traits<A>::template rebind_alloc<std::atomic<T>>,
+                     public AtomicQueueCommon<AtomicQueueB<T, A, NIL, MAXIMIZE_THROUGHPUT, TOTAL_ORDER, SPSC>> {
+    using AllocatorElements = typename std::allocator_traits<A>::template rebind_alloc<std::atomic<T>>;
     using Base = AtomicQueueCommon<AtomicQueueB<T, A, NIL, MAXIMIZE_THROUGHPUT, TOTAL_ORDER, SPSC>>;
     friend Base;
@@ -420,8 +430,6 @@ class AtomicQueueB : public AtomicQueueCommon<AtomicQueueB<T, A, NIL, MAXIMIZE_THROUGHPUT, TOTAL_ORDER, SPSC>>,
 
-    using AllocatorElements = typename std::allocator_traits<A>::template rebind_alloc<std::atomic<T>>;
-
     static constexpr auto ELEMENTS_PER_CACHE_LINE = CACHE_LINE_SIZE / sizeof(std::atomic<T>);
     static_assert(ELEMENTS_PER_CACHE_LINE, "Unexpected ELEMENTS_PER_CACHE_LINE.");
@@ -445,25 +453,25 @@ class AtomicQueueB : public AtomicQueueCommon<AtomicQueueB<T, A, NIL, MAXIMIZE_THROUGHPUT, TOTAL_ORDER, SPSC>>,
-        assert(std::atomic<T>{NIL}.is_lock_free()); // This queue is for atomic elements only. AtomicQueueB2 is for non-atomic ones.
+        assert(std::atomic<T>{NIL}.is_lock_free()); // Queue element type T is not atomic. Use AtomicQueue2/AtomicQueueB2 for such element types.
         for(auto p = elements_, q = elements_ + size_; p < q; ++p)
             p->store(NIL, X);
     }
 
     AtomicQueueB(AtomicQueueB&& b) noexcept
-        : Base(static_cast<Base&&>(b))
-        , AllocatorElements(static_cast<AllocatorElements&&>(b)) // TODO: This must be noexcept, static_assert that.
-        , size_(b.size_)
-        , elements_(b.elements_) {
-        b.size_ = 0;
-        b.elements_ = 0;
-    }
+        : AllocatorElements(static_cast<AllocatorElements&&>(b)) // TODO: This must be noexcept, static_assert that.
+        , Base(static_cast<Base&&>(b))
+        , size_(std::exchange(b.size_, 0))
+        , elements_(std::exchange(b.elements_, nullptr))
+    {}
 
     AtomicQueueB& operator=(AtomicQueueB&& b) noexcept {
         b.swap(*this);
@@ -475,15 +483,19 @@ class AtomicQueueB : public AtomicQueueCommon<AtomicQueueB<T, A, NIL, MAXIMIZE_THROUGHPUT, TOTAL_ORDER, SPSC>>,
     void swap(AtomicQueueB& b) noexcept {
         using std::swap;
-        this->Base::swap(b);
         swap(static_cast<AllocatorElements&>(*this), static_cast<AllocatorElements&>(b));
+        Base::swap(b);
         swap(size_, b.size_);
         swap(elements_, b.elements_);
     }
 
-    friend void swap(AtomicQueueB& a, AtomicQueueB& b) {
+    friend void swap(AtomicQueueB& a, AtomicQueueB& b) noexcept {
         a.swap(b);
     }
 };
@@ -491,27 +503,25 @@ class AtomicQueueB : public AtomicQueueCommon<AtomicQueueB<T, A, NIL, MAXIMIZE_THROUGHPUT, TOTAL_ORDER, SPSC>>,
 
 template<class T, class A = std::allocator<T>, bool MAXIMIZE_THROUGHPUT = true, bool TOTAL_ORDER = false, bool SPSC = false>
-class AtomicQueueB2 : public AtomicQueueCommon<AtomicQueueB2<T, A, MAXIMIZE_THROUGHPUT, TOTAL_ORDER, SPSC>>,
-                      private A,
-                      private std::allocator_traits<A>::template rebind_alloc<std::atomic<unsigned char>> {
+class AtomicQueueB2 : private std::allocator_traits<A>::template rebind_alloc<unsigned char>,
+                      public AtomicQueueCommon<AtomicQueueB2<T, A, MAXIMIZE_THROUGHPUT, TOTAL_ORDER, SPSC>> {
+    using StorageAllocator = typename std::allocator_traits<A>::template rebind_alloc<unsigned char>;
     using Base = AtomicQueueCommon<AtomicQueueB2<T, A, MAXIMIZE_THROUGHPUT, TOTAL_ORDER, SPSC>>;
     using State = typename Base::State;
+    using AtomicState = std::atomic<State>;
     friend Base;
 
     static constexpr bool total_order_ = TOTAL_ORDER;
     static constexpr bool spsc_ = SPSC;
     static constexpr bool maximize_throughput_ = MAXIMIZE_THROUGHPUT;
 
-    using AllocatorElements = A;
-    using AllocatorStates = typename std::allocator_traits<A>::template rebind_alloc<std::atomic<unsigned char>>;
-
     // AtomicQueueCommon members are stored into by readers and writers.
     // Allocate these immutable members on another cache line which never gets invalidated by stores.
     alignas(CACHE_LINE_SIZE) unsigned size_;
-    std::atomic<unsigned char>* states_;
+    AtomicState* states_;
     T* elements_;
 
-    static constexpr auto STATES_PER_CACHE_LINE = CACHE_LINE_SIZE / sizeof(State);
+    static constexpr auto STATES_PER_CACHE_LINE = CACHE_LINE_SIZE / sizeof(AtomicState);
     static_assert(STATES_PER_CACHE_LINE, "Unexpected STATES_PER_CACHE_LINE.");
 
     static constexpr auto SHUFFLE_BITS = details::GetCacheLineIndexBits<STATES_PER_CACHE_LINE>::value;
@@ -528,34 +538,43 @@ class AtomicQueueB2 : public AtomicQueueCommon<AtomicQueueB2<T, A, MAXIMIZE_THROUGHPUT, TOTAL_ORDER, SPSC>>,
         Base::do_push_any(std::forward<U>(element), states_[index], elements_[index]);
     }
 
+    template<class U>
+    U* allocate_() {
+        U* p = reinterpret_cast<U*>(StorageAllocator::allocate(size_ * sizeof(U)));
+        assert(reinterpret_cast<uintptr_t>(p) % alignof(U) == 0); // Allocated storage must be suitably aligned for U.
+        return p;
+    }
+
+    template<class U>
+    void deallocate_(U* p) noexcept {
+        StorageAllocator::deallocate(reinterpret_cast<unsigned char*>(p), size_ * sizeof(U)); // TODO: This must be noexcept, static_assert that.
+    }
+
 public:
     using value_type = T;
+    using allocator_type = A;
 
     // The special member functions are not thread-safe.
 
-    AtomicQueueB2(unsigned size)
-        : size_(std::max(details::round_up_to_power_of_2(size), 1u << (SHUFFLE_BITS * 2)))
-        , states_(AllocatorStates::allocate(size_))
-        , elements_(AllocatorElements::allocate(size_)) {
+    AtomicQueueB2(unsigned size, A const& allocator = A{})
+        : StorageAllocator(allocator)
+        , size_(std::max(details::round_up_to_power_of_2(size), 1u << (SHUFFLE_BITS * 2)))
+        , states_(allocate_<AtomicState>())
+        , elements_(allocate_<T>()) {
         for(auto p = states_, q = states_ + size_; p < q; ++p)
             p->store(Base::EMPTY, X);
-
-        AllocatorElements& ae = *this;
+        A a = get_allocator();
         for(auto p = elements_, q = elements_ + size_; p < q; ++p)
-            std::allocator_traits<AllocatorElements>::construct(ae, p);
+            std::allocator_traits<A>::construct(a, p);
     }
 
     AtomicQueueB2(AtomicQueueB2&& b) noexcept
-        : Base(static_cast<Base&&>(b))
-        , AllocatorElements(static_cast<AllocatorElements&&>(b)) // TODO: This must be noexcept, static_assert that.
-        , AllocatorStates(static_cast<AllocatorStates&&>(b)) // TODO: This must be noexcept, static_assert that.
-        , size_(b.size_)
-        , states_(b.states_)
-        , elements_(b.elements_) {
-        b.size_ = 0;
-        b.states_ = 0;
-        b.elements_ = 0;
-    }
+        : StorageAllocator(static_cast<StorageAllocator&&>(b)) // TODO: This must be noexcept, static_assert that.
+        , Base(static_cast<Base&&>(b))
+        , size_(std::exchange(b.size_, 0))
+        , states_(std::exchange(b.states_, nullptr))
+        , elements_(std::exchange(b.elements_, nullptr))
+    {}
 
     AtomicQueueB2& operator=(AtomicQueueB2&& b) noexcept {
         b.swap(*this);
@@ -564,19 +583,22 @@ class AtomicQueueB2 : public AtomicQueueCommon<AtomicQueueB2<T, A, MAXIMIZE_THROUGHPUT, TOTAL_ORDER, SPSC>>,
-                std::allocator_traits<AllocatorElements>::destroy(ae, p);
-            AllocatorElements::deallocate(elements_, size_); // TODO: This must be noexcept, static_assert that.
-            AllocatorStates::deallocate(states_, size_); // TODO: This must be noexcept, static_assert that.
+                std::allocator_traits<A>::destroy(a, p);
+            deallocate_(elements_);
+            deallocate_(states_);
         }
     }
 
+    A get_allocator() const noexcept {
+        return *this;
+    }
+
     void swap(AtomicQueueB2& b) noexcept {
         using std::swap;
-        this->Base::swap(b);
-        swap(static_cast<AllocatorElements&>(*this), static_cast<AllocatorElements&>(b));
-        swap(static_cast<AllocatorStates&>(*this), static_cast<AllocatorStates&>(b));
+        swap(static_cast<StorageAllocator&>(*this), static_cast<StorageAllocator&>(b));
+        Base::swap(b);
         swap(size_, b.size_);
         swap(states_, b.states_);
         swap(elements_, b.elements_);
diff --git a/external/atomic_queue/include/atomic_queue/defs.h b/external/atomic_queue/include/atomic_queue/defs.h
index a279dcf8d..4601b1d46 100644
--- a/external/atomic_queue/include/atomic_queue/defs.h
+++ b/external/atomic_queue/include/atomic_queue/defs.h
@@ -6,8 +6,7 @@
 
 #include <atomic>
 
-#if defined(__x86_64__) || defined(_M_X64) || \
-    defined(__i386__) || defined(_M_IX86)
+#if defined(__x86_64__) || defined(_M_X64) || defined(__i386__) || defined(_M_IX86)
 #include <emmintrin.h>
 namespace atomic_queue {
 constexpr int CACHE_LINE_SIZE = 64;
@@ -15,7 +14,7 @@ static inline void spin_loop_pause() noexcept {
     _mm_pause();
 }
 } // namespace atomic_queue
-#elif defined(__arm__) || defined(__aarch64__)
+#elif defined(__arm__) || defined(__aarch64__) || defined(_M_ARM64)
 namespace atomic_queue {
 constexpr int CACHE_LINE_SIZE = 64;
 static inline void spin_loop_pause() noexcept {
@@ -31,13 +30,42 @@ static inline void spin_loop_pause() noexcept {
      defined(__ARM_ARCH_8A__) || \
      defined(__aarch64__))
     asm volatile ("yield" ::: "memory");
+#elif defined(_M_ARM64)
+    __yield();
 #else
     asm volatile ("nop" ::: "memory");
 #endif
 }
 } // namespace atomic_queue
+#elif defined(__ppc64__) || defined(__powerpc64__)
+namespace atomic_queue {
+constexpr int CACHE_LINE_SIZE = 128; // TODO: Review that this is the correct value.
+static inline void spin_loop_pause() noexcept {
+    asm volatile("or 31,31,31 # very low priority"); // TODO: Review and benchmark that this is the right instruction.
+}
+} // namespace atomic_queue
+#elif defined(__s390x__)
+namespace atomic_queue {
+constexpr int CACHE_LINE_SIZE = 256; // TODO: Review that this is the correct value.
+static inline void spin_loop_pause() noexcept {} // TODO: Find the right instruction to use here, if any.
+} // namespace atomic_queue
+#elif defined(__riscv)
+namespace atomic_queue {
+constexpr int CACHE_LINE_SIZE = 64;
+static inline void spin_loop_pause() noexcept {
+    asm volatile (".insn i 0x0F, 0, x0, x0, 0x010");
+}
+} // namespace atomic_queue
+#else
+#ifdef _MSC_VER
+#pragma message("Unknown CPU architecture. Using L1 cache line size of 64 bytes and no spinloop pause instruction.")
 #else
-#error "Unknown CPU architecture."
+#warning "Unknown CPU architecture. Using L1 cache line size of 64 bytes and no spinloop pause instruction."
+#endif
+namespace atomic_queue {
+constexpr int CACHE_LINE_SIZE = 64; // TODO: Review that this is the correct value.
+static inline void spin_loop_pause() noexcept {}
+} // namespace atomic_queue
 #endif
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -60,6 +88,7 @@
 auto constexpr A = std::memory_order_acquire;
 auto constexpr R = std::memory_order_release;
 auto constexpr X = std::memory_order_relaxed;
 auto constexpr C = std::memory_order_seq_cst;
+auto constexpr AR = std::memory_order_acq_rel;
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
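
For reference, the remap_index() rewrite in the patch keeps the behaviour of the removed remap_index_with_mix() helper: it swaps the low BITS of a queue index with the next BITS so that consecutive slots land on different cache lines. A standalone sketch of that remapping, assuming 64-byte cache lines holding eight elements (BITS = 3) and C++14 or later; the demo values are illustrative and not part of the patch.

#include <cassert>

template<unsigned BITS>
constexpr unsigned remap_index(unsigned index) noexcept {
    unsigned constexpr mix_mask{(1u << BITS) - 1};
    unsigned const mix{(index ^ (index >> BITS)) & mix_mask};
    return index ^ mix ^ (mix << BITS); // swap the low BITS with the next BITS of the index
}

int main() {
    assert(remap_index<3>(0) == 0);                   // slot 0 -> element 0 of cache line 0
    assert(remap_index<3>(1) == 8);                   // slot 1 -> element 0 of cache line 1
    assert(remap_index<3>(remap_index<3>(13)) == 13); // the mapping is an involution, so pushes and pops agree
}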
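
The decrement/or_equal/increment helpers that gain noexcept above implement the "bit-twiddling hack" named in the comment. Written out without the variadic recursion, the 32-bit computation looks like the sketch below; this is my own spelled-out version of the same arithmetic, not the library's code, and assumes C++14 constexpr.

#include <cstdint>

constexpr uint32_t round_up_to_power_of_2_32(uint32_t a) noexcept {
    --a;          // keep exact powers of 2 unchanged
    a |= a >> 1;  // smear the highest set bit into every lower position...
    a |= a >> 2;
    a |= a >> 4;
    a |= a >> 8;
    a |= a >> 16; // ...so the value becomes 2^k - 1
    return a + 1; // the next power of 2
}

static_assert(round_up_to_power_of_2_32(1) == 1, "");
static_assert(round_up_to_power_of_2_32(5) == 8, "");
static_assert(round_up_to_power_of_2_32(64) == 64, "");
static_assert(round_up_to_power_of_2_32(65) == 128, "");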
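
Several busy-wait loops in the patch switch from compare_exchange_strong to compare_exchange_weak; a spurious weak-CAS failure only causes one more pass of a loop that retries anyway, and the weak form can be cheaper on LL/SC architectures. The sketch below shrinks the EMPTY/STORING/STORED/LOADING slot protocol used by do_push_any/do_pop_any to a single slot to show the pattern; the names and the busy-wait policy are simplified, so this is not the library's exact code.

#include <atomic>
#include <utility>

enum : unsigned char { EMPTY, STORING, STORED, LOADING };

template<class T>
struct Slot {
    std::atomic<unsigned char> state{EMPTY};
    T value{};

    void store(T v) noexcept {
        // Acquire the slot: EMPTY -> STORING. A busy slot or a spurious failure just retries.
        for(unsigned char expected = EMPTY;
            !state.compare_exchange_weak(expected, STORING, std::memory_order_acquire, std::memory_order_relaxed);
            expected = EMPTY) {
        }
        value = std::move(v);
        state.store(STORED, std::memory_order_release); // publish the element
    }

    T load() noexcept {
        // Claim a published element: STORED -> LOADING.
        for(unsigned char expected = STORED;
            !state.compare_exchange_weak(expected, LOADING, std::memory_order_acquire, std::memory_order_relaxed);
            expected = STORED) {
        }
        T v = std::move(value);
        state.store(EMPTY, std::memory_order_release); // hand the slot back to producers
        return v;
    }
};

int main() {
    Slot<int> slot;
    slot.store(42);
    return slot.load() == 42 ? 0 : 1;
}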
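
The new details::nil<T>() helper and the reworded asserts encode the same rule: AtomicQueue/AtomicQueueB keep their elements directly in std::atomic<T>, so the element type must be lock-free capable, while other element types belong in the 2-variants. A small usage sketch under that assumption; the include path follows the patched tree and the values are arbitrary.

#include <atomic_queue/atomic_queue.h>
#include <string>

// int is lock-free inside std::atomic on mainstream targets, so the plain queue applies.
// NIL defaults to details::nil<int>() == 0, so 0 itself must not be pushed as an element.
atomic_queue::AtomicQueue<int, 1024> int_queue;

// std::string is not an atomic-friendly type; AtomicQueue2 pairs each element with a state byte instead.
atomic_queue::AtomicQueue2<std::string, 1024> string_queue;

int main() {
    int_queue.push(42);
    string_queue.push(std::string{"hello"});
    int i = int_queue.pop();
    std::string s = string_queue.pop();
    return (i == 42 && s == "hello") ? 0 : 1;
}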
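
AtomicQueueB2 now takes the allocator as a constructor argument, routes both the states and the elements arrays through a single rebound StorageAllocator base, and exposes the allocator via allocator_type and get_allocator(). A minimal usage sketch of the new constructor signature, assuming the patched header is on the include path; the producer/consumer structure is illustrative only.

#include <atomic_queue/atomic_queue.h>
#include <memory>
#include <thread>

int main() {
    using Queue = atomic_queue::AtomicQueueB2<std::unique_ptr<int>>;
    Queue q(1000, Queue::allocator_type{}); // capacity is rounded up to a power of 2 internally

    std::thread consumer([&q] {
        for(int i = 0; i < 1000; ++i) {
            std::unique_ptr<int> p = q.pop(); // spins until a producer publishes an element
            (void)p;
        }
    });

    for(int i = 0; i < 1000; ++i)
        q.push(std::make_unique<int>(i));

    consumer.join();
    return 0;
}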
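
defs.h now provides spin_loop_pause() for PowerPC, s390x, RISC-V and MSVC ARM64, and degrades to an empty function plus a warning on unknown architectures instead of a hard #error. A hedged sketch of how such a pause is typically used in a busy-wait, in the style of the queue's own loops; the wait_until helper name is mine, not the library's.

#include <atomic_queue/defs.h>
#include <atomic>

template<class Predicate>
void wait_until(Predicate ready) noexcept {
    while(!ready())
        atomic_queue::spin_loop_pause(); // lowers power/SMT contention while spinning; a no-op on unknown CPUs
}

int main() {
    std::atomic<bool> flag{true};
    wait_until([&] { return flag.load(std::memory_order_acquire); });
}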