20#ifndef ORES_COMMS_NET_CLIENT_HPP
21#define ORES_COMMS_NET_CLIENT_HPP
33#include <condition_variable>
34#include <boost/asio/awaitable.hpp>
35#include <boost/asio/ssl.hpp>
36#include <boost/asio/strand.hpp>
37#include <boost/asio/ip/tcp.hpp>
38#include <boost/asio/io_context.hpp>
39#include "ores.logging/make_logger.hpp"
40#include "ores.eventing/service/event_bus.hpp"
41#include "ores.comms/net/client_options.hpp"
42#include "ores.comms/net/connection.hpp"
43#include "ores.comms/net/pending_request_map.hpp"
44#include "ores.comms/messaging/frame.hpp"
45#include "ores.comms/messaging/message_traits.hpp"
46#include "ores.comms/recording/session_recorder.hpp"
107 const std::string& event_type,
108 std::chrono::system_clock::time_point timestamp,
109 const std::vector<std::string>& entity_ids)>;
118 inline static std::string_view logger_name =
"ores.comms.net.client";
122 static auto instance = make_logger(logger_name);
129 void setup_ssl_context();
136 boost::asio::awaitable<void> perform_handshake();
144 boost::asio::awaitable<void> run_heartbeat();
154 boost::asio::awaitable<void> run_message_loop();
162 boost::asio::awaitable<void> run_reconnect_loop();
169 boost::asio::awaitable<void> perform_connection();
177 std::chrono::milliseconds calculate_backoff(std::uint32_t attempt)
const;
185 void notify_shutdown_if_complete();
199 std::uint32_t next_correlation_id();
211 std::shared_ptr<eventing::service::event_bus> event_bus =
nullptr);
221 std::shared_ptr<eventing::service::event_bus> event_bus =
nullptr);
238 boost::asio::awaitable<void>
connect();
286 bool await_shutdown(std::chrono::milliseconds timeout = std::chrono::milliseconds{5000});
354 std::expected<std::filesystem::path, recording::session_file_error>
379 boost::asio::awaitable<std::expected<messaging::frame, ores::utility::serialization::error_code>>
391 std::expected<messaging::frame, ores::utility::serialization::error_code>
405 template <
typename RequestType,
typename ResponseType,
406 messaging::message_type RequestMsgType>
407 std::expected<ResponseType, ores::utility::serialization::error_code>
411 auto payload = request.serialize();
416 return std::unexpected(result.error());
419 auto response_payload = result->decompressed_payload();
420 if (!response_payload) {
421 BOOST_LOG_SEV(lg(), error) <<
"Failed to decompress response";
422 return std::unexpected(response_payload.error());
425 auto response = ResponseType::deserialize(*response_payload);
427 BOOST_LOG_SEV(lg(), error) <<
"Failed to deserialize response";
428 return std::unexpected(response.error());
445 template <
typename RequestType,
typename ResponseType,
446 messaging::message_type RequestMsgType>
447 boost::asio::awaitable<std::expected<ResponseType, ores::utility::serialization::error_code>>
451 auto payload = request.serialize();
454 auto result =
co_await send_request(std::move(request_frame));
456 co_return std::unexpected(result.error());
459 auto response_payload = result->decompressed_payload();
460 if (!response_payload) {
461 BOOST_LOG_SEV(lg(), error) <<
"Failed to decompress response";
462 co_return std::unexpected(response_payload.error());
465 auto response = ResponseType::deserialize(*response_payload);
467 BOOST_LOG_SEV(lg(), error) <<
"Failed to deserialize response";
468 co_return std::unexpected(response.error());
490 template <
typename RequestType>
492 std::expected<typename messaging::message_traits<RequestType>::response_type,
493 ores::utility::serialization::error_code>
498 typename traits::response_type,
499 traits::request_message_type>(std::move(request));
512 template <
typename RequestType>
514 boost::asio::awaitable<
515 std::expected<typename messaging::message_traits<RequestType>::response_type,
516 ores::utility::serialization::error_code>>
521 typename traits::response_type,
522 traits::request_message_type>(std::move(request));
527 std::unique_ptr<boost::asio::io_context> io_ctx_;
528 boost::asio::any_io_executor executor_;
529 boost::asio::ssl::context ssl_ctx_;
530 std::unique_ptr<connection> conn_;
531 std::uint32_t sequence_number_;
532 std::atomic<connection_state> state_;
533 mutable std::mutex state_mutex_;
538 std::shared_ptr<eventing::service::event_bus> event_bus_;
541 std::unique_ptr<boost::asio::strand<boost::asio::any_io_executor>> write_strand_;
542 std::atomic<bool> write_in_progress_{
false};
543 std::unique_ptr<pending_request_map> pending_requests_;
544 std::atomic<std::uint32_t> correlation_id_counter_{1};
545 std::atomic<bool> message_loop_running_{
false};
546 std::atomic<bool> reconnect_loop_running_{
false};
547 std::atomic<bool> heartbeat_loop_running_{
false};
548 std::atomic<bool> shutdown_requested_{
false};
549 std::condition_variable shutdown_cv_;
550 std::mutex shutdown_mutex_;
551 std::unique_ptr<std::thread> io_thread_;
552 std::unique_ptr<boost::asio::executor_work_guard<boost::asio::io_context::executor_type>> work_guard_;
558 std::shared_ptr<recording::session_recorder> recorder_;
compression_type
Compression algorithm used for payload compression.
Definition message_types.hpp:259
Contains the networking elements of the comms library.
Definition client.hpp:48
std::function< void(const std::string &event_type, std::chrono::system_clock::time_point timestamp, const std::vector< std::string > &entity_ids)> notification_callback_t
Callback invoked when client receives a notification from server.
Definition client.hpp:109
connection_state
Connection state for the client.
Definition client.hpp:53
@ connected
Connected and ready for requests.
@ disconnected
Not connected to server.
@ connecting
Initial connection in progress.
@ reconnecting
Lost connection, attempting to reconnect.
std::function< void()> reconnected_callback_t
Callback invoked when client successfully reconnects.
Definition client.hpp:92
std::function< void()> reconnecting_callback_t
Callback invoked when client starts reconnection attempts.
Definition client.hpp:81
std::function< void()> disconnect_callback_t
Callback invoked when client detects server disconnect.
Definition client.hpp:70
Implements logging infrastructure for ORE Studio.
Definition boost_severity.hpp:28
Complete frame with header and payload.
Definition frame.hpp:77
Traits template for mapping request types to their response types and message type enum values.
Definition message_traits.hpp:66
ORES protocol client.
Definition client.hpp:116
boost::asio::awaitable< std::expected< ResponseType, ores::utility::serialization::error_code > > process_request_async(RequestType request)
Send typed request and receive typed response (async version).
Definition client.hpp:448
std::expected< ResponseType, ores::utility::serialization::error_code > process_request(RequestType request)
Send typed request and receive typed response (blocking version).
Definition client.hpp:408
boost::asio::awaitable< std::expected< typename messaging::message_traits< RequestType >::response_type, ores::utility::serialization::error_code > > process_request_async(RequestType request)
Send typed request using message_traits (async version).
Definition client.hpp:517
void set_notification_callback(notification_callback_t callback)
Set callback to be invoked when a notification is received.
Definition client.cpp:699
std::expected< messaging::frame, ores::utility::serialization::error_code > send_request_sync(messaging::frame request_frame)
Send a request frame and receive response frame (blocking version).
Definition client.cpp:974
bool is_recording() const
Check if session recording is currently active.
Definition client.cpp:741
void connect_with_retry_sync()
Connect to server with retry (blocking version).
Definition client.cpp:411
boost::asio::awaitable< void > connect_with_retry()
Connect to server with retry (async version).
Definition client.cpp:353
connection_state get_state() const
Get the current connection state.
Definition client.cpp:680
bool await_shutdown(std::chrono::milliseconds timeout=std::chrono::milliseconds{5000})
Wait for all coroutines to complete after disconnect.
Definition client.cpp:641
void set_reconnected_callback(reconnected_callback_t callback)
Set callback to be invoked when reconnection succeeds.
Definition client.cpp:694
boost::asio::awaitable< std::expected< messaging::frame, ores::utility::serialization::error_code > > send_request(messaging::frame request_frame)
Send a request frame and receive response frame (async version).
Definition client.cpp:848
void disable_recording()
Disable session recording.
Definition client.cpp:729
void disconnect()
Disconnect from server.
Definition client.cpp:612
boost::asio::awaitable< void > connect()
Connect to server and perform handshake (async version).
Definition client.cpp:269
void connect_sync()
Connect to server and perform handshake (blocking version).
Definition client.cpp:289
void set_reconnecting_callback(reconnecting_callback_t callback)
Set callback to be invoked when reconnection starts.
Definition client.cpp:689
std::expected< std::filesystem::path, recording::session_file_error > enable_recording(const std::filesystem::path &output_directory)
Enable session recording to the specified directory.
Definition client.cpp:705
~client()
Destructor.
Definition client.cpp:100
bool is_connected() const
Check if client is connected.
Definition client.cpp:676
std::expected< typename messaging::message_traits< RequestType >::response_type, ores::utility::serialization::error_code > process_request(RequestType request)
Send typed request using message_traits (blocking version).
Definition client.hpp:494
void set_disconnect_callback(disconnect_callback_t callback)
Set callback to be invoked when disconnect is detected.
Definition client.cpp:684
Configuration for the client.
Definition client_options.hpp:78
Concept for types that have message_traits specialization.
Definition message_traits.hpp:82