Compute Grid Design
Overview
A BOINC-inspired distributed compute grid backed by PostgreSQL and NATS.
Compute nodes (hosts) execute wrapped engines (ORE Studio, llama.cpp, etc.);
the server-side service owns all DB access, NATS dispatch, validation, and
assimilation. PGMQ and pg_cron are not used; NATS JetStream provides the
work queue; all lifecycle transitions are event-driven except the stale-host
reaper, which is a scheduled job in ores.scheduler.
Status
| Phase | Item | Status |
|---|---|---|
| 1 | SQL schema (7 tables + RLS + triggers) | ✅ Done |
| 1 | ores.compute component (domain + repo) | ✅ Done |
| 2 | Generators | ✅ Done |
| 2 | Messaging layer | ✅ Done |
| 3 | ores.compute.service executable | ✅ Done |
| 4 | JetStream dispatch (in save_workunit) | ✅ Done |
| 4 | Work-pull + result-submit handlers | ✅ Done |
| 4 | Heartbeat handler | ✅ Done |
| 5 | Validator (event-driven) | ✅ Done |
| 5 | Assimilator (event-driven) | ✅ Done |
| 5 | Reaper scheduler job | ✅ Done |
| 6 | Qt list windows | ✅ Done |
| 6 | CLI commands | ✅ Done |
Architecture
Components
┌──────────────────┐     cron job     ┌──────────────────────────────────────┐
│  ores.scheduler  │─────────────────▶│         ores.compute.service         │
│   (reaper job)   │   compute.v1.    │ save_workunit │ result.submit        │
└──────────────────┘    work.reap     │ (dispatch)    │ (validate+assim.)    │
                                      └────────┬─────────────────┬───────────┘
                                               │ NATS JetStream  │ PostgreSQL
                                      ┌────────▼─────────────────▼───────────┐
                                      │ NATS JetStream  │ PostgreSQL         │
                                      │ stream: COMPUTE │ ores.compute.*     │
                                      └────────┬─────────────────────────────┘
                                               │ JetStream pull consumer
                                      ┌────────▼─────────────────────────────┐
                                      │          Wrapper (on-node)           │
                                      │ Pulls work · Runs engine · Submits   │
                                      └──────────────────────────────────────┘
Design Principles
- The DB is the authoritative state store. All lifecycle transitions happen as DB writes first; NATS carries the triggers, not the state.
- Nodes never connect to PostgreSQL. All DB interaction goes through ores.compute.service.
- NATS JetStream replaces PGMQ for the work queue. The save_workunit handler publishes workunit assignments synchronously to a durable JetStream stream; wrappers use a pull consumer so work is never double-dispatched.
- No in-process background loops. The Dispatcher, Validator, and Assimilator are all event-driven and run inline within their triggering handler calls. Only the Reaper needs periodic scheduling, which is delegated to ores.scheduler via a cron job.
- Multiple instances of ores.compute.service form a NATS queue group; NATS routes each request to exactly one instance automatically.
- Multi-tenancy: all NATS subjects are prefixed with the environment prefix (e.g. ores.dev.local1.compute.v1.hosts.list).
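The prefixing rule in the last principle can be illustrated with a small helper. This is only a sketch: the function name `qualify_subject` and the handling of an empty prefix are assumptions made for illustration, not part of the existing NATS client library.

```cpp
#include <string>
#include <string_view>

// Hypothetical helper (name assumed): joins the environment prefix and a
// relative subject with a single '.' separator.
inline std::string qualify_subject(std::string_view prefix,
                                   std::string_view relative) {
    // Assumption: an empty prefix leaves the relative subject unchanged.
    if (prefix.empty()) return std::string(relative);
    std::string out;
    out.reserve(prefix.size() + 1 + relative.size());
    out.append(prefix);
    out.push_back('.');
    out.append(relative);
    return out;
}
```

With the prefix `ores.dev.local1`, the relative subject `compute.v1.hosts.list` becomes the fully qualified subject shown in the example above.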
Lifecycle: Event-Driven Transitions
All state machine transitions except reaping are triggered directly by handler calls — no polling:
| Trigger | Handler | Action |
|---|---|---|
| Client calls workunits.save | save_workunit | Create result rows; publish to JetStream (dispatch) |
| Wrapper calls work.pull | pull_work | Set host_id, server_state=4; ack JetStream msg after the DB write |
| Wrapper calls results.submit | submit_result | Set server_state=5; run Validator inline |
| Validator sets canonical | (inline in submit) | Run Assimilator inline; check batch dependencies |
| Scheduler fires reap job | reap_work handler | Reset stale in-progress results; republish to JS |
NATS Subject Map
All subjects below are relative (the prefix is prepended automatically).
| Subject | Direction | Description |
|---|---|---|
| compute.v1.hosts.list | Request / Reply | List registered hosts |
| compute.v1.hosts.save | Request / Reply | Register or update a host |
| compute.v1.hosts.delete | Request / Reply | Remove a host record |
| compute.v1.apps.list | Request / Reply | List compute applications |
| compute.v1.apps.save | Request / Reply | Register or update an app |
| compute.v1.app-versions.list | Request / Reply | List app versions |
| compute.v1.app-versions.save | Request / Reply | Register or update an app version |
| compute.v1.batches.list | Request / Reply | List batches |
| compute.v1.batches.save | Request / Reply | Create or update a batch |
| compute.v1.workunits.list | Request / Reply | List workunits (filterable by batch) |
| compute.v1.workunits.save | Request / Reply | Create workunit + dispatch to JetStream |
| compute.v1.results.list | Request / Reply | List results (filterable by workunit) |
| compute.v1.results.submit | Request / Reply | Wrapper submits completed output |
| compute.v1.work.pull | Request / Reply | Wrapper pulls next assignment |
| compute.v1.work.heartbeat | Core publish | Wrapper sends liveness heartbeat |
| compute.v1.work.reap | Core publish | ores.scheduler triggers stale reaper |
| ores.compute.host_changed | Event publish | DB trigger → NATS event |
| ores.compute.result_changed | Event publish | DB trigger → NATS event |
| ores.compute.batch_changed | Event publish | DB trigger → NATS event |
JetStream Stream: COMPUTE
Stream name : COMPUTE
Subject     : compute.v1.work.assignments.* (* = tenant_id)
Retention   : WorkQueuePolicy (message deleted after ack)
Replicas    : 1 (dev) / 3 (prod)
Max age     : 24h (undelivered assignments expire)
The save_workunit handler publishes one message per result row it creates,
containing result_id and workunit_id. The wrapper uses a durable pull
consumer with AckWait equal to the expected max engine runtime. If the
wrapper crashes before acking, the message re-delivers automatically.
State Machine: Result.server_state
1: Inactive   ──[save_workunit publishes to JS]──▶ 2: Unsent
2: Unsent     ──[wrapper calls work.pull]────────▶ 4: InProgress
4: InProgress ──[wrapper calls results.submit]───▶ 5: Done
4: InProgress ──[JS AckWait expires]─────────────▶ 2: Unsent (auto re-deliver)
4: InProgress ──[reaper detects stale host]──────▶ 2: Unsent (republish to JS)
5: Done       ──[Validator accepts]──────────────▶ canonical_result_id set on workunit
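The transitions above can be captured as a small pure function, which makes illegal transitions easy to reject in one place. The sketch below is illustrative only: the enum names, the `result_event` type, and `next_state` are hypothetical and not taken from the ores.compute code; only the numeric state values come from this document.

```cpp
#include <optional>

// Numeric values follow the server_state codes used in this document.
enum class server_state : int { inactive = 1, unsent = 2, in_progress = 4, done = 5 };

// Hypothetical event names for the triggers listed above.
enum class result_event { dispatched, pulled, submitted, ack_wait_expired, host_stale };

// Returns the next state, or std::nullopt when the transition is not allowed.
inline std::optional<server_state> next_state(server_state s, result_event e) {
    switch (s) {
    case server_state::inactive:
        if (e == result_event::dispatched) return server_state::unsent;
        break;
    case server_state::unsent:
        if (e == result_event::pulled) return server_state::in_progress;
        break;
    case server_state::in_progress:
        if (e == result_event::submitted) return server_state::done;
        // Both recovery paths put the result back in the queue.
        if (e == result_event::ack_wait_expired || e == result_event::host_stale)
            return server_state::unsent;
        break;
    case server_state::done:
        break; // Done is terminal for server_state; validation acts on the workunit.
    }
    return std::nullopt;
}
```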
Implementation Plan
Phase 2 — Generators & Messaging
2a. Generators
Use the /domain-type-creator skill pattern to create faker-based generators
for all 6 domain types. Generators live in
ores.compute/include/ores.compute/generators/ and
ores.compute/src/generators/.
Entities (in dependency order):
- app_generator: no FKs
- host_generator: no FKs
- app_version_generator: needs app_id
- batch_generator: no FKs
- workunit_generator: needs batch_id, app_version_id
- result_generator: needs workunit_id, optionally host_id
2b. Messaging Layer
Files to create under ores.compute/include/ores.compute/messaging/ and
ores.compute/src/messaging/:
| File | Content |
|---|---|
| host_protocol.hpp | list/save/delete request+response types |
| app_protocol.hpp | list/save request+response types |
| app_version_protocol.hpp | list/save request+response types |
| batch_protocol.hpp | list/save request+response types |
| workunit_protocol.hpp | list/save request+response types |
| result_protocol.hpp | list + submit_result request+response types |
| work_protocol.hpp | pull_work_request/response, heartbeat_message, reap_message |
| host_handler.hpp / .cpp | list / save / delete |
| app_handler.hpp / .cpp | list / save |
| app_version_handler.hpp / .cpp | list / save |
| batch_handler.hpp / .cpp | list / save |
| workunit_handler.hpp / .cpp | list / save (includes inline dispatch to JetStream) |
| result_handler.hpp / .cpp | list / submit (includes inline Validator + Assimilator) |
| work_handler.hpp / .cpp | pull / heartbeat / reap |
| registrar.hpp / .cpp | Wires all handlers to NATS subscriptions |
Protocol type conventions (matching existing services):
// Forward declaration so the request can name its response type.
struct list_hosts_response;

struct list_hosts_request {
    using response_type = list_hosts_response;
    static constexpr std::string_view nats_subject = "compute.v1.hosts.list";
    int offset = 0;
    int limit = 100;
};

struct list_hosts_response {
    std::vector<domain::host> hosts;
    int total_available_count = 0;
};
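One benefit of this convention is that generic code can derive both the subject and the reply type from the request type alone. The sketch below applies the same pattern to the apps.list subject and adds a hypothetical `send_request` helper. The `domain::app` stand-in, the field names of `list_apps_request`, and the transport callable are assumptions for illustration, not the project's actual API.

```cpp
#include <string_view>
#include <vector>

namespace domain { struct app { int id = 0; }; } // stand-in for the real domain type

struct list_apps_response {
    std::vector<domain::app> apps;
    int total_available_count = 0;
};

// Same convention as the hosts protocol: response_type + nats_subject.
struct list_apps_request {
    using response_type = list_apps_response;
    static constexpr std::string_view nats_subject = "compute.v1.apps.list";
    int offset = 0;
    int limit = 100;
};

// Hypothetical generic call: `transport` is any callable that takes
// (subject, request) and returns the matching response.
template <typename Request, typename Transport>
typename Request::response_type send_request(const Request& req, Transport&& transport) {
    return transport(Request::nats_subject, req);
}
```

In tests, the transport can be a lambda that returns canned responses, so handlers and clients can be exercised without a NATS server.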
The save_workunit handler is responsible for two things after writing to DB:
- Creating one result row per target_redundancy with server_state=1 (Inactive).
- Immediately publishing each result to the COMPUTE JetStream stream, transitioning them to server_state=2 (Unsent).
The submit_result handler is responsible for:
- Writing the result to DB (server_state=5, output_uri, received_at, outcome=1).
- Running the Validator inline: if target_redundancy is satisfied, set workunits.canonical_result_id.
- Running the Assimilator inline: if all workunits in the batch have a canonical_result_id, update batches.status = 'complete' and handle any dependent batches.
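The Assimilator step can be sketched in memory as follows. This is a simplified illustration under stated assumptions: the `workunit` and `batch` structs, the `assimilate` function, and the in-memory map standing in for the batches and batch_dependencies tables are all hypothetical; only the 'complete' and 'open' status values come from this document.

```cpp
#include <map>
#include <optional>
#include <string>
#include <vector>

struct workunit {
    int id;
    int batch_id;
    std::optional<int> canonical_result_id;
};

struct batch {
    std::string status;          // 'complete' and 'open' are the values named in this document
    std::vector<int> parent_ids; // stand-in for rows in batch_dependencies
};

// Marks the batch complete once every workunit has a canonical result, then
// opens child batches whose parents are all complete. Returns the opened ids.
inline std::vector<int> assimilate(int batch_id,
                                   const std::vector<workunit>& workunits,
                                   std::map<int, batch>& batches) {
    for (const auto& wu : workunits)
        if (wu.batch_id == batch_id && !wu.canonical_result_id) return {};

    batches.at(batch_id).status = "complete";

    std::vector<int> opened;
    for (auto& [id, b] : batches) {
        if (b.status == "complete" || b.status == "open") continue;
        bool depends_on_batch = false;
        bool all_parents_complete = true;
        for (int parent : b.parent_ids) {
            if (parent == batch_id) depends_on_batch = true;
            if (batches.at(parent).status != "complete") all_parents_complete = false;
        }
        if (depends_on_batch && all_parents_complete) {
            b.status = "open";
            opened.push_back(id);
        }
    }
    return opened;
}
```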
Phase 3 — Service Executable
Create ores.compute.service as a separate CMake project (like
ores.scheduler.service).
Structure:
projects/ores.compute.service/
CMakeLists.txt
src/
CMakeLists.txt
app/
application.hpp
application.cpp
main.cpp
include/
ores.compute.service/
app/
application.hpp
The service executable wires together:
- Database context (connection pool, 4 connections).
- NATS client (connect, set subject prefix).
- JWT verifier (fetch JWKS from IAM, exponential backoff).
- Messaging registrar (ores::compute::messaging::registrar::register_handlers).
- Eventing pipeline (PostgreSQL LISTEN/NOTIFY → NATS publish):
  - ores_compute_hosts_tbl → ores.compute.host_changed
  - ores_compute_results_tbl → ores.compute.result_changed
  - ores_compute_batches_tbl → ores.compute.batch_changed
- Graceful shutdown: drain NATS, return.
src/CMakeLists.txt dependencies:
target_link_libraries(${exe_target_name}
PRIVATE
ores.compute.lib
ores.service.lib
ores.eventing.lib
ores.security.lib
ores.nats.lib
ores.database.lib
ores.utility.lib
ores.platform.lib)
Register in projects/CMakeLists.txt after ores.compute.
Phase 4 — Dispatch, Work Pull & Result Submit
4a. Dispatch (inside save_workunit handler)
After saving the workunit to DB:
- Create target_redundancy result rows with server_state=1 (Inactive).
- For each result, publish to JetStream subject compute.v1.work.assignments.{tenant_id} with payload { result_id, workunit_id, app_version_id, input_uri, config_uri }.
- Update each result to server_state=2 (Unsent) in DB.
- Return the saved workunit to the caller.
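The dispatch steps above can be sketched with the publisher injected as a callback, which keeps the logic testable without JetStream. Everything here is illustrative: `result_row`, `publish_fn`, and `dispatch_workunit` are hypothetical names, result ids are generated from a caller-supplied starting value, and leaving a row Inactive when its publish fails is an assumption the document does not specify.

```cpp
#include <functional>
#include <string>
#include <vector>

struct result_row {
    int id;
    int workunit_id;
    int server_state; // 1 = Inactive, 2 = Unsent
};

// Hypothetical publisher: returns true when JetStream accepted the message.
using publish_fn = std::function<bool(const std::string& subject, const result_row&)>;

inline std::vector<result_row> dispatch_workunit(int workunit_id,
                                                 int target_redundancy,
                                                 const std::string& tenant_id,
                                                 int first_result_id,
                                                 const publish_fn& publish) {
    // Step 1: create target_redundancy rows in state Inactive.
    std::vector<result_row> results;
    for (int i = 0; i < target_redundancy; ++i)
        results.push_back(result_row{first_result_id + i, workunit_id, 1});

    // Steps 2 and 3: publish each row, then mark it Unsent.
    const std::string subject = "compute.v1.work.assignments." + tenant_id;
    for (auto& r : results) {
        // Assumption: a row whose publish fails stays Inactive for a later retry.
        if (publish(subject, r)) r.server_state = 2;
    }
    return results;
}
```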
4b. Work Pull Handler (compute.v1.work.pull)
- Verify JWT (wrapper authenticates with its host JWT).
- Fetch one JetStream message from the COMPUTE stream (pull consumer).
- Update the result row: host_id = <wrapper's host_id>, server_state=4 (InProgress), pgmq_msg_id = <js sequence number>.
- Ack the JetStream message only after the DB write succeeds.
- Return the work payload to the wrapper.
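The ordering constraint in these steps (ack only after the DB write succeeds) is the part most worth pinning down in code. The sketch below injects the three side effects as callbacks; the `assignment` and `pull_deps` types and the `pull_work` signature are hypothetical and stand in for the real JetStream and repository calls.

```cpp
#include <functional>
#include <optional>

struct assignment {
    int result_id;
    unsigned long long js_sequence;
};

// Hypothetical dependencies of the handler, injected for testability.
struct pull_deps {
    std::function<std::optional<assignment>()> fetch;                 // pull one JetStream message
    std::function<bool(const assignment&, int)> mark_in_progress;     // DB write, second arg is host_id
    std::function<void(const assignment&)> ack;                       // ack the JetStream message
};

inline std::optional<assignment> pull_work(int host_id, const pull_deps& deps) {
    std::optional<assignment> msg = deps.fetch();
    if (!msg) return std::nullopt; // nothing queued

    // If the DB write fails the message is left unacked, so JetStream
    // re-delivers it once AckWait expires.
    if (!deps.mark_in_progress(*msg, host_id)) return std::nullopt;

    deps.ack(*msg);
    return msg;
}
```

Acking before the write would risk losing the assignment if the write then failed, because the stream deletes a message as soon as it is acked.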
4c. Result Submit Handler (compute.v1.results.submit)
- Verify JWT.
- Validate output_uri is non-empty.
- Update result: server_state=5, output_uri, received_at=now(), outcome=1 (Success).
- Validator (inline): count Done results for this workunit.
  - If target_redundancy == 1: set workunits.canonical_result_id.
  - If target_redundancy > 1: compare output_uri hashes; accept majority; mark minority as invalid.
- Assimilator (inline, only if canonical result was just set):
  - Count workunits in the batch that still lack canonical_result_id.
  - If zero: update batches.status = 'complete'.
  - Check batch_dependencies: for each child batch whose parents are all 'complete', update child to status = 'open' (eligible for dispatch).
- Return success to the wrapper.
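The Validator rule can be sketched as a pure function over the Done results of one workunit. This is one possible reading, not the implemented algorithm: `done_result` and `pick_canonical` are hypothetical names, the function waits until target_redundancy results are in, and it requires a strict majority of matching hashes. The document only says "accept majority", so the tie-handling here is an assumption.

```cpp
#include <cstddef>
#include <map>
#include <optional>
#include <string>
#include <vector>

struct done_result {
    int id;
    std::string output_hash; // hash of the content behind output_uri
};

// Returns the id of the result to record as canonical_result_id, or
// std::nullopt when validation cannot conclude yet.
inline std::optional<int> pick_canonical(const std::vector<done_result>& done,
                                         std::size_t target_redundancy) {
    if (target_redundancy == 0 || done.size() < target_redundancy) return std::nullopt;
    if (target_redundancy == 1) return done.front().id;

    // Group result ids by output hash.
    std::map<std::string, std::vector<int>> by_hash;
    for (const auto& r : done) by_hash[r.output_hash].push_back(r.id);

    // Assumption: only a strict majority is accepted; ties stay unresolved.
    for (const auto& entry : by_hash)
        if (entry.second.size() * 2 > done.size()) return entry.second.front();
    return std::nullopt;
}
```

Results whose hash differs from the winning group would then be the minority to mark as invalid.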
4d. Heartbeat Handler (compute.v1.work.heartbeat)
Wrapper publishes (fire-and-forget, no reply needed):
- Update hosts.last_rpc_time = now() for the identified host.
4e. Reap Handler (compute.v1.work.reap)
Published by ores.scheduler on a cron schedule (e.g. every minute).
- Find all results where server_state=4 (InProgress) and the host's last_rpc_time < now() - stale_threshold.
- For each stale result:
  a. Reset server_state=2 (Unsent) in DB.
  b. Publish a new JetStream message for that result (re-queue).
- Optionally mark the host as inactive.
ores.scheduler job definition:
Name : compute-reaper
Cron : * * * * * (every minute)
Action type : mq
MQ subject : compute.v1.work.reap
Payload : {}
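The staleness check performed by the reap handler can be sketched in memory as below. The types and the `reap_stale` function are hypothetical; in the real service the selection would be a SQL query and the returned ids would be republished to JetStream. Skipping results whose host has no recorded last_rpc_time is an assumption.

```cpp
#include <chrono>
#include <map>
#include <vector>

using clock_type = std::chrono::system_clock;

struct in_flight_result {
    int id;
    int host_id;
    int server_state; // 4 = InProgress, 2 = Unsent
};

// Resets stale InProgress results to Unsent and returns the ids to re-queue.
inline std::vector<int> reap_stale(std::vector<in_flight_result>& results,
                                   const std::map<int, clock_type::time_point>& last_rpc_time,
                                   clock_type::time_point now,
                                   std::chrono::seconds stale_threshold) {
    std::vector<int> requeue;
    for (auto& r : results) {
        if (r.server_state != 4) continue;
        auto it = last_rpc_time.find(r.host_id);
        // Assumption: a host with no recorded heartbeat is left alone.
        if (it == last_rpc_time.end()) continue;
        if (it->second < now - stale_threshold) {
            r.server_state = 2;
            requeue.push_back(r.id);
        }
    }
    return requeue;
}
```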
Phase 5 — Wrapper Authentication
The wrapper binary must authenticate before pulling work. The recommended approach:
- When a host is registered via compute.v1.hosts.save, the handler issues a machine-to-machine JWT (via ores.iam.service) with sub = host_id.
- The wrapper stores this JWT and presents it as a Bearer token in the Authorization header of every NATS request.
- All handlers validate the JWT using the standard jwt_authenticator.
Phase 6 — UI & CLI
6a. Qt List Windows
Use the /qt-entity-creator skill to generate list windows for:
| Entity | Window class | Notes |
|---|---|---|
| host | HostMdiWindow | Shows last_rpc_time, credit_total |
| app | AppMdiWindow | |
| app_version | AppVersionMdiWindow | |
| batch | BatchMdiWindow | Shows status, live progress via event bus |
| workunit | WorkunitMdiWindow | Filterable by batch |
| result | ResultMdiWindow | Filterable by workunit; shows server_state |
6b. CLI Commands
Use the /cli-entity-creator skill to generate CLI commands:
- ores compute hosts list
- ores compute apps list / add
- ores compute app-versions list / add
- ores compute batches list / add
- ores compute workunits list
- ores compute results list
Key Decisions
No in-process background loops
The Dispatcher, Validator, and Assimilator do not need polling loops because they are always triggered by a caller:
- Dispatcher runs inside save_workunit (synchronous, before returning to client).
- Validator runs inside submit_result (synchronous, before returning to wrapper).
- Assimilator runs inside submit_result after the Validator completes.
Only the Reaper needs a timer, and that is provided by ores.scheduler.
Why not PGMQ?
PGMQ is not used anywhere in the current codebase. NATS JetStream with
WorkQueuePolicy provides equivalent at-least-once delivery semantics via
AckWait-based re-delivery, integrates naturally with the existing NATS client
library, and avoids a PostgreSQL extension dependency.
Why not pg_cron?
pg_cron schedules jobs inside the database process, making them hard to
observe, test, and operate. The Reaper's periodic trigger is delegated to
ores.scheduler, which already has cron support, NATS integration, and
operational visibility.
NATS queue groups as load balancer
Multiple instances of ores.compute.service subscribe to the same queue
group. NATS routes each wrapper request to exactly one instance
automatically — no separate HTTP server pool or proxy is needed.
JetStream re-delivery vs. reaper
JetStream handles re-delivery when AckWait expires (wrapper crashed before
acking). The Reaper handles a separate edge case: the wrapper acked the
message and began running, but then the host went silent without calling
results.submit. The Reaper detects this via last_rpc_time staleness and
re-queues the result independently of JetStream.
Files Created So Far
projects/ores.sql/create/compute/ — 15 SQL files (tables, triggers, RLS)
projects/ores.sql/drop/compute/ — 15 SQL drop files
projects/ores.codegen/models/compute/ — 7 codegen JSON models
projects/ores.compute/
include/ores.compute/
domain/ — host, app, app_version, batch, workunit, result
repository/ — *_entity, *_mapper, *_repository (hpp) for all 6
src/
domain/ — stub.cpp
repository/ — 12 .cpp files
tests/ — main.cpp, stub_tests.cpp
modeling/ — ores.compute.puml
CMakeLists.txt, src/CMakeLists.txt, tests/CMakeLists.txt