20#ifndef ORES_COMMS_SERVICE_TELEMETRY_STREAMING_SERVICE_HPP
21#define ORES_COMMS_SERVICE_TELEMETRY_STREAMING_SERVICE_HPP
30#include <condition_variable>
31#include <boost/log/sinks/basic_sink_backend.hpp>
32#include <boost/log/sinks/frontend_requirements.hpp>
33#include <boost/log/core.hpp>
34#include <boost/log/sinks/sync_frontend.hpp>
35#include "ores.logging/make_logger.hpp"
36#include "ores.telemetry/domain/log_record.hpp"
37#include "ores.telemetry/domain/resource.hpp"
38#include "ores.telemetry/log/telemetry_sink_backend.hpp"
// Name of the logger used by this service. It is the argument handed to
// make_logger() when the static logger instance is created.
125 inline static std::string_view logger_name =
126 "ores.comms.service.telemetry_streaming";
130 static auto instance = make_logger(logger_name);
142 std::shared_ptr<net::client> client,
// Checks if the service is currently running.
// Declared const noexcept; [[nodiscard]] because the result is the only
// effect of the call.
// NOTE(review): presumably reads the running_ atomic - confirm in the .cpp.
176 [[nodiscard]]
bool is_running()
const noexcept;
// Returns the total number of records sent successfully.
// Declared const noexcept; [[nodiscard]] because the result is the only
// effect of the call.
// NOTE(review): presumably reads the total_sent_ atomic - confirm in the .cpp.
194 [[nodiscard]] std::uint64_t
total_sent()
const noexcept;
// Takes no arguments and returns nothing.
// NOTE(review): by its name this looks like the entry point executed by
// flush_thread_ - confirm against the implementation file.
210 void flush_thread_func();
// Sends one batch of log records.
// The vector is taken by non-const reference, so the implementation is
// allowed to modify it - TODO confirm whether it actually does.
// NOTE(review): the bool result presumably reports whether the send
// succeeded; verify in the .cpp.
217 bool send_batch(std::vector<telemetry::domain::log_record>& records);
// Shared handle to the comms network client.
// NOTE(review): presumably the client passed to the constructor - confirm.
219 std::shared_ptr<net::client> client_;
// Shared handle to a telemetry resource object.
// NOTE(review): presumably describes the source application (see the
// source_name / source_version options) - confirm in the .cpp.
221 std::shared_ptr<telemetry::domain::resource> resource_;
224 using sink_t = boost::log::sinks::synchronous_sink<
// Handle to the Boost.Log sink front-end (sink_t, a synchronous_sink).
// Held as boost::shared_ptr rather than std::shared_ptr.
// NOTE(review): presumably added to / removed from the logging core by
// start() / stop() - confirm in the .cpp.
226 boost::shared_ptr<sink_t> sink_;
// Mutex declared mutable so that const member functions can lock it.
// NOTE(review): presumably guards buffer_ - confirm in the .cpp.
229 mutable std::mutex buffer_mutex_;
// Log records held by the service.
// NOTE(review): presumably the records pending transmission that
// pending_count() reports - confirm.
230 std::vector<telemetry::domain::log_record> buffer_;
// Condition variable.
// NOTE(review): presumably used to wake the flush thread (flush interval
// elapsed, batch ready, or shutdown) - confirm in the .cpp.
231 std::condition_variable flush_cv_;
// Thread object owned by the service.
// NOTE(review): presumably runs flush_thread_func() - confirm.
234 std::thread flush_thread_;
// Atomic flag, initialised to false.
// NOTE(review): presumably the state reported by is_running() - confirm.
235 std::atomic<bool> running_{
false};
// Atomic 64-bit counters, both initialised to 0.
// NOTE(review): presumably the values returned by total_sent() (records
// sent successfully) and total_dropped() (records dropped due to
// overflow) - confirm in the .cpp.
238 std::atomic<std::uint64_t> total_sent_{0};
239 std::atomic<std::uint64_t> total_dropped_{0};
Main server application for ORE Studio.
Definition application.hpp:30
Contains the networking elements of the comms library.
Definition client.hpp:48
Implements logging infrastructure for ORE Studio.
Definition boost_severity.hpp:28
@ client
Indicates that the span describes a request to some remote service. This is often the client-side of ...
Configuration options for telemetry streaming.
Definition telemetry_streaming_service.hpp:49
std::chrono::seconds flush_interval
Maximum time to wait before sending a batch.
Definition telemetry_streaming_service.hpp:76
bool retry_on_failure
Whether to retry failed submissions.
Definition telemetry_streaming_service.hpp:84
std::size_t max_pending_records
Maximum number of records to keep pending when disconnected.
Definition telemetry_streaming_service.hpp:92
std::size_t batch_size
Maximum number of log records per batch.
Definition telemetry_streaming_service.hpp:68
std::string source_version
Version of the source application.
Definition telemetry_streaming_service.hpp:60
std::string source_name
Name of the source application.
Definition telemetry_streaming_service.hpp:55
Service for streaming telemetry logs to the server.
Definition telemetry_streaming_service.hpp:123
~telemetry_streaming_service()
Destructor. Stops the service if running.
Definition telemetry_streaming_service.cpp:45
std::uint64_t total_dropped() const noexcept
Returns the total number of records dropped due to overflow.
Definition telemetry_streaming_service.cpp:143
void start()
Starts the streaming service.
Definition telemetry_streaming_service.cpp:49
void stop()
Stops the streaming service.
Definition telemetry_streaming_service.cpp:73
std::size_t pending_count() const
Returns the number of log records pending transmission.
Definition telemetry_streaming_service.cpp:134
std::uint64_t total_sent() const noexcept
Returns the total number of records sent successfully.
Definition telemetry_streaming_service.cpp:139
void flush()
Forces an immediate flush of pending logs.
Definition telemetry_streaming_service.cpp:106
bool is_running() const noexcept
Checks if the service is currently running.
Definition telemetry_streaming_service.cpp:102
A log record with trace correlation.
Definition log_record.hpp:46
Boost.Log sink backend that creates telemetry log_records.
Definition telemetry_sink_backend.hpp:54