Signals
POSIX signal integration and application shutdown hooks.
SignalBus
class SignalBus(logger: Logger | None = None)
In-process pub/sub bus for LifecycleEvent subclasses.
The bus is owned by a LaurenApp: there is one bus per
application, so multiple apps in the same process do not share
listeners. The bus is thread-agnostic in that listener lookup
is a plain dict read; concurrent emission from multiple
coroutines is safe because each emit iterates its own
snapshot of the listener list.
Listener discipline
- Listener errors are logged but swallowed. Observability code must not be able to break the request path.
- Async listeners are awaited sequentially, not concurrently, so subsequent listeners see the side-effects of earlier ones in a deterministic order.
- A listener registered for a base-class event receives every instance of that class, including subclasses, so bus.on(LifecycleEvent) is a firehose of every event.
Basic usage::

    bus = app.signals  # SignalBus owned by the app

    @bus.on(RequestComplete)
    def log_slow(event: RequestComplete) -> None:
        if event.duration_s > 1.0:
            app.logger.warn("slow request", path=event.request.path)

SignalBus.on
def on(self, event_type: type[E]) -> Callable[[Listener], Listener]
Register fn as a listener for event_type.
Returns the original callable so @bus.on(StartupComplete)
can decorate a function idiomatically. The callable may be
sync or async; both work identically from the caller's
perspective.
A listener is registered at most once per (event_type, fn)
pair — re-registration is silently idempotent so module
reloads during development do not create duplicate
subscriptions.
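The registration rules above can be sketched with a small stand-in. TinyBus and the placeholder StartupComplete class below are hypothetical and are not the framework's implementation; the sketch only assumes that listeners are kept per event type and compared by identity.

```python
from collections import defaultdict
from typing import Callable

Listener = Callable[[object], object]


class TinyBus:
    """Hypothetical stand-in for illustration, not the real SignalBus."""

    def __init__(self) -> None:
        # event type -> listeners in registration order
        self._listeners = defaultdict(list)

    def on(self, event_type: type) -> Callable[[Listener], Listener]:
        def register(fn: Listener) -> Listener:
            bucket = self._listeners[event_type]
            if fn not in bucket:  # idempotent per (event_type, fn) pair
                bucket.append(fn)
            return fn  # hand back the original callable, unwrapped

        return register


class StartupComplete:  # placeholder event class for this sketch only
    pass


bus = TinyBus()


@bus.on(StartupComplete)
def announce(event: object) -> None:
    print("ready")


# Registering the same function again leaves a single subscription.
same = bus.on(StartupComplete)(announce)
registered = len(bus._listeners[StartupComplete])
```

Because the decorator returns the function unchanged, announce can still be called or tested directly, independent of the bus.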
SignalBus.off
def off(self, event_type: type[E], fn: Listener) -> bool
Remove a previously-registered listener.
Returns True when the listener was removed, False
when it wasn't registered in the first place. Never raises —
callers typically use this during test teardown where a
missing listener is not a bug.
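A minimal sketch of the "report, never raise" contract, written as a free function over a plain dict. The listeners mapping and the noop listener are illustrative assumptions, not the framework's internal storage.

```python
def off(listeners: dict, event_type: type, fn) -> bool:
    """Remove fn if present; report whether anything was removed."""
    bucket = listeners.get(event_type, [])
    try:
        bucket.remove(fn)
    except ValueError:
        return False  # fn was never registered for this event type
    return True


def noop(event) -> None:
    pass


listeners = {str: [noop]}       # str and int stand in for event classes
first = off(listeners, str, noop)    # listener existed and is removed
second = off(listeners, str, noop)   # already gone
missing = off(listeners, int, noop)  # event type has no listeners at all
```

In test teardown this allows an unconditional call without first checking whether the listener is still attached.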
SignalBus.listener_count
def listener_count(self, event_type: type[LifecycleEvent]) -> int
Total listeners that would be invoked for event_type.
Walks the MRO the same way emit does, so a listener
on LifecycleEvent is counted once for every concrete
subclass. Primarily useful for tests and diagnostics.
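The MRO walk can be illustrated with a plain dict keyed by event class. The classes and the listeners mapping below are stand-ins defined for this sketch; only the use of `__mro__` is standard Python.

```python
class LifecycleEvent:  # stand-in base class for this sketch
    pass


class RequestComplete(LifecycleEvent):  # stand-in subclass
    pass


def on_any(event) -> None:
    pass


def on_request(event) -> None:
    pass


listeners = {
    LifecycleEvent: [on_any],
    RequestComplete: [on_request],
}


def listener_count(event_type: type) -> int:
    # Sum the listeners registered on the class itself and on every ancestor.
    return sum(len(listeners.get(cls, ())) for cls in event_type.__mro__)


base_total = listener_count(LifecycleEvent)      # only on_any
request_total = listener_count(RequestComplete)  # on_request plus on_any
```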
SignalBus.clear
def clear(self) -> None
Remove every listener.
Primarily useful between tests when the same SignalBus
is reused across cases.
SignalBus.emit
def emit(self, event: LifecycleEvent) -> None
Fire event to every registered listener.
Listeners registered on any class in the event's MRO are invoked, in registration order within a class and most-specific-first across classes. Async listeners are awaited sequentially; sync listeners run inline.
Exceptions inside listeners are logged through the bus's
Logger and then swallowed so a
misbehaving listener can never break the request path.
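The dispatch order and error handling described above can be sketched as a standalone coroutine. This is an illustration under assumptions, not the framework's code: it assumes emit is awaited by the caller, uses the standard logging module in place of the bus's Logger, and defines its own placeholder event classes.

```python
import asyncio
import inspect
import logging

log = logging.getLogger("signals-sketch")


class LifecycleEvent:  # stand-in base class
    pass


class RequestComplete(LifecycleEvent):  # stand-in subclass
    pass


async def emit(listeners: dict, event: object) -> None:
    for cls in type(event).__mro__:              # most specific class first
        for fn in list(listeners.get(cls, ())):  # snapshot, registration order
            try:
                result = fn(event)
                if inspect.isawaitable(result):
                    await result                 # sequential, never gathered
            except Exception:
                # Log and continue: one bad listener must not stop the rest.
                log.exception("listener %r failed", fn)


calls = []


def base_listener(event) -> None:
    calls.append("base")


def broken(event) -> None:
    raise RuntimeError("boom")


async def specific(event) -> None:
    calls.append("specific")


listeners = {
    LifecycleEvent: [base_listener],
    RequestComplete: [broken, specific],
}
asyncio.run(emit(listeners, RequestComplete()))
```

After the run, calls holds "specific" followed by "base": the failing listener is logged and skipped, the subclass listeners run before the base-class one.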
SignalBus.emit_sync
def emit_sync(self, event: LifecycleEvent) -> None
Synchronous variant of emit.
Used from framework code paths that are not themselves async (startup / shutdown construction). Async listeners are scheduled on the running loop when one is available and otherwise skipped with a warning — we never create an event loop just to run a listener, because doing so would subtly break applications that rely on a specific loop policy.
Lifecycle events
LifecycleEvent
class LifecycleEvent(timestamp: float = time.monotonic())
Base class for every event published on the SignalBus.
Subclasses carry event-specific context (the live Request,
the duration, the captured exception, ...). The base class only
holds a monotonic timestamp so listeners can compute deltas
without having to record wall-clock time themselves.
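A short sketch of computing a delta from two event timestamps. The dataclass below is a stand-in; it assumes the default shown in the signature is evaluated per instance (modeled here with default_factory), which the source does not state explicitly.

```python
import time
from dataclasses import dataclass, field


@dataclass
class LifecycleEvent:
    # Stand-in: a fresh monotonic reading for every instance (assumption).
    timestamp: float = field(default_factory=time.monotonic)


first = LifecycleEvent()
time.sleep(0.05)
second = LifecycleEvent()

# Monotonic readings never go backwards, so the delta is safe to use as a
# duration even if the wall clock is adjusted in between.
elapsed = second.timestamp - first.timestamp
```

The values are only meaningful relative to each other; time.monotonic() has an undefined reference point, so a timestamp should not be rendered as a calendar time.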
StartupBegin
class StartupBegin(timestamp: float = time.monotonic(), app: Any = None)
Emitted when the app receives its first lifespan.startup.
At this point the DI graph is compiled but @post_construct
hooks have NOT run yet. Listeners observing this event are
typically interested in recording "app build completed" telemetry
without depending on user services being constructed.
StartupComplete
class StartupComplete(timestamp: float = time.monotonic(), app: Any = None, duration_s: float = 0.0)
Fired once every @post_construct hook has finished running.
The app is fully ready to accept traffic at this point. Metrics pipelines typically record a "ready" counter here; health-check endpoints can be wired to flip a readiness flag.
RequestReceived
class RequestReceived(timestamp: float = time.monotonic(), request: Any = None)
Fired as soon as the ASGI scope has been parsed into a Request.
The router has NOT yet run, so request.get_matched_route()
returns None. Listeners should avoid reading the body here
unless they also intend to be the sole consumer — doing so
would flip the body into buffered mode and defeat any downstream
ByteStream handler.
RequestComplete
class RequestComplete(timestamp: float = time.monotonic(), request: Any = None, response: Any = None, status: int = 0, duration_s: float = 0.0, error: BaseException | None = None)
Fired after the response has been fully sent to the client.
Fires on both success and error paths. When the handler raised
an exception that escaped the middleware chain, the exception
instance is attached to error and status
reflects the final HTTP status surfaced to the client.
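A listener can tell the two paths apart by checking error before reading status. The dataclass below is a stand-in that mirrors the fields from the signature above (without timestamp), and classify is a hypothetical helper, not part of the framework.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class RequestComplete:
    # Stand-in carrying the fields listed in the documented signature.
    request: Any = None
    response: Any = None
    status: int = 0
    duration_s: float = 0.0
    error: Optional[BaseException] = None


def classify(event: RequestComplete) -> str:
    """Hypothetical helper: summarize the outcome of a finished request."""
    if event.error is not None:
        # Error path: an exception escaped the middleware chain.
        return f"failed:{type(event.error).__name__}:{event.status}"
    return f"ok:{event.status}"


success = classify(RequestComplete(status=200, duration_s=0.012))
failure = classify(RequestComplete(status=500, error=ValueError("bad input")))
```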
ShutdownBegin
class ShutdownBegin(timestamp: float = time.monotonic(), app: Any = None)
Fired when lifespan.shutdown arrives or a POSIX signal triggers shutdown.
Listeners are typically used to flush buffers, close external
connections, or emit a final heartbeat. The event does NOT
block shutdown — listeners run concurrently with the framework's
own @pre_destruct machinery.