#pragma region CPL License /* Nuclex Native Framework Copyright (C) 2002-2023 Nuclex Development Labs This library is free software; you can redistribute it and/or modify it under the terms of the IBM Common Public License as published by the IBM Corporation; either version 1.0 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the IBM Common Public License for more details. You should have received a copy of the IBM Common Public License along with this library */ #pragma endregion // CPL License #ifndef NUCLEX_SUPPORT_THREADING_THREADPOOLTASKPOOL_H #define NUCLEX_SUPPORT_THREADING_THREADPOOLTASKPOOL_H #include "Nuclex/Support/Config.h" #include "ThreadPoolConfig.h" // Boost-licensed MoodyCamel queue. // This is a lock-free, unbounded queue that works on Windows and Linux. // Its performance is at the top end of such queues. The header does a lot of stuff, // involving many other headers and preprocessor constants, so we include it last. #include "./cameron314-concurrentqueue-1.0.4/concurrentqueue.h" namespace Nuclex { namespace Support { namespace Threading { // ------------------------------------------------------------------------------------------- // /// Manages reusable tasks for the thread pool /// Store all informations about a submitted task /// Offset at which the variable payload begins template class ThreadPoolTaskPool { #pragma region struct SubmittedTaskTemplate #if defined(NUCLEX_SUPPORT_ENABLE_TASK_POOL_VERIFICATION) /// Template for the contents of the submitted task structure /// /// Apart from requiring the PayloadSize to be of type std::size_t and /// located at the start of the structure, everything else is up to the owner. /// private: struct SubmittedTaskTemplate { /// Size of the variable payload at the end of the structure public: std::size_t PayloadSize; /// Example element, a pointer to the task instance public: void *Task; /// Placeholder for the variable payload attached to the task public: std::uint8_t Payload[sizeof(std::uintptr_t)]; }; #endif // defined(NUCLEX_SUPPORT_ENABLE_TASK_POOL_VERIFICATION) #pragma endregion // struct SubmittedTaskTemplate public: ThreadPoolTaskPool() { #if defined(NUCLEX_SUPPORT_ENABLE_TASK_POOL_VERIFICATION) // This will both check that an attribute 'PayloadSize' is present in the submitted // task structure and that it is at the beginning of the structure. If this assertion // triggers, your submitted task type is not compatible with the task pool. static_assert( offsetof(TSubmittedTask, PayloadSize) == offsetof(SubmittedTaskTemplate, PayloadSize) ); #endif // defined(NUCLEX_SUPPORT_ENABLE_TASK_POOL_VERIFICATION) } /// Destroys all remaining tasks public: ~ThreadPoolTaskPool() { DeleteAllRecyclableTasks(); } /// Destroys all tasks currently waiting to be recycled public: void DeleteAllRecyclableTasks() { TSubmittedTask *submittedTask; while(this->returnedTasks.try_dequeue(submittedTask)) { DeleteTask(submittedTask); } } /// Creates a new task with the specified payload size /// Size of the payload the new task must carry /// A new blob containing the public: TSubmittedTask *GetNewTask(std::size_t payloadSize) { std::size_t totalRequiredMemory = (PayloadOffset + payloadSize); // Try to obtain a returned task with adequate payload size that can // be re-used instead of allocating a new one if(likely(totalRequiredMemory < ThreadPoolConfig::SubmittedTaskReuseLimit)) { TSubmittedTask *submittedTask; for(std::size_t attempt = 0; attempt < 3; ++attempt) { if(this->returnedTasks.try_dequeue(submittedTask)) { if(submittedTask->PayloadSize >= payloadSize) { return submittedTask; } else { DeleteTask(submittedTask); } } else { break; // No more submitted tasks in queue } } } // We found no task that we could re-use, so create a new one { std::uint8_t *taskMemory = new std::uint8_t[totalRequiredMemory]; TSubmittedTask *submittedTask = new(taskMemory) TSubmittedTask(); submittedTask->PayloadSize = payloadSize; return submittedTask; } } /// Checks if a task can be returned to the pool /// Task that will be checked /// True if the task is suitable to be returned to the pool public: static bool IsReturnable(TSubmittedTask *task) { std::size_t totalSize = task->PayloadSize + PayloadOffset; return (totalSize < ThreadPoolConfig::SubmittedTaskReuseLimit); } /// Returns a task to the task pool, allowing for it to be re-used /// Task that will be returned for re-use public: void ReturnTask(TSubmittedTask *submittedTask) { //assert( // IsReturnable(submittedTask) && u8"Task is small enough to be returned to the pool" //); if(IsReturnable(submittedTask)) { this->returnedTasks.enqueue(submittedTask); } else { DeleteTask(submittedTask); } } /// Frees the memory used by a task /// Task that will be destroyed public: static void DeleteTask(TSubmittedTask *submittedTask) { submittedTask->~TSubmittedTask(); delete[] reinterpret_cast(submittedTask); } /// Tasks that have been given back and wait for their reuse private: moodycamel::ConcurrentQueue returnedTasks; }; // ------------------------------------------------------------------------------------------- // }}} // namespace Nuclex::Support::Threading #endif // NUCLEX_SUPPORT_THREADING_THREADPOOLTASKPOOL_H