Resumable Streams with Next.js

This example demonstrates resumable streams with Helix Agents using Next.js App Router, Redis persistence, and the JS runtime executor.

Runnable Example: The complete source code is available in the monorepo at examples/resumable-streams-nextjs/.

Deployment Mode: Direct (in-process stores). For edge deployment with Durable Objects, see OpenNext + Cloudflare DO.

What This Demonstrates

  1. Server-Side Rendering (SSR) - Messages are server-rendered for immediate display
  2. Stream Resumption - Page refresh mid-stream resumes from where it left off
  3. Snapshot Pattern - The "sequence last" pattern ensures no events are lost
  4. Status-Based UI - Stream status (active, ended, failed) drives UI behavior
  5. Crash Recovery - Automatic client resync via useAutoResync when the server crashes mid-stream and recovers from a checkpoint

Architecture

mermaid
graph TB
    subgraph Server ["Server Component (RSC)"]
        S1["1. Fetch snapshot from handler.getSnapshot()"]
        S2["2. Render initial UI with snapshot.messages"]
        S3["3. Pass snapshot as props to client component"]
    end

    Server -->|"Props: { initialSnapshot }"| Client

    subgraph Client ["Client Component ('use client')"]
        C1["1. Hydrate with initialSnapshot.messages (no flash)"]
        C2["2. If status === 'active': connect SSE from streamSequence"]
        C3["3. If status === 'ended': no SSE connection needed"]
        C4["4. New events merge into messages state"]
    end

Project Structure

src/
├── app/
│   ├── layout.tsx                     # Root layout
│   ├── page.tsx                       # Home page
│   ├── chat/
│   │   └── [sessionId]/
│   │       ├── page.tsx               # Server component - SSR
│   │       └── ChatClient.tsx         # Client component - stream
│   └── api/
│       └── chat/
│           └── [sessionId]/
│               ├── route.ts           # POST/GET handlers
│               └── snapshot/
│                   └── route.ts       # Snapshot endpoint
├── lib/
│   ├── agent.ts                       # Agent definition
│   └── handler.ts                     # FrontendHandler setup
└── components/                        # Reusable components

Key Implementation Details

Server-Side Rendering with Snapshots

The server component loads the snapshot directly:

typescript
// app/chat/[sessionId]/page.tsx (Server Component)
import { handler } from '@/lib/handler';
import { ChatClient } from './ChatClient';
import { notFound } from 'next/navigation';

export default async function ChatPage({ params }: { params: { sessionId: string } }) {
  // Server-side: fetch snapshot directly (no API call needed)
  const snapshot = await handler.getSnapshot(params.sessionId);

  if (!snapshot) {
    notFound();
  }

  // Server renders with messages, hydrates on client
  return <ChatClient sessionId={params.sessionId} initialSnapshot={snapshot} />;
}

Client-Side Stream Resumption

The client component uses useChat with initialMessages and useAutoResync for crash recovery:

typescript
// app/chat/[sessionId]/ChatClient.tsx
'use client';

import { useChat } from '@ai-sdk/react';
import { useAutoResync } from '@helix-agents/ai-sdk/react';
import type { FrontendSnapshot } from '@helix-agents/ai-sdk';
// MyState is the agent's state type (assumed here to be exported from lib/agent.ts)
import type { MyState } from '@/lib/agent';

interface Props {
  sessionId: string;
  initialSnapshot: FrontendSnapshot<MyState>;
}

export function ChatClient({ sessionId, initialSnapshot }: Props) {
  const { messages, setMessages, data } = useChat({
    id: `chat-${sessionId}`,
    api: `/api/chat/${sessionId}`,
    initialMessages: initialSnapshot.messages,
    resume: initialSnapshot.status === 'active',
  });

  // Automatic crash recovery using helix hooks
  useAutoResync(data, {
    snapshotUrl: `/api/chat/${sessionId}/snapshot`,
    setMessages,
    onResync: (event) => {
      console.log('Recovered from crash:', event.data.reason);
    },
  });

  return (
    <div>
      {messages.map(m => (
        <div key={m.id}>{m.content}</div>
      ))}
    </div>
  );
}

FrontendHandler Setup with Redis

typescript
// src/lib/handler.ts
import { createFrontendHandler } from '@helix-agents/ai-sdk';
import { JSAgentExecutor } from '@helix-agents/runtime-js';
import { RedisStateStore, RedisStreamManager } from '@helix-agents/store-redis';
import { VercelAIAdapter } from '@helix-agents/llm-vercel';
import { ResearchAgent } from './agent';

const redisUrl = process.env.REDIS_URL!;

const stateStore = new RedisStateStore({ url: redisUrl });
const streamManager = new RedisStreamManager({ url: redisUrl });
const llmAdapter = new VercelAIAdapter();

const executor = new JSAgentExecutor(stateStore, streamManager, llmAdapter);

export const handler = createFrontendHandler({
  streamManager,
  executor,
  agent: ResearchAgent,
  stateStore,
});
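
In this Direct deployment mode the state store, stream manager, and executor all run in-process inside the Next.js server and share a single Redis instance for persistence; no separate worker or edge runtime is involved.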

How It Works

The "Sequence Last" Pattern

The snapshot endpoint implements a specific ordering to ensure no events are lost:

  1. Load state FIRST (messages, agent state)
  2. Get stream sequence LAST

This ensures that any events that occur between loading state and getting the sequence are captured. The client resumes from the sequence, potentially receiving some duplicates that are handled by deduplication.
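
As a minimal sketch, the snapshot endpoint referenced by snapshotUrl can simply delegate to handler.getSnapshot(), the same call the server component uses; the assumption here is that the state-first / sequence-last ordering is applied inside that call.

typescript
// app/api/chat/[sessionId]/snapshot/route.ts (sketch)
import { handler } from '@/lib/handler';

export async function GET(
  _request: Request,
  { params }: { params: { sessionId: string } }
) {
  // getSnapshot loads messages/state before reading the stream sequence,
  // so the returned snapshot follows the ordering described above.
  const snapshot = await handler.getSnapshot(params.sessionId);

  if (!snapshot) {
    return new Response('Session not found', { status: 404 });
  }

  return Response.json(snapshot);
}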

Content Replay

By default, when the client resumes a stream mid-message, the server replays partial content as stream events instead of including it in initialMessages. This prevents duplicate text that would otherwise occur because the AI SDK creates new text blocks on text-start events.

With content replay enabled (default):

  1. getSnapshot() excludes partial content from messages
  2. On stream resume, replay events are emitted first (text-start, text-delta with partial content)
  3. Live events continue seamlessly (more text-delta, text-end, etc.)

The client receives a linear stream without duplicates.
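
For illustration, a resumed stream might look roughly like the sketch below. The text-start / text-delta / text-end names follow the event types mentioned above; the payload fields shown (id, delta) are simplified placeholders, not the exact wire format.

typescript
// Illustration only: approximate event order on resume with content replay enabled.
const resumedEvents = [
  // 1. Replayed partial content generated before the client reconnected
  { type: 'text-start', id: 'text_1' },
  { type: 'text-delta', id: 'text_1', delta: 'Paris is the capital ' },
  // 2. Live events continue in the same text block, so the client appends
  //    instead of starting a duplicate block
  { type: 'text-delta', id: 'text_1', delta: 'of France.' },
  { type: 'text-end', id: 'text_1' },
];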

See Content Replay for configuration options.

Stream Status

The status field determines client behavior:

Status | Meaning | Client Action
------ | ------- | -------------
active | Stream is running | Set resume: true in useChat
paused | Stream is paused | May resume when execution continues
ended | Stream completed successfully | No SSE connection needed
failed | Stream failed | Handle error state
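
As a rough sketch (the lookup table and field names below are illustrative, not part of the example app), the client can derive its behavior directly from this status:

typescript
// Sketch: mapping snapshot status to client behavior (statuses from the table above).
type StreamStatus = 'active' | 'paused' | 'ended' | 'failed';

const behaviorByStatus: Record<StreamStatus, { resume: boolean; showError: boolean }> = {
  active: { resume: true, showError: false },  // reconnect SSE from streamSequence
  paused: { resume: false, showError: false }, // may resume when execution continues
  ended: { resume: false, showError: false },  // render messages only, no SSE needed
  failed: { resume: false, showError: true },  // surface the failure in the UI
};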

Crash Recovery with Helix Hooks

When the server crashes mid-stream and recovers from a checkpoint, it sends a stream_resync event. The client handles this automatically:

  1. Agent is running, client connected via SSE
  2. Server crashes (process killed, restart, etc.)
  3. Server recovers from last checkpoint
  4. Server sends stream_resync event with checkpoint info
  5. useAutoResync detects this event and fetches the snapshot
  6. Messages are updated to match checkpoint state
  7. Stream continues from checkpoint
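
The sketch below shows one way to surface this recovery to the user by reusing the onResync callback from the ChatClient example above; the useState flag and the notice copy are illustrative additions, not part of the example app.

typescript
// Inside the ChatClient component body (add `import { useState } from 'react'`).
const [recovered, setRecovered] = useState(false);

useAutoResync(data, {
  snapshotUrl: `/api/chat/${sessionId}/snapshot`,
  setMessages,
  onResync: (event) => {
    console.log('Recovered from crash:', event.data.reason);
    setRecovered(true); // flip UI state so a notice can be rendered
  },
});

// ...and in the returned JSX:
// {recovered && <p role="status">Reconnected after a server restart; restored from checkpoint.</p>}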

Available Recovery Hooks

Hook | Purpose
---- | -------
useAutoResync | Turnkey: automatically fetches snapshot and updates messages on resync
useStreamResync | Low-level: calls your callback when resync events are detected
useResyncState | UI state: tracks whether resync occurred for displaying indicators
useCheckpointSnapshot | Load a specific checkpoint's snapshot
useResumableChat | All-in-one: combines snapshot loading with auto-resync

Testing Stream Resumption

  1. Start a new chat and ask a question
  2. While the agent is "thinking" (after a tool call starts), refresh the page
  3. Observe that:
    • Messages are immediately visible (SSR)
    • The stream continues from where it left off
    • No events are lost

Quick Start

Prerequisites

  • Node.js 18+
  • Docker (for Redis)
  • OpenAI API key

Setup

bash
# Start Redis
docker-compose up -d

# Install dependencies
npm install

# Configure environment
cp .env.example .env
# Edit .env and add your OpenAI API key

# Run the development server
npm run dev

# Open http://localhost:3000
