#pragma region CPL License
/*
Nuclex Native Framework
Copyright (C) 2002-2023 Nuclex Development Labs
This library is free software; you can redistribute it and/or
modify it under the terms of the IBM Common Public License as
published by the IBM Corporation; either version 1.0 of the
License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
IBM Common Public License for more details.
You should have received a copy of the IBM Common Public
License along with this library
*/
#pragma endregion // CPL License
#if !defined(NUCLEX_SUPPORT_COLLECTIONS_CONCURRENTRINGBUFFER_H)
#error This file must be included via ConcurrentRingBuffer.h
#endif
namespace Nuclex { namespace Support { namespace Collections {
// ------------------------------------------------------------------------------------------- //
/// <summary>Fixed-size circular buffer that can safely be used from two threads</summary>
/// <remarks>
///   <para>
///     The single-producer, single-consumer version of the concurrent buffer lets one
///     thread add items to the buffer and another take items from the buffer. No other
///     threads are allowed to interact with the buffer.
///   </para>
///   <para>
///     This implementation is lock-free and also wait-free (i.e. no compare-and-swap
///     loops). Batch operations are supported (currently compile-time gated as untested)
///     and this variant gives a strong exception guarantee: if an operation fails,
///     the buffer's state remains as if it never happened.
///   </para>
///   <para>
///     Container type: bounded ring buffer
///   </para>
///   <para>
///     Thread safety: one producing thread + one consuming thread
///   </para>
///   <para>
///     Exception guarantee: strong (exception = buffer unchanged)
///   </para>
///   <para>
///     Footprint (stack): 32 bytes. Footprint (heap): only the elements.
///   </para>
/// </remarks>
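/// <example>
///   A minimal usage sketch (illustrative only, not part of the original header;
///   thread creation and shutdown are assumed to live elsewhere in your code):
///   <code>
///     Nuclex::Support::Collections::ConcurrentRingBuffer&lt;int&gt; buffer(128);
///
///     // Producer thread:
///     bool appended = buffer.TryAppend(42); // false if the buffer was full
///
///     // Consumer thread:
///     int value;
///     if(buffer.TryTake(value)) {
///       // ...process value...
///     }
///   </code>
/// </example>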
template<typename TElement>
class ConcurrentRingBuffer {
/// <summary>Initializes a new concurrent ring buffer</summary>
/// <param name="capacity">Maximum number of items the ring buffer can hold</param>
public: explicit ConcurrentRingBuffer(std::size_t capacity) :
capacity(capacity + 1), // One item is wasted in return for simpler full/empty math
itemMemory(
// Note: sizeof(TElement[2]) / 2 rounds the element size up so that any padding
// between array elements is included in the allocated block
reinterpret_cast<TElement *>(
new std::uint8_t[sizeof(TElement[2]) * (capacity + 1U) / 2U]
)
),
readIndex(0),
writeIndex(0) {
std::atomic_thread_fence(std::memory_order_release);
}
/// <summary>Frees all memory owned by the concurrent queue and the items therein</summary>
/// <remarks>
///   The destructor may be called from any thread, so long as the producer and the consumer
///   threads are stopped (which is of course necessary in any case, otherwise either thread
///   will segfault accessing the destroyed buffer before long).
/// </remarks>
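/// <example>
///   A sketch of a safe teardown sequence (illustrative only; 'producer' and 'consumer'
///   are assumed to be std::thread instances owned by the surrounding code):
///   <code>
///     producer.join(); // stop the producing thread
///     consumer.join(); // stop the consuming thread
///     // Only now may any thread destroy the buffer
///   </code>
/// </example>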
public: ~ConcurrentRingBuffer() {
// Call destructors if the type has them
if constexpr(!std::is_trivially_destructible<TElement>::value) {
std::size_t safeReadIndex = this->readIndex.load(
std::memory_order_consume // consume: while() below carries dependency
);
std::size_t safeWriteIndex = this->writeIndex.load(
std::memory_order_consume // consume: while() below carries dependency
);
while(safeReadIndex != safeWriteIndex) {
this->itemMemory[safeReadIndex].~TElement();
safeReadIndex = (safeReadIndex + 1) % this->capacity;
}
// No updates to read and write index since this is the destructor
}
// Delete buffer under the same type it was constructed as. We also don't want TElement
// destructors called as a side effect (the memory block contains uninitialized members).
delete[] reinterpret_cast<std::uint8_t *>(this->itemMemory);
#if !defined(NDEBUG)
this->itemMemory = nullptr;
#endif
}
/// <summary>Counts the items in the queue</summary>
/// <returns>The number of items stored in the queue at the time of the call</returns>
/// <remarks>
///   <para>
///     This method may be called from both the consuming and the producing thread.
///   </para>
///   <para>
///     So long as you conform to the single producer, single consumer requirement, you
///     can use this method a) in the consumer thread to find the number of items that
///     will at least be available via the <see cref="TryTake" /> method or
///     b) in the producer thread to find the amount of free space that will at
///     least be available to fill via the <see cref="TryAppend" /> method (by
///     subtracting the <see cref="Count" /> from the <see cref="GetCapacity" />).
///   </para>
///   <para>
///     If you call this method from an unrelated thread, there's a low but non-zero
///     chance that it will return complete garbage. So don't do that.
///   </para>
/// </remarks>
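/// <example>
///   A sketch of how the count can be used from either thread (illustrative only;
///   'buffer' is an assumed ConcurrentRingBuffer instance):
///   <code>
///     // Consumer thread: at least this many TryTake() calls will succeed
///     std::size_t guaranteedItems = buffer.Count();
///
///     // Producer thread: at least this many TryAppend() calls will succeed
///     std::size_t guaranteedSpace = buffer.GetCapacity() - buffer.Count();
///   </code>
/// </example>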
public: std::size_t Count() const {
std::size_t safeReadIndex = this->readIndex.load(
std::memory_order_consume // consume: if() below carries dependency
);
// If this method is called from a third thread, it is possible that between the two
// loads, both the consumer and the producer thread do work, moving the indices to
// positions that bear no relation to the read index we just loaded.
std::size_t safeWriteIndex = this->writeIndex.load(
std::memory_order_acquire // acquire: access must happen after readIndex
);
// Are the items in the queue fragmented?
if(safeWriteIndex < safeReadIndex) {
return (this->capacity - safeReadIndex + safeWriteIndex);
} else { // Items are linear
return (safeWriteIndex - safeReadIndex);
}
}
/// <summary>Tries to append the specified element to the queue</summary>
/// <param name="element">Element that will be appended to the queue</param>
/// <returns>True if the element was appended, false if the queue had no space left</returns>
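/// <example>
///   A sketch of a typical producer-side retry loop (illustrative only; assumes
///   &lt;thread&gt; is included and 'buffer' and 'item' exist in your code):
///   <code>
///     while(!buffer.TryAppend(item)) {
///       std::this_thread::yield(); // buffer full, let the consumer catch up
///     }
///   </code>
/// </example>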
public: bool TryAppend(const TElement &element) {
std::size_t safeWriteIndex = this->writeIndex.load(
std::memory_order_consume // consume: math below carries dependency
);
// Ordering of these two loads is unproblematic. We're in the producer thread, so only
// the read index can move. Loading it later slightly increases the chance that
// a simultaneous read from the consumer thread frees up more space for us.
std::size_t safeReadIndex = this->readIndex.load(
std::memory_order_consume // consume: if() below carries dependency
);
std::size_t nextWriteIndex = (safeWriteIndex + 1) % this->capacity;
if(likely(nextWriteIndex != safeReadIndex)) {
new(this->itemMemory + safeWriteIndex) TElement(element);
this->writeIndex.store(nextWriteIndex, std::memory_order_release);
return true; // Item was appended
} else {
return false; // Queue was full
}
}
#ifdef NUCLEX_SUPPORT_COLLECTIONS_UNTESTED_BATCH_OPERATIONS // also no exception guarantee yet!
/// <summary>Tries to append multiple elements to the queue</summary>
/// <param name="first">First of a list of elements that will be appended</param>
/// <param name="count">Number of elements available from the list</param>
/// <returns>The number of items that have been appended to the queue</returns>
public: std::size_t TryAppend(const TElement *first, std::size_t count) {
std::size_t safeWriteIndex = this->writeIndex.load(
std::memory_order_consume // consume: if() below carries dependency
);
std::size_t safeReadIndex = this->readIndex.load(
std::memory_order_consume // consume: if() below carries dependency
);
// One slot is always kept unused, otherwise a full buffer (write index right behind
// read index) could not be told apart from an empty one (indices equal).
// Is the used space fragmented? Then the free space is linear, easiest case!
if(safeWriteIndex < safeReadIndex) {
count = std::min(count, safeReadIndex - safeWriteIndex - 1);
if(likely(count >= 1)) {
std::uninitialized_copy_n(first, count, this->itemMemory + safeWriteIndex);
this->writeIndex.store(
safeWriteIndex + count, std::memory_order_release
);
}
return count;
} else { // Used space was linear, so free space might be fragmented...
count = std::min(count, safeReadIndex + this->capacity - safeWriteIndex - 1);
std::size_t availableItemCount = this->capacity - safeWriteIndex;
if(likely(availableItemCount >= count)) {
if(likely(count >= 1)) {
std::uninitialized_copy_n(first, count, this->itemMemory + safeWriteIndex);
this->writeIndex.store(
(safeWriteIndex + count) % this->capacity, std::memory_order_release
);
}
return count;
} else {
// Write the first fragment into the linear space at the end of the buffer
std::uninitialized_copy_n(
first, availableItemCount, this->itemMemory + safeWriteIndex
);
this->writeIndex.store(
0, std::memory_order_release
); // Intermediate store, allows the consuming thread to begin reading early
// Write the second fragment at the start of the buffer
std::size_t remainingItemCount = count - availableItemCount;
std::uninitialized_copy_n(
first + availableItemCount, remainingItemCount, this->itemMemory
);
this->writeIndex.store(
remainingItemCount, std::memory_order_release
);
return count;
}
}
}
#endif
/// <summary>Tries to move-append the specified element to the queue</summary>
/// <param name="element">Element that will be move-appended to the queue</param>
/// <returns>True if the element was appended, false if the queue had no space left</returns>
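/// <example>
///   Moving avoids a copy for element types with a move constructor (illustrative
///   only; 'buffer' is an assumed ConcurrentRingBuffer&lt;std::string&gt; instance):
///   <code>
///     std::string message("Hello");
///     bool shoved = buffer.TryShove(std::move(message));
///     // On success, 'message' is left in a valid but unspecified state
///   </code>
/// </example>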
public: bool TryShove(TElement &&element) {
std::size_t safeWriteIndex = this->writeIndex.load(
std::memory_order_consume // consume: math below carries dependency
);
// Ordering of these two loads is unproblematic. We're in the producer thread, so only
// the read index can move. Loading it later slightly increases the chance that
// a simultaneous read from the consumer thread frees up more space for us.
std::size_t safeReadIndex = this->readIndex.load(
std::memory_order_consume // consume: if() below carries dependency
);
std::size_t nextWriteIndex = (safeWriteIndex + 1) % this->capacity;
if(likely(nextWriteIndex != safeReadIndex)) {
new(this->itemMemory + safeWriteIndex) TElement(std::move(element));
this->writeIndex.store(nextWriteIndex, std::memory_order_release);
return true; // Item was move-appended
} else {
return false; // Buffer was full
}
}
/// <summary>Tries to remove an element from the queue</summary>
/// <param name="element">Element into which the queue's element will be placed</param>
/// <returns>True if an element was removed, false if the queue was empty</returns>
/// <remarks>
///   This method always attempts to use move semantics because the item in the buffer is
///   rendered inaccessible and eventually destroyed anyway.
/// </remarks>
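/// <example>
///   A sketch of a consumer-side drain loop (illustrative only; 'buffer' is an
///   assumed ConcurrentRingBuffer&lt;int&gt; instance):
///   <code>
///     int item;
///     while(buffer.TryTake(item)) {
///       // ...process item...
///     }
///   </code>
/// </example>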
public: bool TryTake(TElement &element) {
std::size_t safeReadIndex = this->readIndex.load(
std::memory_order_consume // consume: if() below carries dependency
);
std::size_t safeWriteIndex = this->writeIndex.load(
std::memory_order_consume // consume: if() below carries dependency
);
if(safeReadIndex == safeWriteIndex) {
return false; // Queue was empty
} else {
TElement *readAddress = this->itemMemory + safeReadIndex;
element = std::move(*readAddress); // Does move assignment if available, otherwise copy
if constexpr(!std::is_trivially_destructible<TElement>::value) {
readAddress->~TElement(); // Even after move, destructor would still have to be called
}
this->readIndex.store(
(safeReadIndex + 1) % this->capacity, std::memory_order_release
);
return true; // Item was read
}
}
/// <summary>Returns the maximum number of items the queue can hold</summary>
/// <returns>The maximum number of items the queue can hold</returns>
public: std::size_t GetCapacity() const { return this->capacity - 1U; }
/// <summary>Number of slots in the item array (one more than the public capacity)</summary>
private: const std::size_t capacity;
/// <summary>Memory block that holds the items currently stored in the queue</summary>
/// <remarks>
///   Careful. This is allocated as an std::uint8_t buffer and absolutely will contain
///   uninitialized memory.
/// </remarks>
private: TElement *itemMemory;
/// <summary>Index from which the next item will be read</summary>
private: std::atomic<std::size_t> readIndex;
/// <summary>Index at which the next item will be written</summary>
/// <remarks>
///   The write index points one past the most recently stored item, i.e. at the position
///   the next write will fill. When it equals the read index, the buffer is empty; one
///   slot always remains unused so a full buffer can be told apart from an empty one.
/// </remarks>
private: std::atomic<std::size_t> writeIndex;
};
// ------------------------------------------------------------------------------------------- //
}}} // namespace Nuclex::Support::Collections