nlib
ThreadPool.h
Go to the documentation of this file.
1 
2 /*--------------------------------------------------------------------------------*
3  Project: CrossRoad
4  Copyright (C)Nintendo All rights reserved.
5 
6  These coded instructions, statements, and computer programs contain proprietary
7  information of Nintendo and/or its licensed developers and are protected by
8  national and international copyright laws. They may not be disclosed to third
9  parties or copied or duplicated in any form, in whole or in part, without the
10  prior written consent of Nintendo.
11 
12  The content herein is highly confidential and should be handled accordingly.
13  *--------------------------------------------------------------------------------*/
14 
15 #pragma once
16 #ifndef INCLUDE_NN_NLIB_THREADING_THREADPOOL_H_
17 #define INCLUDE_NN_NLIB_THREADING_THREADPOOL_H_
18 
19 #include "nn/nlib/Nqueue.h"
21 #include "nn/nlib/LockFree.h"
22 
23 NLIB_NAMESPACE_BEGIN
24 namespace threading {
25 
26 namespace detail {
27 class NLIB_VIS_PUBLIC ThPoolTaskBase {
28  public:
29  ThPoolTaskBase() NLIB_NOEXCEPT {}
30  virtual ~ThPoolTaskBase() NLIB_NOEXCEPT {}
31  virtual void operator()() NLIB_NOEXCEPT = 0;
32 
33  private:
34  NLIB_DISALLOW_COPY_AND_ASSIGN(ThPoolTaskBase);
35 };
36 
37 template<class R>
38 class ThPoolTask : public ThPoolTaskBase {
39  public:
40  explicit ThPoolTask(PackagedTask<R()>& task, move_tag) NLIB_NOEXCEPT : task_(task, move_tag()) {
41  }
42  virtual ~ThPoolTask() NLIB_NOEXCEPT NLIB_OVERRIDE {}
43  virtual void operator()() NLIB_NOEXCEPT NLIB_OVERRIDE { task_(); }
44 
45  private:
46  PackagedTask<R()> task_;
48 };
49 } // namespace detail
50 
52  public:
53  ThreadPool() NLIB_NOEXCEPT : prv_(nullptr) {}
54  errno_t Init(size_t thread_count, const ThreadSettings& settings) NLIB_NOEXCEPT;
55  errno_t Init(size_t thread_count) NLIB_NOEXCEPT {
56  ThreadSettings settings;
57  return this->Init(thread_count, settings);
58  }
61  template<class R, class FUNC>
62  errno_t Submit(Future<R>* future, FUNC& func) NLIB_NOEXCEPT;
63  size_t Count() NLIB_NOEXCEPT;
64 
65  private:
66  NLIB_VIS_HIDDEN void JoinThreads(Nlist<nlib_thread>& L) NLIB_NOEXCEPT;
67  static NLIB_VIS_HIDDEN void WorkerThread(void* p);
68  errno_t Submit_(detail::ThPoolTaskBase* p) NLIB_NOEXCEPT;
69  struct ThreadPoolPrivate;
70  ThreadPoolPrivate* prv_;
72 };
73 
74 template<class R, class FUNC>
75 inline errno_t ThreadPool::Submit(Future<R>* future, FUNC& func) NLIB_NOEXCEPT {
76  if (!prv_) return EINVAL;
77  PackagedTask<R()> task;
78  errno_t e = task.Init(func);
79  if (e != 0) return e;
80  if (future) {
81  e = task.GetFuture(future);
82  if (e != 0) return e;
83  }
84  detail::ThPoolTaskBase* ptask = new (std::nothrow) detail::ThPoolTask<R>(task, move_tag());
85  if (!ptask) return ENOMEM;
86  return Submit_(ptask);
87 }
88 
89 namespace detail {
90 
91 class NLIB_VIS_PUBLIC LockFreeThreadPoolBase {
92  protected:
93  LockFreeThreadPoolBase(void (*primary_worker_thread)(void*), void (*worker_thread)(void*)) :
94  primary_worker_thread_(primary_worker_thread),
95  worker_thread_(worker_thread),
96  status_(0),
97  thread_count_(0),
98  thread_list_(nullptr) {}
99  ~LockFreeThreadPoolBase() NLIB_NOEXCEPT;
100  typedef void* (*Func)(void**, size_t)NLIB_NOEXCEPT_FUNCPTR;
101  typedef void (*Callback)(void*) NLIB_NOEXCEPT_FUNCPTR;
102  errno_t InitBase(size_t thread_count, const ThreadSettings& settings) NLIB_NOEXCEPT;
103  bool PrimaryWorkerThread_(Func func, Callback callback, void** args, size_t n) NLIB_NOEXCEPT;
104  bool WorkerThread_(Func func, Callback callback, void** args, size_t n,
105  uint32_t* counter) NLIB_NOEXCEPT;
106  void JoinThreads() NLIB_NOEXCEPT;
107 
108  public:
109  size_t GetThreadCount() NLIB_NOEXCEPT { return thread_count_; }
110 
111  private:
112  NLIB_VIS_HIDDEN void JoinThreads(nlib_thread* L, size_t cnt) NLIB_NOEXCEPT;
113  void (*primary_worker_thread_)(void*);
114  void (*worker_thread_)(void*);
115  int32_t status_;
116  uint32_t thread_count_;
117  nlib_thread* thread_list_;
118 
119  NLIB_DISALLOW_COPY_AND_ASSIGN(LockFreeThreadPoolBase);
120 };
121 
122 } // namespace detail
123 
124 template<size_t N = 5>
125 class LockFreeThreadPool NLIB_FINAL : public NLIB_NS::threading::detail::LockFreeThreadPoolBase {
126  public:
127 #ifdef NLIB_DOXYGEN
128  typedef void* (*Func)(void** args, size_t n);
129  typedef void (*Callback)(void* result);
130 #else
131  typedef NLIB_NS::threading::detail::LockFreeThreadPoolBase::Func Func;
132  typedef NLIB_NS::threading::detail::LockFreeThreadPoolBase::Callback Callback;
133 #endif
134 
135  LockFreeThreadPool() NLIB_NOEXCEPT : LockFreeThreadPoolBase(PrimaryWorkerThread, WorkerThread) {
136  }
137  ~LockFreeThreadPool() NLIB_NOEXCEPT { this->JoinThreads(); }
138  errno_t
139  Init(size_t work_queue_size, size_t thread_count, const ThreadSettings& settings) NLIB_NOEXCEPT;
140  errno_t SubmitVarArgs(Func func, Callback callback, size_t n, ...) NLIB_NOEXCEPT;
141 
142 #ifdef NLIB_DOXYGEN
143  errno_t Submit(Func func, Callback callback);
144  errno_t Submit(Func func, Callback callback, void* arg0);
145  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1);
146  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2);
147  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2, void* arg3);
148  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2, void* arg3,
149  void* arg4);
150  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2, void* arg3,
151  void* arg4, void* arg5);
152  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2, void* arg3,
153  void* arg4, void* arg5, void* arg6);
154  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2, void* arg3,
155  void* arg4, void* arg5, void* arg6, void* arg7);
156  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2, void* arg3,
157  void* arg4, void* arg5, void* arg6, void* arg7, void* arg8);
158  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2, void* arg3,
159  void* arg4, void* arg5, void* arg6, void* arg7, void* arg8, void* arg9);
160  size_t GetThreadCount();
161 #endif
162  errno_t Submit(Func func, Callback callback) NLIB_NOEXCEPT {
163  LockFreeThreadPoolExecInfo info;
164  info.func = func;
165  info.callback = callback;
166  // memset(&info.arg[0], 0, sizeof(info.arg[0]) * N);
167  return work_queue_.Enqueue(info);
168  }
169  template<class X>
170  errno_t
171  Submit(X func, Callback callback,
172  typename EnableIf<N >= 1 && IsSame<X, Func>::value, void*>::type arg0) NLIB_NOEXCEPT {
173  LockFreeThreadPoolExecInfo info;
174  info.func = func;
175  info.callback = callback;
176  info.arg[0] = arg0;
177  // memset(&info.arg[0] + 1, 0, sizeof(info.arg[0]) * (N - 1));
178  return work_queue_.Enqueue(info);
179  }
180  template<class X>
181  errno_t Submit(X func, Callback callback,
182  typename EnableIf<N >= 2 && IsSame<X, Func>::value, void*>::type arg0,
183  void* arg1) NLIB_NOEXCEPT {
184  LockFreeThreadPoolExecInfo info;
185  info.func = func;
186  info.callback = callback;
187  info.arg[0] = arg0;
188  info.arg[1] = arg1;
189  // memset(&info.arg[0] + 2, 0, sizeof(info.arg[0]) * (N - 2));
190  return work_queue_.Enqueue(info);
191  }
192  template<class X>
193  errno_t Submit(X func, Callback callback,
194  typename EnableIf<N >= 3 && IsSame<X, Func>::value, void*>::type arg0,
195  void* arg1, void* arg2) NLIB_NOEXCEPT {
196  LockFreeThreadPoolExecInfo info;
197  info.func = func;
198  info.callback = callback;
199  info.arg[0] = arg0;
200  info.arg[1] = arg1;
201  info.arg[2] = arg2;
202  // memset(&info.arg[0] + 3, 0, sizeof(info.arg[0]) * (N - 3));
203  return work_queue_.Enqueue(info);
204  }
205  template<class X>
206  errno_t Submit(X func, Callback callback,
207  typename EnableIf<N >= 4 && IsSame<X, Func>::value, void*>::type arg0,
208  void* arg1, void* arg2, void* arg3) NLIB_NOEXCEPT {
209  LockFreeThreadPoolExecInfo info;
210  info.func = func;
211  info.callback = callback;
212  info.arg[0] = arg0;
213  info.arg[1] = arg1;
214  info.arg[2] = arg2;
215  info.arg[3] = arg3;
216  // memset(&info.arg[0] + 4, 0, sizeof(info.arg[0]) * (N - 4));
217  return work_queue_.Enqueue(info);
218  }
219  template<class X>
220  errno_t Submit(X func, Callback callback,
221  typename EnableIf<N >= 5 && IsSame<X, Func>::value, void*>::type arg0,
222  void* arg1, void* arg2, void* arg3, void* arg4) NLIB_NOEXCEPT {
223  LockFreeThreadPoolExecInfo info;
224  info.func = func;
225  info.callback = callback;
226  info.arg[0] = arg0;
227  info.arg[1] = arg1;
228  info.arg[2] = arg2;
229  info.arg[3] = arg3;
230  info.arg[4] = arg4;
231  // memset(&info.arg[0] + 5, 0, sizeof(info.arg[0]) * (N - 5));
232  return work_queue_.Enqueue(info);
233  }
234  template<class X>
235  errno_t Submit(X func, Callback callback,
236  typename EnableIf<N >= 6 && IsSame<X, Func>::value, void*>::type arg0,
237  void* arg1, void* arg2, void* arg3, void* arg4, void* arg5) NLIB_NOEXCEPT {
238  LockFreeThreadPoolExecInfo info;
239  info.func = func;
240  info.callback = callback;
241  info.arg[0] = arg0;
242  info.arg[1] = arg1;
243  info.arg[2] = arg2;
244  info.arg[3] = arg3;
245  info.arg[4] = arg4;
246  info.arg[5] = arg5;
247  // memset(&info.arg[0] + 6, 0, sizeof(info.arg[0]) * (N - 6));
248  return work_queue_.Enqueue(info);
249  }
250  template<class X>
251  errno_t
252  Submit(X func, Callback callback,
253  typename EnableIf<N >= 7 && IsSame<X, Func>::value, void*>::type arg0, void* arg1,
254  void* arg2, void* arg3, void* arg4, void* arg5, void* arg6) NLIB_NOEXCEPT {
255  LockFreeThreadPoolExecInfo info;
256  info.func = func;
257  info.callback = callback;
258  info.arg[0] = arg0;
259  info.arg[1] = arg1;
260  info.arg[2] = arg2;
261  info.arg[3] = arg3;
262  info.arg[4] = arg4;
263  info.arg[5] = arg5;
264  info.arg[6] = arg6;
265  // memset(&info.arg[0] + 7, 0, sizeof(info.arg[0]) * (N - 7));
266  return work_queue_.Enqueue(info);
267  }
268  template<class X>
269  errno_t
270  Submit(X func, Callback callback,
271  typename EnableIf<N >= 8 && IsSame<X, Func>::value, void*>::type arg0, void* arg1,
272  void* arg2, void* arg3, void* arg4, void* arg5, void* arg6, void* arg7) NLIB_NOEXCEPT {
273  LockFreeThreadPoolExecInfo info;
274  info.func = func;
275  info.callback = callback;
276  info.arg[0] = arg0;
277  info.arg[1] = arg1;
278  info.arg[2] = arg2;
279  info.arg[3] = arg3;
280  info.arg[4] = arg4;
281  info.arg[5] = arg5;
282  info.arg[6] = arg6;
283  info.arg[7] = arg7;
284  // memset(&info.arg[0] + 8, 0, sizeof(info.arg[0]) * (N - 8));
285  return work_queue_.Enqueue(info);
286  }
287  template<class X>
288  errno_t Submit(X func, Callback callback,
289  typename EnableIf<N >= 9 && IsSame<X, Func>::value, void*>::type arg0,
290  void* arg1, void* arg2, void* arg3, void* arg4, void* arg5, void* arg6,
291  void* arg7, void* arg8) NLIB_NOEXCEPT {
292  LockFreeThreadPoolExecInfo info;
293  info.func = func;
294  info.callback = callback;
295  info.arg[0] = arg0;
296  info.arg[1] = arg1;
297  info.arg[2] = arg2;
298  info.arg[3] = arg3;
299  info.arg[4] = arg4;
300  info.arg[5] = arg5;
301  info.arg[6] = arg6;
302  info.arg[7] = arg7;
303  info.arg[8] = arg8;
304  // memset(&info.arg[0] + 9, 0, sizeof(info.arg[0]) * (N - 9));
305  return work_queue_.Enqueue(info);
306  }
307  template<class X>
308  errno_t Submit(X func, Callback callback,
309  typename EnableIf<N >= 10 && IsSame<X, Func>::value, void*>::type arg0,
310  void* arg1, void* arg2, void* arg3, void* arg4, void* arg5, void* arg6,
311  void* arg7, void* arg8, void* arg9) NLIB_NOEXCEPT {
312  LockFreeThreadPoolExecInfo info;
313  info.func = func;
314  info.callback = callback;
315  info.arg[0] = arg0;
316  info.arg[1] = arg1;
317  info.arg[2] = arg2;
318  info.arg[3] = arg3;
319  info.arg[4] = arg4;
320  info.arg[5] = arg5;
321  info.arg[6] = arg6;
322  info.arg[7] = arg7;
323  info.arg[8] = arg8;
324  info.arg[9] = arg9;
325  // memset(&info.arg[0] + 10, 0, sizeof(info.arg[0]) * (N - 10));
326  return work_queue_.Enqueue(info);
327  }
328 
329  private:
330  static void PrimaryWorkerThread(void* p) NLIB_NOEXCEPT;
331  static void WorkerThread(void* p) NLIB_NOEXCEPT;
332 
333  private:
334  struct LockFreeThreadPoolExecInfo {
335  Func func;
336  Callback callback;
337  void* arg[N];
338  };
339  LockFreeQueue<LockFreeThreadPoolExecInfo> work_queue_;
340 
341  NLIB_DISALLOW_COPY_AND_ASSIGN(LockFreeThreadPool);
342 };
343 
344 template<size_t N>
345 errno_t LockFreeThreadPool<N>::Init(size_t work_queue_size, size_t thread_count,
346  const ThreadSettings& settings) NLIB_NOEXCEPT {
347  errno_t e = work_queue_.Init(work_queue_size);
348  if (e != 0) return e;
349  e = InitBase(thread_count, settings);
350  if (e != 0) {
352  q.SwapUnsafe(work_queue_);
353  return e;
354  }
355  return 0;
356 }
357 
358 template<size_t N>
359 errno_t
360 LockFreeThreadPool<N>::SubmitVarArgs(Func func, Callback callback, size_t n, ...) NLIB_NOEXCEPT {
361  if (n > N) {
362  NLIB_ASSERT(n <= N);
363  return EINVAL;
364  }
365 
366  va_list ap;
367  LockFreeThreadPoolExecInfo info;
368  info.func = func;
369  info.callback = callback;
370 
371  va_start(ap, n);
372  for (size_t i = 0; i < n; ++i) {
373  info.arg[i] = va_arg(ap, void*);
374  }
375  va_end(ap);
376  return work_queue_.Enqueue(info);
377 }
378 
379 template<size_t N>
381  LockFreeThreadPool* This = reinterpret_cast<LockFreeThreadPool*>(p);
382  LockFreeThreadPoolExecInfo info;
383 
384  for (;;) {
385  errno_t e = This->work_queue_.Dequeue(&info);
386  if (!This->PrimaryWorkerThread_(info.func, info.callback, e == 0 ? &info.arg[0] : nullptr,
387  N))
388  break;
389  }
390 }
391 
392 template<size_t N>
393 void LockFreeThreadPool<N>::WorkerThread(void* p) NLIB_NOEXCEPT {
394  LockFreeThreadPool* This = reinterpret_cast<LockFreeThreadPool*>(p);
395  LockFreeThreadPoolExecInfo info;
396  uint32_t counter = 0;
397 
398  for (;;) {
399  errno_t e = This->work_queue_.Dequeue(&info);
400  if (!This->WorkerThread_(info.func, info.callback, e == 0 ? &info.arg[0] : nullptr, N,
401  &counter))
402  break;
403  }
404 }
405 
406 } // namespace threading
407 NLIB_NAMESPACE_END
408 
409 #endif // INCLUDE_NN_NLIB_THREADING_THREADPOOL_H_
Lock-free algorithms are supported.
#define NLIB_OVERRIDE
Defines override if it is available for use. If not, holds an empty string.
Definition: Config.h:249
#define NLIB_DISALLOW_COPY_AND_ASSIGN(TypeName)
Prohibits use of the copy constructor and assignment operator for the class specified by TypeName...
Definition: Config.h:183
ThreadPool() noexcept
Instantiates the object with default parameters (default constructor). Must be initialized with Init...
Definition: ThreadPool.h:53
errno_t Init(size_t thread_count) noexcept
Starts a thread and initializes the thread pool.
Definition: ThreadPool.h:55
~LockFreeThreadPool() noexcept
Destructor. Joins all threads started internally.
Definition: ThreadPool.h:137
LockFreeThreadPool() noexcept
Instantiates the object with default parameters (default constructor). Must be initialized with Init...
Definition: ThreadPool.h:135
#define NLIB_VIS_HIDDEN
Symbols for functions and classes are not made available outside of the library.
Definition: Platform_unix.h:86
#define NLIB_VIS_PUBLIC
Symbols for functions and classes are made available outside of the library.
Definition: Platform_unix.h:87
Class to wrap nlib_thread_attr. nlib_thread_attr_init() and nlib_thread_attr_destroy() are run automa...
Definition: Thread.h:29
errno_t GetFuture(Future< R > *p) noexcept
Sets Future to get the result of execution.
An empty structure indicating that an argument to a function needs to be moved.
Definition: Config.h:270
This thread pool is suitable for deploying fine grain tasks.
Definition: ThreadPool.h:125
errno_t Init() noexcept
Starts a thread and initializes the thread pool.
Definition: ThreadPool.h:59
void SwapUnsafe(LockFreeQueue &rhs) noexcept
Swaps an object. This is not thread-safe.
Definition: LockFree.h:536
#define NLIB_NOEXCEPT
Defines noexcept geared to the environment, or the equivalent.
Definition: Config.h:109
A container-like class similar to std::vector that can store objects that do not have copy constructo...
Definition: Nlist.h:32
Class that gets the output of a different thread executing in a thread safe manner. This class is similar to the std::shared_future class of C++11.
Definition: AsyncFileIo.h:26
errno_t Dequeue(DequeueType x) noexcept
Picks up an element from the queue and stores it in x. This is thread-safe.
Definition: LockFree.h:529
size_t GetHardwareConcurrency() noexcept
Returns the number of hardware threads.
Definition: Thread.h:807
Class that wraps a function to run in a different thread, and gets the return value in a thread safe ...
Definition: Future.h:1083
Implements the Future pattern for multithread programming.
#define NLIB_FINAL
Defines final if it is available for use. If not, holds an empty string.
Definition: Config.h:250
Tasks may be deployed to a pool of threads that have been already created.
Definition: ThreadPool.h:51
pthread_t nlib_thread
The identifier for threads.
errno_t Init(FUNC &func)
Performs initialization.
int errno_t
Indicates with an int-type typedef that a POSIX error value is returned as the return value...
Definition: NMalloc.h:37