fpc-events/docs/DEVELOPER_GUIDE.md
Ken Johnson 128ddcb4df v0.1.0: thread-safe pub/sub event bus
Verbatim port of Fastway-Server's TFWEventBus from fw_plugin_host.pas
per feedback_copy_dont_reinterpret.md.  Adjustments limited to:

  - Type renames (TFW* -> T*).
  - uses clause: drop fw_log; add log.types from fpc-log so the
    optional Logger property uses the canonical ecosystem-wide
    TLogProc shape, matching every other fpc-* library.
  - Per-handler exception logging now calls Logger with
    Level=llError, Category='events', and includes the source
    plugin (ASourcePlugin parameter) in the message text so the
    canonical signature stays meaningful.

Behaviours preserved verbatim: APluginName bulk-Unsubscribe key,
wildcard '*' subscriber, OnBroadcast external-listener tap,
snapshot-iterate-outside-lock pattern, per-handler exception
isolation, TCriticalSection.

docs/DEVELOPER_GUIDE.md added covering threading, payload
ownership, recursive Fire, OnBroadcast, logger plumbing, and
the relationship between fpc-events (ecosystem-wide pub/sub)
and per-library typed observer callbacks (bp.events / cm.events
pattern).

Tests: 44 assertions across 14 scenarios pass on x86_64-linux.
Pre-tag -vh audit on src/ev.bus.pas reports zero hints/warnings.
2026-05-05 18:13:10 -07:00

fpc-events Developer Guide

This guide walks through building consumers of events.bus.TEventBus end-to-end: the threading model, the lifetime contract for event payloads, the patterns that match the fpc-* ecosystem, and the common pitfalls. Read it in addition to the API table in the top-level README.md.

Table of contents

  1. What problem fpc-events solves
  2. Quick start (a complete working consumer)
  3. The lifecycle of a Fire call
  4. Subscribe vs. UnsubscribeCallback vs. Unsubscribe
  5. Wildcard '*' subscribers
  6. The OnBroadcast tap
  7. Logger plumbing — log.types.TLogProc
  8. Handler exception isolation
  9. Recursive Fire and re/un-subscribe from inside a callback
  10. Threading model and the snapshot-iterate pattern
  11. Memory ownership for event payloads
  12. How fpc-events relates to per-library typed callbacks
  13. Common pitfalls

What problem fpc-events solves

Many fpc-* libraries expose typed observer callbacks (see bp.events, cm.events). Those work great when the consumer binds one library to one class. fpc-events is for the other case: ecosystem-wide pub/sub where N publishers fire events and M consumers (web sockets, log sinks, audit trails) want to subscribe to all of them without each library having to know about each subscriber.

The classic Fastway use case: every plugin fires user.login / session.end / message.posted; the web admin subscribes to '*' and forwards every event over a WebSocket to the browser. No plugin needs to know the WebSocket exists.

If you only have one publisher and one consumer, prefer typed callbacks (set Session.OnFileBegin := @MyHandler). If you have many-to-many, fpc-events is the tool.

Quick start

A complete working consumer:

program audit_demo;

{$mode objfpc}{$H+}

uses
  {$IFDEF UNIX}cthreads,{$ENDIF}
  Classes, SysUtils, fpjson,
  log.types,
  events.bus;

type
  TAuditor = class
    procedure HandleLogin(const AEventType: string; AData: TJSONObject);
    procedure HandleAny(const AEventType: string; AData: TJSONObject);
    procedure HandleLog(Level: TLogLevel; const Category, Msg: string);
  end;

procedure TAuditor.HandleLogin(const AEventType: string; AData: TJSONObject);
begin
  Writeln('login: user=', AData.Get('username', '?'));
end;

procedure TAuditor.HandleAny(const AEventType: string; AData: TJSONObject);
begin
  Writeln('audit:', AEventType);
end;

procedure TAuditor.HandleLog(Level: TLogLevel; const Category, Msg: string);
begin
  Writeln('[', LogLevelChar(Level), '] ', Category, ': ', Msg);
end;

var
  Bus:     TEventBus;
  Auditor: TAuditor;
  Data:    TJSONObject;
begin
  Bus     := TEventBus.Create;
  Auditor := TAuditor.Create;
  try
    Bus.Logger := @Auditor.HandleLog;
    Bus.Subscribe('audit',  '*',          @Auditor.HandleAny);
    Bus.Subscribe('login',  'user.login', @Auditor.HandleLogin);

    Data := TJSONObject.Create;
    try
      Data.Add('username', 'alice');
      Bus.Fire('auth', 'user.login', Data);
    finally
      Data.Free;
    end;
  finally
    Auditor.Free;
    Bus.Free;
  end;
end.

Build with:

fpc -O2 -Sh -Fusrc -Fu../fpc-log/src \
    -FUbuild -FEbuild audit_demo.pas

The lifecycle of a Fire call

Fire(ASourcePlugin, AEventType, AData) does the following, in order:

  1. Acquire FLock.
  2. Copy the entire subscription array into a local Subs array.
  3. Release FLock.
  4. For each entry in Subs, if its EventType equals AEventType or equals '*', invoke its callback. A try/except around each invocation catches handler exceptions and routes them to the Logger callback (if assigned).
  5. If OnBroadcast is assigned, invoke it once, also under try/except with the same logger plumbing.

The snapshot-then-iterate pattern (steps 2-3) is load-bearing. Callbacks run against a private copy of the subscription array and outside the lock, so a callback that calls Subscribe / Unsubscribe / Fire will not deadlock and will not iterate over its own modifications mid-loop. See Recursive Fire.
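The steps above can be sketched with a stripped-down bus. This is a minimal illustration of the snapshot-then-iterate idea under stated assumptions, not the TEventBus source: TMiniBus, TMiniCallback, and TDemo are hypothetical names, the callback carries no TJSONObject, Writeln stands in for the Logger, and the OnBroadcast step is omitted.

```pascal
program snapshot_sketch;

{$mode objfpc}{$H+}

uses
  {$IFDEF UNIX}cthreads,{$ENDIF}
  Classes, SysUtils, SyncObjs;

type
  { Hypothetical callback shape; the real bus also passes a TJSONObject. }
  TMiniCallback = procedure(const AEventType: string) of object;

  TMiniSub = record
    EventType: string;
    Callback:  TMiniCallback;
  end;
  TMiniSubArray = array of TMiniSub;

  { Illustration only: this is not the TEventBus source. }
  TMiniBus = class
  private
    FLock: TCriticalSection;
    FSubs: TMiniSubArray;
  public
    constructor Create;
    destructor Destroy; override;
    procedure Subscribe(const AEventType: string; ACallback: TMiniCallback);
    procedure Fire(const AEventType: string);
  end;

  TDemo = class
  public
    Bus:   TMiniBus;
    Calls: Integer;
    procedure OnPing(const AEventType: string);
  end;

constructor TMiniBus.Create;
begin
  inherited Create;
  FLock := TCriticalSection.Create;
end;

destructor TMiniBus.Destroy;
begin
  FLock.Free;
  inherited Destroy;
end;

procedure TMiniBus.Subscribe(const AEventType: string; ACallback: TMiniCallback);
begin
  FLock.Acquire;
  try
    SetLength(FSubs, Length(FSubs) + 1);
    FSubs[High(FSubs)].EventType := AEventType;
    FSubs[High(FSubs)].Callback  := ACallback;
  finally
    FLock.Release;
  end;
end;

procedure TMiniBus.Fire(const AEventType: string);
var
  Subs: TMiniSubArray;
  I:    Integer;
begin
  FLock.Acquire;
  try
    Subs := Copy(FSubs, 0, Length(FSubs));  { steps 1-2: snapshot under lock }
  finally
    FLock.Release;                          { step 3: release before dispatch }
  end;
  for I := 0 to High(Subs) do
    if (Subs[I].EventType = AEventType) or (Subs[I].EventType = '*') then
      try
        Subs[I].Callback(AEventType);       { step 4: runs outside the lock }
      except
        on E: Exception do
          Writeln('handler raised: ', E.Message);  { stand-in for Logger }
      end;
end;

procedure TDemo.OnPing(const AEventType: string);
begin
  Inc(Calls);
  Bus.Subscribe('ping', @Self.OnPing);  { mutates FSubs, not the snapshot }
end;

var
  Demo: TDemo;
begin
  Demo := TDemo.Create;
  Demo.Bus := TMiniBus.Create;
  try
    Demo.Bus.Subscribe('ping', @Demo.OnPing);
    Demo.Bus.Fire('ping');  { snapshot holds 1 entry: 1 call }
    Demo.Bus.Fire('ping');  { snapshot holds 2 entries: 2 calls }
    Writeln('calls = ', Demo.Calls);  { prints: calls = 3 }
  finally
    Demo.Bus.Free;
    Demo.Free;
  end;
end.
```

Copy is used rather than a plain assignment because dynamic arrays are reference-counted; assigning FSubs to Subs would share the same storage instead of taking a snapshot.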

Subscribe vs UnsubscribeCallback vs Unsubscribe

Subscribe adds an entry; the two removal paths below are designed for different use cases:

| Method | Removes | Use when |
|---|---|---|
| UnsubscribeCallback(@Method) | every subscription whose method pointer (Code+Data) matches @Method | you tracked a single callback and want to remove just that one |
| Unsubscribe('plug1') | every subscription whose PluginName equals 'plug1' (case-insensitive) | a logical group (plugin, subsystem, session) is shutting down and owns N subscriptions |

Subscribe does not deduplicate. Calling Subscribe('p', 'a', @Handler) twice produces two entries; Fire will deliver to @Handler twice. UnsubscribeCallback(@Handler) removes both.

Wildcard subscribers

A subscriber registered with AEventType = '*' receives every event passed to Fire, regardless of type. Use it sparingly: most subscribers want a specific event type, but the wildcard is load-bearing for diagnostics and audit-log mirroring.

Bus.Subscribe('audit', '*', @Auditor.HandleAny);

The wildcard is a literal * character; no glob pattern matching.
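Because the bus does no pattern matching, a wildcard subscriber that only cares about a family of events has to filter on AEventType itself. The sketch below shows one way to do that. TUserAuditor, the 'user.' prefix rule, and the 'user.logout' event name are hypothetical, and the handler is called directly as a stand-in for delivery through Fire, so the program compiles without the bus.

```pascal
program wildcard_filter;

{$mode objfpc}{$H+}

uses
  Classes, SysUtils, fpjson;

type
  { Hypothetical wildcard subscriber that only audits 'user.*' events. }
  TUserAuditor = class
  public
    Seen: Integer;
    procedure HandleAny(const AEventType: string; AData: TJSONObject);
  end;

procedure TUserAuditor.HandleAny(const AEventType: string; AData: TJSONObject);
begin
  { The bus matches '*' literally, so prefix filtering happens here. }
  if Copy(AEventType, 1, 5) <> 'user.' then
    Exit;
  Inc(Seen);
  Writeln('audit: ', AEventType);
end;

var
  Auditor: TUserAuditor;
  Data:    TJSONObject;
begin
  Auditor := TUserAuditor.Create;
  Data := TJSONObject.Create;
  try
    { Stand-ins for what Fire would deliver to a '*' subscription. }
    Auditor.HandleAny('user.login', Data);      { kept }
    Auditor.HandleAny('message.posted', Data);  { skipped }
    Auditor.HandleAny('user.logout', Data);     { kept }
    Writeln('seen = ', Auditor.Seen);           { prints: seen = 2 }
  finally
    Data.Free;
    Auditor.Free;
  end;
end.
```

With the real bus the same handler would be registered once, as in Bus.Subscribe('audit', '*', @Auditor.HandleAny).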

The OnBroadcast tap

OnBroadcast is a single nullable callback invoked once per Fire call, after delivery to all subscribers has finished. It was originally used by Fastway-Server to forward every event to web-admin WebSocket clients.

Bus.OnBroadcast := @WebSocketHub.RebroadcastEvent;

It's a single property, not a list: fpc-events supports at most one external listener. If you need several, route through a hub class. Compared to a '*' subscription, the broadcast tap:

  • Fires once per Fire, after subscriber delivery.
  • Doesn't appear in GetSubscriptionCount.
  • Doesn't participate in Unsubscribe / UnsubscribeCallback.

Logger plumbing

Bus.Logger: TLogProc (from log.types) lets you capture handler exceptions. When a subscriber callback or the OnBroadcast tap raises, the exception is swallowed by Fire and the message is sent to Logger with Level = llError and Category = 'events'.

type
  TMySink = class
    procedure Log(Level: TLogLevel; const Category, Msg: string);
  end;

procedure TMySink.Log(Level: TLogLevel; const Category, Msg: string);
begin
  Writeln('[', LogLevelChar(Level), '] ', Category, ': ', Msg);
end;

Bus.Logger := @MySink.Log;

If Logger is nil (the default), handler exceptions are silently swallowed. This matches canonical Fastway behaviour where a missing logger is treated as a "no opinion" sink rather than an error.

The TLogProc type comes from log.types (fpc-log) — every fpc-* library uses the same logger shape so consumers can wire one log sink across the whole library set.

Handler exception isolation

A subscriber that raises does NOT block delivery to the others. Each subscriber invocation is wrapped in its own try/except. The exception's text is sent to Logger if assigned, then swallowed; the loop continues to the next subscriber.

The same applies to OnBroadcast: a raise there doesn't block subsequent Fire calls, and it doesn't retroactively undo the delivery to subscribers (since broadcast fires after the subscriber loop).

Recursive Fire

A subscriber callback can call Bus.Fire(...) again. The inner Fire takes its own snapshot of the subscription array and runs to completion before the outer Fire's loop continues.

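This is safe because every Fire call is a self-contained "copy subscriptions, iterate copy" operation. A subscription added during a callback does not receive the outer Fire's event (the snapshot was taken before the Subscribe call), but it does receive every subsequent Fire. By the same logic, a subscription removed during a callback is still present in the outer snapshot, so it can still receive the event that is currently being delivered.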

Mutating the subscription list from inside a callback (whether to add or remove) is well-defined and tested:

procedure TThing.HandleEvt(const AType: string; AData: TJSONObject);
begin
  Bus.UnsubscribeCallback(@Self.HandleEvt);   { OK }
  Bus.Subscribe('thing', 'next.evt', @Self.HandleNext); { OK }
end;

Threading model

TEventBus is thread-safe. The lock (TCriticalSection) is held only during the snapshot copy in Fire, the add/remove operation in Subscribe / Unsubscribe / UnsubscribeCallback, and the read in GetSubscriptionCount. Callback dispatch happens outside the lock.

Practical implications:

  • Callbacks run synchronously on the thread that called Fire; nothing about the bus itself implies cross-thread dispatch. If a subscriber wants to marshal to a different thread, that's the subscriber's job.
  • Subscribers may run concurrently if multiple threads call Fire simultaneously. The handler must be reentrant or guard its own state.
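A subscriber that keeps mutable state can guard it with its own lock, as sketched below. This is an illustration under assumptions rather than library code: TLoginCounter and TPublisher are hypothetical, and each TPublisher thread calls the handler directly to imitate two threads calling Fire at the same time.

```pascal
program guarded_subscriber;

{$mode objfpc}{$H+}

uses
  {$IFDEF UNIX}cthreads,{$ENDIF}
  Classes, SysUtils, SyncObjs, fpjson;

type
  { Hypothetical subscriber whose state is protected by its own lock. }
  TLoginCounter = class
  private
    FLock:  TCriticalSection;
    FCount: Integer;
  public
    constructor Create;
    destructor Destroy; override;
    procedure HandleLogin(const AEventType: string; AData: TJSONObject);
    property Count: Integer read FCount;
  end;

  { Stand-in for a publisher thread: calls the handler the way Fire would. }
  TPublisher = class(TThread)
  private
    FTarget: TLoginCounter;
  protected
    procedure Execute; override;
  public
    constructor Create(ATarget: TLoginCounter);
  end;

constructor TLoginCounter.Create;
begin
  inherited Create;
  FLock := TCriticalSection.Create;
end;

destructor TLoginCounter.Destroy;
begin
  FLock.Free;
  inherited Destroy;
end;

procedure TLoginCounter.HandleLogin(const AEventType: string; AData: TJSONObject);
begin
  FLock.Acquire;
  try
    Inc(FCount);  { safe even when two deliveries overlap }
  finally
    FLock.Release;
  end;
end;

constructor TPublisher.Create(ATarget: TLoginCounter);
begin
  FTarget := ATarget;
  inherited Create(False);  { not suspended: the thread starts right away }
end;

procedure TPublisher.Execute;
var
  I:    Integer;
  Data: TJSONObject;
begin
  Data := TJSONObject.Create;
  try
    for I := 1 to 1000 do
      FTarget.HandleLogin('user.login', Data);
  finally
    Data.Free;
  end;
end;

var
  Counter: TLoginCounter;
  A, B:    TPublisher;
begin
  Counter := TLoginCounter.Create;
  A := TPublisher.Create(Counter);
  B := TPublisher.Create(Counter);
  A.WaitFor;
  B.WaitFor;
  Writeln('count = ', Counter.Count);  { prints: count = 2000 }
  A.Free;
  B.Free;
  Counter.Free;
end.
```

The subscriber's lock is separate from the bus's internal lock, which is not held during dispatch, so the two cannot form a lock-ordering cycle.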

Memory ownership

The AData: TJSONObject parameter on Fire is owned by the caller. Subscribers must not call AData.Free; they may read fields and copy values out, but the object lifetime is the publisher's.

The publisher's standard pattern:

Data := TJSONObject.Create;
try
  Data.Add('user_id', 42);
  Bus.Fire('auth', 'user.login', Data);
finally
  Data.Free;
end;

If a subscriber needs to retain the data beyond the Fire call, it must clone the object (AData.Clone as TJSONObject) and own the copy.
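The retain-by-cloning rule can be sketched as follows. TLastLogin is a hypothetical subscriber, and the handler is called directly in place of Bus.Fire so the program needs only fpjson; the ownership pattern is the same as with the real bus.

```pascal
program retain_payload;

{$mode objfpc}{$H+}

uses
  Classes, SysUtils, fpjson;

type
  { Hypothetical subscriber that keeps the most recent login payload. }
  TLastLogin = class
  private
    FLast: TJSONObject;  { owned by this object }
  public
    destructor Destroy; override;
    procedure HandleLogin(const AEventType: string; AData: TJSONObject);
    property Last: TJSONObject read FLast;
  end;

destructor TLastLogin.Destroy;
begin
  FLast.Free;
  inherited Destroy;
end;

procedure TLastLogin.HandleLogin(const AEventType: string; AData: TJSONObject);
begin
  if AData = nil then
    Exit;                               { defensive: tolerate a nil payload }
  FreeAndNil(FLast);                    { drop the previously retained copy }
  FLast := AData.Clone as TJSONObject;  { deep copy; AData is left untouched }
end;

var
  Sub:  TLastLogin;
  Data: TJSONObject;
begin
  Sub := TLastLogin.Create;
  try
    Data := TJSONObject.Create;
    try
      Data.Add('username', 'alice');
      { Stand-in for Bus.Fire('auth', 'user.login', Data). }
      Sub.HandleLogin('user.login', Data);
    finally
      Data.Free;  { the publisher frees its object as usual }
    end;
    { The clone is still valid after the publisher's object is gone. }
    Writeln(Sub.Last.Strings['username']);  { prints: alice }
  finally
    Sub.Free;
  end;
end.
```

If the retained copy can be read from another thread, the subscriber also needs its own lock around FLast, since handlers may run concurrently.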

Relation to typed callbacks

fpc-events is the ecosystem-wide pub/sub primitive; it does NOT replace the per-library typed observer pattern (bp.events, cm.events, etc.). The choice is:

  • Typed callbacks (per-library *.events units): prefer these for one-publisher-to-N-known-consumers wiring. Type safety, IDE autocomplete, no JSON marshalling. Example: Session.OnFileBegin := @MyHandler on a fpc-binkp session.

  • fpc-events: prefer this for many-to-many fan-out where the publisher doesn't know what subscribers exist. Costs: events are JSON-bagged (no static typing of AData); event type names are strings.

A common hybrid: a library exposes typed callbacks AND a library-wide OnAnyEvent hook that bridges to a TEventBus, letting consumers choose either style.

Common pitfalls

  • Passing nil as AData. TJSONObject parameters can technically be nil, but most subscribers will dereference fields without checking. Always pass a valid TJSONObject, even an empty one.

  • Forgetting to free AData after Fire. Fire does not take ownership; the caller must call Data.Free afterwards. Use try/finally.

  • Storing the AData reference in a subscriber. The reference is invalid after Fire returns because the publisher frees the object. Call AData.Clone if you need to keep a copy.

  • Subscribing with a nil callback. Subscribe does not reject it: the bus stores the entry and the invocation fails at Fire time. Always pass a real method pointer.

  • Wildcard subscribers logging too much. A '*' subscriber in production with verbose events will swamp your log sink. Consider type filtering inside the wildcard handler if you only want a subset.

  • Wiring a logger that itself fires events. If your logger ends up calling Bus.Fire(...) (e.g., to broadcast an error upstream), make sure that call doesn't cause infinite recursion. The bus's recursive-Fire handling will keep you from deadlocking, but you can still infinite-loop if the logger fires an event whose subscriber raises and triggers the logger again.
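For the last pitfall, one possible guard is a per-thread re-entrancy flag in the log sink. The sketch below is an assumption-heavy illustration, not part of fpc-events or fpc-log: TGuardedSink and ForwardUpstream are hypothetical, TLogLevel is redeclared locally so the program compiles alone (only llError comes from this guide), and ForwardUpstream imitates a Bus.Fire whose subscriber raises and so re-enters the logger.

```pascal
program logger_guard;

{$mode objfpc}{$H+}

type
  { Local stand-in for log.types.TLogLevel so the sketch compiles alone;
    only llError is taken from the guide, the other members are made up. }
  TLogLevel = (llDebug, llInfo, llWarning, llError);

  { Hypothetical log sink that forwards errors upstream at most once
    per logging call chain. }
  TGuardedSink = class
  public
    Forwarded: Integer;
    procedure Log(Level: TLogLevel; const Category, Msg: string);
    procedure ForwardUpstream(const Msg: string);
  end;

threadvar
  InSink: Boolean;  { per-thread flag; zero-initialised to False }

procedure TGuardedSink.Log(Level: TLogLevel; const Category, Msg: string);
begin
  Writeln('[', Category, '] ', Msg);
  if InSink then
    Exit;  { already forwarding on this thread: break the cycle }
  InSink := True;
  try
    ForwardUpstream(Msg);
  finally
    InSink := False;
  end;
end;

procedure TGuardedSink.ForwardUpstream(const Msg: string);
begin
  Inc(Forwarded);
  { Stand-in for a Bus.Fire call whose subscriber raises, which makes the
    bus call Logger again on the same thread. }
  Log(llError, 'events', 'subscriber raised while handling: ' + Msg);
end;

var
  Sink: TGuardedSink;
begin
  Sink := TGuardedSink.Create;
  try
    Sink.Log(llError, 'events', 'boom');
    Writeln('forwarded = ', Sink.Forwarded);  { prints: forwarded = 1 }
  finally
    Sink.Free;
  end;
end.
```

The nested message is still written to the sink; only the second forward is suppressed. A threadvar is used because handlers, and therefore the logger, may run on several threads at once.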