#pragma region CPL License /* Nuclex Native Framework Copyright (C) 2002-2023 Nuclex Development Labs This library is free software; you can redistribute it and/or modify it under the terms of the IBM Common Public License as published by the IBM Corporation; either version 1.0 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the IBM Common Public License for more details. You should have received a copy of the IBM Common Public License along with this library */ #pragma endregion // CPL License #ifndef NUCLEX_SUPPORT_THREADING_THREADPOOL_H #define NUCLEX_SUPPORT_THREADING_THREADPOOL_H #include "Nuclex/Support/Config.h" // Currently, the thread pool only has implementations for Linux and Windows // // The Linux version may be fully or nearly Posix-compatible, so feel free to // remove this check and give it a try if your system is Posix but not Linux... #if defined(NUCLEX_SUPPORT_LINUX) || defined(NUCLEX_SUPPORT_WINDOWS) #include // for std::size_t #include // for std::packaged_task, std::future #include // for std::bind namespace Nuclex { namespace Support { namespace Threading { // ------------------------------------------------------------------------------------------- // /// Distributes tasks to several threads /// /// /// On some platforms (the Microsoft ones), creating a new threads is a heavy operation /// that makes it unsuitable for micro tasks, like parallelizing a mere loop. /// /// /// With the thread pool, a bunch of threads are created up front and simply wait for /// a task. This allows tasks of fine granularity to be split into multiple threads /// without having the setup time exceed the gains. /// /// /// Optimally, only use the thread pool if you have real number crunching that can be /// parallelized to as many CPU cores as the system can provide. Performing a single task /// in the background or doing something time consuming (like disk accesses) should /// be done with std::async or std::thread instead. /// /// /// Ideally, your tasks would be split into a large number of packages that can each run /// in just a few milliseconds, allowing them to be distributed over many cores and only /// encounter a small period of reduced concurrency at the end when tasks run out. /// /// /// You should not use this thread pool for general purpose tasks or waiting on mutexes, /// at least not with the default thread limits from its default constructor. It would /// quickly clog the thread pool's available threads and render it unable to complete any /// work because just a handful of waiting tasks would fully occupy all the threads. /// /// /// However, it is possible to specify an arbitrarily high maximum thread count and use /// this thread pool for general-purpose work, including long idle waits. Threads will be /// created as needed. In such cases, the use case mentioned earlier (with a large number /// of small work packages) becomes a problem, however, because the thread pool would /// create a silly number of threads and try to run everything at once. /// /// /// In summary, this thread pool has the same caveats as any other thread pool /// implementation. It merely uses defaults that are suitable for number churning rather /// than as a general purpose thread supermarket. In short: know what you're doing :) /// /// class NUCLEX_SUPPORT_TYPE ThreadPool { #pragma region class Task /// Base class for tasks that get executed by the thread pool /// /// Only used internally and does some creative memory acrobatics. Don't expose! /// private: class Task { /// Terminates the task. If the task was not executed, cancels it public: virtual ~Task() = default; /// Executes the task. Is called on the thread pool thread public: virtual void operator()() = 0; }; #pragma endregion // class Task /// Determines a good base number of threads to keep active /// The default minimum number of threads for new thread pools public: NUCLEX_SUPPORT_API static std::size_t GetDefaultMinimumThreadCount(); /// Determines a good maximum number of threads for a thread pool /// The default maximum number of threads for new thread pools public: NUCLEX_SUPPORT_API static std::size_t GetDefaultMaximumThreadCount(); /// Initializes a new thread pool /// /// Number of threads that will be created up-front and always stay active /// /// /// Highest number of threads to which the thread pool can grow under load /// public: NUCLEX_SUPPORT_API ThreadPool( std::size_t minimumThreadCount = GetDefaultMinimumThreadCount(), std::size_t maximumThreadCount = GetDefaultMaximumThreadCount() ); /// Stops all threads and frees all resources used public: NUCLEX_SUPPORT_API ~ThreadPool(); /// Schedules a task to be executed on a worker thread /// /// Type of the method that will be run on a worker thread /// /// /// Type of the arguments that will be passed to the method when it is called /// /// Method that will be called from a worker thread /// Argument values that will be passed to the method /// /// An std::future instance that will provide the result returned by the method /// /// /// /// This method is your main interface to schedule work on threads of the thread /// pool. Despite the slightly template-heavy signature, it is lean and convenient /// to use. Here's an example: /// /// /// /// int test(int a, int b) { /// Thread::Sleep(milliseconds(10)); /// return (a * b) - (a + b); /// } /// /// int main() { /// ThreadPool myThreadPool; /// /// std::future futureResult = myThreadPool.Schedule<&test>(12, 34); /// int result = futureResult.get(); // waits until result is available /// } /// /// /// /// The returned std::future behaves in every way like an std::future used with /// std::async(). You can ignore it (if your task has no return value), wait /// for a result with std::future::wait() or check its status. /// /// /// Don't be shy about ignoring the returned std::future, the task will still /// run and all std::future handling is inside this header, so the compiler has /// every opportunity to optimize it away. /// /// /// If the thread pool is destroyed before starting on a task, the task will be /// canceled. If you did take hold of the std::future instance, that means it /// will throw an std::future_error of type broken_promise in std::future::get(). /// /// public: template inline std::future::type> Schedule(TMethod &&method, TArguments &&... arguments); /// /// Creates (or fetches from the pool) a task with the specified payload size /// /// Size of the task instance /// A new or reused task with at least the requested payload size private: NUCLEX_SUPPORT_API std::uint8_t *getOrCreateTaskMemory(std::size_t payload); /// /// Submits a task (created via getOrCreateTaskMemory()) to the thread pool /// /// Memory block returned by getOrCreateTaskMemory /// Task that will be submitted private: NUCLEX_SUPPORT_API void submitTask(std::uint8_t *taskMemory, Task *task); /// Structure to hold platform dependent thread and sync objects private: struct PlatformDependentImplementation; /// Platform dependent thread and sync objects used for the pool private: PlatformDependentImplementation *implementation; }; // ------------------------------------------------------------------------------------------- // template inline std::future::type> ThreadPool::Schedule(TMethod &&method, TArguments &&... arguments) { typedef typename std::invoke_result::type ResultType; typedef std::packaged_task TaskType; #pragma region struct PackagedTask /// Custom packaged task that carries the method and parameters struct PackagedTask : public Task { /// Initializes the packaged task /// Method that should be called back by the thread pool /// Arguments to save until the invocation public: PackagedTask(TMethod &&method, TArguments &&... arguments) : Task(), Callback( std::bind(std::forward(method), std::forward(arguments)...) ) {} /// Terminates the task. If the task was not executed, cancels it public: ~PackagedTask() override = default; /// Executes the task. Is called on the thread pool thread public: void operator()() override { this->Callback(); } /// Stored method pointer and arguments that will be called back public: TaskType Callback; }; #pragma endregion // struct PackagedTask // Construct a new task with a callback to the caller-specified method and // saved arguments that can subsequently be scheduled on the thread pool. std::uint8_t *taskMemory = getOrCreateTaskMemory(sizeof(PackagedTask)); PackagedTask *packagedTask = new(taskMemory) PackagedTask( std::forward(method), std::forward(arguments)... ); // Grab the result before scheduling the task. If the stars are aligned and // the thread pool is churning, it may otherwise happen that the task is // completed and destroyed between submitTask() and the call to get_future() std::future result = packagedTask->Callback.get_future(); // Schedule for execution. The task will either be executed (default) or // destroyed if the thread pool shuts down, both outcomes will result in // the future completing with either a result or in an error state. submitTask(taskMemory, packagedTask); return result; } // ------------------------------------------------------------------------------------------- // }}} // namespace Nuclex::Support::Threading #endif // defined(NUCLEX_SUPPORT_LINUX) || defined(NUCLEX_SUPPORT_WINDOWS) #endif // NUCLEX_SUPPORT_THREADING_THREADPOOL_H