NATS Idiomatic Architecture: Drop Binary Protocol, Adopt MessagePack + JetStream
Table of Contents
- Context
- Key Architectural Decisions
- Current State (as of 2026-03-14)
- Phase 1 — ores.nats Clean Client Interface
- Phase 2 — Wire Up Domain Services
- Phase 3 — Delete ores.comms
- Phase 4 — Rewrite Shell Client
- Phase 5 — Rewrite Qt Client
- Phase 6 — JetStream Events and Eventing Bridge
- Risks and Mitigations
- Deletions Summary
Context
The 2026-03-13 NATS migration plan replaced the TCP/SSL transport with NATS but preserved the custom binary frame format (magic bytes + uint16 message type + JWT + payload). NATS was therefore used as a dumb pipe — its routing, pub/sub, and persistence capabilities were completely ignored.
This plan supersedes the original plan's remaining phases with a fully idiomatic NATS architecture:
- NATS subjects replace the uint16 message type field for routing
- MessagePack (rfl::msgpack) replaces the custom binary frame as the payload format
- Authorization: Bearer <token> in NATS message headers replaces JWT-in-frame
- JetStream replaces PGMQ (ores.mq) for durable queuing from day one
- Queue groups provide horizontal scaling with no extra configuration
- ores.comms is deleted entirely: NATS is the bus, not a thin wrapper over it
No backwards compatibility is required. The system may be non-functional between phases.
Key Architectural Decisions
Subject Naming
```text
{component}.v1.{entity}.{operation}   -- request/reply
{component}.v1.events.{type}          -- pub/sub domain events
{component}.v1.js.{stream}            -- JetStream subjects
```
Examples:
```text
ores.iam.v1.accounts.list
ores.iam.v1.accounts.create
ores.refdata.v1.currencies.get
ores.iam.v1.events.account_changed
ores.trading.v1.js.trades
```
Each service subscribes to ores.{domain}.v1.> with a queue group equal to the
service name (e.g. ores.iam.service). Multiple instances of the same service
automatically load-balance via queue group semantics.
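A small standard-library sketch of the subject scheme and the > wildcard follows. request_subject and subject_matches are hypothetical helpers, not part of ores.nats; the NATS server performs the real matching, and this code only restates its rules (* matches exactly one token, > matches one or more trailing tokens).

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <string_view>
#include <vector>

// Split a subject into its dot-separated tokens.
std::vector<std::string_view> split_subject(std::string_view subject) {
    std::vector<std::string_view> tokens;
    std::size_t start = 0;
    for (;;) {
        const std::size_t dot = subject.find('.', start);
        if (dot == std::string_view::npos) {
            tokens.push_back(subject.substr(start));
            return tokens;
        }
        tokens.push_back(subject.substr(start, dot - start));
        start = dot + 1;
    }
}

// Hypothetical builder for {component}.v1.{entity}.{operation}.
std::string request_subject(std::string_view component, std::string_view entity,
                            std::string_view operation) {
    std::string s(component);
    s.append(".v1.").append(entity).append(".").append(operation);
    return s;
}

// Restatement of NATS wildcard rules: '*' matches exactly one token;
// '>' (only valid as the last token) matches one or more remaining tokens.
bool subject_matches(std::string_view filter, std::string_view subject) {
    const auto f = split_subject(filter);
    const auto s = split_subject(subject);
    for (std::size_t i = 0; i < f.size(); ++i) {
        if (f[i] == ">") return s.size() > i;  // at least one token must remain
        if (i >= s.size()) return false;
        if (f[i] != "*" && f[i] != s[i]) return false;
    }
    return f.size() == s.size();
}
```

Under these rules ores.iam.v1.> also matches ores.iam.v1.events.account_changed and the ores.iam.v1.js.* subjects, so a service subscribed to the full wildcard would also receive its own event publications; per-subject subscriptions (as described in Phase 2) avoid that.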
Payload Format
MessagePack via rfl::msgpack. Request and response are plain domain structs
serialised directly — no frame wrapper, no magic bytes, no type field.
vcpkg change required: add "msgpack" to reflectcpp features.
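To show what "no frame wrapper" means on the wire, here is a hand-rolled MessagePack encoder for a hypothetical get_currency_request. It assumes a struct is encoded as a MessagePack map keyed by field name and handles only short strings (fixstr); it illustrates the format and is not a replacement for rfl::msgpack.

```cpp
#include <cassert>
#include <cstdint>
#include <stdexcept>
#include <string>
#include <string_view>
#include <vector>

// Hypothetical request type: the plain struct is the entire payload.
struct get_currency_request {
    std::string iso_code;
};

// MessagePack fixstr: one byte 0xa0 | length (0..31), then the raw bytes.
void write_fixstr(std::vector<std::uint8_t>& out, std::string_view s) {
    if (s.size() > 31) throw std::length_error("fixstr holds at most 31 bytes");
    out.push_back(static_cast<std::uint8_t>(0xa0 | s.size()));
    out.insert(out.end(), s.begin(), s.end());
}

// Encode as a one-entry map {"iso_code": <value>}; no magic bytes, no type field.
std::vector<std::uint8_t> encode(const get_currency_request& req) {
    std::vector<std::uint8_t> out;
    out.push_back(0x81);  // fixmap header: 0x80 | 1 entry
    write_fixstr(out, "iso_code");
    write_fixstr(out, req.iso_code);
    return out;
}
```

In the services, rfl::msgpack::write and rfl::msgpack::read<T> would produce and consume the payload instead; whether reflect-cpp's output matches this exact byte layout should be checked rather than assumed.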
Authentication
JWT goes in the NATS message header Authorization: Bearer <token>. This
follows HTTP conventions and is a common pattern for NATS microservices.
The handler extracts and validates the token from the header before
processing the payload.
No server-side session tracking is required: JWTs are self-contained (claims
carry account_id, tenant_id, roles).
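The header handling can be sketched as below. extract_bearer_token is a hypothetical helper; it assumes headers arrive as the std::unordered_map<std::string, std::string> used by ores::nats::client and looks the key up with exact case. Validating the token's signature and expiry is out of scope for this sketch.

```cpp
#include <cassert>
#include <optional>
#include <string>
#include <string_view>
#include <unordered_map>

using headers_map = std::unordered_map<std::string, std::string>;

// Return the token from "Authorization: Bearer <token>", or nullopt when the
// header is missing, uses another scheme, or carries an empty token.
std::optional<std::string> extract_bearer_token(const headers_map& headers) {
    const auto it = headers.find("Authorization");  // exact-case lookup
    if (it == headers.end()) return std::nullopt;
    constexpr std::string_view prefix = "Bearer ";
    const std::string_view value = it->second;
    if (value.size() <= prefix.size() || value.substr(0, prefix.size()) != prefix)
        return std::nullopt;
    return std::string(value.substr(prefix.size()));
}
```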
JetStream
JetStream is used from day one for:
- Domain event persistence (e.g. ores.iam.v1.events.* → stream ORES_IAM_EVENTS)
- Work queues replacing PGMQ (e.g. ores.scheduler.v1.js.jobs)
- Chart history replay (future: DeliverByStartTime consumer)
JetStream streams and consumers are declared by each service at startup via the management API.
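One way to keep stream declarations consistent across services is to derive them from the domain name. stream_spec and event_stream_for are hypothetical; in ores.nats the resulting name and subject list would presumably feed a cnats stream configuration (jsStreamConfig passed to js_AddStream), which this sketch does not call.

```cpp
#include <cassert>
#include <cctype>
#include <string>
#include <string_view>
#include <vector>

// Hypothetical description of a stream a service declares at startup.
struct stream_spec {
    std::string name;                   // e.g. ORES_IAM_EVENTS
    std::vector<std::string> subjects;  // subjects the stream captures
};

// Derive ORES_{DOMAIN}_EVENTS capturing ores.{domain}.v1.events.*
stream_spec event_stream_for(std::string_view domain) {
    std::string upper;
    for (const char c : domain)
        upper += static_cast<char>(std::toupper(static_cast<unsigned char>(c)));
    return stream_spec{"ORES_" + upper + "_EVENTS",
                       {"ores." + std::string(domain) + ".v1.events.*"}};
}
```

Deriving both values from one input also keeps every stream name behind the ORES_ prefix recommended under Risks and Mitigations.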
Queue Groups
Every service subscriber uses queue_subscribe with the service name as the
queue group. This gives horizontal scaling with no extra configuration.
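The in-memory model below makes queue-group semantics concrete: each message reaches exactly one member of the group. It is a hypothetical illustration, not NATS code. The server chooses a member at random, whereas this sketch uses round-robin so the result is deterministic.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Model of one queue group: every delivered message goes to exactly one member.
class queue_group {
public:
    using handler = std::function<void(const std::string&)>;

    void join(handler h) { members_.push_back(std::move(h)); }

    void deliver(const std::string& payload) {
        if (members_.empty()) return;  // core NATS discards messages nobody is subscribed to
        members_[next_ % members_.size()](payload);  // round-robin stand-in for random choice
        ++next_;
    }

private:
    std::vector<handler> members_;
    std::size_t next_ = 0;
};
```

Running two instances of ores.iam.service corresponds to two join calls on the same group: requests split between them and no instance sees a duplicate.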
ores.mq and ores.comms are deleted
ores.mq (PGMQ) is superseded by JetStream. ores.comms (binary frame
protocol, TCP client, message dispatcher, subscription manager) is superseded by
NATS itself. Both projects are removed from the build.
ores.nats Client Interface
ores.nats is expanded into the core network foundation. No cnats types appear
in any public header (Pimpl throughout). The library provides:
- ores::nats::message: received message (subject, reply_subject, data, headers)
- ores::nats::subscription: RAII unsubscribe handle
- ores::nats::client: connection, publish, request/reply, subscribe, JetStream
The existing nats_server and nats_client (which still inherit from
ores.comms interfaces) are replaced by this new client class.
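The ownership model of message and subscription can be sketched with the standard library alone. The field types (std::vector<std::byte> for data, a string map for headers) and the sketch namespace are assumptions; in ores.nats the unsubscribe action would wrap the cnats call held inside the Pimpl.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

namespace sketch {

// Received message; no cnats types are exposed.
struct message {
    std::string subject;
    std::string reply_subject;  // empty when no reply is expected
    std::vector<std::byte> data;
    std::unordered_map<std::string, std::string> headers;
};

// Move-only RAII handle: runs its unsubscribe action at most once,
// either explicitly or on destruction.
class subscription {
public:
    subscription() = default;
    explicit subscription(std::function<void()> unsubscribe)
        : unsubscribe_(std::move(unsubscribe)) {}
    ~subscription() { unsubscribe(); }

    subscription(const subscription&) = delete;
    subscription& operator=(const subscription&) = delete;
    subscription(subscription&& other) noexcept
        : unsubscribe_(std::exchange(other.unsubscribe_, nullptr)) {}
    subscription& operator=(subscription&& other) noexcept {
        if (this != &other) {
            unsubscribe();  // release what we currently hold first
            unsubscribe_ = std::exchange(other.unsubscribe_, nullptr);
        }
        return *this;
    }

    void unsubscribe() {
        if (unsubscribe_) std::exchange(unsubscribe_, nullptr)();
    }

private:
    std::function<void()> unsubscribe_;
};

}  // namespace sketch
```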
Current State (as of 2026-03-14)
- Phases 1–3 of the original plan are complete: NNG removed, cnats added,
ores.comms.serviceuses NATS transport vianats_server - 10 domain service scaffolds exist (
ores.mq.servicedeleted today) - Qt client and shell still use binary frames via the old
nats_client ores.commsstill exists and is depended on by multiple projects
Phase 1 — ores.nats Clean Client Interface
PR: [nats] Replace nats_server/nats_client with clean client interface
Remove all ores.comms dependencies from ores.nats. Implement the new
interface with full Pimpl.
| File | Action |
|---|---|
| ores.nats/domain/message.hpp | NEW: struct message { subject, reply_subject, data, headers } |
| ores.nats/service/subscription.hpp/cpp | NEW: RAII unsubscribe handle |
| ores.nats/service/client.hpp/cpp | NEW: Pimpl client; see interface below |
| ores.nats/service/nats_server.hpp/cpp | DELETE: replaced by client |
| ores.nats/service/nats_client.hpp/cpp | DELETE: replaced by client |
| ores.nats/src/CMakeLists.txt | Remove ores.comms.lib dependency |
| vcpkg.json | Add "msgpack" to reflectcpp features |
Client interface:
```cpp
#include <chrono>
#include <cstddef>
#include <functional>
#include <memory>
#include <span>
#include <string>
#include <string_view>
#include <unordered_map>

#include <boost/asio/awaitable.hpp>

// Also requires ores::nats::message, ores::nats::subscription and
// config::nats_options, declared elsewhere in ores.nats.

namespace ores::nats {

// Assumed handler signature; replace if message_handler is defined elsewhere.
using message_handler = std::function<void(const message&)>;

class client {
public:
    explicit client(config::nats_options opts);
    ~client();

    client(const client&) = delete;
    client& operator=(const client&) = delete;

    // Lifecycle
    void connect();  // synchronous, throws on failure
    void disconnect();
    [[nodiscard]] bool is_connected() const noexcept;

    // Request/reply
    [[nodiscard]] message request_sync(
        std::string_view subject, std::span<const std::byte> data,
        std::unordered_map<std::string, std::string> headers = {},
        std::chrono::milliseconds timeout = std::chrono::seconds(30));

    [[nodiscard]] boost::asio::awaitable<message> request(
        std::string_view subject, std::span<const std::byte> data,
        std::unordered_map<std::string, std::string> headers = {},
        std::chrono::milliseconds timeout = std::chrono::seconds(30));

    // Core pub/sub
    void publish(std::string_view subject, std::span<const std::byte> data,
                 std::unordered_map<std::string, std::string> headers = {});

    [[nodiscard]] subscription subscribe(std::string_view subject,
                                         message_handler handler);

    [[nodiscard]] subscription queue_subscribe(std::string_view subject,
                                               std::string_view queue_group,
                                               message_handler handler);

    // JetStream
    void js_publish(std::string_view subject, std::span<const std::byte> data,
                    std::unordered_map<std::string, std::string> headers = {});

    [[nodiscard]] subscription js_subscribe(std::string_view subject,
                                            std::string_view durable_name,
                                            message_handler handler);

    [[nodiscard]] subscription js_queue_subscribe(std::string_view subject,
                                                  std::string_view durable_name,
                                                  std::string_view queue_group,
                                                  message_handler handler);

    // Graceful shutdown: let pending messages be processed, then close.
    void drain();

private:
    struct impl;
    std::unique_ptr<impl> impl_;
};

}  // namespace ores::nats
```
Working state: ores.nats compiles with no ores.comms dependency. Existing
users of nats_server (ores.comms.service) and old nats_client (ores.qt,
ores.comms.shell) do not yet compile — fix in subsequent phases.
Phase 2 — Wire Up Domain Services
PR: [nats] Wire up domain services with idiomatic subjects and MessagePack
Each of the 10 domain services registers subject-based handlers using
client::queue_subscribe. The domain registrar interfaces change: instead of
taking a message_server&, they take a nats::client& and register their own
subject handlers directly.
Pattern for each service application.cpp:
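```cpp
// Needs <csignal>, <boost/asio/io_context.hpp>, <boost/asio/signal_set.hpp>

// 1. Create DB context
auto ctx = make_context(cfg.database);

// 2. Create NATS client
nats::client nats(cfg.nats);
nats.connect();

// 3. Register domain handlers (queue group = service name)
ores::iam::messaging::registrar::register_handlers(
    nats, "ores.iam.service", ctx, ...);

// 4. Run: block until SIGINT/SIGTERM, then drain in-flight messages
boost::asio::io_context io;
boost::asio::signal_set signals(io, SIGINT, SIGTERM);
signals.async_wait(
    [&](const boost::system::error_code&, int) { nats.drain(); });
io.run();  // returns once the signal handler has run
```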
Registrar subject convention:
| Service | Subscribes to | Queue group |
|---|---|---|
| ores.iam.service | ores.iam.v1.> | ores.iam.service |
| ores.refdata.service | ores.refdata.v1.> | ores.refdata.service |
| ores.dq.service | ores.dq.v1.> | ores.dq.service |
| ores.variability.service | ores.variability.v1.> | ores.variability.service |
| ores.assets.service | ores.assets.v1.> | ores.assets.service |
| ores.telemetry.service | ores.telemetry.v1.> | ores.telemetry.service |
| ores.trading.service | ores.trading.v1.> | ores.trading.service |
| ores.scheduler.service | ores.scheduler.v1.> | ores.scheduler.service |
| ores.reporting.service | ores.reporting.v1.> | ores.reporting.service |
| ores.synthetic.service | ores.synthetic.v1.> | ores.synthetic.service |
Each domain registrar implementation replaces binary-frame dispatch with
per-subject queue_subscribe calls. Payload is deserialised with rfl::msgpack.
JWT is extracted from the Authorization header.
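The shape of a registrar under this scheme, which receives the client and registers one queue subscription per subject, can be exercised without a server. fake_client and register_iam_handlers are hypothetical; payloads are plain strings standing in for rfl::msgpack bytes, and header/JWT handling is omitted to keep the focus on registration.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <optional>
#include <string>
#include <utility>

// Hypothetical stand-in for the part of nats::client a registrar uses:
// it records queue subscriptions so they can be invoked directly.
struct fake_client {
    using handler = std::function<std::string(const std::string& payload)>;
    struct entry {
        std::string queue_group;
        handler fn;
    };
    std::map<std::string, entry> subs;

    void queue_subscribe(const std::string& subject, const std::string& group,
                         handler fn) {
        subs[subject] = entry{group, std::move(fn)};
    }

    // Simulated request/reply: exact-subject routing, returns the reply payload.
    std::optional<std::string> request(const std::string& subject,
                                       const std::string& payload) {
        const auto it = subs.find(subject);
        if (it == subs.end()) return std::nullopt;  // real NATS reports "no responders"
        return it->second.fn(payload);
    }
};

// Hypothetical IAM registrar: one queue subscription per subject.
void register_iam_handlers(fake_client& nats, const std::string& queue_group) {
    nats.queue_subscribe("ores.iam.v1.accounts.list", queue_group,
                         [](const std::string&) { return std::string("accounts:list"); });
    nats.queue_subscribe("ores.iam.v1.accounts.create", queue_group,
                         [](const std::string& p) { return "accounts:create:" + p; });
}
```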
ores.comms.service is deleted once all 10 services are wired.
Working state: All 10 services start, connect to NATS, and handle domain requests. Shell and Qt clients are still broken (next phase).
Phase 3 — Delete ores.comms
PR: [cleanup] Delete ores.comms and ores.comms.service
| File/Project | Action |
|---|---|
| projects/ores.comms/ | Delete entire project |
| projects/ores.comms.service/ | Delete entire project |
| projects/ores.mq/ | Delete entire project |
| projects/CMakeLists.txt | Remove the three add_subdirectory entries |
| All remaining dependents | Ensure no ores.comms.lib / ores.mq.lib in any target_link_libraries |
Working state: Clean build with no ores.comms or ores.mq in the tree.
Phase 4 — Rewrite Shell Client
PR: [nats] Rewrite ores.comms.shell using ores.nats::client
ores.comms.shell currently builds binary frames and sends them via the old
nats_client. Replace with direct nats::client usage:
- Each shell command calls client.request_sync() on the appropriate subject
- Payload serialised with rfl::msgpack
- JWT placed in the Authorization header
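A sketch of how a shell command might assemble the arguments for request_sync() follows. outgoing_request, the entity_component table, and make_request are hypothetical, and the payload string stands in for rfl::msgpack output. The Authorization header is added only when a token is present, on the assumption that a login command has no JWT yet.

```cpp
#include <cassert>
#include <map>
#include <optional>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical bundle mirroring request_sync()'s subject/data/headers arguments.
struct outgoing_request {
    std::string subject;
    std::unordered_map<std::string, std::string> headers;
    std::string payload;  // stands in for rfl::msgpack bytes
};

// Hypothetical entity -> component table used to pick the subject.
const std::map<std::string, std::string> entity_component{
    {"accounts", "ores.iam"}, {"currencies", "ores.refdata"}};

std::optional<outgoing_request> make_request(const std::string& entity,
                                             const std::string& operation,
                                             const std::string& jwt,
                                             std::string payload) {
    const auto it = entity_component.find(entity);
    if (it == entity_component.end()) return std::nullopt;  // unknown command
    outgoing_request req;
    req.subject = it->second + ".v1." + entity + "." + operation;
    if (!jwt.empty()) req.headers["Authorization"] = "Bearer " + jwt;
    req.payload = std::move(payload);
    return req;
}
```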
Working state: Shell commands work against the new domain services.
Phase 5 — Rewrite Qt Client
PR: [nats] Rewrite ores.qt ClientManager using ores.nats::client
ClientManager currently wraps the old nats_client (which wrapped client_base).
Replace with nats::client:
- connect()/disconnect() map directly
- Each request from a Qt dialog calls request_sync() or the async request()
- Notifications: Qt dialogs subscribe to ores.{domain}.v1.events.> via client.subscribe(); callbacks are posted to the Qt event loop via QMetaObject::invokeMethod(..., Qt::QueuedConnection)
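The posting step can be modelled with the standard library: callbacks arriving on a library thread go into a locked queue, and the owning thread drains it. posting_queue is hypothetical; in the Qt client QMetaObject::invokeMethod with Qt::QueuedConnection plays this role, and in a service boost::asio::post onto the io_context would.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Callbacks posted from any thread; executed only when the owner drains them.
class posting_queue {
public:
    void post(std::function<void()> fn) {
        std::lock_guard lock(mutex_);
        pending_.push(std::move(fn));
    }

    // Call on the owning (e.g. GUI) thread; returns how many callbacks ran.
    std::size_t run_pending() {
        std::queue<std::function<void()>> batch;
        {
            std::lock_guard lock(mutex_);
            std::swap(batch, pending_);  // run outside the lock
        }
        std::size_t n = 0;
        for (; !batch.empty(); batch.pop(), ++n) batch.front()();
        return n;
    }

private:
    std::mutex mutex_;
    std::queue<std::function<void()>> pending_;
};
```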
Working state: Full Qt application functional. Login, all dialogs, live updates all work via NATS.
Phase 6 — JetStream Events and Eventing Bridge
PR: [nats] Bridge ores.eventing to JetStream pub/sub
The existing ores.eventing Postgres NOTIFY → event bus pipeline is extended:
instead of notifying a subscription_manager, each domain service publishes to
its JetStream event stream when entities change.
- Each service creates its event stream at startup (e.g. ORES_IAM_EVENTS capturing ores.iam.v1.events.*)
- Qt dialogs subscribe to relevant event subjects for live refresh
- This replaces the old subscription_manager/subscription_handler mechanism
Working state: Live entity updates delivered via NATS JetStream pub/sub.
Charts can use a consumer with a DeliverByStartTime policy to receive history and live updates from a single consumer.
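The replay half of a DeliverByStartTime consumer amounts to "every stored message published at or after the start time, in stream order". The in-memory model below (stored_event and replay_from are hypothetical) shows only that selection; a real JetStream consumer then continues with live messages, which is not modelled here.

```cpp
#include <cassert>
#include <chrono>
#include <string>
#include <vector>

using clock_type = std::chrono::system_clock;

struct stored_event {
    clock_type::time_point published;
    std::string subject;
};

// Select the history a by-start-time consumer would replay, preserving order.
std::vector<stored_event> replay_from(const std::vector<stored_event>& stream,
                                      clock_type::time_point start) {
    std::vector<stored_event> out;
    for (const auto& e : stream)
        if (e.published >= start) out.push_back(e);
    return out;
}
```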
Risks and Mitigations
| Risk | Mitigation |
|---|---|
| cnats callbacks on non-Qt thread | client posts all callbacks to caller's ASIO io_context or Qt event loop |
| DB connections multiply ×10 across services | Pool size 1–2 per service; verify PostgreSQL max_connections before Phase 2 |
| JetStream stream name conflicts on shared NATS | Prefix all stream names with ORES_ and document naming convention |
| Subject naming drift across services | nats_subject_registry (compile-time map) enforced by each registrar |
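A compile-time registry could look like the sketch below. The layout of nats_subject_registry, its logical keys, and subject_for are hypothetical. Because subject_for throws when a key is missing, using it in a constant expression turns a typo into a compile error, while a runtime lookup of an unknown key throws std::out_of_range.

```cpp
#include <array>
#include <cassert>
#include <stdexcept>
#include <string_view>
#include <utility>

// Hypothetical registry: logical operation key -> NATS subject.
inline constexpr std::array<std::pair<std::string_view, std::string_view>, 3>
    nats_subject_registry{{
        {"iam.accounts.list", "ores.iam.v1.accounts.list"},
        {"iam.accounts.create", "ores.iam.v1.accounts.create"},
        {"refdata.currencies.get", "ores.refdata.v1.currencies.get"},
    }};

// Look up a subject; reaching the throw in constant evaluation fails the build.
constexpr std::string_view subject_for(std::string_view key) {
    for (const auto& [k, subject] : nats_subject_registry)
        if (k == key) return subject;
    throw std::out_of_range("unknown subject key");
}

static_assert(subject_for("iam.accounts.list") == "ores.iam.v1.accounts.list");
```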
Deletions Summary
| Project / Component | Reason |
|---|---|
| ores.comms | Replaced by NATS: subjects replace dispatcher, cnats replaces TCP client |
| ores.comms.service | Replaced by 10 standalone domain services |
| ores.mq (PGMQ) | Replaced by JetStream |
| ores.mq.service | Already deleted |
| Old nats_server, old nats_client | Replaced by ores.nats::client |