Storage Overview

Storage in Helix Agents consists of two components: State Stores for persisting agent state, and Stream Managers for real-time event streaming. Both are defined as interfaces, enabling different implementations for different environments.

Why Separate Components?

State and streaming have different requirements:

| Concern | State Store | Stream Manager |
| --- | --- | --- |
| Purpose | Persist agent state | Real-time event delivery |
| Access Pattern | Read-modify-write | Append-only, subscribe |
| Consistency | Strong (atomic updates) | Eventual (ordering matters) |
| Retention | Until explicit delete | Temporary (TTL-based) |

By separating them, you can:

  • Use in-memory streams with Redis state
  • Scale streaming infrastructure independently
  • Apply different retention policies

Session-Centric Model

All state store operations use sessionId as the primary key. This is a fundamental design principle:

  • Messages are stored per session, not duplicated per run
  • Runs are lightweight execution metadata within a session
  • Checkpoints reference message boundaries by count, not by copying messages

This design enables efficient storage (O(n) messages vs O(n²) for run-centric models) and natural conversation continuity. See Concepts: Session vs Run for details.
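
The storage claim above can be checked with a little arithmetic. The following is a minimal sketch, not part of the Helix Agents API: the function names and the `CheckpointRef` shape are hypothetical, and it assumes a run-centric model would store the full conversation so far with every run.

```typescript
// Hypothetical helper names; none of this is part of the Helix Agents API.

/** Messages stored when every run keeps its own full copy of the conversation so far. */
function runCentricStored(messagesPerTurn: number[]): number {
  let conversationLength = 0;
  let stored = 0;
  for (const added of messagesPerTurn) {
    conversationLength += added;
    stored += conversationLength; // each run re-stores the whole history
  }
  return stored;
}

/** Messages stored when the session owns one shared log and runs only reference it. */
function sessionCentricStored(messagesPerTurn: number[]): number {
  return messagesPerTurn.reduce((sum, added) => sum + added, 0);
}

/** A checkpoint records a boundary (a message count) instead of copying messages. */
interface CheckpointRef {
  stepCount: number;
  messageCount: number;
}

function messagesAtCheckpoint<T>(log: T[], checkpoint: CheckpointRef): T[] {
  return log.slice(0, checkpoint.messageCount);
}

const turnSizes = [4, 4, 4, 4];
console.log(runCentricStored(turnSizes)); // 40 (4 + 8 + 12 + 16)
console.log(sessionCentricStored(turnSizes)); // 16

const sharedLog = ['user: hi', 'assistant: hello', 'user: more'];
console.log(messagesAtCheckpoint(sharedLog, { stepCount: 1, messageCount: 2 }).length); // 2
```

With equal-sized turns the run-centric total grows quadratically in the number of turns, while the session-centric total grows linearly.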

Available Implementations

| Package | State Store | Stream Manager | Use Case |
| --- | --- | --- | --- |
| @helix-agents/store-memory | InMemoryStateStore | InMemoryStreamManager | Development, testing |
| @helix-agents/store-redis | RedisStateStore | RedisStreamManager | Production (self-hosted) |
| @helix-agents/store-postgres | PostgresStateStore | (any) | Production (Postgres-backed) |
| @helix-agents/store-cloudflare | D1StateStore | DOStreamManager | Cloudflare Workers (Workflows path) |
| @helix-agents/runtime-cloudflare (DO stores) | DOStateStore | DOStreamManager | Cloudflare DO runtime |

State Store Interface

The SessionStateStore interface defines how agent state is persisted. sessionId is the primary key for all operations:

typescript
interface SessionStateStore {
  // Session lifecycle
  createSession(sessionId: string, options?: CreateSessionOptions): Promise<void>;
  sessionExists(sessionId: string): Promise<boolean>;
  deleteSession(sessionId: string): Promise<void>;

  // State operations
  loadState(sessionId: string): Promise<SessionState | null>;
  saveState(sessionId: string, state: SessionState): Promise<void>;

  // v7: Atomic write — appendMessages + saveState + promoteStaging in one shot
  saveStateAndPromoteStaging<TState, TOutput>(
    sessionId: string,
    state: SessionState<TState, TOutput>,
    appendMessages: Message[],
    checkpointMeta: { stepId: string; stepCount: number; streamSequence: number },
    options?: { expectedVersion?: number }
  ): Promise<{ checkpointId: string; newVersion: number }>;

  // Atomic operations (for parallel tool execution)
  appendMessages(sessionId: string, messages: Message[]): Promise<void>;
  mergeCustomState(sessionId: string, writes: StepWrites): Promise<{ warnings: string[] }>;
  updateStatus(sessionId: string, status: SessionStatus): Promise<void>;
  incrementStepCount(sessionId: string): Promise<number>;

  // v7: discriminated-union return for richer conflict handling
  compareAndSetStatus(
    sessionId: string,
    expectedStatuses: SessionStatus[],
    newStatus: SessionStatus,
    options?: { interruptContext?: InterruptContext; error?: string; expectedVersion?: number }
  ): Promise<
    | { ok: true; newVersion: number }
    | { ok: false; currentStatus: SessionStatus; currentVersion: number }
  >;

  // Durable interrupt flags (all v7 stores)
  setInterruptFlag(sessionId: string, reason?: string): Promise<void>;
  checkInterruptFlag(sessionId: string): Promise<{ reason?: string } | null>;
  clearInterruptFlag(sessionId: string): Promise<void>;

  // Sub-agent tracking
  addSubSessionRefs(sessionId: string, refs: SubSessionRef[]): Promise<void>;
  updateSubSessionRef(sessionId: string, update: SubSessionRefUpdate): Promise<void>;

  // Message queries (for UI/recovery)
  getMessages(sessionId: string, options?: GetMessagesOptions): Promise<PaginatedMessages>;
  getMessageCount(sessionId: string): Promise<number>;

  // Cross-session listing (admin / cleanup)
  listSessions(options?: ListSessionsOptions): Promise<PaginatedSessions>;
}
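
The discriminated-union return of compareAndSetStatus is easiest to read alongside a caller that branches on `ok`. The sketch below is an in-memory model only: `compareAndSetSketch`, `StatusRecord`, and `describeCas` are hypothetical names, the status list is a subset, and it assumes the version increments by one on each successful transition. The real method is asynchronous and store-backed.

```typescript
// Local stand-ins for illustration; the real types ship with the Helix Agents packages.
type StatusSketch = 'active' | 'completed' | 'failed' | 'interrupted' | 'paused';

type CasResult =
  | { ok: true; newVersion: number }
  | { ok: false; currentStatus: StatusSketch; currentVersion: number };

interface StatusRecord {
  status: StatusSketch;
  version: number;
}

// Hypothetical model of the compare-and-set rule (mutates `record` on success).
function compareAndSetSketch(
  record: StatusRecord,
  expectedStatuses: StatusSketch[],
  newStatus: StatusSketch,
  expectedVersion?: number
): CasResult {
  const statusMatches = expectedStatuses.indexOf(record.status) !== -1;
  const versionMatches = expectedVersion === undefined || expectedVersion === record.version;
  if (!statusMatches || !versionMatches) {
    return { ok: false, currentStatus: record.status, currentVersion: record.version };
  }
  record.status = newStatus;
  record.version += 1; // assumption: one version bump per successful write
  return { ok: true, newVersion: record.version };
}

// Narrowing on `ok` gives access to the fields of each branch.
function describeCas(result: CasResult): string {
  if (result.ok) {
    return `transitioned, now at version ${result.newVersion}`;
  }
  return `conflict: status is '${result.currentStatus}' at version ${result.currentVersion}`;
}

const sessionRecord: StatusRecord = { status: 'active', version: 1 };
const firstAttempt = compareAndSetSketch(sessionRecord, ['active'], 'interrupted', 1);
const staleAttempt = compareAndSetSketch(sessionRecord, ['active'], 'completed', 1);

console.log(describeCas(firstAttempt)); // transitioned, now at version 2
console.log(describeCas(staleAttempt)); // conflict: status is 'interrupted' at version 2
```

On a conflict the caller learns the current status and version in the same call, so it can decide whether to retry or give up without a second read.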

Operator: expiredSessionCleanup

@helix-agents/agent-server exports expiredSessionCleanup(stateStore, options), an operator-driven sweep that uses SessionState.expiresAt to garbage-collect abandoned sessions (e.g., HITL waits where the user never returned). Run it on a schedule (cron, Cloudflare cron trigger) to reclaim storage. All five v7 stores (memory, Redis, Postgres, D1, DO) honor options.error so reaped sessions are recorded with a consistent failure reason.
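
To make the idea of an expiry sweep concrete, here is a minimal sketch of the selection step. It is not the implementation of expiredSessionCleanup: `ExpirableSession`, `selectExpired`, and `reap` are hypothetical, `expiresAt` is assumed to be epoch milliseconds, and marking reaped sessions as `failed` with an operator-supplied reason is an assumption based on the description above.

```typescript
// Hypothetical shapes; only `expiresAt`, `status`, and `error` echo fields named in this document.
interface ExpirableSession {
  sessionId: string;
  status: string;
  expiresAt?: number; // assumed here to be epoch milliseconds
  error?: string;
}

// Pick sessions whose deadline has passed. Sessions without expiresAt never expire.
function selectExpired(sessions: ExpirableSession[], now: number): ExpirableSession[] {
  return sessions.filter((s) => s.expiresAt !== undefined && s.expiresAt <= now);
}

// Return a copy of the session marked as failed with the operator-supplied reason.
function reap(session: ExpirableSession, error: string): ExpirableSession {
  return { ...session, status: 'failed', error };
}

const nowMs = 1000;
const sampleSessions: ExpirableSession[] = [
  { sessionId: 's1', status: 'interrupted', expiresAt: 999 },
  { sessionId: 's2', status: 'active', expiresAt: 2000 },
  { sessionId: 's3', status: 'active' },
];

const reaped = selectExpired(sampleSessions, nowMs).map((s) => reap(s, 'session_expired'));
console.log(reaped.map((s) => s.sessionId).join(',')); // s1
```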

Atomic Operations

The atomic operations enable safe concurrent modifications from parallel tool execution:

typescript
// Multiple tools can safely append messages
await Promise.all([
  stateStore.appendMessages(sessionId, [toolResult1]),
  stateStore.appendMessages(sessionId, [toolResult2]),
]);

// Multiple tools can safely update different state keys
await Promise.all([
  stateStore.mergeCustomState(sessionId, {
    ops: [{ kind: 'replace', key: 'key1', value: 'value1' }],
    warnings: [],
  }),
  stateStore.mergeCustomState(sessionId, {
    ops: [{ kind: 'replace', key: 'key2', value: 'value2' }],
    warnings: [],
  }),
]);
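
Why a dedicated append primitive matters becomes clearer when it is compared with a caller-side read-modify-write. The sketch below simulates one unlucky interleaving synchronously. `TinyMessageLog` and `ChatMessage` are hypothetical stand-ins, not Helix Agents classes, and the real store methods are asynchronous.

```typescript
interface ChatMessage {
  role: 'tool';
  content: string;
}

// Hypothetical in-memory log, used only to contrast the two write styles.
class TinyMessageLog {
  private messages: ChatMessage[] = [];

  // Whole-list read and write: the building blocks of read-modify-write.
  read(): ChatMessage[] {
    return this.messages.slice();
  }
  write(all: ChatMessage[]): void {
    this.messages = all.slice();
  }

  // Append is applied to the stored list itself, so no caller-side snapshot is involved.
  appendMessages(extra: ChatMessage[]): void {
    this.messages.push(...extra);
  }

  count(): number {
    return this.messages.length;
  }
}

const resultA: ChatMessage = { role: 'tool', content: 'result from tool A' };
const resultB: ChatMessage = { role: 'tool', content: 'result from tool B' };

// Interleaving 1: both tools read before either writes, so one update is lost.
const racyLog = new TinyMessageLog();
const seenByA = racyLog.read();
const seenByB = racyLog.read();
racyLog.write([...seenByA, resultA]);
racyLog.write([...seenByB, resultB]); // overwrites A's result

// Interleaving 2: same arrival order, but each tool uses the append primitive.
const safeLog = new TinyMessageLog();
safeLog.appendMessages([resultA]);
safeLog.appendMessages([resultB]);

console.log(racyLog.count()); // 1
console.log(safeLog.count()); // 2
```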

State Merge Semantics

The mergeCustomState operation applies a StepWrites op-list — the canonical write representation produced by ImmerStateTracker.getStepWrites():

typescript
// Array delta mode: new items are appended using 'append' ops
// Tool 1: draft.items.push('a') → op { kind: 'append', key: 'items', items: ['a'] }
// Tool 2: draft.items.push('b') → op { kind: 'append', key: 'items', items: ['b'] }
// Result: items = [...original, 'a', 'b']

// Scalar values: last write wins ('replace' op)
// Tool 1: draft.count = 5  → op { kind: 'replace', key: 'count', value: 5 }
// Tool 2: draft.count = 10 → op { kind: 'replace', key: 'count', value: 10 }
// Result: count = 5 or 10 (depends on timing)

// Explicit delete: 'delete' op (not null assignment)
// { kind: 'delete', key: 'temp' }
// Result: temp key is removed
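
The merge rules above can be modelled as a small reducer over the op-list. This is a sketch under assumptions, not the stores' implementation: `WriteOp`, `StepWritesSketch`, and `applyStepWrites` are hypothetical names that mirror the documented op shapes, and recording a warning for an append to a non-array key is this sketch's own choice.

```typescript
type WriteOp =
  | { kind: 'append'; key: string; items: unknown[] }
  | { kind: 'replace'; key: string; value: unknown }
  | { kind: 'delete'; key: string };

interface StepWritesSketch {
  ops: WriteOp[];
  warnings: string[];
}

// Applies ops in order to a copy of the state; the input object is left untouched.
function applyStepWrites(
  state: Record<string, unknown>,
  writes: StepWritesSketch
): { state: Record<string, unknown>; warnings: string[] } {
  const next: Record<string, unknown> = { ...state };
  const warnings = writes.warnings.slice();
  for (const op of writes.ops) {
    if (op.kind === 'append') {
      const existing = next[op.key];
      if (existing !== undefined && !Array.isArray(existing)) {
        warnings.push(`append skipped: '${op.key}' is not an array`); // sketch-only behavior
        continue;
      }
      const base: unknown[] = Array.isArray(existing) ? existing : [];
      next[op.key] = [...base, ...op.items];
    } else if (op.kind === 'replace') {
      next[op.key] = op.value; // last write wins
    } else {
      delete next[op.key];
    }
  }
  return { state: next, warnings };
}

const initialState = { items: ['x'], count: 0, temp: 'scratch' };
const toolOne: StepWritesSketch = {
  ops: [
    { kind: 'append', key: 'items', items: ['a'] },
    { kind: 'replace', key: 'count', value: 5 },
  ],
  warnings: [],
};
const toolTwo: StepWritesSketch = {
  ops: [
    { kind: 'append', key: 'items', items: ['b'] },
    { kind: 'replace', key: 'count', value: 10 },
    { kind: 'delete', key: 'temp' },
  ],
  warnings: [],
};

const merged = applyStepWrites(applyStepWrites(initialState, toolOne).state, toolTwo).state;
console.log(JSON.stringify(merged)); // {"items":["x","a","b"],"count":10}
```

Applying the two op-lists in the opposite order keeps both appended items but leaves count at 5, which is the timing dependence noted for scalar values.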

Run Tracking

The state store also tracks execution runs within sessions. A run represents a single execution (via execute() or resume()), while a session represents the overall conversation:

typescript
interface SessionStateStore {
  // ... other methods ...

  // Run tracking
  createRun(sessionId: string, runId: string, metadata: RunMetadata): Promise<void>;
  updateRunStatus(runId: string, status: RunStatus, updates?: RunStatusUpdate): Promise<void>;
  getCurrentRun(sessionId: string): Promise<RunRecord | null>;
  listRuns(sessionId: string): Promise<RunRecord[]>;
}

When runs are created:

  • execute() creates a new run with turn: 1
  • resume() creates a new run with incremented turn number

Run status values:

  • running - Execution in progress
  • completed - Finished successfully with output
  • failed - Terminated with error
  • interrupted - User-requested pause (can be resumed)
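
The relationship between runs and turns can be sketched with a tiny in-memory log. `TinyRunLog` and `RunSketch` are hypothetical, the methods are synchronous for brevity, and the sketch assumes each resume() takes the previous run's turn plus one.

```typescript
type RunStatusSketch = 'running' | 'completed' | 'failed' | 'interrupted';

interface RunSketch {
  runId: string;
  turn: number;
  status: RunStatusSketch;
}

// Hypothetical per-session run log; the real store keys runs by sessionId and runId.
class TinyRunLog {
  private runs: RunSketch[] = [];

  // Models both execute() (first run, turn 1) and resume() (next turn).
  startRun(runId: string): RunSketch {
    const previous = this.getCurrentRun();
    const run: RunSketch = { runId, turn: previous ? previous.turn + 1 : 1, status: 'running' };
    this.runs.push(run);
    return run;
  }

  updateRunStatus(runId: string, status: RunStatusSketch): void {
    for (const run of this.runs) {
      if (run.runId === runId) run.status = status;
    }
  }

  // The most recent run, or null when the session has not been executed yet.
  getCurrentRun(): RunSketch | null {
    return this.runs[this.runs.length - 1] ?? null;
  }

  listRuns(): RunSketch[] {
    return this.runs.slice();
  }
}

const runLog = new TinyRunLog();
runLog.startRun('run-1'); // execute(): turn 1
runLog.updateRunStatus('run-1', 'interrupted'); // user-requested pause
runLog.startRun('run-2'); // resume(): turn 2
runLog.updateRunStatus('run-2', 'completed');

const runSummary = runLog.listRuns().map((run) => `Turn ${run.turn}: ${run.status}`);
console.log(runSummary.join('; ')); // Turn 1: interrupted; Turn 2: completed
```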

Example: querying run history

typescript
// Get the current/most recent run
const currentRun = await stateStore.getCurrentRun(sessionId);
console.log(currentRun?.status); // 'running' | 'completed' | 'failed' | 'interrupted'

// List all runs for a session (useful for debugging/auditing)
const runs = await stateStore.listRuns(sessionId);
for (const run of runs) {
  console.log(`Turn ${run.turn}: ${run.status} (${run.stepCount} steps)`);
}

Stream Manager Interface

The StreamManager interface defines how events are streamed:

typescript
interface StreamManager {
  // Writer lifecycle
  createWriter(streamId: string, runId: string, agentType: string): Promise<StreamWriter>;

  // Reader lifecycle
  createReader(streamId: string): Promise<StreamReader | null>;

  // Stream control
  endStream(streamId: string, finalOutput?: unknown): Promise<void>;
  failStream(streamId: string, error: string): Promise<void>;

  // Optional: Resumable streams
  createResumableReader?(
    streamId: string,
    options?: ReaderOptions
  ): Promise<ResumableStreamReader | null>;
  getStreamInfo?(streamId: string): Promise<StreamInfo | null>;
  pauseStream?(streamId: string): Promise<void>;
  resumeStream?(streamId: string): Promise<void>;

  // Optional: Content reconstruction (for mid-stream refresh)
  getAllChunks?(streamId: string): Promise<StreamChunk[]>;
  getChunksFromStep?(streamId: string, fromStep: number): Promise<StreamChunk[]>;
}

Resumable Stream Features

Different implementations support different resumability features:

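| Feature | InMemory | Redis | Cloudflare DO |
| --- | --- | --- | --- |
| createResumableReader | No | Yes | Yes |
| getStreamInfo | Yes | Yes | Yes |
| getAllChunks | Yes | Yes | Yes |
| getChunksFromStep | Yes | Yes | Yes |
| pauseStream / resumeStream | No | Yes | Yes |
| Sequence tracking | No | Yes | Yes |
| Multi-process support | No | Yes | Yes |
| maxChunks LTRIM cap | No | Yes | n/a (DO SQLite) |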

Key methods for resumability:

  • createResumableReader: Creates a reader that tracks position with sequence numbers. Use the fromSequence option to resume from a specific position.
  • getStreamInfo: Returns stream status, total chunks, and latest sequence number. Use it to check whether a stream is still active before connecting.
  • getAllChunks: Returns all chunks in the stream. Used for full content reconstruction on page refresh.
  • getChunksFromStep: Returns chunks from a specific step onwards. More efficient than getAllChunks when checkpoint data is available.
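
The methods above share one idea: chunks carry a sequence number and a step, so a client can ask for only what it missed. The following sketch models that with an in-memory buffer. `TinyChunkBuffer`, `ChunkSketch`, and `readAfter` are hypothetical, as are the field names returned by `getStreamInfo` here. Whether the real fromSequence option is inclusive or exclusive is not stated on this page, so the sketch uses an explicitly exclusive `readAfter` instead.

```typescript
interface ChunkSketch {
  sequence: number; // position in the stream, starting at 1 in this sketch
  step: number; // agent step that produced the chunk
  delta: string;
}

// Hypothetical in-memory buffer; real stream managers persist chunks elsewhere.
class TinyChunkBuffer {
  private chunks: ChunkSketch[] = [];

  append(step: number, delta: string): ChunkSketch {
    const chunk: ChunkSketch = { sequence: this.chunks.length + 1, step, delta };
    this.chunks.push(chunk);
    return chunk;
  }

  // Full reconstruction, e.g. after a page refresh.
  getAllChunks(): ChunkSketch[] {
    return this.chunks.slice();
  }

  // Chunks from a given step onwards.
  getChunksFromStep(fromStep: number): ChunkSketch[] {
    return this.chunks.filter((c) => c.step >= fromStep);
  }

  // Chunks strictly after the last sequence number the client has seen.
  readAfter(lastSeenSequence: number): ChunkSketch[] {
    return this.chunks.filter((c) => c.sequence > lastSeenSequence);
  }

  getStreamInfo(): { totalChunks: number; latestSequence: number } {
    return { totalChunks: this.chunks.length, latestSequence: this.chunks.length };
  }
}

const chunkBuffer = new TinyChunkBuffer();
chunkBuffer.append(1, 'Hel');
chunkBuffer.append(1, 'lo');
chunkBuffer.append(2, ', wor');
chunkBuffer.append(2, 'ld');

// A client that disconnected after sequence 2 only needs the remainder.
const missedText = chunkBuffer.readAfter(2).map((c) => c.delta).join('');
const fullText = chunkBuffer.getAllChunks().map((c) => c.delta).join('');
console.log(missedText); // , world
console.log(fullText); // Hello, world
```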

For comprehensive resumability documentation, see the Resumable Streams Guide.

Writer/Reader Pattern

Writers emit chunks, readers consume them:

typescript
// Create writer (usually done by runtime)
const writer = await streamManager.createWriter(streamId, runId, agentType);
await writer.write({ type: 'text_delta', delta: 'Hello', ... });
await writer.close();  // Close writer, NOT the stream

// Create reader (usually done by client/UI)
const reader = await streamManager.createReader(streamId);
if (reader) {
  try {
    for await (const chunk of reader) {
      console.log(chunk);
    }
  } finally {
    await reader.close();  // Always close reader
  }
}

Stream Lifecycle

mermaid
graph TB
    Created["Stream Created (first writer)"]

    Created --> Writers["Writers emit chunks"]
    Writers --> Readers["Readers receive in real-time"]

    Created --> End["endStream() called"]
    End --> Complete["Readers complete normally"]
    Complete --> Historical["New readers get historical chunks"]

    Created --> Fail["OR failStream() called"]
    Fail --> Error["Readers receive error"]
    Error --> Null["New readers get null"]
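
The lifecycle in the diagram reduces to three states and one rule for late readers. The sketch below models it in memory. `TinyStream` is a hypothetical stand-in rather than a StreamManager implementation: `openReader` returns a snapshot instead of a live reader, and rejecting writes after the stream ends is this sketch's own choice.

```typescript
type StreamStateSketch = 'active' | 'ended' | 'failed';

class TinyStream {
  private state: StreamStateSketch = 'active';
  private chunks: string[] = [];
  public lastError: string | null = null;

  write(chunk: string): void {
    // Sketch's own choice: refuse writes once the stream is no longer active.
    if (this.state !== 'active') throw new Error(`stream is ${this.state}`);
    this.chunks.push(chunk);
  }

  endStream(): void {
    this.state = 'ended';
  }

  failStream(error: string): void {
    this.state = 'failed';
    this.lastError = error;
  }

  // Stand-in for createReader(): a snapshot of the chunks, or null after a failure.
  openReader(): string[] | null {
    return this.state === 'failed' ? null : this.chunks.slice();
  }
}

const finishedStream = new TinyStream();
finishedStream.write('Hello');
finishedStream.write(' world');
finishedStream.endStream();

const brokenStream = new TinyStream();
brokenStream.write('partial');
brokenStream.failStream('upstream timeout');

console.log(finishedStream.openReader()); // [ 'Hello', ' world' ]
console.log(brokenStream.openReader()); // null
```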

Choosing Storage

Development

Use in-memory stores for fast iteration:

typescript
import { InMemoryStateStore, InMemoryStreamManager } from '@helix-agents/store-memory';

const stateStore = new InMemoryStateStore();
const streamManager = new InMemoryStreamManager();

Pros: Zero setup, fast, easy debugging
Cons: Data lost on restart, single process only

Production (Self-Hosted)

Use Redis for durability and multi-process:

typescript
import { RedisStateStore, RedisStreamManager } from '@helix-agents/store-redis';
import Redis from 'ioredis';

const redis = new Redis(process.env.REDIS_URL);
const stateStore = new RedisStateStore(redis);
const streamManager = new RedisStreamManager(redis);

Pros: Persistent, multi-process, production-ready
Cons: Requires Redis infrastructure

Production (Cloudflare)

Use D1 and Durable Objects for edge deployment:

typescript
import { D1StateStore, DOStreamManager } from '@helix-agents/store-cloudflare';

const stateStore = new D1StateStore(env.DB);
const streamManager = new DOStreamManager(env.STREAM_MANAGER);

Pros: Global edge, serverless, integrated with Cloudflare
Cons: Cloudflare-specific, different API from other stores

Swapping Stores

Because stores implement standard interfaces, swapping is straightforward:

typescript
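import Redis from 'ioredis';
import { InMemoryStateStore, InMemoryStreamManager } from '@helix-agents/store-memory';
import { RedisStateStore, RedisStreamManager } from '@helix-agents/store-redis';
// JSAgentExecutor and llmAdapter are assumed to be imported and configured elsewhere.

// Environment-based selection
const createStores = () => {
  if (process.env.NODE_ENV === 'production') {
    // Production: one Redis connection backs both state and streams
    const redis = new Redis(process.env.REDIS_URL);
    return {
      stateStore: new RedisStateStore(redis),
      streamManager: new RedisStreamManager(redis),
    };
  }

  // Development and tests: in-memory stores, no setup required
  return {
    stateStore: new InMemoryStateStore(),
    streamManager: new InMemoryStreamManager(),
  };
};

const { stateStore, streamManager } = createStores();
const executor = new JSAgentExecutor(stateStore, streamManager, llmAdapter);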

State Data Model

Session state includes:

typescript
interface SessionState<TState, TOutput> {
  // Identity
  sessionId: string; // Primary key for all operations
  agentType: string; // Agent type name
  streamId?: string; // Stream identifier (defaults to sessionId)

  // Hierarchy
  parentSessionId?: string; // Parent if this is a sub-agent
  rootSessionId?: string; // Root pointer for cross-session ownership

  // Execution state
  customState: TState; // Your custom state schema
  stepCount: number; // LLM call count
  status: SessionStatus; // 'active' | 'completed' | 'failed' | 'interrupted' | 'paused' | 'suspended_*'

  // Output / failure
  output?: TOutput; // Final structured output
  error?: string;
  failureReason?: string; // v7: cascade discriminator (e.g., 'parent_suspended')

  // v7: Stateless suspension fields
  suspendedAwaitingChildren?: Record<string, SuspendedChildWait>;
  suspendedStepId?: string;
  pendingClientToolCalls?: Record<string, PendingClientToolCall>;
  completedClientToolCalls?: Record<string, number>; // root-only tombstones
  clientToolCallOwnership?: ClientToolCallOwnership; // root-only
  interruptContext?: InterruptContext;
  interruptFlags?: { reason?: string; setAt: number }; // durable interrupt flag
  tracingContext?: { traceId: string; rootSpanId: string };
  expiresAt?: number; // operator GC

  // User context
  userId?: string;
  tags?: string[];
  metadata?: Record<string, string>;

  // Versioning (for optimistic concurrency)
  version: number;
  resumeCount: number;

  // Timestamps
  createdAt: number;
  updatedAt: number;
}

The fields tagged "v7" are persisted across all v7 stores. On D1 / DO they pack into a single suspension_context TEXT column (V9 / V5 respectively); on Redis they are JSON-encoded fields on the existing __agents:state:{sessionId} hash; on Postgres they are JSONB columns. See packages/core/src/types/session.ts for the canonical type definition.

Note: Messages are stored separately in the state store (not embedded in state) for O(1) append operations.

Querying Across Sessions

State stores, usage stores, and memory stores support cross-session queries for admin dashboards and analytics:

  • listSessions() on SessionStateStore — browse and filter sessions by user, agent type, status, tags, metadata, and date range
  • queryUsage() on UsageStore — query usage entries across sessions for cost tracking and analytics
  • listEntities() on MemoryStore — list distinct entity/source pairs with memory counts

All methods return paginated results with { total, offset, limit, hasMore }.
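
A caller typically walks these pages until hasMore is false. The sketch below shows that loop against an in-memory list. `PageSketch`, `pageOf`, and `collectAll` are hypothetical, and the `items` field name and the offset/limit option names are assumptions, since the result and option types are not shown on this page. The real methods return Promises; the sketch stays synchronous to keep the paging logic in view.

```typescript
interface PageSketch<T> {
  items: T[]; // assumed field name for the page contents
  total: number;
  offset: number;
  limit: number;
  hasMore: boolean;
}

// Slices one page out of an in-memory list and fills in the pagination metadata.
function pageOf<T>(all: T[], offset: number, limit: number): PageSketch<T> {
  const items = all.slice(offset, offset + limit);
  return { items, total: all.length, offset, limit, hasMore: offset + items.length < all.length };
}

// Keeps requesting pages until the source reports there is nothing more to fetch.
function collectAll<T>(fetchPage: (offset: number, limit: number) => PageSketch<T>, limit: number): T[] {
  const collected: T[] = [];
  let offset = 0;
  while (true) {
    const page = fetchPage(offset, limit);
    collected.push(...page.items);
    if (!page.hasMore || page.items.length === 0) break; // the empty-page check guards against looping forever
    offset += page.items.length;
  }
  return collected;
}

const sessionIds = ['s1', 's2', 's3', 's4', 's5'];
const everySession = collectAll((offset, limit) => pageOf(sessionIds, offset, limit), 2);

console.log(everySession.join(',')); // s1,s2,s3,s4,s5
console.log(pageOf(sessionIds, 0, 2).hasMore); // true
console.log(pageOf(sessionIds, 4, 2).hasMore); // false
```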

See the Querying Guide for complete documentation and examples.

Released under the MIT License.