#pragma region CPL License
/*
Nuclex Native Framework
Copyright (C) 2002-2023 Nuclex Development Labs

This library is free software; you can redistribute it and/or
modify it under the terms of the IBM Common Public License as
published by the IBM Corporation; either version 1.0 of the
License, or (at your option) any later version.

This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the IBM
Common Public License for more details.

You should have received a copy of the IBM Common Public
License along with this library
*/
#pragma endregion // CPL License

#ifndef NUCLEX_SUPPORT_EVENTS_CONCURRENTEVENT_H
#define NUCLEX_SUPPORT_EVENTS_CONCURRENTEVENT_H

#include "Nuclex/Support/Config.h"
#include "Nuclex/Support/Events/Delegate.h"
#include "Nuclex/Support/BitTricks.h"

#include <cstddef> // for std::size_t
#include <cstdint> // for std::uint8_t
#include <atomic> // for std::atomic
#include <algorithm> // for std::copy_n()
#include <memory> // for std::shared_ptr
#include <vector> // for std::vector

namespace Nuclex { namespace Support { namespace Events {

  // ------------------------------------------------------------------------------------------- //

  /// Prototype, required for variable argument template
  template<typename> class ConcurrentEvent;

  // ------------------------------------------------------------------------------------------- //

  /// Manages a list of subscribers that receive callbacks when the event fires
  /// Type of results the callbacks will return
  /// Types of the arguments accepted by the callback
  ///
  /// This is a special variant of the Event class, a very lean signal/slot implementation.
  /// Whereas the normal event attempts to achieve maximum performance and minimum resource
  /// use in a single-threaded scenario, the concurrent event attempts the same while
  /// allowing free-threaded use.
  ///
  /// Like the single-threaded event, it is optimized towards granular use, meaning you
  /// create many individual events rather than one big multi-purpose notification. It also
  /// assumes that events typically have none or only a small number of subscribers and
  /// is optimized for firing over subscription/unsubscription.
  ///
  /// This concurrent event implementation can be freely used from any thread, including
  /// simultaneous firing, subscription and unsubscription without any synchronization on
  /// the side of the user of the event. Firing uses a micro-spinlock around a piece of code
  /// covering just a few CPU cycles (two instructions ideally), so waits are highly unlikely
  /// and should be resolved in just a few cycles if they happen.
  ///
  /// A concurrent event should be equivalent in size to 3 pointers on its own.
  /// It does not allocate any memory upon construction or firing, but will allocate
  /// a memory block each time the number of subscribers passes a power of two.
  /// Said memory block is the size of 4 pointers + two more pointers per subscriber.
  ///
  /// Usage example:
  ///
  ///     int Dummy(int first, std::string second) { return 123; }
  ///
  ///     class Mock {
  ///       public: int Dummy(int first, std::string second) { return 456; }
  ///     };
  ///
  ///     int main() {
  ///       typedef ConcurrentEvent<int(int foo, std::string bar)> FooBarEvent;
  ///
  ///       FooBarEvent test;
  ///
  ///       // Subscribe the dummy function
  ///       test.Subscribe<Dummy>();
  ///
  ///       // Subscribe an object method
  ///       Mock myMock;
  ///       test.Subscribe<Mock, &Mock::Dummy>(&myMock);
  ///
  ///       // Fire the event
  ///       std::vector<int> returnedValues = test(123, u8"Hello");
  ///
  ///       // Fire the event again but don't collect returned values
  ///       test.Emit(123, u8"Hello");
  ///     }
  ///
  /// Cheat sheet
  ///
  /// 🛈 Optimized for granular events (many event instances w/few subscribers)
  /// 🛈 Optimized for fast broadcast performance over subscribe/unsubscribe
  /// 🛈 Two allocations per power of two reached by the subscriber count
  /// ⚫ Can optionally collect return values from all event callbacks
  /// ⚫ New subscribers can be added freely even during event broadcast
  /// ⚫ Subscribers can freely unsubscribe anyone from within event callback
  /// ⚫ For free-threaded use (anything allowed, any number of times simultaneously)
  /// 🛇 Lambda expressions can not be subscribers
  ///   (adds huge runtime costs, see std::function, would have no way to unsubscribe)
///
///
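  ///
  /// The following sketch is an illustrative addition (not from the original documentation);
  /// it assumes a ConcurrentEvent<void(int)> named 'valueChanged' plus the <thread> header
  /// and shows that no external locking is needed when threads subscribe and fire at
  /// the same time:
  ///
  ///     #include <thread> // assumed for std::thread
  ///
  ///     void OnValueChanged(int value) { /* react to the new value */ }
  ///
  ///     Nuclex::Support::Events::ConcurrentEvent<void(int)> valueChanged;
  ///
  ///     std::thread subscriber([&] { valueChanged.Subscribe<OnValueChanged>(); });
  ///     std::thread broadcaster([&] { valueChanged.Emit(42); }); // may run before or after
  ///
  ///     subscriber.join();
  ///     broadcaster.join();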
  template<typename TResult, typename... TArguments>
  class ConcurrentEvent<TResult(TArguments...)> {

    /// Type of value that will be returned by the delegate
    public: typedef TResult ResultType;

    /// Method signature for the callbacks notified through this event
    public: typedef TResult CallType(TArguments...);

    /// Type of delegate used to call the event's subscribers
    public: typedef Delegate<TResult(TArguments...)> DelegateType;

    /// List of results returned by subscribers
    ///
    /// Having an std::vector<void> anywhere, even in a SFINAE-disabled method,
    /// will trigger deprecation compiler warnings on Microsoft compilers.
    /// Consider this type to be an alias for std::vector<TResult> and nothing else.
    private: typedef std::vector<
      typename std::conditional<std::is_void<TResult>::value, char, TResult>::type
    > ResultVectorType;

    /// Initializes a new concurrent event
    public: ConcurrentEvent() :
      spinLock(false),
      subscribers(nullptr),
      recyclableSubscribers(nullptr) {}

    /// Frees all memory used by a concurrent event
    public: ~ConcurrentEvent();

    // TODO: Implement copy and move constructors + assignment operators

    /// Returns the current number of subscribers to the event
    /// The number of current subscribers
    public: std::size_t CountSubscribers() const;

    /// Calls all subscribers of the event and collects their return values
    /// Arguments that will be passed to the event
    /// A list of the values returned by the event subscribers
    ///
    /// This overload is enabled if the event signature returns anything other than 'void'.
    /// The returned value is an std::vector<TResult> in all cases.
    public: template<typename T = TResult>
    typename std::enable_if<
      !std::is_void<T>::value, ResultVectorType
    >::type operator()(TArguments&&... arguments) const;

    /// Calls all subscribers of the event
    /// Arguments that will be passed to the event
    ///
    /// This overload is enabled if the event signature has the return type 'void'
    public: template<typename T = TResult>
    typename std::enable_if<
      std::is_void<T>::value, void
    >::type operator()(TArguments&&... arguments) const {
      Emit(std::forward<TArguments>(arguments)...);
    }

    /// Calls all subscribers of the event and collects their return values
    /// Output iterator into which the subscribers' return values will be written
    /// Arguments that will be passed to the event
    public: template<typename TOutputIterator>
    void EmitAndCollect(TOutputIterator results, TArguments&&... arguments) const;

    /// Calls all subscribers of the event and discards their return values
    /// Arguments that will be passed to the event
    public: void Emit(TArguments... arguments) const;
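    // Illustrative sketch (not part of the original header): the three ways to fire an event
    // declared above. The variable names and the use of std::back_inserter (from <iterator>)
    // are assumptions for the example; 'event' stands for a ConcurrentEvent<int(int)>.
    //
    //     std::vector<int> collected = event(10); // operator() builds a std::vector<int>
    //
    //     std::vector<int> manual;
    //     event.EmitAndCollect(std::back_inserter(manual), 10); // writes to an output iterator
    //
    //     event.Emit(10); // fires the event and discards the return values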
    /// Subscribes the specified free function to the event
    /// Free function that will be subscribed
    public: template<TResult(*TMethod)(TArguments...)>
    void Subscribe() {
      Subscribe(DelegateType::template Create<TMethod>());
    }

    /// Subscribes the specified object method to the event
    /// Class the object method is a member of
    /// Object method that will be subscribed to the event
    /// Instance on which the object method will be called
    public: template<typename TClass, TResult(TClass::*TMethod)(TArguments...)>
    void Subscribe(TClass *instance) {
      Subscribe(DelegateType::template Create<TClass, TMethod>(instance));
    }

    /// Subscribes the specified const object method to the event
    /// Class the object method is a member of
    /// Object method that will be subscribed to the event
    /// Instance on which the object method will be called
    public: template<typename TClass, TResult(TClass::*TMethod)(TArguments...) const>
    void Subscribe(const TClass *instance) {
      Subscribe(DelegateType::template Create<TClass, TMethod>(instance));
    }

    /// Subscribes the specified delegate to the event
    /// Delegate that will be subscribed
    public: void Subscribe(const DelegateType &delegate);

    /// Unsubscribes the specified free function from the event
    /// Free function that will be unsubscribed from the event
    /// True if the free function was subscribed and has been unsubscribed
    public: template<TResult(*TMethod)(TArguments...)>
    bool Unsubscribe() {
      return Unsubscribe(DelegateType::template Create<TMethod>());
    }

    /// Unsubscribes the specified object method from the event
    /// Class the object method is a member of
    /// Object method that will be unsubscribed from the event
    /// Instance on which the object method was subscribed
    /// True if the object method was subscribed and has been unsubscribed
    public: template<typename TClass, TResult(TClass::*TMethod)(TArguments...)>
    bool Unsubscribe(TClass *instance) {
      return Unsubscribe(DelegateType::template Create<TClass, TMethod>(instance));
    }

    /// Unsubscribes the specified const object method from the event
    /// Class the object method is a member of
    /// Object method that will be unsubscribed from the event
    /// Instance on which the object method was subscribed
    /// True if the object method was subscribed and has been unsubscribed
    public: template<typename TClass, TResult(TClass::*TMethod)(TArguments...) const>
    bool Unsubscribe(const TClass *instance) {
      return Unsubscribe(DelegateType::template Create<TClass, TMethod>(instance));
    }

    /// Unsubscribes the specified delegate from the event
    /// Delegate that will be unsubscribed
    /// True if the callback was found and unsubscribed, false otherwise
    public: bool Unsubscribe(const DelegateType &delegate);
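    // Illustrative sketch (not part of the original header): unsubscribing uses the same
    // template arguments (and instance) as the matching Subscribe() call, and Unsubscribe()
    // reports through its return value whether the callback was actually found. 'test',
    // 'Mock' and 'myMock' mirror the usage example in the class documentation above.
    //
    //     test.Subscribe<Mock, &Mock::Dummy>(&myMock);
    //     bool removed = test.Unsubscribe<Mock, &Mock::Dummy>(&myMock); // true, was subscribed
    //     removed = test.Unsubscribe<Mock, &Mock::Dummy>(&myMock);      // false, already gone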
    #pragma region struct BroadcastQueue

    /// Queue of subscribers to which the event will be broadcast
    private: struct BroadcastQueue {

      /// Initializes a new broadcast queue for the specified number of subscribers
      /// Number of subscribers the broadcast queue can contain at most
      /// Number of subscribers the broadcast queue will contain
      ///
      /// The reference count is initialized to one since it would be pointless to create
      /// an instance and then have to always run an extra increment operation.
      public: BroadcastQueue(std::size_t capacity, std::size_t count) noexcept :
        ReferenceCount(1),
        Capacity(capacity),
        Count(count) {}

      /// Frees all memory owned by the broadcast queue
      public: ~BroadcastQueue() = default;

      /// Number of references that exist to the instance
      public: mutable std::atomic<std::size_t> ReferenceCount;
      /// Number of subscribers for which memory has been allocated
      public: std::size_t Capacity;
      /// Number of subscribers stored in the array
      public: std::size_t Count;
      /// Plain array of all subscribers to which the event is broadcast
      public: DelegateType *Callbacks;

    };

    #pragma endregion // struct BroadcastQueue

    #pragma region class ReleaseBroadcastQueueScope

    /// Decrements a queue's reference counter upon scope exit
    class ReleaseBroadcastQueueScope {

      /// Initializes a new broadcast queue reference releasing scope
      /// Event to which the broadcast queue belongs (for recycling)
      /// Queue that will be released and recycled in case the last reference is dropped
      public: ReleaseBroadcastQueueScope(
        const ConcurrentEvent &self, BroadcastQueue *queueToRelease
      ) :
        self(self),
        queueToRelease(queueToRelease) {}

      /// Decrements the queue's reference counter and possibly recycles it
      public: ~ReleaseBroadcastQueueScope() {

        // The spinlock does not need to be acquired here. If the queue is still assigned as
        // the active subscriber list, the reference counter will not reach 0. Otherwise,
        // it was already replaced by another thread, so we don't even need to check.
        std::size_t totalReferences = this->queueToRelease->ReferenceCount.fetch_sub(
          1, std::memory_order::memory_order_release
        );

        // Did we just release the last reference to the queue?
        if(unlikely(totalReferences == 1)) {
          BroadcastQueue *recycledQueue = this->self.recyclableSubscribers.exchange(
            this->queueToRelease
          );
          if(likely(recycledQueue != nullptr)) {
            freeBroadcastQueue(recycledQueue);
          }
        }

      }

      /// Event that wants to decrement the queue's reference counter
      private: const ConcurrentEvent &self;
      /// Queue whose reference counter will be decremented on scope exit
      private: BroadcastQueue *queueToRelease;

    };

    #pragma endregion // class ReleaseBroadcastQueueScope

    /// Allocates a new broadcast queue for the specified number of subscribers
    /// Number of subscribers the queue should hold
    /// A new broadcast queue with an uninitialized subscriber list
    private: BroadcastQueue *allocateBroadcastQueue(std::size_t subscriberCount) {
      constexpr std::size_t subscriberStartOffset = (
        sizeof(BroadcastQueue) + (
          ((sizeof(BroadcastQueue) % alignof(DelegateType)) == 0) ?
            0 : // size happened to fit needed alignment of subscriber list
            (alignof(DelegateType) - (sizeof(BroadcastQueue) % alignof(DelegateType)))
        )
      );

      std::size_t capacity;
      if(subscriberCount < 5) {
        capacity = 4;
      } else {
        capacity = BitTricks::GetUpperPowerOfTwo(subscriberCount);
      }

      std::uint8_t *memory = new std::uint8_t[
        subscriberStartOffset + (sizeof(DelegateType[2]) * capacity / 2)
      ];
      BroadcastQueue *newQueue = new(memory) BroadcastQueue(capacity, subscriberCount);
      newQueue->Callbacks = reinterpret_cast<DelegateType *>(memory + subscriberStartOffset);
      //static_assert(std::is_trivially_constructible<DelegateType>::value);

      return newQueue;
    }

    /// Frees all memory owned by a broadcast queue
    /// Queue whose memory will be freed
    private: static void freeBroadcastQueue(BroadcastQueue *queue) noexcept {
      queue->~BroadcastQueue();
      //static_assert(std::is_trivially_destructible<BroadcastQueue>::value);
      //static_assert(std::is_trivially_destructible<DelegateType>::value);
      delete[] reinterpret_cast<std::uint8_t *>(queue);
    }
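    // Worked example (illustrative, not from the original header): one allocation holds
    // the BroadcastQueue header plus the delegate array. Assuming a 64-bit platform where
    // the header is roughly 4 pointers (32 bytes) and each DelegateType is 2 pointers
    // (16 bytes), and assuming GetUpperPowerOfTwo() rounds up to the nearest power of two
    // (leaving exact powers unchanged), capacities grow like this:
    //
    //     subscribers 1..4  -> capacity 4  -> ~32 + 4 * 16  =  ~96 bytes
    //     subscribers 5..8  -> capacity 8  -> ~32 + 8 * 16  = ~160 bytes
    //     subscribers 9..16 -> capacity 16 -> ~32 + 16 * 16 = ~288 bytes
    //
    // so fresh memory blocks are only needed when the subscriber count crosses a power of two.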
    /// Acquires the spinlock to access the subscriber queues
    ///
    /// Why are we implementing a manual spinlock here? It's essentially a rip-off of
    /// what std::atomic<std::shared_ptr> does: acquire a spinlock for a very short period
    /// (2 or 3 machine instructions) to make grabbing the pointer and incrementing
    /// the reference counter an atomic operation. Even under very high contention,
    /// it will only loop a bunch of times.
    ///
    /// If we relied on std::shared_ptr, that would mean it has to acquire the spinlock often,
    /// even in situations where we can reason that one of the following must be true:
    /// * either the reference counter is not being decremented down to zero
    /// * or the object the reference counter is decremented for is abandoned.
    /// In short, in our special case, we can achieve correctness while doing fewer steps
    /// than a full std::shared_ptr would have to, avoiding a few spinlock accesses!
    ///
    /// For general spinlock implementation notes, see https://rigtorp.se/spinlock/
    private: inline void acquireSpinLock() const noexcept {
      for(;;) {

        // Optimistically assume the lock is free on the first try
        if(!this->spinLock.exchange(true, std::memory_order::memory_order_acquire)) {
          return;
        }

        // Wait for lock to be released without generating cache misses
        while(this->spinLock.load(std::memory_order::memory_order_relaxed)) {
          // Issue X86 PAUSE or ARM YIELD instruction to reduce contention
          // between hyper-threads
          NUCLEX_SUPPORT_CPU_YIELD;
        }

      } // for(;;)
    }

    /// Releases the spinlock again
    private: inline void releaseSpinLock() const noexcept {
      this->spinLock.store(false, std::memory_order::memory_order_release);
    }

    /// Micro-spinlock to synchronize access to the subscriber list + refcount
    public: mutable std::atomic<bool> spinLock;
    /// Stores the current subscribers to the event
    public: std::atomic<BroadcastQueue *> subscribers;
    /// Stores the previous subscriber list to the event
    ///
    /// In order to avoid needless allocations, the queue keeps the previous subscriber list
    /// around for reuse. This wastes a little bit of memory but speeds things up drastically
    /// if an event has massive numbers of subscribers.
    public: mutable std::atomic<BroadcastQueue *> recyclableSubscribers;

  };

  // ------------------------------------------------------------------------------------------- //

  template<typename TResult, typename... TArguments>
  ConcurrentEvent<TResult(TArguments...)>::~ConcurrentEvent() {

    // Don't care about the spinlock, if the event is being destroyed, nobody is accessing
    // it anymore (and if it was, it'll be a race between that thread and the destructor of
    // the class that owns the event, this humble destructor can do little about that anyway)

    BroadcastQueue *currentQueue = this->subscribers.load(
      std::memory_order::memory_order_relaxed
    );
    if(currentQueue != nullptr) {
      freeBroadcastQueue(currentQueue);
    }

    currentQueue = this->recyclableSubscribers.load(
      std::memory_order::memory_order_relaxed
    );
    if(currentQueue != nullptr) {
      freeBroadcastQueue(currentQueue);
    }

  }

  // ------------------------------------------------------------------------------------------- //

  template<typename TResult, typename... TArguments>
  std::size_t ConcurrentEvent<TResult(TArguments...)>::CountSubscribers() const {
    acquireSpinLock();

    const BroadcastQueue *currentQueue = this->subscribers.load(
      std::memory_order::memory_order_consume // if carries dependency
    );
    if(unlikely(currentQueue == nullptr)) {
      releaseSpinLock();
      return 0;
    } else {
      std::size_t subscriberCount = currentQueue->Count;
      releaseSpinLock();
      return subscriberCount;
    }
  }

  // ------------------------------------------------------------------------------------------- //

  template<typename TResult, typename... TArguments>
  template<typename T>
  typename std::enable_if<
    !std::is_void<T>::value, typename ConcurrentEvent<TResult(TArguments...)>::ResultVectorType
  >::type ConcurrentEvent<TResult(TArguments...)>::operator()(TArguments&&... arguments) const {
    ResultVectorType results; // ResultVectorType is an alias for std::vector<TResult>

    // Get a hold of the current queue. If there is no current queue, bail out
    // without touching anything else (anticipated zero-subscriber case)
    acquireSpinLock();
    BroadcastQueue *currentQueue = this->subscribers.load(
      std::memory_order::memory_order_consume // if carries dependency
    );
    if(likely(currentQueue == nullptr)) {
      releaseSpinLock();
      return results;
    } else { // A queue is present, increment its reference count so it isn't deleted
      currentQueue->ReferenceCount.fetch_add(1, std::memory_order::memory_order_release);
      releaseSpinLock();
    }

    // There are subscribers, so the event needs to be fired and we have incremented
    // the queue's reference counter, requiring us to decrement it again
    {
      ReleaseBroadcastQueueScope releaseActiveQueue(*this, currentQueue);

      // Actually fire the event by calling all the subscribers
      std::size_t subscriberCount = currentQueue->Count;
      results.reserve(subscriberCount);
      for(std::size_t index = 0; index < subscriberCount; ++index) {
        results.push_back(currentQueue->Callbacks[index](std::forward<TArguments>(arguments)...));
        // We don't need to worry about queue edits within the callbacks because
        // it will result in a new broadcast queue being placed while we happily
        // continue working with the immutable copy we hold a reference to.
      }

      return results;
    }
  }

  // ------------------------------------------------------------------------------------------- //

  template<typename TResult, typename... TArguments>
  template<typename TOutputIterator>
  void ConcurrentEvent<TResult(TArguments...)>::EmitAndCollect(
    TOutputIterator results, TArguments&&... arguments
  ) const {

    // Get a hold of the current queue. If there is no current queue, bail out
    // without touching anything else (anticipated zero-subscriber case)
    acquireSpinLock();
    BroadcastQueue *currentQueue = this->subscribers.load(
      std::memory_order::memory_order_consume // if carries dependency
    );
    if(likely(currentQueue == nullptr)) {
      releaseSpinLock();
      return;
    } else { // A queue is present, increment its reference count so it isn't deleted
      currentQueue->ReferenceCount.fetch_add(1, std::memory_order::memory_order_release);
      releaseSpinLock();
    }

    // There are subscribers, so the event needs to be fired and we have incremented
    // the queue's reference counter, requiring us to decrement it again
    {
      ReleaseBroadcastQueueScope releaseActiveQueue(*this, currentQueue);

      // Actually fire the event by calling all the subscribers
      std::size_t subscriberCount = currentQueue->Count;
      for(std::size_t index = 0; index < subscriberCount; ++index) {
        *results = currentQueue->Callbacks[index](std::forward<TArguments>(arguments)...);
        // We don't need to worry about queue edits within the callbacks because
        // it will result in a new broadcast queue being placed while we happily
        // continue working with the immutable copy we hold a reference to.
        ++results;
      }
    }

  }

  // ------------------------------------------------------------------------------------------- //

  template<typename TResult, typename... TArguments>
  void ConcurrentEvent<TResult(TArguments...)>::Emit(TArguments... arguments) const {

    // Get a hold of the current queue.
    acquireSpinLock();
    BroadcastQueue *currentQueue = this->subscribers.load(
      std::memory_order::memory_order_consume // if carries dependency
    );
    if(likely(currentQueue == nullptr)) {
      releaseSpinLock();
      return;
    } else { // A queue is present, increment its reference count so it isn't deleted
      currentQueue->ReferenceCount.fetch_add(1, std::memory_order::memory_order_release);
      releaseSpinLock();
    }

    // There are subscribers, so the event needs to be fired and we have incremented
    // the queue's reference counter, requiring us to decrement it again
    {
      ReleaseBroadcastQueueScope releaseActiveQueue(*this, currentQueue);

      // Actually fire the event by calling all the subscribers
      std::size_t subscriberCount = currentQueue->Count;
      for(std::size_t index = 0; index < subscriberCount; ++index) {
        currentQueue->Callbacks[index](std::forward<TArguments>(arguments)...);
        // We don't need to worry about queue edits within the callbacks because
        // it will result in a new broadcast queue being placed while we happily
        // continue working with the immutable copy we hold a reference to.
      }
    }

  }

  // ------------------------------------------------------------------------------------------- //
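  // Note on broadcast semantics (illustrative, not from the original header): because firing
  // walks an immutable snapshot of the subscriber list, a Subscribe() call made from inside
  // a callback (or from another thread) only swaps in a new queue; a broadcast that is
  // already in progress keeps using its snapshot and the new subscriber is only reached by
  // subsequent broadcasts. A minimal sketch, assuming a ConcurrentEvent<void()> named
  // 'chained' declared before the two free functions:
  //
  //     Nuclex::Support::Events::ConcurrentEvent<void()> chained;
  //
  //     void Second() {}
  //     void First() { chained.Subscribe<Second>(); } // safe even while 'chained' is firing
  //
  //     chained.Subscribe<First>(); // (at some point in main())
  //     chained.Emit();             // this broadcast still only reaches First();
  //                                 // Second() receives subsequent broadcasts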
  template<typename TResult, typename... TArguments>
  void ConcurrentEvent<TResult(TArguments...)>::Subscribe(const DelegateType &delegate) {

    // This is a C-A-S replacement attempt, so we may have to go through the whole operation
    // multiple times. We expect this to be the case only very rarely, as contention should
    // happen when events are fired, not by threads subscribing & unsubscribing rapidly.
    for(;;) {
      BroadcastQueue *newQueue;

      // Get a hold of the current queue.
      acquireSpinLock();
      BroadcastQueue *currentQueue = this->subscribers.load(
        std::memory_order::memory_order_consume // if carries dependency
      );
      if(likely(currentQueue == nullptr)) {
        releaseSpinLock();

        // Try to recycle an earlier queue. If we get one, we don't need to check its
        // capacity because any queue created will have space for at least 1 subscriber.
        newQueue = this->recyclableSubscribers.exchange(nullptr);
        if(unlikely(newQueue == nullptr)) {
          newQueue = allocateBroadcastQueue(1);
        } else {
          newQueue->Count = 1;
        }

        newQueue->Callbacks[0] = delegate;
      } else { // A queue is present, increment its reference count so it isn't deleted
        currentQueue->ReferenceCount.fetch_add(1, std::memory_order::memory_order_release);
        releaseSpinLock();

        ReleaseBroadcastQueueScope releaseActiveQueue(*this, currentQueue);

        // Obtain a new queue to fill the subscribers into, either by re-using the event's
        // previous queue or by creating a new one
        std::size_t subscriberCount = currentQueue->Count;
        newQueue = this->recyclableSubscribers.exchange(nullptr);
        if(unlikely(newQueue == nullptr)) { // No previous queue available?
          newQueue = allocateBroadcastQueue(subscriberCount + 1);
        } else if(unlikely(subscriberCount >= newQueue->Capacity)) { // Not enough capacity?
          freeBroadcastQueue(newQueue);
          newQueue = allocateBroadcastQueue(subscriberCount + 1);
        } else { // Recycled queue can be reused, raise its reference count
          newQueue->ReferenceCount.fetch_add(1, std::memory_order::memory_order_relaxed);
          newQueue->Count = subscriberCount + 1;
        }

        // This section is at-risk of leaking memory if it throws. I don't think a memcpy()
        // equivalent call can throw, but if I'm mistaken, we need another scope guard here.
        std::copy_n(currentQueue->Callbacks, subscriberCount, newQueue->Callbacks);
        newQueue->Callbacks[subscriberCount] = delegate;
      } // else block, also decrements active queue's reference count upon scope exit

      // Try to replace the active queue with our modified clone. Yes we're acquiring
      // the spinlock for an atomic operation. Why? The fetch operation (load + fetch_add)
      // must be atomic (thus we wrap the two operations in a spinlock) and those two
      // operations must not be interrupted by this single C-A-S instruction, so we hold
      // the spinlock here, thereby preventing any acquiring threads from grabbing
      // the reference before we decrement the reference counter to possibly 0.
      acquireSpinLock();
      bool wasReplaced = std::atomic_compare_exchange_strong(
        &this->subscribers, &currentQueue, newQueue
      );
      releaseSpinLock();

      if(likely(wasReplaced)) {
        if(likely(currentQueue != nullptr)) {
          std::size_t totalReferences = currentQueue->ReferenceCount.fetch_sub(
            1, std::memory_order::memory_order_release
          );
          if(unlikely(totalReferences == 1)) { // We just released the last reference
            currentQueue = this->recyclableSubscribers.exchange(currentQueue);
            if(likely(currentQueue != nullptr)) {
              freeBroadcastQueue(currentQueue);
            }
          }
        }
        return; // Edited version of broadcast queue is in place, we're done
      } else { // Put our queue back into the loop, hopefully we can still reuse it above
        newQueue->ReferenceCount.store(0, std::memory_order::memory_order_relaxed);
        newQueue = this->recyclableSubscribers.exchange(newQueue);
        if(unlikely(newQueue != nullptr)) {
          freeBroadcastQueue(newQueue);
        }
      }
    } // C-A-S loop

  }

  // ------------------------------------------------------------------------------------------- //

  template<typename TResult, typename... TArguments>
  bool ConcurrentEvent<TResult(TArguments...)>::Unsubscribe(const DelegateType &delegate) {

    // This is a C-A-S replacement attempt, so we may have to go through the whole operation
    // multiple times. We expect this to be the case only very rarely, as contention should
    // happen when events are fired, not by threads subscribing & unsubscribing rapidly.
    for(;;) {

      // Get a hold of the current queue.
      acquireSpinLock();
      BroadcastQueue *currentQueue = this->subscribers.load(
        std::memory_order::memory_order_consume // if carries dependency
      );
      if(unlikely(currentQueue == nullptr)) {
        releaseSpinLock();
        return false; // No queue -> no subscribers -> subscriber not found -> exit!
      } else {
        currentQueue->ReferenceCount.fetch_add(1, std::memory_order::memory_order_release);
        releaseSpinLock();
      }

      { // A queue is present, increment its reference count so it isn't deleted
        ReleaseBroadcastQueueScope releaseActiveQueue(*this, currentQueue);

        BroadcastQueue *newQueue;

        // Hunt for the delegate the caller wishes to unsubscribe
        std::size_t index = 0;
        std::size_t currentSubscriberCount = currentQueue->Count;
        for(;;) {
          if(unlikely(currentQueue->Callbacks[index] == delegate)) {

            if(currentSubscriberCount == 1) { // Last subscriber just left
              newQueue = nullptr;
            } else {

              // Obtain a new queue to put the subscribers into
              newQueue = this->recyclableSubscribers.exchange(nullptr);
              if(unlikely(newQueue == nullptr)) { // No previous queue available?
                newQueue = allocateBroadcastQueue(currentSubscriberCount - 1);
              } else if(unlikely(newQueue->Capacity < currentSubscriberCount)) {
                freeBroadcastQueue(newQueue);
                newQueue = allocateBroadcastQueue(currentSubscriberCount - 1);
              } else {
                newQueue->ReferenceCount.fetch_add(1, std::memory_order::memory_order_relaxed);
                newQueue->Count = currentSubscriberCount - 1;
              }

              std::copy_n(currentQueue->Callbacks, index, newQueue->Callbacks);
              std::copy_n(
                currentQueue->Callbacks + index + 1,
                currentSubscriberCount - index - 1,
                newQueue->Callbacks + index
              );

            }

            // Try to replace the active queue with our modified clone. Yes we're acquiring
            // the spinlock for an atomic operation. Why? The fetch operation (load + fetch_add)
            // must be atomic (thus we wrap the two operations in a spinlock) and those two
            // operations must not be interrupted by this single C-A-S instruction, so we hold
            // the spinlock here, thereby preventing any acquiring threads from grabbing
            // the reference before we decrement the reference counter to possibly 0.
            acquireSpinLock();
            bool wasReplaced = std::atomic_compare_exchange_strong(
              &this->subscribers, &currentQueue, newQueue
            );
            releaseSpinLock();

            if(likely(wasReplaced)) {
              if(likely(currentQueue != nullptr)) {
                std::size_t totalReferences = currentQueue->ReferenceCount.fetch_sub(
                  1, std::memory_order::memory_order_release
                );
                if(unlikely(totalReferences == 1)) { // We just released the last reference
                  currentQueue = this->recyclableSubscribers.exchange(currentQueue);
                  if(likely(currentQueue != nullptr)) {
                    freeBroadcastQueue(currentQueue);
                  }
                }
              }
              return true; // Edited version of broadcast queue is in place, we're done
            } else { // Put our queue back into the loop, hopefully we can still reuse it above
              newQueue->ReferenceCount.store(0, std::memory_order::memory_order_relaxed);
              newQueue = this->recyclableSubscribers.exchange(newQueue);
              if(unlikely(newQueue != nullptr)) {
                freeBroadcastQueue(newQueue);
              }
              break; // C-A-S loop needs to run again
            }

          } // if subscriber to be removed found

          // Search loop still running, advance to next item until end reached
          ++index;
          if(likely(index == currentSubscriberCount)) {
            return false; // Loop completed without finding the delegate
          }
        } // delegate search loop
      }
    } // C-A-S loop

  }

  // ------------------------------------------------------------------------------------------- //

}}} // namespace Nuclex::Support::Events

#endif // NUCLEX_SUPPORT_EVENTS_CONCURRENTEVENT_H