3 #ifndef INCLUDE_NN_NLIB_THREADING_THREADPOOL_H_ 4 #define INCLUDE_NN_NLIB_THREADING_THREADPOOL_H_ 25 class ThPoolTask : public ThPoolTaskBase {
// Call operator overriding the base-class virtual: simply invokes the stored
// task_ member (declared in this class as PackagedTask<R()>).
31 virtual void operator()() NLIB_NOEXCEPT
NLIB_OVERRIDE { task_(); }
34 PackagedTask<R()> task_;
45 return this->Init(thread_count, settings);
51 template <
class R,
class FUNC>
59 struct ThreadPoolPrivate;
60 ThreadPoolPrivate* prv_;
64 template <
class R,
class FUNC>
66 if (!prv_)
return EINVAL;
74 detail::ThPoolTaskBase* ptask =
new (std::nothrow) detail::ThPoolTask<R>(task);
75 if (!ptask)
return ENOMEM;
76 return Submit_(ptask);
// Constructor: records the two thread entry points supplied by the caller and
// zero/NULL-initializes the remaining state. No threads are started here.
//   primary_worker_thread - function pointer stored in primary_worker_thread_
//   worker_thread         - function pointer stored in worker_thread_
83 LockFreeThreadPoolBase(
void (*primary_worker_thread)(
void*),
void (*worker_thread)(
void*))
84 : primary_worker_thread_(primary_worker_thread),
85 worker_thread_(worker_thread),
86 status_(0), thread_count_(0), thread_list_(NULL) {}
// Func: pointer to a task function taking a void* array and a size_t
// (presumably the argument array and its element count -- confirm against the
// worker-thread implementation) and returning void*.
88 typedef void* (*Func)(
void**, size_t);
// Callback: pointer to a function taking a single void* and returning nothing.
// NOTE(review): looks like it receives the value returned by Func -- confirm.
89 typedef void (*Callback)(
void*);
91 bool PrimaryWorkerThread_(Func func, Callback callback,
93 bool WorkerThread_(Func func, Callback callback,
void** args,
// Returns the current value of thread_count_. The member is declared as
// uint32_t, so the value is implicitly widened to size_t on return.
98 size_t GetThreadCount() NLIB_NOEXCEPT {
return thread_count_; }
102 void (*primary_worker_thread_)(
void*);
103 void (*worker_thread_)(
void*);
105 uint32_t thread_count_;
113 template<
size_t N = 5>
117 typedef void* (*Func)(
void** args,
size_t n);
118 typedef void (*Callback)(
void* result);
120 typedef NLIB_NS::threading::detail::LockFreeThreadPoolBase::Func Func;
121 typedef NLIB_NS::threading::detail::LockFreeThreadPoolBase::Callback Callback;
125 : LockFreeThreadPoolBase(PrimaryWorkerThread, WorkerThread) {}
127 errno_t Init(
size_t work_queue_size,
size_t thread_count,
132 errno_t Submit(Func func, Callback callback);
133 errno_t Submit(Func func, Callback callback,
void* arg0);
134 errno_t Submit(Func func, Callback callback,
void* arg0,
void* arg1);
135 errno_t Submit(Func func, Callback callback,
void* arg0,
void* arg1,
void* arg2);
136 errno_t Submit(Func func, Callback callback,
void* arg0,
void* arg1,
void* arg2,
void* arg3);
137 errno_t Submit(Func func, Callback callback,
void* arg0,
void* arg1,
void* arg2,
void* arg3,
139 errno_t Submit(Func func, Callback callback,
void* arg0,
void* arg1,
void* arg2,
void* arg3,
140 void* arg4,
void* arg5);
141 errno_t Submit(Func func, Callback callback,
void* arg0,
void* arg1,
void* arg2,
void* arg3,
142 void* arg4,
void* arg5,
void* arg6);
143 errno_t Submit(Func func, Callback callback,
void* arg0,
void* arg1,
void* arg2,
void* arg3,
144 void* arg4,
void* arg5,
void* arg6,
void* arg7);
145 errno_t Submit(Func func, Callback callback,
void* arg0,
void* arg1,
void* arg2,
void* arg3,
146 void* arg4,
void* arg5,
void* arg6,
void* arg7,
void* arg8);
147 errno_t Submit(Func func, Callback callback,
void* arg0,
void* arg1,
void* arg2,
void* arg3,
148 void* arg4,
void* arg5,
void* arg6,
void* arg7,
void* arg8,
void* arg9);
149 size_t GetThreadCount();
151 errno_t Submit(Func func, Callback callback) NLIB_NOEXCEPT {
152 LockFreeThreadPoolExecInfo info;
154 info.callback = callback;
156 return work_queue_.Enqueue(info);
159 errno_t Submit(X func, Callback callback,
160 typename EnableIf<N >= 1 && IsSame<X, Func>::value,
161 void*>::type arg0) NLIB_NOEXCEPT {
162 LockFreeThreadPoolExecInfo info;
164 info.callback = callback;
167 return work_queue_.Enqueue(info);
170 errno_t Submit(X func, Callback callback,
171 typename EnableIf<N >= 2 && IsSame<X, Func>::value,
173 void* arg1) NLIB_NOEXCEPT {
174 LockFreeThreadPoolExecInfo info;
176 info.callback = callback;
180 return work_queue_.Enqueue(info);
183 errno_t Submit(X func, Callback callback,
184 typename EnableIf<N >= 3 && IsSame<X, Func>::value,
186 void* arg1,
void* arg2) NLIB_NOEXCEPT {
187 LockFreeThreadPoolExecInfo info;
189 info.callback = callback;
194 return work_queue_.Enqueue(info);
197 errno_t Submit(X func, Callback callback,
198 typename EnableIf<N >= 4 && IsSame<X, Func>::value,
200 void* arg1,
void* arg2,
void* arg3) NLIB_NOEXCEPT {
201 LockFreeThreadPoolExecInfo info;
203 info.callback = callback;
209 return work_queue_.Enqueue(info);
212 errno_t Submit(X func, Callback callback,
213 typename EnableIf<N >= 5 && IsSame<X, Func>::value,
215 void* arg1,
void* arg2,
void* arg3,
void* arg4) NLIB_NOEXCEPT {
216 LockFreeThreadPoolExecInfo info;
218 info.callback = callback;
225 return work_queue_.Enqueue(info);
228 errno_t Submit(X func, Callback callback,
229 typename EnableIf<N >= 6 && IsSame<X, Func>::value,
231 void* arg1,
void* arg2,
void* arg3,
void* arg4,
232 void* arg5) NLIB_NOEXCEPT {
233 LockFreeThreadPoolExecInfo info;
235 info.callback = callback;
243 return work_queue_.Enqueue(info);
246 errno_t Submit(X func, Callback callback,
247 typename EnableIf<N >= 7 && IsSame<X, Func>::value,
249 void* arg1,
void* arg2,
void* arg3,
void* arg4,
250 void* arg5,
void* arg6) NLIB_NOEXCEPT {
251 LockFreeThreadPoolExecInfo info;
253 info.callback = callback;
262 return work_queue_.Enqueue(info);
265 errno_t Submit(X func, Callback callback,
266 typename EnableIf<N >= 8 && IsSame<X, Func>::value,
268 void* arg1,
void* arg2,
void* arg3,
void* arg4,
269 void* arg5,
void* arg6,
void* arg7) NLIB_NOEXCEPT {
270 LockFreeThreadPoolExecInfo info;
272 info.callback = callback;
282 return work_queue_.Enqueue(info);
285 errno_t Submit(X func, Callback callback,
286 typename EnableIf<N >= 9 && IsSame<X, Func>::value,
288 void* arg1,
void* arg2,
void* arg3,
void* arg4,
289 void* arg5,
void* arg6,
void* arg7,
void* arg8) NLIB_NOEXCEPT {
290 LockFreeThreadPoolExecInfo info;
292 info.callback = callback;
303 return work_queue_.Enqueue(info);
306 errno_t Submit(X func, Callback callback,
307 typename EnableIf<N >= 10 && IsSame<X, Func>::value,
309 void* arg1,
void* arg2,
void* arg3,
void* arg4,
310 void* arg5,
void* arg6,
void* arg7,
void* arg8,
void* arg9) NLIB_NOEXCEPT {
311 LockFreeThreadPoolExecInfo info;
313 info.callback = callback;
325 return work_queue_.Enqueue(info);
333 struct LockFreeThreadPoolExecInfo {
346 errno_t e = work_queue_.Init(work_queue_size);
347 if (e != 0)
return e;
348 e = InitBase(thread_count, settings);
366 LockFreeThreadPoolExecInfo info;
368 info.callback = callback;
371 for (
size_t i = 0; i < n; ++i) {
372 info.arg[i] = va_arg(ap,
void*);
375 return work_queue_.Enqueue(info);
381 LockFreeThreadPoolExecInfo info;
385 if (!This->PrimaryWorkerThread_(info.func, info.callback,
386 e == 0 ? &info.arg[0] : NULL, N))
394 LockFreeThreadPoolExecInfo info;
395 uint32_t counter = 0;
399 if (!This->WorkerThread_(info.func, info.callback,
400 e == 0 ? &info.arg[0] : NULL, N, &counter))
408 #endif // INCLUDE_NN_NLIB_THREADING_THREADPOOL_H_ Lock-free algorithms are supported.
#define NLIB_OVERRIDE
Defines override if it is available for use. If not, holds an empty string.
#define NLIB_DISALLOW_COPY_AND_ASSIGN(TypeName)
Prohibits use of the copy constructor and assignment operator for the class specified by TypeName.
ThreadPool() noexcept
Instantiates the object with default parameters (default constructor). Must be initialized with Init().
errno_t Init(size_t thread_count) noexcept
Starts a thread and initializes the thread pool.
~LockFreeThreadPool() noexcept
Destructor. Joins all threads started internally.
LockFreeThreadPool() noexcept
Instantiates the object with default parameters (default constructor). Must be initialized with Init().
Class to wrap nlib_thread_attr. nlib_thread_attr_init() and nlib_thread_attr_destroy() are run automatically.
errno_t GetFuture(Future< R > *p) noexcept
Sets Future to get the result of execution.
This thread pool is suitable for deploying fine-grained tasks.
errno_t Init() noexcept
Starts a thread and initializes the thread pool.
void SwapUnsafe(LockFreeQueue &rhs) noexcept
Swaps an object. This is not thread-safe.
#define NLIB_NOEXCEPT
Defines noexcept, or its equivalent, as appropriate for the environment.
A container-like class similar to std::vector that can store objects that do not have copy constructors.
Class that gets the output of a different thread's execution in a thread-safe manner. This class is similar to the std::shared_future class of C++11.
errno_t Dequeue(DequeueType x) noexcept
Picks up an element from the queue and stores it in x. This is thread-safe.
size_t GetHardwareConcurrency() noexcept
Returns the number of hardware threads.
Class that wraps a function to run in a different thread, and gets the return value in a thread-safe manner.
Implements the Future pattern for multithread programming.
#define NLIB_FINAL
Defines final if it is available for use. If not, holds an empty string.
Tasks may be deployed to a pool of threads that have been already created.
errno_t Init(FUNC &func)
Performs initialization.