ORE Studio 0.0.4
Loading...
Searching...
No Matches
client.hpp
1/* -*- mode: c++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
2 *
3 * Copyright (C) 2025 Marco Craveiro <marco.craveiro@gmail.com>
4 *
5 * This program is free software; you can redistribute it and/or modify it under
6 * the terms of the GNU General Public License as published by the Free Software
7 * Foundation; either version 3 of the License, or (at your option) any later
8 * version.
9 *
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
13 * details.
14 *
15 * You should have received a copy of the GNU General Public License along with
16 * this program; if not, write to the Free Software Foundation, Inc., 51
17 * Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 *
19 */
20#ifndef ORES_COMMS_NET_CLIENT_HPP
21#define ORES_COMMS_NET_CLIENT_HPP
22
23#include <mutex>
24#include <atomic>
25#include <thread>
26#include <memory>
27#include <iosfwd>
28#include <cstdint>
29#include <concepts>
30#include <expected>
31#include <functional>
32#include <string_view>
33#include <condition_variable>
34#include <boost/asio/awaitable.hpp>
35#include <boost/asio/ssl.hpp>
36#include <boost/asio/strand.hpp>
37#include <boost/asio/ip/tcp.hpp>
38#include <boost/asio/io_context.hpp>
39#include "ores.logging/make_logger.hpp"
40#include "ores.eventing/service/event_bus.hpp"
41#include "ores.comms/net/client_options.hpp"
42#include "ores.comms/net/connection.hpp"
43#include "ores.comms/net/pending_request_map.hpp"
44#include "ores.comms/messaging/frame.hpp"
45#include "ores.comms/messaging/message_traits.hpp"
46#include "ores.comms/recording/session_recorder.hpp"
47
49
59
60std::ostream& operator<<(std::ostream& s, connection_state v);
61
70using disconnect_callback_t = std::function<void()>;
71
81using reconnecting_callback_t = std::function<void()>;
82
92using reconnected_callback_t = std::function<void()>;
93
106using notification_callback_t = std::function<void(
107 const std::string& event_type,
108 std::chrono::system_clock::time_point timestamp,
109 const std::vector<std::string>& entity_ids)>;
110
116class client final {
117private:
118 inline static std::string_view logger_name = "ores.comms.net.client";
119
120 static auto& lg() {
121 using namespace ores::logging;
122 static auto instance = make_logger(logger_name);
123 return instance;
124 }
125
129 void setup_ssl_context();
130
136 boost::asio::awaitable<void> perform_handshake();
137
144 boost::asio::awaitable<void> run_heartbeat();
145
154 boost::asio::awaitable<void> run_message_loop();
155
162 boost::asio::awaitable<void> run_reconnect_loop();
163
169 boost::asio::awaitable<void> perform_connection();
170
177 std::chrono::milliseconds calculate_backoff(std::uint32_t attempt) const;
178
185 void notify_shutdown_if_complete();
186
194 boost::asio::awaitable<void> write_frame(const messaging::frame& f);
195
199 std::uint32_t next_correlation_id();
200
201public:
210 explicit client(client_options config,
211 std::shared_ptr<eventing::service::event_bus> event_bus = nullptr);
212
220 explicit client(client_options config, boost::asio::any_io_executor executor,
221 std::shared_ptr<eventing::service::event_bus> event_bus = nullptr);
222
229 ~client();
230
238 boost::asio::awaitable<void> connect();
239
247 void connect_sync();
248
256 boost::asio::awaitable<void> connect_with_retry();
257
266
274 void disconnect();
275
286 bool await_shutdown(std::chrono::milliseconds timeout = std::chrono::milliseconds{5000});
287
293 bool is_connected() const;
294
299
309
319
329
339
340 // =========================================================================
341 // Session Recording
342 // =========================================================================
343
354 std::expected<std::filesystem::path, recording::session_file_error>
355 enable_recording(const std::filesystem::path& output_directory);
356
363 void disable_recording();
364
368 bool is_recording() const;
369
379 boost::asio::awaitable<std::expected<messaging::frame, ores::utility::serialization::error_code>>
380 send_request(messaging::frame request_frame);
381
391 std::expected<messaging::frame, ores::utility::serialization::error_code>
392 send_request_sync(messaging::frame request_frame);
393
405 template <typename RequestType, typename ResponseType,
406 messaging::message_type RequestMsgType>
407 std::expected<ResponseType, ores::utility::serialization::error_code>
408 process_request(RequestType request) {
409 using namespace ores::logging;
410
411 auto payload = request.serialize();
412 messaging::frame request_frame(RequestMsgType, 0, std::move(payload));
413
414 auto result = send_request_sync(std::move(request_frame));
415 if (!result) {
416 return std::unexpected(result.error());
417 }
418
419 auto response_payload = result->decompressed_payload();
420 if (!response_payload) {
421 BOOST_LOG_SEV(lg(), error) << "Failed to decompress response";
422 return std::unexpected(response_payload.error());
423 }
424
425 auto response = ResponseType::deserialize(*response_payload);
426 if (!response) {
427 BOOST_LOG_SEV(lg(), error) << "Failed to deserialize response";
428 return std::unexpected(response.error());
429 }
430
431 return *response;
432 }
433
445 template <typename RequestType, typename ResponseType,
446 messaging::message_type RequestMsgType>
447 boost::asio::awaitable<std::expected<ResponseType, ores::utility::serialization::error_code>>
448 process_request_async(RequestType request) {
449 using namespace ores::logging;
450
451 auto payload = request.serialize();
452 messaging::frame request_frame(RequestMsgType, 0, std::move(payload));
453
454 auto result = co_await send_request(std::move(request_frame));
455 if (!result) {
456 co_return std::unexpected(result.error());
457 }
458
459 auto response_payload = result->decompressed_payload();
460 if (!response_payload) {
461 BOOST_LOG_SEV(lg(), error) << "Failed to decompress response";
462 co_return std::unexpected(response_payload.error());
463 }
464
465 auto response = ResponseType::deserialize(*response_payload);
466 if (!response) {
467 BOOST_LOG_SEV(lg(), error) << "Failed to deserialize response";
468 co_return std::unexpected(response.error());
469 }
470
471 co_return *response;
472 }
473
474 // =========================================================================
475 // Traits-based process_request overloads
476 // =========================================================================
477 // These overloads use message_traits to infer the response type and
478 // message_type enum from the request type, simplifying the API.
479
490 template <typename RequestType>
492 std::expected<typename messaging::message_traits<RequestType>::response_type,
493 ores::utility::serialization::error_code>
494 process_request(RequestType request) {
496 return process_request<
497 RequestType,
498 typename traits::response_type,
499 traits::request_message_type>(std::move(request));
500 }
501
512 template <typename RequestType>
514 boost::asio::awaitable<
515 std::expected<typename messaging::message_traits<RequestType>::response_type,
516 ores::utility::serialization::error_code>>
517 process_request_async(RequestType request) {
520 RequestType,
521 typename traits::response_type,
522 traits::request_message_type>(std::move(request));
523 }
524
525private:
526 client_options config_;
527 std::unique_ptr<boost::asio::io_context> io_ctx_; // Owned io_context for sync operations
528 boost::asio::any_io_executor executor_;
529 boost::asio::ssl::context ssl_ctx_;
530 std::unique_ptr<connection> conn_;
531 std::uint32_t sequence_number_;
532 std::atomic<connection_state> state_;
533 mutable std::mutex state_mutex_; // Protects conn_, sequence_number_, callbacks, and recorder_
534 disconnect_callback_t disconnect_callback_;
535 reconnecting_callback_t reconnecting_callback_;
536 reconnected_callback_t reconnected_callback_;
537 notification_callback_t notification_callback_;
538 std::shared_ptr<eventing::service::event_bus> event_bus_; // Optional event bus for connection events
539
540 // Infrastructure for unified message loop
541 std::unique_ptr<boost::asio::strand<boost::asio::any_io_executor>> write_strand_;
542 std::atomic<bool> write_in_progress_{false}; // Async spinlock for serializing SSL writes
543 std::unique_ptr<pending_request_map> pending_requests_;
544 std::atomic<std::uint32_t> correlation_id_counter_{1};
545 std::atomic<bool> message_loop_running_{false};
546 std::atomic<bool> reconnect_loop_running_{false};
547 std::atomic<bool> heartbeat_loop_running_{false};
548 std::atomic<bool> shutdown_requested_{false};
549 std::condition_variable shutdown_cv_;
550 std::mutex shutdown_mutex_;
551 std::unique_ptr<std::thread> io_thread_; // Background thread for io_context
552 std::unique_ptr<boost::asio::executor_work_guard<boost::asio::io_context::executor_type>> work_guard_;
553
554 // Session compression type negotiated during handshake
555 messaging::compression_type session_compression_{messaging::compression_type::none};
556
557 // Session recording (protected by state_mutex_ for thread-safe access)
558 std::shared_ptr<recording::session_recorder> recorder_;
559};
560
561}
562
563#endif
compression_type
Compression algorithm used for payload compression.
Definition message_types.hpp:259
Contains the networking elements of the comms library.
Definition client.hpp:48
std::function< void(const std::string &event_type, std::chrono::system_clock::time_point timestamp, const std::vector< std::string > &entity_ids)> notification_callback_t
Callback invoked when client receives a notification from server.
Definition client.hpp:109
connection_state
Connection state for the client.
Definition client.hpp:53
@ connected
Connected and ready for requests.
@ disconnected
Not connected to server.
@ connecting
Initial connection in progress.
@ reconnecting
Lost connection, attempting to reconnect.
std::function< void()> reconnected_callback_t
Callback invoked when client successfully reconnects.
Definition client.hpp:92
std::function< void()> reconnecting_callback_t
Callback invoked when client starts reconnection attempts.
Definition client.hpp:81
std::function< void()> disconnect_callback_t
Callback invoked when client detects server disconnect.
Definition client.hpp:70
Implements logging infrastructure for ORE Studio.
Definition boost_severity.hpp:28
Complete frame with header and payload.
Definition frame.hpp:77
Traits template for mapping request types to their response types and message type enum values.
Definition message_traits.hpp:66
ORES protocol client.
Definition client.hpp:116
boost::asio::awaitable< std::expected< ResponseType, ores::utility::serialization::error_code > > process_request_async(RequestType request)
Send typed request and receive typed response (async version).
Definition client.hpp:448
std::expected< ResponseType, ores::utility::serialization::error_code > process_request(RequestType request)
Send typed request and receive typed response (blocking version).
Definition client.hpp:408
boost::asio::awaitable< std::expected< typename messaging::message_traits< RequestType >::response_type, ores::utility::serialization::error_code > > process_request_async(RequestType request)
Send typed request using message_traits (async version).
Definition client.hpp:517
void set_notification_callback(notification_callback_t callback)
Set callback to be invoked when a notification is received.
Definition client.cpp:699
std::expected< messaging::frame, ores::utility::serialization::error_code > send_request_sync(messaging::frame request_frame)
Send a request frame and receive response frame (blocking version).
Definition client.cpp:974
bool is_recording() const
Check if session recording is currently active.
Definition client.cpp:741
void connect_with_retry_sync()
Connect to server with retry (blocking version).
Definition client.cpp:411
boost::asio::awaitable< void > connect_with_retry()
Connect to server with retry (async version).
Definition client.cpp:353
connection_state get_state() const
Get the current connection state.
Definition client.cpp:680
bool await_shutdown(std::chrono::milliseconds timeout=std::chrono::milliseconds{5000})
Wait for all coroutines to complete after disconnect.
Definition client.cpp:641
void set_reconnected_callback(reconnected_callback_t callback)
Set callback to be invoked when reconnection succeeds.
Definition client.cpp:694
boost::asio::awaitable< std::expected< messaging::frame, ores::utility::serialization::error_code > > send_request(messaging::frame request_frame)
Send a request frame and receive response frame (async version).
Definition client.cpp:848
void disable_recording()
Disable session recording.
Definition client.cpp:729
void disconnect()
Disconnect from server.
Definition client.cpp:612
boost::asio::awaitable< void > connect()
Connect to server and perform handshake (async version).
Definition client.cpp:269
void connect_sync()
Connect to server and perform handshake (blocking version).
Definition client.cpp:289
void set_reconnecting_callback(reconnecting_callback_t callback)
Set callback to be invoked when reconnection starts.
Definition client.cpp:689
std::expected< std::filesystem::path, recording::session_file_error > enable_recording(const std::filesystem::path &output_directory)
Enable session recording to the specified directory.
Definition client.cpp:705
~client()
Destructor.
Definition client.cpp:100
bool is_connected() const
Check if client is connected.
Definition client.cpp:676
std::expected< typename messaging::message_traits< RequestType >::response_type, ores::utility::serialization::error_code > process_request(RequestType request)
Send typed request using message_traits (blocking version).
Definition client.hpp:494
void set_disconnect_callback(disconnect_callback_t callback)
Set callback to be invoked when disconnect is detected.
Definition client.cpp:684
Configuration for the client.
Definition client_options.hpp:78
Concept for types that have message_traits specialization.
Definition message_traits.hpp:82