Eventing Pipeline

Overview

Every entity change in ORE Studio flows through a four-stage pipeline:

  1. PostgreSQL fires a pg_notify on a named channel.
  2. The service's postgres_event_source deserialises the payload and publishes a typed in-process event to the event_bus.
  3. A subscriber in the service translates the in-process event to a NATS message and publishes it on a well-known subject.
  4. The Qt client (or any NATS subscriber) receives the NATS message and reacts.

The diagram below shows these four stages for a workspace change:

┌─────────────────────────────────────────────────────────────────────┐
│ PostgreSQL                                                          │
│  ores_workspaces_tbl → ores_workspaces_notify_trg                  │
│  → pg_notify('ores_workspaces', {entity, timestamp, ids, tenant})  │
└───────────────────────────┬─────────────────────────────────────────┘
                            │  LISTEN/NOTIFY (libpq)
┌───────────────────────────▼─────────────────────────────────────────┐
│ ores.workspace.service (server process)                             │
│  postgres_event_source → event_bus                                  │
│  → subscriber → nats.publish("ores.workspace.workspace_changed")   │
└───────────────────────────┬─────────────────────────────────────────┘
                            │  NATS publish/subscribe
┌───────────────────────────▼─────────────────────────────────────────┐
│ Qt client (ores.qt.workspace / WorkspaceController)                 │
│  subscribeToEvent("ores.workspace.workspace_changed")               │
│  → ClientManager::notificationReceived signal                       │
│  → onNotificationReceived → markAsStale()                          │
└─────────────────────────────────────────────────────────────────────┘

Stage 1 — PostgreSQL pg_notify

Every table that participates in eventing has a dedicated AFTER trigger that calls pg_notify. The trigger lives in ores.sql; the workspace example is in projects/ores.sql/create/workspace/workspace_notify_trigger.sql.

The notification payload is a JSON object with four fixed fields:

Field        Type          Meaning
-----------  ------------  ----------------------------------------------
entity       text          Fully-qualified entity name (see §Naming)
timestamp    text          UTC timestamp in YYYY-MM-DD HH24:MI:SS form
entity_ids   jsonb array   Identifiers of the changed rows
tenant_id    text          Tenant UUID for the changed rows
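
For tests or tools that need to build a payload by hand, the timestamp field can be produced in the same YYYY-MM-DD HH24:MI:SS form from C++. The sketch below is illustrative only: format_utc_timestamp is a hypothetical helper, not part of ORE Studio, and it assumes second-level precision is sufficient.

```cpp
#include <cassert>
#include <ctime>
#include <string>

// Hypothetical helper: formats a time value as UTC in the
// "YYYY-MM-DD HH24:MI:SS" form used by the notification payload.
// Note: std::gmtime uses static storage and is not thread-safe.
std::string format_utc_timestamp(std::time_t t) {
    const std::tm* utc = std::gmtime(&t);
    assert(utc != nullptr && "time value is not representable");
    char buf[20]; // 19 characters plus the terminating NUL
    const std::size_t n =
        std::strftime(buf, sizeof buf, "%Y-%m-%d %H:%M:%S", utc);
    return std::string(buf, n);
}
```

Calling format_utc_timestamp(0) yields the Unix epoch in this form, which is a convenient fixed value for payload fixtures.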

The channel name (first argument to pg_notify) follows the convention ores_<table_base_name>, e.g. ores_workspaces for ores_workspaces_tbl. Entities with a text primary key (e.g. ISO codes) put the text key in entity_ids; UUID-keyed entities put the UUID string.
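
The channel convention can be expressed as a small function, which may help when checking that a trigger and a register_mapping call agree. This is a sketch under one assumption: that every participating table name ends in _tbl, as ores_workspaces_tbl does. channel_for_table is a hypothetical name.

```cpp
#include <cassert>
#include <string>
#include <string_view>

// Hypothetical helper: derives the pg_notify channel from a table name by
// dropping a trailing "_tbl", per the ores_<table_base_name> convention.
// Names without the suffix are returned unchanged.
std::string channel_for_table(std::string_view table) {
    assert(!table.empty());
    constexpr std::string_view suffix = "_tbl";
    if (table.size() > suffix.size() &&
        table.substr(table.size() - suffix.size()) == suffix) {
        table.remove_suffix(suffix.size());
    }
    return std::string{table};
}
```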

Naming conventions

Event names follow a three-part dotted scheme:

ores.<domain>.<entity>_changed

Examples:

  • ores.refdata.currency_changed
  • ores.workspace.workspace_changed
  • ores.iam.account_changed

The entity name embedded in the payload (entity field) uses the same prefix without the _changed suffix:

ores.<domain>.<entity>
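
The relationship between the two names is mechanical, so it can be checked in code. The following sketch uses hypothetical helper names (event_name_for, entity_for_event) and assumes nothing beyond the _changed suffix rule described above.

```cpp
#include <cassert>
#include <optional>
#include <string>
#include <string_view>

// Hypothetical helper: entity name -> event name.
std::string event_name_for(std::string_view entity) {
    assert(!entity.empty());
    return std::string{entity} + "_changed";
}

// Hypothetical helper: event name -> entity name, or std::nullopt when the
// input does not end in "_changed".
std::optional<std::string> entity_for_event(std::string_view event_name) {
    constexpr std::string_view suffix = "_changed";
    if (event_name.size() <= suffix.size() ||
        event_name.substr(event_name.size() - suffix.size()) != suffix) {
        return std::nullopt;
    }
    event_name.remove_suffix(suffix.size());
    return std::string{event_name};
}
```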

Stage 2 — Server-side pickup (postgres_event_source → event_bus)

postgres_event_source (in ores.eventing) maintains a persistent PostgreSQL LISTEN connection. On each incoming notification it:

  1. Deserialises the JSON payload into an entity_change_event struct.
  2. Looks up the registered mapping for the channel name to find the typed event type.
  3. Populates a typed event (e.g. workspace_changed_event) and publishes it on the in-process event_bus.
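
The lookup-and-publish steps above can be illustrated with a toy model. This is not the ores.eventing implementation: toy_event_bus and toy_event_source are hypothetical stand-ins, the member types of both structs are assumptions, and JSON deserialisation (step 1) is left out, so on_notification receives an already-parsed payload.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <typeindex>
#include <typeinfo>
#include <unordered_map>
#include <utility>
#include <vector>

// Stand-in for the deserialised payload; member types are assumptions.
struct entity_change_event {
    std::string entity;
    std::string timestamp;
    std::vector<std::string> entity_ids;
    std::string tenant_id;
};

// Stand-in for a typed in-process event.
struct workspace_changed_event {
    std::string timestamp;
    std::vector<std::string> ids;
    std::string tenant_id;
};

// Toy in-process bus: handlers are stored per event type.
class toy_event_bus {
public:
    template <typename Event>
    void subscribe(std::function<void(const Event&)> handler) {
        handlers_[std::type_index(typeid(Event))].push_back(
            [h = std::move(handler)](const void* e) {
                h(*static_cast<const Event*>(e));
            });
    }

    template <typename Event>
    void publish(const Event& e) const {
        const auto it = handlers_.find(std::type_index(typeid(Event)));
        if (it == handlers_.end()) return; // nobody is listening
        for (const auto& h : it->second) h(&e);
    }

private:
    std::unordered_map<std::type_index,
        std::vector<std::function<void(const void*)>>> handlers_;
};

// Toy event source: maps a channel name to a function that builds the typed
// event and publishes it (steps 2 and 3).
class toy_event_source {
public:
    explicit toy_event_source(toy_event_bus& bus) : bus_(bus) {}

    template <typename Event>
    void register_mapping(const std::string& channel) {
        assert(!channel.empty());
        mappings_[channel] = [this](const entity_change_event& raw) {
            bus_.publish(Event{raw.timestamp, raw.entity_ids, raw.tenant_id});
        };
    }

    // Returns false when no mapping is registered for the channel.
    bool on_notification(const std::string& channel,
                         const entity_change_event& raw) const {
        const auto it = mappings_.find(channel);
        if (it == mappings_.end()) return false;
        it->second(raw);
        return true;
    }

private:
    toy_event_bus& bus_;
    std::unordered_map<std::string,
        std::function<void(const entity_change_event&)>> mappings_;
};
```

In this model a notification on an unregistered channel is simply dropped, which mirrors the symptom of a missing register_mapping call: the trigger fires but no typed event reaches the bus.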

Mappings are registered during application startup via the eventing::service::registrar helper:

ev::service::registrar::register_mapping<wsev::workspace_changed_event>(
    event_source, "ores.workspace.workspace", "ores_workspaces");

The second argument is the entity name, used for logging; the third is the PostgreSQL channel name, which must match the first argument to pg_notify.

Each event type must have an event_traits specialisation that provides its NATS subject name as a constexpr string_view:

template<>
struct event_traits<workspace_changed_event> {
    static constexpr std::string_view name = "ores.workspace.workspace_changed";
};

This specialisation lives in the API package alongside the event struct, e.g. ores.workspace.api/include/ores.workspace.api/eventing/workspace_changed_event.hpp.

Stage 3 — NATS publish

A subscriber registered on the event_bus converts the typed in-process event to JSON and publishes it to NATS on the subject given by event_traits<T>::name:

auto sub = event_bus.subscribe<wsev::workspace_changed_event>(
    [&nats](const wsev::workspace_changed_event& e) {
        publish_entity_event(nats, "ores.workspace.workspace_changed",
            ev::domain::entity_change_event{
                .entity     = "ores.workspace.workspace",
                .timestamp  = e.timestamp,
                .entity_ids = e.ids,
                .tenant_id  = e.tenant_id
            });
    });

The entity_change_event JSON sent on NATS has the same four fields as the PostgreSQL payload; it is the wire format shared by all downstream consumers including the Qt client.
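
To make the wire format concrete, here is a hand-rolled serialiser for the four fields. It is a sketch only: the real service presumably uses a JSON library, the member types are assumptions, and the key order shown is not stated by the source. to_json and json_escape are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Stand-in for ev::domain::entity_change_event; member types are assumptions.
struct entity_change_event {
    std::string entity;
    std::string timestamp;
    std::vector<std::string> entity_ids;
    std::string tenant_id;
};

// Minimal escaping: handles quotes, backslashes and common whitespace
// escapes. Other control characters are passed through unchanged.
std::string json_escape(const std::string& in) {
    std::string out;
    for (const char c : in) {
        switch (c) {
        case '"':  out += "\\\""; break;
        case '\\': out += "\\\\"; break;
        case '\n': out += "\\n";  break;
        case '\r': out += "\\r";  break;
        case '\t': out += "\\t";  break;
        default:   out += c;
        }
    }
    return out;
}

// Serialises the event as a compact JSON object with the four fixed fields.
std::string to_json(const entity_change_event& e) {
    assert(!e.entity.empty() && "entity name must be set");
    std::string ids;
    for (std::size_t i = 0; i < e.entity_ids.size(); ++i) {
        if (i != 0) ids += ',';
        ids += '"' + json_escape(e.entity_ids[i]) + '"';
    }
    return "{\"entity\":\"" + json_escape(e.entity) +
           "\",\"timestamp\":\"" + json_escape(e.timestamp) +
           "\",\"entity_ids\":[" + ids +
           "],\"tenant_id\":\"" + json_escape(e.tenant_id) + "\"}";
}
```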

Stage 4 — Client pickup (Qt ClientManager)

The Qt client uses subscribeToEvent(subject) on ClientManager to register interest in a NATS subject. On arrival the client library deserialises the payload and emits the notificationReceived signal with:

Parameter   Type          Content
----------  ------------  ----------------------------------------------------
eventType   QString       NATS subject (e.g. ores.workspace.workspace_changed)
timestamp   QDateTime     Change timestamp
entityIds   QStringList   Identifiers of changed entities
tenantId    QString       Tenant UUID

Controllers connect to this signal and filter by eventType:

constexpr std::string_view workspace_event_name =
    eventing::domain::event_traits<workspace::eventing::workspace_changed_event>::name;

// In constructor:
connect(clientManager_, &ClientManager::notificationReceived,
        this, &WorkspaceController::onNotificationReceived);
clientManager_->subscribeToEvent(std::string{workspace_event_name});

// Slot:
void WorkspaceController::onNotificationReceived(
    const QString& eventType, ...) {
    if (eventType != QString::fromStdString(std::string{workspace_event_name}))
        return;
    // mark windows stale ...
}

Subscription is renewed on loggedIn and reconnected signals. Unsubscription happens in the controller destructor.
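
The subscription lifecycle can be modelled without Qt. The sketch below is framework-free and purely illustrative: fake_client is a hypothetical stand-in for ClientManager, and it assumes that subscriptions are lost when the connection drops, which is why they are renewed.

```cpp
#include <cassert>
#include <set>
#include <string>
#include <utility>

// Hypothetical stand-in for ClientManager: tracks active subscriptions and
// forgets them all when the connection drops (an assumption of this sketch).
class fake_client {
public:
    void subscribe(const std::string& subject) { subjects_.insert(subject); }
    void unsubscribe(const std::string& subject) { subjects_.erase(subject); }
    void drop_connection() { subjects_.clear(); }
    bool is_subscribed(const std::string& subject) const {
        return subjects_.count(subject) != 0;
    }

private:
    std::set<std::string> subjects_;
};

// Models a controller: subscribe on construction, renew on login and
// reconnect, unsubscribe on destruction.
class controller_sketch {
public:
    controller_sketch(fake_client& client, std::string subject)
        : client_(client), subject_(std::move(subject)) {
        assert(!subject_.empty());
        client_.subscribe(subject_);
    }
    ~controller_sketch() { client_.unsubscribe(subject_); }

    controller_sketch(const controller_sketch&) = delete;
    controller_sketch& operator=(const controller_sketch&) = delete;

    // In the Qt client these would be slots connected to the loggedIn and
    // reconnected signals.
    void on_logged_in() { client_.subscribe(subject_); }
    void on_reconnected() { client_.subscribe(subject_); }

    // Mirrors the eventType filter in onNotificationReceived.
    bool handles(const std::string& event_type) const {
        return event_type == subject_;
    }

private:
    fake_client& client_;
    std::string subject_;
};
```

Because the signal is shared by all controllers, the filter in handles is what keeps a controller from reacting to events for other entities.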

Verifying eventing with ores.shell

The events commands in ores.shell (see Shell recipes) let you inspect and exercise the pipeline from the command line, without building a Qt client.

List registered channels

events channels
exit

This lists every channel registered with the event_channel_registry at startup. If ores.workspace.workspace_changed is absent, the service did not call register_mapping.

Subscribe and watch for events

events listen ores.workspace.workspace_changed

Then create or edit a workspace in the Qt app; the shell should print a JSON notification within a second. If nothing appears, check:

  1. The SQL trigger exists: \df ores_workspaces_notify_fn in psql.
  2. The service log shows "Entity change event pipeline started."
  3. The NATS subject matches exactly (case-sensitive).

See the Listen and Channels recipes for executable examples.

Adding eventing to a new entity

To wire eventing for a new entity, four artefacts are needed:

  1. SQL trigger: projects/ores.sql/create/<domain>/<entity>_notify_trigger.sql, calling pg_notify('<channel>', payload) after insert/update/delete.
  2. Event type: <domain>.api/include/<domain>.api/eventing/<entity>_changed_event.hpp, with the struct and event_traits specialisation.
  3. Service registration: in the service application.cpp, call registrar::register_mapping<T> and subscribe on the event_bus. Also add ores.eventing.lib to the service CMakeLists.
  4. Qt subscription: in the controller constructor/destructor, call subscribeToEvent / unsubscribeFromEvent; implement onNotificationReceived to mark windows stale.
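
As an illustration of the second artefact, an event header for the ores.refdata.currency entity might look roughly like this. It is a sketch, not project code: the primary event_traits template is redeclared here only to keep the example self-contained (in ORE Studio it comes from the eventing library), the member types are assumptions, and follows_naming_scheme is a hypothetical check.

```cpp
#include <string>
#include <string_view>
#include <vector>

// Primary template, declared here only so the sketch compiles on its own.
template <typename Event>
struct event_traits;

// Hypothetical event for ores.refdata.currency. Member names follow
// workspace_changed_event; member types are assumptions.
struct currency_changed_event {
    std::string timestamp;
    std::vector<std::string> ids; // text keys, e.g. ISO codes
    std::string tenant_id;
};

template <>
struct event_traits<currency_changed_event> {
    static constexpr std::string_view name = "ores.refdata.currency_changed";
};

// Hypothetical compile-time check. It verifies only the "ores." prefix and
// the "_changed" suffix, not the number of dotted parts.
constexpr bool follows_naming_scheme(std::string_view name) {
    constexpr std::string_view prefix = "ores.";
    constexpr std::string_view suffix = "_changed";
    return name.size() > prefix.size() + suffix.size()
        && name.substr(0, prefix.size()) == prefix
        && name.substr(name.size() - suffix.size()) == suffix;
}

static_assert(follows_naming_scheme(event_traits<currency_changed_event>::name));
```

A static_assert of this kind catches a misspelled subject at build time rather than as a silent mismatch between publisher and subscriber.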
