20#ifndef ORES_REFDATA_CORE_MESSAGING_PARTY_HANDLER_HPP
21#define ORES_REFDATA_CORE_MESSAGING_PARTY_HANDLER_HPP
24#include <boost/uuid/string_generator.hpp>
25#include "ores.logging/make_logger.hpp"
26#include "ores.nats/domain/message.hpp"
27#include "ores.nats/service/client.hpp"
28#include "ores.database/domain/context.hpp"
29#include "ores.database/service/tenant_context.hpp"
30#include "ores.security/jwt/jwt_authenticator.hpp"
31#include "ores.service/messaging/handler_helpers.hpp"
32#include "ores.service/messaging/workflow_helpers.hpp"
33#include "ores.service/service/request_context.hpp"
34#include "ores.refdata.api/messaging/party_protocol.hpp"
35#include "ores.refdata.core/service/party_service.hpp"
37namespace ores::refdata::messaging {
40inline auto& party_handler_lg() {
41 static auto instance = ores::logging::make_logger(
42 "ores.refdata.messaging.party_handler");
47using ores::service::messaging::reply;
48using ores::service::messaging::decode;
49using ores::service::messaging::error_reply;
50using ores::service::messaging::has_permission;
51using ores::service::messaging::log_handler_entry;
58 std::optional<ores::security::jwt::jwt_authenticator> verifier)
59 : nats_(nats), ctx_(
std::move(ctx)), verifier_(
std::move(verifier)) {}
62 [[maybe_unused]]
const auto correlation_id =
63 log_handler_entry(party_handler_lg(), msg);
64 auto ctx_expected = ores::service::service::make_request_context(
65 ctx_, msg, verifier_);
67 error_reply(nats_, msg, ctx_expected.error());
70 const auto& ctx = *ctx_expected;
71 service::party_service svc(ctx);
72 get_parties_response resp;
73 auto req = decode<get_parties_request>(msg);
75 BOOST_LOG_SEV(party_handler_lg(), warn)
76 <<
"Failed to decode: " << msg.
subject;
77 reply(nats_, msg, resp);
81 resp.parties = svc.list_parties(
82 static_cast<std::uint32_t
>(req->offset),
83 static_cast<std::uint32_t
>(req->limit));
84 resp.total_available_count =
85 static_cast<int>(svc.count_parties());
86 BOOST_LOG_SEV(party_handler_lg(), debug)
88 }
catch (
const std::exception& e) {
89 BOOST_LOG_SEV(party_handler_lg(), error)
90 << msg.
subject <<
" failed: " << e.what();
92 reply(nats_, msg, resp);
96 using ores::service::messaging::is_workflow_command;
97 using ores::service::messaging::extract_workflow_header;
98 using ores::service::messaging::publish_step_completion;
99 using ores::service::messaging::workflow_step_id_header;
100 using ores::service::messaging::workflow_instance_id_header;
101 using ores::service::messaging::workflow_tenant_id_header;
104 if (is_workflow_command(msg)) {
105 const auto step_id = extract_workflow_header(msg, workflow_step_id_header);
106 const auto inst_id = extract_workflow_header(msg, workflow_instance_id_header);
107 const auto tenant_id = extract_workflow_header(msg, workflow_tenant_id_header);
109 auto req = decode<save_party_request>(msg);
111 publish_step_completion(nats_, step_id, inst_id,
false,
"",
112 "Failed to decode save_party_request");
117 auto wf_ctx = tenant_context::with_tenant(ctx_, tenant_id);
118 service::party_service svc(wf_ctx);
119 svc.save_party(req->data);
120 BOOST_LOG_SEV(party_handler_lg(), debug)
121 <<
"Workflow step completed: " << msg.
subject;
122 publish_step_completion(nats_, step_id, inst_id,
true,
123 rfl::json::write(save_party_response{.success =
true}),
"");
124 }
catch (
const std::exception& e) {
125 BOOST_LOG_SEV(party_handler_lg(), error)
126 <<
"Workflow step failed: " << msg.
subject
127 <<
" — " << e.what();
128 publish_step_completion(nats_, step_id, inst_id,
false,
"", e.what());
133 [[maybe_unused]]
const auto correlation_id =
134 log_handler_entry(party_handler_lg(), msg);
135 auto ctx_expected = ores::service::service::make_request_context(
136 ctx_, msg, verifier_);
138 error_reply(nats_, msg, ctx_expected.error());
141 const auto& ctx = *ctx_expected;
142 if (!has_permission(ctx,
"refdata::parties:write")) {
146 service::party_service svc(ctx);
147 auto req = decode<save_party_request>(msg);
149 BOOST_LOG_SEV(party_handler_lg(), warn)
150 <<
"Failed to decode: " << msg.
subject;
154 svc.save_party(req->data);
155 BOOST_LOG_SEV(party_handler_lg(), debug)
156 <<
"Completed " << msg.
subject;
157 reply(nats_, msg, save_party_response{.success =
true});
158 }
catch (
const std::exception& e) {
159 BOOST_LOG_SEV(party_handler_lg(), error)
160 << msg.
subject <<
" failed: " << e.what();
161 reply(nats_, msg, save_party_response{
162 .success =
false, .message = e.what()});
167 [[maybe_unused]]
const auto correlation_id =
168 log_handler_entry(party_handler_lg(), msg);
169 auto ctx_expected = ores::service::service::make_request_context(
170 ctx_, msg, verifier_);
172 error_reply(nats_, msg, ctx_expected.error());
175 const auto& ctx = *ctx_expected;
176 if (!has_permission(ctx,
"refdata::parties:delete")) {
180 service::party_service svc(ctx);
181 auto req = decode<delete_party_request>(msg);
183 BOOST_LOG_SEV(party_handler_lg(), warn)
184 <<
"Failed to decode: " << msg.
subject;
188 boost::uuids::string_generator gen;
189 for (
const auto& id_str : req->ids)
190 svc.remove_party(gen(id_str));
191 BOOST_LOG_SEV(party_handler_lg(), debug)
192 <<
"Completed " << msg.
subject;
193 reply(nats_, msg, delete_party_response{.success =
true});
194 }
catch (
const std::exception& e) {
195 BOOST_LOG_SEV(party_handler_lg(), error)
196 << msg.
subject <<
" failed: " << e.what();
197 reply(nats_, msg, delete_party_response{
198 .success =
false, .message = e.what()});
203 [[maybe_unused]]
const auto correlation_id =
204 log_handler_entry(party_handler_lg(), msg);
205 auto ctx_expected = ores::service::service::make_request_context(
206 ctx_, msg, verifier_);
208 error_reply(nats_, msg, ctx_expected.error());
211 const auto& ctx = *ctx_expected;
212 service::party_service svc(ctx);
213 auto req = decode<get_party_history_request>(msg);
215 BOOST_LOG_SEV(party_handler_lg(), warn)
216 <<
"Failed to decode: " << msg.
subject;
220 boost::uuids::string_generator gen;
221 auto h = svc.get_party_history(gen(req->id));
222 BOOST_LOG_SEV(party_handler_lg(), debug)
223 <<
"Completed " << msg.
subject;
224 reply(nats_, msg, get_party_history_response{
225 .success =
true, .history = std::move(h)});
226 }
catch (
const std::exception& e) {
227 BOOST_LOG_SEV(party_handler_lg(), error)
228 << msg.
subject <<
" failed: " << e.what();
229 reply(nats_, msg, get_party_history_response{
230 .success =
false, .message = e.what()});
237 std::optional<ores::security::jwt::jwt_authenticator> verifier_;
Implements logging infrastructure for ORE Studio.
Definition boost_severity.hpp:28
@ forbidden
The caller is authenticated but lacks the required permission.
Context for the operations on a postgres database.
Definition context.hpp:47
Manages tenant context for multi-tenant database operations.
Definition tenant_context.hpp:37
A received NATS message.
Definition message.hpp:40
std::string subject
The subject the message was published to.
Definition message.hpp:44
NATS client: connection, pub/sub, request/reply, and JetStream.
Definition client.hpp:73