16 #ifndef INCLUDE_NN_NLIB_THREADING_THREADPOOL_H_ 17 #define INCLUDE_NN_NLIB_THREADING_THREADPOOL_H_ 38 class ThPoolTask : public ThPoolTaskBase {
40 explicit ThPoolTask(PackagedTask<R()>& task, move_tag)
NLIB_NOEXCEPT : task_(task, move_tag()) {
46 PackagedTask<R()> task_;
57 return this->Init(thread_count, settings);
61 template<class R, class FUNC>
69 struct ThreadPoolPrivate;
70 ThreadPoolPrivate* prv_;
74 template<class R, class FUNC>
76 if (!prv_)
return EINVAL;
84 detail::ThPoolTaskBase* ptask =
new (std::nothrow) detail::ThPoolTask<R>(task,
move_tag());
85 if (!ptask)
return ENOMEM;
86 return Submit_(ptask);
// Constructor: stores the two worker entry-point function pointers passed by
// the caller and initializes thread_list_ to nullptr.
// NOTE(review): the embedded listing numbers jump from 95 to 98, so any
// member initializers on listing lines 96-97 are not visible in this extract.
93 LockFreeThreadPoolBase(
void (*primary_worker_thread)(
void*),
void (*worker_thread)(
void*)) :
94 primary_worker_thread_(primary_worker_thread),
95 worker_thread_(worker_thread),
98 thread_list_(nullptr) {}
// Signature of a task function: receives a `void**` and a `size_t` and
// returns a `void*`.
100 typedef
void* (*Func)(
void**,
size_t)NLIB_NOEXCEPT_FUNCPTR;
// Signature of a callback: receives a single `void*` and returns nothing.
101 typedef
void (*Callback)(
void*) NLIB_NOEXCEPT_FUNCPTR;
// Declaration only; the definition is not part of this extract.
// Takes a task function, a callback, an argument array and a size_t `n`.
// The call site visible later in this file passes nullptr for `args` when its
// local error code `e` is non-zero.
// NOTE(review): the meaning of the bool result is not visible here --
// presumably `false` tells the calling loop to stop; confirm against the
// definition.
103 bool PrimaryWorkerThread_(Func func, Callback callback,
void** args,
size_t n)
NLIB_NOEXCEPT;
104 bool WorkerThread_(Func func, Callback callback,
void** args,
size_t n,
// Returns the value of the thread_count_ member (declared as uint32_t),
// implicitly widened to size_t.
109 size_t GetThreadCount()
NLIB_NOEXCEPT {
return thread_count_; }
113 void (*primary_worker_thread_)(
void*);
114 void (*worker_thread_)(
void*);
116 uint32_t thread_count_;
124 template<
size_t N = 5>
// Func: pointer to a function taking (void** args, size_t n) and returning
// void*. Callback: pointer to a function taking (void* result) and returning
// nothing.
// NOTE(review): both names are typedef'd twice below -- once spelled out and
// once as aliases of the LockFreeThreadPoolBase typedefs. The listing lines
// between them are missing from this extract; presumably a preprocessor
// branch selects one pair -- confirm against the full header.
128 typedef void* (*Func)(
void** args,
size_t n);
129 typedef void (*Callback)(
void* result);
131 typedef NLIB_NS::threading::detail::LockFreeThreadPoolBase::Func Func;
132 typedef NLIB_NS::threading::detail::LockFreeThreadPoolBase::Callback Callback;
// Submit overload set: each overload takes a task function, a callback and
// zero to ten opaque `void*` arguments (arg0..arg9), and returns an errno_t.
// NOTE(review): presumably 0 means the task was queued -- the visible Submit
// bodies later in this file return the result of work_queue_.Enqueue(info);
// confirm the exact error codes against LockFreeQueue.
143 errno_t Submit(Func func, Callback callback);
144 errno_t Submit(Func func, Callback callback,
void* arg0);
145 errno_t Submit(Func func, Callback callback,
void* arg0,
void* arg1);
146 errno_t Submit(Func func, Callback callback,
void* arg0,
void* arg1,
void* arg2);
147 errno_t Submit(Func func, Callback callback,
void* arg0,
void* arg1,
void* arg2,
void* arg3);
148 errno_t Submit(Func func, Callback callback,
void* arg0,
void* arg1,
void* arg2,
void* arg3,
// NOTE(review): listing line 149 (presumably `void* arg4);`, closing the
// five-argument overload) is missing from this extract.
150 errno_t Submit(Func func, Callback callback,
void* arg0,
void* arg1,
void* arg2,
void* arg3,
151 void* arg4,
void* arg5);
152 errno_t Submit(Func func, Callback callback,
void* arg0,
void* arg1,
void* arg2,
void* arg3,
153 void* arg4,
void* arg5,
void* arg6);
154 errno_t Submit(Func func, Callback callback,
void* arg0,
void* arg1,
void* arg2,
void* arg3,
155 void* arg4,
void* arg5,
void* arg6,
void* arg7);
156 errno_t Submit(Func func, Callback callback,
void* arg0,
void* arg1,
void* arg2,
void* arg3,
157 void* arg4,
void* arg5,
void* arg6,
void* arg7,
void* arg8);
158 errno_t Submit(Func func, Callback callback,
void* arg0,
void* arg1,
void* arg2,
void* arg3,
159 void* arg4,
void* arg5,
void* arg6,
void* arg7,
void* arg8,
void* arg9);
160 size_t GetThreadCount();
163 LockFreeThreadPoolExecInfo info;
165 info.callback = callback;
167 return work_queue_.Enqueue(info);
171 Submit(X func, Callback callback,
172 typename EnableIf<N >= 1 && IsSame<X, Func>::value,
void*>::type arg0)
NLIB_NOEXCEPT {
173 LockFreeThreadPoolExecInfo info;
175 info.callback = callback;
178 return work_queue_.Enqueue(info);
181 errno_t Submit(X func, Callback callback,
182 typename EnableIf<N >= 2 && IsSame<X, Func>::value,
void*>::type arg0,
184 LockFreeThreadPoolExecInfo info;
186 info.callback = callback;
190 return work_queue_.Enqueue(info);
193 errno_t Submit(X func, Callback callback,
194 typename EnableIf<N >= 3 && IsSame<X, Func>::value,
void*>::type arg0,
196 LockFreeThreadPoolExecInfo info;
198 info.callback = callback;
203 return work_queue_.Enqueue(info);
206 errno_t Submit(X func, Callback callback,
207 typename EnableIf<N >= 4 && IsSame<X, Func>::value,
void*>::type arg0,
209 LockFreeThreadPoolExecInfo info;
211 info.callback = callback;
217 return work_queue_.Enqueue(info);
220 errno_t Submit(X func, Callback callback,
221 typename EnableIf<N >= 5 && IsSame<X, Func>::value,
void*>::type arg0,
222 void* arg1,
void* arg2,
void* arg3,
void* arg4)
NLIB_NOEXCEPT {
223 LockFreeThreadPoolExecInfo info;
225 info.callback = callback;
232 return work_queue_.Enqueue(info);
235 errno_t Submit(X func, Callback callback,
236 typename EnableIf<N >= 6 && IsSame<X, Func>::value,
void*>::type arg0,
237 void* arg1,
void* arg2,
void* arg3,
void* arg4,
void* arg5)
NLIB_NOEXCEPT {
238 LockFreeThreadPoolExecInfo info;
240 info.callback = callback;
248 return work_queue_.Enqueue(info);
252 Submit(X func, Callback callback,
253 typename EnableIf<N >= 7 && IsSame<X, Func>::value,
void*>::type arg0,
void* arg1,
254 void* arg2,
void* arg3,
void* arg4,
void* arg5,
void* arg6)
NLIB_NOEXCEPT {
255 LockFreeThreadPoolExecInfo info;
257 info.callback = callback;
266 return work_queue_.Enqueue(info);
270 Submit(X func, Callback callback,
271 typename EnableIf<N >= 8 && IsSame<X, Func>::value,
void*>::type arg0,
void* arg1,
272 void* arg2,
void* arg3,
void* arg4,
void* arg5,
void* arg6,
void* arg7)
NLIB_NOEXCEPT {
273 LockFreeThreadPoolExecInfo info;
275 info.callback = callback;
285 return work_queue_.Enqueue(info);
288 errno_t Submit(X func, Callback callback,
289 typename EnableIf<N >= 9 && IsSame<X, Func>::value,
void*>::type arg0,
290 void* arg1,
void* arg2,
void* arg3,
void* arg4,
void* arg5,
void* arg6,
292 LockFreeThreadPoolExecInfo info;
294 info.callback = callback;
305 return work_queue_.Enqueue(info);
308 errno_t Submit(X func, Callback callback,
309 typename EnableIf<N >= 10 && IsSame<X, Func>::value,
void*>::type arg0,
310 void* arg1,
void* arg2,
void* arg3,
void* arg4,
void* arg5,
void* arg6,
312 LockFreeThreadPoolExecInfo info;
314 info.callback = callback;
326 return work_queue_.Enqueue(info);
334 struct LockFreeThreadPoolExecInfo {
339 LockFreeQueue<LockFreeThreadPoolExecInfo> work_queue_;
347 errno_t e = work_queue_.Init(work_queue_size);
348 if (e != 0)
return e;
349 e = InitBase(thread_count, settings);
367 LockFreeThreadPoolExecInfo info;
369 info.callback = callback;
372 for (
size_t i = 0; i < n; ++i) {
373 info.arg[i] = va_arg(ap,
void*);
376 return work_queue_.Enqueue(info);
382 LockFreeThreadPoolExecInfo info;
386 if (!This->PrimaryWorkerThread_(info.func, info.callback, e == 0 ? &info.arg[0] :
nullptr,
393 void LockFreeThreadPool<N>::WorkerThread(
void* p)
NLIB_NOEXCEPT {
394 LockFreeThreadPool* This =
reinterpret_cast<LockFreeThreadPool*
>(p);
395 LockFreeThreadPoolExecInfo info;
396 uint32_t counter = 0;
399 errno_t e = This->work_queue_.Dequeue(&info);
400 if (!This->WorkerThread_(info.func, info.callback, e == 0 ? &info.arg[0] :
nullptr, N,
409 #endif // INCLUDE_NN_NLIB_THREADING_THREADPOOL_H_
Lock-free algorithms are supported.
#define NLIB_OVERRIDE
Expands to override where it is available for use; otherwise expands to nothing.
#define NLIB_DISALLOW_COPY_AND_ASSIGN(TypeName)
Prohibits use of the copy constructor and assignment operator for the class specified by TypeName...
ThreadPool() noexcept
Instantiates the object with default parameters (default constructor). Must be initialized with Init...
errno_t Init(size_t thread_count) noexcept
Starts a thread and initializes the thread pool.
~LockFreeThreadPool() noexcept
Destructor. Joins all threads started internally.
LockFreeThreadPool() noexcept
Instantiates the object with default parameters (default constructor). Must be initialized with Init...
Class to wrap nlib_thread_attr. nlib_thread_attr_init() and nlib_thread_attr_destroy() are run automa...
errno_t GetFuture(Future< R > *p) noexcept
Sets Future to get the result of execution.
An empty structure indicating that an argument to a function needs to be moved.
This thread pool is suitable for deploying fine-grained tasks.
errno_t Init() noexcept
Starts a thread and initializes the thread pool.
void SwapUnsafe(LockFreeQueue &rhs) noexcept
Swaps an object. This is not thread-safe.
#define NLIB_NOEXCEPT
Expands to noexcept, or to an equivalent suited to the build environment.
A container-like class similar to std::vector that can store objects that do not have copy constructo...
Class that retrieves, in a thread-safe manner, the output of code executing in a different thread. This class is similar to the std::shared_future class of C++11.
errno_t Dequeue(DequeueType x) noexcept
Picks up an element from the queue and stores it in x. This is thread-safe.
size_t GetHardwareConcurrency() noexcept
Returns the number of hardware threads.
Class that wraps a function to run in a different thread, and gets the return value in a thread safe ...
Implements the Future pattern for multithread programming.
#define NLIB_FINAL
Expands to final where it is available for use; otherwise expands to nothing.
Tasks may be deployed to a pool of threads that have been already created.
errno_t Init(FUNC &func)
Performs initialization.