ORE Studio 0.0.4
Loading...
Searching...
No Matches
event_bus.hpp
1/* -*- mode: c++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
2 *
3 * Copyright (C) 2025 Marco Craveiro <marco.craveiro@gmail.com>
4 *
5 * This program is free software; you can redistribute it and/or modify it under
6 * the terms of the GNU General Public License as published by the Free Software
7 * Foundation; either version 3 of the License, or (at your option) any later
8 * version.
9 *
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
13 * details.
14 *
15 * You should have received a copy of the GNU General Public License along with
16 * this program; if not, write to the Free Software Foundation, Inc., 51
17 * Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 *
19 */
20#ifndef ORES_EVENTING_SERVICE_EVENT_BUS_HPP
21#define ORES_EVENTING_SERVICE_EVENT_BUS_HPP
22
23#include <any>
24#include <mutex>
25#include <memory>
26#include <vector>
27#include <typeindex>
28#include <functional>
29#include <unordered_map>
30#include <boost/core/demangle.hpp>
31#include "ores.utility/log/make_logger.hpp"
32
33namespace ores::eventing::service {
34
41class subscription final {
42public:
43 using unsubscribe_fn = std::function<void()>;
44
45 subscription() = default;
46 explicit subscription(unsubscribe_fn fn) : unsubscribe_(std::move(fn)) {}
47
49 if (unsubscribe_) {
50 unsubscribe_();
51 }
52 }
53
54 subscription(const subscription&) = delete;
55 subscription& operator=(const subscription&) = delete;
56
57 subscription(subscription&& other) noexcept
58 : unsubscribe_(std::move(other.unsubscribe_)) {
59 other.unsubscribe_ = nullptr;
60 }
61
62 subscription& operator=(subscription&& other) noexcept {
63 if (this != &other) {
64 if (unsubscribe_) {
65 unsubscribe_();
66 }
67 unsubscribe_ = std::move(other.unsubscribe_);
68 other.unsubscribe_ = nullptr;
69 }
70 return *this;
71 }
72
76 [[nodiscard]] bool is_active() const { return unsubscribe_ != nullptr; }
77
81 void unsubscribe() {
82 if (unsubscribe_) {
83 unsubscribe_();
84 unsubscribe_ = nullptr;
85 }
86 }
87
88private:
89 unsubscribe_fn unsubscribe_;
90};
91
119class event_bus final {
120private:
121 [[nodiscard]] static auto& lg() {
122 using namespace ores::utility::log;
123 static auto instance = make_logger("ores.eventing.service.event_bus");
124 return instance;
125 }
126
127 using subscriber_id = std::uint64_t;
128
129 struct subscriber_entry {
130 subscriber_id id;
131 std::any handler;
132 };
133
134 using subscriber_list = std::vector<subscriber_entry>;
135 using subscriber_map = std::unordered_map<std::type_index, subscriber_list>;
136
137public:
138 event_bus() = default;
139 ~event_bus() = default;
140
141 event_bus(const event_bus&) = delete;
142 event_bus& operator=(const event_bus&) = delete;
143 event_bus(event_bus&&) = delete;
144 event_bus& operator=(event_bus&&) = delete;
145
156 template<typename Event>
157 [[nodiscard]] subscription subscribe(std::function<void(const Event&)> handler) {
158 using namespace ores::utility::log;
159 std::lock_guard lock(mutex_);
160
161 const auto type_idx = std::type_index(typeid(Event));
162 const auto id = next_subscriber_id_++;
163 const auto type_name = boost::core::demangle(type_idx.name());
164
165 subscribers_[type_idx].push_back({id, std::move(handler)});
166
167 const auto count = subscribers_[type_idx].size();
168 BOOST_LOG_SEV(lg(), info)
169 << "Subscriber " << id << " subscribed to event type '"
170 << type_name << "' (total subscribers: " << count << ")";
171
172 return subscription([this, type_idx, id, type_name]() {
173 unsubscribe(type_idx, id, type_name);
174 });
175 }
176
187 template<typename Event>
188 void publish(const Event& event) {
189 using namespace ores::utility::log;
190 std::vector<std::function<void(const Event&)>> handlers_copy;
191 const auto type_idx = std::type_index(typeid(Event));
192 const auto type_name = boost::core::demangle(type_idx.name());
193
194 {
195 std::lock_guard lock(mutex_);
196
197 auto it = subscribers_.find(type_idx);
198 if (it == subscribers_.end()) {
199 BOOST_LOG_SEV(lg(), debug)
200 << "No subscribers for event type '" << type_name
201 << "' - event will not be delivered";
202 return;
203 }
204
205 handlers_copy.reserve(it->second.size());
206 for (const auto& entry : it->second) {
207 try {
208 const auto& handler =
209 std::any_cast<const std::function<void(const Event&)>&>(
210 entry.handler);
211 handlers_copy.push_back(handler);
212 } catch (const std::bad_any_cast& e) {
213 BOOST_LOG_SEV(lg(), error)
214 << "Type mismatch for subscriber " << entry.id
215 << " on event type '" << type_name << "': " << e.what();
216 }
217 }
218 }
219
220 BOOST_LOG_SEV(lg(), info)
221 << "Publishing event type '" << type_name << "' to "
222 << handlers_copy.size() << " subscriber(s)";
223
224 std::size_t success_count = 0;
225 for (const auto& handler : handlers_copy) {
226 try {
227 handler(event);
228 ++success_count;
229 } catch (const std::exception& e) {
230 BOOST_LOG_SEV(lg(), error)
231 << "Exception in event handler for '" << type_name
232 << "': " << e.what();
233 }
234 }
235
236 BOOST_LOG_SEV(lg(), debug)
237 << "Event '" << type_name << "' delivery complete: "
238 << success_count << "/" << handlers_copy.size() << " handlers succeeded";
239 }
240
247 template<typename Event>
248 [[nodiscard]] std::size_t subscriber_count() const {
249 std::lock_guard lock(mutex_);
250
251 const auto type_idx = std::type_index(typeid(Event));
252 auto it = subscribers_.find(type_idx);
253 if (it == subscribers_.end()) {
254 return 0;
255 }
256 return it->second.size();
257 }
258
259private:
260 void unsubscribe(std::type_index type_idx, subscriber_id id,
261 const std::string& type_name) {
262 using namespace ores::utility::log;
263 std::lock_guard lock(mutex_);
264
265 auto it = subscribers_.find(type_idx);
266 if (it == subscribers_.end()) {
267 BOOST_LOG_SEV(lg(), warn)
268 << "Unsubscribe called for subscriber " << id
269 << " but no subscribers exist for event type '" << type_name << "'";
270 return;
271 }
272
273 auto& list = it->second;
274 auto removed = std::erase_if(list, [id](const subscriber_entry& entry) {
275 return entry.id == id;
276 });
277
278 if (removed > 0) {
279 const auto remaining = list.size();
280 BOOST_LOG_SEV(lg(), info)
281 << "Subscriber " << id << " unsubscribed from event type '"
282 << type_name << "' (remaining subscribers: " << remaining << ")";
283 } else {
284 BOOST_LOG_SEV(lg(), warn)
285 << "Unsubscribe called for subscriber " << id
286 << " but it was not found in event type '" << type_name << "'";
287 }
288
289 if (list.empty()) {
290 BOOST_LOG_SEV(lg(), debug)
291 << "No more subscribers for event type '" << type_name
292 << "' - removing from map";
293 subscribers_.erase(it);
294 }
295 }
296
297 mutable std::mutex mutex_;
298 subscriber_map subscribers_;
299 subscriber_id next_subscriber_id_ = 1;
300};
301
302}
303
304#endif
Implements logging for ORE Studio.
Definition lifecycle_manager.hpp:30
RAII handle for managing event subscriptions.
Definition event_bus.hpp:41
void unsubscribe()
Manually unsubscribe before destruction.
Definition event_bus.hpp:81
bool is_active() const
Check if the subscription is active.
Definition event_bus.hpp:76
A typed, thread-safe, in-process publish/subscribe event bus.
Definition event_bus.hpp:119
std::size_t subscriber_count() const
Get the number of subscribers for a specific event type.
Definition event_bus.hpp:248
void publish(const Event &event)
Publish an event to all subscribers.
Definition event_bus.hpp:188
subscription subscribe(std::function< void(const Event &)> handler)
Subscribe to events of a specific type.
Definition event_bus.hpp:157