ORE Studio 0.0.4
Loading...
Searching...
No Matches
batch_handler.hpp
1/* -*- mode: c++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
2 *
3 * Copyright (C) 2026 Marco Craveiro <marco.craveiro@gmail.com>
4 *
5 * This program is free software; you can redistribute it and/or modify it under
6 * the terms of the GNU General Public License as published by the Free Software
7 * Foundation; either version 3 of the License, or (at your option) any later
8 * version.
9 *
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
13 * details.
14 *
15 * You should have received a copy of the GNU General Public License along with
16 * this program; if not, write to the Free Software Foundation, Inc., 51
17 * Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 *
19 */
20#ifndef ORES_COMPUTE_MESSAGING_BATCH_HANDLER_HPP
21#define ORES_COMPUTE_MESSAGING_BATCH_HANDLER_HPP
22
23#include <optional>
24#include <stdexcept>
25#include <rfl/json.hpp>
26#include "ores.logging/make_logger.hpp"
27#include "ores.nats/domain/message.hpp"
28#include "ores.nats/service/client.hpp"
29#include "ores.database/domain/context.hpp"
30#include "ores.security/jwt/jwt_authenticator.hpp"
31#include "ores.service/messaging/handler_helpers.hpp"
32#include "ores.service/service/request_context.hpp"
33#include "ores.compute.api/messaging/batch_protocol.hpp"
34#include "ores.compute.core/service/batch_service.hpp"
35
36namespace ores::compute::messaging {
37
38namespace {
39inline auto& batch_handler_lg() {
40 static auto instance = ores::logging::make_logger(
41 "ores.compute.messaging.batch_handler");
42 return instance;
43}
44} // namespace
45
46using ores::service::messaging::reply;
47using ores::service::messaging::error_reply;
48using ores::service::messaging::decode;
49using ores::service::messaging::has_permission;
50using namespace ores::logging;
51
52class batch_handler {
53public:
54 batch_handler(ores::nats::service::client& nats,
56 std::optional<ores::security::jwt::jwt_authenticator> verifier)
57 : nats_(nats), ctx_(std::move(ctx)), verifier_(std::move(verifier)) {}
58
59 void list(ores::nats::message msg) {
60 BOOST_LOG_SEV(batch_handler_lg(), debug)
61 << "Handling " << msg.subject;
62 auto ctx_expected = ores::service::service::make_request_context(
63 ctx_, msg, verifier_);
64 if (!ctx_expected) {
65 error_reply(nats_, msg, ctx_expected.error());
66 return;
67 }
68 const auto& ctx = *ctx_expected;
69 service::batch_service svc(ctx);
70 list_batches_response resp;
71 try {
72 if (auto req = decode<list_batches_request>(msg)) {
73 resp.batches = svc.list();
74 resp.total_available_count =
75 static_cast<int>(resp.batches.size());
76 }
77 } catch (...) {}
78 reply(nats_, msg, resp);
79 BOOST_LOG_SEV(batch_handler_lg(), debug)
80 << "Completed " << msg.subject;
81 }
82
83 void save(ores::nats::message msg) {
84 BOOST_LOG_SEV(batch_handler_lg(), debug)
85 << "Handling " << msg.subject;
86 auto ctx_expected = ores::service::service::make_request_context(
87 ctx_, msg, verifier_);
88 if (!ctx_expected) {
89 error_reply(nats_, msg, ctx_expected.error());
90 return;
91 }
92 const auto& ctx = *ctx_expected;
93 if (!has_permission(ctx, "compute::batches:write")) {
94 error_reply(nats_, msg, ores::service::error_code::forbidden);
95 return;
96 }
97 if (auto req = decode<save_batch_request>(msg)) {
98 try {
99 service::batch_service svc(ctx);
100 svc.save(req->batch);
101 reply(nats_, msg, save_batch_response{.success = true});
102 } catch (const std::exception& e) {
103 reply(nats_, msg, save_batch_response{
104 .success = false, .message = e.what()});
105 }
106 } else {
107 BOOST_LOG_SEV(batch_handler_lg(), warn)
108 << "Failed to decode: " << msg.subject;
109 }
110 BOOST_LOG_SEV(batch_handler_lg(), debug)
111 << "Completed " << msg.subject;
112 }
113
114 void history(ores::nats::message msg) {
115 BOOST_LOG_SEV(batch_handler_lg(), debug)
116 << "Handling " << msg.subject;
117 auto ctx_expected = ores::service::service::make_request_context(
118 ctx_, msg, verifier_);
119 if (!ctx_expected) {
120 error_reply(nats_, msg, ctx_expected.error());
121 return;
122 }
123 const auto& ctx = *ctx_expected;
124 get_batch_history_response resp;
125 try {
126 if (auto req = decode<get_batch_history_request>(msg)) {
127 service::batch_service svc(ctx);
128 resp.versions = svc.history(req->id);
129 }
130 } catch (const std::exception& e) {
131 resp.success = false;
132 resp.message = e.what();
133 }
134 reply(nats_, msg, resp);
135 BOOST_LOG_SEV(batch_handler_lg(), debug)
136 << "Completed " << msg.subject;
137 }
138
139private:
142 std::optional<ores::security::jwt::jwt_authenticator> verifier_;
143};
144
145} // namespace ores::compute::messaging
146
147#endif
STL namespace.
Implements logging infrastructure for ORE Studio.
Definition boost_severity.hpp:28
@ forbidden
The caller is authenticated but lacks the required permission.
Context for the operations on a postgres database.
Definition context.hpp:47
A received NATS message.
Definition message.hpp:40
std::string subject
The subject the message was published to.
Definition message.hpp:44
NATS client: connection, pub/sub, request/reply, and JetStream.
Definition client.hpp:73