Skip to content

@helix-agents/store-redis

Redis implementations of store interfaces for production use. Provides durable state storage and stream management across processes.

Installation

bash
npm install @helix-agents/store-redis ioredis

RedisStateStore

Redis-backed state storage.

typescript
import { RedisStateStore } from '@helix-agents/store-redis';

const stateStore = new RedisStateStore({
  host: 'localhost',
  port: 6379,
  password: 'optional',
  db: 0,
  keyPrefix: 'helix:', // Optional prefix for keys
  ttl: 86400 * 7, // Optional TTL in seconds (default: 7 days)
});

Configuration Options

typescript
interface RedisStateStoreOptions {
  // Connection options (ioredis compatible)
  host?: string;
  port?: number;
  password?: string;
  db?: number;

  // Or provide existing client
  client?: Redis;

  // Key prefix for namespacing
  keyPrefix?: string;

  // TTL for state entries (seconds)
  ttl?: number;

  // Logger
  logger?: Logger;
}

Methods

Same interface as InMemoryStateStore:

typescript
await stateStore.save(state); // runId is inside state object
const state = await stateStore.load(runId);
await stateStore.delete(runId);
await stateStore.updateStatus(runId, status);
await stateStore.appendMessages(runId, messages);
const { messages, hasMore } = await stateStore.getMessages(runId, options);

Data Structure

State is stored as Redis hashes:

helix:state:{runId} -> {
  runId: string,
  agentType: string,
  status: string,
  stepCount: number,
  customState: JSON string,
  messages: JSON string,
  output: JSON string,
  error: string,
  ...
}

RedisStreamManager

Redis-backed stream management using Redis Streams.

typescript
import { RedisStreamManager } from '@helix-agents/store-redis';

const streamManager = new RedisStreamManager({
  host: 'localhost',
  port: 6379,
  keyPrefix: 'helix:',
  maxStreamLength: 10000, // Max chunks per stream
  blockTimeout: 5000, // Read timeout (ms)
});

Configuration Options

typescript
interface RedisStreamManagerOptions {
  // Connection options
  host?: string;
  port?: number;
  password?: string;
  db?: number;
  client?: Redis;

  // Stream options
  keyPrefix?: string;
  maxStreamLength?: number; // Trim streams to this length
  blockTimeout?: number; // XREAD BLOCK timeout
  logger?: Logger;
}

Methods

Same interface as InMemoryStreamManager:

typescript
const writer = await streamManager.createWriter(streamId, agentId, agentType);
const reader = await streamManager.createReader(streamId);
await streamManager.endStream(streamId);
await streamManager.failStream(streamId, error);
const info = await streamManager.getInfo(streamId);

Resumable Reader

typescript
const reader = await streamManager.createResumableReader(streamId, {
  fromSequence: 100,
});

if (reader) {
  for await (const { chunk, sequence } of reader) {
    // Process chunk
    // sequence can be used for Last-Event-ID
  }
}

Data Structure

Uses Redis Streams:

helix:stream:{streamId} -> XADD entries with:
  - chunk: JSON-encoded StreamChunk
  - sequence: Auto-incrementing ID

helix:stream:{streamId}:meta -> {
  status: 'active' | 'ended' | 'failed',
  error?: string,
  createdAt: ISO timestamp,
}

RedisResumableStorage

Lower-level resumable stream storage.

typescript
import { RedisResumableStorage } from '@helix-agents/store-redis';

const storage = new RedisResumableStorage({
  client: redisClient,
  keyPrefix: 'helix:',
});

// Create stream
await storage.create(streamId);

// Append chunk with sequence
await storage.append(streamId, chunk, sequence);

// Read from sequence
const chunks = await storage.readFrom(streamId, fromSequence, limit);

// Subscribe to new chunks
const unsubscribe = storage.subscribe(streamId, (chunk, sequence) => {
  console.log(`New chunk [${sequence}]:`, chunk);
});

// Get metadata
const metadata = await storage.getMetadata(streamId);

// End stream
await storage.end(streamId);

Error Classes

typescript
import {
  SequenceConflictError, // Sequence number conflict
  MalformedChunkError, // Invalid chunk data
} from '@helix-agents/store-redis';

try {
  await storage.append(streamId, chunk, sequence);
} catch (error) {
  if (error instanceof SequenceConflictError) {
    // Handle sequence conflict
  }
}

Usage Example

typescript
import { RedisStateStore, RedisStreamManager } from '@helix-agents/store-redis';
import { JSAgentExecutor } from '@helix-agents/runtime-js';
import { VercelAIAdapter } from '@helix-agents/llm-vercel';

// Create Redis-backed stores
const stateStore = new RedisStateStore({
  host: process.env.REDIS_HOST,
  port: parseInt(process.env.REDIS_PORT || '6379'),
  password: process.env.REDIS_PASSWORD,
  keyPrefix: 'myapp:agents:',
});

const streamManager = new RedisStreamManager({
  host: process.env.REDIS_HOST,
  port: parseInt(process.env.REDIS_PORT || '6379'),
  password: process.env.REDIS_PASSWORD,
  keyPrefix: 'myapp:agents:',
});

// Create executor
const executor = new JSAgentExecutor(stateStore, streamManager, new VercelAIAdapter());

// Execute agent
const handle = await executor.execute(MyAgent, 'Hello');
const result = await handle.result();

Production Considerations

Connection Pooling

Use a shared Redis client:

typescript
import Redis from 'ioredis';

const redisClient = new Redis({
  host: process.env.REDIS_HOST,
  maxRetriesPerRequest: 3,
  enableReadyCheck: true,
});

const stateStore = new RedisStateStore({ client: redisClient });
const streamManager = new RedisStreamManager({ client: redisClient });

Cluster Support

typescript
import Redis from 'ioredis';

const cluster = new Redis.Cluster([
  { host: 'node1', port: 6379 },
  { host: 'node2', port: 6379 },
]);

const stateStore = new RedisStateStore({ client: cluster });

Error Handling

typescript
stateStore.on('error', (error) => {
  console.error('Redis state store error:', error);
});

streamManager.on('error', (error) => {
  console.error('Redis stream manager error:', error);
});

TTL Management

Set appropriate TTLs based on your retention needs:

typescript
const stateStore = new RedisStateStore({
  ttl: 86400 * 30, // 30 days for state
});

RedisUsageStore

Redis-backed usage tracking storage. Track LLM tokens, tool executions, sub-agent calls, and custom metrics with persistence.

typescript
import Redis from 'ioredis';
import { RedisUsageStore } from '@helix-agents/store-redis';

const redis = new Redis(process.env.REDIS_URL);
const usageStore = new RedisUsageStore(redis, {
  keyPrefix: 'myapp',        // Key prefix (default: 'helix')
  ttlSeconds: 86400 * 7,     // Retention period (default: 24h)
});

Configuration Options

typescript
interface RedisUsageStoreOptions {
  keyPrefix?: string;     // Prefix for Redis keys
  ttlSeconds?: number;    // TTL for usage entries
}

Basic Usage

Pass to executor to enable usage tracking:

typescript
const handle = await executor.execute(agent, 'Do the task', { usageStore });
await handle.result();

// Get aggregated usage
const rollup = await handle.getUsageRollup();
console.log(`Total tokens: ${rollup?.tokens.total}`);

Methods

recordEntry

Record a usage entry (called internally by the framework).

typescript
await usageStore.recordEntry({
  kind: 'tokens',
  runId: 'run-123',
  stepCount: 1,
  timestamp: Date.now(),
  source: { type: 'agent', name: 'my-agent' },
  model: 'gpt-4o',
  tokens: { prompt: 100, completion: 50, total: 150 },
});

getEntries

Get usage entries for a run with optional filtering.

typescript
// All entries
const entries = await usageStore.getEntries('run-123');

// Filter by kind
const tokenEntries = await usageStore.getEntries('run-123', {
  kinds: ['tokens'],
});

// Filter by step range
const midRunEntries = await usageStore.getEntries('run-123', {
  stepRange: { min: 5, max: 10 },
});

// Filter by time range
const recentEntries = await usageStore.getEntries('run-123', {
  timeRange: { start: Date.now() - 3600000, end: Date.now() },
});

// Pagination
const page = await usageStore.getEntries('run-123', {
  limit: 10,
  offset: 20,
});

getRollup

Get aggregated usage rollup.

typescript
// This agent's usage only
const rollup = await usageStore.getRollup('run-123');

// Include sub-agent usage (lazy aggregation)
const totalRollup = await usageStore.getRollup('run-123', {
  includeSubAgents: true,
});

exists

Check if usage data exists for a run.

typescript
const hasUsage = await usageStore.exists('run-123');

delete

Delete usage data for a run.

typescript
await usageStore.delete('run-123');

getEntryCount

Get entry count without fetching entries.

typescript
const count = await usageStore.getEntryCount('run-123');

findRunIds

Find all tracked run IDs.

typescript
const runIds = await usageStore.findRunIds();

Data Structure

Usage entries are stored as JSON in Redis lists:

{keyPrefix}:usage:entries:{runId} -> LIST of JSON entries

Each entry includes:

  • id - Unique entry identifier
  • kind - Entry type (tokens, tool, subagent, custom)
  • runId - Associated run ID
  • stepCount - Step number when recorded
  • timestamp - Recording timestamp
  • source - Source information
  • Kind-specific fields (model, tokens, toolName, etc.)

TTL Management

Entries automatically expire based on ttlSeconds:

typescript
const usageStore = new RedisUsageStore(redis, {
  ttlSeconds: 86400 * 30,  // 30 days retention
});

Schemas

Zod schema for stored state:

typescript
import { AgentStateSchema } from '@helix-agents/store-redis';

// Validate state
const result = AgentStateSchema.safeParse(data);

See Also

Released under the MIT License.