20#ifndef ORES_DQ_CORE_MESSAGING_DATASET_HANDLER_HPP
21#define ORES_DQ_CORE_MESSAGING_DATASET_HANDLER_HPP
26#include <boost/uuid/string_generator.hpp>
27#include "ores.nats/domain/message.hpp"
28#include "ores.nats/service/client.hpp"
29#include "ores.database/domain/context.hpp"
30#include "ores.security/jwt/jwt_authenticator.hpp"
31#include "ores.service/messaging/handler_helpers.hpp"
32#include "ores.service/service/request_context.hpp"
33#include "ores.dq.api/messaging/dataset_protocol.hpp"
34#include "ores.dq.core/service/dataset_service.hpp"
35#include "ores.dq.core/service/publication_service.hpp"
36#include "ores.logging/make_logger.hpp"
38namespace ores::dq::messaging {
40using ores::service::messaging::reply;
41using ores::service::messaging::decode;
42using ores::service::messaging::stamp;
43using ores::service::messaging::error_reply;
44using ores::service::messaging::has_permission;
// Logger accessor for the dataset handler.
// Creates a function-local static logger via ores::logging::make_logger using
// the channel name "ores.dq.messaging.dataset_handler". The function is
// declared to return auto&; the return statement itself is not visible in this
// view, so presumably it hands back a reference to `instance` (TODO confirm).
48inline auto& dataset_handler_lg() {
49 static auto instance = ores::logging::make_logger(
"ores.dq.messaging.dataset_handler");
// Handler class for dataset messages. Its methods (below) decode a request from
// a message, build a request context, call into the dataset/publication
// services and send a reply through nats_.
54class dataset_handler {
// Constructor (leading parameters are not visible in this view): stores `nats`
// in nats_, and moves `ctx` and the optional JWT authenticator `verifier` into
// ctx_ and verifier_ respectively.
59 std::optional<ores::security::jwt::jwt_authenticator> verifier)
60 : nats_(nats), ctx_(
std::move(ctx)), verifier_(
std::move(verifier)) {}
// Handles a get_datasets_request: decodes `msg`, builds a request context from
// ctx_/msg/verifier_, asks dataset_service for one page of datasets plus the
// total count, and replies with a get_datasets_response.
// NOTE(review): no has_permission check is visible in this handler — confirm
// that listing is meant to be available to any caller with a valid context.
63 BOOST_LOG_SEV(dataset_handler_lg(), debug) <<
"Handling " << msg.
subject;
64 auto req = decode<get_datasets_request>(msg);
// Decode-failure path: only a warning is logged here (the guarding condition
// and any early return are not visible in this view).
66 BOOST_LOG_SEV(dataset_handler_lg(), warn) <<
"Failed to decode: " << msg.
subject;
69 auto ctx_expected = ores::service::service::make_request_context(
70 ctx_, msg, verifier_);
// Context-failure path: the error held by ctx_expected is sent back to the
// requester (guarding condition not visible in this view).
72 error_reply(nats_, msg, ctx_expected.error());
75 const auto& ctx = *ctx_expected;
76 service::dataset_service svc(ctx);
// offset and limit from the request are cast to std::uint32_t for the service
// call. NOTE(review): if the request fields are signed, a negative value would
// wrap to a very large unsigned one — confirm the field types / validation.
78 const auto items = svc.list_datasets(
79 static_cast<std::uint32_t
>(req->offset),
80 static_cast<std::uint32_t
>(req->limit));
81 const auto count = svc.get_dataset_count();
82 get_datasets_response resp;
83 resp.datasets = items;
// The total is cast to int to fit the response field.
84 resp.total_available_count =
static_cast<int>(count);
85 BOOST_LOG_SEV(dataset_handler_lg(), debug) <<
"Completed " << msg.
subject;
86 reply(nats_, msg, resp);
87 }
catch (
const std::exception& e) {
// Failure path: the exception is logged, then a default-constructed response
// with total_available_count = 0 is sent.
// NOTE(review): nothing visibly marks this reply as a failure, so a requester
// may not be able to tell an error from an empty result — confirm intended.
88 BOOST_LOG_SEV(dataset_handler_lg(), error) << msg.
subject <<
" failed: " << e.what();
89 get_datasets_response resp;
90 resp.total_available_count = 0;
91 reply(nats_, msg, resp);
// Handles a save_dataset_request: decodes `msg`, builds a request context,
// requires the "dq::datasets:write" permission, saves all datasets in the
// request through dataset_service, and replies with a save_dataset_response
// ({true, {}} on success, {false, e.what()} when an exception is caught).
96 BOOST_LOG_SEV(dataset_handler_lg(), debug) <<
"Handling " << msg.
subject;
97 auto req = decode<save_dataset_request>(msg);
// Decode-failure path: a warning is logged (guarding condition and any early
// return are not visible in this view).
99 BOOST_LOG_SEV(dataset_handler_lg(), warn) <<
"Failed to decode: " << msg.
subject;
102 auto ctx_expected = ores::service::service::make_request_context(
103 ctx_, msg, verifier_);
// Context-failure path: the error held by ctx_expected is sent back.
105 error_reply(nats_, msg, ctx_expected.error());
108 const auto& ctx = *ctx_expected;
// Permission gate; what happens inside the branch is not visible in this view.
109 if (!has_permission(ctx,
"dq::datasets:write")) {
113 service::dataset_service svc(ctx);
// Each dataset is visited by mutable reference before the bulk save. The
// per-item statement is not visible in this view.
// NOTE(review): `stamp` is imported at file scope and is presumably applied to
// each dataset here — confirm against the real header.
115 for (
auto& ds : req->datasets)
117 svc.save_datasets(req->datasets);
118 BOOST_LOG_SEV(dataset_handler_lg(), debug) <<
"Completed " << msg.
subject;
119 reply(nats_, msg, save_dataset_response{
true, {}});
120 }
catch (
const std::exception& e) {
// Failure path: log the exception and report it to the requester.
// NOTE(review): e.what() is forwarded verbatim in the reply — confirm that
// internal error text is acceptable to expose to callers.
121 BOOST_LOG_SEV(dataset_handler_lg(), error) << msg.
subject <<
" failed: " << e.what();
122 reply(nats_, msg, save_dataset_response{
false, e.what()});
// Handles a delete_dataset_request: decodes `msg`, builds a request context,
// requires the "dq::datasets:delete" permission, removes each requested id via
// dataset_service, and replies with a delete_dataset_response ({true, {}} on
// success, {false, e.what()} when an exception is caught).
127 BOOST_LOG_SEV(dataset_handler_lg(), debug) <<
"Handling " << msg.
subject;
128 auto req = decode<delete_dataset_request>(msg);
// Decode-failure path: a warning is logged (guarding condition and any early
// return are not visible in this view).
130 BOOST_LOG_SEV(dataset_handler_lg(), warn) <<
"Failed to decode: " << msg.
subject;
133 auto ctx_expected = ores::service::service::make_request_context(
134 ctx_, msg, verifier_);
// Context-failure path: the error held by ctx_expected is sent back.
136 error_reply(nats_, msg, ctx_expected.error());
139 const auto& ctx = *ctx_expected;
// Permission gate; what happens inside the branch is not visible in this view.
140 if (!has_permission(ctx,
"dq::datasets:delete")) {
144 service::dataset_service svc(ctx);
// Each id in the request is converted with boost::uuids::string_generator and
// removed individually.
// NOTE(review): removals happen one at a time; if an exception is thrown
// midway (e.g. presumably on a malformed id string), whether earlier removals
// are rolled back is not visible here — confirm transactional behaviour.
146 boost::uuids::string_generator gen;
147 for (
const auto&
id : req->ids)
148 svc.remove_dataset(gen(id));
149 BOOST_LOG_SEV(dataset_handler_lg(), debug) <<
"Completed " << msg.
subject;
150 reply(nats_, msg, delete_dataset_response{
true, {}});
151 }
catch (
const std::exception& e) {
// Failure path: log the exception and report it to the requester.
// NOTE(review): e.what() is forwarded verbatim in the reply — confirm that
// internal error text is acceptable to expose to callers.
152 BOOST_LOG_SEV(dataset_handler_lg(), error) << msg.
subject <<
" failed: " << e.what();
153 reply(nats_, msg, delete_dataset_response{
false, e.what()});
// Handles a get_dataset_history_request: decodes `msg`, builds a request
// context, converts req->id with boost::uuids::string_generator, fetches the
// history from dataset_service and replies with a
// get_dataset_history_response. When an exception is caught the response has
// success = false and message = e.what().
// NOTE(review): no has_permission check is visible in this handler — confirm
// that history is meant to be readable by any caller with a valid context.
158 BOOST_LOG_SEV(dataset_handler_lg(), debug) <<
"Handling " << msg.
subject;
159 auto req = decode<get_dataset_history_request>(msg);
// Decode-failure path: a warning is logged (guarding condition and any early
// return are not visible in this view).
161 BOOST_LOG_SEV(dataset_handler_lg(), warn) <<
"Failed to decode: " << msg.
subject;
164 auto ctx_expected = ores::service::service::make_request_context(
165 ctx_, msg, verifier_);
// Context-failure path: the error held by ctx_expected is sent back.
167 error_reply(nats_, msg, ctx_expected.error());
170 const auto& ctx = *ctx_expected;
171 service::dataset_service svc(ctx);
173 const auto hist = svc.get_dataset_history(
174 boost::uuids::string_generator{}(req->id));
// The statements that populate `resp` from `hist` are not visible in this
// view; only the declaration and the reply are shown.
175 get_dataset_history_response resp;
178 BOOST_LOG_SEV(dataset_handler_lg(), debug) <<
"Completed " << msg.
subject;
179 reply(nats_, msg, resp);
180 }
catch (
const std::exception& e) {
// Failure path: log the exception and reply with success = false plus the
// exception text as the message.
181 BOOST_LOG_SEV(dataset_handler_lg(), error) << msg.
subject <<
" failed: " << e.what();
182 get_dataset_history_response resp;
183 resp.success =
false;
184 resp.message = e.what();
185 reply(nats_, msg, resp);
// Handles a publish_datasets_request: decodes `msg`, builds a request context,
// requires the "dq::datasets:write" permission, converts the requested dataset
// id strings to UUIDs, calls publication_service::publish and replies with a
// publish_datasets_response carrying the results. When an exception is caught
// the response has success = false and message = e.what().
190 BOOST_LOG_SEV(dataset_handler_lg(), debug) <<
"Handling " << msg.
subject;
191 auto req = decode<publish_datasets_request>(msg);
// Decode-failure path: a warning is logged (guarding condition and any early
// return are not visible in this view).
193 BOOST_LOG_SEV(dataset_handler_lg(), warn) <<
"Failed to decode: " << msg.
subject;
196 auto ctx_expected = ores::service::service::make_request_context(
197 ctx_, msg, verifier_);
// Context-failure path: the error held by ctx_expected is sent back.
199 error_reply(nats_, msg, ctx_expected.error());
202 const auto& ctx = *ctx_expected;
// Permission gate; what happens inside the branch is not visible in this view.
203 if (!has_permission(ctx,
"dq::datasets:write")) {
207 service::publication_service svc(ctx);
// Convert every id string up front; capacity is reserved to the number of ids
// so the vector does not reallocate while being filled.
209 boost::uuids::string_generator gen;
210 std::vector<boost::uuids::uuid> uuids;
211 uuids.reserve(req->dataset_ids.size());
212 for (
const auto&
id : req->dataset_ids)
213 uuids.push_back(gen(id));
// NOTE(review): published_by is taken from the request payload rather than
// from the request context — confirm a caller cannot publish under another
// identity.
214 const auto results = svc.publish(
215 uuids, req->mode, req->published_by,
216 req->resolve_dependencies);
217 publish_datasets_response resp;
219 resp.results = results;
220 BOOST_LOG_SEV(dataset_handler_lg(), debug) <<
"Completed " << msg.
subject;
221 reply(nats_, msg, resp);
222 }
catch (
const std::exception& e) {
// Failure path: log the exception and reply with success = false plus the
// exception text as the message.
223 BOOST_LOG_SEV(dataset_handler_lg(), error) << msg.
subject <<
" failed: " << e.what();
224 publish_datasets_response resp;
225 resp.success =
false;
226 resp.message = e.what();
227 reply(nats_, msg, resp);
// Optional JWT authenticator, set from the constructor's `verifier` argument
// and passed to make_request_context together with ctx_ and each message.
235 std::optional<ores::security::jwt::jwt_authenticator> verifier_;
Implements logging infrastructure for ORE Studio.
Definition boost_severity.hpp:28
@ forbidden
The caller is authenticated but lacks the required permission.
Context for the operations on a postgres database.
Definition context.hpp:47
A received NATS message.
Definition message.hpp:40
std::string subject
The subject the message was published to.
Definition message.hpp:44
NATS client: connection, pub/sub, request/reply, and JetStream.
Definition client.hpp:73