20#ifndef ORES_COMPUTE_MESSAGING_WORK_HANDLER_HPP
21#define ORES_COMPUTE_MESSAGING_WORK_HANDLER_HPP
28#include <rfl/json.hpp>
29#include <boost/lexical_cast.hpp>
30#include <boost/uuid/uuid.hpp>
31#include <boost/uuid/uuid_io.hpp>
32#include "ores.logging/make_logger.hpp"
33#include "ores.nats/domain/message.hpp"
34#include "ores.nats/service/client.hpp"
35#include "ores.database/domain/context.hpp"
36#include "ores.security/jwt/jwt_authenticator.hpp"
37#include "ores.service/messaging/handler_helpers.hpp"
38#include "ores.service/service/request_context.hpp"
39#include "ores.compute.api/messaging/work_protocol.hpp"
40#include "ores.compute.core/service/host_service.hpp"
41#include "ores.dq.api/domain/change_reason.hpp"
42#include "ores.compute.core/service/result_service.hpp"
43#include "ores.compute.core/service/workunit_service.hpp"
45namespace ores::compute::messaging {
48inline auto& work_handler_lg() {
49 static auto instance = ores::logging::make_logger(
50 "ores.compute.messaging.work_handler");
60inline std::string make_display_name(
const boost::uuids::uuid&
id) {
61 static constexpr std::array adjectives = {
62 "amber",
"bold",
"brave",
"bumbling",
"calm",
"clever",
"daring",
63 "dazzling",
"eager",
"fierce",
"frantic",
"gentle",
"graceful",
64 "happy",
"humble",
"idle",
"jolly",
"kind",
"lively",
"merry",
65 "nimble",
"outgoing",
"plucky",
"quiet",
"rowdy",
"rusty",
"swift",
66 "timid",
"upbeat",
"vibrant",
"witty",
"zany"
68 static constexpr std::array animals = {
69 "armadillo",
"badger",
"bison",
"capybara",
"caracal",
"dhole",
70 "dingo",
"echidna",
"ermine",
"falcon",
"gecko",
"hamster",
71 "iguana",
"jackal",
"koala",
"lemur",
"marmot",
"narwhal",
72 "orca",
"pangolin",
"quokka",
"raccoon",
"salamander",
"tapir",
73 "uakari",
"viper",
"walrus",
"xerus",
"yak",
"zorilla"
75 const std::uint32_t seed =
76 (
static_cast<std::uint32_t
>(
id.data[0]) << 24) |
77 (
static_cast<std::uint32_t
>(
id.data[1]) << 16) |
78 (
static_cast<std::uint32_t
>(
id.data[2]) << 8) |
79 static_cast<std::uint32_t
>(
id.data[3]);
80 return std::string(adjectives[seed % adjectives.size()]) +
"-" +
81 std::string(animals[(seed >> 8) % animals.size()]);
86using ores::service::messaging::reply;
87using ores::service::messaging::error_reply;
88using ores::service::messaging::decode;
89using ores::service::messaging::stamp;
90using ores::service::messaging::error_reply;
91using ores::service::messaging::has_permission;
98 std::optional<ores::security::jwt::jwt_authenticator> verifier)
99 : nats_(nats), ctx_(
std::move(ctx)), verifier_(
std::move(verifier)) {}
102 BOOST_LOG_SEV(work_handler_lg(), debug)
104 auto ctx_expected = ores::service::service::make_request_context(
105 ctx_, msg, verifier_);
107 error_reply(nats_, msg, ctx_expected.error());
110 const auto& ctx = *ctx_expected;
111 if (!has_permission(ctx,
"compute::batches:write")) {
115 if (
auto req = decode<pull_work_request>(msg)) {
117 service::result_service result_svc(ctx);
118 auto unsent = result_svc.list_by_state(2);
119 if (unsent.empty()) {
120 reply(nats_, msg, pull_work_response{
122 .message =
"No work available"});
123 BOOST_LOG_SEV(work_handler_lg(), debug)
124 <<
"No unsent results available";
128 boost::uuids::uuid host_uuid;
130 host_uuid = boost::lexical_cast<boost::uuids::uuid>(
133 reply(nats_, msg, pull_work_response{
135 .message =
"Invalid host_id: " + req->host_id});
139 auto r = unsent.front();
140 r.host_id = host_uuid;
142 r.change_reason_code = ores::dq::domain::change_reasons::system_new_record;
143 r.change_commentary =
"Assigned to host on work.pull";
147 service::workunit_service wu_svc(ctx);
148 const auto wu_id_str = boost::uuids::to_string(r.workunit_id);
149 const auto wu_opt = wu_svc.find(wu_id_str);
151 reply(nats_, msg, pull_work_response{
153 .message =
"Workunit not found: " + wu_id_str});
157 reply(nats_, msg, pull_work_response{
159 .result_id = boost::uuids::to_string(r.id),
160 .workunit_id = wu_id_str,
161 .app_version_id = boost::uuids::to_string(
162 wu_opt->app_version_id),
163 .input_uri = wu_opt->input_uri,
164 .config_uri = wu_opt->config_uri});
165 }
catch (
const std::exception& e) {
166 BOOST_LOG_SEV(work_handler_lg(), error)
167 <<
"Pull error: " << e.what();
168 reply(nats_, msg, pull_work_response{
169 .success =
false, .message = e.what()});
172 BOOST_LOG_SEV(work_handler_lg(), warn)
173 <<
"Failed to decode: " << msg.
subject;
175 BOOST_LOG_SEV(work_handler_lg(), debug)
176 <<
"Completed " << msg.
subject;
180 BOOST_LOG_SEV(work_handler_lg(), debug)
184 if (
auto req = decode<heartbeat_message>(msg)) {
186 service::host_service svc(ctx_);
187 auto existing = svc.find(req->host_id);
190 h.last_rpc_time = std::chrono::system_clock::now();
191 h.change_reason_code = ores::dq::domain::change_reasons::system_new_record;
196 BOOST_LOG_SEV(work_handler_lg(), info)
197 <<
"Auto-registering new host from heartbeat: " << req->host_id;
200 h.id = boost::lexical_cast<boost::uuids::uuid>(req->host_id);
201 }
catch (
const boost::bad_lexical_cast& e) {
202 BOOST_LOG_SEV(work_handler_lg(), warn)
203 <<
"Invalid host_id in heartbeat: " << req->host_id
204 <<
" (" << e.what() <<
")";
207 h.external_id = req->host_id;
208 h.display_name = make_display_name(h.id);
209 h.last_rpc_time = std::chrono::system_clock::now();
210 h.change_reason_code = ores::dq::domain::change_reasons::system_new_record;
211 h.change_commentary =
"Auto-registered on first heartbeat";
212 BOOST_LOG_SEV(work_handler_lg(), info)
213 <<
"Assigned display name: " << h.display_name;
217 }
catch (
const std::exception& e) {
218 BOOST_LOG_SEV(work_handler_lg(), error)
219 <<
"Heartbeat error for host " << req->host_id
223 BOOST_LOG_SEV(work_handler_lg(), warn)
224 <<
"Failed to decode: " << msg.
subject;
226 BOOST_LOG_SEV(work_handler_lg(), debug)
227 <<
"Completed " << msg.
subject;
231 BOOST_LOG_SEV(work_handler_lg(), debug)
234 static constexpr auto stale_threshold = std::chrono::minutes(5);
236 service::result_service result_svc(ctx_);
237 service::host_service host_svc(ctx_);
238 auto in_progress = result_svc.list_by_state(4);
240 const auto now = std::chrono::system_clock::now();
242 for (
auto& r : in_progress) {
243 if (r.host_id == boost::uuids::uuid{})
continue;
244 const auto host_id_str = boost::uuids::to_string(r.host_id);
245 const auto host_opt = host_svc.find(host_id_str);
246 if (!host_opt)
continue;
248 const auto& last_seen = host_opt->last_rpc_time;
249 if (last_seen == std::chrono::system_clock::time_point{})
251 if (now - last_seen <= stale_threshold)
continue;
253 r.host_id = boost::uuids::uuid{};
255 r.change_reason_code = ores::dq::domain::change_reasons::system_new_record;
256 r.change_commentary =
"Host went stale; result re-queued";
260 BOOST_LOG_SEV(work_handler_lg(), info)
261 <<
"Reaped result " << boost::uuids::to_string(r.id)
262 <<
" from stale host " << host_id_str;
265 BOOST_LOG_SEV(work_handler_lg(), info)
266 <<
"Reap complete. Reaped " << reaped <<
" results.";
267 }
catch (
const std::exception& e) {
268 BOOST_LOG_SEV(work_handler_lg(), error)
269 <<
"Reap error: " << e.what();
276 std::optional<ores::security::jwt::jwt_authenticator> verifier_;
Implements logging infrastructure for ORE Studio.
Definition boost_severity.hpp:28
@ forbidden
The caller is authenticated but lacks the required permission.
Context for the operations on a postgres database.
Definition context.hpp:47
A received NATS message.
Definition message.hpp:40
std::string subject
The subject the message was published to.
Definition message.hpp:44
NATS client: connection, pub/sub, request/reply, and JetStream.
Definition client.hpp:73