ORE Studio 0.0.4
client Class Reference

NATS client: connection, pub/sub, request/reply, and JetStream.

#include <client.hpp>


Public Member Functions

 client (config::nats_options opts)
 
 client (const client &)=delete
 
client & operator= (const client &)=delete
 
void connect ()
 Connect to the NATS server and initialise JetStream.
 
void disconnect ()
 Disconnect from NATS, freeing all cnats resources.
 
bool is_connected () const noexcept
 
void publish (std::string_view subject, std::span< const std::byte > data, std::unordered_map< std::string, std::string > headers={})
 Publish a message to a subject.
 
message request_sync (std::string_view subject, std::span< const std::byte > data, std::unordered_map< std::string, std::string > headers={}, std::chrono::milliseconds timeout=std::chrono::seconds(30))
 Synchronous request/reply.
 
boost::asio::awaitable< message > request (std::string_view subject, std::span< const std::byte > data, std::unordered_map< std::string, std::string > headers={}, std::chrono::milliseconds timeout=std::chrono::seconds(30))
 Asynchronous request/reply (ASIO coroutine).
 
subscription subscribe (std::string_view subject, message_handler handler)
 Subscribe to a subject.
 
subscription queue_subscribe (std::string_view subject, std::string_view queue_group, message_handler handler)
 Queue-group subscribe (competing consumers).
 
void js_publish (std::string_view subject, std::span< const std::byte > data, std::unordered_map< std::string, std::string > headers={})
 Publish a message to a JetStream stream.
 
subscription js_subscribe (std::string_view subject, std::string_view durable_name, message_handler handler)
 Push-subscribe to a JetStream subject with a durable consumer.
 
subscription js_queue_subscribe (std::string_view subject, std::string_view durable_name, std::string_view queue_group, message_handler handler)
 Queue-group push-subscribe to a JetStream subject.
 
jetstream_admin make_admin ()
 Create a JetStream admin handle for managing streams and consumers.
 
void drain ()
 Graceful shutdown.
 
std::string make_subject (std::string_view relative) const
 Prepend the configured subject prefix to a relative subject.
 

Detailed Description

NATS client: connection, pub/sub, request/reply, and JetStream.

No cnats types appear in this header. All cnats state is hidden behind a Pimpl. The same client instance is used by both service-side handlers (via queue_subscribe) and client-side request/reply callers.

Thread safety:

Lifecycle: all subscription handles must be destroyed before the client that created them.
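
One way to satisfy the lifecycle rule without manual bookkeeping is to rely on C++ destruction order: members (and locals) are destroyed in reverse order of declaration. The sketch below uses hypothetical stand-in types (fake_client, fake_subscription, service_sketch) rather than the real classes, and only records the order in which destructors run.

```cpp
#include <string>
#include <vector>

// Hypothetical stand-ins for nats::service::client and subscription.
// They only record the order in which their destructors run.
struct fake_client {
    explicit fake_client(std::vector<std::string>& log) : log_(log) {}
    ~fake_client() { log_.push_back("client"); }
    std::vector<std::string>& log_;
};

struct fake_subscription {
    explicit fake_subscription(std::vector<std::string>& log) : log_(log) {}
    ~fake_subscription() { log_.push_back("subscription"); }
    std::vector<std::string>& log_;
};

// Members are destroyed in reverse declaration order, so declaring the
// client before the subscription means the subscription is destroyed first.
struct service_sketch {
    explicit service_sketch(std::vector<std::string>& log) : nats(log), sub(log) {}
    fake_client nats;
    fake_subscription sub;
};

// Builds and tears down a service_sketch, returning the destruction order.
std::vector<std::string> destruction_order() {
    std::vector<std::string> log;
    {
        service_sketch svc(log);
    }
    return log;
}
```

The same ordering applies to local variables in a single scope: declaring the client first and the subscription handles after it gives the required teardown order.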

Service-side usage
nats::service::client nats(cfg.nats);
nats.connect();
auto sub = nats.queue_subscribe(
    "ores.iam.v1.>", "ores.iam.service",
    [](nats::message msg) { handle(msg); });
nats.drain(); // blocks until SIGINT/SIGTERM
Client-side usage
nats::service::client nats(cfg.nats);
nats.connect();
auto reply = nats.request_sync(
    "ores.iam.v1.accounts.list", msgpack_bytes,
    {{"Authorization", "Bearer " + jwt}});

Member Function Documentation

◆ connect()

void connect ( )

Connect to the NATS server and initialise JetStream.

Synchronous. Throws std::runtime_error on failure.

◆ disconnect()

void disconnect ( )

Disconnect from NATS, freeing all cnats resources.

Safe to call more than once. All subscriptions must be destroyed before calling this.

◆ publish()

void publish ( std::string_view  subject,
std::span< const std::byte >  data,
std::unordered_map< std::string, std::string >  headers = {} 
)

Publish a message to a subject.

Fire-and-forget. headers are optional; pass the Authorization header for authenticated publishes.
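
As a small illustration of the headers argument, the sketch below builds the map that the client-side usage example passes inline. The helper name bearer_headers is hypothetical and not part of this class; the "Bearer " scheme is taken from that usage example.

```cpp
#include <string>
#include <string_view>
#include <unordered_map>

// Hypothetical helper: builds a headers map carrying a bearer token,
// in the shape accepted by the headers parameter of publish().
std::unordered_map<std::string, std::string> bearer_headers(std::string_view jwt) {
    return {{"Authorization", "Bearer " + std::string(jwt)}};
}
```

The result could then be passed as the third argument to publish(), request_sync(), or request().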


◆ request_sync()

message request_sync ( std::string_view  subject,
std::span< const std::byte >  data,
std::unordered_map< std::string, std::string >  headers = {},
std::chrono::milliseconds  timeout = std::chrono::seconds(30) 
)

Synchronous request/reply.

Blocks until a response arrives or timeout elapses. Throws on timeout or transport error. Suitable for Qt/shell client code that is not running on an ASIO executor.


◆ request()

boost::asio::awaitable< message > request ( std::string_view  subject,
std::span< const std::byte >  data,
std::unordered_map< std::string, std::string >  headers = {},
std::chrono::milliseconds  timeout = std::chrono::seconds(30) 
)

Asynchronous request/reply (ASIO coroutine).

Must be called from within a boost::asio::awaitable coroutine. The caller's executor is used for callback delivery.

◆ subscribe()

subscription subscribe ( std::string_view  subject,
message_handler  handler 
)

Subscribe to a subject.

All instances receive every message (fan-out). Use queue_subscribe for competing-consumer load balancing.

◆ queue_subscribe()

subscription queue_subscribe ( std::string_view  subject,
std::string_view  queue_group,
message_handler  handler 
)

Queue-group subscribe (competing consumers).

Only one subscriber in the queue_group receives each message. Use this for all service-side subscriptions so that multiple instances of the same service share the load.

Convention: queue_group == service name, e.g. "ores.iam.service".
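
The difference between subscribe (fan-out) and queue_subscribe (competing consumers) can be illustrated with a toy in-memory model. This is a sketch only: toy_bus is a hypothetical class with no connection to NATS or cnats, and it picks queue-group members round-robin as a simplifying assumption, which is not a statement about how the server selects a member.

```cpp
#include <cstddef>
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Toy in-memory model of the two delivery modes (hypothetical, not NATS).
class toy_bus {
public:
    using handler = std::function<void(const std::string&)>;

    // Fan-out: every plain subscriber sees every message.
    void subscribe(handler h) { plain_.push_back(std::move(h)); }

    // Competing consumers: one member per group sees each message.
    void queue_subscribe(const std::string& group, handler h) {
        groups_[group].members.push_back(std::move(h));
    }

    void publish(const std::string& payload) {
        for (auto& h : plain_) h(payload);
        for (auto& [name, g] : groups_) {
            if (g.members.empty()) continue;
            g.members[g.next](payload);  // assumption: round-robin choice
            g.next = (g.next + 1) % g.members.size();
        }
    }

private:
    struct group {
        std::vector<handler> members;
        std::size_t next = 0;
    };
    std::vector<handler> plain_;
    std::map<std::string, group> groups_;
};
```

With two plain subscribers and two members of one queue group, publishing four messages delivers four to each plain subscriber and four in total across the group.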

◆ js_publish()

void js_publish ( std::string_view  subject,
std::span< const std::byte >  data,
std::unordered_map< std::string, std::string >  headers = {} 
)

Publish a message to a JetStream stream.

The target subject must be covered by a stream configured on the NATS server. Blocks until the server acknowledges receipt. Throws on failure.

◆ js_subscribe()

subscription js_subscribe ( std::string_view  subject,
std::string_view  durable_name,
message_handler  handler 
)

Push-subscribe to a JetStream subject with a durable consumer.

Messages are delivered automatically to handler. The consumer state survives client restarts (durable).

Each message is auto-acknowledged after handler returns without throwing.
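
The auto-acknowledge rule can be sketched as a wrapper around the handler call. The function deliver_with_auto_ack and its ack callback are hypothetical names for illustration, not the library's implementation; what the real client does with a message whose handler throws is not stated here, so the sketch simply skips the acknowledgement.

```cpp
#include <functional>
#include <string>

// Hypothetical sketch: invoke the handler, acknowledge only if it returns
// normally. Returns true when the message was acknowledged.
bool deliver_with_auto_ack(const std::function<void(const std::string&)>& handler,
                           const std::string& payload,
                           const std::function<void()>& ack) {
    try {
        handler(payload);
    } catch (...) {
        return false;  // handler threw: no acknowledgement is sent
    }
    ack();
    return true;
}
```

A practical consequence is that a handler should throw only when it wants the message to remain unacknowledged.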

◆ js_queue_subscribe()

subscription js_queue_subscribe ( std::string_view  subject,
std::string_view  durable_name,
std::string_view  queue_group,
message_handler  handler 
)

Queue-group push-subscribe to a JetStream subject.

Combines durable consumer semantics with competing-consumer load balancing across a queue group.

◆ make_admin()

jetstream_admin make_admin ( )

Create a JetStream admin handle for managing streams and consumers.

The returned jetstream_admin is lightweight and borrows this client's JetStream context. The client must outlive the admin handle.

◆ drain()

void drain ( )

Graceful shutdown.

Stops accepting new messages, waits for all in-flight deliveries to complete, then returns. Call this instead of disconnect() when you want a clean shutdown at the end of a service lifetime.

◆ make_subject()

std::string make_subject ( std::string_view  relative) const

Prepend the configured subject prefix to a relative subject.

If nats_options::subject_prefix is non-empty, returns "{prefix}.{relative}"; otherwise returns relative unchanged.

All pub/sub/request methods call this internally, so callers always pass relative subjects (e.g. "iam.v1.accounts.list").
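
The prefixing rule can be written as a standalone function. This is a minimal sketch of the documented behaviour, not the actual implementation: make_subject_sketch is a hypothetical free function that takes the prefix as a parameter, whereas the real method reads it from nats_options::subject_prefix.

```cpp
#include <string>
#include <string_view>

// Hypothetical stand-in for client::make_subject with the prefix passed in.
std::string make_subject_sketch(std::string_view prefix, std::string_view relative) {
    if (prefix.empty()) {
        return std::string(relative);  // no prefix configured: unchanged
    }
    std::string out;
    out.reserve(prefix.size() + 1 + relative.size());
    out.append(prefix);
    out.push_back('.');
    out.append(relative);
    return out;
}
```

For instance, with a prefix of "ores" the relative subject "iam.v1.accounts.list" becomes "ores.iam.v1.accounts.list", and with an empty prefix it is returned as-is.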
