Workflow Engine Generalisation

Overview

This document describes the generalisation of ores.workflow into a durable, event-driven workflow engine capable of orchestrating all cross-service sagas in the system. The target is 15–20 distinct workflows sharing one engine with consistent behaviour.

Motivation

Current state

Two cross-service workflows exist today:

  1. provision_parties — in ores.workflow; synchronous saga; blocks until all three steps (save party, save account, link) complete.
  2. ore_import — in ores.ore.service; the saga logic is embedded in ore_import_handler, which calls downstream services directly.

Neither workflow is durable: if ores.workflow or ores.ore.service restarts mid-execution, the workflow is lost. There is no mechanism to resume from the last completed step.

Problems with the current approach

  • No durability: an in-flight workflow is lost on service restart.
  • No unified view: it is impossible to list all in-flight workflows from one place; they are scattered across services.
  • Inconsistent patterns: provision_parties and ore_import implement the saga pattern differently; future workflows will diverge further.
  • Domain logic leakage: ore_import_handler calls ores.trading, ores.iam, etc., conflating ORE-specific logic with cross-service orchestration.
  • No async support: synchronous sagas cannot model steps that take minutes or hours (e.g. compute grid execution).
  • Duplication: every new workflow must re-implement step tracking, compensation, error recording, and retry.

Target

A single ores.workflow service that:

  • Orchestrates all cross-service sagas (15–20 workflows)
  • Persists all workflow and step state to PostgreSQL
  • Is fully event-driven: never blocks on a domain service call
  • Can be restarted at any time and resume all in-flight workflows
  • Provides a unified audit trail and operational view

Architecture

Core principle

ores.workflow is a persistent state machine driven entirely by events.

On new workflow request:
  persist workflow_instance → dispatch step[0] command → return

On step-completed event:
  load workflow_instance → update step → dispatch step[n+1] → return
  (or begin compensation if step failed)

On service restart:
  load all in-progress workflows → re-dispatch pending commands → return

ores.workflow never waits for a response. It dispatches a command (fire-and-forget NATS publish) and returns. When the domain service completes the step, it publishes a completion event. The workflow engine processes that event and advances.

This pattern is uniform for all steps regardless of duration: a millisecond DB write and an hours-long compute job are handled identically.

Command / event protocol

Command (ores.workflow → domain service)

subject:  {service}.v1.{resource}.{action}
headers:
  X-Workflow-Instance-Id: <uuid>
  X-Workflow-Step-Id:     <uuid>   ← idempotency key; echoed back in event
body:     {step-specific parameters as JSON}

The command is a fire-and-forget NATS publish. The domain service processes it asynchronously.
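As an illustration, the command envelope above can be modelled as a plain struct. This is a sketch under stated assumptions: workflow_command and make_command are hypothetical names, not existing ores APIs, and the std::map stands in for whatever header type the NATS client actually uses.

```cpp
#include <map>
#include <string>

// Hypothetical in-memory form of a workflow command (not an ores type).
struct workflow_command {
    std::string subject;                        // {service}.v1.{resource}.{action}
    std::map<std::string, std::string> headers; // workflow correlation headers
    std::string body;                           // step-specific parameters as JSON
};

// Hypothetical builder applying the subject and header conventions above.
workflow_command make_command(const std::string& service,
                              const std::string& resource,
                              const std::string& action,
                              const std::string& instance_id,
                              const std::string& step_id,
                              const std::string& body_json) {
    workflow_command cmd;
    cmd.subject = service + ".v1." + resource + "." + action;
    cmd.headers["X-Workflow-Instance-Id"] = instance_id;
    cmd.headers["X-Workflow-Step-Id"] = step_id;  // idempotency key, echoed back
    cmd.body = body_json;
    return cmd;
}
```

The step id doubles as the idempotency key, so it must be generated once, persisted, and reused on every re-dispatch rather than regenerated.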

Completion event (domain service → ores.workflow)

All domain services publish completion events to a single well-known subject:

subject:  workflow.v1.events.step-completed
body:
  workflow_instance_id  uuid
  step_id               uuid    ← echoed from X-Workflow-Step-Id header
  success               bool
  result_json           string  ← stored in workflow_step.response_json
  error_message         string

ores.workflow has one permanent queue-group subscription to this subject. All completions from all services and all workflow types flow through this single handler.
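A minimal sketch of the event body and one possible wire encoding follows. The plan lists the fields but not the encoding, so JSON is assumed here; step_completed_event, json_quote and to_json are hypothetical names. Because result_json is typed as a string, it is carried as a JSON string rather than as a nested object.

```cpp
#include <string>

// Fields of the workflow.v1.events.step-completed body.
struct step_completed_event {
    std::string workflow_instance_id;
    std::string step_id;        // echoed from the X-Workflow-Step-Id header
    bool success = false;
    std::string result_json;    // opaque JSON text, transported as a string
    std::string error_message;
};

// Encodes s as a JSON string literal, escaping quotes, backslashes and
// control characters (the latter as \u00XX).
std::string json_quote(const std::string& s) {
    static const char hex[] = "0123456789abcdef";
    std::string out = "\"";
    for (unsigned char c : s) {
        if (c == '"' || c == '\\') {
            out += '\\';
            out += static_cast<char>(c);
        } else if (c < 0x20) {
            out += "\\u00";
            out += hex[c >> 4];
            out += hex[c & 0x0F];
        } else {
            out += static_cast<char>(c);
        }
    }
    out += '"';
    return out;
}

// Serialises the event as a flat JSON object.
std::string to_json(const step_completed_event& e) {
    return "{\"workflow_instance_id\":" + json_quote(e.workflow_instance_id) +
           ",\"step_id\":" + json_quote(e.step_id) +
           ",\"success\":" + (e.success ? "true" : "false") +
           ",\"result_json\":" + json_quote(e.result_json) +
           ",\"error_message\":" + json_quote(e.error_message) + "}";
}
```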

Idempotency contract for domain services

Every domain service handler that participates in a workflow must:

  1. Extract X-Workflow-Step-Id from the command headers.
  2. Check whether this step_id has already been processed (by looking for an existing record associated with that key).
  3. If already processed: publish the cached completion event again and return (idempotent re-notify).
  4. If not processed: execute the operation, store workflow_step_id on the created/modified entity, publish the completion event.

This guarantees that re-dispatched commands after a service restart produce the same outcome without executing the operation twice.

The simplest implementation: store workflow_step_id as a nullable column on the entity created by the step (e.g. batch.workflow_step_id, report_input_bundle.workflow_step_id). A NULL means the entity was not created by a workflow step; a non-null value is the idempotency key.
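The contract can be sketched in memory as follows. The std::map stands in for the nullable workflow_step_id column, and idempotent_handler and completion are hypothetical names. In a real handler, the entity insert and the key write must be committed in the same database transaction for the "already processed" check to be reliable.

```cpp
#include <functional>
#include <map>
#include <string>
#include <vector>

// Hypothetical completion record (subset of the step-completed event).
struct completion {
    std::string step_id;
    bool success;
    std::string result_json;
};

// Sketch of a domain handler honouring the idempotency contract.
struct idempotent_handler {
    std::map<std::string, std::string> entity_by_step_id;  // workflow_step_id -> entity
    std::vector<completion> published;                      // captured publishes
    int executions = 0;

    void handle(const std::string& step_id,
                const std::function<std::string()>& create_entity) {
        auto it = entity_by_step_id.find(step_id);
        if (it != entity_by_step_id.end()) {
            // Already processed: re-notify with the stored outcome, do not re-execute.
            published.push_back({step_id, true, it->second});
            return;
        }
        ++executions;
        const std::string entity_id = create_entity();
        entity_by_step_id[step_id] = entity_id;  // persist the idempotency key
        published.push_back({step_id, true, entity_id});
    }
};
```

A re-dispatched command after a restart therefore yields a second, identical completion event but no second entity.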

Startup recovery

On service start, ores.workflow executes a recovery pass:

  1. Queries all workflow_instance rows with state = in_progress or state = compensating.
  2. For each instance, loads its in-progress workflow_step rows: the current forward step or, for a compensating instance, any outstanding compensation steps.
  3. Re-dispatches each such step's command, reusing its step_id as the X-Workflow-Step-Id header.
  4. Domain services deduplicate via the idempotency key and republish their completion events.
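The recovery pass reduces to a filter over persisted rows. The sketch below assumes simplified row types (instance_row, step_row and redispatch are hypothetical) and returns the commands to re-publish instead of calling NATS directly.

```cpp
#include <string>
#include <vector>

enum class instance_state { in_progress, compensating, completed, failed, compensated };
enum class step_state { in_progress, completed, failed };

struct instance_row { std::string id; instance_state state; };
struct step_row {
    std::string id;               // also the X-Workflow-Step-Id header value
    std::string instance_id;
    step_state state;
    std::string command_subject;
    std::string command_json;
};
struct redispatch { std::string subject; std::string step_id; std::string body; };

// Collects every in-progress step of a live instance, keeping its original
// step id so domain services can deduplicate.
std::vector<redispatch> recovery_pass(const std::vector<instance_row>& instances,
                                      const std::vector<step_row>& steps) {
    std::vector<redispatch> out;
    for (const auto& inst : instances) {
        if (inst.state != instance_state::in_progress &&
            inst.state != instance_state::compensating)
            continue;
        for (const auto& s : steps)
            if (s.instance_id == inst.id && s.state == step_state::in_progress)
                out.push_back({s.command_subject, s.id, s.command_json});
    }
    return out;
}
```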

Data model changes

workflow_instance (additions)

current_step_index   integer     not null default 0
step_count           integer     not null default 0
last_event_at        timestamptz null

workflow_step (additions)

command_subject      text        not null default ''
command_json         text        not null default ''   -- persisted before publish
command_published_at timestamptz null                  -- set after successful publish
idempotency_key      text        not null default ''   -- = step_id string
compensation_subject text        not null default ''   -- subject for compensation command
compensation_json    text        not null default ''   -- built from step result

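The request_json column already exists and is used for the command body. command_published_at is null until the publish succeeds. On restart, every step with state = in_progress has its command published again: if command_published_at is null, the original publish may never have happened; if it is non-null, the completion event may not yet have been processed. The idempotency key makes both cases safe.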
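The ordering that makes command_published_at meaningful can be sketched as: persist the step, publish, then stamp the timestamp. fake_db, fake_publisher and dispatch_step are hypothetical stand-ins for PostgreSQL and NATS. If the publish throws, the step row survives with command_published_at still null, so the recovery pass picks it up.

```cpp
#include <map>
#include <optional>
#include <stdexcept>
#include <string>

struct step_record {
    std::string command_subject;
    std::string command_json;
    std::optional<long long> command_published_at;  // null until publish succeeds
    bool in_progress = true;
};

// Hypothetical stand-ins for the database and the NATS publisher.
struct fake_db { std::map<std::string, step_record> steps; };
struct fake_publisher {
    bool fail = false;
    int sent = 0;
    void publish(const std::string&, const std::string&) {
        if (fail) throw std::runtime_error("publish failed");
        ++sent;
    }
};

void dispatch_step(fake_db& db, fake_publisher& nats, const std::string& step_id,
                   const std::string& subject, const std::string& body, long long now) {
    db.steps[step_id] = step_record{subject, body, std::nullopt, true};  // 1. persist
    nats.publish(subject, body);                                         // 2. publish
    db.steps[step_id].command_published_at = now;                        // 3. stamp
}
```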

Workflow definition abstraction

Rather than one executor class per workflow (current pattern), a workflow is defined as a sequence of step descriptors:

#include <functional>
#include <string>
#include <vector>

struct workflow_step_def {
    std::string name;
    std::string command_subject;
    std::string compensation_subject;   // empty if no compensation

    // Builds command payload from workflow request + previous step results
    std::function<std::string(
        const std::string& request_json,
        const std::vector<std::string>& step_results)> build_command;

    // Builds compensation payload from original command + step result
    std::function<std::string(
        const std::string& command_json,
        const std::string& result_json)> build_compensation;
};

struct workflow_definition {
    std::string type_name;
    std::vector<workflow_step_def> steps;
};

ores.workflow maintains a registry of workflow_definition instances indexed by type name. The step-completed handler looks up the definition, builds the next command payload, and dispatches the next step. No bespoke executor class is needed per workflow type.
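As a sketch, provision_parties could be expressed as a definition and registered by type name. The structs repeat those above so the example compiles on its own; workflow_registry, make_provision_parties, every subject string and the payload builders are hypothetical placeholders, not the real ores.refdata or ores.iam subjects.

```cpp
#include <functional>
#include <map>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

struct workflow_step_def {
    std::string name;
    std::string command_subject;
    std::string compensation_subject;   // empty if no compensation
    std::function<std::string(const std::string&, const std::vector<std::string>&)> build_command;
    std::function<std::string(const std::string&, const std::string&)> build_compensation;
};

struct workflow_definition {
    std::string type_name;
    std::vector<workflow_step_def> steps;
};

// Hypothetical registry mapping type names to definitions.
class workflow_registry {
public:
    void add(workflow_definition def) {
        std::string key = def.type_name;
        defs_[key] = std::move(def);
    }
    const workflow_definition& get(const std::string& type_name) const {
        auto it = defs_.find(type_name);
        if (it == defs_.end())
            throw std::out_of_range("unknown workflow type: " + type_name);
        return it->second;
    }
private:
    std::map<std::string, workflow_definition> defs_;
};

// Illustrative three-step definition; subjects and payloads are placeholders.
workflow_definition make_provision_parties() {
    auto pass_request = [](const std::string& request_json,
                           const std::vector<std::string>&) { return request_json; };
    auto undo_with_result = [](const std::string&, const std::string& result_json) {
        return result_json;
    };
    auto link_payload = [](const std::string&, const std::vector<std::string>& prev) {
        // prev[0] = save party result, prev[1] = save account result
        return "{\"party\":" + prev.at(0) + ",\"account\":" + prev.at(1) + "}";
    };
    workflow_definition def;
    def.type_name = "provision_parties";
    def.steps = {
        {"save_party", "refdata.v1.party.save", "refdata.v1.party.delete",
         pass_request, undo_with_result},
        {"save_account", "iam.v1.account.save", "iam.v1.account.delete",
         pass_request, undo_with_result},
        {"link", "iam.v1.account_party.save", "", link_payload, nullptr},
    };
    return def;
}
```

Adding a new workflow type then means writing one such function and registering it at startup.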

Step-completed handler (the engine core)

nats.queue_subscribe("workflow.v1.events.step-completed", "ores.workflow.service"):

  1. Decode event: {workflow_instance_id, step_id, success, result_json, error_message}
  2. Load workflow_step by step_id from DB
  3. Guard: if step.state != in_progress → log duplicate and return
  4. Update step: state = completed (or failed), response_json = result_json

  5. If success:
       Load workflow_instance
       current_step_index++
       If current_step_index < step_count:
           Load step_def[current_step_index]
           Build command from request_json + all previous result_json values
           Persist new workflow_step (state=in_progress, command_published_at=null)
           Publish command to step_def.command_subject
           Update workflow_step.command_published_at = now()
       Else:
           Update workflow_instance: state = completed

  6. If failure:
       begin_compensation(workflow_instance)
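An in-memory version of the handler makes the control flow concrete. It is a sketch under simplifying assumptions: a single workflow type, maps instead of PostgreSQL, a vector capturing publishes instead of NATS, and compensation reduced to a state change; engine, event and the other type names are hypothetical. In the PostgreSQL version it would be reasonable to perform the step update and the next-step insert in one transaction, so a crash between them cannot leave an instance with no in-progress step to recover.

```cpp
#include <cstddef>
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

enum class instance_state { in_progress, compensating, completed };
enum class step_state { in_progress, completed, failed };

struct step_def {
    std::string command_subject;
    std::function<std::string(const std::string&, const std::vector<std::string>&)> build_command;
};
struct instance {
    std::string request_json;
    int current_step_index = 0;
    int step_count = 0;
    instance_state state = instance_state::in_progress;
};
struct step {
    std::string instance_id;
    int index = 0;
    step_state state = step_state::in_progress;
    std::string response_json;
};
struct event {
    std::string instance_id;
    std::string step_id;
    bool success = false;
    std::string result_json;
};

struct engine {
    std::map<std::string, instance> instances;   // stands in for workflow_instance
    std::map<std::string, step> steps;           // stands in for workflow_step
    std::vector<step_def> defs;                  // one workflow definition
    std::vector<std::pair<std::string, std::string>> published;  // (subject, body)
    int next_id = 0;

    void on_step_completed(const event& e) {
        auto it = steps.find(e.step_id);
        if (it == steps.end() || it->second.state != step_state::in_progress)
            return;                                        // unknown or duplicate
        it->second.state = e.success ? step_state::completed : step_state::failed;
        it->second.response_json = e.result_json;

        instance& inst = instances.at(e.instance_id);
        if (inst.state != instance_state::in_progress)
            return;                                        // compensation tracking elided
        if (!e.success) {
            inst.state = instance_state::compensating;     // begin_compensation
            return;
        }
        if (++inst.current_step_index < inst.step_count) {
            // Results of completed forward steps, ordered by step index.
            std::vector<std::string> results(static_cast<std::size_t>(inst.current_step_index));
            for (const auto& kv : steps) {
                const step& s = kv.second;
                if (s.instance_id == e.instance_id && s.state == step_state::completed &&
                    s.index >= 0 && s.index < inst.current_step_index)
                    results[s.index] = s.response_json;
            }
            const step_def& d = defs.at(inst.current_step_index);
            const std::string new_id = "step-" + std::to_string(++next_id);
            steps[new_id] = step{e.instance_id, inst.current_step_index,
                                 step_state::in_progress, ""};
            published.emplace_back(d.command_subject,
                                   d.build_command(inst.request_json, results));
        } else {
            inst.state = instance_state::completed;
        }
    }
};
```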

Compensation handler

begin_compensation(instance):
  Update instance: state = compensating
  Load all completed steps in reverse index order
  For each completed step with non-empty compensation_subject:
      Build compensation command from step.command_json + step.response_json
      Persist compensation workflow_step (step_index = -(original+1))
      Publish compensation command
  Return without waiting; compensation step-completed events arrive through the same step-completed handler
  When the last compensation step completes: update instance state = compensated
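The planning part of begin_compensation can be sketched as a pure function over the completed steps. done_step, comp_command and plan_compensation are hypothetical names, and the payload built here (wrapping command and result) is a placeholder for each step's build_compensation.

```cpp
#include <algorithm>
#include <string>
#include <vector>

struct done_step {
    int index;
    std::string compensation_subject;  // empty: nothing to undo
    std::string command_json;
    std::string response_json;
};
struct comp_command { int step_index; std::string subject; std::string body; };

// Builds compensation commands for completed steps in reverse index order,
// skipping steps without a compensation subject.
std::vector<comp_command> plan_compensation(std::vector<done_step> completed) {
    std::sort(completed.begin(), completed.end(),
              [](const done_step& a, const done_step& b) { return a.index > b.index; });
    std::vector<comp_command> out;
    for (const auto& s : completed) {
        if (s.compensation_subject.empty()) continue;
        out.push_back({-(s.index + 1), s.compensation_subject,
                       "{\"command\":" + s.command_json +
                       ",\"result\":" + s.response_json + "}"});
    }
    return out;
}
```

The negative step_index keeps compensation rows distinct from forward rows (step 0 maps to -1) without a separate table.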

Workflows to implement

Listed in priority order:

#  Workflow           Steps                                                 Async?
1  provision_parties  save party → save account → link                      No
2  ore_import         execute import (rollback on failure)                  No
3  report_execution   gather inputs → prepare package → submit compute →    Yes
                      wait compute → process results → finalise

The provision_parties refactor serves as the first proof-of-concept for the new engine. ore_import validates that an existing workflow can be migrated cleanly. report_execution validates the async wait pattern.

Migration plan

Phase 1: engine infrastructure (this plan)

1.1 DB schema additions

  • Add columns to workflow_instance and workflow_step as described above.
  • Ensure the workflow_instance_lifecycle FSM defines compensating and compensated states alongside in_progress / completed / failed, adding them if missing.
  • Write SQL migration scripts for the existing tables.

1.2 Step-completed subscription

  • Add workflow.v1.events.step-completed queue subscription in registrar.
  • Implement the step-completed handler as described above.
  • Implement begin_compensation.

1.3 Startup recovery

  • On application start, after NATS connect, scan for in-progress workflows.
  • Re-dispatch pending step commands.

1.4 Workflow definition registry

  • Implement workflow_definition and workflow_step_def structs.
  • Implement a workflow_registry that maps type names to definitions.
  • Replace the existing provision_parties_workflow executor class with a workflow_definition registered at startup.

1.5 publish_step_completion helper

  • A small utility that domain service handlers call to publish the workflow.v1.events.step-completed event.
  • Signature: publish_step_completion(nats_client, step_id, instance_id, success, result_json, error)
  • Add to ores.service (shared service utilities) so all domain services can use it without depending on ores.workflow.
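A possible shape for the helper is sketched below. To keep the example free of a concrete client dependency, it publishes through a hypothetical event_publisher interface instead of the real nats_client type, whose API this plan does not specify; encoding the event is left to the publisher implementation. The argument order follows the proposed signature.

```cpp
#include <string>
#include <vector>

// Fields of the workflow.v1.events.step-completed body.
struct step_completed_event {
    std::string workflow_instance_id;
    std::string step_id;
    bool success = false;
    std::string result_json;
    std::string error_message;
};

// Hypothetical transport interface; a real implementation would encode the
// event and publish it via NATS. Not an existing ores API.
class event_publisher {
public:
    virtual ~event_publisher() = default;
    virtual void publish(const std::string& subject, const step_completed_event& e) = 0;
};

constexpr const char* step_completed_subject = "workflow.v1.events.step-completed";

void publish_step_completion(event_publisher& nats_client,
                             const std::string& step_id,
                             const std::string& instance_id,
                             bool success,
                             const std::string& result_json,
                             const std::string& error) {
    step_completed_event e;
    e.workflow_instance_id = instance_id;
    e.step_id = step_id;
    e.success = success;
    e.result_json = result_json;
    e.error_message = error;
    nats_client.publish(step_completed_subject, e);
}

// Test double that records what would have been published.
class recording_publisher : public event_publisher {
public:
    struct sent_message { std::string subject; step_completed_event event; };
    std::vector<sent_message> sent;
    void publish(const std::string& subject, const step_completed_event& e) override {
        sent.push_back({subject, e});
    }
};
```

Depending only on an interface like this lets domain services unit-test their workflow participation without a running NATS server.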

Phase 2: refactor existing workflows

2.1 Refactor provision_parties

  • Add X-Workflow-Step-Id handling to the ores.refdata save_party, ores.iam save_account and ores.iam save_account_party handlers.
  • Each handler: check idempotency, execute, call publish_step_completion.
  • Replace synchronous provision_parties_workflow executor with a workflow_definition (3 step_defs).
  • Remove synchronous executor loop from workflow_handler.cpp.

2.2 Refactor ore_import

  • Move saga orchestration from ores.ore.service to ores.workflow.
  • Add an ore.v1.ore.import.execute handler to ores.ore.service that executes the full import and calls publish_step_completion.
  • Add an ore.v1.ore.import.rollback handler that reverses the inserts made by the execute step.
  • Register a single-step workflow_definition in ores.workflow, with execute as the step command and rollback as its compensation.

Phase 3: report execution workflow

Implement after phase 2 validates the engine. See the report execution plan.

Revert note

The following code introduced in session 2026-04-05 implements a synchronous saga for report execution using the old pattern and must be reverted before starting phase 1:

  • projects/ores.workflow/include/ores.workflow/service/run_report_workflow.hpp
  • projects/ores.workflow/src/service/run_report_workflow.cpp
  • run_report() in workflow_handler.hpp / workflow_handler.cpp
  • run_report_message in workflow_protocol.hpp
  • mark_running/completed/failed messages in report_instance_protocol.hpp
  • mark_running/completed/failed handlers in report_instance_handler.hpp
  • Corresponding registrar subscriptions in both reporting and workflow

The report_instance_protocol.hpp additions for mark-* will return in a different form as part of phase 3, driven by the new engine contract.

Success criteria

  • ores.workflow can be killed and restarted mid-provision_parties and resume correctly.
  • A provision_parties workflow shows correct step-by-step history in the workflow_step table regardless of whether it was interrupted.
  • Every ore_import run has a corresponding workflow_instance record.
  • No cross-service saga logic lives outside ores.workflow.