Distributed Coordination
When running agents across multiple processes or servers, you need coordination to prevent conflicts. This guide covers the tools and patterns for safe distributed execution.
The Problem
In distributed deployments, multiple processes might try to:
- Resume the same agent simultaneously
- Write state at the same time
- Process the same step concurrently
Without coordination, this leads to:
- Split-brain - Two processes think they own the agent
- Lost updates - One process overwrites another's changes
- Corrupted state - Partial writes from concurrent modifications
Solutions
Helix Agents provides two complementary mechanisms:
- Lock Managers - Prevent concurrent execution
- Optimistic Concurrency - Detect stale writes via version checking
Lock Managers
Lock managers provide distributed mutex semantics. A lock ensures only one process executes an agent at a time.
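All lock managers share a common shape. As a rough sketch (the method names follow the usage examples in this guide, but the exact `@helix-agents/core` type definitions are an assumption), the interface and a minimal no-op variant might look like:

```typescript
// Hypothetical interface shape, inferred from the usage examples in this
// guide -- the actual @helix-agents/core types may differ.
interface Lock {
  resource: string;
  fencingToken: number;
}

interface AcquireResult {
  acquired: boolean;
  lock?: Lock;
  heldBy?: string;
  expiresAt?: number;
}

interface LockManager {
  acquire(resource: string, opts: { ttlMs: number }): Promise<AcquireResult>;
  release(lock: Lock): Promise<void>;
  withLock<T>(
    resource: string,
    opts: { ttlMs: number; heartbeatMs?: number },
    fn: (lock: Lock) => Promise<T>
  ): Promise<T>;
}

// Minimal no-op variant: every acquire succeeds immediately, which is the
// documented behavior of NoOpLockManager for single-process deployments.
class SimpleNoOpLockManager implements LockManager {
  private nextToken = 0;

  async acquire(resource: string, _opts: { ttlMs: number }): Promise<AcquireResult> {
    return { acquired: true, lock: { resource, fencingToken: ++this.nextToken } };
  }

  async release(_lock: Lock): Promise<void> {}

  async withLock<T>(
    resource: string,
    opts: { ttlMs: number; heartbeatMs?: number },
    fn: (lock: Lock) => Promise<T>
  ): Promise<T> {
    const result = await this.acquire(resource, opts);
    try {
      return await fn(result.lock!);
    } finally {
      await this.release(result.lock!);
    }
  }
}
```

The real implementations differ only in how `acquire` is backed (memory, Redis, Durable Objects); callers program against the same surface.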
Available Implementations
| Lock Manager | Use Case | Backend |
|---|---|---|
| NoOpLockManager | Single process | None |
| InMemoryLockManager | Development | Memory |
| RedisLockManager | Production | Redis |
| DurableObjectLockManager | Cloudflare | Durable Objects |
NoOpLockManager
For single-process deployments where coordination isn't needed:
import { NoOpLockManager } from '@helix-agents/core';
const lockManager = new NoOpLockManager();
// All lock operations succeed immediately
InMemoryLockManager
For development and testing:
import { InMemoryLockManager } from '@helix-agents/store-memory';
const lockManager = new InMemoryLockManager();
RedisLockManager
For production distributed deployments:
import { RedisLockManager } from '@helix-agents/store-redis';
const lockManager = new RedisLockManager({
host: 'localhost',
port: 6379,
keyPrefix: 'myapp:locks:',
});
DurableObjectLockManager
For Cloudflare Workers:
import { DurableObjectLockManager } from '@helix-agents/store-cloudflare';
const lockManager = new DurableObjectLockManager(env.LOCK_DO);
Using Lock Managers
Acquire and Release
Manual lock management:
const resource = `session:${sessionId}`;
const result = await lockManager.acquire(resource, {
ttlMs: 30000, // 30 second lease
});
if (result.acquired) {
try {
// Do work with the lock held
await executeAgent();
} finally {
await lockManager.release(result.lock);
}
} else {
console.log(`Lock held by ${result.heldBy}`);
}
withLock Helper
Automatic acquisition, heartbeat, and release:
await lockManager.withLock(`session:${sessionId}`, { ttlMs: 30000, heartbeatMs: 10000 }, async (lock) => {
// Lock is automatically refreshed every 10 seconds
await executeAgent();
// Lock is released when function completes
});
Fencing Tokens
Locks include monotonically increasing fencing tokens to prevent split-brain writes:
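On the store side, the token check can be sketched as follows. This is an illustrative model, not the real state store: it assumes the store remembers the highest token it has accepted per session and rejects anything older.

```typescript
// Illustrative fencing-token validation: reject any write whose token is
// lower than one the store has already accepted for this session. A stale
// lock holder (whose lease expired and was re-granted to someone else)
// necessarily carries a lower token and is fenced out.
class FencedStore {
  private highestToken = new Map<string, number>();
  private data = new Map<string, unknown>();

  save(sessionId: string, state: unknown, fencingToken: number): void {
    const highest = this.highestToken.get(sessionId) ?? -1;
    if (fencingToken < highest) {
      throw new Error(
        `Fencing token ${fencingToken} is stale (highest seen: ${highest})`
      );
    }
    this.highestToken.set(sessionId, fencingToken);
    this.data.set(sessionId, state);
  }
}
```

Because tokens increase monotonically with each lock grant, this check works even if the stale writer believes it still holds the lock.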
const result = await lockManager.acquire(resource, { ttlMs: 30000 });
if (result.acquired) {
const { fencingToken } = result.lock;
// Pass token to state store for validation
await stateStore.save(state, { fencingToken });
}
Optimistic Concurrency Control
Even with locks, a network partition or an expired lease can leave two processes acting on the same state. Version checking provides a second layer of protection.
How It Works
- State includes a version field
- On save, the store checks that the version matches
- If mismatched, the save fails with StaleStateError
// Process A loads state at version 5
const state = await stateStore.loadState(sessionId);
console.log(state.version); // 5
// Process B loads same state, modifies, saves -> version 6
// Process A tries to save its changes
try {
await stateStore.save(state); // Expected version 5, but store has 6
} catch (error) {
if (error instanceof StaleStateError) {
// Our changes are stale - another process modified state
console.log('State was modified by another process');
}
}
Handling StaleStateError
When you detect a stale state, stop gracefully:
import { StaleStateError, ExecutorSupersededError } from '@helix-agents/core';
try {
await executeStep();
} catch (error) {
if (error instanceof StaleStateError) {
// Another executor has this run - stop gracefully
throw new ExecutorSupersededError(sessionId);
}
throw error;
}
Error Types
StaleStateError
Thrown when a save fails due to version mismatch:
interface StaleStateError {
runId: string;
expectedVersion: number;
actualVersion: number;
}
ExecutorSupersededError
Thrown when an executor should stop because another took over:
interface ExecutorSupersededError {
runId: string;
}
This is a graceful shutdown signal, not a failure. The other executor will continue the work.
FencingTokenMismatchError
Thrown when a fencing token doesn't match:
interface FencingTokenMismatchError {
runId: string;
expectedToken: number;
}
Stream Events
executor_superseded
Emitted when an executor is superseded:
for await (const chunk of stream) {
if (chunk.type === 'executor_superseded') {
console.log('Another executor took over');
break;
}
}
Compare-And-Set Status
For atomic status transitions, use compareAndSetStatus:
// Only update status if currently 'interrupted'
const success = await stateStore.compareAndSetStatus(
sessionId,
['interrupted'], // expected statuses (array)
'running' // new status
);
if (!success) {
// Status was changed by another process
console.log('Status transition failed - concurrent modification');
}
This prevents race conditions when multiple processes try to resume.
Version Increment on CAS
When compareAndSetStatus() succeeds, it atomically increments the state version along with the status change. This is critical for closing a subtle race condition in optimistic concurrency control.
The Race Condition Problem
Without version increment on CAS, a race condition exists:
// Timeline without version increment (BUG):
// 1. Process A loads state: version=5, status='interrupted'
// 2. Process B loads state: version=5, status='interrupted'
// 3. Process B: CAS succeeds, status='running' (version stays 5!)
// 4. Process A: CAS fails (status mismatch - correct)
// 5. Process B crashes without saving
// 6. Process A retries, loads state: version=5, status='running'
// 7. Process A does CAS: 'running' -> 'running' (succeeds, version stays 5)
// 8. Process A saves with version=5 -> SUCCEEDS (but shouldn't!)
The problem: If CAS doesn't increment the version, a process that loaded state before another's CAS can still save successfully because the versions match.
The Solution
CAS atomically increments version on success:
// Timeline with version increment (CORRECT):
// 1. Process A loads state: version=5, status='interrupted'
// 2. Process B loads state: version=5, status='interrupted'
// 3. Process B: CAS succeeds, status='running', version=6
// 4. Process A: CAS fails (status mismatch - correct)
// 5. Later, if Process A somehow tries to save with version=5:
// -> StaleStateError (store has version=6)
Example
// Initial state: version = 5, status = 'interrupted'
// Process A: CAS succeeds
const success = await stateStore.compareAndSetStatus(sessionId, ['interrupted'], 'running');
// success = true, version is now 6
// Process B: CAS fails (status is now 'running')
const success2 = await stateStore.compareAndSetStatus(sessionId, ['interrupted'], 'running');
// success2 = false (status mismatch)
// After successful CAS, sync local state version to match store:
if (success) {
state.status = 'running';
state.version = (state.version ?? 0) + 1; // Now 6, matches store
}
// Later saves will use the correct version
await stateStore.save(state); // Works because version=6 matches store
Implementation Across Stores
All state stores implement atomic CAS with version increment:
| Store | Implementation |
|---|---|
| InMemoryStateStore | Direct increment: state.version = (state.version ?? 0) + 1 |
| RedisStateStore | Lua script with HINCRBY key 'version' 1 for atomicity |
| D1StateStore | SQL UPDATE ... SET version = version + 1 |
| DOStateStore | SQL transaction with version increment |
The Lua script in Redis ensures the check-and-increment is atomic, preventing any race window.
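The single-process analogue of that check-and-increment is easy to sketch. The following toy store (an illustration, not the real InMemoryStateStore) shows both invariants from the timelines above: the status check and the version bump happen together in CAS, and a saver holding a pre-CAS version is rejected afterwards.

```typescript
interface AgentState {
  status: string;
  version: number;
}

// Toy in-memory store demonstrating CAS-with-version-increment.
class TinyStateStore {
  private states = new Map<string, AgentState>();

  seed(id: string, state: AgentState): void {
    this.states.set(id, { ...state });
  }

  // Atomic in a single-threaded runtime: the status check and the
  // version increment cannot be interleaved with another caller.
  compareAndSetStatus(id: string, expected: string[], next: string): boolean {
    const s = this.states.get(id);
    if (!s || !expected.includes(s.status)) return false;
    s.status = next;
    s.version += 1; // close the race: stale loaders now fail their saves
    return true;
  }

  // Optimistic save: reject if the caller's version is out of date.
  save(id: string, state: AgentState): void {
    const current = this.states.get(id);
    if (current && current.version !== state.version) {
      throw new Error(
        `StaleStateError: expected ${state.version}, store has ${current.version}`
      );
    }
    this.states.set(id, { ...state, version: state.version + 1 });
  }
}
```

In a multi-process deployment the same two steps must be made atomic by the backend (a Lua script in Redis, a single SQL statement in D1), but the logic is identical.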
Runtime-Specific Coordination
JavaScript Runtime
The JS runtime relies on explicit lock managers:
const executor = new JSAgentExecutor(stateStore, streamManager, llmAdapter, {
lockManager: new RedisLockManager(redis),
});
Temporal Runtime
Temporal provides coordination via workflow ID uniqueness:
- Each run has a unique workflow ID
- Temporal ensures only one workflow with that ID runs at a time
- Resume creates a new workflow that continues from checkpoint
// Temporal handles coordination automatically
const executor = new TemporalAgentExecutor(connection, {
taskQueue: 'agents',
});
Cloudflare Runtime
Cloudflare Workflows use instance ID uniqueness:
- Each run has a unique instance ID
- Cloudflare ensures instance uniqueness within a Workflow
- Durable Objects provide additional locking if needed
const executor = new CloudflareAgentExecutor(ctx, env, {
lockDO: env.LOCK_DO, // Optional Durable Object for extra coordination
});
Best Practices
1. Use Appropriate Lock Manager
| Deployment | Recommended Lock Manager |
|---|---|
| Single process | NoOpLockManager |
| Multiple Node.js processes | RedisLockManager |
| Cloudflare Workers | DurableObjectLockManager |
| Temporal | Built-in workflow uniqueness |
2. Handle Supersession Gracefully
When superseded, stop cleanly without failing:
try {
await runAgentLoop();
} catch (error) {
if (error instanceof ExecutorSupersededError) {
// Not a failure - another executor continues
console.log('Superseded, stopping gracefully');
return;
}
throw error;
}
3. Set Appropriate Lock TTLs
- Too short: the lock expires during long operations
- Too long: a long wait if the holding process crashes
// Rule of thumb: 2-3x expected step duration
const lockManager = new RedisLockManager(redis);
await lockManager.withLock(
resource,
{
ttlMs: 60000, // 1 minute lease
heartbeatMs: 20000, // Refresh every 20 seconds
},
fn
);
4. Monitor Lock Contention
Track lock acquisition failures:
const result = await lockManager.acquire(resource, options);
if (!result.acquired) {
metrics.increment('lock_contention');
console.log(`Lock held by ${result.heldBy}, expires at ${result.expiresAt}`);
}
Next Steps
- Interrupt and Resume - Coordination for pause/resume
- Checkpoints - State snapshots in distributed systems
- Redis Store - Redis-based state and locking