20#ifndef ORES_EVENTING_SERVICE_POSTGRES_EVENT_SOURCE_HPP
21#define ORES_EVENTING_SERVICE_POSTGRES_EVENT_SOURCE_HPP
25#include <unordered_map>
26#include "ores.logging/make_logger.hpp"
27#include "ores.database/domain/context.hpp"
28#include "ores.eventing/domain/entity_change_event.hpp"
29#include "ores.eventing/service/event_bus.hpp"
30#include "ores.eventing/service/postgres_listener_service.hpp"
59 [[nodiscard]]
static auto& lg() {
61 static auto instance = make_logger(
62 "ores.eventing.service.postgres_event_source");
/// Callback type stored in each entity mapping: it receives a timestamp and a
/// list of string identifiers and returns nothing. The lambda assigned to it
/// in this listing forwards both arguments into a typed event and publishes
/// that event on the bus.
71 using publisher_fn = std::function<void(
72 std::chrono::system_clock::time_point,
73 const std::vector<std::string>&)>;
75 struct entity_mapping {
76 std::string channel_name;
77 publisher_fn publisher;
105 template<
typename Event>
107 const std::string& channel_name) {
109 BOOST_LOG_SEV(lg(), info)
110 <<
"Registering entity-to-event mapping: entity='" << entity_name
111 <<
"', channel='" << channel_name <<
"'";
113 entity_mappings_[entity_name] = entity_mapping{
114 .channel_name = channel_name,
115 .publisher = [
this, entity_name](std::chrono::system_clock::time_point ts,
116 const std::vector<std::string>& entity_ids) {
118 BOOST_LOG_SEV(lg(), info)
119 <<
"Publishing domain event for entity: " << entity_name
120 <<
" with " << entity_ids.size() <<
" entity IDs";
121 bus_.
publish(Event{ts, entity_ids});
126 BOOST_LOG_SEV(lg(), debug)
127 <<
"Subscribed to PostgreSQL channel: " << channel_name;
/// Registered mappings, keyed by entity name (the registration code indexes
/// this map with entity_name). Each value carries the channel name and the
/// publisher callback for that entity.
154 std::unordered_map<std::string, entity_mapping> entity_mappings_;
Event bus and related services.
Definition event_bus.hpp:33
Implements logging infrastructure for ORE Studio.
Definition boost_severity.hpp:28
Context for the operations on a PostgreSQL database.
Definition context.hpp:30
Represents a low-level notification about a change to an entity at the repository level.
Definition entity_change_event.hpp:33
A typed, thread-safe, in-process publish/subscribe event bus.
Definition event_bus.hpp:119
void publish(const Event &event)
Publish an event to all subscribers.
Definition event_bus.hpp:188
Event source that bridges PostgreSQL LISTEN/NOTIFY to the event bus.
Definition postgres_event_source.hpp:57
void start()
Start the event source.
Definition postgres_event_source.cpp:41
void stop()
Stop the event source.
Definition postgres_event_source.cpp:47
void register_mapping(const std::string &entity_name, const std::string &channel_name)
Register a mapping from entity name to typed domain event.
Definition postgres_event_source.hpp:106
Manages a dedicated PostgreSQL connection to listen for NOTIFY events.
Definition postgres_listener_service.hpp:49
void subscribe(const std::string &channel_name)
Subscribes to a PostgreSQL NOTIFY channel.
Definition postgres_listener_service.cpp:128