20#ifndef ORES_DQ_CORE_MESSAGING_DATASET_DEPENDENCY_HANDLER_HPP
21#define ORES_DQ_CORE_MESSAGING_DATASET_DEPENDENCY_HANDLER_HPP
26#include <boost/uuid/string_generator.hpp>
27#include "ores.nats/domain/message.hpp"
28#include "ores.nats/service/client.hpp"
29#include "ores.database/domain/context.hpp"
30#include "ores.security/jwt/jwt_authenticator.hpp"
31#include "ores.service/messaging/handler_helpers.hpp"
32#include "ores.service/service/request_context.hpp"
33#include "ores.dq.api/messaging/dataset_dependency_protocol.hpp"
34#include "ores.dq.core/service/data_organization_service.hpp"
35#include "ores.dq.core/service/publication_service.hpp"
36#include "ores.logging/make_logger.hpp"
38namespace ores::dq::messaging {
40using ores::service::messaging::reply;
41using ores::service::messaging::decode;
42using ores::service::messaging::error_reply;
/**
 * Logger accessor used by every log statement in this handler.
 *
 * Declares a function-local static logger created by
 * ores::logging::make_logger under the channel name
 * "ores.dq.messaging.dataset_dependency_handler".
 *
 * NOTE(review): the return statement and closing brace are not visible in
 * this listing; presumably the function returns `instance` by reference
 * (the declared return type is `auto&`) - confirm against the real header.
 */
46inline auto& dataset_dependency_handler_lg() {
47 static auto instance = ores::logging::make_logger(
"ores.dq.messaging.dataset_dependency_handler");
/**
 * Handler for dataset-dependency messages.
 *
 * Each visible handler body below follows the same sequence: log the
 * subject, decode the request from the message, build a request context
 * via make_request_context(ctx_, msg, verifier_), call a service, and send
 * a response with reply().
 */
52class dataset_dependency_handler {
/**
 * Stores the collaborators used by the handlers.
 *
 * `nats` initialises nats_; `ctx` and `verifier` are moved into ctx_ and
 * verifier_ respectively.
 *
 * NOTE(review): the declarations of the `nats` and `ctx` parameters are
 * missing from this listing, so their exact types are not shown here.
 */
54 dataset_dependency_handler(
57 std::optional<ores::security::jwt::jwt_authenticator> verifier)
58 : nats_(nats), ctx_(
std::move(ctx)), verifier_(
std::move(verifier)) {}
/*
 * Body of the handler for get_dataset_dependencies_request.
 *
 * NOTE(review): the method signature and the guard lines (the conditions
 * around the "Failed to decode" and error_reply paths, their returns, and
 * the opening of the try block) are missing from this listing; the
 * comments below describe only the statements that are visible.
 */
61 BOOST_LOG_SEV(dataset_dependency_handler_lg(), debug) <<
"Handling " << msg.
subject;
// Decode the request payload from the incoming message.
62 auto req = decode<get_dataset_dependencies_request>(msg);
// Warning logged on the decode-failure path (guard condition not visible).
64 BOOST_LOG_SEV(dataset_dependency_handler_lg(), warn) <<
"Failed to decode: " << msg.
subject;
// Build the per-request context from the stored context, the message and
// the optional verifier.
67 auto ctx_expected = ores::service::service::make_request_context(
68 ctx_, msg, verifier_);
// Failure path: ctx_expected.error() is handed to error_reply
// (guard condition not visible).
70 error_reply(nats_, msg, ctx_expected.error());
73 const auto& ctx = *ctx_expected;
74 service::data_organization_service svc(ctx);
// Fetch the dependencies and copy them into the response; the count is the
// size of that same list, narrowed to int.
76 const auto items = svc.list_dataset_dependencies();
77 get_dataset_dependencies_response resp;
78 resp.dependencies = items;
79 resp.total_available_count =
static_cast<int>(items.size());
80 BOOST_LOG_SEV(dataset_dependency_handler_lg(), debug) <<
"Completed " << msg.
subject;
81 reply(nats_, msg, resp);
82 }
// Any std::exception is logged at error severity and answered with a
// response whose total_available_count is 0.
// NOTE(review): the other two handlers in this file report failures via
// resp.success / resp.message; here the exception text is only logged.
// Whether this response type has such fields is not visible from here.
catch (
const std::exception& e) {
83 BOOST_LOG_SEV(dataset_dependency_handler_lg(), error) << msg.
subject <<
" failed: " << e.what();
84 get_dataset_dependencies_response resp;
85 resp.total_available_count = 0;
86 reply(nats_, msg, resp);
/*
 * Body of the handler for get_dataset_dependencies_by_dataset_request.
 *
 * NOTE(review): the method signature, the guard lines, the opening of the
 * try block and the left-hand sides of two assignments are missing from
 * this listing; later statements use `req` and `deps`, so presumably the
 * decode result is bound to `req` and the service result to `deps` -
 * confirm against the real header.
 */
91 BOOST_LOG_SEV(dataset_dependency_handler_lg(), debug) <<
"Handling " << msg.
subject;
// Decode the request payload from the incoming message.
93 decode<get_dataset_dependencies_by_dataset_request>(msg);
// Warning logged on the decode-failure path (guard condition not visible).
95 BOOST_LOG_SEV(dataset_dependency_handler_lg(), warn) <<
"Failed to decode: " << msg.
subject;
// Build the per-request context from the stored context, the message and
// the optional verifier.
98 auto ctx_expected = ores::service::service::make_request_context(
99 ctx_, msg, verifier_);
// Failure path: ctx_expected.error() is handed to error_reply
// (guard condition not visible).
101 error_reply(nats_, msg, ctx_expected.error());
104 const auto& ctx = *ctx_expected;
105 service::data_organization_service svc(ctx);
// Look up the dependencies for the dataset code carried by the request.
108 svc.list_dataset_dependencies_by_dataset(req->dataset_code);
109 get_dataset_dependencies_by_dataset_response resp;
111 resp.dependencies = deps;
112 BOOST_LOG_SEV(dataset_dependency_handler_lg(), debug) <<
"Completed " << msg.
subject;
113 reply(nats_, msg, resp);
114 }
// Any std::exception is logged at error severity and reported to the
// caller with success = false and the exception text as the message.
catch (
const std::exception& e) {
115 BOOST_LOG_SEV(dataset_dependency_handler_lg(), error) << msg.
subject <<
" failed: " << e.what();
116 get_dataset_dependencies_by_dataset_response resp;
117 resp.success =
false;
118 resp.message = e.what();
119 reply(nats_, msg, resp);
/*
 * Body of the handler for resolve_dependencies_request.
 *
 * NOTE(review): the method signature, the guard lines and the opening of
 * the try block are missing from this listing; the comments below describe
 * only the statements that are visible.
 */
124 BOOST_LOG_SEV(dataset_dependency_handler_lg(), debug) <<
"Handling " << msg.
subject;
// Decode the request payload from the incoming message.
125 auto req = decode<resolve_dependencies_request>(msg);
// Warning logged on the decode-failure path (guard condition not visible).
127 BOOST_LOG_SEV(dataset_dependency_handler_lg(), warn) <<
"Failed to decode: " << msg.
subject;
// Build the per-request context from the stored context, the message and
// the optional verifier.
130 auto ctx_expected = ores::service::service::make_request_context(
131 ctx_, msg, verifier_);
// Failure path: ctx_expected.error() is handed to error_reply
// (guard condition not visible).
133 error_reply(nats_, msg, ctx_expected.error());
136 const auto& ctx = *ctx_expected;
137 service::publication_service svc(ctx);
// Convert every requested dataset id into a boost::uuids::uuid; the vector
// is reserved up front to the number of requested ids.
// NOTE(review): boost's string_generator can throw on malformed input;
// presumably this runs inside the try block whose opening line is not
// visible, so the catch below would report it - confirm.
139 boost::uuids::string_generator gen;
140 std::vector<boost::uuids::uuid> uuids;
141 uuids.reserve(req->dataset_ids.size());
142 for (
const auto&
id : req->dataset_ids)
143 uuids.push_back(gen(id));
// Ask the publication service for the publication order of those datasets
// and return it together with the ids exactly as they were requested.
144 const auto datasets = svc.resolve_publication_order(uuids);
145 resolve_dependencies_response resp;
147 resp.datasets = datasets;
148 resp.requested_ids = req->dataset_ids;
149 BOOST_LOG_SEV(dataset_dependency_handler_lg(), debug) <<
"Completed " << msg.
subject;
150 reply(nats_, msg, resp);
151 }
// Any std::exception is logged at error severity and reported to the
// caller with success = false and the exception text as the message.
catch (
const std::exception& e) {
152 BOOST_LOG_SEV(dataset_dependency_handler_lg(), error) << msg.
subject <<
" failed: " << e.what();
153 resolve_dependencies_response resp;
154 resp.success =
false;
155 resp.message = e.what();
156 reply(nats_, msg, resp);
// Optional JWT authenticator, set by the constructor and passed to
// make_request_context for every request; may be empty (std::nullopt).
164 std::optional<ores::security::jwt::jwt_authenticator> verifier_;
Implements logging infrastructure for ORE Studio.
Definition boost_severity.hpp:28
Context for the operations on a postgres database.
Definition context.hpp:47
A received NATS message.
Definition message.hpp:40
std::string subject
The subject the message was published to.
Definition message.hpp:44
NATS client: connection, pub/sub, request/reply, and JetStream.
Definition client.hpp:73