ORE Studio 0.0.4
Loading...
Searching...
No Matches
postgres_listener_service.hpp
1/* -*- mode: c++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
2 *
3 * Copyright (C) 2025 Marco Craveiro <marco.craveiro@gmail.com>
4 *
5 * This program is free software; you can redistribute it and/or modify it under
6 * the terms of the GNU General Public License as published by the Free Software
7 * Foundation; either version 3 of the License, or (at your option) any later
8 * version.
9 *
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
13 * details.
14 *
15 * You should have received a copy of the GNU General Public License along with
16 * this program; if not, write to the Free Software Foundation, Inc., 51
17 * Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 *
19 */
20#ifndef ORES_EVENTING_SERVICE_POSTGRES_LISTENER_SERVICE_HPP
21#define ORES_EVENTING_SERVICE_POSTGRES_LISTENER_SERVICE_HPP
22
23#include <mutex>
24#include <atomic>
25#include <string>
26#include <thread>
27#include <vector>
28#include <optional>
29#include <functional>
30#include <condition_variable>
31#include <sqlgen/postgres.hpp>
32#include "ores.logging/make_logger.hpp"
33#include "ores.database/domain/context.hpp"
34#include "ores.eventing/domain/entity_change_event.hpp"
35
37
50private:
// Returns a reference to the logger used by this class. The logger is a
// function-local static: it is created by make_logger on the first call and
// the same object is handed back on every later call.
51 [[nodiscard]] static auto& lg() {
52 using namespace ores::logging;
// The string literal below is the name the logger is created with.
53 static auto instance = make_logger(
54 "ores.eventing.service.postgres_listener_service");
55 return instance;
56 }
57
58public:
// Type alias for the notification callback function: a std::function taking
// a const reference to a domain::entity_change_event and returning void.
66 using notification_callback_t = std::function<void(const domain::entity_change_event&)>;
67
80
87
88 // Deleted copy constructor and assignment operator to prevent copies.
// NOTE(review): only the deleted copy assignment operator is visible in this
// listing; confirm that the matching deleted copy constructor is declared
// alongside it in the header.
90 postgres_listener_service& operator=(const postgres_listener_service&) = delete;
91
// Starts the listener thread and begins listening for notifications.
98 void start();
99
// Stops the listener thread and waits for it to join.
103 void stop();
104
// Subscribes to the PostgreSQL NOTIFY channel identified by channel_name.
116 void subscribe(const std::string& channel_name);
117
// Sends a NOTIFY on the PostgreSQL channel identified by channel_name.
// NOTE(review): payload is presumably the text delivered with the NOTIFY;
// confirm against the implementation.
124 void notify(const std::string& channel_name, const std::string& payload);
125
// Waits until the listener is ready to receive notifications. The timeout
// parameter defaults to 5 seconds.
// NOTE(review): the bool result presumably reports whether readiness was
// reached before the timeout expired; confirm against the implementation.
135 bool wait_until_ready(std::chrono::milliseconds timeout = std::chrono::seconds(5));
136
137private:
// NOTE(review): presumably opens the PostgreSQL connection held in
// connection_ and returns whether that succeeded; the implementation is not
// visible here, so confirm before relying on this.
143 bool open_connection();
144
// NOTE(review): presumably issues LISTEN for the channels recorded in
// subscribed_channels_; the implementation is not visible here, so confirm.
151 void issue_pending_listens();
152
// NOTE(review): presumably the function executed by listener_thread_ while
// running_ is set; the implementation is not visible here, so confirm.
158 void listen_loop();
159
// Processes one notification received from sqlgen's PostgreSQL layer.
// NOTE(review): presumably converts it to a domain::entity_change_event and
// invokes notification_callback_; confirm against the implementation.
167 void handle_notification(const sqlgen::postgres::Notification& notification);
168
169private:
// Callback object of type notification_callback_t.
171 notification_callback_t notification_callback_;
172
// Declared mutable so it can also be locked from const member functions.
// NOTE(review): which members it guards is not visible in this header.
173 mutable std::mutex mutex_;
// Empty optional means no connection object is currently held.
174 std::optional<rfl::Ref<sqlgen::postgres::Connection>> connection_;
// Channel names stored as strings.
175 std::vector<std::string> subscribed_channels_;
176
177 std::thread listener_thread_;
// NOTE(review): unlike ready_, this member has no default member
// initializer, so it must be initialized by the constructor (not visible in
// this listing); confirm that it is.
178 std::atomic<bool> running_;
179
// NOTE(review): presumably paired with ready_ to implement
// wait_until_ready(); confirm against the implementation.
180 std::condition_variable ready_cv_;
// Starts out false.
181 bool ready_{false};
182};
183
184}
185
186#endif
Event bus and related services.
Definition event_bus.hpp:33
Implements logging infrastructure for ORE Studio.
Definition boost_severity.hpp:28
Context for the operations on a postgres database.
Definition context.hpp:30
Represents a low-level notification about a change to an entity at the repository level.
Definition entity_change_event.hpp:33
Manages a dedicated PostgreSQL connection to listen for NOTIFY events.
Definition postgres_listener_service.hpp:49
std::function< void(const domain::entity_change_event &)> notification_callback_t
Type alias for the notification callback function.
Definition postgres_listener_service.hpp:66
bool wait_until_ready(std::chrono::milliseconds timeout=std::chrono::seconds(5))
Waits until the listener is ready to receive notifications.
Definition postgres_listener_service.cpp:123
void start()
Starts the listener thread and begins listening for notifications.
Definition postgres_listener_service.cpp:88
void stop()
Stops the listener thread and waits for it to join.
Definition postgres_listener_service.cpp:104
void notify(const std::string &channel_name, const std::string &payload)
Sends a NOTIFY on a PostgreSQL channel.
Definition postgres_listener_service.cpp:158
~postgres_listener_service()
Destroys the postgres_listener_service.
Definition postgres_listener_service.cpp:42
void subscribe(const std::string &channel_name)
Subscribes to a PostgreSQL NOTIFY channel.
Definition postgres_listener_service.cpp:128