Stream Protocol

This document describes the internal streaming protocol used by Helix Agents for real-time event communication.

Overview

The stream protocol enables real-time communication of:

  • Text generation - Token-by-token LLM output
  • Reasoning/thinking - Internal reasoning traces
  • Tool execution - Start/end events for tools
  • Sub-agent activity - Delegation to child agents
  • State changes - RFC 6902 JSON Patch operations
  • Custom events - Application-specific data
  • Errors and output - Error events and the final structured output

Chunk Types

TextDeltaChunk

Token-by-token text from the LLM.

typescript
interface TextDeltaChunk {
  type: 'text_delta';
  delta: string; // The text delta content
  agentId: string; // Run ID
  agentType?: string; // Agent type name
  timestamp: number; // Unix timestamp (ms)
}

Example:

json
{
  "type": "text_delta",
  "delta": "Hello, ",
  "agentId": "run-abc123",
  "agentType": "assistant",
  "timestamp": 1702329600000
}
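
Because each chunk carries only a fragment, a consumer usually concatenates deltas to rebuild the full message. The following is a minimal sketch of that idea; `assembleText` is a hypothetical helper, not part of the library, and it assumes chunks arrive in order.

```typescript
// Local copy of the interface above so the sketch is self-contained.
interface TextDeltaChunk {
  type: 'text_delta';
  delta: string;
  agentId: string;
  agentType?: string;
  timestamp: number;
}

// Hypothetical helper: concatenate deltas per run ID, in arrival order.
function assembleText(chunks: TextDeltaChunk[]): Map<string, string> {
  const textByRun = new Map<string, string>();
  for (const chunk of chunks) {
    textByRun.set(chunk.agentId, (textByRun.get(chunk.agentId) ?? '') + chunk.delta);
  }
  return textByRun;
}

const assembled = assembleText([
  { type: 'text_delta', delta: 'Hello, ', agentId: 'run-abc123', timestamp: 1702329600000 },
  { type: 'text_delta', delta: 'world', agentId: 'run-abc123', timestamp: 1702329600050 },
]);
console.log(assembled.get('run-abc123')); // Hello, world
```

Keying by `agentId` keeps the text of a parent run separate from that of any sub-agent writing to the same stream.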

ThinkingChunk

Reasoning/thinking content from models that support it.

typescript
interface ThinkingChunk {
  type: 'thinking';
  content: string; // Thinking content
  isComplete: boolean; // True if thinking is complete
  agentId: string;
  agentType?: string;
  timestamp: number;
}

Example:

json
{
  "type": "thinking",
  "content": "Let me analyze this step by step...",
  "isComplete": false,
  "agentId": "run-abc123",
  "timestamp": 1702329600000
}

ToolStartChunk

Emitted when a tool invocation begins.

typescript
interface ToolStartChunk {
  type: 'tool_start';
  toolCallId: string; // Unique ID from LLM
  toolName: string; // Tool name
  arguments: unknown; // Tool input arguments
  agentId: string;
  agentType?: string;
  timestamp: number;
}

Example:

json
{
  "type": "tool_start",
  "toolCallId": "call_xyz789",
  "toolName": "web_search",
  "arguments": { "query": "TypeScript tutorials" },
  "agentId": "run-abc123",
  "timestamp": 1702329600000
}

ToolEndChunk

Emitted when a tool invocation completes.

typescript
interface ToolEndChunk {
  type: 'tool_end';
  toolCallId: string; // Matches tool_start
  result: unknown; // Tool return value
  success: boolean; // Whether execution succeeded
  error?: string; // Error message if failed
  agentId: string;
  agentType?: string;
  timestamp: number;
}

Example:

json
{
  "type": "tool_end",
  "toolCallId": "call_xyz789",
  "result": { "results": ["Tutorial 1", "Tutorial 2"] },
  "success": true,
  "agentId": "run-abc123",
  "timestamp": 1702329601000
}
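
Since `tool_end` repeats the `toolCallId` from `tool_start`, the two events can be paired to derive per-call information such as duration. A rough sketch, assuming both events of a call appear in the same stream; `summarizeToolCalls` and `ToolCallSummary` are illustrative names, not library exports.

```typescript
// Local copies of the interfaces above so the sketch is self-contained.
interface ToolStartChunk {
  type: 'tool_start';
  toolCallId: string;
  toolName: string;
  arguments: unknown;
  agentId: string;
  agentType?: string;
  timestamp: number;
}
interface ToolEndChunk {
  type: 'tool_end';
  toolCallId: string;
  result: unknown;
  success: boolean;
  error?: string;
  agentId: string;
  agentType?: string;
  timestamp: number;
}

// Hypothetical summary shape for one completed tool call.
interface ToolCallSummary {
  toolName: string;
  success: boolean;
  durationMs: number;
}

function summarizeToolCalls(chunks: Array<ToolStartChunk | ToolEndChunk>): ToolCallSummary[] {
  const pending = new Map<string, ToolStartChunk>();
  const summaries: ToolCallSummary[] = [];
  for (const chunk of chunks) {
    if (chunk.type === 'tool_start') {
      pending.set(chunk.toolCallId, chunk);
      continue;
    }
    const start = pending.get(chunk.toolCallId);
    if (!start) continue; // tool_end without a matching tool_start is skipped
    pending.delete(chunk.toolCallId);
    summaries.push({
      toolName: start.toolName,
      success: chunk.success,
      durationMs: chunk.timestamp - start.timestamp,
    });
  }
  return summaries;
}

const toolSummaries = summarizeToolCalls([
  { type: 'tool_start', toolCallId: 'call_xyz789', toolName: 'web_search', arguments: { query: 'TypeScript tutorials' }, agentId: 'run-abc123', timestamp: 1702329600000 },
  { type: 'tool_end', toolCallId: 'call_xyz789', result: { results: ['Tutorial 1', 'Tutorial 2'] }, success: true, agentId: 'run-abc123', timestamp: 1702329601000 },
]);
console.log(toolSummaries[0].durationMs); // 1000
```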

SubAgentStartChunk

Emitted when a sub-agent begins execution.

typescript
interface SubAgentStartChunk {
  type: 'subagent_start';
  subAgentId: string; // Sub-agent run ID
  agentType: string; // Sub-agent type
  input: unknown; // Input passed to sub-agent
  parentAgentId: string; // Parent's run ID
  timestamp: number;
}

SubAgentEndChunk

Emitted when a sub-agent completes.

typescript
interface SubAgentEndChunk {
  type: 'subagent_end';
  subAgentId: string; // Matches subagent_start
  agentType: string;
  result: unknown; // Sub-agent output
  success: boolean;
  error?: string;
  parentAgentId: string;
  timestamp: number;
}
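
A consumer can use the matching `subAgentId` on the start and end events to track which sub-agents are still running, for example to render progress indicators. The sketch below is one possible approach; `runningSubAgents` is a hypothetical helper.

```typescript
// Local copies of the interfaces above so the sketch is self-contained.
interface SubAgentStartChunk {
  type: 'subagent_start';
  subAgentId: string;
  agentType: string;
  input: unknown;
  parentAgentId: string;
  timestamp: number;
}
interface SubAgentEndChunk {
  type: 'subagent_end';
  subAgentId: string;
  agentType: string;
  result: unknown;
  success: boolean;
  error?: string;
  parentAgentId: string;
  timestamp: number;
}

// Hypothetical helper: IDs of sub-agents that have started but not yet ended.
function runningSubAgents(chunks: Array<SubAgentStartChunk | SubAgentEndChunk>): string[] {
  const running = new Map<string, SubAgentStartChunk>();
  for (const chunk of chunks) {
    if (chunk.type === 'subagent_start') {
      running.set(chunk.subAgentId, chunk);
    } else {
      running.delete(chunk.subAgentId);
    }
  }
  return Array.from(running.keys());
}

const stillRunning = runningSubAgents([
  { type: 'subagent_start', subAgentId: 'child-1', agentType: 'researcher', input: {}, parentAgentId: 'run-abc123', timestamp: 1702329600000 },
  { type: 'subagent_start', subAgentId: 'child-2', agentType: 'writer', input: {}, parentAgentId: 'run-abc123', timestamp: 1702329600100 },
  { type: 'subagent_end', subAgentId: 'child-1', agentType: 'researcher', result: 'done', success: true, parentAgentId: 'run-abc123', timestamp: 1702329600900 },
]);
console.log(stillRunning); // [ 'child-2' ]
```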

CustomEventChunk

Application-specific events from tools.

typescript
interface CustomEventChunk {
  type: 'custom';
  eventName: string; // Custom event name
  data: unknown; // Event payload
  agentId: string;
  agentType?: string;
  timestamp: number;
}

Example:

json
{
  "type": "custom",
  "eventName": "search_progress",
  "data": { "processed": 50, "total": 100 },
  "agentId": "run-abc123",
  "timestamp": 1702329600500
}

StatePatchChunk

RFC 6902 JSON Patches for state updates.

typescript
interface StatePatchChunk {
  type: 'state_patch';
  patches: JSONPatchOperation[]; // RFC 6902 operations
  agentId: string;
  agentType?: string;
  timestamp: number;
}

interface JSONPatchOperation {
  op: 'add' | 'remove' | 'replace';
  path: string; // JSON Pointer path
  value?: unknown; // For add/replace
}

Example:

json
{
  "type": "state_patch",
  "patches": [
    { "op": "add", "path": "/notes/-", "value": { "content": "Note 1" } },
    { "op": "replace", "path": "/searchCount", "value": 5 }
  ],
  "agentId": "run-abc123",
  "timestamp": 1702329600000
}
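
To show how a client might apply such patches to its local copy of the state, here is a minimal sketch covering only the three operations listed above. `applyPatches` and `parsePointer` are hypothetical helpers; the sketch assumes the state is JSON-serializable, does not handle patches to the document root, and does not reject a `replace` or `remove` on a missing object key as a strict RFC 6902 implementation would.

```typescript
interface JSONPatchOperation {
  op: 'add' | 'remove' | 'replace';
  path: string;
  value?: unknown;
}

// Split a JSON Pointer (RFC 6901) into unescaped reference tokens.
function parsePointer(path: string): string[] {
  if (path === '') return [];
  if (path.charAt(0) !== '/') throw new Error(`Invalid JSON Pointer: ${path}`);
  return path
    .slice(1)
    .split('/')
    .map((token) => token.replace(/~1/g, '/').replace(/~0/g, '~'));
}

function applyPatches<T>(state: T, patches: JSONPatchOperation[]): T {
  // Deep-copy via JSON so the caller's state object is left untouched.
  const next = JSON.parse(JSON.stringify(state));
  for (const { op, path, value } of patches) {
    const tokens = parsePointer(path);
    const key = tokens.pop();
    if (key === undefined) throw new Error('Root patches are not handled in this sketch');
    // Walk down to the container that holds the target location.
    let parent: any = next;
    for (const token of tokens) {
      parent = parent[token];
      if (parent === null || typeof parent !== 'object') throw new Error(`Path not found: ${path}`);
    }
    if (Array.isArray(parent)) {
      // "-" refers to the position just past the last element (append).
      const index = key === '-' ? parent.length : Number(key);
      const maxIndex = op === 'add' ? parent.length : parent.length - 1;
      if (!Number.isInteger(index) || index < 0 || index > maxIndex) {
        throw new Error(`Index out of range: ${path}`);
      }
      if (op === 'add') parent.splice(index, 0, value);
      else if (op === 'remove') parent.splice(index, 1);
      else parent[index] = value;
    } else if (op === 'remove') {
      delete parent[key];
    } else {
      parent[key] = value; // add and replace both assign on objects
    }
  }
  return next;
}

const initialState = { notes: [] as Array<{ content: string }>, searchCount: 4 };
const patched = applyPatches(initialState, [
  { op: 'add', path: '/notes/-', value: { content: 'Note 1' } },
  { op: 'replace', path: '/searchCount', value: 5 },
]);
console.log(patched); // { notes: [ { content: 'Note 1' } ], searchCount: 5 }
```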

ErrorChunk

Error events during execution.

typescript
interface ErrorChunk {
  type: 'error';
  error: string; // Error message
  recoverable: boolean; // True if execution continues
  agentId: string;
  agentType?: string;
  timestamp: number;
}

OutputChunk

Final structured output.

typescript
interface OutputChunk {
  type: 'output';
  output: unknown; // The structured output
  agentId: string;
  agentType?: string;
  timestamp: number;
}
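
The `recoverable` flag on error chunks and the final `output` chunk together determine how a run ended. One way a consumer might fold them into a single result is sketched below; `summarizeOutcome` and `RunOutcome` are illustrative names, and treating a non-recoverable error as terminal is an assumption of this sketch.

```typescript
// Local copies of the interfaces above so the sketch is self-contained.
interface ErrorChunk {
  type: 'error';
  error: string;
  recoverable: boolean;
  agentId: string;
  agentType?: string;
  timestamp: number;
}
interface OutputChunk {
  type: 'output';
  output: unknown;
  agentId: string;
  agentType?: string;
  timestamp: number;
}

// Hypothetical result shape.
interface RunOutcome {
  output: unknown;
  warnings: string[];
  fatalError?: string;
}

function summarizeOutcome(chunks: Array<ErrorChunk | OutputChunk>): RunOutcome {
  const outcome: RunOutcome = { output: undefined, warnings: [] };
  for (const chunk of chunks) {
    if (chunk.type === 'output') {
      outcome.output = chunk.output;
    } else if (chunk.recoverable) {
      outcome.warnings.push(chunk.error); // execution continued after this error
    } else {
      outcome.fatalError = chunk.error;
      break; // assumption: nothing useful follows a non-recoverable error
    }
  }
  return outcome;
}

const outcome = summarizeOutcome([
  { type: 'error', error: 'Rate limited, retrying', recoverable: true, agentId: 'run-abc123', timestamp: 1702329600000 },
  { type: 'output', output: { answer: 42 }, agentId: 'run-abc123', timestamp: 1702329602000 },
]);
console.log(outcome.warnings.length, outcome.fatalError); // 1 undefined
```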

Stream Lifecycle

Creation

typescript
const writer = await streamManager.createWriter(streamId, agentId, agentType);

Writing

typescript
await writer.write({
  type: 'text_delta',
  delta: 'Hello',
  agentId: runId,
  timestamp: Date.now(),
});

await writer.close();

Reading

typescript
const reader = await streamManager.createReader(streamId);

for await (const chunk of reader) {
  // Process chunk
}

Ending

typescript
// Normal completion
await streamManager.endStream(streamId);
await streamManager.endStream(streamId, output); // With final output

// With error
await streamManager.failStream(streamId, 'Something went wrong');

Stream End Sentinel

A special marker indicates stream completion:

typescript
const STREAM_END_SENTINEL = Symbol.for('helix.stream.end');

interface StreamEndSentinel {
  __streamEnd: true;
  status: 'ended' | 'failed';
  error?: string;
}

Readers yield chunks until they encounter the sentinel.
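
The reading rule can be illustrated with a simplified, synchronous sketch over an already-buffered list of items. A real reader would be an async iterator; `isEndSentinel` and `drain` are hypothetical names, and detecting the sentinel via its `__streamEnd` field is an assumption based on the interface above.

```typescript
interface StreamEndSentinel {
  __streamEnd: true;
  status: 'ended' | 'failed';
  error?: string;
}

// Assumption: the sentinel is recognized by its __streamEnd marker field.
function isEndSentinel(item: unknown): item is StreamEndSentinel {
  return (
    typeof item === 'object' &&
    item !== null &&
    (item as { __streamEnd?: unknown }).__streamEnd === true
  );
}

// Collect items up to (not including) the sentinel; anything after it is ignored.
function drain(items: unknown[]): { chunks: unknown[]; end?: StreamEndSentinel } {
  const chunks: unknown[] = [];
  for (const item of items) {
    if (isEndSentinel(item)) {
      return { chunks, end: item };
    }
    chunks.push(item);
  }
  return { chunks }; // no sentinel seen yet: the stream is still active
}

const drained = drain([
  { type: 'text_delta', delta: 'Hello' },
  { __streamEnd: true, status: 'ended' },
  { type: 'text_delta', delta: 'never delivered' },
]);
console.log(drained.chunks.length, drained.end?.status); // 1 ended
```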

Resumability

Sequence Numbers

Each stored chunk can carry a monotonically increasing sequence number, which lets a reader resume from a known position:

typescript
interface StoredChunk {
  chunk: StreamChunk;
  sequence: number; // Monotonically increasing
}

Resumable Reader

typescript
const reader = await streamManager.createResumableReader(streamId, {
  fromSequence: 100, // Resume from sequence 100
});

for await (const { chunk, sequence } of reader) {
  // sequence can be used as Last-Event-ID
}
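
To make the resumption mechanics concrete, the following sketch shows a tiny append-only log that assigns sequence numbers and replays from a given position. `ChunkLog` is a hypothetical class, not the library's storage layer. It assumes sequences start at 1 and that `fromSequence` is inclusive; the source does not state either, so check the actual behavior before relying on it.

```typescript
// Generic variant of StoredChunk so the sketch does not depend on StreamChunk.
interface StoredChunk<C> {
  chunk: C;
  sequence: number;
}

class ChunkLog<C> {
  private entries: StoredChunk<C>[] = [];
  private nextSequence = 1; // assumption: sequences start at 1

  // Store a chunk and return the sequence number assigned to it.
  append(chunk: C): number {
    const sequence = this.nextSequence++;
    this.entries.push({ chunk, sequence });
    return sequence;
  }

  // Assumption: fromSequence is inclusive.
  readFrom(fromSequence: number): StoredChunk<C>[] {
    return this.entries.filter((entry) => entry.sequence >= fromSequence);
  }
}

const log = new ChunkLog<string>();
log.append('Hello');
log.append(' world');
log.append('!');

// A client that last saw sequence 2 resumes at 3.
const lastSeen = 2;
const replayed = log.readFrom(lastSeen + 1);
console.log(replayed); // [ { chunk: '!', sequence: 3 } ]
```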

Stream Status

typescript
type ResumableStreamStatus = 'active' | 'ended' | 'failed';

interface ResumableStreamReader {
  status: ResumableStreamStatus;
  [Symbol.asyncIterator](): AsyncIterator<{ chunk: StreamChunk; sequence: number }>;
}

Wire Format

For transport over HTTP using Server-Sent Events (SSE), each event is serialized as a JSON object:

typescript
interface StreamEvent {
  type: 'chunk' | 'end' | 'fail';
  // ... payload based on type
}

interface StreamChunkEvent {
  type: 'chunk';
  chunk: StreamChunk;
  sequence?: number;
}

interface StreamEndEvent {
  type: 'end';
}

interface StreamFailEvent {
  type: 'fail';
  error: string;
}

SSE Encoding

id: 1
data: {"type":"chunk","chunk":{"type":"text_delta","delta":"Hello"},"sequence":1}

id: 2
data: {"type":"chunk","chunk":{"type":"text_delta","delta":" world"},"sequence":2}

id: 3
data: {"type":"end"}
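
The framing above can be produced and parsed with a few lines of code. This is a sketch of the SSE text format only, not the library's transport code; `encodeSse` and `decodeSse` are hypothetical helpers, and the `StreamEvent` union here is a simplified stand-in for the event interfaces above.

```typescript
// Simplified stand-in for the wire-format event types.
type StreamEvent =
  | { type: 'chunk'; chunk: { type: string; [key: string]: unknown }; sequence?: number }
  | { type: 'end' }
  | { type: 'fail'; error: string };

// One SSE frame: an id line, a data line, and a blank line as terminator.
function encodeSse(event: StreamEvent, id: number): string {
  return `id: ${id}\ndata: ${JSON.stringify(event)}\n\n`;
}

// Parse a single frame back into its id and JSON payload.
function decodeSse(frame: string): { id: string | undefined; event: StreamEvent } {
  let id: string | undefined;
  const dataLines: string[] = [];
  for (const line of frame.split('\n')) {
    if (line.startsWith('id:')) {
      id = line.slice(3).trim();
    } else if (line.startsWith('data:')) {
      // SSE strips one optional space after the colon.
      dataLines.push(line.slice(5).replace(/^ /, ''));
    }
  }
  return { id, event: JSON.parse(dataLines.join('\n')) };
}

const frame = encodeSse(
  { type: 'chunk', chunk: { type: 'text_delta', delta: 'Hello' }, sequence: 1 },
  1
);
const decoded = decodeSse(frame);
console.log(decoded.id, decoded.event.type); // 1 chunk
```

Using the sequence number as the SSE `id` means a browser's automatic `Last-Event-ID` header on reconnect tells the server where to resume.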

Filtering Streams

Utility functions for stream manipulation:

typescript
import {
  filterByAgentId,
  filterByAgentType,
  filterByType,
  excludeTypes,
  filterWith,
  combineStreams,
  take,
  skip,
  collectText,
  collectAll,
} from '@helix-agents/core';

// Filter by agent
const agentChunks = filterByAgentId(stream, 'run-123');

// Only text chunks
const textChunks = filterByType(stream, ['text_delta']);

// Exclude thinking
const noThinking = excludeTypes(stream, ['thinking']);

// Custom filter
const important = filterWith(stream, (chunk) => chunk.type === 'error' || chunk.type === 'output');

// Combine multiple streams
const combined = combineStreams([stream1, stream2]);

// Collect all text
const fullText = await collectText(stream);

Type Guards

Runtime type checking for chunks:

typescript
import {
  isTextDeltaChunk,
  isThinkingChunk,
  isToolStartChunk,
  isToolEndChunk,
  isSubAgentStartChunk,
  isSubAgentEndChunk,
  isCustomEventChunk,
  isStatePatchChunk,
  isErrorChunk,
  isOutputChunk,
  isStreamEnd,
} from '@helix-agents/core';

for await (const chunk of stream) {
  if (isTextDeltaChunk(chunk)) {
    process.stdout.write(chunk.delta);
  } else if (isToolStartChunk(chunk)) {
    console.log(`Tool: ${chunk.toolName}`);
  }
}
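
For readers unfamiliar with the pattern, a guard of this kind is typically a user-defined type predicate that checks the `type` discriminant. The sketch below shows the general shape using a reduced two-member union; it is not the library's implementation, and `isTextDelta` and `describeChunk` are illustrative names.

```typescript
// Reduced versions of two chunk types, for illustration only.
interface TextDeltaChunk {
  type: 'text_delta';
  delta: string;
  agentId: string;
  timestamp: number;
}
interface ErrorChunk {
  type: 'error';
  error: string;
  recoverable: boolean;
  agentId: string;
  timestamp: number;
}
type Chunk = TextDeltaChunk | ErrorChunk;

// The "chunk is TextDeltaChunk" return type lets the compiler narrow the union.
function isTextDelta(chunk: Chunk): chunk is TextDeltaChunk {
  return chunk.type === 'text_delta';
}

function describeChunk(chunk: Chunk): string {
  if (isTextDelta(chunk)) {
    return `text: ${chunk.delta}`; // chunk.delta is accessible after narrowing
  }
  return `error: ${chunk.error}`; // the remaining member is ErrorChunk
}

const described = describeChunk({
  type: 'text_delta',
  delta: 'Hello',
  agentId: 'run-abc123',
  timestamp: 1702329600000,
});
console.log(described); // text: Hello
```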

Validation

Zod schemas for runtime validation:

typescript
import { StreamChunkSchema, StreamMessageSchema } from '@helix-agents/core';

// Validate a chunk
const result = StreamChunkSchema.safeParse(data);
if (result.success) {
  const chunk: StreamChunk = result.data;
}

Implementation Notes

Memory Streams

In-memory streams use arrays and async iterators:

typescript
class InMemoryStreamManager {
  private streams = new Map<
    string,
    {
      chunks: StreamChunk[];
      readers: Set<() => void>;
      status: 'active' | 'ended' | 'failed';
    }
  >();
}

Redis Streams

The Redis implementation stores chunks in Redis Streams, writing with XADD and reading with XREAD:

typescript
// Writing
await redis.xadd(
  `stream:${streamId}`,
  '*',
  'chunk',
  JSON.stringify(chunk),
  'sequence',
  sequence.toString()
);

// Reading with blocking
const entries = await redis.xread('BLOCK', 5000, 'STREAMS', `stream:${streamId}`, lastId);
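
The XREAD reply is a nested array rather than a list of objects, so it needs unpacking before the chunks can be used. The sketch below assumes the reply shape returned by common Node.js Redis clients such as ioredis (`[key, [[id, [field, value, ...]], ...]]`, or `null` when the block times out) and the `chunk` and `sequence` field names used in the XADD call above. `parseXReadReply` is a hypothetical helper.

```typescript
// Assumed reply shape: one tuple per stream key, each holding [id, flatFields] entries.
type XReadReply = Array<[string, Array<[string, string[]]>]> | null;

interface ParsedEntry {
  id: string; // Redis entry ID, usable as lastId for the next XREAD
  sequence: number;
  chunk: unknown;
}

function parseXReadReply(reply: XReadReply): ParsedEntry[] {
  const parsed: ParsedEntry[] = [];
  if (reply === null) return parsed; // BLOCK timed out with no new entries
  for (const [, entries] of reply) {
    for (const [id, fields] of entries) {
      // Fields arrive as a flat [name, value, name, value, ...] list.
      const record: Record<string, string> = {};
      for (let i = 0; i + 1 < fields.length; i += 2) {
        record[fields[i]] = fields[i + 1];
      }
      parsed.push({
        id,
        sequence: Number(record['sequence']),
        chunk: JSON.parse(record['chunk']),
      });
    }
  }
  return parsed;
}

// Mock reply for illustration; no Redis connection is needed to run this.
const mockReply: XReadReply = [
  ['stream:s1', [['1702329600000-0', ['chunk', '{"type":"text_delta","delta":"Hi"}', 'sequence', '7']]]],
];
const parsedEntries = parseXReadReply(mockReply);
console.log(parsedEntries[0].sequence); // 7
```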

Durable Object Streams

On Cloudflare, a Durable Object coordinates each stream by buffering chunks and notifying waiting readers on every write:

typescript
class StreamServer {
  private chunks: StreamChunk[] = [];
  private waiters: Set<(chunk: StreamChunk) => void> = new Set();

  async write(chunk: StreamChunk) {
    this.chunks.push(chunk);
    for (const waiter of this.waiters) {
      waiter(chunk);
    }
  }
}

Released under the MIT License.