# Resumable Streams with Next.js
This example demonstrates resumable streams with Helix Agents using Next.js App Router, Redis persistence, and the JS runtime executor.
> **Runnable Example:** The complete source code is available in the monorepo at `examples/resumable-streams-nextjs/`.

> **Deployment Mode:** Direct (in-process stores). For edge deployment with Durable Objects, see OpenNext + Cloudflare DO.
## What This Demonstrates
- **Server-Side Rendering (SSR)** - Messages are server-rendered for immediate display
- **Stream Resumption** - Page refresh mid-stream resumes from where it left off
- **Snapshot Pattern** - The "sequence last" pattern ensures no events are lost
- **Status-Based UI** - Stream status (`active`, `ended`, `failed`) drives UI behavior
- **Crash Recovery** - Automatic recovery using `useAutoResync` when the server crashes and recovers
## Architecture
```mermaid
graph TB
    subgraph Server ["Server Component (RSC)"]
        S1["1. Fetch snapshot from handler.getSnapshot()"]
        S2["2. Render initial UI with snapshot.messages"]
        S3["3. Pass snapshot as props to client component"]
    end

    Server -->|"Props: { initialSnapshot }"| Client

    subgraph Client ["Client Component ('use client')"]
        C1["1. Hydrate with initialSnapshot.messages (no flash)"]
        C2["2. If status === 'active': connect SSE from streamSequence"]
        C3["3. If status === 'ended': no SSE connection needed"]
        C4["4. New events merge into messages state"]
    end
```

## Project Structure
```
src/
├── app/
│   ├── layout.tsx                 # Root layout
│   ├── page.tsx                   # Home page
│   ├── chat/
│   │   └── [sessionId]/
│   │       ├── page.tsx           # Server component - SSR
│   │       └── ChatClient.tsx     # Client component - stream
│   └── api/
│       └── chat/
│           └── [sessionId]/
│               ├── route.ts       # POST/GET handlers
│               └── snapshot/
│                   └── route.ts   # Snapshot endpoint
├── lib/
│   ├── agent.ts                   # Agent definition
│   └── handler.ts                 # FrontendHandler setup
└── components/                    # Reusable components
```

## Key Implementation Details
### Server-Side Rendering with Snapshots
The server component loads the snapshot directly:
```tsx
// app/chat/[sessionId]/page.tsx (Server Component)
import { handler } from '@/lib/handler';
import { ChatClient } from './ChatClient';
import { notFound } from 'next/navigation';

export default async function ChatPage({ params }: { params: { sessionId: string } }) {
  // Server-side: fetch snapshot directly (no API call needed)
  const snapshot = await handler.getSnapshot(params.sessionId);
  if (!snapshot) {
    notFound();
  }

  // Server renders with messages, hydrates on client
  return <ChatClient sessionId={params.sessionId} initialSnapshot={snapshot} />;
}
```

### Client-Side Stream Resumption
The client component uses `useChat` with `initialMessages` and `useAutoResync` for crash recovery:
```tsx
// app/chat/[sessionId]/ChatClient.tsx
'use client';
import { useChat } from '@ai-sdk/react';
import { useAutoResync } from '@helix-agents/ai-sdk/react';
import type { FrontendSnapshot } from '@helix-agents/ai-sdk';
import type { MyState } from '@/lib/agent';

interface Props {
  sessionId: string;
  initialSnapshot: FrontendSnapshot<MyState>;
}

export function ChatClient({ sessionId, initialSnapshot }: Props) {
  const { messages, setMessages, data } = useChat({
    id: `chat-${sessionId}`,
    api: `/api/chat/${sessionId}`,
    initialMessages: initialSnapshot.messages,
    resume: initialSnapshot.status === 'active',
  });

  // Automatic crash recovery using helix hooks
  useAutoResync(data, {
    snapshotUrl: `/api/chat/${sessionId}/snapshot`,
    setMessages,
    onResync: (event) => {
      console.log('Recovered from crash:', event.data.reason);
    },
  });

  return (
    <div>
      {messages.map((m) => (
        <div key={m.id}>{m.content}</div>
      ))}
    </div>
  );
}
```

### FrontendHandler Setup with Redis
```ts
// src/lib/handler.ts
import { createFrontendHandler } from '@helix-agents/ai-sdk';
import { JSAgentExecutor } from '@helix-agents/runtime-js';
import { RedisStateStore, RedisStreamManager } from '@helix-agents/store-redis';
import { VercelAIAdapter } from '@helix-agents/llm-vercel';
import { ResearchAgent } from './agent';

const redisUrl = process.env.REDIS_URL!;
const stateStore = new RedisStateStore({ url: redisUrl });
const streamManager = new RedisStreamManager({ url: redisUrl });
const llmAdapter = new VercelAIAdapter();
const executor = new JSAgentExecutor(stateStore, streamManager, llmAdapter);

export const handler = createFrontendHandler({
  streamManager,
  executor,
  agent: ResearchAgent,
  stateStore,
});
```

## How It Works
### The "Sequence Last" Pattern

The snapshot endpoint implements a specific ordering to ensure no events are lost:

1. Load state FIRST (messages, agent state)
2. Get stream sequence LAST
This ensures that any events that occur between loading state and getting the sequence are captured. The client resumes from the sequence, potentially receiving some duplicates that are handled by deduplication.
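The ordering can be illustrated with a framework-free sketch. The store interfaces below are illustrative stand-ins, not the actual `@helix-agents` types:

```typescript
// Illustrative stand-ins for the state store and stream manager.
type Message = { id: string; content: string };

interface StateStore {
  loadMessages(sessionId: string): Promise<Message[]>;
}
interface StreamManager {
  getSequence(sessionId: string): Promise<number>;
}

// "Sequence last": load state FIRST, read the stream sequence LAST.
// Any event appended between the two reads has a sequence <= the
// returned value, so resuming from `sequence` never skips it; at worst
// the client sees a few duplicates, which it deduplicates by event id.
async function buildSnapshot(
  sessionId: string,
  stateStore: StateStore,
  streamManager: StreamManager
) {
  const messages = await stateStore.loadMessages(sessionId); // 1. state first
  const sequence = await streamManager.getSequence(sessionId); // 2. sequence last
  return { messages, sequence };
}
```

Reversing the order (sequence first, state last) would open a window where an event lands after the sequence was read but before the state was loaded, and the client would neither see it in the snapshot nor receive it on resume.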
### Content Replay

By default, when the client resumes a stream mid-message, the server replays partial content as stream events instead of including it in `initialMessages`. This prevents the duplicate text that would otherwise occur because the AI SDK creates new text blocks on `text-start` events.

With content replay enabled (the default):

- `getSnapshot()` excludes partial content from messages
- On stream resume, replay events are emitted first (`text-start`, `text-delta` with partial content)
- Live events continue seamlessly (more `text-delta`, `text-end`, etc.)
The client receives a linear stream without duplicates.
See Content Replay for configuration options.
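The resulting event order can be sketched concretely. The event names follow the AI SDK's text chunk types; the reducer is purely illustrative:

```typescript
// Minimal text-event model (illustrative, not the full AI SDK chunk set).
type TextEvent =
  | { type: 'text-start'; id: string }
  | { type: 'text-delta'; id: string; delta: string }
  | { type: 'text-end'; id: string };

// Fold a linear event stream into the accumulated text of each block.
function foldText(events: TextEvent[]): Map<string, string> {
  const blocks = new Map<string, string>();
  for (const e of events) {
    if (e.type === 'text-start') blocks.set(e.id, '');
    if (e.type === 'text-delta') blocks.set(e.id, (blocks.get(e.id) ?? '') + e.delta);
  }
  return blocks;
}

// On resume, the server first REPLAYS the partial content as events...
const replayed: TextEvent[] = [
  { type: 'text-start', id: 't1' },
  { type: 'text-delta', id: 't1', delta: 'The answer is ' },
];
// ...then live events continue appending to the same block.
const live: TextEvent[] = [
  { type: 'text-delta', id: 't1', delta: '42.' },
  { type: 'text-end', id: 't1' },
];

foldText([...replayed, ...live]).get('t1'); // 'The answer is 42.'
```

Because the partial text arrives only as replayed events and is excluded from `initialMessages`, the block is built exactly once; it never appears both in the hydrated messages and again in the stream.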
### Stream Status

The `status` field determines client behavior:

| Status | Meaning | Client Action |
|---|---|---|
| `active` | Stream is running | Set `resume: true` in `useChat` |
| `paused` | Stream is paused | May resume when execution continues |
| `ended` | Stream completed successfully | No SSE connection needed |
| `failed` | Stream failed | Handle error state |
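The table collapses into a small client-side helper. This is a sketch; the status union mirrors the table, and treating `paused` as resumable is an assumption noted in the comment:

```typescript
type StreamStatus = 'active' | 'paused' | 'ended' | 'failed';

// Map a snapshot's status to the client's connection behavior.
function clientActionFor(status: StreamStatus): { resume: boolean; showError: boolean } {
  switch (status) {
    case 'active':
      return { resume: true, showError: false }; // reconnect SSE via resume: true
    case 'paused':
      return { resume: true, showError: false }; // assumption: keep listening in case execution continues
    case 'ended':
      return { resume: false, showError: false }; // nothing left to stream
    case 'failed':
      return { resume: false, showError: true }; // surface the error state
  }
}
```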
### Crash Recovery with Helix Hooks

When the server crashes mid-stream and recovers from a checkpoint, it sends a `stream_resync` event. The client handles this automatically:
1. Agent is running, client connected via SSE
2. Server crashes (process killed, restart, etc.)
3. Server recovers from last checkpoint
4. Server sends `stream_resync` event with checkpoint info
5. `useAutoResync` detects this event and fetches the snapshot
6. Messages are updated to match checkpoint state
7. Stream continues from checkpoint
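Detection in step 5 amounts to scanning the `data` side-channel from `useChat` for resync entries. A simplified, framework-free version of what such a hook might do internally (the entry shape and field names here are illustrative assumptions, not the library's actual types):

```typescript
// Assumed shape of a resync entry in the useChat `data` array (illustrative).
interface ResyncEvent {
  type: 'stream_resync';
  data: { reason: string; checkpointId: string };
}

type DataEntry = ResyncEvent | { type: string; [k: string]: unknown };

// Return resync events that arrived after the last one already handled,
// so re-renders with the same `data` array do not re-trigger a fetch.
function findNewResyncs(data: DataEntry[], handledCount: number): ResyncEvent[] {
  return data
    .filter((e): e is ResyncEvent => e.type === 'stream_resync')
    .slice(handledCount);
}
```

On each newly detected resync, the hook would re-fetch the snapshot endpoint and hand the checkpoint's messages to `setMessages`, which is exactly the turnkey behavior `useAutoResync` provides.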
### Available Recovery Hooks

| Hook | Purpose |
|---|---|
| `useAutoResync` | Turnkey: automatically fetches snapshot and updates messages on resync |
| `useStreamResync` | Low-level: calls your callback when resync events are detected |
| `useResyncState` | UI state: tracks whether resync occurred for displaying indicators |
| `useCheckpointSnapshot` | Load a specific checkpoint's snapshot |
| `useResumableChat` | All-in-one: combines snapshot loading with auto-resync |
## Testing Stream Resumption

1. Start a new chat and ask a question
2. While the agent is "thinking" (after a tool call starts), refresh the page
3. Observe that:
   - Messages are immediately visible (SSR)
   - The stream continues from where it left off
   - No events are lost
## Quick Start

### Prerequisites
- Node.js 18+
- Docker (for Redis)
- OpenAI API key
### Setup
```bash
# Start Redis
docker-compose up -d

# Install dependencies
npm install

# Configure environment
cp .env.example .env
# Edit .env and add your OpenAI API key

# Run the development server
npm run dev

# Open http://localhost:3000
```

## Related Documentation
- AI SDK Integration - Full frontend integration guide
- Recovery Hooks - All recovery hook APIs
- Streaming Guide - Stream chunk types and patterns
- Interrupt & Resume - Crash recovery concepts