ORE Studio 0.0.4
Loading...
Searching...
No Matches
workunit_handler.hpp
1/* -*- mode: c++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
2 *
3 * Copyright (C) 2026 Marco Craveiro <marco.craveiro@gmail.com>
4 *
5 * This program is free software; you can redistribute it and/or modify it under
6 * the terms of the GNU General Public License as published by the Free Software
7 * Foundation; either version 3 of the License, or (at your option) any later
8 * version.
9 *
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
13 * details.
14 *
15 * You should have received a copy of the GNU General Public License along with
16 * this program; if not, write to the Free Software Foundation, Inc., 51
17 * Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 *
19 */
20#ifndef ORES_COMPUTE_MESSAGING_WORKUNIT_HANDLER_HPP
21#define ORES_COMPUTE_MESSAGING_WORKUNIT_HANDLER_HPP
22
23#include <optional>
24#include <span>
25#include <stdexcept>
26#include <rfl/json.hpp>
27#include <boost/uuid/random_generator.hpp>
28#include <boost/uuid/uuid_io.hpp>
29#include "ores.logging/make_logger.hpp"
30#include "ores.nats/domain/message.hpp"
31#include "ores.nats/service/client.hpp"
32#include "ores.database/domain/context.hpp"
33#include "ores.security/jwt/jwt_authenticator.hpp"
34#include "ores.service/messaging/handler_helpers.hpp"
35#include "ores.service/service/request_context.hpp"
36#include "ores.compute.api/messaging/workunit_protocol.hpp"
37#include "ores.compute.api/messaging/work_protocol.hpp"
38#include "ores.compute.core/service/workunit_service.hpp"
39#include "ores.dq.api/domain/change_reason.hpp"
40#include "ores.compute.core/service/result_service.hpp"
41#include "ores.compute.core/service/app_version_service.hpp"
42
43namespace ores::compute::messaging {
44
45namespace {
46inline auto& workunit_handler_lg() {
47 static auto instance = ores::logging::make_logger(
48 "ores.compute.messaging.workunit_handler");
49 return instance;
50}
51} // namespace
52
53using ores::service::messaging::reply;
54using ores::service::messaging::error_reply;
55using ores::service::messaging::decode;
56using ores::service::messaging::stamp;
57using ores::service::messaging::error_reply;
58using ores::service::messaging::has_permission;
59using namespace ores::logging;
60
61class workunit_handler {
62public:
63 workunit_handler(ores::nats::service::client& nats,
65 std::optional<ores::security::jwt::jwt_authenticator> verifier)
66 : nats_(nats), ctx_(std::move(ctx)), verifier_(std::move(verifier)) {}
67
68 void list(ores::nats::message msg) {
69 BOOST_LOG_SEV(workunit_handler_lg(), debug)
70 << "Handling " << msg.subject;
71 auto ctx_expected = ores::service::service::make_request_context(
72 ctx_, msg, verifier_);
73 if (!ctx_expected) {
74 error_reply(nats_, msg, ctx_expected.error());
75 return;
76 }
77 const auto& ctx = *ctx_expected;
78 service::workunit_service svc(ctx);
79 list_workunits_response resp;
80 try {
81 if (auto req = decode<list_workunits_request>(msg)) {
82 resp.workunits = svc.list();
83 resp.total_available_count =
84 static_cast<int>(resp.workunits.size());
85 }
86 } catch (...) {}
87 reply(nats_, msg, resp);
88 BOOST_LOG_SEV(workunit_handler_lg(), debug)
89 << "Completed " << msg.subject;
90 }
91
92 void save(ores::nats::message msg) {
93 BOOST_LOG_SEV(workunit_handler_lg(), debug)
94 << "Handling " << msg.subject;
95 auto ctx_expected = ores::service::service::make_request_context(
96 ctx_, msg, verifier_);
97 if (!ctx_expected) {
98 error_reply(nats_, msg, ctx_expected.error());
99 return;
100 }
101 const auto& ctx = *ctx_expected;
102 if (!has_permission(ctx, "compute::batches:write")) {
103 error_reply(nats_, msg, ores::service::error_code::forbidden);
104 return;
105 }
106 if (auto req = decode<save_workunit_request>(msg)) {
107 try {
108 service::workunit_service wu_svc(ctx);
109 stamp(req->workunit, ctx);
110 wu_svc.save(req->workunit);
111
112 // Dispatcher: create target_redundancy result rows (state=2 Unsent)
113 // and publish each as a JetStream assignment event.
114 service::result_service result_svc(ctx);
115 const auto wu_id_str =
116 boost::uuids::to_string(req->workunit.id);
117 const auto av_id_str =
118 boost::uuids::to_string(req->workunit.app_version_id);
119 const auto tenant_id_str = ctx.tenant_id().to_string();
120 const auto redundancy = req->workunit.target_redundancy;
121
122 // Look up app_version once to get the package_uri.
123 service::app_version_service av_svc(ctx);
124 const auto av = av_svc.find(av_id_str);
125 const auto package_uri = av ? av->package_uri : std::string{};
126 if (!av) {
127 BOOST_LOG_SEV(workunit_handler_lg(), warn)
128 << "app_version not found: " << av_id_str
129 << " — package_uri will be empty in assignment event";
130 }
131
132 for (int i = 0; i < redundancy; ++i) {
133 domain::result r;
134 r.id = boost::uuids::random_generator()();
135 r.workunit_id = req->workunit.id;
136 r.server_state = 2; // Unsent
137 r.change_reason_code = ores::dq::domain::change_reasons::system_new_record;
138 r.change_commentary = "Created on workunit dispatch";
139 stamp(r, ctx);
140 result_svc.save(r);
141
142 const auto result_id_str = boost::uuids::to_string(r.id);
143 const auto event = work_assignment_event{
144 .result_id = result_id_str,
145 .workunit_id = wu_id_str,
146 .app_version_id = av_id_str,
147 .package_uri = package_uri,
148 .input_uri = req->workunit.input_uri,
149 .config_uri = req->workunit.config_uri,
150 .output_uri = "api/v1/compute/results/" + result_id_str + "/output"};
151 const auto json = rfl::json::write(event);
152 const auto* p =
153 reinterpret_cast<const std::byte*>(json.data());
154 nats_.js_publish(
155 "compute.v1.work.assignments." + tenant_id_str,
156 std::span<const std::byte>(p, json.size()));
157 BOOST_LOG_SEV(workunit_handler_lg(), debug)
158 << "Dispatched result " << result_id_str
159 << " for workunit " << wu_id_str;
160 }
161
162 reply(nats_, msg, save_workunit_response{.success = true});
163 } catch (const std::exception& e) {
164 reply(nats_, msg, save_workunit_response{
165 .success = false, .message = e.what()});
166 }
167 } else {
168 BOOST_LOG_SEV(workunit_handler_lg(), warn)
169 << "Failed to decode: " << msg.subject;
170 }
171 BOOST_LOG_SEV(workunit_handler_lg(), debug)
172 << "Completed " << msg.subject;
173 }
174
175private:
178 std::optional<ores::security::jwt::jwt_authenticator> verifier_;
179};
180
181} // namespace ores::compute::messaging
182
183#endif
STL namespace.
Implements logging infrastructure for ORE Studio.
Definition boost_severity.hpp:28
@ forbidden
The caller is authenticated but lacks the required permission.
Context for the operations on a postgres database.
Definition context.hpp:47
A received NATS message.
Definition message.hpp:40
std::string subject
The subject the message was published to.
Definition message.hpp:44
NATS client: connection, pub/sub, request/reply, and JetStream.
Definition client.hpp:73
void js_publish(std::string_view subject, std::span< const std::byte > data, std::unordered_map< std::string, std::string > headers={})
Publish a message to a JetStream stream.
Definition client.cpp:410