20#ifndef ORES_SERVICE_SERVICE_HEARTBEAT_PUBLISHER_HPP
21#define ORES_SERVICE_SERVICE_HEARTBEAT_PUBLISHER_HPP
25#include <boost/asio/awaitable.hpp>
26#include <boost/asio/steady_timer.hpp>
27#include <boost/asio/this_coro.hpp>
28#include <boost/asio/use_awaitable.hpp>
29#include <boost/system/system_error.hpp>
30#include <boost/uuid/uuid_io.hpp>
31#include <rfl/json.hpp>
32#include "ores.logging/make_logger.hpp"
33#include "ores.nats/service/client.hpp"
34#include "ores.utility/uuid/uuid_v7_generator.hpp"
35#include "ores.telemetry/messaging/service_samples_protocol.hpp"
37namespace ores::service::service {
53 inline static std::string_view logger_name =
54 "ores.service.service.heartbeat_publisher";
56 [[nodiscard]]
static auto& lg() {
58 static auto instance = make_logger(logger_name);
66 std::uint32_t interval_seconds = 15)
67 : service_name_(std::move(service_name))
68 , version_(std::move(version))
70 , interval_seconds_(interval_seconds) {
73 instance_id_ = boost::uuids::to_string(gen());
82 boost::asio::awaitable<void>
run() {
83 BOOST_LOG_SEV(lg(), ores::logging::info)
84 <<
"Heartbeat publisher started for '" << service_name_
85 <<
"' instance=" << instance_id_
86 <<
" interval=" << interval_seconds_ <<
"s";
88 auto executor =
co_await boost::asio::this_coro::executor;
89 boost::asio::steady_timer timer(executor);
95 std::chrono::seconds(interval_seconds_));
96 co_await timer.async_wait(boost::asio::use_awaitable);
98 }
catch (
const boost::system::system_error& e) {
99 if (e.code() != boost::asio::error::operation_aborted) {
100 BOOST_LOG_SEV(lg(), ores::logging::warn)
101 <<
"Heartbeat timer error: " << e.what();
105 BOOST_LOG_SEV(lg(), ores::logging::info)
106 <<
"Heartbeat publisher stopped for '" << service_name_ <<
"'";
110 void publish_once() {
117 const auto json = rfl::json::write(hb);
118 std::vector<std::byte> data(
119 reinterpret_cast<const std::byte*
>(json.data()),
120 reinterpret_cast<const std::byte*
>(json.data() + json.size()));
122 telemetry::messaging::service_heartbeat_message::nats_subject,
124 BOOST_LOG_SEV(lg(), ores::logging::trace)
125 <<
"Heartbeat published: " << service_name_;
126 }
catch (
const std::exception& e) {
127 BOOST_LOG_SEV(lg(), ores::logging::warn)
128 <<
"Failed to publish heartbeat: " << e.what();
132 std::string service_name_;
133 std::string instance_id_;
134 std::string version_;
136 std::uint32_t interval_seconds_;
Implements logging infrastructure for ORE Studio.
Definition boost_severity.hpp:28
NATS client: connection, pub/sub, request/reply, and JetStream.
Definition client.hpp:73
void publish(std::string_view subject, std::span< const std::byte > data, std::unordered_map< std::string, std::string > headers={})
Publish a message to a subject.
Definition client.cpp:298
Background coroutine that publishes a heartbeat on a timer.
Definition heartbeat_publisher.hpp:51
boost::asio::awaitable< void > run()
Runs the heartbeat loop as an awaitable coroutine.
Definition heartbeat_publisher.hpp:82
Fire-and-forget heartbeat published by every domain service.
Definition service_samples_protocol.hpp:40
std::string service_name
Canonical service name (e.g. "ores.compute.service").
Definition service_samples_protocol.hpp:45
std::string version
Service version string (e.g. "1.0").
Definition service_samples_protocol.hpp:55
std::string instance_id
Per-process UUID, generated once at service startup.
Definition service_samples_protocol.hpp:52
A generator for UUID version 7 (v7) based on RFC 9562.
Definition uuid_v7_generator.hpp:50