ORE Studio 0.0.4
Loading...
Searching...
No Matches
publication_handler.hpp
1/* -*- mode: c++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
2 *
3 * Copyright (C) 2026 Marco Craveiro <marco.craveiro@gmail.com>
4 *
5 * This program is free software; you can redistribute it and/or modify it under
6 * the terms of the GNU General Public License as published by the Free Software
7 * Foundation; either version 3 of the License, or (at your option) any later
8 * version.
9 *
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
13 * details.
14 *
15 * You should have received a copy of the GNU General Public License along with
16 * this program; if not, write to the Free Software Foundation, Inc., 51
17 * Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 *
19 */
20#ifndef ORES_DQ_CORE_MESSAGING_PUBLICATION_HANDLER_HPP
21#define ORES_DQ_CORE_MESSAGING_PUBLICATION_HANDLER_HPP
22
23#include <optional>
24#include <stdexcept>
25#include <boost/uuid/string_generator.hpp>
26#include "ores.nats/domain/message.hpp"
27#include "ores.nats/service/client.hpp"
28#include "ores.database/domain/context.hpp"
29#include "ores.security/jwt/jwt_authenticator.hpp"
30#include "ores.service/messaging/handler_helpers.hpp"
31#include "ores.service/service/request_context.hpp"
32#include "ores.dq.api/messaging/publication_protocol.hpp"
33#include "ores.dq.api/messaging/publish_bundle_protocol.hpp"
34#include "ores.dq.core/service/publication_service.hpp"
35#include "ores.logging/make_logger.hpp"
36
37namespace ores::dq::messaging {
38
39using ores::service::messaging::reply;
40using ores::service::messaging::decode;
41using ores::service::messaging::error_reply;
42using ores::service::messaging::has_permission;
43using namespace ores::logging;
44
45namespace {
46inline auto& publication_handler_lg() {
47 static auto instance = ores::logging::make_logger("ores.dq.messaging.publication_handler");
48 return instance;
49}
50} // namespace
51
52class publication_handler {
53public:
54 publication_handler(
57 std::optional<ores::security::jwt::jwt_authenticator> verifier)
58 : nats_(nats), ctx_(std::move(ctx)), verifier_(std::move(verifier)) {}
59
60 void list_publications(ores::nats::message msg) {
61 BOOST_LOG_SEV(publication_handler_lg(), debug) << "Handling " << msg.subject;
62 auto req = decode<get_publications_request>(msg);
63 if (!req) {
64 BOOST_LOG_SEV(publication_handler_lg(), warn) << "Failed to decode: " << msg.subject;
65 return;
66 }
67 auto ctx_expected = ores::service::service::make_request_context(
68 ctx_, msg, verifier_);
69 if (!ctx_expected) {
70 error_reply(nats_, msg, ctx_expected.error());
71 return;
72 }
73 const auto& ctx = *ctx_expected;
74 service::publication_service svc(ctx);
75 try {
76 boost::uuids::string_generator gen;
77 const auto pubs =
78 svc.get_publication_history(gen(req->dataset_id));
79 get_publications_response resp;
80 resp.success = true;
81 resp.publications = pubs;
82 BOOST_LOG_SEV(publication_handler_lg(), debug) << "Completed " << msg.subject;
83 reply(nats_, msg, resp);
84 } catch (const std::exception& e) {
85 BOOST_LOG_SEV(publication_handler_lg(), error) << msg.subject << " failed: " << e.what();
86 get_publications_response resp;
87 resp.success = false;
88 resp.message = e.what();
89 reply(nats_, msg, resp);
90 }
91 }
92
93 void publish_bundle(ores::nats::message msg) {
94 BOOST_LOG_SEV(publication_handler_lg(), debug) << "Handling " << msg.subject;
95 auto req = decode<publish_bundle_request>(msg);
96 if (!req) {
97 BOOST_LOG_SEV(publication_handler_lg(), warn) << "Failed to decode: " << msg.subject;
98 return;
99 }
100 auto ctx_expected = ores::service::service::make_request_context(
101 ctx_, msg, verifier_);
102 if (!ctx_expected) {
103 error_reply(nats_, msg, ctx_expected.error());
104 return;
105 }
106 const auto& ctx = *ctx_expected;
107 if (!has_permission(ctx, "dq::datasets:write")) {
108 error_reply(nats_, msg, ores::service::error_code::forbidden);
109 return;
110 }
111 service::publication_service svc(ctx);
112 try {
113 const auto result = svc.publish_bundle(
114 req->bundle_code,
115 req->mode,
116 req->published_by,
117 req->atomic,
118 req->params_json);
119 publish_bundle_response resp;
120 resp.success = result.success;
121 resp.error_message = result.error_message;
122 resp.datasets_succeeded =
123 static_cast<int>(result.datasets_succeeded);
124 resp.total_records_inserted =
125 static_cast<int>(result.total_records_inserted);
126 resp.total_records_updated =
127 static_cast<int>(result.total_records_updated);
128 for (const auto& dr : result.dataset_results) {
129 bundle_dataset_result bdr;
130 bdr.dataset_code = dr.dataset_code;
131 bdr.success = (dr.status == "success");
132 bdr.error_message = dr.error_message;
133 bdr.records_inserted =
134 static_cast<int>(dr.records_inserted);
135 bdr.records_updated =
136 static_cast<int>(dr.records_updated);
137 bdr.records_skipped =
138 static_cast<int>(dr.records_skipped);
139 bdr.records_deleted =
140 static_cast<int>(dr.records_deleted);
141 resp.dataset_results.push_back(std::move(bdr));
142 }
143 BOOST_LOG_SEV(publication_handler_lg(), debug) << "Completed " << msg.subject;
144 reply(nats_, msg, resp);
145 } catch (const std::exception& e) {
146 BOOST_LOG_SEV(publication_handler_lg(), error) << msg.subject << " failed: " << e.what();
147 publish_bundle_response resp;
148 resp.success = false;
149 resp.error_message = e.what();
150 reply(nats_, msg, resp);
151 }
152 }
153
154private:
155
158 std::optional<ores::security::jwt::jwt_authenticator> verifier_;
159};
160
161} // namespace ores::dq::messaging
162
163#endif
STL namespace.
Implements logging infrastructure for ORE Studio.
Definition boost_severity.hpp:28
@ forbidden
The caller is authenticated but lacks the required permission.
Context for the operations on a postgres database.
Definition context.hpp:47
A received NATS message.
Definition message.hpp:40
std::string subject
The subject the message was published to.
Definition message.hpp:44
NATS client: connection, pub/sub, request/reply, and JetStream.
Definition client.hpp:73