WebSockets
First-class WebSocket support via @ws_controller gateways.
Gateway decorators
ws_controller
def ws_controller(path: str = '', tags: list[str] | None = None, summary: str | None = None, description: str | None = None) -> Callable[[C], C]Mark a class as a WebSocket gateway mounted at path.
The path may contain {name} parameters just like HTTP routes;
they're parsed out and made available via •WebSocket.path_params.
Per the framework-wide rule, @ws_controller attaches metadata to
the decorated class only. Subclasses do NOT inherit gateway
status — they must be re-decorated. This is symmetric with
•~lauren.controller and keeps inheritance explicit.
path is required in practice; passing an empty string is valid
and registers the gateway at the module root. The bare form
@ws_controller (no parentheses) is rejected with
•~lauren.exceptions.DecoratorUsageError.
on_connect
def on_connect(fn: F) -> FMark a method to run after the WebSocket handshake completes.
The marker lives on the function object itself
(fn.__lauren_ws_on_connect__). A subclass that overrides this
method without re-applying the decorator will NOT inherit the hook
— symmetric with how @get and @post work on HTTP
controllers.
Also accepts •staticmethod / •classmethod
descriptors so users can stack the decorator in either order
(@on_connect above or below @staticmethod) — the marker
lands wherever setattr will accept it.
on_message
def on_message(event: str, summary: str | None = None, description: str | None = None) -> Callable[[F], F]Route one inbound frame event to this method.
The handler's signature declares what validated payload it expects::
@on_message("chat.send")
async def send(self, ws: WebSocket, body: Json[ChatMessage]) -> None: ...During gateway compilation lauren inspects the signature once, picks
out the Json[...] / path / query / DI extractors, and builds an
immutable dispatch plan — request-time dispatch is pure lookup, no
reflection.
Multiple @on_message(...) decorators may stack on the same
method to handle several event names; each creates its own
•WsMessageMeta entry and its own dispatch-table row.
The wildcard event name "*" matches any event that has no
specific handler — useful for a catch-all logger. The special name
"__binary__" captures binary frames (bytes) rather than
JSON-decoded text frames.
on_disconnect
def on_disconnect(fn: F) -> FMark a method to run when the connection closes.
Runs for both peer-initiated and server-initiated closures. The hook is best-effort: exceptions raised here are logged but don't affect the connection (which is already dead) or the response status of the handshake.
on_error
def on_error(fn: F) -> FMark a method as the connection's error handler.
The runtime calls the decorated method with the raised exception
whenever a per-frame handler throws something other than
•WebSocketDisconnect. Returning normally resumes the
connection; raising closes it. Without @on_error lauren falls
back to sending a structured error frame and keeping the connection
open.
WebSocket object
WebSocket
class WebSocket(scope: dict[str, Any], receive: Callable[[], Awaitable[dict[str, Any]]], send: Callable[[dict[str, Any]], Awaitable[None]], path_template: str, path_params: dict[str, str], app_state: Any = None)A live WebSocket connection.
Instances are constructed by the ASGI runtime and passed to
@on_connect / @on_message / @on_disconnect handlers that
declare a ws: WebSocket parameter. User code never instantiates
this directly.
The object is intentionally thin: it wraps the ASGI receive /
send callables and exposes the subset of surface that controller
authors actually need. Typed frame reception is mediated by the
dispatcher so handlers normally work with already-validated Pydantic
models rather than raw frames; the raw •receive_text /
•receive_bytes / •receive_json helpers remain
available for advanced use.
WebSocket.accept
def accept(self, subprotocol: str | None = None, headers: list[tuple[str, str]] | None = None) -> NoneComplete the WebSocket handshake.
lauren's runtime calls this for you if @on_connect returns
normally — controllers therefore only need to call it explicitly
when they want to reject the connection or negotiate a specific
subprotocol before any application logic runs.
WebSocket.receive
def receive(self) -> dict[str, Any]Pull the next raw ASGI message from the peer.
Handles the websocket.disconnect message by raising
•WebSocketDisconnect, so callers that loop with
while True: await ws.receive() automatically terminate on
peer close.
WebSocket.receive_text
def receive_text(self) -> strAwait the next text frame, returning its string payload.
WebSocket.receive_bytes
def receive_bytes(self) -> bytesAwait the next binary frame.
WebSocket.receive_json
def receive_json(self) -> AnyAwait the next text frame and JSON-decode it.
WebSocket.send_text
def send_text(self, data: str) -> NoneWebSocket.send_bytes
def send_bytes(self, data: bytes) -> NoneWebSocket.send_json
def send_json(self, data: Any) -> NoneSerialize data and send it as a text frame.
Handles Pydantic models (model_dump(mode="json")),
dataclasses, and standard JSON types via a permissive default
handler so rich domain objects can be sent without manual
coercion.
WebSocket.close
def close(self, code: int = 1000, reason: str = '') -> NoneInitiate a server-side close.
Idempotent: calling •close after the connection has
already terminated is a no-op. The close code / reason are
preserved on the instance for @on_disconnect hooks.
Broadcast
BroadcastGroup
class BroadcastGroup()A named set of •WebSocket connections.
Rooms (chat, presence, realtime dashboards) compose out of
•BroadcastGroup instances keyed by opaque group names.
Because the registry is a plain dict keyed by string, groups are
auto-created on first reference — no extra ceremony.
Usage::
@injectable()
class BroadcastRegistry(BroadcastGroup):
pass # BroadcastGroup is already a subclassable provider
@ws_controller("/chat/{room_id}")
class ChatGateway:
def __init__(self, rooms: BroadcastGroup) -> None:
self._rooms = rooms
@on_connect
async def joined(self, ws: WebSocket) -> None:
rid = ws.path_params["room_id"]
await self._rooms.subscribe(rid, ws)The default •BroadcastGroup is safe for single-process
deployments. Production multi-worker setups should subclass it and
back it with Redis Pub/Sub (or similar) — the same controller code
works unchanged because the fan-out surface is just subscribe
/ unsubscribe / broadcast.
BroadcastGroup.subscribe
def subscribe(self, group: str, ws: WebSocket) -> NoneAdd ws to group. Idempotent.
BroadcastGroup.unsubscribe
def unsubscribe(self, group: str, ws: WebSocket) -> NoneRemove ws from group. Safe to call if not a member.
BroadcastGroup.unsubscribe_all
def unsubscribe_all(self, ws: WebSocket) -> NoneRemove ws from every group it's a member of.
Called automatically by the runtime on disconnect so leaked subscriptions don't accumulate after clients drop off — but controllers can also invoke it eagerly during cleanup logic.
BroadcastGroup.broadcast
def broadcast(self, group: str, message: Any, as_bytes: bool = False, exclude: WebSocket | None = None) -> intDeliver message to every subscriber of group.
Returns the count of frames actually sent — callers can use it
for basic observability. Dead connections (those whose
•WebSocket.send_json raises) are detected and removed
from the group automatically so broadcast storms don't repeat
doomed sends.
exclude lets a broadcaster skip echoing the message back to
the original sender, the common pattern for chat UIs.
BroadcastGroup.groups
def groups(self) -> list[str]BroadcastGroup.members
def members(self, group: str) -> list[WebSocket]BroadcastGroup.member_count
def member_count(self, group: str) -> intSocket.IO
socketio_controller
def socketio_controller(path: str, ping_interval_ms: int = 25000, ping_timeout_ms: int = 20000, max_payload_bytes: int = 1000000) -> Callable[[type], type]Mount a class as a Socket.IO endpoint at path.
Internally:
- Discovers every method marked with •
on_socketio_event. - Synthesizes
@on_connect/@on_message/@on_disconnecthooks on the class that drive the Socket.IO protocol state machine, dispatching inbound packets to the user's •@on_socketio_eventmethods. - Applies •
@ws_controller <lauren.ws_controller>so the existing WebSocket runtime (DI, lifecycle, middleware) mounts the route exactly like a hand-written WS gateway.
The synthesised hooks live on the class's own __dict__, never
on a parent class, so the framework rule "inheritance does NOT
propagate metadata" is preserved.
on_socketio_event
def on_socketio_event(event: str, summary: str | None = None) -> Callable[[F], F]Mark a method as the handler for a Socket.IO event.
Two reserved names route to lifecycle hooks instead of client emits:
"connect"-- invoked once after the Socket.IO handshake completes successfully. Useful for sending a welcome message, joining the connection to broadcast groups, or rejecting the connection by raising an exception."disconnect"-- invoked once when the transport closes (peer- or server-initiated). Use this to clean up subscriptions or persistent state.
Every other event name is dispatched on inbound EVENT packets. The handler signature determines what's injected:
self-- the controller instance (DI-built).conn: SocketIOConnection-- the per-connection facade.- Positional payload args -- the JSON args the client sent,
in order.
async def chat(self, conn, payload)matches asocket.emit("chat", {...})call from JS. - Return value -- if the client supplied an ack callback, the
handler's return value is forwarded as the ack args. Returning
a tuple sends multiple ack args; returning a single value sends
one; returning
Nonesends a singlenullarg.
The decorator follows the framework convention: it attaches
metadata to the function via setattr and returns the original
object unchanged. Subclasses that override the handler without
re-applying the decorator do NOT inherit the marker.
SocketIOConnection
class SocketIOConnection(ws: WebSocket, sid: str, namespace: str = _proto.DEFAULT_NAMESPACE)A live Socket.IO client connection.
Instances are constructed by the adapter once the Socket.IO
handshake succeeds and passed to every user event handler. The
object is intentionally small: it wraps the underlying
•~lauren.WebSocket and adds Socket.IO-aware send helpers
(•emit, •send_ack, •disconnect).
Lifecycle
- The handshake (Engine.IO OPEN, Socket.IO CONNECT) is completed before the connection is exposed to user code.
- The connection lives until either the peer disconnects, the
server calls •
disconnect, or the underlying WebSocket is closed. - The user's
on_socketio_event("disconnect")handler is guaranteed to run once on closure, even if a previous event handler raised.
Concurrency
Outbound emits go through a per-connection asyncio.Lock to keep
frames atomic on the wire. Without the lock, two concurrent
conn.emit(...) calls could interleave their bytes and confuse
the JS client.
SocketIOConnection.emit
def emit(self, event: str, args: Any = ()) -> NoneEmit a Socket.IO event to this client.
args are sent as positional payload elements, mirroring the
JS client's socket.emit(event, ...args) signature. Any
JSON-able value is acceptable: lauren's permissive default
handler turns Pydantic models, dataclasses, datetimes, etc.
into wire-friendly JSON.
SocketIOConnection.send_ack
def send_ack(self, ack_id: int, args: Any = ()) -> NoneSend an ACK packet for an event the client previously emitted.
Most user code doesn't call this directly: returning a value from an event handler triggers an automatic ACK reply. Exposed for cases where the ack must be sent asynchronously (e.g. after kicking off a background task).
SocketIOConnection.disconnect
def disconnect(self) -> NoneInitiate a graceful Socket.IO + transport closure.
Sends the Socket.IO DISCONNECT packet, then the Engine.IO
CLOSE packet, then closes the underlying WebSocket. The
sequence matches what the official Socket.IO server does so
the JS client surfaces the disconnect cleanly.
Exceptions
WebSocketError
class WebSocketError(message: str = '', close_code: int | None = None, detail: dict[str, Any] | None = None)Base class for WebSocket-layer errors.
WebSocketDisconnect
class WebSocketDisconnectRaised inside a handler when the peer closes the connection.
Handlers that loop over •WebSocket.receive_text or the various
typed helpers may catch this to release resources; the runtime always
catches it as the normal end-of-connection signal and runs
@on_disconnect hooks.
WebSocketValidationError
class WebSocketValidationErrorInbound frame failed validation against a Pydantic model.
Raised from the typed dispatcher; the runtime catches it, sends a structured error frame back to the client, and continues the connection (mismatched frames should not terminate the session).
WebSocketRouteNotFoundError
class WebSocketRouteNotFoundErrorThe handshake path doesn't match any registered @ws_controller.