@helix-agents/store-memory

In-memory implementations of the state store, stream manager, and usage store. Useful for development, testing, and single-process deployments.

Installation

bash
npm install @helix-agents/store-memory

InMemoryStateStore

In-memory state storage implementation.

typescript
import { InMemoryStateStore } from '@helix-agents/store-memory';

const stateStore = new InMemoryStateStore();

Methods

save

Save agent state. The runId is taken from inside the state object.

typescript
await stateStore.save(state);
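
The keying rule can be illustrated with a small Map-backed sketch. This is not the package's implementation: `SketchState` and `SketchStateStore` are hypothetical names, the methods are synchronous for brevity, and copying on save and load is an assumed design choice that keeps later mutations of the caller's object out of the stored state.

```typescript
// Hypothetical state shape; the real agent state carries more fields.
interface SketchState {
  runId: string;
  status: string;
  output?: unknown;
}

// Minimal Map-backed store, keyed by the runId found inside the state.
class SketchStateStore {
  private readonly states = new Map<string, SketchState>();

  save(state: SketchState): void {
    // Assumption: store a JSON copy so callers cannot mutate saved state by reference.
    this.states.set(state.runId, JSON.parse(JSON.stringify(state)) as SketchState);
  }

  load(runId: string): SketchState | undefined {
    const stored = this.states.get(runId);
    return stored ? (JSON.parse(JSON.stringify(stored)) as SketchState) : undefined;
  }

  exists(runId: string): boolean {
    return this.states.has(runId);
  }
}

const sketchStore = new SketchStateStore();
const original: SketchState = { runId: 'run-123', status: 'running' };
sketchStore.save(original);
original.status = 'mutated-after-save'; // the stored copy is unaffected
```

Because no separate key is passed to `save`, two states with the same `runId` overwrite each other in this sketch.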

load

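Load agent state by run ID. The example checks the result before using it, since no state may exist for the given ID.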

typescript
const state = await stateStore.load('run-123');

if (state) {
  console.log('Status:', state.status);
  console.log('Output:', state.output);
}

exists

Check if state exists.

typescript
const exists = await stateStore.exists('run-123');

updateStatus

Update agent status.

typescript
await stateStore.updateStatus('run-123', 'completed');

getMessages

Get paginated messages.

typescript
const { messages, hasMore } = await stateStore.getMessages('run-123', {
  offset: 0,
  limit: 50,
});
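
To make the `offset`/`limit`/`hasMore` contract concrete, here is a minimal sketch of offset-based pagination over an in-memory array. `paginate` and `collectAllPages` are hypothetical helpers, and the rule that `hasMore` is true whenever items remain past the current page is an assumption about the store's behavior.

```typescript
interface SketchPage<T> {
  messages: T[];
  hasMore: boolean;
}

// Offset/limit pagination over an in-memory array.
function paginate<T>(all: readonly T[], offset: number, limit: number): SketchPage<T> {
  const messages = all.slice(offset, offset + limit);
  // Assumption: hasMore is true when at least one item lies beyond this page.
  return { messages, hasMore: offset + limit < all.length };
}

// Walk every page until hasMore turns false.
function collectAllPages<T>(all: readonly T[], limit: number): T[] {
  if (limit <= 0) throw new RangeError('limit must be positive');
  const collected: T[] = [];
  let offset = 0;
  let hasMore = true;
  while (hasMore) {
    const page = paginate(all, offset, limit);
    collected.push(...page.messages);
    offset += limit;
    hasMore = page.hasMore;
  }
  return collected;
}

const sampleMessages = ['m0', 'm1', 'm2', 'm3', 'm4'];
const firstPage = paginate(sampleMessages, 0, 2); // m0, m1 with more to come
const lastPage = paginate(sampleMessages, 4, 2); // m4 only, nothing further
```

The same loop shape applies to the real store: keep calling `getMessages` with an increasing `offset` until `hasMore` is false.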

Testing Utilities

The in-memory store provides additional methods for testing:

typescript
// Clear all stored state
stateStore.clear();

// Get all run IDs
const runIds = stateStore.getAllRunIds();

// Get state synchronously (no await needed)
const state = stateStore.getSync('run-123');

InMemoryStreamManager

In-memory stream management implementation.

typescript
import { InMemoryStreamManager } from '@helix-agents/store-memory';

const streamManager = new InMemoryStreamManager();

Methods

createWriter

Create a stream writer. The stream is created implicitly if it does not exist yet.

typescript
const writer = await streamManager.createWriter('stream-123', 'run-123', 'agent-name');

// Write chunks
await writer.write({
  type: 'text_delta',
  delta: 'Hello',
  agentId: 'run-123',
  timestamp: Date.now(),
});

// Close writer when done
await writer.close();

createReader

Create a stream reader.

typescript
const reader = await streamManager.createReader('stream-123');

if (reader) {
  for await (const chunk of reader) {
    console.log(chunk);
  }
}

endStream

End a stream normally with optional output.

typescript
await streamManager.endStream('stream-123');
await streamManager.endStream('stream-123', output); // With final output

failStream

End a stream with an error.

typescript
await streamManager.failStream('stream-123', 'Something went wrong');

getInfo

Get stream information.

typescript
const info = await streamManager.getInfo('stream-123');

if (info) {
  console.log('Status:', info.status); // 'active' | 'ended' | 'failed'
  console.log('Chunk count:', info.chunkCount);
}
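
Since `status` is one of three string literals, callers can handle it with an exhaustive `switch`. The sketch below assumes only the two fields shown above; `SketchStreamInfo` and `describeStream` are hypothetical names, not part of the package.

```typescript
type SketchStreamStatus = 'active' | 'ended' | 'failed';

// Assumed subset of the info object: only the fields used in the example above.
interface SketchStreamInfo {
  status: SketchStreamStatus;
  chunkCount: number;
}

function describeStream(info: SketchStreamInfo | null | undefined): string {
  if (!info) return 'stream not found';
  switch (info.status) {
    case 'active':
      return `still streaming (${info.chunkCount} chunks so far)`;
    case 'ended':
      return `finished with ${info.chunkCount} chunks`;
    case 'failed':
      return `failed after ${info.chunkCount} chunks`;
    default: {
      // Exhaustiveness check: this stops type-checking if a new status is added.
      const unreachable: never = info.status;
      return unreachable;
    }
  }
}

const endedSummary = describeStream({ status: 'ended', chunkCount: 3 });
```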

createResumableReader

Create a reader that can resume from a position.

typescript
const reader = await streamManager.createResumableReader('stream-123', {
  fromSequence: 10, // Resume from sequence 10
});

if (reader) {
  console.log('Status:', reader.status); // 'active' | 'ended' | 'failed'

  for await (const { chunk, sequence } of reader) {
    console.log(`[${sequence}]`, chunk);
  }
}
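
The resume pattern can be sketched without the library: remember the last sequence you processed and ask for the next one when reconnecting. Two assumptions are baked in and should be checked against the real reader: sequences are treated as 0-based positions in the stream, and `fromSequence` is treated as inclusive. `readFrom` and `resumePoint` are hypothetical helpers.

```typescript
interface SequencedChunk<T> {
  chunk: T;
  sequence: number;
}

// Assumptions: sequence = 0-based position in the stream, fromSequence inclusive.
function readFrom<T>(chunks: readonly T[], fromSequence: number): SequencedChunk<T>[] {
  return chunks
    .map((chunk, sequence) => ({ chunk, sequence }))
    .filter((entry) => entry.sequence >= fromSequence);
}

// An interrupted client resumes just past the last sequence it processed.
function resumePoint(lastSeenSequence: number | undefined): number {
  return lastSeenSequence === undefined ? 0 : lastSeenSequence + 1;
}

const storedChunks = ['a', 'b', 'c', 'd'];

// First connection: starts from the beginning and drops after two chunks.
const firstRead = readFrom(storedChunks, resumePoint(undefined)).slice(0, 2);
const lastSeen = firstRead[firstRead.length - 1]?.sequence;

// Second connection: picks up where the first one stopped.
const resumed = readFrom(storedChunks, resumePoint(lastSeen));
```

If the real `fromSequence` turns out to be exclusive, pass the last seen sequence itself instead of adding one.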

Testing Utilities

typescript
// Clear all streams
streamManager.clear();

// Get all stream IDs
const streamIds = streamManager.getAllStreamIds();

// Get stream status synchronously
const status = streamManager.getStatusSync('stream-123');

// Get all chunks for a stream
const chunks = streamManager.getChunksSync('stream-123');
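
A common follow-up in tests is to reassemble the streamed text from the raw chunks. The helper below is a sketch under an assumed chunk shape (a `type` discriminator, with `text_delta` chunks carrying a string `delta`, as in the writer example); `SketchChunk` and `collectText` are hypothetical names.

```typescript
// Assumed chunk shape; chunk types other than text_delta are ignored here.
interface SketchChunk {
  type: string;
  delta?: string;
}

// Concatenate the deltas of all text_delta chunks, in stream order.
function collectText(chunks: readonly SketchChunk[]): string {
  let text = '';
  for (const chunk of chunks) {
    if (chunk.type === 'text_delta' && typeof chunk.delta === 'string') {
      text += chunk.delta;
    }
  }
  return text;
}

const sampleChunks: SketchChunk[] = [
  { type: 'text_delta', delta: 'Hel' },
  { type: 'tool_start' },
  { type: 'text_delta', delta: 'lo' },
];
const assembledText = collectText(sampleChunks);
```

In a test, the array returned by `getChunksSync` would take the place of `sampleChunks`.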

Usage Example

typescript
import { InMemoryStateStore, InMemoryStreamManager } from '@helix-agents/store-memory';
import { JSAgentExecutor } from '@helix-agents/runtime-js';
import { VercelAIAdapter } from '@helix-agents/llm-vercel';

// Create stores
const stateStore = new InMemoryStateStore();
const streamManager = new InMemoryStreamManager();

// Create executor
const executor = new JSAgentExecutor(stateStore, streamManager, new VercelAIAdapter());

// Execute agent
const handle = await executor.execute(MyAgent, 'Hello');
const result = await handle.result();

// Inspect stored state (for debugging)
const state = stateStore.getSync(handle.runId);
const chunks = streamManager.getChunksSync(handle.streamId);

Testing Patterns

Reset Between Tests

typescript
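import { beforeEach, describe, expect, it } from 'vitest';
import { InMemoryStateStore, InMemoryStreamManager } from '@helix-agents/store-memory';
import { JSAgentExecutor } from '@helix-agents/runtime-js';

// MyAgent and mockAdapter are assumed to be defined elsewhere in the test suite.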

describe('MyAgent', () => {
  let stateStore: InMemoryStateStore;
  let streamManager: InMemoryStreamManager;

  beforeEach(() => {
    stateStore = new InMemoryStateStore();
    streamManager = new InMemoryStreamManager();
  });

  it('should complete successfully', async () => {
    const executor = new JSAgentExecutor(stateStore, streamManager, mockAdapter);

    const handle = await executor.execute(MyAgent, 'test');
    const result = await handle.result();

    expect(result.status).toBe('completed');
  });
});

Inspect State After Execution

typescript
it('should update custom state', async () => {
  const handle = await executor.execute(CounterAgent, 'increment 3 times');
  await handle.result();

  const state = stateStore.getSync(handle.runId);
  expect(state?.customState.count).toBe(3);
});

Inspect Stream Chunks

typescript
it('should emit tool events', async () => {
  const handle = await executor.execute(ToolAgent, 'use the search tool');
  await handle.result();

  const chunks = streamManager.getChunksSync(handle.streamId);
  const toolStarts = chunks.filter((c) => c.type === 'tool_start');

  expect(toolStarts).toHaveLength(1);
  expect(toolStarts[0].toolName).toBe('search');
});
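
Depending on the TypeScript version and on how chunk types are declared, a plain `filter` callback may not narrow the chunk type, in which case accessing `toolName` does not type-check. A type guard makes the narrowing explicit. The union below is a hypothetical stand-in for the real chunk types, containing only the two shapes that appear on this page.

```typescript
// Hypothetical chunk union; the real one has more members and fields.
type SketchStreamChunk =
  | { type: 'text_delta'; delta: string }
  | { type: 'tool_start'; toolName: string };

type ToolStartChunk = Extract<SketchStreamChunk, { type: 'tool_start' }>;

// Type guard: tells the compiler which member of the union passed the check.
function isToolStart(chunk: SketchStreamChunk): chunk is ToolStartChunk {
  return chunk.type === 'tool_start';
}

const emitted: SketchStreamChunk[] = [
  { type: 'text_delta', delta: 'Searching' },
  { type: 'tool_start', toolName: 'search' },
];

// filter() with a type guard returns ToolStartChunk[], so toolName is accessible.
const toolStartChunks = emitted.filter(isToolStart);
const toolNames = toolStartChunks.map((chunk) => chunk.toolName);
```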

InMemoryUsageStore

In-memory storage for usage tracking. It records LLM tokens, tool executions, sub-agent calls, and custom metrics.

typescript
import { InMemoryUsageStore } from '@helix-agents/store-memory';

const usageStore = new InMemoryUsageStore();

Basic Usage

Pass the store to the executor to enable usage tracking:

typescript
const handle = await executor.execute(agent, 'Do the task', { usageStore });
await handle.result();

// Get aggregated usage
const rollup = await handle.getUsageRollup();
console.log(`Total tokens: ${rollup?.tokens.total}`);
console.log(`Tool calls: ${rollup?.toolStats.totalCalls}`);

Methods

recordEntry

Record a usage entry (called internally by the framework).

typescript
await usageStore.recordEntry({
  kind: 'tokens',
  runId: 'run-123',
  stepCount: 1,
  timestamp: Date.now(),
  source: { type: 'agent', name: 'my-agent' },
  model: 'gpt-4o',
  tokens: { prompt: 100, completion: 50, total: 150 },
});

getEntries

Get usage entries for a run with optional filtering.

typescript
// All entries
const entries = await usageStore.getEntries('run-123');

// Filter by kind
const tokenEntries = await usageStore.getEntries('run-123', {
  kinds: ['tokens'],
});

// Filter by step range
const midRunEntries = await usageStore.getEntries('run-123', {
  stepRange: { min: 5, max: 10 },
});

// Pagination
const page = await usageStore.getEntries('run-123', {
  limit: 10,
  offset: 20,
});
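
How these options might combine can be sketched as a pure function over an array. This is an illustration, not the store's implementation: it assumes `stepRange` bounds are inclusive and that filters are applied before `offset` and `limit`. `SketchEntry`, `SketchQuery`, `queryEntries`, and the `'tool'` kind are hypothetical.

```typescript
interface SketchEntry {
  kind: string;
  stepCount: number;
}

interface SketchQuery {
  kinds?: string[];
  stepRange?: { min: number; max: number };
  limit?: number;
  offset?: number;
}

// Assumptions: stepRange is inclusive on both ends; filtering runs before pagination.
function queryEntries(entries: readonly SketchEntry[], query: SketchQuery = {}): SketchEntry[] {
  const { kinds, stepRange, limit, offset = 0 } = query;
  const filtered = entries.filter(
    (entry) =>
      (kinds === undefined || kinds.includes(entry.kind)) &&
      (stepRange === undefined ||
        (entry.stepCount >= stepRange.min && entry.stepCount <= stepRange.max)),
  );
  const end = limit === undefined ? undefined : offset + limit;
  return filtered.slice(offset, end);
}

const sketchEntries: SketchEntry[] = [
  { kind: 'tokens', stepCount: 1 },
  { kind: 'tool', stepCount: 2 }, // 'tool' is a made-up kind for illustration
  { kind: 'tokens', stepCount: 6 },
  { kind: 'tokens', stepCount: 11 },
];

const tokenOnly = queryEntries(sketchEntries, { kinds: ['tokens'] });
const midRun = queryEntries(sketchEntries, { stepRange: { min: 5, max: 10 } });
const secondTokenEntry = queryEntries(sketchEntries, { kinds: ['tokens'], limit: 1, offset: 1 });
```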

getRollup

Get aggregated usage rollup.

typescript
// This agent's usage only
const rollup = await usageStore.getRollup('run-123');

// Include sub-agent usage (lazy aggregation)
const totalRollup = await usageStore.getRollup('run-123', {
  includeSubAgents: true,
});
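
Reading "lazy aggregation" as "sub-agent usage is summed when the rollup is requested rather than when entries are recorded", the idea can be sketched as a recursive sum. Everything here is hypothetical: the `SketchRun` shape, the `subAgentRunIds` link, and `rollupTokens` itself. Only the `prompt`/`completion`/`total` token fields come from the `recordEntry` example.

```typescript
interface TokenCounts {
  prompt: number;
  completion: number;
  total: number;
}

interface SketchRun {
  tokenEntries: TokenCounts[];
  subAgentRunIds: string[]; // hypothetical link from a run to its sub-agent runs
}

function addTokens(a: TokenCounts, b: TokenCounts): TokenCounts {
  return {
    prompt: a.prompt + b.prompt,
    completion: a.completion + b.completion,
    total: a.total + b.total,
  };
}

// Sum a run's own token entries; optionally recurse into sub-agent runs at query time.
// Assumes the run hierarchy is a tree (no cycles).
function rollupTokens(
  runs: Map<string, SketchRun>,
  runId: string,
  includeSubAgents = false,
): TokenCounts {
  const zero: TokenCounts = { prompt: 0, completion: 0, total: 0 };
  const run = runs.get(runId);
  if (!run) return zero;
  let sum = run.tokenEntries.reduce(addTokens, zero);
  if (includeSubAgents) {
    for (const childId of run.subAgentRunIds) {
      sum = addTokens(sum, rollupTokens(runs, childId, true));
    }
  }
  return sum;
}

const sketchRuns = new Map<string, SketchRun>([
  ['run-123', { tokenEntries: [{ prompt: 100, completion: 50, total: 150 }], subAgentRunIds: ['run-child'] }],
  ['run-child', { tokenEntries: [{ prompt: 10, completion: 5, total: 15 }], subAgentRunIds: [] }],
]);

const ownUsage = rollupTokens(sketchRuns, 'run-123');
const treeUsage = rollupTokens(sketchRuns, 'run-123', true);
```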

exists

Check if usage data exists for a run.

typescript
const hasUsage = await usageStore.exists('run-123');

delete

Delete usage data for a run.

typescript
await usageStore.delete('run-123');

Testing Utilities

typescript
// Clear all usage data
usageStore.clear();

// Get all tracked run IDs
const runIds = usageStore.getAllRunIds();

// Get counts
console.log('Tracked runs:', usageStore.size);
console.log('Total entries:', usageStore.totalEntryCount);

Testing Patterns

typescript
import { beforeEach, describe, it, expect } from 'vitest';

describe('MyAgent usage', () => {
  let usageStore: InMemoryUsageStore;

  beforeEach(() => {
    usageStore = new InMemoryUsageStore();
  });

  it('should track tool usage', async () => {
    const handle = await executor.execute(agent, 'Use the search tool', {
      usageStore,
    });
    await handle.result();

    const rollup = await handle.getUsageRollup();
    expect(rollup?.toolStats.totalCalls).toBeGreaterThan(0);
    expect(rollup?.toolStats.byTool['search']).toBeDefined();
  });

  it('should track custom metrics', async () => {
    const handle = await executor.execute(agent, 'Search for something', {
      usageStore,
    });
    await handle.result();

    const rollup = await handle.getUsageRollup();
    expect(rollup?.custom['api_calls']['tavily']).toBe(1);
  });
});

Limitations

  • No persistence - Data is lost when the process exits
  • No sharing - State cannot be shared between processes
  • Memory bound - All data is held in process memory, so large states consume memory
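
Because nothing survives a process exit, it can be handy during debugging to dump the stored state to a string before shutdown. The sketch below relies only on the two testing utilities described earlier and makes assumptions about them: that `getAllRunIds()` returns an array of strings and that the state returned by `getSync()` is JSON-serializable. `InspectableStore`, `snapshotStates`, and `fakeStore` are hypothetical.

```typescript
// Structural view of the two testing utilities used here (assumed signatures).
interface InspectableStore {
  getAllRunIds(): string[];
  getSync(runId: string): unknown;
}

// Serialize every stored state into one JSON document keyed by run ID.
function snapshotStates(store: InspectableStore): string {
  const snapshot: Record<string, unknown> = {};
  for (const runId of store.getAllRunIds()) {
    snapshot[runId] = store.getSync(runId);
  }
  return JSON.stringify(snapshot, null, 2);
}

// Stand-in store for demonstration; an InMemoryStateStore would be passed in practice.
const fakeStates: Record<string, unknown> = {
  'run-1': { runId: 'run-1', status: 'completed' },
  'run-2': { runId: 'run-2', status: 'running' },
};
const fakeStore: InspectableStore = {
  getAllRunIds: () => Object.keys(fakeStates),
  getSync: (runId) => fakeStates[runId],
};

const snapshotJson = snapshotStates(fakeStore);
```

This is a debugging aid, not a substitute for a persistent store: it captures state at one moment and does nothing for sharing between processes.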

For production use that requires persistence, use a persistent store implementation instead.

Released under the MIT License.