16 #ifndef INCLUDE_NN_NLIB_THREADING_THREADPOOL_H_ 17 #define INCLUDE_NN_NLIB_THREADING_THREADPOOL_H_ 38 class ThPoolTask : public ThPoolTaskBase {
40 explicit ThPoolTask(PackagedTask<R()>& task, move_tag)
NLIB_NOEXCEPT 41 : task_(task, move_tag()) {}
46 PackagedTask<R()> task_;
57 return this->Init(thread_count, settings);
63 template <
class R,
class FUNC>
71 struct ThreadPoolPrivate;
72 ThreadPoolPrivate* prv_;
76 template <
class R,
class FUNC>
78 if (!prv_)
return EINVAL;
86 detail::ThPoolTaskBase* ptask =
new (std::nothrow) detail::ThPoolTask<R>(task,
move_tag());
87 if (!ptask)
return ENOMEM;
88 return Submit_(ptask);
95 LockFreeThreadPoolBase(
void (*primary_worker_thread)(
void*),
void (*worker_thread)(
void*))
96 : primary_worker_thread_(primary_worker_thread),
97 worker_thread_(worker_thread),
98 status_(0), thread_count_(0), thread_list_(
nullptr) {}
100 typedef void* (*Func)(
void**, size_t) NLIB_NOEXCEPT_FUNCPTR;
101 typedef void (*Callback)(
void*) NLIB_NOEXCEPT_FUNCPTR;
103 bool PrimaryWorkerThread_(Func func, Callback callback,
105 bool WorkerThread_(Func func, Callback callback,
void** args,
110 size_t GetThreadCount() NLIB_NOEXCEPT {
return thread_count_; }
114 void (*primary_worker_thread_)(
void*);
115 void (*worker_thread_)(
void*);
117 uint32_t thread_count_;
125 template<
size_t N = 5>
129 typedef void* (*Func)(
void** args,
size_t n);
130 typedef void (*Callback)(
void* result);
132 typedef NLIB_NS::threading::detail::LockFreeThreadPoolBase::Func Func;
133 typedef NLIB_NS::threading::detail::LockFreeThreadPoolBase::Callback Callback;
137 : LockFreeThreadPoolBase(PrimaryWorkerThread, WorkerThread) {}
139 errno_t Init(
size_t work_queue_size,
size_t thread_count,
144 errno_t Submit(Func func, Callback callback);
145 errno_t Submit(Func func, Callback callback,
void* arg0);
146 errno_t Submit(Func func, Callback callback,
void* arg0,
void* arg1);
147 errno_t Submit(Func func, Callback callback,
void* arg0,
void* arg1,
void* arg2);
148 errno_t Submit(Func func, Callback callback,
void* arg0,
void* arg1,
void* arg2,
void* arg3);
149 errno_t Submit(Func func, Callback callback,
void* arg0,
void* arg1,
void* arg2,
void* arg3,
151 errno_t Submit(Func func, Callback callback,
void* arg0,
void* arg1,
void* arg2,
void* arg3,
152 void* arg4,
void* arg5);
153 errno_t Submit(Func func, Callback callback,
void* arg0,
void* arg1,
void* arg2,
void* arg3,
154 void* arg4,
void* arg5,
void* arg6);
155 errno_t Submit(Func func, Callback callback,
void* arg0,
void* arg1,
void* arg2,
void* arg3,
156 void* arg4,
void* arg5,
void* arg6,
void* arg7);
157 errno_t Submit(Func func, Callback callback,
void* arg0,
void* arg1,
void* arg2,
void* arg3,
158 void* arg4,
void* arg5,
void* arg6,
void* arg7,
void* arg8);
159 errno_t Submit(Func func, Callback callback,
void* arg0,
void* arg1,
void* arg2,
void* arg3,
160 void* arg4,
void* arg5,
void* arg6,
void* arg7,
void* arg8,
void* arg9);
161 size_t GetThreadCount();
163 errno_t Submit(Func func, Callback callback) NLIB_NOEXCEPT {
164 LockFreeThreadPoolExecInfo info;
166 info.callback = callback;
168 return work_queue_.Enqueue(info);
171 errno_t Submit(X func, Callback callback,
172 typename EnableIf<N >= 1 && IsSame<X, Func>::value,
173 void*>::type arg0) NLIB_NOEXCEPT {
174 LockFreeThreadPoolExecInfo info;
176 info.callback = callback;
179 return work_queue_.Enqueue(info);
182 errno_t Submit(X func, Callback callback,
183 typename EnableIf<N >= 2 && IsSame<X, Func>::value,
185 void* arg1) NLIB_NOEXCEPT {
186 LockFreeThreadPoolExecInfo info;
188 info.callback = callback;
192 return work_queue_.Enqueue(info);
195 errno_t Submit(X func, Callback callback,
196 typename EnableIf<N >= 3 && IsSame<X, Func>::value,
198 void* arg1,
void* arg2) NLIB_NOEXCEPT {
199 LockFreeThreadPoolExecInfo info;
201 info.callback = callback;
206 return work_queue_.Enqueue(info);
209 errno_t Submit(X func, Callback callback,
210 typename EnableIf<N >= 4 && IsSame<X, Func>::value,
212 void* arg1,
void* arg2,
void* arg3) NLIB_NOEXCEPT {
213 LockFreeThreadPoolExecInfo info;
215 info.callback = callback;
221 return work_queue_.Enqueue(info);
224 errno_t Submit(X func, Callback callback,
225 typename EnableIf<N >= 5 && IsSame<X, Func>::value,
227 void* arg1,
void* arg2,
void* arg3,
void* arg4) NLIB_NOEXCEPT {
228 LockFreeThreadPoolExecInfo info;
230 info.callback = callback;
237 return work_queue_.Enqueue(info);
240 errno_t Submit(X func, Callback callback,
241 typename EnableIf<N >= 6 && IsSame<X, Func>::value,
243 void* arg1,
void* arg2,
void* arg3,
void* arg4,
244 void* arg5) NLIB_NOEXCEPT {
245 LockFreeThreadPoolExecInfo info;
247 info.callback = callback;
255 return work_queue_.Enqueue(info);
258 errno_t Submit(X func, Callback callback,
259 typename EnableIf<N >= 7 && IsSame<X, Func>::value,
261 void* arg1,
void* arg2,
void* arg3,
void* arg4,
262 void* arg5,
void* arg6) NLIB_NOEXCEPT {
263 LockFreeThreadPoolExecInfo info;
265 info.callback = callback;
274 return work_queue_.Enqueue(info);
277 errno_t Submit(X func, Callback callback,
278 typename EnableIf<N >= 8 && IsSame<X, Func>::value,
280 void* arg1,
void* arg2,
void* arg3,
void* arg4,
281 void* arg5,
void* arg6,
void* arg7) NLIB_NOEXCEPT {
282 LockFreeThreadPoolExecInfo info;
284 info.callback = callback;
294 return work_queue_.Enqueue(info);
297 errno_t Submit(X func, Callback callback,
298 typename EnableIf<N >= 9 && IsSame<X, Func>::value,
300 void* arg1,
void* arg2,
void* arg3,
void* arg4,
301 void* arg5,
void* arg6,
void* arg7,
void* arg8) NLIB_NOEXCEPT {
302 LockFreeThreadPoolExecInfo info;
304 info.callback = callback;
315 return work_queue_.Enqueue(info);
318 errno_t Submit(X func, Callback callback,
319 typename EnableIf<N >= 10 && IsSame<X, Func>::value,
321 void* arg1,
void* arg2,
void* arg3,
void* arg4,
322 void* arg5,
void* arg6,
void* arg7,
void* arg8,
void* arg9) NLIB_NOEXCEPT {
323 LockFreeThreadPoolExecInfo info;
325 info.callback = callback;
337 return work_queue_.Enqueue(info);
345 struct LockFreeThreadPoolExecInfo {
358 errno_t e = work_queue_.Init(work_queue_size);
359 if (e != 0)
return e;
360 e = InitBase(thread_count, settings);
378 LockFreeThreadPoolExecInfo info;
380 info.callback = callback;
383 for (
size_t i = 0; i < n; ++i) {
384 info.arg[i] = va_arg(ap,
void*);
387 return work_queue_.Enqueue(info);
393 LockFreeThreadPoolExecInfo info;
397 if (!This->PrimaryWorkerThread_(info.func, info.callback,
398 e == 0 ? &info.arg[0] :
nullptr, N))
406 LockFreeThreadPoolExecInfo info;
407 uint32_t counter = 0;
411 if (!This->WorkerThread_(info.func, info.callback,
412 e == 0 ? &info.arg[0] :
nullptr, N, &counter))
420 #endif // INCLUDE_NN_NLIB_THREADING_THREADPOOL_H_ Lock-free algorithms are supported.
#define NLIB_OVERRIDE
Defines override if it is available for use. If not, holds an empty string.
#define NLIB_DISALLOW_COPY_AND_ASSIGN(TypeName)
Prohibits use of the copy constructor and assignment operator for the class specified by TypeName...
ThreadPool() noexcept
Instantiates the object with default parameters (default constructor). Must be initialized with Init...
errno_t Init(size_t thread_count) noexcept
Starts a thread and initializes the thread pool.
~LockFreeThreadPool() noexcept
Destructor. Joins all threads started internally.
LockFreeThreadPool() noexcept
Instantiates the object with default parameters (default constructor). Must be initialized with Init...
Class to wrap nlib_thread_attr. nlib_thread_attr_init() and nlib_thread_attr_destroy() are run automa...
errno_t GetFuture(Future< R > *p) noexcept
Sets Future to get the result of execution.
An empty structure indicating that an argument to a function needs to be moved.
This thread pool is suitable for deploying fine grain tasks.
errno_t Init() noexcept
Starts a thread and initializes the thread pool.
void SwapUnsafe(LockFreeQueue &rhs) noexcept
Swaps an object. This is not thread-safe.
#define NLIB_NOEXCEPT
Defines noexcept geared to the environment, or the equivalent.
A container-like class similar to std::vector that can store objects that do not have copy constructo...
Class that gets the output of a different thread executing in a thread safe manner. This class is similar to the std::shared_future class of C++11.
errno_t Dequeue(DequeueType x) noexcept
Picks up an element from the queue and stores it in x. This is thread-safe.
size_t GetHardwareConcurrency() noexcept
Returns the number of hardware threads.
Class that wraps a function to run in a different thread, and gets the return value in a thread safe ...
Implements the Future pattern for multithread programming.
#define NLIB_FINAL
Defines final if it is available for use. If not, holds an empty string.
Tasks may be deployed to a pool of threads that have been already created.
errno_t Init(FUNC &func)
Performs initialization.