Compute Grid Telemetry Design

Overview

The compute dashboard currently reads stats via ad-hoc live queries at refresh time. This is the wrong approach. All dashboard statistics must come from TimescaleDB time-series tables written by their owning services — the same pattern used by the NATS telemetry system in ores.telemetry.service.

This design introduces a compute grid telemetry system with two distinct samplers, two new hypertables, and a NATS request/reply protocol so the dashboard can query the latest snapshot without issuing live queries.

Goals

  • All compute dashboard stats served from TimescaleDB hypertables.
  • Server-side grid stats (hosts, agents, workunits, results) owned by ores.compute.service.
  • Per-node execution stats (task count, duration, bytes) owned by ores.compute.wrapper.
  • 30-second sample interval, configurable via a system settings flag.
  • Dashboard reads the most recent snapshot via a NATS request/reply call.
  • No ad-hoc live queries remain in ComputeDashboardMdiWindow.

Out of Scope

  • Continuous aggregates (hourly/daily rollups) — add later once base tables are proven.
  • Alerting thresholds — add later.
  • Per-workunit timing at the service level — wrapper already tracks this per task.

Architecture

┌─────────────────────────────────────────────────────────────┐
│  ores.compute.service                                        │
│                                                             │
│  compute_grid_poller   ──── every 30s ────►  TimescaleDB   │
│  (new, analogous to                          ores_compute_  │
│   nats_poller)                               grid_samples_  │
│                                              tbl            │
│  get_grid_stats handler ◄── NATS request ───  Dashboard    │
│                          ── NATS reply  ───►               │
└─────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────┐
│  ores.compute.wrapper  (each node)                          │
│                                                             │
│  node_stats_reporter   ──── every 30s ────►  TimescaleDB   │
│  (new, NATS publish)                         ores_compute_  │
│                                              node_samples_  │
│                                              tbl            │
└─────────────────────────────────────────────────────────────┘

SQL Schema

Naming Conventions

Tables follow the ores_compute_ prefix matching the component name, with the _tbl suffix and _samples_ infix for time-series tables, per the SQL Schema Creator skill.

Table: ores_compute_grid_samples_tbl

Owned by ores.compute.service. One row per sample interval, per tenant. Captures server-side aggregate counts derived from a single consistent database read.

create table if not exists ores_compute_grid_samples_tbl (
    "sampled_at"            timestamp with time zone not null,
    "tenant_id"             uuid                     not null,

    -- Host counts
    "total_hosts"           integer not null default 0,
    "online_hosts"          integer not null default 0,
    "idle_hosts"            integer not null default 0,

    -- Result state breakdown (server_state codes)
    "results_inactive"      integer not null default 0,  -- state 1
    "results_unsent"        integer not null default 0,  -- state 2
    "results_in_progress"   integer not null default 0,  -- state 4
    "results_done"          integer not null default 0,  -- state 5

    -- Workunit counts
    "total_workunits"       integer not null default 0,

    -- Batch counts
    "total_batches"         integer not null default 0,
    "active_batches"        integer not null default 0,

    -- Outcome breakdown for Done results (latest 24h window)
    "outcomes_success"      integer not null default 0,  -- outcome 1
    "outcomes_client_error" integer not null default 0,  -- outcome 3
    "outcomes_no_reply"     integer not null default 0,  -- outcome 4

    primary key (sampled_at, tenant_id)
);

create index if not exists ores_compute_grid_samples_tenant_idx
    on ores_compute_grid_samples_tbl (tenant_id, sampled_at desc);

-- TimescaleDB hypertable (1-day chunks, 30-day retention)
do $$ ... $$;

Why separate outcome counts? The outcome breakdown only makes sense for completed results. Restricting the counts to the last 24 hours keeps the values from growing without bound on long-running grids.

Table: ores_compute_node_samples_tbl

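Produced by wrapper nodes (ores.compute.wrapper) and persisted by ores.compute.service, which receives the samples over NATS. One row per node per sample interval.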

create table if not exists ores_compute_node_samples_tbl (
    "sampled_at"            timestamp with time zone not null,
    "tenant_id"             uuid                     not null,
    "host_id"               uuid                     not null,

    -- Cumulative task execution counters (reset on wrapper restart)
    "tasks_completed"       integer not null default 0,
    "tasks_failed"          integer not null default 0,

    -- Rolling totals since last sample
    "tasks_since_last"      integer not null default 0,

    -- Timing (milliseconds, average of tasks_since_last)
    "avg_task_duration_ms"  bigint  not null default 0,
    "max_task_duration_ms"  bigint  not null default 0,

    -- Bytes transferred since last sample
    "input_bytes_fetched"   bigint  not null default 0,
    "output_bytes_uploaded" bigint  not null default 0,

    -- Heartbeat liveness (seconds since last heartbeat)
    "seconds_since_hb"      integer not null default 0,

    primary key (sampled_at, tenant_id, host_id)
);

create index if not exists ores_compute_node_samples_host_idx
    on ores_compute_node_samples_tbl (host_id, sampled_at desc);

-- TimescaleDB hypertable (1-day chunks, 30-day retention)
do $$ ... $$;

Hypertable Block

Both tables use the same conditional hypertable block from the existing pattern (copied from nats_server_samples_create.sql):

do $$
declare
    tsdb_installed boolean;
begin
    select exists (
        select 1 from pg_extension where extname = 'timescaledb'
    ) into tsdb_installed;

    if tsdb_installed then
        perform public.create_hypertable(
            '<table_name>',
            'sampled_at',
            chunk_time_interval => interval '1 day',
            if_not_exists => true
        );

        declare
            current_license text;
        begin
            select current_setting('timescaledb.license', true)
                into current_license;
            if current_license = 'timescale' then
                perform public.add_retention_policy(
                    '<table_name>',
                    drop_after => interval '30 days',
                    if_not_exists => true
                );
            end if;
        end;
    end if;
end $$;

File Layout

Following the existing SQL file organisation:

projects/ores.sql/create/compute/
  compute_grid_samples_create.sql   -- grid table + hypertable block
  compute_node_samples_create.sql   -- node table + hypertable block
  compute_samples_create.sql        -- master orchestration file

projects/ores.sql/drop/compute/
  compute_grid_samples_drop.sql
  compute_node_samples_drop.sql
  compute_samples_drop.sql

The master compute_create.sql (already exists for domain tables) gains a \i compute/compute_samples_create.sql line. Likewise for compute_drop.sql.

C++ Domain Types

ores.compute — new structs in domain/ facet

// domain/grid_sample.hpp
struct grid_sample final {
    std::chrono::system_clock::time_point sampled_at;
    utility::uuid::tenant_id              tenant_id;
    int total_hosts{0};
    int online_hosts{0};
    int idle_hosts{0};
    int results_inactive{0};
    int results_unsent{0};
    int results_in_progress{0};
    int results_done{0};
    int total_workunits{0};
    int total_batches{0};
    int active_batches{0};
    int outcomes_success{0};
    int outcomes_client_error{0};
    int outcomes_no_reply{0};
};

// domain/node_sample.hpp
struct node_sample final {
    std::chrono::system_clock::time_point sampled_at;
    utility::uuid::tenant_id              tenant_id;
    boost::uuids::uuid                    host_id;
    int   tasks_completed{0};
    int   tasks_failed{0};
    int   tasks_since_last{0};
    std::int64_t avg_task_duration_ms{0};
    std::int64_t max_task_duration_ms{0};
    std::int64_t input_bytes_fetched{0};
    std::int64_t output_bytes_uploaded{0};
    int   seconds_since_hb{0};
};

Repository (ores.compute.database)

Add to the existing compute_repository (or a new dedicated compute_telemetry_repository):

void insert_grid_sample(const ores::database::context& ctx,
                        const domain::grid_sample& sample);

void insert_node_sample(const ores::database::context& ctx,
                        const domain::node_sample& sample);

// Returns the most recent sample for the tenant. No tenant parameter is
// passed, so the tenant is assumed to be carried by ctx.
std::optional<domain::grid_sample>
latest_grid_sample(const ores::database::context& ctx);

std::vector<domain::node_sample>
latest_node_samples(const ores::database::context& ctx);

Server-Side Sampler (ores.compute.service)

New class: compute_grid_poller

Analogous to nats_poller in ores.telemetry.service.

class compute_grid_poller {
public:
    compute_grid_poller(std::uint32_t interval_seconds,
                        ores::database::context ctx);

    boost::asio::awaitable<void> run();

private:
    void poll_once();
    domain::grid_sample compute_sample();

    std::uint32_t interval_seconds_;
    ores::database::context ctx_;
    database::repository::compute_telemetry_repository repo_;
};

poll_once logic

1. Open a short-lived DB connection (or reuse ctx_).
2. Count hosts total.
3. Count hosts with last_rpc_time within 5 minutes  → online_hosts.
4. Count results with server_state = 4 (InProgress), get distinct host_ids
   → in_progress_host_ids.
5. idle_hosts = online_hosts − |in_progress_host_ids ∩ online_host_ids|.
6. Count results grouped by server_state  → results_* breakdown.
7. Count workunits total  → total_workunits.
8. Count batches total, batches with active results  → batch counts.
9. Count results done in last 24h grouped by outcome  → outcomes_*.
10. Write grid_sample row.

All queries are against the existing domain tables — no ad-hoc live path on the dashboard side.
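
Steps 3 to 5 of poll_once can be sketched as follows. This is a minimal illustration, not the actual implementation: host_id_set and count_idle_hosts are hypothetical names, and host ids are plain strings here for brevity, whereas the real code would presumably use UUIDs read from the domain tables.

```cpp
#include <cstddef>
#include <set>
#include <string>

// Hypothetical alias: host ids as plain strings for illustration only.
using host_id_set = std::set<std::string>;

// Step 5: idle hosts are online hosts with no in-progress result.
// Hosts that hold an in-progress result but are not online are ignored,
// so the count can never go negative.
int count_idle_hosts(const host_id_set& online,
                     const host_id_set& in_progress) {
    std::size_t busy_online = 0;
    for (const auto& id : in_progress) {
        if (online.count(id) != 0) {
            ++busy_online;  // member of the intersection
        }
    }
    return static_cast<int>(online.size() - busy_online);
}
```

Counting only the intersection matters when a host has gone offline while still holding an in-progress result: subtracting the raw in-progress host count would under-report idle hosts.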

Wiring into application::run()

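// Alongside the existing signal wait, run the poller concurrently.
// operator|| comes from boost::asio::experimental::awaitable_operators
// and completes as soon as either side finishes.
using namespace boost::asio::experimental::awaitable_operators;
compute_grid_poller poller(cfg.telemetry_interval_seconds, ctx);
co_await (
    signals.async_wait(use_awaitable)
    || poller.run()
);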

Node-Side Sampler (ores.compute.wrapper)

New class: node_stats_reporter

Lives in app/node_stats_reporter.hpp. Runs on the heartbeat thread or as its own thread.

class node_stats_reporter {
public:
    node_stats_reporter(ores::nats::service::client& nats,
                        const std::string& host_id,
                        const std::string& tenant_id,
                        std::uint32_t interval_seconds);

    void start();
    void stop();

    // Called by process_assignment on each completed task
    void record_task_completed(std::chrono::milliseconds duration,
                               std::int64_t input_bytes,
                               std::int64_t output_bytes);
    void record_task_failed();

private:
    void run();
    void publish_sample();

    ores::nats::service::client& nats_;
    std::string host_id_;
    std::string tenant_id_;
    std::uint32_t interval_seconds_;
    std::atomic<bool> stop_{false};
    std::thread thread_;

    // Atomics for thread-safe accumulation
    std::atomic<int>          tasks_completed_{0};
    std::atomic<int>          tasks_failed_{0};
    std::atomic<int>          tasks_since_last_{0};
    std::atomic<std::int64_t> total_duration_since_last_{0};
    std::atomic<std::int64_t> max_duration_since_last_{0};
    std::atomic<std::int64_t> input_bytes_since_last_{0};
    std::atomic<std::int64_t> output_bytes_since_last_{0};
};
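
The accumulation and per-interval reset implied by the atomics above could look like the sketch below. It is a trimmed-down, hypothetical stand-in for node_stats_reporter (task_accumulator, interval_snapshot and take_snapshot are illustrative names, and byte counters are omitted). It shows two details the class declaration leaves open: the maximum needs a compare-exchange loop because std::atomic has no fetch_max in C++20, and the "since last" counters are read and reset with exchange(0).

```cpp
#include <atomic>
#include <chrono>
#include <cstdint>

// Hypothetical snapshot of the per-interval values of a node sample.
struct interval_snapshot {
    int tasks_since_last{0};
    std::int64_t avg_task_duration_ms{0};
    std::int64_t max_task_duration_ms{0};
};

// Hypothetical, reduced version of the reporter's counters.
class task_accumulator {
public:
    void record_task_completed(std::chrono::milliseconds duration) {
        const std::int64_t ms = duration.count();
        tasks_completed_.fetch_add(1, std::memory_order_relaxed);
        tasks_since_last_.fetch_add(1, std::memory_order_relaxed);
        total_duration_since_last_.fetch_add(ms, std::memory_order_relaxed);

        // Raise the stored maximum with a CAS loop.
        std::int64_t seen =
            max_duration_since_last_.load(std::memory_order_relaxed);
        while (ms > seen &&
               !max_duration_since_last_.compare_exchange_weak(
                   seen, ms, std::memory_order_relaxed)) {
            // A failed exchange reloads 'seen'; the loop re-checks it.
        }
    }

    // Reads and resets the per-interval counters.
    interval_snapshot take_snapshot() {
        interval_snapshot s;
        s.tasks_since_last = tasks_since_last_.exchange(0);
        const std::int64_t total = total_duration_since_last_.exchange(0);
        s.max_task_duration_ms = max_duration_since_last_.exchange(0);
        if (s.tasks_since_last > 0) {
            s.avg_task_duration_ms = total / s.tasks_since_last;
        }
        return s;
    }

    // Cumulative counter; never reset by take_snapshot().
    int tasks_completed() const { return tasks_completed_.load(); }

private:
    std::atomic<int>          tasks_completed_{0};
    std::atomic<int>          tasks_since_last_{0};
    std::atomic<std::int64_t> total_duration_since_last_{0};
    std::atomic<std::int64_t> max_duration_since_last_{0};
};
```

The three exchange calls are not one atomic step, so a task recorded while a snapshot is being taken may have its count land in one sample and its duration in the next. For 30-second dashboard telemetry that skew is probably acceptable; if it is not, a mutex around both record and snapshot would be the simpler fix.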

NATS subject for node samples

The reporter publishes to a JetStream subject consumed by the compute service for persistence:

compute.v1.telemetry.node_samples   (published by wrappers)

The compute service subscribes as a queue consumer and writes the incoming node_sample structs to ores_compute_node_samples_tbl. This keeps the wrapper stateless with respect to the database.

Alternative: direct database writes

Nodes typically have no direct DB access in multi-site deployments, so the publish-and-persist pattern above is preferred. Nodes that do have DB access could write their samples directly, but doing so would diverge from the NATS-first approach used everywhere else.

NATS Protocol: Dashboard Stats Query

Subject

compute.v1.telemetry.get_grid_stats

Request

struct get_grid_stats_request {
    std::string tenant_id;  // JWT-derived; handler validates
};

Response

// Defined first because get_grid_stats_response holds a vector of it.
struct node_sample_summary {
    std::string host_id;
    int   tasks_completed{0};
    int   tasks_since_last{0};
    std::int64_t avg_task_duration_ms{0};
    std::int64_t input_bytes_fetched{0};
    std::int64_t output_bytes_uploaded{0};
    int   seconds_since_hb{0};
};

struct get_grid_stats_response {
    bool success{false};
    std::string message;

    // Grid sample fields (mirrors grid_sample domain type)
    int total_hosts{0};
    int online_hosts{0};
    int idle_hosts{0};
    int results_inactive{0};
    int results_unsent{0};
    int results_in_progress{0};
    int results_done{0};
    int total_workunits{0};
    int total_batches{0};
    int active_batches{0};
    int outcomes_success{0};
    int outcomes_client_error{0};
    int outcomes_no_reply{0};

    // When this sample was taken
    std::string sampled_at;  // ISO-8601

    // Latest node samples summary
    std::vector<node_sample_summary> node_summaries;
};

The handler reads latest_grid_sample() and latest_node_samples() from the repository — no live aggregation on the hot path.
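
When the handler copies a grid_sample into the response, sampled_at has to be converted from a time_point to an ISO-8601 string. One way to do that with the standard library alone is sketched below. The helper name to_iso8601_utc is hypothetical, and the choice of UTC with second precision is an assumption; the design only states "ISO-8601".

```cpp
#include <chrono>
#include <ctime>
#include <string>

// Hypothetical helper: formats a time_point as UTC ISO-8601 with
// second precision, e.g. "2024-01-31T12:00:00Z".
std::string to_iso8601_utc(std::chrono::system_clock::time_point tp) {
    const std::time_t t = std::chrono::system_clock::to_time_t(tp);

    // std::gmtime returns a pointer to shared static storage, so this
    // sketch is not thread-safe; a real handler may prefer gmtime_r
    // (POSIX) or gmtime_s (Windows).
    const std::tm* utc = std::gmtime(&t);
    if (utc == nullptr) {
        return {};
    }

    char buf[32];
    const std::size_t n =
        std::strftime(buf, sizeof buf, "%Y-%m-%dT%H:%M:%SZ", utc);
    return std::string(buf, n);
}
```

The trailing "Z" makes the time zone explicit, which avoids the dashboard having to guess whether the timestamp is local or UTC.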

Qt Dashboard Changes

Revert ad-hoc fix

Remove the online_host_ids / busy_host_ids logic added to ComputeDashboardMdiWindow::loadCounts(). The loadCounts() function is replaced by a single get_grid_stats request/reply call.

New loadCounts() implementation

void ComputeDashboardMdiWindow::loadCounts() {
    QPointer<ComputeDashboardMdiWindow> self = this;

    QFuture<DashboardCounts> future =
        QtConcurrent::run([self]() -> DashboardCounts {
            if (!self || !self->clientManager_) return {};

            compute::messaging::get_grid_stats_request req;
            auto resp = self->clientManager_->
                process_authenticated_request(std::move(req));

            if (!resp || !resp->success) return {};

            DashboardCounts c;
            c.total_hosts      = resp->total_hosts;
            c.idle_agents      = resp->idle_hosts;
            c.total_workunits  = resp->total_workunits;
            c.active_workunits = resp->results_in_progress;
            c.total_results    = resp->results_done;
            c.recent_results   = resp->outcomes_success;   // last 24h
            c.success = true;
            return c;
        });
    // ... watcher as before
}

Dashboard stat boxes

Box label         Source field
Total Hosts       total_hosts
Idle Agents       idle_hosts
Total Workunits   total_workunits
Active Workunits  results_in_progress
Total Results     results_done
Recent Results    outcomes_success (24h)

Configuration

ores.compute.service options

Add to config/options.hpp:

/// Interval in seconds between compute grid telemetry samples.
/// 0 disables the sampler entirely.
std::uint32_t telemetry_interval_seconds{30};

CLI flag: --telemetry-interval (default 30).

ores.compute.wrapper options

Add to config/options.hpp:

/// Interval in seconds between per-node telemetry publishes.
/// 0 disables reporting.
std::uint32_t telemetry_interval_seconds{30};

CLI flag: --telemetry-interval (default 30).
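
Both options treat 0 as "disabled". A small helper can make that rule explicit at the point where the sampler or reporter is started. The sketch below is an assumption about how the option might be consumed; sampling_interval is a hypothetical name and does not exist in the codebase.

```cpp
#include <chrono>
#include <cstdint>
#include <optional>

// Hypothetical helper: maps the raw option value to an interval,
// or to std::nullopt when telemetry is disabled (value 0).
std::optional<std::chrono::seconds>
sampling_interval(std::uint32_t telemetry_interval_seconds) {
    if (telemetry_interval_seconds == 0) {
        return std::nullopt;  // 0 disables the sampler entirely
    }
    return std::chrono::seconds(telemetry_interval_seconds);
}
```

A caller would then start the poller or reporter only when the returned optional holds a value, rather than passing a zero interval into a timer loop.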

Implementation Plan

Tasks are listed in dependency order. Each row is a single commit.

#   Owner                  Task
1   ores.sql               Create compute_grid_samples_create.sql
2   ores.sql               Create compute_node_samples_create.sql
3   ores.sql               Create drop scripts; wire into master orchestration
4   ores.compute           Add grid_sample and node_sample domain structs
5   ores.compute.database  Add insert_grid_sample, insert_node_sample, query methods
6   ores.compute           Add NATS messages: get_grid_stats_request/response, node_sample_publish
7   ores.compute.service   Implement compute_grid_poller
8   ores.compute.service   Implement get_grid_stats handler
9   ores.compute.service   Subscribe to compute.v1.telemetry.node_samples, persist to DB
10  ores.compute.service   Wire poller + handler into application::run(); add config flag
11  ores.compute.wrapper   Implement node_stats_reporter; instrument process_assignment
12  ores.compute.wrapper   Wire reporter into application::run(); add config flag
13  ores.qt                Rewrite loadCounts() to use get_grid_stats request/reply
14  ores.qt                Populate all six dashboard boxes from response fields

Open Questions

  1. Node sample subject: should wrapper nodes publish node samples over NATS (service persists) or write directly to the database? NATS-first is preferred for consistency, but requires the compute service to subscribe to a second subject. Decision: NATS-first.
  2. Outcome 24h window: the 24h cutoff for outcomes_* is arbitrary. Could be made configurable, but YAGNI for now.
  3. Node summary in dashboard response: the node summaries add per-node detail to the grid stats response. The current dashboard doesn't show them, but they're cheap to include and useful for future node drill-down. Keep in response; dashboard ignores for now.