@helix-agents/store-redis
Redis implementations of store interfaces for production use. Provides durable state storage and stream management across processes.
Installation
npm install @helix-agents/store-redis ioredisRedisStateStore
Redis-backed state storage.
import { RedisStateStore } from '@helix-agents/store-redis';
const stateStore = new RedisStateStore({
host: 'localhost',
port: 6379,
password: 'optional',
db: 0,
keyPrefix: 'helix:', // Optional prefix for keys
ttl: 86400 * 7, // Optional TTL in seconds (default: 7 days)
});Configuration Options
interface RedisStateStoreOptions {
// Connection options (ioredis compatible)
host?: string;
port?: number;
password?: string;
db?: number;
// Or provide existing client
client?: Redis;
// Key prefix for namespacing
keyPrefix?: string;
// TTL for state entries (seconds)
ttl?: number;
// Logger
logger?: Logger;
}Methods
Same interface as InMemoryStateStore:
await stateStore.save(state); // runId is inside state object
const state = await stateStore.load(runId);
await stateStore.delete(runId);
await stateStore.updateStatus(runId, status);
await stateStore.appendMessages(runId, messages);
const { messages, hasMore } = await stateStore.getMessages(runId, options);Data Structure
State is stored as Redis hashes:
helix:state:{runId} -> {
runId: string,
agentType: string,
status: string,
stepCount: number,
customState: JSON string,
messages: JSON string,
output: JSON string,
error: string,
...
}RedisStreamManager
Redis-backed stream management using Redis Streams.
import { RedisStreamManager } from '@helix-agents/store-redis';
const streamManager = new RedisStreamManager({
host: 'localhost',
port: 6379,
keyPrefix: 'helix:',
maxStreamLength: 10000, // Max chunks per stream
blockTimeout: 5000, // Read timeout (ms)
});Configuration Options
interface RedisStreamManagerOptions {
// Connection options
host?: string;
port?: number;
password?: string;
db?: number;
client?: Redis;
// Stream options
keyPrefix?: string;
maxStreamLength?: number; // Trim streams to this length
blockTimeout?: number; // XREAD BLOCK timeout
logger?: Logger;
}Methods
Same interface as InMemoryStreamManager:
const writer = await streamManager.createWriter(streamId, agentId, agentType);
const reader = await streamManager.createReader(streamId);
await streamManager.endStream(streamId);
await streamManager.failStream(streamId, error);
const info = await streamManager.getInfo(streamId);Resumable Reader
const reader = await streamManager.createResumableReader(streamId, {
fromSequence: 100,
});
if (reader) {
for await (const { chunk, sequence } of reader) {
// Process chunk
// sequence can be used for Last-Event-ID
}
}Data Structure
Uses Redis Streams:
helix:stream:{streamId} -> XADD entries with:
- chunk: JSON-encoded StreamChunk
- sequence: Auto-incrementing ID
helix:stream:{streamId}:meta -> {
status: 'active' | 'ended' | 'failed',
error?: string,
createdAt: ISO timestamp,
}RedisResumableStorage
Lower-level resumable stream storage.
import { RedisResumableStorage } from '@helix-agents/store-redis';
const storage = new RedisResumableStorage({
client: redisClient,
keyPrefix: 'helix:',
});
// Create stream
await storage.create(streamId);
// Append chunk with sequence
await storage.append(streamId, chunk, sequence);
// Read from sequence
const chunks = await storage.readFrom(streamId, fromSequence, limit);
// Subscribe to new chunks
const unsubscribe = storage.subscribe(streamId, (chunk, sequence) => {
console.log(`New chunk [${sequence}]:`, chunk);
});
// Get metadata
const metadata = await storage.getMetadata(streamId);
// End stream
await storage.end(streamId);Error Classes
import {
SequenceConflictError, // Sequence number conflict
MalformedChunkError, // Invalid chunk data
} from '@helix-agents/store-redis';
try {
await storage.append(streamId, chunk, sequence);
} catch (error) {
if (error instanceof SequenceConflictError) {
// Handle sequence conflict
}
}Usage Example
import { RedisStateStore, RedisStreamManager } from '@helix-agents/store-redis';
import { JSAgentExecutor } from '@helix-agents/runtime-js';
import { VercelAIAdapter } from '@helix-agents/llm-vercel';
// Create Redis-backed stores
const stateStore = new RedisStateStore({
host: process.env.REDIS_HOST,
port: parseInt(process.env.REDIS_PORT || '6379'),
password: process.env.REDIS_PASSWORD,
keyPrefix: 'myapp:agents:',
});
const streamManager = new RedisStreamManager({
host: process.env.REDIS_HOST,
port: parseInt(process.env.REDIS_PORT || '6379'),
password: process.env.REDIS_PASSWORD,
keyPrefix: 'myapp:agents:',
});
// Create executor
const executor = new JSAgentExecutor(stateStore, streamManager, new VercelAIAdapter());
// Execute agent
const handle = await executor.execute(MyAgent, 'Hello');
const result = await handle.result();Production Considerations
Connection Pooling
Use a shared Redis client:
import Redis from 'ioredis';
const redisClient = new Redis({
host: process.env.REDIS_HOST,
maxRetriesPerRequest: 3,
enableReadyCheck: true,
});
const stateStore = new RedisStateStore({ client: redisClient });
const streamManager = new RedisStreamManager({ client: redisClient });Cluster Support
import Redis from 'ioredis';
const cluster = new Redis.Cluster([
{ host: 'node1', port: 6379 },
{ host: 'node2', port: 6379 },
]);
const stateStore = new RedisStateStore({ client: cluster });Error Handling
stateStore.on('error', (error) => {
console.error('Redis state store error:', error);
});
streamManager.on('error', (error) => {
console.error('Redis stream manager error:', error);
});TTL Management
Set appropriate TTLs based on your retention needs:
const stateStore = new RedisStateStore({
ttl: 86400 * 30, // 30 days for state
});RedisUsageStore
Redis-backed usage tracking storage. Track LLM tokens, tool executions, sub-agent calls, and custom metrics with persistence.
import Redis from 'ioredis';
import { RedisUsageStore } from '@helix-agents/store-redis';
const redis = new Redis(process.env.REDIS_URL);
const usageStore = new RedisUsageStore(redis, {
keyPrefix: 'myapp', // Key prefix (default: 'helix')
ttlSeconds: 86400 * 7, // Retention period (default: 24h)
});Configuration Options
interface RedisUsageStoreOptions {
keyPrefix?: string; // Prefix for Redis keys
ttlSeconds?: number; // TTL for usage entries
}Basic Usage
Pass to executor to enable usage tracking:
const handle = await executor.execute(agent, 'Do the task', { usageStore });
await handle.result();
// Get aggregated usage
const rollup = await handle.getUsageRollup();
console.log(`Total tokens: ${rollup?.tokens.total}`);Methods
recordEntry
Record a usage entry (called internally by the framework).
await usageStore.recordEntry({
kind: 'tokens',
runId: 'run-123',
stepCount: 1,
timestamp: Date.now(),
source: { type: 'agent', name: 'my-agent' },
model: 'gpt-4o',
tokens: { prompt: 100, completion: 50, total: 150 },
});getEntries
Get usage entries for a run with optional filtering.
// All entries
const entries = await usageStore.getEntries('run-123');
// Filter by kind
const tokenEntries = await usageStore.getEntries('run-123', {
kinds: ['tokens'],
});
// Filter by step range
const midRunEntries = await usageStore.getEntries('run-123', {
stepRange: { min: 5, max: 10 },
});
// Filter by time range
const recentEntries = await usageStore.getEntries('run-123', {
timeRange: { start: Date.now() - 3600000, end: Date.now() },
});
// Pagination
const page = await usageStore.getEntries('run-123', {
limit: 10,
offset: 20,
});getRollup
Get aggregated usage rollup.
// This agent's usage only
const rollup = await usageStore.getRollup('run-123');
// Include sub-agent usage (lazy aggregation)
const totalRollup = await usageStore.getRollup('run-123', {
includeSubAgents: true,
});exists
Check if usage data exists for a run.
const hasUsage = await usageStore.exists('run-123');delete
Delete usage data for a run.
await usageStore.delete('run-123');getEntryCount
Get entry count without fetching entries.
const count = await usageStore.getEntryCount('run-123');findRunIds
Find all tracked run IDs.
const runIds = await usageStore.findRunIds();Data Structure
Usage entries are stored as JSON in Redis lists:
{keyPrefix}:usage:entries:{runId} -> LIST of JSON entriesEach entry includes:
id- Unique entry identifierkind- Entry type (tokens, tool, subagent, custom)runId- Associated run IDstepCount- Step number when recordedtimestamp- Recording timestampsource- Source information- Kind-specific fields (model, tokens, toolName, etc.)
TTL Management
Entries automatically expire based on ttlSeconds:
const usageStore = new RedisUsageStore(redis, {
ttlSeconds: 86400 * 30, // 30 days retention
});Schemas
Zod schema for stored state:
import { AgentStateSchema } from '@helix-agents/store-redis';
// Validate state
const result = AgentStateSchema.safeParse(data);