ORE Studio 0.0.4
Loading...
Searching...
No Matches
postgres_event_source.hpp
1/* -*- mode: c++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
2 *
3 * Copyright (C) 2025 Marco Craveiro <marco.craveiro@gmail.com>
4 *
5 * This program is free software; you can redistribute it and/or modify it under
6 * the terms of the GNU General Public License as published by the Free Software
7 * Foundation; either version 3 of the License, or (at your option) any later
8 * version.
9 *
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
13 * details.
14 *
15 * You should have received a copy of the GNU General Public License along with
16 * this program; if not, write to the Free Software Foundation, Inc., 51
17 * Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 *
19 */
20#ifndef ORES_EVENTING_SERVICE_POSTGRES_EVENT_SOURCE_HPP
21#define ORES_EVENTING_SERVICE_POSTGRES_EVENT_SOURCE_HPP
22
23#include <memory>
24#include <functional>
25#include <unordered_map>
26#include "ores.logging/make_logger.hpp"
27#include "ores.database/domain/context.hpp"
28#include "ores.database/service/postgres_listener_service.hpp"
29#include "ores.eventing/domain/entity_change_event.hpp"
30#include "ores.eventing/service/event_bus.hpp"
31
33
58private:
59 [[nodiscard]] static auto& lg() {
60 using namespace ores::logging;
61 static auto instance = make_logger(
62 "ores.eventing.service.postgres_event_source");
63 return instance;
64 }
65
72 using publisher_fn = std::function<void(
73 std::chrono::system_clock::time_point,
74 const std::vector<std::string>&,
75 const std::string&)>;
76
77 struct entity_mapping {
78 std::string channel_name;
79 publisher_fn publisher;
80 };
81
82public:
90
92
94 postgres_event_source& operator=(const postgres_event_source&) = delete;
95
107 template<typename Event>
108 void register_mapping(const std::string& entity_name,
109 const std::string& channel_name) {
110 using namespace ores::logging;
111 BOOST_LOG_SEV(lg(), info)
112 << "Registering entity-to-event mapping: entity='" << entity_name
113 << "', channel='" << channel_name << "'";
114
115 entity_mappings_[entity_name] = entity_mapping{
116 .channel_name = channel_name,
117 .publisher = [this, entity_name](std::chrono::system_clock::time_point ts,
118 const std::vector<std::string>& entity_ids,
119 const std::string& tenant_id) {
120 using namespace ores::logging;
121 BOOST_LOG_SEV(lg(), info)
122 << "Publishing domain event for entity: " << entity_name
123 << " with " << entity_ids.size() << " entity IDs"
124 << " (tenant: " << tenant_id << ")";
125 bus_.publish(Event{ts, entity_ids, tenant_id});
126 }
127 };
128
129 listener_.subscribe(channel_name);
130 BOOST_LOG_SEV(lg(), debug)
131 << "Subscribed to PostgreSQL channel: " << channel_name;
132 }
133
139 void start();
140
146 void stop();
147
148private:
154 void on_entity_change(const domain::entity_change_event& e);
155
156 event_bus& bus_;
158 std::unordered_map<std::string, entity_mapping> entity_mappings_;
159};
160
161}
162
163#endif
Event bus and related services.
Definition event_bus.hpp:33
Implements logging infrastructure for ORE Studio.
Definition boost_severity.hpp:28
Context for the operations on a postgres database.
Definition context.hpp:47
Manages a dedicated PostgreSQL connection to listen for NOTIFY events.
Definition postgres_listener_service.hpp:47
void subscribe(const std::string &channel_name)
Subscribes to a PostgreSQL NOTIFY channel.
Definition postgres_listener_service.cpp:126
Represents a low-level notification about a change to an entity at the repository level.
Definition entity_change_event.hpp:33
A typed, thread-safe, in-process publish/subscribe event bus.
Definition event_bus.hpp:119
void publish(const Event &event)
Publish an event to all subscribers.
Definition event_bus.hpp:188
Event source that bridges PostgreSQL LISTEN/NOTIFY to the event bus.
Definition postgres_event_source.hpp:57
void start()
Start the event source.
Definition postgres_event_source.cpp:58
void stop()
Stop the event source.
Definition postgres_event_source.cpp:64
void register_mapping(const std::string &entity_name, const std::string &channel_name)
Register a mapping from entity name to typed domain event.
Definition postgres_event_source.hpp:108