@helix-agents/store-memory

In-memory implementations of the state store, stream manager, usage store, and lock manager. Useful for development, testing, and single-process deployments.

Installation

bash
npm install @helix-agents/store-memory

InMemoryStateStore

In-memory state storage implementation.

typescript
import { InMemoryStateStore } from '@helix-agents/store-memory';

const stateStore = new InMemoryStateStore();

Session vs Run Identifiers

  • sessionId: Primary key for all state operations. A session contains all messages, state, and checkpoints.
  • runId: Execution metadata identifying a specific run within a session.

All store methods use sessionId as the primary key for lookups and storage.

Methods

save

Save agent state. The sessionId from the state object is used as the storage key.

typescript
await stateStore.save(state);

load

Load agent state.

typescript
const state = await stateStore.load('session-123');

if (state) {
  console.log('Status:', state.status);
  console.log('Output:', state.output);
}

exists

Check if state exists.

typescript
const exists = await stateStore.exists('session-123');

updateStatus

Update agent status.

typescript
await stateStore.updateStatus('session-123', 'completed');

compareAndSetStatus

Atomically compare and set status (CAS operation).

typescript
const success = await stateStore.compareAndSetStatus(
  'session-123',
  ['running'],    // Expected statuses (array)
  'interrupted'   // New status
);

if (!success) {
  // Status was not in ['running'], update failed
}
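
The compare-and-set contract can be modeled in a few lines. The sketch below is not the library's implementation: `StatusTable` and its internal `Map` are hypothetical names used only to illustrate why the call returns `false` when the current status is not in the expected list.

```typescript
// Hypothetical model of compare-and-set semantics; not the library's code.
type Status = string;

class StatusTable {
  private statuses = new Map<string, Status>();

  set(sessionId: string, status: Status): void {
    this.statuses.set(sessionId, status);
  }

  get(sessionId: string): Status | undefined {
    return this.statuses.get(sessionId);
  }

  // Writes `next` only if the current status is one of `expected`.
  compareAndSet(sessionId: string, expected: Status[], next: Status): boolean {
    const current = this.statuses.get(sessionId);
    if (current === undefined || !expected.includes(current)) {
      return false; // Unknown session or unexpected status: leave state untouched
    }
    this.statuses.set(sessionId, next);
    return true;
  }
}

const table = new StatusTable();
table.set('session-123', 'running');

// The first call sees 'running' and wins; the second sees 'interrupted' and fails.
const first = table.compareAndSet('session-123', ['running'], 'interrupted');
const second = table.compareAndSet('session-123', ['running'], 'completed');
```

Because the check and the write happen in one step, two callers racing to change the same session cannot both succeed.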

getCheckpoint

Get a specific checkpoint.

typescript
const checkpoint = await stateStore.getCheckpoint('session-123', 'cpv1-session-123-s5-...');

if (checkpoint) {
  console.log('State at step:', checkpoint.state.stepCount);
}

getLatestCheckpoint

Get the most recent checkpoint for a session.

typescript
const checkpoint = await stateStore.getLatestCheckpoint('session-123');

listCheckpoints

List checkpoints with pagination.

typescript
const result = await stateStore.listCheckpoints('session-123', {
  limit: 10,
  cursor: 'next-page-cursor',
});

for (const checkpoint of result.items) {
  console.log(`${checkpoint.id}: step ${checkpoint.state.stepCount}`);
}

if (result.nextCursor) {
  // More pages available
}
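
To read every checkpoint, the cursor has to be followed until no `nextCursor` comes back. The helper below is a minimal sketch of that loop. The `Page` shape is inferred from the example above, and `collectAllPages` and `fetchFakePage` are hypothetical names; the fake fetcher stands in for the store so the loop can run on its own.

```typescript
// Page shape assumed from the example above: `items` plus an optional `nextCursor`.
interface Page<T> {
  items: T[];
  nextCursor?: string;
}

type FetchPage<T> = (cursor: string | undefined) => Promise<Page<T>>;

// Follows `nextCursor` until the source reports no further pages.
async function collectAllPages<T>(fetchPage: FetchPage<T>): Promise<T[]> {
  const all: T[] = [];
  let cursor: string | undefined;
  do {
    const page: Page<T> = await fetchPage(cursor);
    all.push(...page.items);
    cursor = page.nextCursor;
  } while (cursor);
  return all;
}

// Hypothetical stand-in for a store: serves five ids, two per page.
const fakeIds = ['cp-1', 'cp-2', 'cp-3', 'cp-4', 'cp-5'];
const fetchFakePage: FetchPage<string> = async (cursor) => {
  const start = cursor === undefined ? 0 : Number(cursor);
  const end = start + 2;
  return {
    items: fakeIds.slice(start, end),
    nextCursor: end < fakeIds.length ? String(end) : undefined,
  };
};
```

With the real store, the callback would presumably be `(cursor) => stateStore.listCheckpoints('session-123', { limit: 10, cursor })`. Whether `cursor: undefined` is accepted for the first page is an assumption.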

stageChanges

Stage changes for a tool call within a step (before commit).

typescript
await stateStore.stageChanges('session-123', 'step-1', {
  toolCallId: 'call-456',
  patches: [{ op: 'add', path: '/notes/-', value: 'New finding' }],
  mergeChanges: { notes: ['New finding'] },
  timestamp: Date.now(),
});

promoteStaging

Commit staged changes to the main state.

typescript
await stateStore.promoteStaging('session-123', 'step-1');

discardStaging

Discard staged changes (rollback).

typescript
await stateStore.discardStaging('session-123', 'step-1');
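
The three staging methods form a small lifecycle: changes are buffered per step, then either applied or dropped. The sketch below models that lifecycle with a hypothetical `StagingModel` class. The shallow merge used in `promote` is an assumption made for illustration, not the library's merge logic.

```typescript
// Hypothetical model of the stage / promote / discard lifecycle.
// The shallow merge below is an assumption, not the library's merge logic.
type State = Record<string, unknown>;

class StagingModel {
  private committed: State = {};
  private staged = new Map<string, State[]>();

  // Buffers a change set for the step without touching committed state.
  stage(stepId: string, changes: State): void {
    const pending = this.staged.get(stepId) ?? [];
    pending.push(changes);
    this.staged.set(stepId, pending);
  }

  // Applies every staged change for the step, in order, then clears the buffer.
  promote(stepId: string): void {
    for (const changes of this.staged.get(stepId) ?? []) {
      this.committed = { ...this.committed, ...changes };
    }
    this.staged.delete(stepId);
  }

  // Drops staged changes without touching committed state.
  discard(stepId: string): void {
    this.staged.delete(stepId);
  }

  snapshot(): State {
    return { ...this.committed };
  }
}

const model = new StagingModel();

model.stage('step-1', { notes: ['New finding'] });
model.discard('step-1');
const afterDiscard = model.snapshot(); // Still empty: the staged change was dropped

model.stage('step-2', { notes: ['Kept finding'] });
model.promote('step-2');
const afterPromote = model.snapshot(); // Now contains the promoted notes
```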

getMessages

Get paginated messages.

typescript
const { messages, hasMore } = await stateStore.getMessages('session-123', {
  offset: 0,
  limit: 50,
});
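
Loading a full history means advancing `offset` until `hasMore` is false. The following is a sketch of that loop under the assumption that the result shape is exactly `{ messages, hasMore }` as shown above. `loadAllMessages` and the fake fetcher are hypothetical and exist only to make the loop runnable.

```typescript
// Result shape assumed from the destructuring in the example above.
interface MessagePage<M> {
  messages: M[];
  hasMore: boolean;
}

type FetchMessages<M> = (opts: { offset: number; limit: number }) => Promise<MessagePage<M>>;

// Advances the offset by the number of messages actually returned.
async function loadAllMessages<M>(fetchMessages: FetchMessages<M>, limit = 50): Promise<M[]> {
  const all: M[] = [];
  let offset = 0;
  for (;;) {
    const page = await fetchMessages({ offset, limit });
    all.push(...page.messages);
    // Stop on the last page, and also on an empty page to avoid looping forever.
    if (!page.hasMore || page.messages.length === 0) break;
    offset += page.messages.length;
  }
  return all;
}

// Hypothetical stand-in for a store holding 120 messages.
const fakeMessages = Array.from({ length: 120 }, (_, i) => `message-${i}`);
let fetchCalls = 0;
const fetchFakeMessages: FetchMessages<string> = async ({ offset, limit }) => {
  fetchCalls += 1;
  return {
    messages: fakeMessages.slice(offset, offset + limit),
    hasMore: offset + limit < fakeMessages.length,
  };
};
```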

Testing Utilities

The in-memory store provides additional methods for testing:

typescript
// Clear all stored state
stateStore.clear();

// Get all session IDs
const sessionIds = stateStore.getAllSessionIds();

// Get state synchronously (no await needed)
const state = stateStore.getSync('session-123');

InMemoryStreamManager

In-memory stream management implementation.

typescript
import { InMemoryStreamManager } from '@helix-agents/store-memory';

const streamManager = new InMemoryStreamManager();

Methods

createWriter

Create a stream writer. The stream is created implicitly if it does not exist yet.

typescript
const writer = await streamManager.createWriter('stream-123', 'run-123', 'agent-name');

// Write chunks
await writer.write({
  type: 'text_delta',
  delta: 'Hello',
  agentId: 'run-123',
  timestamp: Date.now(),
});

// Close writer when done
await writer.close();

createReader

Create a stream reader.

typescript
const reader = await streamManager.createReader('stream-123');

if (reader) {
  for await (const chunk of reader) {
    console.log(chunk);
  }
}
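
A common use of a reader is to reassemble the text an agent produced. The sketch below assumes only what the writer example shows, namely that text arrives in chunks with `type: 'text_delta'` and a `delta` string. `StreamChunk`, `collectText`, and `fakeReader` are hypothetical names, and the fake reader stands in for a real one.

```typescript
// Minimal chunk shape assumed for this sketch; real chunks carry more fields.
interface StreamChunk {
  type: string;
  delta?: string;
}

// Concatenates the `delta` of every text_delta chunk, ignoring other chunk types.
async function collectText(reader: AsyncIterable<StreamChunk>): Promise<string> {
  let text = '';
  for await (const chunk of reader) {
    if (chunk.type === 'text_delta' && typeof chunk.delta === 'string') {
      text += chunk.delta;
    }
  }
  return text;
}

// Hypothetical stand-in for a stream reader.
async function* fakeReader(): AsyncGenerator<StreamChunk> {
  yield { type: 'text_delta', delta: 'Hello' };
  yield { type: 'tool_start' };
  yield { type: 'text_delta', delta: ', world' };
}
```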

endStream

End a stream normally with optional output.

typescript
await streamManager.endStream('stream-123');
await streamManager.endStream('stream-123', output); // With final output

failStream

End a stream with an error.

typescript
await streamManager.failStream('stream-123', 'Something went wrong');

getInfo

Get stream information.

typescript
const info = await streamManager.getInfo('stream-123');

if (info) {
  console.log('Status:', info.status); // 'active' | 'ended' | 'failed'
  console.log('Chunk count:', info.chunkCount);
}

createResumableReader

Create a reader that can resume from a position.

typescript
const reader = await streamManager.createResumableReader('stream-123', {
  fromSequence: 10, // Resume from sequence 10
});

if (reader) {
  console.log('Status:', reader.status); // 'active' | 'ended' | 'failed'

  for await (const { chunk, sequence } of reader) {
    console.log(`[${sequence}]`, chunk);
  }
}
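
When a client reconnects, it needs to know where to resume and must avoid processing a chunk twice. The text above does not say whether `fromSequence` is inclusive, so this sketch de-duplicates by sequence number and works under either reading. `SequenceTracker` is a hypothetical helper, not part of the package.

```typescript
// Hypothetical helper: remembers the highest sequence seen and rejects replays.
class SequenceTracker {
  private lastSeen: number | undefined;

  // Returns true if the sequence is new and should be processed.
  accept(sequence: number): boolean {
    if (this.lastSeen !== undefined && sequence <= this.lastSeen) {
      return false; // Already processed before the reconnect
    }
    this.lastSeen = sequence;
    return true;
  }

  // The value to resume from after a disconnect (undefined if nothing was read).
  get last(): number | undefined {
    return this.lastSeen;
  }
}

const tracker = new SequenceTracker();
const delivered: string[] = [];

// Simulated data: the second connection replays sequence 2 before continuing.
const firstConnection = [
  { chunk: 'a', sequence: 0 },
  { chunk: 'b', sequence: 1 },
  { chunk: 'c', sequence: 2 },
];
const afterReconnect = [
  { chunk: 'c', sequence: 2 },
  { chunk: 'd', sequence: 3 },
];

for (const item of [...firstConnection, ...afterReconnect]) {
  if (tracker.accept(item.sequence)) {
    delivered.push(item.chunk);
  }
}
```

On reconnect, `tracker.last` would be the natural candidate for `fromSequence`, with the tracker filtering any overlap.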

Testing Utilities

typescript
// Clear all streams
streamManager.clear();

// Get all stream IDs
const streamIds = streamManager.getAllStreamIds();

// Get stream status synchronously
const status = streamManager.getStatusSync('stream-123');

// Get all chunks for a stream
const chunks = streamManager.getChunksSync('stream-123');

Usage Example

typescript
import { InMemoryStateStore, InMemoryStreamManager } from '@helix-agents/store-memory';
import { JSAgentExecutor } from '@helix-agents/runtime-js';
import { VercelAIAdapter } from '@helix-agents/llm-vercel';

// Create stores
const stateStore = new InMemoryStateStore();
const streamManager = new InMemoryStreamManager();

// Create executor
const executor = new JSAgentExecutor(stateStore, streamManager, new VercelAIAdapter());

// Execute agent
const handle = await executor.execute(MyAgent, 'Hello');
const result = await handle.result();

// Inspect stored state (for debugging)
const state = stateStore.getSync(handle.sessionId);
const chunks = streamManager.getChunksSync(handle.streamId);

Testing Patterns

Reset Between Tests

typescript
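import { beforeEach, describe, expect, it } from 'vitest';
import { InMemoryStateStore, InMemoryStreamManager } from '@helix-agents/store-memory';
import { JSAgentExecutor } from '@helix-agents/runtime-js';

// MyAgent and mockAdapter are assumed to be defined elsewhere in the test suite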

describe('MyAgent', () => {
  let stateStore: InMemoryStateStore;
  let streamManager: InMemoryStreamManager;

  beforeEach(() => {
    stateStore = new InMemoryStateStore();
    streamManager = new InMemoryStreamManager();
  });

  it('should complete successfully', async () => {
    const executor = new JSAgentExecutor(stateStore, streamManager, mockAdapter);

    const handle = await executor.execute(MyAgent, 'test');
    const result = await handle.result();

    expect(result.status).toBe('completed');
  });
});

Inspect State After Execution

typescript
it('should update custom state', async () => {
  const handle = await executor.execute(CounterAgent, 'increment 3 times');
  await handle.result();

  const state = stateStore.getSync(handle.sessionId);
  expect(state?.customState.count).toBe(3);
});

Inspect Stream Chunks

typescript
it('should emit tool events', async () => {
  const handle = await executor.execute(ToolAgent, 'use the search tool');
  await handle.result();

  const chunks = streamManager.getChunksSync(handle.streamId);
  const toolStarts = chunks.filter((c) => c.type === 'tool_start');

  expect(toolStarts).toHaveLength(1);
  expect(toolStarts[0].toolName).toBe('search');
});

InMemoryUsageStore

In-memory usage tracking storage. Tracks LLM tokens, tool executions, sub-agent calls, and custom metrics.

typescript
import { InMemoryUsageStore } from '@helix-agents/store-memory';

const usageStore = new InMemoryUsageStore();

Basic Usage

Pass the store in the execute options to enable usage tracking:

typescript
const handle = await executor.execute(agent, 'Do the task', { usageStore });
await handle.result();

// Get aggregated usage
const rollup = await handle.getUsageRollup();
console.log(`Total tokens: ${rollup?.tokens.total}`);
console.log(`Tool calls: ${rollup?.toolStats.totalCalls}`);

Methods

recordEntry

Record a usage entry (called internally by the framework).

typescript
await usageStore.recordEntry({
  kind: 'tokens',
  sessionId: 'session-123',
  stepCount: 1,
  timestamp: Date.now(),
  source: { type: 'agent', name: 'my-agent' },
  model: 'gpt-4o',
  tokens: { prompt: 100, completion: 50, total: 150 },
});

getEntries

Get usage entries for a session with optional filtering.

typescript
// All entries
const entries = await usageStore.getEntries('session-123');

// Filter by kind
const tokenEntries = await usageStore.getEntries('session-123', {
  kinds: ['tokens'],
});

// Filter by step range
const midRunEntries = await usageStore.getEntries('session-123', {
  stepRange: { min: 5, max: 10 },
});

// Pagination
const page = await usageStore.getEntries('session-123', {
  limit: 10,
  offset: 20,
});

getRollup

Get aggregated usage rollup.

typescript
// This agent's usage only
const rollup = await usageStore.getRollup('session-123');

// Include sub-agent usage (lazy aggregation)
const totalRollup = await usageStore.getRollup('session-123', {
  includeSubAgents: true,
});
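
Conceptually, a rollup folds individual entries into totals. The sketch below shows that idea for token entries only, using the entry fields from the `recordEntry` example. `sumTokens` is a hypothetical function, and the real rollup covers more than this (tool statistics, custom metrics, sub-agents).

```typescript
// Field names taken from the recordEntry example; other entry fields are omitted.
interface TokenCounts {
  prompt: number;
  completion: number;
  total: number;
}

interface UsageEntry {
  kind: string;
  tokens?: TokenCounts;
}

// Sums token counts across entries, skipping entries of any other kind.
function sumTokens(entries: UsageEntry[]): TokenCounts {
  return entries.reduce<TokenCounts>(
    (acc, entry) => {
      if (entry.kind !== 'tokens' || !entry.tokens) return acc;
      return {
        prompt: acc.prompt + entry.tokens.prompt,
        completion: acc.completion + entry.tokens.completion,
        total: acc.total + entry.tokens.total,
      };
    },
    { prompt: 0, completion: 0, total: 0 },
  );
}

const sampleEntries: UsageEntry[] = [
  { kind: 'tokens', tokens: { prompt: 100, completion: 50, total: 150 } },
  { kind: 'tool' }, // Hypothetical non-token entry, ignored by sumTokens
  { kind: 'tokens', tokens: { prompt: 20, completion: 10, total: 30 } },
];

const tokenTotals = sumTokens(sampleEntries);
```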

exists

Check if usage data exists for a session.

typescript
const hasUsage = usageStore.exists('session-123');

delete

Delete usage data for a session.

typescript
usageStore.delete('session-123');

Testing Utilities

typescript
// Clear all usage data
usageStore.clear();

// Get all tracked session IDs
const sessionIds = usageStore.getAllSessionIds();

// Get counts
console.log('Tracked sessions:', usageStore.size);
console.log('Total entries:', usageStore.totalEntryCount);

Testing Patterns

typescript
import { beforeEach, describe, it, expect } from 'vitest';

describe('MyAgent usage', () => {
  let usageStore: InMemoryUsageStore;

  beforeEach(() => {
    usageStore = new InMemoryUsageStore();
  });

  it('should track tool usage', async () => {
    const handle = await executor.execute(agent, 'Use the search tool', {
      usageStore,
    });
    await handle.result();

    const rollup = await handle.getUsageRollup();
    expect(rollup?.toolStats.totalCalls).toBeGreaterThan(0);
    expect(rollup?.toolStats.byTool['search']).toBeDefined();
  });

  it('should track custom metrics', async () => {
    const handle = await executor.execute(agent, 'Search for something', {
      usageStore,
    });
    await handle.result();

    const rollup = await handle.getUsageRollup();
    expect(rollup?.custom['api_calls']['tavily']).toBe(1);
  });
});

InMemoryLockManager

In-memory lock manager for single-process coordination. Prevents concurrent execution of the same agent run.

typescript
import { InMemoryLockManager } from '@helix-agents/store-memory';

const lockManager = new InMemoryLockManager();

Methods

acquire

Acquire a lock with a fencing token.

typescript
const lock = await lockManager.acquire('session-123', {
  ttlMs: 30000, // Lock expires after 30s
  owner: 'executor-1',
});

if (lock) {
  console.log('Lock acquired with token:', lock.fencingToken);

  // Do work...

  // Optionally refresh to extend TTL
  await lock.refresh();

  // Release when done
  await lock.release();
}
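
A fencing token is useful when the protected resource checks it. If a lock expires and a second holder takes over, the resource can reject late writes from the first holder by comparing tokens. The sketch below illustrates that check. It assumes `fencingToken` is a number that increases with each acquisition, which the text above does not confirm, and `FencedResource` is a hypothetical class.

```typescript
// Hypothetical resource that rejects writes carrying a stale fencing token.
// Assumes tokens are numbers that increase with every lock acquisition.
class FencedResource {
  private highestToken = Number.NEGATIVE_INFINITY;
  private value = '';

  // Accepts the write only if the token is at least as new as any seen so far.
  write(fencingToken: number, value: string): boolean {
    if (fencingToken < this.highestToken) {
      return false; // A newer lock holder has already written
    }
    this.highestToken = fencingToken;
    this.value = value;
    return true;
  }

  read(): string {
    return this.value;
  }
}

const resource = new FencedResource();

const firstWrite = resource.write(1, 'from executor-1');
const secondWrite = resource.write(2, 'from executor-2');
// executor-1 wakes up after its lock expired and tries to write with its old token.
const staleWrite = resource.write(1, 'late write from executor-1');
```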

isLocked

Check if a lock is currently held.

typescript
const locked = await lockManager.isLocked('session-123');

Usage with Executor

typescript
import { JSAgentExecutor } from '@helix-agents/runtime-js';
import { InMemoryLockManager } from '@helix-agents/store-memory';

const executor = new JSAgentExecutor(stateStore, streamManager, llmAdapter, {
  lockManager: new InMemoryLockManager(),
});

Testing Utilities

typescript
// Clear all locks
lockManager.clear();

// Get all active locks
const locks = lockManager.getActiveLocks();

Limitations

  • No persistence - Data is lost when the process exits
  • No sharing - State cannot be shared between processes
  • Memory bound - All data lives in process memory, so large states increase memory usage
  • Single process only - The lock manager does not coordinate across processes

For production use, choose store implementations that provide persistence and distributed coordination.

Released under the MIT License.