# Cloudflare Workflows Runtime
The Cloudflare Workflows runtime executes agents using Cloudflare Workflows for durable execution with D1 for state storage and separate Durable Objects for stream management.
## When to Use Workflows
Choose the Workflows runtime when you need step-level durability, automatic retries, or want to share D1 state with other services. If your agents require heavy streaming (>100 chunks), consider the Durable Objects runtime instead.
## Architecture

```mermaid
graph TB
subgraph Edge ["Edge Location (Global)"]
subgraph Worker ["Cloudflare Worker"]
W1["HTTP endpoints<br/>CloudflareAgentExecutor<br/>Starts Workflows"]
end
Worker --> Workflow
subgraph Workflow ["Cloudflare Workflow"]
WF1["Agent execution steps<br/>LLM calls<br/>Tool execution"]
end
Workflow --> D1
Workflow --> DO
D1["<b>D1 Database</b><br/>Agent state<br/>Messages"]
DO["<b>Durable Object</b><br/>Stream events<br/>Real-time streaming"]
end
```

## Prerequisites
- Cloudflare account with Workers Paid plan
- Wrangler CLI: `npm install -g wrangler`
- D1 database for state storage (you can create one with the command below)
- Durable Objects for streaming
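
If you haven't provisioned the database yet, Wrangler can create it (the name `agent-state` matches the wrangler.toml below; the command prints the `database_id` to paste into the config):

```bash
# Creates the D1 database and prints its database_id for wrangler.toml
wrangler d1 create agent-state
```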
## Installation

```bash
npm install @helix-agents/runtime-cloudflare @helix-agents/store-cloudflare
```

## Setup Guide
### 1. Configure `wrangler.toml`

```toml
name = "agent-worker"
main = "src/index.ts"
compatibility_date = "2024-01-01"
# D1 Database for state
[[d1_databases]]
binding = "DB"
database_name = "agent-state"
database_id = "your-database-id"
# Durable Object for streaming
[durable_objects]
bindings = [
{ name = "STREAM_MANAGER", class_name = "StreamManagerDO" }
]
[[migrations]]
tag = "v1"
new_classes = ["StreamManagerDO"]
# Workflow binding
[[workflows]]
name = "agent-workflow"
binding = "AGENT_WORKFLOW"
class_name = "AgentWorkflow"
```

### 2. Use Programmatic Migrations
The `D1StateStore` uses programmatic migrations that are applied automatically. Call `runMigration()` on startup:

```ts
import { runMigration } from '@helix-agents/store-cloudflare';
// Run migrations on startup (safe to call on every request - no-op if already migrated)
await runMigration(env.DB);
```

The framework creates tables automatically with session-centric naming (all prefixed with `__agents_`):

```sql
-- Core state table
CREATE TABLE __agents_states (
  session_id TEXT PRIMARY KEY,
  agent_type TEXT NOT NULL,
  stream_id TEXT NOT NULL,
  status TEXT NOT NULL DEFAULT 'active',
  step_count INTEGER DEFAULT 0,
  custom_state TEXT,
  output TEXT,
  error TEXT,
  parent_session_id TEXT,
  aborted INTEGER DEFAULT 0,
  abort_reason TEXT,
  created_at INTEGER NOT NULL,
  updated_at INTEGER NOT NULL
);

-- Messages table (separated for O(1) append)
CREATE TABLE __agents_messages (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  session_id TEXT NOT NULL,
  sequence INTEGER NOT NULL,
  message TEXT NOT NULL,
  created_at INTEGER NOT NULL,
  UNIQUE(session_id, sequence)
);
```

### 3. Create the Durable Object
```ts
// src/stream-manager-do.ts
import { DurableObject } from 'cloudflare:workers';
// StreamChunk is the framework's stream event type; adjust the import to your setup
import type { StreamChunk } from '@helix-agents/runtime-cloudflare';

export class StreamManagerDO extends DurableObject {
  private chunks: Map<string, StreamChunk[]> = new Map();

  async write(streamId: string, chunk: StreamChunk): Promise<void> {
    const chunks = this.chunks.get(streamId) ?? [];
    chunks.push(chunk);
    this.chunks.set(streamId, chunks);
    // Notify connected clients via WebSocket
    this.ctx.getWebSockets().forEach((ws) => {
      ws.send(JSON.stringify(chunk));
    });
  }

  async read(streamId: string, fromOffset: number): Promise<StreamChunk[]> {
    const chunks = this.chunks.get(streamId) ?? [];
    return chunks.slice(fromOffset);
  }

  async fetch(request: Request): Promise<Response> {
    // WebSocket upgrade for real-time streaming
    if (request.headers.get('Upgrade') === 'websocket') {
      const [client, server] = Object.values(new WebSocketPair());
      this.ctx.acceptWebSocket(server);
      return new Response(null, { status: 101, webSocket: client });
    }
    return new Response('Expected WebSocket', { status: 400 });
  }
}
```

### 4. Create the Workflow
```ts
// src/workflows/agent-workflow.ts
import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers';
import { AgentRegistry, executeWorkflowStep } from '@helix-agents/runtime-cloudflare';
// initializeState and saveState are local helpers (omitted here for brevity)

export class AgentWorkflow extends WorkflowEntrypoint<Env, AgentWorkflowInput> {
  async run(event: WorkflowEvent<AgentWorkflowInput>, step: WorkflowStep) {
    const { sessionId, agentType, message, streamId } = event.payload;

    // Get agent from registry
    const registry = new AgentRegistry();
    const agent = registry.get(agentType);

    // Initialize or load state
    let state = await step.do('load-state', async () => {
      return this.env.DB.prepare('SELECT * FROM __agents_states WHERE session_id = ?')
        .bind(sessionId)
        .first();
    });
    if (!state) {
      state = await step.do('init-state', async () => {
        // Initialize new agent state
        return initializeState(agent, sessionId, streamId, message);
      });
    }

    // Main execution loop
    while (state.status === 'running') {
      // Check for abort
      const aborted = await step.do(`check-abort-${state.step_count}`, async () => {
        const row = await this.env.DB.prepare('SELECT aborted FROM __agents_states WHERE session_id = ?')
          .bind(sessionId)
          .first();
        return row?.aborted === 1;
      });
      if (aborted) {
        state.status = 'failed';
        state.error = 'Aborted by user';
        break;
      }

      // Execute one step
      state = await step.do(`step-${state.step_count}`, async () => {
        return executeWorkflowStep(agent, state, this.env.DB, this.env.STREAM_MANAGER, this.env);
      });
    }

    // Save final state
    await step.do('save-final', async () => {
      await saveState(this.env.DB, state);
    });

    return {
      status: state.status,
      output: state.output,
      error: state.error,
    };
  }
}
```

### 5. Create the Worker Entry
```ts
// src/index.ts
import { CloudflareAgentExecutor } from '@helix-agents/runtime-cloudflare';
import { D1StateStore, DOStreamManager } from '@helix-agents/store-cloudflare';
import { AgentWorkflow } from './workflows/agent-workflow';
import { StreamManagerDO } from './stream-manager-do';
import { registry } from './registry';

export { AgentWorkflow, StreamManagerDO };

export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    const url = new URL(request.url);

    // Create executor
    const executor = new CloudflareAgentExecutor({
      workflowBinding: env.AGENT_WORKFLOW,
      stateStore: new D1StateStore(env.DB),
      streamManager: new DOStreamManager(env.STREAM_MANAGER),
    });

    // POST /agent/execute - Start new execution
    if (url.pathname === '/agent/execute' && request.method === 'POST') {
      const { agentType, message, sessionId } = (await request.json()) as {
        agentType: string;
        message: string;
        sessionId?: string;
      };
      const agent = registry.get(agentType);
      const handle = await executor.execute(agent, { message }, { sessionId });
      return Response.json({
        sessionId: handle.sessionId,
        streamUrl: `/agent/stream/${handle.sessionId}`,
      });
    }

    // GET /agent/stream/:sessionId - SSE stream
    if (url.pathname.startsWith('/agent/stream/')) {
      const sessionId = url.pathname.split('/').pop();
      const handle = await executor.getHandle(registry.get('default'), sessionId);
      if (!handle) {
        return new Response('Not found', { status: 404 });
      }
      const stream = await handle.stream();
      return new Response(
        new ReadableStream({
          async start(controller) {
            for await (const chunk of stream) {
              controller.enqueue(new TextEncoder().encode(`data: ${JSON.stringify(chunk)}\n\n`));
            }
            controller.close();
          },
        }),
        {
          headers: {
            'Content-Type': 'text/event-stream',
            'Cache-Control': 'no-cache',
          },
        }
      );
    }

    // GET /agent/result/:sessionId - Get result
    if (url.pathname.startsWith('/agent/result/')) {
      const sessionId = url.pathname.split('/').pop();
      const handle = await executor.getHandle(registry.get('default'), sessionId);
      if (!handle) {
        return new Response('Not found', { status: 404 });
      }
      const result = await handle.result();
      return Response.json(result);
    }

    return new Response('Not found', { status: 404 });
  },
};
```

## Agent Registry
Register agents so the workflow can instantiate them:
```ts
// src/registry.ts
import { AgentRegistry } from '@helix-agents/runtime-cloudflare';
import { ResearchAgent, AnalyzerAgent } from './agents';

export const registry = new AgentRegistry();
registry.register(ResearchAgent);
registry.register(AnalyzerAgent);
```
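
Note that the workflow example in step 4 constructs its own `AgentRegistry`, which starts empty; importing this shared, pre-populated instance instead keeps registration in one place (a sketch, assuming the file layout above):

```ts
// src/workflows/agent-workflow.ts - reuse the shared registry
import { registry } from '../registry';

const agent = registry.get(agentType); // resolves agents registered in src/registry.ts
```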
## Executor API

### Creating the Executor
```ts
import { CloudflareAgentExecutor } from '@helix-agents/runtime-cloudflare';

const executor = new CloudflareAgentExecutor({
  workflowBinding: env.AGENT_WORKFLOW, // From wrangler.toml
  stateStore: d1StateStore,
  streamManager: doStreamManager,
});
```

### Executing Agents
```ts
const handle = await executor.execute(MyAgent, { message: 'Research quantum computing' }, {
  sessionId: 'custom-session-id', // Optional
});
```

### Getting Handles
```ts
const handle = await executor.getHandle(MyAgent, sessionId);
if (handle) {
  const result = await handle.result();
  console.log(result);
}
```

## Multi-Turn Conversations
The Workflows runtime supports the same multi-turn conversation API as the JS runtime, using the session-centric model.

### Using sessionId
```ts
// First message - creates a new session
const handle1 = await executor.execute(agent, 'Hello, my name is Alice', {
  sessionId: 'session-123',
});
await handle1.result();

// Continue the conversation - same sessionId
const handle2 = await executor.execute(agent, 'What is my name?', {
  sessionId: 'session-123',
});
```

### Using handle.send()
```ts
const handle1 = await executor.execute(agent, 'Hello', {
  sessionId: 'session-123',
});
await handle1.result();

const handle2 = await handle1.send('Tell me more');
```

### Using Direct Messages
```ts
const handle = await executor.execute(agent, {
  message: 'Continue from here',
  messages: myExternalMessageHistory,
});
```

### Behavior Table
| Input | Messages Source | State Source |
|---|---|---|
| `message` only (new session) | Empty (fresh) | Empty (fresh) |
| `message` + `sessionId` (existing) | From session | From session |
| `message` + `messages` | From messages | Empty (fresh) |
| `message` + `state` | Empty (fresh) | From state |
| `message` + `sessionId` + `messages` | From messages (override) | From session |
| `message` + `sessionId` + `state` | From session | From state (override) |
| All four | From messages (override) | From state (override) |
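
For example, seeding an execution from externally persisted state (the `message` + `state` row above; the exact shape of the `state` value depends on your agent's custom state):

```ts
// Messages start fresh; custom state comes from the provided snapshot
const handle = await executor.execute(agent, {
  message: 'Continue the analysis',
  state: savedCustomState, // hypothetical: previously exported agent state
});
```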
See JS Runtime - Multi-Turn Conversations for detailed documentation.
## Streaming

### Server-Sent Events

```ts
// Worker endpoint
if (url.pathname.startsWith('/stream/')) {
  const stream = await handle.stream();
  return new Response(
    new ReadableStream({
      async start(controller) {
        for await (const chunk of stream) {
          controller.enqueue(new TextEncoder().encode(`data: ${JSON.stringify(chunk)}\n\n`));
        }
        controller.close();
      },
    }),
    {
      headers: {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache',
        Connection: 'keep-alive',
      },
    }
  );
}
```

### WebSocket (via Durable Objects)
```ts
// Client connects to DO WebSocket
const ws = new WebSocket(`wss://your-worker.workers.dev/ws/${streamId}`);

ws.onmessage = (event) => {
  const chunk = JSON.parse(event.data);
  handleChunk(chunk);
};
```
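
For the client URL above to work, the Worker must route WebSocket upgrades to the right Durable Object instance. A minimal sketch, assuming a `/ws/:streamId` route and the `STREAM_MANAGER` binding from wrangler.toml:

```ts
// In the Worker fetch handler: forward the upgrade to the stream's DO,
// whose fetch() (step 3 above) accepts the WebSocket.
if (url.pathname.startsWith('/ws/')) {
  const streamId = url.pathname.split('/').pop()!;
  const stub = env.STREAM_MANAGER.get(env.STREAM_MANAGER.idFromName(streamId));
  return stub.fetch(request);
}
```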
## Sub-Agent Handling

Sub-agents execute as nested workflow calls:
```ts
// In workflow step
const subAgentResult = await step.do('sub-agent-call', async () => {
  // Start child workflow
  const instance = await this.env.AGENT_WORKFLOW.create({
    id: `agent__${subAgentType}__${subSessionId}`,
    params: {
      agentType: subAgentType,
      sessionId: subSessionId,
      streamId: parentStreamId, // Same stream
      message: inputMessage,
      parentSessionId: parentSessionId,
    },
  });
  // Wait for completion
  return pollUntilComplete(instance);
});
```
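
`pollUntilComplete` is not defined in the snippet; a hedged sketch using the Workflows instance API (`instance.status()` reports the child's status, output, and error):

```ts
// Hypothetical helper: poll a child workflow instance until it settles.
// WorkflowInstance is the ambient type from @cloudflare/workers-types.
async function pollUntilComplete(instance: WorkflowInstance): Promise<unknown> {
  while (true) {
    const { status, output, error } = await instance.status();
    if (status === 'complete') return output;
    if (status === 'errored' || status === 'terminated') {
      throw new Error(`Sub-agent workflow failed: ${error ?? status}`);
    }
    await new Promise((resolve) => setTimeout(resolve, 500)); // back off between polls
  }
}
```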
## Workflow Steps

Cloudflare Workflows use steps for durability:

```ts
// Each step is durable - if the worker restarts, execution continues
state = await step.do('step-1', async () => {
  // LLM call
  return await callLLM(messages, tools);
});

state = await step.do('step-2', async () => {
  // Tool execution
  return await executeTools(toolCalls);
});
```

Key points:
- Steps are atomic and retried on failure
- State between steps is persisted
- Worker can restart between steps without data loss
## Abort Handling

```ts
// Set abort flag in D1
await handle.abort('User cancelled');

// In workflow, check abort flag each step
const aborted = await step.do('check-abort', async () => {
  const row = await env.DB.prepare('SELECT aborted FROM __agents_states WHERE session_id = ?')
    .bind(sessionId)
    .first();
  return row?.aborted === 1;
});
if (aborted) {
  return { status: 'failed', error: 'Aborted' };
}
```

## Interrupt Handling
Unlike abort (which is a hard stop), interrupt is a soft stop that saves state for later resumption:
```ts
// Interrupt the agent (soft stop)
await handle.interrupt('user_requested');

// Agent status becomes 'interrupted'
const state = await handle.getState();
console.log(state.status); // 'interrupted'

// Later, resume execution
const { canResume } = await handle.canResume();
if (canResume) {
  const newHandle = await handle.resume();
  const result = await newHandle.result();
}
```

### How It Works
The Cloudflare runtime uses a dual approach for responsive interrupts:

- **Interrupt flag** - set in D1 via `stateStore.setInterruptFlag()` for persistence
- **Interrupt event** - sent via `instance.sendEvent()` for immediate wake-up

The workflow checks for interrupts at two points:

- **At each step boundary** - before starting a new LLM call
- **During sub-agent waits** - using `Promise.race` against interrupt events (see the sketch below)
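
A conceptual sketch of the second check, racing the child workflow against a Workflows event delivered by `instance.sendEvent()` (the event name and helpers are illustrative, not the framework's API):

```ts
// Wait for the sub-agent while staying responsive to interrupts.
const outcome = await Promise.race([
  pollUntilComplete(childInstance).then((output) => ({ kind: 'done' as const, output })),
  step.waitForEvent('interrupt-signal', { type: 'interrupt' }).then(() => ({ kind: 'interrupted' as const })),
]);
if (outcome.kind === 'interrupted') {
  // Set interrupt flags for the children, then stop at this safe point
}
```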
### Sub-Agent Interrupt Propagation

When an agent has running sub-agents, interrupts propagate through the entire hierarchy:

```
User calls handle.interrupt()
│
▼
Parent receives interrupt (immediate via event)
│
├──► Child 1: interrupt flag set
├──► Child 2: interrupt flag set
└──► Child 3: interrupt flag set
│
▼
Each child stops at next safe point
│
▼
Parent returns { status: 'interrupted' }
```

Target latency: < 200ms from interrupt request to stopped execution.
See Interrupt and Resume for complete documentation including resume modes and error handling.
## Stream Resumption

The Workflows runtime supports stream resumption for handling client disconnections during agent execution. This uses the same `FrontendHandler` pattern as other runtimes.

### Setting Up Stream Resumption
```ts
import { createFrontendHandler } from '@helix-agents/ai-sdk';
import { D1StateStore, DOStreamManager } from '@helix-agents/store-cloudflare';

export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    const url = new URL(request.url);
    const handler = createFrontendHandler({
      executor, // CloudflareAgentExecutor
      stateStore: new D1StateStore(env.DB),
      streamManager: new DOStreamManager(env.STREAM_MANAGER),
      agent: myAgent,
    });

    // Handle stream resumption
    if (url.pathname.startsWith('/api/chat/')) {
      const sessionId = url.pathname.split('/').pop()!;
      const fromSequence = parseInt(request.headers.get('Last-Event-ID') ?? '0', 10);
      return handler.handleStream(sessionId, { fromSequence });
    }
    return new Response('Not found', { status: 404 });
  },
};
```

### Frontend Integration
Use `HelixChatTransport` for automatic resume handling:

```tsx
import { HelixChatTransport } from '@helix-agents/ai-sdk/client';
import { useChat } from '@ai-sdk/react';

function Chat({ sessionId, snapshot }) {
  const { messages } = useChat({
    id: sessionId,
    initialMessages: snapshot?.messages ?? [],
    resume: snapshot?.status === 'active',
    transport: new HelixChatTransport({
      api: `/api/chat/${sessionId}`,
      resumeFromSequence: snapshot?.streamSequence,
    }),
  });
  // Render `messages` as usual
}
```

### Limitations
#### Subrequest Limit Impact

Stream resumption in the Workflows runtime is subject to the 1000-subrequest limit: each stream chunk written to the Durable Object counts as a subrequest. For streaming-heavy agents that require frequent reconnections, consider the Durable Objects runtime, which has unlimited streaming.

Key considerations:

- **Chunk storage** - stream chunks are stored in the Durable Object via `DOStreamManager`
- **Sequence tracking** - each chunk has a sequence number for resumption
- **TTL management** - configure appropriate chunk retention based on session duration
## Deployment

### Development

```bash
wrangler dev
```

### Production

```bash
wrangler deploy
```

### Secrets

```bash
wrangler secret put OPENAI_API_KEY
```

Access in code:

```ts
const apiKey = env.OPENAI_API_KEY;
```

## D1 State Store
The Workflows runtime uses D1 for state:
```ts
import { D1StateStore } from '@helix-agents/store-cloudflare';

const stateStore = new D1StateStore(env.DB);

// Save state
await stateStore.saveState(sessionId, state);

// Load state
const loaded = await stateStore.loadState(sessionId);

// Append messages
await stateStore.appendMessages(sessionId, messages);
```

## Limitations
### Subrequest Limit

> **Important:** Cloudflare Workers have a 1000-subrequest limit per invocation. Each stream chunk write to the Durable Object counts as a subrequest. For streaming-heavy agents, consider the Durable Objects runtime, which bypasses this limit.
### Workflow Duration

Cloudflare Workflows have time limits:
- Individual steps: 15 minutes
- Total workflow: varies by plan
For longer agents, implement checkpointing.
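
A hedged sketch of what checkpointing can look like inside the workflow's run loop, persisting progress to D1 after each step so a fresh invocation can reload `__agents_states` and continue (`runOneAgentStep` is a hypothetical helper):

```ts
// Persist a checkpoint after every step; a follow-up workflow run can
// resume from state.step_count instead of starting over.
state = await step.do(`step-${state.step_count}`, async () => {
  const next = await runOneAgentStep(state); // hypothetical: advance the agent one step
  await this.env.DB.prepare(
    'UPDATE __agents_states SET custom_state = ?, step_count = ?, updated_at = ? WHERE session_id = ?'
  )
    .bind(JSON.stringify(next.custom_state), next.step_count, Date.now(), sessionId)
    .run();
  return next;
});
```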
### Cold Starts
Edge workers may have cold starts. Minimize initialization code.
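
One common mitigation, sketched below, is to construct heavyweight objects lazily and cache them at module scope so later requests in the same isolate reuse them (verify that caching binding-derived objects suits your setup):

```ts
// Module-scope cache: built once per isolate, reused across requests
let cachedExecutor: CloudflareAgentExecutor | undefined;

function getExecutor(env: Env): CloudflareAgentExecutor {
  cachedExecutor ??= new CloudflareAgentExecutor({
    workflowBinding: env.AGENT_WORKFLOW,
    stateStore: new D1StateStore(env.DB),
    streamManager: new DOStreamManager(env.STREAM_MANAGER),
  });
  return cachedExecutor;
}
```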
### D1 Limitations

- Single primary region (with replication to read replicas)
- Write throughput limits
- No full-text search
### Durable Object Limits
- Single instance per ID
- Memory limits per instance
- Geographic pinning
## Best Practices

### 1. Efficient Steps

Group related operations in single steps:

```ts
// Good: One step for LLM + response processing
await step.do('llm-step', async () => {
  const response = await callLLM(...);
  const parsed = processResponse(response);
  await saveToD1(parsed);
  return parsed;
});

// Avoid: Separate steps for each operation (more overhead)
```

### 2. Handle Rate Limits
Cloudflare has request limits. Implement backoff:

```ts
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

async function withRetry<T>(fn: () => Promise<T>): Promise<T> {
  for (let attempt = 0; attempt < 3; attempt++) {
    try {
      return await fn();
    } catch (error) {
      if (error instanceof Error && error.message.includes('rate limit')) {
        await sleep(Math.pow(2, attempt) * 1000); // 1s, 2s, 4s backoff
        continue;
      }
      throw error;
    }
  }
  throw new Error('Max retries exceeded');
}
```
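
Usage at the call site (wrapping the `callLLM` helper from the earlier snippets):

```ts
// Retry the rate-limited LLM call with exponential backoff
const response = await withRetry(() => callLLM(messages, tools));
```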
### 3. Optimize D1 Queries

Use indexes and prepared statements:

```ts
// Good: Prepared statement with index
const state = await env.DB.prepare('SELECT * FROM __agents_states WHERE session_id = ?')
  .bind(sessionId)
  .first();

// Avoid: String interpolation, full scans
```
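
Indexes matter for columns the primary key doesn't cover. For instance, if your code looks up sub-agent sessions by parent, an index avoids a full table scan (a sketch; only add it if you actually query this way):

```sql
-- Hypothetical index: speeds up lookups of child sessions by parent
CREATE INDEX IF NOT EXISTS idx_agents_states_parent
  ON __agents_states (parent_session_id);
```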
### 4. Stream Efficiently

Buffer small chunks before sending:

```ts
const buffer: StreamChunk[] = [];
const BATCH_SIZE = 10;

async function flushBuffer() {
  if (buffer.length > 0) {
    await streamManager.writeBatch(streamId, buffer);
    buffer.length = 0;
  }
}

for await (const chunk of stream) {
  buffer.push(chunk);
  if (buffer.length >= BATCH_SIZE) {
    await flushBuffer();
  }
}
await flushBuffer();
```

## Next Steps
- Cloudflare Overview - Compare with DO runtime
- Durable Objects Runtime - Alternative for heavy streaming
- Storage: Cloudflare - D1 and Durable Objects details