nlib
ThreadPool.h
Go to the documentation of this file.
1 
2 /*--------------------------------------------------------------------------------*
3  Project: CrossRoad
4  Copyright (C)Nintendo All rights reserved.
5 
6  These coded instructions, statements, and computer programs contain proprietary
7  information of Nintendo and/or its licensed developers and are protected by
8  national and international copyright laws. They may not be disclosed to third
9  parties or copied or duplicated in any form, in whole or in part, without the
10  prior written consent of Nintendo.
11 
12  The content herein is highly confidential and should be handled accordingly.
13  *--------------------------------------------------------------------------------*/
14 
15 #pragma once
16 #ifndef INCLUDE_NN_NLIB_THREADING_THREADPOOL_H_
17 #define INCLUDE_NN_NLIB_THREADING_THREADPOOL_H_
18 
19 #include "nn/nlib/Nqueue.h"
21 #include "nn/nlib/LockFree.h"
22 
23 NLIB_NAMESPACE_BEGIN
24 namespace threading {
25 
26 namespace detail {
// Abstract base class for a unit of work queued on ThreadPool.
// The only operation is the argument-less, pure-virtual call operator;
// ThreadPool::Submit_() takes a pointer to this type.
27 class NLIB_VIS_PUBLIC ThPoolTaskBase {
28  public:
29  ThPoolTaskBase() NLIB_NOEXCEPT {}
// Virtual so that a derived task can be destroyed through a ThPoolTaskBase*.
30  virtual ~ThPoolTaskBase() NLIB_NOEXCEPT {}
// Runs the task. Implemented by derived classes.
31  virtual void operator()() NLIB_NOEXCEPT = 0;
32 
33  private:
// Copy construction and assignment are prohibited for task objects.
34  NLIB_DISALLOW_COPY_AND_ASSIGN(ThPoolTaskBase);
35 };
36 
// Concrete task that owns a PackagedTask<R()> and invokes it when called.
// R is the return type of the wrapped callable.
// NOTE(review): listing line 48 is absent from this extract, so one line of the
// private section is not shown here.
37 template <class R>
38 class ThPoolTask : public ThPoolTaskBase {
39  public:
// Takes the task by non-const reference and swaps it with the default-constructed
// member task_, so the caller's object no longer holds the original task afterwards.
40  explicit ThPoolTask(PackagedTask<R()>& task) NLIB_NOEXCEPT { // NOLINT
41  task.swap(task_);
42  }
43  virtual ~ThPoolTask() NLIB_NOEXCEPT NLIB_OVERRIDE {}
// Invokes the stored packaged task.
44  virtual void operator()() NLIB_NOEXCEPT NLIB_OVERRIDE { task_(); }
45 
46  private:
// The packaged task taken over in the constructor.
47  PackagedTask<R()> task_;
49 };
50 } // namespace detail
51 
// Body of class ThreadPool: tasks may be deployed to a pool of threads that have
// already been created.
// NOTE(review): the class header (listing line 52) and listing lines 63 and 74
// are absent from this extract.
53  public:
// Default constructor; prv_ starts as NULL, and Submit() returns EINVAL while it is NULL.
54  ThreadPool() NLIB_NOEXCEPT : prv_(NULL) {}
// Starts the threads and initializes the pool with explicit thread settings.
55  errno_t Init(size_t thread_count, const ThreadSettings& settings) NLIB_NOEXCEPT;
// Same as above, using a default-constructed ThreadSettings.
56  errno_t Init(size_t thread_count) NLIB_NOEXCEPT {
57  ThreadSettings settings;
58  return this->Init(thread_count, settings);
59  }
// Same as above, with the thread count taken from GetHardwareConcurrency()
// (the number of hardware threads).
60  errno_t Init() NLIB_NOEXCEPT {
61  return this->Init(GetHardwareConcurrency());
62  }
// Queues func for execution; if future is non-null it is bound to the task's result.
// Defined below the class.
64  template <class R, class FUNC>
65  errno_t Submit(Future<R>* future, FUNC& func) NLIB_NOEXCEPT; // NOLINT
66  size_t Count() NLIB_NOEXCEPT;
67 
68  private:
69  NLIB_VIS_HIDDEN void JoinThreads(Nlist<nlib_thread>& L) NLIB_NOEXCEPT; // NOLINT
70  static NLIB_VIS_HIDDEN void WorkerThread(void* p) NLIB_NOEXCEPT;
// Non-template back end of Submit(); receives the heap-allocated task.
71  errno_t Submit_(detail::ThPoolTaskBase* p) NLIB_NOEXCEPT;
// Implementation state; the struct is only forward-declared in this header.
72  struct ThreadPoolPrivate;
73  ThreadPoolPrivate* prv_;
75 };
76 
// Wraps func in a PackagedTask<R()>, optionally binds its result to *future, and
// hands a heap-allocated ThPoolTask<R> to Submit_().
//
// future: may be NULL; when non-null, task.GetFuture(future) is called on it.
// func:   callable used to initialize the packaged task (taken by non-const reference).
//
// Returns EINVAL if prv_ is NULL, the non-zero error from task.Init() or
// task.GetFuture(), ENOMEM if the task object cannot be allocated, and otherwise
// whatever Submit_() returns.
77 template <class R, class FUNC>
78 inline errno_t ThreadPool::Submit(Future<R>* future, FUNC& func) NLIB_NOEXCEPT { // NOLINT
79  if (!prv_) return EINVAL;
80  PackagedTask<R()> task;
81  errno_t e = task.Init(func);
82  if (e != 0) return e;
83  if (future) {
84  e = task.GetFuture(future);
85  if (e != 0) return e;
86  }
// The ThPoolTask constructor swaps `task` into the new object; nothrow new is
// used so that allocation failure is reported as ENOMEM.
87  detail::ThPoolTaskBase* ptask = new (std::nothrow) detail::ThPoolTask<R>(task);
88  if (!ptask) return ENOMEM;
// NOTE(review): ownership of ptask passes to Submit_(), whose definition is not
// visible here — confirm that it deletes the task when it returns an error.
89  return Submit_(ptask);
90 }
91 
92 namespace detail {
93 
// Non-template base of LockFreeThreadPool<N>. It stores the two thread entry
// points supplied by the derived class plus the thread bookkeeping members
// (status_, thread_count_, thread_list_); the member functions declared here
// are defined outside this header.
94 class NLIB_VIS_PUBLIC LockFreeThreadPoolBase {
95  protected:
// Stores the two entry points and zero-initializes status_, thread_count_ and
// thread_list_.
96  LockFreeThreadPoolBase(void (*primary_worker_thread)(void*), void (*worker_thread)(void*))
97  : primary_worker_thread_(primary_worker_thread),
98  worker_thread_(worker_thread),
99  status_(0), thread_count_(0), thread_list_(NULL) {}
100  ~LockFreeThreadPoolBase() NLIB_NOEXCEPT;
// Func: task function taking an argument array and its element count, returning void*.
101  typedef void* (*Func)(void**, size_t) NLIB_NOEXCEPT_FUNCPTR;
// Callback: function taking a single void*.
102  typedef void (*Callback)(void*) NLIB_NOEXCEPT_FUNCPTR; // NOLINT
103  errno_t InitBase(size_t thread_count, const ThreadSettings& settings) NLIB_NOEXCEPT;
// Per-iteration helpers called from the derived class's thread loops; the loops
// stop when these return false.
104  bool PrimaryWorkerThread_(Func func, Callback callback,
105  void** args, size_t n) NLIB_NOEXCEPT;
106  bool WorkerThread_(Func func, Callback callback, void** args,
107  size_t n, uint32_t* counter) NLIB_NOEXCEPT;
108  void JoinThreads() NLIB_NOEXCEPT;
109 
110  public:
// Returns thread_count_ (stored as uint32_t) as a size_t.
111  size_t GetThreadCount() NLIB_NOEXCEPT { return thread_count_; }
112 
113  private:
114  NLIB_VIS_HIDDEN void JoinThreads(nlib_thread* L, size_t cnt) NLIB_NOEXCEPT;
115  void (*primary_worker_thread_)(void*);
116  void (*worker_thread_)(void*);
117  int32_t status_;
118  uint32_t thread_count_;
119  nlib_thread* thread_list_;
120 
// Copy construction and assignment are prohibited.
121  NLIB_DISALLOW_COPY_AND_ASSIGN(LockFreeThreadPoolBase);
122 };
123 
124 } // namespace detail
125 
// Thread pool suited to deploying fine-grained tasks. A task is a Func pointer,
// a Callback pointer and up to N void* arguments, packed into a
// LockFreeThreadPoolExecInfo and enqueued on work_queue_.
//
// N: number of argument slots per task (default 5). A Submit() overload taking k
//    arguments is only enabled when N >= k.
//
// NOTE(review): listing lines 137 (the constructor declarator), 351 and 353 are
// absent from this extract; the work_queue_ member used throughout is presumably
// declared on one of the missing lines — confirm against the original header.
126 template<size_t N = 5>
127 class LockFreeThreadPool NLIB_FINAL : public NLIB_NS::threading::detail::LockFreeThreadPoolBase {
128  public:
// Documentation-friendly spellings of the function pointer types; the real
// typedefs are inherited from the base class.
129 #ifdef NLIB_DOXYGEN
130  typedef void* (*Func)(void** args, size_t n);
131  typedef void (*Callback)(void* result);
132 #else
133  typedef NLIB_NS::threading::detail::LockFreeThreadPoolBase::Func Func;
134  typedef NLIB_NS::threading::detail::LockFreeThreadPoolBase::Callback Callback;
135 #endif
136 
// Default constructor (declarator missing from this extract): passes this class's
// static thread entry points to the base. The pool must then be set up with Init().
138  : LockFreeThreadPoolBase(PrimaryWorkerThread, WorkerThread) {}
// Destructor. Joins all threads started internally.
139  ~LockFreeThreadPool() NLIB_NOEXCEPT { this->JoinThreads(); }
// Initializes the work queue and starts the threads. Defined below the class.
140  errno_t Init(size_t work_queue_size, size_t thread_count,
141  const ThreadSettings& settings) NLIB_NOEXCEPT;
// Variadic form of Submit(): n is the number of void* arguments that follow.
142  errno_t SubmitVarArgs(Func func, Callback callback, size_t n, ...) NLIB_NOEXCEPT;
143 
// Simplified overload set shown to Doxygen in place of the EnableIf-constrained
// templates below.
144 #ifdef NLIB_DOXYGEN
145  errno_t Submit(Func func, Callback callback);
146  errno_t Submit(Func func, Callback callback, void* arg0);
147  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1);
148  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2);
149  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2, void* arg3);
150  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2, void* arg3,
151  void* arg4);
152  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2, void* arg3,
153  void* arg4, void* arg5);
154  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2, void* arg3,
155  void* arg4, void* arg5, void* arg6);
156  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2, void* arg3,
157  void* arg4, void* arg5, void* arg6, void* arg7);
158  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2, void* arg3,
159  void* arg4, void* arg5, void* arg6, void* arg7, void* arg8);
160  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2, void* arg3,
161  void* arg4, void* arg5, void* arg6, void* arg7, void* arg8, void* arg9);
162  size_t GetThreadCount();
163 #endif
// Submit() family: each overload fills a LockFreeThreadPoolExecInfo with func,
// callback and the given arguments, then returns the result of
// work_queue_.Enqueue(info). Argument slots beyond those supplied are left
// unassigned (the zeroing memset is commented out in every overload).
//
// Zero-argument overload.
164  errno_t Submit(Func func, Callback callback) NLIB_NOEXCEPT {
165  LockFreeThreadPoolExecInfo info;
166  info.func = func;
167  info.callback = callback;
168  // memset(&info.arg[0], 0, sizeof(info.arg[0]) * N);
169  return work_queue_.Enqueue(info);
170  }
// One-argument overload. The EnableIf on arg0's type removes it from overload
// resolution unless N >= 1 and X is exactly Func; the overloads below follow
// the same pattern with a larger argument count.
171  template<class X>
172  errno_t Submit(X func, Callback callback,
173  typename EnableIf<N >= 1 && IsSame<X, Func>::value,
174  void*>::type arg0) NLIB_NOEXCEPT {
175  LockFreeThreadPoolExecInfo info;
176  info.func = func;
177  info.callback = callback;
178  info.arg[0] = arg0;
179  // memset(&info.arg[0] + 1, 0, sizeof(info.arg[0]) * (N - 1));
180  return work_queue_.Enqueue(info);
181  }
// Two-argument overload (requires N >= 2).
182  template<class X>
183  errno_t Submit(X func, Callback callback,
184  typename EnableIf<N >= 2 && IsSame<X, Func>::value,
185  void*>::type arg0,
186  void* arg1) NLIB_NOEXCEPT {
187  LockFreeThreadPoolExecInfo info;
188  info.func = func;
189  info.callback = callback;
190  info.arg[0] = arg0;
191  info.arg[1] = arg1;
192  // memset(&info.arg[0] + 2, 0, sizeof(info.arg[0]) * (N - 2));
193  return work_queue_.Enqueue(info);
194  }
// Three-argument overload (requires N >= 3).
195  template<class X>
196  errno_t Submit(X func, Callback callback,
197  typename EnableIf<N >= 3 && IsSame<X, Func>::value,
198  void*>::type arg0,
199  void* arg1, void* arg2) NLIB_NOEXCEPT {
200  LockFreeThreadPoolExecInfo info;
201  info.func = func;
202  info.callback = callback;
203  info.arg[0] = arg0;
204  info.arg[1] = arg1;
205  info.arg[2] = arg2;
206  // memset(&info.arg[0] + 3, 0, sizeof(info.arg[0]) * (N - 3));
207  return work_queue_.Enqueue(info);
208  }
// Four-argument overload (requires N >= 4).
209  template<class X>
210  errno_t Submit(X func, Callback callback,
211  typename EnableIf<N >= 4 && IsSame<X, Func>::value,
212  void*>::type arg0,
213  void* arg1, void* arg2, void* arg3) NLIB_NOEXCEPT {
214  LockFreeThreadPoolExecInfo info;
215  info.func = func;
216  info.callback = callback;
217  info.arg[0] = arg0;
218  info.arg[1] = arg1;
219  info.arg[2] = arg2;
220  info.arg[3] = arg3;
221  // memset(&info.arg[0] + 4, 0, sizeof(info.arg[0]) * (N - 4));
222  return work_queue_.Enqueue(info);
223  }
// Five-argument overload (requires N >= 5).
224  template<class X>
225  errno_t Submit(X func, Callback callback,
226  typename EnableIf<N >= 5 && IsSame<X, Func>::value,
227  void*>::type arg0,
228  void* arg1, void* arg2, void* arg3, void* arg4) NLIB_NOEXCEPT {
229  LockFreeThreadPoolExecInfo info;
230  info.func = func;
231  info.callback = callback;
232  info.arg[0] = arg0;
233  info.arg[1] = arg1;
234  info.arg[2] = arg2;
235  info.arg[3] = arg3;
236  info.arg[4] = arg4;
237  // memset(&info.arg[0] + 5, 0, sizeof(info.arg[0]) * (N - 5));
238  return work_queue_.Enqueue(info);
239  }
// Six-argument overload (requires N >= 6).
240  template<class X>
241  errno_t Submit(X func, Callback callback,
242  typename EnableIf<N >= 6 && IsSame<X, Func>::value,
243  void*>::type arg0,
244  void* arg1, void* arg2, void* arg3, void* arg4,
245  void* arg5) NLIB_NOEXCEPT {
246  LockFreeThreadPoolExecInfo info;
247  info.func = func;
248  info.callback = callback;
249  info.arg[0] = arg0;
250  info.arg[1] = arg1;
251  info.arg[2] = arg2;
252  info.arg[3] = arg3;
253  info.arg[4] = arg4;
254  info.arg[5] = arg5;
255  // memset(&info.arg[0] + 6, 0, sizeof(info.arg[0]) * (N - 6));
256  return work_queue_.Enqueue(info);
257  }
// Seven-argument overload (requires N >= 7).
258  template<class X>
259  errno_t Submit(X func, Callback callback,
260  typename EnableIf<N >= 7 && IsSame<X, Func>::value,
261  void*>::type arg0,
262  void* arg1, void* arg2, void* arg3, void* arg4,
263  void* arg5, void* arg6) NLIB_NOEXCEPT {
264  LockFreeThreadPoolExecInfo info;
265  info.func = func;
266  info.callback = callback;
267  info.arg[0] = arg0;
268  info.arg[1] = arg1;
269  info.arg[2] = arg2;
270  info.arg[3] = arg3;
271  info.arg[4] = arg4;
272  info.arg[5] = arg5;
273  info.arg[6] = arg6;
274  // memset(&info.arg[0] + 7, 0, sizeof(info.arg[0]) * (N - 7));
275  return work_queue_.Enqueue(info);
276  }
// Eight-argument overload (requires N >= 8).
277  template<class X>
278  errno_t Submit(X func, Callback callback,
279  typename EnableIf<N >= 8 && IsSame<X, Func>::value,
280  void*>::type arg0,
281  void* arg1, void* arg2, void* arg3, void* arg4,
282  void* arg5, void* arg6, void* arg7) NLIB_NOEXCEPT {
283  LockFreeThreadPoolExecInfo info;
284  info.func = func;
285  info.callback = callback;
286  info.arg[0] = arg0;
287  info.arg[1] = arg1;
288  info.arg[2] = arg2;
289  info.arg[3] = arg3;
290  info.arg[4] = arg4;
291  info.arg[5] = arg5;
292  info.arg[6] = arg6;
293  info.arg[7] = arg7;
294  // memset(&info.arg[0] + 8, 0, sizeof(info.arg[0]) * (N - 8));
295  return work_queue_.Enqueue(info);
296  }
// Nine-argument overload (requires N >= 9).
297  template<class X>
298  errno_t Submit(X func, Callback callback,
299  typename EnableIf<N >= 9 && IsSame<X, Func>::value,
300  void*>::type arg0,
301  void* arg1, void* arg2, void* arg3, void* arg4,
302  void* arg5, void* arg6, void* arg7, void* arg8) NLIB_NOEXCEPT {
303  LockFreeThreadPoolExecInfo info;
304  info.func = func;
305  info.callback = callback;
306  info.arg[0] = arg0;
307  info.arg[1] = arg1;
308  info.arg[2] = arg2;
309  info.arg[3] = arg3;
310  info.arg[4] = arg4;
311  info.arg[5] = arg5;
312  info.arg[6] = arg6;
313  info.arg[7] = arg7;
314  info.arg[8] = arg8;
315  // memset(&info.arg[0] + 9, 0, sizeof(info.arg[0]) * (N - 9));
316  return work_queue_.Enqueue(info);
317  }
// Ten-argument overload (requires N >= 10).
318  template<class X>
319  errno_t Submit(X func, Callback callback,
320  typename EnableIf<N >= 10 && IsSame<X, Func>::value,
321  void*>::type arg0,
322  void* arg1, void* arg2, void* arg3, void* arg4,
323  void* arg5, void* arg6, void* arg7, void* arg8, void* arg9) NLIB_NOEXCEPT {
324  LockFreeThreadPoolExecInfo info;
325  info.func = func;
326  info.callback = callback;
327  info.arg[0] = arg0;
328  info.arg[1] = arg1;
329  info.arg[2] = arg2;
330  info.arg[3] = arg3;
331  info.arg[4] = arg4;
332  info.arg[5] = arg5;
333  info.arg[6] = arg6;
334  info.arg[7] = arg7;
335  info.arg[8] = arg8;
336  info.arg[9] = arg9;
337  // memset(&info.arg[0] + 10, 0, sizeof(info.arg[0]) * (N - 10));
338  return work_queue_.Enqueue(info);
339  }
340 
341  private:
// Thread entry points handed to the base class; p is cast back to
// LockFreeThreadPool* inside them.
342  static void PrimaryWorkerThread(void* p) NLIB_NOEXCEPT;
343  static void WorkerThread(void* p) NLIB_NOEXCEPT;
344 
345  private:
// One queued task: the function, its callback, and N argument slots.
346  struct LockFreeThreadPoolExecInfo {
347  Func func;
348  Callback callback;
349  void* arg[N];
350  };
352 
354 };
355 
// Initializes work_queue_ with work_queue_size, then calls
// InitBase(thread_count, settings).
// Returns 0 on success, otherwise the first non-zero errno_t reported by
// work_queue_.Init() or InitBase().
356 template<size_t N>
357 errno_t LockFreeThreadPool<N>::Init(size_t work_queue_size, size_t thread_count,
358  const ThreadSettings& settings) NLIB_NOEXCEPT {
359  errno_t e = work_queue_.Init(work_queue_size);
360  if (e != 0) return e;
361  e = InitBase(thread_count, settings);
362  if (e != 0) {
// NOTE(review): the declaration of `q` (listing line 363) is absent from this
// extract. Presumably q is a fresh local queue swapped with work_queue_ so that
// the queue initialized above is released on failure — confirm against the
// original header.
364  q.SwapUnsafe(work_queue_);
365  return e;
366  }
367  return 0;
368 }
369 
// Variadic counterpart of Submit(): enqueues func/callback together with n
// arguments read from the variable argument list.
//
// n:   number of variadic arguments; must not exceed N.
// ...: each argument is read with va_arg(ap, void*), so callers must pass void*.
//
// Returns EINVAL (after asserting) when n > N, otherwise the result of
// work_queue_.Enqueue(). Argument slots arg[n] .. arg[N-1] are left unassigned.
370 template<size_t N>
371 errno_t LockFreeThreadPool<N>::SubmitVarArgs(Func func, Callback callback,
372  size_t n, ...) NLIB_NOEXCEPT {
// Assert for debugging, but still fail with EINVAL when the assertion does not stop execution.
373  if (n > N) {
374  NLIB_ASSERT(n <= N);
375  return EINVAL;
376  }
377 
378  va_list ap;
379  LockFreeThreadPoolExecInfo info;
380  info.func = func;
381  info.callback = callback;
382 
383  va_start(ap, n);
384  for (size_t i = 0; i < n; ++i) {
385  info.arg[i] = va_arg(ap, void*);
386  }
387  va_end(ap);
388  return work_queue_.Enqueue(info);
389 }
390 
// Definition of the static PrimaryWorkerThread(void* p) entry point.
// NOTE(review): the signature line (listing line 392) is absent from this extract.
// Loops forever: dequeues one task and passes it to PrimaryWorkerThread_(),
// stopping when that call returns false.
391 template<size_t N>
// p is the LockFreeThreadPool instance passed as the thread argument.
393  LockFreeThreadPool* This = reinterpret_cast<LockFreeThreadPool*>(p);
394  LockFreeThreadPoolExecInfo info;
395 
396  for (;;) {
397  errno_t e = This->work_queue_.Dequeue(&info);
// When Dequeue fails (e != 0) the argument pointer is NULL, but info.func and
// info.callback are still passed as-is; N is always passed as the argument count.
// NOTE(review): info is not initialized, so after a failed first Dequeue these
// hold indeterminate values — presumably PrimaryWorkerThread_ ignores them when
// args is NULL; confirm.
398  if (!This->PrimaryWorkerThread_(info.func, info.callback,
399  e == 0 ? &info.arg[0] : NULL, N))
400  break;
401  }
402 }
403 
// Definition of the static WorkerThread(void* p) entry point.
// NOTE(review): the signature line (listing line 405) is absent from this extract.
// Loops forever: dequeues one task and passes it to WorkerThread_() together
// with a per-thread counter, stopping when that call returns false.
404 template<size_t N>
// p is the LockFreeThreadPool instance passed as the thread argument.
406  LockFreeThreadPool* This = reinterpret_cast<LockFreeThreadPool*>(p);
407  LockFreeThreadPoolExecInfo info;
// Local to this thread; only WorkerThread_() reads or updates it, through the pointer.
408  uint32_t counter = 0;
409 
410  for (;;) {
411  errno_t e = This->work_queue_.Dequeue(&info);
// When Dequeue fails (e != 0) the argument pointer is NULL, but info.func and
// info.callback are still passed as-is; N is always passed as the argument count.
// NOTE(review): info is not initialized, so after a failed first Dequeue these
// hold indeterminate values — presumably WorkerThread_ ignores them when args
// is NULL; confirm.
412  if (!This->WorkerThread_(info.func, info.callback,
413  e == 0 ? &info.arg[0] : NULL, N, &counter))
414  break;
415  }
416 }
417 
418 } // namespace threading
419 NLIB_NAMESPACE_END
420 
421 #endif // INCLUDE_NN_NLIB_THREADING_THREADPOOL_H_
Lock-free algorithms are supported.
#define NLIB_OVERRIDE
Defines override if it is available for use. If not, holds an empty string.
Definition: Config.h:228
#define NLIB_DISALLOW_COPY_AND_ASSIGN(TypeName)
Prohibits use of the copy constructor and assignment operator for the class specified by TypeName...
Definition: Config.h:163
ThreadPool() noexcept
Instantiates the object with default parameters (default constructor). Must be initialized with Init...
Definition: ThreadPool.h:54
errno_t Init(size_t thread_count) noexcept
Starts a thread and initializes the thread pool.
Definition: ThreadPool.h:56
~LockFreeThreadPool() noexcept
Destructor. Joins all threads started internally.
Definition: ThreadPool.h:139
LockFreeThreadPool() noexcept
Instantiates the object with default parameters (default constructor). Must be initialized with Init...
Definition: ThreadPool.h:137
#define NLIB_VIS_HIDDEN
Symbols for functions and classes are not made available outside of the library.
Definition: Platform_unix.h:88
#define NLIB_VIS_PUBLIC
Symbols for functions and classes are made available outside of the library.
Definition: Platform_unix.h:89
Class to wrap nlib_thread_attr. nlib_thread_attr_init() and nlib_thread_attr_destroy() are run automatically.
Definition: Thread.h:29
errno_t GetFuture(Future< R > *p) noexcept
Sets Future to get the result of execution.
This thread pool is suitable for deploying fine grain tasks.
Definition: ThreadPool.h:127
errno_t Init() noexcept
Starts a thread and initializes the thread pool.
Definition: ThreadPool.h:60
void SwapUnsafe(LockFreeQueue &rhs) noexcept
Swaps an object. This is not thread-safe.
Definition: LockFree.h:564
#define NLIB_NOEXCEPT
Defines noexcept geared to the environment, or the equivalent.
Definition: Config.h:99
A container-like class similar to std::vector that can store objects that do not have copy constructors.
Definition: Nlist.h:32
Class that gets the output of a different thread executing in a thread safe manner. This class is similar to the std::shared_future class of C++11.
Definition: AsyncFileIo.h:26
errno_t Dequeue(DequeueType x) noexcept
Picks up an element from the queue and stores it in x. This is thread-safe.
Definition: LockFree.h:557
size_t GetHardwareConcurrency() noexcept
Returns the number of hardware threads.
Definition: Thread.h:797
Class that wraps a function to run in a different thread, and gets the return value in a thread-safe manner.
Definition: Future.h:931
Implements the Future pattern for multithread programming.
#define NLIB_FINAL
Defines final if it is available for use. If not, holds an empty string.
Definition: Config.h:229
Tasks may be deployed to a pool of threads that have been already created.
Definition: ThreadPool.h:52
pthread_t nlib_thread
The identifier for threads.
errno_t Init(FUNC &func)
Performs initialization.
int errno_t
Indicates with an int-type typedef that a POSIX error value is returned as the return value...
Definition: NMalloc.h:37