Workflow Engine Generalisation
Table of Contents
- Overview
- Motivation
- Architecture
- Data model changes
- Workflow definition abstraction
- Step-completed handler (the engine core)
- Compensation handler
- Workflows to implement
- Migration plan
- Revert note
- Success criteria
Overview
This document describes the generalisation of ores.workflow into a
durable, event-driven workflow engine capable of orchestrating all
cross-service sagas in the system. The target is 15–20 distinct
workflows sharing one engine with consistent behaviour.
Motivation
Current state
Two cross-service workflows exist today:
- provision_parties: in ores.workflow; a synchronous saga that blocks until all three steps (save party, save account, link) complete.
- ore_import: in ores.ore.service; the saga logic is embedded in ore_import_handler, which calls downstream services directly.
Neither workflow is durable: if ores.workflow or ores.ore.service
restarts mid-execution, the workflow is lost. There is no mechanism
to resume from the last completed step.
Problems with the current approach
- No durability: an in-flight workflow is lost on service restart.
- No unified view: it is impossible to list all in-flight workflows from one place; they are scattered across services.
- Inconsistent patterns: provision_parties and ore_import implement the saga pattern differently; future workflows will diverge further.
- Domain logic leakage: ore_import_handler calls ores.trading, ores.iam, etc., conflating ORE-specific logic with cross-service orchestration.
- No async support: synchronous sagas cannot model steps that take minutes or hours (e.g. compute grid execution).
- Duplication: every new workflow must re-implement step tracking, compensation, error recording, and retry.
Target
A single ores.workflow service that:
- Orchestrates all cross-service sagas (15–20 workflows)
- Persists all workflow and step state to PostgreSQL
- Is fully event-driven: never blocks on a domain service call
- Can be restarted at any time and resume all in-flight workflows
- Provides a unified audit trail and operational view
Architecture
Core principle
ores.workflow is a persistent state machine driven entirely by events.
```text
On new workflow request:
  persist workflow_instance → dispatch step[0] command → return
On step-completed event:
  load workflow_instance → update step → dispatch step[n+1] → return
  (or begin compensation if the step failed)
On service restart:
  load all in-progress workflows → re-dispatch pending commands → return
```
ores.workflow never waits for a response. It dispatches a command
(fire-and-forget NATS publish) and returns. When the domain service
completes the step, it publishes a completion event. The workflow
engine processes that event and advances.
This pattern is uniform for all steps regardless of duration: a millisecond DB write and an hours-long compute job are handled identically.
Command / event protocol
Command (ores.workflow → domain service)
```text
subject: {service}.v1.{resource}.{action}
headers:
  X-Workflow-Instance-Id: <uuid>
  X-Workflow-Step-Id:     <uuid>   ← idempotency key; echoed back in event
body: {step-specific parameters as JSON}
```
The command is a fire-and-forget NATS publish. The domain service processes it asynchronously.
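A minimal sketch of how ores.workflow might assemble a command that follows this subject convention and header contract. The workflow_command struct and make_command function are hypothetical, and the NATS publish itself is left out because the client API used by ores.workflow is not described here.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical in-memory form of a workflow command; the real NATS message
// type is not specified in this document.
struct workflow_command {
    std::string subject;                         // {service}.v1.{resource}.{action}
    std::map<std::string, std::string> headers;  // workflow correlation headers
    std::string body;                            // step-specific JSON parameters
};

// Builds a command following the subject convention and header contract.
workflow_command make_command(const std::string& service,
                              const std::string& resource,
                              const std::string& action,
                              const std::string& instance_id,
                              const std::string& step_id,
                              const std::string& body_json) {
    workflow_command cmd;
    cmd.subject = service + ".v1." + resource + "." + action;
    cmd.headers["X-Workflow-Instance-Id"] = instance_id;
    // The step id doubles as the idempotency key and is echoed back in the
    // completion event.
    cmd.headers["X-Workflow-Step-Id"] = step_id;
    cmd.body = body_json;
    return cmd;
}
```

Because the step id travels as a header rather than in the body, domain services can apply the idempotency check before parsing the step-specific payload.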
Completion event (domain service → ores.workflow)
All domain services publish completion events to a single well-known subject:
```text
subject: workflow.v1.events.step-completed
body:
  workflow_instance_id  uuid
  step_id               uuid    ← echoed from X-Workflow-Step-Id header
  success               bool
  result_json           string  ← stored in workflow_step.response_json
  error_message         string
```
ores.workflow has one permanent queue-group subscription to this
subject. All completions from all services and all workflow types
flow through this single handler.
Idempotency contract for domain services
Every domain service handler that participates in a workflow must:
- Extract X-Workflow-Step-Id from the command headers.
- Check whether this step_id has already been processed (by looking for an existing record associated with that key).
- If already processed: publish the cached completion event again and return (idempotent re-notify).
- If not processed: execute the operation, store workflow_step_id on the created/modified entity, and publish the completion event.
This guarantees that re-dispatched commands after a service restart produce the same outcome without executing the operation twice.
The simplest implementation: store workflow_step_id as a nullable
column on the entity created by the step (e.g. batch.workflow_step_id,
report_input_bundle.workflow_step_id). A NULL means the entity was
not created by a workflow step; a non-null value is the idempotency key.
Startup recovery
On service start, ores.workflow executes a recovery pass:
- Queries all workflow_instance rows with state = in_progress or state = compensating.
- For each instance, loads the current step (the workflow_step row with state = in_progress).
- Re-dispatches that step's command, using the same step_id as the X-Workflow-Step-Id header.
- Domain services deduplicate via the idempotency key and republish their completion events.
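The recovery pass reduces to a filter over persisted rows. A sketch, assuming the rows have already been loaded into memory (the real queries would run against PostgreSQL); instance_row, step_row, dispatch and recovery_pass are hypothetical names, and command_json is the column listed under the data model changes.

```cpp
#include <cassert>
#include <string>
#include <vector>

enum class instance_state { in_progress, compensating, completed, compensated, failed };
enum class step_state { in_progress, completed, failed };

struct instance_row { std::string id; instance_state state; };
struct step_row {
    std::string id;  // reused as X-Workflow-Step-Id on re-dispatch
    std::string instance_id;
    step_state state;
    std::string command_subject;
    std::string command_json;
};

struct dispatch { std::string subject, step_id, body; };

// Returns the commands to re-publish. Only instances still running forwards
// or compensating are considered; their in_progress steps are re-sent with
// the original step id so that domain services can deduplicate.
std::vector<dispatch> recovery_pass(const std::vector<instance_row>& instances,
                                    const std::vector<step_row>& steps) {
    std::vector<dispatch> out;
    for (const auto& inst : instances) {
        if (inst.state != instance_state::in_progress &&
            inst.state != instance_state::compensating)
            continue;
        for (const auto& s : steps) {
            if (s.instance_id == inst.id && s.state == step_state::in_progress)
                out.push_back({s.command_subject, s.id, s.command_json});
        }
    }
    return out;
}
```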
Data model changes
workflow_instance (additions)
```sql
current_step_index integer     not null default 0
step_count         integer     not null default 0
last_event_at      timestamptz null
```
workflow_step (additions)
```sql
command_subject      text        not null default ''
command_json         text        not null default ''  -- persisted before publish
command_published_at timestamptz null                 -- set after successful publish
idempotency_key      text        not null default ''  -- = step_id string
compensation_subject text        not null default ''  -- subject for compensation command
compensation_json    text        not null default ''  -- built from step result
```
The request_json column already exists and is used for the command body.
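command_published_at is null until the publish succeeds. On restart,
every step with state = in_progress has its command re-published: a
null command_published_at means the command may never have been sent,
while a non-null value means it was sent but the completion event may
have been lost. The idempotency key makes both cases safe.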
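The write ordering can be sketched as follows, assuming the step row has already been committed before the function is called. workflow_step_record, publish_fn and publish_persisted_step are hypothetical names, and the in-memory assignment stands in for the UPDATE of command_published_at. Because the startup recovery pass re-dispatches every in_progress step, a crash at any point in this sequence leads at worst to a duplicate command, which the idempotency key absorbs.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <optional>
#include <stdexcept>
#include <string>

// Mirrors the workflow_step columns relevant to publishing.
struct workflow_step_record {
    std::string step_id;
    std::string command_subject;
    std::string command_json;  // persisted before publish
    std::optional<std::chrono::system_clock::time_point> command_published_at;  // null until publish succeeds
};

using publish_fn = std::function<void(const std::string& subject,
                                      const std::string& step_id,
                                      const std::string& body)>;

// Assumes `row` is already committed with command_published_at unset. The
// publish time is recorded only after publish returns, so a failed publish
// (signalled here by an exception) leaves the column null.
void publish_persisted_step(workflow_step_record& row, const publish_fn& publish) {
    if (row.step_id.empty())
        throw std::invalid_argument("step_id is the idempotency key and must be set");
    publish(row.command_subject, row.step_id, row.command_json);
    row.command_published_at = std::chrono::system_clock::now();
}
```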
Workflow definition abstraction
Rather than one executor class per workflow (current pattern), a workflow is defined as a sequence of step descriptors:
```cpp
#include <functional>
#include <string>
#include <vector>

struct workflow_step_def {
    std::string name;
    std::string command_subject;
    std::string compensation_subject;  // empty if no compensation

    // Builds command payload from workflow request + previous step results
    std::function<std::string(
        const std::string& request_json,
        const std::vector<std::string>& step_results)> build_command;

    // Builds compensation payload from original command + step result
    std::function<std::string(
        const std::string& command_json,
        const std::string& result_json)> build_compensation;
};

struct workflow_definition {
    std::string type_name;
    std::vector<workflow_step_def> steps;
};
```
ores.workflow maintains a registry of workflow_definition instances
indexed by type name. The step-completed handler looks up the definition,
builds the next command payload, and dispatches the next step. No
bespoke executor class is needed per workflow type.
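A registry along these lines would be enough for the step-completed handler's lookup. workflow_registry and make_provision_parties are hypothetical names, and the provision_parties subjects and payload shapes below are illustrative placeholders that follow the {service}.v1.{resource}.{action} convention, not the real ones; compensation is omitted for brevity.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Same shape as the structs above.
struct workflow_step_def {
    std::string name;
    std::string command_subject;
    std::string compensation_subject;  // empty if no compensation
    std::function<std::string(const std::string& request_json,
                              const std::vector<std::string>& step_results)> build_command;
    std::function<std::string(const std::string& command_json,
                              const std::string& result_json)> build_compensation;
};

struct workflow_definition {
    std::string type_name;
    std::vector<workflow_step_def> steps;
};

// Maps type names to definitions; populated once at startup.
class workflow_registry {
public:
    void add(workflow_definition def) {
        const std::string key = def.type_name;
        if (!defs_.emplace(key, std::move(def)).second)
            throw std::invalid_argument("duplicate workflow type: " + key);
    }
    const workflow_definition& get(const std::string& type_name) const {
        auto it = defs_.find(type_name);
        if (it == defs_.end())
            throw std::out_of_range("unknown workflow type: " + type_name);
        return it->second;
    }
private:
    std::map<std::string, workflow_definition> defs_;
};

// Illustrative three-step definition; subjects and payloads are placeholders.
workflow_definition make_provision_parties() {
    auto pass_request = [](const std::string& req, const std::vector<std::string>&) {
        return req;
    };
    workflow_definition def;
    def.type_name = "provision_parties";
    def.steps.push_back({"save party", "refdata.v1.party.save", "", pass_request, nullptr});
    def.steps.push_back({"save account", "iam.v1.account.save", "", pass_request, nullptr});
    def.steps.push_back({"link", "iam.v1.account_party.save", "",
        [](const std::string&, const std::vector<std::string>& results) {
            // Links the party and account created by the two previous steps.
            return "{\"party\":" + results.at(0) + ",\"account\":" + results.at(1) + "}";
        },
        nullptr});
    return def;
}
```

Rejecting duplicate registrations at startup turns a copy-paste mistake in type names into an immediate failure rather than a silently shadowed workflow.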
Step-completed handler (the engine core)
```text
nats.queue_subscribe("workflow.v1.events.step-completed", "ores.workflow.service"):
  1. Decode event: {workflow_instance_id, step_id, success, result_json, error_message}
  2. Load workflow_step by step_id from DB
  3. Guard: if step.state != in_progress → log duplicate and return
  4. Update step: state = completed (or failed), response_json = result_json
  5. Load workflow_instance
     If instance.state = compensating:
       hand the event to the compensation handler and return
  6. If success:
       current_step_index++; set last_event_at = now(); persist the instance
       If current_step_index < step_count:
         Load step_def[current_step_index]
         Build command from request_json + all previous result_json values
         Persist new workflow_step (state = in_progress, command_published_at = null)
         Publish command to step_def.command_subject
         Update workflow_step.command_published_at = now()
       Else:
         Update workflow_instance: state = completed
  7. If failure:
       begin_compensation(workflow_instance)
```
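A condensed, in-memory sketch of the forward path of this handler. std::map stands in for the workflow_instance and workflow_step tables, step ids come from a counter rather than UUIDs, results are kept on the instance instead of in workflow_step.response_json, and engine, instance, step and step_def are hypothetical simplifications of the structures above. Events arriving while an instance is compensating are recognised, but their handling is not shown.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>
#include <vector>

enum class state { in_progress, completed, failed, compensating };

// Trimmed-down workflow_step_def: only what the forward path needs.
struct step_def {
    std::string command_subject;
    std::function<std::string(const std::string& request_json,
                              const std::vector<std::string>& step_results)> build_command;
};

struct instance {
    std::string request_json;
    std::vector<step_def> defs;        // resolved from the registry by type name
    int current_step_index = 0;
    state st = state::in_progress;
    std::vector<std::string> results;  // result_json of completed steps, in order
};

struct step {
    std::string instance_id;
    state st = state::in_progress;
    bool published = false;            // stands in for command_published_at
};

struct engine {
    std::map<std::string, instance> instances;  // stand-in for workflow_instance
    std::map<std::string, step> steps;          // stand-in for workflow_step
    std::function<void(const std::string& subject, const std::string& step_id,
                       const std::string& body)> publish;
    std::function<void(instance&)> begin_compensation;
    int next_id = 0;

    void on_step_completed(const std::string& step_id, bool success,
                           const std::string& result_json) {
        auto it = steps.find(step_id);
        if (it == steps.end() || it->second.st != state::in_progress)
            return;  // unknown step or duplicate event: nothing to do
        it->second.st = success ? state::completed : state::failed;
        const std::string instance_id = it->second.instance_id;
        instance& inst = instances.at(instance_id);
        if (inst.st == state::compensating)
            return;  // compensation bookkeeping is not shown in this sketch
        if (!success) {
            begin_compensation(inst);
            return;
        }
        inst.results.push_back(result_json);
        ++inst.current_step_index;
        if (inst.current_step_index < static_cast<int>(inst.defs.size())) {
            const step_def& def = inst.defs[inst.current_step_index];
            const std::string body = def.build_command(inst.request_json, inst.results);
            const std::string new_id = "step-" + std::to_string(++next_id);
            steps[new_id] = step{instance_id};  // persist before publishing
            publish(def.command_subject, new_id, body);
            steps[new_id].published = true;     // command_published_at = now()
        } else {
            inst.st = state::completed;
        }
    }
};
```

The duplicate guard is what makes re-notification harmless: a second completion for the same step id finds the step no longer in_progress and returns without advancing the instance.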
Compensation handler
```text
begin_compensation(instance):
  Update instance: state = compensating
  dispatch_next_compensation(instance)

dispatch_next_compensation(instance):
  Among completed forward steps (step_index >= 0), in reverse index order,
  find the first with a non-empty compensation_subject that has not yet
  been compensated
  If found:
    Build compensation command from step.command_json + step.response_json
    Persist compensation workflow_step (step_index = -(original+1))
    Publish compensation command to step.compensation_subject
    Return; its step-completed event arrives through the same handler,
    which calls dispatch_next_compensation(instance) again
  Else:
    Update instance: state = compensated
```
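Running compensations one at a time in reverse index order, each triggered by the completion event of the previous one, keeps compensation consistent with the never-block principle. The selection step can be sketched as below; forward_step, compensation_index, original_index and next_to_compensate are hypothetical names, and the set of already-created compensation indexes would in practice come from the workflow_step rows with a negative step_index.

```cpp
#include <cassert>
#include <optional>
#include <set>
#include <string>
#include <vector>

struct forward_step {
    int index;                         // 0-based position in the definition
    bool completed;
    std::string compensation_subject;  // empty if the step has no compensation
};

// Compensation steps are stored with step_index = -(original + 1), so the
// original index is recoverable as -(step_index) - 1.
int compensation_index(int original) { return -(original + 1); }
int original_index(int compensation_step_index) { return -compensation_step_index - 1; }

// Picks the completed step with the highest index that has a compensation
// subject and no compensation step yet. Returns nullopt when nothing is left,
// i.e. the instance can move to `compensated`.
std::optional<int> next_to_compensate(const std::vector<forward_step>& steps,
                                      const std::set<int>& existing_compensation_indexes) {
    std::optional<int> best;
    for (const auto& s : steps) {
        if (!s.completed || s.compensation_subject.empty()) continue;
        if (existing_compensation_indexes.count(compensation_index(s.index))) continue;
        if (!best || s.index > *best) best = s.index;
    }
    return best;
}
```

Counting a compensation as "existing" as soon as its row is persisted, rather than when it completes, prevents the same compensation from being dispatched twice if a duplicate event arrives.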
Workflows to implement
Listed in priority order:
| # | Workflow | Steps | Async? |
|---|---|---|---|
| 1 | provision_parties | save party → save account → link | No |
| 2 | ore_import | execute import → (rollback on failure) | No |
| 3 | report_execution | gather inputs → prepare package → submit compute → wait compute → process results → finalise | Yes |
The provision_parties refactor serves as the first proof-of-concept
for the new engine. ore_import validates that an existing workflow can
be migrated cleanly. report_execution validates the async wait pattern.
Migration plan
Phase 1: engine infrastructure (this plan)
1.1 DB schema additions
- Add columns to workflow_instance and workflow_step as described above.
- Add FSM states for workflow_instance_lifecycle: verify that the compensating and compensated states exist alongside in_progress, completed and failed.
- Write SQL migration scripts for the existing tables.
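A transition guard is one way to make the lifecycle explicit in code. This hypothetical sketch lists only the transitions described in this document; how the existing workflow_instance_lifecycle FSM is implemented, and which transitions lead to failed, are not covered here.

```cpp
#include <cassert>
#include <set>
#include <utility>

enum class lifecycle { in_progress, completed, failed, compensating, compensated };

// Transitions described in this document. `failed` exists in the FSM, but
// the transitions into it are not specified here, so none are listed.
bool is_allowed(lifecycle from, lifecycle to) {
    static const std::set<std::pair<lifecycle, lifecycle>> allowed = {
        {lifecycle::in_progress, lifecycle::completed},     // last step succeeded
        {lifecycle::in_progress, lifecycle::compensating},  // a step failed
        {lifecycle::compensating, lifecycle::compensated},  // all compensations done
    };
    return allowed.count({from, to}) > 0;
}
```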
1.2 Step-completed subscription
- Add the workflow.v1.events.step-completed queue subscription in the registrar.
- Implement the step-completed handler as described above.
- Implement begin_compensation.
1.3 Startup recovery
- On application start, after the NATS connection is established, scan for in-progress and compensating workflows.
- Re-dispatch pending step commands.
1.4 Workflow definition registry
- Implement the workflow_definition and workflow_step_def structs.
- Implement a workflow_registry that maps type names to definitions.
- Replace the existing provision_parties_workflow executor class with a workflow_definition registered at startup.
1.5 publish_step_completion helper
- A small utility that domain service handlers call to publish the workflow.v1.events.step-completed event.
- Signature: publish_step_completion(nats_client, step_id, instance_id, success, result_json, error)
- Add it to ores.service (shared service utilities) so all domain services can use it without depending on ores.workflow.
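A sketch of the helper, written against a hypothetical message_publisher interface rather than a concrete NATS client type; depending only on such an interface keeps ores.service free of any dependency on ores.workflow. The JSON encoding of the body is an assumption; the field names follow the completion-event protocol.

```cpp
#include <cassert>
#include <cstdio>
#include <string>

// Hypothetical minimal publishing interface; the real NATS client used by
// ores.service is not described here and would be adapted to it.
class message_publisher {
public:
    virtual ~message_publisher() = default;
    virtual void publish(const std::string& subject, const std::string& payload) = 0;
};

namespace detail {
// Encodes a value as a quoted JSON string literal.
inline std::string json_string(const std::string& s) {
    std::string out = "\"";
    for (unsigned char c : s) {
        if (c == '"' || c == '\\') {
            out += '\\';
            out += static_cast<char>(c);
        } else if (c < 0x20) {  // control characters must be \u-escaped
            char buf[7];
            std::snprintf(buf, sizeof buf, "\\u%04x", static_cast<unsigned>(c));
            out += buf;
        } else {
            out += static_cast<char>(c);
        }
    }
    return out + "\"";
}
}  // namespace detail

// Publishes the completion event; result_json is sent as a string field.
inline void publish_step_completion(message_publisher& nats_client,
                                    const std::string& step_id,
                                    const std::string& instance_id,
                                    bool success,
                                    const std::string& result_json,
                                    const std::string& error) {
    const std::string body =
        "{\"workflow_instance_id\":" + detail::json_string(instance_id) +
        ",\"step_id\":" + detail::json_string(step_id) +
        ",\"success\":" + (success ? "true" : "false") +
        ",\"result_json\":" + detail::json_string(result_json) +
        ",\"error_message\":" + detail::json_string(error) + "}";
    nats_client.publish("workflow.v1.events.step-completed", body);
}
```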
Phase 2: refactor existing workflows
2.1 Refactor provision_parties
- Add X-Workflow-Step-Id handling to the ores.refdata save_party, ores.iam save_account and ores.iam save_account_party handlers.
- Each handler: check idempotency, execute, call publish_step_completion.
- Replace the synchronous provision_parties_workflow executor with a workflow_definition (3 step_defs).
- Remove the synchronous executor loop from workflow_handler.cpp.
2.2 Refactor ore_import
- Move saga orchestration from ores.ore.service to ores.workflow.
- Add an ore.v1.ore.import.execute handler to ores.ore.service: executes the full import and calls publish_step_completion.
- Add an ore.v1.ore.import.rollback handler: reverses inserts.
- Register a 1-step (execute + rollback) workflow_definition in ores.workflow.
Phase 3: report execution workflow
Implement after phase 2 validates the engine. See report execution plan.
Revert note
The following code introduced in session 2026-04-05 implements a synchronous saga for report execution using the old pattern and must be reverted before starting phase 1:
- projects/ores.workflow/include/ores.workflow/service/run_report_workflow.hpp
- projects/ores.workflow/src/service/run_report_workflow.cpp
- run_report() in workflow_handler.hpp / workflow_handler.cpp
- run_report_message in workflow_protocol.hpp
- mark_running/completed/failed messages in report_instance_protocol.hpp
- mark_running/completed/failed handlers in report_instance_handler.hpp
- Corresponding registrar subscriptions in both reporting and workflow
The report_instance_protocol.hpp additions for mark-* will return in
a different form as part of phase 3, driven by the new engine contract.
Success criteria
- ores.workflow can be killed and restarted mid-provision_parties and resume correctly.
- A provision_parties workflow shows correct step-by-step history in the workflow_step table regardless of whether it was interrupted.
- ore_import shows a workflow_instance record for every import.
- No cross-service saga logic lives outside ores.workflow.