20#ifndef ORES_DQ_CORE_MESSAGING_PUBLICATION_HANDLER_HPP
21#define ORES_DQ_CORE_MESSAGING_PUBLICATION_HANDLER_HPP
25#include <boost/uuid/string_generator.hpp>
26#include "ores.nats/domain/message.hpp"
27#include "ores.nats/service/client.hpp"
28#include "ores.database/domain/context.hpp"
29#include "ores.security/jwt/jwt_authenticator.hpp"
30#include "ores.service/messaging/handler_helpers.hpp"
31#include "ores.service/service/request_context.hpp"
32#include "ores.dq.api/messaging/publication_protocol.hpp"
33#include "ores.dq.api/messaging/publish_bundle_protocol.hpp"
34#include "ores.dq.core/service/publication_service.hpp"
35#include "ores.logging/make_logger.hpp"
37namespace ores::dq::messaging {
39using ores::service::messaging::reply;
40using ores::service::messaging::decode;
41using ores::service::messaging::error_reply;
42using ores::service::messaging::has_permission;
46inline auto& publication_handler_lg() {
47 static auto instance = ores::logging::make_logger(
"ores.dq.messaging.publication_handler");
52class publication_handler {
57 std::optional<ores::security::jwt::jwt_authenticator> verifier)
58 : nats_(nats), ctx_(
std::move(ctx)), verifier_(
std::move(verifier)) {}
61 BOOST_LOG_SEV(publication_handler_lg(), debug) <<
"Handling " << msg.
subject;
62 auto req = decode<get_publications_request>(msg);
64 BOOST_LOG_SEV(publication_handler_lg(), warn) <<
"Failed to decode: " << msg.
subject;
67 auto ctx_expected = ores::service::service::make_request_context(
68 ctx_, msg, verifier_);
70 error_reply(nats_, msg, ctx_expected.error());
73 const auto& ctx = *ctx_expected;
74 service::publication_service svc(ctx);
76 boost::uuids::string_generator gen;
78 svc.get_publication_history(gen(req->dataset_id));
79 get_publications_response resp;
81 resp.publications = pubs;
82 BOOST_LOG_SEV(publication_handler_lg(), debug) <<
"Completed " << msg.
subject;
83 reply(nats_, msg, resp);
84 }
catch (
const std::exception& e) {
85 BOOST_LOG_SEV(publication_handler_lg(), error) << msg.
subject <<
" failed: " << e.what();
86 get_publications_response resp;
88 resp.message = e.what();
89 reply(nats_, msg, resp);
94 BOOST_LOG_SEV(publication_handler_lg(), debug) <<
"Handling " << msg.
subject;
95 auto req = decode<publish_bundle_request>(msg);
97 BOOST_LOG_SEV(publication_handler_lg(), warn) <<
"Failed to decode: " << msg.
subject;
100 auto ctx_expected = ores::service::service::make_request_context(
101 ctx_, msg, verifier_);
103 error_reply(nats_, msg, ctx_expected.error());
106 const auto& ctx = *ctx_expected;
107 if (!has_permission(ctx,
"dq::datasets:write")) {
111 service::publication_service svc(ctx);
113 const auto result = svc.publish_bundle(
119 publish_bundle_response resp;
120 resp.success = result.success;
121 resp.error_message = result.error_message;
122 resp.datasets_succeeded =
123 static_cast<int>(result.datasets_succeeded);
124 resp.total_records_inserted =
125 static_cast<int>(result.total_records_inserted);
126 resp.total_records_updated =
127 static_cast<int>(result.total_records_updated);
128 for (
const auto& dr : result.dataset_results) {
129 bundle_dataset_result bdr;
130 bdr.dataset_code = dr.dataset_code;
131 bdr.success = (dr.status ==
"success");
132 bdr.error_message = dr.error_message;
133 bdr.records_inserted =
134 static_cast<int>(dr.records_inserted);
135 bdr.records_updated =
136 static_cast<int>(dr.records_updated);
137 bdr.records_skipped =
138 static_cast<int>(dr.records_skipped);
139 bdr.records_deleted =
140 static_cast<int>(dr.records_deleted);
141 resp.dataset_results.push_back(std::move(bdr));
143 BOOST_LOG_SEV(publication_handler_lg(), debug) <<
"Completed " << msg.
subject;
144 reply(nats_, msg, resp);
145 }
catch (
const std::exception& e) {
146 BOOST_LOG_SEV(publication_handler_lg(), error) << msg.
subject <<
" failed: " << e.what();
147 publish_bundle_response resp;
148 resp.success =
false;
149 resp.error_message = e.what();
150 reply(nats_, msg, resp);
158 std::optional<ores::security::jwt::jwt_authenticator> verifier_;
Implements logging infrastructure for ORE Studio.
Definition boost_severity.hpp:28
@ forbidden
The caller is authenticated but lacks the required permission.
Context for the operations on a postgres database.
Definition context.hpp:47
A received NATS message.
Definition message.hpp:40
std::string subject
The subject the message was published to.
Definition message.hpp:44
NATS client: connection, pub/sub, request/reply, and JetStream.
Definition client.hpp:73