#pragma region CPL License /* Nuclex Native Framework Copyright (C) 2002-2023 Nuclex Development Labs This library is free software; you can redistribute it and/or modify it under the terms of the IBM Common Public License as published by the IBM Corporation; either version 1.0 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the IBM Common Public License for more details. You should have received a copy of the IBM Common Public License along with this library */ #pragma endregion // CPL License // If the library is compiled as a DLL, this ensures symbols are exported #define NUCLEX_PLATFORM_SOURCE 1 #include "Nuclex/Platform/Tasks/ThreadedTask.h" #include // for ThreadPool namespace { // ------------------------------------------------------------------------------------------- // /// Calls .get() on a set of 'std::future' instances in an array class FutureJoiner { /// Initializes a new future joiner using the specified array /// Array of futures on which to call the .get() method public: FutureJoiner(std::future *futures) : FutureCount(0), futures(futures) {} /// /// Calls the .get() method on all futures that have been set up for the joiner /// public: ~FutureJoiner() { while(this->FutureCount >= 1) { --this->FutureCount; this->futures[this->FutureCount].get(); this->futures[this->FutureCount].~future(); } } /// Number of futures that have been constructed in the array public: std::size_t FutureCount; /// Array of futures that will be freed on destruction private: std::future *futures; }; // ------------------------------------------------------------------------------------------- // } // anonymous namespace namespace Nuclex { namespace Platform { namespace Tasks { // ------------------------------------------------------------------------------------------- // void ThreadedTask::Run( const std::array &resourceUnitIndices, const CancellationWatcher &cancellationWatcher ) noexcept { constexpr const std::size_t MaximumStackMemoryAllowed = 256; // bytes if(maximumThreadCount >= 2) { const std::size_t requiredMemory = ( sizeof(std::future[2]) * this->maximumThreadCount / 2 ); if(requiredMemory >= MaximumStackMemoryAllowed) { // Heap-allocated std::unique_ptr memory(new std::uint8_t[requiredMemory]); FutureJoiner joiner(reinterpret_cast *>(memory.get())); // Launch all threads and remember their 'std::future's they return for(std::size_t index = 0; index < this->maximumThreadCount; ++index) { new(reinterpret_cast *>(memory.get()) + index) std::future( this->threadPool.Schedule( &ThreadedTask::invokeThreadedRun, this, &resourceUnitIndices, &cancellationWatcher ) ); ++joiner.FutureCount; } } else { // Stack-allocated std::uint8_t *memory = reinterpret_cast(alloca(requiredMemory)); FutureJoiner joiner(reinterpret_cast *>(memory)); // Launch all threads and remember their 'std::future's they return for(std::size_t index = 0; index < this->maximumThreadCount; ++index) { new(reinterpret_cast *>(memory) + index) std::future( this->threadPool.Schedule( &ThreadedTask::invokeThreadedRun, this, &resourceUnitIndices, &cancellationWatcher ) ); ++joiner.FutureCount; } } } else { // Single task (I'll slap you if this runs in production code!) ThreadedRun(resourceUnitIndices, cancellationWatcher); } } // ------------------------------------------------------------------------------------------- // void ThreadedTask::invokeThreadedRun( ThreadedTask *self, const std::array *resourceUnitIndices, const CancellationWatcher *cancellationWatcher ) { self->ThreadedRun(*resourceUnitIndices, *cancellationWatcher); } // ------------------------------------------------------------------------------------------- // }}} // namespace Nuclex::Platform::Tasks