20#ifndef ORES_SERVICE_SERVICE_DOMAIN_SERVICE_RUNNER_IMPL_HPP
21#define ORES_SERVICE_SERVICE_DOMAIN_SERVICE_RUNNER_IMPL_HPP
26#include <boost/asio/bind_cancellation_slot.hpp>
27#include <boost/asio/cancellation_signal.hpp>
28#include <boost/asio/co_spawn.hpp>
29#include <boost/asio/error.hpp>
30#include <boost/asio/signal_set.hpp>
31#include <boost/asio/use_awaitable.hpp>
32#include <boost/system/error_code.hpp>
33#include "ores.logging/make_logger.hpp"
34#include "ores.nats/service/jwks.hpp"
35#include "ores.security/jwt/jwt_authenticator.hpp"
37namespace ores::service::service {
// Coroutine that drives a service: it arms SIGINT/SIGTERM handling, fetches
// the JWKS public key, registers subscriptions through register_fn, and then
// suspends until a shutdown signal arrives.
//
// NOTE(review): this listing has gaps (the embedded source line numbers
// skip), so some parameters and statements are not visible here. The
// comments below describe only the lines that are present; anything that
// depends on an omitted line is marked as an assumption.
//
// Parameters visible in this excerpt:
//   io_ctx      - io_context the signal_set is constructed on.
//   name        - service name, appended to the "Shutdown complete: " logs.
//   register_fn - callable invoked as register_fn(nats, ctx, verifier); its
//                 result must provide size() and be iterable, and each
//                 element must provide subject().
//   on_started  - callback taking the io_context; its invocation is not
//                 visible in this excerpt.
39template<
typename RegisterFn>
40boost::asio::awaitable<void>
41run(boost::asio::io_context& io_ctx,
44 std::string_view name,
45 RegisterFn&& register_fn,
46 std::function<
void(boost::asio::io_context&)> on_started) {
// Function-local statics: the logger name and a logger reference obtained
// from an immediately-invoked lambda that holds its own static instance.
// The lambda's return statement is on an omitted line (presumably it
// returns `instance`).
49 static const std::string_view logger_name =
"ores.service.service.runner";
50 static auto& lg = []() ->
auto& {
51 static auto instance = make_logger(logger_name);
// Startup cancellation: if SIGINT/SIGTERM is delivered while startup is
// still in progress (handler called with no error), emit a cancellation of
// type `all` on startup_cancel so that work bound to its slot is cancelled.
59 boost::asio::cancellation_signal startup_cancel;
60 boost::asio::signal_set signals(io_ctx, SIGINT, SIGTERM);
61 signals.async_wait([&startup_cancel](
const boost::system::error_code& ec,
int) {
62 if (!ec) startup_cancel.emit(boost::asio::cancellation_type::all);
// Fetch the JWKS public key in a spawned coroutine whose completion token is
// bound to startup_cancel's slot, which is what lets the signal handler
// above cancel the fetch. The enclosing `try {` and the first co_spawn
// argument are on omitted lines.
70 pub_key =
co_await boost::asio::co_spawn(
72 ores::nats::service::fetch_jwks_public_key(nats),
73 boost::asio::bind_cancellation_slot(
74 startup_cancel.slot(), boost::asio::use_awaitable));
75 }
catch (
const boost::system::system_error& e) {
// Any error code other than operation_aborted is handled by the omitted
// statement that follows this `if` (presumably a rethrow - confirm against
// the full source). operation_aborted is treated as a shutdown request
// during startup and is logged as such.
76 if (e.code() != boost::asio::error::operation_aborted)
78 BOOST_LOG_SEV(lg, info) <<
"Shutdown signal received during startup.";
79 BOOST_LOG_SEV(lg, info) <<
"Shutdown complete: " << name;
82 BOOST_LOG_SEV(lg, info) <<
"Fetched JWKS public key from IAM";
// Optional JWT verifier; its initializer is on an omitted line (presumably
// built from pub_key - confirm against the full source).
89 std::optional<ores::security::jwt::jwt_authenticator> verifier =
// Hand the NATS client, the context and the verifier to the caller-supplied
// registration callable (ctx and verifier are moved), then log how many
// subscriptions came back and the subject of each one.
92 auto subs = register_fn(nats, std::move(ctx), std::move(verifier));
93 BOOST_LOG_SEV(lg, info) <<
"Registered " << subs.size() <<
" subscription(s).";
94 for (
const auto& sub : subs)
95 BOOST_LOG_SEV(lg, info) <<
"NATS subscribe: " << sub.subject();
// Startup is finished: suspend this coroutine on a second wait on the same
// signal_set until a shutdown signal is delivered.
100 BOOST_LOG_SEV(lg, info) <<
"Service ready.";
101 BOOST_LOG_SEV(lg, info) <<
"Waiting for requests...";
102 co_await signals.async_wait(boost::asio::use_awaitable);
// Shutdown path. The statement between the two log lines is omitted from
// this listing (presumably the drain call the log message announces).
104 BOOST_LOG_SEV(lg, info) <<
"Shutdown signal received. Draining...";
106 BOOST_LOG_SEV(lg, info) <<
"Shutdown complete: " << name;
Implements logging infrastructure for ORE Studio.
Definition boost_severity.hpp:28
Context for the operations on a postgres database.
Definition context.hpp:47
NATS client: connection, pub/sub, request/reply, and JetStream.
Definition client.hpp:73
void drain()
Graceful shutdown.
Definition client.cpp:489
static jwt_authenticator create_rs256_verifier(const std::string &public_key_pem, const std::string &issuer="", const std::string &audience="")
Creates an RS256 verifier using an RSA public key (PEM).
Definition jwt_authenticator.cpp:71