#pragma region CPL License /* Nuclex Native Framework Copyright (C) 2002-2023 Nuclex Development Labs This library is free software; you can redistribute it and/or modify it under the terms of the IBM Common Public License as published by the IBM Corporation; either version 1.0 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the IBM Common Public License for more details. You should have received a copy of the IBM Common Public License along with this library */ #pragma endregion // CPL License #ifndef NUCLEX_PLATFORM_TASKS_NAIVETASKCOORDINATOR_H #define NUCLEX_PLATFORM_TASKS_NAIVETASKCOORDINATOR_H #include "Nuclex/Platform/Config.h" #include "Nuclex/Platform/Tasks/TaskCoordinator.h" #include // for ThreadPool #include // for Semaphore #include // for std::optional #include // for std::unique_ptr #include // for std::mutex #include // for std::deque #include // for std::array #include // for std::atomic namespace Nuclex { namespace Platform { namespace Tasks { // ------------------------------------------------------------------------------------------- // class ResourceBudget; // ------------------------------------------------------------------------------------------- // }}} // namespace Nuclex::Platform::Tasks namespace Nuclex { namespace Platform { namespace Tasks { // ------------------------------------------------------------------------------------------- // /// Coordinates background tasks based on their usage of system resouces class NUCLEX_PLATFORM_TYPE NaiveTaskCoordinator : public TaskCoordinator { /// Initializes a new task coordinator public: NUCLEX_PLATFORM_API NaiveTaskCoordinator(); /// Frees all resources owned by the task coordinator public: NUCLEX_PLATFORM_API ~NaiveTaskCoordinator() override; /// Adds a resource that the task coordinator can allocate to tasks /// Type of resource the task coordinator can allocate /// Amount of the resource available on the system /// /// Calling this method multiple times with the same resource type will not accumulate /// resources but instead handle it as an alternative resource unit (for example, /// adds two times 16 GiB video memory does not allow the coordinator to run tasks /// requiring 32 GiB video memory, but it will allow for two tasks requiring up to /// 16 GiB of video memory to run in parallel). /// public: NUCLEX_PLATFORM_API void AddResource( Tasks::ResourceType resourceType, std::size_t amountAvailable ); /// Begins execution of scheduled tasks /// /// After this method is called, the method must not be /// called anymore. It is okay to schedule tasks before calling Start(), however. /// These would simply sit in a queue and begin execution right after Start() is called. /// public: NUCLEX_PLATFORM_API void Start(); /// Queries the amount of a resource the system has in total /// Type of resource that will be queried /// The total amount of the queried resource in the system /// /// If there are multiple resource units, for example on a system with multiple GPUs, /// querying for video memory will return the highest amount of video memory available /// on any single GPU. The behavior is the same for all resource units. /// public: NUCLEX_PLATFORM_API std::size_t QueryResourceMaximum( ResourceType resourceType ) const override; /// Schedules the specified task for execution /// Task that will be executed as soon as resources permit /// Resources that the task will occupy public: NUCLEX_PLATFORM_API void Schedule(const std::shared_ptr &task) override; /// Schedules the specified task for execution /// /// Environment that needs to be active while the task executes /// /// Task that will be executed as soon as resources permit public: NUCLEX_PLATFORM_API void Schedule( const std::shared_ptr &environment, const std::shared_ptr &task ) override; /// Schedules a task for execution with an alternative task /// /// Task that will be executed if the resources are available /// /// /// Task that can be executed instead of the preferred resources are not available /// public: NUCLEX_PLATFORM_API void ScheduleWithAlternative( const std::shared_ptr &preferredTask, const std::shared_ptr &alternativeTask ) override; /// Schedules a task for execution with an alternative task /// /// Environment that needs to be active while the task executes /// /// /// Task that will be executed if the resources are available /// /// /// Task that can be executed instead of the preferred resources are not available /// public: NUCLEX_PLATFORM_API void ScheduleWithAlternative( const std::shared_ptr &environment, const std::shared_ptr &preferredTask, const std::shared_ptr &alternativeTask ) override; /// Cancels a waiting task /// Task that will be cancelled /// /// True if the task was still waiting and has been canceled, false if it wasn't found /// /// /// If the task has an alternative, that one will be cancelled, too. Specifying /// the alternative for cancellation is not allowed. /// public: NUCLEX_PLATFORM_API bool Cancel(const std::shared_ptr &task) override; /// Cancels all waiting tasks /// Whether to cancel all future tasks, too /// /// Is usually called when the task coordinator shuts down to cancel all waiting tasks /// public: NUCLEX_PLATFORM_API void CancelAll(bool forever = true) override; /// Fast check whether the coordination thread needs to be waken up /// Task the task coordinator has added to the queue /// Environment required by the task to run /// True if the coordination thread should be woken up protected: virtual bool IsCoordinationThreadWakeUpNeeded( const std::shared_ptr &task, const std::shared_ptr &environment = std::shared_ptr() ) const { return true; } /// Looks for runnable tasks and launches them protected: virtual void KickOffRunnableTasks(); /// Thread that launches incoming tasks acoording to available resources private: void coordinationThread(); /// /// Helper that calls the method /// /// The 'this' pointer of the task coordinator instance private: static void invokeCoordinationThread(NaiveTaskCoordinator *self); #pragma region class ScheduledTask /// Task that is waiting to be executed private: class ScheduledTask { /// Initializes a new scheduled task /// Task that will be wrapped as a scheduled task /// Environment that is needed for the task for run public: ScheduledTask( const std::shared_ptr &task, const std::shared_ptr &environment = std::shared_ptr() ) : PrimaryEnvironment(environment), PrimaryTask(task), AssignedResourceIndices() {} /// Environment that needs to be active for the task, can be empty public: std::shared_ptr PrimaryEnvironment; /// Task to be executed public: std::shared_ptr PrimaryTask; /// The indices of the resource units assigned to this task /// /// When there are multiple units providing a resource (for example, multiple GPUs), /// then the task coordinator has to decide which one to run the task on. This list /// will be filled when the task is launched to remember which of the units the task /// has been told to use so it can be freed again correctly. /// public: std::array AssignedResourceIndices; //public: std::uint8_t awaitedFuture[(sizeof(std::future[2]) / 2)]; }; #pragma endregion // class ScheduledTask #pragma region struct ActiveEnvironment /// Environment that has been activated by the task coordinator private: struct ActiveEnvironment { /// Task environment that is currently active public: std::shared_ptr Environment; /// Units that have been selected to provide the resources public: std::array SelectedUnits; /// Number of tasks that are using this environment right now public: std::size_t ActiveTaskCount; }; #pragma endregion // struct ActiveEnvironment /// Tracks the resources available on the system private: std::unique_ptr availableResources; /// Number of CPU cores that have been added as resources in total private: std::size_t totalCpuCoreCount; /// Thread pool used to start off the scheduled tasks /// /// Only optional so it can be constructed at a later time. Is set when /// the method is called. Contains as many ready threads /// as there are cpu cores added to the task coordinator. /// private: std::optional threadPool; /// Set after the coordination thread was started private: std::atomic coordinationThreadRunningFlag; /// Memory for the std::future that tracks the coordination thread private: std::uint8_t coordinationThreadFuture[sizeof(std::future)]; /// Set to shut down the coordination thread private: std::atomic coordinationThreadShutdownFlag; /// Mutex that must be held when accessing the task queues private: std::mutex queueAccessMutex; /// Tasks that are waiting to be executed by the task coordinator private: std::deque waitingTasks; /// Semaphore that gets posted once for each available task /// /// Also gets posted for a silly number of tasks when a shutdown is requested. /// private: Nuclex::Support::Threading::Semaphore tasksAvailableSemaphore; }; // ------------------------------------------------------------------------------------------- // }}} // namespace Nuclex::Platform::Tasks #endif // NUCLEX_PLATFORM_TASKS_NAIVETASKCOORDINATOR_H