#pragma region CPL License
/*
Nuclex Native Framework
Copyright (C) 2002-2023 Nuclex Development Labs
This library is free software; you can redistribute it and/or
modify it under the terms of the IBM Common Public License as
published by the IBM Corporation; either version 1.0 of the
License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
IBM Common Public License for more details.
You should have received a copy of the IBM Common Public
License along with this library
*/
#pragma endregion // CPL License
#ifndef NUCLEX_SUPPORT_THREADING_THREADPOOLTASKPOOL_H
#define NUCLEX_SUPPORT_THREADING_THREADPOOLTASKPOOL_H
#include "Nuclex/Support/Config.h"
#include "ThreadPoolConfig.h"
// Boost-licensed MoodyCamel queue.
// This is a lock-free, unbounded queue that works on Windows and Linux.
// Its performance is at the top end of such queues. The header does a lot of stuff,
// involving many other headers and preprocessor constants, so we include it last.
#include "./cameron314-concurrentqueue-1.0.4/concurrentqueue.h"
namespace Nuclex { namespace Support { namespace Threading {
// ------------------------------------------------------------------------------------------- //
/// Manages reusable tasks for the thread pool
/// Store all informations about a submitted task
/// Offset at which the variable payload begins
template
class ThreadPoolTaskPool {
#pragma region struct SubmittedTaskTemplate
#if defined(NUCLEX_SUPPORT_ENABLE_TASK_POOL_VERIFICATION)
/// Template for the contents of the submitted task structure
///
/// Apart from requiring the PayloadSize to be of type std::size_t and
/// located at the start of the structure, everything else is up to the owner.
///
private: struct SubmittedTaskTemplate {
/// Size of the variable payload at the end of the structure
public: std::size_t PayloadSize;
/// Example element, a pointer to the task instance
public: void *Task;
/// Placeholder for the variable payload attached to the task
public: std::uint8_t Payload[sizeof(std::uintptr_t)];
};
#endif // defined(NUCLEX_SUPPORT_ENABLE_TASK_POOL_VERIFICATION)
#pragma endregion // struct SubmittedTaskTemplate
public: ThreadPoolTaskPool() {
#if defined(NUCLEX_SUPPORT_ENABLE_TASK_POOL_VERIFICATION)
// This will both check that an attribute 'PayloadSize' is present in the submitted
// task structure and that it is at the beginning of the structure. If this assertion
// triggers, your submitted task type is not compatible with the task pool.
static_assert(
offsetof(TSubmittedTask, PayloadSize) == offsetof(SubmittedTaskTemplate, PayloadSize)
);
#endif // defined(NUCLEX_SUPPORT_ENABLE_TASK_POOL_VERIFICATION)
}
/// Destroys all remaining tasks
public: ~ThreadPoolTaskPool() {
DeleteAllRecyclableTasks();
}
/// Destroys all tasks currently waiting to be recycled
public: void DeleteAllRecyclableTasks() {
TSubmittedTask *submittedTask;
while(this->returnedTasks.try_dequeue(submittedTask)) {
DeleteTask(submittedTask);
}
}
/// Creates a new task with the specified payload size
/// Size of the payload the new task must carry
/// A new blob containing the
public: TSubmittedTask *GetNewTask(std::size_t payloadSize) {
std::size_t totalRequiredMemory = (PayloadOffset + payloadSize);
// Try to obtain a returned task with adequate payload size that can
// be re-used instead of allocating a new one
if(likely(totalRequiredMemory < ThreadPoolConfig::SubmittedTaskReuseLimit)) {
TSubmittedTask *submittedTask;
for(std::size_t attempt = 0; attempt < 3; ++attempt) {
if(this->returnedTasks.try_dequeue(submittedTask)) {
if(submittedTask->PayloadSize >= payloadSize) {
return submittedTask;
} else {
DeleteTask(submittedTask);
}
} else {
break; // No more submitted tasks in queue
}
}
}
// We found no task that we could re-use, so create a new one
{
std::uint8_t *taskMemory = new std::uint8_t[totalRequiredMemory];
TSubmittedTask *submittedTask = new(taskMemory) TSubmittedTask();
submittedTask->PayloadSize = payloadSize;
return submittedTask;
}
}
/// Checks if a task can be returned to the pool
/// Task that will be checked
/// True if the task is suitable to be returned to the pool
public: static bool IsReturnable(TSubmittedTask *task) {
std::size_t totalSize = task->PayloadSize + PayloadOffset;
return (totalSize < ThreadPoolConfig::SubmittedTaskReuseLimit);
}
/// Returns a task to the task pool, allowing for it to be re-used
/// Task that will be returned for re-use
public: void ReturnTask(TSubmittedTask *submittedTask) {
//assert(
// IsReturnable(submittedTask) && u8"Task is small enough to be returned to the pool"
//);
if(IsReturnable(submittedTask)) {
this->returnedTasks.enqueue(submittedTask);
} else {
DeleteTask(submittedTask);
}
}
/// Frees the memory used by a task
/// Task that will be destroyed
public: static void DeleteTask(TSubmittedTask *submittedTask) {
submittedTask->~TSubmittedTask();
delete[] reinterpret_cast(submittedTask);
}
/// Tasks that have been given back and wait for their reuse
private: moodycamel::ConcurrentQueue returnedTasks;
};
// ------------------------------------------------------------------------------------------- //
}}} // namespace Nuclex::Support::Threading
#endif // NUCLEX_SUPPORT_THREADING_THREADPOOLTASKPOOL_H