nlib
ThreadPool.h
Go to the documentation of this file.
1 
2 #pragma once
3 #ifndef INCLUDE_NN_NLIB_THREADING_THREADPOOL_H_
4 #define INCLUDE_NN_NLIB_THREADING_THREADPOOL_H_
5 
6 #include "nn/nlib/Nqueue.h"
8 #include "nn/nlib/LockFree.h"
9 
10 NLIB_NAMESPACE_BEGIN
11 namespace threading {
12 
13 namespace detail {
14 class NLIB_VIS_PUBLIC ThPoolTaskBase {
15  public:
16  ThPoolTaskBase() NLIB_NOEXCEPT {}
17  virtual ~ThPoolTaskBase() NLIB_NOEXCEPT {}
18  virtual void operator()() NLIB_NOEXCEPT = 0;
19 
20  private:
21  NLIB_DISALLOW_COPY_AND_ASSIGN(ThPoolTaskBase);
22 };
23 
24 template <class R>
25 class ThPoolTask : public ThPoolTaskBase {
26  public:
27  explicit ThPoolTask(PackagedTask<R()>& task) NLIB_NOEXCEPT { // NOLINT
28  task.swap(task_);
29  }
30  virtual ~ThPoolTask() NLIB_NOEXCEPT NLIB_OVERRIDE {}
31  virtual void operator()() NLIB_NOEXCEPT NLIB_OVERRIDE { task_(); }
32 
33  private:
34  PackagedTask<R()> task_;
36 };
37 } // namespace detail
38 
40  public:
41  ThreadPool() NLIB_NOEXCEPT : prv_(NULL) {}
42  errno_t Init(size_t thread_count, const ThreadSettings& settings) NLIB_NOEXCEPT;
43  errno_t Init(size_t thread_count) NLIB_NOEXCEPT {
44  ThreadSettings settings;
45  return this->Init(thread_count, settings);
46  }
47  errno_t Init() NLIB_NOEXCEPT {
48  return this->Init(GetHardwareConcurrency());
49  }
51  template <class R, class FUNC>
52  errno_t Submit(Future<R>* future, FUNC& func) NLIB_NOEXCEPT; // NOLINT
53  size_t Count() NLIB_NOEXCEPT;
54 
55  private:
56  NLIB_VIS_HIDDEN void JoinThreads(Nlist<nlib_thread>& L) NLIB_NOEXCEPT; // NOLINT
57  static NLIB_VIS_HIDDEN void WorkerThread(void* p) NLIB_NOEXCEPT;
58  errno_t Submit_(detail::ThPoolTaskBase* p) NLIB_NOEXCEPT;
59  struct ThreadPoolPrivate;
60  ThreadPoolPrivate* prv_;
62 };
63 
64 template <class R, class FUNC>
65 inline errno_t ThreadPool::Submit(Future<R>* future, FUNC& func) NLIB_NOEXCEPT { // NOLINT
66  if (!prv_) return EINVAL;
67  PackagedTask<R()> task;
68  errno_t e = task.Init(func);
69  if (e != 0) return e;
70  if (future) {
71  e = task.GetFuture(future);
72  if (e != 0) return e;
73  }
74  detail::ThPoolTaskBase* ptask = new (std::nothrow) detail::ThPoolTask<R>(task);
75  if (!ptask) return ENOMEM;
76  return Submit_(ptask);
77 }
78 
79 namespace detail {
80 
81 class NLIB_VIS_PUBLIC LockFreeThreadPoolBase {
82  protected:
83  LockFreeThreadPoolBase(void (*primary_worker_thread)(void*), void (*worker_thread)(void*))
84  : primary_worker_thread_(primary_worker_thread),
85  worker_thread_(worker_thread),
86  status_(0), thread_count_(0), thread_list_(NULL) {}
87  ~LockFreeThreadPoolBase() NLIB_NOEXCEPT;
88  typedef void* (*Func)(void**, size_t);
89  typedef void (*Callback)(void*);
90  errno_t InitBase(size_t thread_count, const ThreadSettings& settings) NLIB_NOEXCEPT;
91  bool PrimaryWorkerThread_(Func func, Callback callback,
92  void** args, size_t n) NLIB_NOEXCEPT;
93  bool WorkerThread_(Func func, Callback callback, void** args,
94  size_t n, uint32_t* counter) NLIB_NOEXCEPT;
95  void JoinThreads() NLIB_NOEXCEPT;
96 
97  public:
98  size_t GetThreadCount() NLIB_NOEXCEPT { return thread_count_; }
99 
100  private:
101  NLIB_VIS_HIDDEN void JoinThreads(nlib_thread* L, size_t cnt) NLIB_NOEXCEPT;
102  void (*primary_worker_thread_)(void*);
103  void (*worker_thread_)(void*);
104  int32_t status_;
105  uint32_t thread_count_;
106  nlib_thread* thread_list_;
107 
108  NLIB_DISALLOW_COPY_AND_ASSIGN(LockFreeThreadPoolBase);
109 };
110 
111 } // namespace detail
112 
113 template<size_t N = 5>
114 class LockFreeThreadPool NLIB_FINAL : public NLIB_NS::threading::detail::LockFreeThreadPoolBase {
115  public:
116 #ifdef NLIB_DOXYGEN
117  typedef void* (*Func)(void** args, size_t n);
118  typedef void (*Callback)(void* result);
119 #else
120  typedef NLIB_NS::threading::detail::LockFreeThreadPoolBase::Func Func;
121  typedef NLIB_NS::threading::detail::LockFreeThreadPoolBase::Callback Callback;
122 #endif
123 
125  : LockFreeThreadPoolBase(PrimaryWorkerThread, WorkerThread) {}
126  ~LockFreeThreadPool() NLIB_NOEXCEPT { this->JoinThreads(); }
127  errno_t Init(size_t work_queue_size, size_t thread_count,
128  const ThreadSettings& settings) NLIB_NOEXCEPT;
129  errno_t SubmitVarArgs(Func func, Callback callback, size_t n, ...) NLIB_NOEXCEPT;
130 
131 #ifdef NLIB_DOXYGEN
132  errno_t Submit(Func func, Callback callback);
133  errno_t Submit(Func func, Callback callback, void* arg0);
134  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1);
135  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2);
136  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2, void* arg3);
137  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2, void* arg3,
138  void* arg4);
139  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2, void* arg3,
140  void* arg4, void* arg5);
141  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2, void* arg3,
142  void* arg4, void* arg5, void* arg6);
143  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2, void* arg3,
144  void* arg4, void* arg5, void* arg6, void* arg7);
145  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2, void* arg3,
146  void* arg4, void* arg5, void* arg6, void* arg7, void* arg8);
147  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2, void* arg3,
148  void* arg4, void* arg5, void* arg6, void* arg7, void* arg8, void* arg9);
149  size_t GetThreadCount();
150 #endif
151  errno_t Submit(Func func, Callback callback) NLIB_NOEXCEPT {
152  LockFreeThreadPoolExecInfo info;
153  info.func = func;
154  info.callback = callback;
155  // memset(&info.arg[0], 0, sizeof(info.arg[0]) * N);
156  return work_queue_.Enqueue(info);
157  }
158  template<class X>
159  errno_t Submit(X func, Callback callback,
160  typename EnableIf<N >= 1 && IsSame<X, Func>::value,
161  void*>::type arg0) NLIB_NOEXCEPT {
162  LockFreeThreadPoolExecInfo info;
163  info.func = func;
164  info.callback = callback;
165  info.arg[0] = arg0;
166  // memset(&info.arg[0] + 1, 0, sizeof(info.arg[0]) * (N - 1));
167  return work_queue_.Enqueue(info);
168  }
169  template<class X>
170  errno_t Submit(X func, Callback callback,
171  typename EnableIf<N >= 2 && IsSame<X, Func>::value,
172  void*>::type arg0,
173  void* arg1) NLIB_NOEXCEPT {
174  LockFreeThreadPoolExecInfo info;
175  info.func = func;
176  info.callback = callback;
177  info.arg[0] = arg0;
178  info.arg[1] = arg1;
179  // memset(&info.arg[0] + 2, 0, sizeof(info.arg[0]) * (N - 2));
180  return work_queue_.Enqueue(info);
181  }
182  template<class X>
183  errno_t Submit(X func, Callback callback,
184  typename EnableIf<N >= 3 && IsSame<X, Func>::value,
185  void*>::type arg0,
186  void* arg1, void* arg2) NLIB_NOEXCEPT {
187  LockFreeThreadPoolExecInfo info;
188  info.func = func;
189  info.callback = callback;
190  info.arg[0] = arg0;
191  info.arg[1] = arg1;
192  info.arg[2] = arg2;
193  // memset(&info.arg[0] + 3, 0, sizeof(info.arg[0]) * (N - 3));
194  return work_queue_.Enqueue(info);
195  }
196  template<class X>
197  errno_t Submit(X func, Callback callback,
198  typename EnableIf<N >= 4 && IsSame<X, Func>::value,
199  void*>::type arg0,
200  void* arg1, void* arg2, void* arg3) NLIB_NOEXCEPT {
201  LockFreeThreadPoolExecInfo info;
202  info.func = func;
203  info.callback = callback;
204  info.arg[0] = arg0;
205  info.arg[1] = arg1;
206  info.arg[2] = arg2;
207  info.arg[3] = arg3;
208  // memset(&info.arg[0] + 4, 0, sizeof(info.arg[0]) * (N - 4));
209  return work_queue_.Enqueue(info);
210  }
211  template<class X>
212  errno_t Submit(X func, Callback callback,
213  typename EnableIf<N >= 5 && IsSame<X, Func>::value,
214  void*>::type arg0,
215  void* arg1, void* arg2, void* arg3, void* arg4) NLIB_NOEXCEPT {
216  LockFreeThreadPoolExecInfo info;
217  info.func = func;
218  info.callback = callback;
219  info.arg[0] = arg0;
220  info.arg[1] = arg1;
221  info.arg[2] = arg2;
222  info.arg[3] = arg3;
223  info.arg[4] = arg4;
224  // memset(&info.arg[0] + 5, 0, sizeof(info.arg[0]) * (N - 5));
225  return work_queue_.Enqueue(info);
226  }
227  template<class X>
228  errno_t Submit(X func, Callback callback,
229  typename EnableIf<N >= 6 && IsSame<X, Func>::value,
230  void*>::type arg0,
231  void* arg1, void* arg2, void* arg3, void* arg4,
232  void* arg5) NLIB_NOEXCEPT {
233  LockFreeThreadPoolExecInfo info;
234  info.func = func;
235  info.callback = callback;
236  info.arg[0] = arg0;
237  info.arg[1] = arg1;
238  info.arg[2] = arg2;
239  info.arg[3] = arg3;
240  info.arg[4] = arg4;
241  info.arg[5] = arg5;
242  // memset(&info.arg[0] + 6, 0, sizeof(info.arg[0]) * (N - 6));
243  return work_queue_.Enqueue(info);
244  }
245  template<class X>
246  errno_t Submit(X func, Callback callback,
247  typename EnableIf<N >= 7 && IsSame<X, Func>::value,
248  void*>::type arg0,
249  void* arg1, void* arg2, void* arg3, void* arg4,
250  void* arg5, void* arg6) NLIB_NOEXCEPT {
251  LockFreeThreadPoolExecInfo info;
252  info.func = func;
253  info.callback = callback;
254  info.arg[0] = arg0;
255  info.arg[1] = arg1;
256  info.arg[2] = arg2;
257  info.arg[3] = arg3;
258  info.arg[4] = arg4;
259  info.arg[5] = arg5;
260  info.arg[6] = arg6;
261  // memset(&info.arg[0] + 7, 0, sizeof(info.arg[0]) * (N - 7));
262  return work_queue_.Enqueue(info);
263  }
264  template<class X>
265  errno_t Submit(X func, Callback callback,
266  typename EnableIf<N >= 8 && IsSame<X, Func>::value,
267  void*>::type arg0,
268  void* arg1, void* arg2, void* arg3, void* arg4,
269  void* arg5, void* arg6, void* arg7) NLIB_NOEXCEPT {
270  LockFreeThreadPoolExecInfo info;
271  info.func = func;
272  info.callback = callback;
273  info.arg[0] = arg0;
274  info.arg[1] = arg1;
275  info.arg[2] = arg2;
276  info.arg[3] = arg3;
277  info.arg[4] = arg4;
278  info.arg[5] = arg5;
279  info.arg[6] = arg6;
280  info.arg[7] = arg7;
281  // memset(&info.arg[0] + 8, 0, sizeof(info.arg[0]) * (N - 8));
282  return work_queue_.Enqueue(info);
283  }
284  template<class X>
285  errno_t Submit(X func, Callback callback,
286  typename EnableIf<N >= 9 && IsSame<X, Func>::value,
287  void*>::type arg0,
288  void* arg1, void* arg2, void* arg3, void* arg4,
289  void* arg5, void* arg6, void* arg7, void* arg8) NLIB_NOEXCEPT {
290  LockFreeThreadPoolExecInfo info;
291  info.func = func;
292  info.callback = callback;
293  info.arg[0] = arg0;
294  info.arg[1] = arg1;
295  info.arg[2] = arg2;
296  info.arg[3] = arg3;
297  info.arg[4] = arg4;
298  info.arg[5] = arg5;
299  info.arg[6] = arg6;
300  info.arg[7] = arg7;
301  info.arg[8] = arg8;
302  // memset(&info.arg[0] + 9, 0, sizeof(info.arg[0]) * (N - 9));
303  return work_queue_.Enqueue(info);
304  }
305  template<class X>
306  errno_t Submit(X func, Callback callback,
307  typename EnableIf<N >= 10 && IsSame<X, Func>::value,
308  void*>::type arg0,
309  void* arg1, void* arg2, void* arg3, void* arg4,
310  void* arg5, void* arg6, void* arg7, void* arg8, void* arg9) NLIB_NOEXCEPT {
311  LockFreeThreadPoolExecInfo info;
312  info.func = func;
313  info.callback = callback;
314  info.arg[0] = arg0;
315  info.arg[1] = arg1;
316  info.arg[2] = arg2;
317  info.arg[3] = arg3;
318  info.arg[4] = arg4;
319  info.arg[5] = arg5;
320  info.arg[6] = arg6;
321  info.arg[7] = arg7;
322  info.arg[8] = arg8;
323  info.arg[9] = arg9;
324  // memset(&info.arg[0] + 10, 0, sizeof(info.arg[0]) * (N - 10));
325  return work_queue_.Enqueue(info);
326  }
327 
328  private:
329  static void PrimaryWorkerThread(void* p) NLIB_NOEXCEPT;
330  static void WorkerThread(void* p) NLIB_NOEXCEPT;
331 
332  private:
333  struct LockFreeThreadPoolExecInfo {
334  Func func;
335  Callback callback;
336  void* arg[N];
337  };
339 
341 };
342 
343 template<size_t N>
344 errno_t LockFreeThreadPool<N>::Init(size_t work_queue_size, size_t thread_count,
345  const ThreadSettings& settings) NLIB_NOEXCEPT {
346  errno_t e = work_queue_.Init(work_queue_size);
347  if (e != 0) return e;
348  e = InitBase(thread_count, settings);
349  if (e != 0) {
351  q.SwapUnsafe(work_queue_);
352  return e;
353  }
354  return 0;
355 }
356 
357 template<size_t N>
358 errno_t LockFreeThreadPool<N>::SubmitVarArgs(Func func, Callback callback,
359  size_t n, ...) NLIB_NOEXCEPT {
360  if (n > N) {
361  NLIB_ASSERT(n <= N);
362  return EINVAL;
363  }
364 
365  va_list ap;
366  LockFreeThreadPoolExecInfo info;
367  info.func = func;
368  info.callback = callback;
369 
370  va_start(ap, n);
371  for (size_t i = 0; i < n; ++i) {
372  info.arg[i] = va_arg(ap, void*);
373  }
374  va_end(ap);
375  return work_queue_.Enqueue(info);
376 }
377 
378 template<size_t N>
380  LockFreeThreadPool* This = reinterpret_cast<LockFreeThreadPool*>(p);
381  LockFreeThreadPoolExecInfo info;
382 
383  for (;;) {
384  errno_t e = This->work_queue_.Dequeue(&info);
385  if (!This->PrimaryWorkerThread_(info.func, info.callback,
386  e == 0 ? &info.arg[0] : NULL, N))
387  break;
388  }
389 }
390 
391 template<size_t N>
393  LockFreeThreadPool* This = reinterpret_cast<LockFreeThreadPool*>(p);
394  LockFreeThreadPoolExecInfo info;
395  uint32_t counter = 0;
396 
397  for (;;) {
398  errno_t e = This->work_queue_.Dequeue(&info);
399  if (!This->WorkerThread_(info.func, info.callback,
400  e == 0 ? &info.arg[0] : NULL, N, &counter))
401  break;
402  }
403 }
404 
405 } // namespace threading
406 NLIB_NAMESPACE_END
407 
408 #endif // INCLUDE_NN_NLIB_THREADING_THREADPOOL_H_
Lock-free algorithms are supported.
#define NLIB_OVERRIDE
Defines override if it is available for use. If not, holds an empty string.
Definition: Config.h:210
#define NLIB_DISALLOW_COPY_AND_ASSIGN(TypeName)
Prohibits use of the copy constructor and assignment operator for the class specified by TypeName...
Definition: Config.h:145
ThreadPool() noexcept
Instantiates the object with default parameters (default constructor). Must be initialized with Init...
Definition: ThreadPool.h:41
errno_t Init(size_t thread_count) noexcept
Starts a thread and initializes the thread pool.
Definition: ThreadPool.h:43
~LockFreeThreadPool() noexcept
Destructor. Joins all threads started internally.
Definition: ThreadPool.h:126
LockFreeThreadPool() noexcept
Instantiates the object with default parameters (default constructor). Must be initialized with Init...
Definition: ThreadPool.h:124
#define NLIB_VIS_HIDDEN
Symbols for functions and classes are not made available outside of the library.
Definition: Platform_unix.h:60
#define NLIB_VIS_PUBLIC
Symbols for functions and classes are made available outside of the library.
Definition: Platform_unix.h:61
Class to wrap nlib_thread_attr. nlib_thread_attr_init() and nlib_thread_attr_destroy() are run automa...
Definition: Thread.h:16
errno_t GetFuture(Future< R > *p) noexcept
Sets Future to get the result of execution.
This thread pool is suitable for deploying fine grain tasks.
Definition: ThreadPool.h:114
errno_t Init() noexcept
Starts a thread and initializes the thread pool.
Definition: ThreadPool.h:47
void SwapUnsafe(LockFreeQueue &rhs) noexcept
Swaps an object. This is not thread-safe.
Definition: LockFree.h:551
#define NLIB_NOEXCEPT
Defines noexcept geared to the environment, or the equivalent.
Definition: Config.h:86
A container-like class similar to std::vector that can store objects that do not have copy constructo...
Definition: Nlist.h:19
Class that gets the output of a different thread executing in a thread safe manner. This class is similar to the std::shared_future class of C++11.
Definition: AsyncFileIo.h:13
errno_t Dequeue(DequeueType x) noexcept
Picks up an element from the queue and stores it in x. This is thread-safe.
Definition: LockFree.h:544
size_t GetHardwareConcurrency() noexcept
Returns the number of hardware threads.
Definition: Thread.h:784
Class that wraps a function to run in a different thread, and gets the return value in a thread safe ...
Definition: Future.h:918
Implements the Future pattern for multithread programming.
#define NLIB_FINAL
Defines final if it is available for use. If not, holds an empty string.
Definition: Config.h:211
Tasks may be deployed to a pool of threads that have been already created.
Definition: ThreadPool.h:39
pthread_t nlib_thread
The identifier for threads.
errno_t Init(FUNC &func)
Performs initialization.
int errno_t
Indicates with an int-type typedef that a POSIX error value is returned as the return value...
Definition: NMalloc.h:24