Darwin Neuroevolution Framework
parallel_for_each.h
1 // Copyright 2018 The Darwin Neuroevolution Framework Authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #pragma once
16 
17 #include "utils.h"
18 #include "scope_guard.h"
19 #include "thread_pool.h"
20 
21 namespace pp {
22 
23 // per-thread state used to catch accidental nesting of parallel-for-loops
24 extern thread_local bool g_inside_parallel_for;
25 
54 template <class T, class Body>
55 void for_each(T& array, const Body& loop_body) {
56  CHECK(!g_inside_parallel_for);
57 
58  auto thread_pool = ParallelForSupport::threadPool();
59  CHECK(thread_pool != nullptr);
60 
61  if (array.size() == 0)
62  return;
63 
64  // this is roughly the number of shards per worker thread
65  //
66  // here, a shard is a set of array indexes. higher granularity means
67  // smaller individual shards and results in better load balancing,
68  // but also higher work queue synchronization overhead
69  //
70  // TODO: investigate auto-tuning solutions instead of the hardcoded value
71  //
72  constexpr int kShardsGranularity = 100;
73 
74  const int size = int(array.size());
75  const int shards_count = thread_pool->threadsCount() * kShardsGranularity;
76  const int shard_size = size / shards_count;
77  const int remainder = size % shards_count;
78 
79  // create a batch for all the shards
80  auto batch = make_unique<WorkBatch>();
81 
82  int index = 0;
83  for (int i = 0; i < shards_count && index < size; ++i) {
84  int actual_shard_size = i < remainder ? (shard_size + 1) : shard_size;
85  CHECK(actual_shard_size > 0);
86  batch->pushWork([&, beginIndex = index, endIndex = index + actual_shard_size] {
87  g_inside_parallel_for = true;
88  SCOPE_EXIT { g_inside_parallel_for = false; };
89 
90  CHECK(beginIndex < endIndex);
91 
92  for (int i = beginIndex; i < endIndex; ++i) {
93  loop_body(i, array[i]);
94  }
95  });
96 
97  index += actual_shard_size;
98  }
99  CHECK(index == size);
100 
101  // push work and wait for completition
102  thread_pool->processBatch(std::move(batch));
103 }
104 
105 } // namespace pp
Parallel Processing primitives.
Definition: parallel_for_each.cpp:15
void for_each(T &array, const Body &loop_body)
Iterates over an array, with support for parallel execution.
Definition: parallel_for_each.h:55