Storage Overview
Storage in Helix Agents consists of two components: State Stores for persisting agent state, and Stream Managers for real-time event streaming. Both are defined as interfaces, enabling different implementations for different environments.
Why Separate Components?
State and streaming have different requirements:
| Concern | State Store | Stream Manager |
|---|---|---|
| Purpose | Persist agent state | Real-time event delivery |
| Access Pattern | Read-modify-write | Append-only, subscribe |
| Consistency | Strong (atomic updates) | Eventual (ordering matters) |
| Retention | Until explicit delete | Temporary (TTL-based) |
By separating them, you can:
- Use in-memory streams with Redis state
- Scale streaming infrastructure independently
- Apply different retention policies
Session-Centric Model
All state store operations use sessionId as the primary key. This is a fundamental design principle:
- Messages are stored per session, not duplicated per run
- Runs are lightweight execution metadata within a session
- Checkpoints reference message boundaries by count, not by copying messages
This design enables efficient storage (O(n) messages vs O(n²) for run-centric models) and natural conversation continuity. See Concepts: Session vs Run for details.
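The count-based checkpoint idea above can be sketched with a small in-memory model. This is only an illustration: `SketchMessage`, `SketchCheckpoint`, and `SessionLog` are hypothetical names, not types exported by Helix Agents.

```typescript
// Hypothetical in-memory model; names are illustrative, not library types.
interface SketchMessage {
  role: 'user' | 'assistant' | 'tool';
  content: string;
}

interface SketchCheckpoint {
  stepId: string;
  messageCount: number; // boundary into the shared log, not a copy
}

class SessionLog {
  private messages: SketchMessage[] = [];

  append(...msgs: SketchMessage[]): void {
    this.messages.push(...msgs);
  }

  // A checkpoint records only how many messages existed at that point.
  checkpoint(stepId: string): SketchCheckpoint {
    return { stepId, messageCount: this.messages.length };
  }

  // Restoring a checkpoint is a prefix slice of the single shared log.
  messagesAt(cp: SketchCheckpoint): SketchMessage[] {
    return this.messages.slice(0, cp.messageCount);
  }

  totalStored(): number {
    return this.messages.length;
  }
}

const sessionLog = new SessionLog();
sessionLog.append({ role: 'user', content: 'Hi' }, { role: 'assistant', content: 'Hello' });
const cp1 = sessionLog.checkpoint('step-1');
sessionLog.append({ role: 'user', content: 'More' }, { role: 'assistant', content: 'Sure' });
const cp2 = sessionLog.checkpoint('step-2');
```

In this sketch the log holds four messages in total. A design that copied messages into each checkpoint would have stored 2 + 4 = 6 for the same conversation, and the gap grows with every additional checkpoint.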
Available Implementations
| Package | State Store | Stream Manager | Use Case |
|---|---|---|---|
| @helix-agents/store-memory | InMemoryStateStore | InMemoryStreamManager | Development, testing |
| @helix-agents/store-redis | RedisStateStore | RedisStreamManager | Production (self-hosted) |
| @helix-agents/store-postgres | PostgresStateStore | (any) | Production (Postgres-backed) |
| @helix-agents/store-cloudflare | D1StateStore | DOStreamManager | Cloudflare Workers (Workflows path) |
| @helix-agents/runtime-cloudflare (DO stores) | DOStateStore | DOStreamManager | Cloudflare DO runtime |
State Store Interface
The SessionStateStore interface defines how agent state is persisted. sessionId is the primary key for all operations:
interface SessionStateStore {
  // Session lifecycle
  createSession(sessionId: string, options?: CreateSessionOptions): Promise<void>;
  sessionExists(sessionId: string): Promise<boolean>;
  deleteSession(sessionId: string): Promise<void>;
  // State operations
  loadState(sessionId: string): Promise<SessionState | null>;
  saveState(sessionId: string, state: SessionState): Promise<void>;
  // v7: Atomic write (appendMessages + saveState + promoteStaging in one shot)
  saveStateAndPromoteStaging<TState, TOutput>(
    sessionId: string,
    state: SessionState<TState, TOutput>,
    appendMessages: Message[],
    checkpointMeta: { stepId: string; stepCount: number; streamSequence: number },
    options?: { expectedVersion?: number }
  ): Promise<{ checkpointId: string; newVersion: number }>;
  // Atomic operations (for parallel tool execution)
  appendMessages(sessionId: string, messages: Message[]): Promise<void>;
  mergeCustomState(sessionId: string, writes: StepWrites): Promise<{ warnings: string[] }>;
  updateStatus(sessionId: string, status: SessionStatus): Promise<void>;
  incrementStepCount(sessionId: string): Promise<number>;
  // v7: discriminated-union return for richer conflict handling
  compareAndSetStatus(
    sessionId: string,
    expectedStatuses: SessionStatus[],
    newStatus: SessionStatus,
    options?: { interruptContext?: InterruptContext; error?: string; expectedVersion?: number }
  ): Promise<
    | { ok: true; newVersion: number }
    | { ok: false; currentStatus: SessionStatus; currentVersion: number }
  >;
  // Durable interrupt flags (all v7 stores)
  setInterruptFlag(sessionId: string, reason?: string): Promise<void>;
  checkInterruptFlag(sessionId: string): Promise<{ reason?: string } | null>;
  clearInterruptFlag(sessionId: string): Promise<void>;
  // Sub-agent tracking
  addSubSessionRefs(sessionId: string, refs: SubSessionRef[]): Promise<void>;
  updateSubSessionRef(sessionId: string, update: SubSessionRefUpdate): Promise<void>;
  // Message queries (for UI/recovery)
  getMessages(sessionId: string, options?: GetMessagesOptions): Promise<PaginatedMessages>;
  getMessageCount(sessionId: string): Promise<number>;
  // Cross-session listing (admin / cleanup)
  listSessions(options?: ListSessionsOptions): Promise<PaginatedSessions>;
}
Operator: expiredSessionCleanup
@helix-agents/agent-server exports expiredSessionCleanup(stateStore, options), an operator-driven sweep that uses SessionState.expiresAt to garbage-collect abandoned sessions (e.g., HITL waits where the user never returned). Run on a schedule (cron, Cloudflare cron trigger) to reclaim storage. All five v7 stores (memory, Redis, postgres, D1, DO) honor options.error so reaped sessions are recorded with a consistent failure reason.
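As a rough illustration of what such a sweep has to decide, the sketch below selects sessions whose `expiresAt` has passed. It is not the implementation of `expiredSessionCleanup`: the `SessionSummary` shape and `selectExpired` function are hypothetical, and treating `expiresAt` as epoch milliseconds is an assumption.

```typescript
// Hypothetical shape for illustration; only `expiresAt` comes from the text above.
interface SessionSummary {
  sessionId: string;
  status: string;
  expiresAt?: number; // assumed to be epoch milliseconds
}

// Pick the sessions whose expiry time has passed. Sessions without
// an expiresAt value are never selected.
function selectExpired(sessions: SessionSummary[], now: number): string[] {
  return sessions
    .filter((s) => s.expiresAt !== undefined && s.expiresAt <= now)
    .map((s) => s.sessionId);
}

const sweepNow = 1_000_000;
const expiredIds = selectExpired(
  [
    { sessionId: 'a', status: 'interrupted', expiresAt: 999_000 },
    { sessionId: 'b', status: 'active', expiresAt: 2_000_000 },
    { sessionId: 'c', status: 'active' },
  ],
  sweepNow
);
```

Here only session `a` is selected. A scheduled job would then hand the selected sessions to the real cleanup helper rather than deleting them directly.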
Atomic Operations
The atomic operations enable safe concurrent modifications from parallel tool execution:
// Multiple tools can safely append messages
await Promise.all([
  stateStore.appendMessages(sessionId, [toolResult1]),
  stateStore.appendMessages(sessionId, [toolResult2]),
]);
// Multiple tools can safely update different state keys
await Promise.all([
  stateStore.mergeCustomState(sessionId, {
    ops: [{ kind: 'replace', key: 'key1', value: 'value1' }],
    warnings: [],
  }),
  stateStore.mergeCustomState(sessionId, {
    ops: [{ kind: 'replace', key: 'key2', value: 'value2' }],
    warnings: [],
  }),
]);
State Merge Semantics
The mergeCustomState operation applies a StepWrites op-list — the canonical write representation produced by ImmerStateTracker.getStepWrites():
// Array delta mode: new items are appended using 'append' ops
// Tool 1: draft.items.push('a') → op { kind: 'append', key: 'items', items: ['a'] }
// Tool 2: draft.items.push('b') → op { kind: 'append', key: 'items', items: ['b'] }
// Result: items = [...original, 'a', 'b']
// Scalar values: last write wins ('replace' op)
// Tool 1: draft.count = 5 → op { kind: 'replace', key: 'count', value: 5 }
// Tool 2: draft.count = 10 → op { kind: 'replace', key: 'count', value: 10 }
// Result: count = 5 or 10 (depends on timing)
// Explicit delete: 'delete' op (not null assignment)
// { kind: 'delete', key: 'temp' }
// Result: temp key is removed
Run Tracking
The state store also tracks execution runs within sessions. A run represents a single execution (via execute() or resume()), while a session represents the overall conversation:
interface SessionStateStore {
  // ... other methods ...
  // Run tracking
  createRun(sessionId: string, runId: string, metadata: RunMetadata): Promise<void>;
  updateRunStatus(runId: string, status: RunStatus, updates?: RunStatusUpdate): Promise<void>;
  getCurrentRun(sessionId: string): Promise<RunRecord | null>;
  listRuns(sessionId: string): Promise<RunRecord[]>;
}
When runs are created:
- execute() creates a new run with turn: 1
- resume() creates a new run with an incremented turn number
Run status values:
- running - Execution in progress
- completed - Finished successfully with output
- failed - Terminated with error
- interrupted - User-requested pause (can be resumed)
Example: Querying run history:
// Get the current/most recent run
const currentRun = await stateStore.getCurrentRun(sessionId);
console.log(currentRun?.status); // 'running' | 'completed' | 'failed' | 'interrupted'
// List all runs for a session (useful for debugging/auditing)
const runs = await stateStore.listRuns(sessionId);
for (const run of runs) {
  console.log(`Turn ${run.turn}: ${run.status} (${run.stepCount} steps)`);
}
Stream Manager Interface
The StreamManager interface defines how events are streamed:
interface StreamManager {
  // Writer lifecycle
  createWriter(streamId: string, runId: string, agentType: string): Promise<StreamWriter>;
  // Reader lifecycle
  createReader(streamId: string): Promise<StreamReader | null>;
  // Stream control
  endStream(streamId: string, finalOutput?: unknown): Promise<void>;
  failStream(streamId: string, error: string): Promise<void>;
  // Optional: Resumable streams
  createResumableReader?(
    streamId: string,
    options?: ReaderOptions
  ): Promise<ResumableStreamReader | null>;
  getStreamInfo?(streamId: string): Promise<StreamInfo | null>;
  pauseStream?(streamId: string): Promise<void>;
  resumeStream?(streamId: string): Promise<void>;
  // Optional: Content reconstruction (for mid-stream refresh)
  getAllChunks?(streamId: string): Promise<StreamChunk[]>;
  getChunksFromStep?(streamId: string, fromStep: number): Promise<StreamChunk[]>;
}
Resumable Stream Features
Different implementations support different resumability features:
| Feature | InMemory | Redis | Cloudflare DO |
|---|---|---|---|
| createResumableReader | No | Yes | Yes |
| getStreamInfo | Yes | Yes | Yes |
| getAllChunks | Yes | Yes | Yes |
| getChunksFromStep | Yes | Yes | Yes |
| pauseStream / resumeStream | No | Yes | Yes |
| Sequence tracking | No | Yes | Yes |
| Multi-process support | No | Yes | Yes |
| maxChunks LTRIM cap | No | Yes | n/a (DO SQLite) |
Key methods for resumability:
- createResumableReader: Creates a reader that tracks position with sequence numbers. Use the fromSequence option to resume from a specific position.
- getStreamInfo: Returns stream status, total chunks, and latest sequence number. Use it to check whether a stream is still active before connecting.
- getAllChunks: Returns all chunks in the stream. Used for full content reconstruction on page refresh.
- getChunksFromStep: Returns chunks from a specific step onwards. More efficient than getAllChunks when checkpoint data is available.
For comprehensive resumability documentation, see Resumable Streams Guide.
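The sequence-based resume and step-based reconstruction described above can be sketched with a tiny in-memory log. Everything here is hypothetical (`SequencedChunk`, `ChunkLog`, 1-based sequence numbers, and an exclusive `fromSequence`); the real readers may number and filter chunks differently.

```typescript
// Hypothetical in-memory stream; chunk shape and names are illustrative only.
interface SequencedChunk {
  sequence: number;
  step: number;
  delta: string;
}

class ChunkLog {
  private chunks: SequencedChunk[] = [];

  // Append a chunk and assign the next sequence number (1-based in this sketch).
  write(step: number, delta: string): number {
    const sequence = this.chunks.length + 1;
    this.chunks.push({ sequence, step, delta });
    return sequence;
  }

  // Resume: return only chunks after the last sequence the client saw.
  readFrom(fromSequence: number): SequencedChunk[] {
    return this.chunks.filter((c) => c.sequence > fromSequence);
  }

  // Reconstruction from a checkpoint: chunks from a given step onwards.
  fromStep(step: number): SequencedChunk[] {
    return this.chunks.filter((c) => c.step >= step);
  }

  info(): { totalChunks: number; latestSequence: number } {
    return { totalChunks: this.chunks.length, latestSequence: this.chunks.length };
  }
}

const chunkLog = new ChunkLog();
chunkLog.write(1, 'Hel');
const lastSeen = chunkLog.write(1, 'lo'); // client disconnects after this chunk
chunkLog.write(2, ' wor');
chunkLog.write(2, 'ld');

// After reconnecting, the client asks only for what it missed.
const missed = chunkLog.readFrom(lastSeen).map((c) => c.delta).join('');
// With a checkpoint at step 1, only step 2 needs to be replayed.
const stepTwo = chunkLog.fromStep(2).map((c) => c.delta).join('');
```

Both queries avoid replaying the full stream, which is the reason to prefer them over fetching every chunk when a position or checkpoint is known.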
Writer/Reader Pattern
Writers emit chunks, readers consume them:
// Create writer (usually done by runtime)
// Arguments follow the interface above: streamId, runId, agentType
const writer = await streamManager.createWriter(streamId, runId, agentType);
await writer.write({ type: 'text_delta', delta: 'Hello', ... });
await writer.close(); // Close writer, NOT the stream
// Create reader (usually done by client/UI)
const reader = await streamManager.createReader(streamId);
if (reader) {
  try {
    for await (const chunk of reader) {
      console.log(chunk);
    }
  } finally {
    await reader.close(); // Always close reader
  }
}
Stream Lifecycle
graph TB
    Created["Stream Created (first writer)"]
    Created --> Writers["Writers emit chunks"]
    Writers --> Readers["Readers receive in real-time"]
    Created --> End["endStream() called"]
    End --> Complete["Readers complete normally"]
    Complete --> Historical["New readers get historical chunks"]
    Created --> Fail["OR failStream() called"]
    Fail --> Error["Readers receive error"]
    Error --> Null["New readers get null"]
Choosing Storage
Development
Use in-memory stores for fast iteration:
import { InMemoryStateStore, InMemoryStreamManager } from '@helix-agents/store-memory';
const stateStore = new InMemoryStateStore();
const streamManager = new InMemoryStreamManager();
Pros: Zero setup, fast, easy debugging
Cons: Data lost on restart, single process only
Production (Self-Hosted)
Use Redis for durability and multi-process:
import { RedisStateStore, RedisStreamManager } from '@helix-agents/store-redis';
import Redis from 'ioredis';
const redis = new Redis(process.env.REDIS_URL);
const stateStore = new RedisStateStore(redis);
const streamManager = new RedisStreamManager(redis);
Pros: Persistent, multi-process, production-ready
Cons: Requires Redis infrastructure
Production (Cloudflare)
Use D1 and Durable Objects for edge deployment:
import { D1StateStore, DOStreamManager } from '@helix-agents/store-cloudflare';
const stateStore = new D1StateStore(env.DB);
const streamManager = new DOStreamManager(env.STREAM_MANAGER);
Pros: Global edge, serverless, integrated with Cloudflare
Cons: Cloudflare-specific, different API from other stores
Swapping Stores
Because stores implement standard interfaces, swapping is straightforward:
// Environment-based selection
const createStores = () => {
  if (process.env.NODE_ENV === 'production') {
    const redis = new Redis(process.env.REDIS_URL);
    return {
      stateStore: new RedisStateStore(redis),
      streamManager: new RedisStreamManager(redis),
    };
  }
  return {
    stateStore: new InMemoryStateStore(),
    streamManager: new InMemoryStreamManager(),
  };
};
const { stateStore, streamManager } = createStores();
const executor = new JSAgentExecutor(stateStore, streamManager, llmAdapter);
State Data Model
Session state includes:
interface SessionState<TState, TOutput> {
  // Identity
  sessionId: string; // Primary key for all operations
  agentType: string; // Agent type name
  streamId?: string; // Stream identifier (defaults to sessionId)
  // Hierarchy
  parentSessionId?: string; // Parent if this is a sub-agent
  rootSessionId?: string; // Root pointer for cross-session ownership
  // Execution state
  customState: TState; // Your custom state schema
  stepCount: number; // LLM call count
  status: SessionStatus; // 'active' | 'completed' | 'failed' | 'interrupted' | 'paused' | 'suspended_*'
  // Output / failure
  output?: TOutput; // Final structured output
  error?: string;
  failureReason?: string; // v7: cascade discriminator (e.g., 'parent_suspended')
  // v7: Stateless suspension fields
  suspendedAwaitingChildren?: Record<string, SuspendedChildWait>;
  suspendedStepId?: string;
  pendingClientToolCalls?: Record<string, PendingClientToolCall>;
  completedClientToolCalls?: Record<string, number>; // root-only tombstones
  clientToolCallOwnership?: ClientToolCallOwnership; // root-only
  interruptContext?: InterruptContext;
  interruptFlags?: { reason?: string; setAt: number }; // durable interrupt flag
  tracingContext?: { traceId: string; rootSpanId: string };
  expiresAt?: number; // operator GC
  // User context
  userId?: string;
  tags?: string[];
  metadata?: Record<string, string>;
  // Versioning (for optimistic concurrency)
  version: number;
  resumeCount: number;
  // Timestamps
  createdAt: number;
  updatedAt: number;
}
The fields tagged "v7" are persisted across all v7 stores. On D1 / DO they pack into a single suspension_context TEXT column (V9 / V5 respectively); on Redis they are JSON-encoded fields on the existing __agents:state:{sessionId} hash; on Postgres they are JSONB columns. See packages/core/src/types/session.ts for the canonical type definition.
Note: Messages are stored separately in the state store (not embedded in state) for O(1) append operations.
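To show how the `version` field can support optimistic concurrency, here is a minimal in-memory sketch of a compare-and-set on status that returns a discriminated union like the one in the interface. `VersionedStatus`, `SketchStatus`, and `summarize` are hypothetical, the status list is a subset of the real one, and the rule that every successful write increments the version by one is an assumption.

```typescript
// Hypothetical subset of status values, for illustration only.
type SketchStatus = 'active' | 'interrupted' | 'completed' | 'failed';

type CasResult =
  | { ok: true; newVersion: number }
  | { ok: false; currentStatus: SketchStatus; currentVersion: number };

class VersionedStatus {
  private status: SketchStatus = 'active';
  private version = 1;

  // Apply the change only if the current status is one of `expected` and,
  // when given, the caller's expectedVersion still matches.
  compareAndSet(expected: SketchStatus[], next: SketchStatus, expectedVersion?: number): CasResult {
    const statusMatches = expected.includes(this.status);
    const versionMatches = expectedVersion === undefined || expectedVersion === this.version;
    if (!statusMatches || !versionMatches) {
      return { ok: false, currentStatus: this.status, currentVersion: this.version };
    }
    this.status = next;
    this.version += 1; // assumption: each successful write bumps the version by one
    return { ok: true, newVersion: this.version };
  }
}

// Narrowing on `ok` gives access to the fields of each branch.
function summarize(r: CasResult): string {
  return r.ok ? `ok@v${r.newVersion}` : `conflict: ${r.currentStatus}@v${r.currentVersion}`;
}

const cell = new VersionedStatus();
const firstWrite = cell.compareAndSet(['active'], 'interrupted', 1);
const staleWrite = cell.compareAndSet(['active'], 'completed', 1);
```

The second caller still believes the session is `active` at version 1, so its write is rejected and it receives the current status and version, which it can use to reload state before deciding whether to retry.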
Querying Across Sessions
State stores, usage stores, and memory stores support cross-session queries for admin dashboards and analytics:
- listSessions() on SessionStateStore: browse and filter sessions by user, agent type, status, tags, metadata, and date range
- queryUsage() on UsageStore: query usage entries across sessions for cost tracking and analytics
- listEntities() on MemoryStore: list distinct entity/source pairs with memory counts
All methods return paginated results with { total, offset, limit, hasMore }.
See Querying Guide for complete documentation and examples.
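A caller that needs every result can walk the pages by advancing `offset` until `hasMore` is false. The sketch below assumes a page shape with an `items` array alongside the documented pagination fields; `Page`, `FetchPage`, `collectAll`, and the array-backed `fetchFake` are all hypothetical stand-ins for a real store call.

```typescript
// Hypothetical page shape: the pagination fields come from the text above,
// but the `items` field name is an assumption.
interface Page<T> {
  items: T[];
  total: number;
  offset: number;
  limit: number;
  hasMore: boolean;
}

type FetchPage<T> = (offset: number, limit: number) => Promise<Page<T>>;

// Walk every page by advancing the offset until hasMore is false.
async function collectAll<T>(fetchPage: FetchPage<T>, limit = 2): Promise<T[]> {
  const all: T[] = [];
  let offset = 0;
  for (;;) {
    const page = await fetchPage(offset, limit);
    all.push(...page.items);
    // Stop on the last page; the empty-page check guards against looping forever.
    if (!page.hasMore || page.items.length === 0) break;
    offset = page.offset + page.items.length;
  }
  return all;
}

// Stand-in for a store call such as listSessions(), backed by an array here.
const fakeSessions = ['s1', 's2', 's3', 's4', 's5'];
const fetchFake: FetchPage<string> = async (offset, limit) => {
  const items = fakeSessions.slice(offset, offset + limit);
  return {
    items,
    total: fakeSessions.length,
    offset,
    limit,
    hasMore: offset + items.length < fakeSessions.length,
  };
};
```

Calling `collectAll(fetchFake)` resolves to all five session IDs in order. For large result sets, processing each page as it arrives is usually preferable to accumulating everything in memory.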
Next Steps
- In-Memory Storage - Development and testing
- Redis Storage - Production deployment
- Cloudflare Storage - Edge deployment