@helix-agents/runtime-temporal

Temporal runtime for durable agent execution. Provides crash recovery, automatic retries, and distributed execution using Temporal workflows.

Installation

bash
npm install @helix-agents/runtime-temporal

AgentRegistry

Maps agent type names to configurations. Supports both static registration and factory functions for dynamic agent creation.

typescript
import { AgentRegistry, AgentNotFoundError } from '@helix-agents/runtime-temporal';

const registry = new AgentRegistry();

// Static registration
registry.register(ResearchAgent);
registry.register(SummarizerAgent);

// Factory registration (for agents needing runtime deps)
registry.registerFactory<Env>('dynamic-agent', (ctx) =>
  createAgent({ apiKey: ctx.env.API_KEY, userId: ctx.userId })
);

// Resolve (handles both static and factory)
const agent = registry.resolve('dynamic-agent', {
  env,
  sessionId: 'session-123',
  runId: 'run-456',
  userId: 'user-789',
});

// Get static agent only (legacy)
const staticAgent = registry.get('research-assistant');

// Check if registered
const exists = registry.has('research-assistant');

// Remove registration
registry.unregister('research-assistant');

// Get all registered types
const types = registry.getRegisteredTypes();

// Clear all registrations (for testing)
registry.clear();

// Use default global registry
import { defaultAgentRegistry } from '@helix-agents/runtime-temporal';

AgentFactoryContext

Context passed to factory functions:

typescript
interface AgentFactoryContext<TEnv = unknown> {
  /** Environment bindings (e.g., API keys, database connections) */
  env: TEnv;
  /** Current session identifier */
  sessionId: string;
  /** Current run identifier */
  runId: string;
  /** Optional user identifier */
  userId?: string;
}

AgentFactory

Factory function type:

typescript
type AgentFactory<TEnv = unknown> = (
  context: AgentFactoryContext<TEnv>
) => AgentConfig<z.ZodType, z.ZodType>;

AgentResolverInterface

Interface implemented by AgentRegistry:

typescript
interface AgentResolverInterface<TEnv = unknown> {
  /**
   * Resolve an agent configuration by type.
   * @throws AgentNotFoundError if agent type is not found
   */
  resolve(
    agentType: string,
    context: AgentFactoryContext<TEnv>
  ): AgentConfig<z.ZodType, z.ZodType>;

  /**
   * Check if an agent type is registered.
   */
  has(agentType: string): boolean;
}

AgentNotFoundError

Error thrown when an agent type is not found:

typescript
class AgentNotFoundError extends Error {
  /** The agent type that was requested */
  readonly agentType: string;

  /** List of available agent types */
  readonly availableTypes: string[];

  constructor(agentType: string, availableTypes: string[]);
}

Example usage:

typescript
import { AgentNotFoundError } from '@helix-agents/runtime-temporal';

try {
  const agent = registry.resolve('unknown', context);
} catch (error) {
  if (error instanceof AgentNotFoundError) {
    console.error(`Unknown agent type: ${error.agentType}`);
    console.error(`Available types: ${error.availableTypes.join(', ')}`);
  }
}

Registry Methods Summary

| Method | Description |
| --- | --- |
| register(config) | Register a static agent configuration |
| registerFactory(type, factory) | Register a factory function |
| resolve(type, context) | Resolve agent (factory-first, then static) |
| get(type) | Get static agent only (legacy) |
| has(type) | Check if agent type is registered |
| unregister(type) | Remove an agent registration |
| getRegisteredTypes() | Get all registered agent type names |
| clear() | Remove all registrations (for testing) |
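
The "factory-first, then static" order of resolve() can be illustrated with a small stand-alone sketch. This is not the library's implementation: SketchRegistry, SketchAgentConfig, and SketchContext are hypothetical stand-ins, and the sketch assumes a static config is keyed by its name field.

```typescript
// Hypothetical stand-ins for illustration; not the real AgentRegistry types.
interface SketchAgentConfig {
  name: string;
  description?: string;
}

interface SketchContext {
  sessionId: string;
  runId: string;
  userId?: string;
}

type SketchFactory = (ctx: SketchContext) => SketchAgentConfig;

class SketchRegistry {
  private readonly statics = new Map<string, SketchAgentConfig>();
  private readonly factories = new Map<string, SketchFactory>();

  // Assumption: a static config is registered under its own name.
  register(config: SketchAgentConfig): void {
    this.statics.set(config.name, config);
  }

  registerFactory(type: string, factory: SketchFactory): void {
    this.factories.set(type, factory);
  }

  // Factory-first: a factory registered under a type shadows a static config.
  resolve(type: string, ctx: SketchContext): SketchAgentConfig {
    const factory = this.factories.get(type);
    if (factory) return factory(ctx);

    const config = this.statics.get(type);
    if (config) return config;

    const available = Array.from(this.factories.keys()).concat(
      Array.from(this.statics.keys())
    );
    throw new Error(`Unknown agent type "${type}". Available: ${available.join(', ')}`);
  }
}

const sketch = new SketchRegistry();
sketch.register({ name: 'helper', description: 'static' });
sketch.registerFactory('helper', (ctx) => ({
  name: 'helper',
  description: `factory for ${ctx.userId ?? 'anonymous'}`,
}));

const resolved = sketch.resolve('helper', { sessionId: 's1', runId: 'r1', userId: 'u1' });
console.log(resolved.description); // prints "factory for u1"
```

Under these assumptions, registering a factory under an existing static name overrides the static config for resolve(), while a type with only a static registration still resolves to that config.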

GenericAgentActivities

Activity implementations for Temporal workflows. These methods carry no decorators; wrap them with platform-specific activity decorators.

typescript
import { GenericAgentActivities } from '@helix-agents/runtime-temporal';

const activities = new GenericAgentActivities({
  registry, // AgentRegistry
  stateStore, // StateStore
  streamManager, // StreamManager
  llmAdapter, // LLMAdapter
  logger, // Optional: Logger
});

// Available activity methods:
activities.initializeAgentState(input);
activities.executeAgentStep(input);
activities.executeToolCall(input);
activities.registerSubAgents(input);
activities.recordSubAgentResult(input);
activities.markAgentFailed(input);
activities.markAgentAborted(input);
activities.endAgentStream(input);
activities.failAgentStream(input);
activities.checkExistingState(input);
activities.updateAgentStatus(input);

runAgentWorkflow

Core workflow logic function. Wrap with Temporal workflow decorators.

typescript
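import { proxyActivities, executeChild, isCancellation, log } from '@temporalio/workflow';
import { runAgentWorkflow } from '@helix-agents/runtime-temporal/workflow-exports';
import type { AgentWorkflowActivities } from '@helix-agents/runtime-temporal/workflow-exports';
import type { AgentWorkflowInput, AgentWorkflowResult } from '@helix-agents/runtime-temporal';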

// In your workflow file
export async function agentWorkflow(input: AgentWorkflowInput): Promise<AgentWorkflowResult> {
  const activities = proxyActivities<AgentWorkflowActivities>({
    startToCloseTimeout: '5m',
  });

  return runAgentWorkflow(input, activities, {
    startChildWorkflow: async (childInput) => {
      return executeChild(agentWorkflow, {
        workflowId: childInput.runId,
        args: [childInput],
      });
    },
    isCancellation: (error) => isCancellation(error),
    log: {
      info: (msg, data) => log.info(msg, data),
      warn: (msg, data) => log.warn(msg, data),
      error: (msg, data) => log.error(msg, data),
    },
  });
}

Options:

  • startChildWorkflow - Function to start sub-agent workflows
  • isCancellation - Check if error is Temporal cancellation
  • log - Workflow-safe logger

TemporalAgentExecutor

Executor that starts and manages Temporal workflows. Same API as JSAgentExecutor.

typescript
import { TemporalAgentExecutor } from '@helix-agents/runtime-temporal';

const executor = new TemporalAgentExecutor({
  client, // Temporal Client
  taskQueue, // Task queue name
  stateStore, // StateStore
  streamManager, // StreamManager
  logger, // Optional
});

// Execute agent
const handle = await executor.execute(MyAgent, 'Hello');

// Get existing handle
const existingHandle = await executor.getHandle(MyAgent, 'session-123');

// Check if resumable
const result = await executor.canResume(MyAgent, 'session-123');

AgentExecutionHandle

Handle returned by execute() for interacting with a running agent.

Properties

typescript
handle.sessionId; // Session identifier (primary key for state storage)
handle.streamId; // Stream identifier

stream

Get a stream of events from the execution.

typescript
const stream = await handle.stream();

for await (const chunk of stream) {
  switch (chunk.type) {
    case 'text_delta':
      process.stdout.write(chunk.delta);
      break;
    case 'tool_start':
      console.log(`Tool: ${chunk.toolName}`);
      break;
    case 'run_interrupted':
      console.log(`Interrupted: ${chunk.reason}`);
      break;
  }
}

result

Wait for the execution to complete and get the result.

typescript
const result = await handle.result();

if (result.status === 'completed') {
  console.log('Output:', result.output);
} else if (result.status === 'interrupted') {
  console.log('Interrupted:', result.interruptReason);
}

abort

Cancel the execution. This is a HARD stop - the agent fails and cannot be resumed. Sends a cancellation signal to the Temporal workflow.

typescript
await handle.abort('User requested cancellation');

interrupt

Interrupt execution for later resumption. This is a SOFT stop - the agent can be resumed.

typescript
await handle.interrupt('user_requested');

// Status is now 'interrupted'
const state = await handle.getState();
console.log(state.status); // 'interrupted'

The interrupt sends a signal to the Temporal workflow, which checkpoints state and exits gracefully.

canResume

Check if execution can be resumed.

typescript
const { canResume, reason } = await handle.canResume();

if (canResume) {
  const resumed = await handle.resume();
}

resume

Resume interrupted or paused execution. Returns a new handle for the resumed execution.

typescript
// Continue from where it stopped
const resumed = await handle.resume();

// Resume with a new message
const resumedWithMessage = await handle.resume({
  mode: 'with_message',
  message: 'Continue with this context',
});

// Time-travel to a specific checkpoint
const resumedFromCheckpoint = await handle.resume({
  mode: 'from_checkpoint',
  checkpointId: 'cpv1-session-123-s5-...',
});

getState

Get current agent state.

typescript
const state = await handle.getState();
console.log('Status:', state.status);
console.log('Step count:', state.stepCount);

send

Continue the conversation after completion. Returns a new handle.

typescript
const handle1 = await executor.execute(agent, 'Hello');
await handle1.result();

const handle2 = await handle1.send('Tell me more');
const result = await handle2.result();

DTOs

Type-safe schemas for workflow and activity inputs/outputs.

Session vs Run Identifiers

  • sessionId: Primary key for state storage. A session contains all messages, state, and checkpoints.
  • runId: Identifies a specific workflow execution. Multiple runs can occur within one session.

In Temporal, the workflow ID is tied to sessionId for session continuity. The runId is execution metadata for tracing.
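
The relationship between the two identifiers can be pictured with a small in-memory model. This is only a sketch: SessionRecord and recordRun are hypothetical names, and the real StateStore layout is not shown in this document.

```typescript
// Hypothetical in-memory model for illustration; not the real StateStore.
interface SessionRecord {
  sessionId: string;
  messages: string[]; // accumulated across every run in the session
  runIds: string[]; // one entry per workflow execution
}

const sessions = new Map<string, SessionRecord>();

// Record a run under its session, creating the session on first use.
function recordRun(sessionId: string, runId: string, message: string): SessionRecord {
  let session = sessions.get(sessionId);
  if (!session) {
    session = { sessionId, messages: [], runIds: [] };
    sessions.set(sessionId, session);
  }
  session.runIds.push(runId);
  session.messages.push(message);
  return session;
}

// Two executions (for example execute() followed by send()) share one session.
recordRun('session-123', 'run-1', 'Hello');
const afterSecondRun = recordRun('session-123', 'run-2', 'Tell me more');

console.log(afterSecondRun.runIds); // [ 'run-1', 'run-2' ]
console.log(afterSecondRun.messages.length); // 2
```

The point of the sketch is that state lookups use sessionId, while each runId only labels one execution inside that session.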

Workflow DTOs

typescript
import {
  AgentWorkflowInputSchema,
  AgentWorkflowResultSchema,
  type AgentWorkflowInput,
  type AgentWorkflowResult,
} from '@helix-agents/runtime-temporal';

interface AgentWorkflowInput {
  agentType: string;
  runId: string;          // Execution metadata (for tracing)
  streamId: string;
  message: string;
  initialState?: Record<string, unknown>;
  parentSessionId?: string; // Parent's sessionId for sub-agents
}

interface AgentWorkflowResult {
  status: 'completed' | 'failed' | 'cancelled';
  output?: unknown;
  error?: string;
}

Activity DTOs

typescript
import {
  InitializeAgentInputSchema,
  ExecuteAgentStepInputSchema,
  ExecuteToolCallInputSchema,
  AgentStepResultSchema,
  ToolCallResultSchema,
  // ... and more
} from '@helix-agents/runtime-temporal';

Signal DTOs

typescript
import {
  PauseAgentSignalSchema,
  ResumeAgentSignalSchema,
  AbortAgentSignalSchema,
  InterruptAgentSignalSchema,
} from '@helix-agents/runtime-temporal';

// Interrupt signal payload
interface InterruptAgentSignal {
  reason?: string;
}

Cancellation Helpers

typescript
import {
  type CancellationSignalProvider,
  defaultCancellationSignalProvider,
  createLinkedAbortController,
  isActivityCancelled,
} from '@helix-agents/runtime-temporal';

// Check if activity is cancelled
if (isActivityCancelled()) {
  throw new Error('Activity cancelled');
}

// Create linked abort controller
const controller = createLinkedAbortController();

Heartbeat Helpers

typescript
import {
  type HeartbeatProvider,
  defaultHeartbeatProvider,
  HeartbeatManager,
} from '@helix-agents/runtime-temporal';

// Manage heartbeats for long activities
const heartbeat = new HeartbeatManager({
  intervalMs: 5000,
  onHeartbeat: () => console.log('Heartbeat'),
});

heartbeat.start();
// ... do work ...
heartbeat.stop();

Complete Example

activities.ts

typescript
import { GenericAgentActivities, AgentRegistry } from '@helix-agents/runtime-temporal';
import { InMemoryStateStore, InMemoryStreamManager } from '@helix-agents/store-memory';
import { VercelAIAdapter } from '@helix-agents/llm-vercel';
import { MyAgent } from './agent.js';

const registry = new AgentRegistry();
registry.register(MyAgent);

export function createActivities() {
  const activities = new GenericAgentActivities({
    registry,
    stateStore: new InMemoryStateStore(),
    streamManager: new InMemoryStreamManager(),
    llmAdapter: new VercelAIAdapter(),
  });

  return {
    initializeAgentState: activities.initializeAgentState.bind(activities),
    executeAgentStep: activities.executeAgentStep.bind(activities),
    executeToolCall: activities.executeToolCall.bind(activities),
    registerSubAgents: activities.registerSubAgents.bind(activities),
    recordSubAgentResult: activities.recordSubAgentResult.bind(activities),
    markAgentFailed: activities.markAgentFailed.bind(activities),
    markAgentAborted: activities.markAgentAborted.bind(activities),
    endAgentStream: activities.endAgentStream.bind(activities),
    failAgentStream: activities.failAgentStream.bind(activities),
    checkExistingState: activities.checkExistingState.bind(activities),
    updateAgentStatus: activities.updateAgentStatus.bind(activities),
  };
}

workflows.ts

typescript
import { proxyActivities, executeChild, isCancellation, log } from '@temporalio/workflow';
import type { AgentWorkflowActivities } from '@helix-agents/runtime-temporal/workflow-exports';
import { runAgentWorkflow } from '@helix-agents/runtime-temporal/workflow-exports';
import type { AgentWorkflowInput, AgentWorkflowResult } from '@helix-agents/runtime-temporal';

const activities = proxyActivities<AgentWorkflowActivities>({
  startToCloseTimeout: '5m',
  retry: {
    initialInterval: '1s',
    maximumInterval: '30s',
    backoffCoefficient: 2,
    maximumAttempts: 3,
  },
});

export async function agentWorkflow(input: AgentWorkflowInput): Promise<AgentWorkflowResult> {
  return runAgentWorkflow(input, activities, {
    startChildWorkflow: async (childInput) => {
      return executeChild(agentWorkflow, {
        workflowId: childInput.runId,
        args: [childInput],
        parentClosePolicy: 'ABANDON',
      });
    },
    isCancellation: (error) => isCancellation(error),
    log: { info: log.info, warn: log.warn, error: log.error },
  });
}
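
To make the retry settings above concrete, the following sketch computes the waits between attempts. It assumes Temporal's standard retry behavior: each wait is initialInterval multiplied by backoffCoefficient per retry, capped at maximumInterval, and maximumAttempts counts the first attempt. RetrySketch and retryDelaysMs are hypothetical helper names, not part of either library.

```typescript
// Hypothetical helper for illustration; intervals are expressed in milliseconds.
interface RetrySketch {
  initialIntervalMs: number;
  maximumIntervalMs: number;
  backoffCoefficient: number;
  maximumAttempts: number;
}

// Returns the wait before each retry, in order.
function retryDelaysMs(policy: RetrySketch): number[] {
  const delays: number[] = [];
  // maximumAttempts includes the first attempt, so there are at most
  // maximumAttempts - 1 retries.
  for (let retry = 0; retry < policy.maximumAttempts - 1; retry++) {
    const raw = policy.initialIntervalMs * Math.pow(policy.backoffCoefficient, retry);
    delays.push(Math.min(raw, policy.maximumIntervalMs));
  }
  return delays;
}

// The policy from workflows.ts: 1s initial, 30s cap, coefficient 2, 3 attempts.
const exampleDelays = retryDelaysMs({
  initialIntervalMs: 1000,
  maximumIntervalMs: 30000,
  backoffCoefficient: 2,
  maximumAttempts: 3,
});
console.log(exampleDelays); // [ 1000, 2000 ]

// With more attempts, the 30s cap takes effect from the sixth retry onward.
const cappedDelays = retryDelaysMs({
  initialIntervalMs: 1000,
  maximumIntervalMs: 30000,
  backoffCoefficient: 2,
  maximumAttempts: 8,
});
console.log(cappedDelays); // [ 1000, 2000, 4000, 8000, 16000, 30000, 30000 ]
```

With the policy shown in workflows.ts, an activity that keeps failing is attempted three times, waiting about 1s and then 2s between attempts.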

worker.ts

typescript
import { Worker, bundleWorkflowCode, NativeConnection } from '@temporalio/worker';
import { createActivities } from './activities.js';

async function main() {
  const connection = await NativeConnection.connect({ address: 'localhost:7233' });

  const workflowBundle = await bundleWorkflowCode({
    workflowsPath: './workflows.ts',
  });

  const worker = await Worker.create({
    connection,
    taskQueue: 'my-agents',
    workflowBundle,
    activities: createActivities(),
  });

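  await worker.run();
}

// Start the worker and exit with a non-zero code on fatal errors
main().catch((err) => {
  console.error(err);
  process.exit(1);
});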

Interrupt and Resume

The Temporal runtime provides durable interrupt/resume via workflow signals.

Interrupt Signal Handling

Workflows listen for the interruptAgent signal:

typescript
import { setHandler, defineSignal } from '@temporalio/workflow';
import { INTERRUPT_SIGNAL_NAME } from '@helix-agents/runtime-temporal';
import type { AgentWorkflowInput, AgentWorkflowResult } from '@helix-agents/runtime-temporal';

const interruptSignal = defineSignal<[{ reason?: string }]>(INTERRUPT_SIGNAL_NAME);

export async function agentWorkflow(input: AgentWorkflowInput): Promise<AgentWorkflowResult> {
  let interrupted = false;
  let interruptReason: string | undefined;

  setHandler(interruptSignal, (payload) => {
    interrupted = true;
    interruptReason = payload.reason;
  });

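  // Simplified step loop: `status`, `activities`, and `runId` come from the
  // surrounding workflow logic (not shown). Check for interruption after each step.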
  while (!interrupted && status === 'running') {
    // Execute step...
    if (interrupted) {
      await activities.checkpointAndExit({ runId, reason: interruptReason });
      return { status: 'interrupted' };
    }
  }
}

Resuming After Process Restart

Temporal provides built-in durability. After a worker restart, reconnect to the existing workflows:

typescript
import { TemporalAgentExecutor } from '@helix-agents/runtime-temporal';
import { Client, Connection } from '@temporalio/client';

// Reconnect to Temporal
const connection = await Connection.connect({ address: 'localhost:7233' });
const client = new Client({ connection });

const executor = new TemporalAgentExecutor({
  client,
  taskQueue: 'my-agents',
  stateStore,
  streamManager,
});

// Resume interrupted workflow
const handle = await executor.getHandle(MyAgent, savedSessionId);
if (handle) {
  const { canResume } = await handle.canResume();
  if (canResume) {
    const resumed = await handle.resume();
    const result = await resumed.result();
  }
}

Workflow Execution Semantics

| Action | Workflow Behavior |
| --- | --- |
| interrupt() | Sends signal, workflow checkpoints and exits |
| resume() | Starts new workflow from checkpoint |
| abort() | Sends cancellation, workflow terminates |
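
The semantics above can be read as a small status transition function. The sketch below is a simplified model, not the runtime's logic: SketchStatus, SketchAction, and nextStatus are hypothetical names, 'aborted' is an assumed label for the terminal state after abort(), and only transitions stated in this document are modeled.

```typescript
// Hypothetical model for illustration; status names other than 'running' and
// 'interrupted' are assumptions.
type SketchStatus = 'running' | 'interrupted' | 'aborted';
type SketchAction = 'interrupt' | 'resume' | 'abort';

function nextStatus(status: SketchStatus, action: SketchAction): SketchStatus {
  // Soft stop: the workflow checkpoints and exits, and can be resumed later.
  if (action === 'interrupt' && status === 'running') return 'interrupted';
  // Resume starts a new workflow from the checkpoint.
  if (action === 'resume' && status === 'interrupted') return 'running';
  // Hard stop: the workflow terminates and cannot be resumed.
  if (action === 'abort' && status === 'running') return 'aborted';
  throw new Error(`Cannot ${action} an agent that is ${status}`);
}

const paused = nextStatus('running', 'interrupt');
const continued = nextStatus(paused, 'resume');
const stopped = nextStatus(continued, 'abort');
console.log(paused, continued, stopped); // interrupted running aborted
```

In this model an aborted agent has no outgoing transitions, which mirrors the distinction between the soft stop of interrupt() and the hard stop of abort().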

Released under the MIT License.