ORE Studio 0.0.4
Loading...
Searching...
No Matches
dataset_bundle_handler.hpp
1/* -*- mode: c++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
2 *
3 * Copyright (C) 2026 Marco Craveiro <marco.craveiro@gmail.com>
4 *
5 * This program is free software; you can redistribute it and/or modify it under
6 * the terms of the GNU General Public License as published by the Free Software
7 * Foundation; either version 3 of the License, or (at your option) any later
8 * version.
9 *
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
13 * details.
14 *
15 * You should have received a copy of the GNU General Public License along with
16 * this program; if not, write to the Free Software Foundation, Inc., 51
17 * Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 *
19 */
20#ifndef ORES_DQ_CORE_MESSAGING_DATASET_BUNDLE_HANDLER_HPP
21#define ORES_DQ_CORE_MESSAGING_DATASET_BUNDLE_HANDLER_HPP
22
23#include <optional>
24#include <stdexcept>
25#include <boost/uuid/string_generator.hpp>
26#include "ores.nats/domain/message.hpp"
27#include "ores.nats/service/client.hpp"
28#include "ores.database/domain/context.hpp"
29#include "ores.security/jwt/jwt_authenticator.hpp"
30#include "ores.service/messaging/handler_helpers.hpp"
31#include "ores.service/service/request_context.hpp"
32#include "ores.dq.api/messaging/dataset_bundle_protocol.hpp"
33#include "ores.dq.core/service/dataset_bundle_service.hpp"
34#include "ores.logging/make_logger.hpp"
35
36namespace ores::dq::messaging {
37
38using ores::service::messaging::reply;
39using ores::service::messaging::decode;
40using ores::service::messaging::stamp;
41using ores::service::messaging::error_reply;
42using ores::service::messaging::has_permission;
43using namespace ores::logging;
44
45namespace {
46inline auto& dataset_bundle_handler_lg() {
47 static auto instance = ores::logging::make_logger("ores.dq.messaging.dataset_bundle_handler");
48 return instance;
49}
50} // namespace
51
52class dataset_bundle_handler {
53public:
54 dataset_bundle_handler(
57 std::optional<ores::security::jwt::jwt_authenticator> verifier)
58 : nats_(nats), ctx_(std::move(ctx)), verifier_(std::move(verifier)) {}
59
60 void list(ores::nats::message msg) {
61 BOOST_LOG_SEV(dataset_bundle_handler_lg(), debug) << "Handling " << msg.subject;
62 auto req = decode<get_dataset_bundles_request>(msg);
63 if (!req) {
64 BOOST_LOG_SEV(dataset_bundle_handler_lg(), warn) << "Failed to decode: " << msg.subject;
65 return;
66 }
67 auto ctx_expected = ores::service::service::make_request_context(
68 ctx_, msg, verifier_);
69 if (!ctx_expected) {
70 error_reply(nats_, msg, ctx_expected.error());
71 return;
72 }
73 const auto& ctx = *ctx_expected;
74 service::dataset_bundle_service svc(ctx);
75 try {
76 const auto items = svc.list_bundles();
77 get_dataset_bundles_response resp;
78 resp.bundles = items;
79 resp.total_available_count = static_cast<int>(items.size());
80 BOOST_LOG_SEV(dataset_bundle_handler_lg(), debug) << "Completed " << msg.subject;
81 reply(nats_, msg, resp);
82 } catch (const std::exception& e) {
83 BOOST_LOG_SEV(dataset_bundle_handler_lg(), error) << msg.subject << " failed: " << e.what();
84 get_dataset_bundles_response resp;
85 resp.total_available_count = 0;
86 reply(nats_, msg, resp);
87 }
88 }
89
90 void save(ores::nats::message msg) {
91 BOOST_LOG_SEV(dataset_bundle_handler_lg(), debug) << "Handling " << msg.subject;
92 auto req = decode<save_dataset_bundle_request>(msg);
93 if (!req) {
94 BOOST_LOG_SEV(dataset_bundle_handler_lg(), warn) << "Failed to decode: " << msg.subject;
95 return;
96 }
97 auto ctx_expected = ores::service::service::make_request_context(
98 ctx_, msg, verifier_);
99 if (!ctx_expected) {
100 error_reply(nats_, msg, ctx_expected.error());
101 return;
102 }
103 const auto& ctx = *ctx_expected;
104 if (!has_permission(ctx, "dq::dataset_bundles:write")) {
105 error_reply(nats_, msg, ores::service::error_code::forbidden);
106 return;
107 }
108 service::dataset_bundle_service svc(ctx);
109 try {
110 for (auto& b : req->bundles)
111 stamp(b, ctx);
112 svc.save_bundles(req->bundles);
113 BOOST_LOG_SEV(dataset_bundle_handler_lg(), debug) << "Completed " << msg.subject;
114 reply(nats_, msg, save_dataset_bundle_response{true, {}});
115 } catch (const std::exception& e) {
116 BOOST_LOG_SEV(dataset_bundle_handler_lg(), error) << msg.subject << " failed: " << e.what();
117 reply(nats_, msg, save_dataset_bundle_response{false, e.what()});
118 }
119 }
120
121 void remove(ores::nats::message msg) {
122 BOOST_LOG_SEV(dataset_bundle_handler_lg(), debug) << "Handling " << msg.subject;
123 auto req = decode<delete_dataset_bundle_request>(msg);
124 if (!req) {
125 BOOST_LOG_SEV(dataset_bundle_handler_lg(), warn) << "Failed to decode: " << msg.subject;
126 return;
127 }
128 auto ctx_expected = ores::service::service::make_request_context(
129 ctx_, msg, verifier_);
130 if (!ctx_expected) {
131 error_reply(nats_, msg, ctx_expected.error());
132 return;
133 }
134 const auto& ctx = *ctx_expected;
135 if (!has_permission(ctx, "dq::dataset_bundles:delete")) {
136 error_reply(nats_, msg, ores::service::error_code::forbidden);
137 return;
138 }
139 service::dataset_bundle_service svc(ctx);
140 try {
141 boost::uuids::string_generator gen;
142 for (const auto& id : req->ids)
143 svc.remove_bundle(gen(id));
144 BOOST_LOG_SEV(dataset_bundle_handler_lg(), debug) << "Completed " << msg.subject;
145 reply(nats_, msg, delete_dataset_bundle_response{true, {}});
146 } catch (const std::exception& e) {
147 BOOST_LOG_SEV(dataset_bundle_handler_lg(), error) << msg.subject << " failed: " << e.what();
148 reply(nats_, msg,
149 delete_dataset_bundle_response{false, e.what()});
150 }
151 }
152
153 void history(ores::nats::message msg) {
154 BOOST_LOG_SEV(dataset_bundle_handler_lg(), debug) << "Handling " << msg.subject;
155 auto req = decode<get_dataset_bundle_history_request>(msg);
156 if (!req) {
157 BOOST_LOG_SEV(dataset_bundle_handler_lg(), warn) << "Failed to decode: " << msg.subject;
158 return;
159 }
160 auto ctx_expected = ores::service::service::make_request_context(
161 ctx_, msg, verifier_);
162 if (!ctx_expected) {
163 error_reply(nats_, msg, ctx_expected.error());
164 return;
165 }
166 const auto& ctx = *ctx_expected;
167 service::dataset_bundle_service svc(ctx);
168 try {
169 const auto hist = svc.get_bundle_history(
170 boost::uuids::string_generator{}(req->id));
171 get_dataset_bundle_history_response resp;
172 resp.success = true;
173 resp.history = hist;
174 BOOST_LOG_SEV(dataset_bundle_handler_lg(), debug) << "Completed " << msg.subject;
175 reply(nats_, msg, resp);
176 } catch (const std::exception& e) {
177 BOOST_LOG_SEV(dataset_bundle_handler_lg(), error) << msg.subject << " failed: " << e.what();
178 get_dataset_bundle_history_response resp;
179 resp.success = false;
180 resp.message = e.what();
181 reply(nats_, msg, resp);
182 }
183 }
184
185private:
186
189 std::optional<ores::security::jwt::jwt_authenticator> verifier_;
190};
191
192} // namespace ores::dq::messaging
193
194#endif
STL namespace.
Implements logging infrastructure for ORE Studio.
Definition boost_severity.hpp:28
@ forbidden
The caller is authenticated but lacks the required permission.
Context for the operations on a postgres database.
Definition context.hpp:47
A received NATS message.
Definition message.hpp:40
std::string subject
The subject the message was published to.
Definition message.hpp:44
NATS client: connection, pub/sub, request/reply, and JetStream.
Definition client.hpp:73