Server-Side ORE Import

Overview

ORE import is currently client-side: the Qt wizard reads a local ORE directory, parses every XML file, and sends save_* NATS requests directly to downstream services (refdata, trading, marketdata). This couples import to the Qt process, leaves no audit trail, and makes per-item error tracking difficult.

The new design moves all processing server-side. The client packs the ORE directory into a tarball and uploads it to object storage; a dedicated ores.ore.service microservice receives a NATS trigger, downloads and unpacks the tarball, runs the import planner, and executes a saga against the downstream services. Every log line carries a correlation_id so the full server-side trace for a single import can be grepped across services.

Status

| Phase | Item                                     | Status  | PRs |
|-------+------------------------------------------+---------+-----|
| 1     | ores.storage http_client + archiver      | Pending |     |
| 1     | ores.storage storage_transfer            | Pending |     |
| 2     | ores.ore.api codegen scaffold            | Pending |     |
| 2     | ore_storage bucket helper                | Pending |     |
| 2     | ore_import_protocol types                | Pending |     |
| 3     | ores.ore.service codegen scaffold        | Pending |     |
| 3     | config / app / messaging / workflow      | Pending |     |
| 3     | Correlation ID logging throughout        | Pending |     |
| 4     | Qt: pack_and_upload                      | Pending |     |
| 4     | Qt: NATS ore.import request / response   | Pending |     |
| 4     | Qt: display item_errors + correlation_id | Pending |     |

Architecture

Qt client
  1. storage_transfer::pack_and_upload(ore_dir, "ore-imports", "{id}.tar.gz")
  2. NATS request → workflow.v1.ore.import { request_id, import_choices_json }
  3. Await ore_import_response; display item_errors

ores.ore.service
  1. Receive + authenticate workflow.v1.ore.import
  2. Create workflow_instance (type=ore_import, status=in_progress)
  3. storage_transfer::fetch_and_unpack("ore-imports", "{id}.tar.gz", work_dir/{id}/)
  4. ore_directory_scanner::scan(work_dir/{id}/)
  5. NATS → refdata.v1.currencies.list  (get existing iso_codes)
  6. ore_import_planner::plan(scan, iso_codes, import_choices)
  7. Saga: currencies → portfolios → books → trades → market data
  8. On failure: compensate in reverse; collect item_errors
  9. Reply ore_import_response { success, item_errors, correlation_id }

Design Principles

  1. ores.http.server handles storage upload only. No workflow logic there.
  2. ores.ore.service is a standalone binary; not linked into any other service.
  3. storage_transfer lives in ores.storage (no Qt dependency) — usable from Qt, CLI, wt, integration tests, and any future service.
  4. Trade save failures are partial: errors collected into item_errors but the saga continues for remaining items. Infrastructure failures (DB, NATS down) trigger full compensation.
  5. Every log line in ores.ore.service carries correlation_id.
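Principle 4 can be illustrated with a minimal, self-contained C++ sketch. It assumes a hypothetical item_failure exception for per-item rejections (such as one trade refused by trading.v1.trades.save) and treats any other exception as an infrastructure failure; item_error, saga_step, save_each and run_saga are illustrative names, not existing ores types.

```cpp
#include <cstddef>
#include <functional>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Illustrative stand-in for ore_import_item_error.
struct item_error {
    std::string source_file;
    std::string item_id;
    std::string message;
};

// Hypothetical exception for a single rejected item (e.g. one trade).
struct item_failure : std::runtime_error {
    item_failure(std::string file, std::string id, const std::string& what)
        : std::runtime_error(what), source_file(std::move(file)),
          item_id(std::move(id)) {}
    std::string source_file;
    std::string item_id;
};

// Saves each item; a rejected item is recorded and the loop continues.
// Any other exception escapes and is treated as an infrastructure failure.
template <typename Item, typename Save>
void save_each(const std::vector<Item>& items, Save save,
               std::vector<item_error>& errors) {
    for (const auto& item : items) {
        try {
            save(item);
        } catch (const item_failure& e) {
            errors.push_back({e.source_file, e.item_id, e.what()});
        }
    }
}

struct saga_step {
    std::string name;
    std::function<void(std::vector<item_error>&)> execute;
    std::function<void()> compensate;
};

struct saga_result {
    bool success = false;          // false only if an exception escaped a step
    std::string failed_step;
    std::size_t compensated = 0;   // number of steps undone
    std::vector<item_error> item_errors;
};

// Runs steps in order. On any escaping exception, compensates the steps that
// completed, newest first. A step that fails midway is assumed to undo its
// own partial writes (not modelled here).
saga_result run_saga(const std::vector<saga_step>& steps) {
    saga_result result;
    std::size_t completed = 0;
    try {
        for (const auto& step : steps) {
            result.failed_step = step.name;
            step.execute(result.item_errors);
            ++completed;
        }
        result.failed_step.clear();
        result.success = true;
    } catch (const std::exception&) {
        for (std::size_t i = completed; i-- > 0;) {
            steps[i].compensate();
            ++result.compensated;
        }
    }
    return result;
}
```

Keeping per-item errors inside the step loop means one bad trade never reaches the saga's catch block, so only infrastructure failures trigger compensation.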

Phase 1: ores.storage Transfer Helpers

Adds three new types to ores.storage. ores.compute.wrapper keeps its own copies of http_client and archiver for now; deduplication is a follow-on PR.

ores.storage/net/http_client

Boost Beast synchronous GET and PUT over plain TCP.

namespace ores::storage::net {
class http_client {
public:
    static void get(const std::string& url,
                    const std::filesystem::path& dest);
    static void put(const std::string& url,
                    const std::filesystem::path& src);
private:
    struct url_parts { std::string host, port, path; };
    static url_parts parse_url(const std::string& url);
};
}
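parse_url is only declared above. A minimal sketch of what it might do, written as a free function so the block stands alone. It assumes URLs of the form http://host[:port][/path], defaults the port to "80" and the path to "/", and rejects anything that is not plain http, consistent with the plain-TCP design; IPv6 literals and query strings are not handled.

```cpp
#include <stdexcept>
#include <string>

struct url_parts { std::string host, port, path; };

// Splits "http://host[:port][/path]" into host, port and request target.
// Only plain http is accepted, matching a client with no TLS support.
url_parts parse_url(const std::string& url) {
    const std::string scheme = "http://";
    if (url.compare(0, scheme.size(), scheme) != 0)
        throw std::invalid_argument("unsupported URL (expected http://): " + url);

    const std::string rest = url.substr(scheme.size());
    const auto slash = rest.find('/');
    const std::string authority = rest.substr(0, slash);

    url_parts parts;
    parts.path = (slash == std::string::npos) ? "/" : rest.substr(slash);

    const auto colon = authority.rfind(':');
    if (colon == std::string::npos) {
        parts.host = authority;
        parts.port = "80";
    } else {
        parts.host = authority.substr(0, colon);
        parts.port = authority.substr(colon + 1);
    }
    if (parts.host.empty())
        throw std::invalid_argument("URL has no host: " + url);
    return parts;
}
```

With Beast, host and port would be passed to tcp::resolver::resolve and path would become the request target.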

ores.storage/filesystem/archiver

libarchive .tar.gz pack and extract.

namespace ores::storage::filesystem {
class archiver final {
public:
    static void pack(const std::filesystem::path& src_dir,
                     const std::filesystem::path& dest_archive);
    static void extract(const std::filesystem::path& archive,
                        const std::filesystem::path& dest_dir);
};
}

ores.storage/net/storage_transfer

Symmetric composite built on http_client and archiver. The atomic operations (upload, download, pack, unpack) are exposed individually so that callers transferring a single file (CSV, JSON config, etc.) can do so without touching the archive layer. The composite helpers combine pack + upload and download + unpack via a UUID-named temp file.

namespace ores::storage::net {
class storage_transfer {
public:
    explicit storage_transfer(std::string http_base_url);

    // Atomic operations
    void pack(const std::filesystem::path& src_dir,
              const std::filesystem::path& dest_archive);
    void unpack(const std::filesystem::path& archive,
                const std::filesystem::path& dest_dir);
    void upload(const std::string& bucket, const std::string& key,
                const std::filesystem::path& src_file);
    void download(const std::string& bucket, const std::string& key,
                  const std::filesystem::path& dest_file);

    // Composite helpers
    void pack_and_upload(const std::filesystem::path& src_dir,
                         const std::string& bucket, const std::string& key);
    void fetch_and_unpack(const std::string& bucket, const std::string& key,
                          const std::filesystem::path& dest_dir);
private:
    std::string http_base_url_;
};
}

Temp files live in std::filesystem::temp_directory_path() with a UUID suffix and are deleted after use. All operations log at DEBUG with bucket, key, bytes, and duration.
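The temp-file lifetime described above maps naturally onto an RAII guard, so the archive is removed even if the upload throws. In this sketch the atomic operations are injected as callables (in ores.storage they would presumably be archiver::pack and http_client::put); temp_file, pack_and_upload_with and the "ores-" filename prefix are hypothetical, and random_suffix only approximates a UUID with 128 random bits from std::random_device.

```cpp
#include <filesystem>
#include <functional>
#include <iomanip>
#include <random>
#include <sstream>
#include <string>
#include <system_error>

namespace fs = std::filesystem;

// 32 random hex digits; stands in for a real UUID generator.
std::string random_suffix() {
    std::random_device rd;
    std::ostringstream out;
    out << std::hex << std::setfill('0');
    for (int i = 0; i < 4; ++i)
        out << std::setw(8) << (rd() & 0xFFFFFFFFu);
    return out.str();
}

// Owns a uniquely named path under temp_directory_path() and deletes
// whatever is at that path when the guard goes out of scope.
class temp_file {
public:
    explicit temp_file(const std::string& extension)
        : path_(fs::temp_directory_path() /
                ("ores-" + random_suffix() + extension)) {}
    ~temp_file() {
        std::error_code ec;  // non-throwing overload: destructors must not throw
        fs::remove(path_, ec);
    }
    temp_file(const temp_file&) = delete;
    temp_file& operator=(const temp_file&) = delete;
    const fs::path& path() const { return path_; }

private:
    fs::path path_;
};

using pack_fn = std::function<void(const fs::path& src_dir, const fs::path& archive)>;
using upload_fn = std::function<void(const fs::path& archive)>;

// pack_and_upload expressed via the atomic operations: pack into a uniquely
// named temp archive, upload it, and always clean up.
void pack_and_upload_with(const fs::path& src_dir, const pack_fn& pack,
                          const upload_fn& upload) {
    temp_file archive(".tar.gz");
    pack(src_dir, archive.path());
    upload(archive.path());
}  // archive is removed here, on success or when an exception propagates
```

fetch_and_unpack would be the mirror image: download into a temp_file, extract it, and let the guard delete the archive.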

CMake changes

target_link_libraries(${lib_target_name}
    PUBLIC
        ores.logging.lib
        Boost::beast
        Boost::asio
        LibArchive::LibArchive)

Phase 2: ores.ore.api Component

Scaffolded via codegen.

Codegen model

models/ore.api/ores_ore_api_component.json:

{
  "component": {
    "name": "ore.api",
    "full_name": "ores.ore.api",
    "brief": "ORE import API types and protocol",
    "description": "Protocol types and bucket helpers for server-side ORE import."
  }
}

ores.ore.api/net/ore_storage

Bucket helper mirroring compute_storage.

namespace ores::ore::net {
struct ore_storage {
    static constexpr std::string_view bucket = "ore-imports";
    static std::string import_key(std::string_view request_id);  // "{id}.tar.gz"
    static std::string import_path(std::string_view request_id); // "/api/v1/storage/..."
};
}
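import_key presumably just appends the archive extension to the request id, following the "{id}.tar.gz" convention used throughout this document. The sketch below shows that; import_path is left out because its URL layout is elided above. The character check on request_id is an assumption added here, motivated by the id also becoming a directory name under work_dir.

```cpp
#include <stdexcept>
#include <string>
#include <string_view>

namespace ores::ore::net {

struct ore_storage {
    static constexpr std::string_view bucket = "ore-imports";

    // Returns "{id}.tar.gz". Assumed check: only UUID characters (hex digits
    // and '-') are accepted, so the id cannot smuggle in '/' or "..".
    static std::string import_key(std::string_view request_id) {
        if (request_id.empty() ||
            request_id.find_first_not_of("0123456789abcdefABCDEF-") !=
                std::string_view::npos)
            throw std::invalid_argument("request_id is not UUID-shaped");
        std::string key(request_id);
        key += ".tar.gz";
        return key;
    }
};

}
```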

ores.ore.api/messaging/ore_import_protocol

#include <string>
#include <string_view>
#include <vector>

namespace ores::ore::messaging {

struct ore_import_item_error {
    std::string source_file;  // relative path within the ORE directory
    std::string item_id;      // trade_id, iso_code, series_key, etc.
    std::string message;
};

// Declared before ore_import_request, which names it as its response_type.
struct ore_import_response {
    bool success = false;
    std::string message;
    std::vector<ore_import_item_error> item_errors;
    std::string correlation_id;
};

struct ore_import_request {
    using response_type = ore_import_response;
    static constexpr std::string_view nats_subject = "workflow.v1.ore.import";
    std::string request_id;
    std::string import_choices_json;
    std::string correlation_id;
};

}

CMake deps

target_link_libraries(${lib_target_name}
    PUBLIC
        ores.storage.lib
        ores.logging.lib)

Phase 3: ores.ore.service Component

Scaffolded via codegen. Structure follows ores.workflow.service exactly.

Codegen model

models/ore.service/ores_ore_service_component.json:

{
  "component": {
    "name": "ore.service",
    "full_name": "ores.ore.service",
    "brief": "ORE import microservice",
    "description": "Standalone NATS microservice that receives ORE import workflow requests, downloads and unpacks ORE directory tarballs from storage, plans the import, and orchestrates saga steps against refdata, trading, and marketdata services."
  }
}

File structure

ores.ore.service/
  config/options.hpp/.cpp        — db, nats, work_dir, http_base_url
  config/parser.hpp/.cpp
  app/application.hpp/.cpp       — boots NATS + DB, domain_service_runner
  messaging/registrar.hpp/.cpp   — subscribes to workflow.v1.ore.import
  messaging/ore_import_handler.hpp/.cpp
  service/ore_import_workflow.hpp/.cpp
  main.cpp

Correlation ID logging

Every log line carries corr=<X>:

[info]  ore.import received    | corr=<X> request_id=<Y>
[debug] fetching tarball       | corr=<X> bucket=ore-imports key=<Y>.tar.gz
[debug] tarball fetched        | corr=<X> bytes=<N> duration=<D>ms
[debug] extracting             | corr=<X> dest=/work/<Y>
[debug] directory extracted    | corr=<X> files=<N>
[info]  scanning ORE directory | corr=<X>
[info]  import plan            | corr=<X> currencies=N portfolios=N books=N trades=N
[debug] saving currency        | corr=<X> iso_code=<C>
[debug] saving portfolio       | corr=<X> name=<P> id=<uuid>
[debug] saving book            | corr=<X> name=<B> id=<uuid>
[debug] saving trade           | corr=<X> file=<F> trade_id=<T>
[debug] saving market datum    | corr=<X> series=<S>
[info]  import complete        | corr=<X> duration=<D>ms
[warn]  item failed            | corr=<X> file=<F> item=<T> error=<E>
[error] import failed          | corr=<X> step=<S> error=<E>
[warn]  compensating           | corr=<X> completed_steps=<N>
[debug] compensation complete  | corr=<X>

The correlation_id is extracted from the inbound NATS header (or generated if absent), stored in the workflow_instance record, forwarded as a header on every outbound NATS call, and echoed in ore_import_response.
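A minimal sketch of the extract-or-generate rule, header forwarding, and the corr=<X> log prefix. The header name X-Correlation-Id, the std::map header representation, and all function names are assumptions standing in for the real ores.nats and ores.logging APIs; the column alignment of the log output shown above is omitted.

```cpp
#include <iomanip>
#include <map>
#include <random>
#include <sstream>
#include <string>

using headers_t = std::map<std::string, std::string>;
inline const std::string correlation_header = "X-Correlation-Id";  // assumed name

// Random RFC 4122 version-4 UUID in 8-4-4-4-12 hex form.
std::string make_uuid_v4() {
    std::random_device rd;
    unsigned char b[16];
    for (auto& x : b) x = static_cast<unsigned char>(rd() & 0xFFu);
    b[6] = static_cast<unsigned char>((b[6] & 0x0Fu) | 0x40u);  // version 4
    b[8] = static_cast<unsigned char>((b[8] & 0x3Fu) | 0x80u);  // RFC 4122 variant
    std::ostringstream out;
    out << std::hex << std::setfill('0');
    for (int i = 0; i < 16; ++i) {
        if (i == 4 || i == 6 || i == 8 || i == 10) out << '-';
        out << std::setw(2) << static_cast<unsigned>(b[i]);
    }
    return out.str();
}

// Reuses the inbound header when present and non-empty; otherwise generates one.
std::string correlation_id_for(const headers_t& inbound) {
    const auto it = inbound.find(correlation_header);
    if (it != inbound.end() && !it->second.empty())
        return it->second;
    return make_uuid_v4();
}

// Headers for an outbound NATS call, carrying the same correlation id.
headers_t outbound_headers(const std::string& corr) {
    return headers_t{{correlation_header, corr}};
}

// Formats "<event> | corr=<X> <fields>".
std::string log_line(const std::string& event, const std::string& corr,
                     const std::string& fields = "") {
    std::string line = event + " | corr=" + corr;
    if (!fields.empty()) line += " " + fields;
    return line;
}
```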

ore_import_handler

class ore_import_handler {
public:
    ore_import_handler(nats::client& nats, database::context ctx,
                       security::jwt::jwt_authenticator signer,
                       nats::nats_client outbound_nats,
                       std::string http_base_url,
                       std::string work_dir);

    void ore_import(nats::message msg);
};

ore_import_workflow saga steps

execute():
  0. storage_transfer::fetch_and_unpack("ore-imports", id+".tar.gz", work_dir/id/)
  1. ore_directory_scanner::scan(work_dir/id/)
  2. NATS → refdata.v1.currencies.list  (existing iso_codes)
  3. ore_import_planner::plan(scan, iso_codes, choices)
  4. For each currency   → NATS refdata.v1.currencies.save
  5. For each portfolio  → NATS refdata.v1.portfolios.save  (parents first)
  6. For each book       → NATS refdata.v1.books.save
  7. For each trade      → NATS trading.v1.trades.save
     collect ore_import_item_error on failure; continue remaining trades
  8. For each market datum → NATS marketdata.v1.fixings.save

compensate() (reverse order):
  Delete the market data, trades, books, portfolios and currencies saved in steps 4-8.
  Steps 0-3 write nothing to downstream services, so they have nothing to compensate.
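Step 5 saves portfolios parents first. One way to compute that order, sketched under the assumption that each planned portfolio carries an id and an optional parent_id (portfolio_ref and parents_first are hypothetical names): walk up each portfolio's parent chain, emit the chain root-first, and reject cycles. Parents that are not part of the plan, for instance portfolios that already exist in refdata, are treated as already satisfied.

```cpp
#include <map>
#include <optional>
#include <stdexcept>
#include <string>
#include <vector>

struct portfolio_ref {
    std::string id;
    std::optional<std::string> parent_id;
};

// Orders portfolios so every parent that is in the plan precedes its
// children. Throws std::invalid_argument on a parent cycle.
std::vector<portfolio_ref> parents_first(const std::vector<portfolio_ref>& in) {
    std::map<std::string, const portfolio_ref*> by_id;
    for (const auto& p : in) by_id[p.id] = &p;

    enum class mark { none, visiting, done };  // value-initialises to none
    std::map<std::string, mark> state;
    std::vector<portfolio_ref> out;
    out.reserve(in.size());

    for (const auto& start : in) {
        // Collect start, parent, grandparent... until an emitted ancestor
        // or a parent outside the plan is reached.
        std::vector<const portfolio_ref*> chain;
        const portfolio_ref* cur = &start;
        while (cur && state[cur->id] != mark::done) {
            if (state[cur->id] == mark::visiting)
                throw std::invalid_argument("portfolio cycle at " + cur->id);
            state[cur->id] = mark::visiting;
            chain.push_back(cur);
            const portfolio_ref* parent = nullptr;
            if (cur->parent_id) {
                const auto it = by_id.find(*cur->parent_id);
                if (it != by_id.end()) parent = it->second;
            }
            cur = parent;
        }
        // Emit the root-most unsaved ancestor first.
        for (auto it = chain.rbegin(); it != chain.rend(); ++it) {
            state[(*it)->id] = mark::done;
            out.push_back(**it);
        }
    }
    return out;
}
```

Because an already emitted ancestor stops the walk, every portfolio appears exactly once and after its in-plan parent.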

CMake deps

target_link_libraries(${lib_target_name}
    PUBLIC
        ores.service.lib
        ores.workflow.lib
        ores.ore.lib
        ores.ore.api.lib
        ores.storage.lib
        ores.nats.lib
        ores.iam.client.lib
        ores.database.lib
        ores.refdata.api.lib
        ores.trading.api.lib
        ores.marketdata.api.lib
        ores.utility.lib
        ores.telemetry.lib)

Phase 4: Qt Client Integration

  1. Generate request_id UUID at wizard entry.
  2. Call storage_transfer::pack_and_upload(ore_dir, "ore-imports", ore_storage::import_key(request_id)).
  3. On success, send NATS request to workflow.v1.ore.import.
  4. Await ore_import_response; display item_errors grouped by source_file.
  5. Show correlation_id on summary page for support reference.

The wizard no longer calls domain parsers or save_* services directly. All ORE processing moves server-side.
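Grouping item_errors by source_file (step 4) needs no Qt-specific code. A sketch, with ore_import_item_error redeclared outside its namespace so the block compiles alone; group_by_source_file and summary_lines are hypothetical helpers. std::map keeps files sorted for a stable display, and errors within a file keep their server-side order.

```cpp
#include <map>
#include <string>
#include <vector>

struct ore_import_item_error {
    std::string source_file;
    std::string item_id;
    std::string message;
};

using errors_by_file =
    std::map<std::string, std::vector<ore_import_item_error>>;

// Groups errors by the ORE file they came from.
errors_by_file group_by_source_file(
    const std::vector<ore_import_item_error>& errors) {
    errors_by_file grouped;
    for (const auto& e : errors)
        grouped[e.source_file].push_back(e);
    return grouped;
}

// One summary line per file, e.g. "Input/portfolio.xml: 2 item(s) failed".
std::vector<std::string> summary_lines(const errors_by_file& grouped) {
    std::vector<std::string> lines;
    for (const auto& [file, errs] : grouped)
        lines.push_back(file + ": " + std::to_string(errs.size()) +
                        " item(s) failed");
    return lines;
}
```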

Open Questions

  • Books ownership: ore_import_plan.books is vector<refdata::domain::book>. Are books owned by refdata or trading? Clarify before implementing Phase 3 step 6.
  • Market data compensation: Does marketdata.v1.fixings.delete exist? If not, compensation for market data is a no-op until the endpoint is added.
  • Tarball retention: After a successful import, should ore-imports/{id}.tar.gz be deleted? Suggestion: keep it for 24h, then purge it via a scheduler job (future work).
  • ores.compute.wrapper deduplication: Migrate to use ores.storage http_client and archiver once Phase 1 is stable. Defer to a cleanup PR.

Date: 2026-04-03

Emacs 29.1 (Org mode 9.6.6)