20#ifndef ORES_EVENTING_SERVICE_EVENT_BUS_HPP
21#define ORES_EVENTING_SERVICE_EVENT_BUS_HPP
29#include <unordered_map>
30#include <boost/core/demangle.hpp>
31#include "ores.utility/log/make_logger.hpp"
33namespace ores::eventing::service {
43 using unsubscribe_fn = std::function<void()>;
46 explicit subscription(unsubscribe_fn fn) : unsubscribe_(std::move(fn)) {}
58 : unsubscribe_(std::move(other.unsubscribe_)) {
59 other.unsubscribe_ =
nullptr;
67 unsubscribe_ = std::move(other.unsubscribe_);
68 other.unsubscribe_ =
nullptr;
76 [[nodiscard]]
bool is_active()
const {
return unsubscribe_ !=
nullptr; }
84 unsubscribe_ =
nullptr;
89 unsubscribe_fn unsubscribe_;
121 [[nodiscard]]
static auto& lg() {
123 static auto instance = make_logger(
"ores.eventing.service.event_bus");
127 using subscriber_id = std::uint64_t;
129 struct subscriber_entry {
134 using subscriber_list = std::vector<subscriber_entry>;
135 using subscriber_map = std::unordered_map<std::type_index, subscriber_list>;
156 template<
typename Event>
159 std::lock_guard lock(mutex_);
161 const auto type_idx = std::type_index(
typeid(Event));
162 const auto id = next_subscriber_id_++;
163 const auto type_name = boost::core::demangle(type_idx.name());
165 subscribers_[type_idx].push_back({id, std::move(handler)});
167 const auto count = subscribers_[type_idx].size();
168 BOOST_LOG_SEV(lg(), info)
169 <<
"Subscriber " <<
id <<
" subscribed to event type '"
170 << type_name <<
"' (total subscribers: " << count <<
")";
173 unsubscribe(type_idx,
id, type_name);
187 template<
typename Event>
190 std::vector<std::function<void(
const Event&)>> handlers_copy;
191 const auto type_idx = std::type_index(
typeid(Event));
192 const auto type_name = boost::core::demangle(type_idx.name());
195 std::lock_guard lock(mutex_);
197 auto it = subscribers_.find(type_idx);
198 if (it == subscribers_.end()) {
199 BOOST_LOG_SEV(lg(), debug)
200 <<
"No subscribers for event type '" << type_name
201 <<
"' - event will not be delivered";
205 handlers_copy.reserve(it->second.size());
206 for (
const auto& entry : it->second) {
208 const auto& handler =
209 std::any_cast<
const std::function<void(
const Event&)>&>(
211 handlers_copy.push_back(handler);
212 }
catch (
const std::bad_any_cast& e) {
213 BOOST_LOG_SEV(lg(), error)
214 <<
"Type mismatch for subscriber " << entry.id
215 <<
" on event type '" << type_name <<
"': " << e.what();
220 BOOST_LOG_SEV(lg(), info)
221 <<
"Publishing event type '" << type_name <<
"' to "
222 << handlers_copy.size() <<
" subscriber(s)";
224 std::size_t success_count = 0;
225 for (
const auto& handler : handlers_copy) {
229 }
catch (
const std::exception& e) {
230 BOOST_LOG_SEV(lg(), error)
231 <<
"Exception in event handler for '" << type_name
232 <<
"': " << e.what();
236 BOOST_LOG_SEV(lg(), debug)
237 <<
"Event '" << type_name <<
"' delivery complete: "
238 << success_count <<
"/" << handlers_copy.size() <<
" handlers succeeded";
247 template<
typename Event>
249 std::lock_guard lock(mutex_);
251 const auto type_idx = std::type_index(
typeid(Event));
252 auto it = subscribers_.find(type_idx);
253 if (it == subscribers_.end()) {
256 return it->second.size();
260 void unsubscribe(std::type_index type_idx, subscriber_id
id,
261 const std::string& type_name) {
263 std::lock_guard lock(mutex_);
265 auto it = subscribers_.find(type_idx);
266 if (it == subscribers_.end()) {
267 BOOST_LOG_SEV(lg(), warn)
268 <<
"Unsubscribe called for subscriber " <<
id
269 <<
" but no subscribers exist for event type '" << type_name <<
"'";
273 auto& list = it->second;
274 auto removed = std::erase_if(list, [
id](
const subscriber_entry& entry) {
275 return entry.id == id;
279 const auto remaining = list.size();
280 BOOST_LOG_SEV(lg(), info)
281 <<
"Subscriber " <<
id <<
" unsubscribed from event type '"
282 << type_name <<
"' (remaining subscribers: " << remaining <<
")";
284 BOOST_LOG_SEV(lg(), warn)
285 <<
"Unsubscribe called for subscriber " <<
id
286 <<
" but it was not found in event type '" << type_name <<
"'";
290 BOOST_LOG_SEV(lg(), debug)
291 <<
"No more subscribers for event type '" << type_name
292 <<
"' - removing from map";
293 subscribers_.erase(it);
297 mutable std::mutex mutex_;
298 subscriber_map subscribers_;
299 subscriber_id next_subscriber_id_ = 1;
Implements logging for ORE Studio.
Definition lifecycle_manager.hpp:30
RAII handle for managing event subscriptions.
Definition event_bus.hpp:41
void unsubscribe()
Manually unsubscribe before destruction.
Definition event_bus.hpp:81
bool is_active() const
Check if the subscription is active.
Definition event_bus.hpp:76
A typed, thread-safe, in-process publish/subscribe event bus.
Definition event_bus.hpp:119
std::size_t subscriber_count() const
Get the number of subscribers for a specific event type.
Definition event_bus.hpp:248
void publish(const Event &event)
Publish an event to all subscribers.
Definition event_bus.hpp:188
subscription subscribe(std::function< void(const Event &)> handler)
Subscribe to events of a specific type.
Definition event_bus.hpp:157