Darwin Neuroevolution Framework
pubsub.h
1 // Copyright 2018 The Darwin Neuroevolution Framework Authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #pragma once
16 
17 #include <functional>
18 #include <mutex>
19 #include <vector>
20 using namespace std;
21 
22 namespace core {
23 
29 template <class T>
30 class PubSub {
31  using Subscriber = std::function<void(const T&)>;
32 
33  public:
36  int subscribe(const Subscriber& subscriber) {
37  unique_lock<mutex> guard(lock_);
38 
39  // try to reuse an empty slot first
40  for (int i = 0; i < subscribers_.size(); ++i)
41  if (!subscribers_[i]) {
42  subscribers_[i] = subscriber;
43  return i;
44  }
45 
46  subscribers_.push_back(subscriber);
47  return int(subscribers_.size() - 1);
48  }
49 
52  void unsubscribe(int subscriber_index) {
53  unique_lock<mutex> guard(lock_);
54  CHECK(subscriber_index >= 0 && subscriber_index < subscribers_.size());
55  CHECK(subscribers_[subscriber_index]);
56  subscribers_[subscriber_index] = nullptr;
57  }
58 
64  void publish(const T& value) const {
65  vector<Subscriber> subscribers_snapshot;
66 
67  {
68  unique_lock<mutex> guard(lock_);
69  subscribers_snapshot = subscribers_;
70  }
71 
72  for (const auto& subscriber : subscribers_snapshot) {
73  if (subscriber)
74  subscriber(value);
75  }
76  }
77 
78  private:
79  vector<Subscriber> subscribers_;
80  mutable mutex lock_;
81 };
82 
83 } // namespace core
Generic utilities.
Definition: exception.h:24
STL namespace.
void publish(const T &value) const
Publish a new value.
Definition: pubsub.h:64
A type-safe, non-buffered & synchronous publisher-subscriber channel.
Definition: pubsub.h:30
int subscribe(const Subscriber &subscriber)
Add a subscriber callback.
Definition: pubsub.h:36
void unsubscribe(int subscriber_index)
Removes a subscriber.
Definition: pubsub.h:52