NATS Idiomatic Architecture: Drop Binary Protocol, Adopt MessagePack + JetStream

Context

The 2026-03-13 NATS migration plan replaced the TCP/SSL transport with NATS but preserved the custom binary frame format (magic bytes + uint16 message type + JWT + payload). NATS was therefore used as a dumb pipe — its routing, pub/sub, and persistence capabilities were completely ignored.

This plan supersedes the original plan's remaining phases with a fully idiomatic NATS architecture:

  • NATS subjects replace the uint16 message type field for routing
  • MessagePack (rfl::msgpack) replaces the custom binary frame as the payload format
  • Authorization: Bearer <token> in NATS message headers replaces JWT-in-frame
  • JetStream replaces PGMQ (ores.mq) for durable queuing from day one
  • Queue groups provide horizontal scaling with no extra configuration
  • ores.comms is deleted entirely — NATS is the bus, not a thin wrapper over it

No backwards compatibility is required. The system may be non-functional between phases.

Key Architectural Decisions

Subject Naming

{component}.v1.{entity}.{operation}    -- request/reply
{component}.v1.events.{type}           -- pub/sub domain events
{component}.v1.js.{stream}             -- JetStream subjects

Examples:

  • ores.iam.v1.accounts.list
  • ores.iam.v1.accounts.create
  • ores.refdata.v1.currencies.get
  • ores.iam.v1.events.account_changed
  • ores.trading.v1.js.trades

Each service subscribes to ores.{domain}.v1.> with a queue group equal to the service name (e.g. ores.iam.service). Multiple instances of the same service automatically load-balance via queue group semantics.
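To illustrate which subjects a wildcard subscription such as ores.iam.v1.> captures, here is a minimal sketch of NATS subject matching. The NATS server performs this matching itself; the function names split_tokens and subject_matches are hypothetical and not part of ores.nats. The rules shown are the standard NATS ones: * matches exactly one token, and > matches one or more trailing tokens and must be the last token of the pattern.

```cpp
#include <cstddef>
#include <string_view>
#include <vector>

// Split a dot-separated NATS subject into its tokens.
std::vector<std::string_view> split_tokens(std::string_view subject) {
    std::vector<std::string_view> tokens;
    std::size_t start = 0;
    while (true) {
        const std::size_t dot = subject.find('.', start);
        if (dot == std::string_view::npos) {
            tokens.push_back(subject.substr(start));
            return tokens;
        }
        tokens.push_back(subject.substr(start, dot - start));
        start = dot + 1;
    }
}

// Returns true if `subject` is matched by `pattern`.
// '*' matches exactly one token; '>' matches one or more trailing tokens.
bool subject_matches(std::string_view pattern, std::string_view subject) {
    const auto p = split_tokens(pattern);
    const auto s = split_tokens(subject);
    for (std::size_t i = 0; i < p.size(); ++i) {
        if (p[i] == ">") {
            // '>' must be the final pattern token and needs at least
            // one subject token left to consume.
            return i + 1 == p.size() && i < s.size();
        }
        if (i >= s.size()) return false;                  // subject too short
        if (p[i] != "*" && p[i] != s[i]) return false;    // literal mismatch
    }
    return p.size() == s.size();                          // no tokens left over
}
```

Under these rules ores.iam.v1.> captures ores.iam.v1.accounts.list but not the bare prefix ores.iam.v1, and it never sees traffic for ores.refdata.v1.currencies.get.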

Payload Format

MessagePack via rfl::msgpack. Request and response are plain domain structs serialised directly — no frame wrapper, no magic bytes, no type field.

vcpkg change required: add "msgpack" to reflectcpp features.

Authentication

JWT goes in the NATS message header Authorization: Bearer <token>. This mirrors the HTTP convention and is a common pattern in NATS microservice examples. Each handler extracts and validates the token from the header before processing the payload.

No server-side session tracking is required: JWTs are self-contained (claims carry account_id, tenant_id, roles).
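The header extraction step could look roughly like the sketch below. It assumes headers arrive as the same std::unordered_map<std::string, std::string> used in the client interface, and that the header key is looked up with exact case; both header_map and extract_bearer_token are hypothetical names. Signature and claim validation are out of scope here and would follow before the payload is touched.

```cpp
#include <optional>
#include <string>
#include <string_view>
#include <unordered_map>

// Assumed header representation (mirrors the map type in the client interface).
using header_map = std::unordered_map<std::string, std::string>;

// Returns the raw token from "Authorization: Bearer <token>", or nullopt if
// the header is missing, uses another scheme, or carries an empty token.
// The returned view points into `headers` and is valid only while it lives.
std::optional<std::string_view> extract_bearer_token(const header_map& headers) {
    constexpr std::string_view prefix = "Bearer ";
    const auto it = headers.find("Authorization");  // exact-case lookup (assumption)
    if (it == headers.end()) return std::nullopt;

    const std::string_view value = it->second;
    if (value.size() <= prefix.size() ||
        value.substr(0, prefix.size()) != prefix) {
        return std::nullopt;
    }
    return value.substr(prefix.size());
}
```

A handler would reject the request when this returns nullopt, and otherwise pass the token to whatever JWT validation the service uses.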

JetStream

JetStream is used from day one for:

  • Domain event persistence (e.g. ores.iam.v1.events.* → stream ORES_IAM_EVENTS)
  • Work queues replacing PGMQ (e.g. ores.scheduler.v1.js.jobs)
  • Chart history replay (future: DeliverByStartTime consumer)

JetStream streams and consumers are declared by each service at startup via the management API.
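Since every service declares its own streams, a small shared helper can keep stream names and subject filters consistent. The sketch below derives both from the domain name, following the ORES_IAM_EVENTS example above; stream_name and event_subject_filter are hypothetical helpers, not existing ores.nats functions, and the actual stream creation call is left out.

```cpp
#include <cctype>
#include <initializer_list>
#include <string>
#include <string_view>

// Builds a stream name from its parts, e.g. ("iam", "events") -> "ORES_IAM_EVENTS".
std::string stream_name(std::string_view domain, std::string_view kind) {
    std::string name = "ORES";
    for (std::string_view part : {domain, kind}) {
        name += '_';
        for (char c : part) {
            name += static_cast<char>(std::toupper(static_cast<unsigned char>(c)));
        }
    }
    return name;
}

// Subject filter the event stream would capture, e.g. "ores.iam.v1.events.*".
std::string event_subject_filter(std::string_view domain) {
    return "ores." + std::string(domain) + ".v1.events.*";
}
```

Each service would pass these two values to its stream declaration at startup, so the naming rule lives in one place.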

Queue Groups

Every service subscriber uses queue_subscribe with the service name as the queue group. This gives horizontal scaling with no extra configuration.
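The delivery rule behind queue groups can be shown with a toy in-memory bus: each message goes to exactly one member of every queue group subscribed to the subject. This is only a model for reasoning about the semantics, not the ores.nats API; toy_bus is a hypothetical class, it matches subjects exactly (no wildcards), and it picks members round-robin for determinism, whereas a real NATS server chooses a member itself.

```cpp
#include <cstddef>
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Toy in-memory model of queue-group delivery; NOT the ores.nats API.
class toy_bus {
public:
    using handler = std::function<void(const std::string& payload)>;

    void queue_subscribe(const std::string& subject,
                         const std::string& queue_group, handler h) {
        groups_[{subject, queue_group}].members.push_back(std::move(h));
    }

    // Delivers one copy of the message to each queue group on the subject,
    // choosing a single member per group (round-robin in this sketch).
    void publish(const std::string& subject, const std::string& payload) {
        for (auto& [key, group] : groups_) {
            if (key.first != subject || group.members.empty()) continue;
            group.members[group.next](payload);
            group.next = (group.next + 1) % group.members.size();
        }
    }

private:
    struct group_state {
        std::vector<handler> members;
        std::size_t next = 0;
    };
    // Keyed by (subject, queue group).
    std::map<std::pair<std::string, std::string>, group_state> groups_;
};
```

With two instances in group ores.iam.service, four requests are split between them, while a subscriber in a different group would still see all four.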

ores.mq and ores.comms are deleted

ores.mq (PGMQ) is superseded by JetStream. ores.comms (binary frame protocol, TCP client, message dispatcher, subscription manager) is superseded by NATS itself. Both projects are removed from the build.

ores.nats Client Interface

ores.nats is expanded into the core network foundation. No cnats types appear in any public header (Pimpl throughout). The library provides:

  • ores::nats::message — received message (subject, reply_subject, data, headers)
  • ores::nats::subscription — RAII unsubscribe handle
  • ores::nats::client — connection, publish, request/reply, subscribe, JetStream

The existing nats_server and nats_client (which still inherit from ores.comms interfaces) are replaced by this new client class.
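As a rough idea of what the RAII unsubscribe handle might look like, the sketch below wraps an unsubscribe action in a move-only type that runs it exactly once. It is an illustration under assumptions: subscription_sketch is a hypothetical name, and the real ores::nats::subscription would hide a cnats subscription behind Pimpl rather than hold a std::function.

```cpp
#include <functional>
#include <utility>

// Move-only RAII handle: runs the unsubscribe action exactly once, either on
// reset() or on destruction. The action is expected not to throw.
class subscription_sketch {
public:
    subscription_sketch() = default;
    explicit subscription_sketch(std::function<void()> unsubscribe)
        : unsubscribe_(std::move(unsubscribe)) {}

    subscription_sketch(const subscription_sketch&) = delete;
    subscription_sketch& operator=(const subscription_sketch&) = delete;

    // Moving transfers ownership; the source becomes inactive.
    subscription_sketch(subscription_sketch&& other) noexcept
        : unsubscribe_(std::exchange(other.unsubscribe_, nullptr)) {}

    subscription_sketch& operator=(subscription_sketch&& other) noexcept {
        if (this != &other) {
            reset();  // release what we currently own first
            unsubscribe_ = std::exchange(other.unsubscribe_, nullptr);
        }
        return *this;
    }

    ~subscription_sketch() { reset(); }

    // Unsubscribes now if still active; safe to call repeatedly.
    void reset() noexcept {
        if (auto f = std::exchange(unsubscribe_, nullptr)) f();
    }

    [[nodiscard]] bool active() const noexcept {
        return static_cast<bool>(unsubscribe_);
    }

private:
    std::function<void()> unsubscribe_;
};
```

Marking subscribe() and queue_subscribe() as [[nodiscard]] fits this design: discarding the returned handle would unsubscribe immediately.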

Current State (as of 2026-03-14)

  • Phases 1–3 of the original plan are complete: NNG removed, cnats added, ores.comms.service uses NATS transport via nats_server
  • 10 domain service scaffolds exist (ores.mq.service deleted today)
  • Qt client and shell still use binary frames via the old nats_client
  • ores.comms still exists and is depended on by multiple projects

Phase 1 — ores.nats Clean Client Interface

PR: [nats] Replace nats_server/nats_client with clean client interface

Remove all ores.comms dependencies from ores.nats. Implement the new interface with full Pimpl.

| File                                   | Action                                                        |
|----------------------------------------+---------------------------------------------------------------|
| ores.nats/domain/message.hpp           | NEW: struct message { subject, reply_subject, data, headers } |
| ores.nats/service/subscription.hpp/cpp | NEW: RAII unsubscribe handle                                  |
| ores.nats/service/client.hpp/cpp       | NEW: Pimpl client; see interface below                        |
| ores.nats/service/nats_server.hpp/cpp  | DELETE: replaced by client                                    |
| ores.nats/service/nats_client.hpp/cpp  | DELETE: replaced by client                                    |
| ores.nats/src/CMakeLists.txt           | Remove ores.comms.lib dependency                              |
| vcpkg.json                             | Add "msgpack" to reflectcpp features                          |

Client interface:

class client {
public:
    explicit client(config::nats_options opts);
    ~client();
    client(const client&) = delete;
    client& operator=(const client&) = delete;

    // Lifecycle
    void connect();       // synchronous, throws on failure
    void disconnect();
    [[nodiscard]] bool is_connected() const noexcept;

    // Request/reply
    [[nodiscard]] message request_sync(
        std::string_view subject,
        std::span<const std::byte> data,
        std::unordered_map<std::string, std::string> headers = {},
        std::chrono::milliseconds timeout = std::chrono::seconds(30));

    [[nodiscard]] boost::asio::awaitable<message> request(
        std::string_view subject,
        std::span<const std::byte> data,
        std::unordered_map<std::string, std::string> headers = {},
        std::chrono::milliseconds timeout = std::chrono::seconds(30));

    // Core pub/sub
    void publish(std::string_view subject,
        std::span<const std::byte> data,
        std::unordered_map<std::string, std::string> headers = {});

    [[nodiscard]] subscription subscribe(std::string_view subject,
        message_handler handler);

    [[nodiscard]] subscription queue_subscribe(std::string_view subject,
        std::string_view queue_group,
        message_handler handler);

    // JetStream
    void js_publish(std::string_view subject,
        std::span<const std::byte> data,
        std::unordered_map<std::string, std::string> headers = {});

    [[nodiscard]] subscription js_subscribe(
        std::string_view subject,
        std::string_view durable_name,
        message_handler handler);

    [[nodiscard]] subscription js_queue_subscribe(
        std::string_view subject,
        std::string_view durable_name,
        std::string_view queue_group,
        message_handler handler);

    void drain();

private:
    struct impl;
    std::unique_ptr<impl> impl_;
};

Working state: ores.nats compiles with no ores.comms dependency. Existing users of nats_server (ores.comms.service) and the old nats_client (ores.qt, ores.comms.shell) do not yet compile; they are fixed in subsequent phases.

Phase 2 — Wire Up Domain Services

PR: [nats] Wire up domain services with idiomatic subjects and MessagePack

Each of the 10 domain services registers subject-based handlers using client::queue_subscribe. The domain registrar interfaces change: instead of taking a message_server&, they take a nats::client& and register their own subject handlers directly.

Pattern for each service application.cpp:

// 1. Create DB context
auto ctx = make_context(cfg.database);

// 2. Create NATS client
nats::client nats(cfg.nats);
nats.connect();

// 3. Register domain handlers (queue group = service name)
ores::iam::messaging::registrar::register_handlers(
    nats, "ores.iam.service", ctx, ...);

// 4. Run (block until SIGINT/SIGTERM)
nats.drain();

Registrar subject convention:

| Service                  | Subscribes to         | Queue group              |
|--------------------------+-----------------------+--------------------------|
| ores.iam.service         | ores.iam.v1.>         | ores.iam.service         |
| ores.refdata.service     | ores.refdata.v1.>     | ores.refdata.service     |
| ores.dq.service          | ores.dq.v1.>          | ores.dq.service          |
| ores.variability.service | ores.variability.v1.> | ores.variability.service |
| ores.assets.service      | ores.assets.v1.>      | ores.assets.service      |
| ores.telemetry.service   | ores.telemetry.v1.>   | ores.telemetry.service   |
| ores.trading.service     | ores.trading.v1.>     | ores.trading.service     |
| ores.scheduler.service   | ores.scheduler.v1.>   | ores.scheduler.service   |
| ores.reporting.service   | ores.reporting.v1.>   | ores.reporting.service   |
| ores.synthetic.service   | ores.synthetic.v1.>   | ores.synthetic.service   |

Each domain registrar implementation replaces binary-frame dispatch with per-subject queue_subscribe calls. Payload is deserialised with rfl::msgpack. JWT is extracted from the Authorization header.

ores.comms.service becomes redundant once all 10 services are wired; it is removed in Phase 3.

Working state: All 10 services start, connect to NATS, and handle domain requests. Shell and Qt clients are still broken (next phase).

Phase 3 — Delete ores.comms

PR: [cleanup] Delete ores.comms and ores.comms.service

| File/Project                | Action                                                                    |
|-----------------------------+---------------------------------------------------------------------------|
| projects/ores.comms/        | Delete entire project                                                     |
| projects/ores.comms.service/ | Delete entire project                                                    |
| projects/ores.mq/           | Delete entire project                                                     |
| projects/CMakeLists.txt     | Remove the three add_subdirectory entries                                 |
| All remaining dependents    | Ensure no ores.comms.lib / ores.mq.lib in any target_link_libraries       |

Working state: Clean build with no ores.comms or ores.mq in the tree.

Phase 4 — Rewrite Shell Client

PR: [nats] Rewrite ores.comms.shell using ores.nats::client

ores.comms.shell currently builds binary frames and sends them via the old nats_client. Replace with direct nats::client usage:

  • Each shell command calls client.request_sync() to the appropriate subject
  • Payload serialised with rfl::msgpack
  • JWT placed in the Authorization header

Working state: Shell commands work against the new domain services.

Phase 5 — Rewrite Qt Client

PR: [nats] Rewrite ores.qt ClientManager using ores.nats::client

ClientManager currently wraps the old nats_client (which wrapped client_base). Replace with nats::client:

  • connect() / disconnect() map directly
  • Each request from a Qt dialog calls request_sync() or the async request()
  • Notifications: Qt dialogs subscribe to ores.{domain}.v1.events.> via client.subscribe(); callbacks are posted to the Qt event loop via QMetaObject::invokeMethod(..., Qt::QueuedConnection)
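The hand-off from the NATS callback thread to the UI thread can be pictured with a plain thread-safe queue: the callback thread only enqueues work, and the owning thread runs it later. In the Qt client this job is done by QMetaObject::invokeMethod with Qt::QueuedConnection, so the class below is purely illustrative; callback_queue is a hypothetical name and uses only the standard library.

```cpp
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <utility>

// Minimal stand-in for "post to the owner's event loop".
class callback_queue {
public:
    // May be called from any thread (e.g. a NATS callback thread).
    void post(std::function<void()> fn) {
        std::lock_guard<std::mutex> lock(mutex_);
        pending_.push(std::move(fn));
    }

    // Called on the owning thread; runs everything queued so far and
    // returns how many callbacks were executed.
    std::size_t drain() {
        std::queue<std::function<void()>> batch;
        {
            // Swap under the lock so callbacks run without holding it.
            std::lock_guard<std::mutex> lock(mutex_);
            std::swap(batch, pending_);
        }
        const std::size_t count = batch.size();
        while (!batch.empty()) {
            batch.front()();
            batch.pop();
        }
        return count;
    }

private:
    std::mutex mutex_;
    std::queue<std::function<void()>> pending_;
};
```

The point of the pattern is that widget code never runs on the NATS thread: nothing executes until the owning thread drains the queue.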

Working state: Full Qt application functional. Login, all dialogs, live updates all work via NATS.

Phase 6 — JetStream Events and Eventing Bridge

PR: [nats] Bridge ores.eventing to JetStream pub/sub

The existing ores.eventing Postgres NOTIFY → event bus pipeline is extended: instead of notifying a subscription_manager, each domain service publishes to its JetStream event stream when entities change.

  • Each service creates its event stream at startup (e.g. ORES_IAM_EVENTS capturing ores.iam.v1.events.*)
  • Qt dialogs subscribe to relevant event subjects for live refresh
  • Replaces the old subscription_manager / subscription_handler mechanism

Working state: Live entity updates are delivered via NATS JetStream pub/sub. Charts can use a single DeliverByStartTime consumer to replay history and then continue with live updates.

Risks and Mitigations

| Risk                                           | Mitigation                                                                          |
|------------------------------------------------+-------------------------------------------------------------------------------------|
| cnats callbacks on non-Qt thread               | client posts all callbacks to caller's ASIO io_context or Qt event loop             |
| DB connections multiply ×10 across services    | Pool size 1–2 per service; verify PostgreSQL max_connections before Phase 2         |
| JetStream stream name conflicts on shared NATS | Prefix all stream names with ORES_ and document naming convention                   |
| Subject naming drift across services           | nats_subject_registry (compile-time map) enforced by each registrar                 |
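One possible shape for the compile-time subject registry is a constexpr array per domain plus a static_assert that every entry sits under the domain prefix. The plan does not define nats_subject_registry, so everything below (iam_subjects, has_prefix, all_under, is_registered) is a hypothetical sketch of the idea rather than its actual design.

```cpp
#include <array>
#include <cstddef>
#include <string_view>

// Hypothetical registry of the subjects one domain is allowed to use.
inline constexpr std::array<std::string_view, 3> iam_subjects{
    "ores.iam.v1.accounts.list",
    "ores.iam.v1.accounts.create",
    "ores.iam.v1.events.account_changed",
};

constexpr bool has_prefix(std::string_view s, std::string_view prefix) {
    return s.size() >= prefix.size() && s.substr(0, prefix.size()) == prefix;
}

// True if every registered subject lives under the given prefix.
template <std::size_t N>
constexpr bool all_under(const std::array<std::string_view, N>& subjects,
                         std::string_view prefix) {
    for (const auto s : subjects) {
        if (!has_prefix(s, prefix)) return false;
    }
    return true;
}

// True if `subject` is one of the registered subjects.
template <std::size_t N>
constexpr bool is_registered(const std::array<std::string_view, N>& subjects,
                             std::string_view subject) {
    for (const auto s : subjects) {
        if (s == subject) return true;
    }
    return false;
}

// Naming drift becomes a build error instead of a runtime surprise.
static_assert(all_under(iam_subjects, "ores.iam.v1."),
              "IAM subjects must live under ores.iam.v1.");
```

A registrar could then refuse to subscribe to any subject for which is_registered returns false.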

Deletions Summary

| Project / Component              | Reason                                                                    |
|----------------------------------+---------------------------------------------------------------------------|
| ores.comms                       | Replaced by NATS: subjects replace dispatcher, cnats replaces TCP client  |
| ores.comms.service               | Replaced by 10 standalone domain services                                 |
| ores.mq (PGMQ)                   | Replaced by JetStream                                                     |
| ores.mq.service                  | Already deleted                                                           |
| Old nats_server, old nats_client | Replaced by ores.nats::client                                             |

Date: 2026-03-14
