Compute Grid Telemetry Design
Overview
The compute dashboard currently reads stats via ad-hoc live queries at refresh
time. This is the wrong approach. All dashboard statistics must come from
TimescaleDB time-series tables written by their owning services — the same
pattern used by the NATS telemetry system in ores.telemetry.service.
This design introduces a compute grid telemetry system with two distinct samplers, two new hypertables, and a NATS request/reply protocol so the dashboard can query the latest snapshot without issuing live queries.
Goals
- All compute dashboard stats served from TimescaleDB hypertables.
- Server-side grid stats (hosts, agents, workunits, results) owned by ores.compute.service.
- Per-node execution stats (task count, duration, bytes) owned by ores.compute.wrapper.
- 30-second sample interval, configurable via the --telemetry-interval option (0 disables sampling).
- Dashboard reads the most recent snapshot via a NATS request/reply call.
- No ad-hoc live queries remain in ComputeDashboardMdiWindow.
Out of Scope
- Continuous aggregates (hourly/daily rollups) — add later once base tables are proven.
- Alerting thresholds — add later.
- Per-workunit timing at the service level — wrapper already tracks this per task.
Architecture
```text
┌─────────────────────────────────────────────────────────────┐
│ ores.compute.service                                        │
│                                                             │
│   compute_grid_poller ──── every 30s ────► TimescaleDB      │
│   (new, analogous to                       ores_compute_    │
│    nats_poller)                            grid_samples_    │
│                                            tbl              │
│                                                             │
│   get_grid_stats handler ◄── NATS request ─── Dashboard     │
│                          ── NATS reply ───►                 │
└─────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────┐
│ ores.compute.wrapper (each node)                            │
│                                                             │
│   node_stats_reporter ──── every 30s ────► TimescaleDB      │
│   (new, NATS publish)                      ores_compute_    │
│                                            node_samples_    │
│                                            tbl              │
└─────────────────────────────────────────────────────────────┘
```
SQL Schema
Naming Conventions
Tables follow the ores_compute_ prefix matching the component name,
with the _tbl suffix and _samples_ infix for time-series tables, per
the SQL Schema Creator skill.
Table: ores_compute_grid_samples_tbl
Owned by ores.compute.service. One row per sample interval, per tenant.
Captures server-side aggregate counts derived from a single consistent
database read.
```sql
create table if not exists ores_compute_grid_samples_tbl (
    "sampled_at" timestamp with time zone not null,
    "tenant_id" uuid not null,
    -- Host counts
    "total_hosts" integer not null default 0,
    "online_hosts" integer not null default 0,
    "idle_hosts" integer not null default 0,
    -- Result state breakdown (server_state codes)
    "results_inactive" integer not null default 0,     -- state 1
    "results_unsent" integer not null default 0,       -- state 2
    "results_in_progress" integer not null default 0,  -- state 4
    "results_done" integer not null default 0,         -- state 5
    -- Workunit counts
    "total_workunits" integer not null default 0,
    -- Batch counts
    "total_batches" integer not null default 0,
    "active_batches" integer not null default 0,
    -- Outcome breakdown for Done results (latest 24h window)
    "outcomes_success" integer not null default 0,       -- outcome 1
    "outcomes_client_error" integer not null default 0,  -- outcome 3
    "outcomes_no_reply" integer not null default 0,      -- outcome 4
    primary key (sampled_at, tenant_id)
);

create index if not exists ores_compute_grid_samples_tenant_idx
    on ores_compute_grid_samples_tbl (tenant_id, sampled_at desc);

-- TimescaleDB hypertable (1-day chunks, 30-day retention)
do $$ ... $$;
```
Why separate outcome counts? The outcome breakdown only makes sense for completed results. Restricting the counts to a 24h window keeps them from growing without bound on long-running grids.
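Below is a minimal in-memory sketch of the 24h outcome window. The result_row type and its received_time field are hypothetical stand-ins for whatever the service actually reads. Only Done results (server_state 5) received within the last 24 hours are counted, which is what keeps the three outcome columns bounded.

```cpp
#include <chrono>
#include <vector>

// Hypothetical result row; field names are illustrative assumptions.
struct result_row {
    int server_state;  // 5 = Done
    int outcome;       // 1 = success, 3 = client error, 4 = no reply
    std::chrono::system_clock::time_point received_time;
};

struct outcome_counts {
    int success{0};
    int client_error{0};
    int no_reply{0};
};

// Counts outcomes of Done results received at or after now - 24h.
outcome_counts count_recent_outcomes(const std::vector<result_row>& rows,
                                     std::chrono::system_clock::time_point now) {
    const auto cutoff = now - std::chrono::hours(24);
    outcome_counts c;
    for (const auto& r : rows) {
        if (r.server_state != 5 || r.received_time < cutoff) continue;
        switch (r.outcome) {
        case 1: ++c.success; break;
        case 3: ++c.client_error; break;
        case 4: ++c.no_reply; break;
        default: break;  // other outcome codes are not tracked in the sample
        }
    }
    return c;
}
```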
Table: ores_compute_node_samples_tbl
Owned by wrapper nodes. One row per node per sample interval.
```sql
create table if not exists ores_compute_node_samples_tbl (
    "sampled_at" timestamp with time zone not null,
    "tenant_id" uuid not null,
    "host_id" uuid not null,
    -- Cumulative task execution counters (reset on wrapper restart)
    "tasks_completed" integer not null default 0,
    "tasks_failed" integer not null default 0,
    -- Per-interval task count (since the previous sample)
    "tasks_since_last" integer not null default 0,
    -- Timing in milliseconds over the tasks counted in tasks_since_last
    "avg_task_duration_ms" bigint not null default 0,
    "max_task_duration_ms" bigint not null default 0,
    -- Bytes transferred since the previous sample
    "input_bytes_fetched" bigint not null default 0,
    "output_bytes_uploaded" bigint not null default 0,
    -- Heartbeat liveness (seconds since last heartbeat)
    "seconds_since_hb" integer not null default 0,
    primary key (sampled_at, tenant_id, host_id)
);

create index if not exists ores_compute_node_samples_host_idx
    on ores_compute_node_samples_tbl (host_id, sampled_at desc);

-- TimescaleDB hypertable (1-day chunks, 30-day retention)
do $$ ... $$;
```
Hypertable Block
Both tables use the same conditional hypertable block from the existing
pattern (copied from nats_server_samples_create.sql):
```sql
do $$
declare
    tsdb_installed boolean;
begin
    select exists (
        select 1 from pg_extension where extname = 'timescaledb'
    ) into tsdb_installed;

    if tsdb_installed then
        perform public.create_hypertable(
            '<table_name>', 'sampled_at',
            chunk_time_interval => interval '1 day',
            if_not_exists => true
        );

        declare
            current_license text;
        begin
            select current_setting('timescaledb.license', true)
                into current_license;
            -- Retention policies are only available under the Timescale license.
            if current_license = 'timescale' then
                perform public.add_retention_policy(
                    '<table_name>',
                    drop_after => interval '30 days',
                    if_not_exists => true
                );
            end if;
        end;
    end if;
end
$$;
```
File Layout
Following the existing SQL file organisation:
```text
projects/ores.sql/create/compute/
    compute_grid_samples_create.sql   -- grid table + hypertable block
    compute_node_samples_create.sql   -- node table + hypertable block
    compute_samples_create.sql        -- master orchestration file
projects/ores.sql/drop/compute/
    compute_grid_samples_drop.sql
    compute_node_samples_drop.sql
    compute_samples_drop.sql
```
The master compute_create.sql (which already exists for the domain tables) gains an \i compute/compute_samples_create.sql line, and compute_drop.sql gains the matching \i compute/compute_samples_drop.sql line.
C++ Domain Types
ores.compute — new structs in domain/ facet
```cpp
// domain/grid_sample.hpp
#include <chrono>
// ...plus the project header that declares utility::uuid::tenant_id

struct grid_sample final {
    std::chrono::system_clock::time_point sampled_at;
    utility::uuid::tenant_id tenant_id;
    int total_hosts{0};
    int online_hosts{0};
    int idle_hosts{0};
    int results_inactive{0};
    int results_unsent{0};
    int results_in_progress{0};
    int results_done{0};
    int total_workunits{0};
    int total_batches{0};
    int active_batches{0};
    int outcomes_success{0};
    int outcomes_client_error{0};
    int outcomes_no_reply{0};
};

// domain/node_sample.hpp
#include <chrono>
#include <cstdint>
#include <boost/uuid/uuid.hpp>
// ...plus the project header that declares utility::uuid::tenant_id

struct node_sample final {
    std::chrono::system_clock::time_point sampled_at;
    utility::uuid::tenant_id tenant_id;
    boost::uuids::uuid host_id;
    int tasks_completed{0};
    int tasks_failed{0};
    int tasks_since_last{0};
    std::int64_t avg_task_duration_ms{0};
    std::int64_t max_task_duration_ms{0};
    std::int64_t input_bytes_fetched{0};
    std::int64_t output_bytes_uploaded{0};
    int seconds_since_hb{0};
};
```
Repository (ores.compute.database)
Add to the existing compute_repository (or a new dedicated
compute_telemetry_repository):
```cpp
void insert_grid_sample(const ores::database::context& ctx,
                        const domain::grid_sample& sample);
void insert_node_sample(const ores::database::context& ctx,
                        const domain::node_sample& sample);

// Returns the most recent grid sample for the tenant carried by ctx, if any.
std::optional<domain::grid_sample>
latest_grid_sample(const ores::database::context& ctx);

// Returns the most recent sample for each host of the tenant carried by ctx.
std::vector<domain::node_sample>
latest_node_samples(const ores::database::context& ctx);
```
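The sketch below shows, in memory, the semantics assumed here for latest_node_samples(): one row per host, the newest by sampled_at. node_sample_lite and latest_per_host are hypothetical simplifications. In SQL the same result would typically come from PostgreSQL's DISTINCT ON (host_id) ... ORDER BY host_id, sampled_at DESC, which lines up with the (host_id, sampled_at desc) index.

```cpp
#include <chrono>
#include <map>
#include <string>
#include <vector>

// Simplified stand-in for domain::node_sample (host_id as a string here).
struct node_sample_lite {
    std::chrono::system_clock::time_point sampled_at;
    std::string host_id;
    int tasks_completed{0};
};

// Keeps only the newest sample for each host; output is ordered by host_id.
std::vector<node_sample_lite> latest_per_host(const std::vector<node_sample_lite>& samples) {
    std::map<std::string, node_sample_lite> newest;
    for (const auto& s : samples) {
        auto it = newest.find(s.host_id);
        if (it == newest.end() || s.sampled_at > it->second.sampled_at)
            newest[s.host_id] = s;
    }
    std::vector<node_sample_lite> out;
    out.reserve(newest.size());
    for (const auto& entry : newest) out.push_back(entry.second);
    return out;
}
```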
Server-Side Sampler (ores.compute.service)
New class: compute_grid_poller
Analogous to nats_poller in ores.telemetry.service.
```cpp
class compute_grid_poller {
public:
    compute_grid_poller(std::uint32_t interval_seconds,
                        ores::database::context ctx);

    boost::asio::awaitable<void> run();

private:
    void poll_once();
    domain::grid_sample compute_sample();

    std::uint32_t interval_seconds_;
    ores::database::context ctx_;
    database::repository::compute_telemetry_repository repo_;
};
```
poll_once logic
1. Open a short-lived DB connection (or reuse ctx_) and run steps 2-9 inside a single REPEATABLE READ, read-only transaction, so every count comes from one consistent snapshot.
2. Count all hosts → total_hosts.
3. Collect the IDs of hosts whose last_rpc_time is within the last 5 minutes → online_host_ids; online_hosts = |online_host_ids|.
4. Collect the distinct host_ids of results with server_state = 4 (InProgress) → in_progress_host_ids.
5. idle_hosts = online_hosts − |in_progress_host_ids ∩ online_host_ids|.
6. Count results grouped by server_state → results_* breakdown.
7. Count all workunits → total_workunits.
8. Count all batches, and batches with active results → total_batches, active_batches.
9. Count results that reached Done in the last 24h, grouped by outcome → outcomes_*.
10. Write the grid_sample row via insert_grid_sample().
All queries are against the existing domain tables — no ad-hoc live path on the dashboard side.
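Steps 3 to 5 reduce to a set intersection. That is why the online host IDs must be collected, not just counted. The sketch below assumes the host rows and the in-progress host IDs have already been fetched. host_row, host_counts and compute_host_counts are hypothetical names.

```cpp
#include <algorithm>
#include <chrono>
#include <iterator>
#include <set>
#include <string>
#include <vector>

// Hypothetical host row: only the fields the idle calculation needs.
struct host_row {
    std::string host_id;
    std::chrono::system_clock::time_point last_rpc_time;
};

struct host_counts {
    int total_hosts{0};
    int online_hosts{0};
    int idle_hosts{0};
};

// Online = RPC within `window`; idle = online hosts that have no
// in-progress (server_state 4) result assigned to them.
host_counts compute_host_counts(const std::vector<host_row>& hosts,
                                const std::set<std::string>& in_progress_host_ids,
                                std::chrono::system_clock::time_point now,
                                std::chrono::minutes window = std::chrono::minutes(5)) {
    std::set<std::string> online_host_ids;
    for (const auto& h : hosts)
        if (now - h.last_rpc_time <= window) online_host_ids.insert(h.host_id);

    // Online hosts that are currently busy: both inputs are sorted std::sets.
    std::vector<std::string> busy_online;
    std::set_intersection(online_host_ids.begin(), online_host_ids.end(),
                          in_progress_host_ids.begin(), in_progress_host_ids.end(),
                          std::back_inserter(busy_online));

    host_counts c;
    c.total_hosts = static_cast<int>(hosts.size());
    c.online_hosts = static_cast<int>(online_host_ids.size());
    c.idle_hosts = c.online_hosts - static_cast<int>(busy_online.size());
    return c;
}
```

A host that still holds an in-progress result but has stopped calling in (h3 in the test) counts as neither online nor idle. This is what the intersection in step 5 is for.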
Wiring into application::run()
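```cpp
// At file scope: the || operator on awaitables lives here.
#include <boost/asio/experimental/awaitable_operators.hpp>
using namespace boost::asio::experimental::awaitable_operators;

// Alongside the existing signal wait, run the poller concurrently. The ||
// operator completes as soon as either side completes successfully and
// cancels the other.
auto poller = compute_grid_poller(cfg.telemetry_interval_seconds, ctx);
co_await (
    signals.async_wait(use_awaitable) ||
    poller.run()
);
```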
Node-Side Sampler (ores.compute.wrapper)
New class: node_stats_reporter
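Lives in app/node_stats_reporter.hpp. As declared below, it runs on its own thread (thread_, managed by start() and stop()). Running it on the heartbeat thread instead is a possible alternative.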
```cpp
#include <atomic>
#include <chrono>
#include <cstdint>
#include <string>
#include <thread>
// ...plus the project header that declares ores::nats::service::client

class node_stats_reporter {
public:
    node_stats_reporter(ores::nats::service::client& nats,
                        const std::string& host_id,
                        const std::string& tenant_id,
                        std::uint32_t interval_seconds);

    void start();
    void stop();

    // Called by process_assignment on each completed task
    void record_task_completed(std::chrono::milliseconds duration,
                               std::int64_t input_bytes,
                               std::int64_t output_bytes);
    void record_task_failed();

private:
    void run();
    void publish_sample();

    ores::nats::service::client& nats_;
    std::string host_id_;
    std::string tenant_id_;
    std::uint32_t interval_seconds_;
    std::atomic<bool> stop_{false};
    std::thread thread_;

    // Atomics for thread-safe accumulation
    std::atomic<int> tasks_completed_{0};
    std::atomic<int> tasks_failed_{0};
    std::atomic<int> tasks_since_last_{0};
    std::atomic<std::int64_t> total_duration_since_last_{0};
    std::atomic<std::int64_t> max_duration_since_last_{0};
    std::atomic<std::int64_t> input_bytes_since_last_{0};
    std::atomic<std::int64_t> output_bytes_since_last_{0};
};
```
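Below is one way the accumulators could be updated and drained. It assumes that publish_sample() resets the per-interval counters each time it publishes. task_accumulator, interval_stats and take_interval() are hypothetical names. record_task_failed() would be a single fetch_add on tasks_failed_, so it is omitted. std::atomic has no fetch_max before C++26, so the maximum is maintained with a compare-exchange loop.

```cpp
#include <atomic>
#include <chrono>
#include <cstdint>

// Per-interval values drained from the accumulators by one publish.
struct interval_stats {
    int tasks_since_last{0};
    std::int64_t avg_task_duration_ms{0};
    std::int64_t max_task_duration_ms{0};
    std::int64_t input_bytes_fetched{0};
    std::int64_t output_bytes_uploaded{0};
};

class task_accumulator {
public:
    // Called from the task thread after each successful task.
    void record_task_completed(std::chrono::milliseconds duration,
                               std::int64_t input_bytes, std::int64_t output_bytes) {
        const std::int64_t ms = duration.count();
        tasks_completed_.fetch_add(1, std::memory_order_relaxed);
        tasks_since_last_.fetch_add(1, std::memory_order_relaxed);
        total_duration_since_last_.fetch_add(ms, std::memory_order_relaxed);
        input_bytes_since_last_.fetch_add(input_bytes, std::memory_order_relaxed);
        output_bytes_since_last_.fetch_add(output_bytes, std::memory_order_relaxed);
        // Raise the max only if ms is larger; on CAS failure prev is reloaded.
        std::int64_t prev = max_duration_since_last_.load(std::memory_order_relaxed);
        while (prev < ms && !max_duration_since_last_.compare_exchange_weak(
                                prev, ms, std::memory_order_relaxed)) {
        }
    }

    // Called by the reporter thread: exchange(0) reads and resets each counter.
    interval_stats take_interval() {
        interval_stats s;
        s.tasks_since_last = tasks_since_last_.exchange(0);
        const std::int64_t total = total_duration_since_last_.exchange(0);
        s.avg_task_duration_ms = s.tasks_since_last > 0 ? total / s.tasks_since_last : 0;
        s.max_task_duration_ms = max_duration_since_last_.exchange(0);
        s.input_bytes_fetched = input_bytes_since_last_.exchange(0);
        s.output_bytes_uploaded = output_bytes_since_last_.exchange(0);
        return s;
    }

    // Cumulative count; never reset by take_interval().
    int tasks_completed() const { return tasks_completed_.load(); }

private:
    std::atomic<int> tasks_completed_{0};
    std::atomic<int> tasks_since_last_{0};
    std::atomic<std::int64_t> total_duration_since_last_{0};
    std::atomic<std::int64_t> max_duration_since_last_{0};
    std::atomic<std::int64_t> input_bytes_since_last_{0};
    std::atomic<std::int64_t> output_bytes_since_last_{0};
};
```

Each counter is reset independently. A task that finishes while take_interval() is running can therefore be split across two samples: counted in one interval's tasks_since_last, with its duration landing in the next interval's total. At a 30-second interval this skew is probably tolerable. If it is not, guarding both methods with one mutex is simpler than making the snapshot atomic.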
NATS subject for node samples
The reporter publishes to a JetStream subject consumed by the compute service for persistence:
compute.v1.telemetry.node_samples (published by wrappers)
The compute service subscribes as a queue consumer and writes the incoming node_sample structs to ores_compute_node_samples_tbl. When several service instances run, each sample is therefore handled by only one of them. This keeps the wrapper stateless with respect to the database.
Alternative: direct database writes
If a node has no direct DB access (typical in multi-site deployments), the publish-and-persist pattern above is the only option. Nodes that do have DB access could write directly. Publishing over NATS, however, keeps the design consistent with the NATS-first approach used everywhere else.
NATS Protocol: Dashboard Stats Query
Subject
compute.v1.telemetry.get_grid_stats
Request
```cpp
struct get_grid_stats_request {
    std::string tenant_id;  // JWT-derived; handler validates
};
```
Response
```cpp
#include <cstdint>
#include <string>
#include <vector>

// Declared first: get_grid_stats_response holds a vector of these.
struct node_sample_summary {
    std::string host_id;
    int tasks_completed{0};
    int tasks_since_last{0};
    std::int64_t avg_task_duration_ms{0};
    std::int64_t input_bytes_fetched{0};
    std::int64_t output_bytes_uploaded{0};
    int seconds_since_hb{0};
};

struct get_grid_stats_response {
    bool success{false};
    std::string message;

    // Grid sample fields (mirrors grid_sample domain type)
    int total_hosts{0};
    int online_hosts{0};
    int idle_hosts{0};
    int results_inactive{0};
    int results_unsent{0};
    int results_in_progress{0};
    int results_done{0};
    int total_workunits{0};
    int total_batches{0};
    int active_batches{0};
    int outcomes_success{0};
    int outcomes_client_error{0};
    int outcomes_no_reply{0};

    // When this sample was taken
    std::string sampled_at;  // ISO-8601

    // Latest node samples summary
    std::vector<node_sample_summary> node_summaries;
};
```
The handler reads latest_grid_sample() and latest_node_samples() from
the repository — no live aggregation on the hot path.
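On the handler side, the stored sample is copied into the response field by field. The exception is sampled_at, which travels as an ISO-8601 string. Below is a sketch of that conversion. It assumes UTC with second precision and a trailing Z, since the design does not pin down the exact format. to_iso8601_utc is a hypothetical helper name.

```cpp
#include <chrono>
#include <ctime>
#include <string>

// Formats a time_point as ISO-8601 UTC with second precision (sub-second
// parts are truncated by to_time_t), e.g. "2023-11-14T22:13:20Z".
std::string to_iso8601_utc(std::chrono::system_clock::time_point tp) {
    const std::time_t t = std::chrono::system_clock::to_time_t(tp);
    std::tm utc{};
#if defined(_WIN32)
    gmtime_s(&utc, &t);   // MSVC argument order: (tm*, const time_t*)
#else
    gmtime_r(&t, &utc);   // POSIX, thread-safe variant of gmtime
#endif
    char buf[sizeof "YYYY-MM-DDTHH:MM:SSZ"];
    std::strftime(buf, sizeof buf, "%Y-%m-%dT%H:%M:%SZ", &utc);
    return buf;
}
```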
Qt Dashboard Changes
Revert ad-hoc fix
Remove the online_host_ids / busy_host_ids logic added to ComputeDashboardMdiWindow::loadCounts(). The body of loadCounts() is replaced by a single get_grid_stats request/reply call.
New loadCounts() implementation
```cpp
void ComputeDashboardMdiWindow::loadCounts() {
    QPointer<ComputeDashboardMdiWindow> self = this;
    QFuture<DashboardCounts> future =
        QtConcurrent::run([self]() -> DashboardCounts {
            if (!self || !self->clientManager_)
                return {};

            compute::messaging::get_grid_stats_request req;
            auto resp = self->clientManager_->
                process_authenticated_request(std::move(req));
            if (!resp || !resp->success)
                return {};

            DashboardCounts c;
            c.total_hosts = resp->total_hosts;
            c.idle_agents = resp->idle_hosts;
            c.total_workunits = resp->total_workunits;
            c.active_workunits = resp->results_in_progress;
            c.total_results = resp->results_done;
            c.recent_results = resp->outcomes_success;  // last 24h
            c.success = true;
            return c;
        });
    // ... watcher as before
}
```
Dashboard stat boxes
| Box label | Source field |
|---|---|
| Total Hosts | total_hosts |
| Idle Agents | idle_hosts |
| Total Workunits | total_workunits |
| Active Workunits | results_in_progress |
| Total Results | results_done |
| Recent Results | outcomes_success (24h) |
Configuration
ores.compute.service options
Add to config/options.hpp:
```cpp
/// Interval in seconds between compute grid telemetry samples.
/// 0 disables the sampler entirely.
std::uint32_t telemetry_interval_seconds{30};
```
CLI flag: --telemetry-interval (default 30).
ores.compute.wrapper options
Add to config/options.hpp:
```cpp
/// Interval in seconds between per-node telemetry publishes.
/// 0 disables reporting.
std::uint32_t telemetry_interval_seconds{30};
```
CLI flag: --telemetry-interval (default 30).
Implementation Plan
Tasks are listed in dependency order. Each row is a single commit.
| # | Owner | Task |
|---|---|---|
| 1 | ores.sql | Create compute_grid_samples_create.sql |
| 2 | ores.sql | Create compute_node_samples_create.sql |
| 3 | ores.sql | Create drop scripts; wire into master orchestration |
| 4 | ores.compute | Add grid_sample and node_sample domain structs |
| 5 | ores.compute.database | Add insert_grid_sample, insert_node_sample, query methods |
| 6 | ores.compute | Add NATS messages: get_grid_stats_request/response, node_sample_publish |
| 7 | ores.compute.service | Implement compute_grid_poller |
| 8 | ores.compute.service | Implement get_grid_stats handler |
| 9 | ores.compute.service | Subscribe to compute.v1.telemetry.node_samples, persist to DB |
| 10 | ores.compute.service | Wire poller + handler into application::run(); add config flag |
| 11 | ores.compute.wrapper | Implement node_stats_reporter; instrument process_assignment |
| 12 | ores.compute.wrapper | Wire reporter into application::run(); add config flag |
| 13 | ores.qt | Rewrite loadCounts() to use get_grid_stats request/reply |
| 14 | ores.qt | Populate all six dashboard boxes from response fields |
Open Questions
- Node sample subject: should wrapper nodes publish node samples over NATS (service persists) or write directly to the database? NATS-first is preferred for consistency, but requires the compute service to subscribe to a second subject. Decision: NATS-first.
- Outcome 24h window: the 24h cutoff for outcomes_* is arbitrary. Could be made configurable, but YAGNI for now.
- Node summary in dashboard response: the node summaries add per-node detail to the grid stats response. The current dashboard doesn't show them, but they're cheap to include and useful for future node drill-down. Keep in response; dashboard ignores for now.