#pragma region CPL License /* Nuclex Native Framework Copyright (C) 2002-2023 Nuclex Development Labs This library is free software; you can redistribute it and/or modify it under the terms of the IBM Common Public License as published by the IBM Corporation; either version 1.0 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the IBM Common Public License for more details. You should have received a copy of the IBM Common Public License along with this library */ #pragma endregion // CPL License // If the library is compiled as a DLL, this ensures symbols are exported #define NUCLEX_SUPPORT_SOURCE 1 #include "Nuclex/Support/Config.h" #if defined(NUCLEX_SUPPORT_ENABLE_BENCHMARKS) #include "Nuclex/Support/Threading/Semaphore.h" #include "Nuclex/Support/Threading/Thread.h" #include "../Collections/ConcurrentBufferTest.h" // HighContentionBufferTest #include "../../Source/Helpers/PosixApi.h" // PosixApi #include #include // for std::atomic #include // for std::thread #if !defined(NUCLEX_SUPPORT_WINDOWS) #include // for ::sem_t, ::sem_init(), ::sem_post(), ::sem_wait()... #endif namespace { // ------------------------------------------------------------------------------------------- // #if !defined(NUCLEX_SUPPORT_WINDOWS) /// Benchmark that repeatedly increments and waits on a ::sem_t class SemTBenchmark : public Nuclex::Support::Collections::HighContentionBufferTest { /// Initializes a new benchmark public: SemTBenchmark() : HighContentionBufferTest(std::thread::hardware_concurrency()), fullLockCount(std::thread::hardware_concurrency()), waitingLockCount(0), cycleCount(0) { int result = ::sem_init(&this->semaphore, 0, fullLockCount); if(result == -1) { int errorNumber = errno; Nuclex::Support::Platform::PosixApi::ThrowExceptionForSystemError( u8"sem_init() failed", errorNumber ); } } /// Frees all resources owned by the instance public: ~SemTBenchmark() { int result = ::sem_destroy(&this->semaphore); NUCLEX_SUPPORT_NDEBUG_UNUSED(result); assert((result != -1) && u8"Semaphore is successfully destroyed"); } /// /// Increments the semaphore twice for each thread to launch the benchmark /// public: void KickOff() { for(std::size_t index = 0; index < this->fullLockCount * 2; ++index) { int result = ::sem_post(&this->semaphore); if(result == -1) { int errorNumber = errno; Nuclex::Support::Platform::PosixApi::ThrowExceptionForSystemError( u8"sem_post() failed", errorNumber ); } } } /// Executed by each thread simultaneously /// Unique index of the thread protected: void Thread(std::size_t threadIndex) override { (void)threadIndex; for(;;) { // Check if the current cycle is complete. If so, kick off a new cycle { std::size_t safeLockCount = ( this->waitingLockCount.fetch_add(1, std::memory_order_release) + 1 ); if(safeLockCount >= this->fullLockCount * 2) { this->waitingLockCount.store(0, std::memory_order_release); KickOff(); } } // Pass through or wait on the semaphore (first loop passes through, second waits) int result = ::sem_wait(&this->semaphore); if(result == -1) { int errorNumber = errno; Nuclex::Support::Platform::PosixApi::ThrowExceptionForSystemError( u8"sem_wait() failed", errorNumber ); } // Increment the cycle count to stop the benchmark after a certain number of loops { std::size_t safeCycleCount = ( this->cycleCount.fetch_add(1, std::memory_order_release) + 1 ); if(safeCycleCount >= 1000000) { break; } } } // for(;;) } /// Standard semaphore being tested private: ::sem_t semaphore; /// Lock count at which all threads would be waiting private: const std::size_t fullLockCount; /// Number of threads that have completed a loop private: std::atomic waitingLockCount; /// Number of cycles the loop has completed between all threads private: std::atomic cycleCount; }; #endif // !defined(NUCLEX_SUPPORT_WINDOWS) // ------------------------------------------------------------------------------------------- // /// Benchmark that repeatedly increments and waits on a Nuclex semaphore class SemaphoreBenchmark : public Nuclex::Support::Collections::HighContentionBufferTest { /// Initializes a new benchmark public: SemaphoreBenchmark() : HighContentionBufferTest(std::thread::hardware_concurrency()), semaphore(std::thread::hardware_concurrency()), fullLockCount(std::thread::hardware_concurrency()), waitingLockCount(0), cycleCount(0) {} /// Frees all resources owned by the instance public: ~SemaphoreBenchmark() {} /// /// Increments the semaphore twice for each thread to launch the benchmark /// public: void KickOff() { this->semaphore.Post(this->fullLockCount * 2); } /// Executed by each thread simultaneously /// Unique index of the thread protected: void Thread(std::size_t threadIndex) override { (void)threadIndex; for(;;) { // Check if the current cycle is complete. If so, kick off a new cycle { std::size_t safeLockCount = ( this->waitingLockCount.fetch_add(1, std::memory_order_release) + 1 ); if(safeLockCount >= this->fullLockCount * 2) { this->waitingLockCount.store(0, std::memory_order_release); KickOff(); } } // Pass through or wait on the semaphore (first loop passes through, second waits) this->semaphore.WaitThenDecrement(); // Increment the cycle count to stop the benchmark after a certain number of loops { std::size_t safeCycleCount = ( this->cycleCount.fetch_add(1, std::memory_order_release) + 1 ); if(safeCycleCount >= 1000000) { break; } } } // for(;;) } /// Standard semaphore being tested private: Nuclex::Support::Threading::Semaphore semaphore; /// Lock count at which all threads would be waiting private: const std::size_t fullLockCount; /// Number of threads that have completed a loop private: std::atomic waitingLockCount; /// Number of cycles the loop has completed between all threads private: std::atomic cycleCount; }; // ------------------------------------------------------------------------------------------- // } // anonymous namespace namespace Nuclex { namespace Support { namespace Threading { // ------------------------------------------------------------------------------------------- // #if !defined(NUCLEX_SUPPORT_WINDOWS) TEST(SemaphoreTest, SemTBenchmarkSucceeds) { SemTBenchmark bench; bench.StartThreads(); bench.JoinThreads(); std::cout << "Running " << 1000000 << " cycles " << "with " << std::thread::hardware_concurrency() << " threads: " << std::fixed << (static_cast(bench.GetElapsedMicroseconds()) / 1000.0) << " ms" << //" (" << std::fixed << kitemsPerSecond << "K ops/second)" << std::endl; } #endif // ------------------------------------------------------------------------------------------- // TEST(SemaphoreTest, SemaphoreBenchmarkSucceeds) { SemaphoreBenchmark bench; bench.StartThreads(); bench.JoinThreads(); std::cout << "Running " << 1000000 << " cycles " << "with " << std::thread::hardware_concurrency() << " threads: " << std::fixed << (static_cast(bench.GetElapsedMicroseconds()) / 1000.0) << " ms" << //" (" << std::fixed << kitemsPerSecond << "K ops/second)" << std::endl; } // ------------------------------------------------------------------------------------------- // }}} // namespace Nuclex::Support::Threading #endif // defined(NUCLEX_SUPPORT_ENABLE_BENCHMARKS)