nlib
ThreadPool.h
[詳解]
1 
2 /*--------------------------------------------------------------------------------*
3  Project: CrossRoad
4  Copyright (C)Nintendo All rights reserved.
5 
6  These coded instructions, statements, and computer programs contain proprietary
7  information of Nintendo and/or its licensed developers and are protected by
8  national and international copyright laws. They may not be disclosed to third
9  parties or copied or duplicated in any form, in whole or in part, without the
10  prior written consent of Nintendo.
11 
12  The content herein is highly confidential and should be handled accordingly.
13  *--------------------------------------------------------------------------------*/
14 
15 #pragma once
16 #ifndef INCLUDE_NN_NLIB_THREADING_THREADPOOL_H_
17 #define INCLUDE_NN_NLIB_THREADING_THREADPOOL_H_
18 
19 #include "nn/nlib/Nqueue.h"
21 #include "nn/nlib/LockFree.h"
22 
23 NLIB_NAMESPACE_BEGIN
24 namespace threading {
25 
26 namespace detail {
27 class NLIB_VIS_PUBLIC ThPoolTaskBase {
28  public:
29  ThPoolTaskBase() NLIB_NOEXCEPT {}
30  virtual ~ThPoolTaskBase() NLIB_NOEXCEPT {}
31  virtual void operator()() NLIB_NOEXCEPT = 0;
32 
33  private:
34  NLIB_DISALLOW_COPY_AND_ASSIGN(ThPoolTaskBase);
35 };
36 
37 template <class R>
38 class ThPoolTask : public ThPoolTaskBase {
39  public:
40  explicit ThPoolTask(PackagedTask<R()>& task) NLIB_NOEXCEPT { // NOLINT
41  task.swap(task_);
42  }
43  virtual ~ThPoolTask() NLIB_NOEXCEPT NLIB_OVERRIDE {}
44  virtual void operator()() NLIB_NOEXCEPT NLIB_OVERRIDE { task_(); }
45 
46  private:
47  PackagedTask<R()> task_;
49 };
50 } // namespace detail
51 
53  public:
54  ThreadPool() NLIB_NOEXCEPT : prv_(NULL) {}
55  errno_t Init(size_t thread_count, const ThreadSettings& settings) NLIB_NOEXCEPT;
56  errno_t Init(size_t thread_count) NLIB_NOEXCEPT {
57  ThreadSettings settings;
58  return this->Init(thread_count, settings);
59  }
60  errno_t Init() NLIB_NOEXCEPT {
61  return this->Init(GetHardwareConcurrency());
62  }
64  template <class R, class FUNC>
65  errno_t Submit(Future<R>* future, FUNC& func) NLIB_NOEXCEPT; // NOLINT
66  size_t Count() NLIB_NOEXCEPT;
67 
68  private:
69  NLIB_VIS_HIDDEN void JoinThreads(Nlist<nlib_thread>& L) NLIB_NOEXCEPT; // NOLINT
70  static NLIB_VIS_HIDDEN void WorkerThread(void* p) NLIB_NOEXCEPT;
71  errno_t Submit_(detail::ThPoolTaskBase* p) NLIB_NOEXCEPT;
72  struct ThreadPoolPrivate;
73  ThreadPoolPrivate* prv_;
75 };
76 
77 template <class R, class FUNC>
78 inline errno_t ThreadPool::Submit(Future<R>* future, FUNC& func) NLIB_NOEXCEPT { // NOLINT
79  if (!prv_) return EINVAL;
80  PackagedTask<R()> task;
81  errno_t e = task.Init(func);
82  if (e != 0) return e;
83  if (future) {
84  e = task.GetFuture(future);
85  if (e != 0) return e;
86  }
87  detail::ThPoolTaskBase* ptask = new (std::nothrow) detail::ThPoolTask<R>(task);
88  if (!ptask) return ENOMEM;
89  return Submit_(ptask);
90 }
91 
92 namespace detail {
93 
94 class NLIB_VIS_PUBLIC LockFreeThreadPoolBase {
95  protected:
96  LockFreeThreadPoolBase(void (*primary_worker_thread)(void*), void (*worker_thread)(void*))
97  : primary_worker_thread_(primary_worker_thread),
98  worker_thread_(worker_thread),
99  status_(0), thread_count_(0), thread_list_(NULL) {}
100  ~LockFreeThreadPoolBase() NLIB_NOEXCEPT;
101  typedef void* (*Func)(void**, size_t) NLIB_NOEXCEPT_FUNCPTR;
102  typedef void (*Callback)(void*) NLIB_NOEXCEPT_FUNCPTR; // NOLINT
103  errno_t InitBase(size_t thread_count, const ThreadSettings& settings) NLIB_NOEXCEPT;
104  bool PrimaryWorkerThread_(Func func, Callback callback,
105  void** args, size_t n) NLIB_NOEXCEPT;
106  bool WorkerThread_(Func func, Callback callback, void** args,
107  size_t n, uint32_t* counter) NLIB_NOEXCEPT;
108  void JoinThreads() NLIB_NOEXCEPT;
109 
110  public:
111  size_t GetThreadCount() NLIB_NOEXCEPT { return thread_count_; }
112 
113  private:
114  NLIB_VIS_HIDDEN void JoinThreads(nlib_thread* L, size_t cnt) NLIB_NOEXCEPT;
115  void (*primary_worker_thread_)(void*);
116  void (*worker_thread_)(void*);
117  int32_t status_;
118  uint32_t thread_count_;
119  nlib_thread* thread_list_;
120 
121  NLIB_DISALLOW_COPY_AND_ASSIGN(LockFreeThreadPoolBase);
122 };
123 
124 } // namespace detail
125 
126 template<size_t N = 5>
127 class LockFreeThreadPool NLIB_FINAL : public NLIB_NS::threading::detail::LockFreeThreadPoolBase {
128  public:
129 #ifdef NLIB_DOXYGEN
130  typedef void* (*Func)(void** args, size_t n);
131  typedef void (*Callback)(void* result);
132 #else
133  typedef NLIB_NS::threading::detail::LockFreeThreadPoolBase::Func Func;
134  typedef NLIB_NS::threading::detail::LockFreeThreadPoolBase::Callback Callback;
135 #endif
136 
138  : LockFreeThreadPoolBase(PrimaryWorkerThread, WorkerThread) {}
139  ~LockFreeThreadPool() NLIB_NOEXCEPT { this->JoinThreads(); }
140  errno_t Init(size_t work_queue_size, size_t thread_count,
141  const ThreadSettings& settings) NLIB_NOEXCEPT;
142  errno_t SubmitVarArgs(Func func, Callback callback, size_t n, ...) NLIB_NOEXCEPT;
143 
144 #ifdef NLIB_DOXYGEN
145  errno_t Submit(Func func, Callback callback);
146  errno_t Submit(Func func, Callback callback, void* arg0);
147  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1);
148  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2);
149  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2, void* arg3);
150  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2, void* arg3,
151  void* arg4);
152  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2, void* arg3,
153  void* arg4, void* arg5);
154  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2, void* arg3,
155  void* arg4, void* arg5, void* arg6);
156  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2, void* arg3,
157  void* arg4, void* arg5, void* arg6, void* arg7);
158  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2, void* arg3,
159  void* arg4, void* arg5, void* arg6, void* arg7, void* arg8);
160  errno_t Submit(Func func, Callback callback, void* arg0, void* arg1, void* arg2, void* arg3,
161  void* arg4, void* arg5, void* arg6, void* arg7, void* arg8, void* arg9);
162  size_t GetThreadCount();
163 #endif
164  errno_t Submit(Func func, Callback callback) NLIB_NOEXCEPT {
165  LockFreeThreadPoolExecInfo info;
166  info.func = func;
167  info.callback = callback;
168  // memset(&info.arg[0], 0, sizeof(info.arg[0]) * N);
169  return work_queue_.Enqueue(info);
170  }
171  template<class X>
172  errno_t Submit(X func, Callback callback,
173  typename EnableIf<N >= 1 && IsSame<X, Func>::value,
174  void*>::type arg0) NLIB_NOEXCEPT {
175  LockFreeThreadPoolExecInfo info;
176  info.func = func;
177  info.callback = callback;
178  info.arg[0] = arg0;
179  // memset(&info.arg[0] + 1, 0, sizeof(info.arg[0]) * (N - 1));
180  return work_queue_.Enqueue(info);
181  }
182  template<class X>
183  errno_t Submit(X func, Callback callback,
184  typename EnableIf<N >= 2 && IsSame<X, Func>::value,
185  void*>::type arg0,
186  void* arg1) NLIB_NOEXCEPT {
187  LockFreeThreadPoolExecInfo info;
188  info.func = func;
189  info.callback = callback;
190  info.arg[0] = arg0;
191  info.arg[1] = arg1;
192  // memset(&info.arg[0] + 2, 0, sizeof(info.arg[0]) * (N - 2));
193  return work_queue_.Enqueue(info);
194  }
195  template<class X>
196  errno_t Submit(X func, Callback callback,
197  typename EnableIf<N >= 3 && IsSame<X, Func>::value,
198  void*>::type arg0,
199  void* arg1, void* arg2) NLIB_NOEXCEPT {
200  LockFreeThreadPoolExecInfo info;
201  info.func = func;
202  info.callback = callback;
203  info.arg[0] = arg0;
204  info.arg[1] = arg1;
205  info.arg[2] = arg2;
206  // memset(&info.arg[0] + 3, 0, sizeof(info.arg[0]) * (N - 3));
207  return work_queue_.Enqueue(info);
208  }
209  template<class X>
210  errno_t Submit(X func, Callback callback,
211  typename EnableIf<N >= 4 && IsSame<X, Func>::value,
212  void*>::type arg0,
213  void* arg1, void* arg2, void* arg3) NLIB_NOEXCEPT {
214  LockFreeThreadPoolExecInfo info;
215  info.func = func;
216  info.callback = callback;
217  info.arg[0] = arg0;
218  info.arg[1] = arg1;
219  info.arg[2] = arg2;
220  info.arg[3] = arg3;
221  // memset(&info.arg[0] + 4, 0, sizeof(info.arg[0]) * (N - 4));
222  return work_queue_.Enqueue(info);
223  }
224  template<class X>
225  errno_t Submit(X func, Callback callback,
226  typename EnableIf<N >= 5 && IsSame<X, Func>::value,
227  void*>::type arg0,
228  void* arg1, void* arg2, void* arg3, void* arg4) NLIB_NOEXCEPT {
229  LockFreeThreadPoolExecInfo info;
230  info.func = func;
231  info.callback = callback;
232  info.arg[0] = arg0;
233  info.arg[1] = arg1;
234  info.arg[2] = arg2;
235  info.arg[3] = arg3;
236  info.arg[4] = arg4;
237  // memset(&info.arg[0] + 5, 0, sizeof(info.arg[0]) * (N - 5));
238  return work_queue_.Enqueue(info);
239  }
240  template<class X>
241  errno_t Submit(X func, Callback callback,
242  typename EnableIf<N >= 6 && IsSame<X, Func>::value,
243  void*>::type arg0,
244  void* arg1, void* arg2, void* arg3, void* arg4,
245  void* arg5) NLIB_NOEXCEPT {
246  LockFreeThreadPoolExecInfo info;
247  info.func = func;
248  info.callback = callback;
249  info.arg[0] = arg0;
250  info.arg[1] = arg1;
251  info.arg[2] = arg2;
252  info.arg[3] = arg3;
253  info.arg[4] = arg4;
254  info.arg[5] = arg5;
255  // memset(&info.arg[0] + 6, 0, sizeof(info.arg[0]) * (N - 6));
256  return work_queue_.Enqueue(info);
257  }
258  template<class X>
259  errno_t Submit(X func, Callback callback,
260  typename EnableIf<N >= 7 && IsSame<X, Func>::value,
261  void*>::type arg0,
262  void* arg1, void* arg2, void* arg3, void* arg4,
263  void* arg5, void* arg6) NLIB_NOEXCEPT {
264  LockFreeThreadPoolExecInfo info;
265  info.func = func;
266  info.callback = callback;
267  info.arg[0] = arg0;
268  info.arg[1] = arg1;
269  info.arg[2] = arg2;
270  info.arg[3] = arg3;
271  info.arg[4] = arg4;
272  info.arg[5] = arg5;
273  info.arg[6] = arg6;
274  // memset(&info.arg[0] + 7, 0, sizeof(info.arg[0]) * (N - 7));
275  return work_queue_.Enqueue(info);
276  }
277  template<class X>
278  errno_t Submit(X func, Callback callback,
279  typename EnableIf<N >= 8 && IsSame<X, Func>::value,
280  void*>::type arg0,
281  void* arg1, void* arg2, void* arg3, void* arg4,
282  void* arg5, void* arg6, void* arg7) NLIB_NOEXCEPT {
283  LockFreeThreadPoolExecInfo info;
284  info.func = func;
285  info.callback = callback;
286  info.arg[0] = arg0;
287  info.arg[1] = arg1;
288  info.arg[2] = arg2;
289  info.arg[3] = arg3;
290  info.arg[4] = arg4;
291  info.arg[5] = arg5;
292  info.arg[6] = arg6;
293  info.arg[7] = arg7;
294  // memset(&info.arg[0] + 8, 0, sizeof(info.arg[0]) * (N - 8));
295  return work_queue_.Enqueue(info);
296  }
297  template<class X>
298  errno_t Submit(X func, Callback callback,
299  typename EnableIf<N >= 9 && IsSame<X, Func>::value,
300  void*>::type arg0,
301  void* arg1, void* arg2, void* arg3, void* arg4,
302  void* arg5, void* arg6, void* arg7, void* arg8) NLIB_NOEXCEPT {
303  LockFreeThreadPoolExecInfo info;
304  info.func = func;
305  info.callback = callback;
306  info.arg[0] = arg0;
307  info.arg[1] = arg1;
308  info.arg[2] = arg2;
309  info.arg[3] = arg3;
310  info.arg[4] = arg4;
311  info.arg[5] = arg5;
312  info.arg[6] = arg6;
313  info.arg[7] = arg7;
314  info.arg[8] = arg8;
315  // memset(&info.arg[0] + 9, 0, sizeof(info.arg[0]) * (N - 9));
316  return work_queue_.Enqueue(info);
317  }
318  template<class X>
319  errno_t Submit(X func, Callback callback,
320  typename EnableIf<N >= 10 && IsSame<X, Func>::value,
321  void*>::type arg0,
322  void* arg1, void* arg2, void* arg3, void* arg4,
323  void* arg5, void* arg6, void* arg7, void* arg8, void* arg9) NLIB_NOEXCEPT {
324  LockFreeThreadPoolExecInfo info;
325  info.func = func;
326  info.callback = callback;
327  info.arg[0] = arg0;
328  info.arg[1] = arg1;
329  info.arg[2] = arg2;
330  info.arg[3] = arg3;
331  info.arg[4] = arg4;
332  info.arg[5] = arg5;
333  info.arg[6] = arg6;
334  info.arg[7] = arg7;
335  info.arg[8] = arg8;
336  info.arg[9] = arg9;
337  // memset(&info.arg[0] + 10, 0, sizeof(info.arg[0]) * (N - 10));
338  return work_queue_.Enqueue(info);
339  }
340 
341  private:
342  static void PrimaryWorkerThread(void* p) NLIB_NOEXCEPT;
343  static void WorkerThread(void* p) NLIB_NOEXCEPT;
344 
345  private:
346  struct LockFreeThreadPoolExecInfo {
347  Func func;
348  Callback callback;
349  void* arg[N];
350  };
352 
354 };
355 
356 template<size_t N>
357 errno_t LockFreeThreadPool<N>::Init(size_t work_queue_size, size_t thread_count,
358  const ThreadSettings& settings) NLIB_NOEXCEPT {
359  errno_t e = work_queue_.Init(work_queue_size);
360  if (e != 0) return e;
361  e = InitBase(thread_count, settings);
362  if (e != 0) {
364  q.SwapUnsafe(work_queue_);
365  return e;
366  }
367  return 0;
368 }
369 
370 template<size_t N>
371 errno_t LockFreeThreadPool<N>::SubmitVarArgs(Func func, Callback callback,
372  size_t n, ...) NLIB_NOEXCEPT {
373  if (n > N) {
374  NLIB_ASSERT(n <= N);
375  return EINVAL;
376  }
377 
378  va_list ap;
379  LockFreeThreadPoolExecInfo info;
380  info.func = func;
381  info.callback = callback;
382 
383  va_start(ap, n);
384  for (size_t i = 0; i < n; ++i) {
385  info.arg[i] = va_arg(ap, void*);
386  }
387  va_end(ap);
388  return work_queue_.Enqueue(info);
389 }
390 
391 template<size_t N>
393  LockFreeThreadPool* This = reinterpret_cast<LockFreeThreadPool*>(p);
394  LockFreeThreadPoolExecInfo info;
395 
396  for (;;) {
397  errno_t e = This->work_queue_.Dequeue(&info);
398  if (!This->PrimaryWorkerThread_(info.func, info.callback,
399  e == 0 ? &info.arg[0] : NULL, N))
400  break;
401  }
402 }
403 
404 template<size_t N>
406  LockFreeThreadPool* This = reinterpret_cast<LockFreeThreadPool*>(p);
407  LockFreeThreadPoolExecInfo info;
408  uint32_t counter = 0;
409 
410  for (;;) {
411  errno_t e = This->work_queue_.Dequeue(&info);
412  if (!This->WorkerThread_(info.func, info.callback,
413  e == 0 ? &info.arg[0] : NULL, N, &counter))
414  break;
415  }
416 }
417 
418 } // namespace threading
419 NLIB_NAMESPACE_END
420 
421 #endif // INCLUDE_NN_NLIB_THREADING_THREADPOOL_H_
ロックフリーアルゴリズムがサポートされています。
#define NLIB_OVERRIDE
利用可能であればoverrideが定義されます。そうでない場合は空文字列です。
Definition: Config.h:228
#define NLIB_DISALLOW_COPY_AND_ASSIGN(TypeName)
TypeName で指定されたクラスのコピーコンストラクタと代入演算子を禁止します。
Definition: Config.h:163
ThreadPool() noexcept
デフォルトコンストラクタです。Init()で初期化する必要があります。
Definition: ThreadPool.h:54
errno_t Init(size_t thread_count) noexcept
スレッドを起動してスレッドプールを初期化します。
Definition: ThreadPool.h:56
~LockFreeThreadPool() noexcept
デストラクタです。内部で起動したスレッドを全てJoinします。
Definition: ThreadPool.h:139
LockFreeThreadPool() noexcept
デフォルトコンストラクタです。Init()で初期化する必要があります。
Definition: ThreadPool.h:137
#define NLIB_VIS_HIDDEN
関数やクラス等のシンボルをライブラリの外部に公開しません。
Definition: Platform_unix.h:88
#define NLIB_VIS_PUBLIC
関数やクラス等のシンボルをライブラリの外部に公開します。
Definition: Platform_unix.h:89
nlib_thread_attrをラップするクラスです。必要に応じて自動的にnlib_thread_attr_init()とnlib_thread_attr...
Definition: Thread.h:29
errno_t GetFuture(Future< R > *p) noexcept
実行結果を取得するためのFutureを設定します。
細粒度のタスクを投入するのに向いたスレッドプールです。
Definition: ThreadPool.h:127
errno_t Init() noexcept
スレッドを起動してスレッドプールを初期化します。
Definition: ThreadPool.h:60
void SwapUnsafe(LockFreeQueue &rhs) noexcept
オブジェクトをスワップします。スレッドセーフではありません。
Definition: LockFree.h:564
#define NLIB_NOEXCEPT
環境に合わせてnoexcept 又は同等の定義がされます。
Definition: Config.h:99
std::vectorに似た、コピーコンストラクタを持たないオブジェクトを格納可能なコンテナ類似クラスです。 ...
Definition: Nlist.h:32
別のスレッド実行の出力をスレッドセーフに取得するためのクラスです。C++11のstd::shared_futureに似ていま...
Definition: AsyncFileIo.h:26
errno_t Dequeue(DequeueType x) noexcept
キューから要素を取り出してxに格納します。スレッドセーフです。
Definition: LockFree.h:557
size_t GetHardwareConcurrency() noexcept
ハードウェアスレッドの数を返します。
Definition: Thread.h:797
関数をラップし、別スレッドで実行し戻り値をスレッドセーフに取得できるようにするクラスです。C++11のstd:...
Definition: Future.h:931
マルチスレッドプログラミングのためのFutureパターンを実装しています。
#define NLIB_FINAL
利用可能であればfinalが定義されます。そうでない場合は空文字列です。
Definition: Config.h:229
予めプールされた作成済みのスレッドに対してタスクを投入することができます。
Definition: ThreadPool.h:52
pthread_t nlib_thread
スレッドを指し示す識別子
errno_t Init(FUNC &func)
初期化を行います。
int errno_t
intのtypedefで、戻り値としてPOSIXのエラー値を返すことを示します。
Definition: NMalloc.h:37