Workflow Monitor — Design & Implementation Plan

Overview

Add a generic, data-driven Workflow Monitor to OreStudio so that users can observe the status and progress of any workflow execution in real time. The immediate motivation is the ORE import workflow: after submitting an import the user currently receives a workflow instance ID with no way to track what happened. The solution must be generic — any workflow type (party provisioning, report execution, ORE import, …) is rendered by the same UI without type-specific code.

The monitor lives in a new ores.qt.workflow plugin with its own top-level &Workflows menu, since workflows are a cross-cutting concern that does not belong in any existing domain plugin.

Goals

  • Push events from the workflow engine so the UI is notified without polling.
  • Stale-indicator + reload-button pattern matching all other entity list windows.
  • Auto-refresh option following the ServiceDashboard pattern.
  • Two-tab workflow view: a dashboard summary (counts + recent failures) and a searchable execution list.
  • Badge-based status rendering matching colour constants used everywhere else.
  • Standard toolbar icons from the icon registry defined in Qt Entity Creator Skill.
  • Step-level detail dialog with its own status badges.
  • Phase-2 scope (not in this branch): throughput chart, pause/resume, retry, pagination, settings tab.

Non-Goals

  • Workflow-type-specific rendering (every field is generic).
  • Workflow creation / editing (read-only monitor only in Phase 1).
  • Moving any existing functionality out of the compute plugin.

Architecture

Server side
-----------
workflow_engine.cpp           publish entity_change_event on every state transition
workflow_query_handler.cpp    new: NATS req/reply for list-instances and get-steps
registrar.cpp                 register new handlers

ores.workflow.api / eventing
  workflow_instance_changed_event.hpp    event_traits specialisation

ores.workflow.api / messaging
  workflow_query_protocol.hpp            list-instances req/resp, get-steps req/resp

Client side (ores.qt.workflow plugin)
--------------------------------------
WorkflowPlugin.hpp/cpp          load_order=450, creates Workflows top-level menu
WorkflowController.hpp/cpp      subscribeToEvent, owns MDI window lifecycle
WorkflowMdiWindow.hpp/cpp       QTabWidget: Dashboard tab + Execution List tab
WorkflowInstanceDetailDialog    read-only step list dialog

Event bus wiring

The workflow engine publishes an entity_change_event (the same struct used by all other services) to the subject "ores.workflow.workflow_instance_changed" on every instance state transition. ClientManager::subscribeToEvent subscribes to that NATS subject, and ClientManager::notificationReceived is then emitted on the Qt thread. WorkflowController::onNotificationReceived filters by event type and, if the window is open, calls mdiWindow_->markAsStale(). This is identical to the pattern used by ReportDefinitionController.

Work Packages

WP-1 New eventing event type

File: projects/ores.workflow.api/include/ores.workflow.api/eventing/workflow_instance_changed_event.hpp

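#include <chrono>
#include <string>
#include <string_view>
#include <vector>
// Plus the ores.eventing header that declares the primary event_traits
// template, as included by report_definition_changed_event.hpp.

namespace ores::workflow::eventing {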

struct workflow_instance_changed_event final {
    std::chrono::system_clock::time_point timestamp;
    std::vector<std::string> ids;   // changed instance UUIDs
    std::string tenant_id;
};

}  // ores::workflow::eventing

namespace ores::eventing::domain {

template<>
struct event_traits<ores::workflow::eventing::workflow_instance_changed_event> {
    static constexpr std::string_view name =
        "ores.workflow.workflow_instance_changed";
};

}  // ores::eventing::domain

This mirrors report_definition_changed_event.hpp exactly.
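What the traits specialisation buys, as a minimal self-contained sketch: the subject string is attached to the event type and resolved at compile time, so publisher and subscribers do not have to repeat the literal. The primary event_traits template and the subject_for helper below are stand-ins written for illustration; the real declarations in ores.eventing may differ.

```cpp
#include <cassert>
#include <chrono>
#include <string>
#include <string_view>
#include <vector>

// Stand-in for the primary template declared in ores.eventing (assumed shape).
namespace ores::eventing::domain {
template <typename Event>
struct event_traits;  // intentionally undefined: every event must specialise it
}

namespace ores::workflow::eventing {
struct workflow_instance_changed_event final {
    std::chrono::system_clock::time_point timestamp;
    std::vector<std::string> ids;  // changed instance UUIDs
    std::string tenant_id;
};
}  // ores::workflow::eventing

namespace ores::eventing::domain {
template <>
struct event_traits<ores::workflow::eventing::workflow_instance_changed_event> {
    static constexpr std::string_view name =
        "ores.workflow.workflow_instance_changed";
};

// Hypothetical helper: look up the NATS subject for an event type.
// An event without a specialisation fails to compile here.
template <typename Event>
constexpr std::string_view subject_for() {
    return event_traits<Event>::name;
}

static_assert(subject_for<ores::workflow::eventing::workflow_instance_changed_event>() ==
              "ores.workflow.workflow_instance_changed");
}  // ores::eventing::domain
```

Leaving the primary template undefined turns a forgotten specialisation into a compile error rather than a silently wrong subject.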

WP-2 Query protocol

File: projects/ores.workflow.api/include/ores.workflow.api/messaging/workflow_query_protocol.hpp

New request/response structs (authenticated, JWT required, tenant-scoped):

#include <optional>
#include <string>
#include <string_view>
#include <vector>

namespace ores::workflow::messaging {

// --- List instances ---

struct workflow_instance_summary {
    std::string id;
    std::string type;              // e.g. "ore_import_workflow"
    std::string status;            // in_progress | completed | failed |
                                   // compensating | compensated
    int current_step_index = 0;
    int step_count = 0;
    std::string correlation_id;
    std::string created_by;
    std::string created_at;        // ISO-8601 UTC string
    std::optional<std::string> completed_at;
    std::string error;
};

// Each response is declared before its request: response_type must name
// a type that has already been declared.
struct list_workflow_instances_response {
    bool success = false;
    std::string message;
    std::vector<workflow_instance_summary> instances;
};

struct list_workflow_instances_request {
    using response_type = list_workflow_instances_response;
    static constexpr std::string_view nats_subject =
        "workflow.v1.instances.list";
    int limit = 200;
    std::optional<std::string> status_filter;  // std::nullopt = all
};

// --- Get steps ---

struct workflow_step_summary {
    std::string id;
    std::string name;
    std::string status;            // pending | in_progress | completed | failed
    int step_index = 0;
    std::string created_at;
    std::optional<std::string> started_at;
    std::optional<std::string> completed_at;
    std::string error;
};

struct get_workflow_steps_response {
    bool success = false;
    std::string message;
    std::vector<workflow_step_summary> steps;
};

struct get_workflow_steps_request {
    using response_type = get_workflow_steps_response;
    static constexpr std::string_view nats_subject =
        "workflow.v1.instances.steps";
    std::string workflow_instance_id;
};

}  // ores::workflow::messaging

WP-3 Workflow engine: publish status events

File: projects/ores.workflow/src/service/workflow_engine.cpp

Add a private helper:

void workflow_engine::publish_status_event(
    const boost::uuids::uuid& instance_id,
    const boost::uuids::uuid& tenant_id) {

    using ev = ores::eventing::domain::entity_change_event;
    ev e;
    e.entity     = "ores.workflow.workflow_instance";
    e.timestamp  = std::chrono::system_clock::now();
    e.entity_ids = { boost::uuids::to_string(instance_id) };
    e.tenant_id  = boost::uuids::to_string(tenant_id);

    const auto json = rfl::json::write(e);
    const auto data = std::as_bytes(std::span{json.data(), json.size()});
    try {
        nats_.publish("ores.workflow.workflow_instance_changed", data, {});
    } catch (const std::exception& ex) {
        BOOST_LOG_SEV(lg(), warn)
            << "Failed to publish status event: " << ex.what();
    }
}

Call publish_status_event at each state-change point in the engine:

Location in engine                                  Trigger
on_start_workflow(), after first step dispatch      workflow started
dispatch_next_step(), after each step dispatch      step advanced
dispatch_next_step(), when all steps complete       workflow completed
begin_compensation(), when compensation starts      workflow failed → compensating
check_compensation_complete(), when all done        compensation complete

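Required additional includes in workflow_engine.cpp (the last two only if not already present):

#include <span>
#include "ores.eventing/domain/entity_change_event.hpp"
#include <boost/uuid/uuid_io.hpp>   // boost::uuids::to_string
#include <rfl/json.hpp>             // rfl::json::write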
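The try/catch in publish_status_event makes the notification best-effort: the state transition is committed first and a NATS failure is only logged. A minimal model of that behaviour, assuming a publisher that may throw; the engine struct and its warnings vector are hypothetical stand-ins for workflow_engine, nats_ and the BOOST_LOG_SEV warning.

```cpp
#include <cassert>
#include <exception>
#include <functional>
#include <stdexcept>
#include <string>
#include <vector>

namespace sketch {

// Hypothetical stand-in for workflow_engine plus its NATS client.
struct engine {
    std::function<void(const std::string& subject)> publish;  // may throw
    std::vector<std::string> warnings;  // stands in for BOOST_LOG_SEV(lg(), warn)
    std::string status = "in_progress";

    void publish_status_event() {
        try {
            publish("ores.workflow.workflow_instance_changed");
        } catch (const std::exception& ex) {
            // Swallow and log: a lost notification must not undo a transition.
            warnings.push_back(std::string{"Failed to publish status event: "} + ex.what());
        }
    }

    // The state change is applied first; the event is published afterwards.
    void complete() {
        status = "completed";
        publish_status_event();
    }
};

}  // namespace sketch
```

Because the UI also offers Reload and auto-refresh, a lost notification only delays the stale indicator; it does not affect the workflow's state.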

WP-4 Query handler

New files:

  • projects/ores.workflow/include/ores.workflow/messaging/workflow_query_handler.hpp
  • projects/ores.workflow/src/messaging/workflow_query_handler.cpp

The handler validates the JWT in each incoming message (using service::make_request_context, same pattern as workflow_handler.cpp), then queries the database using the tenant context for RLS isolation.

Methods:

class workflow_query_handler {
public:
    workflow_query_handler(ores::nats::service::client& nats,
                           ores::database::context ctx,
                           ores::security::jwt::jwt_authenticator signer);

    void list_instances(ores::nats::message msg);
    void get_steps(ores::nats::message msg);

private:
    ores::nats::service::client& nats_;
    ores::database::context ctx_;
    ores::security::jwt::jwt_authenticator signer_;
    repository::workflow_instance_repository instance_repo_;
    repository::workflow_step_repository step_repo_;
};

list_instances implementation sketch:

  1. Validate JWT → req_ctx.
  2. Check permission "workflow::instances:read" (add to IAM if missing).
  3. Decode list_workflow_instances_request from message body.
  4. Call instance_repo_.find_by_tenant(ctx_, req_ctx.tenant_id(), req.limit) (add this repository method if it doesn't exist).
  5. Map domain objects → workflow_instance_summary (convert timestamps to ISO strings via std::format("{:%FT%T}Z", tp) or rfl chrono adapter).
  6. Reply with list_workflow_instances_response.

get_steps implementation sketch:

  1. Validate JWT → req_ctx.
  2. Check permission "workflow::instances:read", as in list_instances.
  3. Decode get_workflow_steps_request.
  4. Load the instance identified by req.workflow_instance_id and verify instance.tenant_id == req_ctx.tenant_id() (RLS guard).
  5. Call step_repo_.find_by_workflow_id(ctx_, instance_id).
  6. Map → workflow_step_summary structs.
  7. Reply with get_workflow_steps_response.
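The tenant guard and mapping at the end of get_steps, as a self-contained sketch. The instance_row and step_row types and build_steps_response are hypothetical stand-ins for the repository's domain objects and the handler body. Answering "not found" for an instance owned by another tenant, rather than a distinct "forbidden" message, is a suggested choice so that replies do not reveal which instance IDs exist elsewhere.

```cpp
#include <cassert>
#include <optional>
#include <string>
#include <vector>

namespace sketch {

// Hypothetical domain rows; the real repository types will differ.
struct instance_row { std::string id; std::string tenant_id; };
struct step_row {
    std::string id, name, status;
    int step_index = 0;
    std::string created_at;
    std::optional<std::string> started_at, completed_at;
    std::string error;
};

// Copies of the WP-2 reply shapes.
struct workflow_step_summary {
    std::string id;
    std::string name;
    std::string status;
    int step_index = 0;
    std::string created_at;
    std::optional<std::string> started_at;
    std::optional<std::string> completed_at;
    std::string error;
};
struct get_workflow_steps_response {
    bool success = false;
    std::string message;
    std::vector<workflow_step_summary> steps;
};

inline get_workflow_steps_response build_steps_response(
    const std::optional<instance_row>& instance,
    const std::string& caller_tenant_id,
    const std::vector<step_row>& rows) {
    get_workflow_steps_response resp;
    // Missing and foreign-tenant instances get the same answer.
    if (!instance || instance->tenant_id != caller_tenant_id) {
        resp.message = "Workflow instance not found";
        return resp;
    }
    resp.steps.reserve(rows.size());
    for (const auto& r : rows) {
        resp.steps.push_back({r.id, r.name, r.status, r.step_index,
                              r.created_at, r.started_at, r.completed_at, r.error});
    }
    resp.success = true;
    return resp;
}

}  // namespace sketch
```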

If the instance repository lacks find_by_tenant, add it to:

  • projects/ores.workflow/include/ores.workflow/repository/workflow_instance_repository.hpp
  • projects/ores.workflow/src/repository/workflow_instance_repository.cpp

WP-5 Registrar: wire new handlers

File: projects/ores.workflow/src/messaging/registrar.cpp

After the engine is created, add:

auto qh = std::make_shared<messaging::workflow_query_handler>(
    nats, ctx, signer);

subs.push_back(nats.queue_subscribe(
    messaging::list_workflow_instances_request::nats_subject, qg,
    [qh](ores::nats::message msg) {
        qh->list_instances(std::move(msg));
    }));

subs.push_back(nats.queue_subscribe(
    messaging::get_workflow_steps_request::nats_subject, qg,
    [qh](ores::nats::message msg) {
        qh->get_steps(std::move(msg));
    }));

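Note: each lambda captures qh, a shared_ptr, by value, so the handler stays alive after the registrar function returns and is destroyed only when the last subscription holding it is dropped; this is the same pattern as the existing handlers.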
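The lifetime argument in miniature: each subscription callback holds its own copy of the shared_ptr. query_handler, callback and register_handlers are hypothetical stand-ins for workflow_query_handler, the callbacks passed to nats.queue_subscribe, and the registrar.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <vector>

namespace sketch {

struct query_handler {
    int calls = 0;
    void list_instances(const std::string& /*msg*/) { ++calls; }
};

// Stands in for a NATS subscription: it stores a copy of the callback.
using callback = std::function<void(const std::string&)>;

inline std::weak_ptr<query_handler> register_handlers(std::vector<callback>& subs) {
    auto qh = std::make_shared<query_handler>();
    // Capturing the shared_ptr by value makes the callback a co-owner.
    subs.push_back([qh](const std::string& msg) { qh->list_instances(msg); });
    return qh;  // weak_ptr observes the lifetime without extending it
}

}  // namespace sketch
```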

WP-6 New plugin directory: ores.qt.workflow

projects/ores.qt.workflow/
├── CMakeLists.txt
├── include/
│   └── ores.qt/
│       ├── WorkflowPlugin.hpp
│       ├── WorkflowController.hpp
│       ├── WorkflowMdiWindow.hpp
│       └── WorkflowInstanceDetailDialog.hpp
└── src/
    ├── CMakeLists.txt
    ├── WorkflowPlugin.cpp
    ├── WorkflowController.cpp
    ├── WorkflowMdiWindow.cpp
    └── WorkflowInstanceDetailDialog.cpp

Top-level CMakeLists.txt:

add_subdirectory(src)

src/CMakeLists.txt (mirrors ores.qt.compute pattern):

set(name "ores.qt.workflow")
set(lib_binary_name ${name})
set(lib_target_name ${name}.lib)
set(ORES_QT_WORKFLOW_DIR ${CMAKE_SOURCE_DIR}/projects/ores.qt.workflow)

set(CMAKE_AUTOMOC ON)

file(GLOB_RECURSE files RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}/"
    "${CMAKE_CURRENT_SOURCE_DIR}/*.cpp")
file(GLOB_RECURSE HEADERS "${ORES_QT_WORKFLOW_DIR}/include/*.hpp")

add_library(${lib_target_name} SHARED ${files} ${HEADERS})

set_target_properties(${lib_target_name} PROPERTIES
    OUTPUT_NAME          ${lib_binary_name}
    VERSION              ${PROJECT_VERSION}
    SOVERSION            ${PROJECT_VERSION_MAJOR}
    LIBRARY_OUTPUT_DIRECTORY "${ORES_PLUGIN_OUTPUT_DIRECTORY}"
    INSTALL_RPATH        "$ORIGIN/../lib:$ORIGIN"
    BUILD_RPATH          "${CMAKE_LIBRARY_OUTPUT_DIRECTORY}")

target_include_directories(${lib_target_name}
    PUBLIC  ${ORES_QT_WORKFLOW_DIR}/include
    PRIVATE "${CMAKE_CURRENT_BINARY_DIR}/${lib_target_name}_autogen/include"
            "${CMAKE_SOURCE_DIR}/projects/ores.qt/include")

target_link_libraries(${lib_target_name}
    PUBLIC
        ores.qt.api.lib
        ores.workflow.api.lib
        ores.eventing.lib
    PRIVATE
        ores.qt.lib)

install(TARGETS ${lib_target_name}
    LIBRARY DESTINATION bin
    RUNTIME DESTINATION bin)

Register in projects/CMakeLists.txt after line 98 (ores.qt.trading):

add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/ores.qt.workflow)

WP-7 WorkflowPlugin

load_order = 450 — inserts the &Workflows menu after the Compute menu (400) and before the &System menu added by the host.

// WorkflowPlugin.hpp
class WorkflowPlugin : public PluginBase {
    Q_OBJECT
    Q_PLUGIN_METADATA(IID "ores.qt.IPlugin/1.0")
    Q_INTERFACES(ores::qt::IPlugin)
public:
    explicit WorkflowPlugin(QObject* parent = nullptr);
    ~WorkflowPlugin() override;

    QString name() const override { return QStringLiteral("ores.qt.workflow"); }
    int load_order() const override { return 450; }

    void on_login(const plugin_context& ctx) override;
    QList<QMenu*> create_menus() override;
    void on_logout() override;

private:
    plugin_context ctx_;
    std::unique_ptr<WorkflowController> controller_;
};

create_menus() returns one QMenu* ("&Workflows") with two actions:

Action            Icon             Shortcut
&Dashboard        Icon::Chart      (none)
&Execution List   Icon::TasksApp   (none)

Each action calls the matching WorkflowController::show*() method. on_login creates the controller and calls connectControllerSignals to wire its signals; on_logout resets the controller.
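How load_order is expected to place the menu, as a sketch: it assumes the host sorts plugin menus by ascending load_order and then appends its own &System menu. Only the values 400 (Compute) and 450 (Workflows) come from this plan; plugin_info and menu_bar are hypothetical.

```cpp
#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

namespace sketch {

struct plugin_info {
    std::string menu;
    int load_order;
};

// Hypothetical host behaviour: plugin menus in load_order, then the host's menu.
inline std::vector<std::string> menu_bar(std::vector<plugin_info> plugins) {
    // stable_sort keeps registration order for plugins with equal load_order.
    std::stable_sort(plugins.begin(), plugins.end(),
                     [](const plugin_info& a, const plugin_info& b) {
                         return a.load_order < b.load_order;
                     });
    std::vector<std::string> menus;
    for (const auto& p : plugins) menus.push_back(p.menu);
    menus.push_back("&System");  // added by the host after all plugins
    return menus;
}

}  // namespace sketch
```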

WP-8 WorkflowController

class WorkflowController : public EntityController {
    Q_OBJECT
public:
    explicit WorkflowController(QMainWindow* mainWindow,
                                QMdiArea* mdiArea,
                                ClientManager* clientManager,
                                const QString& username,
                                QObject* parent = nullptr);
    ~WorkflowController() override;

public slots:
    void showDashboardWindow();
    void showListWindow();

private slots:
    void onNotificationReceived(const QString& eventType,
                                const QDateTime& timestamp,
                                const QStringList& entityIds,
                                const QString& tenantId);

private:
    ClientManager* clientManager_;
    QString username_;
    QPointer<WorkflowMdiWindow> mdiWindow_;
};

Constructor subscribes to the event name given by eventing::domain::event_traits<workflow::eventing::workflow_instance_changed_event>::name using the same pattern as ReportDefinitionController:

constexpr std::string_view workflow_event_name =
    eventing::domain::event_traits<
        workflow::eventing::workflow_instance_changed_event>::name;

// In constructor:
connect(clientManager_, &ClientManager::notificationReceived,
        this, &WorkflowController::onNotificationReceived);

auto subscribeAll = [self = QPointer<WorkflowController>(this)]() {
    if (!self) return;
    self->clientManager_->subscribeToEvent(std::string{workflow_event_name});
};
connect(clientManager_, &ClientManager::loggedIn,    this, subscribeAll);
connect(clientManager_, &ClientManager::reconnected, this, subscribeAll);
if (clientManager_->isConnected()) subscribeAll();

onNotificationReceived filters by event type and calls mdiWindow_->markAsStale() if the window is open.

showListWindow() / showDashboardWindow() create WorkflowMdiWindow (with the appropriate tab selected) using the existing try_reuse_window pattern.
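A Qt-free model of onNotificationReceived: the event type is compared with the traits name, and the window is marked stale only if it still exists. Here std::weak_ptr plays the role QPointer plays in the real controller (it becomes empty once the window is destroyed); controller and mdi_window are hypothetical stand-ins.

```cpp
#include <cassert>
#include <memory>
#include <string_view>

namespace sketch {

constexpr std::string_view workflow_event_name = "ores.workflow.workflow_instance_changed";

struct mdi_window {
    bool stale = false;
    void markAsStale() { stale = true; }
};

struct controller {
    std::weak_ptr<mdi_window> mdiWindow;  // analogue of QPointer<WorkflowMdiWindow>

    void onNotificationReceived(std::string_view eventType) {
        if (eventType != workflow_event_name) return;      // events from other services
        if (auto w = mdiWindow.lock()) w->markAsStale();   // no-op if the window is closed
    }
};

}  // namespace sketch
```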

WP-9 WorkflowMdiWindow

A single EntityListMdiWindow subclass using a QTabWidget with two tabs:

Tab 1 — Dashboard

Three summary cards (QGroupBox with a large QLabel):

  • Active (in_progress count) — amber border
  • Completed (today) — green border
  • Failed (today) — red border

Below the cards: a QTableWidget labelled "Recent Failures" with columns:

Instance ID (truncated)   Workflow Type   Occurred   Actions

Actions per row: [VIEW] (opens detail dialog).

The counts and failures table are populated from the data fetched by doReload() (the same list-instances response used by Tab 2, filtered client-side).

Tab 2 — Execution List

Layout: search bar + filter controls on top, table below, bulk-action row at bottom.

Search / filter bar:

  • QLineEdit "Search by instance ID or correlation ID…"
  • QComboBox Status filter: All | Running | Completed | Failed | Compensating
  • QComboBox Type filter: All | (populated from distinct types in loaded data)
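The three controls combine as a logical AND. A minimal predicate, assuming case-sensitive substring search and a combo-box label to status mapping (Running → in_progress, and so on); row, status_for_label and matches are hypothetical, and the real window might implement this in a QSortFilterProxyModel instead.

```cpp
#include <cassert>
#include <string>
#include <string_view>

namespace sketch {

struct row { std::string id, correlation_id, type, status; };

// Combo-box label -> wire status; "All" (or any unknown label) matches everything.
inline std::string_view status_for_label(std::string_view label) {
    if (label == "Running") return "in_progress";
    if (label == "Completed") return "completed";
    if (label == "Failed") return "failed";
    if (label == "Compensating") return "compensating";
    return {};
}

inline bool matches(const row& r, std::string_view search,
                    std::string_view status_label, std::string_view type_label) {
    const bool text_ok = search.empty()
        || r.id.find(search) != std::string::npos
        || r.correlation_id.find(search) != std::string::npos;
    const std::string_view status = status_for_label(status_label);
    const bool status_ok = status.empty() || r.status == status;
    const bool type_ok = type_label == "All" || r.type == type_label;
    return text_ok && status_ok && type_ok;
}

}  // namespace sketch
```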

Table columns:

Instance ID    Workflow Type   Status   Progress   Started    Duration   Actions
(truncated)    human name      badge    "2 / 4"    HH:MM:SS   00:45s     [VIEW]

  • Status column uses the same BadgeDelegate + make_badge_item pattern as ServiceDashboardMdiWindow.
  • Status → colour mapping:

    Status         Colour
    in_progress    color_constants::level_warn (amber)
    completed      color_constants::level_info (green)
    failed         color_constants::level_error (red)
    compensating   color_constants::level_warn (amber)
    compensated    color_constants::level_debug (blue)
  • [VIEW] button in the Actions column opens WorkflowInstanceDetailDialog.
  • Double-click on row also opens detail dialog.
  • Table supports multi-select; bulk actions (Phase 2) appear in a bar below.

Toolbar

Button         Icon                   Notes
Reload         Icon::ArrowClockwise   Calls EntityListMdiWindow::reload(); stale indicator wired via initializeStaleIndicator
Auto-Refresh   Icon::ArrowSync        Checkable; interval dialog on enable; QTimer follows ServiceDashboard pattern
─ separator ─

doReload() implementation

void WorkflowMdiWindow::doReload() {
    emit statusChanged(tr("Loading workflows…"));
    using Request  = workflow::messaging::list_workflow_instances_request;
    using Response = workflow::messaging::list_workflow_instances_response;

    auto* watcher = new QFutureWatcher<Response>(this);
    connect(watcher, &QFutureWatcher<Response>::finished, this,
            [this, watcher]() {
                onDataLoaded(watcher->result());
                watcher->deleteLater();  // one watcher per reload; release it
            });

    auto* cm = clientManager_;
    watcher->setFuture(QtConcurrent::run([cm]() -> Response {
        Request req;
        req.limit = 200;
        auto result = cm->process_authenticated_request(req, std::chrono::seconds(30));
        if (!result)
            return Response{.success = false, .message = result.error()};
        return *result;
    }));
}

onDataLoaded populates both tabs from the same instances vector (no second request needed for the dashboard summary counts).

Auto-refresh

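Follows the ServiceDashboardMdiWindow pattern, with an explicit check for Cancel on the interval dialog:

// Member: QTimer* autoRefreshTimer_  +  QAction* autoRefreshAction_

void WorkflowMdiWindow::onRefreshToggled(bool checked) {
    if (checked) {
        bool ok = false;
        const int secs = QInputDialog::getInt(this, tr("Auto-Refresh"),
            tr("Refresh interval (seconds):"), 30, 5, 3600, 1, &ok);
        if (!ok) {
            // Cancelled: untick the action. setChecked(false) re-enters this
            // slot with checked == false, which stops the timer.
            autoRefreshAction_->setChecked(false);
            return;
        }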
        autoRefreshTimer_->setInterval(secs * 1000);
        autoRefreshAction_->setToolTip(
            tr("Auto-refresh every %1 s — click to disable").arg(secs));
        autoRefreshTimer_->start();
    } else {
        autoRefreshTimer_->stop();
        autoRefreshAction_->setToolTip(tr("Enable automatic refresh…"));
    }
}

WP-10 WorkflowInstanceDetailDialog

A non-modal QDialog (opened with show(), not exec()), launched by the controller or by WorkflowMdiWindow when the user clicks VIEW or double-clicks a row.

Layout:

  • Header: QLabel showing "Workflow: {type} | {instance_id_truncated} | Started: {time}"
  • QTableWidget for steps:

    Step Name Status Started Duration Error

    Status column uses BadgeDelegate with same colour mapping (pending=gray, in_progress=amber, completed=green, failed=red).

  • QTextEdit (read-only) below the table showing the full error message for the selected step (shown/hidden based on selection).

Data is loaded when the dialog opens, via a separate QtConcurrent::run call that sends get_workflow_steps_request to the workflow.v1.instances.steps NATS endpoint. There is no live refresh in Phase 1; the user can close and reopen the dialog to see updated steps.

WP-11 OreImportController update

After the wizard closes on the async path, update the status bar message to: "Import submitted (workflow {id}) — see Workflows → Execution List to track progress"

No wizard changes beyond what was done in the previous commit (copyable ID fields remain).

WP-12 Build system registration

In projects/CMakeLists.txt add after line 98:

add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/ores.qt.workflow)

In projects/ores.workflow/src/CMakeLists.txt confirm that ores.eventing is a dependency (it already is, transitively via ores.eventing.lib). No source-list change is needed for the new workflow_query_handler.cpp: GLOB_RECURSE picks it up automatically, provided CMake is re-run so the glob sees the new file.

In projects/ores.workflow.api/src/CMakeLists.txt (or equivalent): no change needed — the new header-only event struct and protocol structs live in include/.

Implementation Order

Execute in this sequence to keep the build green at each step:

  1. WP-1: add workflow_instance_changed_event.hpp (header only, no build break)
  2. WP-2: add workflow_query_protocol.hpp (header only)
  3. WP-3: add publish_status_event to workflow engine (server rebuild, no Qt changes)
  4. WP-4 + WP-5: add query handler and register it (server rebuild)
  5. WP-6: scaffold plugin directory and CMakeLists (empty plugin compiles)
  6. WP-7: WorkflowPlugin (minimal — create_menus returns stub menu)
  7. WP-8: WorkflowController (event subscription, stub show* methods)
  8. WP-9: WorkflowMdiWindow (full implementation)
  9. WP-10: WorkflowInstanceDetailDialog
  10. WP-11: OreImportController status bar update
  11. WP-12: confirm build, run smoke test

Status Badge Colour Reference

Used in both WorkflowMdiWindow and WorkflowInstanceDetailDialog:

Status text    color_constants field   Meaning
in_progress    level_warn (amber)      Running, awaiting step result
completed      level_info (green)      All steps succeeded
failed         level_error (red)       One step failed, no recovery
compensating   level_warn (amber)      Rolling back
compensated    level_debug (blue)      Rollback complete
pending        level_trace (gray)      Step not yet started
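Since both windows use this mapping, keeping it in one function stops them from drifting apart. A sketch in which an enum stands in for the color_constants fields (the real fields are colours, not enum values); badge_level and badge_for are hypothetical, and mapping unknown statuses to the pending colour is an assumed default.

```cpp
#include <cassert>
#include <string_view>

namespace sketch {

// Stand-ins for color_constants::level_trace ... level_error.
enum class badge_level { trace, debug, info, warn, error };

constexpr badge_level badge_for(std::string_view status) {
    if (status == "completed") return badge_level::info;     // green
    if (status == "failed") return badge_level::error;       // red
    if (status == "compensated") return badge_level::debug;  // blue
    if (status == "in_progress" || status == "compensating")
        return badge_level::warn;                             // amber
    return badge_level::trace;  // "pending" and any status added later (gray)
}

static_assert(badge_for("failed") == badge_level::error);

}  // namespace sketch
```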

Phase 2 (Out of Scope for this Branch)

  • Retry action: workflow.v1.instances.retry NATS endpoint + [RETRY] button.
  • Terminate action: workflow.v1.instances.terminate NATS endpoint + [X] button.
  • Throughput chart: QChart widget on the Dashboard tab.
  • Pagination: cursor-based or offset paging in list_workflow_instances_request.
  • Settings tab: persist auto-refresh interval and default status filter.
  • Bulk retry / terminate for multi-selected rows.

Date: 2026-04-10

Emacs 29.1 (Org mode 9.6.6)