LLM Streaming Responses in Production — SSE and WebSockets

Implement LLM streaming in production with Server-Sent Events and WebSockets. Real code for FastAPI, Next.js, token buffering, error handling, and Kubernetes deployment.

DevOpsBoys | Jun 5, 2026 | 4 min read

Streaming LLM responses transforms UX — users see text appear token by token instead of waiting 10 seconds for a full response. But production streaming has sharp edges: connection drops, partial tokens, load balancer timeouts, and backpressure.


SSE vs WebSockets for LLM Streaming

| | SSE | WebSockets |
|---|---|---|
| Direction | Server → Client only | Bidirectional |
| Protocol | HTTP/1.1+ | WS upgrade |
| Reconnect | Built-in auto-reconnect | Manual |
| Load balancer support | Easy (sticky not needed) | Needs WS support |
| Best for | Chat, completions | Real-time interactive AI |
| Complexity | Low | Medium |

Use SSE for most LLM streaming. Reach for WebSockets only when you need to send mid-stream interrupts or true bidirectional chat.


Backend: FastAPI + SSE

python
# main.py
from anthropic import AsyncAnthropic
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

app = FastAPI()
# Async client: the sync client would block the event loop while streaming
client = AsyncAnthropic()


class ChatRequest(BaseModel):
    message: str
    model: str = "claude-sonnet-4-6"


def sse_event(text: str) -> str:
    """Frame text as one SSE event; every payload line needs its own data: field."""
    return "".join(f"data: {line}\n" for line in text.split("\n")) + "\n"


async def generate_stream(message: str, model: str):
    """Async generator yielding SSE-formatted chunks"""
    async with client.messages.stream(
        model=model,
        max_tokens=2048,
        messages=[{"role": "user", "content": message}],
    ) as stream:
        async for text in stream.text_stream:
            # SSE format: "data: <payload>\n\n"
            yield sse_event(text)

    # Signal stream completion
    yield "data: [DONE]\n\n"


@app.post("/stream")
async def stream_chat(req: ChatRequest):
    return StreamingResponse(
        generate_stream(req.message, req.model),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",      # disable nginx buffering
            "Access-Control-Allow-Origin": "*",
        },
    )
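
To make the `data: <payload>` framing concrete, here is a minimal sketch of how one SSE event can be built and parsed back. The helper names `format_sse` and `parse_sse` are illustrative and not part of FastAPI or the Anthropic SDK. The sketch relies on the SSE rule that consecutive `data:` lines within one event are joined with newlines, which is what lets a payload containing line breaks survive the trip.

```python
from typing import List


def format_sse(payload: str) -> str:
    """Frame a payload as one SSE event (hypothetical helper)."""
    # Each payload line gets its own "data:" field; a blank line ends the event.
    return "".join(f"data: {line}\n" for line in payload.split("\n")) + "\n"


def parse_sse(stream: str) -> List[str]:
    """Recover payloads from a string of concatenated SSE events."""
    payloads = []
    for event in stream.split("\n\n"):
        data_lines = [
            line[len("data: "):]
            for line in event.split("\n")
            if line.startswith("data: ")
        ]
        if data_lines:
            payloads.append("\n".join(data_lines))
    return payloads


wire = format_sse("Hello") + format_sse("line one\nline two") + format_sse("[DONE]")
print(parse_sse(wire))
# → ['Hello', 'line one\nline two', '[DONE]']
```

Writing a multi-line payload after a single `data: ` prefix would cut the event short at the first newline, so the per-line framing matters whenever the model emits line breaks.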

Frontend: Next.js SSE Consumer

typescript
// hooks/useStreamChat.ts
import { useState, useCallback } from "react";

export function useStreamChat() {
  const [output, setOutput] = useState("");
  const [loading, setLoading] = useState(false);
  const [error, setError] = useState<string | null>(null);

  const sendMessage = useCallback(async (message: string) => {
    setOutput("");
    setError(null);
    setLoading(true);

    try {
      const response = await fetch("/api/stream", {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({ message }),
      });

      if (!response.ok || !response.body) {
        throw new Error(`Stream request failed with status ${response.status}`);
      }

      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let buffer = "";

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        // A read can end mid-event, so keep the unfinished tail in the buffer
        buffer += decoder.decode(value, { stream: true });
        const events = buffer.split("\n\n");
        buffer = events.pop() ?? "";

        for (const event of events) {
          // Rejoin multi-line payloads sent as several "data: " lines
          const data = event
            .split("\n")
            .filter((line) => line.startsWith("data: "))
            .map((line) => line.slice(6))
            .join("\n");
          if (data === "[DONE]") {
            await reader.cancel();
            return;
          }
          setOutput((prev) => prev + data);
        }
      }
    } catch (err) {
      setError(err instanceof Error ? err.message : String(err));
    } finally {
      setLoading(false);
    }
  }, []);

  return { output, loading, error, sendMessage };
}

tsx
// components/StreamChat.tsx
"use client";
import { useStreamChat } from "@/hooks/useStreamChat";
import { useState } from "react";
 
export default function StreamChat() {
  const [input, setInput] = useState("");
  const { output, loading, sendMessage } = useStreamChat();
 
  return (
    <div className="p-4">
      <div className="min-h-32 p-4 bg-gray-900 rounded font-mono text-sm whitespace-pre-wrap">
        {output}
        {loading && <span className="animate-pulse">▌</span>}
      </div>
      <div className="flex gap-2 mt-4">
        <input
          value={input}
          onChange={(e) => setInput(e.target.value)}
          className="flex-1 p-2 bg-gray-800 rounded"
          placeholder="Ask anything..."
          onKeyDown={(e) => e.key === "Enter" && !loading && sendMessage(input)}
        />
        <button
          onClick={() => sendMessage(input)}
          disabled={loading}
          className="px-4 py-2 bg-blue-600 rounded disabled:opacity-50"
        >
          Send
        </button>
      </div>
    </div>
  );
}

WebSockets Implementation

python
# websocket_server.py
from anthropic import AsyncAnthropic
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()  # or reuse the app object created in main.py
# Async client, so streaming does not block the event loop
client = AsyncAnthropic()


@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
    await websocket.accept()
    conversation_history = []

    try:
        while True:
            # Receive user message
            data = await websocket.receive_json()
            user_message = data["message"]

            conversation_history.append({
                "role": "user",
                "content": user_message
            })

            # Stream response
            full_response = ""
            async with client.messages.stream(
                model="claude-sonnet-4-6",
                max_tokens=2048,
                messages=conversation_history,
            ) as stream:
                async for text in stream.text_stream:
                    await websocket.send_json({
                        "type": "token",
                        "content": text
                    })
                    full_response += text

            # Send completion signal
            await websocket.send_json({"type": "done"})

            # Add to conversation history
            conversation_history.append({
                "role": "assistant",
                "content": full_response
            })

    except WebSocketDisconnect:
        # Client went away; nothing left to send
        pass
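
The handler above streams a full reply before it reads the next message, so it cannot yet react to a mid-stream interrupt, which is the main reason to choose WebSockets. One possible approach, sketched here with plain `asyncio`, is to race the token pump against a listener for a stop message. Everything in this sketch is an assumption for illustration: `fake_token_stream` stands in for the model stream, an `asyncio.Queue` stands in for incoming socket messages, and the `{"type": "stop"}` message shape is invented.

```python
import asyncio
from typing import AsyncIterator, List, Optional, Tuple


async def fake_token_stream(n: int, delay: float = 0.01) -> AsyncIterator[str]:
    """Stand-in for the model's text stream (hypothetical)."""
    for i in range(n):
        await asyncio.sleep(delay)
        yield f"tok{i} "


async def wait_for_stop(inbox: asyncio.Queue) -> None:
    """Return once a {"type": "stop"} message arrives; other messages are dropped."""
    while True:
        msg = await inbox.get()
        if msg.get("type") == "stop":
            return


async def stream_until_interrupted(
    tokens: AsyncIterator[str], inbox: asyncio.Queue, sent: List[dict]
) -> str:
    """Forward tokens until the stream ends or a stop message arrives."""

    async def pump() -> None:
        async for tok in tokens:
            sent.append({"type": "token", "content": tok})

    pump_task = asyncio.ensure_future(pump())
    stop_task = asyncio.ensure_future(wait_for_stop(inbox))
    done, pending = await asyncio.wait(
        {pump_task, stop_task}, return_when=asyncio.FIRST_COMPLETED
    )
    for task in pending:
        task.cancel()  # cancelling pump() stops consuming the token stream
    await asyncio.gather(*pending, return_exceptions=True)

    if pump_task in done:
        pump_task.result()  # re-raise any streaming error
        reason = "done"
    else:
        reason = "interrupted"
    sent.append({"type": reason})
    return reason


async def demo(n_tokens: int, stop_after: Optional[float]) -> Tuple[str, List[dict]]:
    """Run one reply; optionally inject a stop message after `stop_after` seconds."""
    inbox: asyncio.Queue = asyncio.Queue()
    sent: List[dict] = []
    if stop_after is not None:
        loop = asyncio.get_running_loop()
        loop.call_later(stop_after, inbox.put_nowait, {"type": "stop"})
    reason = await stream_until_interrupted(fake_token_stream(n_tokens), inbox, sent)
    return reason, sent


if __name__ == "__main__":
    print(asyncio.run(demo(5, None))[0])
    print(asyncio.run(demo(200, 0.05))[0])
```

In a real handler the queue would be fed by a task that calls `websocket.receive_json()`, and `sent.append` would become `websocket.send_json`. A production version would also need to keep non-stop messages rather than drop them.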

Production Issues and Fixes

1. Nginx buffering kills SSE

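nginx
location /stream {
    proxy_pass http://backend;
    proxy_http_version 1.1;            # keep-alive to the upstream needs HTTP/1.1
    proxy_set_header Connection '';    # clear the "close" header nginx sends by default
    proxy_buffering off;               # forward each chunk as soon as it arrives
    proxy_cache off;
    proxy_read_timeout 3600s;          # default allows only 60s between upstream reads
    chunked_transfer_encoding on;
}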

2. AWS ALB 60-second timeout

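An ALB closes any connection that carries no data for longer than its idle timeout, which defaults to 60 seconds. An SSE stream that goes quiet for that long is dropped, so raise the timeout on the load balancer itself (it is not a target group setting).

terraform
resource "aws_lb_target_group" "api" {
  # ...
  stickiness {
    type    = "lb_cookie"  # required argument, even when stickiness is disabled
    enabled = false        # SSE does not need sticky sessions
  }
}

resource "aws_lb" "api" {
  # ...
  idle_timeout = 3600  # seconds (1 hour); the default is 60
}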
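
Raising the idle timeout helps, but a complementary option is to keep the connection from ever looking idle. The sketch below wraps any async chunk generator and emits an SSE comment line (a line starting with `:`, which SSE clients ignore) whenever the source has been silent for `interval` seconds. The names `with_heartbeat` and `HEARTBEAT` are illustrative, and the right interval is an assumption to tune against your own load balancer settings.

```python
import asyncio
from typing import AsyncIterator, List

HEARTBEAT = ": keep-alive\n\n"  # SSE comment event; carries no data


async def _next_chunk(it):
    """Await the next item so it can be wrapped in a task."""
    return await it.__anext__()


async def with_heartbeat(
    source: AsyncIterator[str], interval: float
) -> AsyncIterator[str]:
    """Yield chunks from `source`, inserting heartbeats during silent periods."""
    it = source.__aiter__()
    pending = None
    try:
        while True:
            if pending is None:
                pending = asyncio.ensure_future(_next_chunk(it))
            # Wait without cancelling the pending read when the timeout fires
            done, _ = await asyncio.wait({pending}, timeout=interval)
            if not done:
                yield HEARTBEAT
                continue
            task, pending = pending, None
            try:
                chunk = task.result()
            except StopAsyncIteration:
                return
            yield chunk
    finally:
        if pending is not None:
            pending.cancel()


async def slow_source() -> AsyncIterator[str]:
    """Hypothetical stream with a long pause before the first chunk."""
    await asyncio.sleep(0.2)
    yield "data: a\n\n"
    yield "data: b\n\n"


async def collect(interval: float) -> List[str]:
    return [chunk async for chunk in with_heartbeat(slow_source(), interval)]


if __name__ == "__main__":
    print(asyncio.run(collect(0.05)))
```

The read is held in a task and polled with `asyncio.wait` rather than wrapped in `asyncio.wait_for`, because cancelling an in-flight read on timeout would tear down the underlying generator.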

3. Kubernetes ingress-nginx buffering

yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  annotations:
    nginx.ingress.kubernetes.io/proxy-buffering: "off"
    nginx.ingress.kubernetes.io/proxy-read-timeout: "3600"
    nginx.ingress.kubernetes.io/proxy-send-timeout: "3600"

4. Connection error recovery (frontend)

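typescript
// EventSource only issues GET requests, so this assumes a GET streaming endpoint.
function createEventSource(
  url: string,
  onMessage: (data: string) => void,
  attempt = 0,
) {
  const es = new EventSource(url);

  es.onmessage = (e) => {
    attempt = 0; // connection is healthy again, so reset the backoff
    if (e.data === "[DONE]") {
      es.close();
      return;
    }
    onMessage(e.data);
  };

  es.onerror = () => {
    es.close();
    // Reconnect with exponential backoff: 1s, 2s, 4s, ... capped at 30s
    const delay = Math.min(1000 * 2 ** attempt, 30_000);
    setTimeout(() => createEventSource(url, onMessage, attempt + 1), delay);
  };

  return es;
}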
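
The reconnect handler refers to exponential backoff. As a small illustration of the delay schedule, written in Python to match the backend examples, the sketch below doubles the wait on each attempt up to a cap and can optionally add jitter so that many clients do not reconnect at the same instant. The function name and the default `base` and `cap` values are assumptions, not values from the original code.

```python
import random
from typing import Optional


def backoff_delay(
    attempt: int,
    base: float = 1.0,
    cap: float = 30.0,
    rng: Optional[random.Random] = None,
) -> float:
    """Seconds to wait before reconnect number `attempt` (0-based)."""
    delay = min(cap, base * (2 ** attempt))
    if rng is not None:
        # Jitter: pick a random point in [0, delay] to spread clients out
        return rng.uniform(0, delay)
    return delay


print([backoff_delay(n) for n in range(7)])
# → [1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0]
```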

Token Buffering for Markdown Rendering

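Rendering every raw token as it arrives can break markdown mid-render, because a marker such as ** or a code fence may be split across two tokens. Buffering to word boundaries before sending reduces that flicker:

python
from anthropic import AsyncAnthropic

client = AsyncAnthropic()  # async client, so the event loop is not blocked


def sse_event(text: str) -> str:
    # One data: field per line, so newlines in the buffer survive SSE framing
    return "".join(f"data: {line}\n" for line in text.split("\n")) + "\n"


async def buffered_stream(message: str):
    buffer = ""
    async with client.messages.stream(
        model="claude-sonnet-4-6",
        max_tokens=2048,
        messages=[{"role": "user", "content": message}],
    ) as stream:
        async for text in stream.text_stream:
            buffer += text
            # Flush on word boundary
            if text.endswith((" ", "\n", ".", ",", "!")):
                yield sse_event(buffer)
                buffer = ""
        # Flush whatever is left when the model stops
        if buffer:
            yield sse_event(buffer)
    yield "data: [DONE]\n\n"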
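
To see what the flush rule does, here is the same boundary check applied to a fixed token list, with no API call involved. The token split shown is invented for illustration; real tokenizers split text differently.

```python
from typing import Iterable, Iterator

BOUNDARIES = (" ", "\n", ".", ",", "!")


def buffer_tokens(tokens: Iterable[str]) -> Iterator[str]:
    """Group tokens until one ends on a boundary character."""
    buffer = ""
    for text in tokens:
        buffer += text
        if text.endswith(BOUNDARIES):
            yield buffer
            buffer = ""
    if buffer:
        yield buffer


tokens = ["Hel", "lo", " wor", "ld", ".", " **bo", "ld**", "\n"]
print(list(buffer_tokens(tokens)))
# → ['Hello world.', ' **bold**\n']
```

Note that the check looks at the end of each token. Tokens often carry their space at the start (as in `" wor"`), so in practice flushes tend to happen at punctuation and newlines rather than at every word, which produces larger but less frequent updates.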

Observability

Track streaming metrics with OpenTelemetry:

python
import time

from opentelemetry import trace, metrics

tracer = trace.get_tracer(__name__)
meter = metrics.get_meter(__name__)

stream_duration = meter.create_histogram("llm_stream_duration_ms", unit="ms")
# Counts SSE chunks (including the final [DONE]), which only approximates tokens
tokens_streamed = meter.create_counter("llm_tokens_streamed_total")


async def traced_stream(message: str):
    model = "claude-sonnet-4-6"
    with tracer.start_as_current_span("llm_stream") as span:
        span.set_attribute("llm.model", model)
        token_count = 0
        start = time.perf_counter()

        try:
            async for chunk in generate_stream(message, model):
                token_count += 1
                yield chunk
        finally:
            # Record metrics even if the client disconnects mid-stream
            stream_duration.record(
                (time.perf_counter() - start) * 1000,
                {"model": model}
            )
            tokens_streamed.add(token_count, {"model": model})
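
Total duration hides the number users feel most: how long they wait before the first text appears. The sketch below shows one way to capture time to first chunk alongside the total, using only the standard library so it runs without an OpenTelemetry setup. `StreamStats`, `measure_stream`, and `fake_stream` are hypothetical names; in practice the recorded values would be fed into a histogram like the one above.

```python
import asyncio
import time
from dataclasses import dataclass
from typing import AsyncIterator, List, Optional, Tuple


@dataclass
class StreamStats:
    chunks: int = 0
    first_chunk_ms: Optional[float] = None
    total_ms: Optional[float] = None


async def measure_stream(
    source: AsyncIterator[str], stats: StreamStats
) -> AsyncIterator[str]:
    """Pass chunks through unchanged while filling in `stats`."""
    start = time.perf_counter()
    try:
        async for chunk in source:
            if stats.first_chunk_ms is None:
                stats.first_chunk_ms = (time.perf_counter() - start) * 1000
            stats.chunks += 1
            yield chunk
    finally:
        # Runs on normal completion and when the consumer stops early
        stats.total_ms = (time.perf_counter() - start) * 1000


async def fake_stream() -> AsyncIterator[str]:
    """Stand-in for generate_stream with a simulated first-token delay."""
    await asyncio.sleep(0.02)
    for chunk in ("data: Hello\n\n", "data:  world\n\n", "data: [DONE]\n\n"):
        yield chunk


async def run_demo() -> Tuple[StreamStats, List[str]]:
    stats = StreamStats()
    chunks = [c async for c in measure_stream(fake_stream(), stats)]
    return stats, chunks


if __name__ == "__main__":
    print(asyncio.run(run_demo())[0])
```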

Streaming is the baseline expectation for production LLM apps. SSE covers most use cases; add WebSockets only when you need mid-stream user interrupts or bidirectional real-time interaction.
