Compute Grid Design

Overview

A BOINC-inspired distributed compute grid backed by PostgreSQL and NATS. Compute nodes (hosts) execute wrapped engines (ORE Studio, llama.cpp, etc.), while the server-side service, ores.compute.service, owns all DB access, NATS dispatch, validation, and assimilation. PGMQ and pg_cron are not used. NATS JetStream provides the work queue, and all lifecycle transitions are event-driven except the stale-host reaper, which runs as a scheduled job in ores.scheduler.

Status

| Phase | Item                                   | Status |
|-------+----------------------------------------+--------|
| 1     | SQL schema (7 tables + RLS + triggers) | ✅ Done |
| 1     | ores.compute component (domain + repo) | ✅ Done |
| 2     | Generators                             | ✅ Done |
| 2     | Messaging layer                        | ✅ Done |
| 3     | ores.compute.service executable        | ✅ Done |
| 4     | JetStream dispatch (in save_workunit)  | ✅ Done |
| 4     | Work-pull + result-submit handlers     | ✅ Done |
| 4     | Heartbeat handler                      | ✅ Done |
| 5     | Validator (event-driven)               | ✅ Done |
| 5     | Assimilator (event-driven)             | ✅ Done |
| 5     | Reaper scheduler job                   | ✅ Done |
| 6     | Qt list windows                        | ✅ Done |
| 6     | CLI commands                           | ✅ Done |

Architecture

Components

┌──────────────────┐    cron job      ┌──────────────────────────────────────┐
│  ores.scheduler  │─────────────────▶│          ores.compute.service        │
│  (reaper job)    │ compute.v1.      │  save_workunit  │ result.submit      │
└──────────────────┘ work.reap        │  (dispatch)     │ (validate+assim.)  │
                                      └────────┬─────────────────┬───────────┘
                                               │ NATS JetStream  │ PostgreSQL
                                      ┌────────▼─────────────────▼───────────┐
                                      │  NATS JetStream   │   PostgreSQL      │
                                      │  stream: COMPUTE  │   ores.compute.*  │
                                      └────────┬──────────────────────────────┘
                                               │ JetStream pull consumer
                                      ┌────────▼─────────────────────────────┐
                                      │          Wrapper (on-node)           │
                                      │  Pulls work · Runs engine · Submits  │
                                      └──────────────────────────────────────┘

Design Principles

  1. The DB is the authoritative state store. All lifecycle transitions happen as DB writes first; NATS carries the triggers, not the state.
  2. Nodes never connect to PostgreSQL. All DB interaction goes through ores.compute.service.
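  3. NATS JetStream replaces PGMQ for the work queue. The save_workunit handler synchronously publishes one assignment per result to a durable JetStream stream. Wrappers use a pull consumer, so each assignment is delivered to one wrapper at a time. Delivery is at-least-once: an assignment that is not acked within AckWait is re-delivered, and the Reaper re-queues work from stale hosts.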
  4. No in-process background loops. The Dispatcher, Validator, and Assimilator are all event-driven and run inline within their triggering handler calls. Only the Reaper needs periodic scheduling, which is delegated to ores.scheduler via a cron job.
  5. Multiple instances of ores.compute.service form a NATS queue group; NATS routes each request to exactly one instance automatically.
  6. Multi-tenancy: all NATS subjects are prefixed with the environment prefix (e.g. ores.dev.local1.compute.v1.hosts.list).
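Principle 6 comes down to joining two strings at the client boundary. The following is a minimal sketch. It assumes a hypothetical helper, qualify_subject, that joins the environment prefix and a relative subject. It is not the ores.nats.lib API, which may do this differently.

```cpp
#include <cassert>
#include <string>
#include <string_view>

// Hypothetical helper (not the ores.nats.lib API): joins an environment
// prefix such as "ores.dev.local1" with a relative subject such as
// "compute.v1.hosts.list". An empty prefix leaves the subject unchanged.
inline std::string qualify_subject(std::string_view prefix,
                                   std::string_view relative) {
    if (prefix.empty()) return std::string(relative);
    std::string out;
    out.reserve(prefix.size() + 1 + relative.size());
    out.append(prefix);
    // Avoid a doubled dot if the configured prefix already ends with one.
    if (out.back() != '.') out.push_back('.');
    out.append(relative);
    return out;
}
```

For example, qualify_subject("ores.dev.local1", "compute.v1.hosts.list") produces the fully qualified subject quoted in principle 6.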

Lifecycle: Event-Driven Transitions

All state machine transitions except reaping are triggered directly by handler calls — no polling:

| Trigger                         | Handler                   | Action                                              |
|---------------------------------+---------------------------+-----------------------------------------------------|
| Client calls workunits.save     | save_workunit             | Create result rows; publish to JetStream (dispatch) |
| Wrapper calls work.pull         | pull_work                 | Set host_id, server_state=4; then ack JetStream msg |
| Wrapper calls results.submit    | submit_result             | Set server_state=5; run Validator inline            |
| Validator sets canonical result | (inline in submit_result) | Run Assimilator inline; check batch dependencies    |
| Scheduler fires reap job        | reap_work                 | Reset stale in-progress results; republish to JS    |

NATS Subject Map

All subjects below are relative (the prefix is prepended automatically).

| Subject                      | Direction       | Description                             |
|------------------------------+-----------------+-----------------------------------------|
| compute.v1.hosts.list        | Request / Reply | List registered hosts                   |
| compute.v1.hosts.save        | Request / Reply | Register or update a host               |
| compute.v1.hosts.delete      | Request / Reply | Remove a host record                    |
| compute.v1.apps.list         | Request / Reply | List compute applications               |
| compute.v1.apps.save         | Request / Reply | Register or update an app               |
| compute.v1.app-versions.list | Request / Reply | List app versions                       |
| compute.v1.app-versions.save | Request / Reply | Register or update an app version       |
| compute.v1.batches.list      | Request / Reply | List batches                            |
| compute.v1.batches.save      | Request / Reply | Create or update a batch                |
| compute.v1.workunits.list    | Request / Reply | List workunits (filterable by batch)    |
| compute.v1.workunits.save    | Request / Reply | Create workunit + dispatch to JetStream |
| compute.v1.results.list      | Request / Reply | List results (filterable by workunit)   |
| compute.v1.results.submit    | Request / Reply | Wrapper submits completed output        |
| compute.v1.work.pull         | Request / Reply | Wrapper pulls next assignment           |
| compute.v1.work.heartbeat    | Core publish    | Wrapper sends liveness heartbeat        |
| compute.v1.work.reap         | Core publish    | ores.scheduler triggers stale reaper    |
| ores.compute.host_changed    | Event publish   | DB trigger → NATS event                 |
| ores.compute.result_changed  | Event publish   | DB trigger → NATS event                 |
| ores.compute.batch_changed   | Event publish   | DB trigger → NATS event                 |

JetStream Stream: COMPUTE

Stream name : COMPUTE
Subject     : compute.v1.work.assignments.*   (* = tenant_id)
Retention   : WorkQueuePolicy (message deleted after ack)
Replicas    : 1 (dev) / 3 (prod)
Max age     : 24h (undelivered assignments expire)
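In NATS subject syntax, `*` matches exactly one dot-separated token. The stream therefore accepts compute.v1.work.assignments.<tenant_id> for any single-token tenant_id, but a tenant_id containing a dot would not match. The simplified matcher below illustrates the `*` and `>` rules. It is a hypothetical helper, not a NATS client call, and it ignores the environment prefix.

```cpp
#include <cassert>
#include <cstddef>
#include <string_view>
#include <vector>

// Splits a NATS subject into its dot-separated tokens.
inline std::vector<std::string_view> split_subject(std::string_view s) {
    std::vector<std::string_view> out;
    std::size_t start = 0;
    while (true) {
        const std::size_t dot = s.find('.', start);
        if (dot == std::string_view::npos) {
            out.push_back(s.substr(start));
            return out;
        }
        out.push_back(s.substr(start, dot - start));
        start = dot + 1;
    }
}

// Simplified NATS-style matching: '*' matches exactly one token and '>'
// (only valid as the last token) matches one or more trailing tokens.
inline bool subject_matches(std::string_view filter, std::string_view subject) {
    const auto f = split_subject(filter);
    const auto s = split_subject(subject);
    for (std::size_t i = 0; i < f.size(); ++i) {
        if (f[i] == ">") return i == f.size() - 1 && s.size() > i;
        if (i >= s.size()) return false;
        if (f[i] != "*" && f[i] != s[i]) return false;
    }
    return f.size() == s.size();
}
```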

The save_workunit handler publishes one message per result row it creates, carrying the result_id and workunit_id (the full payload is listed in 4a). The wrapper uses a durable pull consumer whose AckWait equals the expected maximum engine runtime. If the wrapper crashes before acking, JetStream re-delivers the message automatically.
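The WorkQueuePolicy plus AckWait behaviour can be modelled in memory. This is a sketch of the semantics, not the JetStream client API. work_queue_sim is a hypothetical name, and time is passed in explicitly as seconds so that the behaviour is deterministic.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <optional>
#include <string>
#include <utility>

// Hypothetical in-memory model of a WorkQueuePolicy stream read with an
// explicit AckWait. Not the JetStream client API. Times are in seconds.
class work_queue_sim {
public:
    explicit work_queue_sim(std::int64_t ack_wait_s) : ack_wait_(ack_wait_s) {}

    // Appends an assignment and returns its stream sequence number.
    std::uint64_t publish(std::string result_id) {
        const std::uint64_t seq = next_seq_++;
        msgs_.emplace(seq, entry{std::move(result_id), std::nullopt});
        return seq;
    }

    // Delivers the oldest message that was never delivered or whose AckWait
    // has expired, and starts a new AckWait window for it.
    std::optional<std::pair<std::uint64_t, std::string>> fetch(std::int64_t now) {
        for (auto& [seq, e] : msgs_) {
            if (!e.deadline || *e.deadline <= now) {
                e.deadline = now + ack_wait_;
                return std::make_pair(seq, e.result_id);
            }
        }
        return std::nullopt;
    }

    // WorkQueuePolicy: an acknowledged message is removed from the stream.
    void ack(std::uint64_t seq) { msgs_.erase(seq); }

    std::size_t size() const { return msgs_.size(); }

private:
    struct entry {
        std::string result_id;
        std::optional<std::int64_t> deadline;  // unset until first delivery
    };
    std::int64_t ack_wait_;
    std::uint64_t next_seq_ = 1;
    std::map<std::uint64_t, entry> msgs_;
};
```

Suppose a message is fetched at t=0 with a 30 s AckWait. Later fetches do not see it until t=30, when it becomes eligible for re-delivery. Once it is acked, it is deleted, as WorkQueuePolicy requires.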

State Machine: Result.server_state

1: Inactive   ──[save_workunit publishes to JS]──▶  2: Unsent
2: Unsent     ──[wrapper calls work.pull]─────────▶  4: InProgress
4: InProgress ──[wrapper calls results.submit]───▶  5: Done
4: InProgress ──[JS AckWait expires]─────────────▶  2: Unsent  (auto re-deliver)
4: InProgress ──[reaper detects stale host]──────▶  2: Unsent  (republish to JS)
5: Done       ──[Validator accepts]──────────────▶  canonical_result_id set on workunit
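The diagram can be encoded as a transition check that rejects illegal moves, such as submitting a result that was never pulled. The enum and event names below are hypothetical, and the domain type may store server_state as a plain integer.

```cpp
#include <cassert>
#include <cstdint>

// server_state values used in this design (BOINC numbering; 3 is unused).
enum class server_state : std::uint8_t {
    inactive = 1, unsent = 2, in_progress = 4, done = 5
};

// Events that move a result between states (hypothetical names).
enum class result_event {
    published,         // save_workunit publishes to JetStream
    pulled,            // wrapper calls work.pull
    submitted,         // wrapper calls results.submit
    ack_wait_expired,  // JetStream re-delivers
    reaped             // reaper detects a stale host
};

// Applies the event if it is legal in the current state and returns true;
// otherwise leaves the state untouched and returns false.
inline bool apply_event(server_state& s, result_event e) {
    switch (s) {
    case server_state::inactive:
        if (e == result_event::published) { s = server_state::unsent; return true; }
        break;
    case server_state::unsent:
        if (e == result_event::pulled) { s = server_state::in_progress; return true; }
        break;
    case server_state::in_progress:
        if (e == result_event::submitted) { s = server_state::done; return true; }
        if (e == result_event::ack_wait_expired || e == result_event::reaped) {
            s = server_state::unsent;
            return true;
        }
        break;
    case server_state::done:
        break;  // terminal: validation updates the workunit, not this state
    }
    return false;
}
```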

Implementation Plan

Phase 2 — Generators & Messaging

2a. Generators

Use the /domain-type-creator skill pattern to create faker-based generators for all 6 domain types. Generators live in ores.compute/include/ores.compute/generators/ and ores.compute/src/generators/.

Entities (in dependency order):

  1. app_generator — no FKs
  2. host_generator — no FKs
  3. app_version_generator — needs app_id
  4. batch_generator — no FKs
  5. workunit_generator — needs batch_id, app_version_id
  6. result_generator — needs workunit_id, optionally host_id
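The dependency order above is a topological sort of the foreign-key graph. A test or seeding harness could derive that order instead of hard-coding it. The sketch below applies Kahn's algorithm to the six entities. generation_order is a hypothetical helper and is not part of the /domain-type-creator skill.

```cpp
#include <cassert>
#include <map>
#include <queue>
#include <stdexcept>
#include <string>
#include <vector>

// Kahn's algorithm over "entity -> entities it references". Returns an order
// in which every entity follows its dependencies; throws on a cycle.
inline std::vector<std::string> generation_order(
    const std::map<std::string, std::vector<std::string>>& deps) {
    std::map<std::string, int> indegree;  // number of unmet dependencies
    std::map<std::string, std::vector<std::string>> dependents;
    for (const auto& [entity, parents] : deps) {
        indegree.try_emplace(entity, 0);
        for (const auto& p : parents) {
            indegree.try_emplace(p, 0);
            ++indegree[entity];
            dependents[p].push_back(entity);
        }
    }
    std::queue<std::string> ready;
    for (const auto& [entity, n] : indegree)
        if (n == 0) ready.push(entity);
    std::vector<std::string> order;
    while (!ready.empty()) {
        std::string e = ready.front();
        ready.pop();
        order.push_back(e);
        for (const auto& child : dependents[e])
            if (--indegree[child] == 0) ready.push(child);
    }
    if (order.size() != indegree.size())
        throw std::runtime_error("cycle in generator dependencies");
    return order;
}
```

Using the dependencies listed above, and because std::map iterates alphabetically, the sketch yields app, batch, host, app_version, workunit, result. This differs from the listed order but is equally valid.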

2b. Messaging Layer

Files to create under ores.compute/include/ores.compute/messaging/ and ores.compute/src/messaging/:

| File                           | Content                                                     |
|--------------------------------+-------------------------------------------------------------|
| host_protocol.hpp              | list/save/delete request+response types                     |
| app_protocol.hpp               | list/save request+response types                            |
| app_version_protocol.hpp       | list/save request+response types                            |
| batch_protocol.hpp             | list/save request+response types                            |
| workunit_protocol.hpp          | list/save request+response types                            |
| result_protocol.hpp            | list + submit_result request+response types                 |
| work_protocol.hpp              | pull_work_request/response, heartbeat_message, reap_message |
| host_handler.hpp / .cpp        | list / save / delete                                        |
| app_handler.hpp / .cpp         | list / save                                                 |
| app_version_handler.hpp / .cpp | list / save                                                 |
| batch_handler.hpp / .cpp       | list / save                                                 |
| workunit_handler.hpp / .cpp    | list / save (includes inline dispatch to JetStream)         |
| result_handler.hpp / .cpp      | list / submit (includes inline Validator + Assimilator)     |
| work_handler.hpp / .cpp        | pull / heartbeat / reap                                     |
| registrar.hpp / .cpp           | Wires all handlers to NATS subscriptions                    |

Protocol type conventions (matching existing services):

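#include <string_view>
#include <vector>
// plus the ores.compute domain header that declares domain::host

// Declare the response first: the request's response_type alias names it.
struct list_hosts_response {
    std::vector<domain::host> hosts;
    int total_available_count = 0;
};

struct list_hosts_request {
    using response_type = list_hosts_response;
    static constexpr std::string_view nats_subject = "compute.v1.hosts.list";
    int offset = 0;
    int limit = 100;
};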
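Because the request type declares both response_type and nats_subject, a generic client can derive the subject and the reply type from the request alone. This is a minimal sketch under stated assumptions. request is a hypothetical function, and the caller-supplied transport stands in for the real NATS request/reply and serialisation. host is a stand-in for domain::host.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <string_view>
#include <type_traits>
#include <vector>

// Stand-in for domain::host so the sketch compiles on its own.
struct host {
    std::string name;
};

struct list_hosts_response {
    std::vector<host> hosts;
    int total_available_count = 0;
};

struct list_hosts_request {
    using response_type = list_hosts_response;
    static constexpr std::string_view nats_subject = "compute.v1.hosts.list";
    int offset = 0;
    int limit = 100;
};

// Hypothetical generic call: the request type alone fixes the subject and the
// reply type. `transport` stands in for NATS request/reply plus serialisation.
template <typename Request, typename Transport>
typename Request::response_type request(const Request& req, Transport&& transport) {
    static_assert(std::is_same_v<decltype(transport(Request::nats_subject, req)),
                                 typename Request::response_type>,
                  "transport must return the request's response_type");
    return transport(Request::nats_subject, req);
}
```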

The save_workunit handler is responsible for two things after writing to DB:

  1. Creating target_redundancy result rows (one per required replica), each with server_state=1 (Inactive).
  2. Immediately publishing each result to the COMPUTE JetStream stream, transitioning them to server_state=2 (Unsent).

The submit_result handler is responsible for:

  1. Writing the result to DB (server_state=5, output_uri, received_at, outcome=1).
  2. Running the Validator inline: if target_redundancy is satisfied, set workunits.canonical_result_id.
  3. Running the Assimilator inline: if all workunits in the batch have a canonical_result_id, update batches.status = 'complete' and handle any dependent batches.

Phase 3 — Service Executable

Create ores.compute.service as a separate CMake project (like ores.scheduler.service).

Structure:

projects/ores.compute.service/
  CMakeLists.txt
  src/
    CMakeLists.txt
    app/
      application.hpp
      application.cpp
      main.cpp
  include/
    ores.compute.service/
      app/
        application.hpp

The service executable wires together:

  1. Database context (connection pool, 4 connections).
  2. NATS client (connect, set subject prefix).
  3. JWT verifier (fetch JWKS from IAM, exponential backoff).
  4. Messaging registrar (ores::compute::messaging::registrar::register_handlers).
  5. Eventing pipeline (PostgreSQL LISTEN/NOTIFY → NATS publish):
    • ores_compute_hosts_tbl → ores.compute.host_changed
    • ores_compute_results_tbl → ores.compute.result_changed
    • ores_compute_batches_tbl → ores.compute.batch_changed
  6. Graceful shutdown: drain NATS, return.

src/CMakeLists.txt dependencies:

target_link_libraries(${exe_target_name}
    PRIVATE
        ores.compute.lib
        ores.service.lib
        ores.eventing.lib
        ores.security.lib
        ores.nats.lib
        ores.database.lib
        ores.utility.lib
        ores.platform.lib)

Register in projects/CMakeLists.txt after ores.compute.

Phase 4 — Dispatch, Work Pull & Result Submit

4a. Dispatch (inside save_workunit handler)

After saving the workunit to DB:

  1. Create target_redundancy result rows with server_state=1 (Inactive).
  2. For each result, publish to JetStream subject compute.v1.work.assignments.{tenant_id} with payload { result_id, workunit_id, app_version_id, input_uri, config_uri }.
  3. Update each result to server_state=2 (Unsent) in DB.
  4. Return the saved workunit to the caller.
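Steps 1 to 3 can be sketched with in-memory stand-ins. The types, the publish callback and the result-id scheme are all hypothetical; the real handler writes through the ores.compute repositories and publishes through the NATS client. The sketch also assumes a failure policy the plan does not spell out: a row only moves to Unsent when its publish succeeds. The subject is relative, so the environment prefix would be prepended as it is for every other subject.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Hypothetical in-memory stand-ins for the DB rows and the JetStream payload.
struct result_row {
    std::string result_id;
    std::string workunit_id;
    int server_state = 1;  // 1 = Inactive, 2 = Unsent
};

struct assignment {
    std::string result_id, workunit_id, app_version_id, input_uri, config_uri;
};

struct workunit {
    std::string id, app_version_id, input_uri, config_uri;
    int target_redundancy = 1;
};

using publish_fn =
    std::function<bool(const std::string& subject, const assignment& payload)>;

// Steps 1-3 of 4a. A row becomes Unsent only if its publish succeeded
// (assumed policy); otherwise it stays Inactive.
inline std::vector<result_row> dispatch(const workunit& wu,
                                        const std::string& tenant_id,
                                        const publish_fn& publish) {
    std::vector<result_row> rows;
    for (int i = 0; i < wu.target_redundancy; ++i)  // 1. Inactive rows
        rows.push_back({wu.id + "-r" + std::to_string(i), wu.id, 1});
    const std::string subject = "compute.v1.work.assignments." + tenant_id;
    for (auto& row : rows) {
        const assignment a{row.result_id, wu.id, wu.app_version_id,
                           wu.input_uri, wu.config_uri};
        if (publish(subject, a))   // 2. one message per result
            row.server_state = 2;  // 3. mark Unsent
    }
    return rows;
}
```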

4b. Work Pull Handler (compute.v1.work.pull)

  1. Verify JWT (wrapper authenticates with its host JWT).
  2. Fetch one JetStream message from the COMPUTE stream (pull consumer).
  3. Update the result row: host_id = <wrapper's host_id>, server_state=4 (InProgress), pgmq_msg_id = <js sequence number>.
  4. Ack the JetStream message only after the DB write succeeds.
  5. Return the work payload to the wrapper.
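The property that matters in 4b is ordering. The JetStream message is acked only after the DB write, so a failed write leaves the assignment in the stream to be re-delivered. The sketch below isolates that ordering. pull_deps, fetched_msg and the callbacks are hypothetical stand-ins for the JetStream consumer and the repository.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <optional>
#include <string>

// A message fetched from the COMPUTE stream (hypothetical stand-in).
struct fetched_msg {
    std::uint64_t stream_seq;
    std::string result_id;
    std::string payload;
};

// Hypothetical seams for the JetStream consumer and the result repository.
struct pull_deps {
    std::function<std::optional<fetched_msg>()> fetch;
    // Sets host_id, server_state=4 and the stored sequence; false on failure.
    std::function<bool(const std::string& result_id, const std::string& host_id,
                       std::uint64_t seq)> mark_in_progress;
    std::function<void(std::uint64_t seq)> ack;
};

// Steps 2-5 of 4b; JWT verification (step 1) is assumed done by the caller.
// On a failed DB write the message is left un-acked, so JetStream
// re-delivers it once AckWait expires.
inline std::optional<std::string> pull_work(const pull_deps& d,
                                            const std::string& host_id) {
    std::optional<fetched_msg> msg = d.fetch();
    if (!msg) return std::nullopt;  // no work available
    if (!d.mark_in_progress(msg->result_id, host_id, msg->stream_seq))
        return std::nullopt;         // DB write failed: do not ack
    d.ack(msg->stream_seq);          // ack only after the write succeeded
    return msg->payload;
}
```

On a DB failure, a real handler could also negatively acknowledge (NAK) the message. JetStream would then re-deliver it without waiting for AckWait to expire.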

4c. Result Submit Handler (compute.v1.results.submit)

  1. Verify JWT.
  2. Validate output_uri is non-empty.
  3. Update result: server_state=5, output_uri, received_at=now(), outcome=1 (Success).
  4. Validator (inline): count Done results for this workunit.
    • If target_redundancy == 1: set workunits.canonical_result_id.
    • If target_redundancy > 1: compare output_uri hashes; accept majority; mark minority as invalid.
  5. Assimilator (inline, only if canonical result was just set):
    • Count workunits in the batch that still lack canonical_result_id.
    • If zero: update batches.status = 'complete'.
    • Check batch_dependencies: for each child batch whose parents are all 'complete', update child to status = 'open' (eligible for dispatch).
  6. Return success to the wrapper.
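The Validator and Assimilator rules in steps 4 and 5 reduce to a few pure functions. This sketch rests on three assumptions. First, output_hash is taken to be a hash of the file at output_uri. Second, "majority" means strictly more than half of target_redundancy. Third, when no output reaches a majority, canonical_result_id is left unset, because the plan does not say what should happen in that case. All names are hypothetical.

```cpp
#include <cassert>
#include <map>
#include <optional>
#include <set>
#include <string>
#include <vector>

// A Done result with a digest of its output (assumed: hash of output_uri file).
struct done_result {
    std::string result_id;
    std::string output_hash;
};

struct validation {
    std::optional<std::string> canonical_result_id;  // set on acceptance
    std::vector<std::string> invalid_result_ids;     // the minority
};

// Validator: waits until target_redundancy results are Done, then accepts
// the output hash held by strictly more than half of target_redundancy.
// With target_redundancy == 1 the single result is accepted directly.
inline validation validate(const std::vector<done_result>& done,
                           int target_redundancy) {
    validation v;
    if (static_cast<int>(done.size()) < target_redundancy) return v;  // wait
    std::map<std::string, int> votes;
    for (const auto& r : done) ++votes[r.output_hash];
    for (const auto& [hash, n] : votes) {
        if (2 * n <= target_redundancy) continue;
        for (const auto& r : done) {
            if (r.output_hash != hash)
                v.invalid_result_ids.push_back(r.result_id);
            else if (!v.canonical_result_id)
                v.canonical_result_id = r.result_id;
        }
        break;
    }
    return v;  // no majority: canonical_result_id stays unset
}

// Assimilator, part 1: a batch is complete when every workunit has a
// canonical result.
inline bool batch_complete(
    const std::vector<std::optional<std::string>>& canonical_ids) {
    for (const auto& c : canonical_ids)
        if (!c) return false;
    return true;
}

// Assimilator, part 2: batches that are not yet complete and whose parents
// are all complete become eligible to open. Maps child -> parent batches.
inline std::vector<std::string> batches_to_open(
    const std::map<std::string, std::vector<std::string>>& parents_of,
    const std::set<std::string>& complete) {
    std::vector<std::string> ready;
    for (const auto& [child, parents] : parents_of) {
        if (complete.count(child)) continue;
        bool all_done = true;
        for (const auto& p : parents)
            if (!complete.count(p)) { all_done = false; break; }
        if (all_done) ready.push_back(child);
    }
    return ready;
}
```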

4d. Heartbeat Handler (compute.v1.work.heartbeat)

Wrapper publishes (fire-and-forget, no reply needed):

  • Update hosts.last_rpc_time = now() for the identified host.

4e. Reap Handler (compute.v1.work.reap)

Published by ores.scheduler on a cron schedule (e.g. every minute).

  1. Find all results where server_state=4 (InProgress) and the host's last_rpc_time < now() - stale_threshold.
  2. For each stale result:
    a. Reset server_state=2 (Unsent) in DB.
    b. Publish a new JetStream message for that result (re-queue).
  3. Optionally mark the host as inactive.

ores.scheduler job definition:

Name        : compute-reaper
Cron        : * * * * *   (every minute)
Action type : mq
MQ subject  : compute.v1.work.reap
Payload     : {}

Phase 5 — Wrapper Authentication

The wrapper binary must authenticate before pulling work. The recommended approach:

  • When a host is registered via compute.v1.hosts.save, the handler issues a machine-to-machine JWT (via ores.iam.service) with sub = host_id.
  • The wrapper stores this JWT and presents it as a Bearer token in the Authorization header of every NATS request.
  • All handlers validate the JWT using the standard jwt_authenticator.
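The first step of that check is extracting the token from the request headers. The helper below is hypothetical and uses a plain std::map in place of a NATS client's header API. It also matches the Bearer scheme case-sensitively. This is a simplification, since HTTP treats authentication scheme names as case-insensitive.

```cpp
#include <cassert>
#include <map>
#include <optional>
#include <string>
#include <string_view>

// Hypothetical helper: extracts the JWT from an "Authorization: Bearer <jwt>"
// header. A real NATS client exposes message headers through its own API.
inline std::optional<std::string> bearer_token(
    const std::map<std::string, std::string>& headers) {
    const auto it = headers.find("Authorization");
    if (it == headers.end()) return std::nullopt;
    const std::string_view v = it->second;
    constexpr std::string_view scheme = "Bearer ";
    // Reject other schemes and an empty token.
    if (v.size() <= scheme.size() || v.substr(0, scheme.size()) != scheme)
        return std::nullopt;
    return std::string(v.substr(scheme.size()));
}
```

The extracted string would then go to jwt_authenticator. Once verified, its sub claim identifies the host.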

Phase 6 — UI & CLI

6a. Qt List Windows

Use the /qt-entity-creator skill to generate list windows for:

| Entity      | Window class        | Notes                                      |
|-------------+---------------------+--------------------------------------------|
| host        | HostMdiWindow       | Shows last_rpc_time, credit_total          |
| app         | AppMdiWindow        |                                            |
| app_version | AppVersionMdiWindow |                                            |
| batch       | BatchMdiWindow      | Shows status, live progress via event bus  |
| workunit    | WorkunitMdiWindow   | Filterable by batch                        |
| result      | ResultMdiWindow     | Filterable by workunit; shows server_state |

6b. CLI Commands

Use the /cli-entity-creator skill to generate CLI commands:

  • ores compute hosts list
  • ores compute apps list / add
  • ores compute app-versions list / add
  • ores compute batches list / add
  • ores compute workunits list
  • ores compute results list

Key Decisions

No in-process background loops

The Dispatcher, Validator, and Assimilator do not need polling loops because they are always triggered by a caller:

  • Dispatcher runs inside save_workunit (synchronous, before returning to client).
  • Validator runs inside submit_result (synchronous, before returning to wrapper).
  • Assimilator runs inside submit_result after the Validator completes.

Only the Reaper needs a timer, and that is provided by ores.scheduler.

Why not PGMQ?

PGMQ is not used anywhere in the current codebase. NATS JetStream with WorkQueuePolicy provides equivalent at-least-once delivery semantics via AckWait-based re-delivery, integrates naturally with the existing NATS client library, and avoids a PostgreSQL extension dependency.

Why not pg_cron?

pg_cron schedules jobs inside the database process, making them hard to observe, test, and operate. The Reaper's periodic trigger is delegated to ores.scheduler, which already has cron support, NATS integration, and operational visibility.

NATS queue groups as load balancer

Multiple instances of ores.compute.service subscribe to the same queue group. NATS routes each wrapper request to exactly one instance automatically — no separate HTTP server pool or proxy is needed.

JetStream re-delivery vs. reaper

JetStream handles re-delivery when AckWait expires (wrapper crashed before acking). The Reaper handles a separate edge case: the wrapper acked the message and began running, but then the host went silent without calling results.submit. The Reaper detects this via last_rpc_time staleness and re-queues the result independently of JetStream.

Files Created So Far

projects/ores.sql/create/compute/    — 15 SQL files (tables, triggers, RLS)
projects/ores.sql/drop/compute/      — 15 SQL drop files
projects/ores.codegen/models/compute/ — 7 codegen JSON models
projects/ores.compute/
  include/ores.compute/
    domain/     — host, app, app_version, batch, workunit, result
    repository/ — *_entity, *_mapper, *_repository (hpp) for all 6
  src/
    domain/     — stub.cpp
    repository/ — 12 .cpp files
  tests/        — main.cpp, stub_tests.cpp
  modeling/     — ores.compute.puml
  CMakeLists.txt, src/CMakeLists.txt, tests/CMakeLists.txt

Date: 2026-03-20

Emacs 29.1 (Org mode 9.6.6)