ORE Studio 0.0.4
Loading...
Searching...
No Matches
party_handler.hpp
1/* -*- mode: c++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
2 *
3 * Copyright (C) 2026 Marco Craveiro <marco.craveiro@gmail.com>
4 *
5 * This program is free software; you can redistribute it and/or modify it under
6 * the terms of the GNU General Public License as published by the Free Software
7 * Foundation; either version 3 of the License, or (at your option) any later
8 * version.
9 *
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
13 * details.
14 *
15 * You should have received a copy of the GNU General Public License along with
16 * this program; if not, write to the Free Software Foundation, Inc., 51
17 * Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 *
19 */
20#ifndef ORES_REFDATA_CORE_MESSAGING_PARTY_HANDLER_HPP
21#define ORES_REFDATA_CORE_MESSAGING_PARTY_HANDLER_HPP
22
23#include <optional>
24#include <boost/uuid/string_generator.hpp>
25#include "ores.logging/make_logger.hpp"
26#include "ores.nats/domain/message.hpp"
27#include "ores.nats/service/client.hpp"
28#include "ores.database/domain/context.hpp"
29#include "ores.database/service/tenant_context.hpp"
30#include "ores.security/jwt/jwt_authenticator.hpp"
31#include "ores.service/messaging/handler_helpers.hpp"
32#include "ores.service/messaging/workflow_helpers.hpp"
33#include "ores.service/service/request_context.hpp"
34#include "ores.refdata.api/messaging/party_protocol.hpp"
35#include "ores.refdata.core/service/party_service.hpp"
36
37namespace ores::refdata::messaging {
38
39namespace {
40inline auto& party_handler_lg() {
41 static auto instance = ores::logging::make_logger(
42 "ores.refdata.messaging.party_handler");
43 return instance;
44}
45} // namespace
46
47using ores::service::messaging::reply;
48using ores::service::messaging::decode;
49using ores::service::messaging::error_reply;
50using ores::service::messaging::has_permission;
51using ores::service::messaging::log_handler_entry;
52using namespace ores::logging;
53
54class party_handler {
55public:
56 party_handler(ores::nats::service::client& nats,
58 std::optional<ores::security::jwt::jwt_authenticator> verifier)
59 : nats_(nats), ctx_(std::move(ctx)), verifier_(std::move(verifier)) {}
60
61 void list(ores::nats::message msg) {
62 [[maybe_unused]] const auto correlation_id =
63 log_handler_entry(party_handler_lg(), msg);
64 auto ctx_expected = ores::service::service::make_request_context(
65 ctx_, msg, verifier_);
66 if (!ctx_expected) {
67 error_reply(nats_, msg, ctx_expected.error());
68 return;
69 }
70 const auto& ctx = *ctx_expected;
71 service::party_service svc(ctx);
72 get_parties_response resp;
73 auto req = decode<get_parties_request>(msg);
74 if (!req) {
75 BOOST_LOG_SEV(party_handler_lg(), warn)
76 << "Failed to decode: " << msg.subject;
77 reply(nats_, msg, resp);
78 return;
79 }
80 try {
81 resp.parties = svc.list_parties(
82 static_cast<std::uint32_t>(req->offset),
83 static_cast<std::uint32_t>(req->limit));
84 resp.total_available_count =
85 static_cast<int>(svc.count_parties());
86 BOOST_LOG_SEV(party_handler_lg(), debug)
87 << "Completed " << msg.subject;
88 } catch (const std::exception& e) {
89 BOOST_LOG_SEV(party_handler_lg(), error)
90 << msg.subject << " failed: " << e.what();
91 }
92 reply(nats_, msg, resp);
93 }
94
95 void save(ores::nats::message msg) {
96 using ores::service::messaging::is_workflow_command;
97 using ores::service::messaging::extract_workflow_header;
98 using ores::service::messaging::publish_step_completion;
99 using ores::service::messaging::workflow_step_id_header;
100 using ores::service::messaging::workflow_instance_id_header;
101 using ores::service::messaging::workflow_tenant_id_header;
102
103 // Workflow step command: bypass JWT auth; use X-Tenant-Id for context.
104 if (is_workflow_command(msg)) {
105 const auto step_id = extract_workflow_header(msg, workflow_step_id_header);
106 const auto inst_id = extract_workflow_header(msg, workflow_instance_id_header);
107 const auto tenant_id = extract_workflow_header(msg, workflow_tenant_id_header);
108
109 auto req = decode<save_party_request>(msg);
110 if (!req) {
111 publish_step_completion(nats_, step_id, inst_id, false, "",
112 "Failed to decode save_party_request");
113 return;
114 }
115 try {
117 auto wf_ctx = tenant_context::with_tenant(ctx_, tenant_id);
118 service::party_service svc(wf_ctx);
119 svc.save_party(req->data);
120 BOOST_LOG_SEV(party_handler_lg(), debug)
121 << "Workflow step completed: " << msg.subject;
122 publish_step_completion(nats_, step_id, inst_id, true,
123 rfl::json::write(save_party_response{.success = true}), "");
124 } catch (const std::exception& e) {
125 BOOST_LOG_SEV(party_handler_lg(), error)
126 << "Workflow step failed: " << msg.subject
127 << " — " << e.what();
128 publish_step_completion(nats_, step_id, inst_id, false, "", e.what());
129 }
130 return;
131 }
132
133 [[maybe_unused]] const auto correlation_id =
134 log_handler_entry(party_handler_lg(), msg);
135 auto ctx_expected = ores::service::service::make_request_context(
136 ctx_, msg, verifier_);
137 if (!ctx_expected) {
138 error_reply(nats_, msg, ctx_expected.error());
139 return;
140 }
141 const auto& ctx = *ctx_expected;
142 if (!has_permission(ctx, "refdata::parties:write")) {
143 error_reply(nats_, msg, ores::service::error_code::forbidden);
144 return;
145 }
146 service::party_service svc(ctx);
147 auto req = decode<save_party_request>(msg);
148 if (!req) {
149 BOOST_LOG_SEV(party_handler_lg(), warn)
150 << "Failed to decode: " << msg.subject;
151 return;
152 }
153 try {
154 svc.save_party(req->data);
155 BOOST_LOG_SEV(party_handler_lg(), debug)
156 << "Completed " << msg.subject;
157 reply(nats_, msg, save_party_response{.success = true});
158 } catch (const std::exception& e) {
159 BOOST_LOG_SEV(party_handler_lg(), error)
160 << msg.subject << " failed: " << e.what();
161 reply(nats_, msg, save_party_response{
162 .success = false, .message = e.what()});
163 }
164 }
165
166 void remove(ores::nats::message msg) {
167 [[maybe_unused]] const auto correlation_id =
168 log_handler_entry(party_handler_lg(), msg);
169 auto ctx_expected = ores::service::service::make_request_context(
170 ctx_, msg, verifier_);
171 if (!ctx_expected) {
172 error_reply(nats_, msg, ctx_expected.error());
173 return;
174 }
175 const auto& ctx = *ctx_expected;
176 if (!has_permission(ctx, "refdata::parties:delete")) {
177 error_reply(nats_, msg, ores::service::error_code::forbidden);
178 return;
179 }
180 service::party_service svc(ctx);
181 auto req = decode<delete_party_request>(msg);
182 if (!req) {
183 BOOST_LOG_SEV(party_handler_lg(), warn)
184 << "Failed to decode: " << msg.subject;
185 return;
186 }
187 try {
188 boost::uuids::string_generator gen;
189 for (const auto& id_str : req->ids)
190 svc.remove_party(gen(id_str));
191 BOOST_LOG_SEV(party_handler_lg(), debug)
192 << "Completed " << msg.subject;
193 reply(nats_, msg, delete_party_response{.success = true});
194 } catch (const std::exception& e) {
195 BOOST_LOG_SEV(party_handler_lg(), error)
196 << msg.subject << " failed: " << e.what();
197 reply(nats_, msg, delete_party_response{
198 .success = false, .message = e.what()});
199 }
200 }
201
202 void history(ores::nats::message msg) {
203 [[maybe_unused]] const auto correlation_id =
204 log_handler_entry(party_handler_lg(), msg);
205 auto ctx_expected = ores::service::service::make_request_context(
206 ctx_, msg, verifier_);
207 if (!ctx_expected) {
208 error_reply(nats_, msg, ctx_expected.error());
209 return;
210 }
211 const auto& ctx = *ctx_expected;
212 service::party_service svc(ctx);
213 auto req = decode<get_party_history_request>(msg);
214 if (!req) {
215 BOOST_LOG_SEV(party_handler_lg(), warn)
216 << "Failed to decode: " << msg.subject;
217 return;
218 }
219 try {
220 boost::uuids::string_generator gen;
221 auto h = svc.get_party_history(gen(req->id));
222 BOOST_LOG_SEV(party_handler_lg(), debug)
223 << "Completed " << msg.subject;
224 reply(nats_, msg, get_party_history_response{
225 .success = true, .history = std::move(h)});
226 } catch (const std::exception& e) {
227 BOOST_LOG_SEV(party_handler_lg(), error)
228 << msg.subject << " failed: " << e.what();
229 reply(nats_, msg, get_party_history_response{
230 .success = false, .message = e.what()});
231 }
232 }
233
234private:
237 std::optional<ores::security::jwt::jwt_authenticator> verifier_;
238};
239
240} // namespace ores::refdata::messaging
241#endif
STL namespace.
Implements logging infrastructure for ORE Studio.
Definition boost_severity.hpp:28
@ forbidden
The caller is authenticated but lacks the required permission.
Context for the operations on a postgres database.
Definition context.hpp:47
Manages tenant context for multi-tenant database operations.
Definition tenant_context.hpp:37
A received NATS message.
Definition message.hpp:40
std::string subject
The subject the message was published to.
Definition message.hpp:44
NATS client: connection, pub/sub, request/reply, and JetStream.
Definition client.hpp:73