Cloudflare Storage

The Cloudflare storage package (@helix-agents/store-cloudflare) provides state and stream storage for edge deployment. There are two sets of stores depending on which runtime approach you use:

| Approach | State Store | Stream Manager | Package |
| --- | --- | --- | --- |
| Workflows | D1StateStore | DurableObjectStreamManager | @helix-agents/store-cloudflare |
| Durable Objects | DOStateStore | DOStreamManager | @helix-agents/runtime-cloudflare |

Choose Based on Runtime

  • Workflows runtime: Use D1StateStore + DurableObjectStreamManager from @helix-agents/store-cloudflare
  • DO runtime: Use DOStateStore + DOStreamManager from @helix-agents/runtime-cloudflare (built into AgentServer)

When to Use

Good fit:

  • Cloudflare Workers deployments
  • Global edge distribution
  • Serverless architecture
  • Existing Cloudflare infrastructure

Not ideal for:

  • Non-Cloudflare environments
  • High write throughput (D1 limitations - Workflows only)
  • Complex queries (limited SQL support)

Installation

bash
npm install @helix-agents/store-cloudflare

Entry Points and Import Patterns

The package provides three entry points to support different deployment contexts:

| Entry Point | Use Case | Contains |
| --- | --- | --- |
| @helix-agents/store-cloudflare | Worker entry points (full package) | Everything |
| @helix-agents/store-cloudflare/d1 | Next.js routes, any Node.js context | D1StateStore, D1UsageStore, migrations |
| @helix-agents/store-cloudflare/stream | Worker entry points only | StreamServer, DurableObjectStreamManager |

Why Split Entry Points?

The /stream entry point imports partyserver which depends on cloudflare:workers - a Cloudflare-specific module that cannot be imported in:

  • Next.js API routes (even with OpenNext for Cloudflare)
  • Node.js environments
  • Any code that runs during Next.js build

This is because:

  1. Build-time evaluation: Next.js evaluates all imports during build, even for server routes
  2. cloudflare:workers is runtime-only: The cloudflare:workers URL scheme only works inside the Cloudflare Workers runtime
  3. Durable Object class definitions vs bindings: You CAN access Durable Objects via env bindings, but you CANNOT import the class definition outside a Worker
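
To make the third point concrete, here is a minimal sketch of reaching a Durable Object purely through its binding. NamespaceLike and StubLike are hypothetical structural stand-ins for the real Durable Object namespace and stub types (the real idFromName returns an ID object, not a string), used so the snippet carries no Cloudflare imports. getStreamStub is an illustrative helper, not part of the package.

```typescript
// Hypothetical structural types: just enough of the namespace API for this sketch.
interface StubLike {
  fetch(input: string): Promise<unknown>;
}
interface NamespaceLike {
  idFromName(name: string): string;
  get(id: string): StubLike;
}

// Resolve a stream's Durable Object from the binding alone. Nothing here imports
// the StreamServer class, so the module is safe to evaluate at Next.js build time.
function getStreamStub(streams: NamespaceLike, streamId: string): StubLike {
  return streams.get(streams.idFromName(streamId));
}
```

Because the helper depends only on the shape of the binding, it can live in code shared between a Next.js route and a Worker.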

Correct Import Patterns

In Next.js API routes (OpenNext):

typescript
// ✅ CORRECT - Use /d1 entry point
import { getCloudflareContext } from '@opennextjs/cloudflare';
import { D1StateStore, runMigration } from '@helix-agents/store-cloudflare/d1';

export async function GET(request: Request) {
  const { env } = getCloudflareContext();
  const stateStore = new D1StateStore({ database: env.DATABASE });
  // Access Durable Objects via binding (NOT import)
  const streamStub = env.STREAMS.get(env.STREAMS.idFromName('my-stream'));
}

// ❌ WRONG - Will fail during Next.js build
import { D1StateStore } from '@helix-agents/store-cloudflare';
import { StreamServer } from '@helix-agents/store-cloudflare/stream';

In Cloudflare Worker entry points:

typescript
// ✅ CORRECT - Worker entry points can import everything
import { StreamServer } from '@helix-agents/store-cloudflare/stream';
import { D1StateStore } from '@helix-agents/store-cloudflare/d1';

// Export Durable Object class for Cloudflare to instantiate
export { StreamServer };

export default {
  async fetch(request: Request, env: Env) {
    // Now you can use both D1 and stream functionality
    const stateStore = new D1StateStore({ database: env.DB });
    // Access StreamServer via binding
    const stub = env.STREAMS.get(env.STREAMS.idFromName('stream-id'));
  },
};

Architecture Pattern for Next.js + Cloudflare

When using OpenNext with Cloudflare:

mermaid
graph TB
    subgraph NextJS ["Next.js App (OpenNext)"]
        subgraph API ["API Routes"]
            A1["Import from @helix-agents/store-cloudflare/d1"]
            A2["Query D1 directly for state/messages"]
            A3["Access workers via env.WORKER_NAME service binding"]
        end
    end

    NextJS -->|"Service Binding"| AgentWorker

    subgraph AgentWorker ["Agent Worker (separate worker)"]
        subgraph Entry ["Entry Point (src/index.ts)"]
            E1["Export StreamServer (Durable Object class)"]
            E2["Import from any entry point"]
            E3["Handle agent execution"]
        end
    end

Prerequisites

You need:

  • Cloudflare account with Workers Paid plan
  • D1 database created
  • Durable Object namespace configured

D1StateStore

Setup

Configure D1 in wrangler.toml:

toml
[[d1_databases]]
binding = "DB"
database_name = "agent-state"
database_id = "your-database-id"

Migrations

D1StateStore uses programmatic migrations. Call runMigration() on startup; it creates or upgrades the tables and is a no-op once the schema is current:

typescript
import { D1StateStore, runMigration } from '@helix-agents/store-cloudflare';

export default {
  async fetch(request: Request, env: Env) {
    // Run migrations on startup (safe to call every request - no-op if already migrated)
    await runMigration(env.DB);

    const stateStore = new D1StateStore({ database: env.DB });
    // ...
  },
};

The framework creates these tables automatically (all prefixed with __agents_). The current schema is V9; migrations apply progressively from V1.

sql
-- Core state table (V1 baseline + V3 user context + V7 HITL columns
--                  + V8 completed_client_tool_calls + V9 suspension_context)
CREATE TABLE IF NOT EXISTS __agents_states (
  session_id TEXT PRIMARY KEY,
  agent_type TEXT NOT NULL,
  parent_session_id TEXT,
  stream_id TEXT NOT NULL,
  step_count INTEGER NOT NULL DEFAULT 0,
  status TEXT NOT NULL DEFAULT 'active',
  output TEXT,
  error TEXT,
  failure_reason TEXT,                  -- V7: cascade discriminator
  aborted INTEGER NOT NULL DEFAULT 0,
  abort_reason TEXT,
  custom_state TEXT NOT NULL DEFAULT '{}',
  user_id TEXT,                         -- V3
  tags TEXT,                            -- V3 (JSON array)
  metadata TEXT,                        -- V3 (JSON object)
  pending_client_tool_calls TEXT,       -- V7: HITL pending map
  completed_client_tool_calls TEXT,     -- V8: idempotency tombstones
  expires_at INTEGER,                   -- V7 (also packed in suspension_context)
  suspension_context TEXT DEFAULT NULL, -- V9: stateless HITL/client-tools
  created_at INTEGER NOT NULL,
  updated_at INTEGER NOT NULL
);

-- Partial index for operator cleanup of abandoned suspended sessions
CREATE INDEX IF NOT EXISTS idx___agents_states_suspension_expires_at
  ON __agents_states(json_extract(suspension_context, '$.expiresAt'))
  WHERE suspension_context IS NOT NULL;

-- Messages table (separated for O(1) append)
CREATE TABLE IF NOT EXISTS __agents_messages (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  session_id TEXT NOT NULL,
  sequence INTEGER NOT NULL,
  message TEXT NOT NULL,
  created_at INTEGER NOT NULL,
  UNIQUE(session_id, sequence)
);

-- Sub-session refs (V2 adds remote_json; V4 adds mode + name for persistent children)
CREATE TABLE IF NOT EXISTS __agents_sub_session_refs (
  session_id TEXT NOT NULL,
  sub_session_id TEXT NOT NULL,
  agent_type TEXT NOT NULL,
  parent_tool_call_id TEXT NOT NULL,
  status TEXT NOT NULL DEFAULT 'active',
  remote_json TEXT,                     -- V2
  mode TEXT,                            -- V4: 'persistent' | 'ephemeral'
  name TEXT,                            -- V4: persistent child name
  started_at INTEGER NOT NULL,
  completed_at INTEGER,
  PRIMARY KEY (session_id, sub_session_id)
);

-- Run tracking (__agents_runs is part of the V1 baseline)
CREATE TABLE IF NOT EXISTS __agents_runs (
  run_id TEXT PRIMARY KEY,
  session_id TEXT NOT NULL,
  turn INTEGER NOT NULL,
  status TEXT NOT NULL,
  step_count INTEGER NOT NULL DEFAULT 0,
  started_at INTEGER NOT NULL,
  completed_at INTEGER
);

suspension_context packs suspendedAwaitingChildren, suspendedStepId, tracingContext, and expiresAt as a single JSON-encoded blob, written and read atomically as part of saveStateAndPromoteStaging.
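
As a rough illustration of that blob, the sketch below round-trips a suspension context through JSON. The field names come from the paragraph above; the field types, the millisecond unit for expiresAt, and the helper names are assumptions made for the example, not the store's actual code.

```typescript
// Field names come from the text above; the field types are assumptions.
interface SuspensionContext {
  suspendedAwaitingChildren?: boolean;
  suspendedStepId?: string;
  tracingContext?: Record<string, string>;
  expiresAt?: number; // epoch milliseconds (assumed unit)
}

// Serialize for the TEXT column; null means "not suspended".
function encodeSuspensionContext(ctx: SuspensionContext | null): string | null {
  return ctx === null ? null : JSON.stringify(ctx);
}

// Parse the column value back, treating NULL as "not suspended".
function decodeSuspensionContext(raw: string | null): SuspensionContext | null {
  return raw === null ? null : (JSON.parse(raw) as SuspensionContext);
}

// True when a suspended session has reached or passed its expiresAt.
function isSuspensionExpired(raw: string | null, now: number): boolean {
  const expiresAt = decodeSuspensionContext(raw)?.expiresAt;
  return expiresAt !== undefined && expiresAt <= now;
}
```

Keeping all four fields in one column means a reader never observes a half-written suspension: the column is either NULL or a complete object.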

Additional tables are created for usage tracking, checkpoints, interrupt flags, and message-merge staging. See d1-migrations.ts for the full schema progression V1 → V9.

Atomic Writes (saveStateAndPromoteStaging)

D1StateStore implements the v7 saveStateAndPromoteStaging primitive using db.batch([...]), which Cloudflare D1 executes as a single auto-commit transaction. The atomic batch:

  1. INSERTs new messages (one row per message)
  2. UPDATEs __agents_states with the new state, version (post-bump), and suspension context
  3. UPDATEs the staging row to mark it promoted with the new checkpoint id
  4. Optional: INSERTs the __agents_checkpoints row when one is needed

This guarantees the cross-runtime invariant C-1: pendingClientToolCalls, clientToolCallOwnership, completed phase-1 messages, the checkpoint, and suspendedStepId all become visible together.
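
The all-or-nothing behaviour can be illustrated without D1. The sketch below is an in-memory model, not the store's code: Db, Op, applyBatch, and promoteOps are hypothetical names, and the ops only mirror the four steps listed above. Every op runs against a copy, and the copy replaces the live data only if no op throws.

```typescript
// In-memory stand-in for the rows the batch touches (shape is an assumption).
interface Db {
  messages: string[];
  state: { status: string; version: number };
  stagingPromoted: boolean;
  checkpoints: string[];
}

type Op = (db: Db) => void;

// Run every op against a copy and return it only if all succeed, mimicking how
// a batch is applied as one transaction.
function applyBatch(db: Db, ops: Op[]): Db {
  const draft: Db = {
    messages: [...db.messages],
    state: { ...db.state },
    stagingPromoted: db.stagingPromoted,
    checkpoints: [...db.checkpoints],
  };
  for (const op of ops) op(draft); // a throw here leaves `db` untouched
  return draft;
}

// The four steps from the list above, with the checkpoint insert optional.
function promoteOps(newMessages: string[], status: string, checkpointId?: string): Op[] {
  const ops: Op[] = [
    (db) => { db.messages.push(...newMessages); },
    (db) => { db.state = { status, version: db.state.version + 1 }; },
    (db) => { db.stagingPromoted = true; },
  ];
  if (checkpointId !== undefined) {
    ops.push((db) => { db.checkpoints.push(checkpointId); });
  }
  return ops;
}
```

A reader that looks at the data before or after applyBatch sees either none of the changes or all of them, which is the property invariant C-1 relies on.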

Usage

typescript
import { D1StateStore } from '@helix-agents/store-cloudflare';

export default {
  async fetch(request: Request, env: Env) {
    const stateStore = new D1StateStore({ database: env.DB });

    // Save state
    await stateStore.saveState(sessionId, sessionState);

    // Load state
    const loaded = await stateStore.loadState('session-123');

    // Delete state
    await stateStore.deleteState('session-123');
  },
};

Atomic Operations

typescript
// Append messages
await stateStore.appendMessages('session-123', [{ role: 'assistant', content: 'Hello!' }]);

// Get messages with pagination
const { messages, hasMore } = await stateStore.getMessages('session-123', { limit: 100 });

// Update state atomically
await stateStore.saveState('session-123', updatedState);

Message Truncation

For crash recovery scenarios:

typescript
// Truncate to specific message count (removes orphaned messages)
await stateStore.truncateMessages('session-123', messageCount);

This removes messages beyond the specified count, used during crash recovery to sync messages with a checkpoint's messageCount field.
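
The effect can be pictured as a filter over the stored rows. This is a sketch of the semantics only: it assumes sequence numbers start at 0 and are contiguous, and truncateToCheckpoint is a hypothetical helper rather than the store's implementation.

```typescript
interface StoredMessage {
  sequence: number; // position within the session (assumed to start at 0)
  message: string;
}

// Keep the first `messageCount` messages and drop anything appended after the
// checkpoint was taken.
function truncateToCheckpoint(messages: StoredMessage[], messageCount: number): StoredMessage[] {
  return messages.filter((m) => m.sequence < messageCount);
}
```

If the checkpoint recorded a messageCount of 2 and the session holds four messages, the last two are treated as orphans from the crashed step and removed.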

Interrupt Flag Operations

D1StateStore, like every other v7 store (memory, Redis, Postgres, DO), implements durable interrupt flags directly on the SessionStateStore interface, enabling soft interruption of running agents.

typescript
// Set interrupt flag (called by executor.interrupt() / POST /chat/{id}/interrupt)
await stateStore.setInterruptFlag('session-123', 'user_requested');

// Check for interrupt (called inside the workflow / DO loop)
const flag = await stateStore.checkInterruptFlag('session-123');
if (flag) {
  console.log(`Interrupted: ${flag.reason}`);
}

// Clear flag (called on resume or after handling)
await stateStore.clearInterruptFlag('session-123');

Durable interrupt parity (v7)

All v7 stores write durable interrupt flags. The previous restriction (only D1 supported InterruptableStateStore) is gone: setInterruptFlag, checkInterruptFlag, and clearInterruptFlag are now part of the base SessionStateStore contract and work uniformly across memory, Redis, Postgres, D1, and DO.

The programmatic migrations create a dedicated table for the interrupt flags:

sql
CREATE TABLE IF NOT EXISTS __agents_interrupt_flags (
  session_id TEXT PRIMARY KEY,
  interrupted INTEGER NOT NULL DEFAULT 0,
  reason TEXT,
  timestamp INTEGER
);

See Interrupt and Resume for complete documentation on interrupt behavior, including sub-agent interrupt propagation.
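
To show how a soft interrupt takes effect, the sketch below checks the flag at each step boundary. InMemoryInterruptFlags is a hypothetical test double that copies the three method names above, and runSteps is an illustrative loop, not the framework's workflow code. The flag shape (an object with an optional reason) is inferred from the flag.reason usage shown earlier.

```typescript
// Minimal slice of the interrupt API described above, backed by a Map.
interface InterruptFlag {
  reason?: string;
}

class InMemoryInterruptFlags {
  private flags = new Map<string, InterruptFlag>();

  async setInterruptFlag(sessionId: string, reason?: string): Promise<void> {
    this.flags.set(sessionId, { reason });
  }
  async checkInterruptFlag(sessionId: string): Promise<InterruptFlag | null> {
    return this.flags.get(sessionId) ?? null;
  }
  async clearInterruptFlag(sessionId: string): Promise<void> {
    this.flags.delete(sessionId);
  }
}

// Run steps in order, checking the flag before each one so an interrupt takes
// effect at the next step boundary rather than mid-step.
async function runSteps(
  store: InMemoryInterruptFlags,
  sessionId: string,
  steps: Array<() => Promise<void>>
): Promise<{ completed: number; interruptedBy?: string }> {
  let completed = 0;
  for (const step of steps) {
    const flag = await store.checkInterruptFlag(sessionId);
    if (flag) {
      return { completed, interruptedBy: flag.reason };
    }
    await step();
    completed += 1;
  }
  return { completed };
}
```

The step that is already running always finishes; only the following step is skipped, which is what makes the interruption "soft".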

Programmatic Migrations

For dynamic environments:

typescript
import { D1StateStore, runMigration } from '@helix-agents/store-cloudflare';

// Run migrations on first request
let migrated = false;

export default {
  async fetch(request: Request, env: Env) {
    if (!migrated) {
      await runMigration(env.DB);
      migrated = true;
    }

    const stateStore = new D1StateStore({ database: env.DB });
    // ...
  },
};
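
One caveat with the boolean flag above: it is set only after the first migration finishes, so requests that arrive in the meantime each call runMigration again. That is harmless because the call is idempotent, but a shared promise avoids the repeated work. The sketch below is one way to do that; ensureMigrated is a hypothetical helper, and run stands in for () => runMigration(env.DB).

```typescript
// Share one in-flight migration between concurrent requests in the same isolate.
let migration: Promise<void> | undefined;

function ensureMigrated(run: () => Promise<void>): Promise<void> {
  if (migration === undefined) {
    migration = run().catch((err: unknown) => {
      migration = undefined; // allow a retry on the next request
      throw err;
    });
  }
  return migration;
}
```

Inside fetch this would be used as await ensureMigrated(() => runMigration(env.DB)). Like the boolean flag, the promise lives in module scope, so each new isolate runs the check once.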

DOStreamManager (Durable Objects)

Setup

Configure Durable Objects in wrangler.toml:

toml
[durable_objects]
bindings = [
  { name = "STREAM_MANAGER", class_name = "StreamManagerDO" }
]

[[migrations]]
tag = "v1"
new_classes = ["StreamManagerDO"]

Durable Object Implementation

typescript
// src/stream-manager-do.ts
import { DurableObject } from 'cloudflare:workers';
import type { StreamChunk } from '@helix-agents/core';

interface StreamState {
  chunks: StreamChunk[];
  status: 'active' | 'ended' | 'failed';
  endedAt?: number;
  error?: string;
}

export class StreamManagerDO extends DurableObject {
  private state: StreamState = {
    chunks: [],
    status: 'active',
  };

  constructor(ctx: DurableObjectState, env: Env) {
    super(ctx, env);
    // Load state from storage
    ctx.blockConcurrencyWhile(async () => {
      const stored = await ctx.storage.get<StreamState>('state');
      if (stored) {
        this.state = stored;
      }
    });
  }

  async write(chunk: StreamChunk): Promise<void> {
    if (this.state.status !== 'active') {
      throw new Error(`Cannot write to ${this.state.status} stream`);
    }

    this.state.chunks.push(chunk);
    await this.ctx.storage.put('state', this.state);

    // Notify WebSocket clients
    for (const ws of this.ctx.getWebSockets()) {
      ws.send(JSON.stringify(chunk));
    }
  }

  async read(fromOffset: number = 0): Promise<StreamChunk[]> {
    return this.state.chunks.slice(fromOffset);
  }

  async end(finalOutput?: unknown): Promise<void> {
    this.state.status = 'ended';
    this.state.endedAt = Date.now();
    await this.ctx.storage.put('state', this.state);

    // Close WebSocket connections
    for (const ws of this.ctx.getWebSockets()) {
      ws.close(1000, 'Stream ended');
    }
  }

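  async fail(error: string): Promise<void> {
    this.state.status = 'failed';
    this.state.error = error;
    await this.ctx.storage.put('state', this.state);

    // WebSocket close reasons are limited to 123 bytes. 40 UTF-16 units encode
    // to at most 120 bytes; the full message stays in this.state.error.
    const reason = error.slice(0, 40);
    for (const ws of this.ctx.getWebSockets()) {
      ws.close(1011, reason);
    }
  }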

  async getInfo(): Promise<{ status: string; chunkCount: number }> {
    return {
      status: this.state.status,
      chunkCount: this.state.chunks.length,
    };
  }

  async fetch(request: Request): Promise<Response> {
    // WebSocket upgrade for real-time streaming
    if (request.headers.get('Upgrade') === 'websocket') {
      const [client, server] = Object.values(new WebSocketPair());
      this.ctx.acceptWebSocket(server);

      // Send historical chunks
      for (const chunk of this.state.chunks) {
        server.send(JSON.stringify(chunk));
      }

      return new Response(null, { status: 101, webSocket: client });
    }

    return new Response('Expected WebSocket', { status: 400 });
  }
}

Stream Manager Wrapper

typescript
// src/do-stream-manager.ts
import type { StreamManager, StreamWriter, StreamReader, StreamChunk } from '@helix-agents/core';

export class DOStreamManager implements StreamManager {
  constructor(private readonly binding: DurableObjectNamespace) {}

  async createWriter(streamId: string, agentId: string, agentType: string): Promise<StreamWriter> {
    const stub = this.getStub(streamId);

    return {
      write: async (chunk: StreamChunk) => {
        await stub.write(chunk);
      },
      close: async () => {
        // Writer close is a no-op - stream continues
      },
    };
  }

  async createReader(streamId: string): Promise<StreamReader | null> {
    const stub = this.getStub(streamId);
    const info = await stub.getInfo();

    if (info.status === 'failed') {
      return null;
    }

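    let offset = 0;
    let done = false;
    // Chunks fetched in one read but not yet handed to the consumer
    const buffer: StreamChunk[] = [];

    return {
      [Symbol.asyncIterator]: () => ({
        next: async (): Promise<IteratorResult<StreamChunk>> => {
          while (true) {
            // Drain chunks from an earlier read before polling again
            const buffered = buffer.shift();
            if (buffered !== undefined) {
              return { done: false, value: buffered };
            }
            if (done) {
              return { done: true, value: undefined };
            }

            // Read status first: a stream that has already closed cannot gain
            // chunks, so an empty read after that means nothing was missed
            const currentInfo = await stub.getInfo();
            const chunks = await stub.read(offset);

            if (chunks.length > 0) {
              offset += chunks.length;
              buffer.push(...chunks);
              continue;
            }

            // Stop on 'ended' and also on 'failed' so the loop cannot spin forever
            if (currentInfo.status !== 'active') {
              done = true;
              continue;
            }

            // Poll for new chunks (in practice, use WebSocket)
            await new Promise((r) => setTimeout(r, 100));
          }
        },
      }),
      close: async () => {},
    };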
  }

  async endStream(streamId: string, finalOutput?: unknown): Promise<void> {
    const stub = this.getStub(streamId);
    await stub.end(finalOutput);
  }

  async failStream(streamId: string, error: string): Promise<void> {
    const stub = this.getStub(streamId);
    await stub.fail(error);
  }

  private getStub(streamId: string) {
    const id = this.binding.idFromName(streamId);
    return this.binding.get(id);
  }
}

Export Durable Object

typescript
// src/index.ts
export { StreamManagerDO } from './stream-manager-do';

Complete Example

typescript
// src/index.ts
import { CloudflareAgentExecutor } from '@helix-agents/runtime-cloudflare';
import { D1StateStore, runMigration } from '@helix-agents/store-cloudflare';
import { DOStreamManager } from './do-stream-manager';
import { StreamManagerDO } from './stream-manager-do';
import { registry } from './agents';

export { StreamManagerDO };

export interface Env {
  DB: D1Database;
  STREAM_MANAGER: DurableObjectNamespace;
  AGENT_WORKFLOW: Workflow;
}

export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    const url = new URL(request.url);

    // Run migrations on startup
    await runMigration(env.DB);

    const stateStore = new D1StateStore({ database: env.DB });
    const streamManager = new DOStreamManager(env.STREAM_MANAGER);

    const executor = new CloudflareAgentExecutor({
      workflowBinding: env.AGENT_WORKFLOW,
      stateStore,
      streamManager,
    });

    // POST /execute
    if (url.pathname === '/execute' && request.method === 'POST') {
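      // request.json() is untyped, so state the expected body shape explicitly
      const { agentType, message, sessionId } = (await request.json()) as {
        agentType: string;
        message: string;
        sessionId?: string;
      };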
      const agent = registry.get(agentType);

      const handle = await executor.execute(agent, { message }, { sessionId });

      return Response.json({
        sessionId: handle.sessionId,
        streamUrl: `/stream/${handle.sessionId}`,
      });
    }

    // GET /stream/:sessionId (SSE)
    if (url.pathname.startsWith('/stream/')) {
      const sessionId = url.pathname.split('/').pop()!;
      const handle = await executor.getHandle(registry.get('default'), sessionId);

      if (!handle) {
        return new Response('Not found', { status: 404 });
      }

      const stream = await handle.stream();
      if (!stream) {
        return new Response('Stream not available', { status: 404 });
      }

      return new Response(
        new ReadableStream({
          async start(controller) {
            const encoder = new TextEncoder();
            for await (const chunk of stream) {
              controller.enqueue(encoder.encode(`data: ${JSON.stringify(chunk)}\n\n`));
            }
            controller.close();
          },
        }),
        {
          headers: {
            'Content-Type': 'text/event-stream',
            'Cache-Control': 'no-cache',
          },
        }
      );
    }

    // GET /result/:sessionId
    if (url.pathname.startsWith('/result/')) {
      const sessionId = url.pathname.split('/').pop()!;
      const handle = await executor.getHandle(registry.get('default'), sessionId);

      if (!handle) {
        return new Response('Not found', { status: 404 });
      }

      const result = await handle.result();
      return Response.json(result);
    }

    return new Response('Not found', { status: 404 });
  },
};

D1 Considerations

Write Limits

D1 has write throughput limits. For high-volume writes:

typescript
// The framework handles message batching internally via appendMessages()
// For custom implementations, batch message inserts like this:
const messages = [msg1, msg2, msg3];
const stmt = env.DB.prepare(
  'INSERT INTO __agents_messages (session_id, sequence, message, created_at) VALUES (?, ?, ?, ?)'
);

await env.DB.batch(
  messages.map((msg, i) => stmt.bind(sessionId, baseSequence + i, JSON.stringify(msg), Date.now()))
);
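
For very large inserts it can help to split the rows into several smaller batches. The helper below is a generic sketch: chunkRows is a hypothetical name, and the group size is a tuning knob chosen by the caller, not a documented D1 constant.

```typescript
// Split rows into fixed-size groups so each db.batch() call stays small.
function chunkRows<T>(rows: T[], size: number): T[][] {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError('size must be a positive integer');
  }
  const groups: T[][] = [];
  for (let i = 0; i < rows.length; i += size) {
    groups.push(rows.slice(i, i + size));
  }
  return groups;
}
```

Each group would then be passed to its own env.DB.batch() call. Note the trade-off: atomicity holds within a group but not across groups, so this suits bulk loads rather than writes that must become visible together.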

Read Replicas

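D1 supports read replication for global reads. Replicas serve only queries issued through the D1 Sessions API (withSession()); all other queries run against the primary:

typescript
// Served by a nearby replica only if the store issues the query through a D1 session
const state = await stateStore.loadState(sessionId);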

// Writes go to primary
await stateStore.saveState(sessionId, state);

SQLite Limitations

D1 is SQLite-based:

  • No stored procedures
  • Limited concurrent writes
  • JSON stored as TEXT (use JSON1 functions)

Durable Object Considerations

Geographic Pinning

Durable Objects are pinned to a region:

typescript
// First access pins the DO to nearest region
const stub = env.STREAM_MANAGER.get(id);

// Subsequent access goes to that region
// For global apps, consider DO per-region
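
Placement can also be influenced at creation time with a location hint. The sketch below maps a continent code, such as the one exposed on request.cf.continent, to one of Cloudflare's location hints. The hint values are Cloudflare's; the mapping itself and the hintForContinent name are assumptions for illustration.

```typescript
// Location hints accepted by Durable Object namespaces.
type LocationHint = 'wnam' | 'enam' | 'sam' | 'weur' | 'eeur' | 'apac' | 'oc' | 'afr' | 'me';

// Map a two-letter continent code to a hint. The mapping is an assumption:
// pick whatever suits your traffic.
function hintForContinent(continent: string | undefined): LocationHint | undefined {
  switch (continent) {
    case 'NA':
      return 'enam';
    case 'SA':
      return 'sam';
    case 'EU':
      return 'weur';
    case 'AS':
      return 'apac';
    case 'OC':
      return 'oc';
    case 'AF':
      return 'afr';
    default:
      return undefined; // no hint: let Cloudflare place the object
  }
}
```

The result would be passed as env.STREAM_MANAGER.get(id, { locationHint }). A hint is best effort and only matters on the first get() for a given ID; once the object exists, it stays where it was created.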

Memory Limits

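Durable Objects have a 128 MB memory limit, so large streams should live in storage rather than only in memory. Batching the storage writes reduces write volume, at the cost of losing unpersisted chunks if the object is evicted: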

typescript
// For large streams, persist to storage
async write(chunk: StreamChunk) {
  this.state.chunks.push(chunk);

  // Persist every N chunks
  if (this.state.chunks.length % 100 === 0) {
    await this.ctx.storage.put('state', this.state);
  }
}
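
Storing every chunk under the single 'state' key also means each write re-serializes the whole array, and the value eventually runs into per-value size limits. One alternative is a key per chunk. The helper below is a sketch of the key scheme only; the chunk: prefix and the 10-digit width are arbitrary choices for the example.

```typescript
// Build a storage key whose lexicographic order matches numeric order, so a
// prefix listing returns chunks in write order.
function chunkKey(index: number): string {
  return `chunk:${String(index).padStart(10, '0')}`;
}
```

A write then becomes this.ctx.storage.put(chunkKey(n), chunk), and a read from an offset becomes this.ctx.storage.list({ prefix: 'chunk:', start: chunkKey(fromOffset) }), which returns entries in ascending key order. Zero padding is what keeps chunk 10 from sorting before chunk 2.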

WebSocket Limits

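Concurrent WebSocket connections per Durable Object are limited:

typescript
// The Hibernation API (acceptWebSocket) allows up to 32,768 connections per DO;
// CPU and memory usually cap the practical number far lower
// For more, use pub/sub pattern or multiple DOs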

Deployment

Create D1 Database

bash
wrangler d1 create agent-state

Run Migrations

bash
wrangler d1 migrations apply agent-state

Deploy

bash
wrangler deploy

Secrets

bash
wrangler secret put OPENAI_API_KEY

DO Runtime Stores

When using the Durable Objects runtime, stores are built into the AgentServer DO:

DOStateStore

Uses the DO's built-in SQLite for state persistence:

typescript
import { DOStateStore } from '@helix-agents/runtime-cloudflare';

// Created internally by AgentServer
const stateStore = new DOStateStore({
  sql: this.sql, // PartyServer's sql tagged template
  logger: this.logger,
});

// Same SessionStateStore interface
await stateStore.saveState(sessionId, state);
const loaded = await stateStore.loadState(sessionId);
await stateStore.appendMessages(sessionId, messages);

Key differences from D1StateStore:

  • Uses synchronous this.sql template literals (not async D1 queries)
  • Single-session-per-DO architecture (no session_id filtering needed)
  • No race conditions (single-threaded DO execution)
  • Tables created automatically on first use

compareAndSetStatus return shape (v7)

The v7 contract returns a discriminated union (replacing the boolean of v6) so callers can branch on conflict without an extra round-trip:

typescript
const result = await stateStore.compareAndSetStatus(
  sessionId,
  ['active'], // expected statuses
  'paused', // new status
  { error: 'user paused', expectedVersion: 7 }
);

if (result.ok) {
  console.log(`Promoted; new version=${result.newVersion}`);
} else {
  console.log(`CAS conflict: stored=${result.currentStatus} v${result.currentVersion}`);
}

Both D1StateStore and DOStateStore implement this signature uniformly with the rest of the v7 stores.
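
The compare-and-set logic itself is small. The sketch below models it over a plain in-memory record to show when each branch of the union is returned. casStatus and SessionRecord are hypothetical names, and bumping the version by exactly one on success is an assumption.

```typescript
// Result shape as shown above; the in-memory record is a hypothetical model.
type CasResult =
  | { ok: true; newVersion: number }
  | { ok: false; currentStatus: string; currentVersion: number };

interface SessionRecord {
  status: string;
  version: number;
}

// Change status only if the stored status is one of `expected` and, when given,
// the stored version matches `expectedVersion`.
function casStatus(
  record: SessionRecord,
  expected: string[],
  next: string,
  expectedVersion?: number
): CasResult {
  const statusOk = expected.includes(record.status);
  const versionOk = expectedVersion === undefined || expectedVersion === record.version;
  if (!statusOk || !versionOk) {
    return { ok: false, currentStatus: record.status, currentVersion: record.version };
  }
  record.status = next;
  record.version += 1;
  return { ok: true, newVersion: record.version };
}
```

Because the failure branch carries the stored status and version, a caller can decide whether to retry or give up without loading the state again.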

DOStateStore Schema V5

The DO-local SQLite schema is at V5 as of the v7 stateless-suspension release (DO_SCHEMA_VERSION = 5). Migrations apply lazily on first DO access — no operator action required, even for upgrades from earlier versions.

| Version | Adds |
| --- | --- |
| V1 | Baseline state, messages, checkpoints, sub-session refs (incl. message_count/stream_sequence on checkpoints) |
| V2 | mode column on sub_session_refs (persistent sub-agent disambiguation) |
| V3 | pending_client_tool_calls, root_session_id, client_tool_call_ownership on state (HITL pending map + cross-session ownership) |
| V4 | completed_client_tool_calls TEXT on state (HITL idempotency tombstones) |
| V5 | suspension_context TEXT DEFAULT NULL on state (stateless HITL: packs suspendedAwaitingChildren, suspendedStepId, tracingContext, expiresAt, failureReason) |

sql
-- V3
ALTER TABLE state ADD COLUMN pending_client_tool_calls TEXT;
ALTER TABLE state ADD COLUMN root_session_id TEXT;
ALTER TABLE state ADD COLUMN client_tool_call_ownership TEXT;
-- V4
ALTER TABLE state ADD COLUMN completed_client_tool_calls TEXT;
-- V5
ALTER TABLE state ADD COLUMN suspension_context TEXT DEFAULT NULL;

suspension_context carries the same JSON-encoded payload as the D1 column: suspendedAwaitingChildren, suspendedStepId, tracingContext, expiresAt.

Atomic Writes via transactionSync

DOStateStore.saveStateAndPromoteStaging uses transactionSync(() => { ... }) (the DO equivalent of db.batch) so that messages, state row updates, and staging promotion become visible together. This satisfies the cross-runtime atomicity invariant C-1 documented in the core SessionStateStore interface.

Lazy Migration

When a DO with schema V<5 wakes up after the v7 upgrade, the next saveState / loadState call detects the older schema_version row and runs the V→5 migrations inline before serving the request. Idle DOs migrate on their next access — there is no SchemaVersionMismatchError and no namespace-rename workaround needed.
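
The inline upgrade amounts to replaying every migration newer than the stored version. The sketch below shows that selection step. The V3 to V5 statements are copied from the SQL above, while the V2 statement is an assumption based on the table's description of that version. pendingStatements and the MIGRATIONS map are hypothetical names.

```typescript
// Version -> statements that bring the schema up to that version.
const MIGRATIONS: Record<number, string[]> = {
  2: ['ALTER TABLE sub_session_refs ADD COLUMN mode TEXT;'], // assumed statement
  3: [
    'ALTER TABLE state ADD COLUMN pending_client_tool_calls TEXT;',
    'ALTER TABLE state ADD COLUMN root_session_id TEXT;',
    'ALTER TABLE state ADD COLUMN client_tool_call_ownership TEXT;',
  ],
  4: ['ALTER TABLE state ADD COLUMN completed_client_tool_calls TEXT;'],
  5: ['ALTER TABLE state ADD COLUMN suspension_context TEXT DEFAULT NULL;'],
};
const TARGET_VERSION = 5;

// Collect, in order, the statements needed to move from currentVersion to V5.
function pendingStatements(currentVersion: number): string[] {
  const out: string[] = [];
  for (let v = currentVersion + 1; v <= TARGET_VERSION; v++) {
    out.push(...(MIGRATIONS[v] ?? []));
  }
  return out;
}
```

A DO already at V5 gets an empty list, so the check costs almost nothing on the hot path.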

DOStreamManager

Streams directly to connected WebSocket/SSE clients:

typescript
import { DOStreamManager } from '@helix-agents/runtime-cloudflare';

// Created internally by AgentServer
const streamManager = new DOStreamManager({
  sql: this.sql,
  getConnections: () => this.getConnections(),
  broadcast: (msg) => this.broadcast(msg),
  sseConnectionManager: this.sseConnectionManager,
  logger: this.logger,
});

// Writes to SQLite AND broadcasts to all connected clients
const writer = await streamManager.createWriter(streamId, agentId, agentType);
await writer.write(chunk); // FREE - no subrequest!

Key differences from DurableObjectStreamManager:

  • Writes directly to WebSocket/SSE connections (no subrequest cost)
  • Also persists to SQLite for resumability
  • Built into AgentServer (not separate DO)
  • Supports hibernation (connections survive DO sleep/wake)

Stream cleanup methods:

typescript
// Clean up chunks beyond a specific step (for crash recovery)
await streamManager.cleanupToStep(streamId, stepCount);

// Reset entire stream (for fresh execution with same ID)
await streamManager.resetStream(streamId);

cleanupToStep() removes chunks with step > stepCount, used during crash recovery. resetStream() clears all chunks, called by execute() for fresh starts.

DOUsageStore

Tracks token/tool/sub-agent usage using DO's SQLite:

typescript
import { DOUsageStore } from '@helix-agents/runtime-cloudflare';

// Created internally by AgentServer
const usageStore = new DOUsageStore({
  sql: this.sql,
  sessionId: 'session-123',
  logger: this.logger,
});

// Same UsageStore interface
await usageStore.recordEntry(entry);
const rollup = await usageStore.getRollup(sessionId);
const entries = await usageStore.getEntries(sessionId, { kinds: ['tokens'] });

Key differences from D1UsageStore:

  • Uses synchronous this.sql template literals (not async D1 queries)
  • Single-session-per-DO architecture (sessionId passed at construction)
  • Created automatically by AgentServer
  • Usage accessible via /usage endpoint on the DO

Automatic usage tracking:

AgentServer automatically creates DOUsageStore and passes it to the JSAgentExecutor. After execution, retrieve usage via the handle:

typescript
const handle = await executor.execute(agent, 'Do the task');
await handle.result();

// Get usage rollup
const rollup = await handle.getUsageRollup();
console.log(`Total tokens: ${rollup?.tokens.total}`);

Automatic Schema

DOStateStore creates these tables automatically:

sql
CREATE TABLE state (
  id INTEGER PRIMARY KEY DEFAULT 1,
  session_id TEXT NOT NULL,
  agent_type TEXT NOT NULL,
  stream_id TEXT NOT NULL,
  status TEXT NOT NULL DEFAULT 'active',
  step_count INTEGER DEFAULT 0,
  custom_state TEXT,
  output TEXT,
  error TEXT,
  checkpoint_id TEXT,
  ...
);

CREATE TABLE messages (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  role TEXT NOT NULL,
  content TEXT,
  tool_calls TEXT,
  ...
);

CREATE TABLE stream_chunks (
  sequence INTEGER PRIMARY KEY,
  chunk TEXT NOT NULL,
  created_at INTEGER NOT NULL
);

Released under the MIT License.