Eventing Pipeline
Overview
Every entity change in ORE Studio flows through a four-stage pipeline:
- PostgreSQL fires a pg_notify on a named channel.
- The service's postgres_event_source deserialises the payload and publishes a typed in-process event to the event_bus.
- A subscriber in the service translates the in-process event to a NATS message and publishes it on a well-known subject.
- The Qt client (or any NATS subscriber) receives the NATS message and reacts.
The diagram below shows these four stages for a workspace change:
┌─────────────────────────────────────────────────────────────────────┐
│ PostgreSQL │
│ ores_workspaces_tbl → ores_workspaces_notify_trg │
│ → pg_notify('ores_workspaces', {entity, timestamp, ids, tenant}) │
└───────────────────────────┬─────────────────────────────────────────┘
│ LISTEN/NOTIFY (libpq)
┌───────────────────────────▼─────────────────────────────────────────┐
│ ores.workspace.service (server process) │
│ postgres_event_source → event_bus │
│ → subscriber → nats.publish("ores.workspace.workspace_changed") │
└───────────────────────────┬─────────────────────────────────────────┘
│ NATS publish/subscribe
┌───────────────────────────▼─────────────────────────────────────────┐
│ Qt client (ores.qt.workspace / WorkspaceController) │
│ subscribeToEvent("ores.workspace.workspace_changed") │
│ → ClientManager::notificationReceived signal │
│ → onNotificationReceived → markAsStale() │
└─────────────────────────────────────────────────────────────────────┘
Stage 1 — PostgreSQL pg_notify
Every table that participates in eventing has a dedicated AFTER trigger
that calls pg_notify. The trigger lives in
ores.sql; the workspace example is in
projects/ores.sql/create/workspace/workspace_notify_trigger.sql.
The notification payload is a JSON object with four fixed fields:
| Field | Type | Meaning |
|---|---|---|
| entity | text | Fully-qualified entity name (see §Naming) |
| timestamp | text | UTC timestamp in YYYY-MM-DD HH24:MI:SS form |
| entity_ids | jsonb array | Identifiers of the changed rows |
| tenant_id | text | Tenant UUID for the changed rows |
The channel name (first argument to pg_notify) follows the convention
ores_<table_base_name>, e.g. ores_workspaces for
ores_workspaces_tbl. Entities with a text primary key (e.g. ISO
codes) put the text key in entity_ids; UUID-keyed entities put the
UUID string.
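The channel convention can be written down as a small helper. This is an illustrative sketch only: channel_for_table and check_channel_examples are hypothetical names, not part of ORE Studio, and the sketch assumes every participating table is named ores_<table_base_name>_tbl.

```cpp
#include <cassert>
#include <optional>
#include <string>
#include <string_view>

// Hypothetical helper (not part of ORE Studio): derives the pg_notify channel
// from a table name by dropping the "_tbl" suffix. Returns std::nullopt when
// the name does not look like ores_<table_base_name>_tbl.
std::optional<std::string> channel_for_table(std::string_view table) {
    constexpr std::string_view prefix = "ores_";
    constexpr std::string_view suffix = "_tbl";
    if (table.size() <= prefix.size() + suffix.size()) return std::nullopt;
    if (table.substr(0, prefix.size()) != prefix) return std::nullopt;
    if (table.substr(table.size() - suffix.size()) != suffix) return std::nullopt;
    table.remove_suffix(suffix.size());
    return std::string{table};
}

// Usage, checked with assert.
inline void check_channel_examples() {
    assert(channel_for_table("ores_workspaces_tbl") == "ores_workspaces");
    assert(!channel_for_table("workspaces"));
}
```

Returning an optional rather than throwing keeps the helper usable in startup checks that only want to report a misnamed table.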
Naming conventions
Event names follow a three-part dotted scheme:
ores.<domain>.<entity>_changed
Examples:
- ores.refdata.currency_changed
- ores.workspace.workspace_changed
- ores.iam.account_changed
The entity name embedded in the payload (entity field) uses the same
prefix without the _changed suffix:
ores.<domain>.<entity>
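The relationship between the two names is purely mechanical, as the following sketch shows. The helper names event_name_for and entity_name_for are hypothetical and exist only for illustration; the real code base may hard-code both strings instead.

```cpp
#include <cassert>
#include <string>
#include <string_view>

// Hypothetical helpers illustrating the naming scheme; not part of ORE Studio.
constexpr std::string_view changed_suffix = "_changed";

// "ores.workspace.workspace" -> "ores.workspace.workspace_changed"
std::string event_name_for(std::string_view entity) {
    std::string name{entity};
    name.append(changed_suffix);
    return name;
}

// "ores.workspace.workspace_changed" -> "ores.workspace.workspace"
std::string entity_name_for(std::string_view event_name) {
    // Precondition: the name is longer than, and ends in, "_changed".
    assert(event_name.size() > changed_suffix.size());
    assert(event_name.substr(event_name.size() - changed_suffix.size())
           == changed_suffix);
    event_name.remove_suffix(changed_suffix.size());
    return std::string{event_name};
}
```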
Stage 2 — Server-side pickup (postgres_event_source → event_bus)
postgres_event_source (in ores.eventing) maintains a persistent
PostgreSQL LISTEN connection. On each incoming notification it:
- Deserialises the JSON payload into an entity_change_event struct.
- Looks up the registered mapping for the channel name to find the typed event type.
- Populates a typed event (e.g. workspace_changed_event) and publishes it on the in-process event_bus.
Mappings are registered during application startup via the
eventing::service::registrar helper:
ev::service::registrar::register_mapping<wsev::workspace_changed_event>(
    event_source, "ores.workspace.workspace", "ores_workspaces");
The third argument is the PostgreSQL channel name (must match
pg_notify); the second is the entity name for logging.
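To make the lookup-and-publish step concrete, here is a minimal, self-contained sketch of a channel-to-type mapping feeding an in-process bus. It is not the ores.eventing implementation: the event_bus and event_source classes below are simplified stand-ins, the struct fields are assumed from the payload description, and count_delivered_ids is a hypothetical usage function.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>
#include <typeindex>
#include <typeinfo>
#include <unordered_map>
#include <utility>
#include <vector>

// Simplified stand-ins for the real types, which carry more detail.
struct entity_change_event {
    std::string entity;
    std::string timestamp;
    std::vector<std::string> entity_ids;
    std::string tenant_id;
};

struct workspace_changed_event {
    std::string timestamp;
    std::vector<std::string> ids;
    std::string tenant_id;
};

// Minimal in-process bus: handlers are stored per event type.
class event_bus {
public:
    template <typename Event>
    void subscribe(std::function<void(const Event&)> handler) {
        handlers_[std::type_index(typeid(Event))].push_back(
            [h = std::move(handler)](const void* e) {
                h(*static_cast<const Event*>(e));
            });
    }

    template <typename Event>
    void publish(const Event& e) const {
        const auto it = handlers_.find(std::type_index(typeid(Event)));
        if (it == handlers_.end()) return;
        for (const auto& h : it->second) h(&e);
    }

private:
    std::unordered_map<std::type_index,
                       std::vector<std::function<void(const void*)>>> handlers_;
};

// Channel name -> function that turns the raw payload into a typed event.
class event_source {
public:
    explicit event_source(event_bus& bus) : bus_(bus) {}

    template <typename Event>
    void register_mapping(std::string entity, const std::string& channel) {
        // The entity name is kept only for diagnostics in this sketch.
        entities_[channel] = std::move(entity);
        mappings_[channel] = [this](const entity_change_event& raw) {
            bus_.publish(Event{raw.timestamp, raw.entity_ids, raw.tenant_id});
        };
    }

    // Invoked once per notification; returns false for unmapped channels.
    bool on_notification(const std::string& channel,
                         const entity_change_event& raw) const {
        const auto it = mappings_.find(channel);
        if (it == mappings_.end()) return false;
        it->second(raw);
        return true;
    }

private:
    event_bus& bus_;
    std::map<std::string, std::string> entities_;
    std::map<std::string, std::function<void(const entity_change_event&)>> mappings_;
};

// Usage: one mapping, one subscriber, one simulated notification.
inline int count_delivered_ids() {
    event_bus bus;
    event_source source(bus);
    source.register_mapping<workspace_changed_event>(
        "ores.workspace.workspace", "ores_workspaces");

    int delivered = 0;
    bus.subscribe<workspace_changed_event>(
        [&delivered](const workspace_changed_event& e) {
            delivered += static_cast<int>(e.ids.size());
        });

    const entity_change_event raw{"ores.workspace.workspace",
                                  "2025-01-01 12:00:00", {"id-1", "id-2"}, "tenant-1"};
    const bool known = source.on_notification("ores_workspaces", raw);
    assert(known);
    assert(!source.on_notification("ores_unknown", raw));
    return delivered;
}
```

Keying the mapping on the channel name is what makes a mismatch between pg_notify and register_mapping fail silently: the notification arrives but no mapping is found.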
Each event type must have an event_traits specialisation that provides
its NATS subject name as a constexpr string_view:
template<>
struct event_traits<workspace_changed_event> {
    static constexpr std::string_view name = "ores.workspace.workspace_changed";
};
This specialisation lives in the API package alongside the event struct,
e.g.
ores.workspace.api/include/ores.workspace.api/eventing/workspace_changed_event.hpp.
Stage 3 — NATS publish
A subscriber registered on the event_bus converts the typed in-process
event to JSON and publishes it to NATS on the subject given by
event_traits<T>::name:
auto sub = event_bus.subscribe<wsev::workspace_changed_event>(
    [&nats](const wsev::workspace_changed_event& e) {
        publish_entity_event(nats, "ores.workspace.workspace_changed",
            ev::domain::entity_change_event{
                .entity = "ores.workspace.workspace",
                .timestamp = e.timestamp,
                .entity_ids = e.ids,
                .tenant_id = e.tenant_id
            });
    });
The entity_change_event JSON sent on NATS has the same four fields as
the PostgreSQL payload; it is the wire format shared by all downstream
consumers including the Qt client.
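As an illustration of the wire format and of taking the subject from event_traits<T>::name rather than a literal, consider the sketch below. Everything in it is assumed for the example: recording_publisher is a hypothetical stand-in for a NATS connection, publish_change is not the real publish_entity_event, and the hand-rolled JSON writer only approximates what a proper serialiser would emit.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

// Simplified stand-ins for the real ORE Studio types.
struct entity_change_event {
    std::string entity;
    std::string timestamp;
    std::vector<std::string> entity_ids;
    std::string tenant_id;
};

struct workspace_changed_event {
    std::string timestamp;
    std::vector<std::string> ids;
    std::string tenant_id;
};

template <typename Event> struct event_traits;

template <> struct event_traits<workspace_changed_event> {
    static constexpr std::string_view name = "ores.workspace.workspace_changed";
};

// Escapes quotes, backslashes and common whitespace escapes only; a real
// serialiser would also handle the remaining control characters.
std::string json_escape(std::string_view in) {
    std::string out;
    for (const char c : in) {
        switch (c) {
        case '"':  out += "\\\""; break;
        case '\\': out += "\\\\"; break;
        case '\n': out += "\\n";  break;
        case '\r': out += "\\r";  break;
        case '\t': out += "\\t";  break;
        default:   out += c;      break;
        }
    }
    return out;
}

// Writes the four fixed fields in a stable order.
std::string to_json(const entity_change_event& e) {
    std::string out = "{\"entity\":\"" + json_escape(e.entity) + "\",";
    out += "\"timestamp\":\"" + json_escape(e.timestamp) + "\",";
    out += "\"entity_ids\":[";
    for (std::size_t i = 0; i < e.entity_ids.size(); ++i) {
        if (i != 0) out += ',';
        out += '"' + json_escape(e.entity_ids[i]) + '"';
    }
    out += "],\"tenant_id\":\"" + json_escape(e.tenant_id) + "\"}";
    return out;
}

// Hypothetical stand-in for a NATS connection: records each publish call.
struct recording_publisher {
    std::vector<std::pair<std::string, std::string>> sent;  // subject, payload
    void publish(std::string_view subject, std::string payload) {
        sent.emplace_back(std::string{subject}, std::move(payload));
    }
};

// Publishes any event type on the subject named by its traits.
template <typename Event>
void publish_change(recording_publisher& nats, std::string entity, const Event& e) {
    assert(!entity.empty());  // every payload needs an entity name
    nats.publish(event_traits<Event>::name,
                 to_json(entity_change_event{std::move(entity), e.timestamp,
                                             e.ids, e.tenant_id}));
}
```

Reading the subject from the traits keeps the publisher and the Qt subscriber tied to a single definition, so a renamed event cannot leave the two sides on different subjects.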
Stage 4 — Client pickup (Qt ClientManager)
The Qt client uses subscribeToEvent(subject) on ClientManager to
register interest in a NATS subject. On arrival the client library
deserialises the payload and emits the notificationReceived signal with:
| Parameter | Type | Content |
|---|---|---|
| eventType | QString | NATS subject (e.g. ores.workspace.workspace_changed) |
| timestamp | QDateTime | Change timestamp |
| entityIds | QStringList | Identifiers of changed entities |
| tenantId | QString | Tenant UUID |
Controllers connect to this signal and filter by eventType:
constexpr std::string_view workspace_event_name =
    eventing::domain::event_traits<
        workspace::eventing::workspace_changed_event>::name;

// In constructor:
connect(clientManager_, &ClientManager::notificationReceived,
        this, &WorkspaceController::onNotificationReceived);
clientManager_->subscribeToEvent(std::string{workspace_event_name});

// Slot:
void WorkspaceController::onNotificationReceived(
    const QString& eventType, ...) {
    if (eventType != QString::fromStdString(std::string{workspace_event_name}))
        return;
    // mark windows stale ...
}
Subscription is renewed on loggedIn and reconnected signals.
Unsubscription happens in the controller destructor.
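The subscription lifetime described above can be modelled without Qt. The sketch below is a hypothetical, Qt-free approximation: fake_client_manager stands in for ClientManager, plain member functions stand in for slots, and dropConnection encodes the assumption that subscriptions are lost when the connection drops, which is why they are renewed.

```cpp
#include <cassert>
#include <set>
#include <string>
#include <string_view>

// Hypothetical stand-in for ClientManager; it only tracks active subjects.
class fake_client_manager {
public:
    void subscribeToEvent(const std::string& subject) { subjects_.insert(subject); }
    void unsubscribeFromEvent(const std::string& subject) { subjects_.erase(subject); }
    // Assumption: a dropped connection loses all subscriptions.
    void dropConnection() { subjects_.clear(); }
    bool isSubscribed(const std::string& subject) const {
        return subjects_.count(subject) != 0;
    }

private:
    std::set<std::string> subjects_;
};

class workspace_controller {
public:
    explicit workspace_controller(fake_client_manager& cm) : cm_(cm) { subscribe(); }
    ~workspace_controller() { cm_.unsubscribeFromEvent(std::string{subject}); }
    workspace_controller(const workspace_controller&) = delete;
    workspace_controller& operator=(const workspace_controller&) = delete;

    // In the Qt client these would be slots connected to loggedIn/reconnected.
    void onLoggedIn() { subscribe(); }
    void onReconnected() { subscribe(); }

    // Ignores notifications for other subjects; otherwise marks data stale.
    void onNotificationReceived(std::string_view eventType) {
        if (eventType != subject) return;
        stale_ = true;
    }
    bool stale() const { return stale_; }

private:
    static constexpr std::string_view subject = "ores.workspace.workspace_changed";
    void subscribe() { cm_.subscribeToEvent(std::string{subject}); }

    fake_client_manager& cm_;
    bool stale_ = false;
};

// Usage: the subscription survives a reconnect and ends with the controller.
inline bool subscription_follows_lifetime() {
    fake_client_manager cm;
    const std::string subject = "ores.workspace.workspace_changed";
    {
        workspace_controller controller(cm);
        cm.dropConnection();
        controller.onReconnected();
        assert(cm.isSubscribed(subject));
    }
    return !cm.isSubscribed(subject);
}
```

Because subscribing is idempotent in this model, calling it from both the login and reconnect handlers is harmless even when both fire.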
Verifying eventing with ores.shell
The events commands described in Shell recipes let you inspect and
exercise the pipeline from the command line without building a Qt client.
List registered channels
events channels
exit
Shows every channel registered with the event_channel_registry at
startup. If ores.workspace.workspace_changed is absent, the service
did not call register_mapping.
Subscribe and watch for events
events listen ores.workspace.workspace_changed
Then perform a workspace create/edit in the Qt app; the shell should print a JSON notification within a second. If nothing appears, check:
- The SQL trigger function exists: run \df ores_workspaces_notify_fn in psql.
- The service log shows "Entity change event pipeline started."
- The NATS subject matches exactly (case-sensitive).
Adding eventing to a new entity
To wire eventing for a new entity, four artefacts are needed:
- SQL trigger: projects/ores.sql/create/<domain>/<entity>_notify_trigger.sql, calling pg_notify('<channel>', payload) after insert/update/delete.
- Event type: <domain>.api/include/<domain>.api/eventing/<entity>_changed_event.hpp, with the struct and event_traits specialisation.
- Service registration: in the service application.cpp, call registrar::register_mapping<T> and subscribe on the event_bus. Also add ores.eventing.lib to the service CMakeLists.
- Qt subscription: in the controller constructor/destructor, call subscribeToEvent/unsubscribeFromEvent; implement onNotificationReceived to mark windows stale.
See also
- ores.eventing — component overview
- ores.sql — database schema and triggers
- Shell recipes: events listen, events channels