ORE Studio 0.0.4
Loading...
Searching...
No Matches
heartbeat_publisher.hpp
1/* -*- mode: c++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
2 *
3 * Copyright (C) 2026 Marco Craveiro <marco.craveiro@gmail.com>
4 *
5 * This program is free software; you can redistribute it and/or modify it under
6 * the terms of the GNU General Public License as published by the Free Software
7 * Foundation; either version 3 of the License, or (at your option) any later
8 * version.
9 *
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
13 * details.
14 *
15 * You should have received a copy of the GNU General Public License along with
16 * this program; if not, write to the Free Software Foundation, Inc., 51
17 * Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 *
19 */
20#ifndef ORES_SERVICE_SERVICE_HEARTBEAT_PUBLISHER_HPP
21#define ORES_SERVICE_SERVICE_HEARTBEAT_PUBLISHER_HPP
22
23#include <string>
24#include <cstdint>
25#include <boost/asio/awaitable.hpp>
26#include <boost/asio/steady_timer.hpp>
27#include <boost/asio/this_coro.hpp>
28#include <boost/asio/use_awaitable.hpp>
29#include <boost/system/system_error.hpp>
30#include <boost/uuid/uuid_io.hpp>
31#include <rfl/json.hpp>
32#include "ores.logging/make_logger.hpp"
33#include "ores.nats/service/client.hpp"
34#include "ores.utility/uuid/uuid_v7_generator.hpp"
35#include "ores.telemetry/messaging/service_samples_protocol.hpp"
36
37namespace ores::service::service {
38
52private:
53 inline static std::string_view logger_name =
54 "ores.service.service.heartbeat_publisher";
55
56 [[nodiscard]] static auto& lg() {
57 using namespace ores::logging;
58 static auto instance = make_logger(logger_name);
59 return instance;
60 }
61
62public:
63 heartbeat_publisher(std::string service_name,
64 std::string version,
66 std::uint32_t interval_seconds = 15)
67 : service_name_(std::move(service_name))
68 , version_(std::move(version))
69 , nats_(nats)
70 , interval_seconds_(interval_seconds) {
71 // Generate a stable per-process instance UUID.
73 instance_id_ = boost::uuids::to_string(gen());
74 }
75
82 boost::asio::awaitable<void> run() {
83 BOOST_LOG_SEV(lg(), ores::logging::info)
84 << "Heartbeat publisher started for '" << service_name_
85 << "' instance=" << instance_id_
86 << " interval=" << interval_seconds_ << "s";
87
88 auto executor = co_await boost::asio::this_coro::executor;
89 boost::asio::steady_timer timer(executor);
90
91 try {
92 for (;;) {
93 publish_once();
94 timer.expires_after(
95 std::chrono::seconds(interval_seconds_));
96 co_await timer.async_wait(boost::asio::use_awaitable);
97 }
98 } catch (const boost::system::system_error& e) {
99 if (e.code() != boost::asio::error::operation_aborted) {
100 BOOST_LOG_SEV(lg(), ores::logging::warn)
101 << "Heartbeat timer error: " << e.what();
102 }
103 }
104
105 BOOST_LOG_SEV(lg(), ores::logging::info)
106 << "Heartbeat publisher stopped for '" << service_name_ << "'";
107 }
108
109private:
110 void publish_once() {
111 try {
113 hb.service_name = service_name_;
114 hb.instance_id = instance_id_;
115 hb.version = version_;
116
117 const auto json = rfl::json::write(hb);
118 std::vector<std::byte> data(
119 reinterpret_cast<const std::byte*>(json.data()),
120 reinterpret_cast<const std::byte*>(json.data() + json.size()));
121 nats_.publish(
122 telemetry::messaging::service_heartbeat_message::nats_subject,
123 std::move(data));
124 BOOST_LOG_SEV(lg(), ores::logging::trace)
125 << "Heartbeat published: " << service_name_;
126 } catch (const std::exception& e) {
127 BOOST_LOG_SEV(lg(), ores::logging::warn)
128 << "Failed to publish heartbeat: " << e.what();
129 }
130 }
131
132 std::string service_name_;
133 std::string instance_id_;
134 std::string version_;
136 std::uint32_t interval_seconds_;
137};
138
139} // namespace ores::service::service
140
141#endif
Implements logging infrastructure for ORE Studio.
Definition boost_severity.hpp:28
NATS client: connection, pub/sub, request/reply, and JetStream.
Definition client.hpp:73
void publish(std::string_view subject, std::span< const std::byte > data, std::unordered_map< std::string, std::string > headers={})
Publish a message to a subject.
Definition client.cpp:298
Background coroutine that publishes a heartbeat on a timer.
Definition heartbeat_publisher.hpp:51
boost::asio::awaitable< void > run()
Runs the heartbeat loop as an awaitable coroutine.
Definition heartbeat_publisher.hpp:82
Fire-and-forget heartbeat published by every domain service.
Definition service_samples_protocol.hpp:40
std::string service_name
Canonical service name (e.g. "ores.compute.service").
Definition service_samples_protocol.hpp:45
std::string version
Service version string (e.g. "1.0").
Definition service_samples_protocol.hpp:55
std::string instance_id
Per-process UUID, generated once at service startup.
Definition service_samples_protocol.hpp:52
A generator for UUID version 7 (v7) based on RFC 9562.
Definition uuid_v7_generator.hpp:50