ORE Studio 0.0.4
Loading...
Searching...
No Matches
work_handler.hpp
1/* -*- mode: c++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
2 *
3 * Copyright (C) 2026 Marco Craveiro <marco.craveiro@gmail.com>
4 *
5 * This program is free software; you can redistribute it and/or modify it under
6 * the terms of the GNU General Public License as published by the Free Software
7 * Foundation; either version 3 of the License, or (at your option) any later
8 * version.
9 *
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
13 * details.
14 *
15 * You should have received a copy of the GNU General Public License along with
16 * this program; if not, write to the Free Software Foundation, Inc., 51
17 * Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 *
19 */
20#ifndef ORES_COMPUTE_MESSAGING_WORK_HANDLER_HPP
21#define ORES_COMPUTE_MESSAGING_WORK_HANDLER_HPP
22
23#include <array>
24#include <cstdint>
25#include <optional>
26#include <stdexcept>
27#include <chrono>
28#include <rfl/json.hpp>
29#include <boost/lexical_cast.hpp>
30#include <boost/uuid/uuid.hpp>
31#include <boost/uuid/uuid_io.hpp>
32#include "ores.logging/make_logger.hpp"
33#include "ores.nats/domain/message.hpp"
34#include "ores.nats/service/client.hpp"
35#include "ores.database/domain/context.hpp"
36#include "ores.security/jwt/jwt_authenticator.hpp"
37#include "ores.service/messaging/handler_helpers.hpp"
38#include "ores.service/service/request_context.hpp"
39#include "ores.compute.api/messaging/work_protocol.hpp"
40#include "ores.compute.core/service/host_service.hpp"
41#include "ores.dq.api/domain/change_reason.hpp"
42#include "ores.compute.core/service/result_service.hpp"
43#include "ores.compute.core/service/workunit_service.hpp"
44
45namespace ores::compute::messaging {
46
47namespace {
48inline auto& work_handler_lg() {
49 static auto instance = ores::logging::make_logger(
50 "ores.compute.messaging.work_handler");
51 return instance;
52}
53
60inline std::string make_display_name(const boost::uuids::uuid& id) {
61 static constexpr std::array adjectives = {
62 "amber", "bold", "brave", "bumbling", "calm", "clever", "daring",
63 "dazzling", "eager", "fierce", "frantic", "gentle", "graceful",
64 "happy", "humble", "idle", "jolly", "kind", "lively", "merry",
65 "nimble", "outgoing", "plucky", "quiet", "rowdy", "rusty", "swift",
66 "timid", "upbeat", "vibrant", "witty", "zany"
67 };
68 static constexpr std::array animals = {
69 "armadillo", "badger", "bison", "capybara", "caracal", "dhole",
70 "dingo", "echidna", "ermine", "falcon", "gecko", "hamster",
71 "iguana", "jackal", "koala", "lemur", "marmot", "narwhal",
72 "orca", "pangolin", "quokka", "raccoon", "salamander", "tapir",
73 "uakari", "viper", "walrus", "xerus", "yak", "zorilla"
74 };
75 const std::uint32_t seed =
76 (static_cast<std::uint32_t>(id.data[0]) << 24) |
77 (static_cast<std::uint32_t>(id.data[1]) << 16) |
78 (static_cast<std::uint32_t>(id.data[2]) << 8) |
79 static_cast<std::uint32_t>(id.data[3]);
80 return std::string(adjectives[seed % adjectives.size()]) + "-" +
81 std::string(animals[(seed >> 8) % animals.size()]);
82}
83
84} // namespace
85
86using ores::service::messaging::reply;
87using ores::service::messaging::error_reply;
88using ores::service::messaging::decode;
89using ores::service::messaging::stamp;
90using ores::service::messaging::error_reply;
91using ores::service::messaging::has_permission;
92using namespace ores::logging;
93
94class work_handler {
95public:
// Constructs the handler.
//
// Stores the NATS client reference in nats_ and moves the optional JWT
// verifier into verifier_. The initializer list also moves a value named
// `ctx` into ctx_.
// NOTE(review): the declaration of the `ctx` parameter is not visible in this
// listing (the line between `nats` and `verifier` is absent) -- confirm its
// type against the real header.
96 work_handler(ores::nats::service::client& nats,
98 std::optional<ores::security::jwt::jwt_authenticator> verifier)
99 : nats_(nats), ctx_(std::move(ctx)), verifier_(std::move(verifier)) {}
100
101 void pull(ores::nats::message msg) {
102 BOOST_LOG_SEV(work_handler_lg(), debug)
103 << "Handling " << msg.subject;
104 auto ctx_expected = ores::service::service::make_request_context(
105 ctx_, msg, verifier_);
106 if (!ctx_expected) {
107 error_reply(nats_, msg, ctx_expected.error());
108 return;
109 }
110 const auto& ctx = *ctx_expected;
111 if (!has_permission(ctx, "compute::batches:write")) {
112 error_reply(nats_, msg, ores::service::error_code::forbidden);
113 return;
114 }
115 if (auto req = decode<pull_work_request>(msg)) {
116 try {
117 service::result_service result_svc(ctx);
118 auto unsent = result_svc.list_by_state(2); // Unsent
119 if (unsent.empty()) {
120 reply(nats_, msg, pull_work_response{
121 .success = false,
122 .message = "No work available"});
123 BOOST_LOG_SEV(work_handler_lg(), debug)
124 << "No unsent results available";
125 return;
126 }
127
128 boost::uuids::uuid host_uuid;
129 try {
130 host_uuid = boost::lexical_cast<boost::uuids::uuid>(
131 req->host_id);
132 } catch (...) {
133 reply(nats_, msg, pull_work_response{
134 .success = false,
135 .message = "Invalid host_id: " + req->host_id});
136 return;
137 }
138
139 auto r = unsent.front();
140 r.host_id = host_uuid;
141 r.server_state = 4; // InProgress
142 r.change_reason_code = ores::dq::domain::change_reasons::system_new_record;
143 r.change_commentary = "Assigned to host on work.pull";
144 stamp(r, ctx);
145 result_svc.save(r);
146
147 service::workunit_service wu_svc(ctx);
148 const auto wu_id_str = boost::uuids::to_string(r.workunit_id);
149 const auto wu_opt = wu_svc.find(wu_id_str);
150 if (!wu_opt) {
151 reply(nats_, msg, pull_work_response{
152 .success = false,
153 .message = "Workunit not found: " + wu_id_str});
154 return;
155 }
156
157 reply(nats_, msg, pull_work_response{
158 .success = true,
159 .result_id = boost::uuids::to_string(r.id),
160 .workunit_id = wu_id_str,
161 .app_version_id = boost::uuids::to_string(
162 wu_opt->app_version_id),
163 .input_uri = wu_opt->input_uri,
164 .config_uri = wu_opt->config_uri});
165 } catch (const std::exception& e) {
166 BOOST_LOG_SEV(work_handler_lg(), error)
167 << "Pull error: " << e.what();
168 reply(nats_, msg, pull_work_response{
169 .success = false, .message = e.what()});
170 }
171 } else {
172 BOOST_LOG_SEV(work_handler_lg(), warn)
173 << "Failed to decode: " << msg.subject;
174 }
175 BOOST_LOG_SEV(work_handler_lg(), debug)
176 << "Completed " << msg.subject;
177 }
178
179 void heartbeat(ores::nats::message msg) {
180 BOOST_LOG_SEV(work_handler_lg(), debug)
181 << "Handling " << msg.subject;
182 // Heartbeats are unauthenticated fire-and-forget publishes from wrapper
183 // nodes — use the service context directly (no JWT required).
184 if (auto req = decode<heartbeat_message>(msg)) {
185 try {
186 service::host_service svc(ctx_);
187 auto existing = svc.find(req->host_id);
188 if (existing) {
189 auto h = *existing;
190 h.last_rpc_time = std::chrono::system_clock::now();
191 h.change_reason_code = ores::dq::domain::change_reasons::system_new_record;
192 stamp(h, ctx_);
193 svc.save(h);
194 } else {
195 // Auto-register the host on first heartbeat.
196 BOOST_LOG_SEV(work_handler_lg(), info)
197 << "Auto-registering new host from heartbeat: " << req->host_id;
198 domain::host h;
199 try {
200 h.id = boost::lexical_cast<boost::uuids::uuid>(req->host_id);
201 } catch (const boost::bad_lexical_cast& e) {
202 BOOST_LOG_SEV(work_handler_lg(), warn)
203 << "Invalid host_id in heartbeat: " << req->host_id
204 << " (" << e.what() << ")";
205 return;
206 }
207 h.external_id = req->host_id;
208 h.display_name = make_display_name(h.id);
209 h.last_rpc_time = std::chrono::system_clock::now();
210 h.change_reason_code = ores::dq::domain::change_reasons::system_new_record;
211 h.change_commentary = "Auto-registered on first heartbeat";
212 BOOST_LOG_SEV(work_handler_lg(), info)
213 << "Assigned display name: " << h.display_name;
214 stamp(h, ctx_);
215 svc.save(h);
216 }
217 } catch (const std::exception& e) {
218 BOOST_LOG_SEV(work_handler_lg(), error)
219 << "Heartbeat error for host " << req->host_id
220 << ": " << e.what();
221 }
222 } else {
223 BOOST_LOG_SEV(work_handler_lg(), warn)
224 << "Failed to decode: " << msg.subject;
225 }
226 BOOST_LOG_SEV(work_handler_lg(), debug)
227 << "Completed " << msg.subject;
228 }
229
230 void reap(ores::nats::message msg) {
231 BOOST_LOG_SEV(work_handler_lg(), debug)
232 << "Handling " << msg.subject;
233 // Reaper runs with service context (no per-request JWT needed).
234 static constexpr auto stale_threshold = std::chrono::minutes(5);
235 try {
236 service::result_service result_svc(ctx_);
237 service::host_service host_svc(ctx_);
238 auto in_progress = result_svc.list_by_state(4); // InProgress
239 int reaped = 0;
240 const auto now = std::chrono::system_clock::now();
241
242 for (auto& r : in_progress) {
243 if (r.host_id == boost::uuids::uuid{}) continue;
244 const auto host_id_str = boost::uuids::to_string(r.host_id);
245 const auto host_opt = host_svc.find(host_id_str);
246 if (!host_opt) continue;
247
248 const auto& last_seen = host_opt->last_rpc_time;
249 if (last_seen == std::chrono::system_clock::time_point{})
250 continue;
251 if (now - last_seen <= stale_threshold) continue;
252
253 r.host_id = boost::uuids::uuid{};
254 r.server_state = 2; // Unsent — back in the queue
255 r.change_reason_code = ores::dq::domain::change_reasons::system_new_record;
256 r.change_commentary = "Host went stale; result re-queued";
257 stamp(r, ctx_);
258 result_svc.save(r);
259 ++reaped;
260 BOOST_LOG_SEV(work_handler_lg(), info)
261 << "Reaped result " << boost::uuids::to_string(r.id)
262 << " from stale host " << host_id_str;
263 }
264
265 BOOST_LOG_SEV(work_handler_lg(), info)
266 << "Reap complete. Reaped " << reaped << " results.";
267 } catch (const std::exception& e) {
268 BOOST_LOG_SEV(work_handler_lg(), error)
269 << "Reap error: " << e.what();
270 }
271 }
272
273private:
276 std::optional<ores::security::jwt::jwt_authenticator> verifier_;
277};
278
279} // namespace ores::compute::messaging
280
281#endif
STL namespace.
Implements logging infrastructure for ORE Studio.
Definition boost_severity.hpp:28
@ forbidden
The caller is authenticated but lacks the required permission.
Context for the operations on a postgres database.
Definition context.hpp:47
A received NATS message.
Definition message.hpp:40
std::string subject
The subject the message was published to.
Definition message.hpp:44
NATS client: connection, pub/sub, request/reply, and JetStream.
Definition client.hpp:73