16 #ifndef INCLUDE_NN_NLIB_THREADING_THREADPOOL_H_ 17 #define INCLUDE_NN_NLIB_THREADING_THREADPOOL_H_ 38 class ThPoolTask : public ThPoolTaskBase {
47 PackagedTask<R()> task_;
58 return this->Init(thread_count, settings);
64 template <
class R,
class FUNC>
72 struct ThreadPoolPrivate;
73 ThreadPoolPrivate* prv_;
77 template <
class R,
class FUNC>
79 if (!prv_)
return EINVAL;
87 detail::ThPoolTaskBase* ptask =
new (std::nothrow) detail::ThPoolTask<R>(task);
88 if (!ptask)
return ENOMEM;
89 return Submit_(ptask);
96 LockFreeThreadPoolBase(
void (*primary_worker_thread)(
void*),
void (*worker_thread)(
void*))
97 : primary_worker_thread_(primary_worker_thread),
98 worker_thread_(worker_thread),
99 status_(0), thread_count_(0), thread_list_(NULL) {}
101 typedef void* (*Func)(
void**, size_t) NLIB_NOEXCEPT_FUNCPTR;
102 typedef void (*Callback)(
void*) NLIB_NOEXCEPT_FUNCPTR;
104 bool PrimaryWorkerThread_(Func func, Callback callback,
106 bool WorkerThread_(Func func, Callback callback,
void** args,
111 size_t GetThreadCount() NLIB_NOEXCEPT {
return thread_count_; }
115 void (*primary_worker_thread_)(
void*);
116 void (*worker_thread_)(
void*);
118 uint32_t thread_count_;
126 template<
size_t N = 5>
130 typedef void* (*Func)(
void** args,
size_t n);
131 typedef void (*Callback)(
void* result);
133 typedef NLIB_NS::threading::detail::LockFreeThreadPoolBase::Func Func;
134 typedef NLIB_NS::threading::detail::LockFreeThreadPoolBase::Callback Callback;
138 : LockFreeThreadPoolBase(PrimaryWorkerThread, WorkerThread) {}
140 errno_t Init(
size_t work_queue_size,
size_t thread_count,
145 errno_t Submit(Func func, Callback callback);
146 errno_t Submit(Func func, Callback callback,
void* arg0);
147 errno_t Submit(Func func, Callback callback,
void* arg0,
void* arg1);
148 errno_t Submit(Func func, Callback callback,
void* arg0,
void* arg1,
void* arg2);
149 errno_t Submit(Func func, Callback callback,
void* arg0,
void* arg1,
void* arg2,
void* arg3);
150 errno_t Submit(Func func, Callback callback,
void* arg0,
void* arg1,
void* arg2,
void* arg3,
152 errno_t Submit(Func func, Callback callback,
void* arg0,
void* arg1,
void* arg2,
void* arg3,
153 void* arg4,
void* arg5);
154 errno_t Submit(Func func, Callback callback,
void* arg0,
void* arg1,
void* arg2,
void* arg3,
155 void* arg4,
void* arg5,
void* arg6);
156 errno_t Submit(Func func, Callback callback,
void* arg0,
void* arg1,
void* arg2,
void* arg3,
157 void* arg4,
void* arg5,
void* arg6,
void* arg7);
158 errno_t Submit(Func func, Callback callback,
void* arg0,
void* arg1,
void* arg2,
void* arg3,
159 void* arg4,
void* arg5,
void* arg6,
void* arg7,
void* arg8);
160 errno_t Submit(Func func, Callback callback,
void* arg0,
void* arg1,
void* arg2,
void* arg3,
161 void* arg4,
void* arg5,
void* arg6,
void* arg7,
void* arg8,
void* arg9);
162 size_t GetThreadCount();
164 errno_t Submit(Func func, Callback callback) NLIB_NOEXCEPT {
165 LockFreeThreadPoolExecInfo info;
167 info.callback = callback;
169 return work_queue_.Enqueue(info);
172 errno_t Submit(X func, Callback callback,
173 typename EnableIf<N >= 1 && IsSame<X, Func>::value,
174 void*>::type arg0) NLIB_NOEXCEPT {
175 LockFreeThreadPoolExecInfo info;
177 info.callback = callback;
180 return work_queue_.Enqueue(info);
183 errno_t Submit(X func, Callback callback,
184 typename EnableIf<N >= 2 && IsSame<X, Func>::value,
186 void* arg1) NLIB_NOEXCEPT {
187 LockFreeThreadPoolExecInfo info;
189 info.callback = callback;
193 return work_queue_.Enqueue(info);
196 errno_t Submit(X func, Callback callback,
197 typename EnableIf<N >= 3 && IsSame<X, Func>::value,
199 void* arg1,
void* arg2) NLIB_NOEXCEPT {
200 LockFreeThreadPoolExecInfo info;
202 info.callback = callback;
207 return work_queue_.Enqueue(info);
210 errno_t Submit(X func, Callback callback,
211 typename EnableIf<N >= 4 && IsSame<X, Func>::value,
213 void* arg1,
void* arg2,
void* arg3) NLIB_NOEXCEPT {
214 LockFreeThreadPoolExecInfo info;
216 info.callback = callback;
222 return work_queue_.Enqueue(info);
225 errno_t Submit(X func, Callback callback,
226 typename EnableIf<N >= 5 && IsSame<X, Func>::value,
228 void* arg1,
void* arg2,
void* arg3,
void* arg4) NLIB_NOEXCEPT {
229 LockFreeThreadPoolExecInfo info;
231 info.callback = callback;
238 return work_queue_.Enqueue(info);
241 errno_t Submit(X func, Callback callback,
242 typename EnableIf<N >= 6 && IsSame<X, Func>::value,
244 void* arg1,
void* arg2,
void* arg3,
void* arg4,
245 void* arg5) NLIB_NOEXCEPT {
246 LockFreeThreadPoolExecInfo info;
248 info.callback = callback;
256 return work_queue_.Enqueue(info);
259 errno_t Submit(X func, Callback callback,
260 typename EnableIf<N >= 7 && IsSame<X, Func>::value,
262 void* arg1,
void* arg2,
void* arg3,
void* arg4,
263 void* arg5,
void* arg6) NLIB_NOEXCEPT {
264 LockFreeThreadPoolExecInfo info;
266 info.callback = callback;
275 return work_queue_.Enqueue(info);
278 errno_t Submit(X func, Callback callback,
279 typename EnableIf<N >= 8 && IsSame<X, Func>::value,
281 void* arg1,
void* arg2,
void* arg3,
void* arg4,
282 void* arg5,
void* arg6,
void* arg7) NLIB_NOEXCEPT {
283 LockFreeThreadPoolExecInfo info;
285 info.callback = callback;
295 return work_queue_.Enqueue(info);
298 errno_t Submit(X func, Callback callback,
299 typename EnableIf<N >= 9 && IsSame<X, Func>::value,
301 void* arg1,
void* arg2,
void* arg3,
void* arg4,
302 void* arg5,
void* arg6,
void* arg7,
void* arg8) NLIB_NOEXCEPT {
303 LockFreeThreadPoolExecInfo info;
305 info.callback = callback;
316 return work_queue_.Enqueue(info);
319 errno_t Submit(X func, Callback callback,
320 typename EnableIf<N >= 10 && IsSame<X, Func>::value,
322 void* arg1,
void* arg2,
void* arg3,
void* arg4,
323 void* arg5,
void* arg6,
void* arg7,
void* arg8,
void* arg9) NLIB_NOEXCEPT {
324 LockFreeThreadPoolExecInfo info;
326 info.callback = callback;
338 return work_queue_.Enqueue(info);
346 struct LockFreeThreadPoolExecInfo {
359 errno_t e = work_queue_.Init(work_queue_size);
360 if (e != 0)
return e;
361 e = InitBase(thread_count, settings);
379 LockFreeThreadPoolExecInfo info;
381 info.callback = callback;
384 for (
size_t i = 0; i < n; ++i) {
385 info.arg[i] = va_arg(ap,
void*);
388 return work_queue_.Enqueue(info);
394 LockFreeThreadPoolExecInfo info;
398 if (!This->PrimaryWorkerThread_(info.func, info.callback,
399 e == 0 ? &info.arg[0] : NULL, N))
407 LockFreeThreadPoolExecInfo info;
408 uint32_t counter = 0;
412 if (!This->WorkerThread_(info.func, info.callback,
413 e == 0 ? &info.arg[0] : NULL, N, &counter))
421 #endif // INCLUDE_NN_NLIB_THREADING_THREADPOOL_H_ Lock-free algorithms are supported.
#define NLIB_OVERRIDE
Defines override if it is available for use. If not, holds an empty string.
#define NLIB_DISALLOW_COPY_AND_ASSIGN(TypeName)
Prohibits use of the copy constructor and assignment operator for the class specified by TypeName...
ThreadPool() noexcept
Instantiates the object with default parameters (default constructor). Must be initialized with Init...
errno_t Init(size_t thread_count) noexcept
Starts a thread and initializes the thread pool.
~LockFreeThreadPool() noexcept
Destructor. Joins all threads started internally.
LockFreeThreadPool() noexcept
Instantiates the object with default parameters (default constructor). Must be initialized with Init...
Class to wrap nlib_thread_attr. nlib_thread_attr_init() and nlib_thread_attr_destroy() are run automa...
errno_t GetFuture(Future< R > *p) noexcept
Sets Future to get the result of execution.
This thread pool is suitable for deploying fine grain tasks.
errno_t Init() noexcept
Starts a thread and initializes the thread pool.
void SwapUnsafe(LockFreeQueue &rhs) noexcept
Swaps an object. This is not thread-safe.
#define NLIB_NOEXCEPT
Defines noexcept geared to the environment, or the equivalent.
A container-like class similar to std::vector that can store objects that do not have copy constructo...
Class that gets the output of a different thread executing in a thread safe manner. This class is similar to the std::shared_future class of C++11.
errno_t Dequeue(DequeueType x) noexcept
Picks up an element from the queue and stores it in x. This is thread-safe.
size_t GetHardwareConcurrency() noexcept
Returns the number of hardware threads.
Class that wraps a function to run in a different thread, and gets the return value in a thread safe ...
Implements the Future pattern for multithread programming.
#define NLIB_FINAL
Defines final if it is available for use. If not, holds an empty string.
Tasks may be deployed to a pool of threads that have been already created.
errno_t Init(FUNC &func)
Performs initialization.