
Real-Time AI: Streaming, WebSockets, and Server-Sent Events for LLM Applications

How to build responsive AI applications using streaming, WebSockets, and SSE, with practical patterns for token streaming, agent status updates, and real-time collaboration.

Why Real-Time Matters for AI

LLM inference is slow compared to traditional APIs. A complex query to a frontier model can take 5-30 seconds for the full response. Without streaming, users stare at a loading spinner for the entire duration. With streaming, they see tokens appear in real-time, dramatically improving perceived performance and user experience.

But token streaming is just the beginning. Production AI systems need real-time updates for agent status, tool execution progress, error notifications, and multi-user collaboration.

Token Streaming: The Foundation

Server-Sent Events (SSE)

SSE is the most common pattern for LLM token streaming. It uses a standard HTTP connection with a special content type:

# FastAPI SSE endpoint
import json

import anthropic
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

app = FastAPI()

class ChatRequest(BaseModel):
    messages: list[dict]

@app.post("/api/chat")
async def chat(request: ChatRequest):
    async def generate():
        client = anthropic.AsyncAnthropic()
        async with client.messages.stream(
            model="claude-sonnet-4-20250514",
            messages=request.messages,
            max_tokens=4096
        ) as stream:
            async for event in stream:
                if event.type == "content_block_delta":
                    yield f"data: {json.dumps({'text': event.delta.text})}\n\n"

            # Send a final event with usage stats
            final = await stream.get_final_message()
            yield f"data: {json.dumps({'done': True, 'usage': {'input': final.usage.input_tokens, 'output': final.usage.output_tokens}})}\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
    )

Client-side consumption:

const response = await fetch('/api/chat', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({ messages })
});

const reader = response.body!.getReader();
const decoder = new TextDecoder();
let buffer = '';

while (true) {
  const { done, value } = await reader.read();
  if (done) break;

  // Buffer partial chunks: an SSE event can be split across reads
  buffer += decoder.decode(value, { stream: true });
  const events = buffer.split('\n\n');
  buffer = events.pop()!;  // keep the incomplete tail for the next read

  for (const event of events) {
    const line = event.split('\n').find(l => l.startsWith('data: '));
    if (!line) continue;
    const data = JSON.parse(line.slice(6));
    if (data.text) appendToUI(data.text);
    if (data.done) showUsageStats(data.usage);
  }
}

SSE advantages: Simple, HTTP-based, works through most proxies and load balancers, and automatic reconnection is built into the browser's EventSource API. (Note that EventSource only supports GET; POST-based streams like the example above are read with fetch and must handle reconnection themselves.)

SSE limitations: Unidirectional (server to client only), limited to text data, connection limits per domain in browsers (6 in HTTP/1.1).

WebSockets

WebSockets provide full-duplex communication, essential for interactive agent sessions:


# FastAPI WebSocket for interactive agent
from fastapi import WebSocket, WebSocketDisconnect

@app.websocket("/ws/agent")
async def agent_session(websocket: WebSocket):
    await websocket.accept()
    agent = create_agent(tools=available_tools)

    try:
        while True:
            user_message = await websocket.receive_json()

            async for event in agent.run_stream(user_message["content"]):
                match event.type:
                    case "thinking":
                        await websocket.send_json({
                            "type": "thinking",
                            "content": event.text
                        })
                    case "tool_call":
                        await websocket.send_json({
                            "type": "tool_call",
                            "tool": event.name,
                            "args": event.args,
                            "status": "executing"
                        })
                    case "tool_result":
                        await websocket.send_json({
                            "type": "tool_result",
                            "tool": event.name,
                            "result": event.result
                        })
                    case "text_delta":
                        await websocket.send_json({
                            "type": "text",
                            "content": event.text
                        })
    except WebSocketDisconnect:
        # Client disconnected; exit the loop and release the session
        pass

WebSocket advantages: Bidirectional, low latency, supports binary data, client can send messages while receiving.

WebSocket limitations: More complex infrastructure (sticky sessions, WebSocket-aware load balancers), no automatic reconnection, connection management overhead.
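Because reconnection is not built in, applications typically wrap the WebSocket themselves. A minimal sketch with exponential backoff (all names here are illustrative, not from any library):

```typescript
// Exponential backoff with a cap, used to space out reconnection attempts
function backoffDelayMs(attempt: number, baseMs = 500, maxMs = 30_000): number {
  // 500ms, 1s, 2s, 4s, ... capped at 30s
  return Math.min(baseMs * 2 ** attempt, maxMs);
}

function connectWithRetry(
  url: string,
  onMessage: (data: string) => void,
  attempt = 0
): void {
  const ws = new WebSocket(url);
  ws.onopen = () => { attempt = 0; };        // reset backoff on success
  ws.onmessage = (ev) => onMessage(ev.data);
  ws.onclose = () => {
    // Reconnect after a growing delay instead of hammering the server
    setTimeout(
      () => connectWithRetry(url, onMessage, attempt + 1),
      backoffDelayMs(attempt)
    );
  };
}
```

Resetting the attempt counter on a successful open keeps transient blips cheap while still protecting the server during an outage.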

Choosing the Right Protocol

Use Case                        | Recommended Protocol
--------------------------------|---------------------
Simple chat with streaming      | SSE
Interactive agent with tool use | WebSocket
Real-time collaboration        | WebSocket
Notification/status updates     | SSE
Voice/audio streaming           | WebSocket
Webhook-style events            | SSE

Production Patterns

Structured Streaming Events

Do not just stream raw text. Define an event protocol:

type StreamEvent =
  | { type: 'text_delta'; content: string }
  | { type: 'tool_start'; tool: string; args: Record<string, unknown> }
  | { type: 'tool_end'; tool: string; result: unknown; duration_ms: number }
  | { type: 'thinking'; content: string }
  | { type: 'error'; message: string; recoverable: boolean }
  | { type: 'done'; usage: { input_tokens: number; output_tokens: number } };

This enables rich UI updates: show a spinner when a tool is executing, display thinking text in a collapsible panel, and show token usage when complete.
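A discriminated union like this pairs naturally with an exhaustive switch on the client. One sketch is a plain reducer that folds events into UI state, keeping the streaming logic testable outside any UI framework (the state shape below is illustrative; the union repeats the definition above so the sketch is self-contained):

```typescript
// The StreamEvent union from above, repeated for self-containment
type StreamEvent =
  | { type: 'text_delta'; content: string }
  | { type: 'tool_start'; tool: string; args: Record<string, unknown> }
  | { type: 'tool_end'; tool: string; result: unknown; duration_ms: number }
  | { type: 'thinking'; content: string }
  | { type: 'error'; message: string; recoverable: boolean }
  | { type: 'done'; usage: { input_tokens: number; output_tokens: number } };

interface ChatState {
  text: string;
  thinking: string;
  activeTools: string[];  // tools currently executing (drives spinners)
  done: boolean;
}

// Fold one event into the state; the switch is exhaustive over the union
function reduceEvent(state: ChatState, event: StreamEvent): ChatState {
  switch (event.type) {
    case 'text_delta':
      return { ...state, text: state.text + event.content };
    case 'thinking':
      return { ...state, thinking: state.thinking + event.content };
    case 'tool_start':
      return { ...state, activeTools: [...state.activeTools, event.tool] };
    case 'tool_end':
      return { ...state, activeTools: state.activeTools.filter(t => t !== event.tool) };
    case 'error':
    case 'done':
      return { ...state, done: true };
  }
}
```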

Backpressure Handling

If the client cannot consume tokens as fast as the model generates them (common on slow networks), implement backpressure:

  • SSE: The TCP send buffer naturally provides backpressure, but set reasonable buffer limits
  • WebSocket: Monitor the send buffer size and pause generation if it exceeds a threshold
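The WebSocket bullet can be sketched with `bufferedAmount`, which reports bytes queued on the socket but not yet flushed (available on the browser WebSocket; the water marks and polling interval below are illustrative):

```typescript
const HIGH_WATER_MARK = 1024 * 1024;  // stop sending above 1 MiB queued
const LOW_WATER_MARK = 256 * 1024;    // resume once under 256 KiB

// Pause sending while the socket's send buffer is above the high-water
// mark, resuming once it drains below the low-water mark
async function sendWithBackpressure(
  ws: { bufferedAmount: number; send: (data: string) => void },
  data: string
): Promise<void> {
  if (ws.bufferedAmount > HIGH_WATER_MARK) {
    // Poll until enough of the queue has been flushed
    while (ws.bufferedAmount > LOW_WATER_MARK) {
      await new Promise(resolve => setTimeout(resolve, 50));
    }
  }
  ws.send(data);
}
```

Using separate high and low water marks avoids rapid pause/resume flapping right at the threshold.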

Reconnection and State Recovery

Connections drop. Your protocol should handle it:

# Server-side: attach an id to each event so clients can resume
event_id = 0
async for token in stream:
    event_id += 1
    yield f"id: {event_id}\ndata: {json.dumps({'text': token})}\n\n"

On the client, the browser's EventSource API does not accept custom headers; instead, it reconnects automatically and sends the id of the last event it received in the Last-Event-ID request header. The server reads that header and replays anything the client missed:

// Client-side: EventSource reconnects on its own; the browser sends
// Last-Event-ID for you, so just track progress via e.lastEventId
const eventSource = new EventSource('/stream');
eventSource.onmessage = (e) => {
  appendToUI(JSON.parse(e.data).text);
};

Infrastructure Considerations

  • Reverse proxies: Nginx requires proxy_buffering off and proxy_read_timeout settings for SSE. Use proxy_http_version 1.1 and Upgrade headers for WebSocket
  • Load balancers: WebSocket requires sticky sessions or connection-aware routing. SSE works with standard HTTP load balancing
  • CDNs: Most CDNs do not support SSE/WebSocket. Route real-time traffic directly to origin
  • Kubernetes: Use sessionAffinity: ClientIP for WebSocket services; increase proxy-read-timeout annotations for SSE
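For Nginx, the settings above look roughly like this (a sketch; the upstream name and timeout values are placeholders to adjust for your deployment):

```nginx
# SSE: disable buffering so tokens flush to the client immediately
location /api/chat {
    proxy_pass http://app_upstream;
    proxy_buffering off;
    proxy_cache off;
    proxy_read_timeout 300s;
}

# WebSocket: HTTP/1.1 upgrade handshake
location /ws/ {
    proxy_pass http://app_upstream;
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
    proxy_read_timeout 3600s;
}
```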

Streaming is not just a UX nicety -- it is a fundamental requirement for AI applications. The difference between a 10-second loading spinner and seeing tokens appear immediately is the difference between an application users tolerate and one they enjoy.

Sources: MDN Server-Sent Events | FastAPI WebSocket Docs | Vercel AI SDK Streaming
