LLM Streaming Responses in Production: SSE and WebSockets
Implement LLM streaming in production with Server-Sent Events and WebSockets. Real code for FastAPI, Next.js, token buffering, error handling, and Kubernetes deployment.
Streaming LLM responses transforms UX: users see text appear token by token instead of waiting 10 seconds for a full response. But production streaming has sharp edges: connection drops, partial tokens, load balancer timeouts, and backpressure.
SSE vs WebSockets for LLM Streaming
| | SSE | WebSockets |
|---|---|---|
| Direction | Server → Client only | Bidirectional |
| Protocol | HTTP/1.1+ | WS upgrade |
| Reconnect | Built-in auto-reconnect | Manual |
| Load balancer support | Easy (sticky sessions not needed) | Needs WS support |
| Best for | Chat, completions | Real-time interactive AI |
| Complexity | Low | Medium |
Use SSE for most LLM streaming. Use WebSockets only when you need to send mid-stream interrupts or true bidirectional chat.
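On the wire, an SSE stream is plain text: each event is one or more `data:` lines followed by a blank line. The sketch below illustrates that framing under simplifying assumptions: it handles only the `data` field and LF line endings (the full format also defines `event`, `id`, and `retry` fields), and `encode_sse` and `SSEParser` are hypothetical names used for illustration, not part of any library.

```python
from typing import List


def encode_sse(data: str) -> str:
    """Frame a payload as one SSE event.

    Each line of the payload gets its own "data:" field, and a blank
    line terminates the event.
    """
    return "".join(f"data: {line}\n" for line in data.split("\n")) + "\n"


class SSEParser:
    """Incremental parser for the "data:" subset of SSE (illustrative only)."""

    def __init__(self) -> None:
        self._buffer = ""

    def feed(self, chunk: str) -> List[str]:
        """Add a network chunk and return the payloads of all completed events."""
        self._buffer += chunk
        events = []
        # An event is complete only once its terminating blank line has arrived.
        while "\n\n" in self._buffer:
            raw, self._buffer = self._buffer.split("\n\n", 1)
            data_lines = []
            for line in raw.split("\n"):
                if line.startswith("data:"):
                    value = line[5:]
                    # A single leading space after the colon is not part of the payload.
                    if value.startswith(" "):
                        value = value[1:]
                    data_lines.append(value)
            if data_lines:
                # Multiple data lines in one event are joined with newlines.
                events.append("\n".join(data_lines))
        return events
```

Two details matter for LLM output in particular: a payload containing a newline has to be split across several `data:` lines, and a network read can end in the middle of an event, so a consumer needs to buffer until the blank line arrives.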
Backend: FastAPI + SSE
# main.py
import anthropic
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

app = FastAPI()
# Async client, so waiting on tokens does not block the event loop
client = anthropic.AsyncAnthropic()


class ChatRequest(BaseModel):
    message: str
    model: str = "claude-sonnet-4-6"


def sse_event(text: str) -> str:
    """Frame text as one SSE event, with one "data:" line per line of text."""
    return "".join(f"data: {line}\n" for line in text.split("\n")) + "\n"


async def generate_stream(message: str, model: str):
    """Async generator yielding SSE-formatted chunks"""
    async with client.messages.stream(
        model=model,
        max_tokens=2048,
        messages=[{"role": "user", "content": message}],
    ) as stream:
        async for text in stream.text_stream:
            # A raw newline inside a single data line would end the event early
            yield sse_event(text)
    # Signal stream completion
    yield "data: [DONE]\n\n"


@app.post("/stream")
async def stream_chat(req: ChatRequest):
    return StreamingResponse(
        generate_stream(req.message, req.model),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # disable nginx buffering
            "Access-Control-Allow-Origin": "*",
        },
    )

Frontend: Next.js SSE Consumer
// hooks/useStreamChat.ts
import { useState, useCallback } from "react";

export function useStreamChat() {
  const [output, setOutput] = useState("");
  const [loading, setLoading] = useState(false);

  const sendMessage = useCallback(async (message: string) => {
    setOutput("");
    setLoading(true);
    try {
      const response = await fetch("/api/stream", {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({ message }),
      });
      if (!response.ok || !response.body) return;

      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let buffer = "";
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        // A read can end mid-event, so keep the incomplete tail in the buffer
        buffer += decoder.decode(value, { stream: true });
        const events = buffer.split("\n\n");
        buffer = events.pop() ?? "";
        for (const event of events) {
          // Multiple data lines in one event are joined with newlines
          const data = event
            .split("\n")
            .filter((line) => line.startsWith("data: "))
            .map((line) => line.slice(6))
            .join("\n");
          if (data === "[DONE]") {
            await reader.cancel();
            return;
          }
          setOutput((prev) => prev + data);
        }
      }
    } finally {
      // Runs on every exit path, including early returns and network errors
      setLoading(false);
    }
  }, []);

  return { output, loading, sendMessage };
}

// components/StreamChat.tsx
"use client";
import { useStreamChat } from "@/hooks/useStreamChat";
import { useState } from "react";

export default function StreamChat() {
  const [input, setInput] = useState("");
  const { output, loading, sendMessage } = useStreamChat();

  return (
    <div className="p-4">
      <div className="min-h-32 p-4 bg-gray-900 rounded font-mono text-sm whitespace-pre-wrap">
        {output}
        {loading && <span className="animate-pulse">▋</span>}
      </div>
      <div className="flex gap-2 mt-4">
        <input
          value={input}
          onChange={(e) => setInput(e.target.value)}
          className="flex-1 p-2 bg-gray-800 rounded"
          placeholder="Ask anything..."
          onKeyDown={(e) => e.key === "Enter" && sendMessage(input)}
        />
        <button
          onClick={() => sendMessage(input)}
          disabled={loading}
          className="px-4 py-2 bg-blue-600 rounded disabled:opacity-50"
        >
          Send
        </button>
      </div>
    </div>
  );
}

WebSockets Implementation
# websocket_server.py
import anthropic
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()
# Async client, so streaming does not block the event loop
client = anthropic.AsyncAnthropic()


@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
    await websocket.accept()
    conversation_history = []
    try:
        while True:
            # Receive user message
            data = await websocket.receive_json()
            user_message = data["message"]
            conversation_history.append({
                "role": "user",
                "content": user_message,
            })
            # Stream response
            full_response = ""
            async with client.messages.stream(
                model="claude-sonnet-4-6",
                max_tokens=2048,
                messages=conversation_history,
            ) as stream:
                async for text in stream.text_stream:
                    await websocket.send_json({
                        "type": "token",
                        "content": text,
                    })
                    full_response += text
            # Send completion signal
            await websocket.send_json({"type": "done"})
            # Add to conversation history
            conversation_history.append({
                "role": "assistant",
                "content": full_response,
            })
    except WebSocketDisconnect:
        # Client went away; the per-connection history is discarded
        pass

Production Issues and Fixes
1. Nginx buffering kills SSE
location /stream {
    proxy_pass http://backend;
    proxy_buffering off;
    proxy_cache off;
    proxy_set_header Connection '';
    proxy_http_version 1.1;
    chunked_transfer_encoding on;
}

2. AWS ALB 60-second timeout
The ALB idle timeout defaults to 60 seconds, so an SSE connection that carries no data for that long is dropped.
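Because the limit applies to idle time rather than total connection time, a periodic heartbeat is another way to keep a quiet stream open. The sketch below is one possible approach under stated assumptions: `with_heartbeat`, `HEARTBEAT`, and the 15-second default are hypothetical names and values chosen for illustration. It wraps any async generator of SSE chunks and emits an SSE comment line, which clients ignore, whenever the source has been silent for `interval` seconds.

```python
import asyncio
from typing import AsyncIterator

# A line starting with ":" is an SSE comment; EventSource clients discard it.
HEARTBEAT = ": keepalive\n\n"


async def with_heartbeat(
    source: AsyncIterator[str], interval: float = 15.0
) -> AsyncIterator[str]:
    """Yield chunks from `source`, adding a heartbeat after each silent interval."""
    iterator = source.__aiter__()
    pending = asyncio.ensure_future(iterator.__anext__())
    try:
        while True:
            # asyncio.wait leaves the pending read running when the timeout expires.
            done, _ = await asyncio.wait({pending}, timeout=interval)
            if not done:
                yield HEARTBEAT
                continue
            try:
                chunk = pending.result()
            except StopAsyncIteration:
                return  # source finished
            yield chunk
            pending = asyncio.ensure_future(iterator.__anext__())
    finally:
        # Stop the in-flight read if the consumer disconnects early.
        pending.cancel()
```

With the `generate_stream` generator from the backend section, the response body would become `with_heartbeat(generate_stream(req.message, req.model))`. The idle timeout itself is a load balancer setting: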
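resource "aws_lb_target_group" "api" {
  # ...
  stickiness {
    enabled = false
    type    = "lb_cookie" # the block requires a type even when disabled
  }
}

resource "aws_lb" "api" {
  # ...
  # Increase idle timeout (set on the load balancer, not the target group)
  idle_timeout = 3600 # 1 hour
}

3. Kubernetes ingress-nginx buffering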
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  annotations:
    nginx.ingress.kubernetes.io/proxy-buffering: "off"
    nginx.ingress.kubernetes.io/proxy-read-timeout: "3600"
    nginx.ingress.kubernetes.io/proxy-send-timeout: "3600"

4. Connection error recovery (frontend)
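function createEventSource(
  url: string,
  onMessage: (data: string) => void,
  attempt = 0,
) {
  const es = new EventSource(url);
  es.onopen = () => {
    attempt = 0; // connected, so the next failure starts from the base delay
  };
  es.onmessage = (e) => {
    if (e.data === "[DONE]") {
      es.close();
      return;
    }
    onMessage(e.data);
  };
  es.onerror = () => {
    es.close();
    // Reconnect with exponential backoff: 1s, 2s, 4s, ... capped at 30s
    const delay = Math.min(1000 * 2 ** attempt, 30_000);
    setTimeout(() => createEventSource(url, onMessage, attempt + 1), delay);
  };
  return es;
}

Token Buffering for Markdown Rendering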
Raw token streaming breaks markdown mid-render. Buffer to complete words:
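import anthropic

# Async client, so waiting on tokens does not block the event loop
client = anthropic.AsyncAnthropic()


def sse_event(text: str) -> str:
    """Frame text as one SSE event, with one "data:" line per line of text."""
    return "".join(f"data: {line}\n" for line in text.split("\n")) + "\n"


async def buffered_stream(message: str):
    buffer = ""
    async with client.messages.stream(
        model="claude-sonnet-4-6",
        max_tokens=2048,
        messages=[{"role": "user", "content": message}],
    ) as stream:
        async for text in stream.text_stream:
            buffer += text
            # Flush on word boundary
            if text.endswith((" ", "\n", ".", ",", "!")):
                yield sse_event(buffer)
                buffer = ""
    # Flush whatever is left when the stream ends
    if buffer:
        yield sse_event(buffer)
    yield "data: [DONE]\n\n"

Observability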
Track streaming metrics with OpenTelemetry:
import time

from opentelemetry import trace, metrics

tracer = trace.get_tracer(__name__)
meter = metrics.get_meter(__name__)
stream_duration = meter.create_histogram("llm_stream_duration_ms")
tokens_streamed = meter.create_counter("llm_tokens_streamed_total")


async def traced_stream(message: str):
    with tracer.start_as_current_span("llm_stream") as span:
        span.set_attribute("llm.model", "claude-sonnet-4-6")
        token_count = 0
        start = time.monotonic()
        try:
            async for chunk in generate_stream(message, "claude-sonnet-4-6"):
                # Counts streamed text chunks, excluding the [DONE] sentinel
                if chunk != "data: [DONE]\n\n":
                    token_count += 1
                yield chunk
        finally:
            # Record metrics even if the client disconnects mid-stream
            stream_duration.record(
                (time.monotonic() - start) * 1000,
                {"model": "claude-sonnet-4-6"},
            )
            tokens_streamed.add(token_count)

Streaming is the baseline expectation for production LLM apps. SSE covers 90% of use cases; add WebSockets only when you need mid-stream user interrupts or bidirectional real-time interaction.
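As a rough illustration of what a mid-stream interrupt involves on the server, the sketch below runs generation as a separate task so the handler can keep reading client messages and cancel the task on a stop request. Everything here is an assumption for illustration: `fake_tokens` stands in for the LLM stream, the `sent` list stands in for `websocket.send_json`, and the `message`/`stop`/`close` message types are a hypothetical protocol.

```python
import asyncio
from typing import AsyncIterator, List, Optional


async def fake_tokens() -> AsyncIterator[str]:
    """Stand-in for an LLM token stream (hypothetical)."""
    for word in ["one ", "two ", "three ", "four ", "five "]:
        await asyncio.sleep(0.05)
        yield word


async def generate(sent: List[dict]) -> None:
    """Forward tokens to the client; `sent` stands in for websocket.send_json."""
    try:
        async for token in fake_tokens():
            sent.append({"type": "token", "content": token})
        sent.append({"type": "done"})
    except asyncio.CancelledError:
        # Tell the client the response was cut short, then let cancellation proceed.
        sent.append({"type": "interrupted"})
        raise


async def session(inbox: "asyncio.Queue[dict]") -> List[dict]:
    """Read client messages while a generation task runs concurrently."""
    sent: List[dict] = []
    task: Optional[asyncio.Task] = None
    while True:
        msg = await inbox.get()
        if msg["type"] == "message":
            task = asyncio.create_task(generate(sent))
        elif msg["type"] == "stop" and task is not None:
            task.cancel()
            # Wait for the task to finish unwinding before handling more input.
            await asyncio.gather(task, return_exceptions=True)
        elif msg["type"] == "close":
            return sent
```

The key design point is that the receive loop never blocks on generation, which is what a single request/response SSE endpoint cannot offer without a second HTTP request.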