Message stream
Four typed targets, subagent dispatch, the cron split, wire-protocol contracts with the TUI
src/stream/ is the in-process pub/sub coordination primitive, keyed by typed targets. It is ephemeral: a Bun process crash loses everything. Persistence is out of scope because agentic work isn't resumable mid-LLM-call and the container is the failure unit.
Targets
Four typed kinds:
- broadcast - fan-out (mood, status, presence). WS server forwards to connected TUIs.
- session: { sessionId } - addressed to a live AgentSession. WS server publishes; per-session drain loop subscribes.
- new-session: { subagent } - spawn a subagent. Published by the cron consumer (when a prompt job has a subagent field) and the WS server (idle → memory-logger). Consumed by SubagentConsumer, which validates the payload against payloadSchema and invokes the registered handler. Coalescing per inFlightKey(name, payload).
- cron: { jobId } - fired by the scheduler. CronConsumer dispatches to the prompt/exec runner and handles per-jobId coalescing.
Adding a fifth kind is a deliberate design choice. No generic handler extension point.
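As a rough illustration of why a closed set of target kinds is useful, the sketch below models the four kinds as a discriminated union and routes on a derived key. The names Target, targetKey, and TinyStream are hypothetical and chosen for this example only; the real types in src/stream/ may be shaped differently.

```typescript
// Hypothetical shapes for illustration; not the actual src/stream/ definitions.
type Target =
  | { kind: 'broadcast' }
  | { kind: 'session'; sessionId: string }
  | { kind: 'new-session'; subagent: string }
  | { kind: 'cron'; jobId: string };

// Derive a routing key. The `never` check makes a fifth kind a compile error
// until this switch is updated, which keeps additions deliberate.
function targetKey(target: Target): string {
  switch (target.kind) {
    case 'broadcast':
      return 'broadcast';
    case 'session':
      return `session:${target.sessionId}`;
    case 'new-session':
      return `new-session:${target.subagent}`;
    case 'cron':
      return `cron:${target.jobId}`;
    default: {
      const unreachable: never = target;
      throw new Error(`unknown target: ${JSON.stringify(unreachable)}`);
    }
  }
}

type Handler = (payload: unknown) => void;

// Minimal in-memory broker: fans out to subscribers, keeps no history.
class TinyStream {
  private readonly subscribers = new Map<string, Set<Handler>>();

  // Returns an unsubscribe function.
  subscribe(target: Target, handler: Handler): () => void {
    const key = targetKey(target);
    const set = this.subscribers.get(key) ?? new Set<Handler>();
    set.add(handler);
    this.subscribers.set(key, set);
    return () => {
      set.delete(handler);
    };
  }

  // Returns how many subscribers received the payload.
  publish(target: Target, payload: unknown): number {
    const set = this.subscribers.get(targetKey(target));
    if (set === undefined) return 0;
    set.forEach((handler) => handler(payload));
    return set.size;
  }
}
```

Because nothing is stored, a payload published to a target with no subscriber is simply lost, which matches the ephemeral framing above.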
Subagents
src/agent/subagents.ts defines the Subagent type and the SubagentConsumer. Two paths converge on invokeSubagent:
- ctx.spawnSubagent(name, payload, options) - direct, no Stream. Plugin hooks, main-agent spawn-subagent tool, plugin commands.
- Cron with a subagent field → target: { kind: 'new-session' } → SubagentConsumer → invokeSubagent. The Stream path exists so the scheduler doesn't need to import the registry.
Production registry is built from plugin contributions. The bundled memory plugin is auto-loaded and contributes memory-logger + dreaming. Each plugin Subagent may declare inFlightKey(payload); consumer uses ${name}:${key} as the dedup key (both memory-logger and dreaming key by agentDir). payloadSchema is validated by the cron loader at parse time and on every reloadAll().
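The dedup key described above could work roughly as follows. This is a minimal sketch that assumes a duplicate invocation is simply dropped while one with the same key is still running; SubagentSketch, InFlightGate, and tryInvoke are hypothetical names, and the real Subagent type carries more (payloadSchema, for instance).

```typescript
// Simplified, hypothetical subagent shape; the real type is in src/agent/subagents.ts.
type SubagentSketch<P> = {
  name: string;
  inFlightKey?: (payload: P) => string;
  handler: (payload: P) => Promise<void>;
};

class InFlightGate {
  private readonly inFlight = new Set<string>();

  // Returns false when an invocation with the same `${name}:${key}` is still running.
  tryInvoke<P>(subagent: SubagentSketch<P>, payload: P): boolean {
    const key =
      subagent.inFlightKey !== undefined
        ? `${subagent.name}:${subagent.inFlightKey(payload)}`
        : undefined;

    if (key !== undefined) {
      if (this.inFlight.has(key)) return false;
      this.inFlight.add(key);
    }

    // Release the key on success and on failure so a crash cannot wedge the gate.
    const release = (): void => {
      if (key !== undefined) this.inFlight.delete(key);
    };
    subagent.handler(payload).then(release, (err: unknown) => {
      console.error(`${subagent.name} failed`, err);
      release();
    });
    return true;
  }
}
```

Keying by agentDir, as both bundled subagents do, would mean two different agent folders can run the same subagent concurrently while repeated triggers for one folder collapse into a single run.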
Bundled plugins
src/bundled-plugins/memory/ ships with TypeClaw and is auto-loaded before user plugins[]. Statically imported by startAgent and passed via LoadPluginsOptions.bundled. Not copied into agent folders, not user-listable. memory config block ({ idleMs, dreaming: { schedule } }) is restart-required (read once at factory time). session.idle is the prompt-end signal fired synchronously after every session.prompt() in src/server/index.ts drain().
Wire protocol contract
TUI ↔ WS depends on the stream-driven drain loop:
- Concurrent prompts don't race. With a Stream wired, { type: 'prompt' } is published to target: { kind: 'session' }; per-session drain owns all session.prompt() calls. Fallback (no stream) keeps inline behavior for tests.
- Queued prompts render in execution order. The TUI doesn't append the > text line at submit. Server emits prompt_started from the drain loop right after shift(). Old containers running pre-prompt_started server code leave typed messages invisible; restart after any protocol change.
- interrupt-delivery prompts abort on receipt. session.abort() from the publish path; in-flight prompt() resolves immediately; drain dequeues the new prompt.
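The first two guarantees above can be pictured with a small per-session drain loop. The sketch assumes a session object exposing an async prompt(text) method; SessionDrain, SessionLike, and the event shape are hypothetical, and interrupt handling is left out.

```typescript
type PromptMessage = { type: 'prompt'; text: string };
type ServerEvent = { type: 'prompt_started'; text: string };

// Hypothetical stand-in for the real session object.
interface SessionLike {
  prompt(text: string): Promise<void>;
}

class SessionDrain {
  private readonly queue: PromptMessage[] = [];
  private draining = false;
  private readonly session: SessionLike;
  private readonly emit: (event: ServerEvent) => void;

  constructor(session: SessionLike, emit: (event: ServerEvent) => void) {
    this.session = session;
    this.emit = emit;
  }

  // Called for every message published to this session's target.
  enqueue(message: PromptMessage): Promise<void> {
    this.queue.push(message);
    return this.drain();
  }

  // Only one drain runs at a time, so prompt() calls never overlap.
  private async drain(): Promise<void> {
    if (this.draining) return;
    this.draining = true;
    try {
      let next: PromptMessage | undefined;
      while ((next = this.queue.shift()) !== undefined) {
        // Emitted at dequeue time, not submit time, so the UI shows execution order.
        this.emit({ type: 'prompt_started', text: next.text });
        await this.session.prompt(next.text);
      }
    } finally {
      this.draining = false;
    }
  }
}
```

A second prompt submitted while the first is running waits in the queue and only becomes visible to the client once the loop reaches it.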
Cron split (celery-style)
src/cron/scheduler.ts is a pure clock — computes next-fire and invokes onFire(job). Knows nothing about execution. src/cron/consumer.ts subscribes, dispatches, and coalesces by jobId. Long-running jobs no longer block scheduler ticks; overlapping fires are dropped by the consumer with a warning.
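A compact way to picture the split is shown below. It is a sketch under simplifying assumptions: jobs fire on fixed intervals rather than cron expressions, the clock is passed in as a number, and the scheduler calls the consumer directly instead of publishing through the Stream. SchedulerSketch, ConsumerSketch, and the dropped list are hypothetical.

```typescript
type CronJob = { jobId: string; everyMs: number };

// Pure clock: decides when a job fires, never how it runs.
class SchedulerSketch {
  private readonly nextFireAt = new Map<string, number>();
  private readonly jobs: CronJob[];
  private readonly onFire: (job: CronJob) => void;

  constructor(jobs: CronJob[], onFire: (job: CronJob) => void, startMs: number) {
    this.jobs = jobs;
    this.onFire = onFire;
    jobs.forEach((job) => this.nextFireAt.set(job.jobId, startMs + job.everyMs));
  }

  // Fire every due job and schedule its next fire; returns without waiting on work.
  tick(nowMs: number): void {
    this.jobs.forEach((job) => {
      const due = this.nextFireAt.get(job.jobId);
      if (due !== undefined && nowMs >= due) {
        this.nextFireAt.set(job.jobId, nowMs + job.everyMs);
        this.onFire(job);
      }
    });
  }
}

// Execution side: runs jobs and drops fires that overlap a running one.
class ConsumerSketch {
  readonly dropped: string[] = [];
  private readonly running = new Set<string>();
  private readonly run: (job: CronJob) => Promise<void>;

  constructor(run: (job: CronJob) => Promise<void>) {
    this.run = run;
  }

  handle(job: CronJob): void {
    if (this.running.has(job.jobId)) {
      console.warn(`cron: dropping overlapping fire for ${job.jobId}`);
      this.dropped.push(job.jobId);
      return;
    }
    this.running.add(job.jobId);
    const done = (): void => {
      this.running.delete(job.jobId);
    };
    this.run(job).then(done, (err: unknown) => {
      console.error(`cron: ${job.jobId} failed`, err);
      done();
    });
  }
}
```

Since tick() never awaits the job, a slow run only affects later fires of that same jobId; other jobs keep their schedule.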
stream_snapshot agent tool
src/agent/tools/stream-snapshot.ts — read-only tool exposing the broker's bounded ring buffer (default 1000 events). Agent can ask "what cron jobs fired in the last minute?" without a round-trip. Wired only when a Stream is injected. Read-only by design.
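For readers unfamiliar with the data structure, a bounded ring buffer with a read-only snapshot might look like the sketch below. The event shape, the EventRing name, and the sinceMs filter are assumptions made for illustration; only the default capacity of 1000 comes from the description above.

```typescript
// Hypothetical event shape; the broker's real record may carry more fields.
type StreamEvent = { at: number; kind: string; detail: string };

class EventRing {
  private readonly slots: (StreamEvent | undefined)[];
  private next = 0; // slot the next push overwrites
  private count = 0; // number of live events, capped at capacity

  constructor(capacity = 1000) {
    if (!Number.isInteger(capacity) || capacity <= 0) {
      throw new RangeError('capacity must be a positive integer');
    }
    this.slots = new Array<StreamEvent | undefined>(capacity).fill(undefined);
  }

  // Once full, each push overwrites the oldest event.
  push(event: StreamEvent): void {
    this.slots[this.next] = event;
    this.next = (this.next + 1) % this.slots.length;
    this.count = Math.min(this.count + 1, this.slots.length);
  }

  // Oldest-to-newest copies, so callers cannot mutate the buffer through the result.
  snapshot(sinceMs = 0): StreamEvent[] {
    const out: StreamEvent[] = [];
    const size = this.slots.length;
    const start = (this.next - this.count + size) % size;
    for (let i = 0; i < this.count; i++) {
      const event = this.slots[(start + i) % size];
      if (event !== undefined && event.at >= sinceMs) out.push({ ...event });
    }
    return out;
  }
}
```

A question like "what cron jobs fired in the last minute?" then reduces to a snapshot with a timestamp one minute back, filtered on kind.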
Rules of thumb
- In-memory and ephemeral. Anything that must survive a restart belongs elsewhere (session JSONL, cron.json, MEMORY.md).
- Wire-protocol changes require a container restart. Source loads once at process start.
- Each new target kind is a deliberate addition. Prefer typed targets over a catch-all.
- Drain loops own serialization. Broker fans out; doesn't gate.