20#ifndef ORES_DQ_CORE_MESSAGING_LEI_ENTITY_HANDLER_HPP
21#define ORES_DQ_CORE_MESSAGING_LEI_ENTITY_HANDLER_HPP
24#include <rfl/json.hpp>
25#include "ores.nats/domain/message.hpp"
26#include "ores.nats/service/client.hpp"
27#include "ores.database/domain/context.hpp"
28#include "ores.security/jwt/jwt_authenticator.hpp"
29#include "ores.service/messaging/handler_helpers.hpp"
30#include "ores.service/service/request_context.hpp"
31#include "ores.dq.api/messaging/lei_entity_summary_protocol.hpp"
32#include "ores.database/repository/bitemporal_operations.hpp"
33#include "ores.logging/make_logger.hpp"
35namespace ores::dq::messaging {
37using ores::service::messaging::reply;
38using ores::service::messaging::decode;
39using ores::service::messaging::error_reply;
// Accessor for the logger used by the LEI entity handler. The logger is a
// function-local static created by ores::logging::make_logger with the
// channel name given below.
// NOTE(review): the rest of this function (return statement and closing
// brace) is not visible in this listing.
43inline auto& lei_entity_handler_lg() {
44 static auto instance = ores::logging::make_logger(
"ores.dq.messaging.lei_entity_handler");
// Handler class for LEI entity messages. It keeps a NATS client (nats_), a
// database context (ctx_) and an optional JWT authenticator (verifier_).
49class lei_entity_handler {
// Visible tail of the constructor: the last parameter is the optional JWT
// authenticator. ctx and verifier are moved into their members; nats_ is
// initialised directly from nats.
// NOTE(review): the leading parameters (nats, ctx) are declared on lines
// that this listing omits.
54 std::optional<ores::security::jwt::jwt_authenticator> verifier)
55 : nats_(nats), ctx_(
std::move(ctx)), verifier_(
std::move(verifier)) {}
// Body of the request handler for get_lei_entities_summary_request messages.
// NOTE(review): the method signature, several guards (if/else/try) and some
// closing braces are on lines this listing omits; comments below describe
// only what the visible statements do.
//
// Trace the subject of the incoming message at debug severity.
58 BOOST_LOG_SEV(lei_entity_handler_lg(), debug) <<
"Handling " << msg.
subject;
// View the payload bytes as characters without copying them. The view
// refers to msg.data's storage, so it is only valid while msg is alive.
59 const std::string_view data(
60 reinterpret_cast<const char*
>(msg.
data.data()), msg.
data.size());
// Decode the JSON payload into the request structure.
61 const auto parsed = rfl::json::read<get_lei_entities_summary_request>(data);
// Decode-failure path: log the error and answer with success = false and a
// "Failed to decode request: ..." message.
// NOTE(review): the condition guarding this path is not visible here;
// presumably it checks that `parsed` holds an error.
// NOTE(review): the raw payload is written to the error log; confirm this is
// acceptable for untrusted or sensitive input.
63 const auto err = parsed.error().what();
64 BOOST_LOG_SEV(lei_entity_handler_lg(), error)
65 <<
"Failed to decode " << msg.
subject
67 <<
" (payload: " << data <<
")";
68 get_lei_entities_summary_response err_resp;
69 err_resp.success =
false;
70 err_resp.error_message = std::string(
"Failed to decode request: ") + err;
71 reply(nats_, msg, err_resp);
// Successfully decoded request.
74 const auto& req = *parsed;
// Build the per-request context from the stored database context, the
// message and the optional JWT authenticator.
75 auto ctx_expected = ores::service::service::make_request_context(
76 ctx_, msg, verifier_);
// Context-failure path: reply with the error carried by ctx_expected.
// NOTE(review): the guard for this path is not visible; presumably it is
// taken only when ctx_expected holds an error.
78 error_reply(nats_, msg, ctx_expected.error());
81 const auto& ctx = *ctx_expected;
82 get_lei_entities_summary_response resp;
// No country filter: list the distinct countries. Each row whose first
// column is present and non-null yields an entity with only `country` set.
85 if (req.country_filter.empty()) {
86 const std::string sql =
87 "SELECT * FROM ores_dq_lei_entities_distinct_countries_fn()";
88 auto rows = execute_raw_multi_column_query(
89 ctx, sql, lei_entity_handler_lg(),
90 "listing LEI countries");
91 for (
const auto& row : rows) {
92 if (!row.empty() && row[0])
93 resp.entities.push_back({.country = *row[0]});
// Filtered listing through a parameterized query with three placeholders.
// NOTE(review): the branch keyword and the first bound arguments are not
// visible; presumably this is the else-branch for a non-empty
// country_filter, with the filter bound as $1.
96 const std::string sql =
97 "SELECT * FROM ores_dq_lei_entities_summary_by_country_fn($1, $2, $3)";
// limit and offset are passed to the query as text via std::to_string.
98 auto rows = execute_parameterized_multi_column_query(
101 std::to_string(req.limit),
102 std::to_string(req.offset)},
103 lei_entity_handler_lg(),
104 "listing LEI entities by country");
105 for (
const auto& row : rows) {
// Columns 0..3 map to lei, entity_legal_name, entity_category and country;
// null columns become empty strings.
// NOTE(review): row[0]..row[3] are indexed here; whether a row-size check
// precedes this statement is not visible in this listing. Confirm.
107 resp.entities.push_back({
108 .lei = row[0].value_or(
""),
109 .entity_legal_name = row[1].value_or(
""),
110 .entity_category = row[2].value_or(
""),
111 .country = row[3].value_or(
"")});
// Any std::exception caught here is logged and turned into a failed
// response carrying e.what().
// NOTE(review): the matching `try` is not visible; presumably it encloses
// the query section above.
115 }
catch (
const std::exception& e) {
116 BOOST_LOG_SEV(lei_entity_handler_lg(), error)
117 << msg.
subject <<
" failed: " << e.what();
118 resp.success =
false;
119 resp.error_message = e.what();
// Trace completion and send the response back on the NATS message.
121 BOOST_LOG_SEV(lei_entity_handler_lg(), debug) <<
"Completed " << msg.
subject;
122 reply(nats_, msg, resp);
// Optional JWT authenticator; initialised by the constructor and passed to
// make_request_context when a message is handled.
129 std::optional<ores::security::jwt::jwt_authenticator> verifier_;
Repository infrastructure and bitemporal operations.
Definition bitemporal_operations.hpp:31
Implements logging infrastructure for ORE Studio.
Definition boost_severity.hpp:28
Context for the operations on a postgres database.
Definition context.hpp:47
A received NATS message.
Definition message.hpp:40
std::string subject
The subject the message was published to.
Definition message.hpp:44
std::vector< std::byte > data
The message payload bytes.
Definition message.hpp:58
NATS client: connection, pub/sub, request/reply, and JetStream.
Definition client.hpp:73