nlib
ThreadPool.h
[詳解]
1 
2 /*--------------------------------------------------------------------------------*
3  Project: CrossRoad
4  Copyright (C)Nintendo All rights reserved.
5 
6  These coded instructions, statements, and computer programs contain proprietary
7  information of Nintendo and/or its licensed developers and are protected by
8  national and international copyright laws. They may not be disclosed to third
9  parties or copied or duplicated in any form, in whole or in part, without the
10  prior written consent of Nintendo.
11 
12  The content herein is highly confidential and should be handled accordingly.
13  *--------------------------------------------------------------------------------*/
14 
15 #pragma once
16 #ifndef INCLUDE_NN_NLIB_THREADING_THREADPOOL_H_
17 #define INCLUDE_NN_NLIB_THREADING_THREADPOOL_H_
18 
19 #include "nn/nlib/Nqueue.h"
21 #include "nn/nlib/LockFree.h"
22 
23 NLIB_NAMESPACE_BEGIN
24 namespace threading {
25 
26 namespace detail {
27 class NLIB_VIS_PUBLIC ThPoolTaskBase {
28  public:
29  ThPoolTaskBase() NLIB_NOEXCEPT {}
30  virtual ~ThPoolTaskBase() NLIB_NOEXCEPT {}
31  virtual void operator()() NLIB_NOEXCEPT = 0;
32 
33  private:
34  NLIB_DISALLOW_COPY_AND_ASSIGN(ThPoolTaskBase);
35 };
36 
37 template<class R>
38 class ThPoolTask : public ThPoolTaskBase {
39  public:
40  explicit ThPoolTask(PackagedTask<R()>& task, move_tag) NLIB_NOEXCEPT : task_(task, move_tag()) {
41  }
42  virtual ~ThPoolTask() NLIB_NOEXCEPT NLIB_OVERRIDE {}
43  virtual void operator()() NLIB_NOEXCEPT NLIB_OVERRIDE { task_(); }
44 
45  private:
46  PackagedTask<R()> task_;
48 };
49 } // namespace detail
50 
52  public:
53  ThreadPool() NLIB_NOEXCEPT : prv_(nullptr) {}
54  errno_t Init(size_t thread_count, const ThreadSettings& settings) NLIB_NOEXCEPT;
55  errno_t Init(size_t thread_count) NLIB_NOEXCEPT {
56  ThreadSettings settings;
57  return this->Init(thread_count, settings);
58  }
61  template<class R, class FUNC>
62  errno_t Submit(Future<R>* future, FUNC& func) NLIB_NOEXCEPT;
63  size_t Count() NLIB_NOEXCEPT;
64 
65  private:
66  NLIB_VIS_HIDDEN void JoinThreads(Nlist<nlib_thread>& L) NLIB_NOEXCEPT;
67  static NLIB_VIS_HIDDEN void WorkerThread(void* p);
68  errno_t Submit_(detail::ThPoolTaskBase* p) NLIB_NOEXCEPT;
69  struct ThreadPoolPrivate;
70  ThreadPoolPrivate* prv_;
72 };
73 
74 template<class R, class FUNC>
75 inline errno_t ThreadPool::Submit(Future<R>* future, FUNC& func) NLIB_NOEXCEPT {
76  if (!prv_) return EINVAL;
77  PackagedTask<R()> task;
78  errno_t e = task.Init(func);
79  if (e != 0) return e;
80  if (future) {
81  e = task.GetFuture(future);
82  if (e != 0) return e;
83  }
84  detail::ThPoolTaskBase* ptask = new (std::nothrow) detail::ThPoolTask<R>(task, move_tag());
85  if (!ptask) return ENOMEM;
86  return Submit_(ptask);
87 }
88 
89 namespace detail {
90 
91 class NLIB_VIS_PUBLIC LockFreeThreadPoolBase {
92  protected:
93  LockFreeThreadPoolBase(void (*primary_worker_thread)(void*), void (*worker_thread)(void*)) :
94  primary_worker_thread_(primary_worker_thread),
95  worker_thread_(worker_thread),
96  status_(0),
97  thread_count_(0),
98  thread_list_(nullptr) {}
99  ~LockFreeThreadPoolBase() NLIB_NOEXCEPT;
100  typedef void* (*Func)(void**, size_t)NLIB_NOEXCEPT_FUNCPTR;
101  typedef void (*Callback)(void*) NLIB_NOEXCEPT_FUNCPTR;
102  errno_t InitBase(size_t thread_count, const ThreadSettings& settings) NLIB_NOEXCEPT;
103  bool PrimaryWorkerThread_(Func func, Callback callback, void** args, size_t n) NLIB_NOEXCEPT;
104  bool WorkerThread_(Func func, Callback callback, void** args, size_t n,
105  uint32_t* counter) NLIB_NOEXCEPT;
106  void JoinThreads() NLIB_NOEXCEPT;
107 
108  public:
109  size_t GetThreadCount() NLIB_NOEXCEPT { return thread_count_; }
110 
111  private:
112  NLIB_VIS_HIDDEN void JoinThreads(nlib_thread* L, size_t cnt) NLIB_NOEXCEPT;
113  void (*primary_worker_thread_)(void*);
114  void (*worker_thread_)(void*);
115  int32_t status_;
116  uint32_t thread_count_;
117  nlib_thread* thread_list_;
118 
119  NLIB_DISALLOW_COPY_AND_ASSIGN(LockFreeThreadPoolBase);
120 };
121 
122 } // namespace detail
123 
124 template<size_t N = 5>
125 class LockFreeThreadPool NLIB_FINAL : public NLIB_NS::threading::detail::LockFreeThreadPoolBase {
126  public:
127 #ifdef NLIB_DOXYGEN
128  typedef void* (*Func)(void** args, size_t n);
129  typedef void (*Callback)(void* result);
130 #else
131  typedef NLIB_NS::threading::detail::LockFreeThreadPoolBase::Func Func;
132  typedef NLIB_NS::threading::detail::LockFreeThreadPoolBase::Callback Callback;
133 #endif
134 
135  LockFreeThreadPool() NLIB_NOEXCEPT : LockFreeThreadPoolBase(PrimaryWorkerThread, WorkerThread) {
136  }
137  ~LockFreeThreadPool() NLIB_NOEXCEPT { this->JoinThreads(); }
138  errno_t
139  Init(size_t work_queue_size, size_t thread_count, const ThreadSettings& settings) NLIB_NOEXCEPT;
140  errno_t SubmitVarArgs(Func func, Callback callback, size_t n, ...) NLIB_NOEXCEPT;
141 
142 #ifdef NLIB_DOXYGEN
143  errno_t Submit(Func func, Callback callback);
144  errno_t Submit(Func func, Callback callback, void* arg0);
145  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1);
146  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2);
147  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2, void* arg3);
148  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2, void* arg3,
149  void* arg4);
150  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2, void* arg3,
151  void* arg4, void* arg5);
152  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2, void* arg3,
153  void* arg4, void* arg5, void* arg6);
154  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2, void* arg3,
155  void* arg4, void* arg5, void* arg6, void* arg7);
156  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2, void* arg3,
157  void* arg4, void* arg5, void* arg6, void* arg7, void* arg8);
158  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2, void* arg3,
159  void* arg4, void* arg5, void* arg6, void* arg7, void* arg8, void* arg9);
160  size_t GetThreadCount();
161 #endif
162  errno_t Submit(Func func, Callback callback) NLIB_NOEXCEPT {
163  LockFreeThreadPoolExecInfo info;
164  info.func = func;
165  info.callback = callback;
166  // memset(&info.arg[0], 0, sizeof(info.arg[0]) * N);
167  return work_queue_.Enqueue(info);
168  }
169  template<class X>
170  errno_t
171  Submit(X func, Callback callback,
172  typename EnableIf<N >= 1 && IsSame<X, Func>::value, void*>::type arg0) NLIB_NOEXCEPT {
173  LockFreeThreadPoolExecInfo info;
174  info.func = func;
175  info.callback = callback;
176  info.arg[0] = arg0;
177  // memset(&info.arg[0] + 1, 0, sizeof(info.arg[0]) * (N - 1));
178  return work_queue_.Enqueue(info);
179  }
180  template<class X>
181  errno_t Submit(X func, Callback callback,
182  typename EnableIf<N >= 2 && IsSame<X, Func>::value, void*>::type arg0,
183  void* arg1) NLIB_NOEXCEPT {
184  LockFreeThreadPoolExecInfo info;
185  info.func = func;
186  info.callback = callback;
187  info.arg[0] = arg0;
188  info.arg[1] = arg1;
189  // memset(&info.arg[0] + 2, 0, sizeof(info.arg[0]) * (N - 2));
190  return work_queue_.Enqueue(info);
191  }
192  template<class X>
193  errno_t Submit(X func, Callback callback,
194  typename EnableIf<N >= 3 && IsSame<X, Func>::value, void*>::type arg0,
195  void* arg1, void* arg2) NLIB_NOEXCEPT {
196  LockFreeThreadPoolExecInfo info;
197  info.func = func;
198  info.callback = callback;
199  info.arg[0] = arg0;
200  info.arg[1] = arg1;
201  info.arg[2] = arg2;
202  // memset(&info.arg[0] + 3, 0, sizeof(info.arg[0]) * (N - 3));
203  return work_queue_.Enqueue(info);
204  }
205  template<class X>
206  errno_t Submit(X func, Callback callback,
207  typename EnableIf<N >= 4 && IsSame<X, Func>::value, void*>::type arg0,
208  void* arg1, void* arg2, void* arg3) NLIB_NOEXCEPT {
209  LockFreeThreadPoolExecInfo info;
210  info.func = func;
211  info.callback = callback;
212  info.arg[0] = arg0;
213  info.arg[1] = arg1;
214  info.arg[2] = arg2;
215  info.arg[3] = arg3;
216  // memset(&info.arg[0] + 4, 0, sizeof(info.arg[0]) * (N - 4));
217  return work_queue_.Enqueue(info);
218  }
219  template<class X>
220  errno_t Submit(X func, Callback callback,
221  typename EnableIf<N >= 5 && IsSame<X, Func>::value, void*>::type arg0,
222  void* arg1, void* arg2, void* arg3, void* arg4) NLIB_NOEXCEPT {
223  LockFreeThreadPoolExecInfo info;
224  info.func = func;
225  info.callback = callback;
226  info.arg[0] = arg0;
227  info.arg[1] = arg1;
228  info.arg[2] = arg2;
229  info.arg[3] = arg3;
230  info.arg[4] = arg4;
231  // memset(&info.arg[0] + 5, 0, sizeof(info.arg[0]) * (N - 5));
232  return work_queue_.Enqueue(info);
233  }
234  template<class X>
235  errno_t Submit(X func, Callback callback,
236  typename EnableIf<N >= 6 && IsSame<X, Func>::value, void*>::type arg0,
237  void* arg1, void* arg2, void* arg3, void* arg4, void* arg5) NLIB_NOEXCEPT {
238  LockFreeThreadPoolExecInfo info;
239  info.func = func;
240  info.callback = callback;
241  info.arg[0] = arg0;
242  info.arg[1] = arg1;
243  info.arg[2] = arg2;
244  info.arg[3] = arg3;
245  info.arg[4] = arg4;
246  info.arg[5] = arg5;
247  // memset(&info.arg[0] + 6, 0, sizeof(info.arg[0]) * (N - 6));
248  return work_queue_.Enqueue(info);
249  }
250  template<class X>
251  errno_t
252  Submit(X func, Callback callback,
253  typename EnableIf<N >= 7 && IsSame<X, Func>::value, void*>::type arg0, void* arg1,
254  void* arg2, void* arg3, void* arg4, void* arg5, void* arg6) NLIB_NOEXCEPT {
255  LockFreeThreadPoolExecInfo info;
256  info.func = func;
257  info.callback = callback;
258  info.arg[0] = arg0;
259  info.arg[1] = arg1;
260  info.arg[2] = arg2;
261  info.arg[3] = arg3;
262  info.arg[4] = arg4;
263  info.arg[5] = arg5;
264  info.arg[6] = arg6;
265  // memset(&info.arg[0] + 7, 0, sizeof(info.arg[0]) * (N - 7));
266  return work_queue_.Enqueue(info);
267  }
268  template<class X>
269  errno_t
270  Submit(X func, Callback callback,
271  typename EnableIf<N >= 8 && IsSame<X, Func>::value, void*>::type arg0, void* arg1,
272  void* arg2, void* arg3, void* arg4, void* arg5, void* arg6, void* arg7) NLIB_NOEXCEPT {
273  LockFreeThreadPoolExecInfo info;
274  info.func = func;
275  info.callback = callback;
276  info.arg[0] = arg0;
277  info.arg[1] = arg1;
278  info.arg[2] = arg2;
279  info.arg[3] = arg3;
280  info.arg[4] = arg4;
281  info.arg[5] = arg5;
282  info.arg[6] = arg6;
283  info.arg[7] = arg7;
284  // memset(&info.arg[0] + 8, 0, sizeof(info.arg[0]) * (N - 8));
285  return work_queue_.Enqueue(info);
286  }
287  template<class X>
288  errno_t Submit(X func, Callback callback,
289  typename EnableIf<N >= 9 && IsSame<X, Func>::value, void*>::type arg0,
290  void* arg1, void* arg2, void* arg3, void* arg4, void* arg5, void* arg6,
291  void* arg7, void* arg8) NLIB_NOEXCEPT {
292  LockFreeThreadPoolExecInfo info;
293  info.func = func;
294  info.callback = callback;
295  info.arg[0] = arg0;
296  info.arg[1] = arg1;
297  info.arg[2] = arg2;
298  info.arg[3] = arg3;
299  info.arg[4] = arg4;
300  info.arg[5] = arg5;
301  info.arg[6] = arg6;
302  info.arg[7] = arg7;
303  info.arg[8] = arg8;
304  // memset(&info.arg[0] + 9, 0, sizeof(info.arg[0]) * (N - 9));
305  return work_queue_.Enqueue(info);
306  }
307  template<class X>
308  errno_t Submit(X func, Callback callback,
309  typename EnableIf<N >= 10 && IsSame<X, Func>::value, void*>::type arg0,
310  void* arg1, void* arg2, void* arg3, void* arg4, void* arg5, void* arg6,
311  void* arg7, void* arg8, void* arg9) NLIB_NOEXCEPT {
312  LockFreeThreadPoolExecInfo info;
313  info.func = func;
314  info.callback = callback;
315  info.arg[0] = arg0;
316  info.arg[1] = arg1;
317  info.arg[2] = arg2;
318  info.arg[3] = arg3;
319  info.arg[4] = arg4;
320  info.arg[5] = arg5;
321  info.arg[6] = arg6;
322  info.arg[7] = arg7;
323  info.arg[8] = arg8;
324  info.arg[9] = arg9;
325  // memset(&info.arg[0] + 10, 0, sizeof(info.arg[0]) * (N - 10));
326  return work_queue_.Enqueue(info);
327  }
328 
329  private:
330  static void PrimaryWorkerThread(void* p) NLIB_NOEXCEPT;
331  static void WorkerThread(void* p) NLIB_NOEXCEPT;
332 
333  private:
334  struct LockFreeThreadPoolExecInfo {
335  Func func;
336  Callback callback;
337  void* arg[N];
338  };
339  LockFreeQueue<LockFreeThreadPoolExecInfo> work_queue_;
340 
341  NLIB_DISALLOW_COPY_AND_ASSIGN(LockFreeThreadPool);
342 };
343 
344 template<size_t N>
345 errno_t LockFreeThreadPool<N>::Init(size_t work_queue_size, size_t thread_count,
346  const ThreadSettings& settings) NLIB_NOEXCEPT {
347  errno_t e = work_queue_.Init(work_queue_size);
348  if (e != 0) return e;
349  e = InitBase(thread_count, settings);
350  if (e != 0) {
352  q.SwapUnsafe(work_queue_);
353  return e;
354  }
355  return 0;
356 }
357 
358 template<size_t N>
359 errno_t
360 LockFreeThreadPool<N>::SubmitVarArgs(Func func, Callback callback, size_t n, ...) NLIB_NOEXCEPT {
361  if (n > N) {
362  NLIB_ASSERT(n <= N);
363  return EINVAL;
364  }
365 
366  va_list ap;
367  LockFreeThreadPoolExecInfo info;
368  info.func = func;
369  info.callback = callback;
370 
371  va_start(ap, n);
372  for (size_t i = 0; i < n; ++i) {
373  info.arg[i] = va_arg(ap, void*);
374  }
375  va_end(ap);
376  return work_queue_.Enqueue(info);
377 }
378 
379 template<size_t N>
381  LockFreeThreadPool* This = reinterpret_cast<LockFreeThreadPool*>(p);
382  LockFreeThreadPoolExecInfo info;
383 
384  for (;;) {
385  errno_t e = This->work_queue_.Dequeue(&info);
386  if (!This->PrimaryWorkerThread_(info.func, info.callback, e == 0 ? &info.arg[0] : nullptr,
387  N))
388  break;
389  }
390 }
391 
392 template<size_t N>
393 void LockFreeThreadPool<N>::WorkerThread(void* p) NLIB_NOEXCEPT {
394  LockFreeThreadPool* This = reinterpret_cast<LockFreeThreadPool*>(p);
395  LockFreeThreadPoolExecInfo info;
396  uint32_t counter = 0;
397 
398  for (;;) {
399  errno_t e = This->work_queue_.Dequeue(&info);
400  if (!This->WorkerThread_(info.func, info.callback, e == 0 ? &info.arg[0] : nullptr, N,
401  &counter))
402  break;
403  }
404 }
405 
406 } // namespace threading
407 NLIB_NAMESPACE_END
408 
409 #endif // INCLUDE_NN_NLIB_THREADING_THREADPOOL_H_
ロックフリーアルゴリズムがサポートされています。
#define NLIB_OVERRIDE
利用可能であればoverrideが定義されます。そうでない場合は空文字列です。
Definition: Config.h:249
#define NLIB_DISALLOW_COPY_AND_ASSIGN(TypeName)
TypeName で指定されたクラスのコピーコンストラクタと代入演算子を禁止します。
Definition: Config.h:183
ThreadPool() noexcept
デフォルトコンストラクタです。Init()で初期化する必要があります。
Definition: ThreadPool.h:53
errno_t Init(size_t thread_count) noexcept
スレッドを起動してスレッドプールを初期化します。
Definition: ThreadPool.h:55
~LockFreeThreadPool() noexcept
デストラクタです。内部で起動したスレッドを全てJoinします。
Definition: ThreadPool.h:137
LockFreeThreadPool() noexcept
デフォルトコンストラクタです。Init()で初期化する必要があります。
Definition: ThreadPool.h:135
#define NLIB_VIS_HIDDEN
関数やクラス等のシンボルをライブラリの外部に公開しません。
Definition: Platform_unix.h:86
#define NLIB_VIS_PUBLIC
関数やクラス等のシンボルをライブラリの外部に公開します。
Definition: Platform_unix.h:87
nlib_thread_attrをラップするクラスです。必要に応じて自動的にnlib_thread_attr_init()とnlib_thread_attr...
Definition: Thread.h:29
errno_t GetFuture(Future< R > *p) noexcept
実行結果を取得するためのFutureを設定します。
空の構造体で、関数の引数をムーブすべきことを示すために利用されます。
Definition: Config.h:270
細粒度のタスクを投入するのに向いたスレッドプールです。
Definition: ThreadPool.h:125
errno_t Init() noexcept
スレッドを起動してスレッドプールを初期化します。
Definition: ThreadPool.h:59
void SwapUnsafe(LockFreeQueue &rhs) noexcept
オブジェクトをスワップします。スレッドセーフではありません。
Definition: LockFree.h:536
#define NLIB_NOEXCEPT
環境に合わせてnoexcept 又は同等の定義がされます。
Definition: Config.h:109
std::vectorに似た、コピーコンストラクタを持たないオブジェクトを格納可能なコンテナ類似クラスです。 ...
Definition: Nlist.h:32
別のスレッド実行の出力をスレッドセーフに取得するためのクラスです。C++11のstd::shared_futureに似ていま...
Definition: AsyncFileIo.h:26
errno_t Dequeue(DequeueType x) noexcept
キューから要素を取り出してxに格納します。スレッドセーフです。
Definition: LockFree.h:529
size_t GetHardwareConcurrency() noexcept
ハードウェアスレッドの数を返します。
Definition: Thread.h:807
関数をラップし、別スレッドで実行し戻り値をスレッドセーフに取得できるようにするクラスです。C++11のstd:...
Definition: Future.h:1083
マルチスレッドプログラミングのためのFutureパターンを実装しています。
#define NLIB_FINAL
利用可能であればfinalが定義されます。そうでない場合は空文字列です。
Definition: Config.h:250
予めプールされた作成済みのスレッドに対してタスクを投入することができます。
Definition: ThreadPool.h:51
pthread_t nlib_thread
スレッドを指し示す識別子
errno_t Init(FUNC &func)
初期化を行います。
int errno_t
intのtypedefで、戻り値としてPOSIXのエラー値を返すことを示します。
Definition: NMalloc.h:37