20#ifndef ORES_DQ_CORE_MESSAGING_DATA_ORGANIZATION_HANDLER_HPP
21#define ORES_DQ_CORE_MESSAGING_DATA_ORGANIZATION_HANDLER_HPP
25#include <boost/uuid/string_generator.hpp>
26#include "ores.nats/domain/message.hpp"
27#include "ores.nats/service/client.hpp"
28#include "ores.database/domain/context.hpp"
29#include "ores.security/jwt/jwt_authenticator.hpp"
30#include "ores.service/messaging/handler_helpers.hpp"
31#include "ores.service/service/request_context.hpp"
32#include "ores.dq.api/messaging/data_organization_protocol.hpp"
33#include "ores.dq.core/service/data_organization_service.hpp"
34#include "ores.dq.core/service/dataset_service.hpp"
35#include "ores.logging/make_logger.hpp"
37namespace ores::dq::messaging {
39using ores::service::messaging::reply;
40using ores::service::messaging::decode;
41using ores::service::messaging::stamp;
42using ores::service::messaging::error_reply;
43using ores::service::messaging::has_permission;
// Logger accessor used by every BOOST_LOG_SEV call in this handler.
// The visible lines create a function-local static logger through
// ores::logging::make_logger, using the channel name given below.
// NOTE(review): the return statement and closing brace are not visible in
// this listing.
47inline auto& data_organization_handler_lg() {
48 static auto instance = ores::logging::make_logger(
"ores.dq.messaging.data_organization_handler");
// Message handler class for the data-organization requests decoded in this
// file (catalogs, data domains, methodologies and subject areas).
53class data_organization_handler {
// Constructor: initialises nats_ from nats, and moves ctx and verifier into
// ctx_ and verifier_.
// NOTE(review): the declarations of the nats and ctx parameters are not
// visible in this listing; only the verifier parameter is.
55 data_organization_handler(
58 std::optional<ores::security::jwt::jwt_authenticator> verifier)
59 : nats_(nats), ctx_(
std::move(ctx)), verifier_(
std::move(verifier)) {}
// Handler body for get_catalogs_request.
// NOTE(review): this listing omits several original lines (the method
// signature, the guards/returns and the try opener), so only the visible
// statements are described.
// Trace the incoming subject at debug level.
66 BOOST_LOG_SEV(data_organization_handler_lg(), debug) <<
"Handling " << msg.
subject;
// Decode the request payload from the message.
67 auto req = decode<get_catalogs_request>(msg);
// Warning for the decode-failure path (its guard is not visible here).
69 BOOST_LOG_SEV(data_organization_handler_lg(), warn) <<
"Failed to decode: " << msg.
subject;
// Build the per-request context from ctx_, the message and verifier_.
72 auto ctx_expected = ores::service::service::make_request_context(
73 ctx_, msg, verifier_);
// Send the context error back to the requester (guard not visible here).
75 error_reply(nats_, msg, ctx_expected.error());
78 const auto& ctx = *ctx_expected;
79 service::data_organization_service svc(ctx);
// List catalogs using the request's offset and limit, each cast to
// std::uint32_t.
81 const auto items = svc.list_catalogs(
82 static_cast<std::uint32_t
>(req->offset),
83 static_cast<std::uint32_t
>(req->limit));
// The count comes from a separate service call and is narrowed to int.
84 const auto count = svc.get_catalog_count();
85 get_catalogs_response resp;
86 resp.catalogs = items;
87 resp.total_available_count =
static_cast<int>(count);
88 BOOST_LOG_SEV(data_organization_handler_lg(), debug) <<
"Completed " << msg.
subject;
89 reply(nats_, msg, resp);
90 }
catch (
const std::exception& e) {
// On std::exception: log the error and still reply, with the count set to 0.
// No error text is copied into the response here.
91 BOOST_LOG_SEV(data_organization_handler_lg(), error) << msg.
subject <<
" failed: " << e.what();
92 get_catalogs_response resp;
93 resp.total_available_count = 0;
94 reply(nats_, msg, resp);
// Handler body for save_catalog_request.
// NOTE(review): this listing omits several original lines (the method
// signature, the guards/returns and the try opener), so only the visible
// statements are described.
// Trace the incoming subject at debug level.
99 BOOST_LOG_SEV(data_organization_handler_lg(), debug) <<
"Handling " << msg.
subject;
// Decode the request payload from the message.
100 auto req = decode<save_catalog_request>(msg);
// Warning for the decode-failure path (its guard is not visible here).
102 BOOST_LOG_SEV(data_organization_handler_lg(), warn) <<
"Failed to decode: " << msg.
subject;
// Build the per-request context from ctx_, the message and verifier_.
105 auto ctx_expected = ores::service::service::make_request_context(
106 ctx_, msg, verifier_);
// Send the context error back to the requester (guard not visible here).
108 error_reply(nats_, msg, ctx_expected.error());
111 const auto& ctx = *ctx_expected;
// Permission gate for "dq::catalogs:write"; the body of the failure branch
// is not visible in this listing.
112 if (!has_permission(ctx,
"dq::catalogs:write")) {
116 service::data_organization_service svc(ctx);
// stamp() is applied to the payload with the request context before saving.
// NOTE(review): presumably fills audit fields - confirm in handler_helpers.hpp.
118 stamp(req->data, ctx);
119 svc.save_catalog(req->data);
120 BOOST_LOG_SEV(data_organization_handler_lg(), debug) <<
"Completed " << msg.
subject;
// Completion reply: first field true, second field empty.
121 reply(nats_, msg, save_catalog_response{
true, {}});
122 }
catch (
const std::exception& e) {
// On std::exception: log the error and reply with false plus e.what().
123 BOOST_LOG_SEV(data_organization_handler_lg(), error) << msg.
subject <<
" failed: " << e.what();
124 reply(nats_, msg, save_catalog_response{
false, e.what()});
// Handler body for delete_catalog_request.
// NOTE(review): this listing omits several original lines (the method
// signature, the guards/returns and the try opener), so only the visible
// statements are described.
// Trace the incoming subject at debug level.
129 BOOST_LOG_SEV(data_organization_handler_lg(), debug) <<
"Handling " << msg.
subject;
// Decode the request payload from the message.
130 auto req = decode<delete_catalog_request>(msg);
// Warning for the decode-failure path (its guard is not visible here).
132 BOOST_LOG_SEV(data_organization_handler_lg(), warn) <<
"Failed to decode: " << msg.
subject;
// Build the per-request context from ctx_, the message and verifier_.
135 auto ctx_expected = ores::service::service::make_request_context(
136 ctx_, msg, verifier_);
// Send the context error back to the requester (guard not visible here).
138 error_reply(nats_, msg, ctx_expected.error());
141 const auto& ctx = *ctx_expected;
// Permission gate for "dq::catalogs:delete"; the body of the failure branch
// is not visible in this listing.
142 if (!has_permission(ctx,
"dq::catalogs:delete")) {
146 service::data_organization_service svc(ctx);
// Each code in the request is removed with its own service call.
148 for (
const auto& code : req->codes)
149 svc.remove_catalog(code);
150 BOOST_LOG_SEV(data_organization_handler_lg(), debug) <<
"Completed " << msg.
subject;
// Completion reply: first field true, second field empty.
151 reply(nats_, msg, delete_catalog_response{
true, {}});
152 }
catch (
const std::exception& e) {
// On std::exception: log the error and reply with false plus e.what().
153 BOOST_LOG_SEV(data_organization_handler_lg(), error) << msg.
subject <<
" failed: " << e.what();
154 reply(nats_, msg, delete_catalog_response{
false, e.what()});
// Handler body for get_catalog_history_request.
// NOTE(review): this listing omits several original lines (the method
// signature, the guards/returns and the try opener), so only the visible
// statements are described.
// Trace the incoming subject at debug level.
159 BOOST_LOG_SEV(data_organization_handler_lg(), debug) <<
"Handling " << msg.
subject;
// Decode the request payload from the message.
160 auto req = decode<get_catalog_history_request>(msg);
// Warning for the decode-failure path (its guard is not visible here).
162 BOOST_LOG_SEV(data_organization_handler_lg(), warn) <<
"Failed to decode: " << msg.
subject;
// Build the per-request context from ctx_, the message and verifier_.
165 auto ctx_expected = ores::service::service::make_request_context(
166 ctx_, msg, verifier_);
// Send the context error back to the requester (guard not visible here).
168 error_reply(nats_, msg, ctx_expected.error());
171 const auto& ctx = *ctx_expected;
172 service::data_organization_service svc(ctx);
// History is looked up by the catalog code carried in the request.
174 const auto history = svc.get_catalog_history(req->code);
175 get_catalog_history_response resp;
// NOTE(review): one original line between the two statements around this
// comment is missing from the listing.
177 resp.history = history;
178 BOOST_LOG_SEV(data_organization_handler_lg(), debug) <<
"Completed " << msg.
subject;
179 reply(nats_, msg, resp);
180 }
catch (
const std::exception& e) {
// On std::exception: log the error, then reply with success = false and the
// exception text as the message.
181 BOOST_LOG_SEV(data_organization_handler_lg(), error) << msg.
subject <<
" failed: " << e.what();
182 get_catalog_history_response resp;
183 resp.success =
false;
184 resp.message = e.what();
185 reply(nats_, msg, resp);
// Handler body for get_data_domains_request.
// NOTE(review): this listing omits several original lines (the method
// signature, the guards/returns and the try opener), so only the visible
// statements are described.
// Trace the incoming subject at debug level.
194 BOOST_LOG_SEV(data_organization_handler_lg(), debug) <<
"Handling " << msg.
subject;
// Decode the request payload from the message.
195 auto req = decode<get_data_domains_request>(msg);
// Warning for the decode-failure path (its guard is not visible here).
197 BOOST_LOG_SEV(data_organization_handler_lg(), warn) <<
"Failed to decode: " << msg.
subject;
// Build the per-request context from ctx_, the message and verifier_.
200 auto ctx_expected = ores::service::service::make_request_context(
201 ctx_, msg, verifier_);
// Send the context error back to the requester (guard not visible here).
203 error_reply(nats_, msg, ctx_expected.error());
206 const auto& ctx = *ctx_expected;
207 service::data_organization_service svc(ctx);
// No offset/limit is passed here; the reported count is the size of the
// returned list, narrowed to int.
209 const auto items = svc.list_data_domains();
210 get_data_domains_response resp;
211 resp.domains = items;
212 resp.total_available_count =
static_cast<int>(items.size());
213 BOOST_LOG_SEV(data_organization_handler_lg(), debug) <<
"Completed " << msg.
subject;
214 reply(nats_, msg, resp);
215 }
catch (
const std::exception& e) {
// On std::exception: log the error and still reply, with the count set to 0.
// No error text is copied into the response here.
216 BOOST_LOG_SEV(data_organization_handler_lg(), error) << msg.
subject <<
" failed: " << e.what();
217 get_data_domains_response resp;
218 resp.total_available_count = 0;
219 reply(nats_, msg, resp);
// Handler body for save_data_domain_request.
// NOTE(review): this listing omits several original lines (the method
// signature, the guards/returns and the try opener), so only the visible
// statements are described.
// Trace the incoming subject at debug level.
224 BOOST_LOG_SEV(data_organization_handler_lg(), debug) <<
"Handling " << msg.
subject;
// Decode the request payload from the message.
225 auto req = decode<save_data_domain_request>(msg);
// Warning for the decode-failure path (its guard is not visible here).
227 BOOST_LOG_SEV(data_organization_handler_lg(), warn) <<
"Failed to decode: " << msg.
subject;
// Build the per-request context from ctx_, the message and verifier_.
230 auto ctx_expected = ores::service::service::make_request_context(
231 ctx_, msg, verifier_);
// Send the context error back to the requester (guard not visible here).
233 error_reply(nats_, msg, ctx_expected.error());
236 const auto& ctx = *ctx_expected;
// Permission gate for "dq::data_domains:write"; the body of the failure
// branch is not visible in this listing.
237 if (!has_permission(ctx,
"dq::data_domains:write")) {
241 service::data_organization_service svc(ctx);
// stamp() is applied to the payload with the request context before saving.
// NOTE(review): presumably fills audit fields - confirm in handler_helpers.hpp.
243 stamp(req->data, ctx);
244 svc.save_data_domain(req->data);
245 BOOST_LOG_SEV(data_organization_handler_lg(), debug) <<
"Completed " << msg.
subject;
// Completion reply: first field true, second field empty.
246 reply(nats_, msg, save_data_domain_response{
true, {}});
247 }
catch (
const std::exception& e) {
// On std::exception: log the error and reply with false plus e.what().
248 BOOST_LOG_SEV(data_organization_handler_lg(), error) << msg.
subject <<
" failed: " << e.what();
249 reply(nats_, msg, save_data_domain_response{
false, e.what()});
// Handler body for delete_data_domain_request.
// NOTE(review): this listing omits several original lines (the method
// signature, the guards/returns and the try opener), so only the visible
// statements are described.
// Trace the incoming subject at debug level.
254 BOOST_LOG_SEV(data_organization_handler_lg(), debug) <<
"Handling " << msg.
subject;
// Decode the request payload from the message.
255 auto req = decode<delete_data_domain_request>(msg);
// Warning for the decode-failure path (its guard is not visible here).
257 BOOST_LOG_SEV(data_organization_handler_lg(), warn) <<
"Failed to decode: " << msg.
subject;
// Build the per-request context from ctx_, the message and verifier_.
260 auto ctx_expected = ores::service::service::make_request_context(
261 ctx_, msg, verifier_);
// Send the context error back to the requester (guard not visible here).
263 error_reply(nats_, msg, ctx_expected.error());
266 const auto& ctx = *ctx_expected;
// Permission gate for "dq::data_domains:delete"; the body of the failure
// branch is not visible in this listing.
267 if (!has_permission(ctx,
"dq::data_domains:delete")) {
271 service::data_organization_service svc(ctx);
// Each name in the request is removed with its own service call.
273 for (
const auto& name : req->names)
274 svc.remove_data_domain(name);
275 BOOST_LOG_SEV(data_organization_handler_lg(), debug) <<
"Completed " << msg.
subject;
// Completion reply: first field true, second field empty.
276 reply(nats_, msg, delete_data_domain_response{
true, {}});
277 }
catch (
const std::exception& e) {
// On std::exception: log the error and reply with false plus e.what().
278 BOOST_LOG_SEV(data_organization_handler_lg(), error) << msg.
subject <<
" failed: " << e.what();
279 reply(nats_, msg, delete_data_domain_response{
false, e.what()});
// Handler body for get_data_domain_history_request.
// NOTE(review): this listing omits several original lines (the method
// signature, the guards/returns and the try opener), so only the visible
// statements are described.
// Trace the incoming subject at debug level.
284 BOOST_LOG_SEV(data_organization_handler_lg(), debug) <<
"Handling " << msg.
subject;
// Decode the request payload from the message.
285 auto req = decode<get_data_domain_history_request>(msg);
// Warning for the decode-failure path (its guard is not visible here).
287 BOOST_LOG_SEV(data_organization_handler_lg(), warn) <<
"Failed to decode: " << msg.
subject;
// Build the per-request context from ctx_, the message and verifier_.
290 auto ctx_expected = ores::service::service::make_request_context(
291 ctx_, msg, verifier_);
// Send the context error back to the requester (guard not visible here).
293 error_reply(nats_, msg, ctx_expected.error());
296 const auto& ctx = *ctx_expected;
297 service::data_organization_service svc(ctx);
// History is looked up by the data-domain name carried in the request.
299 const auto history = svc.get_data_domain_history(req->name);
300 get_data_domain_history_response resp;
// NOTE(review): one original line between the two statements around this
// comment is missing from the listing.
302 resp.history = history;
303 BOOST_LOG_SEV(data_organization_handler_lg(), debug) <<
"Completed " << msg.
subject;
304 reply(nats_, msg, resp);
305 }
catch (
const std::exception& e) {
// On std::exception: log the error, then reply with success = false and the
// exception text as the message.
306 BOOST_LOG_SEV(data_organization_handler_lg(), error) << msg.
subject <<
" failed: " << e.what();
307 get_data_domain_history_response resp;
308 resp.success =
false;
309 resp.message = e.what();
310 reply(nats_, msg, resp);
// Handler body for get_methodologies_request.
// NOTE(review): this listing omits several original lines (the method
// signature, the guards/returns and the try opener), so only the visible
// statements are described.
// Trace the incoming subject at debug level.
319 BOOST_LOG_SEV(data_organization_handler_lg(), debug) <<
"Handling " << msg.
subject;
// Decode the request payload from the message.
320 auto req = decode<get_methodologies_request>(msg);
// Warning for the decode-failure path (its guard is not visible here).
322 BOOST_LOG_SEV(data_organization_handler_lg(), warn) <<
"Failed to decode: " << msg.
subject;
// Build the per-request context from ctx_, the message and verifier_.
325 auto ctx_expected = ores::service::service::make_request_context(
326 ctx_, msg, verifier_);
// Send the context error back to the requester (guard not visible here).
328 error_reply(nats_, msg, ctx_expected.error());
331 const auto& ctx = *ctx_expected;
// Methodologies are served by dataset_service rather than
// data_organization_service.
332 service::dataset_service svc(ctx);
// List methodologies using the request's offset and limit, each cast to
// std::uint32_t.
334 const auto items = svc.list_methodologies(
335 static_cast<std::uint32_t
>(req->offset),
336 static_cast<std::uint32_t
>(req->limit));
// The count comes from a separate service call and is narrowed to int.
337 const auto count = svc.get_methodology_count();
338 get_methodologies_response resp;
339 resp.methodologies = items;
340 resp.total_available_count =
static_cast<int>(count);
341 BOOST_LOG_SEV(data_organization_handler_lg(), debug) <<
"Completed " << msg.
subject;
342 reply(nats_, msg, resp);
343 }
catch (
const std::exception& e) {
// On std::exception: log the error and still reply, with the count set to 0.
// No error text is copied into the response here.
344 BOOST_LOG_SEV(data_organization_handler_lg(), error) << msg.
subject <<
" failed: " << e.what();
345 get_methodologies_response resp;
346 resp.total_available_count = 0;
347 reply(nats_, msg, resp);
// Handler body for save_methodology_request.
// NOTE(review): this listing omits several original lines (the method
// signature, the guards/returns and the try opener), so only the visible
// statements are described.
// Trace the incoming subject at debug level.
352 BOOST_LOG_SEV(data_organization_handler_lg(), debug) <<
"Handling " << msg.
subject;
// Decode the request payload from the message.
353 auto req = decode<save_methodology_request>(msg);
// Warning for the decode-failure path (its guard is not visible here).
355 BOOST_LOG_SEV(data_organization_handler_lg(), warn) <<
"Failed to decode: " << msg.
subject;
// Build the per-request context from ctx_, the message and verifier_.
358 auto ctx_expected = ores::service::service::make_request_context(
359 ctx_, msg, verifier_);
// Send the context error back to the requester (guard not visible here).
361 error_reply(nats_, msg, ctx_expected.error());
364 const auto& ctx = *ctx_expected;
// Permission gate for "dq::methodologies:write"; the body of the failure
// branch is not visible in this listing.
365 if (!has_permission(ctx,
"dq::methodologies:write")) {
// Methodologies are served by dataset_service rather than
// data_organization_service.
369 service::dataset_service svc(ctx);
// stamp() is applied to the payload with the request context before saving.
// NOTE(review): presumably fills audit fields - confirm in handler_helpers.hpp.
371 stamp(req->data, ctx);
372 svc.save_methodology(req->data);
373 BOOST_LOG_SEV(data_organization_handler_lg(), debug) <<
"Completed " << msg.
subject;
// Completion reply: first field true, second field empty.
374 reply(nats_, msg, save_methodology_response{
true, {}});
375 }
catch (
const std::exception& e) {
// On std::exception: log the error and reply with false plus e.what().
376 BOOST_LOG_SEV(data_organization_handler_lg(), error) << msg.
subject <<
" failed: " << e.what();
377 reply(nats_, msg, save_methodology_response{
false, e.what()});
// Handler body for delete_methodology_request.
// NOTE(review): this listing omits several original lines (the method
// signature, the guards/returns and the try opener), so only the visible
// statements are described.
// Trace the incoming subject at debug level.
382 BOOST_LOG_SEV(data_organization_handler_lg(), debug) <<
"Handling " << msg.
subject;
// Decode the request payload from the message.
383 auto req = decode<delete_methodology_request>(msg);
// Warning for the decode-failure path (its guard is not visible here).
385 BOOST_LOG_SEV(data_organization_handler_lg(), warn) <<
"Failed to decode: " << msg.
subject;
// Build the per-request context from ctx_, the message and verifier_.
388 auto ctx_expected = ores::service::service::make_request_context(
389 ctx_, msg, verifier_);
// Send the context error back to the requester (guard not visible here).
391 error_reply(nats_, msg, ctx_expected.error());
394 const auto& ctx = *ctx_expected;
// Permission gate for "dq::methodologies:delete"; the body of the failure
// branch is not visible in this listing.
395 if (!has_permission(ctx,
"dq::methodologies:delete")) {
// Methodologies are served by dataset_service rather than
// data_organization_service.
399 service::dataset_service svc(ctx);
// Each code string is converted to a UUID with boost::uuids::string_generator
// and removed with its own service call.
// NOTE(review): a conversion failure is expected to surface as an exception
// and land in the catch below - confirm for the Boost version in use.
401 for (
const auto& code : req->codes)
402 svc.remove_methodology(boost::uuids::string_generator{}(code));
403 BOOST_LOG_SEV(data_organization_handler_lg(), debug) <<
"Completed " << msg.
subject;
// Completion reply: first field true, second field empty.
404 reply(nats_, msg, delete_methodology_response{
true, {}});
405 }
catch (
const std::exception& e) {
// On std::exception: log the error and reply with false plus e.what().
406 BOOST_LOG_SEV(data_organization_handler_lg(), error) << msg.
subject <<
" failed: " << e.what();
407 reply(nats_, msg, delete_methodology_response{
false, e.what()});
// Handler body for get_methodology_history_request.
// NOTE(review): this listing omits several original lines (the method
// signature, the guards/returns and the try opener), so only the visible
// statements are described.
// Trace the incoming subject at debug level.
412 BOOST_LOG_SEV(data_organization_handler_lg(), debug) <<
"Handling " << msg.
subject;
// Decode the request payload from the message.
413 auto req = decode<get_methodology_history_request>(msg);
// Warning for the decode-failure path (its guard is not visible here).
415 BOOST_LOG_SEV(data_organization_handler_lg(), warn) <<
"Failed to decode: " << msg.
subject;
// Build the per-request context from ctx_, the message and verifier_.
418 auto ctx_expected = ores::service::service::make_request_context(
419 ctx_, msg, verifier_);
// Send the context error back to the requester (guard not visible here).
421 error_reply(nats_, msg, ctx_expected.error());
424 const auto& ctx = *ctx_expected;
// Methodologies are served by dataset_service rather than
// data_organization_service.
425 service::dataset_service svc(ctx);
// The request's code string is converted to a UUID with
// boost::uuids::string_generator before the history lookup.
427 const auto history = svc.get_methodology_history(
428 boost::uuids::string_generator{}(req->code));
429 get_methodology_history_response resp;
// NOTE(review): one original line between the two statements around this
// comment is missing from the listing.
431 resp.history = history;
432 BOOST_LOG_SEV(data_organization_handler_lg(), debug) <<
"Completed " << msg.
subject;
433 reply(nats_, msg, resp);
434 }
catch (
const std::exception& e) {
// On std::exception: log the error, then reply with success = false and the
// exception text as the message.
435 BOOST_LOG_SEV(data_organization_handler_lg(), error) << msg.
subject <<
" failed: " << e.what();
436 get_methodology_history_response resp;
437 resp.success =
false;
438 resp.message = e.what();
439 reply(nats_, msg, resp);
// Handler body for get_subject_areas_request.
// NOTE(review): this listing omits several original lines (the method
// signature, the guards/returns and the try opener), so only the visible
// statements are described.
// Trace the incoming subject at debug level.
448 BOOST_LOG_SEV(data_organization_handler_lg(), debug) <<
"Handling " << msg.
subject;
// Decode the request payload from the message.
449 auto req = decode<get_subject_areas_request>(msg);
// Warning for the decode-failure path (its guard is not visible here).
451 BOOST_LOG_SEV(data_organization_handler_lg(), warn) <<
"Failed to decode: " << msg.
subject;
// Build the per-request context from ctx_, the message and verifier_.
454 auto ctx_expected = ores::service::service::make_request_context(
455 ctx_, msg, verifier_);
// Send the context error back to the requester (guard not visible here).
457 error_reply(nats_, msg, ctx_expected.error());
460 const auto& ctx = *ctx_expected;
461 service::data_organization_service svc(ctx);
// List subject areas using the request's offset and limit, each cast to
// std::uint32_t.
463 const auto items = svc.list_subject_areas(
464 static_cast<std::uint32_t
>(req->offset),
465 static_cast<std::uint32_t
>(req->limit));
// The count comes from a separate service call and is narrowed to int.
466 const auto count = svc.get_subject_area_count();
467 get_subject_areas_response resp;
468 resp.subject_areas = items;
469 resp.total_available_count =
static_cast<int>(count);
470 BOOST_LOG_SEV(data_organization_handler_lg(), debug) <<
"Completed " << msg.
subject;
471 reply(nats_, msg, resp);
472 }
catch (
const std::exception& e) {
// On std::exception: log the error and still reply, with the count set to 0.
// No error text is copied into the response here.
473 BOOST_LOG_SEV(data_organization_handler_lg(), error) << msg.
subject <<
" failed: " << e.what();
474 get_subject_areas_response resp;
475 resp.total_available_count = 0;
476 reply(nats_, msg, resp);
// Handler body for save_subject_area_request.
// NOTE(review): this listing omits several original lines (the method
// signature, the guards/returns and the try opener), so only the visible
// statements are described.
// Trace the incoming subject at debug level.
481 BOOST_LOG_SEV(data_organization_handler_lg(), debug) <<
"Handling " << msg.
subject;
// Decode the request payload from the message.
482 auto req = decode<save_subject_area_request>(msg);
// Warning for the decode-failure path (its guard is not visible here).
484 BOOST_LOG_SEV(data_organization_handler_lg(), warn) <<
"Failed to decode: " << msg.
subject;
// Build the per-request context from ctx_, the message and verifier_.
487 auto ctx_expected = ores::service::service::make_request_context(
488 ctx_, msg, verifier_);
// Send the context error back to the requester (guard not visible here).
490 error_reply(nats_, msg, ctx_expected.error());
493 const auto& ctx = *ctx_expected;
// Permission gate for "dq::subject_areas:write"; the body of the failure
// branch is not visible in this listing.
494 if (!has_permission(ctx,
"dq::subject_areas:write")) {
498 service::data_organization_service svc(ctx);
// stamp() is applied to the payload with the request context before saving.
// NOTE(review): presumably fills audit fields - confirm in handler_helpers.hpp.
500 stamp(req->data, ctx);
501 svc.save_subject_area(req->data);
502 BOOST_LOG_SEV(data_organization_handler_lg(), debug) <<
"Completed " << msg.
subject;
// Completion reply: first field true, second field empty.
503 reply(nats_, msg, save_subject_area_response{
true, {}});
504 }
catch (
const std::exception& e) {
// On std::exception: log the error and reply with false plus e.what().
505 BOOST_LOG_SEV(data_organization_handler_lg(), error) << msg.
subject <<
" failed: " << e.what();
506 reply(nats_, msg, save_subject_area_response{
false, e.what()});
// Handler body for delete_subject_area_request.
// NOTE(review): this listing omits several original lines (the method
// signature, the guards/returns and the try opener), so only the visible
// statements are described.
// Trace the incoming subject at debug level.
511 BOOST_LOG_SEV(data_organization_handler_lg(), debug) <<
"Handling " << msg.
subject;
// Decode the request payload from the message.
512 auto req = decode<delete_subject_area_request>(msg);
// Warning for the decode-failure path (its guard is not visible here).
514 BOOST_LOG_SEV(data_organization_handler_lg(), warn) <<
"Failed to decode: " << msg.
subject;
// Build the per-request context from ctx_, the message and verifier_.
517 auto ctx_expected = ores::service::service::make_request_context(
518 ctx_, msg, verifier_);
// Send the context error back to the requester (guard not visible here).
520 error_reply(nats_, msg, ctx_expected.error());
523 const auto& ctx = *ctx_expected;
// Permission gate for "dq::subject_areas:delete"; the body of the failure
// branch is not visible in this listing.
524 if (!has_permission(ctx,
"dq::subject_areas:delete")) {
528 service::data_organization_service svc(ctx);
// Each key carries a name and a domain_name; every key is removed with its
// own service call.
530 for (
const auto& key : req->keys)
531 svc.remove_subject_area(key.name, key.domain_name);
532 BOOST_LOG_SEV(data_organization_handler_lg(), debug) <<
"Completed " << msg.
subject;
// Completion reply: first field true, second field empty.
533 reply(nats_, msg, delete_subject_area_response{
true, {}});
534 }
catch (
const std::exception& e) {
// On std::exception: log the error and reply with false plus e.what().
535 BOOST_LOG_SEV(data_organization_handler_lg(), error) << msg.
subject <<
" failed: " << e.what();
536 reply(nats_, msg, delete_subject_area_response{
false, e.what()});
// Handler body for get_subject_area_history_request.
// NOTE(review): this listing omits several original lines (the method
// signature, the guards/returns and the try opener), so only the visible
// statements are described.
// Trace the incoming subject at debug level.
541 BOOST_LOG_SEV(data_organization_handler_lg(), debug) <<
"Handling " << msg.
subject;
// Decode the request payload from the message.
542 auto req = decode<get_subject_area_history_request>(msg);
// Warning for the decode-failure path (its guard is not visible here).
544 BOOST_LOG_SEV(data_organization_handler_lg(), warn) <<
"Failed to decode: " << msg.
subject;
// Build the per-request context from ctx_, the message and verifier_.
547 auto ctx_expected = ores::service::service::make_request_context(
548 ctx_, msg, verifier_);
// Send the context error back to the requester (guard not visible here).
550 error_reply(nats_, msg, ctx_expected.error());
553 const auto& ctx = *ctx_expected;
554 service::data_organization_service svc(ctx);
// History is looked up by the request key's name and domain_name.
556 const auto history = svc.get_subject_area_history(
557 req->key.name, req->key.domain_name);
558 get_subject_area_history_response resp;
// NOTE(review): one original line between the two statements around this
// comment is missing from the listing.
560 resp.history = history;
561 BOOST_LOG_SEV(data_organization_handler_lg(), debug) <<
"Completed " << msg.
subject;
562 reply(nats_, msg, resp);
563 }
catch (
const std::exception& e) {
// On std::exception: log the error, then reply with success = false and the
// exception text as the message.
564 BOOST_LOG_SEV(data_organization_handler_lg(), error) << msg.
subject <<
" failed: " << e.what();
565 get_subject_area_history_response resp;
566 resp.success =
false;
567 resp.message = e.what();
568 reply(nats_, msg, resp);
// Optional JWT authenticator; the handlers pass it to make_request_context
// when building each request's context.
576 std::optional<ores::security::jwt::jwt_authenticator> verifier_;
Implements logging infrastructure for ORE Studio.
Definition boost_severity.hpp:28
@ forbidden
The caller is authenticated but lacks the required permission.
Context for the operations on a postgres database.
Definition context.hpp:47
A received NATS message.
Definition message.hpp:40
std::string subject
The subject the message was published to.
Definition message.hpp:44
NATS client: connection, pub/sub, request/reply, and JetStream.
Definition client.hpp:73