ORE Studio 0.0.4
Loading...
Searching...
No Matches
dataset_handler.hpp
1/* -*- mode: c++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
2 *
3 * Copyright (C) 2026 Marco Craveiro <marco.craveiro@gmail.com>
4 *
5 * This program is free software; you can redistribute it and/or modify it under
6 * the terms of the GNU General Public License as published by the Free Software
7 * Foundation; either version 3 of the License, or (at your option) any later
8 * version.
9 *
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
13 * details.
14 *
15 * You should have received a copy of the GNU General Public License along with
16 * this program; if not, write to the Free Software Foundation, Inc., 51
17 * Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 *
19 */
20#ifndef ORES_DQ_CORE_MESSAGING_DATASET_HANDLER_HPP
21#define ORES_DQ_CORE_MESSAGING_DATASET_HANDLER_HPP
22
23#include <optional>
24#include <stdexcept>
25#include <vector>
26#include <boost/uuid/string_generator.hpp>
27#include "ores.nats/domain/message.hpp"
28#include "ores.nats/service/client.hpp"
29#include "ores.database/domain/context.hpp"
30#include "ores.security/jwt/jwt_authenticator.hpp"
31#include "ores.service/messaging/handler_helpers.hpp"
32#include "ores.service/service/request_context.hpp"
33#include "ores.dq.api/messaging/dataset_protocol.hpp"
34#include "ores.dq.core/service/dataset_service.hpp"
35#include "ores.dq.core/service/publication_service.hpp"
36#include "ores.logging/make_logger.hpp"
37
38namespace ores::dq::messaging {
39
40using ores::service::messaging::reply;
41using ores::service::messaging::decode;
42using ores::service::messaging::stamp;
43using ores::service::messaging::error_reply;
44using ores::service::messaging::has_permission;
45using namespace ores::logging;
46
47namespace {
48inline auto& dataset_handler_lg() {
49 static auto instance = ores::logging::make_logger("ores.dq.messaging.dataset_handler");
50 return instance;
51}
52} // namespace
53
54class dataset_handler {
55public:
56 dataset_handler(
59 std::optional<ores::security::jwt::jwt_authenticator> verifier)
60 : nats_(nats), ctx_(std::move(ctx)), verifier_(std::move(verifier)) {}
61
62 void list(ores::nats::message msg) {
63 BOOST_LOG_SEV(dataset_handler_lg(), debug) << "Handling " << msg.subject;
64 auto req = decode<get_datasets_request>(msg);
65 if (!req) {
66 BOOST_LOG_SEV(dataset_handler_lg(), warn) << "Failed to decode: " << msg.subject;
67 return;
68 }
69 auto ctx_expected = ores::service::service::make_request_context(
70 ctx_, msg, verifier_);
71 if (!ctx_expected) {
72 error_reply(nats_, msg, ctx_expected.error());
73 return;
74 }
75 const auto& ctx = *ctx_expected;
76 service::dataset_service svc(ctx);
77 try {
78 const auto items = svc.list_datasets(
79 static_cast<std::uint32_t>(req->offset),
80 static_cast<std::uint32_t>(req->limit));
81 const auto count = svc.get_dataset_count();
82 get_datasets_response resp;
83 resp.datasets = items;
84 resp.total_available_count = static_cast<int>(count);
85 BOOST_LOG_SEV(dataset_handler_lg(), debug) << "Completed " << msg.subject;
86 reply(nats_, msg, resp);
87 } catch (const std::exception& e) {
88 BOOST_LOG_SEV(dataset_handler_lg(), error) << msg.subject << " failed: " << e.what();
89 get_datasets_response resp;
90 resp.total_available_count = 0;
91 reply(nats_, msg, resp);
92 }
93 }
94
95 void save(ores::nats::message msg) {
96 BOOST_LOG_SEV(dataset_handler_lg(), debug) << "Handling " << msg.subject;
97 auto req = decode<save_dataset_request>(msg);
98 if (!req) {
99 BOOST_LOG_SEV(dataset_handler_lg(), warn) << "Failed to decode: " << msg.subject;
100 return;
101 }
102 auto ctx_expected = ores::service::service::make_request_context(
103 ctx_, msg, verifier_);
104 if (!ctx_expected) {
105 error_reply(nats_, msg, ctx_expected.error());
106 return;
107 }
108 const auto& ctx = *ctx_expected;
109 if (!has_permission(ctx, "dq::datasets:write")) {
110 error_reply(nats_, msg, ores::service::error_code::forbidden);
111 return;
112 }
113 service::dataset_service svc(ctx);
114 try {
115 for (auto& ds : req->datasets)
116 stamp(ds, ctx);
117 svc.save_datasets(req->datasets);
118 BOOST_LOG_SEV(dataset_handler_lg(), debug) << "Completed " << msg.subject;
119 reply(nats_, msg, save_dataset_response{true, {}});
120 } catch (const std::exception& e) {
121 BOOST_LOG_SEV(dataset_handler_lg(), error) << msg.subject << " failed: " << e.what();
122 reply(nats_, msg, save_dataset_response{false, e.what()});
123 }
124 }
125
126 void remove(ores::nats::message msg) {
127 BOOST_LOG_SEV(dataset_handler_lg(), debug) << "Handling " << msg.subject;
128 auto req = decode<delete_dataset_request>(msg);
129 if (!req) {
130 BOOST_LOG_SEV(dataset_handler_lg(), warn) << "Failed to decode: " << msg.subject;
131 return;
132 }
133 auto ctx_expected = ores::service::service::make_request_context(
134 ctx_, msg, verifier_);
135 if (!ctx_expected) {
136 error_reply(nats_, msg, ctx_expected.error());
137 return;
138 }
139 const auto& ctx = *ctx_expected;
140 if (!has_permission(ctx, "dq::datasets:delete")) {
141 error_reply(nats_, msg, ores::service::error_code::forbidden);
142 return;
143 }
144 service::dataset_service svc(ctx);
145 try {
146 boost::uuids::string_generator gen;
147 for (const auto& id : req->ids)
148 svc.remove_dataset(gen(id));
149 BOOST_LOG_SEV(dataset_handler_lg(), debug) << "Completed " << msg.subject;
150 reply(nats_, msg, delete_dataset_response{true, {}});
151 } catch (const std::exception& e) {
152 BOOST_LOG_SEV(dataset_handler_lg(), error) << msg.subject << " failed: " << e.what();
153 reply(nats_, msg, delete_dataset_response{false, e.what()});
154 }
155 }
156
157 void history(ores::nats::message msg) {
158 BOOST_LOG_SEV(dataset_handler_lg(), debug) << "Handling " << msg.subject;
159 auto req = decode<get_dataset_history_request>(msg);
160 if (!req) {
161 BOOST_LOG_SEV(dataset_handler_lg(), warn) << "Failed to decode: " << msg.subject;
162 return;
163 }
164 auto ctx_expected = ores::service::service::make_request_context(
165 ctx_, msg, verifier_);
166 if (!ctx_expected) {
167 error_reply(nats_, msg, ctx_expected.error());
168 return;
169 }
170 const auto& ctx = *ctx_expected;
171 service::dataset_service svc(ctx);
172 try {
173 const auto hist = svc.get_dataset_history(
174 boost::uuids::string_generator{}(req->id));
175 get_dataset_history_response resp;
176 resp.success = true;
177 resp.history = hist;
178 BOOST_LOG_SEV(dataset_handler_lg(), debug) << "Completed " << msg.subject;
179 reply(nats_, msg, resp);
180 } catch (const std::exception& e) {
181 BOOST_LOG_SEV(dataset_handler_lg(), error) << msg.subject << " failed: " << e.what();
182 get_dataset_history_response resp;
183 resp.success = false;
184 resp.message = e.what();
185 reply(nats_, msg, resp);
186 }
187 }
188
189 void publish(ores::nats::message msg) {
190 BOOST_LOG_SEV(dataset_handler_lg(), debug) << "Handling " << msg.subject;
191 auto req = decode<publish_datasets_request>(msg);
192 if (!req) {
193 BOOST_LOG_SEV(dataset_handler_lg(), warn) << "Failed to decode: " << msg.subject;
194 return;
195 }
196 auto ctx_expected = ores::service::service::make_request_context(
197 ctx_, msg, verifier_);
198 if (!ctx_expected) {
199 error_reply(nats_, msg, ctx_expected.error());
200 return;
201 }
202 const auto& ctx = *ctx_expected;
203 if (!has_permission(ctx, "dq::datasets:write")) {
204 error_reply(nats_, msg, ores::service::error_code::forbidden);
205 return;
206 }
207 service::publication_service svc(ctx);
208 try {
209 boost::uuids::string_generator gen;
210 std::vector<boost::uuids::uuid> uuids;
211 uuids.reserve(req->dataset_ids.size());
212 for (const auto& id : req->dataset_ids)
213 uuids.push_back(gen(id));
214 const auto results = svc.publish(
215 uuids, req->mode, req->published_by,
216 req->resolve_dependencies);
217 publish_datasets_response resp;
218 resp.success = true;
219 resp.results = results;
220 BOOST_LOG_SEV(dataset_handler_lg(), debug) << "Completed " << msg.subject;
221 reply(nats_, msg, resp);
222 } catch (const std::exception& e) {
223 BOOST_LOG_SEV(dataset_handler_lg(), error) << msg.subject << " failed: " << e.what();
224 publish_datasets_response resp;
225 resp.success = false;
226 resp.message = e.what();
227 reply(nats_, msg, resp);
228 }
229 }
230
231private:
232
235 std::optional<ores::security::jwt::jwt_authenticator> verifier_;
236};
237
238} // namespace ores::dq::messaging
239
240#endif
STL namespace.
Implements logging infrastructure for ORE Studio.
Definition boost_severity.hpp:28
@ forbidden
The caller is authenticated but lacks the required permission.
Context for the operations on a postgres database.
Definition context.hpp:47
A received NATS message.
Definition message.hpp:40
std::string subject
The subject the message was published to.
Definition message.hpp:44
NATS client: connection, pub/sub, request/reply, and JetStream.
Definition client.hpp:73