Server-Side ORE Import
Overview
ORE import is currently client-side: the Qt wizard reads a local ORE directory,
parses every XML file, and sends save_* NATS requests directly to downstream
services (refdata, trading, marketdata). This couples import to the Qt process,
leaves no audit trail, and makes per-item error tracking difficult.
The new design moves all processing server-side. The client packs the ORE
directory into a tarball and uploads it to object storage; a dedicated
ores.ore.service microservice receives a NATS trigger, downloads and unpacks
the tarball, runs the import planner, and executes a saga against the
downstream services. Every log line carries a correlation_id so the full
server-side trace for a single import can be grepped across services.
Status
| Phase | Item | Status | PRs |
|---|---|---|---|
| 1 | ores.storage http_client + archiver | Pending | — |
| 1 | ores.storage storage_transfer | Pending | — |
| 2 | ores.ore.api codegen scaffold | Pending | — |
| 2 | ore_storage bucket helper | Pending | — |
| 2 | ore_import_protocol types | Pending | — |
| 3 | ores.ore.service codegen scaffold | Pending | — |
| 3 | config / app / messaging / workflow | Pending | — |
| 3 | Correlation ID logging throughout | Pending | — |
| 4 | Qt: pack_and_upload | Pending | — |
| 4 | Qt: NATS ore.import request / response | Pending | — |
| 4 | Qt: display item_errors + correlation_id | Pending | — |
Architecture
Qt client
1. storage_transfer::pack_and_upload(ore_dir, "ore-imports", "{id}.tar.gz")
2. NATS request → workflow.v1.ore.import { request_id, import_choices_json }
3. Await ore_import_response; display item_errors
ores.ore.service
1. Receive + authenticate workflow.v1.ore.import
2. Create workflow_instance (type=ore_import, status=in_progress)
3. storage_transfer::fetch_and_unpack("ore-imports", "{id}.tar.gz", work_dir/{id}/)
4. ore_directory_scanner::scan(work_dir/{id}/)
5. NATS → refdata.v1.currencies.list (get existing iso_codes)
6. ore_import_planner::plan(scan, iso_codes, import_choices)
7. Saga: currencies → portfolios → books → trades → market data
8. On failure: compensate in reverse; collect item_errors
9. Reply ore_import_response { success, item_errors, correlation_id }
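Steps 7 and 8 follow the usual saga pattern: run steps in order and, if one fails, undo the completed ones in reverse. The sketch below is a minimal illustration under the assumption that each step is a pair of callables; `saga_step` and `run_saga` are hypothetical names, not the ores.workflow API.

```cpp
#include <cassert>
#include <functional>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical saga step: a forward action plus the action that undoes it.
struct saga_step {
    std::string name;
    std::function<void()> execute;
    std::function<void()> compensate;
};

// Runs steps in order. If a step throws, compensates the steps that already
// completed, in reverse order, then rethrows the original exception.
inline void run_saga(const std::vector<saga_step>& steps) {
    std::size_t completed = 0;
    try {
        for (; completed < steps.size(); ++completed)
            steps[completed].execute();
    } catch (...) {
        // The failing step is not compensated: it never finished.
        for (std::size_t i = completed; i-- > 0;)
            steps[i].compensate();
        throw;
    }
}
```

With steps for currencies, portfolios and books, a failure while saving books would run the portfolio compensation and then the currency compensation before the error reaches the handler.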
Design Principles
- ores.http.server handles storage upload only; there is no workflow logic there.
- ores.ore.service is a standalone binary, not linked into any other service.
- storage_transfer lives in ores.storage (no Qt dependency), so it is usable from Qt, CLI, wt, integration tests, and any future service.
- Trade save failures are partial: errors are collected into item_errors and the saga continues with the remaining items. Infrastructure failures (DB or NATS down) trigger full compensation.
- Every log line in ores.ore.service carries correlation_id.
Phase 1: ores.storage Transfer Helpers
Adds three new types to ores.storage. ores.compute.wrapper keeps its own
copies of http_client and archiver for now; deduplication is a follow-on PR.
ores.storage/net/http_client
Boost Beast synchronous GET and PUT over plain TCP.
```cpp
#include <filesystem>
#include <string>

namespace ores::storage::net {

class http_client {
public:
    static void get(const std::string& url, const std::filesystem::path& dest);
    static void put(const std::string& url, const std::filesystem::path& src);

private:
    struct url_parts {
        std::string host, port, path;
    };
    static url_parts parse_url(const std::string& url);
};

}
```
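`parse_url` is the only part of this interface with non-trivial logic. The following is a minimal sketch, assuming plain `http://host[:port][/path]` URLs (matching the plain-TCP transport), port `"80"` when none is given and path `"/"` when absent. It is written as a free function for brevity, whereas the design keeps it private, and it does not handle userinfo, IPv6 literals or query strings.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>

// Same shape as http_client::url_parts.
struct url_parts {
    std::string host, port, path;
};

// Assumption: only the http scheme is accepted; defaults are port "80"
// and path "/".
inline url_parts parse_url(const std::string& url) {
    const std::string scheme = "http://";
    if (url.compare(0, scheme.size(), scheme) != 0)
        throw std::invalid_argument("unsupported URL scheme: " + url);

    const std::string rest = url.substr(scheme.size());
    const auto slash = rest.find('/');
    const std::string authority = rest.substr(0, slash);

    url_parts parts;
    parts.path = slash == std::string::npos ? "/" : rest.substr(slash);

    const auto colon = authority.find(':');
    parts.host = authority.substr(0, colon);
    parts.port = colon == std::string::npos ? "80" : authority.substr(colon + 1);
    if (parts.host.empty())
        throw std::invalid_argument("missing host: " + url);
    return parts;
}
```

The host and port strings map directly onto an Asio resolver query, which is why `url_parts` keeps the port as a string.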
ores.storage/filesystem/archiver
libarchive .tar.gz pack and extract.
```cpp
#include <filesystem>

namespace ores::storage::filesystem {

class archiver final {
public:
    static void pack(const std::filesystem::path& src_dir,
                     const std::filesystem::path& dest_archive);
    static void extract(const std::filesystem::path& archive,
                        const std::filesystem::path& dest_dir);
};

}
```
ores.storage/net/storage_transfer
Symmetric composite: every operation has an inverse (pack/unpack, upload/download, pack_and_upload/fetch_and_unpack). The atomic operations are exposed individually so that callers that only need to transfer a single file (CSV, JSON config, etc.) can do so without touching the archive layer. The composite helpers combine pack + upload and download + unpack through a UUID-named temp file.
```cpp
#include <filesystem>
#include <string>

namespace ores::storage::net {

class storage_transfer {
public:
    explicit storage_transfer(std::string http_base_url);

    // Atomic operations
    void pack(const std::filesystem::path& src_dir,
              const std::filesystem::path& dest_archive);
    void unpack(const std::filesystem::path& archive,
                const std::filesystem::path& dest_dir);
    void upload(const std::string& bucket, const std::string& key,
                const std::filesystem::path& src_file);
    void download(const std::string& bucket, const std::string& key,
                  const std::filesystem::path& dest_file);

    // Composite helpers
    void pack_and_upload(const std::filesystem::path& src_dir,
                         const std::string& bucket, const std::string& key);
    void fetch_and_unpack(const std::string& bucket, const std::string& key,
                          const std::filesystem::path& dest_dir);

private:
    std::string http_base_url_;
};

}
```
Temp files live in std::filesystem::temp_directory_path() with a UUID suffix
and are deleted after use. All operations log at DEBUG with bucket, key, bytes,
and duration.
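One way to guarantee that the temp tarball is deleted after use, including when an upload or extraction throws, is an RAII guard. This is a sketch only: `scoped_temp_file` is a hypothetical name, and the random 32-hex-digit suffix stands in for a real UUID generator (production code could use, for example, `boost::uuids::random_generator`).

```cpp
#include <cassert>
#include <filesystem>
#include <random>
#include <string>
#include <system_error>

// Hypothetical RAII guard: owns a uniquely named path under
// temp_directory_path() and removes whatever is there on destruction.
class scoped_temp_file {
public:
    explicit scoped_temp_file(const std::string& prefix)
        : path_(std::filesystem::temp_directory_path() /
                (prefix + "-" + random_suffix() + ".tar.gz")) {}

    scoped_temp_file(const scoped_temp_file&) = delete;
    scoped_temp_file& operator=(const scoped_temp_file&) = delete;

    // The error_code overload is used so the destructor never throws.
    ~scoped_temp_file() {
        std::error_code ec;
        std::filesystem::remove(path_, ec);
    }

    const std::filesystem::path& path() const { return path_; }

private:
    // 32 random hex digits, standing in for a UUID.
    static std::string random_suffix() {
        std::random_device rd;
        std::mt19937_64 gen(rd());
        std::uniform_int_distribution<unsigned long long> dist;
        static const char* hex = "0123456789abcdef";
        std::string s;
        for (int i = 0; i < 2; ++i) {
            auto v = dist(gen);
            for (int j = 0; j < 16; ++j) {
                s += hex[v & 0xF];
                v >>= 4;
            }
        }
        return s;
    }

    std::filesystem::path path_;
};
```

In `pack_and_upload`, the guard would be created first, the archive packed to `path()`, and the file uploaded; the file disappears when the guard leaves scope whether or not the upload succeeded.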
CMake changes
target_link_libraries(${lib_target_name}
PUBLIC
ores.logging.lib
Boost::beast
Boost::asio
LibArchive::LibArchive)
Phase 2: ores.ore.api Component
Scaffolded via codegen.
Codegen model
models/ore.api/ores_ore_api_component.json:
{
"component": {
"name": "ore.api",
"full_name": "ores.ore.api",
"brief": "ORE import API types and protocol",
"description": "Protocol types and bucket helpers for server-side ORE import."
}
}
ores.ore.api/net/ore_storage
Bucket helper mirroring compute_storage.
```cpp
#include <string>
#include <string_view>

namespace ores::ore::net {

struct ore_storage {
    static constexpr std::string_view bucket = "ore-imports";

    static std::string import_key(std::string_view request_id);  // "{id}.tar.gz"
    static std::string import_path(std::string_view request_id); // "/api/v1/storage/..."
};

}
```
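`import_key` follows directly from its documented `"{id}.tar.gz"` format. The sketch below covers only that member; `import_path` is left out because its full URL layout is not specified here.

```cpp
#include <cassert>
#include <string>
#include <string_view>

namespace ores::ore::net {

struct ore_storage {
    static constexpr std::string_view bucket = "ore-imports";

    // Object key for an import tarball: "{request_id}.tar.gz".
    static std::string import_key(std::string_view request_id) {
        std::string key(request_id);
        key += ".tar.gz";
        return key;
    }
};

}
```

Keeping the key format in one helper means the Qt uploader and ores.ore.service cannot drift apart on naming.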
ores.ore.api/messaging/ore_import_protocol
```cpp
#include <string>
#include <string_view>
#include <vector>

namespace ores::ore::messaging {

struct ore_import_item_error {
    std::string source_file; // relative path within the ORE directory
    std::string item_id;     // trade_id, iso_code, series_key, etc.
    std::string message;
};

// Declared before ore_import_request so that response_type names a
// type that has already been declared.
struct ore_import_response {
    bool success = false;
    std::string message;
    std::vector<ore_import_item_error> item_errors;
    std::string correlation_id;
};

struct ore_import_request {
    using response_type = ore_import_response;
    static constexpr std::string_view nats_subject = "workflow.v1.ore.import";

    std::string request_id;
    std::string import_choices_json;
    std::string correlation_id;
};

}
```
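The `response_type` alias and `nats_subject` constant let a generic caller derive both the subject and the reply type from the request type alone. The sketch below illustrates that idea with trimmed copies of the protocol types; `send_request` is a hypothetical helper, and the `send` callable stands in for the real NATS request/reply call.

```cpp
#include <cassert>
#include <string>
#include <string_view>
#include <type_traits>

// Trimmed copies of the protocol types (response declared first).
struct ore_import_response {
    bool success = false;
    std::string message;
    std::string correlation_id;
};

struct ore_import_request {
    using response_type = ore_import_response;
    static constexpr std::string_view nats_subject = "workflow.v1.ore.import";
    std::string request_id;
};

// Each request type is tied to exactly one reply type.
static_assert(std::is_same_v<ore_import_request::response_type, ore_import_response>);

// Hypothetical helper: the subject and the return type both come from
// Request, so a caller cannot pair a request with the wrong response.
template <typename Request, typename Send>
typename Request::response_type send_request(const Request& req, Send&& send) {
    return send(Request::nats_subject, req);
}
```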
CMake deps
target_link_libraries(${lib_target_name}
PUBLIC
ores.storage.lib
ores.logging.lib)
Phase 3: ores.ore.service Component
Scaffolded via codegen. Structure follows ores.workflow.service exactly.
Codegen model
models/ore.service/ores_ore_service_component.json:
{
"component": {
"name": "ore.service",
"full_name": "ores.ore.service",
"brief": "ORE import microservice",
"description": "Standalone NATS microservice that receives ORE import workflow requests, downloads and unpacks ORE directory tarballs from storage, plans the import, and orchestrates saga steps against refdata, trading, and marketdata services."
}
}
File structure
```
ores.ore.service/
  config/options.hpp/.cpp               — db, nats, work_dir, http_base_url
  config/parser.hpp/.cpp
  app/application.hpp/.cpp              — boots NATS + DB, domain_service_runner
  messaging/registrar.hpp/.cpp          — subscribes to workflow.v1.ore.import
  messaging/ore_import_handler.hpp/.cpp
  service/ore_import_workflow.hpp/.cpp
  main.cpp
```
Correlation ID logging
Every log line carries corr=<X>:
```
[info]  ore.import received | corr=<X> request_id=<Y>
[debug] fetching tarball | corr=<X> bucket=ore-imports key=<Y>.tar.gz
[debug] tarball fetched | corr=<X> bytes=<N> duration=<D>ms
[debug] extracting | corr=<X> dest=/work/<Y>
[debug] directory extracted | corr=<X> files=<N>
[info]  scanning ORE directory | corr=<X>
[info]  import plan | corr=<X> currencies=N portfolios=N books=N trades=N
[debug] saving currency | corr=<X> iso_code=<C>
[debug] saving portfolio | corr=<X> name=<P> id=<uuid>
[debug] saving book | corr=<X> name=<B> id=<uuid>
[debug] saving trade | corr=<X> file=<F> trade_id=<T>
[debug] saving market datum | corr=<X> series=<S>
[info]  import complete | corr=<X> duration=<D>ms
[warn]  item failed | corr=<X> file=<F> item=<T> error=<E>
[error] import failed | corr=<X> step=<S> error=<E>
[warn]  compensating | corr=<X> completed_steps=<N>
[debug] compensation complete | corr=<X>
```
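Producing the `<message> | corr=<X> key=value ...` layout in one place keeps every line greppable by `corr=`. A minimal sketch: `format_log_line` is a hypothetical helper that only builds the string; emitting it at the right severity is assumed to remain the job of ores.logging.

```cpp
#include <cassert>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

// Hypothetical helper: renders "<message> | corr=<id> k1=v1 k2=v2 ...".
// corr= is always the first field so a plain grep finds every line.
inline std::string format_log_line(
    std::string_view message, std::string_view correlation_id,
    const std::vector<std::pair<std::string, std::string>>& fields = {}) {
    std::string line(message);
    line += " | corr=";
    line += correlation_id;
    for (const auto& [key, value] : fields) {
        line += ' ';
        line += key;
        line += '=';
        line += value;
    }
    return line;
}
```

For example, `format_log_line("fetching tarball", corr, {{"bucket", "ore-imports"}, {"key", key}})` yields the second line of the listing above.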
The correlation_id is extracted from the inbound NATS header (or generated if
absent), stored in the workflow_instance record, forwarded as a header on every
outbound NATS call, and echoed in ore_import_response.
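The "use the inbound header, otherwise generate" rule can be sketched as follows. The header name `X-Correlation-Id` is an assumption (the design does not name the header), headers are modelled as a plain `std::map`, and `make_uuid_v4` and `resolve_correlation_id` are hypothetical helpers.

```cpp
#include <cassert>
#include <map>
#include <random>
#include <string>

// Assumed header name; not specified by the design.
inline const std::string correlation_header = "X-Correlation-Id";

// Random RFC 4122 version-4 UUID in 8-4-4-4-12 hex form.
inline std::string make_uuid_v4() {
    std::random_device rd;
    std::mt19937_64 gen(rd());
    std::uniform_int_distribution<int> nibble(0, 15);
    static const char* hex = "0123456789abcdef";
    std::string out;
    for (int i = 0; i < 32; ++i) {
        int v = nibble(gen);
        if (i == 12) v = 4;                // version nibble
        if (i == 16) v = (v & 0x3) | 0x8;  // variant bits 10xx
        if (i == 8 || i == 12 || i == 16 || i == 20) out += '-';
        out += hex[v];
    }
    return out;
}

// Returns the inbound correlation ID, or a fresh UUID when the header is
// missing or empty. The caller stores it on workflow_instance, forwards it
// on outbound NATS calls and echoes it in ore_import_response.
inline std::string resolve_correlation_id(
    const std::map<std::string, std::string>& headers) {
    const auto it = headers.find(correlation_header);
    if (it != headers.end() && !it->second.empty())
        return it->second;
    return make_uuid_v4();
}
```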
ore_import_handler
```cpp
class ore_import_handler {
public:
    ore_import_handler(nats::client& nats,
                       database::context ctx,
                       security::jwt::jwt_authenticator signer,
                       nats::nats_client outbound_nats,
                       std::string http_base_url,
                       std::string work_dir);

    void ore_import(nats::message msg);
};
```
ore_import_workflow saga steps
execute():
0. storage_transfer::fetch_and_unpack("ore-imports", id+".tar.gz", work_dir/id/)
1. ore_directory_scanner::scan(work_dir/id/)
2. NATS → refdata.v1.currencies.list (existing iso_codes)
3. ore_import_planner::plan(scan, iso_codes, choices)
4. For each currency → NATS refdata.v1.currencies.save
5. For each portfolio → NATS refdata.v1.portfolios.save (parents first)
6. For each book → NATS refdata.v1.books.save
7. For each trade → NATS trading.v1.trades.save
collect ore_import_item_error on failure; continue remaining trades
8. For each market datum → NATS marketdata.v1.fixings.save
compensate() (reverse order):
Delete market data, trades, books, portfolios, currencies saved above.
Steps 0-3 have no server-side state to undo.
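Step 5 requires saving parent portfolios before their children. A minimal sketch of that ordering, assuming each portfolio carries an id and an optional parent id; `portfolio_node` and `parents_first` are hypothetical, and a parent that is not part of the import is assumed to exist already in refdata.

```cpp
#include <cassert>
#include <set>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical minimal portfolio shape.
struct portfolio_node {
    std::string id;
    std::string parent_id;  // empty for a root portfolio
};

// Repeatedly emits portfolios whose parent is empty, outside this import,
// or already emitted. Throws if a pass makes no progress (a cycle).
inline std::vector<portfolio_node> parents_first(std::vector<portfolio_node> pending) {
    std::set<std::string> in_import;
    for (const auto& p : pending) in_import.insert(p.id);

    std::set<std::string> saved;
    std::vector<portfolio_node> ordered;
    while (!pending.empty()) {
        std::vector<portfolio_node> blocked;
        for (auto& p : pending) {
            const bool parent_ready = p.parent_id.empty() ||
                                      !in_import.count(p.parent_id) ||
                                      saved.count(p.parent_id);
            if (parent_ready) {
                saved.insert(p.id);
                ordered.push_back(std::move(p));
            } else {
                blocked.push_back(std::move(p));
            }
        }
        if (blocked.size() == pending.size())
            throw std::runtime_error("portfolio hierarchy contains a cycle");
        pending = std::move(blocked);
    }
    return ordered;
}
```

Compensation can then delete portfolios in the reverse of this order, so children are removed before their parents.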
CMake deps
target_link_libraries(${lib_target_name}
PUBLIC
ores.service.lib
ores.workflow.lib
ores.ore.lib
ores.ore.api.lib
ores.storage.lib
ores.nats.lib
ores.iam.client.lib
ores.database.lib
ores.refdata.api.lib
ores.trading.api.lib
ores.marketdata.api.lib
ores.utility.lib
ores.telemetry.lib)
Phase 4: Qt Client Integration
- Generate a request_id UUID at wizard entry.
- Call storage_transfer::pack_and_upload(ore_dir, "ore-imports", ore_storage::import_key(request_id)).
- On success, send a NATS request to workflow.v1.ore.import.
- Await ore_import_response; display item_errors grouped by source_file.
- Show correlation_id on the summary page for support reference.
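Grouping `item_errors` by `source_file` is plain data manipulation that a Qt view can then render. A sketch in standard C++ (the Qt widget code is not shown); `group_by_source_file` is a hypothetical helper, and the struct mirrors `ore_import_item_error`.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Mirrors ore_import_item_error from ores.ore.api.
struct ore_import_item_error {
    std::string source_file;
    std::string item_id;
    std::string message;
};

// std::map keeps files sorted by path; errors within a file keep their
// original order.
inline std::map<std::string, std::vector<ore_import_item_error>>
group_by_source_file(const std::vector<ore_import_item_error>& errors) {
    std::map<std::string, std::vector<ore_import_item_error>> grouped;
    for (const auto& e : errors)
        grouped[e.source_file].push_back(e);
    return grouped;
}
```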
The wizard no longer calls domain parsers or save_* services directly.
All ORE processing moves server-side.
Open Questions
- Books ownership: ore_import_plan.books is vector<refdata::domain::book>. Are books owned by refdata or trading? Clarify before implementing Phase 3 step 6.
- Market data compensation: does marketdata.v1.fixings.delete exist? If not, compensation for market data is a no-op until the endpoint is added.
- Tarball retention: after a successful import, should ore-imports/{id}.tar.gz be deleted? Suggestion: keep it for 24h, then purge it via a scheduler job (future work).
- ores.compute.wrapper deduplication: migrate it to the ores.storage http_client and archiver once Phase 1 is stable. Defer to a cleanup PR.