Cloudflare Durable Objects Runtime
The Durable Objects (DO) runtime executes agents directly inside a Durable Object, bypassing Cloudflare's subrequest limits by streaming directly to WebSocket and SSE connections.
When to Use DO Runtime
Choose the DO runtime when you need heavy streaming (>100 chunks per execution), real-time WebSocket/SSE connections, or simpler single-DO-per-run architecture. For step-level durability with automatic retries, consider the Workflows runtime instead.
Why Durable Objects?
Cloudflare Workers have a 1000-subrequest limit per invocation. In the Workflows runtime, each stream chunk written to the streaming Durable Object counts as a subrequest, so a streaming-heavy agent (for example, 10 steps emitting 100 chunks each) can exhaust the limit in a single run.
The DO runtime solves this by:
- Hosting agent execution inside the DO: the JSAgentExecutor runs within the Durable Object
- Streaming directly to connections: WebSocket/SSE writes don't count as subrequests
- Using local SQLite: state is stored in the DO's built-in SQLite, not external D1
- Only LLM calls consume subrequests: the only external calls are to the LLM API
Key Benefits
| Feature | Description |
|---|---|
| FREE Streaming | WebSocket/SSE writes don't count against subrequest limits |
| Built-in SQLite | No D1 database needed - state lives in the DO |
| Hibernation | Cost-efficient sleep/wake with state preservation |
| Simpler Architecture | One DO = one session (no cross-DO coordination needed) |
| Native WebSocket/SSE | Real-time bidirectional communication |
Session vs Run Identifiers
- `sessionId`: Primary key for state storage. Each Durable Object hosts one session.
- `runId`: Identifies a specific execution within a session. Multiple runs can occur within one session (e.g., after interrupts).
Use `sessionId` for DO naming (e.g., `` env.AGENTS.idFromName(`session:${sessionId}`) ``). The `runId` is execution metadata for tracing.
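A short sketch of this naming scheme (it calls the `/start` endpoint documented below):

```typescript
// Naming sketch: the same sessionId always routes to the same DO (and its
// SQLite state); each /start on that DO gets a fresh runId internally.
const sessionId = crypto.randomUUID();
const stub = env.AGENTS.get(env.AGENTS.idFromName(`session:${sessionId}`));

// Two runs in one session: both requests hit the same Durable Object.
for (const message of ['Hello, my name is Alice', 'What is my name?']) {
  await stub.fetch('/start', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ agentType: 'researcher', input: { message }, sessionId }),
  });
  // (in practice, wait for the first run to finish before the next /start)
}
```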
Architecture
graph TB
subgraph Edge ["Edge Location"]
subgraph CFWorker ["Cloudflare Worker"]
W1["Routes requests to DOs<br/>Creates LLM adapters<br/>Manages DO stubs"]
end
CFWorker --> AgentDO
subgraph AgentDO ["AgentServer (Durable Object)"]
subgraph Executor ["JSAgentExecutor"]
E1["Agent loop execution<br/>Tool execution<br/>Sub-agent spawning"]
end
subgraph Stores [" "]
direction LR
StateStore["DOStateStore<br/>(SQLite)"]
UsageStore["DOUsageStore<br/>(SQLite)"]
StreamMgr["DOStreamManager<br/>(WebSocket/SSE)"]
end
end
AgentDO --> Clients
subgraph Clients [" "]
direction LR
WS["<b>WebSocket Clients</b><br/>Real-time, survives<br/>hibernation"]
SSE["<b>SSE Clients</b><br/>One-way, simpler"]
end
end
Prerequisites
- Cloudflare account with Workers Paid plan
- Wrangler CLI:
npm install -g wrangler
- No additional services needed (D1, etc.); SQLite is built into the DO
Installation
npm install @helix-agents/runtime-cloudflare @helix-agents/core
The partyserver dependency is included automatically.
Quick Start (Composition API)
The composition API, built around createAgentServer(), is the recommended way to create AgentServer instances.
1. Configure wrangler.toml
name = "my-agent-worker"
main = "src/worker.ts"
compatibility_date = "2024-01-01"
# Durable Object binding with SQLite support
[[durable_objects.bindings]]
name = "AGENTS"
class_name = "MyAgentServer"
# Enable SQLite for the DO class
[[migrations]]
tag = "v1"
new_sqlite_classes = ["MyAgentServer"]
Important
Use `new_sqlite_classes` instead of `new_classes` to enable SQLite storage in the Durable Object. Without it, the DO won't have access to `this.sql`.
2. Create AgentServer with Composition
// src/agent-server.ts
import { createAgentServer, AgentRegistry } from '@helix-agents/runtime-cloudflare';
import { VercelAIAdapter } from '@helix-agents/llm-vercel';
import { createAnthropic } from '@ai-sdk/anthropic';
import { createOpenAI } from '@ai-sdk/openai';
import { myAgent } from './agent.js';
export interface Env {
AGENTS: DurableObjectNamespace;
ANTHROPIC_API_KEY: string;
OPENAI_API_KEY: string;
}
// Create and populate registry
const registry = new AgentRegistry();
registry.register(myAgent);
// Create AgentServer using composition
export const MyAgentServer = createAgentServer<Env>({
// Required: Factory to create LLM adapters
llmAdapter: (env, ctx) => new VercelAIAdapter({
anthropic: createAnthropic({ apiKey: env.ANTHROPIC_API_KEY }),
}),
// Required: Agent registry or resolver function
agents: registry,
// Optional: Lifecycle hooks
hooks: {
beforeStart: async ({ executionState, body }) => {
// "Last message wins" - interrupt existing execution
if (executionState.isExecuting) {
await executionState.interrupt('New message received');
}
console.log(`Starting agent: ${body.agentType}`);
},
afterStart: async ({ sessionId, agentType }) => {
console.log(`Agent ${agentType} started (session: ${sessionId})`);
},
onComplete: async ({ sessionId, status, result, error }) => {
console.log(`Execution complete: ${status}`);
},
},
});
3. Create Worker Entry Point
// src/worker.ts
import { MyAgentServer } from './agent-server.js';
import type { Env } from './agent-server.js';
// Export the AgentServer DO class
export { MyAgentServer };
export default {
async fetch(request: Request, env: Env): Promise<Response> {
const url = new URL(request.url);
// Start new agent execution
if (url.pathname === '/execute' && request.method === 'POST') {
const { message } = await request.json<{ message: string }>();
// Generate unique session ID
const sessionId = crypto.randomUUID();
// Get DO stub for this session
const doId = env.AGENTS.idFromName(`session:${sessionId}`);
const stub = env.AGENTS.get(doId);
// Start execution using the new request format
const response = await stub.fetch('/start', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
agentType: 'researcher', // Agent type from registry
input: { message },
sessionId,
}),
});
const result = await response.json<{ sessionId: string; streamId: string }>();
return Response.json({
sessionId: result.sessionId,
streamUrl: `/stream/${result.sessionId}`,
websocketUrl: `/ws/${result.sessionId}`,
});
}
// SSE streaming endpoint
if (url.pathname.startsWith('/stream/')) {
const sessionId = url.pathname.split('/').pop()!;
const doId = env.AGENTS.idFromName(`session:${sessionId}`);
const stub = env.AGENTS.get(doId);
// Forward to DO's SSE endpoint
return stub.fetch(new URL('/sse', request.url), request);
}
// WebSocket upgrade
if (url.pathname.startsWith('/ws/')) {
const sessionId = url.pathname.split('/').pop()!;
const doId = env.AGENTS.idFromName(`session:${sessionId}`);
const stub = env.AGENTS.get(doId);
// Forward WebSocket upgrade to DO
return stub.fetch(request);
}
// Get status
if (url.pathname.startsWith('/status/')) {
const sessionId = url.pathname.split('/').pop()!;
const doId = env.AGENTS.idFromName(`session:${sessionId}`);
const stub = env.AGENTS.get(doId);
return stub.fetch(new URL('/status', request.url));
}
return new Response('Not found', { status: 404 });
},
};
4. Define Your Agent
// src/agent.ts
import { defineAgent, defineTool } from '@helix-agents/core';
import { z } from 'zod';
const searchTool = defineTool({
name: 'search',
description: 'Search for information',
inputSchema: z.object({
query: z.string(),
}),
execute: async ({ query }) => {
// Your search implementation
return { results: [`Found: ${query}`] };
},
});
export const myAgent = defineAgent({
name: 'researcher',
description: 'Research assistant',
systemPrompt: 'You are a helpful research assistant.',
tools: [searchTool],
outputSchema: z.object({
summary: z.string(),
sources: z.array(z.string()),
}),
maxSteps: 10,
});
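With all four files in place, a minimal browser-side smoke test might look like the sketch below. The WORKER_URL origin is a placeholder; the routes are the ones defined by the worker in step 3:

```typescript
// Browser-side smoke test for the Quick Start worker above.
// WORKER_URL is a placeholder for your deployed worker origin.
const WORKER_URL = 'https://my-agent-worker.example.workers.dev';

async function runOnce(message: string): Promise<string> {
  // Kick off an execution via the worker's /execute route
  const res = await fetch(`${WORKER_URL}/execute`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ message }),
  });
  const { sessionId, streamUrl } = await res.json();

  // Stream results over SSE (event shapes are documented under "Streaming")
  const es = new EventSource(`${WORKER_URL}${streamUrl}`);
  es.onmessage = (event) => {
    const data = JSON.parse(event.data);
    if (data.type === 'chunk') console.log(data.chunk);
    if (data.type === 'end' || data.type === 'fail') es.close();
  };
  return sessionId;
}
```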
AgentServerConfig
The `createAgentServer<TEnv>(config)` factory accepts this configuration:
interface AgentServerConfig<TEnv = unknown> {
/**
* Factory to create LLM adapters.
* Called once per execution with environment and context.
* Required.
*/
llmAdapter: LLMAdapterFactory<TEnv>;
/**
* Agent resolution strategy.
* Can be an AgentRegistry instance or a resolver function.
* Required.
*/
agents: AgentResolverInterface<TEnv> | AgentResolver<TEnv>;
/**
* Optional lifecycle hooks for customization.
*/
hooks?: ServerHooks<TEnv>;
/**
* Optional custom endpoints.
* Keys are paths (e.g., "/my-endpoint"), values are handlers.
* Custom endpoints are checked BEFORE built-in endpoints.
*/
endpoints?: Record<string, EndpointHandler<TEnv>>;
/**
* Optional usage store factory.
* If not provided, uses the DO-local DOUsageStore.
*/
usageStore?: UsageStoreFactory<TEnv>;
/**
* Optional logger.
* If not provided, uses noopLogger.
*/
logger?: Logger;
}
LLMAdapterFactory
Creates LLM adapters with access to environment and execution context:
type LLMAdapterFactory<TEnv> = (
env: TEnv,
context: LLMAdapterContext
) => LLMAdapter;
interface LLMAdapterContext {
sessionId: string;
agentType: string;
runId: string;
}
Example: Per-agent model selection
llmAdapter: (env, ctx) => {
// Choose model based on agent type
if (ctx.agentType === 'fast-responder') {
return new VercelAIAdapter({
openai: createOpenAI({ apiKey: env.OPENAI_API_KEY }),
defaultModel: 'gpt-4o-mini',
});
}
return new VercelAIAdapter({
anthropic: createAnthropic({ apiKey: env.ANTHROPIC_API_KEY }),
defaultModel: 'claude-sonnet-4-20250514',
});
}
AgentRegistry with Factories
The AgentRegistry supports both static registration and factory functions for dynamic agent creation.
Static Registration
For agents that don't need runtime dependencies:
import { AgentRegistry } from '@helix-agents/runtime-cloudflare';
const registry = new AgentRegistry();
registry.register(researchAgent);
registry.register(summarizerAgent);
Factory Registration
For agents that need environment bindings, per-request configuration, or dependency injection:
registry.registerFactory<Env>('brainstorm', (ctx) => {
return createBrainstormAgent({
tavilyApiKey: ctx.env.TAVILY_API_KEY,
database: ctx.env.DATABASE,
userId: ctx.userId,
});
});
AgentFactoryContext
Factory functions receive this context:
interface AgentFactoryContext<TEnv = unknown> {
/** Environment bindings */
env: TEnv;
/** Session ID for the execution */
sessionId: string;
/** Run ID for this execution */
runId: string;
/** Optional user identifier */
userId?: string;
}
Using resolve() vs get()
// get() - Static agents only (legacy, for backward compatibility)
const agent = registry.get('researcher');
// resolve() - Handles both static and factory agents (preferred)
const agent = registry.resolve('brainstorm', {
env,
sessionId: 'session-123',
runId: 'run-456',
userId: 'user-789',
});
Registry Methods
| Method | Description |
|---|---|
| `register(config)` | Register a static agent configuration |
| `registerFactory(type, factory)` | Register a factory function |
| `resolve(type, context)` | Resolve agent (factory-first, then static) |
| `get(type)` | Get static agent only (legacy) |
| `has(type)` | Check if agent type is registered |
| `unregister(type)` | Remove an agent registration |
| `getRegisteredTypes()` | Get all registered agent type names |
| `clear()` | Remove all registrations (for testing) |
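As a quick illustration of `has()`, `getRegisteredTypes()`, and `resolve()` together (the `requestedType` value is illustrative):

```typescript
// Guard resolution and surface what is registered when a lookup would fail.
const requestedType = 'brainstorm';
if (!registry.has(requestedType)) {
  throw new Error(
    `Unknown agent "${requestedType}". Registered: ${registry.getRegisteredTypes().join(', ')}`
  );
}
const agent = registry.resolve(requestedType, {
  env,
  sessionId: 'session-123',
  runId: 'run-456',
});
```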
AgentNotFoundError
When an agent type is not found, AgentNotFoundError provides helpful information:
import { AgentNotFoundError } from '@helix-agents/runtime-cloudflare';
try {
const agent = registry.resolve('unknown', context);
} catch (error) {
if (error instanceof AgentNotFoundError) {
console.log(`Unknown type: ${error.agentType}`);
console.log(`Available: ${error.availableTypes.join(', ')}`);
}
}
ExecutionState
The ExecutionState interface provides hooks with a clean way to check and control execution:
interface ExecutionState {
/** Session ID of current execution, or null if none */
readonly sessionId: string | null;
/** Run ID of current execution, or null if none */
readonly runId: string | null;
/** Whether an execution is currently running */
readonly isExecuting: boolean;
/** Current execution status */
readonly status: 'idle' | 'running' | 'paused' | 'interrupted' | 'completed' | 'failed';
/** Get the current execution handle (if any) */
getHandle<T = unknown>(): AgentExecutionHandle<T> | null;
/** Interrupt the current execution (soft stop, resumable) */
interrupt(reason?: string): Promise<void>;
/** Abort the current execution (hard stop, not resumable) */
abort(reason?: string): Promise<void>;
}Example: "Last message wins" pattern
hooks: {
beforeStart: async ({ executionState }) => {
if (executionState.isExecuting) {
await executionState.interrupt('New message received');
}
},
}
Lifecycle Hooks
All hooks are optional. They enable customization without subclassing.
interface ServerHooks<TEnv = unknown> {
/**
* Called before starting execution.
* Can abort by returning { abort: true, response: Response }.
*/
beforeStart?: (context: BeforeStartContext<TEnv>) => Promise<void | AbortResult>;
/**
* Called after execution handle is created (before it completes).
*/
afterStart?: (context: AfterStartContext<TEnv>) => Promise<void>;
/**
* Called before resuming execution.
* Can abort by returning { abort: true, response: Response }.
*/
beforeResume?: (context: BeforeResumeContext<TEnv>) => Promise<void | AbortResult>;
/**
* Called after resume handle is created.
*/
afterResume?: (context: AfterResumeContext<TEnv>) => Promise<void>;
/**
* Called before retrying execution.
* Can abort by returning { abort: true, response: Response }.
*/
beforeRetry?: (context: BeforeRetryContext<TEnv>) => Promise<void | AbortResult>;
/**
* Called after retry handle is created.
*/
afterRetry?: (context: AfterRetryContext<TEnv>) => Promise<void>;
/**
* Called when execution completes (success, failure, or interrupt).
*/
onComplete?: (context: CompleteContext<TEnv>) => Promise<void>;
/**
* Transform incoming /start request to standard format.
*/
transformRequest?: (rawBody: unknown, env: TEnv) => Promise<StartAgentRequestV2>;
/**
* Transform outgoing /start response.
*/
transformResponse?: (
response: { sessionId: string; streamId: string; status: 'started' | 'resumed' },
context: { sessionId: string; agentType: string }
) => Promise<unknown>;
}
Hook Contexts
BeforeStartContext:
interface BeforeStartContext<TEnv> {
env: TEnv;
request: Request;
body: StartAgentRequestV2;
executionState: ExecutionState;
}
AfterStartContext:
interface AfterStartContext<TEnv> {
env: TEnv;
sessionId: string;
runId: string;
agentType: string;
handle: AgentExecutionHandle<unknown>;
}
CompleteContext:
interface CompleteContext<TEnv> {
env: TEnv;
sessionId: string;
runId: string;
status: 'completed' | 'failed' | 'interrupted';
result?: unknown;
error?: Error;
}
Aborting Requests
Hooks can abort requests by returning an AbortResult:
hooks: {
beforeStart: async ({ body }) => {
if (!body.userId) {
return {
abort: true,
response: Response.json({ error: 'User ID required' }, { status: 401 }),
};
}
},
}
Request/Response Transformation
Transform custom request formats to the standard format:
hooks: {
transformRequest: async (rawBody, env) => {
const body = rawBody as { prompt: string; conversationId: string };
return {
agentType: 'brainstorm',
input: { message: body.prompt },
sessionId: body.conversationId,
};
},
transformResponse: async (response, { sessionId }) => {
return {
id: sessionId,
status: 'processing',
};
},
}
Custom Endpoints
Add custom HTTP endpoints that are checked before built-in endpoints:
export const MyAgentServer = createAgentServer<Env>({
llmAdapter: (env) => new VercelAIAdapter({ /* ... */ }),
agents: registry,
endpoints: {
'/health': async (request, { executionState }) => {
return Response.json({
status: 'healthy',
isExecuting: executionState.isExecuting,
});
},
'/custom-action': async (request, { env, logger }) => {
if (request.method !== 'POST') {
return Response.json({ error: 'Method not allowed' }, { status: 405 });
}
const body = await request.json();
logger.info('Custom action', { body });
return Response.json({ success: true });
},
},
});
EndpointHandler
type EndpointHandler<TEnv> = (
request: Request,
context: EndpointContext<TEnv>
) => Promise<Response>;
interface EndpointContext<TEnv> {
env: TEnv;
executionState: ExecutionState;
logger: Logger;
}
New Request Format
The composition API uses a new request format that takes an agentType instead of the full AgentConfig:
StartAgentRequestV2
interface StartAgentRequestV2 {
/** Agent type to execute (looked up in registry/resolver) */
agentType: string;
/** Input for the agent */
input: {
message: string;
state?: Record<string, unknown>;
messages?: Message[];
};
/** Session identifier (required) */
sessionId: string;
/** Optional user identifier */
userId?: string;
/** Optional tags for the session */
tags?: string[];
/** Optional metadata for the session */
metadata?: Record<string, string>;
}
ResumeAgentRequestV2
interface ResumeAgentRequestV2 {
/** Agent type (for context) */
agentType: string;
/** Resume options */
options: {
mode?: 'continue' | 'retry' | 'branch';
checkpointId?: string;
modifyState?: unknown;
appendMessages?: Message[];
};
}
AgentServer HTTP Endpoints
The AgentServer exposes the following HTTP endpoints:
| Endpoint | Method | Description |
|---|---|---|
| `/start` | POST | Start new agent execution |
| `/resume` | POST | Resume a paused/interrupted agent |
| `/retry` | POST | Retry a failed agent |
| `/interrupt` | POST | Request soft interruption (agent finishes current step) |
| `/abort` | POST | Force stop execution immediately |
| `/status` | GET | Get current execution status |
| `/sse` | GET | SSE streaming connection |
| `/history` | GET | Get historical stream chunks |
| `/usage` | GET | Get usage data (tokens, tool calls) |
| `/messages` | GET | Get conversation messages (paginated) |
| `/snapshot` | GET | Get UI state snapshot for initialization |
Starting Execution
// Request
const response = await stub.fetch('/start', {
method: 'POST',
body: JSON.stringify({
agentType: 'researcher', // Agent type from registry
input: { message: 'Hello' }, // Input message
sessionId: 'session-123', // Session ID for conversation continuity
userId: 'user-456', // Optional: user context
tags: ['test'], // Optional: tags
metadata: { key: 'value' }, // Optional: metadata
}),
});
// Response
interface AgentStartResponse {
sessionId: string;
streamId: string;
status: 'started' | 'resumed';
}
Resuming Execution
const response = await stub.fetch('/resume', {
method: 'POST',
body: JSON.stringify({
agentType: 'researcher',
options: {
mode: 'continue', // 'continue' | 'retry' | 'branch'
checkpointId: 'cp-123', // Optional: resume from checkpoint
modifyState: { /* ... */ }, // Optional: must be JSON-serializable (a function can't cross HTTP)
appendMessages: [...], // Optional: add messages before resume
},
}),
});
Interrupting Execution
// Soft interrupt - agent finishes current step
await stub.fetch('/interrupt', {
method: 'POST',
body: JSON.stringify({ reason: 'user_requested' }),
});
// Hard abort - stops immediately
await stub.fetch('/abort', {
method: 'POST',
body: JSON.stringify({ reason: 'timeout' }),
});
Getting Status
const response = await stub.fetch('/status');
const status = await response.json<AgentStatusResponse>();
interface AgentStatusResponse {
sessionId: string | null;
status: string; // 'active' | 'completed' | 'failed' | 'paused' | 'interrupted'
stepCount: number;
output?: unknown;
error?: string;
checkpointId?: string;
isExecuting: boolean; // true if currently running
}
Getting History
const response = await stub.fetch('/history?fromSequence=0&limit=100');
const history = await response.json<{
chunks: Array<{ sequence: number; chunk: StreamChunk }>;
hasMore: boolean;
latestSequence: number;
}>();
Getting Messages
Fetch conversation messages with pagination:
const response = await stub.fetch('/messages?offset=0&limit=50');
const data = await response.json<{
messages: UIMessage[];
total: number;
offset: number;
limit: number;
hasMore: boolean;
}>();
Getting Snapshot
Get a complete UI state snapshot for initializing frontend state (e.g., on page refresh):
const response = await stub.fetch('/snapshot');
const snapshot = await response.json<{
runId: string;
messages: UIMessage[];
state: TState | null;
streamSequence: number;
status: 'active' | 'paused' | 'ended' | 'failed';
timestamp: number;
}>();
// Use for initializing useChat
const { messages, setMessages } = useChat({
initialMessages: snapshot.messages,
});
Streaming
WebSocket Streaming
WebSocket connections survive DO hibernation and provide real-time bidirectional communication:
// Client-side
const ws = new WebSocket(`wss://your-worker.workers.dev/ws/${sessionId}?fromSequence=0`);
ws.onopen = () => {
console.log('Connected');
};
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.type === 'chunk') {
handleChunk(data.chunk);
} else if (data.type === 'end') {
console.log('Stream ended', data.finalOutput);
ws.close();
} else if (data.type === 'fail') {
console.error('Stream failed', data.error);
ws.close();
} else if (data.type === 'pong') {
// Response to ping
}
};
ws.onerror = (error) => {
console.error('WebSocket error', error);
};
// Send ping to keep alive
setInterval(() => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({ type: 'ping' }));
}
}, 30000);
SSE Streaming
Server-Sent Events for one-way streaming (simpler, wider browser support):
// Client-side
const eventSource = new EventSource(
`https://your-worker.workers.dev/stream/${sessionId}?fromSequence=0`
);
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
switch (data.type) {
case 'chunk':
handleChunk(data.chunk);
break;
case 'end':
console.log('Complete', data.finalOutput);
eventSource.close();
break;
case 'fail':
console.error('Failed', data.error);
eventSource.close();
break;
}
};
eventSource.onerror = (error) => {
console.error('SSE error', error);
eventSource.close();
};
// SSE heartbeats are sent automatically every 30 seconds
Stream Events
Both WebSocket and SSE receive the same event types:
type StreamEvent =
| { type: 'chunk'; sequence: number; chunk: StreamChunk }
| { type: 'end'; finalOutput: unknown }
| { type: 'fail'; error: string };
type StreamChunk =
| { type: 'text_delta'; delta: string; agentId: string; ... }
| { type: 'tool_call_start'; toolName: string; ... }
| { type: 'tool_result'; result: unknown; ... }
| { type: 'run_completed'; output: unknown; ... }
| { type: 'run_interrupted'; reason: string; ... }
// ... and more
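A client-side dispatcher for these events might look like the sketch below. It only touches the documented chunk fields and ignores unknown chunk types, so newer servers don't break older clients:

```typescript
// A dispatcher for chunks received over either transport (WebSocket or SSE).
let transcript = '';
function handleChunk(chunk: StreamChunk) {
  switch (chunk.type) {
    case 'text_delta':
      transcript += chunk.delta; // incremental assistant text
      break;
    case 'tool_call_start':
      console.log(`Tool started: ${chunk.toolName}`);
      break;
    case 'tool_result':
      console.log('Tool result:', chunk.result);
      break;
    case 'run_completed':
      console.log('Final output:', chunk.output);
      break;
    case 'run_interrupted':
      console.warn(`Interrupted: ${chunk.reason}`);
      break;
    default:
      break; // ignore chunk types this client doesn't know about
  }
}
```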
LLM Adapter Handling
Critical
LLM adapters cannot be serialized over HTTP. They must be created within the worker/DO context.
With the composition API, the llmAdapter factory handles this automatically:
export const MyAgentServer = createAgentServer<Env>({
// Called inside the DO for each execution
llmAdapter: (env) => new VercelAIAdapter({
  openai: createOpenAI({ apiKey: env.OPENAI_API_KEY }),
  defaultModel: 'gpt-4o',
}),
// ...
});
Hibernation
The DO runtime supports Cloudflare's hibernation feature for cost efficiency:
How It Works
- When no WebSocket connections are active and no execution is running, the DO may hibernate
- On hibernation, in-memory state (like `currentExecution`) is cleared
- SQLite state persists across hibernation
- On wake, the DO detects the interrupted execution and marks it as `paused`
Handling Hibernation-Interrupted Execution
// Client detects interruption via stream event
ws.onmessage = async (event) => {
const data = JSON.parse(event.data);
if (data.type === 'chunk' && data.chunk.type === 'run_interrupted') {
if (data.chunk.reason === 'hibernation') {
// Agent was paused due to hibernation
// Can resume when ready
await resumeExecution(data.chunk.sessionId);
}
}
};
// Resume execution (assumes your worker forwards /resume/:sessionId to the DO's /resume endpoint)
async function resumeExecution(sessionId: string) {
const response = await fetch(`/resume/${sessionId}`, {
method: 'POST',
body: JSON.stringify({
agentType: 'researcher',
options: { mode: 'continue' },
}),
});
}
Keeping DOs Alive
To prevent hibernation during critical operations:
// Maintain WebSocket connection with pings
setInterval(() => ws.send(JSON.stringify({ type: 'ping' })), 30000);
// Or use SSE which includes automatic heartbeats
Multi-Turn Conversations
Continue conversations across multiple messages:
// First message - creates a new session
const sessionId = 'session-123';
await stub.fetch('/start', {
method: 'POST',
body: JSON.stringify({
agentType: 'researcher',
input: { message: 'Hello, my name is Alice' },
sessionId,
}),
});
// Wait for completion... (see the /status polling sketch at the end of this section)
// Continue the conversation - same sessionId
await stub.fetch('/start', {
method: 'POST',
body: JSON.stringify({
agentType: 'researcher',
input: { message: 'What is my name?' },
sessionId, // Reuse the same session
}),
});
// The agent remembers: "Your name is Alice"
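The "wait for completion" step above can be a simple poll against `/status`. A minimal sketch (the helper name and interval are illustrative; fields follow AgentStatusResponse):

```typescript
// Poll /status until the current run settles.
async function waitForCompletion(stub: DurableObjectStub, intervalMs = 1000) {
  for (;;) {
    const res = await stub.fetch('/status');
    const status = await res.json<AgentStatusResponse>();
    if (!status.isExecuting) return status; // 'completed', 'failed', 'paused', ...
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
  }
}
```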
Interrupt & Resume
Full interrupt/resume support is available:
// Start execution
await stub.fetch('/start', {
method: 'POST',
body: JSON.stringify({
agentType: 'researcher',
input: { message: 'Long research task' },
sessionId,
}),
});
// Interrupt later
await stub.fetch('/interrupt', {
method: 'POST',
body: JSON.stringify({ reason: 'user_pause' }),
});
// Check if resumable
const statusResponse = await stub.fetch('/status');
const { status } = await statusResponse.json<AgentStatusResponse>();
if (status === 'paused' || status === 'interrupted') {
// Resume
await stub.fetch('/resume', {
method: 'POST',
body: JSON.stringify({
agentType: 'researcher',
options: { mode: 'continue' },
}),
});
}
DOStateStore
The DOStateStore uses the DO's built-in SQLite for persistence:
import { DOStateStore } from '@helix-agents/runtime-cloudflare';
// Created internally by AgentServer
const stateStore = new DOStateStore({
sql: this.sql, // PartyServer's sql tagged template
logger: this.logger,
});
// Implements SessionStateStore interface
await stateStore.save(state);
const loaded = await stateStore.loadState(sessionId);
await stateStore.appendMessages(sessionId, messages);
Schema
The DOStateStore creates these tables automatically:
-- Main state table (single row per DO)
CREATE TABLE state (
id INTEGER PRIMARY KEY DEFAULT 1,
session_id TEXT NOT NULL,
agent_type TEXT NOT NULL,
stream_id TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'active',
step_count INTEGER DEFAULT 0,
custom_state TEXT,
output TEXT,
error TEXT,
checkpoint_id TEXT,
...
);
-- Messages table
CREATE TABLE messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
role TEXT NOT NULL,
content TEXT,
tool_calls TEXT,
...
);
-- Stream chunks table
CREATE TABLE stream_chunks (
sequence INTEGER PRIMARY KEY,
chunk TEXT NOT NULL,
created_at INTEGER NOT NULL
);
DOStreamManager
The DOStreamManager writes directly to connected clients:
import { DOStreamManager } from '@helix-agents/runtime-cloudflare';
// Created internally by AgentServer
const streamManager = new DOStreamManager({
sql: this.sql,
getConnections: () => this.getConnections(),
broadcast: (msg) => this.broadcast(msg),
sseConnections: this.sseConnections,
logger: this.logger,
});
// Writes to SQLite AND broadcasts to all connected clients
const writer = await streamManager.createWriter(streamId, agentId, agentType);
await writer.write(chunk); // FREE - no subrequest!
DOUsageStore
The DOUsageStore tracks token usage, tool calls, and sub-agent invocations using the DO's built-in SQLite:
import { DOUsageStore } from '@helix-agents/runtime-cloudflare';
// Created internally by AgentServer
const usageStore = new DOUsageStore({
sql: this.sql, // PartyServer's sql tagged template
sessionId: 'session-123',
logger: this.logger,
});
// Implements UsageStore interface
await usageStore.recordEntry(entry);
const entries = await usageStore.getEntries(sessionId);
const rollup = await usageStore.getRollup(sessionId);
Getting Usage via HTTP
// Get usage rollup for parent agent only
const response = await stub.fetch('/usage');
const rollup = await response.json();
// Get usage rollup including sub-agents (note: limited in DO runtime)
const response = await stub.fetch('/usage?mode=rollup');
// Get raw entries
const response = await stub.fetch('/usage?mode=entries');
Usage Response
interface TokenCounts {
prompt?: number;
completion?: number;
reasoning?: number;
cached?: number;
total?: number;
}
interface UsageRollup {
runId: string;
// Token usage for this agent only
tokens: TokenCounts;
tokensByModel: Record<string, TokenCounts>;
// Token usage including sub-agents (see limitation below)
tokensIncludingSubAgents: TokenCounts;
// Tool execution statistics
toolStats: {
totalCalls: number;
successfulCalls: number;
failedCalls: number;
totalDurationMs: number;
byTool: Record<string, {
calls: number;
successfulCalls: number;
failedCalls: number;
totalDurationMs: number;
}>;
};
// Sub-agent execution statistics
subAgentStats: {
totalCalls: number;
successfulCalls: number;
failedCalls: number;
totalDurationMs: number;
byType: Record<string, {
calls: number;
successfulCalls: number;
failedCalls: number;
totalDurationMs: number;
}>;
};
// Custom metrics aggregated by type, then by name
// Example: { 'api_calls': { 'tavily': 5 }, 'bytes': { 'input': 1024 } }
custom: Record<string, Record<string, number>>;
// Timing
startedAt?: number;
lastUpdatedAt?: number;
entryCount: number;
}
Sub-Agent Usage Limitation
In the DO runtime, sub-agents run in separate Durable Objects. Cross-DO usage aggregation is not currently supported. The tokensIncludingSubAgents field will only include the parent agent's tokens. For accurate sub-agent usage, query each sub-agent's DO directly.
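A manual aggregation sketch, assuming you track the sub-agent session IDs yourself (`subAgentSessionIds` is hypothetical):

```typescript
// Sum total tokens across the parent DO and each sub-agent DO via /usage.
async function totalTokens(
  env: Env,
  parentSessionId: string,
  subAgentSessionIds: string[]
): Promise<number> {
  let total = 0;
  for (const sid of [parentSessionId, ...subAgentSessionIds]) {
    const stub = env.AGENTS.get(env.AGENTS.idFromName(`session:${sid}`));
    const rollup = await (await stub.fetch('/usage')).json<UsageRollup>();
    total += rollup.tokens.total ?? 0;
  }
  return total;
}
```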
Legacy API (Deprecated)
Deprecated
The subclass-based AgentServer API is deprecated. Use createAgentServer() instead for new projects. See Migration Guide below.
The legacy API uses class inheritance for customization:
import { AgentServer, type StartAgentRequest } from '@helix-agents/runtime-cloudflare';
export class MyAgentServer extends AgentServer<Env> {
protected override createLLMAdapter(): LLMAdapter {
const env = this.getEnv();
return new VercelAIAdapter({
  openai: createOpenAI({ apiKey: env.OPENAI_API_KEY }),
  defaultModel: 'gpt-4o',
});
}
protected override resolveAgent(
requestedAgent: AgentConfig<z.ZodType, z.ZodType>,
context: { sessionId: string; runId?: string; userId?: string }
): AgentConfig<z.ZodType, z.ZodType> {
return requestedAgent;
}
protected override async beforeStart(
request: Request,
body: StartAgentRequest
): Promise<void> {
await super.beforeStart(request, body);
console.log('Agent starting...');
}
}
Migration from Legacy API
Before (subclass):
class MyAgentServer extends AgentServer<Env> {
private registry = new AgentRegistry();
constructor(ctx: DurableObjectState, env: Env) {
super(ctx, env);
this.registry.register(myAgent);
}
protected override createLLMAdapter(): LLMAdapter {
return new VercelAIAdapter({
  openai: createOpenAI({ apiKey: this.getEnv().OPENAI_API_KEY }),
  defaultModel: 'gpt-4o',
});
}
protected override async beforeStart(req: Request, body: StartAgentRequest): Promise<void> {
if (this.isExecuting()) {
await this.getCurrentHandle()?.interrupt('superseded');
}
}
}
After (composition):
const registry = new AgentRegistry();
registry.register(myAgent);
export const MyAgentServer = createAgentServer<Env>({
llmAdapter: (env) => new VercelAIAdapter({
  openai: createOpenAI({ apiKey: env.OPENAI_API_KEY }),
  defaultModel: 'gpt-4o',
}),
agents: registry,
hooks: {
beforeStart: async ({ executionState }) => {
if (executionState.isExecuting) {
await executionState.interrupt('superseded');
}
},
},
});
Limitations
LLM Adapter Serialization
LLM adapters contain functions and API clients that can't be serialized. With the composition API, the llmAdapter factory handles this automatically since it runs inside the DO.
Single DO Per Session
Each session gets its own DO instance. This means:
- No shared state between sessions (use D1 if needed; see the sketch after this list)
- Each run has its own SQLite database
- Geographic pinning based on first access
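For cross-session state, one option is writing a summary row to D1 from the `onComplete` hook. A sketch, assuming `Env` has a D1 binding named `DB` and a `runs` table you have created:

```typescript
hooks: {
  onComplete: async ({ env, sessionId, runId, status }) => {
    // Persist a cross-session record outside the DO's local SQLite.
    await env.DB.prepare(
      'INSERT INTO runs (session_id, run_id, status, finished_at) VALUES (?, ?, ?, ?)'
    )
      .bind(sessionId, runId, status, Date.now())
      .run();
  },
},
```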
Geographic Pinning
DOs are pinned to a region on first access:
// First access pins to nearest region
const stub = env.AGENTS.get(doId);
// All subsequent access goes to that region
// Consider region affinity for latency-sensitive apps
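If placement matters, Cloudflare lets you pass a location hint when first obtaining a stub. It is only a hint, and it only influences the DO's initial placement:

```typescript
// Bias initial placement toward eastern North America ('enam').
const stub = env.AGENTS.get(doId, { locationHint: 'enam' });
```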
Memory Limits
DOs have memory limits. For very long conversations:
- Consider truncating message history (see the sketch after this list)
- Use checkpoints to manage state size
- Monitor memory usage in production
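One client-side option for bounding history: StartAgentRequestV2 accepts `input.messages`, so you can seed only a recent window when starting a run (`history` and the window size are illustrative):

```typescript
// Keep only the last N messages when seeding a session.
function lastN(messages: Message[], n = 40): Message[] {
  return messages.slice(-n);
}

await stub.fetch('/start', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({
    agentType: 'researcher',
    input: { message: 'Continue where we left off', messages: lastN(history) },
    sessionId,
  }),
});
```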
No Step-Level Durability
Unlike Workflows, the DO runtime doesn't have automatic step-level retries:
- If the DO crashes mid-execution, state reflects last committed step
- Use checkpoints for crash recovery (see the sketch after this list)
- Consider Workflows runtime for critical durability needs
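A recovery sketch using only the documented endpoints: on startup or reconnect, read `/status`, then continue or retry from the recorded checkpoint:

```typescript
// If the last run didn't finish, resume or retry from its checkpoint.
const status = await (await stub.fetch('/status')).json<AgentStatusResponse>();
if (!status.isExecuting && (status.status === 'interrupted' || status.status === 'failed')) {
  await stub.fetch('/resume', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      agentType: 'researcher',
      options: {
        mode: status.status === 'failed' ? 'retry' : 'continue',
        checkpointId: status.checkpointId, // optional; omit to use the latest
      },
    }),
  });
}
```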
Best Practices
1. Keep Connections Alive
// Client-side keepalive
setInterval(() => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({ type: 'ping' }));
}
}, 30000);
2. Handle Reconnection
// Track last received sequence
let ws = new WebSocket(`wss://.../ws/${sessionId}?fromSequence=0`);
let lastSequence = 0;
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.sequence) {
lastSequence = data.sequence;
}
};
// Reconnect from last sequence
function reconnect() {
ws = new WebSocket(`wss://.../ws/${sessionId}?fromSequence=${lastSequence}`);
}
3. Use Proper Error Handling
try {
const response = await stub.fetch('/start', { ... });
if (!response.ok) {
const error = await response.json();
throw new Error(error.error);
}
} catch (error) {
// Handle DO errors (network, timeout, etc.)
console.error('DO error:', error);
}
4. Monitor Subrequest Usage
// The only subrequests should be LLM API calls
// Monitor via Cloudflare dashboard to ensure streaming is free
Deployment
Development
wrangler dev
Production
wrangler deploy
Secrets
wrangler secret put OPENAI_API_KEY
Next Steps
- Cloudflare Overview - Compare with Workflows runtime
- Workflows Runtime - For step-level durability
- Storage: Cloudflare - Storage options
- DO Example - Complete working example