#pragma region CPL License
/*
Nuclex Native Framework
Copyright (C) 2002-2023 Nuclex Development Labs

This library is free software; you can redistribute it and/or
modify it under the terms of the IBM Common Public License as
published by the IBM Corporation; either version 1.0 of
the License, or (at your option) any later version.

This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the IBM
Common Public License for more details.

You should have received a copy of the IBM Common Public
License along with this library
*/
#pragma endregion // CPL License

#ifndef NUCLEX_SUPPORT_COLLECTIONS_CONCURRENTBUFFERTEST_H
#define NUCLEX_SUPPORT_COLLECTIONS_CONCURRENTBUFFERTEST_H

#include <gtest/gtest.h> // for EXPECT_TRUE(), EXPECT_EQ() and friends

#include "Nuclex/Support/BitTricks.h"

#include <vector> // for std::vector
#include <memory> // for std::unique_ptr
#include <thread> // for std::thread
#include <atomic> // for std::atomic
#include <cassert> // for assert()
#include <chrono> // for std::chrono::high_resolution_clock
#include <iostream> // for std::cout

namespace Nuclex { namespace Support { namespace Collections {

  // ------------------------------------------------------------------------------------------- //

  /// <summary>Base class that allows testing lock-free buffers under high contention</summary>
  /// <remarks>
  ///   <para>
  ///     The problem in actually forcing a buffer into a high contention situation is
  ///     getting all threads to really run at the same time. Thread scheduling can introduce
  ///     delays in the millisecond range, and so can mutexes, depending on the OS used.
  ///   </para>
  ///   <para>
  ///     This class will put threads into a busy spin until all threads are confirmed running
  ///     and then have them all set off at the same time (synchronized lock-free and without
  ///     waiting on a mutex or similar synchronization primitive). This has a very decent
  ///     chance of making all threads hammer the buffer being tested at the same time right
  ///     from the get-go.
  ///   </para>
  /// </remarks>
  class HighContentionBufferTest {

    /// <summary>Initializes a new high contention buffer test</summary>
    /// <param name="threadCount">Number of threads that will run at the same time</param>
    public: HighContentionBufferTest(std::size_t threadCount) :
      threadCount(threadCount),
      threads(),
      allThreadsMask(getBitMaskForThreadCount(threadCount)),
      startSignals(0),
      constructionTime(std::chrono::high_resolution_clock::now()),
      startMicroseconds(0),
      endMicroseconds(0) {

      // If we don't have enough bits for the threads, our start signal will not work...
      assert(
        (threadCount < (sizeof(std::size_t) * 8)) &&
        u8"Number of threads tested does not exceed number of bits in std::size_t"
      );
    }

    /// <summary>Waits for all threads to complete when the test is terminated</summary>
    public: ~HighContentionBufferTest() {
      JoinThreads();
    }

    /// <summary>Starts all threads at the same time</summary>
    /// <remarks>
    ///   Call this after all other test preparations are complete.
    /// </remarks>
    public: void StartThreads();

    /// <summary>Waits for all threads to finish executing</summary>
    /// <remarks>
    ///   Call this if you want to retrieve test results. Note that this method does not
    ///   stop the threads, it merely waits for them to stop by themselves.
    /// </remarks>
    public: void JoinThreads();

    /// <summary>Number of microseconds that have elapsed during the benchmark</summary>
    /// <returns>The elapsed number of microseconds</returns>
    public: std::size_t GetElapsedMicroseconds() const {
      // Better hope the high_resolution_clock was monotonic...
      assert(
        (this->endMicroseconds >= this->startMicroseconds) &&
        u8"std::chrono::high_resolution_clock counts monotonically"
      );
      return static_cast<std::size_t>(this->endMicroseconds - this->startMicroseconds);
    }

    /// <summary>Method that will be executed by many threads at the same time</summary>
    /// <param name="threadIndex">Zero-based index of the thread in the test</param>
    /// <remarks>
    ///   The thread index is simply a unique sequential number assigned to each thread.
    ///   It can be used if you want a set of threads to do something different than others.
    /// </remarks>
    protected: virtual void Thread(std::size_t threadIndex) {
      (void)threadIndex;
    }

    /// <summary>
    ///   Thread entry point, keeps each thread in a busy spin until all threads are ready
    /// </summary>
    /// <param name="threadIndex">Zero-based index of the thread in the test</param>
    private: void threadStarter(std::size_t threadIndex);

    /// <summary>Marks the benchmark starting time if this is the first call</summary>
    private: void markStartTime();

    /// <summary>Marks the benchmark ending time if this is the first call</summary>
    private: void markEndTime();

    /// <summary>Forms a bit mask where one bit is set for each thread</summary>
    /// <param name="threadCount">Number of threads for which bits should be set</param>
    /// <returns>A bit mask with sequential bits set, one for each thread</returns>
    private: static std::size_t getBitMaskForThreadCount(std::size_t threadCount);

    /// <summary>Number of threads that will be involved in the test</summary>
    private: std::size_t threadCount;
    /// <summary>Threads that are being used to run the tests</summary>
    private: std::vector<std::unique_ptr<std::thread>> threads;
    /// <summary>Mask of bits for all threads</summary>
    private: std::size_t allThreadsMask;
    /// <summary>Used to make all threads start at the same time</summary>
    private: std::atomic<std::size_t> startSignals;
    /// <summary>Time at which the instance was constructed</summary>
    private: std::chrono::high_resolution_clock::time_point constructionTime;
    /// <summary>Recorded start time, in microseconds, for the benchmark</summary>
    private: std::atomic<std::size_t> startMicroseconds;
    /// <summary>Recorded end time, in microseconds, for the benchmark</summary>
    private: std::atomic<std::size_t> endMicroseconds;

  };

  // ------------------------------------------------------------------------------------------- //
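  /// <summary>
  ///   Illustration only, not used by the benchmarks below: a minimal sketch of the lock-free
  ///   start signal described above. The real StartThreads()/threadStarter() implementations
  ///   live in the accompanying source file and may differ in detail; this hypothetical helper
  ///   merely demonstrates the technique of publishing one bit per thread and busy-spinning
  ///   until all bits are set.
  /// </summary>
  /// <param name="startSignals">Shared signal word holding one bit per participating thread</param>
  /// <param name="threadIndex">Zero-based index of the calling thread</param>
  /// <param name="allThreadsMask">Bit mask in which every participating thread's bit is set</param>
  inline void exampleSpinUntilAllThreadsReady(
    std::atomic<std::size_t> &startSignals,
    std::size_t threadIndex,
    std::size_t allThreadsMask
  ) {
    // Announce that this thread has reached the starting line by setting its own bit
    startSignals.fetch_or(std::size_t(1) << threadIndex, std::memory_order_acq_rel);

    // Busy-spin (no mutex, no sleep) until every thread has set its bit; all threads then
    // leave this loop within a very short time of each other, producing maximum contention
    while(startSignals.load(std::memory_order_acquire) != allThreadsMask) {
      // spin
    }
  }

  // ------------------------------------------------------------------------------------------- //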
  /// <summary>Benchmark that tests the performance of appending single items</summary>
  /// <typeparam name="TConcurrentBuffer">Buffer that will be used for the benchmark</typeparam>
  template<template<typename TItem> class TConcurrentBuffer>
  class BufferAppendBenchmark : public HighContentionBufferTest {

    public: const std::size_t BenchmarkedItemCount = 1048576 * 4; // 4 Million items

    /// <summary>Initializes a new single item append benchmark</summary>
    /// <param name="threadCount">Number of threads that will be hammering the buffer</param>
    public: BufferAppendBenchmark(std::size_t threadCount) :
      HighContentionBufferTest(threadCount),
      buffer(BenchmarkedItemCount),
      addedItemCount(0) {}

    /// <summary>Thread worker method, performs the work being benchmarked</summary>
    /// <param name="threadIndex">Index of the thread this method is running in</param>
    protected: void Thread(std::size_t threadIndex) override {
      std::size_t randomNumber = BitTricks::XorShiftRandom(threadIndex);

      for(;;) {
        std::size_t newAddedItemCount = this->addedItemCount.fetch_add(
          1, std::memory_order_consume // if() below carries dependency
        ) + 1;
        if(newAddedItemCount > BenchmarkedItemCount) {
          this->addedItemCount.fetch_sub(1, std::memory_order_relaxed); // decrement back
          break;
        }

        bool wasAdded = this->buffer.TryAppend(static_cast<int>(randomNumber | 1));
        EXPECT_TRUE(wasAdded);

        randomNumber = BitTricks::XorShiftRandom(randomNumber);
      }
    }

    /// <summary>Counts the items that have been added to the buffer so far</summary>
    /// <returns>The number of added items</returns>
    public: std::size_t CountAddedItems() const {
      return this->addedItemCount.load(std::memory_order_acquire);
    }

    /// <summary>Buffer that is being benchmarked</summary>
    private: TConcurrentBuffer<int> buffer;
    /// <summary>Number of items that have been added to the buffer</summary>
    private: std::atomic<std::size_t> addedItemCount;

  };

  // ------------------------------------------------------------------------------------------- //
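  // The benchmarks in this header only rely on a small part of the buffer's interface. The
  // signatures below are an assumption inferred from the calls made in this header (the type
  // name is a mere placeholder and the buffers actually under test may offer more):
  //
  //   template<typename TItem>
  //   class SomeConcurrentBuffer {
  //     public: SomeConcurrentBuffer(std::size_t capacity); // fixed-capacity construction
  //     public: bool TryAppend(const TItem &item);          // false if the buffer is full
  //     public: bool TryTake(TItem &item);                  // false if the buffer is empty
  //     public: std::size_t Count() const;                  // number of items currently stored
  //   };

  // ------------------------------------------------------------------------------------------- //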
  /// <summary>Benchmarks the single item append method of a concurrent buffer</summary>
  /// <typeparam name="TConcurrentBuffer">Type of concurrent buffer that will be tested</typeparam>
  /// <param name="maximumThreadCount">Number of threads up to which to test</param>
  template<template<typename TItem> class TConcurrentBuffer>
  void benchmarkSingleItemAppends(
    std::size_t maximumThreadCount = std::thread::hardware_concurrency()
  ) {
    typedef BufferAppendBenchmark<TConcurrentBuffer> BenchmarkType;

    for(std::size_t threadCount = 1; threadCount <= maximumThreadCount; ++threadCount) {
      BenchmarkType bench(threadCount);
      bench.StartThreads();
      bench.JoinThreads();

      EXPECT_EQ(bench.CountAddedItems(), bench.BenchmarkedItemCount);

      double kitemsPerSecond = static_cast<double>(bench.CountAddedItems());
      kitemsPerSecond /= static_cast<double>(bench.GetElapsedMicroseconds());
      kitemsPerSecond *= 1000.0; // items/microsecond -> kitems/second

      std::cout <<
        "Adding " << bench.BenchmarkedItemCount << " items " <<
        "from " << threadCount << " threads: " <<
        std::fixed << (static_cast<double>(bench.GetElapsedMicroseconds()) / 1000.0) << " ms" <<
        " (" << std::fixed << kitemsPerSecond << "K ops/second)" <<
        std::endl;
    }
  }

  // ------------------------------------------------------------------------------------------- //

  /// <summary>Benchmark that tests the performance of taking single items</summary>
  /// <typeparam name="TConcurrentBuffer">Buffer that will be used for the benchmark</typeparam>
  template<template<typename TItem> class TConcurrentBuffer>
  class BufferTakeBenchmark : public HighContentionBufferTest {

    public: const std::size_t BenchmarkedItemCount = 1048576 * 4; // 4 Million items

    /// <summary>Initializes a new single item take benchmark</summary>
    /// <param name="threadCount">Number of threads that will be hammering the buffer</param>
    public: BufferTakeBenchmark(std::size_t threadCount) :
      HighContentionBufferTest(threadCount),
      buffer(BenchmarkedItemCount),
      takenItemCount(0) {

      // Fill the buffer completely so the threads have items to take out of it
      std::size_t randomNumber = BitTricks::XorShiftRandom(threadCount);
      for(std::size_t index = 0; index < BenchmarkedItemCount; ++index) {
        EXPECT_TRUE(this->buffer.TryAppend(static_cast<int>(randomNumber)));
        randomNumber = BitTricks::XorShiftRandom(randomNumber);
      }
      EXPECT_EQ(buffer.Count(), BenchmarkedItemCount);
    }

    /// <summary>Thread worker method, performs the work being benchmarked</summary>
    protected: void Thread(std::size_t) override {
      int value = 0;

      for(;;) {
        std::size_t newTakenItemCount = this->takenItemCount.fetch_add(
          1, std::memory_order_consume // if() below carries dependency
        ) + 1;
        if(newTakenItemCount > BenchmarkedItemCount) {
          this->takenItemCount.fetch_sub(1, std::memory_order_relaxed); // decrement back
          break;
        }

        bool wasTaken = this->buffer.TryTake(value);
        EXPECT_TRUE(wasTaken);
      }
    }

    /// <summary>Counts the items that have been taken from the buffer so far</summary>
    /// <returns>The number of taken items</returns>
    public: std::size_t CountTakenItems() const {
      return this->takenItemCount.load(std::memory_order_acquire);
    }

    /// <summary>Buffer that is being benchmarked</summary>
    private: TConcurrentBuffer<int> buffer;
    /// <summary>Number of items that have been taken from the buffer</summary>
    private: std::atomic<std::size_t> takenItemCount;

  };

  // ------------------------------------------------------------------------------------------- //

  /// <summary>Benchmarks the single item take method of a concurrent buffer</summary>
  /// <typeparam name="TConcurrentBuffer">Type of concurrent buffer that will be tested</typeparam>
  /// <param name="maximumThreadCount">Number of threads up to which to test</param>
  template<template<typename TItem> class TConcurrentBuffer>
  void benchmarkSingleItemTakes(
    std::size_t maximumThreadCount = std::thread::hardware_concurrency()
  ) {
    typedef BufferTakeBenchmark<TConcurrentBuffer> BenchmarkType;

    for(std::size_t threadCount = 1; threadCount <= maximumThreadCount; ++threadCount) {
      BenchmarkType bench(threadCount);
      bench.StartThreads();
      bench.JoinThreads();

      EXPECT_EQ(bench.CountTakenItems(), bench.BenchmarkedItemCount);

      double kitemsPerSecond = static_cast<double>(bench.CountTakenItems());
      kitemsPerSecond /= static_cast<double>(bench.GetElapsedMicroseconds());
      kitemsPerSecond *= 1000.0; // items/microsecond -> kitems/second

      std::cout <<
        "Taking " << bench.BenchmarkedItemCount << " items " <<
        "from " << threadCount << " threads: " <<
        std::fixed << (static_cast<double>(bench.GetElapsedMicroseconds()) / 1000.0) << " ms" <<
        " (" << std::fixed << kitemsPerSecond << "K ops/second)" <<
        std::endl;
    }
  }

  // ------------------------------------------------------------------------------------------- //
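  // Worked example of the throughput figure printed by the benchmark functions above (numbers
  // invented purely for illustration): if the 4,194,304 benchmarked items took 500,000
  // microseconds, that is 4,194,304 / 500,000 = ~8.39 items per microsecond, scaled by 1000
  // to ~8,389 K ops/second, i.e. roughly 8.4 million single-item operations per second.

  // ------------------------------------------------------------------------------------------- //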
  /// <summary>
  ///   Benchmark that tests the performance of mixed appending and taking of single items
  /// </summary>
  /// <typeparam name="TConcurrentBuffer">Buffer that will be used for the benchmark</typeparam>
  template<template<typename TItem> class TConcurrentBuffer>
  class BufferMixedBenchmark : public HighContentionBufferTest {

    public: const std::size_t BenchmarkedItemCount = 1048576 * 4; // 4 Million items

    /// <summary>Initializes a new mixed append/take benchmark</summary>
    /// <param name="threadCount">Number of threads that will be hammering the buffer</param>
    public: BufferMixedBenchmark(std::size_t threadCount) :
      HighContentionBufferTest(threadCount),
      buffer(BenchmarkedItemCount / 4),
      operationCount(0) {

      // Pre-fill the buffer to half its capacity so we don't merely benchmark
      // appends to a full buffer or takes from an empty one
      std::size_t randomNumber = BitTricks::XorShiftRandom(threadCount);
      for(std::size_t index = 0; index < BenchmarkedItemCount / 8; ++index) {
        EXPECT_TRUE(this->buffer.TryAppend(static_cast<int>(randomNumber)));
        randomNumber = BitTricks::XorShiftRandom(randomNumber);
      }
      EXPECT_EQ(buffer.Count(), BenchmarkedItemCount / 8);
    }

    /// <summary>Thread worker method, performs the work being benchmarked</summary>
    /// <param name="threadIndex">Index of the thread this method is running in</param>
    protected: void Thread(std::size_t threadIndex) override {
      if(threadIndex % 2 == 0) { // threads with an even index append items...
        std::size_t randomNumber = BitTricks::XorShiftRandom(threadIndex);

        for(;;) {
          std::size_t safeOperationCount = this->operationCount.fetch_add(
            1, std::memory_order_consume // if() below carries dependency
          ) + 1;
          if(safeOperationCount > BenchmarkedItemCount) {
            this->operationCount.fetch_sub(1, std::memory_order_release); // decrement back
            break;
          }

          this->buffer.TryAppend(static_cast<int>(randomNumber | 1));

          randomNumber = BitTricks::XorShiftRandom(randomNumber);
        }
      } else { // ...while threads with an odd index take items
        int value = 0;

        for(;;) {
          std::size_t safeOperationCount = this->operationCount.fetch_add(
            1, std::memory_order_consume // if() below carries dependency
          ) + 1;
          if(safeOperationCount > BenchmarkedItemCount) {
            this->operationCount.fetch_sub(1, std::memory_order_release); // decrement back
            break;
          }

          this->buffer.TryTake(value);
        }
      }
    }

    /// <summary>Counts the append and take operations that have been performed</summary>
    /// <returns>The number of performed operations</returns>
    public: std::size_t CountOperations() const {
      return this->operationCount.load(std::memory_order_acquire);
    }

    /// <summary>Buffer that is being benchmarked</summary>
    private: TConcurrentBuffer<int> buffer;
    /// <summary>Number of items that have been added to or taken from the buffer</summary>
    private: std::atomic<std::size_t> operationCount;

  };

  // ------------------------------------------------------------------------------------------- //

  /// <summary>
  ///   Benchmarks the single item adding and taking methods of a concurrent buffer
  /// </summary>
  /// <typeparam name="TConcurrentBuffer">Type of concurrent buffer that will be tested</typeparam>
  /// <param name="maximumThreadCount">Number of threads up to which to test</param>
  template<template<typename TItem> class TConcurrentBuffer>
  void benchmarkSingleItemMixed(
    std::size_t maximumThreadCount = std::thread::hardware_concurrency()
  ) {
    typedef BufferMixedBenchmark<TConcurrentBuffer> BenchmarkType;

    for(std::size_t threadCount = 1; threadCount <= maximumThreadCount; ++threadCount) {
      BenchmarkType bench(threadCount);
      bench.StartThreads();
      bench.JoinThreads();

      EXPECT_GE(bench.CountOperations(), bench.BenchmarkedItemCount);
      EXPECT_LE(bench.CountOperations(), bench.BenchmarkedItemCount + threadCount);

      double kitemsPerSecond = static_cast<double>(bench.CountOperations());
      kitemsPerSecond /= static_cast<double>(bench.GetElapsedMicroseconds());
      kitemsPerSecond *= 1000.0; // items/microsecond -> kitems/second

      std::cout <<
        "Mixed Adding/Taking " << bench.CountOperations() << " items " <<
        "from " << threadCount << " threads: " <<
        std::fixed << (static_cast<double>(bench.GetElapsedMicroseconds()) / 1000.0) << " ms" <<
        " (" << std::fixed << kitemsPerSecond << "K ops/second)" <<
        std::endl;
    }
  }

  // ------------------------------------------------------------------------------------------- //
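  // A sketch of how a test source file might run these benchmarks against a concrete buffer
  // type. The buffer type and test suite name below are placeholders chosen for illustration;
  // only the three benchmark functions declared above are real:
  //
  //   #include "ConcurrentBufferTest.h"
  //   #include "Nuclex/Support/Collections/ConcurrentRingBuffer.h" // hypothetical buffer header
  //
  //   namespace Nuclex { namespace Support { namespace Collections {
  //
  //     TEST(ConcurrentRingBufferTest, BenchmarkSingleItemAppends) {
  //       benchmarkSingleItemAppends<ConcurrentRingBuffer>();
  //     }
  //
  //     TEST(ConcurrentRingBufferTest, BenchmarkSingleItemTakes) {
  //       benchmarkSingleItemTakes<ConcurrentRingBuffer>();
  //     }
  //
  //     TEST(ConcurrentRingBufferTest, BenchmarkSingleItemMixed) {
  //       benchmarkSingleItemMixed<ConcurrentRingBuffer>();
  //     }
  //
  //   }}} // namespace Nuclex::Support::Collections

  // ------------------------------------------------------------------------------------------- //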
}}} // namespace Nuclex::Support::Collections

#endif // NUCLEX_SUPPORT_COLLECTIONS_CONCURRENTBUFFERTEST_H