Report Execution Workflow — Implementation Plan
Table of Contents
- Overview
- Implementation phases
- Phase 3.0: Prerequisites DONE
- Phase 3.1: Fix trigger() and add concurrency check
- Phase 3.2: Report execution protocol and workflow definition
- Phase 3.3: risk_report_config domain model
- Phase 3.4: Gather trades step handler
- Phase 3.5: Gather market data step handler
- Phase 3.6: Assemble report input bundle
- Phase 3.7: Finalise step handler
- Phase 3.8: ORE mapping and packaging DEFERRED
- Phase 3.9: Submit to compute grid DEFERRED
- Phase 3.10: Grid execution and async wait DEFERRED
- Phase 3.11: Process results DEFERRED
- Phased workflow definition evolution
- FSM state considerations
- Implementation order
- Success criteria
Overview
This plan covers the implementation of the report execution workflow (Phase 3 from the workflow engine generalisation plan). It converts the report execution pipeline from a design document into working code, using the durable event-driven workflow engine built in Phases 1–2.
The pipeline takes a pending report instance and drives it through
data gathering, ORE mapping, compute submission, result processing,
and finalisation. It is implemented as a multi-step saga in
ores.workflow, with step handlers distributed across
ores.reporting.service, ores.ore.service, and
ores.compute.service.
Implementation phases
The work is broken into incremental, independently testable phases. Each phase produces a buildable, committable unit. Later phases depend on earlier ones but can be deferred without breaking the system.
Phase 3.0: Prerequisites DONE
- [X] Create ores.workflow.api component (shared messaging types).
- [X] Move workflow_events.hpp and workflow_protocol.hpp to ores.workflow.api.
- [X] Add workflow_step_context helper to workflow_helpers.hpp.
Phase 3.1: Fix trigger() and add concurrency check
Summary
The current report_instance_handler::trigger() does not set
fsm_state_id on the created instance and publishes a raw JSON blob to
a subject that nothing handles. This phase fixes both issues.
Tasks
- Load the report_instance_lifecycle FSM state map at startup in the reporting registrar. Use load_fsm_states(svc_nats, "report_instance_lifecycle") (same pattern as the workflow registrar).
- Implement the concurrency check in trigger():
  - Query for any existing report instance for the same definition with state in {pending, running}.
  - Apply concurrency_policy from the definition:
    - skip → create instance with state skipped; return.
    - queue → create instance with state queued; return.
    - fail → create instance with state failed; return.
    - Otherwise → create instance with state pending.
- Replace the raw JSON publish to workflow.v1.reports.run with a proper start_workflow_message publish to workflow.v1.start:
  - Type: report_execution_workflow
  - tenant_id: from trigger message.
  - request_json: serialised report_execution_request (see 3.2).
  - Pre-generate instance_id for the workflow instance.
- Set started_at = now() on the report instance.
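The policy branch above can be sketched as a small pure function. This is only an illustration: the enum, its allow value, and the function name are hypothetical, and it assumes the policy takes effect only when another instance of the same definition is already pending or running.

```cpp
#include <string>

// Hypothetical enum mirroring the definition's concurrency_policy values.
// "allow" is an assumed name for the "otherwise" case in the task list.
enum class concurrency_policy { allow, skip, queue, fail };

// Returns the FSM state name for a newly created report instance.
// Assumption: the policy is consulted only when an instance of the same
// definition is already pending or running.
inline std::string initial_state(concurrency_policy policy,
                                 bool has_active_instance) {
    if (!has_active_instance)
        return "pending";
    switch (policy) {
    case concurrency_policy::skip:  return "skipped";
    case concurrency_policy::queue: return "queued";
    case concurrency_policy::fail:  return "failed";
    case concurrency_policy::allow: break;
    }
    return "pending";
}
```

Keeping the decision free of database and NATS calls would let trigger() be unit-tested for every policy value without a running service.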
Acceptance criteria
- Report instance created by trigger has non-null fsm_state_id.
- Concurrency policy is evaluated; skip/queue/fail produce the correct terminal or waiting state.
- A start_workflow_message is published and the workflow engine creates a workflow_instance record.
Phase 3.2: Report execution protocol and workflow definition
Summary
Define the internal messaging structs for all report execution workflow steps and register the workflow definition in the engine.
Tasks
- Create ores.reporting.api/messaging/report_execution_protocol.hpp:

      // Stored as workflow_instance.request_json. Contains everything
      // the step builders need to dispatch commands.
      struct report_execution_request {
          std::string report_instance_id;
          std::string definition_id;
          std::string tenant_id;
          std::string correlation_id;
      };

      // Step 0: gather trades
      struct gather_trades_request {
          static constexpr std::string_view nats_subject =
              "reporting.v1.report.gather-trades";
          std::string report_instance_id;
          std::string definition_id;
          std::string tenant_id;
          std::string correlation_id;
      };
      struct gather_trades_result {
          bool success = false;
          std::string message;
          int trade_count = 0;
          std::string trades_json; // serialised trade_export_item vector
      };

      // Step 1: gather market data
      struct gather_market_data_request {
          static constexpr std::string_view nats_subject =
              "reporting.v1.report.gather-market-data";
          std::string report_instance_id;
          std::string definition_id;
          std::string tenant_id;
          std::string correlation_id;
      };
      struct gather_market_data_result {
          bool success = false;
          std::string message;
          int series_count = 0;
          std::string market_data_json; // serialised market_series vector
      };

      // Step 2: assemble report input bundle
      struct assemble_bundle_request {
          static constexpr std::string_view nats_subject =
              "reporting.v1.report.assemble-bundle";
          std::string report_instance_id;
          std::string definition_id;
          std::string tenant_id;
          std::string correlation_id;
      };
      struct assemble_bundle_result {
          bool success = false;
          std::string message;
          std::string bundle_ref; // storage key or DB reference
      };

      // Step 3: ORE mapping and packaging (ores.ore.service)
      struct prepare_ore_package_request {
          static constexpr std::string_view nats_subject =
              "ore.v1.report.prepare-package";
          std::string report_instance_id;
          std::string bundle_ref;
          std::string tenant_id;
          std::string correlation_id;
      };
      struct prepare_ore_package_result {
          bool success = false;
          std::string message;
          std::vector<std::string> tarball_uris; // per-book tarballs in storage
      };

      // Step 4: submit to compute (ores.compute.service)
      struct submit_compute_request {
          static constexpr std::string_view nats_subject =
              "compute.v1.report.submit";
          std::string report_instance_id;
          std::string tenant_id;
          std::string correlation_id;
          std::vector<std::string> tarball_uris;
      };
      struct submit_compute_result {
          bool success = false;
          std::string message;
          std::string batch_id;
      };

      // Step 5: finalise report instance
      struct finalise_report_request {
          static constexpr std::string_view nats_subject =
              "reporting.v1.report.finalise";
          std::string report_instance_id;
          std::string tenant_id;
          std::string correlation_id;
      };
      struct finalise_report_result {
          bool success = false;
          std::string message;
      };
- Create ores.workflow/service/report_execution_definitions.hpp:
  - Register report_execution_workflow with the step sequence.
  - Initial implementation: 3 steps (gather_trades → assemble_bundle → finalise). The ORE mapping and compute steps are added in later phases.
- Register the definition in ores.workflow/src/messaging/registrar.cpp.
Notes
- Steps are numbered 0..N in the workflow definition.
- The protocol includes all steps even though some are implemented later, so the structs don't change.
- gather_market_data, prepare_ore_package, and submit_compute are defined in the protocol but not wired into the workflow definition until their implementation phases.
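One way to picture the definition and its later growth is as an ordered list of steps, where a new step is spliced in before an existing one. The sketch below is an assumption about shape only: step_definition and insert_before are hypothetical names, not the engine's actual registration API, and the subjects are copied from the protocol structs.

```cpp
#include <string>
#include <string_view>
#include <utility>
#include <vector>

// Hypothetical step descriptor; the engine's real type may differ.
struct step_definition {
    std::string name;
    std::string command_subject;
};

// Initial three-step definition. Step indices follow vector order (0..N).
inline std::vector<step_definition> initial_report_execution_steps() {
    return {
        {"gather_trades", "reporting.v1.report.gather-trades"},
        {"assemble_bundle", "reporting.v1.report.assemble-bundle"},
        {"finalise", "reporting.v1.report.finalise"},
    };
}

// Inserts a step immediately before the named anchor step.
// Returns false (and leaves the list unchanged) if the anchor is absent.
inline bool insert_before(std::vector<step_definition>& steps,
                          std::string_view anchor, step_definition step) {
    for (auto it = steps.begin(); it != steps.end(); ++it) {
        if (it->name == anchor) {
            steps.insert(it, std::move(step));
            return true;
        }
    }
    return false;
}
```

With this shape, Phase 3.5 would amount to inserting gather_market_data before assemble_bundle, leaving the protocol structs untouched.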
Phase 3.3: risk_report_config domain model
Summary
Create a minimal C++ domain model for risk_report_config so the
gather steps can query book/portfolio scope and report parameters.
Tasks
- Create ores.reporting.api/domain/risk_report_config.hpp:
  - Fields needed for execution: id, tenant_id, version, report_definition_id, base_currency, market_data_type, market_data_date, n_threads.
  - Analytics flags: npv_enabled, cashflow_enabled, curves_enabled, sensitivity_enabled, etc.
  - Keep the struct complete (all SQL columns) since other consumers may need them later.
- Create entity, mapper, and repository using the Domain Type Creator Skill:
  - Entity: risk_report_config_entity.hpp
  - Mapper: risk_report_config_mapper.hpp
  - Repository: risk_report_config_repository.hpp with:
    - find_by_definition_id(ctx, definition_id) -> optional<risk_report_config>
    - read_latest(ctx) -> vector<risk_report_config>
- Add scope query methods to a new repository or the existing one:
  - get_book_scope(ctx, config_id) -> vector<string>: returns active book UUIDs from the junction table.
  - get_portfolio_scope(ctx, config_id) -> vector<string>: returns active portfolio UUIDs from the junction table.
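As a rough illustration of the domain type and the find_by_definition_id lookup, the sketch below uses an in-memory stand-in for the repository. The field types, the subset of fields shown, and the rule of returning the highest version are assumptions made for the example; the real repository would query the database through ctx.

```cpp
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Field names follow the task list; the C++ types are assumptions and
// only a subset of the SQL columns is shown.
struct risk_report_config {
    std::string id;
    std::string tenant_id;
    int version = 0;
    std::string report_definition_id;
    std::string base_currency;
    std::string market_data_type;
    std::string market_data_date;  // kept as text in this sketch
    int n_threads = 1;
    bool npv_enabled = false;
    bool cashflow_enabled = false;
    bool curves_enabled = false;
    bool sensitivity_enabled = false;
};

// In-memory stand-in for the repository (hypothetical class name).
class in_memory_config_repository {
public:
    void add(risk_report_config cfg) { rows_.push_back(std::move(cfg)); }

    // Assumption: when several versions exist, the highest one wins.
    std::optional<risk_report_config>
    find_by_definition_id(const std::string& definition_id) const {
        std::optional<risk_report_config> best;
        for (const auto& row : rows_) {
            if (row.report_definition_id != definition_id)
                continue;
            if (!best || row.version > best->version)
                best = row;
        }
        return best;
    }

private:
    std::vector<risk_report_config> rows_;
};
```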
Acceptance criteria
- risk_report_config can be loaded by definition_id.
- Book and portfolio scope lists can be retrieved for a config.
Phase 3.4: Gather trades step handler
Summary
Implement the first workflow step: load the risk report config, resolve book scope, and fetch all trades (with instruments) from the trading service.
Tasks
- Create ores.reporting.core/messaging/report_execution_handler.hpp:
  - Constructor: (nats, ctx, outbound_nats) or similar.
  - Method: gather_trades(ores::nats::message msg).
- Implement gather_trades:
  a. Extract workflow_step_context from message.
  b. Decode gather_trades_request from message body.
  c. Build tenant-scoped DB context from tenant_id.
  d. Load risk_report_config by definition_id.
  e. Get book scope from junction table.
  f. If book scope is empty, get portfolio scope; for each portfolio, query books (via refdata.v1.books.list or similar).
  g. For each book, call trading.v1.trades.portfolio.export via outbound NATS to get trades with resolved instruments.
  h. Aggregate all trade_export_item vectors.
  i. Build gather_trades_result with serialised trades JSON and trade count.
  j. Call wf.complete(rfl::json::write(result)).
- Subscribe gather_trades_request::nats_subject in the reporting registrar (ores.reporting.core/src/messaging/registrar.cpp).
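Steps e to h above (scope resolution and aggregation) can be separated from the NATS plumbing. The sketch below is a simplified illustration: trade_item stands in for trade_export_item, and the two callbacks are hypothetical placeholders for the refdata and trading requests.

```cpp
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for trade_export_item.
struct trade_item {
    std::string trade_id;
    std::string book_id;
};

// Placeholders for the outbound requests (assumed shapes).
using books_for_portfolio_fn =
    std::function<std::vector<std::string>(const std::string&)>;
using trades_for_book_fn =
    std::function<std::vector<trade_item>(const std::string&)>;

// Steps e-f: use the explicit book scope when present; otherwise expand
// each portfolio into its books.
inline std::vector<std::string> resolve_books(
    const std::vector<std::string>& book_scope,
    const std::vector<std::string>& portfolio_scope,
    const books_for_portfolio_fn& books_for_portfolio) {
    if (!book_scope.empty())
        return book_scope;
    std::vector<std::string> books;
    for (const auto& portfolio : portfolio_scope) {
        for (auto& book : books_for_portfolio(portfolio))
            books.push_back(std::move(book));
    }
    return books;
}

// Steps g-h: fetch trades per book and aggregate them into one vector.
inline std::vector<trade_item> gather_trades(
    const std::vector<std::string>& books,
    const trades_for_book_fn& trades_for_book) {
    std::vector<trade_item> all;
    for (const auto& book : books) {
        auto trades = trades_for_book(book);
        all.insert(all.end(), trades.begin(), trades.end());
    }
    return all;
}
```

The sketch does not deduplicate books that appear under more than one portfolio; whether that can happen depends on the reference data model.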
Acceptance criteria
- A triggered report instance creates a workflow that dispatches gather_trades to the reporting service.
- The step handler loads the config, resolves scope, fetches trades, and publishes step_completed with the trade data.
Phase 3.5: Gather market data step handler
Summary
Fetch market data series relevant to the trade portfolio.
Tasks
- Add gather_market_data method to report_execution_handler.
- Implement:
  a. Extract workflow_step_context.
  b. Decode gather_market_data_request.
  c. Load risk_report_config for market_data_type and market_data_date.
  d. Call marketdata.v1.series.list to fetch all series for the tenant (filtered by type if applicable).
  e. Build gather_market_data_result with serialised market data.
  f. Call wf.complete(...).
- Subscribe in reporting registrar.
- Wire into the workflow definition (add as step between gather_trades and assemble_bundle).
Notes
- The market data step depends on knowing which series types are relevant for the trade portfolio. Initially this can fetch all series and let the ORE mapping step filter what it needs.
- More sophisticated filtering (inspecting trade types to determine required yield curves, FX rates, etc.) is a future enhancement.
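The "fetch everything, optionally filter by type" behaviour can be expressed as a small helper. This is a sketch under assumptions: market_series here is a two-field stand-in for the real type, and the type strings are illustrative.

```cpp
#include <algorithm>
#include <iterator>
#include <optional>
#include <string>
#include <vector>

// Hypothetical, reduced stand-in for the market_series domain type.
struct market_series {
    std::string id;
    std::string type;
};

// Returns all series when no type is given, otherwise only matching ones.
inline std::vector<market_series> filter_by_type(
    const std::vector<market_series>& all,
    const std::optional<std::string>& type) {
    if (!type)
        return all;
    std::vector<market_series> out;
    std::copy_if(all.begin(), all.end(), std::back_inserter(out),
                 [&](const market_series& s) { return s.type == *type; });
    return out;
}
```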
Phase 3.6: Assemble report input bundle
Summary
Aggregate the trade data and market data from prior steps into a
single report_input_bundle entity and persist it.
Tasks
- Create ores.reporting.api/domain/report_input_bundle.hpp:
  - Fields: id, instance_id, trades_json, market_data_json, config_json, created_at.
- Create entity, mapper, repository for report_input_bundle.
- Add assemble_bundle method to report_execution_handler:
  a. Decode request.
  b. Load previous step results from the command payload (trades JSON from step 0, market data JSON from step 1).
  c. Load risk_report_config snapshot as config JSON.
  d. Persist report_input_bundle.
  e. Return assemble_bundle_result with the bundle reference.
- Wire into the workflow definition.
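A minimal sketch of step b to d, without persistence, might look like the following. The prior_step_results type and make_bundle function are hypothetical, created_at is omitted for brevity, and the sketch assumes market data may be absent because the initial definition runs assemble_bundle directly after gather_trades.

```cpp
#include <optional>
#include <string>
#include <utility>

// Fields follow the task list above; created_at is omitted in this sketch.
struct report_input_bundle {
    std::string id;
    std::string instance_id;
    std::string trades_json;
    std::string market_data_json;  // may be empty before Phase 3.5 is wired in
    std::string config_json;
};

// Hypothetical view of the prior step results carried in the command payload.
struct prior_step_results {
    std::optional<std::string> trades_json;       // from step 0
    std::optional<std::string> market_data_json;  // from step 1, if present
};

// Builds the bundle, or returns nullopt when the trades payload is missing.
inline std::optional<report_input_bundle> make_bundle(
    std::string bundle_id, std::string instance_id,
    const prior_step_results& prior, std::string config_json) {
    if (!prior.trades_json)
        return std::nullopt;
    report_input_bundle bundle;
    bundle.id = std::move(bundle_id);
    bundle.instance_id = std::move(instance_id);
    bundle.trades_json = *prior.trades_json;
    bundle.market_data_json = prior.market_data_json.value_or("");
    bundle.config_json = std::move(config_json);
    return bundle;
}
```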
Notes
- The bundle could alternatively be stored in object storage (via
ores.http.server) to avoid large DB rows. Decision deferred; initial implementation uses a DB row.
Phase 3.7: Finalise step handler
Summary
Mark the report instance as completed or failed and record timestamps.
Tasks
- Add finalise method to report_execution_handler:
  a. Decode finalise_report_request.
  b. Load the report instance.
  c. Transition FSM state to completed.
  d. Set completed_at = now().
  e. Set output_message with execution summary.
  f. Call wf.complete(...).
- Add compensation handling for earlier steps:
  - gather_trades: no compensation needed (read-only).
  - gather_market_data: no compensation needed (read-only).
  - assemble_bundle: delete the report_input_bundle row.
  - On compensation, finalise is not called; instead the workflow engine marks the instance as compensated.
- Add a separate compensation handler that transitions the report instance to failed when the workflow compensates:
  - Subject: reporting.v1.report.fail
  - Sets output_message from the error, completed_at = now().
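The compensation rules above follow the usual saga pattern: when a step fails, the steps that already succeeded are undone in reverse order and later steps (including finalise) never run. The sketch below is a synchronous, in-process illustration with hypothetical names; the real engine is event-driven and durable.

```cpp
#include <functional>
#include <string>
#include <vector>

// Hypothetical in-process step; the real engine dispatches commands instead.
struct saga_step {
    std::string name;
    std::function<bool()> execute;     // returns false on failure
    std::function<void()> compensate;  // empty for read-only steps
};

enum class saga_outcome { completed, compensated };

// Runs steps in order. On the first failure, compensates the steps that
// already succeeded, in reverse order, and reports `compensated`.
// Names of steps whose compensation ran are appended to compensated_log.
inline saga_outcome run_saga(const std::vector<saga_step>& steps,
                             std::vector<std::string>& compensated_log) {
    std::vector<const saga_step*> done;
    for (const auto& step : steps) {
        if (step.execute()) {
            done.push_back(&step);
            continue;
        }
        for (auto it = done.rbegin(); it != done.rend(); ++it) {
            if ((*it)->compensate) {
                (*it)->compensate();
                compensated_log.push_back((*it)->name);
            }
        }
        return saga_outcome::compensated;
    }
    return saga_outcome::completed;
}
```

In this model gather_trades and gather_market_data would carry an empty compensate, while assemble_bundle would delete its report_input_bundle row.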
Acceptance criteria
- After a successful workflow, the report instance has state completed, completed_at set, and output_message with summary.
- After a failed workflow, the report instance has state failed with the error in output_message.
Phase 3.8: ORE mapping and packaging DEFERRED
Summary
ores.ore.service receives the report_input_bundle reference, maps
the engine-agnostic data to ORE XML format, packages per-book tarballs,
and uploads them to object storage.
Tasks
- Add prepare_ore_package handler to ores.ore.service:
  a. Download the report_input_bundle.
  b. Map trades → ORE portfolio XML.
  c. Map market data → ORE market data XML.
  d. Map risk_report_config analytics flags → ore.xml config.
  e. Package one tarball per book.
  f. Upload each tarball to storage via ores.http.server.
  g. Return prepare_ore_package_result with tarball URIs.
- Wire into the workflow definition (step between assemble_bundle and submit_compute).
Dependencies
- ORE XML mapping code (ores.ore/orexml/facet).
- Pricing engine configuration mapping.
- Object storage upload API.
Status
Deferred — requires ORE XML generation capabilities not yet available.
Phase 3.9: Submit to compute grid DEFERRED
Summary
ores.compute.service receives the tarball URIs, creates a batch and
workunits, and dispatches work assignments to the JetStream stream.
Tasks
- Add submit_compute handler to ores.compute.service:
  a. Create a batch record linked to the report_instance_id.
  b. For each tarball URI, create a workunit with input_uri.
  c. Publish work_assignment_event to JetStream for each workunit.
  d. Return submit_compute_result with batch_id.
- Wire into the workflow definition.
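Steps a and b above map one tarball URI to one workunit. A minimal sketch, assuming simplified batch and workunit shapes and a hypothetical id scheme (batch id plus an index suffix), could be:

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Simplified, assumed shapes for the compute records.
struct workunit {
    std::string id;
    std::string batch_id;
    std::string input_uri;
};

struct batch {
    std::string id;
    std::string report_instance_id;
    std::vector<workunit> workunits;
};

// Creates one workunit per tarball URI. The "<batch>-wu<i>" id scheme is
// purely illustrative; real ids would likely be generated UUIDs.
inline batch make_batch(const std::string& batch_id,
                        const std::string& report_instance_id,
                        const std::vector<std::string>& tarball_uris) {
    batch b{batch_id, report_instance_id, {}};
    b.workunits.reserve(tarball_uris.size());
    for (std::size_t i = 0; i < tarball_uris.size(); ++i) {
        b.workunits.push_back(workunit{
            batch_id + "-wu" + std::to_string(i), batch_id, tarball_uris[i]});
    }
    return b;
}
```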
Status
Deferred — depends on Phase 3.8 (tarball URIs) and JetStream integration.
Phase 3.10: Grid execution and async wait DEFERRED
Summary
The workflow enters an async wait state while BOINC-style compute
wrappers execute ORE. When all workunits complete, the compute
service's assimilator publishes a batch-completed event that the
workflow engine receives as a step-completed event.
Tasks
- Implement the assimilator batch-completion → step-completed bridge in ores.compute.service.
- The workflow step for this phase is passive: no command is published, only a completion event is expected.
Notes
This phase requires a mechanism for the workflow engine to wait for an externally triggered event. Current options:
- The submit_compute step returns immediately and a separate subscription in ores.compute.service watches for batch completion and publishes the step-completed event.
- The workflow engine supports a wait_for_event step type (future).
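The first option boils down to tracking outstanding workunits per batch and emitting a single completion signal when the last one finishes. The class below is a hypothetical, single-threaded sketch of that bookkeeping; in the service the callback would publish the step-completed event.

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <unordered_set>
#include <utility>

// Tracks outstanding workunits for one batch and fires the callback once,
// when the last one completes. Not thread-safe; an empty batch never fires.
class batch_completion_tracker {
public:
    batch_completion_tracker(std::unordered_set<std::string> workunit_ids,
                             std::function<void()> on_batch_completed)
        : pending_(std::move(workunit_ids)),
          on_batch_completed_(std::move(on_batch_completed)) {}

    // Records a completed workunit; unknown or duplicate ids are ignored.
    void workunit_completed(const std::string& id) {
        if (pending_.erase(id) == 0)
            return;
        if (pending_.empty() && on_batch_completed_)
            on_batch_completed_();
    }

    std::size_t outstanding() const { return pending_.size(); }

private:
    std::unordered_set<std::string> pending_;
    std::function<void()> on_batch_completed_;
};
```

Ignoring duplicate completions matters here because message redelivery could otherwise publish the step-completed event more than once.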
Status
Deferred — depends on compute grid execution infrastructure.
Phase 3.11: Process results DEFERRED
Summary
Download compute outputs from storage, validate, and push to downstream services (results storage, analytics).
Tasks
- Add process_results handler to ores.reporting.service.
- For each workunit result: download output, validate, store.
- Wire into the workflow definition.
Status
Deferred — depends on Phases 3.9–3.10.
Phased workflow definition evolution
The workflow definition grows incrementally as phases are implemented:
| Phase | Steps in definition |
|---|---|
| 3.2 | gather_trades → assemble_bundle → finalise |
| 3.5 | gather_trades → gather_market_data → assemble_bundle → finalise |
| 3.8 | … → assemble_bundle → prepare_ore_package → finalise |
| 3.9 | … → prepare_ore_package → submit_compute → finalise |
| 3.10 | … → submit_compute → wait_compute → finalise |
| 3.11 | … → wait_compute → process_results → finalise |
At each phase the workflow is end-to-end functional with the steps available, and produces a completed report instance (even if the report content is partial).
FSM state considerations
The existing report_instance_lifecycle FSM has 7 states:
pending, queued, running, completed, failed, cancelled,
skipped.
The report execution design document proposes more granular states
(gathering_inputs, preparing_package, computing, etc.). These
are deferred: the workflow engine's workflow_step table already
provides per-step visibility. The report instance FSM transitions
are:
- pending → running (when workflow starts).
- running → completed (finalise step succeeds).
- running → failed (any step fails; compensation marks failed).
- running → cancelled (user cancels; future feature).
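The four transitions listed above can be captured in a small guard function. This sketch covers only those transitions and rejects everything else; how queued or skipped instances move on is not specified here, so the function makes no claim about them.

```cpp
#include <string_view>

// Returns true only for the report-instance transitions listed above.
// State names are compared as plain strings in this sketch; the real code
// would more likely compare fsm_state_id values.
constexpr bool is_allowed_transition(std::string_view from,
                                     std::string_view to) {
    if (from == "pending")
        return to == "running";
    if (from == "running")
        return to == "completed" || to == "failed" || to == "cancelled";
    return false;
}
```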
Granular FSM states can be added later as an enhancement without changing the workflow architecture.
Implementation order
Recommended order for implementation within a single session or across sessions:
- Phase 3.1 — fix trigger() (unblocks everything).
- Phase 3.2 — protocol structs + workflow definition skeleton.
- Phase 3.3 — risk_report_config domain model (unblocks data steps).
- Phase 3.4 — gather trades handler (first working step).
- Phase 3.7 — finalise handler (end-to-end workflow completes).
- Phase 3.5 — gather market data (enriches data).
- Phase 3.6 — assemble bundle (persists gathered data).
- Phases 3.8–3.11 — ORE mapping and compute (when ready).
Success criteria
- A scheduled report definition fires, creates an instance with correct FSM state, dispatches a workflow, gathers trades, and marks the instance as completed.
- workflow_instance and workflow_step tables show the full audit trail.
- Concurrency policy is enforced: skip/queue/fail produce the correct initial state.
- The workflow can be interrupted and resumed on service restart.