NATS Monitor Design
Overview
Extend ores.telemetry.service with a background NATS metrics poller. The
poller periodically calls the NATS server's built-in HTTP monitoring API, stores
the samples in two new TimescaleDB hypertables in the existing telemetry
database, and exposes NATS request subjects so QueueChartWindow in ores.qt
can render historical time-series charts.
Goals
- Record per-stream JetStream metrics (messages, bytes, consumer count).
- Record server-level metrics (throughput, connections, memory, slow consumers).
- Power the currently-placeholder QueueChartWindow chart widget.
- No new service binary or deployment unit.
Non-goals
- Server-push / streaming (Qt polls on demand).
- Aggregation or roll-up views (raw samples only for now).
- Alerting.
Architecture
No new component. Four existing components are extended:
| Component | Change |
|---|---|
| ores.telemetry | Two new domain types + protocol DTOs |
| ores.telemetry.database | Two new hypertables + repository methods |
| ores.telemetry.service | New nats_poller class + two new config fields |
| ores.qt | Wire QueueChartWindow to real data |
Data flow:
    NATS server :8222/varz        ores.telemetry.service        TimescaleDB
    NATS server :8222/jsz   -->   nats_poller (coroutine)  -->  ores_nats_server_samples_tbl
                                                           -->  ores_nats_stream_samples_tbl

    ores.qt QueueChartWindow  -- NATS request/reply -->   telemetry registrar
                              <-- nats_stream_samples --  telemetry repository
Domain Model
New types in ores.telemetry/domain/
nats_server_sample.hpp:
    struct nats_server_sample {
        std::chrono::system_clock::time_point sampled_at;
        std::uint64_t in_msgs{0};
        std::uint64_t out_msgs{0};
        std::uint64_t in_bytes{0};
        std::uint64_t out_bytes{0};
        int connections{0};
        std::uint64_t mem_bytes{0};
        int slow_consumers{0};
    };
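Because only raw samples are stored, a throughput chart has to derive rates from consecutive rows. The sketch below assumes the `/varz` message counters are cumulative totals since server start, so a rate is the difference between two samples divided by the elapsed time. The helper `in_msgs_per_second` is hypothetical and not part of the design; the struct is a trimmed copy of the one above.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <optional>

// Trimmed copy of nats_server_sample: only the fields used below.
struct nats_server_sample {
    std::chrono::system_clock::time_point sampled_at;
    std::uint64_t in_msgs{0};
};

// Hypothetical helper (not part of the design): inbound messages per second
// between two consecutive samples. Returns nullopt when no time has elapsed
// or the counter went backwards, e.g. after a NATS server restart.
std::optional<double> in_msgs_per_second(const nats_server_sample& prev,
                                         const nats_server_sample& curr) {
    // Callers are expected to pass samples in chronological order.
    assert(prev.sampled_at <= curr.sampled_at);

    const std::chrono::duration<double> elapsed =
        curr.sampled_at - prev.sampled_at;
    if (elapsed.count() <= 0.0 || curr.in_msgs < prev.in_msgs)
        return std::nullopt;

    return static_cast<double>(curr.in_msgs - prev.in_msgs) / elapsed.count();
}
```

Returning no value on a counter reset lets the chart leave a gap instead of plotting a large negative spike.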
nats_stream_sample.hpp:
    struct nats_stream_sample {
        std::chrono::system_clock::time_point sampled_at;
        std::string stream_name;
        std::uint64_t messages{0};
        std::uint64_t bytes{0};
        int consumer_count{0};
    };
nats_samples_query.hpp (for Qt requests):
    struct nats_server_samples_query {
        std::chrono::system_clock::time_point start_time;
        std::chrono::system_clock::time_point end_time;
        std::uint32_t limit{1000};
    };

    struct nats_stream_samples_query {
        std::string stream_name;
        std::chrono::system_clock::time_point start_time;
        std::chrono::system_clock::time_point end_time;
        std::uint32_t limit{1000};
    };
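The query structs do not spell out their range semantics. As an illustration only, the sketch below applies a stream query to an in-memory vector, assuming a half-open range `[start_time, end_time)`, input ordered oldest first, and `limit` as a cap on returned rows. Those three choices and the function name `apply_query` are assumptions, not decisions taken by this design.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <string>
#include <vector>

using sample_time = std::chrono::system_clock::time_point;

struct nats_stream_sample {
    sample_time sampled_at;
    std::string stream_name;
    std::uint64_t messages{0};
    std::uint64_t bytes{0};
    int consumer_count{0};
};

struct nats_stream_samples_query {
    std::string stream_name;
    sample_time start_time;
    sample_time end_time;
    std::uint32_t limit{1000};
};

// Hypothetical in-memory equivalent of the repository query. Assumes `all`
// is ordered oldest first and treats the range as [start_time, end_time).
std::vector<nats_stream_sample> apply_query(
    const std::vector<nats_stream_sample>& all,
    const nats_stream_samples_query& q) {
    assert(q.start_time <= q.end_time);

    std::vector<nats_stream_sample> out;
    for (const auto& s : all) {
        if (out.size() >= q.limit)
            break;  // cap reached: stop scanning
        if (s.stream_name == q.stream_name &&
            s.sampled_at >= q.start_time && s.sampled_at < q.end_time)
            out.push_back(s);
    }
    return out;
}
```

Whichever semantics the repository settles on, stating them next to the query types keeps the Qt client and the SQL in agreement.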
New protocol types in ores.telemetry/messaging/
nats_samples_protocol.hpp:
    struct get_nats_server_samples_request {
        nats_server_samples_query query;
    };

    struct get_nats_server_samples_response {
        bool success{false};
        std::vector<nats_server_sample> samples;
    };

    struct get_nats_stream_samples_request {
        nats_stream_samples_query query;
    };

    struct get_nats_stream_samples_response {
        bool success{false};
        std::vector<nats_stream_sample> samples;
    };
Database Schema
Two new SQL files under ores.sql/create/telemetry/.
ores_nats_server_samples_tbl
    create table if not exists ores_nats_server_samples_tbl (
        sampled_at     timestamptz not null,
        in_msgs        bigint      not null default 0,
        out_msgs       bigint      not null default 0,
        in_bytes       bigint      not null default 0,
        out_bytes      bigint      not null default 0,
        connections    integer     not null default 0,
        mem_bytes      bigint      not null default 0,
        slow_consumers integer     not null default 0,
        primary key (sampled_at)
    );

    perform public.create_hypertable(
        'ores_nats_server_samples_tbl', 'sampled_at',
        chunk_time_interval => interval '1 day',
        if_not_exists => true);

    perform public.add_retention_policy(
        'ores_nats_server_samples_tbl',
        drop_after => interval '30 days',
        if_not_exists => true);
ores_nats_stream_samples_tbl
    create table if not exists ores_nats_stream_samples_tbl (
        sampled_at     timestamptz not null,
        stream_name    text        not null,
        messages       bigint      not null default 0,
        bytes          bigint      not null default 0,
        consumer_count integer     not null default 0,
        primary key (sampled_at, stream_name)
    );

    -- "if not exists" keeps the script re-runnable, like the statements around it.
    create index if not exists ores_nats_stream_samples_name_idx
        on ores_nats_stream_samples_tbl (stream_name, sampled_at desc);

    perform public.create_hypertable(
        'ores_nats_stream_samples_tbl', 'sampled_at',
        chunk_time_interval => interval '1 day',
        if_not_exists => true);

    perform public.add_retention_policy(
        'ores_nats_stream_samples_tbl',
        drop_after => interval '30 days',
        if_not_exists => true);
NATS Poller
New class nats_poller in ores.telemetry.service/app/
    class nats_poller {
    public:
        nats_poller(std::string monitor_url,
                    std::chrono::seconds interval,
                    ores::database::context ctx);

        boost::asio::awaitable<void> run(boost::asio::io_context& io_ctx);

    private:
        nats_server_sample poll_varz() const;
        std::vector<nats_stream_sample> poll_jsz() const;

        std::string monitor_url_;
        std::chrono::seconds interval_;
        ores::database::context ctx_;
    };
The run() coroutine loops on a steady_timer:
    while (true) {
        try {
            repository_.insert_server_sample(ctx_, poll_varz());
            repository_.insert_stream_samples(ctx_, poll_jsz());
        } catch (const std::exception& e) {
            BOOST_LOG_SEV(lg(), warn) << "NATS poll failed: " << e.what();
        }
        timer.expires_after(interval_);
        co_await timer.async_wait(boost::asio::use_awaitable);
    }
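In the loop as written, both calls share one try block, so a failure in the `/varz` step skips the `/jsz` step for that cycle. One possible variation, which this design does not prescribe, guards each step separately. The sketch below uses only the standard library; `run_step`, `poll_cycle`, and the callbacks are hypothetical stand-ins for the poll-and-insert calls, and `std::cerr` stands in for the service logger.

```cpp
#include <cassert>
#include <exception>
#include <functional>
#include <iostream>

// Outcome of one poll cycle: which of the two steps succeeded.
struct cycle_result {
    bool server_ok{false};
    bool streams_ok{false};
};

// Runs one step and reports a failure instead of propagating it.
inline bool run_step(const char* name, const std::function<void()>& step) {
    assert(step && "step must be callable");
    try {
        step();
        return true;
    } catch (const std::exception& e) {
        std::cerr << "NATS poll step '" << name << "' failed: "
                  << e.what() << '\n';
        return false;
    }
}

// Hypothetical single cycle: the stream step still runs when the server
// step throws, and vice versa.
cycle_result poll_cycle(const std::function<void()>& server_step,
                        const std::function<void()>& stream_step) {
    cycle_result result;
    result.server_ok = run_step("varz", server_step);
    result.streams_ok = run_step("jsz", stream_step);
    return result;
}
```

Whether partial cycles are desirable is a judgment call: they keep one chart populated during a partial outage, at the cost of server and stream rows whose timestamps no longer pair up.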
HTTP calls to :8222/varz and :8222/jsz?streams=true use Boost.Beast
synchronously; the server is on localhost, so latency is negligible. JSON is
parsed with rfl::json.
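A synchronous Beast client resolves a host and a port separately, so the configured monitor URL has to be split before connecting. The following is a minimal standard-library sketch of that step; `parse_monitor_url` and `http_endpoint` are hypothetical names, only plain `http://` URLs are handled, and IPv6 literals are out of scope.

```cpp
#include <algorithm>
#include <cassert>
#include <cctype>
#include <optional>
#include <string>
#include <string_view>

struct http_endpoint {
    std::string host;
    std::string port;
};

// Hypothetical helper: splits "http://host[:port][/path]" into host and port.
std::optional<http_endpoint> parse_monitor_url(std::string_view url) {
    constexpr std::string_view scheme = "http://";
    if (url.substr(0, scheme.size()) != scheme)
        return std::nullopt;
    url.remove_prefix(scheme.size());

    // Ignore any path such as "/varz"; only the authority part matters here.
    if (const auto slash = url.find('/'); slash != std::string_view::npos)
        url = url.substr(0, slash);
    if (url.empty())
        return std::nullopt;

    const auto colon = url.rfind(':');
    if (colon == std::string_view::npos)
        return http_endpoint{std::string(url), "80"};  // default HTTP port

    const auto host = url.substr(0, colon);
    const auto port = url.substr(colon + 1);
    const bool digits = !port.empty() &&
        std::all_of(port.begin(), port.end(),
                    [](unsigned char c) { return std::isdigit(c) != 0; });
    if (host.empty() || !digits)
        return std::nullopt;
    return http_endpoint{std::string(host), std::string(port)};
}

// Usage: the design's default URL splits into "localhost" and "8222".
inline void example_default_url() {
    const auto ep = parse_monitor_url("http://localhost:8222");
    assert(ep && ep->host == "localhost" && ep->port == "8222");
}
```

Rejecting a malformed URL at startup is cheaper to diagnose than a warning repeated on every poll interval.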
Changes to application::run()
After registering NATS subscriptions, launch the poller as a detached coroutine:
    auto poller = std::make_shared<nats_poller>(
        cfg.nats_monitor_url, cfg.sample_interval, ctx);
    boost::asio::co_spawn(io_ctx, poller->run(io_ctx), boost::asio::detached);
Configuration
Two new fields in ores.telemetry.service/config/options.hpp:
    /// URL of the NATS server monitoring HTTP endpoint.
    std::string nats_monitor_url{"http://localhost:8222"};

    /// How often to poll the monitoring endpoint.
    std::chrono::seconds sample_interval{30};
Two new CLI flags in parser.cpp:
| Flag | Default | Env var |
|---|---|---|
| --nats-monitor-url | http://localhost:8222 | SERVICE_NATS_MONITOR_URL |
| --sample-interval | 30 (seconds) | SERVICE_SAMPLE_INTERVAL |
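The interval arrives as text from the command line or the environment, so it needs validating before it becomes a `std::chrono::seconds`. The sketch below shows one way to do that with `std::from_chars`; the function name `parse_sample_interval` is hypothetical, and rejecting zero or negative values is an assumption about what the poller should accept.

```cpp
#include <cassert>
#include <charconv>
#include <chrono>
#include <optional>
#include <string_view>
#include <system_error>

// Hypothetical helper: parses a whole number of seconds such as "30".
// Returns nullopt for empty input, trailing characters, or values <= 0.
std::optional<std::chrono::seconds> parse_sample_interval(std::string_view text) {
    int value = 0;
    const char* first = text.data();
    const char* last = first + text.size();
    const auto [ptr, ec] = std::from_chars(first, last, value);
    if (ec != std::errc{} || ptr != last || value <= 0)
        return std::nullopt;
    return std::chrono::seconds{value};
}

// Usage: the documented default of 30 seconds.
inline void example_default_interval() {
    const auto interval = parse_sample_interval("30");
    assert(interval && *interval == std::chrono::seconds{30});
}
```

A zero interval would turn the poller into a busy loop against the monitoring endpoint, which is why the sketch treats it as invalid.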
NATS Subjects
Two new subjects handled by the telemetry registrar (following the existing
telemetry.v1.* convention):
| Subject | Handler |
|---|---|
| telemetry.v1.nats.server-samples.list | Return server samples for range |
| telemetry.v1.nats.stream-samples.list | Return stream samples for range |
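As a rough illustration of how the two subjects map to handlers, the sketch below routes a request payload by exact subject name. It is a standard-library stand-in rather than the registrar's actual interface; `subject_router`, the `handler` alias, and the string payloads are all assumptions made for the example.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <optional>
#include <string>
#include <utility>

// Hypothetical handler signature: serialized request in, serialized reply out.
using handler = std::function<std::string(const std::string& payload)>;

class subject_router {
public:
    // Registers a handler for an exact subject name.
    void add(std::string subject, handler h) {
        assert(h && "handler must be callable");
        handlers_.emplace(std::move(subject), std::move(h));
    }

    // Returns the handler's reply, or nullopt for an unknown subject.
    std::optional<std::string> dispatch(const std::string& subject,
                                        const std::string& payload) const {
        const auto it = handlers_.find(subject);
        if (it == handlers_.end())
            return std::nullopt;
        return it->second(payload);
    }

private:
    std::map<std::string, handler> handlers_;
};
```

In the real service each handler would deserialize the matching request type, call the repository, and serialize the response with its success flag set.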
Repository Changes
New methods on telemetry_repository:
    void insert_server_sample(context ctx,
        const domain::nats_server_sample& sample);

    void insert_stream_samples(context ctx,
        const std::vector<domain::nats_stream_sample>& samples);

    std::vector<domain::nats_server_sample>
    query_server_samples(context ctx,
        const domain::nats_server_samples_query& q);

    std::vector<domain::nats_stream_sample>
    query_stream_samples(context ctx,
        const domain::nats_stream_samples_query& q);
Each follows the existing sqlgen + begin_transaction / commit pattern.
New entity structs and mapper methods follow the telemetry_entity /
telemetry_mapper pattern already in ores.telemetry.database.
Qt Integration
QueueChartWindow currently shows a static placeholder message. Once the
subjects are live, it sends a get_nats_stream_samples_request via the NATS
session, receives the response, and plots bytes and message count as two
QLineSeries on the existing QChart. The time-range selector is already wired up.
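Each QLineSeries needs (x, y) pairs. Assuming the chart uses a date-time axis, x is the sample time in milliseconds since the Unix epoch. The sketch below does that conversion without any Qt dependency; `chart_point` and `to_points` are hypothetical names, and it relies on `system_clock` counting from the Unix epoch, which C++20 guarantees and earlier mainstream implementations provide in practice.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <string>
#include <vector>

struct nats_stream_sample {
    std::chrono::system_clock::time_point sampled_at;
    std::string stream_name;
    std::uint64_t messages{0};
    std::uint64_t bytes{0};
    int consumer_count{0};
};

// One chart point: x is milliseconds since the Unix epoch, y is the metric.
struct chart_point {
    double x_ms;
    double y;
};

// Hypothetical helper: extracts one series from the samples. `field` selects
// the metric, e.g. &nats_stream_sample::bytes or &nats_stream_sample::messages.
std::vector<chart_point> to_points(
    const std::vector<nats_stream_sample>& samples,
    std::uint64_t nats_stream_sample::* field) {
    assert(field != nullptr);

    std::vector<chart_point> points;
    points.reserve(samples.size());
    for (const auto& s : samples) {
        const auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(
            s.sampled_at.time_since_epoch()).count();
        points.push_back({static_cast<double>(ms),
                          static_cast<double>(s.*field)});
    }
    return points;
}
```

Calling `to_points` once per metric yields the two series from a single response, and each point can then be appended to its QLineSeries.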
Files Changed
New
- ores.telemetry/include/ores.telemetry/domain/nats_server_sample.hpp
- ores.telemetry/include/ores.telemetry/domain/nats_stream_sample.hpp
- ores.telemetry/include/ores.telemetry/domain/nats_samples_query.hpp
- ores.telemetry/include/ores.telemetry/messaging/nats_samples_protocol.hpp
- ores.telemetry.service/include/ores.telemetry.service/app/nats_poller.hpp
- ores.telemetry.service/src/app/nats_poller.cpp
- ores.sql/create/telemetry/nats_server_samples_create.sql
- ores.sql/create/telemetry/nats_stream_samples_create.sql
Modified
- ores.telemetry.database/include/…/repository/telemetry_repository.hpp: new methods
- ores.telemetry.database/src/repository/telemetry_repository.cpp: implement
- ores.telemetry.database/src/repository/telemetry_entity.hpp: new entities
- ores.telemetry.database/src/repository/telemetry_mapper.hpp/cpp: new mappers
- ores.telemetry/src/messaging/registrar.cpp: add two new subject handlers
- ores.telemetry.service/config/options.hpp: add two fields
- ores.telemetry.service/src/config/parser.cpp: add two CLI flags
- ores.telemetry.service/src/app/application.cpp: launch poller coroutine
- ores.telemetry.service/src/CMakeLists.txt: add Boost::beast
- ores.qt/include/ores.qt/QueueChartWindow.hpp: add load/series methods
- ores.qt/src/QueueChartWindow.cpp: implement chart data loading