Redis Storage
The Redis storage package (@helix-agents/store-redis) provides production-ready state and stream storage using Redis. It supports persistence, multi-process deployments, and atomic operations for parallel tool execution.
When to Use
Good fit:
- Production deployments
- Multi-process/multi-server architectures
- Agents requiring crash recovery
- Real-time streaming across services
Not ideal for:
- Simple development (use in-memory)
- Serverless with cold starts (connection overhead)
- Cloudflare Workers (use D1/Durable Objects)
Installation
npm install @helix-agents/store-redis ioredis
Setup
Basic Connection
import Redis from 'ioredis';
import { RedisStateStore, RedisStreamManager } from '@helix-agents/store-redis';
// Create Redis client
const redis = new Redis({
host: 'localhost',
port: 6379,
// password: 'your-password',
// tls: {}, // For Redis Cloud/AWS ElastiCache
});
// Create stores
const stateStore = new RedisStateStore(redis);
const streamManager = new RedisStreamManager(redis);
With Connection URL
const redis = new Redis(process.env.REDIS_URL);
// e.g., redis://user:password@host:6379
const stateStore = new RedisStateStore(redis);
const streamManager = new RedisStreamManager(redis);
With Options
const stateStore = new RedisStateStore(redis, {
prefix: 'myapp:agents:', // Key prefix (default: 'agent')
ttl: 86400 * 7, // State TTL in seconds (default: 7 days)
});
const streamManager = new RedisStreamManager(redis, {
prefix: 'myapp:streams:',
ttl: 86400, // Stream TTL (default: 24 hours)
maxChunks: 10000, // Max chunks per stream — applied via LTRIM (default: 10000, 0 = unlimited)
});
RedisStateStore
Session vs Run Identifiers
- sessionId: Primary key for all state operations. All Redis keys are scoped by sessionId.
- runId: Execution metadata identifying a specific run within a session.
The key structure uses sessionId as the primary scoping mechanism. Multiple runs within the same session share state.
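To make the scoping concrete, here is a minimal sketch of how keys could be derived from a sessionId, using the patterns from the Key Structure listing in this document. The helper names (sessionKeys, customStateListKey) are hypothetical and are not part of @helix-agents/store-redis. Note that runId never appears in a key, which is why runs in the same session share state.

```typescript
// Hypothetical helpers for illustration only; not exported by the package.
interface SessionKeys {
  state: string;
  messages: string;
  subSessionIds: string;
  customState: string;
  customStateArrayKeys: string;
}

// Builds the per-session keys from the documented patterns.
function sessionKeys(prefix: string, sessionId: string): SessionKeys {
  return {
    state: `${prefix}:session:${sessionId}`,
    messages: `${prefix}:session:messages:${sessionId}`,
    subSessionIds: `${prefix}:session:subsession:ids:${sessionId}`,
    customState: `${prefix}:session:customstate:${sessionId}`,
    customStateArrayKeys: `${prefix}:session:customstate:arraykeys:${sessionId}`,
  };
}

// Array-valued custom state fields get one LIST key per field.
function customStateListKey(prefix: string, sessionId: string, field: string): string {
  return `${prefix}:session:customstate:list:${sessionId}:${field}`;
}

const keys = sessionKeys('agent', 'session-123');
console.log(keys.state);    // agent:session:session-123
console.log(keys.messages); // agent:session:messages:session-123
```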
Key Structure
The store uses a separated key structure for efficient atomic operations:
{prefix}:session:{sessionId} -> HASH (main state)
{prefix}:session:messages:{sessionId} -> LIST (messages)
{prefix}:session:subsession:ids:{sessionId} -> SET (sub-session IDs)
{prefix}:session:subsession:{sessionId}:{subId} -> HASH (sub-session details)
{prefix}:session:customstate:{sessionId} -> HASH (scalar custom state)
{prefix}:session:customstate:arraykeys:{sessionId} -> SET (array field names)
{prefix}:session:customstate:list:{sessionId}:{field} -> LIST (array fields)
Basic Operations
// Create a session
await stateStore.createSession('session-123', { agentType: 'my-agent' });
// Load state (reconstitutes from multiple keys)
const state = await stateStore.loadState('session-123');
// Save state (updates existing session)
await stateStore.saveState('session-123', state);
// Delete session (removes all related keys)
await stateStore.deleteSession('session-123');
Atomic Operations
These operations are safe for concurrent access:
// Append messages - uses RPUSH (atomic)
await stateStore.appendMessages('session-123', [{ role: 'assistant', content: 'Hello!' }]);
// Merge custom state - applies StepWrites ops atomically
await stateStore.mergeCustomState('run-123', {
ops: [
{ kind: 'replace', key: 'count', value: 5 }, // Scalar: HSET
{ kind: 'append', key: 'items', items: ['new'] }, // Array: RPUSH (append)
{ kind: 'replace', key: 'replaced', value: [1, 2] }, // Full replacement
],
warnings: [],
});
// Update status - uses HSET
await stateStore.updateStatus('run-123', 'completed');
// Increment step count - uses HINCRBY (atomic)
const newCount = await stateStore.incrementStepCount('run-123');
Atomic Write (saveStateAndPromoteStaging)
The v7 atomic write primitive is implemented as a server-side Lua script (EVAL-loaded once, then EVALSHA-cached). The script executes the message append, state hash update, staging promotion, and checkpoint creation as a single atomic Redis operation — there is no client-side compensating-write window.
// Single round-trip, fully atomic on the server
await stateStore.saveStateAndPromoteStaging(
sessionId,
newState,
pendingMessages,
{ stepId: 'step-7', stepCount: 7, streamSequence: 142 },
{ expectedVersion: state.version }
);
This satisfies cross-runtime invariant C-1: pendingClientToolCalls, clientToolCallOwnership, completed phase-1 messages, the checkpoint, and suspendedStepId all become visible together. Redis 6+ is required (already the documented minimum).
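The role of expectedVersion can be illustrated with a small in-memory model. This is a sketch under stated assumptions, not the package's implementation: the SessionRecord shape, the atomicWrite name, the version bump of exactly 1, and the result-union return value are all invented here for illustration. How the real method reports a version conflict is not shown in this document.

```typescript
// In-memory model of a versioned all-or-nothing write. Illustrative only:
// per the text above, the real store runs its logic in a server-side Lua script.
interface SessionRecord {
  version: number;
  messages: string[];
  stepCount: number;
}

type WriteResult =
  | { ok: true; newVersion: number }
  | { ok: false; currentVersion: number };

function atomicWrite(
  store: Map<string, SessionRecord>,
  sessionId: string,
  pendingMessages: string[],
  stepCount: number,
  expectedVersion: number,
): WriteResult {
  const current = store.get(sessionId);
  if (current === undefined) {
    throw new Error(`unknown session: ${sessionId}`);
  }
  // Reject stale writers before touching any field.
  if (current.version !== expectedVersion) {
    return { ok: false, currentVersion: current.version };
  }
  // All fields change in one step, so readers never observe a partial update.
  const newVersion = current.version + 1; // assumption: version bumps by 1
  store.set(sessionId, {
    version: newVersion,
    messages: [...current.messages, ...pendingMessages],
    stepCount,
  });
  return { ok: true, newVersion };
}

const demoStore = new Map<string, SessionRecord>([
  ['session-123', { version: 1, messages: [], stepCount: 0 }],
]);
const first = atomicWrite(demoStore, 'session-123', ['hello'], 1, 1);
const stale = atomicWrite(demoStore, 'session-123', ['lost update'], 1, 1);
console.log(first.ok, stale.ok); // true false
```

The second call reuses the version it read before the first write landed, so it is rejected without modifying any field.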
Sequential fallback escape hatch (allowSequentialFallback)
If your Redis deployment forbids EVAL/EVALSHA (some hardened managed offerings, certain proxy layers), pass allowSequentialFallback: true to the constructor:
const stateStore = new RedisStateStore(redisClient, {
keyPrefix: 'agent',
allowSequentialFallback: true, // ⚠️ NON-ATOMIC — see warning below
});
When allowSequentialFallback is enabled, saveStateAndPromoteStaging falls back to a sequential pipeline of individual commands. This loses the atomicity guarantee: a process crash mid-pipeline can leave the session in a partially-updated state that violates invariant C-1.
WARNING: treat allowSequentialFallback: true as a debugging or hardware-constrained deployment escape hatch only. Production deployments SHOULD run a Redis instance that supports EVAL and leave this flag at its default (false).
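The failure mode behind this warning can be modeled in a few lines. The sketch below is purely illustrative: StagedState, applySequentially, and the three steps are hypothetical stand-ins for the individual commands of the fallback pipeline, and the crash is simulated by stopping early.

```typescript
// Illustrative model of a non-atomic sequence: each step is applied on its
// own, so an interruption leaves earlier steps visible without later ones.
interface StagedState {
  messages: string[];
  stepCount: number;
  checkpointStepId: string | null;
}

type Step = (state: StagedState) => void;

// Applies steps in order and stops after `crashAfter` steps to mimic a crash.
function applySequentially(state: StagedState, steps: Step[], crashAfter: number): StagedState {
  const next: StagedState = { ...state, messages: [...state.messages] };
  steps.slice(0, crashAfter).forEach((step) => step(next));
  return next;
}

const pipelineSteps: Step[] = [
  (s) => { s.messages.push('assistant reply'); }, // message append
  (s) => { s.stepCount = 7; },                     // state hash update
  (s) => { s.checkpointStepId = 'step-7'; },       // checkpoint creation
];

const initial: StagedState = { messages: [], stepCount: 6, checkpointStepId: 'step-6' };
const partial = applySequentially(initial, pipelineSteps, 1);
// The message is visible, but stepCount and the checkpoint still describe step 6.
console.log(partial);
```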
Known atomicity gap: customState pipeline
RedisStateStore.saveState writes customState in a separate non-atomic pipeline AFTER the version-check Lua script returns. A process crash between the two leaves the session hash with the new version but customState partially updated. The next saveState's "clean up orphaned arrays" loop recovers from this within ~1 saveState window in active sessions (longer for paused sessions).
The proper fix (a Lua script that performs all customState mutations atomically) is tracked as FU-A2-41 in docs/dev/follow-ups.md.
v7 SessionState Fields
The following fields are persisted as JSON-encoded entries on the existing __agents:state:{sessionId} hash — no new keys are introduced and existing keys remain read-compatible (records that pre-date this version simply return undefined for the new fields):
- suspendedAwaitingChildren: map of parentToolCallId → SuspendedChildWait
- suspendedStepId: mid-step suspension marker
- tracingContext: { traceId, rootSpanId } for trace continuity across resumes
- expiresAt: epoch-ms hint for operator GC via expiredSessionCleanup
- failureReason: discriminator for failed children (e.g. 'parent_suspended')
- completedClientToolCalls: root-session tombstone map for HITL idempotency
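The read-compatibility behavior described above (older records simply yield undefined for the new fields) can be sketched as follows. The readJsonField helper and the sample records are hypothetical; only the field names and the JSON encoding come from this document, and the plain status entry is a placeholder.

```typescript
// A Redis hash as returned by a client: field name -> string value.
type RedisHash = Record<string, string | undefined>;

interface TracingContext {
  traceId: string;
  rootSpanId: string;
}

// Returns the decoded field, or undefined when the hash has no such entry.
function readJsonField<T>(hash: RedisHash, field: string): T | undefined {
  const raw = hash[field];
  if (raw === undefined) return undefined;
  return JSON.parse(raw) as T;
}

// A record written before the v7 fields existed (placeholder contents).
const legacyRecord: RedisHash = { status: 'active' };

// A record carrying two of the v7 fields as JSON-encoded entries.
const v7Record: RedisHash = {
  status: 'active',
  tracingContext: JSON.stringify({ traceId: 'trace-1', rootSpanId: 'span-1' }),
  expiresAt: JSON.stringify(1700000000000),
};

console.log(readJsonField<TracingContext>(legacyRecord, 'tracingContext')); // undefined
console.log(readJsonField<TracingContext>(v7Record, 'tracingContext')?.traceId); // trace-1
console.log(readJsonField<number>(v7Record, 'expiresAt')); // 1700000000000
```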
compareAndSetStatus Return Shape (v7)
The Lua-backed CAS now returns a discriminated union including the prior status and version on conflict, so callers can make richer retry decisions without an extra round-trip:
const result = await stateStore.compareAndSetStatus(sessionId, ['active'], 'paused', {
error: 'user paused',
expectedVersion: 7,
});
if (result.ok) {
// result.newVersion = post-bump version
} else {
// result.currentStatus, result.currentVersion = stored values at conflict
}
Parallel Tool Execution
The key structure enables safe parallel updates:
// Two tools running in parallel
await Promise.all([
// Tool 1: Appends to items array
stateStore.mergeCustomState(runId, {
ops: [{ kind: 'append', key: 'items', items: ['from-tool-1'] }],
warnings: [],
}),
// Tool 2: Appends to items array
stateStore.mergeCustomState(runId, {
ops: [{ kind: 'append', key: 'items', items: ['from-tool-2'] }],
warnings: [],
}),
]);
// Result: items = [...original, 'from-tool-1', 'from-tool-2']
// (order may vary, but both appends are preserved)
Message Queries
// Paginated messages (for UI)
const result = await stateStore.getMessages('run-123', {
offset: 0,
limit: 50,
includeThinking: false,
});
// Message count
const count = await stateStore.getMessageCount('run-123');
Message Truncation
For crash recovery, messages can be truncated to match a checkpoint:
// Truncate to specific message count (removes orphaned messages)
await stateStore.truncateMessages('run-123', messageCount);
This is used internally during crash recovery when resuming from a checkpoint. If a crash occurred mid-step, messages added after the checkpoint are orphaned and need to be removed. The messageCount parameter comes from the checkpoint's messageCount field.
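The effect of truncation can be shown on a plain array. This is a minimal sketch of the semantics only: truncateToCheckpoint and the Checkpoint and Message shapes are hypothetical, and the sketch says nothing about which Redis commands the store issues.

```typescript
// Only the field used here is modeled; a real checkpoint carries more data.
interface Checkpoint {
  messageCount: number;
}

interface Message {
  role: string;
  content: string;
}

// Keeps the first `messageCount` messages and drops everything written later.
function truncateToCheckpoint(messages: readonly Message[], checkpoint: Checkpoint): Message[] {
  if (!Number.isInteger(checkpoint.messageCount) || checkpoint.messageCount < 0) {
    throw new RangeError('messageCount must be a non-negative integer');
  }
  return messages.slice(0, checkpoint.messageCount);
}

const persisted: Message[] = [
  { role: 'user', content: 'Research AI agents' },
  { role: 'assistant', content: 'Searching...' },
  { role: 'assistant', content: 'partial output written before the crash' },
];

// The checkpoint was taken when two messages existed.
const recovered = truncateToCheckpoint(persisted, { messageCount: 2 });
console.log(recovered.length); // 2
```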
RedisStreamManager
Key Structure
{prefix}:stream:{streamId} -> STREAM (Redis Stream for chunks)
{prefix}:stream:meta:{streamId} -> HASH (stream metadata)
Basic Operations
// Create writer
const writer = await streamManager.createWriter('stream-123', 'run-123', 'researcher');
// Write chunks (uses XADD)
await writer.write({
type: 'text_delta',
agentId: 'run-123',
agentType: 'researcher',
timestamp: Date.now(),
delta: 'Hello world',
});
// Close writer (releases resources, doesn't end stream)
await writer.close();
// Create reader (uses XREAD with blocking)
const reader = await streamManager.createReader('stream-123');
if (reader) {
for await (const chunk of reader) {
console.log(chunk);
}
await reader.close();
}
// End stream
await streamManager.endStream('stream-123', { result: 'done' });
// Or fail stream
await streamManager.failStream('stream-123', 'Error message');
Real-Time Streaming
Redis Streams provide real-time delivery:
// Writer (agent execution)
const writer = await streamManager.createWriter(streamId, agentId, agentType);
for (const token of tokens) {
await writer.write({
type: 'text_delta',
agentId,
agentType,
timestamp: Date.now(),
delta: token,
});
// Reader receives immediately
}
// Reader (UI/client) - receives chunks as they're written
const reader = await streamManager.createReader(streamId);
for await (const chunk of reader) {
updateUI(chunk); // Real-time updates
}
Resumable Streams
Support for client reconnection:
// Get stream info
const info = await streamManager.getStreamInfo('stream-123');
console.log(info?.totalChunks);
console.log(info?.latestSequence);
console.log(info?.status);
// Create resumable reader
const reader = await streamManager.createResumableReader('stream-123', {
fromSequence: lastKnownSequence, // Resume from this point
});
if (reader) {
console.log(`Starting from sequence ${reader.currentSequence}`);
for await (const chunk of reader) {
// Process chunks, track position
savePosition(reader.currentSequence);
}
}
Stream Cleanup
For crash recovery and stream reset scenarios:
// Clean up chunks beyond a specific step (atomic Lua script)
await streamManager.cleanupToStep('stream-123', stepCount);
// Reset entire stream (for fresh execution with same ID)
await streamManager.resetStream('stream-123');
cleanupToStep() removes all chunks with step > stepCount. This is used during crash recovery to remove orphaned chunks that were written after the checkpoint being resumed from. The operation uses an atomic Lua script to ensure consistency.
resetStream() clears the entire stream. This is called by execute() when starting a fresh run within a session. It ensures clients don't see stale chunks from previous executions.
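The cleanupToStep() rule (drop every chunk with step > stepCount) can be sketched on an in-memory list. The StepChunk shape and the keepUpToStep name are assumptions made for illustration; in particular, how the step value is stored on a chunk is not specified in this document.

```typescript
// Hypothetical chunk shape: `step` is the step that produced the chunk.
interface StepChunk {
  step: number;
  type: string;
  delta?: string;
}

// Keeps chunks from steps up to and including `stepCount`.
function keepUpToStep(chunks: readonly StepChunk[], stepCount: number): StepChunk[] {
  return chunks.filter((chunk) => chunk.step <= stepCount);
}

const written: StepChunk[] = [
  { step: 1, type: 'text_delta', delta: 'Hello' },
  { step: 2, type: 'text_delta', delta: ' world' },
  { step: 3, type: 'text_delta', delta: ' (orphaned)' },
];

// Resuming from a checkpoint taken at step 2 discards the step-3 chunk.
const afterCleanup = keepUpToStep(written, 2);
console.log(afterCleanup.map((c) => c.step)); // [ 1, 2 ]
```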
Stream TTL
Streams auto-expire to prevent unbounded growth:
const streamManager = new RedisStreamManager(redis, {
ttl: 3600, // 1 hour
});
// Stream keys expire after TTL
// Completed streams can be read until expiry
Complete Example
import Redis from 'ioredis';
import { JSAgentExecutor } from '@helix-agents/runtime-js';
import { RedisStateStore, RedisStreamManager } from '@helix-agents/store-redis';
import { VercelAIAdapter } from '@helix-agents/llm-vercel';
// Setup
const redis = new Redis(process.env.REDIS_URL);
const stateStore = new RedisStateStore(redis, { prefix: 'myapp:' });
const streamManager = new RedisStreamManager(redis, { prefix: 'myapp:' });
const executor = new JSAgentExecutor(stateStore, streamManager, new VercelAIAdapter());
// Execute agent
const handle = await executor.execute(ResearchAgent, 'Research AI agents', {
sessionId: 'my-session-1',
});
// Stream in real-time
const stream = await handle.stream();
if (stream) {
for await (const chunk of stream) {
process.stdout.write(chunk.type === 'text_delta' ? chunk.delta : '');
}
}
// Get result
const result = await handle.result();
console.log('\nResult:', result.output);
// Cleanup (optional)
await stateStore.deleteSession(handle.sessionId);
// Close connection when done
await redis.quit();
Production Considerations
Connection Pooling
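An ioredis client sends all of its commands over a single connection, so one shared client is usually enough. For high-throughput workloads, tune its retry and timeout behavior: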
const redis = new Redis({
host: 'localhost',
port: 6379,
maxRetriesPerRequest: 3,
enableReadyCheck: true,
connectTimeout: 10000,
});
Cluster Support
For Redis Cluster:
import Redis from 'ioredis';
const redis = new Redis.Cluster([
{ host: 'node1', port: 6379 },
{ host: 'node2', port: 6379 },
{ host: 'node3', port: 6379 },
]);
const stateStore = new RedisStateStore(redis);
const streamManager = new RedisStreamManager(redis);
Error Handling
Handle Redis connection errors:
redis.on('error', (err) => {
console.error('Redis error:', err);
});
redis.on('connect', () => {
console.log('Redis connected');
});
// Graceful shutdown
process.on('SIGTERM', async () => {
await redis.quit();
process.exit(0);
});
Monitoring
Monitor Redis memory and connections:
# Memory usage
redis-cli INFO memory
# Connected clients
redis-cli INFO clients
# Key count
redis-cli DBSIZE
# Specific key patterns (SCAN-based; KEYS blocks the server on large keyspaces)
redis-cli --scan --pattern "myapp:state:*" | wc -l
Backup and Recovery
Agent state can be recovered from Redis after crashes:
// After process restart
const handle = await executor.getHandle(MyAgent, savedSessionId);
if (handle) {
const { canResume, reason } = await handle.canResume();
if (canResume) {
// Resume from saved state
const newHandle = await handle.resume();
const result = await newHandle.result();
}
}
Limitations
No Transactions Across Keys
Atomicity is per call. Two separate store calls are not wrapped in a transaction, even when they touch the same session:
// This is NOT atomic across both updates
await stateStore.updateStatus(sessionId, 'completed');
await stateStore.mergeCustomState(sessionId, {
ops: [{ kind: 'replace', key: 'finalResult', value: result }],
warnings: [],
});
// If process crashes between these, state may be inconsistent
Memory Usage
Monitor Redis memory with many concurrent agents:
// Set maxmemory and policy
// redis.conf: maxmemory 1gb
// redis.conf: maxmemory-policy allkeys-lru
// Or use TTL aggressively
const stateStore = new RedisStateStore(redis, { ttl: 3600 });
Network Latency
Each operation is a network round-trip. Batch when possible:
// Multiple sequential calls - multiple round-trips
await stateStore.appendMessages(sessionId, [msg1]);
await stateStore.appendMessages(sessionId, [msg2]);
// Single call - one round-trip
await stateStore.appendMessages(sessionId, [msg1, msg2]);
Next Steps
- In-Memory Storage - For development
- Cloudflare Storage - For edge deployment
- Temporal Runtime - Uses Redis storage