#pragma region CPL License
/*
Nuclex Native Framework
Copyright (C) 2002-2023 Nuclex Development Labs
This library is free software; you can redistribute it and/or
modify it under the terms of the IBM Common Public License as
published by the IBM Corporation; either version 1.0 of the
License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
IBM Common Public License for more details.
You should have received a copy of the IBM Common Public
License along with this library
*/
#pragma endregion // CPL License
#if !defined(NUCLEX_SUPPORT_COLLECTIONS_CONCURRENTRINGBUFFER_H)
#error This file must be included via ConcurrentRingBuffer.h
#endif
namespace Nuclex { namespace Support { namespace Collections {
// ------------------------------------------------------------------------------------------- //
/// <summary>Fixed-size circular buffer for one consumer and multiple producers</summary>
/// <remarks>
///   <para>
///     This multi-producer, single-consumer version of the concurrent buffer lets any
///     number of threads add items to the buffer. A single thread can take items from
///     the buffer.
///   </para>
///   <para>
///     This implementation is lock-free and also wait-free (i.e. no compare-and-swap loops).
///     Batch operations are supported and this variant gives a strong-ish exception
///     guarantee: if an operation fails, the buffer's state remains as if it never happened,
///     but the buffer's capacity will be temporarily reduced.
///   </para>
///   <para>
///     Container type: bounded ring buffer
///   </para>
///   <para>
///     Thread safety: unlimited producing threads + one consuming thread
///   </para>
///   <para>
///     Exception guarantee: strong-ish (exception = buffer unchanged)
///   </para>
///   <para>
///     Footprint (stack): 48 bytes (on 64-bit builds).
///     Footprint (heap): +1 extra byte per element
///   </para>
/// </remarks>
template<typename TElement>
class ConcurrentRingBuffer {
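// Usage sketch (illustrative only, not part of the original header): any number of
// producer threads may call TryAppend()/TryShove() concurrently, but exactly one
// consumer thread may call TryTake(). The single-type-parameter form declared above
// is assumed; the caller code below would additionally need <thread>.
//
//   ConcurrentRingBuffer<int> queue(256);
//
//   std::thread producer(
//     [&queue] {
//       for(int value = 0; value < 100; ++value) {
//         while(!queue.TryAppend(value)) {
//           std::this_thread::yield(); // queue momentarily full, retry
//         }
//       }
//     }
//   );
//
//   int received;
//   for(int taken = 0; taken < 100;) {
//     if(queue.TryTake(received)) {
//       ++taken;
//     } else {
//       std::this_thread::yield(); // queue momentarily empty, retry
//     }
//   }
//
//   producer.join();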
/// <summary>
///   Initializes a new concurrent queue for multiple producers and a single consumer
/// </summary>
/// <param name="capacity">Maximum number of items the queue can hold</param>
public: explicit ConcurrentRingBuffer(std::size_t capacity) :
capacity(capacity),
itemMemory(nullptr),
itemStatus(nullptr),
count(0),
readIndex(0),
writeIndex(0) {
std::uint8_t *buffer = new std::uint8_t[sizeof(TElement[2]) * capacity / 2U];
{
auto itemMemoryDeleter = ON_SCOPE_EXIT_TRANSACTION {
delete[] buffer;
};
this->itemStatus = new std::atomic<std::uint8_t>[capacity];
itemMemoryDeleter.Commit(); // disarm the item memory deleter
}
this->itemMemory = reinterpret_cast<TElement *>(buffer);
// Initialize the status of all items
for(std::size_t index = 0; index < capacity; ++index) {
this->itemStatus[index].store(0, std::memory_order_relaxed);
}
std::atomic_thread_fence(std::memory_order_release);
}
/// <summary>Frees all memory owned by the concurrent queue and the items therein</summary>
public: ~ConcurrentRingBuffer() {
if constexpr(!std::is_trivially_destructible<TElement>::value) {
std::size_t safeCount = this->count.load(
std::memory_order_consume // consume: if() below carries dependency
);
if(safeCount >= 1) {
std::size_t safeReadIndex = this->readIndex.load(
std::memory_order_consume // consume: while() below carries dependency
);
while(safeCount >= 1) {
this->itemMemory[safeReadIndex].~TElement();
safeReadIndex = (safeReadIndex + 1) % this->capacity;
--safeCount;
// Don't update the item count, read index or item status because it's the d'tor
}
}
}
delete[] this->itemStatus;
#if !defined(NDEBUG)
this->itemStatus = nullptr;
#endif
delete[] reinterpret_cast<std::uint8_t *>(this->itemMemory);
#if !defined(NDEBUG)
this->itemMemory = nullptr;
#endif
}
/// <summary>Estimates the number of items stored in the queue</summary>
/// <returns>The probable number of items the queue held at the time of the call</returns>
/// <remarks>
///   This method can be called from any thread and will have just about the same
///   accuracy as when it is called from the consumer thread or one of the producers.
///   If an item constructor throws an exception while the item is being copied/moved
///   into the buffer's memory, that slot still counts towards the reported number until
///   the consumer thread skips over it in TryTake().
/// </remarks>
public: std::size_t Count() const {
// If many producers add at the same time, the item count may for a moment jump above
// 'capacity' (the producer that incremented it above capacity silently decrements it
// again and reports to its caller that the queue was full).
return std::min(this->count.load(std::memory_order_relaxed), this->capacity);
}
/// <summary>Tries to append the specified element to the queue</summary>
/// <param name="element">Element that will be appended to the queue</param>
/// <returns>True if the element was appended, false if the queue had no space left</returns>
public: bool TryAppend(const TElement &element) {
// Try to reserve a slot. If the queue is full, the value will hit the capacity (or even
// exceed it if highly contested), in which case we just hand the unusable slot back.
{
std::size_t safeCount = this->count.fetch_add(
1, std::memory_order_consume // consume: if() below carries dependency
);
if(safeCount >= this->capacity) { // can happen under high contention of this code spot
this->count.fetch_sub(1, std::memory_order_release);
return false;
}
}
// If we reach this spot, we know there was at least 1 slot free in the queue and we
// just captured it (i.e. no other thread will cause less than 1 slot to remain free
// while the following code runs). So we can happily increment the write index here.
std::size_t targetSlotIndex;
{
int safeOccupiedIndex = this->writeIndex.fetch_add(
1, std::memory_order_consume // consume: if() below carries dependency
);
// If the write index goes past 'capacity', do a wrap-around (ring buffer).
// Multiple threads may simultaneously hit this spot, moving write index
// into the negative. That is fine (we do a positive modulo on the index).
if(
(safeOccupiedIndex > 0) && // To ensure static_cast below is safe
(static_cast<std::size_t>(safeOccupiedIndex) >= this->capacity)
) {
this->writeIndex.fetch_sub(
static_cast<int>(this->capacity), std::memory_order_relaxed
);
}
targetSlotIndex = positiveModulo(safeOccupiedIndex, static_cast<int>(this->capacity));
}
// Mark the slot as under construction for the reading thread
#if !defined(NDEBUG) // not really needed, empty and under construction are treated the same
this->itemStatus[targetSlotIndex].store(1, std::memory_order_release);
#endif
// Copy the item into the slot. If its copy constructor throws, the slot must be
// marked as broken so the reading thread will skip it.
{
auto brokenSlotScope = ON_SCOPE_EXIT_TRANSACTION {
this->itemStatus[targetSlotIndex].store(3, std::memory_order_release);
};
new(this->itemMemory + targetSlotIndex) TElement(element);
brokenSlotScope.Commit();
}
// Mark the slot as available for the reading thread
this->itemStatus[targetSlotIndex].store(2, std::memory_order_release);
// Item was appended!
return true;
}
/// <summary>Tries to move-append the specified element to the queue</summary>
/// <param name="element">Element that will be moved into the queue</param>
/// <returns>True if the element was appended, false if the queue had no space left</returns>
public: bool TryShove(TElement &&element) {
// Try to reserve a slot. If the queue is full, the value will hit the capacity (or even
// exceed it if highly contested), in which case we just hand the unusable slot back.
{
std::size_t safeCount = this->count.fetch_add(
1, std::memory_order_consume // consume: if() below carries dependency
);
if(safeCount >= this->capacity) { // can happen under high contention of this code spot
this->count.fetch_sub(1, std::memory_order_release);
return false;
}
}
// If we reach this spot, we know there was at least 1 slot free in the queue and we
// just captured it (i.e. no other thread will cause less than 1 slot to remain free
// while the following code runs). So we can happily increment the write index here.
std::size_t targetSlotIndex;
{
int safeOccupiedIndex = this->writeIndex.fetch_add(
1, std::memory_order_consume // consume: if() below carries dependency
);
// If the write index goes past 'capacity', do a wrap-around (ring buffer).
// Multiple threads may simultaneously hit this spot, moving write index
// into the negative. That is fine (we do a positive modulo on the index).
if(
(safeOccupiedIndex > 0) && // To ensure static_cast below is safe
(static_cast<std::size_t>(safeOccupiedIndex) >= this->capacity)
) {
this->writeIndex.fetch_sub(static_cast<int>(this->capacity), std::memory_order_relaxed);
}
targetSlotIndex = positiveModulo(safeOccupiedIndex, static_cast<int>(this->capacity));
}
// Mark the slot as under construction for the reading thread
#if !defined(NDEBUG) // not really needed, empty and under construction are treated the same
this->itemStatus[targetSlotIndex].store(1, std::memory_order_release);
#endif
// Move the item into the slot. If its move constructor throws, the slot must be
// marked as broken so the reading thread will skip it.
{
auto brokenSlotScope = ON_SCOPE_EXIT_TRANSACTION {
this->itemStatus[targetSlotIndex].store(3, std::memory_order_release);
};
new(this->itemMemory + targetSlotIndex) TElement(std::move(element));
brokenSlotScope.Commit();
}
// Mark the slot as available for the reading thread
this->itemStatus[targetSlotIndex].store(2, std::memory_order_release);
// Item was appended!
return true;
}
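// Producer-side call pattern for TryShove() (hypothetical caller code, not part of
// this header): moving avoids a copy for heavyweight element types.
//
//   std::string message = makeLogLine(); // makeLogLine() is a made-up helper
//   if(!queue.TryShove(std::move(message))) {
//     // The queue was full: the move constructor never ran, so 'message' is still
//     // intact and the caller can retry or drop it.
//   }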
/// <summary>Tries to remove an element from the queue</summary>
/// <param name="element">Element into which the queue's element will be placed</param>
/// <returns>True if an element was taken, false if the queue was empty</returns>
/// <remarks>
///   This method always attempts to use move semantics because the item in the buffer is
///   rendered inaccessible and eventually destroyed anyway.
/// </remarks>
public: bool TryTake(TElement &element) {
std::size_t safeCount = this->count.load(
std::memory_order_consume // consume: if() below carries dependency
);
if(safeCount < 1) {
return false; // No more potential items in queue
}
// If we reach this point, there is at least one taken slot (which may contain
// a valid item or represent a gap due to a constructor exception while adding the item)
std::size_t safeReadIndex = this->readIndex.load(
std::memory_order_consume // consume: access below carries dependency
);
for(;;) { // Typical case: loop runs once. Case w/gaps: multiple runs, but deterministic
std::uint8_t safeItemStatus = this->itemStatus[safeReadIndex].load(
std::memory_order_consume // consume: if() below carries dependency
);
if(safeItemStatus < 2) { // 0: item is empty, 1: item is under construction
return false; // If the item is missing, act as if the queue had no more items
}
// Item status 2 means there is an item present in the slot
if(safeItemStatus == 2) {
break;
}
// safeItemStatus was 3, so the current item is a gap and can be skipped
// (this happens when an item constructor throws an exception)
if constexpr(!std::is_trivially_destructible<TElement>::value) {
this->itemMemory[safeReadIndex].~TElement();
}
this->itemStatus[safeReadIndex].store(0, std::memory_order_relaxed);
// Why read again? Because 'count' may have been equal to or larger than our capacity
// (if many threads try to append at the same time), so for those cases, we re-read
// to make sure we re-enter accurate territory at (capacity - 1).
// CHECK: A simple std::min() at the top of the method would suffice, too, wouldn't it?
safeCount = (
this->count.fetch_sub(1, std::memory_order_consume) - 1 // if() below = dependency
);
if(safeCount < 1) {
return false; // No more potential items in queue (everything was a gap)
}
safeReadIndex = (safeReadIndex + 1) % this->capacity;
}
// Move the item to the caller-provided memory. This may throw.
TElement *readAddress = this->itemMemory + safeReadIndex;
element = std::move(*readAddress);
if constexpr(!std::is_trivially_destructible<TElement>::value) {
readAddress->~TElement();
}
this->itemStatus[safeReadIndex].store(0, std::memory_order_release);
// For a single reader, the ordering here is not that important, i.e. another
// reader thread can't come by, see the free slot and read the un-updated read index
this->readIndex.store(
(safeReadIndex + 1) % this->capacity, std::memory_order_relaxed
);
this->count.fetch_sub(1, std::memory_order_release);
return true; // Item was read
}
/// <summary>Returns the maximum number of items the queue can hold</summary>
/// <returns>The maximum number of items the queue can hold</returns>
public: std::size_t GetCapacity() const { return this->capacity; }
/// <summary>Performs the modulo operation, but returns 0..divisor-1</summary>
/// <param name="value">Value for which the positive modulo will be calculated</param>
/// <param name="divisor">Divisor of which the remainder will be calculated</param>
/// <returns>The positive division remainder of the specified value</returns>
/// <remarks>
///   There are various tricks to achieve this without branching, but they're all slower.
///   Reason: x86, amd64 and ARM CPUs have conditional move instructions, allowing cases
///   like this one to execute without branching at the machine code level.
/// </remarks>
private: static std::size_t positiveModulo(int value, int divisor) {
value %= divisor;
if(value < 0) {
return static_cast<std::size_t>(value + divisor);
} else {
return static_cast<std::size_t>(value);
}
}
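// Illustrative values for the wrap-around handled above: C++'s built-in % operator
// truncates toward zero, so a write index that was pushed to -3 by concurrent
// wrap-around corrections, with a capacity of 8, maps back to slot 5.
//
//   positiveModulo(-3, 8) == 5  // (-3 % 8) == -3, then -3 + 8 == 5
//   positiveModulo(11, 8) == 3  // plain remainder for non-negative values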
/// <summary>Number of items the ring buffer can hold</summary>
private: const std::size_t capacity;
/// <summary>Memory block that holds the items currently stored in the queue</summary>
private: TElement *itemMemory;
/// <summary>Status of items in the buffer, 0: empty, 1: filling, 2: present, 3: gap</summary>
private: std::atomic<std::uint8_t> *itemStatus;
/// <summary>Number of currently occupied slots (valid items plus gap slots)</summary>
/// <remarks>
///   <para>
///     This allows the TryAppend() and TryShove() methods to know whether a slot will be
///     free after the current write index, eliminating the whole C-A-S loop. While reserving,
///     the value will be blindly incremented, checked and - if beyond capacity - decremented
///     back down.
///   </para>
///   <para>
///     Also important is that this counts slots, not items. If a constructor throws during
///     an append operation, the slot will remain occupied (because it can't be safely
///     returned due to other threads being able to grab slots after it) but marked as a gap.
///   </para>
/// </remarks>
private: std::atomic<std::size_t> count;
/// <summary>Index from which the next item will be read</summary>
private: std::atomic<std::size_t> readIndex;
/// <summary>Index at which the next item will be written</summary>
/// <remarks>
///   The pre-increment value returned by the atomic fetch-and-add is the slot a producer
///   fills, so this index is always one ahead of the most recently stored item. Under
///   contention it may temporarily exceed the capacity or even turn negative during
///   wrap-around corrections, which is why slot indices are derived from it via
///   positiveModulo().
/// </remarks>
private: std::atomic<int> writeIndex;
};
// ------------------------------------------------------------------------------------------- //
}}} // namespace Nuclex::Support::Collections