ORE Studio 0.0.4
Loading...
Searching...
No Matches
telemetry_streaming_service.hpp
1/* -*- mode: c++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
2 *
3 * Copyright (C) 2025 Marco Craveiro <marco.craveiro@gmail.com>
4 *
5 * This program is free software; you can redistribute it and/or modify it under
6 * the terms of the GNU General Public License as published by the Free Software
7 * Foundation; either version 3 of the License, or (at your option) any later
8 * version.
9 *
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
13 * details.
14 *
15 * You should have received a copy of the GNU General Public License along with
16 * this program; if not, write to the Free Software Foundation, Inc., 51
17 * Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 *
19 */
20#ifndef ORES_COMMS_SERVICE_TELEMETRY_STREAMING_SERVICE_HPP
21#define ORES_COMMS_SERVICE_TELEMETRY_STREAMING_SERVICE_HPP
22
23#include <mutex>
24#include <atomic>
25#include <thread>
26#include <chrono>
27#include <string>
28#include <vector>
29#include <memory>
30#include <condition_variable>
31#include <boost/log/sinks/basic_sink_backend.hpp>
32#include <boost/log/sinks/frontend_requirements.hpp>
33#include <boost/log/core.hpp>
34#include <boost/log/sinks/sync_frontend.hpp>
35#include "ores.logging/make_logger.hpp"
36#include "ores.telemetry/domain/log_record.hpp"
37#include "ores.telemetry/domain/resource.hpp"
38#include "ores.telemetry/log/telemetry_sink_backend.hpp"
39
// Forward declaration of the comms client. In the declarations visible in
// this listing it is only referred to through std::shared_ptr<net::client>.
40namespace ores::comms::net {
41class client;
42}
43
44namespace ores::comms::service {
45
/** @brief Name of the source application. */
55 std::string source_name;
56
/** @brief Version of the source application. */
60 std::string source_version;
61
/** @brief Maximum number of log records per batch. Defaults to 50. */
68 std::size_t batch_size = 50;
69
/** @brief Maximum time to wait before sending a batch. Defaults to 5 seconds. */
76 std::chrono::seconds flush_interval{5};
77
/** @brief Whether to retry failed submissions. Defaults to true. */
84 bool retry_on_failure = true;
85
/**
 * @brief Maximum number of records to keep pending when disconnected.
 *        Defaults to 1000.
 */
92 std::size_t max_pending_records = 1000;
93};
94
124private:
/** @brief Name handed to make_logger() by lg() when creating this class's logger. */
125 inline static std::string_view logger_name =
126 "ores.comms.service.telemetry_streaming";
127
/**
 * @brief Returns the logger used by this class.
 *
 * The logger is a function-local static: it is built from logger_name via
 * make_logger() the first time lg() is called, and the same instance is
 * returned by reference on every call.
 */
128 static auto& lg() {
129 using namespace ores::logging;
130 static auto instance = make_logger(logger_name);
131 return instance;
132 }
133
134public:
142 std::shared_ptr<net::client> client,
144
149
150 // Non-copyable, non-movable
155
/** @brief Starts the streaming service. */
162 void start();
163
/** @brief Stops the streaming service. */
171 void stop();
172
/** @brief Checks if the service is currently running. */
176 [[nodiscard]] bool is_running() const noexcept;
177
/** @brief Forces an immediate flush of pending logs. */
184 void flush();
185
/** @brief Returns the number of log records pending transmission. */
189 [[nodiscard]] std::size_t pending_count() const;
190
/** @brief Returns the total number of records sent successfully. */
194 [[nodiscard]] std::uint64_t total_sent() const noexcept;
195
/** @brief Returns the total number of records dropped due to overflow. */
199 [[nodiscard]] std::uint64_t total_dropped() const noexcept;
200
201private:
// Receives a single telemetry log record, taken by value.
// NOTE(review): presumably the callback the log sink invokes per record and
// the place where records enter buffer_ - confirm against the .cpp.
205 void on_log_record(telemetry::domain::log_record record);
206
// NOTE(review): presumably the entry point executed by flush_thread_ - the
// definition is not visible in this listing; confirm against the .cpp.
210 void flush_thread_func();
211
// Takes a batch of log records by non-const reference and reports a bool.
// NOTE(review): presumably returns true when the batch was sent; whether
// `records` is modified on success or failure is not visible here - confirm.
217 bool send_batch(std::vector<telemetry::domain::log_record>& records);
218
219 std::shared_ptr<net::client> client_;
221 std::shared_ptr<telemetry::domain::resource> resource_;
222
223 // Sink management
224 using sink_t = boost::log::sinks::synchronous_sink<
226 boost::shared_ptr<sink_t> sink_;
227
228 // Thread-safe record buffer
229 mutable std::mutex buffer_mutex_;
230 std::vector<telemetry::domain::log_record> buffer_;
231 std::condition_variable flush_cv_;
232
233 // Background flush thread
234 std::thread flush_thread_;
235 std::atomic<bool> running_{false};
236
237 // Statistics
238 std::atomic<std::uint64_t> total_sent_{0};
239 std::atomic<std::uint64_t> total_dropped_{0};
240};
241
242}
243
244#endif
Main server application for ORE Studio.
Definition application.hpp:30
Contains the networking elements of the comms library.
Definition client.hpp:48
Implements logging infrastructure for ORE Studio.
Definition boost_severity.hpp:28
@ client
Indicates that the span describes a request to some remote service. This is often the client-side of ...
Configuration options for telemetry streaming.
Definition telemetry_streaming_service.hpp:49
std::chrono::seconds flush_interval
Maximum time to wait before sending a batch.
Definition telemetry_streaming_service.hpp:76
bool retry_on_failure
Whether to retry failed submissions.
Definition telemetry_streaming_service.hpp:84
std::size_t max_pending_records
Maximum number of records to keep pending when disconnected.
Definition telemetry_streaming_service.hpp:92
std::size_t batch_size
Maximum number of log records per batch.
Definition telemetry_streaming_service.hpp:68
std::string source_version
Version of the source application.
Definition telemetry_streaming_service.hpp:60
std::string source_name
Name of the source application.
Definition telemetry_streaming_service.hpp:55
Service for streaming telemetry logs to the server.
Definition telemetry_streaming_service.hpp:123
~telemetry_streaming_service()
Destructor. Stops the service if running.
Definition telemetry_streaming_service.cpp:45
std::uint64_t total_dropped() const noexcept
Returns the total number of records dropped due to overflow.
Definition telemetry_streaming_service.cpp:143
void start()
Starts the streaming service.
Definition telemetry_streaming_service.cpp:49
void stop()
Stops the streaming service.
Definition telemetry_streaming_service.cpp:73
std::size_t pending_count() const
Returns the number of log records pending transmission.
Definition telemetry_streaming_service.cpp:134
std::uint64_t total_sent() const noexcept
Returns the total number of records sent successfully.
Definition telemetry_streaming_service.cpp:139
void flush()
Forces an immediate flush of pending logs.
Definition telemetry_streaming_service.cpp:106
bool is_running() const noexcept
Checks if the service is currently running.
Definition telemetry_streaming_service.cpp:102
A log record with trace correlation.
Definition log_record.hpp:46
Boost.Log sink backend that creates telemetry log_records.
Definition telemetry_sink_backend.hpp:54