Darwin Neuroevolution Framework
thread_pool.h
1 // Copyright 2018 The Darwin Neuroevolution Framework Authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #pragma once
16 
17 #include "utils.h"
18 
19 #include <assert.h>
20 #include <algorithm>
21 #include <atomic>
22 #include <condition_variable>
23 #include <cstdint>
24 #include <cstdlib>
25 #include <deque>
26 #include <functional>
27 #include <memory>
28 #include <mutex>
29 #include <thread>
30 #include <vector>
31 using namespace std;
32 
33 namespace pp {
34 
struct WorkBatch;

// Generic work item interface.
//
// A work item is one unit of work which belongs to a parent WorkBatch.
// Concrete work items derive from this class and implement execute().
class WorkItem {
 public:
  // Creates a work item associated with `batch`.
  //
  // The constructor is explicit so that a bare WorkBatch pointer can never be
  // implicitly converted into a work item (this matters for derived classes
  // which inherit this constructor).
  //
  // `batch` is stored as a raw pointer: this class neither copies nor deletes
  // the batch.
  explicit WorkItem(WorkBatch* batch) : batch_(batch) {}

  // Virtual, so concrete work items can be destroyed through a WorkItem pointer
  virtual ~WorkItem() = default;

  // Runs the work item.
  // NOTE(review): the meaning of the returned flag is not visible in this
  // header - confirm against the thread pool implementation.
  virtual bool execute() = 0;

  // The associated (parent) WorkBatch
  WorkBatch* batch() const { return batch_; }

 private:
  // parent work batch (fixed for the lifetime of the work item)
  WorkBatch* const batch_ = nullptr;
};
54 
55 // A work item wrapping a closure object (aka function objects)
56 template <class Body>
57 class ClosureWorkItem : public WorkItem {
58  public:
59  ClosureWorkItem(WorkBatch* batch, const Body& item_body)
60  : WorkItem(batch), item_body_(item_body) {}
61 
62  bool execute() override {
63  item_body_();
64  return true;
65  }
66 
67  private:
68  Body item_body_;
69 };
70 
81 struct WorkBatch {
83  vector<unique_ptr<WorkItem>> work_items;
84 
86  size_t work_left = 0;
87 
89  atomic<bool> canceled = false;
90 
92  template <class Body>
93  void pushWork(const Body& body) {
94  work_items.emplace_back(make_unique<ClosureWorkItem<Body>>(this, body));
95  }
96 };
97 
// Stateless exception type.
// NOTE(review): presumably thrown to abandon canceled work - confirm at the
// throw/catch sites, which are not part of this header.
class CanceledException {
};
99 
// An optional thread pool controller, which can be used to pause/resume/cancel
// the queued work items.
//
// This is a pure interface: implementations override checkpoint().
class Controller {
 public:
  // Virtual, so implementations can be destroyed through a Controller pointer
  virtual ~Controller() = default;

  // The single hook implemented by concrete controllers.
  // NOTE(review): presumably invoked by the thread pool around work items so
  // the controller can block (pause) or bail out (cancel) - confirm against
  // the thread pool implementation.
  virtual void checkpoint() = 0;
};
120 
128  public:
130  static constexpr int kAutoThreadCount = 0;
131 
132  public:
138  ThreadPool(int threads_count, Controller* controller = nullptr);
139 
142  void processBatch(unique_ptr<WorkBatch> batch);
143 
145  int threadsCount() const { return int(worker_threads_.size()); }
146 
147  private:
148  void executeOneItem();
149  unique_ptr<WorkItem> acquireWork();
150  void finishedWork(WorkBatch* batch);
151  void workerThread();
152 
153  private:
154  deque<unique_ptr<WorkItem>> work_items_;
155  vector<thread> worker_threads_;
156  Controller* controller_ = nullptr;
157 
158  mutable mutex lock_;
159  mutable condition_variable queue_cv_;
160  mutable condition_variable results_cv_;
161 };
162 
163 class ParallelForSupport {
164  public:
165  static void init(Controller* controller) {
166  auto thread_pool = make_unique<ThreadPool>(ThreadPool::kAutoThreadCount, controller);
167  CHECK(thread_pool_.exchange(thread_pool.release()) == nullptr);
168  }
169 
170  static ThreadPool* threadPool() { return thread_pool_; }
171 
172  private:
173  static atomic<ThreadPool*> thread_pool_;
174 };
175 
176 } // namespace pp
Parallel Processing primitives.
Definition: parallel_for_each.cpp:15
vector< unique_ptr< WorkItem > > work_items
The set of work items to be processed.
Definition: thread_pool.h:83
int threadsCount() const
The number of threads managed by this thread pool.
Definition: thread_pool.h:145
WorkBatch * batch() const
The associated (parent) WorkBatch.
Definition: thread_pool.h:48
STL namespace.
A basic thread pool (managing a fixed number of threads)
Definition: thread_pool.h:127
A collection of work items to be processed in a fork/join fashion.
Definition: thread_pool.h:81
void pushWork(const Body &body)
Appends a new WorkItem.
Definition: thread_pool.h:93
Classes derived from this are not copyable or movable.
Definition: utils.h:69
Generic work item interface.
Definition: thread_pool.h:39
An optional thread pool controller, which can be used to pause/resume/cancel the queued work items...
Definition: thread_pool.h:107