Report Execution Workflow — Implementation Plan

Overview

This plan covers the implementation of the report execution workflow (Phase 3 from the workflow engine generalisation plan). It converts the report execution pipeline from a design document into working code, using the durable event-driven workflow engine built in Phases 1–2.

The pipeline takes a pending report instance and drives it through data gathering, ORE mapping, compute submission, result processing, and finalisation. It is implemented as a multi-step saga in ores.workflow, with step handlers distributed across ores.reporting.service, ores.ore.service, and ores.compute.service.

Implementation phases

The work is broken into incremental, independently testable phases. Each phase produces a buildable, committable unit. Later phases depend on earlier ones but can be deferred without breaking the system.

Phase 3.0: Prerequisites   DONE

  • [X] Create ores.workflow.api component (shared messaging types).
  • [X] Move workflow_events.hpp and workflow_protocol.hpp to ores.workflow.api.
  • [X] Add workflow_step_context helper to workflow_helpers.hpp.

Phase 3.1: Fix trigger() and add concurrency check

Summary

The current report_instance_handler::trigger() does not set fsm_state_id on the created instance and publishes a raw JSON blob to a subject that nothing handles. This phase fixes both issues.

Tasks

  1. Load report_instance_lifecycle FSM state map at startup in the reporting registrar. Use load_fsm_states(svc_nats, "report_instance_lifecycle") (same pattern as workflow registrar).
  2. Implement concurrency check in trigger():
    • Query for any existing report instance for the same definition with state in {pending, running}.
    • If such an instance exists, apply concurrency_policy from the definition:
      • skip → create instance with state skipped; return.
      • queue → create instance with state queued; return.
      • fail → create instance with state failed; return.
    • Otherwise (no active instance, or any other policy) → create instance with state pending.
  3. Replace the raw JSON publish to workflow.v1.reports.run with a proper start_workflow_message publish to workflow.v1.start:
    • Type: report_execution_workflow
    • tenant_id: from trigger message.
    • request_json: serialised report_execution_request (see 3.2).
    • Pre-generate instance_id for the workflow instance.
  4. Set started_at = now() on the report instance.
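
The concurrency decision in task 2 can be sketched as a pure function, which keeps it testable without a database. This is a minimal sketch, not the actual handler code: the enum names and the allow policy are hypothetical, and it assumes the policy only matters when a pending or running instance already exists.

```cpp
// Hypothetical enums: the plan names the policies and states,
// but not their C++ representation.
enum class concurrency_policy { skip, queue, fail, allow };
enum class instance_state { pending, queued, skipped, failed };

// Returns the state a new report instance should be created in.
// active_instance_exists is true when a pending or running instance
// already exists for the same definition.
constexpr instance_state initial_state(concurrency_policy policy,
                                       bool active_instance_exists) {
    if (!active_instance_exists)
        return instance_state::pending;
    switch (policy) {
    case concurrency_policy::skip:  return instance_state::skipped;
    case concurrency_policy::queue: return instance_state::queued;
    case concurrency_policy::fail:  return instance_state::failed;
    case concurrency_policy::allow: return instance_state::pending;
    }
    return instance_state::pending;  // not reached for valid enum values
}

// Only a pending instance goes on to publish start_workflow_message;
// the other states return from trigger() immediately.
constexpr bool should_start_workflow(instance_state s) {
    return s == instance_state::pending;
}
```

With this shape, trigger() would call initial_state() once, persist the instance with the resulting state, and publish to workflow.v1.start only when should_start_workflow() is true.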

Acceptance criteria

  • Report instance created by trigger has non-null fsm_state_id.
  • Concurrency policy is evaluated; skip/queue/fail produce the correct terminal or waiting state.
  • A start_workflow_message is published and the workflow engine creates a workflow_instance record.

Phase 3.2: Report execution protocol and workflow definition

Summary

Define the internal messaging structs for all report execution workflow steps and register the workflow definition in the engine.

Tasks

  1. Create ores.reporting.api/messaging/report_execution_protocol.hpp:

    #include <string>
    #include <string_view>
    #include <vector>
    
    // Stored as workflow_instance.request_json. Contains everything
    // the step builders need to dispatch commands.
    struct report_execution_request {
        std::string report_instance_id;
        std::string definition_id;
        std::string tenant_id;
        std::string correlation_id;
    };
    
    // Step 0: gather trades
    struct gather_trades_request {
        static constexpr std::string_view nats_subject =
            "reporting.v1.report.gather-trades";
        std::string report_instance_id;
        std::string definition_id;
        std::string tenant_id;
        std::string correlation_id;
    };
    
    struct gather_trades_result {
        bool success = false;
        std::string message;
        int trade_count = 0;
        std::string trades_json;  // serialised trade_export_item vector
    };
    
    // Step 1: gather market data
    struct gather_market_data_request {
        static constexpr std::string_view nats_subject =
            "reporting.v1.report.gather-market-data";
        std::string report_instance_id;
        std::string definition_id;
        std::string tenant_id;
        std::string correlation_id;
    };
    
    struct gather_market_data_result {
        bool success = false;
        std::string message;
        int series_count = 0;
        std::string market_data_json;  // serialised market_series vector
    };
    
    // Step 2: assemble report input bundle
    struct assemble_bundle_request {
        static constexpr std::string_view nats_subject =
            "reporting.v1.report.assemble-bundle";
        std::string report_instance_id;
        std::string definition_id;
        std::string tenant_id;
        std::string correlation_id;
    };
    
    struct assemble_bundle_result {
        bool success = false;
        std::string message;
        std::string bundle_ref;  // storage key or DB reference
    };
    
    // Step 3: ORE mapping and packaging (ores.ore.service)
    struct prepare_ore_package_request {
        static constexpr std::string_view nats_subject =
            "ore.v1.report.prepare-package";
        std::string report_instance_id;
        std::string bundle_ref;
        std::string tenant_id;
        std::string correlation_id;
    };
    
    struct prepare_ore_package_result {
        bool success = false;
        std::string message;
        std::vector<std::string> tarball_uris;  // per-book tarballs in storage
    };
    
    // Step 4: submit to compute (ores.compute.service)
    struct submit_compute_request {
        static constexpr std::string_view nats_subject =
            "compute.v1.report.submit";
        std::string report_instance_id;
        std::string tenant_id;
        std::string correlation_id;
        std::vector<std::string> tarball_uris;
    };
    
    struct submit_compute_result {
        bool success = false;
        std::string message;
        std::string batch_id;
    };
    
    // Step 5: finalise report instance
    struct finalise_report_request {
        static constexpr std::string_view nats_subject =
            "reporting.v1.report.finalise";
        std::string report_instance_id;
        std::string tenant_id;
        std::string correlation_id;
    };
    
    struct finalise_report_result {
        bool success = false;
        std::string message;
    };
    
  2. Create ores.workflow/service/report_execution_definitions.hpp:
    • Register report_execution_workflow with the step sequence.
    • Initial implementation: 3 steps (gather_trades → assemble_bundle → finalise). The ORE mapping and compute steps are added in later phases.
  3. Register the definition in ores.workflow/src/messaging/registrar.cpp.

Notes

  • Steps are numbered 0..N in the workflow definition.
  • The protocol includes all steps even though some are implemented later, so the structs don't change.
  • gather_market_data, prepare_ore_package, and submit_compute are defined in the protocol but not wired into the workflow definition until their implementation phases.
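
Because each request struct carries its own nats_subject, subscription and dispatch code can read the subject from the type instead of repeating string literals. The sketch below illustrates the idea with two trimmed-down copies of the protocol structs. The helpers subject_of and owning_prefix are hypothetical names, not part of the existing codebase.

```cpp
#include <string_view>

// Trimmed-down copies of two protocol structs (subject only).
struct gather_trades_request {
    static constexpr std::string_view nats_subject =
        "reporting.v1.report.gather-trades";
};

struct prepare_ore_package_request {
    static constexpr std::string_view nats_subject =
        "ore.v1.report.prepare-package";
};

// Hypothetical helper: reads the subject from the request type.
template <typename Request>
constexpr std::string_view subject_of() {
    return Request::nats_subject;
}

// Hypothetical helper: the first subject token, which in the subjects
// above identifies the service that handles the step.
constexpr std::string_view owning_prefix(std::string_view subject) {
    return subject.substr(0, subject.find('.'));
}
```

A registrar could then subscribe with subject_of<gather_trades_request>() and the workflow definition could use the same expression when building the step command, so the two cannot drift apart.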

Phase 3.3: risk_report_config domain model

Summary

Create a minimal C++ domain model for risk_report_config so the gather steps can query book/portfolio scope and report parameters.

Tasks

  1. Create ores.reporting.api/domain/risk_report_config.hpp:
    • Fields needed for execution: id, tenant_id, version, report_definition_id, base_currency, market_data_type, market_data_date, n_threads.
    • Analytics flags: npv_enabled, cashflow_enabled, curves_enabled, sensitivity_enabled, etc.
    • Keep the struct complete (all SQL columns) since other consumers may need them later.
  2. Create entity, mapper, and repository using the Domain Type Creator skill:
    • Entity: risk_report_config_entity.hpp
    • Mapper: risk_report_config_mapper.hpp
    • Repository: risk_report_config_repository.hpp with:
      • find_by_definition_id(ctx, definition_id) -> optional<risk_report_config>
      • read_latest(ctx) -> vector<risk_report_config>
  3. Add scope query methods to a new repository or the existing one:
    • get_book_scope(ctx, config_id) -> vector<string> — returns active book UUIDs from the junction table.
    • get_portfolio_scope(ctx, config_id) -> vector<string> — returns active portfolio UUIDs from the junction table.

Acceptance criteria

  • risk_report_config can be loaded by definition_id.
  • Book and portfolio scope lists can be retrieved for a config.

Phase 3.4: Gather trades step handler

Summary

Implement the first workflow step: load the risk report config, resolve book scope, and fetch all trades (with instruments) from the trading service.

Tasks

  1. Create ores.reporting.core/messaging/report_execution_handler.hpp:
    • Constructor: (nats, ctx, outbound_nats) or similar.
    • Method: gather_trades(ores::nats::message msg).
  2. Implement gather_trades:
    a. Extract workflow_step_context from message.
    b. Decode gather_trades_request from message body.
    c. Build tenant-scoped DB context from tenant_id.
    d. Load risk_report_config by definition_id.
    e. Get book scope from junction table.
    f. If book scope is empty, get portfolio scope; for each portfolio, query books (via refdata.v1.books.list or similar).
    g. For each book, call trading.v1.trades.portfolio.export via outbound NATS to get trades with resolved instruments.
    h. Aggregate all trade_export_item vectors.
    i. Build gather_trades_result with serialised trades JSON and trade count.
    j. Call wf.complete(rfl::json::write(result)).
  3. Subscribe gather_trades_request::nats_subject in the reporting registrar (ores.reporting.core/src/messaging/registrar.cpp).
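
The scope resolution in sub-steps e and f (explicit book scope first, portfolio expansion as a fallback) can be sketched independently of the repository and NATS calls. In this sketch the scope_sources struct and resolve_books function are hypothetical, the callables stand in for get_book_scope, get_portfolio_scope, and the refdata books query, and the de-duplication of books shared between portfolios is an assumption rather than a stated requirement.

```cpp
#include <functional>
#include <set>
#include <string>
#include <vector>

using id_list = std::vector<std::string>;

// Hypothetical callables standing in for the repository and NATS lookups.
struct scope_sources {
    std::function<id_list()> book_scope;       // e.g. get_book_scope
    std::function<id_list()> portfolio_scope;  // e.g. get_portfolio_scope
    std::function<id_list(const std::string&)> books_for_portfolio;
};

// Returns the books to export trades for. An explicit book scope wins;
// otherwise every portfolio in scope is expanded to its books.
inline id_list resolve_books(const scope_sources& src) {
    id_list books = src.book_scope();
    if (!books.empty())
        return books;

    std::set<std::string> seen;  // assumption: a book is exported only once
    for (const auto& portfolio_id : src.portfolio_scope()) {
        for (const auto& book_id : src.books_for_portfolio(portfolio_id)) {
            if (seen.insert(book_id).second)
                books.push_back(book_id);
        }
    }
    return books;
}
```

An empty result means neither scope yielded any books; whether the handler treats that as a failed step or as a successful step with trade_count = 0 is a decision the plan leaves open.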

Acceptance criteria

  • A triggered report instance creates a workflow that dispatches gather_trades to the reporting service.
  • The step handler loads the config, resolves scope, fetches trades, and publishes step_completed with the trade data.

Phase 3.5: Gather market data step handler

Summary

Fetch market data series relevant to the trade portfolio.

Tasks

  1. Add gather_market_data method to report_execution_handler.
  2. Implement:
    a. Extract workflow_step_context.
    b. Decode gather_market_data_request.
    c. Load risk_report_config for market_data_type and market_data_date.
    d. Call marketdata.v1.series.list to fetch all series for the tenant (filtered by type if applicable).
    e. Build gather_market_data_result with serialised market data.
    f. Call wf.complete(...).
  3. Subscribe in reporting registrar.
  4. Wire into the workflow definition (add as step between gather_trades and assemble_bundle).

Notes

  • The market data step depends on knowing which series types are relevant for the trade portfolio. Initially this can fetch all series and let the ORE mapping step filter what it needs.
  • More sophisticated filtering (inspecting trade types to determine required yield curves, FX rates, etc.) is a future enhancement.

Phase 3.6: Assemble report input bundle

Summary

Aggregate the trade data and market data from prior steps into a single report_input_bundle entity and persist it.

Tasks

  1. Create ores.reporting.api/domain/report_input_bundle.hpp:
    • Fields: id, instance_id, trades_json, market_data_json, config_json, created_at.
  2. Create entity, mapper, repository for report_input_bundle.
  3. Add assemble_bundle method to report_execution_handler:
    a. Decode request.
    b. Load previous step results from the command payload (trades JSON from step 0, market data JSON from step 1).
    c. Load risk_report_config snapshot as config JSON.
    d. Persist report_input_bundle.
    e. Return assemble_bundle_result with the bundle reference.
  4. Wire into the workflow definition.

Notes

  • The bundle could alternatively be stored in object storage (via ores.http.server) to avoid large DB rows. Decision deferred; initial implementation uses a DB row.

Phase 3.7: Finalise step handler

Summary

Mark the report instance as completed or failed and record timestamps.

Tasks

  1. Add finalise method to report_execution_handler:
    a. Decode finalise_report_request.
    b. Load the report instance.
    c. Transition FSM state to completed.
    d. Set completed_at = now().
    e. Set output_message with execution summary.
    f. Call wf.complete(...).
  2. Add compensation handling for earlier steps:
    • gather_trades: no compensation needed (read-only).
    • gather_market_data: no compensation needed (read-only).
    • assemble_bundle: delete the report_input_bundle row.
    • On compensation, finalise is not called; instead the workflow engine marks the instance as compensated.
  3. Add a separate compensation handler that transitions the report instance to failed when the workflow compensates:
    • Subject: reporting.v1.report.fail
    • Sets output_message from the error, completed_at = now().
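
The compensation rules in task 2 amount to a lookup from step name to an optional undo action. The sketch below assumes the usual saga convention of undoing completed steps in reverse order, which the plan does not state explicitly. The compensation_map type and compensate function are hypothetical names.

```cpp
#include <functional>
#include <map>
#include <string>
#include <vector>

// Hypothetical registry: step name -> compensation action.
// Read-only steps simply have no entry.
using compensation_map = std::map<std::string, std::function<void()>>;

// Runs the compensation for each completed step, most recent first,
// and returns the names of the steps that were actually compensated.
inline std::vector<std::string> compensate(
    const std::vector<std::string>& completed_steps,
    const compensation_map& actions) {
    std::vector<std::string> compensated;
    for (auto it = completed_steps.rbegin(); it != completed_steps.rend(); ++it) {
        const auto found = actions.find(*it);
        if (found == actions.end())
            continue;  // nothing to undo (e.g. gather_trades)
        found->second();
        compensated.push_back(*it);
    }
    return compensated;
}
```

For this workflow the map would hold a single entry, assemble_bundle, whose action deletes the report_input_bundle row; the transition of the report instance to failed stays in the separate handler on reporting.v1.report.fail.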

Acceptance criteria

  • After a successful workflow, the report instance has state completed, completed_at set, and output_message with summary.
  • After a failed workflow, the report instance has state failed with the error in output_message.

Phase 3.8: ORE mapping and packaging   DEFERRED

Summary

ores.ore.service receives the report_input_bundle reference, maps the engine-agnostic data to ORE XML format, packages per-book tarballs, and uploads them to object storage.

Tasks

  1. Add prepare_ore_package handler to ores.ore.service:
    a. Download the report_input_bundle.
    b. Map trades → ORE portfolio XML.
    c. Map market data → ORE market data XML.
    d. Map risk_report_config analytics flags → ore.xml config.
    e. Package one tarball per book.
    f. Upload each tarball to storage via ores.http.server.
    g. Return prepare_ore_package_result with tarball URIs.
  2. Wire into the workflow definition (step between assemble_bundle and submit_compute).

Dependencies

  • ORE XML mapping code (ores.ore/orexml/ facet).
  • Pricing engine configuration mapping.
  • Object storage upload API.

Status

Deferred — requires ORE XML generation capabilities not yet available.

Phase 3.9: Submit to compute grid   DEFERRED

Summary

ores.compute.service receives the tarball URIs, creates a batch and workunits, and dispatches work assignments to the JetStream stream.

Tasks

  1. Add submit_compute handler to ores.compute.service:
    a. Create a batch record linked to the report_instance_id.
    b. For each tarball URI, create a workunit with input_uri.
    c. Publish work_assignment_event to JetStream for each workunit.
    d. Return submit_compute_result with batch_id.
  2. Wire into the workflow definition.

Status

Deferred — depends on Phase 3.8 (tarball URIs) and JetStream integration.

Phase 3.10: Grid execution and async wait   DEFERRED

Summary

The workflow enters an async wait state while BOINC-style compute wrappers execute ORE. When all workunits complete, the compute service's assimilator publishes a batch-completed event that the workflow engine receives as a step-completed event.

Tasks

  1. Implement the assimilator batch-completion → step-completed bridge in ores.compute.service.
  2. The workflow step for this phase is passive: no command is published, only a completion event is expected.

Notes

This phase requires a mechanism for the workflow engine to wait for an externally triggered event. Current options:

  • The submit_compute step returns immediately and a separate subscription in ores.compute.service watches for batch completion and publishes the step-completed event.
  • The workflow engine supports a wait_for_event step type (future).
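
For the first option, the bridge needs to know when the last workunit of a batch has finished and must publish the step-completed event exactly once. The following is a minimal in-memory sketch of that bookkeeping. The batch_tracker class is hypothetical, and a real implementation would presumably derive the pending set from the batch and workunit records so that it survives a service restart.

```cpp
#include <cstddef>
#include <functional>
#include <set>
#include <string>
#include <utility>

// Hypothetical helper: tracks outstanding workunits for one batch and
// invokes a callback once when the last one completes.
class batch_tracker {
public:
    batch_tracker(std::set<std::string> workunit_ids,
                  std::function<void()> on_batch_completed)
        : pending_(std::move(workunit_ids)),
          on_completed_(std::move(on_batch_completed)) {}

    // Records a finished workunit. Unknown or repeated ids are ignored,
    // so a redelivered completion event cannot fire the callback twice.
    void workunit_completed(const std::string& id) {
        if (pending_.erase(id) == 0)
            return;
        if (pending_.empty() && on_completed_)
            on_completed_();  // e.g. publish the step-completed event
    }

    std::size_t remaining() const { return pending_.size(); }

private:
    std::set<std::string> pending_;
    std::function<void()> on_completed_;
};
```

Note that a batch created with no workunits never fires the callback in this sketch; the caller would need to handle that case when the batch is created.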

Status

Deferred — depends on compute grid execution infrastructure.

Phase 3.11: Process results   DEFERRED

Summary

Download compute outputs from storage, validate, and push to downstream services (results storage, analytics).

Tasks

  1. Add process_results handler to ores.reporting.service.
  2. For each workunit result: download output, validate, store.
  3. Wire into the workflow definition.

Status

Deferred — depends on Phases 3.9–3.10.

Phased workflow definition evolution

The workflow definition grows incrementally as phases are implemented:

Phase  Steps in definition
3.2    gather_trades → assemble_bundle → finalise
3.5    gather_trades → gather_market_data → assemble_bundle → finalise
3.8    … → assemble_bundle → prepare_ore_package → finalise
3.9    … → prepare_ore_package → submit_compute → finalise
3.10   … → submit_compute → wait_compute → finalise
3.11   … → wait_compute → process_results → finalise

At each phase the workflow is end-to-end functional with the steps available, and produces a completed report instance (even if the report content is partial).
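
One way to picture this evolution is as an ordered list of step names where each phase inserts its step before an existing one, leaving finalise last. The sketch below assumes the definition can be represented as a plain vector of names. The step_list alias and insert_before helper are hypothetical and say nothing about how the workflow engine actually stores definitions.

```cpp
#include <algorithm>
#include <string>
#include <vector>

using step_list = std::vector<std::string>;

// Inserts new_step immediately before the step named `before`.
// Returns false and leaves the list unchanged if `before` is absent.
inline bool insert_before(step_list& steps,
                          const std::string& before,
                          const std::string& new_step) {
    const auto it = std::find(steps.begin(), steps.end(), before);
    if (it == steps.end())
        return false;
    steps.insert(it, new_step);
    return true;
}
```

Starting from the Phase 3.2 list, inserting gather_market_data before assemble_bundle gives the Phase 3.5 definition, and inserting each later step before finalise reproduces the remaining phases in order.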

FSM state considerations

The existing report_instance_lifecycle FSM has 7 states: pending, queued, running, completed, failed, cancelled, skipped.

The report execution design document proposes more granular states (gathering_inputs, preparing_package, computing, etc.). These are deferred: the workflow engine's workflow_step table already provides per-step visibility. The report instance FSM transitions are:

  • pending → running (when workflow starts).
  • running → completed (finalise step succeeds).
  • running → failed (any step fails; compensation marks failed).
  • running → cancelled (user cancels; future feature).

Granular FSM states can be added later as an enhancement without changing the workflow architecture.
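
The four workflow-driven transitions listed above are small enough to encode as a table, which makes illegal transitions easy to reject in handlers and tests. This is a sketch only: the state names are taken from the list, the can_transition helper is hypothetical, and transitions involving queued or skipped are left out because the plan does not list them.

```cpp
#include <array>
#include <string_view>
#include <utility>

using transition = std::pair<std::string_view, std::string_view>;

// Transitions driven by the workflow, as listed in the plan.
constexpr std::array<transition, 4> allowed_transitions{{
    {"pending", "running"},
    {"running", "completed"},
    {"running", "failed"},
    {"running", "cancelled"},
}};

// Hypothetical helper: true if `from` -> `to` is one of the listed transitions.
constexpr bool can_transition(std::string_view from, std::string_view to) {
    for (const auto& tr : allowed_transitions) {
        if (tr.first == from && tr.second == to)
            return true;
    }
    return false;
}
```

The finalise handler could check can_transition("running", "completed") against the instance's current state before updating fsm_state_id, so that a duplicate or late message does not move a failed instance back to completed.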

Implementation order

Recommended order for implementation within a single session or across sessions:

  1. Phase 3.1 — fix trigger() (unblocks everything).
  2. Phase 3.2 — protocol structs + workflow definition skeleton.
  3. Phase 3.3 — risk_report_config domain model (unblocks data steps).
  4. Phase 3.4 — gather trades handler (first working step).
  5. Phase 3.7 — finalise handler (end-to-end workflow completes).
  6. Phase 3.5 — gather market data (enriches data).
  7. Phase 3.6 — assemble bundle (persists gathered data).
  8. Phases 3.8–3.11 — ORE mapping and compute (when ready).

Success criteria

  • A scheduled report definition fires, creates an instance with correct FSM state, dispatches a workflow, gathers trades, and marks the instance as completed.
  • workflow_instance and workflow_step tables show the full audit trail.
  • Concurrency policy is enforced: skip/queue/fail produce the correct initial state.
  • The workflow can be interrupted and resumed on service restart.