source: Nuclex.Support.Native/trunk/Source/Threading/WindowsThreadPool.cpp

Last change on this file was 1802, checked in by cygon, 11 months ago

Updated copyright statement for the year 2013

File size: 6.3 KB
Line 
1#pragma region CPL License
2/*
3Nuclex Native Framework
4Copyright (C) 2002-2013 Nuclex Development Labs
5
6This library is free software; you can redistribute it and/or
7modify it under the terms of the IBM Common Public License as
8published by the IBM Corporation; either version 1.0 of the
9License, or (at your option) any later version.
10
11This library is distributed in the hope that it will be useful,
12but WITHOUT ANY WARRANTY; without even the implied warranty of
13MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14IBM Common Public License for more details.
15
16You should have received a copy of the IBM Common Public
17License along with this library
18*/
19#pragma endregion // CPL License
20
21// If the library is compiled as a DLL, this ensures symbols are exported
22#define NUCLEX_SUPPORT_SOURCE 1
23
24#include "Nuclex/Support/Threading/WindowsThreadPool.h"
25
26#if defined(NUCLEX_SUPPORT_WIN32)
27
28#include <stdexcept>
29
30#define WIN32_LEAN_AND_MEAN
31#define VC_EXTRALEAN
32#include <Windows.h>
33
34namespace {
35
36  // ------------------------------------------------------------------------------------------- //
37
38  /// <summary>Called by the thread pool to execute a work item</summary>
39  /// <param name="parameter">Task the user has queued for execution</param>
40  /// <returns>Always 0</returns>
41  DWORD WINAPI threadPoolWorkCallback(void *parameter) {
42    typedef std::function<void()> Task;
43    typedef std::pair<Task, std::size_t> ReferenceCountedTask;
44
45    ReferenceCountedTask *task = reinterpret_cast<ReferenceCountedTask *>(parameter);
46    try {
47      task->first.operator()();
48    }
49    catch(const std::exception &) {
50      if(::InterlockedDecrement(&task->second) == 0) {
51        delete task;
52      }
53      std::unexpected();
54    }
55
56    if(::InterlockedDecrement(&task->second) == 0) {
57      delete task;
58    }
59    return 0;
60  }
61
62  // ------------------------------------------------------------------------------------------- //
63
64  /// <summary>Called by the thread pool to execute a work item</summary>
65  /// <param name="context">Task the user has queued for execution</param>
66  void NTAPI threadPoolWorkCallback(PTP_CALLBACK_INSTANCE, void *context, PTP_WORK) {
67    typedef std::function<void()> Task;
68    typedef std::pair<Task, std::size_t> ReferenceCountedTask;
69
70    ReferenceCountedTask *task = reinterpret_cast<ReferenceCountedTask *>(context);
71    try {
72      task->first.operator()();
73    }
74    catch(const std::exception &) {
75      if(::InterlockedDecrement(&task->second) == 0) {
76        delete task;
77      }
78      std::unexpected();
79    }
80
81    if(::InterlockedDecrement(&task->second) == 0) {
82      delete task;
83    }
84  }
85
86  // ------------------------------------------------------------------------------------------- //
87
88} // anonymous namespace
89
90namespace Nuclex { namespace Support { namespace Threading {
91
92  // ------------------------------------------------------------------------------------------- //
93
94  WindowsThreadPool::WindowsThreadPool() :
95    useNewThreadPoolApi(isAtLeastWindowsVersion(6, 0)) {}
96
97  // ------------------------------------------------------------------------------------------- //
98
99  WindowsThreadPool::~WindowsThreadPool() {}
100
101  // ------------------------------------------------------------------------------------------- //
102
103  std::size_t WindowsThreadPool::CountMaximumParallelTasks() const {
104    static SYSTEM_INFO systemInfo = SYSTEM_INFO();
105
106    if(systemInfo.dwNumberOfProcessors == 0) {
107      ::GetSystemInfo(&systemInfo);
108    }
109
110    return static_cast<std::size_t>(systemInfo.dwNumberOfProcessors);
111  }
112
113  // ------------------------------------------------------------------------------------------- //
114
115  void WindowsThreadPool::AddTask(
116    const std::function<void()> &task, std::size_t count /* = 1 */
117  ) {
118    typedef std::function<void()> Task;
119    typedef std::pair<Task, std::size_t> ReferenceCountedTask;
120
121    ReferenceCountedTask *countedTask = new ReferenceCountedTask(task, count);
122
123    if(this->useNewThreadPoolApi) { // Vista and later can use the new API
124
125      // Try to create a work item for the task we have been given
126      PTP_WORK work = ::CreateThreadpoolWork(&threadPoolWorkCallback, countedTask, nullptr);
127      if(work == nullptr) {
128        delete countedTask;
129        throw std::runtime_error("Could not create thread pool work item");
130      }
131
132      // Work item was created, submit it to the thread pool
133      while(count > 0) {
134        ::SubmitThreadpoolWork(work);
135        --count;
136      }
137
138    } else { // We're on XP, so we need to use the old thread pool API
139
140      // Queue all of the work items
141      while(count > 0) {
142        BOOL result = ::QueueUserWorkItem(
143          &threadPoolWorkCallback, new std::function<void()>(task), 0
144        );
145        if(result == FALSE) {
146          break;
147        }
148
149        --count;
150      }
151
152      // If we exited before all items were queued, an error has occurred
153      if(count > 0) {
154        while(count > 1) { // Some work items may already be running, can't assign
155          ::InterlockedDecrement(&countedTask->second);
156          --count;
157        }
158
159        // The final decrement may reveal that we're responsible for deleting the task
160        // if all threads have finished their execution already.
161        if(::InterlockedDecrement(&countedTask->second) == 0) {
162          delete countedTask;
163        }
164
165        throw std::runtime_error("Could not queue thread pool work item");
166      }
167
168    }
169  }
170
171  // ------------------------------------------------------------------------------------------- //
172
173  bool WindowsThreadPool::isAtLeastWindowsVersion(int major, int minor) {
174    OSVERSIONINFOW osVersionInfo = {0};
175    osVersionInfo.dwOSVersionInfoSize = sizeof(osVersionInfo);
176    BOOL result = ::GetVersionExW(&osVersionInfo);
177    if(result == FALSE) {
178      throw std::runtime_error("Could not determine operating system version");
179    }
180
181    if(osVersionInfo.dwMajorVersion == static_cast<DWORD>(major)) {
182      return (osVersionInfo.dwMinorVersion >= static_cast<DWORD>(minor));
183    } else {
184      return (osVersionInfo.dwMajorVersion > static_cast<DWORD>(major));
185    }
186  }
187
188  // ------------------------------------------------------------------------------------------- //
189
190}}} // namespace Nuclex::Support::Threading
191
192#endif // defined(NUCLEX_SUPPORT_WIN32)
Note: See TracBrowser for help on using the repository browser.