Cloudflare Durable Objects Runtime
The Durable Objects (DO) runtime executes agents directly inside a Durable Object, bypassing Cloudflare's subrequest limits by streaming directly to WebSocket and SSE connections.
When to Use DO Runtime
Choose the DO runtime when you need heavy streaming (>100 chunks per execution), real-time WebSocket/SSE connections, or simpler single-DO-per-run architecture. For step-level durability with automatic retries, consider the Workflows runtime instead.
Why Durable Objects?
Cloudflare Workers have a 1000 subrequest limit per invocation. In the Workflows runtime, each stream chunk write to the streaming Durable Object counts as a subrequest. For streaming-heavy agents, this can quickly exhaust the limit.
The DO runtime solves this by:
- Hosting agent execution inside the DO - The JSAgentExecutor runs within the Durable Object
- Streaming directly to connections - WebSocket/SSE writes don't count as subrequests
- Using local SQLite - State is stored in DO's built-in SQLite, not external D1
- Only LLM calls consume subrequests - The only external calls are to the LLM API
Key Benefits
| Feature | Description |
|---|---|
| FREE Streaming | WebSocket/SSE writes don't count against subrequest limits |
| Built-in SQLite | No D1 database needed - state lives in the DO |
| Hibernation | Cost-efficient sleep/wake with state preservation |
| Simpler Architecture | One DO = one agent run (no coordination needed) |
| Native WebSocket/SSE | Real-time bidirectional communication |
Session vs Run Identifiers
- sessionId: Primary key for state storage. Each Durable Object hosts one session.
- runId: Identifies a specific execution within a session. Multiple runs can occur within one session (e.g., after interrupts).
Use sessionId for DO naming (e.g., env.AGENTS.idFromName(`session:${sessionId}`)). The runId is execution metadata for tracing.
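The naming convention can be captured in a tiny helper. This is a sketch, not part of the package; the `session:` prefix follows the example above:

```typescript
// Derive the Durable Object name for a session.
// The "session:" prefix namespaces session DOs, matching the
// env.AGENTS.idFromName(`session:${sessionId}`) convention above.
function doNameForSession(sessionId: string): string {
  return `session:${sessionId}`;
}

// Usage (in a Worker): env.AGENTS.idFromName(doNameForSession(sessionId))
```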
Architecture
graph TB
subgraph Edge ["Edge Location"]
subgraph CFWorker ["Cloudflare Worker"]
W1["Routes requests to DOs<br/>Creates LLM adapters<br/>Manages DO stubs"]
end
CFWorker --> AgentDO
subgraph AgentDO ["AgentServer (Durable Object)"]
subgraph Executor ["JSAgentExecutor"]
E1["Agent loop execution<br/>Tool execution<br/>Sub-agent spawning"]
end
subgraph Stores [" "]
direction LR
StateStore["DOStateStore<br/>(SQLite)"]
UsageStore["DOUsageStore<br/>(SQLite)"]
StreamMgr["DOStreamManager<br/>(WebSocket/SSE)"]
end
end
AgentDO --> Clients
subgraph Clients [" "]
direction LR
WS["<b>WebSocket Clients</b><br/>Real-time, survives<br/>hibernation"]
SSE["<b>SSE Clients</b><br/>One-way, simpler"]
end
end
Prerequisites
- Cloudflare account with Workers Paid plan
- Wrangler CLI:
npm install -g wrangler
- No additional services needed (D1, etc.) - SQLite is built into the DO
Installation
npm install @helix-agents/runtime-cloudflare @helix-agents/core
The partyserver dependency is included automatically.
Quick Start (Composition API)
The composition API using createAgentServer() is the recommended approach for building AgentServer instances.
1. Configure wrangler.toml
name = "my-agent-worker"
main = "src/worker.ts"
compatibility_date = "2024-01-01"
# Durable Object binding with SQLite support
[[durable_objects.bindings]]
name = "AGENTS"
class_name = "MyAgentServer"
# Enable SQLite for the DO class
[[migrations]]
tag = "v1"
new_sqlite_classes = ["MyAgentServer"]
Important
Use new_sqlite_classes instead of new_classes to enable SQLite storage in the Durable Object. Without this, the DO won't have access to the this.sql method.
2. Create AgentServer with Composition
// src/agent-server.ts
import { createAgentServer, AgentRegistry } from '@helix-agents/runtime-cloudflare';
import { VercelAIAdapter } from '@helix-agents/llm-vercel';
import { createAnthropic } from '@ai-sdk/anthropic';
import { createOpenAI } from '@ai-sdk/openai';
import { myAgent } from './agent.js';
export interface Env {
AGENTS: DurableObjectNamespace;
ANTHROPIC_API_KEY: string;
OPENAI_API_KEY: string;
}
// Create and populate registry
const registry = new AgentRegistry();
registry.register(myAgent);
// Create AgentServer using composition
export const MyAgentServer = createAgentServer<Env>({
// Required: Factory to create LLM adapters
llmAdapter: (env, ctx) =>
new VercelAIAdapter({
anthropic: createAnthropic({ apiKey: env.ANTHROPIC_API_KEY }),
}),
// Required: Agent registry or resolver function
agents: registry,
// Optional: Required if using createSubAgentTool() — see Sub-Agents section
// subAgentNamespace: (env) => env.AGENTS,
// Optional: Lifecycle hooks
hooks: {
beforeStart: async ({ executionState, body }) => {
// "Last message wins" - interrupt existing execution
if (executionState.isExecuting) {
await executionState.interrupt('New message received');
}
console.log(`Starting agent: ${body.agentType}`);
},
afterStart: async ({ sessionId, agentType }) => {
console.log(`Agent ${agentType} started (session: ${sessionId})`);
},
onComplete: async ({ sessionId, status, result, error }) => {
console.log(`Execution complete: ${status}`);
},
},
});
3. Create Worker Entry Point
// src/worker.ts
import { MyAgentServer } from './agent-server.js';
import type { Env } from './agent-server.js';
// Export the AgentServer DO class
export { MyAgentServer };
export default {
async fetch(request: Request, env: Env): Promise<Response> {
const url = new URL(request.url);
// Start new agent execution
if (url.pathname === '/execute' && request.method === 'POST') {
const { message } = await request.json<{ message: string | UserInputMessage[] }>();
// Generate unique session ID
const sessionId = crypto.randomUUID();
// Get DO stub for this session
const doId = env.AGENTS.idFromName(`session:${sessionId}`);
const stub = env.AGENTS.get(doId);
// Start execution using the new request format
const response = await stub.fetch('/start', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
agentType: 'researcher', // Agent type from registry
input: { message },
sessionId,
}),
});
const result = await response.json<{ sessionId: string; streamId: string }>();
return Response.json({
sessionId: result.sessionId,
streamUrl: `/stream/${result.sessionId}`,
websocketUrl: `/ws/${result.sessionId}`,
});
}
// SSE streaming endpoint
if (url.pathname.startsWith('/stream/')) {
const sessionId = url.pathname.split('/').pop()!;
const doId = env.AGENTS.idFromName(`session:${sessionId}`);
const stub = env.AGENTS.get(doId);
// Forward to DO's SSE endpoint
return stub.fetch(new URL('/sse', request.url), request);
}
// WebSocket upgrade
if (url.pathname.startsWith('/ws/')) {
const sessionId = url.pathname.split('/').pop()!;
const doId = env.AGENTS.idFromName(`session:${sessionId}`);
const stub = env.AGENTS.get(doId);
// Forward WebSocket upgrade to DO
return stub.fetch(request);
}
// Get status
if (url.pathname.startsWith('/status/')) {
const sessionId = url.pathname.split('/').pop()!;
const doId = env.AGENTS.idFromName(`session:${sessionId}`);
const stub = env.AGENTS.get(doId);
return stub.fetch(new URL('/status', request.url));
}
return new Response('Not found', { status: 404 });
},
};
4. Define Your Agent
// src/agent.ts
import { defineAgent, defineTool } from '@helix-agents/core';
import { z } from 'zod';
const searchTool = defineTool({
name: 'search',
description: 'Search for information',
inputSchema: z.object({
query: z.string(),
}),
execute: async ({ query }) => {
// Your search implementation
return { results: [`Found: ${query}`] };
},
});
export const myAgent = defineAgent({
name: 'researcher',
description: 'Research assistant',
systemPrompt: 'You are a helpful research assistant.',
tools: [searchTool],
outputSchema: z.object({
summary: z.string(),
sources: z.array(z.string()),
}),
maxSteps: 10,
});
DurableObjectAgentConfig
The createAgentServer<TEnv>(config) factory accepts this configuration:
interface DurableObjectAgentConfig<TEnv = unknown> {
/**
* Factory to create LLM adapters.
* Called once per execution with environment and context.
* Required.
*/
llmAdapter: LLMAdapterFactory<TEnv>;
/**
* Agent resolution strategy.
* Can be an AgentRegistry instance or a resolver function.
* Required.
*/
agents: AgentResolverInterface<TEnv> | AgentResolver<TEnv>;
/**
* Optional lifecycle hooks for customization.
*/
hooks?: ServerHooks<TEnv>;
/**
* Optional custom endpoints.
* Keys are paths (e.g., "/my-endpoint"), values are handlers.
* Custom endpoints are checked BEFORE built-in endpoints.
*/
endpoints?: Record<string, EndpointHandler<TEnv>>;
/**
* Optional usage store factory.
* If not provided, uses the DO-local DOUsageStore.
*/
usageStore?: UsageStoreFactory<TEnv>;
/**
* Optional logger.
* If not provided, uses noopLogger.
*/
logger?: Logger;
/**
* Optional namespace for routing sub-agent calls to sibling DOs.
* Required when using createSubAgentTool() — see Sub-Agents section below.
* Each sub-agent session gets its own isolated DO instance.
*/
subAgentNamespace?: (env: TEnv) => DurableObjectNamespace;
}
LLMAdapterFactory
Creates LLM adapters with access to environment and execution context:
type LLMAdapterFactory<TEnv> = (env: TEnv, context: LLMAdapterContext) => LLMAdapter;
interface LLMAdapterContext {
sessionId: string;
agentType: string;
runId: string;
}
Example: Per-agent model selection
llmAdapter: (env, ctx) => {
// Choose model based on agent type
if (ctx.agentType === 'fast-responder') {
return new VercelAIAdapter({
openai: createOpenAI({ apiKey: env.OPENAI_API_KEY }),
defaultModel: 'gpt-4o-mini',
});
}
return new VercelAIAdapter({
anthropic: createAnthropic({ apiKey: env.ANTHROPIC_API_KEY }),
defaultModel: 'claude-sonnet-4-20250514',
});
};
AgentRegistry with Factories
The AgentRegistry supports both static registration and factory functions for dynamic agent creation.
Static Registration
For agents that don't need runtime dependencies:
import { AgentRegistry } from '@helix-agents/runtime-cloudflare';
const registry = new AgentRegistry();
registry.register(researchAgent);
registry.register(summarizerAgent);
Factory Registration
For agents that need environment bindings, per-request configuration, or dependency injection:
registry.registerFactory<Env>('brainstorm', (ctx) => {
return createBrainstormAgent({
tavilyApiKey: ctx.env.TAVILY_API_KEY,
database: ctx.env.DATABASE,
userId: ctx.userId,
});
});
AgentFactoryContext
Factory functions receive this context:
interface AgentFactoryContext<TEnv = unknown> {
/** Environment bindings */
env: TEnv;
/** Session ID for the execution */
sessionId: string;
/** Run ID for this execution */
runId: string;
/** Optional user identifier */
userId?: string;
}
Using resolve() vs get()
// get() - Static agents only (legacy, for backward compatibility)
const agent = registry.get('researcher');
// resolve() - Handles both static and factory agents (preferred)
const agent = registry.resolve('brainstorm', {
env,
sessionId: 'session-123',
runId: 'run-456',
userId: 'user-789',
});
Registry Methods
| Method | Description |
|---|---|
register(config) | Register a static agent configuration |
registerFactory(type, factory) | Register a factory function |
resolve(type, context) | Resolve agent (factory-first, then static) |
get(type) | Get static agent only (legacy) |
has(type) | Check if agent type is registered |
unregister(type) | Remove an agent registration |
getRegisteredTypes() | Get all registered agent type names |
clear() | Remove all registrations (for testing) |
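The factory-first lookup order of resolve() can be illustrated with a minimal registry. This is a sketch of the precedence rule only, not the actual AgentRegistry implementation:

```typescript
// Minimal sketch of factory-first resolution, mirroring the methods above.
type Factory<C> = (ctx: C) => unknown;

class MiniRegistry<C> {
  private statics = new Map<string, unknown>();
  private factories = new Map<string, Factory<C>>();

  register(type: string, config: unknown) { this.statics.set(type, config); }
  registerFactory(type: string, factory: Factory<C>) { this.factories.set(type, factory); }
  has(type: string) { return this.factories.has(type) || this.statics.has(type); }

  // Factories win over static registrations for the same type.
  resolve(type: string, ctx: C): unknown {
    const factory = this.factories.get(type);
    if (factory) return factory(ctx);
    const config = this.statics.get(type);
    if (config) return config;
    throw new Error(`Unknown agent type: ${type}`);
  }
}
```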
AgentNotFoundError
When an agent type is not found, AgentNotFoundError provides helpful information:
import { AgentNotFoundError } from '@helix-agents/runtime-cloudflare';
try {
const agent = registry.resolve('unknown', context);
} catch (error) {
if (error instanceof AgentNotFoundError) {
console.log(`Unknown type: ${error.agentType}`);
console.log(`Available: ${error.availableTypes.join(', ')}`);
}
}
ExecutionState
The ExecutionState interface provides hooks with a clean way to check and control execution:
interface ExecutionState {
/** Session ID of current execution, or null if none */
readonly sessionId: string | null;
/** Run ID of current execution, or null if none */
readonly runId: string | null;
/** Whether an execution is currently running */
readonly isExecuting: boolean;
/** Current execution status */
readonly status: 'idle' | 'running' | 'paused' | 'interrupted' | 'completed' | 'failed';
/** Get the current execution handle (if any) */
getHandle<T = unknown>(): AgentExecutionHandle<T> | null;
/** Interrupt the current execution (soft stop, resumable) */
interrupt(reason?: string): Promise<void>;
/** Abort the current execution (hard stop, not resumable) */
abort(reason?: string): Promise<void>;
}
Example: "Last message wins" pattern
hooks: {
beforeStart: async ({ executionState }) => {
if (executionState.isExecuting) {
await executionState.interrupt('New message received');
}
},
}
Lifecycle Hooks
All hooks are optional. They enable customization without subclassing.
interface ServerHooks<TEnv = unknown> {
/**
* Called before starting execution.
* Can abort by returning { abort: true, response: Response }.
*/
beforeStart?: (context: BeforeStartContext<TEnv>) => Promise<void | AbortResult>;
/**
* Called after execution handle is created (before it completes).
*/
afterStart?: (context: AfterStartContext<TEnv>) => Promise<void>;
/**
* Called before resuming execution.
* Can abort by returning { abort: true, response: Response }.
*/
beforeResume?: (context: BeforeResumeContext<TEnv>) => Promise<void | AbortResult>;
/**
* Called after resume handle is created.
*/
afterResume?: (context: AfterResumeContext<TEnv>) => Promise<void>;
/**
* Called before retrying execution.
* Can abort by returning { abort: true, response: Response }.
*/
beforeRetry?: (context: BeforeRetryContext<TEnv>) => Promise<void | AbortResult>;
/**
* Called after retry handle is created.
*/
afterRetry?: (context: AfterRetryContext<TEnv>) => Promise<void>;
/**
* Called when execution completes (success, failure, or interrupt).
*/
onComplete?: (context: CompleteContext<TEnv>) => Promise<void>;
/**
* Transform incoming /start request to standard format.
*/
transformRequest?: (rawBody: unknown, env: TEnv) => Promise<StartAgentRequestV2>;
/**
* Transform outgoing /start response.
*/
transformResponse?: (
response: { sessionId: string; streamId: string; status: 'started' | 'resumed' },
context: { sessionId: string; agentType: string }
) => Promise<unknown>;
}
Hook Contexts
BeforeStartContext:
interface BeforeStartContext<TEnv> {
env: TEnv;
request: Request;
body: StartAgentRequestV2;
executionState: ExecutionState;
}
AfterStartContext:
interface AfterStartContext<TEnv> {
env: TEnv;
sessionId: string;
runId: string;
agentType: string;
handle: AgentExecutionHandle<unknown>;
}
CompleteContext:
interface CompleteContext<TEnv> {
env: TEnv;
sessionId: string;
runId: string;
status: 'completed' | 'failed' | 'interrupted';
result?: unknown;
error?: Error;
}
Aborting Requests
Hooks can abort requests by returning an AbortResult:
hooks: {
beforeStart: async ({ body }) => {
if (!body.userId) {
return {
abort: true,
response: Response.json({ error: 'User ID required' }, { status: 401 }),
};
}
},
}
Request/Response Transformation
Transform custom request formats to the standard format:
hooks: {
transformRequest: async (rawBody, env) => {
const body = rawBody as { prompt: string; conversationId: string };
return {
agentType: 'brainstorm',
input: { message: body.prompt },
sessionId: body.conversationId,
};
},
transformResponse: async (response, { sessionId }) => {
return {
id: sessionId,
status: 'processing',
};
},
}
Custom Endpoints
Add custom HTTP endpoints that are checked before built-in endpoints:
export const MyAgentServer = createAgentServer<Env>({
llmAdapter: (env) =>
new VercelAIAdapter({
/* ... */
}),
agents: registry,
endpoints: {
'/health': async (request, { executionState }) => {
return Response.json({
status: 'healthy',
isExecuting: executionState.isExecuting,
});
},
'/custom-action': async (request, { env, logger }) => {
if (request.method !== 'POST') {
return Response.json({ error: 'Method not allowed' }, { status: 405 });
}
const body = await request.json();
logger.info('Custom action', { body });
return Response.json({ success: true });
},
},
});
EndpointHandler
type EndpointHandler<TEnv> = (
request: Request,
context: EndpointContext<TEnv>
) => Promise<Response>;
interface EndpointContext<TEnv> {
env: TEnv;
executionState: ExecutionState;
logger: Logger;
}
New Request Format
The composition API uses a request format keyed by agentType instead of the full AgentConfig:
StartAgentRequestV2
interface StartAgentRequestV2 {
/** Agent type to execute (looked up in registry/resolver) */
agentType: string;
/** Input for the agent */
input: {
/** String message or array of UserInputMessage for multi-message input */
message: string | UserInputMessage[];
state?: Record<string, unknown>;
messages?: Message[];
};
/** Session identifier (required) */
sessionId: string;
/** Optional user identifier */
userId?: string;
/** Optional tags for the session */
tags?: string[];
/** Optional metadata for the session */
metadata?: Record<string, string>;
}
/** Individual message in a multi-message input */
interface UserInputMessage {
role: 'user';
content: string;
metadata?: Record<string, unknown>;
files?: FileInput[];
}
interface FileInput {
data: string; // Base64 data URI
mediaType: string; // MIME type
filename?: string;
}
ResumeAgentRequestV2
interface ResumeAgentRequestV2 {
/** Agent type (for context) */
agentType: string;
/** Resume options */
options: {
mode?: 'continue' | 'retry' | 'branch';
checkpointId?: string;
modifyState?: unknown;
appendMessages?: Message[];
};
}
AgentServer HTTP Endpoints
The AgentServer exposes the following HTTP endpoints:
| Endpoint | Method | Description |
|---|---|---|
/start | POST | Start new agent execution |
/resume | POST | Resume paused/interrupted agent |
/retry | POST | Retry failed agent |
/interrupt | POST | Request soft interruption (agent finishes current step) |
/abort | POST | Force stop execution immediately |
/status | GET | Get current execution status |
/sse | GET | SSE streaming connection |
/history | GET | Get historical stream chunks |
/usage | GET | Get usage data (tokens, tool calls) |
/messages | GET | Get conversation messages (paginated) |
/snapshot | GET | Get UI state snapshot for initialization |
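The /start call can be wrapped in a small request builder. buildStartRequest is a hypothetical helper, not part of the package; the body shape follows StartAgentRequestV2:

```typescript
// Hypothetical helper: builds the fetch arguments for the /start endpoint.
// The request body follows the StartAgentRequestV2 shape.
function buildStartRequest(
  sessionId: string,
  agentType: string,
  message: string
): { path: string; init: { method: string; headers: Record<string, string>; body: string } } {
  return {
    path: '/start',
    init: {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ agentType, input: { message }, sessionId }),
    },
  };
}

// Usage: const { path, init } = buildStartRequest('session-123', 'researcher', 'Hello');
//        await stub.fetch(path, init);
```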
Starting Execution
// Request with a string message
const response = await stub.fetch('/start', {
method: 'POST',
body: JSON.stringify({
agentType: 'researcher', // Agent type from registry
input: { message: 'Hello' }, // Input message (string or UserInputMessage[])
sessionId: 'session-123', // Session ID for conversation continuity
userId: 'user-456', // Optional: user context
tags: ['test'], // Optional: tags
metadata: { key: 'value' }, // Optional: metadata
}),
});
// Request with multiple messages (context injection, file attachments)
const response = await stub.fetch('/start', {
method: 'POST',
body: JSON.stringify({
agentType: 'researcher',
input: {
message: [
{
role: 'user',
content: 'Background: user is on the enterprise plan',
metadata: { source: 'system' },
},
{ role: 'user', content: 'What features do I have access to?' },
],
},
sessionId: 'session-123',
}),
});
// Response
interface AgentStartResponse {
sessionId: string;
streamId: string;
status: 'started' | 'resumed';
}
Resuming Execution
const response = await stub.fetch('/resume', {
method: 'POST',
body: JSON.stringify({
agentType: 'researcher',
options: {
mode: 'continue', // 'continue' | 'retry' | 'branch'
checkpointId: 'cp-123', // Optional: resume from checkpoint
modifyState: { /* ... */ }, // Optional: state to apply before resume (must be JSON-serializable)
appendMessages: [...], // Optional: add messages before resume
},
}),
});
Interrupting Execution
// Soft interrupt - agent finishes current step
await stub.fetch('/interrupt', {
method: 'POST',
body: JSON.stringify({ reason: 'user_requested' }),
});
// Hard abort - stops immediately
await stub.fetch('/abort', {
method: 'POST',
body: JSON.stringify({ reason: 'timeout' }),
});
Getting Status
const response = await stub.fetch('/status');
const status = await response.json<AgentStatusResponse>();
interface AgentStatusResponse {
sessionId: string | null;
status: string; // 'active' | 'completed' | 'failed' | 'paused' | 'interrupted'
stepCount: number;
output?: unknown;
error?: string;
checkpointId?: string;
isExecuting: boolean; // true if currently running
}
Getting History
const response = await stub.fetch('/history?fromSequence=0&limit=100');
const history = await response.json<{
chunks: Array<{ sequence: number; chunk: StreamChunk }>;
hasMore: boolean;
latestSequence: number;
}>();
Getting Messages
Fetch conversation messages with pagination:
const response = await stub.fetch('/messages?offset=0&limit=50');
const data = await response.json<{
messages: UIMessage[];
total: number;
offset: number;
limit: number;
hasMore: boolean;
}>();
Getting Snapshot
Get a complete UI state snapshot for initializing frontend state (e.g., on page refresh):
const response = await stub.fetch('/snapshot');
const snapshot = await response.json<{
runId: string;
messages: UIMessage[];
state: TState | null;
streamSequence: number;
status: 'active' | 'paused' | 'ended' | 'failed';
timestamp: number;
}>();
// Use for initializing useChat
const { messages, setMessages } = useChat({
initialMessages: snapshot.messages,
});
Streaming
WebSocket Streaming
WebSocket connections survive DO hibernation and provide real-time bidirectional communication:
// Client-side
const ws = new WebSocket(`wss://your-worker.workers.dev/ws/${sessionId}?fromSequence=0`);
ws.onopen = () => {
console.log('Connected');
};
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.type === 'chunk') {
handleChunk(data.chunk);
} else if (data.type === 'end') {
console.log('Stream ended', data.finalOutput);
ws.close();
} else if (data.type === 'fail') {
console.error('Stream failed', data.error);
ws.close();
} else if (data.type === 'pong') {
// Response to ping
}
};
ws.onerror = (error) => {
console.error('WebSocket error', error);
};
// Send ping to keep alive
setInterval(() => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({ type: 'ping' }));
}
}, 30000);
SSE Streaming
Server-Sent Events for one-way streaming (simpler, wider browser support):
// Client-side
const eventSource = new EventSource(
`https://your-worker.workers.dev/stream/${sessionId}?fromSequence=0`
);
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
switch (data.type) {
case 'chunk':
handleChunk(data.chunk);
break;
case 'end':
console.log('Complete', data.finalOutput);
eventSource.close();
break;
case 'fail':
console.error('Failed', data.error);
eventSource.close();
break;
}
};
eventSource.onerror = (error) => {
console.error('SSE error', error);
eventSource.close();
};
// SSE heartbeats are sent automatically every 30 seconds
Stream Events
Both WebSocket and SSE receive the same event types:
type StreamEvent =
| { type: 'chunk'; sequence: number; chunk: StreamChunk }
| { type: 'end'; finalOutput: unknown }
| { type: 'fail'; error: string };
type StreamChunk =
| { type: 'text_delta'; delta: string; agentId: string; ... }
| { type: 'tool_call_start'; toolName: string; ... }
| { type: 'tool_result'; result: unknown; ... }
| { type: 'run_completed'; output: unknown; ... }
| { type: 'run_interrupted'; reason: string; ... }
// ... and more
LLM Adapter Handling
Critical
LLM adapters cannot be serialized over HTTP. They must be created within the worker/DO context.
With the composition API, the llmAdapter factory handles this automatically:
export const MyAgentServer = createAgentServer<Env>({
// Called inside the DO for each execution
llmAdapter: (env) =>
new VercelAIAdapter({
openai: createOpenAI({ apiKey: env.OPENAI_API_KEY }),
defaultModel: 'gpt-4o',
}),
// ...
});
Hibernation
The DO runtime supports Cloudflare's hibernation feature for cost efficiency:
How It Works
- When no WebSocket connections are active and no execution is running, the DO may hibernate
- On hibernation, in-memory state (like currentExecution) is cleared
- SQLite state persists across hibernation
- On wake, the DO detects interrupted execution and marks it as paused
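A client can decide from the status alone whether a run can be resumed. A minimal sketch, assuming the status strings used by /status in this doc:

```typescript
// A run interrupted by hibernation (or by a soft interrupt) ends up
// 'paused' or 'interrupted'; both can be resumed with mode: 'continue'.
function isResumable(status: string): boolean {
  return status === 'paused' || status === 'interrupted';
}
```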
Handling Hibernation-Interrupted Execution
// Client detects interruption via stream event
ws.onmessage = async (event) => {
const data = JSON.parse(event.data);
if (data.type === 'chunk' && data.chunk.type === 'run_interrupted') {
if (data.chunk.reason === 'hibernation') {
// Agent was paused due to hibernation
// Can resume when ready
await resumeExecution(data.chunk.sessionId);
}
}
};
// Resume execution
async function resumeExecution(sessionId: string) {
const response = await fetch(`/resume/${sessionId}`, {
method: 'POST',
body: JSON.stringify({
agentType: 'researcher',
options: { mode: 'continue' },
}),
});
}
Keeping DOs Alive
To prevent hibernation during critical operations:
// Maintain WebSocket connection with pings
setInterval(() => ws.send(JSON.stringify({ type: 'ping' })), 30000);
// Or use SSE which includes automatic heartbeats
Multi-Turn Conversations
Continue conversations across multiple messages:
// First message - creates a new session
const sessionId = 'session-123';
await stub.fetch('/start', {
method: 'POST',
body: JSON.stringify({
agentType: 'researcher',
input: { message: 'Hello, my name is Alice' },
sessionId,
}),
});
// Wait for completion...
// Continue the conversation - same sessionId
await stub.fetch('/start', {
method: 'POST',
body: JSON.stringify({
agentType: 'researcher',
input: { message: 'What is my name?' },
sessionId, // Reuse the same session
}),
});
// The agent remembers: "Your name is Alice"
Multi-Message Input
The message field in input accepts either a string or a UserInputMessage[] array. This lets you inject context alongside the user's question or attach files:
await stub.fetch('/start', {
method: 'POST',
body: JSON.stringify({
agentType: 'researcher',
input: {
message: [
{
role: 'user',
content: 'System context: user is a premium subscriber',
metadata: { source: 'system' },
},
{ role: 'user', content: 'What premium features can I use?' },
],
},
sessionId,
}),
});
// Messages can include file attachments
await stub.fetch('/start', {
method: 'POST',
body: JSON.stringify({
agentType: 'researcher',
input: {
message: [
{
role: 'user',
content: 'Analyze this image',
files: [
{ data: 'data:image/png;base64,...', mediaType: 'image/png', filename: 'chart.png' },
],
},
],
},
sessionId,
}),
});
String and multi-message inputs can be mixed freely across turns in the same session.
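When attaching files, the data field must be a base64 data URI. A small helper for building a FileInput — a sketch that assumes a Node.js environment for Buffer:

```typescript
// Build a FileInput (data URI + MIME type) from raw bytes.
// Assumes Node.js (Buffer); in a browser, use FileReader or btoa instead.
function fileInputFromBytes(
  bytes: Uint8Array,
  mediaType: string,
  filename?: string
): { data: string; mediaType: string; filename?: string } {
  const base64 = Buffer.from(bytes).toString('base64');
  return { data: `data:${mediaType};base64,${base64}`, mediaType, filename };
}

// Usage: fileInputFromBytes(new TextEncoder().encode('hi'), 'text/plain')
//   -> { data: 'data:text/plain;base64,aGk=', mediaType: 'text/plain' }
```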
Interrupt & Resume
Full interrupt/resume support is available:
// Start execution
await stub.fetch('/start', {
method: 'POST',
body: JSON.stringify({
agentType: 'researcher',
input: { message: 'Long research task' },
sessionId,
}),
});
// Interrupt later
await stub.fetch('/interrupt', {
method: 'POST',
body: JSON.stringify({ reason: 'user_pause' }),
});
// Check if resumable
const statusResponse = await stub.fetch('/status');
const { status } = await statusResponse.json();
if (status === 'paused' || status === 'interrupted') {
// Resume
await stub.fetch('/resume', {
method: 'POST',
body: JSON.stringify({
agentType: 'researcher',
options: { mode: 'continue' },
}),
});
}
DOStateStore
The DOStateStore uses the DO's built-in SQLite for persistence:
import { DOStateStore } from '@helix-agents/runtime-cloudflare';
// Created internally by AgentServer
const stateStore = new DOStateStore({
sql: this.sql, // PartyServer's sql tagged template
logger: this.logger,
});
// Implements SessionStateStore interface
await stateStore.save(state);
const loaded = await stateStore.loadState(sessionId);
await stateStore.appendMessages(sessionId, messages);
Schema
The DOStateStore creates these tables automatically:
-- Main state table (single row per DO)
CREATE TABLE state (
id INTEGER PRIMARY KEY DEFAULT 1,
session_id TEXT NOT NULL,
agent_type TEXT NOT NULL,
stream_id TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'active',
step_count INTEGER DEFAULT 0,
custom_state TEXT,
output TEXT,
error TEXT,
checkpoint_id TEXT,
...
);
-- Messages table
CREATE TABLE messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
role TEXT NOT NULL,
content TEXT,
tool_calls TEXT,
...
);
-- Stream chunks table
CREATE TABLE stream_chunks (
sequence INTEGER PRIMARY KEY,
chunk TEXT NOT NULL,
created_at INTEGER NOT NULL
);
DOStreamManager
The DOStreamManager writes directly to connected clients:
import { DOStreamManager } from '@helix-agents/runtime-cloudflare';
// Created internally by AgentServer
const streamManager = new DOStreamManager({
sql: this.sql,
getConnections: () => this.getConnections(),
broadcast: (msg) => this.broadcast(msg),
sseConnections: this.sseConnections,
logger: this.logger,
});
// Writes to SQLite AND broadcasts to all connected clients
const writer = await streamManager.createWriter(streamId, agentId, agentType);
await writer.write(chunk); // FREE - no subrequest!
Sub-Agents
Sub-agents work transparently in the DO runtime — use createSubAgentTool() exactly as you would in other runtimes. The only requirement is configuring subAgentNamespace so the DO runtime knows which namespace to use when spawning sibling DOs.
Why Sibling DOs?
Each DO instance hosts a single session backed by its own SQLite database. Running a sub-agent inside the same DO as its parent would corrupt the parent's state. Instead, the DO runtime automatically routes createSubAgentTool() calls to sibling DO instances — each sub-agent session gets its own isolated DO with its own SQLite.
This is handled internally by DOStubTransport, an implementation of the RemoteAgentTransport interface that routes calls to sibling DOs within the same namespace. You don't interact with DOStubTransport directly — it is wired up automatically when subAgentNamespace is configured. Agent definitions don't need to change; the tool rewriting happens at execution time.
Configuration
Add subAgentNamespace to your createAgentServer config:
// wrangler.toml
// [[durable_objects.bindings]]
// name = "AGENTS"
// class_name = "MyAgentServer"
interface Env {
AGENTS: DurableObjectNamespace;
ANTHROPIC_API_KEY: string;
}
export const MyAgentServer = createAgentServer<Env>({
llmAdapter: (env) =>
new VercelAIAdapter({
anthropic: createAnthropic({ apiKey: env.ANTHROPIC_API_KEY }),
}),
agents: registry,
// Point to the same DO namespace used for the parent
subAgentNamespace: (env) => env.AGENTS,
});
The sub-agent DO instances are resolved by session ID using namespace.idFromName(sessionId), so each sub-agent session gets a deterministic, isolated DO.
subAgentNamespace is required for sub-agents
If you use createSubAgentTool() in your agents but do not configure subAgentNamespace, the sub-agent tools will not be rewritten. The sub-agent will execute in-process within the same DO instance as the parent, sharing its single-session SQLite database. This will corrupt the parent's state. Always set subAgentNamespace when your agents include sub-agent tools.
Defining Sub-Agent Tools
Define sub-agents and tools exactly as documented in Sub-Agent Orchestration — no DO-specific changes needed. The only requirement is that both parent and sub-agents are registered in the same AgentRegistry, since the sibling DO resolves the sub-agent type from the same registry.
import { defineAgent, createSubAgentTool } from '@helix-agents/core';
import { AgentRegistry } from '@helix-agents/runtime-cloudflare';
import { z } from 'zod';
const AnalyzerAgent = defineAgent({
name: 'analyzer',
systemPrompt: 'You analyze text for sentiment and key topics.',
outputSchema: z.object({
sentiment: z.enum(['positive', 'negative', 'neutral']),
topics: z.array(z.string()),
}),
maxSteps: 5,
});
const analyzerTool = createSubAgentTool(
AnalyzerAgent,
z.object({ text: z.string().describe('Text to analyze') }),
{
description: 'Analyze text for sentiment and topics',
timeoutMs: 60_000, // 60 second timeout (default: 10 minutes)
}
);
const OrchestratorAgent = defineAgent({
name: 'orchestrator',
systemPrompt: 'Coordinate analysis tasks.',
tools: [analyzerTool],
outputSchema: z.object({ summary: z.string() }),
});
// Both parent and sub-agents must be in the same registry
const registry = new AgentRegistry();
registry.register(OrchestratorAgent);
registry.register(AnalyzerAgent);

LLM adapter for sub-agents

Sub-agent DOs use the same `llmAdapter` factory configured in `createAgentServer()`. Any `llmConfig` specified on the agent definition is passed to the adapter, but the adapter itself is always created by the server-level factory. Use the `LLMAdapterContext.agentType` parameter if you need per-agent model selection.
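As a hedged sketch, per-agent model selection can branch on that context field. The `LLMAdapterContext` shape is inferred from the `agentType` parameter described above, and the model names are purely illustrative:

```typescript
// Sketch: per-agent model selection keyed on the adapter context's
// agentType field (interface shape assumed; model names illustrative).
interface LLMAdapterContext {
  agentType: string;
}

export function modelForAgent(ctx: LLMAdapterContext): string {
  // Route the lightweight analyzer sub-agent to a cheaper model and
  // everything else (e.g. the orchestrator parent) to a stronger one.
  return ctx.agentType === 'analyzer' ? 'small-model' : 'large-model';
}
```

The returned model identifier would then be passed to whatever adapter your `llmAdapter` factory constructs.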
How It Works
When a parent agent calls a sub-agent tool:
1. **Tool rewriting** — At execution start, `createSubAgentTool()` tools are transparently converted to `createRemoteSubAgentTool()` calls backed by `DOStubTransport`
2. **Session ID** — A deterministic session ID is generated: `${parentSessionId}-remote-${toolCallId}`
3. **Sibling DO** — `DOStubTransport` resolves a sibling DO via `namespace.idFromName(sessionId)` and calls its `/subagent/:agentType/start` endpoint
4. **Isolated execution** — The sibling DO runs the sub-agent with its own SQLite state
5. **Streaming** — Events stream back through SSE from the sibling DO and are proxied into the parent's stream
The parent stream includes subagent_start/subagent_end events identical to local sub-agents — frontends don't need to distinguish between runtimes.
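The session-ID derivation in step 2 is simple enough to sketch directly. The helper name here is hypothetical; only the `${parentSessionId}-remote-${toolCallId}` format comes from the runtime's documented behavior:

```typescript
// Sketch of the deterministic sub-agent session ID described above.
export function subAgentSessionId(parentSessionId: string, toolCallId: string): string {
  return `${parentSessionId}-remote-${toolCallId}`;
}

// Because the ID is deterministic, a repeated call with the same toolCallId
// resolves to the same sibling DO via namespace.idFromName(subAgentSessionId(...)).
```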
Persistent Sub-Agents
Persistent sub-agents in the DO runtime work the same as ephemeral sub-agents at the infrastructure level: each child gets its own sibling DO instance. The difference is in lifecycle management:
- Children are spawned and managed via companion tools (`companion__spawnAgent`, `companion__sendMessage`, etc.)
- The `subAgentNamespace` configuration is required (same as for ephemeral sub-agents)
- Persistent children maintain state across multiple interactions within their DO's SQLite
- Children can receive follow-up messages via `companion__sendMessage`
No additional DO configuration is needed beyond what is already documented for ephemeral sub-agents.
DOUsageStore
The DOUsageStore tracks token usage, tool calls, and sub-agent invocations using the DO's built-in SQLite:
import { DOUsageStore } from '@helix-agents/runtime-cloudflare';
// Created internally by AgentServer
const usageStore = new DOUsageStore({
sql: this.sql, // PartyServer's sql tagged template
sessionId: 'session-123',
logger: this.logger,
});
// Implements UsageStore interface
await usageStore.recordEntry(entry);
const entries = await usageStore.getEntries(sessionId);
const rollup = await usageStore.getRollup(sessionId);

Getting Usage via HTTP
// Get usage rollup for the parent agent only
const rollupRes = await stub.fetch('/usage');
const rollup = await rollupRes.json();

// Get usage rollup including sub-agents (note: limited in the DO runtime)
const subAgentRes = await stub.fetch('/usage?mode=rollup');

// Get raw entries
const entriesRes = await stub.fetch('/usage?mode=entries');

Usage Response
interface TokenCounts {
prompt?: number;
completion?: number;
reasoning?: number;
cached?: number;
total?: number;
}
interface UsageRollup {
runId: string;
// Token usage for this agent only
tokens: TokenCounts;
tokensByModel: Record<string, TokenCounts>;
// Token usage including sub-agents (see limitation below)
tokensIncludingSubAgents: TokenCounts;
// Tool execution statistics
toolStats: {
totalCalls: number;
successfulCalls: number;
failedCalls: number;
totalDurationMs: number;
byTool: Record<
string,
{
calls: number;
successfulCalls: number;
failedCalls: number;
totalDurationMs: number;
}
>;
};
// Sub-agent execution statistics
subAgentStats: {
totalCalls: number;
successfulCalls: number;
failedCalls: number;
totalDurationMs: number;
byType: Record<
string,
{
calls: number;
successfulCalls: number;
failedCalls: number;
totalDurationMs: number;
}
>;
};
// Custom metrics aggregated by type, then by name
// Example: { 'api_calls': { 'tavily': 5 }, 'bytes': { 'input': 1024 } }
custom: Record<string, Record<string, number>>;
// Timing
startedAt?: number;
lastUpdatedAt?: number;
entryCount: number;
}

Sub-Agent Usage Limitation
In the DO runtime, sub-agents run in separate Durable Objects. Cross-DO usage aggregation is not currently supported. The tokensIncludingSubAgents field will only include the parent agent's tokens. For accurate sub-agent usage, query each sub-agent's DO directly.
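Until cross-DO aggregation exists, a client can sum usage itself after querying each sub-agent's DO. A minimal sketch of the token-merging step, assuming you have already collected `UsageRollup` responses (e.g. by tracking session IDs from `subagent_start` events and calling each DO's `/usage` endpoint); the helper name is hypothetical:

```typescript
// Sketch: merge TokenCounts from several /usage responses client-side.
// Field names mirror the TokenCounts interface documented above.
interface TokenCounts {
  prompt?: number;
  completion?: number;
  reasoning?: number;
  cached?: number;
  total?: number;
}

export function addTokens(a: TokenCounts, b: TokenCounts): TokenCounts {
  // Missing fields are treated as zero, matching the optional fields above.
  return {
    prompt: (a.prompt ?? 0) + (b.prompt ?? 0),
    completion: (a.completion ?? 0) + (b.completion ?? 0),
    reasoning: (a.reasoning ?? 0) + (b.reasoning ?? 0),
    cached: (a.cached ?? 0) + (b.cached ?? 0),
    total: (a.total ?? 0) + (b.total ?? 0),
  };
}
```

Reducing the parent rollup's `tokens` with each sub-agent rollup's `tokens` through this helper yields the total that `tokensIncludingSubAgents` would otherwise report.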
Migration from Legacy AgentServer API
The subclass-based AgentServer API has been removed. Use createAgentServer() instead:
import { createAgentServer } from '@helix-agents/runtime-cloudflare';
const registry = new AgentRegistry();
registry.register(myAgent);
export const MyAgentServer = createAgentServer<Env>({
llmAdapter: (env) =>
  new VercelAIAdapter({
    // Bind the key via createOpenAI (from '@ai-sdk/openai'); the bare
    // openai() helper does not accept an apiKey option.
    model: createOpenAI({ apiKey: env.OPENAI_API_KEY })('gpt-4o'),
  }),
agents: registry,
hooks: {
beforeStart: async ({ executionState }) => {
if (executionState.isExecuting) {
await executionState.interrupt('superseded');
}
},
},
});

Limitations
LLM Adapter Serialization
LLM adapters contain functions and API clients that can't be serialized. With the composition API, the llmAdapter factory handles this automatically since it runs inside the DO.
Single DO Per Run
Each agent run gets its own DO instance. This means:
- No shared state between runs (use D1 if needed)
- Each run has its own SQLite database
- Geographic pinning based on first access
Geographic Pinning
DOs are pinned to a region on first access:
// First access pins to nearest region
const stub = env.AGENTS.get(doId);
// All subsequent access goes to that region
// Consider region affinity for latency-sensitive apps

Memory Limits
Each DO runs in an isolate with a 128 MB memory limit. For very long conversations:
- Consider truncating message history
- Use checkpoints to manage state size
- Monitor memory usage in production
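A minimal truncation sketch for the first point, assuming a simple message shape (adapt the type to your runtime's actual message representation); it keeps system messages plus the most recent turns:

```typescript
// Sketch: cap in-memory history at the system prompt(s) plus the last
// `keep` non-system messages. The Message shape is an assumption.
interface Message {
  role: 'system' | 'user' | 'assistant' | 'tool';
  content: string;
}

export function truncateHistory(messages: Message[], keep = 20): Message[] {
  const system = messages.filter((m) => m.role === 'system');
  const rest = messages.filter((m) => m.role !== 'system');
  // slice(-keep) keeps the most recent messages in their original order.
  return [...system, ...rest.slice(-keep)];
}
```

Dropping whole leading turns like this is the simplest policy; summarizing dropped turns into a synthetic message is a common refinement.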
No Step-Level Durability
Unlike Workflows, the DO runtime doesn't have automatic step-level retries:
- If the DO crashes mid-execution, state reflects last committed step
- Use checkpoints for crash recovery
- Consider Workflows runtime for critical durability needs
Best Practices
1. Keep Connections Alive
// Client-side keepalive
setInterval(() => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({ type: 'ping' }));
}
}, 30000);

2. Handle Reconnection
// Track last received sequence
let lastSequence = 0;
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.sequence) {
lastSequence = data.sequence;
}
};
// Reconnect from last sequence
function reconnect() {
ws = new WebSocket(`wss://.../ws/${sessionId}?fromSequence=${lastSequence}`);
}

3. Use Proper Error Handling
try {
const response = await stub.fetch('/start', { ... });
if (!response.ok) {
const error = await response.json();
throw new Error(error.error);
}
} catch (error) {
// Handle DO errors (network, timeout, etc.)
console.error('DO error:', error);
}

4. Monitor Subrequest Usage
// The only subrequests should be LLM API calls
// Monitor via the Cloudflare dashboard to ensure streaming stays free

Deployment
Development
wrangler dev

Production
wrangler deploy

Secrets
wrangler secret put OPENAI_API_KEY

Next Steps
- Sub-Agents Guide - Sub-agent orchestration patterns
- Remote Agents Guide - Cross-service agent delegation
- Cloudflare Overview - Compare with Workflows runtime
- Workflows Runtime - For step-level durability
- Storage: Cloudflare - Storage options
- DO Example - Complete working example