ORE Studio 0.0.4
Loading...
Searching...
No Matches
lei_entity_handler.hpp
1/* -*- mode: c++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
2 *
3 * Copyright (C) 2026 Marco Craveiro <marco.craveiro@gmail.com>
4 *
5 * This program is free software; you can redistribute it and/or modify it under
6 * the terms of the GNU General Public License as published by the Free Software
7 * Foundation; either version 3 of the License, or (at your option) any later
8 * version.
9 *
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
13 * details.
14 *
15 * You should have received a copy of the GNU General Public License along with
16 * this program; if not, write to the Free Software Foundation, Inc., 51
17 * Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 *
19 */
20#ifndef ORES_DQ_CORE_MESSAGING_LEI_ENTITY_HANDLER_HPP
21#define ORES_DQ_CORE_MESSAGING_LEI_ENTITY_HANDLER_HPP
22
23#include <optional>
24#include <rfl/json.hpp>
25#include "ores.nats/domain/message.hpp"
26#include "ores.nats/service/client.hpp"
27#include "ores.database/domain/context.hpp"
28#include "ores.security/jwt/jwt_authenticator.hpp"
29#include "ores.service/messaging/handler_helpers.hpp"
30#include "ores.service/service/request_context.hpp"
31#include "ores.dq.api/messaging/lei_entity_summary_protocol.hpp"
32#include "ores.database/repository/bitemporal_operations.hpp"
33#include "ores.logging/make_logger.hpp"
34
35namespace ores::dq::messaging {
36
37using ores::service::messaging::reply;
38using ores::service::messaging::decode;
39using ores::service::messaging::error_reply;
40using namespace ores::logging;
41
42namespace {
43inline auto& lei_entity_handler_lg() {
44 static auto instance = ores::logging::make_logger("ores.dq.messaging.lei_entity_handler");
45 return instance;
46}
47} // namespace
48
49class lei_entity_handler {
50public:
51 lei_entity_handler(
54 std::optional<ores::security::jwt::jwt_authenticator> verifier)
55 : nats_(nats), ctx_(std::move(ctx)), verifier_(std::move(verifier)) {}
56
57 void summary(ores::nats::message msg) {
58 BOOST_LOG_SEV(lei_entity_handler_lg(), debug) << "Handling " << msg.subject;
59 const std::string_view data(
60 reinterpret_cast<const char*>(msg.data.data()), msg.data.size());
61 const auto parsed = rfl::json::read<get_lei_entities_summary_request>(data);
62 if (!parsed) {
63 const auto err = parsed.error().what();
64 BOOST_LOG_SEV(lei_entity_handler_lg(), error)
65 << "Failed to decode " << msg.subject
66 << ": " << err
67 << " (payload: " << data << ")";
68 get_lei_entities_summary_response err_resp;
69 err_resp.success = false;
70 err_resp.error_message = std::string("Failed to decode request: ") + err;
71 reply(nats_, msg, err_resp);
72 return;
73 }
74 const auto& req = *parsed;
75 auto ctx_expected = ores::service::service::make_request_context(
76 ctx_, msg, verifier_);
77 if (!ctx_expected) {
78 error_reply(nats_, msg, ctx_expected.error());
79 return;
80 }
81 const auto& ctx = *ctx_expected;
82 get_lei_entities_summary_response resp;
83 try {
84 using namespace ores::database::repository;
85 if (req.country_filter.empty()) {
86 const std::string sql =
87 "SELECT * FROM ores_dq_lei_entities_distinct_countries_fn()";
88 auto rows = execute_raw_multi_column_query(
89 ctx, sql, lei_entity_handler_lg(),
90 "listing LEI countries");
91 for (const auto& row : rows) {
92 if (!row.empty() && row[0])
93 resp.entities.push_back({.country = *row[0]});
94 }
95 } else {
96 const std::string sql =
97 "SELECT * FROM ores_dq_lei_entities_summary_by_country_fn($1, $2, $3)";
98 auto rows = execute_parameterized_multi_column_query(
99 ctx, sql,
100 {req.country_filter,
101 std::to_string(req.limit),
102 std::to_string(req.offset)},
103 lei_entity_handler_lg(),
104 "listing LEI entities by country");
105 for (const auto& row : rows) {
106 if (row.size() >= 4)
107 resp.entities.push_back({
108 .lei = row[0].value_or(""),
109 .entity_legal_name = row[1].value_or(""),
110 .entity_category = row[2].value_or(""),
111 .country = row[3].value_or("")});
112 }
113 }
114 resp.success = true;
115 } catch (const std::exception& e) {
116 BOOST_LOG_SEV(lei_entity_handler_lg(), error)
117 << msg.subject << " failed: " << e.what();
118 resp.success = false;
119 resp.error_message = e.what();
120 }
121 BOOST_LOG_SEV(lei_entity_handler_lg(), debug) << "Completed " << msg.subject;
122 reply(nats_, msg, resp);
123 }
124
125private:
126
129 std::optional<ores::security::jwt::jwt_authenticator> verifier_;
130};
131
132} // namespace ores::dq::messaging
133
134#endif
STL namespace.
Repository infrastructure and bitemporal operations.
Definition bitemporal_operations.hpp:31
Implements logging infrastructure for ORE Studio.
Definition boost_severity.hpp:28
Context for the operations on a postgres database.
Definition context.hpp:47
A received NATS message.
Definition message.hpp:40
std::string subject
The subject the message was published to.
Definition message.hpp:44
std::vector< std::byte > data
The message payload bytes.
Definition message.hpp:58
NATS client: connection, pub/sub, request/reply, and JetStream.
Definition client.hpp:73