Stream Protocol
This document describes the internal streaming protocol used by Helix Agents for real-time event communication.
Overview
The stream protocol enables real-time communication of:
- Text generation - Token-by-token LLM output
- Reasoning/thinking - Internal reasoning traces
- Tool execution - Start/end events for tools
- Sub-agent activity - Delegation to child agents
- State changes - RFC 6902 patches
- Custom events - Application-specific data
- Errors and output - Final results
Chunk Types
TextDeltaChunk
Token-by-token text from the LLM.
interface TextDeltaChunk {
type: 'text_delta';
delta: string; // The text delta content
agentId: string; // Run ID
agentType?: string; // Agent type name
timestamp: number; // Unix timestamp (ms)
}
Example:
{
"type": "text_delta",
"delta": "Hello, ",
"agentId": "run-abc123",
"agentType": "assistant",
"timestamp": 1702329600000
}
ThinkingChunk
Reasoning/thinking content from models that support it.
interface ThinkingChunk {
type: 'thinking';
content: string; // Thinking content
isComplete: boolean; // True if thinking is complete
agentId: string;
agentType?: string;
timestamp: number;
}
Example:
{
"type": "thinking",
"content": "Let me analyze this step by step...",
"isComplete": false,
"agentId": "run-abc123",
"timestamp": 1702329600000
}
ToolStartChunk
Emitted when a tool invocation begins.
interface ToolStartChunk {
type: 'tool_start';
toolCallId: string; // Unique ID from LLM
toolName: string; // Tool name
arguments: unknown; // Tool input arguments
agentId: string;
agentType?: string;
timestamp: number;
}
Example:
{
"type": "tool_start",
"toolCallId": "call_xyz789",
"toolName": "web_search",
"arguments": { "query": "TypeScript tutorials" },
"agentId": "run-abc123",
"timestamp": 1702329600000
}
ToolEndChunk
Emitted when a tool invocation completes.
interface ToolEndChunk {
type: 'tool_end';
toolCallId: string; // Matches tool_start
result: unknown; // Tool return value
success: boolean; // Whether execution succeeded
error?: string; // Error message if failed
agentId: string;
agentType?: string;
timestamp: number;
}
Example:
{
"type": "tool_end",
"toolCallId": "call_xyz789",
"result": { "results": ["Tutorial 1", "Tutorial 2"] },
"success": true,
"agentId": "run-abc123",
"timestamp": 1702329601000
}
SubAgentStartChunk
Emitted when a sub-agent begins execution.
interface SubAgentStartChunk {
type: 'subagent_start';
agentId: string; // Parent agent's session ID
agentType: string; // Parent agent's type
timestamp: number;
step: number;
subAgentType: string; // Sub-agent type (e.g., 'researcher')
subSessionId: string; // Sub-agent's session ID (use to correlate sub-agent chunks)
callId: string; // Tool call ID that spawned this sub-agent
}
SubAgentEndChunk
Emitted when a sub-agent completes.
interface SubAgentEndChunk {
type: 'subagent_end';
agentId: string; // Parent agent's session ID
agentType: string; // Parent agent's type
timestamp: number;
step: number;
subAgentType: string; // Sub-agent type
subSessionId: string; // Sub-agent's session ID
callId: string; // Tool call ID
result: unknown; // Sub-agent output
}
CustomEventChunk
Application-specific events from tools.
interface CustomEventChunk {
type: 'custom';
eventName: string; // Custom event name
data: unknown; // Event payload
agentId: string;
agentType?: string;
timestamp: number;
}
Example:
{
"type": "custom",
"eventName": "search_progress",
"data": { "processed": 50, "total": 100 },
"agentId": "run-abc123",
"timestamp": 1702329600500
}
StatePatchChunk
RFC 6902 JSON Patches for state updates.
interface StatePatchChunk {
type: 'state_patch';
patches: JSONPatchOperation[]; // RFC 6902 operations
agentId: string;
agentType?: string;
timestamp: number;
}
interface JSONPatchOperation {
op: 'add' | 'remove' | 'replace';
path: string; // JSON Pointer path
value?: unknown; // For add/replace
}
Example:
{
"type": "state_patch",
"patches": [
{ "op": "add", "path": "/notes/-", "value": { "content": "Note 1" } },
{ "op": "replace", "path": "/searchCount", "value": 5 }
],
"agentId": "run-abc123",
"timestamp": 1702329600000
}
ErrorChunk
Error events during execution.
interface ErrorChunk {
type: 'error';
error: string; // Error message
code?: string; // Error classification code (see ErrorCode in @helix-agents/core)
recoverable: boolean; // Whether the error is recoverable/retryable
agentId: string;
agentType?: string;
timestamp: number;
step: number; // Step number when error occurred
}
When the error originates from a classified `HelixError`, the `code` field contains the error code (e.g., `provider_overloaded`, `provider_rate_limited`) and `recoverable` reflects whether the operation can be retried. For unclassified errors, `code` is omitted and `recoverable` defaults to `false`.
See the Error Handling Guide for the complete error classification system.
OutputChunk
Final structured output.
interface OutputChunk {
type: 'output';
output: unknown; // The structured output
agentId: string;
agentType?: string;
timestamp: number;
}
Stream Lifecycle
Creation
const writer = await streamManager.createWriter(streamId, agentId, agentType);
Writing
await writer.write({
type: 'text_delta',
delta: 'Hello',
agentId: runId,
timestamp: Date.now(),
});
await writer.close();
Reading
const reader = await streamManager.createReader(streamId);
for await (const chunk of reader) {
// Process chunk
}
Ending
// Normal completion
await streamManager.endStream(streamId);
await streamManager.endStream(streamId, output); // With final output
// With error
await streamManager.failStream(streamId, 'Something went wrong');
Stream End Sentinel
A special marker indicates stream completion:
const STREAM_END_SENTINEL = Symbol.for('helix.stream.end');
interface StreamEndSentinel {
__streamEnd: true;
status: 'ended' | 'failed';
error?: string;
}
Readers yield chunks until they encounter the sentinel.
Resumability
Sequence Numbers
Each chunk can have a sequence number for resumability:
interface StoredChunk {
chunk: StreamChunk;
sequence: number; // Monotonically increasing
}
Resumable Reader
const reader = await streamManager.createResumableReader(streamId, {
fromSequence: 100, // Resume from sequence 100
});
for await (const { chunk, sequence } of reader) {
// sequence can be used as Last-Event-ID
}
Stream Status
type ResumableStreamStatus = 'active' | 'ended' | 'failed';
interface ResumableStreamReader {
status: ResumableStreamStatus;
[Symbol.asyncIterator](): AsyncIterator<{ chunk: StreamChunk; sequence: number }>;
}
Wire Format
For transport over HTTP using Server-Sent Events (SSE), each stream event is serialized as a JSON object with one of the following shapes:
interface StreamEvent {
type: 'chunk' | 'end' | 'fail';
// ... payload based on type
}
interface StreamChunkEvent {
type: 'chunk';
chunk: StreamChunk;
sequence?: number;
}
interface StreamEndEvent {
type: 'end';
}
interface StreamFailEvent {
type: 'fail';
error: string;
}
SSE Encoding
id: 1
data: {"type":"chunk","chunk":{"type":"text_delta","delta":"Hello"},"sequence":1}
id: 2
data: {"type":"chunk","chunk":{"type":"text_delta","delta":" world"},"sequence":2}
id: 3
data: {"type":"end"}
Filtering Streams
Utility functions for stream manipulation:
import {
filterByAgentId,
filterByAgentType,
filterByType,
excludeTypes,
filterWith,
combineStreams,
take,
skip,
collectText,
collectAll,
} from '@helix-agents/core';
// Filter by agent
const agentChunks = filterByAgentId(stream, 'run-123');
// Only text chunks
const textChunks = filterByType(stream, ['text_delta']);
// Exclude thinking
const noThinking = excludeTypes(stream, ['thinking']);
// Custom filter
const important = filterWith(stream, (chunk) => chunk.type === 'error' || chunk.type === 'output');
// Combine multiple streams
const combined = combineStreams([stream1, stream2]);
// Collect all text
const fullText = await collectText(stream);
Type Guards
Runtime type checking for chunks:
import {
isTextDeltaChunk,
isThinkingChunk,
isToolStartChunk,
isToolEndChunk,
isSubAgentStartChunk,
isSubAgentEndChunk,
isCustomEventChunk,
isStatePatchChunk,
isErrorChunk,
isOutputChunk,
isStreamEnd,
} from '@helix-agents/core';
for await (const chunk of stream) {
if (isTextDeltaChunk(chunk)) {
process.stdout.write(chunk.delta);
} else if (isToolStartChunk(chunk)) {
console.log(`Tool: ${chunk.toolName}`);
}
}
Validation
Zod schemas for runtime validation:
import { StreamChunkSchema, StreamMessageSchema } from '@helix-agents/core';
// Validate a chunk
const result = StreamChunkSchema.safeParse(data);
if (result.success) {
const chunk: StreamChunk = result.data;
}
Implementation Notes
Memory Streams
In-memory streams use arrays and async iterators:
class InMemoryStreamManager {
private streams = new Map<
string,
{
chunks: StreamChunk[];
readers: Set<() => void>;
status: 'active' | 'ended' | 'failed';
}
>();
}
Redis Streams
The Redis implementation uses Redis Streams (XADD/XREAD):
// Writing
await redis.xadd(
`stream:${streamId}`,
'*',
'chunk',
JSON.stringify(chunk),
'sequence',
sequence.toString()
);
// Reading with blocking
const entries = await redis.xread('BLOCK', 5000, 'STREAMS', `stream:${streamId}`, lastId);
Durable Object Streams
Cloudflare Durable Objects provide coordination:
class StreamServer {
private chunks: StreamChunk[] = [];
private waiters: Set<(chunk: StreamChunk) => void> = new Set();
async write(chunk: StreamChunk) {
this.chunks.push(chunk);
for (const waiter of this.waiters) {
waiter(chunk);
}
}
}