#pragma region CPL License
/*
Nuclex Native Framework
Copyright (C) 2002-2023 Nuclex Development Labs

This library is free software; you can redistribute it and/or
modify it under the terms of the IBM Common Public License as
published by the IBM Corporation; either version 1.0 of the
License, or (at your option) any later version.

This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the IBM
Common Public License for more details.

You should have received a copy of the IBM Common Public
License along with this library
*/
#pragma endregion // CPL License

#if !defined(NUCLEX_SUPPORT_COLLECTIONS_CONCURRENTRINGBUFFER_H)
#error This file must be included via ConcurrentRingBuffer.h
#endif

#if defined(_MSC_VER) && (_MSC_VER >= 1920)
// https://github.com/microsoft/STL/issues/1673
// https://developercommunity2.visualstudio.com/t/compiler/1346618
#define STRING2(x) #x
#define STRING(x) STRING2(x)
#pragma message ( \
  __FILE__ "(" STRING(__LINE__) "): " \
  "warning: compiling the Nuclex MPMC Queue on VS2019 will trigger a compiler bug " \
  "at the point of usage (see https://github.com/microsoft/STL/issues/1673)" \
)
// Unfortunately, there seems to be no usable workaround for this compiler.
// I don't know when or if it will be fixed, so I'm leaving this warning open-ended.
// If I spot a version of Microsoft's C++ compiler that works, I'll limit the range.
#endif

namespace Nuclex { namespace Support { namespace Collections {

  // ------------------------------------------------------------------------------------------- //

  /// <summary>Fixed-size circular buffer for multiple consumers and producers</summary>
  /// <remarks>
  ///   <para>
  ///     This multi-producer, multi-consumer variant of the concurrent buffer can be
  ///     freely used from any number of threads. Any thread can append items to the buffer
  ///     and any thread can take items from the buffer without any restrictions.
  ///   </para>
  ///   <para>
  ///     This implementation is lock-free and also wait-free (i.e. no compare-and-swap loops).
  ///     Batch operations are supported and this variant gives a strong-ish exception
  ///     guarantee: if an operation fails, the buffer's state remains as if it never happened,
  ///     but the buffer's capacity will be temporarily reduced.
  ///   </para>
  ///   <para>
  ///     Container type: bounded ring buffer
  ///   </para>
  ///   <para>
  ///     Thread safety: any number of consumers, any number of producers
  ///   </para>
  ///   <para>
  ///     Exception guarantee: strong-ish (exception = buffer unchanged)
  ///   </para>
  ///   <para>
  ///     Footprint (stack): 48 bytes.
  ///     Footprint (heap): +1 extra byte per element
  ///   </para>
  /// </remarks>
  template<typename TElement>
  class ConcurrentRingBuffer<
    TElement, ConcurrentAccessBehavior::MultipleProducersMultipleConsumers
  > {

    /// <summary>
    ///   Initializes a new concurrent queue for multiple producers and consumers
    /// </summary>
    /// <param name="capacity">Maximum amount of items the queue can hold</param>
    public: explicit ConcurrentRingBuffer(std::size_t capacity) :
      capacity(capacity),
      itemMemory(nullptr),
      itemStatus(nullptr),
      readIndex(0),
      writeIndex(0),
      occupiedCount(0),
      availableCount(0) {
      std::uint8_t *buffer = new std::uint8_t[sizeof(TElement[2]) * capacity / 2U];
      {
        auto itemMemoryDeleter = ON_SCOPE_EXIT_TRANSACTION {
          delete[] buffer;
        };
        this->itemStatus = new std::atomic<std::uint8_t>[capacity];
        itemMemoryDeleter.Commit(); // disarm the item memory deleter
      }
      this->itemMemory = reinterpret_cast<TElement *>(buffer);

      // Initialize the status of all items
      for(std::size_t index = 0; index < capacity; ++index) {
        this->itemStatus[index].store(0, std::memory_order_relaxed);
      }
      std::atomic_thread_fence(std::memory_order_release);
    }

    /// <summary>Frees all memory owned by the concurrent queue and the items therein</summary>
    public: ~ConcurrentRingBuffer() {
      if constexpr(!std::is_trivially_destructible<TElement>::value) {
        std::size_t safeCount = this->occupiedCount.load(
          std::memory_order_consume // consume: if() below carries dependency
        );
        if(safeCount >= 1) {
          std::size_t safeReadIndex = this->readIndex.load(
            std::memory_order_consume // consume: while() below carries dependency
          );
          while(safeCount >= 1) {
            this->itemMemory[safeReadIndex].~TElement();
            safeReadIndex = (safeReadIndex + 1) % this->capacity;
            --safeCount;
            // Don't update occupiedCount, the read index or the item status because
            // this is the destructor and nothing will read them again
          }
        }
      }

      delete[] this->itemStatus;
      #if !defined(NDEBUG)
      this->itemStatus = nullptr;
      #endif
      delete[] reinterpret_cast<std::uint8_t *>(this->itemMemory);
      #if !defined(NDEBUG)
      this->itemMemory = nullptr;
      #endif
    }

    /// <summary>Estimates the number of items stored in the queue</summary>
    /// <returns>The probable number of items the queue held at the time of the call</returns>
    /// <remarks>
    ///   This method can be called from any thread and is equally accurate no matter
    ///   whether the calling thread is a producer, a consumer or an unrelated thread.
    ///   If an item's constructor throws an exception while the item is being copied/moved
    ///   into the buffer's memory, the reserved slot still counts toward occupiedCount
    ///   (it remains occupied as a gap), so the estimate may include such broken slots.
    /// </remarks>
    public: std::size_t Count() const {
      // If many producers add at the same time, the item count may for a moment jump above
      // 'capacity' (the producer that incremented it above capacity silently decrements it
      // again and reports to its caller that the queue was full).
      return std::min(
        this->occupiedCount.load(std::memory_order_relaxed), this->capacity
      );
    }

    /// <summary>Tries to append the specified element to the queue</summary>
    /// <param name="element">Element that will be appended to the queue</param>
    /// <returns>True if the element was appended, false if the queue had no space left</returns>
    public: bool TryAppend(const TElement &element) {

      // Try to reserve a slot. If the queue is full, the value will hit the capacity (or even
      // exceed it if highly contested), in which case we just hand the unusable slot back.
      {
        std::size_t safeCount = this->occupiedCount.fetch_add(
          1, std::memory_order_consume // consume: if() below carries dependency
        );
        if(safeCount >= this->capacity) { // can happen under high contention of this code spot
          this->occupiedCount.fetch_sub(1, std::memory_order_release);
          return false;
        }
      }

      // If we reach this spot, we know there was at least 1 slot free in the queue and we
      // just captured it (i.e. no other thread will cause less than 1 slot to remain free).
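      //
      // (Illustrative example, not part of the algorithm: with a capacity of 4 and
      // occupiedCount at 3, three producers calling fetch_add(1) concurrently observe
      // 3, 4 and 5. Only the producer that observed 3 keeps its slot; the other two see
      // a value >= capacity, decrement the counter again and report the queue as full.)
      //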
      // So we just need to 'take' a slot index from the write index list
      std::size_t targetSlotIndex;
      {
        int safeOccupiedIndex = this->writeIndex.fetch_add(
          1, std::memory_order_consume // consume: if() below carries dependency
        );

        // If the write index goes past 'capacity', do a wrap-around (ring buffer).
        // Multiple threads may simultaneously hit this spot, moving the write index
        // into the negative. That is fine (we do a positive modulo on the index).
        if(safeOccupiedIndex > 0) {
          if(static_cast<std::size_t>(safeOccupiedIndex) >= this->capacity) {
            this->writeIndex.fetch_sub(
              static_cast<int>(this->capacity), std::memory_order_relaxed
            );
          }
        }

        targetSlotIndex = positiveModulo(safeOccupiedIndex, static_cast<int>(this->capacity));
      }

      // Mark the slot as under construction for the reading thread
      #if !defined(NDEBUG) // not really needed, empty and under construction are treated the same
      this->itemStatus[targetSlotIndex].store(1, std::memory_order_release);
      #endif

      // Copy the item into the slot. If its copy constructor throws, the slot must be
      // marked as broken so the reading thread will skip it.
      {
        auto brokenSlotScope = ON_SCOPE_EXIT_TRANSACTION {
          this->itemStatus[targetSlotIndex].store(3, std::memory_order_release);
          this->availableCount.fetch_add(1, std::memory_order_release);
        };
        new(this->itemMemory + targetSlotIndex) TElement(element);
        brokenSlotScope.Commit();
      }

      // Mark the slot as available for the reading thread
      this->itemStatus[targetSlotIndex].store(2, std::memory_order_release);
      this->availableCount.fetch_add(1, std::memory_order_release);

      // Item was appended!
      return true;
    }

    /// <summary>Tries to remove an element from the queue</summary>
    /// <param name="element">Element into which the queue's element will be placed</param>
    /// <returns>True if an item was available and has been returned, false otherwise</returns>
    /// <remarks>
    ///   This method always attempts to use move semantics because the item in the buffer
    ///   is rendered inaccessible and eventually destroyed anyway.
    /// </remarks>
    public: bool TryTake(TElement &element) {
      int safeAvailableCount = this->availableCount.fetch_sub(1, std::memory_order_release);
      if(safeAvailableCount < 1) {
        this->availableCount.fetch_add(1, std::memory_order_relaxed);
        return false;
      }

      // At this point, we know there's at least one item in the queue and no other thread
      // is going to take it (because we reserved it through the availableCount).
      std::size_t sourceSlotIndex;
      {
        int safeReadIndex = this->readIndex.fetch_add(
          1, std::memory_order_consume // consume: if() below carries dependency
        );

        // If the read index goes past 'capacity', do a wrap-around (ring buffer).
        // Multiple threads may simultaneously hit this spot, moving the read index
        // into the negative. That is fine (we do a positive modulo on the index).
        if(safeReadIndex > static_cast<int>(this->capacity)) {
          this->readIndex.fetch_sub(
            static_cast<int>(this->capacity), std::memory_order_relaxed
          );
        }

        sourceSlotIndex = positiveModulo(safeReadIndex, static_cast<int>(this->capacity));
      }

      // Move the item to the caller-provided memory. This may throw.
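      // The scope guard below runs whether or not the move/copy assignment throws: the
      // stored element is always destroyed, its slot is marked empty again and the
      // occupied slot count is handed back to the producers. An exception here therefore
      // loses the element rather than leaving its slot permanently blocked.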
      TElement *readAddress = this->itemMemory + sourceSlotIndex;
      {
        ON_SCOPE_EXIT {
          if constexpr(!std::is_trivially_destructible<TElement>::value) {
            readAddress->~TElement();
          }
          this->itemStatus[sourceSlotIndex].store(0, std::memory_order_release);
          this->occupiedCount.fetch_sub(1, std::memory_order_relaxed);
        };
        element = std::move(*readAddress);
      }

      return true; // Item was read
    }

    /// <summary>Returns the maximum number of items the queue can hold</summary>
    /// <returns>The maximum number of items the queue can hold</returns>
    public: std::size_t GetCapacity() const { return this->capacity; }

    /// <summary>Performs the modulo operation, but returns 0..divisor-1</summary>
    /// <param name="value">Value for which the positive modulo will be calculated</param>
    /// <param name="divisor">Divisor of which the remainder will be calculated</param>
    /// <returns>The positive division remainder of the specified value</returns>
    /// <remarks>
    ///   There are various tricks to achieve this without branching, but they're all slower.
    ///   Reason: x86, amd64 and ARM CPUs have conditional move instructions, allowing cases
    ///   like this one to execute without branching at the machine code level.
    ///   For example, positiveModulo(-3, 8) returns 5, where plain -3 % 8 would yield -3.
    /// </remarks>
    private: static std::size_t positiveModulo(int value, int divisor) {
      value %= divisor;
      if(value < 0) {
        return static_cast<std::size_t>(value + divisor);
      } else {
        return static_cast<std::size_t>(value);
      }
    }

    /// <summary>Number of items the ring buffer can hold</summary>
    private: const std::size_t capacity;

    /// <summary>Memory block that holds the items currently stored in the queue</summary>
    /// <remarks>
    ///   This is allocated as a buffer of unsigned characters, thus it points to
    ///   uninitialized memory, except for the items which have been placed into it.
    /// </remarks>
    private: TElement *itemMemory;

    /// <summary>Status of items in buffer, 0: empty, 1: filling, 2: present, 3: gap</summary>
    private: std::atomic<std::uint8_t> *itemStatus;

    /// <summary>Index from which the next item will be read</summary>
    /// <remarks>
    ///   Once a thread knows that an item is available and has reserved it through
    ///   availableCount, it will blindly increment this value. If the incrementing thread
    ///   sees that the read index is past the capacity, it will just as blindly decrement
    ///   it by the capacity to force a wrap-around. As a result, readIndex can be both
    ///   less than 0 and more than capacity, but when wrapped into the valid range,
    ///   it will point to the correct item.
    /// </remarks>
    private: std::atomic<int> readIndex;

    /// <summary>Index at which the next item will be stored</summary>
    /// <remarks>
    ///   <para>
    ///     A producer reserves a slot by incrementing this index and writing to the slot
    ///     it observed before the increment, so the stored value points one past the most
    ///     recently reserved slot. Lock-free synchronization is easier this way.
    ///   </para>
    ///   <para>
    ///     The write index follows the same behavior documented for the read index: it may
    ///     go beyond capacity or be less than 0 if multiple threads see it there and subtract
    ///     the buffer's capacity. It, too, will point to the correct slot after wrap-around.
    ///   </para>
    /// </remarks>
    private: std::atomic<int> writeIndex;
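
    // (Illustrative walkthrough, not part of the implementation: with a capacity of 8 and
    // writeIndex at 7, three producers doing fetch_add(1) observe 7, 8 and 9 and write to
    // slots 7, 0 and 1. The two producers that observed values >= 8 each subtract the
    // capacity, leaving writeIndex at -6; the next producer observes -6, which
    // positiveModulo() maps to slot 2, exactly the slot that follows 1 in the ring.)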
    /// <summary>Number of occupied slots in the queue</summary>
    /// <remarks>
    ///   <para>
    ///     This allows the TryAppend() method to know whether a slot will be free
    ///     after the current write index, eliminating the whole C-A-S loop. While reserving,
    ///     the value will be blindly incremented, checked and - if beyond capacity -
    ///     decremented back down.
    ///   </para>
    ///   <para>
    ///     Also important is that this counts slots, not items. If a constructor throws during
    ///     an append operation, the slot will remain occupied (because it can't be safely
    ///     returned due to other threads being able to grab slots after it) but marked as a gap.
    ///   </para>
    /// </remarks>
    private: std::atomic<std::size_t> occupiedCount;

    /// <summary>Number of slots guaranteed to be available for reading from the queue</summary>
    /// <remarks>
    ///   <para>
    ///     As the counterpart to occupiedCount, this will be blindly decremented when
    ///     a thread is attempting to take an element from the queue. If it ends up less
    ///     than 0 (meaning no elements were available), it is immediately incremented back up.
    ///   </para>
    ///   <para>
    ///     Similarly to its counterpart, an available slot does not guarantee an available item.
    ///     Slots may contain gap items (this happens when the copy or move constructor of
    ///     an item throws an exception while it is being added).
    ///   </para>
    /// </remarks>
    private: std::atomic<int> availableCount;

  };

  // ------------------------------------------------------------------------------------------- //

}}} // namespace Nuclex::Support::Collections
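
// Usage sketch (illustrative only, not part of the original header). It assumes the primary
// ConcurrentRingBuffer template declared in ConcurrentRingBuffer.h takes the element type and
// a ConcurrentAccessBehavior value as its template parameters, matching the specialization above:
//
//   using Nuclex::Support::Collections::ConcurrentRingBuffer;
//   using Nuclex::Support::Collections::ConcurrentAccessBehavior;
//
//   ConcurrentRingBuffer<
//     int, ConcurrentAccessBehavior::MultipleProducersMultipleConsumers
//   > queue(256);
//
//   bool appended = queue.TryAppend(123); // any thread may produce
//
//   int received;
//   bool taken = queue.TryTake(received); // any thread may consume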