Real-Time AI: Streaming, WebSockets, and Server-Sent Events for LLM Applications
How to build responsive AI applications using streaming, WebSockets, and SSE, with practical patterns for token streaming, agent status updates, and real-time collaboration.
Why Real-Time Matters for AI
LLM inference is slow compared to traditional APIs. A complex query to a frontier model can take 5-30 seconds for the full response. Without streaming, users stare at a loading spinner for the entire duration. With streaming, they see tokens appear in real-time, dramatically improving perceived performance and user experience.
But token streaming is just the beginning. Production AI systems need real-time updates for agent status, tool execution progress, error notifications, and multi-user collaboration.
Token Streaming: The Foundation
Server-Sent Events (SSE)
SSE is the most common pattern for LLM token streaming. It uses a standard HTTP connection with a special content type:
```python
# FastAPI SSE endpoint
import json

import anthropic
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel


class ChatRequest(BaseModel):
    messages: list[dict]


app = FastAPI()


@app.post("/api/chat")
async def chat(request: ChatRequest):
    async def generate():
        client = anthropic.AsyncAnthropic()
        async with client.messages.stream(
            model="claude-sonnet-4-20250514",
            messages=request.messages,
            max_tokens=4096,
        ) as stream:
            async for event in stream:
                if event.type == "content_block_delta":
                    yield f"data: {json.dumps({'text': event.delta.text})}\n\n"
            # Send final message with usage stats
            final = await stream.get_final_message()
            yield f"data: {json.dumps({'done': True, 'usage': {'input': final.usage.input_tokens, 'output': final.usage.output_tokens}})}\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
```
Client-side consumption:
```typescript
const response = await fetch('/api/chat', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({ messages })
});

const reader = response.body!.getReader();
const decoder = new TextDecoder();
let buffer = '';

while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  // Accumulate into a buffer and split on the blank-line delimiter,
  // so events that straddle chunk boundaries are not corrupted.
  buffer += decoder.decode(value, { stream: true });
  const events = buffer.split('\n\n');
  buffer = events.pop()!;  // keep any incomplete trailing event
  for (const line of events.filter(l => l.startsWith('data: '))) {
    const data = JSON.parse(line.slice(6));
    if (data.text) appendToUI(data.text);
    if (data.done) showUsageStats(data.usage);
  }
}
```
SSE advantages: Simple, HTTP-based, works through most proxies and load balancers, automatic reconnection built into the EventSource API.
SSE limitations: Unidirectional (server to client only), limited to text data, connection limits per domain in browsers (6 in HTTP/1.1).
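One subtlety worth isolating: on the wire, SSE events are sets of `field: value` lines delimited by a blank line, and a network chunk can end mid-event. The splitting logic can be factored into a small pure helper; this is an illustrative sketch (the `splitSSE` name is not from any library):

```typescript
// Split a buffered SSE byte stream into complete events plus leftover
// text. Events are delimited by a blank line ("\n\n"); the tail of the
// buffer may be an incomplete event and must be carried forward.
function splitSSE(buffer: string): { events: string[]; rest: string } {
  const parts = buffer.split('\n\n');
  const rest = parts.pop() ?? '';  // incomplete trailing event, if any
  return { events: parts, rest };
}
```

Callers append each decoded chunk to `rest` from the previous call before splitting again, which is exactly what the browser's native EventSource does internally.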
WebSockets
WebSockets provide full-duplex communication, essential for interactive agent sessions:
```python
# FastAPI WebSocket for interactive agent
from fastapi import WebSocket, WebSocketDisconnect


@app.websocket("/ws/agent")
async def agent_session(websocket: WebSocket):
    await websocket.accept()
    # create_agent / available_tools are application-specific
    agent = create_agent(tools=available_tools)
    try:
        while True:
            user_message = await websocket.receive_json()
            async for event in agent.run_stream(user_message["content"]):
                match event.type:
                    case "thinking":
                        await websocket.send_json({
                            "type": "thinking",
                            "content": event.text,
                        })
                    case "tool_call":
                        await websocket.send_json({
                            "type": "tool_call",
                            "tool": event.name,
                            "args": event.args,
                            "status": "executing",
                        })
                    case "tool_result":
                        await websocket.send_json({
                            "type": "tool_result",
                            "tool": event.name,
                            "result": event.result,
                        })
                    case "text_delta":
                        await websocket.send_json({
                            "type": "text",
                            "content": event.text,
                        })
    except WebSocketDisconnect:
        pass  # client closed the connection
```
WebSocket advantages: Bidirectional, low latency, supports binary data, client can send messages while receiving.
WebSocket limitations: More complex infrastructure (sticky sessions, WebSocket-aware load balancers), no automatic reconnection, connection management overhead.
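Since WebSocket has no built-in reconnection, clients typically implement it themselves. A common approach is exponential backoff with "full jitter", which avoids a thundering herd when many clients reconnect at once; a minimal sketch (function name and defaults are illustrative):

```typescript
// Delay before reconnect attempt N: uniform in [0, min(cap, base * 2^N)).
// Full jitter spreads simultaneous reconnects across the whole window.
function reconnectDelay(attempt: number, baseMs = 500, capMs = 30_000): number {
  const ceiling = Math.min(capMs, baseMs * 2 ** attempt);
  return Math.floor(Math.random() * ceiling);
}

// Usage sketch: on close, schedule a reconnect and grow the attempt count;
// reset `attempt` to 0 once a connection succeeds.
// socket.addEventListener('close', () => {
//   setTimeout(connect, reconnectDelay(attempt++));
// });
```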
Choosing the Right Protocol
| Use Case | Recommended Protocol |
|---|---|
| Simple chat with streaming | SSE |
| Interactive agent with tool use | WebSocket |
| Real-time collaboration | WebSocket |
| Notification/status updates | SSE |
| Voice/audio streaming | WebSocket |
| Webhook-style events | SSE |
Production Patterns
Structured Streaming Events
Do not just stream raw text. Define an event protocol:
```typescript
type StreamEvent =
  | { type: 'text_delta'; content: string }
  | { type: 'tool_start'; tool: string; args: Record<string, unknown> }
  | { type: 'tool_end'; tool: string; result: unknown; duration_ms: number }
  | { type: 'thinking'; content: string }
  | { type: 'error'; message: string; recoverable: boolean }
  | { type: 'done'; usage: { input_tokens: number; output_tokens: number } };
```
This enables rich UI updates: show a spinner when a tool is executing, display thinking text in a collapsible panel, and show token usage when complete.
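A discriminated union like this also lends itself to an exhaustive client-side reducer: the compiler forces you to handle every event type. A sketch handling a subset of the events (the state shape is illustrative, and the union is redeclared here so the snippet is self-contained):

```typescript
type StreamEvent =
  | { type: 'text_delta'; content: string }
  | { type: 'error'; message: string; recoverable: boolean }
  | { type: 'done'; usage: { input_tokens: number; output_tokens: number } };

interface ChatState {
  text: string;
  finished: boolean;
  error?: string;
  usage?: { input_tokens: number; output_tokens: number };
}

// Fold one event into UI state. The `never` assignment in the default
// branch makes the switch exhaustive: adding a new event type becomes
// a compile error until it is handled here.
function reduceEvent(state: ChatState, event: StreamEvent): ChatState {
  switch (event.type) {
    case 'text_delta':
      return { ...state, text: state.text + event.content };
    case 'error':
      return { ...state, error: event.message };
    case 'done':
      return { ...state, finished: true, usage: event.usage };
    default: {
      const _exhaustive: never = event;
      return state;
    }
  }
}
```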
Backpressure Handling
If the client cannot consume tokens as fast as the model generates them (common on slow networks), implement backpressure:
- SSE: The TCP send buffer naturally provides backpressure, but set reasonable buffer limits
- WebSocket: Monitor the send buffer size and pause generation if it exceeds a threshold
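For the WebSocket case, the browser and `ws` clients both expose a `bufferedAmount` property (bytes queued but not yet sent); generation can be paused until it drains below a threshold. A polling sketch (threshold and poll interval are illustrative):

```typescript
// Wait until the socket's send buffer drains below a threshold before
// generating more output. Works with any object exposing bufferedAmount,
// which both the browser WebSocket and the `ws` library provide.
async function waitForDrain(
  socket: { bufferedAmount: number },
  thresholdBytes = 64 * 1024,
  pollMs = 50,
): Promise<void> {
  while (socket.bufferedAmount > thresholdBytes) {
    await new Promise((resolve) => setTimeout(resolve, pollMs));
  }
}
```

Calling `await waitForDrain(socket)` between sends effectively propagates the client's consumption rate back to the token generator.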
Reconnection and State Recovery
Connections drop. Your protocol should handle it:
```python
# Server-side: assign event IDs for recovery
event_id = 0
async for token in stream:
    event_id += 1
    yield f"id: {event_id}\ndata: {json.dumps({'text': token})}\n\n"
```
```javascript
// Client-side: the browser's native EventSource reconnects automatically
// and resends the last received id: value in a Last-Event-ID header --
// no manual header is needed (the constructor does not accept custom
// headers; use a fetch-based client if you need to set them yourself).
const eventSource = new EventSource('/stream');
```
Infrastructure Considerations
- Reverse proxies: Nginx requires `proxy_buffering off` and `proxy_read_timeout` settings for SSE. Use `proxy_http_version 1.1` and `Upgrade` headers for WebSocket
- Load balancers: WebSocket requires sticky sessions or connection-aware routing. SSE works with standard HTTP load balancing
- CDNs: Most CDNs do not support SSE/WebSocket. Route real-time traffic directly to origin
- Kubernetes: Use `sessionAffinity: ClientIP` for WebSocket services; increase `proxy-read-timeout` annotations for SSE
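Putting the Nginx settings above together, a server block might look like this sketch (the upstream name and paths are illustrative, not from any particular deployment):

```nginx
# SSE endpoint: disable buffering so each event flushes immediately
location /api/chat {
    proxy_pass http://app_backend;
    proxy_http_version 1.1;
    proxy_set_header Connection '';
    proxy_buffering off;
    proxy_cache off;
    proxy_read_timeout 3600s;
}

# WebSocket endpoint: HTTP/1.1 upgrade handshake
location /ws/ {
    proxy_pass http://app_backend;
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
    proxy_read_timeout 3600s;
}
```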
Streaming is not just a UX nicety -- it is a fundamental requirement for AI applications. The difference between a 10-second loading spinner and seeing tokens appear immediately is the difference between an application users tolerate and one they enjoy.
Sources: MDN Server-Sent Events | FastAPI WebSocket Docs | Vercel AI SDK Streaming