@helix-agents/runtime-cloudflare

Cloudflare runtime for Helix Agents with two execution approaches: Workflows for step-level durability, and Durable Objects for unlimited streaming.

Installation

bash
npm install @helix-agents/runtime-cloudflare

For the Workflows approach, also install:

bash
npm install @helix-agents/store-cloudflare

Durable Objects Runtime API

The DO approach executes agents directly inside a Durable Object. Because chunks stream straight to WebSocket/SSE connections, it bypasses the 1,000-subrequest limit.

The composition API using createAgentServer() is the recommended approach for creating AgentServer instances.

createAgentServer

Factory function that creates a Durable Object class with composition-based configuration.

typescript
import { createAgentServer, AgentRegistry } from '@helix-agents/runtime-cloudflare';
import { VercelAIAdapter } from '@helix-agents/llm-vercel';
import { createAnthropic } from '@ai-sdk/anthropic';

const registry = new AgentRegistry();
registry.register(myAgent);

export const MyAgentServer = createAgentServer<Env>({
  llmAdapter: (env) => new VercelAIAdapter({
    anthropic: createAnthropic({ apiKey: env.ANTHROPIC_API_KEY }),
  }),
  agents: registry,
  hooks: {
    beforeStart: async ({ executionState }) => {
      if (executionState.isExecuting) {
        await executionState.interrupt('superseded');
      }
    },
  },
});

AgentServerConfig

Configuration for createAgentServer().

typescript
interface AgentServerConfig<TEnv = unknown> {
  /**
   * Factory to create LLM adapters.
   * Called once per execution with environment and context.
   * Required.
   */
  llmAdapter: LLMAdapterFactory<TEnv>;

  /**
   * Agent resolution strategy.
   * Can be an AgentRegistry instance or a resolver function.
   * Required.
   */
  agents: AgentResolverInterface<TEnv> | AgentResolver<TEnv>;

  /**
   * Optional lifecycle hooks for customization.
   */
  hooks?: ServerHooks<TEnv>;

  /**
   * Optional custom endpoints.
   * Keys are paths (e.g., "/my-endpoint"), values are handlers.
   * Custom endpoints are checked BEFORE built-in endpoints.
   */
  endpoints?: Record<string, EndpointHandler<TEnv>>;

  /**
   * Optional usage store factory.
   * If not provided, uses the DO-local DOUsageStore.
   */
  usageStore?: UsageStoreFactory<TEnv>;

  /**
   * Optional logger.
   * If not provided, uses noopLogger.
   */
  logger?: Logger;
}

LLMAdapterFactory

Factory function for creating LLM adapters with access to environment and execution context.

typescript
type LLMAdapterFactory<TEnv> = (
  env: TEnv,
  context: LLMAdapterContext
) => LLMAdapter;

interface LLMAdapterContext {
  /** Session ID for the execution */
  sessionId: string;
  /** Agent type being executed */
  agentType: string;
  /** Run ID for this execution */
  runId: string;
}
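
Because the factory receives the execution context, it can choose a different adapter configuration per agent type. The sketch below only illustrates that shape: `FakeAdapter`, `MODEL_BY_AGENT`, and the model names are hypothetical stand-ins, not part of the package, and a real factory would return an actual `LLMAdapter`.

```typescript
// Local copy of the context shape documented above, so the sketch is self-contained.
interface LLMAdapterContext {
  sessionId: string;
  agentType: string;
  runId: string;
}

// Hypothetical adapter shape, used only for illustration.
interface FakeAdapter {
  model: string;
  apiKey: string;
}

interface Env {
  ANTHROPIC_API_KEY: string;
}

// Hypothetical mapping from agent type to model name.
const MODEL_BY_AGENT: Record<string, string> = {
  researcher: 'large-model',
  summarizer: 'small-model',
};

// Same (env, context) => adapter signature as LLMAdapterFactory<Env>.
const llmAdapter = (env: Env, context: LLMAdapterContext): FakeAdapter => ({
  // Fall back to the small model for agent types without an explicit entry.
  model: MODEL_BY_AGENT[context.agentType] ?? 'small-model',
  apiKey: env.ANTHROPIC_API_KEY,
});
```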

AgentResolver

Resolver function type for dynamic agent resolution.

typescript
type AgentResolver<TEnv = unknown> = (
  agentType: string,
  context: AgentFactoryContext<TEnv>
) => AgentConfig<z.ZodType, z.ZodType>;
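
A resolver function can stand in for a registry when agent configurations depend on the environment. This is a minimal sketch with local stand-in types: `AgentConfigLike`, `RESEARCH_DEPTH`, and the agent names are hypothetical, and the real `AgentConfig` carries more fields than shown.

```typescript
// Local copy of the documented context shape.
interface AgentFactoryContext<TEnv> {
  env: TEnv;
  sessionId: string;
  runId: string;
  userId?: string;
}

// Hypothetical, heavily reduced stand-in for AgentConfig.
interface AgentConfigLike {
  name: string;
  systemPrompt: string;
}

interface Env {
  RESEARCH_DEPTH: string; // hypothetical binding
}

// Same (agentType, context) => config signature as AgentResolver<Env>.
const resolveAgent = (
  agentType: string,
  context: AgentFactoryContext<Env>
): AgentConfigLike => {
  switch (agentType) {
    case 'researcher':
      return {
        name: 'researcher',
        systemPrompt: `Research at depth ${context.env.RESEARCH_DEPTH}.`,
      };
    case 'summarizer':
      return { name: 'summarizer', systemPrompt: 'Summarize the conversation.' };
    default:
      // A real resolver would likely throw AgentNotFoundError here.
      throw new Error(`Unknown agent type: ${agentType}`);
  }
};
```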

ServerHooks

Lifecycle hooks for customizing AgentServer behavior.

typescript
interface ServerHooks<TEnv = unknown> {
  /**
   * Called before starting execution.
   * Can abort by returning { abort: true, response: Response }.
   */
  beforeStart?: (context: BeforeStartContext<TEnv>) => Promise<void | AbortResult>;

  /**
   * Called after execution handle is created (before it completes).
   */
  afterStart?: (context: AfterStartContext<TEnv>) => Promise<void>;

  /**
   * Called before resuming execution.
   * Can abort by returning { abort: true, response: Response }.
   */
  beforeResume?: (context: BeforeResumeContext<TEnv>) => Promise<void | AbortResult>;

  /**
   * Called after resume handle is created.
   */
  afterResume?: (context: AfterResumeContext<TEnv>) => Promise<void>;

  /**
   * Called before retrying execution.
   * Can abort by returning { abort: true, response: Response }.
   */
  beforeRetry?: (context: BeforeRetryContext<TEnv>) => Promise<void | AbortResult>;

  /**
   * Called after retry handle is created.
   */
  afterRetry?: (context: AfterRetryContext<TEnv>) => Promise<void>;

  /**
   * Called when execution completes (success, failure, or interrupt).
   */
  onComplete?: (context: CompleteContext<TEnv>) => Promise<void>;

  /**
   * Transform incoming /start request to standard format.
   */
  transformRequest?: (rawBody: unknown, env: TEnv) => Promise<StartAgentRequestV2>;

  /**
   * Transform outgoing /start response.
   */
  transformResponse?: (
    response: { sessionId: string; streamId: string; status: 'started' | 'resumed' },
    context: { sessionId: string; agentType: string }
  ) => Promise<unknown>;
}
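
As an illustration of `transformRequest`, the sketch below maps a client payload onto the standard start request. The incoming field names (`text`, `chatId`, `agent`) and the `'researcher'` default are assumptions made up for this example. The `StartAgentRequestV2` type is a reduced local copy.

```typescript
// Reduced local copy of StartAgentRequestV2.
interface StartAgentRequestV2 {
  agentType: string;
  input: { message: string; state?: Record<string, unknown> };
  sessionId: string;
  userId?: string;
}

// Validate a hypothetical client payload and convert it to the standard format.
function toStartRequest(rawBody: unknown): StartAgentRequestV2 {
  if (typeof rawBody !== 'object' || rawBody === null) {
    throw new Error('Request body must be a JSON object');
  }
  const body = rawBody as Record<string, unknown>;
  const text = body['text'];
  const chatId = body['chatId'];
  const agent = body['agent'];
  if (typeof text !== 'string' || typeof chatId !== 'string') {
    throw new Error('Expected string fields "text" and "chatId"');
  }
  return {
    // Hypothetical default agent type when the client does not name one.
    agentType: typeof agent === 'string' ? agent : 'researcher',
    input: { message: text },
    sessionId: chatId,
  };
}

// Shaped like the transformRequest hook: (rawBody, env) => Promise<StartAgentRequestV2>.
const hooks = {
  transformRequest: async (rawBody: unknown, _env: unknown) => toStartRequest(rawBody),
};
```

Keeping the conversion in a plain function makes it easy to unit test without a running Durable Object.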

Hook Contexts

typescript
interface BeforeStartContext<TEnv> {
  env: TEnv;
  request: Request;
  body: StartAgentRequestV2;
  executionState: ExecutionState;
}

interface AfterStartContext<TEnv> {
  env: TEnv;
  sessionId: string;
  runId: string;
  agentType: string;
  handle: AgentExecutionHandle<unknown>;
}

interface BeforeResumeContext<TEnv> {
  env: TEnv;
  request: Request;
  body: ResumeAgentRequestV2;
  executionState: ExecutionState;
}

interface AfterResumeContext<TEnv> {
  env: TEnv;
  sessionId: string;
  runId: string;
  agentType: string;
  handle: AgentExecutionHandle<unknown>;
}

interface BeforeRetryContext<TEnv> {
  env: TEnv;
  request: Request;
  body: RetryAgentRequestV2;
  executionState: ExecutionState;
}

interface AfterRetryContext<TEnv> {
  env: TEnv;
  sessionId: string;
  runId: string;
  agentType: string;
  handle: AgentExecutionHandle<unknown>;
}

interface CompleteContext<TEnv> {
  env: TEnv;
  sessionId: string;
  runId: string;
  status: 'completed' | 'failed' | 'interrupted';
  result?: unknown;
  error?: Error;
}

AbortResult

Return type for hooks that can abort the request.

typescript
interface AbortResult {
  abort: true;
  response: Response;
}
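
A `beforeStart` hook might use `AbortResult` to reject requests before any execution starts. The following sketch assumes a policy of requiring `userId`. The policy, the 401 status, and the `rejectAnonymous` helper are illustrative choices, not library behavior.

```typescript
// Local copies of the documented shapes, reduced to what the sketch needs.
interface AbortResult {
  abort: true;
  response: Response;
}

interface StartBody {
  agentType: string;
  sessionId: string;
  userId?: string;
}

// Returns an AbortResult for anonymous requests, or undefined to let execution proceed.
function rejectAnonymous(body: StartBody): AbortResult | undefined {
  if (body.userId) return undefined;
  return {
    abort: true,
    response: new Response(JSON.stringify({ error: 'userId is required' }), {
      status: 401,
      headers: { 'Content-Type': 'application/json' },
    }),
  };
}

// Shaped like ServerHooks.beforeStart; only `body` is used from the context.
const hooks = {
  beforeStart: async ({ body }: { body: StartBody }): Promise<void | AbortResult> =>
    rejectAnonymous(body),
};
```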

ExecutionState

Interface for checking and controlling execution state, available in hooks.

typescript
interface ExecutionState {
  /** Session ID of current execution, or null if none */
  readonly sessionId: string | null;

  /** Run ID of current execution, or null if none */
  readonly runId: string | null;

  /** Whether an execution is currently running */
  readonly isExecuting: boolean;

  /** Current execution status */
  readonly status: 'idle' | 'running' | 'paused' | 'interrupted' | 'completed' | 'failed';

  /** Get the current execution handle (if any) */
  getHandle<T = unknown>(): AgentExecutionHandle<T> | null;

  /** Interrupt the current execution (soft stop, resumable) */
  interrupt(reason?: string): Promise<void>;

  /** Abort the current execution (hard stop, not resumable) */
  abort(reason?: string): Promise<void>;
}

EndpointHandler

Handler function for custom endpoints.

typescript
type EndpointHandler<TEnv> = (
  request: Request,
  context: EndpointContext<TEnv>
) => Promise<Response>;

interface EndpointContext<TEnv> {
  env: TEnv;
  executionState: ExecutionState;
  logger: Logger;
}
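
A custom endpoint could, for example, expose a small health summary built from `executionState`. This is a sketch under assumptions: the `/health` path, the payload shape, and the `healthPayload` helper are hypothetical, and the types are reduced local copies of the interfaces above.

```typescript
// Reduced local view of ExecutionState (read-only fields only).
interface ExecutionStateView {
  readonly sessionId: string | null;
  readonly isExecuting: boolean;
  readonly status: 'idle' | 'running' | 'paused' | 'interrupted' | 'completed' | 'failed';
}

interface EndpointContextView {
  executionState: ExecutionStateView;
}

// Pure helper: builds the JSON payload from the current execution state.
function healthPayload(state: ExecutionStateView) {
  return {
    ok: state.status !== 'failed',
    sessionId: state.sessionId,
    status: state.status,
    isExecuting: state.isExecuting,
  };
}

// Same (request, context) => Promise<Response> shape as EndpointHandler.
const healthEndpoint = async (
  _request: Request,
  context: EndpointContextView
): Promise<Response> =>
  new Response(JSON.stringify(healthPayload(context.executionState)), {
    headers: { 'Content-Type': 'application/json' },
  });

// Would be passed as `endpoints` in the server configuration.
const endpoints = { '/health': healthEndpoint };
```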

Request Types (V2)

The composition API uses updated request types that use agentType instead of the full AgentConfig.

StartAgentRequestV2

typescript
interface StartAgentRequestV2 {
  /** Agent type to execute (looked up in registry/resolver) */
  agentType: string;

  /** Input for the agent */
  input: {
    message: string;
    state?: Record<string, unknown>;
    messages?: Message[];
  };

  /** Session identifier (required) */
  sessionId: string;

  /** Optional user identifier */
  userId?: string;

  /** Optional tags for the session */
  tags?: string[];

  /** Optional metadata for the session */
  metadata?: Record<string, string>;
}

ResumeAgentRequestV2

typescript
interface ResumeAgentRequestV2 {
  /** Agent type (for context) */
  agentType: string;

  /** Resume options */
  options: {
    mode?: 'continue' | 'retry' | 'branch';
    checkpointId?: string;
    modifyState?: unknown;
    appendMessages?: Message[];
  };
}

RetryAgentRequestV2

typescript
interface RetryAgentRequestV2 {
  /** Agent type (for context) */
  agentType: string;

  /** Retry options */
  options?: {
    mode?: 'from_checkpoint' | 'from_start';
    message?: string;
  };
}

AgentRegistry

Maps agent type names to configurations. Supports both static registration and factory functions.

typescript
import { AgentRegistry, defaultAgentRegistry } from '@helix-agents/runtime-cloudflare';

const registry = new AgentRegistry();

// Static registration
registry.register(MyAgent);

// Factory registration
registry.registerFactory<Env>('brainstorm', (ctx) =>
  createBrainstormAgent({ apiKey: ctx.env.API_KEY })
);

// Resolve (handles both static and factory)
const agent = registry.resolve('brainstorm', { env, sessionId, runId });

// Get static agent only (legacy)
const staticAgent = registry.get('researcher');

// Other methods
registry.has('researcher');           // Check if registered
registry.unregister('researcher');    // Remove registration
registry.getRegisteredTypes();        // Get all type names
registry.clear();                     // Remove all (for testing)

AgentFactoryContext

Context passed to agent factory functions.

typescript
interface AgentFactoryContext<TEnv = unknown> {
  /** Environment bindings (e.g., API keys, database connections) */
  env: TEnv;
  /** Current session identifier */
  sessionId: string;
  /** Current run identifier */
  runId: string;
  /** Optional user identifier */
  userId?: string;
}

AgentFactory

Factory function type for dynamic agent creation.

typescript
type AgentFactory<TEnv = unknown> = (
  context: AgentFactoryContext<TEnv>
) => AgentConfig<z.ZodType, z.ZodType>;

AgentResolverInterface

Interface implemented by AgentRegistry and custom resolvers.

typescript
interface AgentResolverInterface<TEnv = unknown> {
  /**
   * Resolve an agent configuration by type.
   * @throws AgentNotFoundError if agent type is not found
   */
  resolve(
    agentType: string,
    context: AgentFactoryContext<TEnv>
  ): AgentConfig<z.ZodType, z.ZodType>;

  /**
   * Check if an agent type is registered.
   */
  has(agentType: string): boolean;
}
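
Since the interface only requires `resolve()` and `has()`, custom resolvers can be composed. The sketch below chains several resolvers and uses the first one that knows the requested type. `StaticResolver`, `FallbackResolver`, and `AgentConfigLike` are hypothetical names for this example, and a plain `Error` stands in for `AgentNotFoundError`.

```typescript
// Reduced, non-generic local copies of the documented shapes.
interface AgentFactoryContext {
  env: unknown;
  sessionId: string;
  runId: string;
}

interface AgentConfigLike {
  name: string;
}

interface AgentResolverLike {
  resolve(agentType: string, context: AgentFactoryContext): AgentConfigLike;
  has(agentType: string): boolean;
}

// Resolves from a fixed list of configurations, keyed by name.
class StaticResolver implements AgentResolverLike {
  private readonly agents: Map<string, AgentConfigLike>;

  constructor(agents: AgentConfigLike[]) {
    this.agents = new Map(agents.map((a): [string, AgentConfigLike] => [a.name, a]));
  }

  has(agentType: string): boolean {
    return this.agents.has(agentType);
  }

  resolve(agentType: string, _context: AgentFactoryContext): AgentConfigLike {
    const agent = this.agents.get(agentType);
    if (!agent) throw new Error(`Agent not found: ${agentType}`);
    return agent;
  }
}

// Tries each resolver in order and delegates to the first that has the type.
class FallbackResolver implements AgentResolverLike {
  private readonly resolvers: AgentResolverLike[];

  constructor(resolvers: AgentResolverLike[]) {
    this.resolvers = resolvers;
  }

  has(agentType: string): boolean {
    return this.resolvers.some((r) => r.has(agentType));
  }

  resolve(agentType: string, context: AgentFactoryContext): AgentConfigLike {
    const match = this.resolvers.find((r) => r.has(agentType));
    if (!match) throw new Error(`Agent not found: ${agentType}`);
    return match.resolve(agentType, context);
  }
}
```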

AgentNotFoundError

Error thrown when an agent type is not found in the registry.

typescript
class AgentNotFoundError extends Error {
  /** The agent type that was requested */
  readonly agentType: string;

  /** List of available agent types */
  readonly availableTypes: string[];

  constructor(agentType: string, availableTypes: string[]);
}

Example usage:

typescript
import { AgentNotFoundError } from '@helix-agents/runtime-cloudflare';

try {
  const agent = registry.resolve('unknown', context);
} catch (error) {
  if (error instanceof AgentNotFoundError) {
    console.error(`Unknown agent type: ${error.agentType}`);
    console.error(`Available types: ${error.availableTypes.join(', ')}`);
  }
}

AgentServer (Legacy - Deprecated)

Deprecated

The subclass-based AgentServer API is deprecated. Use createAgentServer() instead for new projects.

The main Durable Object class that hosts agent execution.

typescript
import { AgentServer } from '@helix-agents/runtime-cloudflare';

// Export for Cloudflare to instantiate
export { AgentServer };

AgentServer handles all execution internally and exposes HTTP/WebSocket endpoints.

HTTP Endpoints

| Endpoint | Method | Description |
| --- | --- | --- |
| /start | POST | Start new agent execution |
| /resume | POST | Resume paused/interrupted agent |
| /retry | POST | Retry failed agent |
| /interrupt | POST | Soft stop (can resume) |
| /abort | POST | Hard stop (cannot resume) |
| /status | GET | Get execution status |
| /sse | GET | SSE streaming connection |
| /history | GET | Get historical chunks |
| /usage | GET | Get usage data |
| /messages | GET | Get conversation messages |
| /snapshot | GET | Get UI state snapshot |
| / | WebSocket | Real-time streaming |

Legacy Request Types

Deprecated

These request types are for the legacy AgentServer API. Use StartAgentRequestV2 and ResumeAgentRequestV2 with createAgentServer() instead.

StartAgentRequest (Legacy)

typescript
interface StartAgentRequest {
  agent: AgentConfig<any, any, any>;
  input: AgentInput;
  runId?: string;
  llmAdapter?: LLMAdapter;
  llmAdapterConfig?: Record<string, any>;
  hooks?: AgentHooks;
}

ResumeAgentRequest (Legacy)

typescript
interface ResumeAgentRequest {
  agent: AgentConfig<any, any, any>;
  options?: ResumeOptions;
  llmAdapter?: LLMAdapter;
  llmAdapterConfig?: Record<string, any>;
  hooks?: AgentHooks;
}

Response Types

AgentStartResponse

typescript
interface AgentStartResponse {
  sessionId: string;
  streamId: string;
  status: 'started' | 'resumed';
}

AgentStatusResponse

typescript
interface AgentStatusResponse {
  sessionId: string | null;
  status: ExecutionStatus;
  output?: unknown;
  error?: string;
  stepCount: number;
  checkpointId?: string;
  isExecuting: boolean;
}

DOAgentExecutor

API for interacting with AgentServer DOs. Useful for status checks and managing existing runs.

typescript
import { DOAgentExecutor } from '@helix-agents/runtime-cloudflare';

const executor = new DOAgentExecutor({
  agentNamespace: env.AGENTS,
});

// Get handle for existing session
const handle = await executor.getHandle(myAgent, 'session-123');

// Check status
const state = await handle.getState();

Starting Executions

For starting new agent executions, use direct DO access since LLM adapters cannot be serialized over HTTP.

DOAgentExecutorConfig

typescript
interface DOAgentExecutorConfig {
  agentNamespace: DurableObjectNamespace;
  createLLMAdapter?: () => LLMAdapter;  // Reserved for future use
  createHooks?: () => AgentHooks;       // Reserved for future use
  logger?: Logger;
}

DOStateStore

State store implementation using Durable Object's built-in SQLite.

typescript
import { DOStateStore } from '@helix-agents/runtime-cloudflare';

// Created automatically inside AgentServer
// For manual use:
const stateStore = new DOStateStore({
  sql,      // PartyServer's sql tagged template
  logger,   // Optional
});

DOStateStoreOptions

typescript
interface DOStateStoreOptions {
  sql: SqlTaggedTemplate;
  logger?: Logger;
}

DOStreamManager

Stream manager that broadcasts directly to connected WebSocket/SSE clients.

typescript
import { DOStreamManager } from '@helix-agents/runtime-cloudflare';

// Created automatically inside AgentServer
// For manual use:
const streamManager = new DOStreamManager({
  sql,
  getConnections,
  broadcast,
  sseConnections,
  logger,
});

DOStreamManagerOptions

typescript
interface DOStreamManagerOptions {
  sql: SqlTaggedTemplate;
  getConnections: () => Map<string, WebSocket>;
  broadcast: (msg: string) => void;
  sseConnections: Map<string, SSEConnection>;
  logger?: Logger;
}

DOStreamWriter

Writer returned by createWriter() for emitting chunks. Implements the StreamWriter interface.

typescript
class DOStreamWriter implements StreamWriter {
  write(chunk: StreamChunk): Promise<void>;
  close(): Promise<void>;
}

Connection Types

typescript
// WebSocket connection state (survives hibernation)
interface ConnectionState {
  fromSequence: number;
}

// SSE connection tracking (in-memory, doesn't survive hibernation)
interface SSEConnection {
  writer: WritableStreamDefaultWriter<Uint8Array>;
  encoder: TextEncoder;
  fromSequence: number;
  lastActivity: number;
}

type StreamStatus = 'active' | 'paused' | 'ended' | 'failed';

DOUsageStore

Usage store implementation using Durable Object's built-in SQLite.

typescript
import { DOUsageStore } from '@helix-agents/runtime-cloudflare';

// Created automatically inside AgentServer
// For manual use:
const usageStore = new DOUsageStore({
  sql,
  sessionId,
  logger,
});

// Record usage
await usageStore.recordEntry(entry);

// Get entries
const entries = await usageStore.getEntries(sessionId);

// Get rollup
const rollup = await usageStore.getRollup(sessionId);

Durable Object Types

typescript
import type {
  DurableObjectNamespace,
  DurableObjectId,
  DurableObjectStub,
  SqlTaggedTemplate,
} from '@helix-agents/runtime-cloudflare';

Workflows Runtime API

The Workflows approach uses Cloudflare Workflows for durable, step-level execution with D1 for state.

CloudflareAgentExecutor

Executor that starts and manages Cloudflare Workflows.

typescript
import { CloudflareAgentExecutor } from '@helix-agents/runtime-cloudflare';

const executor = new CloudflareAgentExecutor({
  workflowBinding, // AgentWorkflowBinding from env
  stateStore,      // StateStore (D1StateStore)
  streamManager,   // StreamManager (DurableObjectStreamManager)
  logger,          // Optional
});

// Execute agent
const handle = await executor.execute(MyAgent, {
  message: 'Hello',
  state: { userId: 'user-123' },
});

// Get existing handle
const existingHandle = await executor.getHandle(MyAgent, 'session-123');

CloudflareAgentExecutorConfig

typescript
interface CloudflareAgentExecutorConfig {
  workflowBinding: AgentWorkflowBinding;
  stateStore: StateStore;
  streamManager: StreamManager;
  logger?: Logger;
}

AgentExecutionHandle

Handle returned by execute() for interacting with running workflows.

typescript
// Session ID and stream ID
handle.sessionId;
handle.streamId;

// Stream events
const stream = await handle.stream();
for await (const chunk of stream) {
  switch (chunk.type) {
    case 'text_delta':
      console.log(chunk.delta);
      break;
    case 'run_interrupted':
      console.log(`Interrupted: ${chunk.reason}`);
      break;
  }
}

// Wait for result
const result = await handle.result();

// Abort execution (HARD stop - cannot resume)
await handle.abort('User cancelled');

// Interrupt execution (SOFT stop - can resume)
await handle.interrupt('user_requested');

// Check if resumable
const { canResume, reason } = await handle.canResume();

// Resume execution
const resumed = await handle.resume();

// Resume with message
const resumedWithMessage = await handle.resume({
  mode: 'with_message',
  message: 'Continue with this context',
});

// Time-travel resume
const resumedFromCheckpoint = await handle.resume({
  mode: 'from_checkpoint',
  checkpointId: 'cpv1-...',
});

// Get current state
const state = await handle.getState();

// Continue conversation
const handle2 = await handle.send('Follow up message');

runAgentWorkflow

Core workflow function for Cloudflare Workflows.

typescript
import { WorkflowEntrypoint, WorkflowEvent, WorkflowStep } from 'cloudflare:workers';
import { runAgentWorkflow } from '@helix-agents/runtime-cloudflare';
import { createCloudflareStore } from '@helix-agents/store-cloudflare';
import { VercelAIAdapter } from '@helix-agents/llm-vercel';

export class AgentWorkflow extends WorkflowEntrypoint<Env, AgentWorkflowInput> {
  async run(event: WorkflowEvent<AgentWorkflowInput>, step: WorkflowStep) {
    const { stateStore, streamManager } = createCloudflareStore({
      db: this.env.AGENT_DB,
      streams: this.env.STREAMS,
    });

    return runAgentWorkflow(event, step, {
      stateStore,
      streamManager,
      llmAdapter: new VercelAIAdapter(),
      registry,
      workflowBinding: this.env.AGENT_WORKFLOW,
    });
  }
}

AgentSteps

Step implementations for manual workflow control.

typescript
import { AgentSteps } from '@helix-agents/runtime-cloudflare';

const steps = new AgentSteps({
  stateStore,
  streamManager,
  llmAdapter,
  registry,
});

// Use in workflow
await steps.initializeAgentState(step, input);
await steps.executeAgentStep(step, input);
await steps.executeToolCall(step, toolCall);

createWorkflowRunner

Alternative factory for creating workflow runners.

typescript
import { createWorkflowRunner } from '@helix-agents/runtime-cloudflare';

const runner = createWorkflowRunner({
  stateStore,
  streamManager,
  llmAdapter,
  registry,
  workflowBinding,
});

// Use in workflow
return runner.run(event, step);

Binding Types

Types for Cloudflare bindings.

typescript
import type {
  AgentWorkflowBinding,
  WorkflowBinding,
  WorkflowInstance,
  WorkflowStatus,
  WorkflowDependencies,
  WorkflowOptions,
} from '@helix-agents/runtime-cloudflare';

// Define your environment
interface Env {
  AGENT_DB: D1Database;
  STREAMS: DurableObjectNamespace;
  AGENT_WORKFLOW: AgentWorkflowBinding;
  OPENAI_API_KEY: string;
}

Workflow DTOs

Type-safe schemas for workflow inputs/outputs.

Session vs Run Identifiers

  • sessionId: Primary key for state storage. A session contains all messages, state, and checkpoints.
  • runId: Identifies a specific workflow execution. Multiple runs can occur within one session.

In Cloudflare, each Durable Object or Workflow instance is tied to a sessionId. The runId is execution metadata for tracing.
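
The relationship can be pictured as one session record that accumulates run IDs. This is only a conceptual sketch: `SessionRecord` and `recordRun` are hypothetical and not the stored schema, and the `session:` name prefix is borrowed from the Durable Objects worker example in this document.

```typescript
// Hypothetical, simplified view of a session and the runs executed within it.
interface SessionRecord {
  sessionId: string;
  runIds: string[];
}

// Derives the Durable Object name for a session (prefix as used in the worker example).
function durableObjectName(sessionId: string): string {
  return `session:${sessionId}`;
}

// Returns a new record with one more run; the sessionId never changes.
function recordRun(session: SessionRecord, runId: string): SessionRecord {
  return { ...session, runIds: [...session.runIds, runId] };
}
```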

typescript
import {
  // Workflow DTOs
  AgentWorkflowInputSchema,
  AgentWorkflowResultSchema,
  type AgentWorkflowInput,
  type AgentWorkflowResult,

  // Step DTOs
  InitializeAgentInputSchema,
  ExecuteAgentStepInputSchema,
  ExecuteToolCallInputSchema,
  ToolCallResultSchema,

  // Sub-agent DTOs
  RegisterSubAgentsInputSchema,
  RecordSubAgentResultInputSchema,
  MarkAgentFailedInputSchema,

  // Stream DTOs
  EndAgentStreamInputSchema,
  FailAgentStreamInputSchema,
} from '@helix-agents/runtime-cloudflare';

Workflow Types

typescript
import type {
  WorkflowStep,
  WorkflowEvent,
  WorkflowDuration,
  WorkflowRetryConfig,
  WorkflowStepConfig,
  WaitForEventOptions,
  WaitForEventResult,
  SubAgentCompletionEvent,
} from '@helix-agents/runtime-cloudflare';

Shared Utilities

pollUntil

Poll until a condition is met.

typescript
import { pollUntil } from '@helix-agents/runtime-cloudflare';

const result = await pollUntil({
  check: async () => {
    const status = await getStatus();
    if (status === 'done') {
      return { done: true, value: status };
    }
    return { done: false };
  },
  interval: 1000,
  timeout: 60000,
});

PollOptions

typescript
interface PollOptions<T> {
  check: () => Promise<PollCheckResult<T>>;
  interval?: number;
  timeout?: number;
}

type PollCheckResult<T> =
  | { done: true; value: T }
  | { done: false };

Complete Examples

Durable Objects Example (Composition API)

wrangler.toml

toml
name = "my-agent-do"
main = "src/worker.ts"
compatibility_date = "2024-01-01"

[[durable_objects.bindings]]
name = "AGENTS"
class_name = "MyAgentServer"

[[migrations]]
tag = "v1"
new_sqlite_classes = ["MyAgentServer"]

agent-server.ts

typescript
import { createAgentServer, AgentRegistry } from '@helix-agents/runtime-cloudflare';
import { VercelAIAdapter } from '@helix-agents/llm-vercel';
import { createAnthropic } from '@ai-sdk/anthropic';
import { myAgent } from './agent.js';

export interface Env {
  AGENTS: DurableObjectNamespace;
  ANTHROPIC_API_KEY: string;
}

const registry = new AgentRegistry();
registry.register(myAgent);

export const MyAgentServer = createAgentServer<Env>({
  llmAdapter: (env) => new VercelAIAdapter({
    anthropic: createAnthropic({ apiKey: env.ANTHROPIC_API_KEY }),
  }),
  agents: registry,
  hooks: {
    beforeStart: async ({ executionState, body }) => {
      if (executionState.isExecuting) {
        await executionState.interrupt('New message received');
      }
      console.log(`Starting agent: ${body.agentType}`);
    },
  },
});

worker.ts

typescript
import { MyAgentServer } from './agent-server.js';
import type { Env } from './agent-server.js';

export { MyAgentServer };

export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    const url = new URL(request.url);

    if (url.pathname === '/execute' && request.method === 'POST') {
      const { message } = await request.json<{ message: string }>();
      const sessionId = crypto.randomUUID();

      const doId = env.AGENTS.idFromName(`session:${sessionId}`);
      const stub = env.AGENTS.get(doId);

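      // stub.fetch() needs an absolute URL, so resolve the path against the incoming request
      const response = await stub.fetch(new URL('/start', request.url), {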
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          agentType: 'researcher',
          input: { message },
          sessionId,
        }),
      });

      const result = await response.json<{ sessionId: string; streamId: string }>();

      return Response.json({
        sessionId: result.sessionId,
        streamUrl: `/stream/${result.sessionId}`,
        websocketUrl: `/ws/${result.sessionId}`,
      });
    }

    if (url.pathname.startsWith('/stream/')) {
      const sessionId = url.pathname.split('/').pop()!;
      const doId = env.AGENTS.idFromName(`session:${sessionId}`);
      return env.AGENTS.get(doId).fetch(new URL('/sse', request.url));
    }

    if (url.pathname.startsWith('/ws/')) {
      const sessionId = url.pathname.split('/').pop()!;
      const doId = env.AGENTS.idFromName(`session:${sessionId}`);
      return env.AGENTS.get(doId).fetch(request);
    }

    return new Response('Not found', { status: 404 });
  },
};

Workflows Example

wrangler.toml

toml
name = "my-agent-worker"
main = "src/worker.ts"
compatibility_date = "2024-12-01"
compatibility_flags = ["nodejs_compat"]

[[d1_databases]]
binding = "AGENT_DB"
database_name = "agents-db"
database_id = "xxx"

[[durable_objects.bindings]]
name = "STREAMS"
class_name = "StreamServer"

[[workflows]]
name = "agent-workflow"
binding = "AGENT_WORKFLOW"
class_name = "AgentWorkflow"

[[migrations]]
tag = "v1"
new_sqlite_classes = ["StreamServer"]

worker.ts

typescript
import { WorkflowEntrypoint, WorkflowEvent, WorkflowStep } from 'cloudflare:workers';
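import {
  runAgentWorkflow,
  AgentRegistry,
  CloudflareAgentExecutor,
  type AgentWorkflowBinding,
  type AgentWorkflowInput,
} from '@helix-agents/runtime-cloudflare';
import { createCloudflareStore, StreamServer } from '@helix-agents/store-cloudflare';
import { VercelAIAdapter } from '@helix-agents/llm-vercel';
import { MyAgent } from './agent.js';

// Bindings declared in wrangler.toml (see "Binding Types")
interface Env {
  AGENT_DB: D1Database;
  STREAMS: DurableObjectNamespace;
  AGENT_WORKFLOW: AgentWorkflowBinding;
}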

export { StreamServer };

const registry = new AgentRegistry();
registry.register(MyAgent);

export class AgentWorkflow extends WorkflowEntrypoint<Env, AgentWorkflowInput> {
  async run(event: WorkflowEvent<AgentWorkflowInput>, step: WorkflowStep) {
    const { stateStore, streamManager } = createCloudflareStore({
      db: this.env.AGENT_DB,
      streams: this.env.STREAMS,
    });

    return runAgentWorkflow(event, step, {
      stateStore,
      streamManager,
      llmAdapter: new VercelAIAdapter(),
      registry,
      workflowBinding: this.env.AGENT_WORKFLOW,
    });
  }
}

export default {
  async fetch(request: Request, env: Env) {
    const { stateStore, streamManager } = createCloudflareStore({
      db: env.AGENT_DB,
      streams: env.STREAMS,
    });

    const executor = new CloudflareAgentExecutor({
      workflowBinding: env.AGENT_WORKFLOW,
      stateStore,
      streamManager,
    });

    if (request.method === 'POST') {
      const { message } = await request.json<{ message: string }>();
      const handle = await executor.execute(MyAgent, { message });
      return Response.json({ sessionId: handle.sessionId });
    }

    return Response.json({ error: 'Not found' }, { status: 404 });
  },
};

Interrupt and Resume

Both runtimes support interrupt/resume for pausing and continuing agent execution.

Workflows Interrupt

Interrupts use Cloudflare's step.waitForEvent():

typescript
// Interrupt from handle
await handle.interrupt('user_requested');

// Resume later
const { canResume } = await handle.canResume();
if (canResume) {
  const resumed = await handle.resume();
  const result = await resumed.result();
}

DO Interrupt

Interrupts use HTTP endpoints:

typescript
// Interrupt via fetch
await fetch(`/interrupt`, {
  method: 'POST',
  body: JSON.stringify({ reason: 'user_requested' }),
});

// Resume via fetch (composition API)
await fetch(`/resume`, {
  method: 'POST',
  body: JSON.stringify({
    agentType: 'researcher',
    options: { mode: 'continue' },
  }),
});

Workflow Execution Semantics

| Action | Behavior |
| --- | --- |
| interrupt() | Soft stop, saves checkpoint, can resume |
| resume() | Continue from checkpoint or with new message |
| abort() | Hard stop, marks as failed, cannot resume |
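
The semantics in the table above can be read as a small state machine over the `ExecutionState` status values. This is a simplified sketch and not the runtime's logic: which statuses permit each action is an assumption here, and `applyAction` is a hypothetical name.

```typescript
type Status = 'idle' | 'running' | 'paused' | 'interrupted' | 'completed' | 'failed';
type Action = 'interrupt' | 'resume' | 'abort';

// Assumed preconditions for each action.
const RESUMABLE = new Set<Status>(['paused', 'interrupted']);
const ABORTABLE = new Set<Status>(['running', 'paused', 'interrupted']);

// Returns the status after applying an action, or throws if it is not allowed.
function applyAction(status: Status, action: Action): Status {
  switch (action) {
    case 'interrupt':
      // Soft stop: the run can be resumed later.
      if (status !== 'running') throw new Error(`Cannot interrupt from "${status}"`);
      return 'interrupted';
    case 'resume':
      // Continue from the saved checkpoint.
      if (!RESUMABLE.has(status)) throw new Error(`Cannot resume from "${status}"`);
      return 'running';
    case 'abort':
      // Hard stop: marked as failed, so resume is rejected afterwards.
      if (!ABORTABLE.has(status)) throw new Error(`Cannot abort from "${status}"`);
      return 'failed';
  }
}
```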

Released under the MIT License.