@helix-agents/store-redis
Redis implementations of store interfaces for production use. Provides durable state storage and stream management across processes.
Installation
npm install @helix-agents/store-redis ioredis
RedisStateStore
Redis-backed state storage.
import { RedisStateStore } from '@helix-agents/store-redis';
const stateStore = new RedisStateStore({
  host: 'localhost',
  port: 6379,
  password: 'optional',
  db: 0,
  keyPrefix: 'helix:', // Optional prefix for keys
  ttl: 86400 * 7, // Optional TTL in seconds (default: 7 days)
});
Configuration Options
interface RedisStateStoreOptions {
  // Connection options (ioredis compatible)
  host?: string;
  port?: number;
  password?: string;
  db?: number;
  // Or provide existing client
  client?: Redis;
  // Key prefix for namespacing
  keyPrefix?: string;
  // TTL for state entries (seconds)
  ttl?: number;
  // Logger
  logger?: Logger;
}
Methods
Same interface as InMemoryStateStore:
await stateStore.saveState(sessionId, state);
const state = await stateStore.loadState(sessionId);
await stateStore.deleteSession(sessionId);
await stateStore.updateStatus(sessionId, status);
await stateStore.appendMessages(sessionId, messages);
const { messages, hasMore } = await stateStore.getMessages(sessionId, options);
Checkpoint Methods
// Get a specific checkpoint
const checkpoint = await stateStore.getCheckpoint('session-123', 'cpv1-session-123-s5-...');
// Get most recent checkpoint
const latest = await stateStore.getLatestCheckpoint('session-123');
// List checkpoints with pagination
const result = await stateStore.listCheckpoints('session-123', {
  limit: 10,
  cursor: 'next-page-cursor',
});
Staging Methods
// Stage changes for a tool call within a step
await stateStore.stageChanges('session-123', 'step-1', {
  toolCallId: 'call-456',
  patches: [{ op: 'add', path: '/notes/-', value: 'New finding' }],
  mergeChanges: { notes: ['New finding'] },
  timestamp: Date.now(),
});
// Commit staged changes
await stateStore.promoteStaging('session-123', 'step-1');
// Rollback staged changes
await stateStore.discardStaging('session-123', 'step-1');
Distributed Coordination
// Atomic compare-and-set for status
const success = await stateStore.compareAndSetStatus(
  'session-123',
  ['running'], // Expected statuses (array)
  'interrupted' // New status
);
// Increment resume count (for tracking retries)
const count = await stateStore.incrementResumeCount('session-123');
Data Structure
State is stored as Redis hashes:
helix:session:{sessionId} -> {
  sessionId: string,
  agentType: string,
  status: string,
  stepCount: number,
  customState: JSON string,
  messages: JSON string,
  output: JSON string,
  error: string,
  ...
}
RedisStreamManager
Redis-backed stream management using Redis Streams.
import { RedisStreamManager } from '@helix-agents/store-redis';
const streamManager = new RedisStreamManager({
  host: 'localhost',
  port: 6379,
  keyPrefix: 'helix:',
  maxStreamLength: 10000, // Max chunks per stream
  blockTimeout: 5000, // Read timeout (ms)
});
Configuration Options
interface RedisStreamManagerOptions {
  // Connection options
  host?: string;
  port?: number;
  password?: string;
  db?: number;
  client?: Redis;
  // Stream options
  keyPrefix?: string;
  maxStreamLength?: number; // Trim streams to this length
  blockTimeout?: number; // XREAD BLOCK timeout (ms)
  logger?: Logger;
}
Methods
Same interface as InMemoryStreamManager:
const writer = await streamManager.createWriter(streamId, agentId, agentType);
const reader = await streamManager.createReader(streamId);
await streamManager.endStream(streamId);
await streamManager.failStream(streamId, error);
const info = await streamManager.getInfo(streamId);
Resumable Reader
const reader = await streamManager.createResumableReader(streamId, {
  fromSequence: 100,
});
if (reader) {
  for await (const { chunk, sequence } of reader) {
    // Process chunk
    // sequence can be used for Last-Event-ID
  }
}
Data Structure
Uses Redis Streams:
helix:stream:{streamId} -> XADD entries with:
  - chunk: JSON-encoded StreamChunk
  - sequence: Auto-incrementing ID
helix:stream:{streamId}:meta -> {
  status: 'active' | 'ended' | 'failed',
  error?: string,
  createdAt: ISO timestamp,
}
RedisResumableStorage
Lower-level resumable stream storage.
import { RedisResumableStorage } from '@helix-agents/store-redis';
const storage = new RedisResumableStorage({
  client: redisClient,
  keyPrefix: 'helix:',
});
// Create stream
await storage.create(streamId);
// Append chunk with sequence
await storage.append(streamId, chunk, sequence);
// Read from sequence
const chunks = await storage.readFrom(streamId, fromSequence, limit);
// Subscribe to new chunks
const unsubscribe = storage.subscribe(streamId, (chunk, sequence) => {
  console.log(`New chunk [${sequence}]:`, chunk);
});
// Get metadata
const metadata = await storage.getMetadata(streamId);
// End stream
await storage.end(streamId);
Error Classes
import {
  SequenceConflictError, // Sequence number conflict
  MalformedChunkError, // Invalid chunk data
} from '@helix-agents/store-redis';
try {
  await storage.append(streamId, chunk, sequence);
} catch (error) {
  if (error instanceof SequenceConflictError) {
    // Handle sequence conflict
  }
}
Usage Example
import { RedisStateStore, RedisStreamManager } from '@helix-agents/store-redis';
import { JSAgentExecutor } from '@helix-agents/runtime-js';
import { VercelAIAdapter } from '@helix-agents/llm-vercel';
// Create Redis-backed stores
const stateStore = new RedisStateStore({
  host: process.env.REDIS_HOST,
  port: parseInt(process.env.REDIS_PORT || '6379'),
  password: process.env.REDIS_PASSWORD,
  keyPrefix: 'myapp:agents:',
});
const streamManager = new RedisStreamManager({
  host: process.env.REDIS_HOST,
  port: parseInt(process.env.REDIS_PORT || '6379'),
  password: process.env.REDIS_PASSWORD,
  keyPrefix: 'myapp:agents:',
});
// Create executor
const executor = new JSAgentExecutor(stateStore, streamManager, new VercelAIAdapter());
// Execute agent
const handle = await executor.execute(MyAgent, 'Hello', { sessionId: 'my-session-1' });
const result = await handle.result();
Production Considerations
Connection Pooling
Use a shared Redis client:
import Redis from 'ioredis';
const redisClient = new Redis({
  host: process.env.REDIS_HOST,
  maxRetriesPerRequest: 3,
  enableReadyCheck: true,
});
const stateStore = new RedisStateStore({ client: redisClient });
const streamManager = new RedisStreamManager({ client: redisClient });
Cluster Support
import Redis from 'ioredis';
const cluster = new Redis.Cluster([
  { host: 'node1', port: 6379 },
  { host: 'node2', port: 6379 },
]);
const stateStore = new RedisStateStore({ client: cluster });
Error Handling
stateStore.on('error', (error) => {
  console.error('Redis state store error:', error);
});
streamManager.on('error', (error) => {
  console.error('Redis stream manager error:', error);
});
TTL Management
Set appropriate TTLs based on your retention needs:
const stateStore = new RedisStateStore({
  ttl: 86400 * 30, // 30 days for state
});
RedisUsageStore
Redis-backed usage tracking storage. Track LLM tokens, tool executions, sub-agent calls, and custom metrics with persistence.
import Redis from 'ioredis';
import { RedisUsageStore } from '@helix-agents/store-redis';
const redis = new Redis(process.env.REDIS_URL);
const usageStore = new RedisUsageStore(redis, {
  keyPrefix: 'myapp', // Key prefix (default: 'helix')
  ttlSeconds: 86400 * 7, // Retention period (default: 24h)
});
Configuration Options
interface RedisUsageStoreOptions {
  keyPrefix?: string; // Prefix for Redis keys
  ttlSeconds?: number; // TTL for usage entries
}
Basic Usage
Pass to executor to enable usage tracking:
const handle = await executor.execute(agent, 'Do the task', { usageStore });
await handle.result();
// Get aggregated usage
const rollup = await handle.getUsageRollup();
console.log(`Total tokens: ${rollup?.tokens.total}`);
Methods
recordEntry
Record a usage entry (called internally by the framework).
await usageStore.recordEntry({
  kind: 'tokens',
  sessionId: 'session-123',
  stepCount: 1,
  timestamp: Date.now(),
  source: { type: 'agent', name: 'my-agent' },
  model: 'gpt-4o',
  tokens: { prompt: 100, completion: 50, total: 150 },
});
getEntries
Get usage entries for a run with optional filtering.
// All entries
const entries = await usageStore.getEntries('session-123');
// Filter by kind
const tokenEntries = await usageStore.getEntries('session-123', {
  kinds: ['tokens'],
});
// Filter by step range
const midRunEntries = await usageStore.getEntries('session-123', {
  stepRange: { min: 5, max: 10 },
});
// Filter by time range
const recentEntries = await usageStore.getEntries('session-123', {
  timeRange: { start: Date.now() - 3600000, end: Date.now() },
});
// Pagination
const page = await usageStore.getEntries('session-123', {
  limit: 10,
  offset: 20,
});
getRollup
Get aggregated usage rollup.
// This agent's usage only
const rollup = await usageStore.getRollup('session-123');
// Include sub-agent usage (lazy aggregation)
const totalRollup = await usageStore.getRollup('session-123', {
  includeSubAgents: true,
});
exists
Check if usage data exists for a run.
const hasUsage = await usageStore.exists('session-123');
delete
Delete usage data for a run.
await usageStore.delete('session-123');
getEntryCount
Get entry count without fetching entries.
const count = await usageStore.getEntryCount('session-123');
findSessionIds
Find all tracked session IDs.
const sessionIds = await usageStore.findSessionIds();
Data Structure
Usage entries are stored as JSON in Redis lists:
{keyPrefix}:usage:entries:{sessionId} -> LIST of JSON entries
Each entry includes:
- id - Unique entry identifier
- kind - Entry type (tokens, tool, subagent, custom)
- runId - Associated run ID
- stepCount - Step number when recorded
- timestamp - Recording timestamp
- source - Source information
- Kind-specific fields (model, tokens, toolName, etc.)
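Because each entry is a JSON string in a list, a rollup amounts to a fold over the parsed entries. The sketch below only illustrates that idea and is not the package's implementation. The `UsageEntry` shape is reduced to fields named in this document, and `entriesKey` and `sumTokens` are hypothetical helpers introduced for the example.

```typescript
// Reduced, assumed entry shape: only fields that appear in this document.
interface UsageEntry {
  kind: 'tokens' | 'tool' | 'subagent' | 'custom';
  stepCount: number;
  timestamp: number;
  tokens?: { prompt: number; completion: number; total: number };
}

interface TokenTotals {
  prompt: number;
  completion: number;
  total: number;
}

// Hypothetical helper: builds the list key following the layout shown above.
function entriesKey(keyPrefix: string, sessionId: string): string {
  return `${keyPrefix}:usage:entries:${sessionId}`;
}

// Hypothetical helper: parses raw list items (JSON strings) and sums the
// token counts of every 'tokens' entry, ignoring all other kinds.
function sumTokens(rawItems: string[]): TokenTotals {
  const totals: TokenTotals = { prompt: 0, completion: 0, total: 0 };
  for (const raw of rawItems) {
    const entry = JSON.parse(raw) as UsageEntry;
    if (entry.kind !== 'tokens' || !entry.tokens) continue;
    totals.prompt += entry.tokens.prompt;
    totals.completion += entry.tokens.completion;
    totals.total += entry.tokens.total;
  }
  return totals;
}

// Sample data standing in for what a list read might return.
const sampleItems: string[] = [
  JSON.stringify({ kind: 'tokens', stepCount: 1, timestamp: 1, tokens: { prompt: 100, completion: 50, total: 150 } }),
  JSON.stringify({ kind: 'tool', stepCount: 1, timestamp: 2 }),
  JSON.stringify({ kind: 'tokens', stepCount: 2, timestamp: 3, tokens: { prompt: 20, completion: 10, total: 30 } }),
];

const sampleTotals = sumTokens(sampleItems);
console.log(entriesKey('myapp', 'session-123'), sampleTotals);
```

In practice `getRollup` should be preferred. A manual fold like this is mainly useful for understanding the stored layout or for ad hoc inspection.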
TTL Management
Entries automatically expire based on ttlSeconds:
const usageStore = new RedisUsageStore(redis, {
  ttlSeconds: 86400 * 30, // 30 days retention
});
RedisLockManager
Distributed lock manager using Redis. Prevents concurrent execution of the same agent across multiple processes.
import Redis from 'ioredis';
import { RedisLockManager } from '@helix-agents/store-redis';
const redis = new Redis(process.env.REDIS_URL);
const lockManager = new RedisLockManager(redis, {
  keyPrefix: 'myapp:locks:', // Optional
  defaultTTLMs: 30000, // Default lock TTL (30s)
});
Configuration Options
interface RedisLockManagerOptions {
  keyPrefix?: string; // Prefix for lock keys
  defaultTTLMs?: number; // Default TTL in milliseconds
}
Methods
acquire
Acquire a lock with a fencing token.
const lock = await lockManager.acquire('session-123', {
  ttlMs: 30000, // Lock TTL (optional, uses default)
  owner: 'executor-1', // Owner identifier
});
if (lock) {
  console.log('Lock acquired');
  console.log('Fencing token:', lock.fencingToken);
  // Do work...
  // Refresh to extend TTL
  await lock.refresh();
  // Release when done
  await lock.release();
} else {
  console.log('Lock held by another process');
}
isLocked
Check if a lock is currently held.
const locked = await lockManager.isLocked('session-123');
Fencing Tokens
Fencing tokens prevent split-brain scenarios:
const lock = await lockManager.acquire('session-123');
if (lock) {
  // Pass fencing token to operations
  await stateStore.save(state, { fencingToken: lock.fencingToken });
  // Operations with stale tokens are rejected
}
Usage with Executor
import { JSAgentExecutor } from '@helix-agents/runtime-js';
import { RedisLockManager } from '@helix-agents/store-redis';
const lockManager = new RedisLockManager(redis);
const executor = new JSAgentExecutor(stateStore, streamManager, llmAdapter, { lockManager });
// Executor automatically acquires/releases locks
const handle = await executor.execute(MyAgent, 'Hello', { sessionId: 'my-session-1' });
Lock Data Structure
Locks are stored as Redis strings with TTL:
{keyPrefix}lock:{sessionId} -> JSON { owner, fencingToken, acquiredAt }
TTL: configured ttlMs
Best Practices
- Set appropriate TTL - Long enough for operations, short enough for quick recovery
- Refresh locks - For long-running operations, periodically refresh
- Handle lock failures - Retry with backoff when lock acquisition fails
- Use fencing tokens - Pass to operations to prevent split-brain
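The retry and refresh practices above can be sketched as a pair of small helpers. This is a minimal illustration under stated assumptions, not part of the package. `LockLike` and `LockManagerLike` are structural types inferred from the `acquire`, `refresh`, and `release` calls shown earlier. `acquireWithBackoff`, `withLock`, and `RetryOptions` are hypothetical names, and the return types of `refresh` and `release` are left as `unknown` because this document does not specify them.

```typescript
// Assumed structural types, inferred from the calls shown in this document.
interface LockLike {
  fencingToken: number | string;
  refresh(): Promise<unknown>;
  release(): Promise<unknown>;
}

interface LockManagerLike {
  acquire(sessionId: string, options?: { ttlMs?: number; owner?: string }): Promise<LockLike | null>;
}

// Hypothetical retry settings for this sketch.
interface RetryOptions {
  maxAttempts?: number; // total acquisition attempts
  baseDelayMs?: number; // delay before the second attempt; doubles each retry
}

const sleep = (ms: number): Promise<void> =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Retry acquisition with exponential backoff; resolves to null if every attempt fails.
async function acquireWithBackoff(
  manager: LockManagerLike,
  sessionId: string,
  ttlMs: number,
  retry: RetryOptions = {}
): Promise<LockLike | null> {
  const maxAttempts = retry.maxAttempts ?? 5;
  const baseDelayMs = retry.baseDelayMs ?? 100;
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    const lock = await manager.acquire(sessionId, { ttlMs });
    if (lock) return lock;
    // Wait only if another attempt will follow.
    if (attempt < maxAttempts - 1) await sleep(baseDelayMs * 2 ** attempt);
  }
  return null;
}

// Run `work` while holding the lock, refreshing at roughly a third of the TTL
// and always releasing afterwards, even if `work` throws.
async function withLock<T>(
  manager: LockManagerLike,
  sessionId: string,
  ttlMs: number,
  work: (lock: LockLike) => Promise<T>,
  retry: RetryOptions = {}
): Promise<T> {
  const lock = await acquireWithBackoff(manager, sessionId, ttlMs, retry);
  if (!lock) throw new Error(`Could not acquire lock for ${sessionId}`);
  const refreshEveryMs = Math.max(1, Math.floor(ttlMs / 3));
  const timer = setInterval(() => {
    // A failed refresh is ignored here; real code would likely abort the work.
    lock.refresh().catch(() => {});
  }, refreshEveryMs);
  try {
    return await work(lock);
  } finally {
    clearInterval(timer);
    await lock.release();
  }
}
```

Refreshing at a fraction of the TTL leaves room for a missed refresh before the lock expires. The one-third ratio is a common rule of thumb rather than anything this library prescribes.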
Schemas
Zod schema for stored state:
import { AgentStateSchema } from '@helix-agents/store-redis';
// Validate state
const result = AgentStateSchema.safeParse(data);
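A `safeParse` result is a discriminated union, so stored data can be checked before use without a try/catch around validation. The sketch below assumes only the standard Zod result shape (`success: true` with `data`, or `success: false` with `error`). `SafeParser` and `parseStoredState` are hypothetical names, and `stubSchema` merely stands in for `AgentStateSchema` so the example runs without Redis or Zod installed.

```typescript
// Structural type mirroring the shape of a Zod safeParse result.
type SafeParseResult<T> =
  | { success: true; data: T }
  | { success: false; error: unknown };

// Anything with a compatible safeParse method; a Zod schema is expected to fit.
interface SafeParser<T> {
  safeParse(data: unknown): SafeParseResult<T>;
}

// Hypothetical helper: decode a JSON string and validate it.
// Returns null when the string is not JSON or fails validation.
function parseStoredState<T>(schema: SafeParser<T>, raw: string): T | null {
  let decoded: unknown;
  try {
    decoded = JSON.parse(raw);
  } catch {
    return null; // not valid JSON
  }
  const result = schema.safeParse(decoded);
  return result.success ? result.data : null;
}

// Stub standing in for AgentStateSchema: accepts objects with a string sessionId.
const stubSchema: SafeParser<{ sessionId: string }> = {
  safeParse(data) {
    if (
      typeof data === 'object' &&
      data !== null &&
      typeof (data as { sessionId?: unknown }).sessionId === 'string'
    ) {
      return { success: true, data: data as { sessionId: string } };
    }
    return { success: false, error: new Error('sessionId must be a string') };
  },
};

const parsed = parseStoredState(stubSchema, '{"sessionId":"session-123"}');
console.log(parsed);
```

Returning `null` keeps the call site simple. Code that needs to report why validation failed would return the `error` field instead.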