Verbatim port of Fastway-Server's TFWEventBus from fw_plugin_host.pas
per feedback_copy_dont_reinterpret.md. Adjustments limited to:
- Type renames (TFW* -> T*).
- uses clause: drop fw_log; add log.types from fpc-log so the
optional Logger property uses the canonical ecosystem-wide
TLogProc shape, matching every other fpc-* library.
- Per-handler exception logging now calls Logger with
Level=llError, Category='events', and includes the source
plugin (ASourcePlugin parameter) in the message text so the
canonical signature stays meaningful.
Behaviours preserved verbatim: APluginName bulk-Unsubscribe key,
wildcard '*' subscriber, OnBroadcast external-listener tap,
snapshot-iterate-outside-lock pattern, per-handler exception
isolation, TCriticalSection.
docs/DEVELOPER_GUIDE.md added covering threading, payload
ownership, recursive Fire, OnBroadcast, logger plumbing, and
the relationship between fpc-events (ecosystem-wide pub/sub)
and per-library typed observer callbacks (bp.events / cm.events
pattern).
Tests: 44 assertions across 14 scenarios pass on x86_64-linux.
Pre-tag -vh audit on src/ev.bus.pas reports zero hints/warnings.
12 KiB
fpc-events Developer Guide
This guide walks through building consumers of events.bus.TEventBus
end-to-end: the threading model, the lifetime contract for event
payloads, the patterns that match the fpc-* ecosystem, and the
common pitfalls. Read it in addition to the API table in the
top-level README.md.
Table of contents
- What problem fpc-events solves
- Quick start (a real consumer in 30 lines)
- The lifecycle of a Fire call
- Subscribe vs. UnsubscribeCallback vs. Unsubscribe
- Wildcard
'*'subscribers - The OnBroadcast tap
- Logger plumbing —
log.types.TLogProc - Handler exception isolation
- Recursive Fire and re/un-subscribe from inside a callback
- Threading model and the snapshot-iterate pattern
- Memory ownership for event payloads
- How fpc-events relates to per-library typed callbacks
- Common pitfalls
What problem fpc-events solves
Many fpc-* libraries expose typed observer callbacks (see
bp.events, cm.events). Those work great when the consumer
binds one library to one class. fpc-events is for the other
case: ecosystem-wide pub/sub where N publishers fire events and
M consumers (web sockets, log sinks, audit trails) want to
subscribe to all of them without each library having to know
about each subscriber.
The classic Fastway use case: every plugin fires
user.login / session.end / message.posted; the web admin
subscribes to '*' and forwards every event over a WebSocket
to the browser. No plugin needs to know the WebSocket exists.
If you only have one publisher and one consumer, prefer typed
callbacks (set Session.OnFileBegin := @MyHandler). If you
have many-to-many, fpc-events is the tool.
Quick start
A complete working consumer:
program audit_demo;
{$mode objfpc}{$H+}
uses
{$IFDEF UNIX}cthreads,{$ENDIF}
Classes, SysUtils, fpjson,
log.types,
events.bus;
type
TAuditor = class
procedure HandleLogin(const AEventType: string; AData: TJSONObject);
procedure HandleAny(const AEventType: string; AData: TJSONObject);
procedure HandleLog(Level: TLogLevel; const Category, Msg: string);
end;
procedure TAuditor.HandleLogin(const AEventType: string; AData: TJSONObject);
begin
Writeln('login: user=', AData.Get('username', '?'));
end;
procedure TAuditor.HandleAny(const AEventType: string; AData: TJSONObject);
begin
Writeln('audit:', AEventType);
end;
procedure TAuditor.HandleLog(Level: TLogLevel; const Category, Msg: string);
begin
Writeln('[', LogLevelChar(Level), '] ', Category, ': ', Msg);
end;
var
Bus: TEventBus;
Auditor: TAuditor;
Data: TJSONObject;
begin
Bus := TEventBus.Create;
Auditor := TAuditor.Create;
try
Bus.Logger := @Auditor.HandleLog;
Bus.Subscribe('audit', '*', @Auditor.HandleAny);
Bus.Subscribe('login', 'user.login', @Auditor.HandleLogin);
Data := TJSONObject.Create;
try
Data.Add('username', 'alice');
Bus.Fire('auth', 'user.login', Data);
finally
Data.Free;
end;
finally
Auditor.Free;
Bus.Free;
end;
end.
Build with:
fpc -O2 -Sh -Fusrc -Fu../fpc-log/src \
-FUbuild -FEbuild audit_demo.pas
The lifecycle of a Fire call
Fire(ASourcePlugin, AEventType, AData) does the following, in
order:
- Acquire
FLock. - Copy the entire subscription array into a local
Subsarray. - Release
FLock. - For each entry in
Subs, if itsEventTypeequalsAEventTypeor equals'*', invoke its callback. Atry/exceptaround each invocation catches handler exceptions and routes them to theLoggercallback (if assigned). - If
OnBroadcastis assigned, invoke it once, also undertry/exceptwith the same logger plumbing.
The snapshot-then-iterate pattern (steps 2–3) is load-bearing.
It means a callback that calls Subscribe / Unsubscribe /
Fire will not deadlock and will not iterate over its own
modifications mid-loop. See Recursive Fire.
Subscribe vs UnsubscribeCallback vs Unsubscribe
There are three removal paths, designed for different use cases:
| Method | Removes | Use when |
|---|---|---|
UnsubscribeCallback(@Method) |
every subscription whose method pointer (Code+Data) matches @Method |
you tracked a single callback and want to remove just that one |
Unsubscribe('plug1') |
every subscription whose PluginName equals 'plug1' (case-insensitive) |
a logical group (plugin, subsystem, session) is shutting down and owns N subscriptions |
Subscribe does not deduplicate. Calling
Subscribe('p', 'a', @Handler) twice produces two entries; Fire
will deliver to @Handler twice. UnsubscribeCallback(@Handler)
removes both.
Wildcard subscribers
A subscriber registered with AEventType = '*' is delivered
every event fired through Fire, regardless of type. Used
sparingly — most subscribers want a specific event type — but
load-bearing for diagnostics and audit log mirroring.
Bus.Subscribe('audit', '*', @Auditor.HandleAny);
The wildcard is a literal * character; no glob pattern matching.
The OnBroadcast tap
OnBroadcast is a single nullable callback fired after all
subscribers have been delivered, on every Fire call. Originally
used by Fastway-Server to forward every event to web-admin
WebSocket clients.
Bus.OnBroadcast := @WebSocketHub.RebroadcastEvent;
It's a single property, not a list — fpc-events expects exactly
one external listener. If you need multiple, route through a
hub class. Compared to a '*' subscription, the broadcast tap:
- Fires once per
Fire, after subscriber delivery. - Doesn't appear in
GetSubscriptionCount. - Doesn't participate in
Unsubscribe/UnsubscribeCallback.
Logger plumbing
Bus.Logger: TLogProc (from log.types) lets you capture
handler exceptions. When a subscriber callback or the
OnBroadcast tap raises, the exception is swallowed by Fire
and the message is sent to Logger with Level = llError and
Category = 'events'.
type
TMySink = class
procedure Log(Level: TLogLevel; const Category, Msg: string);
end;
procedure TMySink.Log(Level: TLogLevel; const Category, Msg: string);
begin
Writeln('[', LogLevelChar(Level), '] ', Category, ': ', Msg);
end;
Bus.Logger := @MySink.Log;
If Logger is nil (the default), handler exceptions are
silently swallowed. This matches canonical Fastway behaviour
where a missing logger is treated as a "no opinion" sink rather
than an error.
The TLogProc type comes from log.types (fpc-log) — every
fpc-* library uses the same logger shape so consumers can wire
one log sink across the whole library set.
Handler exception isolation
A subscriber that raises does NOT block delivery to the others.
Each subscriber invocation is wrapped in its own try/except.
The exception's text is sent to Logger if assigned, then
swallowed; the loop continues to the next subscriber.
The same applies to OnBroadcast: a raise there doesn't block
subsequent Fire calls, and it doesn't retroactively undo the
delivery to subscribers (since broadcast fires after the
subscriber loop).
Recursive Fire
A subscriber callback can call Bus.Fire(...) again. The inner
Fire takes its own snapshot of the subscription array and runs
to completion before the outer Fire's loop continues.
This is safe because every Fire call is a self-contained
"copy subscriptions, iterate copy" operation. A subscriber that
re-subscribes during a callback is not delivered to in the
outer Fire (because the snapshot was taken before the
subscribe), but is delivered to in any subsequent Fire.
Mutating the subscription list from inside a callback (whether to add or remove) is well-defined and tested:
procedure TThing.HandleEvt(const AType: string; AData: TJSONObject);
begin
Bus.UnsubscribeCallback(@Self.HandleEvt); { OK }
Bus.Subscribe('thing', 'next.evt', @Self.HandleNext); { OK }
end;
Threading model
TEventBus is thread-safe. The lock (TCriticalSection) is
held only during the snapshot copy in Fire, the
add/remove operation in Subscribe /
Unsubscribe / UnsubscribeCallback, and the read in
GetSubscriptionCount. Callback dispatch happens outside the
lock.
Practical implications:
- A thread firing an event can deliver to subscribers on the same thread; nothing about the bus itself implies cross-thread dispatch. If a subscriber wants to marshal to a different thread, that's the subscriber's job.
- Subscribers may run concurrently if multiple threads call
Firesimultaneously. The handler must be reentrant or guard its own state.
Memory ownership
The AData: TJSONObject parameter on Fire is owned by the
caller. Subscribers must not call AData.Free; they may
read fields and copy values out, but the object lifetime is
the publisher's.
The publisher's standard pattern:
Data := TJSONObject.Create;
try
Data.Add('user_id', 42);
Bus.Fire('auth', 'user.login', Data);
finally
Data.Free;
end;
If a subscriber needs to retain the data beyond the Fire call,
it must clone (Data.Clone as TJSONObject) into its own owned
copy.
Relation to typed callbacks
fpc-events is the ecosystem-wide pub/sub primitive; it does NOT
replace the per-library typed observer pattern (bp.events,
cm.events, etc.). The choice is:
-
Typed callbacks (per-library
*.eventsunits): prefer these for one-publisher-to-N-known-consumers wiring. Type safety, IDE autocomplete, no JSON marshalling. Example:Session.OnFileBegin := @MyHandleron a fpc-binkp session. -
fpc-events: prefer this for many-to-many fan-out where the publisher doesn't know what subscribers exist. Costs: events are JSON-bagged (no static typing of
AData); event type names are strings.
A common hybrid: a library exposes typed callbacks AND a
library-wide OnAnyEvent hook that bridges to a TEventBus,
letting consumers choose either style.
Common pitfalls
-
Passing
nilasAData.TJSONObjectparameters can technically benil, but most subscribers will dereference fields without checking. Always pass a validTJSONObject, even an empty one. -
Forgetting to free
ADataafterFire.Firedoes not take ownership; the caller mustData.Freeafter. Usetry/finally. -
Storing
ADatareference in a subscriber. The reference is invalid afterFirereturns — the publisher will free. AlwaysData.Cloneif you need to keep a copy. -
Subscribing with
nilcallback. Defended-against technically (the bus stores it and will fail at Fire time), but a footgun in practice. Always pass a real method pointer. -
Wildcard subscribers logging too much. A
'*'subscriber in production with verbose events will swamp your log sink. Consider type filtering inside the wildcard handler if you only want a subset. -
Wiring a logger that itself fires events. If your logger ends up calling
Bus.Fire(...)(e.g., to broadcast an error upstream), make sure that call doesn't cause infinite recursion. The bus's recursive-Fire handling will keep you from deadlocking, but you can still infinite-loop if the logger fires an event whose subscriber raises and triggers the logger again.