@helix-agents/runtime-cloudflare
Cloudflare runtime for Helix Agents with two execution approaches: Workflows for step-level durability, and Durable Objects for unlimited streaming.
Installation
npm install @helix-agents/runtime-cloudflareFor Workflows approach, also install:
npm install @helix-agents/store-cloudflareDurable Objects Runtime API
The DO approach executes agents directly inside a Durable Object, bypassing the 1000 subrequest limit by streaming directly to WebSocket/SSE connections.
Composition API (Recommended)
The composition API using createAgentServer() is the recommended approach for creating AgentServer instances.
createAgentServer
Factory function that creates a Durable Object class with composition-based configuration.
import { createAgentServer, AgentRegistry } from '@helix-agents/runtime-cloudflare';
import { VercelAIAdapter } from '@helix-agents/llm-vercel';
const registry = new AgentRegistry();
registry.register(myAgent);
export const MyAgentServer = createAgentServer<Env>({
llmAdapter: (env) => new VercelAIAdapter({
anthropic: createAnthropic({ apiKey: env.ANTHROPIC_API_KEY }),
}),
agents: registry,
hooks: {
beforeStart: async ({ executionState }) => {
if (executionState.isExecuting) {
await executionState.interrupt('superseded');
}
},
},
});AgentServerConfig
Configuration for createAgentServer().
interface AgentServerConfig<TEnv = unknown> {
/**
* Factory to create LLM adapters.
* Called once per execution with environment and context.
* Required.
*/
llmAdapter: LLMAdapterFactory<TEnv>;
/**
* Agent resolution strategy.
* Can be an AgentRegistry instance or a resolver function.
* Required.
*/
agents: AgentResolverInterface<TEnv> | AgentResolver<TEnv>;
/**
* Optional lifecycle hooks for customization.
*/
hooks?: ServerHooks<TEnv>;
/**
* Optional custom endpoints.
* Keys are paths (e.g., "/my-endpoint"), values are handlers.
* Custom endpoints are checked BEFORE built-in endpoints.
*/
endpoints?: Record<string, EndpointHandler<TEnv>>;
/**
* Optional usage store factory.
* If not provided, uses the DO-local DOUsageStore.
*/
usageStore?: UsageStoreFactory<TEnv>;
/**
* Optional logger.
* If not provided, uses noopLogger.
*/
logger?: Logger;
}LLMAdapterFactory
Factory function for creating LLM adapters with access to environment and execution context.
type LLMAdapterFactory<TEnv> = (
env: TEnv,
context: LLMAdapterContext
) => LLMAdapter;
interface LLMAdapterContext {
/** Session ID for the execution */
sessionId: string;
/** Agent type being executed */
agentType: string;
/** Run ID for this execution */
runId: string;
}AgentResolver
Resolver function type for dynamic agent resolution.
type AgentResolver<TEnv = unknown> = (
agentType: string,
context: AgentFactoryContext<TEnv>
) => AgentConfig<z.ZodType, z.ZodType>;ServerHooks
Lifecycle hooks for customizing AgentServer behavior.
interface ServerHooks<TEnv = unknown> {
/**
* Called before starting execution.
* Can abort by returning { abort: true, response: Response }.
*/
beforeStart?: (context: BeforeStartContext<TEnv>) => Promise<void | AbortResult>;
/**
* Called after execution handle is created (before it completes).
*/
afterStart?: (context: AfterStartContext<TEnv>) => Promise<void>;
/**
* Called before resuming execution.
* Can abort by returning { abort: true, response: Response }.
*/
beforeResume?: (context: BeforeResumeContext<TEnv>) => Promise<void | AbortResult>;
/**
* Called after resume handle is created.
*/
afterResume?: (context: AfterResumeContext<TEnv>) => Promise<void>;
/**
* Called before retrying execution.
* Can abort by returning { abort: true, response: Response }.
*/
beforeRetry?: (context: BeforeRetryContext<TEnv>) => Promise<void | AbortResult>;
/**
* Called after retry handle is created.
*/
afterRetry?: (context: AfterRetryContext<TEnv>) => Promise<void>;
/**
* Called when execution completes (success, failure, or interrupt).
*/
onComplete?: (context: CompleteContext<TEnv>) => Promise<void>;
/**
* Transform incoming /start request to standard format.
*/
transformRequest?: (rawBody: unknown, env: TEnv) => Promise<StartAgentRequestV2>;
/**
* Transform outgoing /start response.
*/
transformResponse?: (
response: { sessionId: string; streamId: string; status: 'started' | 'resumed' },
context: { sessionId: string; agentType: string }
) => Promise<unknown>;
}Hook Contexts
interface BeforeStartContext<TEnv> {
env: TEnv;
request: Request;
body: StartAgentRequestV2;
executionState: ExecutionState;
}
interface AfterStartContext<TEnv> {
env: TEnv;
sessionId: string;
runId: string;
agentType: string;
handle: AgentExecutionHandle<unknown>;
}
interface BeforeResumeContext<TEnv> {
env: TEnv;
request: Request;
body: ResumeAgentRequestV2;
executionState: ExecutionState;
}
interface AfterResumeContext<TEnv> {
env: TEnv;
sessionId: string;
runId: string;
agentType: string;
handle: AgentExecutionHandle<unknown>;
}
interface BeforeRetryContext<TEnv> {
env: TEnv;
request: Request;
body: RetryAgentRequestV2;
executionState: ExecutionState;
}
interface AfterRetryContext<TEnv> {
env: TEnv;
sessionId: string;
runId: string;
agentType: string;
handle: AgentExecutionHandle<unknown>;
}
interface CompleteContext<TEnv> {
env: TEnv;
sessionId: string;
runId: string;
status: 'completed' | 'failed' | 'interrupted';
result?: unknown;
error?: Error;
}AbortResult
Return type for hooks that can abort the request.
interface AbortResult {
abort: true;
response: Response;
}ExecutionState
Interface for checking and controlling execution state, available in hooks.
interface ExecutionState {
/** Session ID of current execution, or null if none */
readonly sessionId: string | null;
/** Run ID of current execution, or null if none */
readonly runId: string | null;
/** Whether an execution is currently running */
readonly isExecuting: boolean;
/** Current execution status */
readonly status: 'idle' | 'running' | 'paused' | 'interrupted' | 'completed' | 'failed';
/** Get the current execution handle (if any) */
getHandle<T = unknown>(): AgentExecutionHandle<T> | null;
/** Interrupt the current execution (soft stop, resumable) */
interrupt(reason?: string): Promise<void>;
/** Abort the current execution (hard stop, not resumable) */
abort(reason?: string): Promise<void>;
}EndpointHandler
Handler function for custom endpoints.
type EndpointHandler<TEnv> = (
request: Request,
context: EndpointContext<TEnv>
) => Promise<Response>;
interface EndpointContext<TEnv> {
env: TEnv;
executionState: ExecutionState;
logger: Logger;
}Request Types (V2)
The composition API uses updated request types that use agentType instead of the full AgentConfig.
StartAgentRequestV2
interface StartAgentRequestV2 {
/** Agent type to execute (looked up in registry/resolver) */
agentType: string;
/** Input for the agent */
input: {
message: string;
state?: Record<string, unknown>;
messages?: Message[];
};
/** Session identifier (required) */
sessionId: string;
/** Optional user identifier */
userId?: string;
/** Optional tags for the session */
tags?: string[];
/** Optional metadata for the session */
metadata?: Record<string, string>;
}ResumeAgentRequestV2
interface ResumeAgentRequestV2 {
/** Agent type (for context) */
agentType: string;
/** Resume options */
options: {
mode?: 'continue' | 'retry' | 'branch';
checkpointId?: string;
modifyState?: unknown;
appendMessages?: Message[];
};
}RetryAgentRequestV2
interface RetryAgentRequestV2 {
/** Agent type (for context) */
agentType: string;
/** Retry options */
options?: {
mode?: 'from_checkpoint' | 'from_start';
message?: string;
};
}AgentRegistry
Maps agent type names to configurations. Supports both static registration and factory functions.
import { AgentRegistry, defaultAgentRegistry } from '@helix-agents/runtime-cloudflare';
const registry = new AgentRegistry();
// Static registration
registry.register(MyAgent);
// Factory registration
registry.registerFactory<Env>('brainstorm', (ctx) =>
createBrainstormAgent({ apiKey: ctx.env.API_KEY })
);
// Resolve (handles both static and factory)
const agent = registry.resolve('brainstorm', { env, sessionId, runId });
// Get static agent only (legacy)
const agent = registry.get('researcher');
// Other methods
registry.has('researcher'); // Check if registered
registry.unregister('researcher'); // Remove registration
registry.getRegisteredTypes(); // Get all type names
registry.clear(); // Remove all (for testing)AgentFactoryContext
Context passed to agent factory functions.
interface AgentFactoryContext<TEnv = unknown> {
/** Environment bindings (e.g., API keys, database connections) */
env: TEnv;
/** Current session identifier */
sessionId: string;
/** Current run identifier */
runId: string;
/** Optional user identifier */
userId?: string;
}AgentFactory
Factory function type for dynamic agent creation.
type AgentFactory<TEnv = unknown> = (
context: AgentFactoryContext<TEnv>
) => AgentConfig<z.ZodType, z.ZodType>;AgentResolverInterface
Interface implemented by AgentRegistry and custom resolvers.
interface AgentResolverInterface<TEnv = unknown> {
/**
* Resolve an agent configuration by type.
* @throws AgentNotFoundError if agent type is not found
*/
resolve(
agentType: string,
context: AgentFactoryContext<TEnv>
): AgentConfig<z.ZodType, z.ZodType>;
/**
* Check if an agent type is registered.
*/
has(agentType: string): boolean;
}AgentNotFoundError
Error thrown when an agent type is not found in the registry.
class AgentNotFoundError extends Error {
/** The agent type that was requested */
readonly agentType: string;
/** List of available agent types */
readonly availableTypes: string[];
constructor(agentType: string, availableTypes: string[]);
}Example usage:
import { AgentNotFoundError } from '@helix-agents/runtime-cloudflare';
try {
const agent = registry.resolve('unknown', context);
} catch (error) {
if (error instanceof AgentNotFoundError) {
console.error(`Unknown agent type: ${error.agentType}`);
console.error(`Available types: ${error.availableTypes.join(', ')}`);
}
}AgentServer (Legacy - Deprecated)
Deprecated
The subclass-based AgentServer API is deprecated. Use createAgentServer() instead for new projects.
The main Durable Object class that hosts agent execution.
import { AgentServer } from '@helix-agents/runtime-cloudflare';
// Export for Cloudflare to instantiate
export { AgentServer };AgentServer handles all execution internally and exposes HTTP/WebSocket endpoints.
HTTP Endpoints
| Endpoint | Method | Description |
|---|---|---|
/start | POST | Start new agent execution |
/resume | POST | Resume paused/interrupted agent |
/retry | POST | Retry failed agent |
/interrupt | POST | Soft stop (can resume) |
/abort | POST | Hard stop (cannot resume) |
/status | GET | Get execution status |
/sse | GET | SSE streaming connection |
/history | GET | Get historical chunks |
/usage | GET | Get usage data |
/messages | GET | Get conversation messages |
/snapshot | GET | Get UI state snapshot |
/ | WebSocket | Real-time streaming |
Legacy Request Types
Deprecated
These request types are for the legacy AgentServer API. Use StartAgentRequestV2 and ResumeAgentRequestV2 with createAgentServer() instead.
StartAgentRequest (Legacy)
interface StartAgentRequest {
agent: AgentConfig<any, any, any>;
input: AgentInput;
runId?: string;
llmAdapter?: LLMAdapter;
llmAdapterConfig?: Record<string, any>;
hooks?: AgentHooks;
}ResumeAgentRequest (Legacy)
interface ResumeAgentRequest {
agent: AgentConfig<any, any, any>;
options?: ResumeOptions;
llmAdapter?: LLMAdapter;
llmAdapterConfig?: Record<string, any>;
hooks?: AgentHooks;
}Response Types
AgentStartResponse
interface AgentStartResponse {
sessionId: string;
streamId: string;
status: 'started' | 'resumed';
}AgentStatusResponse
interface AgentStatusResponse {
sessionId: string | null;
status: ExecutionStatus;
output?: unknown;
error?: string;
stepCount: number;
checkpointId?: string;
isExecuting: boolean;
}DOAgentExecutor
API for interacting with AgentServer DOs. Useful for status checks and managing existing runs.
import { DOAgentExecutor } from '@helix-agents/runtime-cloudflare';
const executor = new DOAgentExecutor({
agentNamespace: env.AGENTS,
});
// Get handle for existing session
const handle = await executor.getHandle(myAgent, 'session-123');
// Check status
const state = await handle.getState();Starting Executions
For starting new agent executions, use direct DO access since LLM adapters cannot be serialized over HTTP.
DOAgentExecutorConfig
interface DOAgentExecutorConfig {
agentNamespace: DurableObjectNamespace;
createLLMAdapter?: () => LLMAdapter; // Reserved for future use
createHooks?: () => AgentHooks; // Reserved for future use
logger?: Logger;
}DOStateStore
State store implementation using Durable Object's built-in SQLite.
import { DOStateStore } from '@helix-agents/runtime-cloudflare';
// Created automatically inside AgentServer
// For manual use:
const stateStore = new DOStateStore({
sql, // PartyServer's sql tagged template
logger, // Optional
});DOStateStoreOptions
interface DOStateStoreOptions {
sql: SqlTaggedTemplate;
logger?: Logger;
}DOStreamManager
Stream manager that broadcasts directly to connected WebSocket/SSE clients.
import { DOStreamManager } from '@helix-agents/runtime-cloudflare';
// Created automatically inside AgentServer
// For manual use:
const streamManager = new DOStreamManager({
sql,
getConnections,
broadcast,
sseConnections,
logger,
});DOStreamManagerOptions
interface DOStreamManagerOptions {
sql: SqlTaggedTemplate;
getConnections: () => Map<string, WebSocket>;
broadcast: (msg: string) => void;
sseConnections: Map<string, SSEConnection>;
logger?: Logger;
}DOStreamWriter
Writer returned by createWriter() for emitting chunks. Implements the StreamWriter interface.
class DOStreamWriter implements StreamWriter {
write(chunk: StreamChunk): Promise<void>;
close(): Promise<void>;
}Connection Types
// WebSocket connection state (survives hibernation)
interface ConnectionState {
fromSequence: number;
}
// SSE connection tracking (in-memory, doesn't survive hibernation)
interface SSEConnection {
writer: WritableStreamDefaultWriter<Uint8Array>;
encoder: TextEncoder;
fromSequence: number;
lastActivity: number;
}
type StreamStatus = 'active' | 'paused' | 'ended' | 'failed';DOUsageStore
Usage store implementation using Durable Object's built-in SQLite.
import { DOUsageStore } from '@helix-agents/runtime-cloudflare';
// Created automatically inside AgentServer
// For manual use:
const usageStore = new DOUsageStore({
sql,
sessionId,
logger,
});
// Record usage
await usageStore.recordEntry(entry);
// Get entries
const entries = await usageStore.getEntries(sessionId);
// Get rollup
const rollup = await usageStore.getRollup(sessionId);Durable Object Types
import type {
DurableObjectNamespace,
DurableObjectId,
DurableObjectStub,
SqlTaggedTemplate,
} from '@helix-agents/runtime-cloudflare';Workflows Runtime API
The Workflows approach uses Cloudflare Workflows for durable, step-level execution with D1 for state.
CloudflareAgentExecutor
Executor that starts and manages Cloudflare Workflows.
import { CloudflareAgentExecutor } from '@helix-agents/runtime-cloudflare';
const executor = new CloudflareAgentExecutor({
workflowBinding, // AgentWorkflowBinding from env
stateStore, // StateStore (D1StateStore)
streamManager, // StreamManager (DurableObjectStreamManager)
logger, // Optional
});
// Execute agent
const handle = await executor.execute(MyAgent, {
message: 'Hello',
state: { userId: 'user-123' },
});
// Get existing handle
const handle = await executor.getHandle(MyAgent, 'session-123');CloudflareAgentExecutorConfig
interface CloudflareAgentExecutorConfig {
workflowBinding: AgentWorkflowBinding;
stateStore: StateStore;
streamManager: StreamManager;
logger?: Logger;
}AgentExecutionHandle
Handle returned by execute() for interacting with running workflows.
// Run ID and stream ID
handle.sessionId;
handle.streamId;
// Stream events
const stream = await handle.stream();
for await (const chunk of stream) {
switch (chunk.type) {
case 'text_delta':
console.log(chunk.delta);
break;
case 'run_interrupted':
console.log(`Interrupted: ${chunk.reason}`);
break;
}
}
// Wait for result
const result = await handle.result();
// Abort execution (HARD stop - cannot resume)
await handle.abort('User cancelled');
// Interrupt execution (SOFT stop - can resume)
await handle.interrupt('user_requested');
// Check if resumable
const { canResume, reason } = await handle.canResume();
// Resume execution
const newHandle = await handle.resume();
// Resume with message
const newHandle = await handle.resume({
mode: 'with_message',
message: 'Continue with this context',
});
// Time-travel resume
const newHandle = await handle.resume({
mode: 'from_checkpoint',
checkpointId: 'cpv1-...',
});
// Get current state
const state = await handle.getState();
// Continue conversation
const handle2 = await handle.send('Follow up message');runAgentWorkflow
Core workflow function for Cloudflare Workflows.
import { WorkflowEntrypoint, WorkflowEvent, WorkflowStep } from 'cloudflare:workers';
import { runAgentWorkflow } from '@helix-agents/runtime-cloudflare';
import { createCloudflareStore } from '@helix-agents/store-cloudflare';
import { VercelAIAdapter } from '@helix-agents/llm-vercel';
export class AgentWorkflow extends WorkflowEntrypoint<Env, AgentWorkflowInput> {
async run(event: WorkflowEvent<AgentWorkflowInput>, step: WorkflowStep) {
const { stateStore, streamManager } = createCloudflareStore({
db: this.env.AGENT_DB,
streams: this.env.STREAMS,
});
return runAgentWorkflow(event, step, {
stateStore,
streamManager,
llmAdapter: new VercelAIAdapter(),
registry,
workflowBinding: this.env.AGENT_WORKFLOW,
});
}
}AgentSteps
Step implementations for manual workflow control.
import { AgentSteps } from '@helix-agents/runtime-cloudflare';
const steps = new AgentSteps({
stateStore,
streamManager,
llmAdapter,
registry,
});
// Use in workflow
await steps.initializeAgentState(step, input);
await steps.executeAgentStep(step, input);
await steps.executeToolCall(step, toolCall);createWorkflowRunner
Alternative factory for creating workflow runners.
import { createWorkflowRunner } from '@helix-agents/runtime-cloudflare';
const runner = createWorkflowRunner({
stateStore,
streamManager,
llmAdapter,
registry,
workflowBinding,
});
// Use in workflow
return runner.run(event, step);Binding Types
Types for Cloudflare bindings.
import type {
AgentWorkflowBinding,
WorkflowBinding,
WorkflowInstance,
WorkflowStatus,
WorkflowDependencies,
WorkflowOptions,
} from '@helix-agents/runtime-cloudflare';
// Define your environment
interface Env {
AGENT_DB: D1Database;
STREAMS: DurableObjectNamespace;
AGENT_WORKFLOW: AgentWorkflowBinding;
OPENAI_API_KEY: string;
}Workflow DTOs
Type-safe schemas for workflow inputs/outputs.
Session vs Run Identifiers
sessionId: Primary key for state storage. A session contains all messages, state, and checkpoints.runId: Identifies a specific workflow execution. Multiple runs can occur within one session.
In Cloudflare, each Durable Object or Workflow instance is tied to a sessionId. The runId is execution metadata for tracing.
import {
// Workflow DTOs
AgentWorkflowInputSchema,
AgentWorkflowResultSchema,
type AgentWorkflowInput,
type AgentWorkflowResult,
// Step DTOs
InitializeAgentInputSchema,
ExecuteAgentStepInputSchema,
ExecuteToolCallInputSchema,
ToolCallResultSchema,
// Sub-agent DTOs
RegisterSubAgentsInputSchema,
RecordSubAgentResultInputSchema,
MarkAgentFailedInputSchema,
// Stream DTOs
EndAgentStreamInputSchema,
FailAgentStreamInputSchema,
} from '@helix-agents/runtime-cloudflare';Workflow Types
import type {
WorkflowStep,
WorkflowEvent,
WorkflowDuration,
WorkflowRetryConfig,
WorkflowStepConfig,
WaitForEventOptions,
WaitForEventResult,
SubAgentCompletionEvent,
} from '@helix-agents/runtime-cloudflare';Shared Utilities
pollUntil
Poll until a condition is met.
import { pollUntil } from '@helix-agents/runtime-cloudflare';
const result = await pollUntil({
check: async () => {
const status = await getStatus();
if (status === 'done') {
return { done: true, value: status };
}
return { done: false };
},
interval: 1000,
timeout: 60000,
});PollOptions
interface PollOptions<T> {
check: () => Promise<PollCheckResult<T>>;
interval?: number;
timeout?: number;
}
type PollCheckResult<T> =
| { done: true; value: T }
| { done: false };Complete Examples
Durable Objects Example (Composition API)
wrangler.toml
name = "my-agent-do"
main = "src/worker.ts"
compatibility_date = "2024-01-01"
[[durable_objects.bindings]]
name = "AGENTS"
class_name = "MyAgentServer"
[[migrations]]
tag = "v1"
new_sqlite_classes = ["MyAgentServer"]agent-server.ts
import { createAgentServer, AgentRegistry } from '@helix-agents/runtime-cloudflare';
import { VercelAIAdapter } from '@helix-agents/llm-vercel';
import { createAnthropic } from '@ai-sdk/anthropic';
import { myAgent } from './agent.js';
interface Env {
AGENTS: DurableObjectNamespace;
ANTHROPIC_API_KEY: string;
}
const registry = new AgentRegistry();
registry.register(myAgent);
export const MyAgentServer = createAgentServer<Env>({
llmAdapter: (env) => new VercelAIAdapter({
anthropic: createAnthropic({ apiKey: env.ANTHROPIC_API_KEY }),
}),
agents: registry,
hooks: {
beforeStart: async ({ executionState, body }) => {
if (executionState.isExecuting) {
await executionState.interrupt('New message received');
}
console.log(`Starting agent: ${body.agentType}`);
},
},
});worker.ts
import { MyAgentServer } from './agent-server.js';
import type { Env } from './agent-server.js';
export { MyAgentServer };
export default {
async fetch(request: Request, env: Env): Promise<Response> {
const url = new URL(request.url);
if (url.pathname === '/execute' && request.method === 'POST') {
const { message } = await request.json<{ message: string }>();
const sessionId = crypto.randomUUID();
const doId = env.AGENTS.idFromName(`session:${sessionId}`);
const stub = env.AGENTS.get(doId);
const response = await stub.fetch('/start', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
agentType: 'researcher',
input: { message },
sessionId,
}),
});
const result = await response.json<{ sessionId: string; streamId: string }>();
return Response.json({
sessionId: result.sessionId,
streamUrl: `/stream/${result.sessionId}`,
websocketUrl: `/ws/${result.sessionId}`,
});
}
if (url.pathname.startsWith('/stream/')) {
const sessionId = url.pathname.split('/').pop()!;
const doId = env.AGENTS.idFromName(`session:${sessionId}`);
return env.AGENTS.get(doId).fetch(new URL('/sse', request.url));
}
if (url.pathname.startsWith('/ws/')) {
const sessionId = url.pathname.split('/').pop()!;
const doId = env.AGENTS.idFromName(`session:${sessionId}`);
return env.AGENTS.get(doId).fetch(request);
}
return new Response('Not found', { status: 404 });
},
};Workflows Example
wrangler.toml
name = "my-agent-worker"
main = "src/worker.ts"
compatibility_date = "2024-12-01"
compatibility_flags = ["nodejs_compat"]
[[d1_databases]]
binding = "AGENT_DB"
database_name = "agents-db"
database_id = "xxx"
[[durable_objects.bindings]]
name = "STREAMS"
class_name = "StreamServer"
[[workflows]]
name = "agent-workflow"
binding = "AGENT_WORKFLOW"
class_name = "AgentWorkflow"
[[migrations]]
tag = "v1"
new_sqlite_classes = ["StreamServer"]worker.ts
import { WorkflowEntrypoint, WorkflowEvent, WorkflowStep } from 'cloudflare:workers';
import {
runAgentWorkflow,
AgentRegistry,
CloudflareAgentExecutor,
type AgentWorkflowInput,
} from '@helix-agents/runtime-cloudflare';
import { createCloudflareStore, StreamServer } from '@helix-agents/store-cloudflare';
import { VercelAIAdapter } from '@helix-agents/llm-vercel';
import { MyAgent } from './agent.js';
export { StreamServer };
const registry = new AgentRegistry();
registry.register(MyAgent);
export class AgentWorkflow extends WorkflowEntrypoint<Env, AgentWorkflowInput> {
async run(event: WorkflowEvent<AgentWorkflowInput>, step: WorkflowStep) {
const { stateStore, streamManager } = createCloudflareStore({
db: this.env.AGENT_DB,
streams: this.env.STREAMS,
});
return runAgentWorkflow(event, step, {
stateStore,
streamManager,
llmAdapter: new VercelAIAdapter(),
registry,
workflowBinding: this.env.AGENT_WORKFLOW,
});
}
}
export default {
async fetch(request: Request, env: Env) {
const { stateStore, streamManager } = createCloudflareStore({
db: env.AGENT_DB,
streams: env.STREAMS,
});
const executor = new CloudflareAgentExecutor({
workflowBinding: env.AGENT_WORKFLOW,
stateStore,
streamManager,
});
if (request.method === 'POST') {
const { message } = await request.json();
const handle = await executor.execute(MyAgent, { message });
return Response.json({ sessionId: handle.sessionId });
}
return Response.json({ error: 'Not found' }, { status: 404 });
},
};Interrupt and Resume
Both runtimes support interrupt/resume for pausing and continuing agent execution.
Workflows Interrupt
Interrupts use Cloudflare's step.waitForEvent():
// Interrupt from handle
await handle.interrupt('user_requested');
// Resume later
const { canResume } = await handle.canResume();
if (canResume) {
const resumed = await handle.resume();
const result = await resumed.result();
}DO Interrupt
Interrupts use HTTP endpoints:
// Interrupt via fetch
await fetch(`/interrupt`, {
method: 'POST',
body: JSON.stringify({ reason: 'user_requested' }),
});
// Resume via fetch (composition API)
await fetch(`/resume`, {
method: 'POST',
body: JSON.stringify({
agentType: 'researcher',
options: { mode: 'continue' },
}),
});Workflow Execution Semantics
| Action | Behavior |
|---|---|
interrupt() | Soft stop, saves checkpoint, can resume |
resume() | Continue from checkpoint or with new message |
abort() | Hard stop, marks as failed, cannot resume |