NATS Monitor Design

Overview

Extend ores.telemetry.service with a background NATS metrics poller. The poller periodically calls the NATS server's built-in HTTP monitoring API, stores the samples in two new TimescaleDB hypertables in the existing telemetry database, and exposes NATS request subjects so QueueChartWindow in ores.qt can render historical time-series charts.

Goals

  • Record per-stream JetStream metrics (messages, bytes, consumer count).
  • Record server-level metrics (throughput, connections, memory, slow consumers).
  • Power the currently-placeholder QueueChartWindow chart widget.
  • No new service binary or deployment unit.

Non-goals

  • Server-push / streaming (Qt polls on demand).
  • Aggregation or roll-up views (raw samples only for now).
  • Alerting.

Architecture

No new components are introduced. Four existing components are extended:

Component                  Change
ores.telemetry             Two new sample types, query types + protocol DTOs, two subject handlers
ores.telemetry.database    Two new hypertables + repository methods
ores.telemetry.service     New nats_poller class + two new config fields
ores.qt                    Wire QueueChartWindow to real data

Data flow:

NATS server :8222/varz       ores.telemetry.service        TimescaleDB
NATS server :8222/jsz   -->  nats_poller (coroutine)  -->  ores_nats_server_samples_tbl
                                                       -->  ores_nats_stream_samples_tbl

ores.qt QueueChartWindow  -- NATS request/reply -->  telemetry registrar
                          <-- nats_stream_samples --  telemetry repository

Domain Model

New types in ores.telemetry/domain/

nats_server_sample.hpp:

#include <chrono>
#include <cstdint>

struct nats_server_sample {
    std::chrono::system_clock::time_point sampled_at;
    std::uint64_t in_msgs{0};
    std::uint64_t out_msgs{0};
    std::uint64_t in_bytes{0};
    std::uint64_t out_bytes{0};
    int connections{0};
    std::uint64_t mem_bytes{0};
    int slow_consumers{0};
};
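The varz counters (in_msgs, out_msgs, in_bytes, out_bytes) are cumulative totals since the NATS server started, so a throughput chart has to difference consecutive samples rather than plot the raw values. A minimal sketch, assuming the conversion is done on the reading side and that a counter going backwards means the server restarted; rate_point and message_rate are hypothetical names, and the struct is repeated so the example compiles on its own.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <vector>

// Repeated from nats_server_sample.hpp so the sketch is self-contained.
struct nats_server_sample {
    std::chrono::system_clock::time_point sampled_at;
    std::uint64_t in_msgs{0};
    std::uint64_t out_msgs{0};
    std::uint64_t in_bytes{0};
    std::uint64_t out_bytes{0};
    int connections{0};
    std::uint64_t mem_bytes{0};
    int slow_consumers{0};
};

// Hypothetical output type: inbound messages per second over one interval.
struct rate_point {
    std::chrono::system_clock::time_point at;  // end of the interval
    double in_msgs_per_sec{0.0};
};

// Hypothetical helper. Expects samples sorted by sampled_at, ascending.
// Pairs with a non-positive time gap or a decreasing counter (server
// restart) are skipped instead of producing a bogus rate.
std::vector<rate_point>
message_rate(const std::vector<nats_server_sample>& samples) {
    std::vector<rate_point> out;
    for (std::size_t i = 1; i < samples.size(); ++i) {
        const auto& prev = samples[i - 1];
        const auto& curr = samples[i];
        const double secs = std::chrono::duration<double>(
            curr.sampled_at - prev.sampled_at).count();
        if (secs <= 0.0 || curr.in_msgs < prev.in_msgs) {
            continue;
        }
        out.push_back({curr.sampled_at,
                       static_cast<double>(curr.in_msgs - prev.in_msgs) / secs});
    }
    return out;
}
```

The same differencing applies to out_msgs and the byte counters; connections, mem_bytes and slow_consumers are point-in-time gauges and can be plotted directly.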

nats_stream_sample.hpp:

#include <chrono>
#include <cstdint>
#include <string>

struct nats_stream_sample {
    std::chrono::system_clock::time_point sampled_at;
    std::string stream_name;
    std::uint64_t messages{0};
    std::uint64_t bytes{0};
    int consumer_count{0};
};

nats_samples_query.hpp (for Qt requests):

#include <chrono>
#include <cstdint>
#include <string>

struct nats_server_samples_query {
    std::chrono::system_clock::time_point start_time;
    std::chrono::system_clock::time_point end_time;
    std::uint32_t limit{1000};
};

struct nats_stream_samples_query {
    std::string stream_name;
    std::chrono::system_clock::time_point start_time;
    std::chrono::system_clock::time_point end_time;
    std::uint32_t limit{1000};
};
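The query types do not pin down range bounds, ordering, or how limit interacts with the time range. A minimal in-memory sketch of one reasonable reading, assuming a half-open range [start_time, end_time), keeping the newest limit matches (which the (stream_name, sampled_at desc) index would serve), and returning them oldest-first for plotting. select_stream_samples is a hypothetical reference for the semantics, not the repository implementation.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <cstdint>
#include <string>
#include <vector>

struct nats_stream_sample {
    std::chrono::system_clock::time_point sampled_at;
    std::string stream_name;
    std::uint64_t messages{0};
    std::uint64_t bytes{0};
    int consumer_count{0};
};

struct nats_stream_samples_query {
    std::string stream_name;
    std::chrono::system_clock::time_point start_time;
    std::chrono::system_clock::time_point end_time;
    std::uint32_t limit{1000};
};

// Hypothetical reference semantics (assumed, not specified by the design):
// matching stream, start_time <= t < end_time, newest `limit` rows kept,
// result returned in ascending time order.
std::vector<nats_stream_sample>
select_stream_samples(std::vector<nats_stream_sample> rows,
                      const nats_stream_samples_query& q) {
    rows.erase(std::remove_if(rows.begin(), rows.end(),
                              [&](const nats_stream_sample& s) {
                                  return s.stream_name != q.stream_name ||
                                         s.sampled_at < q.start_time ||
                                         s.sampled_at >= q.end_time;
                              }),
               rows.end());
    // Newest first, so truncating to `limit` drops the oldest rows.
    std::sort(rows.begin(), rows.end(), [](const auto& a, const auto& b) {
        return a.sampled_at > b.sampled_at;
    });
    if (rows.size() > q.limit) {
        rows.resize(q.limit);
    }
    // Flip back to oldest first, which is what a line chart wants.
    std::reverse(rows.begin(), rows.end());
    return rows;
}
```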

New protocol types in ores.telemetry/messaging/

nats_samples_protocol.hpp:

struct get_nats_server_samples_request {
    nats_server_samples_query query;
};
struct get_nats_server_samples_response {
    bool success{false};
    std::vector<nats_server_sample> samples;
};

struct get_nats_stream_samples_request {
    nats_stream_samples_query query;
};
struct get_nats_stream_samples_response {
    bool success{false};
    std::vector<nats_stream_sample> samples;
};

Database Schema

Two new SQL files under ores.sql/create/telemetry/.

ores_nats_server_samples_tbl

create table if not exists ores_nats_server_samples_tbl (
    sampled_at    timestamptz  not null,
    in_msgs       bigint       not null default 0,
    out_msgs      bigint       not null default 0,
    in_bytes      bigint       not null default 0,
    out_bytes     bigint       not null default 0,
    connections   integer      not null default 0,
    mem_bytes     bigint       not null default 0,
    slow_consumers integer     not null default 0,
    primary key (sampled_at)
);

perform public.create_hypertable(
    'ores_nats_server_samples_tbl', 'sampled_at',
    chunk_time_interval => interval '1 day',
    if_not_exists => true);

perform public.add_retention_policy(
    'ores_nats_server_samples_tbl',
    drop_after => interval '30 days',
    if_not_exists => true);

ores_nats_stream_samples_tbl

create table if not exists ores_nats_stream_samples_tbl (
    sampled_at     timestamptz  not null,
    stream_name    text         not null,
    messages       bigint       not null default 0,
    bytes          bigint       not null default 0,
    consumer_count integer      not null default 0,
    primary key (sampled_at, stream_name)
);

create index if not exists ores_nats_stream_samples_name_idx
    on ores_nats_stream_samples_tbl (stream_name, sampled_at desc);

perform public.create_hypertable(
    'ores_nats_stream_samples_tbl', 'sampled_at',
    chunk_time_interval => interval '1 day',
    if_not_exists => true);

perform public.add_retention_policy(
    'ores_nats_stream_samples_tbl',
    drop_after => interval '30 days',
    if_not_exists => true);
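As a rough sizing check: at the default 30-second interval each series gets 86,400 / 30 = 2,880 rows per day, so the 30-day retention window holds about 86,400 rows in ores_nats_server_samples_tbl and 86,400 rows per JetStream stream in ores_nats_stream_samples_tbl. The arithmetic below makes that explicit; the stream count used in stream_rows is an illustrative parameter, not a figure from any deployment.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

// Rows retained by one series (the server table, or a single stream in the
// stream table) for a given sampling interval and retention window.
constexpr std::int64_t rows_retained(std::chrono::seconds interval,
                                     std::chrono::seconds retention) {
    return retention / interval;  // integer division: whole samples only
}

// Defaults from this design: 30 s interval, 30-day retention.
constexpr std::int64_t per_series =
    rows_retained(std::chrono::seconds(30), std::chrono::hours(24 * 30));
static_assert(per_series == 86'400, "30 days at one sample per 30 s");

// Stream table size for an illustrative number of streams.
constexpr std::int64_t stream_rows(std::int64_t streams) {
    return streams * per_series;
}
```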

NATS Poller

New class nats_poller in ores.telemetry.service/app/

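#include <chrono>
#include <string>
#include <vector>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/io_context.hpp>

class nats_poller {
public:
    nats_poller(std::string monitor_url,
                std::chrono::seconds interval,
                ores::database::context ctx);

    // Polls forever; intended to be co_spawn'ed on the service io_context.
    boost::asio::awaitable<void>
    run(boost::asio::io_context& io_ctx);

private:
    // Blocking HTTP GET + JSON parse of /varz and /jsz?streams=true.
    nats_server_sample  poll_varz() const;
    std::vector<nats_stream_sample> poll_jsz() const;

    std::string monitor_url_;
    std::chrono::seconds interval_;
    ores::database::context ctx_;
    telemetry_repository repository_;  // used by run() to persist samples
};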

The run() coroutine loops on a steady_timer:

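boost::asio::steady_timer timer(io_ctx);
while (true) {
    try {
        // Fetch both snapshots before writing, so an HTTP or parse failure
        // skips the whole cycle instead of storing only half of it.
        const auto server = poll_varz();
        const auto streams = poll_jsz();
        repository_.insert_server_sample(ctx_, server);
        repository_.insert_stream_samples(ctx_, streams);
    } catch (const std::exception& e) {
        BOOST_LOG_SEV(lg(), warn) << "NATS poll failed: " << e.what();
    }
    timer.expires_after(interval_);
    co_await timer.async_wait(boost::asio::use_awaitable);
}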
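Because expires_after is called after the poll and inserts complete, the effective period is the interval plus the time spent working, so sample timestamps drift slowly. If evenly spaced timestamps matter for the charts, one option is to schedule against absolute deadlines with steady_timer::expires_at. The helper below is a hypothetical, stdlib-only sketch of that calculation; it skips ticks that were missed entirely rather than firing them back to back.

```cpp
#include <cassert>
#include <chrono>

using clock_type = std::chrono::steady_clock;

// Hypothetical helper: returns the first point on the grid
// last + k * interval (k >= 1) that lies strictly after `now`.
// Ticks missed because of a slow poll are skipped, not replayed.
clock_type::time_point next_deadline(clock_type::time_point last,
                                     std::chrono::seconds interval,
                                     clock_type::time_point now) {
    assert(interval.count() > 0);
    const auto next = last + interval;
    if (next > now) {
        return next;
    }
    // Whole intervals elapsed since `last`; the next tick is one more.
    const auto elapsed = (now - last) / interval;
    return last + (elapsed + 1) * interval;
}
```

In run(), the loop would keep the previous deadline, replace it with next_deadline(deadline, interval_, std::chrono::steady_clock::now()), and pass the result to timer.expires_at() instead of calling expires_after().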

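HTTP requests to the /varz and /jsz?streams=true endpoints (port 8222 by default) use Boost.Beast's synchronous client, and the JSON responses are parsed with rfl::json. Each request, like each database insert, blocks the io_context thread while it runs; this is acceptable because the default endpoint is on localhost, where a request takes a small fraction of the 30-second sampling interval.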
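Beast's synchronous client needs the host and port separately (for tcp::resolver::resolve) and the request target separately (for the http::request), whereas nats_monitor_url is a single string. A minimal sketch of that split, assuming plain http:// URLs with an optional port defaulting to 80 and an optional base path; monitor_endpoint and parse_monitor_url are hypothetical names.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>

// Hypothetical result of splitting nats_monitor_url for a Beast client.
struct monitor_endpoint {
    std::string host;
    std::string port;
    std::string target;  // path + query, e.g. "/jsz?streams=true"
};

// Hypothetical parser for "http://host[:port][/base]"; `path` must start
// with '/'. HTTPS, IPv6 literals and credentials are out of scope.
monitor_endpoint parse_monitor_url(const std::string& url,
                                   const std::string& path) {
    const std::string scheme = "http://";
    if (url.rfind(scheme, 0) != 0) {
        throw std::invalid_argument("only http:// monitor URLs are supported: " + url);
    }
    const std::string rest = url.substr(scheme.size());
    const auto slash = rest.find('/');
    const std::string authority = rest.substr(0, slash);
    std::string base = slash == std::string::npos ? "" : rest.substr(slash);
    if (!base.empty() && base.back() == '/') {
        base.pop_back();  // avoid producing "//varz"
    }

    monitor_endpoint ep;
    const auto colon = authority.find(':');
    ep.host = authority.substr(0, colon);
    ep.port = colon == std::string::npos ? "80" : authority.substr(colon + 1);
    if (ep.host.empty() || ep.port.empty()) {
        throw std::invalid_argument("malformed monitor URL: " + url);
    }
    ep.target = base + path;
    return ep;
}
```

With the default configuration, parse_monitor_url("http://localhost:8222", "/varz") yields host "localhost", port "8222" and target "/varz".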

Changes to application::run()

After registering NATS subscriptions, launch the poller as a detached coroutine:

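auto poller = std::make_shared<nats_poller>(
    cfg.nats_monitor_url, cfg.sample_interval, ctx);
// The lambda's copy of the shared_ptr keeps the poller alive for as long
// as the detached coroutine runs.
boost::asio::co_spawn(io_ctx,
    [poller, &io_ctx]() { return poller->run(io_ctx); },
    boost::asio::detached);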

Configuration

Two new fields in ores.telemetry.service/config/options.hpp:

/// URL of the NATS server monitoring HTTP endpoint.
std::string nats_monitor_url{"http://localhost:8222"};

/// How often to poll the monitoring endpoint.
std::chrono::seconds sample_interval{30};

Two new CLI flags in parser.cpp:

Flag                  Default                  Env var
--nats-monitor-url    http://localhost:8222    SERVICE_NATS_MONITOR_URL
--sample-interval     30 (seconds)             SERVICE_SAMPLE_INTERVAL
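A flag/env-var/default triple is usually resolved in that order of precedence. A stdlib-only sketch of the resolution for --sample-interval, assuming that precedence and rejecting anything that is not a positive whole number of seconds; resolve_sample_interval is a hypothetical name, and parser.cpp would express the same rules through whatever option library the service already uses.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <optional>
#include <stdexcept>
#include <string>

// Hypothetical resolver. Assumed precedence: CLI flag, then
// SERVICE_SAMPLE_INTERVAL, then the 30-second default.
std::chrono::seconds
resolve_sample_interval(const std::optional<std::string>& cli,
                        const std::optional<std::string>& env) {
    const std::optional<std::string>& chosen = cli ? cli : env;
    if (!chosen) {
        return std::chrono::seconds(30);
    }
    std::size_t consumed = 0;
    long long value = 0;
    try {
        value = std::stoll(*chosen, &consumed);
    } catch (const std::exception&) {
        throw std::invalid_argument("sample interval is not a number: " + *chosen);
    }
    // Reject trailing junk such as "30s" and non-positive values.
    if (consumed != chosen->size() || value <= 0) {
        throw std::invalid_argument("sample interval must be a positive integer: " + *chosen);
    }
    return std::chrono::seconds(value);
}
```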

NATS Subjects

Two new subjects handled by the telemetry registrar (following the existing telemetry.v1.* convention):

Subject                                  Handler
telemetry.v1.nats.server-samples.list    Returns server samples in the requested time range
telemetry.v1.nats.stream-samples.list    Returns stream samples in the requested time range

Repository Changes

New methods on telemetry_repository:

void insert_server_sample(context ctx,
    const domain::nats_server_sample& sample);

void insert_stream_samples(context ctx,
    const std::vector<domain::nats_stream_sample>& samples);

std::vector<domain::nats_server_sample>
query_server_samples(context ctx,
    const domain::nats_server_samples_query& q);

std::vector<domain::nats_stream_sample>
query_stream_samples(context ctx,
    const domain::nats_stream_samples_query& q);

Each follows the existing sqlgen + begin_transaction / commit pattern. New entity structs and mapper methods follow the telemetry_entity / telemetry_mapper pattern already in ores.telemetry.database.
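One mapping detail: the domain structs use std::uint64_t while the columns are PostgreSQL bigint, which is a signed 64-bit type. NATS counters will not realistically exceed INT64_MAX, but a checked conversion in the mapper makes that assumption explicit instead of silently wrapping. to_bigint and from_bigint are hypothetical helper names.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>
#include <stdexcept>

// Hypothetical mapper helper, domain -> entity: reject values that a
// signed bigint column cannot represent.
std::int64_t to_bigint(std::uint64_t v) {
    if (v > static_cast<std::uint64_t>(std::numeric_limits<std::int64_t>::max())) {
        throw std::out_of_range("value does not fit in a bigint column");
    }
    return static_cast<std::int64_t>(v);
}

// Hypothetical mapper helper, entity -> domain: a negative value in an
// unsigned counter column indicates corrupt or foreign data.
std::uint64_t from_bigint(std::int64_t v) {
    if (v < 0) {
        throw std::out_of_range("negative value in an unsigned counter column");
    }
    return static_cast<std::uint64_t>(v);
}
```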

Qt Integration

QueueChartWindow currently shows a static placeholder message. Once the subjects are live, it sends a get_nats_stream_samples_request via the NATS session, receives the response, and plots bytes and message count as two QLineSeries on the existing QChart. The time-range selector is already wired up.
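QDateTimeAxis interprets a series' x values as milliseconds since the Unix epoch, so each sample's sampled_at has to be converted before it is appended to a QLineSeries. A stdlib-only sketch of that conversion for the two series, assuming system_clock counts from the Unix epoch (guaranteed since C++20); chart_point, stream_series and to_series are hypothetical names, and in the widget each point would become a QLineSeries::append(x, y) call.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <string>
#include <vector>

struct nats_stream_sample {
    std::chrono::system_clock::time_point sampled_at;
    std::string stream_name;
    std::uint64_t messages{0};
    std::uint64_t bytes{0};
    int consumer_count{0};
};

// Hypothetical point type: x in ms since the Unix epoch, as QDateTimeAxis expects.
struct chart_point {
    double x{0.0};
    double y{0.0};
};

// Hypothetical container for the two series the window plots.
struct stream_series {
    std::vector<chart_point> messages;
    std::vector<chart_point> bytes;
};

stream_series to_series(const std::vector<nats_stream_sample>& samples) {
    stream_series out;
    out.messages.reserve(samples.size());
    out.bytes.reserve(samples.size());
    for (const auto& s : samples) {
        const auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(
            s.sampled_at.time_since_epoch()).count();
        const double x = static_cast<double>(ms);
        out.messages.push_back({x, static_cast<double>(s.messages)});
        out.bytes.push_back({x, static_cast<double>(s.bytes)});
    }
    return out;
}
```

Since bytes and message counts differ by orders of magnitude, the two series will likely need separate y-axes on the QChart.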

Files Changed

New

  • ores.telemetry/include/ores.telemetry/domain/nats_server_sample.hpp
  • ores.telemetry/include/ores.telemetry/domain/nats_stream_sample.hpp
  • ores.telemetry/include/ores.telemetry/domain/nats_samples_query.hpp
  • ores.telemetry/include/ores.telemetry/messaging/nats_samples_protocol.hpp
  • ores.telemetry.service/include/ores.telemetry.service/app/nats_poller.hpp
  • ores.telemetry.service/src/app/nats_poller.cpp
  • ores.sql/create/telemetry/nats_server_samples_create.sql
  • ores.sql/create/telemetry/nats_stream_samples_create.sql

Modified

  • ores.telemetry.database/include/…/repository/telemetry_repository.hpp — new methods
  • ores.telemetry.database/src/repository/telemetry_repository.cpp — implement
  • ores.telemetry.database/src/repository/telemetry_entity.hpp — new entities
  • ores.telemetry.database/src/repository/telemetry_mapper.hpp/cpp — new mappers
  • ores.telemetry/src/messaging/registrar.cpp — add two new subject handlers
  • ores.telemetry.service/config/options.hpp — add two fields
  • ores.telemetry.service/src/config/parser.cpp — add two CLI flags
  • ores.telemetry.service/src/app/application.cpp — launch poller coroutine
  • ores.telemetry.service/src/CMakeLists.txt — add Boost::beast
  • ores.qt/include/ores.qt/QueueChartWindow.hpp — add load/series methods
  • ores.qt/src/QueueChartWindow.cpp — implement chart data loading