
Stream Protocol

This document describes the internal streaming protocol used by Helix Agents for real-time event communication.

The canonical schema lives at packages/core/src/types/stream.ts. This document is the authoritative internals reference for the on-the-wire shape — when other internals docs (e.g. subagent-execution.md) reference chunk fields, they should match the schemas defined here.

Overview

The stream protocol enables real-time communication of:

  • Text generation — Token-by-token LLM output
  • Reasoning/thinking — Internal reasoning traces
  • Tool execution — Start / end / approval / argument-streaming events
  • Sub-agent activity — Delegation to child agents
  • State changes — RFC 6902 patches
  • Custom events — Application-specific data
  • Errors and output — Final results
  • Run lifecycle — Interrupt, resume, pause, supersede, checkpoint, step commit/discard, stream resync, suspension marker

All chunk schemas extend BaseChunkSchema which carries agentId, agentType, timestamp, and step fields. The discriminator field is type. Validation is via Zod (StreamChunkSchema, StreamMessageSchema).

Chunk Types

The full discriminated union has 30+ variants. They group into the following categories:

LLM-emitted chunks

TextDeltaChunk

Token-by-token text from the LLM.

typescript
interface TextDeltaChunk {
  type: 'text_delta';
  delta: string; // Incremental text
  agentId: string;
  agentType: string;
  timestamp: number;
  step: number;
}

ThinkingChunk

Reasoning/thinking content from models that support it.

typescript
interface ThinkingChunk {
  type: 'thinking';
  content: string;
  isComplete: boolean; // false = streaming; true = complete block
  agentId: string;
  agentType: string;
  timestamp: number;
  step: number;
}

Tool argument streaming

For models that emit tool arguments incrementally.

typescript
interface ToolArgStreamStartChunk {
  type: 'tool_arg_stream_start';
  toolCallId: string;
  toolName: string;
  // + Base fields
}

interface ToolArgStreamDeltaChunk {
  type: 'tool_arg_stream_delta';
  toolCallId: string;
  delta: string; // Incremental JSON text
  // + Base fields
}

interface ToolArgStreamEndChunk {
  type: 'tool_arg_stream_end';
  toolCallId: string;
  // + Base fields
}
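Clients that render a tool call's arguments as they stream in can buffer the deltas per toolCallId and parse the JSON once the end chunk arrives. The sketch below is illustrative only. ToolArgAccumulator and the trimmed local chunk types are hypothetical, and the sketch assumes the concatenated deltas form one complete JSON document by the time tool_arg_stream_end is emitted.

```typescript
// Hypothetical, trimmed-down local copies of the three chunk shapes above.
type ToolArgChunk =
  | { type: 'tool_arg_stream_start'; toolCallId: string; toolName: string }
  | { type: 'tool_arg_stream_delta'; toolCallId: string; delta: string }
  | { type: 'tool_arg_stream_end'; toolCallId: string };

interface CompletedToolArgs {
  toolCallId: string;
  toolName: string;
  args: unknown;
}

// Buffers incremental JSON text per toolCallId. Assumes the concatenated
// deltas are valid JSON once tool_arg_stream_end arrives.
class ToolArgAccumulator {
  private pending = new Map<string, { toolName: string; text: string }>();

  /** Returns the parsed arguments on tool_arg_stream_end, otherwise undefined. */
  push(chunk: ToolArgChunk): CompletedToolArgs | undefined {
    switch (chunk.type) {
      case 'tool_arg_stream_start': {
        this.pending.set(chunk.toolCallId, { toolName: chunk.toolName, text: '' });
        return undefined;
      }
      case 'tool_arg_stream_delta': {
        const entry = this.pending.get(chunk.toolCallId);
        if (entry) entry.text += chunk.delta; // deltas without a start are ignored
        return undefined;
      }
      case 'tool_arg_stream_end': {
        const entry = this.pending.get(chunk.toolCallId);
        if (!entry) return undefined;
        this.pending.delete(chunk.toolCallId);
        return {
          toolCallId: chunk.toolCallId,
          toolName: entry.toolName,
          // Assumption: a call that streamed no deltas has empty arguments.
          args: entry.text === '' ? {} : JSON.parse(entry.text),
        };
      }
    }
  }

  /** Partial JSON text received so far, e.g. for a live preview. */
  partialText(toolCallId: string): string | undefined {
    return this.pending.get(toolCallId)?.text;
  }
}
```

Keeping the raw partial text separate from the parsed result means JSON.parse is never called on incomplete input.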

Tool execution chunks

ToolStartChunk

Emitted when a tool invocation begins.

typescript
interface ToolStartChunk {
  type: 'tool_start';
  toolCallId: string;
  toolName: string;
  arguments: Record<string, unknown>;
  // + Base fields
}

ToolEndChunk

Emitted when a tool invocation completes.

typescript
interface ToolEndChunk {
  type: 'tool_end';
  toolCallId: string;
  toolName: string;
  result: unknown;
  /** Human-readable error message if the tool failed. */
  error?: string;
  /**
   * Machine-readable error code when the tool failed. For client-executed
   * tools the runtime emits one of `CLIENT_TOOL_ERROR_CODES` (e.g.
   * `client_tool_timeout`, `aborted`); operator tooling can grep on this
   * rather than the humanized `error` text.
   */
  errorCode?: string;
  /**
   * True when the tool was executed by the framework (regular function
   * tools, sub-agents, remote sub-agents, finishWith). False when the
   * tool was executed client-side (`execute: 'client'`).
   *
   * Optional for backward compatibility — chunks emitted by older
   * runtimes don't carry this flag and consumers should treat absence
   * as "client-executed" (the conservative default that matches the
   * Vercel AI SDK's `!providerExecuted` semantics). Used by
   * `extractToolOutputs` in `HelixChatTransport` to filter out
   * provider-executed tool parts from the auto-flush to
   * `/submit-tool-result` — these are already complete on the server.
   */
  providerExecuted?: boolean;
  // + Base fields
}
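The backward-compatibility rule for providerExecuted (absence means client-executed) is easy to invert by testing the flag for truthiness. A minimal sketch of the filtering step described in the comment, using a hypothetical helper name and a trimmed local copy of the chunk type rather than the real HelixChatTransport code:

```typescript
// Trimmed local copy of the ToolEndChunk fields used here.
interface ToolEndLike {
  type: 'tool_end';
  toolCallId: string;
  toolName: string;
  result: unknown;
  error?: string;
  errorCode?: string;
  providerExecuted?: boolean;
}

// Hypothetical helper: keep only the tool results the client still has to
// submit. A missing providerExecuted flag (older runtimes) counts as
// client-executed, so only an explicit `true` is filtered out.
function clientExecutedToolEnds(chunks: ToolEndLike[]): ToolEndLike[] {
  return chunks.filter((chunk) => chunk.providerExecuted !== true);
}
```

Comparing against `true` rather than writing `!chunk.providerExecuted` gives the same result here; the explicit form just makes the "absent means client" default visible at the call site.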

ToolApprovalRequestChunk (v7)

Emitted when an approval-gated tool requires human (or automatic) approval before execution. The UI should render the proposed action for the user to approve or deny.

typescript
interface ToolApprovalRequestChunk {
  type: 'tool_approval_request';
  toolCallId: string;
  toolName: string;
  /** Server-generated `${runId}::${toolCallId}` (matches Mastra's APPROVAL_ID_SEPARATOR). */
  approvalId: string;
  /** The validated tool input the LLM produced; sent so the UI can render the proposed action. */
  input: unknown;
  /** True when approval was decided automatically (e.g., a deterministic `requireApproval` evaluator). */
  isAutomatic?: boolean;
  // + Base fields
}

ToolApprovalResponseChunk (v7)

Mirrors the request chunk so consumers can correlate request/response pairs via approvalId.

typescript
interface ToolApprovalResponseChunk {
  type: 'tool_approval_response';
  toolCallId: string;
  toolName: string;
  approvalId: string;
  approved: boolean;
  reason?: string;
  isAutomatic?: boolean;
  // + Base fields
}

Approval-gated and client-executed tool flows both suspend the run with the same status: RunOutcome.kind = 'suspended_client_tool', surfaced as AgentResult.status (see ./concepts.md). The two flows are routed by the kind field of the submitToolResult payload ('client-tool-result' vs 'approval-response'), not by the stream-chunk type. UIs typically dispatch on the chunk type for rendering and on the payload kind for submission.
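Because the response mirrors the request, a UI can keep pending approvals in a map keyed by approvalId and close each one when the matching response arrives. A sketch under the assumption that both chunk kinds arrive on the same stream; ApprovalTracker and the local types are hypothetical, and approvalId is treated as an opaque key rather than parsed.

```typescript
interface ApprovalRequestLike {
  type: 'tool_approval_request';
  toolCallId: string;
  toolName: string;
  approvalId: string;
  input: unknown;
  isAutomatic?: boolean;
}

interface ApprovalResponseLike {
  type: 'tool_approval_response';
  toolCallId: string;
  toolName: string;
  approvalId: string;
  approved: boolean;
  reason?: string;
  isAutomatic?: boolean;
}

interface ApprovalDecision {
  request: ApprovalRequestLike;
  response: ApprovalResponseLike;
}

// Hypothetical tracker: requests wait in `pending` until a response with the
// same approvalId arrives; responses without a matching request are ignored.
class ApprovalTracker {
  private pending = new Map<string, ApprovalRequestLike>();

  onChunk(chunk: ApprovalRequestLike | ApprovalResponseLike): ApprovalDecision | undefined {
    if (chunk.type === 'tool_approval_request') {
      this.pending.set(chunk.approvalId, chunk);
      return undefined;
    }
    const request = this.pending.get(chunk.approvalId);
    if (!request) return undefined;
    this.pending.delete(chunk.approvalId);
    return { request, response: chunk };
  }

  /** Requests still awaiting a decision, e.g. to render approve/deny buttons. */
  awaiting(): ApprovalRequestLike[] {
    return [...this.pending.values()];
  }
}
```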

Tool input/output errors

Distinct from the regular error chunk — these carry toolCallId/toolName and are emitted at specific failure boundaries.

typescript
interface ToolInputErrorChunk {
  type: 'tool_input_error';
  toolCallId: string;
  toolName: string;
  error: string;
  partialInput?: unknown;
  // + Base fields
}

interface ToolOutputErrorChunk {
  type: 'tool_output_error';
  toolCallId: string;
  toolName: string;
  error: string;
  // + Base fields
}

Sub-agent chunks

SubAgentStartChunk

Emitted when a sub-agent begins execution.

typescript
interface SubAgentStartChunk {
  type: 'subagent_start';
  subAgentType: string; // Sub-agent type (e.g. 'researcher')
  subSessionId: string; // Sub-agent's session ID; correlate sub-agent chunks via this
  callId: string; // Tool call ID that spawned this sub-agent
  // + Base fields (agentId is the PARENT's session ID)
}

SubAgentEndChunk

Emitted when a sub-agent completes.

typescript
interface SubAgentEndChunk {
  type: 'subagent_end';
  subAgentType: string;
  subSessionId: string;
  callId: string;
  result: unknown; // Sub-agent output (failures surface via the result payload itself or via emitted error chunks)
  // + Base fields
}

This is the authoritative schema. Other internals docs that reference these chunks should match.

Custom + state chunks

CustomEventChunk

Application-specific events from tools (created via ToolContext.emit()).

typescript
interface CustomEventChunk {
  type: 'custom';
  eventName: string;
  data: unknown;
  // + Base fields
}

StatePatchChunk

RFC 6902 JSON Patches for state updates.

typescript
interface StatePatchChunk {
  type: 'state_patch';
  patches: JSONPatchOperation[]; // RFC 6902 'add'|'remove'|'replace'|'move'|'copy'|'test'
  // + Base fields
}
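Clients that mirror agent state apply each state_patch in order. The sketch below handles only add, replace and remove (with RFC 6901 pointer decoding and the "-" append index). It skips RFC 6902's "target must exist" checks and the move/copy/test operations, so it is a partial illustration rather than a conformant implementation; applyPatches and the local types are hypothetical. A complete RFC 6902 library is the safer choice for production use.

```typescript
type JsonValue = null | boolean | number | string | JsonValue[] | { [key: string]: JsonValue };

// Subset of RFC 6902 operations handled by this sketch.
type PatchOp =
  | { op: 'add'; path: string; value: JsonValue }
  | { op: 'replace'; path: string; value: JsonValue }
  | { op: 'remove'; path: string };

// RFC 6901 JSON Pointer decoding: "/a/b~1c" -> ["a", "b/c"]. "~1" is decoded before "~0".
function parsePointer(path: string): string[] {
  if (path === '') return [];
  if (!path.startsWith('/')) throw new Error(`Invalid JSON Pointer: ${path}`);
  return path
    .slice(1)
    .split('/')
    .map((token) => token.replace(/~1/g, '/').replace(/~0/g, '~'));
}

// Looks up one reference token inside an array or object.
function getChild(container: JsonValue, token: string): JsonValue {
  let child: JsonValue | undefined;
  if (Array.isArray(container)) child = container[Number(token)];
  else if (container !== null && typeof container === 'object') child = container[token];
  if (child === undefined) throw new Error(`Path segment not found: ${token}`);
  return child;
}

// Applies the patches, in order, to a deep copy of `doc` and returns the copy.
function applyPatches(doc: JsonValue, patches: PatchOp[]): JsonValue {
  let root: JsonValue = JSON.parse(JSON.stringify(doc)); // JSON-safe deep copy
  for (const patch of patches) {
    const tokens = parsePointer(patch.path);
    if (tokens.length === 0) {
      // Empty pointer: the operation targets the whole document.
      if (patch.op === 'remove') throw new Error('Cannot remove the document root');
      root = patch.value;
      continue;
    }
    let parent: JsonValue = root;
    for (const token of tokens.slice(0, -1)) parent = getChild(parent, token);
    const last = tokens[tokens.length - 1];

    if (Array.isArray(parent)) {
      // "-" addresses the position after the last element (append).
      const index = last === '-' ? parent.length : Number(last);
      if (patch.op === 'add') parent.splice(index, 0, patch.value);
      else if (patch.op === 'replace') parent[index] = patch.value;
      else parent.splice(index, 1);
    } else if (parent !== null && typeof parent === 'object') {
      if (patch.op === 'remove') delete parent[last];
      else parent[last] = patch.value;
    } else {
      throw new Error(`Cannot apply ${patch.op} at ${patch.path}: parent is not a container`);
    }
  }
  return root;
}
```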

Error + output chunks

ErrorChunk

typescript
interface ErrorChunk {
  type: 'error';
  error: string;
  code?: string;
  errorDetail?: {
    message: string;
    code?: string;
    category?: string;
    cause?: string;
    retryable?: boolean;
  };
  recoverable: boolean;
  // + Base fields
}

When the error originates from a classified HelixError, the code field contains the error code (e.g., provider_overloaded, provider_rate_limited) and recoverable reflects whether the operation can be retried. See Error Handling Guide.

OutputChunk

Final structured output (when the agent has an outputSchema).

typescript
interface OutputChunk {
  type: 'output';
  output: unknown;
  // + Base fields
}

Source / citation chunks (RAG)

typescript
interface SourceUrlChunk {
  type: 'source_url';
  sourceId: string;
  url: string;
  title?: string;
  // + Base fields
}

interface SourceDocumentChunk {
  type: 'source_document';
  sourceId: string;
  mediaType: string;
  title: string;
  filename?: string;
  // + Base fields
}

Step chunks (multi-step workflow tracking)

typescript
interface StepStartChunk {
  type: 'step_start';
  stepId?: string;
  // + Base fields
}

interface StepEndChunk {
  type: 'step_end';
  stepId?: string;
  usage?: {
    inputTokens: number;
    outputTokens: number;
    cachedTokens?: number;
    cacheWriteTokens?: number;
  };
  finishReason?: 'stop' | 'length' | 'tool-calls' | 'content-filter' | 'error' | 'other';
  // + Base fields
}
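A consumer that reports token usage for a whole run can fold the step_end chunks together. This sketch assumes each step_end reports that step's usage rather than a running total (the schema above does not say which) and treats missing cache counters as 0; totalUsage is a hypothetical helper.

```typescript
interface StepUsage {
  inputTokens: number;
  outputTokens: number;
  cachedTokens?: number;
  cacheWriteTokens?: number;
}

interface StepEndLike {
  type: 'step_end';
  stepId?: string;
  usage?: StepUsage;
  finishReason?: 'stop' | 'length' | 'tool-calls' | 'content-filter' | 'error' | 'other';
}

// Sums per-step usage. Steps without `usage` contribute nothing, and missing
// cache counters count as 0.
function totalUsage(chunks: StepEndLike[]): Required<StepUsage> {
  const total = { inputTokens: 0, outputTokens: 0, cachedTokens: 0, cacheWriteTokens: 0 };
  for (const { usage } of chunks) {
    if (!usage) continue;
    total.inputTokens += usage.inputTokens;
    total.outputTokens += usage.outputTokens;
    total.cachedTokens += usage.cachedTokens ?? 0;
    total.cacheWriteTokens += usage.cacheWriteTokens ?? 0;
  }
  return total;
}
```

If the runtime turns out to report cumulative usage instead, the last step_end's usage is the total and no summing is needed.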

File chunk (generated files / attachments)

typescript
interface FileChunk {
  type: 'file';
  url: string; // Data URL or hosted URL
  mediaType: string;
  filename?: string;
  // + Base fields
}

Run lifecycle chunks (v7)

These are emitted by the runtime (not the LLM) and signal run-level state transitions.

RunInterruptedChunk

Emitted when the agent is interrupted. The agent can be resumed via executor.resume().

typescript
interface RunInterruptedChunk {
  type: 'run_interrupted';
  runId: string;
  checkpointId: string | null; // null if checkpointing failed
  reason?: string;
  // + Base fields
}

RunResumedChunk

Emitted when an interrupted/paused agent is resumed.

typescript
interface RunResumedChunk {
  type: 'run_resumed';
  runId: string;
  fromCheckpointId: string | null;
  fromStepCount: number;
  mode: string; // 'continue' | 'with_message' | 'with_confirmation' | 'from_checkpoint'
  // + Base fields
}

RunPausedChunk

Emitted when the agent has paused (typically waiting for user confirmation).

typescript
interface RunPausedChunk {
  type: 'run_paused';
  runId: string;
  reason: string;
  pendingToolName?: string;
  pendingToolCallId?: string;
  // + Base fields
}

ExecutorSupersededChunk

Emitted when this executor has been superseded by another (StaleStateError detected during save).

typescript
interface ExecutorSupersededChunk {
  type: 'executor_superseded';
  reason?: string;
  // + Base fields
}

CheckpointCreatedChunk

typescript
interface CheckpointCreatedChunk {
  type: 'checkpoint_created';
  runId: string;
  checkpointId: string;
  stepCount: number;
  // + Base fields
}

StepCommittedChunk / StepDiscardedChunk

Signal whether staged changes were committed or discarded at a step boundary.

typescript
interface StepCommittedChunk {
  type: 'step_committed';
  runId: string;
  stepId: string;
  checkpointId: string;
  // + Base fields
}

interface StepDiscardedChunk {
  type: 'step_discarded';
  runId: string;
  stepId: string;
  reason: string;
  // + Base fields
}

StreamResyncChunk

Emitted on resume only when orphaned messages or stream chunks were actually cleaned up — i.e. crash recovery, interrupt cleanup, retry, or checkpoint rollback/branch. The runtime gates emission on a real truncation (cleanedUpOrphans && checkpoint in runtime-js); a normal client-tool resume or follow-up turn truncates nothing and emits no resync. Clients receiving it should reload state from the server (e.g. a /snapshot endpoint).

This chunk is not emitted on the happy path. In particular, it is not part of the client-executed-tool continuation flow: on the Cloudflare DO and DBOS runtimes the run auto-continues after a submit and streams the continuation live (the AI SDK re-attaches to the running stream without a resync). See Client-Executed Tools for the per-runtime continuation model.

Run-scoped orphan cleanup (cleanupToStep minSequence). Orphan stream-chunk cleanup on resume is bounded to the resumed run. Step numbers reset per turn (each turn restarts step counting at 1), but stream sequence numbers are monotonic across the whole per-session stream. A naive cleanupToStep(checkpoint.stepCount) would therefore delete prior turns' committed chunks, because they carry per-turn step numbers greater than the current turn's low stepCount. That would spuriously trip cleanedUpOrphans and emit a bogus crash_recovery resync on every multi-turn client-tool resume.

To prevent this, runtimes pass the resumed run's startSequence as the cleanupToStep minSequence floor. Only chunks with sequence > minSequence are eligible for deletion, so earlier turns are protected while genuine within-run orphans (a crashed step's chunks) are still removed. All four stream managers (in-memory, Cloudflare DO SQLite, Cloudflare StreamDurableObject, Redis) honor the floor, and the cross-store contract test pins the behavior. The retry / from_checkpoint rollback path deliberately does not pass the floor, because it intends to clean later turns' chunks.
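The floor rule can be checked against a small in-memory model. This is a sketch of the selection predicate only, not any stream manager's code. cleanupToStep here is a hypothetical pure function over a trimmed chunk record (the label field exists only for illustration), and it assumes cleanup removes chunks with step > stepCount, as described for the truncated wire event.

```typescript
// Minimal stand-in for a stored chunk: only the two numbers cleanup looks at.
interface StoredChunkRecord {
  sequence: number; // monotonic across the whole per-session stream
  step: number; // restarts at 1 on every turn
  label: string; // illustration only
}

// A chunk is an orphan when its step is past the checkpoint AND, if a floor is
// given, its sequence is past the resumed run's start. Also reports whether
// anything was removed (the cleanedUpOrphans signal that gates stream_resync).
function cleanupToStep(
  chunks: StoredChunkRecord[],
  stepCount: number,
  minSequence?: number,
): { kept: StoredChunkRecord[]; cleanedUpOrphans: boolean } {
  const kept = chunks.filter((chunk) => {
    const pastStep = chunk.step > stepCount;
    const inResumedRun = minSequence === undefined || chunk.sequence > minSequence;
    return !(pastStep && inResumedRun);
  });
  return { kept, cleanedUpOrphans: kept.length < chunks.length };
}
```

Suppose turn 1 wrote steps 1 to 3 at sequences 1 to 3, and turn 2 resumes from a stepCount 1 checkpoint with startSequence 3. Then cleanupToStep(chunks, 1) also deletes turn 1's steps 2 and 3, whereas cleanupToStep(chunks, 1, 3) removes only turn 2's crashed step-2 chunk. On a clean resume the floored call reports cleanedUpOrphans = false, so no resync would be emitted.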

fromSequence accuracy. fromSequence is the checkpoint's streamSequence. Checkpoints record the stream's live latestSequence at write time. The value is resolved by resolveCheckpointStreamSequence from @helix-agents/core, which each runtime invokes at its persistence point: a JS hook, or a replay-safe Temporal/DBOS/CF-Workflow activity or step that is allowed to perform I/O. Earlier releases hardcoded 0 here. That value was never load-bearing (the /snapshot endpoint computes its own sequence, and no consumer replays from the resync's fromSequence), but the field is now accurate for observability and any future consumer. DBOS does not emit stream_resync, so it leaves streamSequence unset.

typescript
interface StreamResyncChunk {
  type: 'stream_resync';
  checkpointId: string;
  stepCount: number;
  messageCount: number;
  fromSequence: number;
  reason: 'crash_recovery' | 'rollback' | 'branch' | 'retry';
  // + Base fields
}

SuspensionMarkerChunk (Helix-internal)

Emitted by runtimes whose workflow body stays alive across suspensions (Temporal, Cloudflare Workflows). By watching the stream for this chunk, the executor's handle learns that the run is suspended and can project the correct 'suspended_*' status from handle.result() without re-deriving it from persisted state.

typescript
interface SuspensionMarkerChunk {
  type: 'suspension_marker';
  /** Suspension kind, matching `AgentResult.status`. */
  kind: 'suspended_client_tool' | 'suspended_awaiting_children' | 'suspended_step_partial';
  /**
   * Suspension payload, matching `AgentResult.suspended` shape.
   * - `suspended_client_tool`: { toolCallIds }
   * - `suspended_step_partial`: { toolCallIds, stepId }
   * - `suspended_awaiting_children`: { children }
   */
  payload:
    | { toolCallIds: string[]; stepId: string }
    | { toolCallIds: string[] }
    | { children: SuspendedChildWaitPayload[] };
  // + Base fields
}

@internal Helix coordination chunk — NOT part of the public stream surface. The binding-layer transformer filters this out of v6 streams; AI SDK consumers see suspension via existing tool_start, tool_approval_request, and subagent_start chunks.

Control message: stream_end

The only non-data variant. Lives on StreamMessageSchema (the union of data chunks + control messages).

typescript
interface StreamEnd {
  type: 'stream_end';
  streamId: string;
  timestamp: number;
  finalOutput?: unknown;
}

Per-runtime chunk-emission parity

| Chunk type | runtime-js | runtime-cloudflare (DO) | runtime-cloudflare (Workflow) | runtime-temporal | runtime-dbos |
| --- | --- | --- | --- | --- | --- |
| text_delta / thinking / tool_arg_stream_* | yes | yes | yes | yes | yes |
| tool_start / tool_end | yes | yes | yes | yes | yes |
| tool_end.providerExecuted field | yes (v7) | yes (v7) | yes (v7) | yes (v7) | yes (v7) |
| tool_approval_request / tool_approval_response | yes (v7) | yes (v7) | yes (v7) | yes (v7) | yes (DBOS-native) |
| subagent_start / subagent_end | yes | yes | yes | yes | partial |
| state_patch / custom / error / output | yes | yes | yes | yes | yes |
| run_interrupted / run_resumed / run_paused | yes | yes | yes | yes | yes |
| checkpoint_created / step_committed / step_discarded | yes | yes | yes | yes | yes |
| stream_resync | yes | yes | yes | yes | yes |
| executor_superseded | yes | yes | yes | yes | yes |
| suspension_marker | n/a | n/a | yes | yes | n/a |

suspension_marker is only emitted by runtimes whose workflow body stays alive across suspensions (Temporal, Cloudflare Workflows). The JS, Cloudflare DO, and DBOS runtimes surface suspension through the AgentResult.status projection instead.

Stream Lifecycle

Creation

typescript
const writer = await streamManager.createWriter(streamId, agentId, agentType);

Writing

typescript
await writer.write({
  type: 'text_delta',
  delta: 'Hello',
  agentId: runId,
  agentType: 'assistant',
  timestamp: Date.now(),
  step: 0,
});

await writer.close();

Reading

typescript
const reader = await streamManager.createReader(streamId);

for await (const chunk of reader) {
  // Process chunk
}

Ending

typescript
// Normal completion
await streamManager.endStream(streamId);
await streamManager.endStream(streamId, output); // With final output

// With error
await streamManager.failStream(streamId, 'Something went wrong');

Stream End Sentinel

A special marker indicates stream completion:

typescript
const STREAM_END_SENTINEL = Symbol.for('helix.stream.end');

interface StreamEndSentinel {
  __streamEnd: true;
  status: 'ended' | 'failed';
  error?: string;
}

Readers yield chunks until they encounter the sentinel.
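A reader loop that honors the sentinel can be written as an async generator wrapper. The sketch detects the sentinel by its __streamEnd field rather than by comparing against STREAM_END_SENTINEL, and turning a 'failed' sentinel into a thrown Error is an assumption about desirable reader behavior; untilSentinel and isStreamEndSentinel are hypothetical names.

```typescript
interface StreamEndSentinel {
  __streamEnd: true;
  status: 'ended' | 'failed';
  error?: string;
}

// Structural check for the sentinel shape described above.
function isStreamEndSentinel(value: unknown): value is StreamEndSentinel {
  return (
    typeof value === 'object' &&
    value !== null &&
    (value as { __streamEnd?: unknown }).__streamEnd === true
  );
}

// Yields data items from `source` until the sentinel. A 'failed' sentinel is
// rethrown so `for await` callers see the failure; anything after the
// sentinel is never read.
async function* untilSentinel<T>(source: AsyncIterable<T | StreamEndSentinel>): AsyncGenerator<T> {
  for await (const item of source) {
    if (isStreamEndSentinel(item)) {
      if (item.status === 'failed') throw new Error(item.error ?? 'stream failed');
      return;
    }
    yield item;
  }
}
```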

Resumability

Sequence Numbers

Each chunk can have a sequence number for resumability:

typescript
interface StoredChunk {
  chunk: StreamChunk;
  sequence: number; // Monotonically increasing
}

Resumable Reader

typescript
const reader = await streamManager.createResumableReader(streamId, {
  fromSequence: 100, // Resume from sequence 100
});

for await (const { chunk, sequence } of reader) {
  // sequence can be used as Last-Event-ID
}
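On reconnect, a client typically sends back the last sequence it processed. Whether fromSequence is inclusive or exclusive is not stated here, so a client can protect itself by dropping anything at or below the last sequence it applied. A sketch with a hypothetical ResumeCursor class, assuming sequences start at 1 as in the SSE example below:

```typescript
interface SequencedChunk<T> {
  chunk: T;
  sequence: number;
}

// Hypothetical client-side cursor: remembers the highest sequence processed and
// drops anything at or below it, so overlapping replays (inclusive vs.
// exclusive resume positions, reconnect races) are not applied twice.
class ResumeCursor<T> {
  private lastSequence: number;

  // 0 means "nothing processed yet" (assumes sequences start at 1).
  constructor(lastSequence = 0) {
    this.lastSequence = lastSequence;
  }

  accept(entry: SequencedChunk<T>): T | undefined {
    if (entry.sequence <= this.lastSequence) return undefined;
    this.lastSequence = entry.sequence;
    return entry.chunk;
  }

  /** Value to send back as Last-Event-ID / resume position on reconnect. */
  get position(): number {
    return this.lastSequence;
  }
}
```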

Stream Status

typescript
type ResumableStreamStatus = 'active' | 'ended' | 'failed';

interface ResumableStreamReader {
  status: ResumableStreamStatus;
  [Symbol.asyncIterator](): AsyncIterator<{ chunk: StreamChunk; sequence: number }>;
}

Wire Format

For transport over HTTP (SSE), each event is serialized as one of the following JSON envelopes, discriminated by type:

typescript
interface StreamEvent {
  type: 'chunk' | 'end' | 'fail' | 'status' | 'truncated';
  // ... payload based on type
}

interface StreamChunkEvent {
  type: 'chunk';
  chunk: StreamChunk;
  sequence?: number;
}

interface StreamEndEvent {
  type: 'end';
  finalOutput?: unknown;
}

interface StreamFailEvent {
  type: 'fail';
  error: string;
}

interface StreamStatusEvent {
  type: 'status';
  // Lifecycle transition. Used as a wakeup ping for blocked readers when
  // the stream pauses/resumes — readers re-check metadata and do NOT emit a
  // chunk to the consumer.
  status: 'paused' | 'active';
}

interface StreamTruncatedEvent {
  type: 'truncated';
  // The `stepCount` boundary passed to `cleanupToStep`; chunks with
  // `step > truncatedAtStep` were removed.
  truncatedAtStep: number;
  // The stream's sequence high-water mark at the moment of cleanup. Readers
  // gate the throw on `atSequence > lastYieldedSequence`. Optional for
  // wire compatibility.
  atSequence?: number;
}

G4 — truncation surfacing

The 'truncated' event implements StreamManager concurrency guarantee #4 (G4): when a cleanupToStep(N) removes chunks past an attached reader's cursor, PUSH transports (the Cloudflare Durable Object SSE/WS path) broadcast a truncated control event so the attached reader surfaces a StreamTruncatedError on its next iteration. This is best-effort — a reader that attaches after cleanup (a fresh replay) won't see the event and instead resumes from the surviving chunks.

MARKER/POLL transports do not need the wire event: they detect the same condition via a stored marker consulted on each next() — the Redis hash field __truncated_at_step, the in-process Durable Object SQLite truncated_at_step meta, and an equivalent O(1) in-process field in the in-memory manager.
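On the reader side, the gate can be sketched as a small state holder that records a pending truncation and throws it on the next iteration. TruncationGate and TruncatedStreamError are hypothetical local stand-ins (not the library's StreamTruncatedError). Surfacing an event that lacks atSequence is an assumption, chosen because staleness cannot be proven without it.

```typescript
interface TruncatedEvent {
  type: 'truncated';
  truncatedAtStep: number;
  atSequence?: number;
}

// Local stand-in for the StreamTruncatedError described above.
class TruncatedStreamError extends Error {
  readonly truncatedAtStep: number;
  constructor(truncatedAtStep: number) {
    super(`stream truncated past step ${truncatedAtStep}`);
    this.name = 'TruncatedStreamError';
    this.truncatedAtStep = truncatedAtStep;
  }
}

// The documented gate: surface only when atSequence > lastYieldedSequence.
// Assumption: events from older senders without atSequence are surfaced.
function shouldSurfaceTruncation(event: TruncatedEvent, lastYieldedSequence: number): boolean {
  return event.atSequence === undefined || event.atSequence > lastYieldedSequence;
}

// Hypothetical attached-reader helper.
class TruncationGate {
  private lastYieldedSequence = 0;
  private pending: TruncatedEvent | undefined;

  onControlEvent(event: TruncatedEvent): void {
    if (shouldSurfaceTruncation(event, this.lastYieldedSequence)) this.pending = event;
  }

  /** Call before yielding the chunk with `sequence`; throws if a truncation is pending. */
  beforeYield(sequence: number): void {
    if (this.pending) {
      const { truncatedAtStep } = this.pending;
      this.pending = undefined;
      throw new TruncatedStreamError(truncatedAtStep);
    }
    this.lastYieldedSequence = sequence;
  }
}
```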

SSE Encoding

id: 1
data: {"type":"chunk","chunk":{"type":"text_delta","delta":"Hello"},"sequence":1}

id: 2
data: {"type":"chunk","chunk":{"type":"text_delta","delta":" world"},"sequence":2}

id: 3
data: {"type":"end"}
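Producing frames like the ones above takes one line per SSE field. This sketch (a hypothetical toSseFrame over a local StreamEvent union mirroring the wire format) writes an id: line only when the caller passes one. JSON.stringify never emits raw newlines, so each frame needs only a single data: line.

```typescript
type StreamEvent =
  | { type: 'chunk'; chunk: unknown; sequence?: number }
  | { type: 'end'; finalOutput?: unknown }
  | { type: 'fail'; error: string }
  | { type: 'status'; status: 'paused' | 'active' }
  | { type: 'truncated'; truncatedAtStep: number; atSequence?: number };

// Formats one SSE frame: optional id line, one data line, blank-line terminator.
function toSseFrame(event: StreamEvent, id?: number): string {
  const idLine = id === undefined ? '' : `id: ${id}\n`;
  return `${idLine}data: ${JSON.stringify(event)}\n\n`;
}
```

Leaving the id optional lets a server attach ids to chunk frames (so Last-Event-ID tracks chunk sequences) while deciding separately whether control frames such as status carry one.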

Resume Headers (v7)

Reconnecting clients negotiate resume position via these headers (see prepare-helix-chat-request.ts):

  • X-Resume-From-Sequence: Client-supplied resume position. The server reads it via extractResumePosition(headers) and synthesizes a stream_resync chunk with the matching fromSequence.
  • Last-Event-ID: Standard SSE header. When X-Resume-From-Sequence is absent, the server falls back to Last-Event-ID with the same resume semantics.
  • X-Existing-Message-Id: Client-supplied existing message ID, so the reconnected stream's start event reuses it (avoids creating a duplicate assistant message client-side on reconnect).
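The header precedence above can be expressed as a small lookup. resolveResumeSequence is a hypothetical helper, not extractResumePosition. It accepts anything with a Fetch-style get(name) method, and falling back to Last-Event-ID when X-Resume-From-Sequence is present but not a non-negative integer is a design choice of the sketch.

```typescript
// Matches the shape of the Fetch API Headers#get method.
interface HeaderSource {
  get(name: string): string | null;
}

// X-Resume-From-Sequence wins; Last-Event-ID is the fallback. Values that are
// not non-negative integers are ignored. Returns undefined for a fresh stream.
function resolveResumeSequence(headers: HeaderSource): number | undefined {
  for (const name of ['X-Resume-From-Sequence', 'Last-Event-ID']) {
    const raw = headers.get(name)?.trim();
    if (raw && /^\d+$/.test(raw)) return Number(raw);
  }
  return undefined;
}
```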

Filtering Streams

Utility functions for stream manipulation:

typescript
import {
  filterByAgentId,
  filterByAgentType,
  filterByType,
  excludeTypes,
  filterWith,
  combineStreams,
  take,
  skip,
  collectText,
  collectAll,
} from '@helix-agents/core';

// Filter by agent
const agentChunks = filterByAgentId(stream, 'run-123');

// Only text chunks
const textChunks = filterByType(stream, ['text_delta']);

// Exclude thinking
const noThinking = excludeTypes(stream, ['thinking']);

// Custom filter
const important = filterWith(stream, (chunk) => chunk.type === 'error' || chunk.type === 'output');

// Combine multiple streams
const combined = combineStreams([stream1, stream2]);

// Collect all text
const fullText = await collectText(stream);
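These helpers compose as async-iterable transforms. As an illustration of the pattern (not the library's source), a type filter and a text collector can be written as async generators; onlyTypes and joinText are hypothetical names chosen to avoid confusion with the real exports.

```typescript
interface ChunkLike {
  type: string;
  [key: string]: unknown;
}

// Passes through chunks whose `type` is in the allow-list, preserving order.
async function* onlyTypes<C extends ChunkLike>(
  stream: AsyncIterable<C>,
  types: readonly string[],
): AsyncGenerator<C> {
  const allowed = new Set(types);
  for await (const chunk of stream) {
    if (allowed.has(chunk.type)) yield chunk;
  }
}

// Concatenates the `delta` of every text_delta chunk.
async function joinText(stream: AsyncIterable<ChunkLike>): Promise<string> {
  let text = '';
  for await (const chunk of onlyTypes(stream, ['text_delta'])) {
    const delta = chunk['delta'];
    if (typeof delta === 'string') text += delta;
  }
  return text;
}
```

Because an async generator can be iterated only once, feeding the same generator instance to two pipelines would leave the second one empty.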

Type Guards

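Runtime type checking for chunks. The guards below cover the chunk variants documented above except the two approval chunks, plus isStreamEnd for the stream_end control message; tool_approval_request and tool_approval_response can be narrowed by comparing chunk.type directly.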

typescript
import {
  isTextDeltaChunk,
  isThinkingChunk,
  isToolStartChunk,
  isToolEndChunk,
  isSubAgentStartChunk,
  isSubAgentEndChunk,
  isCustomEventChunk,
  isStatePatchChunk,
  isErrorChunk,
  isOutputChunk,
  isToolArgStreamStartChunk,
  isToolArgStreamDeltaChunk,
  isToolArgStreamEndChunk,
  isToolInputErrorChunk,
  isToolOutputErrorChunk,
  isSourceUrlChunk,
  isSourceDocumentChunk,
  isStepStartChunk,
  isStepEndChunk,
  isFileChunk,
  isRunInterruptedChunk,
  isRunResumedChunk,
  isExecutorSupersededChunk,
  isRunPausedChunk,
  isCheckpointCreatedChunk,
  isStepCommittedChunk,
  isStepDiscardedChunk,
  isStreamResyncChunk,
  isSuspensionMarkerChunk,
  isStreamEnd,
} from '@helix-agents/core';

for await (const chunk of stream) {
  if (isTextDeltaChunk(chunk)) {
    process.stdout.write(chunk.delta);
  } else if (isToolStartChunk(chunk)) {
    console.log(`Tool: ${chunk.toolName}`);
  }
}

Validation

Zod schemas for runtime validation:

typescript
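import { StreamChunkSchema, StreamMessageSchema, type StreamChunk } from '@helix-agents/core';

// Validate a data chunk (StreamChunkSchema covers data chunks only)
const result = StreamChunkSchema.safeParse(data);
if (result.success) {
  const chunk: StreamChunk = result.data;
} else {
  console.error(result.error.issues); // Zod issue list: one entry per failing path
}

// Validate anything that can appear on the stream, including the stream_end control message
const message = StreamMessageSchema.safeParse(data);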

Implementation Notes

Memory Streams

In-memory streams use arrays and async iterators:

typescript
class InMemoryStreamManager {
  private streams = new Map<
    string,
    {
      chunks: StreamChunk[];
      readers: Set<() => void>;
      status: 'active' | 'ended' | 'failed';
    }
  >();
}

Redis Streams

Redis implementation uses Redis Streams (XADD/XREAD):

typescript
// Writing
await redis.xadd(
  `stream:${streamId}`,
  '*',
  'chunk',
  JSON.stringify(chunk),
  'sequence',
  sequence.toString()
);

// Reading with blocking
const entries = await redis.xread('BLOCK', 5000, 'STREAMS', `stream:${streamId}`, lastId);

Durable Object Streams

Cloudflare Durable Objects provide coordination:

typescript
class StreamServer {
  private chunks: StreamChunk[] = [];
  private waiters: Set<(chunk: StreamChunk) => void> = new Set();

  async write(chunk: StreamChunk) {
    this.chunks.push(chunk);
    for (const waiter of this.waiters) {
      waiter(chunk);
    }
  }
}


Released under the MIT License.