nlib
ThreadPool.h
Go to the documentation of this file.
1 
2 /*--------------------------------------------------------------------------------*
3  Project: CrossRoad
4  Copyright (C)Nintendo All rights reserved.
5 
6  These coded instructions, statements, and computer programs contain proprietary
7  information of Nintendo and/or its licensed developers and are protected by
8  national and international copyright laws. They may not be disclosed to third
9  parties or copied or duplicated in any form, in whole or in part, without the
10  prior written consent of Nintendo.
11 
12  The content herein is highly confidential and should be handled accordingly.
13  *--------------------------------------------------------------------------------*/
14 
15 #pragma once
16 #ifndef INCLUDE_NN_NLIB_THREADING_THREADPOOL_H_
17 #define INCLUDE_NN_NLIB_THREADING_THREADPOOL_H_
18 
19 #include "nn/nlib/Nqueue.h"
21 #include "nn/nlib/LockFree.h"
22 
23 NLIB_NAMESPACE_BEGIN
24 namespace threading {
25 
26 namespace detail {
27 class NLIB_VIS_PUBLIC ThPoolTaskBase {
28  public:
29  ThPoolTaskBase() NLIB_NOEXCEPT {}
30  virtual ~ThPoolTaskBase() NLIB_NOEXCEPT {}
31  virtual void operator()() NLIB_NOEXCEPT = 0;
32 
33  private:
34  NLIB_DISALLOW_COPY_AND_ASSIGN(ThPoolTaskBase);
35 };
36 
37 template <class R>
38 class ThPoolTask : public ThPoolTaskBase {
39  public:
40  explicit ThPoolTask(PackagedTask<R()>& task, move_tag) NLIB_NOEXCEPT // NOLINT
41  : task_(task, move_tag()) {}
42  virtual ~ThPoolTask() NLIB_NOEXCEPT NLIB_OVERRIDE {}
43  virtual void operator()() NLIB_NOEXCEPT NLIB_OVERRIDE { task_(); }
44 
45  private:
46  PackagedTask<R()> task_;
48 };
49 } // namespace detail
50 
52  public:
53  ThreadPool() NLIB_NOEXCEPT : prv_(nullptr) {}
54  errno_t Init(size_t thread_count, const ThreadSettings& settings) NLIB_NOEXCEPT;
55  errno_t Init(size_t thread_count) NLIB_NOEXCEPT {
56  ThreadSettings settings;
57  return this->Init(thread_count, settings);
58  }
59  errno_t Init() NLIB_NOEXCEPT {
60  return this->Init(GetHardwareConcurrency());
61  }
63  template <class R, class FUNC>
64  errno_t Submit(Future<R>* future, FUNC& func) NLIB_NOEXCEPT; // NOLINT
65  size_t Count() NLIB_NOEXCEPT;
66 
67  private:
68  NLIB_VIS_HIDDEN void JoinThreads(Nlist<nlib_thread>& L) NLIB_NOEXCEPT; // NOLINT
69  static NLIB_VIS_HIDDEN void WorkerThread(void* p) NLIB_NOEXCEPT;
70  errno_t Submit_(detail::ThPoolTaskBase* p) NLIB_NOEXCEPT;
71  struct ThreadPoolPrivate;
72  ThreadPoolPrivate* prv_;
74 };
75 
76 template <class R, class FUNC>
77 inline errno_t ThreadPool::Submit(Future<R>* future, FUNC& func) NLIB_NOEXCEPT { // NOLINT
78  if (!prv_) return EINVAL;
79  PackagedTask<R()> task;
80  errno_t e = task.Init(func);
81  if (e != 0) return e;
82  if (future) {
83  e = task.GetFuture(future);
84  if (e != 0) return e;
85  }
86  detail::ThPoolTaskBase* ptask = new (std::nothrow) detail::ThPoolTask<R>(task, move_tag());
87  if (!ptask) return ENOMEM;
88  return Submit_(ptask);
89 }
90 
91 namespace detail {
92 
93 class NLIB_VIS_PUBLIC LockFreeThreadPoolBase {
94  protected:
95  LockFreeThreadPoolBase(void (*primary_worker_thread)(void*), void (*worker_thread)(void*))
96  : primary_worker_thread_(primary_worker_thread),
97  worker_thread_(worker_thread),
98  status_(0), thread_count_(0), thread_list_(nullptr) {}
99  ~LockFreeThreadPoolBase() NLIB_NOEXCEPT;
100  typedef void* (*Func)(void**, size_t) NLIB_NOEXCEPT_FUNCPTR;
101  typedef void (*Callback)(void*) NLIB_NOEXCEPT_FUNCPTR; // NOLINT
102  errno_t InitBase(size_t thread_count, const ThreadSettings& settings) NLIB_NOEXCEPT;
103  bool PrimaryWorkerThread_(Func func, Callback callback,
104  void** args, size_t n) NLIB_NOEXCEPT;
105  bool WorkerThread_(Func func, Callback callback, void** args,
106  size_t n, uint32_t* counter) NLIB_NOEXCEPT;
107  void JoinThreads() NLIB_NOEXCEPT;
108 
109  public:
110  size_t GetThreadCount() NLIB_NOEXCEPT { return thread_count_; }
111 
112  private:
113  NLIB_VIS_HIDDEN void JoinThreads(nlib_thread* L, size_t cnt) NLIB_NOEXCEPT;
114  void (*primary_worker_thread_)(void*);
115  void (*worker_thread_)(void*);
116  int32_t status_;
117  uint32_t thread_count_;
118  nlib_thread* thread_list_;
119 
120  NLIB_DISALLOW_COPY_AND_ASSIGN(LockFreeThreadPoolBase);
121 };
122 
123 } // namespace detail
124 
125 template<size_t N = 5>
126 class LockFreeThreadPool NLIB_FINAL : public NLIB_NS::threading::detail::LockFreeThreadPoolBase {
127  public:
128 #ifdef NLIB_DOXYGEN
129  typedef void* (*Func)(void** args, size_t n);
130  typedef void (*Callback)(void* result);
131 #else
132  typedef NLIB_NS::threading::detail::LockFreeThreadPoolBase::Func Func;
133  typedef NLIB_NS::threading::detail::LockFreeThreadPoolBase::Callback Callback;
134 #endif
135 
137  : LockFreeThreadPoolBase(PrimaryWorkerThread, WorkerThread) {}
138  ~LockFreeThreadPool() NLIB_NOEXCEPT { this->JoinThreads(); }
139  errno_t Init(size_t work_queue_size, size_t thread_count,
140  const ThreadSettings& settings) NLIB_NOEXCEPT;
141  errno_t SubmitVarArgs(Func func, Callback callback, size_t n, ...) NLIB_NOEXCEPT;
142 
143 #ifdef NLIB_DOXYGEN
144  errno_t Submit(Func func, Callback callback);
145  errno_t Submit(Func func, Callback callback, void* arg0);
146  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1);
147  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2);
148  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2, void* arg3);
149  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2, void* arg3,
150  void* arg4);
151  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2, void* arg3,
152  void* arg4, void* arg5);
153  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2, void* arg3,
154  void* arg4, void* arg5, void* arg6);
155  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2, void* arg3,
156  void* arg4, void* arg5, void* arg6, void* arg7);
157  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2, void* arg3,
158  void* arg4, void* arg5, void* arg6, void* arg7, void* arg8);
159  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2, void* arg3,
160  void* arg4, void* arg5, void* arg6, void* arg7, void* arg8, void* arg9);
161  size_t GetThreadCount();
162 #endif
163  errno_t Submit(Func func, Callback callback) NLIB_NOEXCEPT {
164  LockFreeThreadPoolExecInfo info;
165  info.func = func;
166  info.callback = callback;
167  // memset(&info.arg[0], 0, sizeof(info.arg[0]) * N);
168  return work_queue_.Enqueue(info);
169  }
170  template<class X>
171  errno_t Submit(X func, Callback callback,
172  typename EnableIf<N >= 1 && IsSame<X, Func>::value,
173  void*>::type arg0) NLIB_NOEXCEPT {
174  LockFreeThreadPoolExecInfo info;
175  info.func = func;
176  info.callback = callback;
177  info.arg[0] = arg0;
178  // memset(&info.arg[0] + 1, 0, sizeof(info.arg[0]) * (N - 1));
179  return work_queue_.Enqueue(info);
180  }
181  template<class X>
182  errno_t Submit(X func, Callback callback,
183  typename EnableIf<N >= 2 && IsSame<X, Func>::value,
184  void*>::type arg0,
185  void* arg1) NLIB_NOEXCEPT {
186  LockFreeThreadPoolExecInfo info;
187  info.func = func;
188  info.callback = callback;
189  info.arg[0] = arg0;
190  info.arg[1] = arg1;
191  // memset(&info.arg[0] + 2, 0, sizeof(info.arg[0]) * (N - 2));
192  return work_queue_.Enqueue(info);
193  }
194  template<class X>
195  errno_t Submit(X func, Callback callback,
196  typename EnableIf<N >= 3 && IsSame<X, Func>::value,
197  void*>::type arg0,
198  void* arg1, void* arg2) NLIB_NOEXCEPT {
199  LockFreeThreadPoolExecInfo info;
200  info.func = func;
201  info.callback = callback;
202  info.arg[0] = arg0;
203  info.arg[1] = arg1;
204  info.arg[2] = arg2;
205  // memset(&info.arg[0] + 3, 0, sizeof(info.arg[0]) * (N - 3));
206  return work_queue_.Enqueue(info);
207  }
208  template<class X>
209  errno_t Submit(X func, Callback callback,
210  typename EnableIf<N >= 4 && IsSame<X, Func>::value,
211  void*>::type arg0,
212  void* arg1, void* arg2, void* arg3) NLIB_NOEXCEPT {
213  LockFreeThreadPoolExecInfo info;
214  info.func = func;
215  info.callback = callback;
216  info.arg[0] = arg0;
217  info.arg[1] = arg1;
218  info.arg[2] = arg2;
219  info.arg[3] = arg3;
220  // memset(&info.arg[0] + 4, 0, sizeof(info.arg[0]) * (N - 4));
221  return work_queue_.Enqueue(info);
222  }
223  template<class X>
224  errno_t Submit(X func, Callback callback,
225  typename EnableIf<N >= 5 && IsSame<X, Func>::value,
226  void*>::type arg0,
227  void* arg1, void* arg2, void* arg3, void* arg4) NLIB_NOEXCEPT {
228  LockFreeThreadPoolExecInfo info;
229  info.func = func;
230  info.callback = callback;
231  info.arg[0] = arg0;
232  info.arg[1] = arg1;
233  info.arg[2] = arg2;
234  info.arg[3] = arg3;
235  info.arg[4] = arg4;
236  // memset(&info.arg[0] + 5, 0, sizeof(info.arg[0]) * (N - 5));
237  return work_queue_.Enqueue(info);
238  }
239  template<class X>
240  errno_t Submit(X func, Callback callback,
241  typename EnableIf<N >= 6 && IsSame<X, Func>::value,
242  void*>::type arg0,
243  void* arg1, void* arg2, void* arg3, void* arg4,
244  void* arg5) NLIB_NOEXCEPT {
245  LockFreeThreadPoolExecInfo info;
246  info.func = func;
247  info.callback = callback;
248  info.arg[0] = arg0;
249  info.arg[1] = arg1;
250  info.arg[2] = arg2;
251  info.arg[3] = arg3;
252  info.arg[4] = arg4;
253  info.arg[5] = arg5;
254  // memset(&info.arg[0] + 6, 0, sizeof(info.arg[0]) * (N - 6));
255  return work_queue_.Enqueue(info);
256  }
257  template<class X>
258  errno_t Submit(X func, Callback callback,
259  typename EnableIf<N >= 7 && IsSame<X, Func>::value,
260  void*>::type arg0,
261  void* arg1, void* arg2, void* arg3, void* arg4,
262  void* arg5, void* arg6) NLIB_NOEXCEPT {
263  LockFreeThreadPoolExecInfo info;
264  info.func = func;
265  info.callback = callback;
266  info.arg[0] = arg0;
267  info.arg[1] = arg1;
268  info.arg[2] = arg2;
269  info.arg[3] = arg3;
270  info.arg[4] = arg4;
271  info.arg[5] = arg5;
272  info.arg[6] = arg6;
273  // memset(&info.arg[0] + 7, 0, sizeof(info.arg[0]) * (N - 7));
274  return work_queue_.Enqueue(info);
275  }
276  template<class X>
277  errno_t Submit(X func, Callback callback,
278  typename EnableIf<N >= 8 && IsSame<X, Func>::value,
279  void*>::type arg0,
280  void* arg1, void* arg2, void* arg3, void* arg4,
281  void* arg5, void* arg6, void* arg7) NLIB_NOEXCEPT {
282  LockFreeThreadPoolExecInfo info;
283  info.func = func;
284  info.callback = callback;
285  info.arg[0] = arg0;
286  info.arg[1] = arg1;
287  info.arg[2] = arg2;
288  info.arg[3] = arg3;
289  info.arg[4] = arg4;
290  info.arg[5] = arg5;
291  info.arg[6] = arg6;
292  info.arg[7] = arg7;
293  // memset(&info.arg[0] + 8, 0, sizeof(info.arg[0]) * (N - 8));
294  return work_queue_.Enqueue(info);
295  }
296  template<class X>
297  errno_t Submit(X func, Callback callback,
298  typename EnableIf<N >= 9 && IsSame<X, Func>::value,
299  void*>::type arg0,
300  void* arg1, void* arg2, void* arg3, void* arg4,
301  void* arg5, void* arg6, void* arg7, void* arg8) NLIB_NOEXCEPT {
302  LockFreeThreadPoolExecInfo info;
303  info.func = func;
304  info.callback = callback;
305  info.arg[0] = arg0;
306  info.arg[1] = arg1;
307  info.arg[2] = arg2;
308  info.arg[3] = arg3;
309  info.arg[4] = arg4;
310  info.arg[5] = arg5;
311  info.arg[6] = arg6;
312  info.arg[7] = arg7;
313  info.arg[8] = arg8;
314  // memset(&info.arg[0] + 9, 0, sizeof(info.arg[0]) * (N - 9));
315  return work_queue_.Enqueue(info);
316  }
317  template<class X>
318  errno_t Submit(X func, Callback callback,
319  typename EnableIf<N >= 10 && IsSame<X, Func>::value,
320  void*>::type arg0,
321  void* arg1, void* arg2, void* arg3, void* arg4,
322  void* arg5, void* arg6, void* arg7, void* arg8, void* arg9) NLIB_NOEXCEPT {
323  LockFreeThreadPoolExecInfo info;
324  info.func = func;
325  info.callback = callback;
326  info.arg[0] = arg0;
327  info.arg[1] = arg1;
328  info.arg[2] = arg2;
329  info.arg[3] = arg3;
330  info.arg[4] = arg4;
331  info.arg[5] = arg5;
332  info.arg[6] = arg6;
333  info.arg[7] = arg7;
334  info.arg[8] = arg8;
335  info.arg[9] = arg9;
336  // memset(&info.arg[0] + 10, 0, sizeof(info.arg[0]) * (N - 10));
337  return work_queue_.Enqueue(info);
338  }
339 
340  private:
341  static void PrimaryWorkerThread(void* p) NLIB_NOEXCEPT;
342  static void WorkerThread(void* p) NLIB_NOEXCEPT;
343 
344  private:
345  struct LockFreeThreadPoolExecInfo {
346  Func func;
347  Callback callback;
348  void* arg[N];
349  };
351 
353 };
354 
355 template<size_t N>
356 errno_t LockFreeThreadPool<N>::Init(size_t work_queue_size, size_t thread_count,
357  const ThreadSettings& settings) NLIB_NOEXCEPT {
358  errno_t e = work_queue_.Init(work_queue_size);
359  if (e != 0) return e;
360  e = InitBase(thread_count, settings);
361  if (e != 0) {
363  q.SwapUnsafe(work_queue_);
364  return e;
365  }
366  return 0;
367 }
368 
369 template<size_t N>
370 errno_t LockFreeThreadPool<N>::SubmitVarArgs(Func func, Callback callback,
371  size_t n, ...) NLIB_NOEXCEPT {
372  if (n > N) {
373  NLIB_ASSERT(n <= N);
374  return EINVAL;
375  }
376 
377  va_list ap;
378  LockFreeThreadPoolExecInfo info;
379  info.func = func;
380  info.callback = callback;
381 
382  va_start(ap, n);
383  for (size_t i = 0; i < n; ++i) {
384  info.arg[i] = va_arg(ap, void*);
385  }
386  va_end(ap);
387  return work_queue_.Enqueue(info);
388 }
389 
390 template<size_t N>
392  LockFreeThreadPool* This = reinterpret_cast<LockFreeThreadPool*>(p);
393  LockFreeThreadPoolExecInfo info;
394 
395  for (;;) {
396  errno_t e = This->work_queue_.Dequeue(&info);
397  if (!This->PrimaryWorkerThread_(info.func, info.callback,
398  e == 0 ? &info.arg[0] : nullptr, N))
399  break;
400  }
401 }
402 
403 template<size_t N>
405  LockFreeThreadPool* This = reinterpret_cast<LockFreeThreadPool*>(p);
406  LockFreeThreadPoolExecInfo info;
407  uint32_t counter = 0;
408 
409  for (;;) {
410  errno_t e = This->work_queue_.Dequeue(&info);
411  if (!This->WorkerThread_(info.func, info.callback,
412  e == 0 ? &info.arg[0] : nullptr, N, &counter))
413  break;
414  }
415 }
416 
417 } // namespace threading
418 NLIB_NAMESPACE_END
419 
420 #endif // INCLUDE_NN_NLIB_THREADING_THREADPOOL_H_
Lock-free algorithms are supported.
#define NLIB_OVERRIDE
Defines override if it is available for use. If not, holds an empty string.
Definition: Config.h:244
#define NLIB_DISALLOW_COPY_AND_ASSIGN(TypeName)
Prohibits use of the copy constructor and assignment operator for the class specified by TypeName...
Definition: Config.h:179
ThreadPool() noexcept
Instantiates the object with default parameters (default constructor). Must be initialized with Init...
Definition: ThreadPool.h:53
errno_t Init(size_t thread_count) noexcept
Starts a thread and initializes the thread pool.
Definition: ThreadPool.h:55
~LockFreeThreadPool() noexcept
Destructor. Joins all threads started internally.
Definition: ThreadPool.h:138
LockFreeThreadPool() noexcept
Instantiates the object with default parameters (default constructor). Must be initialized with Init...
Definition: ThreadPool.h:136
#define NLIB_VIS_HIDDEN
Symbols for functions and classes are not made available outside of the library.
Definition: Platform_unix.h:88
#define NLIB_VIS_PUBLIC
Symbols for functions and classes are made available outside of the library.
Definition: Platform_unix.h:89
Class to wrap nlib_thread_attr. nlib_thread_attr_init() and nlib_thread_attr_destroy() are run automa...
Definition: Thread.h:29
errno_t GetFuture(Future< R > *p) noexcept
Sets Future to get the result of execution.
An empty structure indicating that an argument to a function needs to be moved.
Definition: Config.h:265
This thread pool is suitable for deploying fine grain tasks.
Definition: ThreadPool.h:126
errno_t Init() noexcept
Starts a thread and initializes the thread pool.
Definition: ThreadPool.h:59
void SwapUnsafe(LockFreeQueue &rhs) noexcept
Swaps an object. This is not thread-safe.
Definition: LockFree.h:564
#define NLIB_NOEXCEPT
Defines noexcept geared to the environment, or the equivalent.
Definition: Config.h:105
A container-like class similar to std::vector that can store objects that do not have copy constructo...
Definition: Nlist.h:32
Class that gets the output of a different thread executing in a thread safe manner. This class is similar to the std::shared_future class of C++11.
Definition: AsyncFileIo.h:26
errno_t Dequeue(DequeueType x) noexcept
Picks up an element from the queue and stores it in x. This is thread-safe.
Definition: LockFree.h:557
size_t GetHardwareConcurrency() noexcept
Returns the number of hardware threads.
Definition: Thread.h:814
Class that wraps a function to run in a different thread, and gets the return value in a thread safe ...
Definition: Future.h:1110
Implements the Future pattern for multithread programming.
#define NLIB_FINAL
Defines final if it is available for use. If not, holds an empty string.
Definition: Config.h:245
Tasks may be deployed to a pool of threads that have been already created.
Definition: ThreadPool.h:51
pthread_t nlib_thread
The identifier for threads.
errno_t Init(FUNC &func)
Performs initialization.
int errno_t
Indicates with an int-type typedef that a POSIX error value is returned as the return value...
Definition: NMalloc.h:37