LLM Error Handling: Fallbacks, Retries, and Circuit Breakers in Production
Build production-grade LLM error handling in Python. Covers exponential backoff, fallback chains, circuit breaker pattern, timeout budgets, and dead letter queues using tenacity.
Your LLM call works perfectly in development. Then you deploy to production, hit 100 concurrent users, and start seeing a mix of 429s, timeouts, and 500s. Here is how to build LLM calls that stay up even when the API does not.
The Error Landscape
LLM APIs fail in specific, predictable ways:
| Error | Status | Cause | Strategy |
|---|---|---|---|
| Rate limit | 429 | Too many requests | Exponential backoff |
| Server error | 500/503 | API overloaded | Retry with backoff |
| Timeout | N/A | Response too slow | Timeout budget |
| Context too long | 400 | Input over token limit | Truncate or chunk |
| Auth error | 401 | Bad key | Alert, do not retry |
| Overloaded | 529 (Anthropic) | API at capacity | Retry + fallback |
Never retry a 401, or a 400 unless it is a context-length error you can fix by truncating. Always retry 429, 500, 503, and 529.
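The table's retry rules can be sketched as a small classifier. This is a minimal illustration, not part of any SDK: `Action`, `RETRYABLE_STATUSES`, and `classify_status` are hypothetical names, and the `context_too_long` flag assumes the caller has already inspected the error body to detect a token-limit failure.

```python
from enum import Enum


class Action(Enum):
    RETRY = "retry"        # transient: back off and try again
    FIX_AND_RETRY = "fix"  # caller must change the request first
    FAIL = "fail"          # permanent: alert, do not retry


# Status codes from the table above that indicate a transient failure.
RETRYABLE_STATUSES = frozenset({429, 500, 503, 529})


def classify_status(status_code: int, context_too_long: bool = False) -> Action:
    """Map an HTTP status to a retry decision (hypothetical helper)."""
    if status_code in RETRYABLE_STATUSES:
        return Action.RETRY
    if status_code == 400 and context_too_long:
        # Only worth retrying after truncating or chunking the input.
        return Action.FIX_AND_RETRY
    return Action.FAIL
```

Keeping the decision in one function means every retry layer agrees on what counts as transient.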
Step 1: Exponential Backoff with Jitter
The worst thing you can do with a 429 is retry immediately. Every client doing that simultaneously creates a thundering herd that makes the rate limit worse.
Exponential backoff with jitter: wait (2^attempt + random_float) * base_seconds.
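To make the formula concrete, here is a minimal sketch that computes the delay for each attempt. `backoff_delay` and its injectable `rng` parameter are hypothetical names used for illustration; the jitter term is assumed to be a uniform float in [0, 1).

```python
import random


def backoff_delay(attempt: int, base_seconds: float = 1.0, rng=random.random) -> float:
    """Delay before the next retry: (2^attempt + jitter) * base_seconds.

    `attempt` is zero-based; `rng` returns a float in [0, 1) and is
    injectable so the schedule can be tested deterministically.
    """
    return (2 ** attempt + rng()) * base_seconds


# Print the possible delay range for each of the first four attempts.
for attempt in range(4):
    low = backoff_delay(attempt, rng=lambda: 0.0)
    high = backoff_delay(attempt, rng=lambda: 1.0)  # exclusive upper bound
    print(f"attempt {attempt}: {low:.0f}s to just under {high:.0f}s")
```

With a one-second base, the four attempts wait roughly 1 to 2, 2 to 3, 4 to 5, and 8 to 9 seconds. The full retry loop applies the same formula around an API call: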
```python
import time
import random
import anthropic

client = anthropic.Anthropic()


def call_with_backoff(
    prompt: str,
    model: str = "claude-haiku-4-5-20251001",
    max_retries: int = 4,
    base_delay: float = 1.0,
) -> str:
    for attempt in range(max_retries):
        try:
            response = client.messages.create(
                model=model,
                max_tokens=1024,
                messages=[{"role": "user", "content": prompt}],
            )
            return response.content[0].text
        except anthropic.RateLimitError as e:
            if attempt == max_retries - 1:
                raise
            delay = (2 ** attempt + random.random()) * base_delay
            print(f"Rate limited. Waiting {delay:.1f}s before retry {attempt + 1}/{max_retries}")
            time.sleep(delay)
        except anthropic.APIStatusError as e:
            if e.status_code in (500, 503, 529):
                if attempt == max_retries - 1:
                    raise
                delay = (2 ** attempt + random.random()) * base_delay
                print(f"Server error {e.status_code}. Retrying in {delay:.1f}s")
                time.sleep(delay)
            else:
                raise  # 400, 401, 403: do not retry
```

Step 2: Using tenacity for Cleaner Retry Logic
tenacity is a widely used Python library for retry logic. It handles backoff, jitter, stop conditions, and before/after hooks.
```bash
pip install tenacity
```

```python
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential_jitter,
    retry_if_exception,
    retry_if_exception_type,
    before_sleep_log,
    after_log,
)
import logging
import anthropic

logger = logging.getLogger(__name__)
client = anthropic.Anthropic()

# Exception types that are always worth retrying.
RETRYABLE_ERRORS = (
    anthropic.RateLimitError,
    anthropic.APIConnectionError,
    anthropic.APITimeoutError,
)


def is_retryable_status(exc: BaseException) -> bool:
    """Retry server-side failures, but not client errors such as 400 or 401."""
    if isinstance(exc, anthropic.APIStatusError):
        return exc.status_code in (500, 503, 529)
    return False


@retry(
    retry=(
        retry_if_exception_type(RETRYABLE_ERRORS) |
        retry_if_exception(is_retryable_status)
    ),
    wait=wait_exponential_jitter(initial=1, max=60, jitter=2),
    stop=stop_after_attempt(5),
    before_sleep=before_sleep_log(logger, logging.WARNING),
    after=after_log(logger, logging.INFO),
    reraise=True,  # surface the original exception, not tenacity's RetryError
)
def call_llm(prompt: str, model: str = "claude-haiku-4-5-20251001") -> str:
    response = client.messages.create(
        model=model,
        max_tokens=1024,
        timeout=30.0,
        messages=[{"role": "user", "content": prompt}],
    )
    return response.content[0].text
```

Step 3: Fallback Chain
When retries fail, fall back in stages: check for a cached response, try each model in order from the primary to a cheaper, simpler one, and finally return a static degraded response.
```python
import hashlib
from typing import Optional

FALLBACK_CHAIN = [
    "claude-sonnet-4-5",  # primary
    "claude-haiku-4-5-20251001",  # cheaper fallback
]


def get_cached_response(prompt_hash: str) -> Optional[str]:
    """Placeholder lookup. In production, replace with a Redis cache lookup."""
    return None


def call_with_fallback(prompt: str) -> tuple[str, str]:
    """
    Returns (response_text, source) where source is the model used
    or 'cache' or 'degraded'.
    """
    # Try cache first
    prompt_hash = hashlib.sha256(prompt.encode()).hexdigest()[:16]
    cached = get_cached_response(prompt_hash)
    if cached:
        return cached, "cache"

    # Try each model in fallback chain
    last_error = None
    for model in FALLBACK_CHAIN:
        try:
            result = call_llm(prompt, model=model)
            return result, model
        except Exception as e:
            last_error = e
            logger.warning(f"Model {model} failed: {e}. Trying next fallback.")
            continue

    # All models failed: graceful degradation
    logger.error(f"All LLM fallbacks exhausted. Last error: {last_error}")
    return (
        "I am unable to process this request right now. Please try again in a few minutes.",
        "degraded",
    )
```

Step 4: Circuit Breaker Pattern
A circuit breaker stops sending requests once failures cross a threshold. This prevents hammering an API that is already down and burning through your retry budget on requests that will fail anyway.
States:
- Closed — requests pass through normally
- Open — requests fail immediately (circuit is "open" = broken)
- Half-open — one probe request allowed to test if service recovered
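The transitions between these three states can be sketched as a pure function. This is an illustrative model only: `State`, `next_state`, and the event names are hypothetical, chosen to mirror the descriptions in the list above.

```python
from enum import Enum


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half-open"


def next_state(state: State, event: str) -> State:
    """Return the breaker state after `event`.

    Events (hypothetical names): "threshold_reached", "timeout_elapsed",
    "probe_succeeded", "probe_failed". Any other pairing leaves the
    state unchanged.
    """
    transitions = {
        (State.CLOSED, "threshold_reached"): State.OPEN,
        (State.OPEN, "timeout_elapsed"): State.HALF_OPEN,
        (State.HALF_OPEN, "probe_succeeded"): State.CLOSED,
        (State.HALF_OPEN, "probe_failed"): State.OPEN,
    }
    return transitions.get((state, event), state)
```

A thread-safe implementation of the same state machine, wrapped around a function call: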
```python
import logging
import time
from dataclasses import dataclass, field
from threading import Lock

logger = logging.getLogger(__name__)


class CircuitOpenError(Exception):
    """Raised when the breaker is open and the call is rejected without running."""


@dataclass
class CircuitBreaker:
    failure_threshold: int = 5      # open after 5 consecutive failures
    recovery_timeout: float = 60.0  # wait 60s before half-open
    _failures: int = field(default=0, init=False)
    _state: str = field(default="closed", init=False)
    _opened_at: float = field(default=0.0, init=False)
    _lock: Lock = field(default_factory=Lock, init=False)

    def call(self, func, *args, **kwargs):
        with self._lock:
            if self._state == "open":
                # time.monotonic() is unaffected by wall-clock adjustments
                if time.monotonic() - self._opened_at > self.recovery_timeout:
                    self._state = "half-open"
                    logger.info("Circuit breaker: half-open, probing...")
                else:
                    raise CircuitOpenError("Circuit breaker is OPEN: request blocked")
        try:
            result = func(*args, **kwargs)
            with self._lock:
                if self._state == "half-open":
                    logger.info("Circuit breaker: probe succeeded, closing")
                self._state = "closed"
                self._failures = 0
            return result
        except Exception:
            with self._lock:
                self._failures += 1
                if self._failures >= self.failure_threshold:
                    self._state = "open"
                    self._opened_at = time.monotonic()
                    logger.error(
                        f"Circuit breaker OPENED after {self._failures} failures"
                    )
            raise


# Singleton circuit breaker per model
_breakers: dict[str, CircuitBreaker] = {}


def get_breaker(model: str) -> CircuitBreaker:
    if model not in _breakers:
        _breakers[model] = CircuitBreaker()
    return _breakers[model]


def call_llm_with_breaker(prompt: str, model: str = "claude-haiku-4-5-20251001") -> str:
    breaker = get_breaker(model)
    return breaker.call(call_llm, prompt, model=model)
```

Step 5: Timeout Budgets
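One way to read "timeout budget" is as a cap on the total time a request may spend across all attempts, rather than on each call alone. The sketch below is an assumption about how that could be tracked: `Deadline` and its methods are hypothetical helpers, and the `clock` parameter exists only to make the class testable.

```python
import time


class Deadline:
    """Tracks how much of a total time budget remains (hypothetical helper)."""

    def __init__(self, budget_seconds: float, clock=time.monotonic):
        self._clock = clock
        self._expires_at = clock() + budget_seconds

    def remaining(self) -> float:
        """Seconds left in the budget, never negative."""
        return max(0.0, self._expires_at - self._clock())

    def attempt_timeout(self, per_attempt_cap: float) -> float:
        """Timeout for the next attempt: the cap, or whatever is left."""
        left = self.remaining()
        if left <= 0:
            raise TimeoutError("timeout budget exhausted")
        return min(per_attempt_cap, left)
```

A caller would create `Deadline(45.0)` once per request and pass `deadline.attempt_timeout(30.0)` as the `timeout=` argument on each attempt, so retries and fallbacks together cannot exceed 45 seconds.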
Never let an LLM call run without a timeout. The Anthropic SDK supports per-request timeouts:
```python
import httpx

response = client.messages.create(
    model="claude-haiku-4-5-20251001",
    max_tokens=1024,
    timeout=httpx.Timeout(
        connect=5.0,  # time to establish the connection
        read=30.0,    # time to wait for response data
        write=5.0,    # time to send the request
        pool=5.0,     # time to acquire a connection from the pool
    ),
    messages=[{"role": "user", "content": prompt}],
)
```

To enforce a hard overall deadline on an async call, regardless of where the time is spent, wrap it in asyncio.wait_for:
```python
import asyncio

import anthropic


async def call_with_timeout(prompt: str, timeout_seconds: float = 20.0) -> str:
    async with anthropic.AsyncAnthropic() as aclient:
        try:
            response = await asyncio.wait_for(
                aclient.messages.create(
                    model="claude-haiku-4-5-20251001",
                    max_tokens=1024,
                    messages=[{"role": "user", "content": prompt}],
                ),
                timeout=timeout_seconds,
            )
            return response.content[0].text
        except asyncio.TimeoutError as e:
            raise TimeoutError(f"LLM call exceeded {timeout_seconds}s budget") from e
```

Step 6: Dead Letter Queue for Failed Jobs
For background LLM jobs (document processing, batch summarization), failed requests should not be silently dropped. Use a dead letter queue.
```python
import json
from datetime import datetime, timezone

import boto3

sqs = boto3.client("sqs", region_name="ap-south-1")
MAIN_QUEUE_URL = "https://sqs.ap-south-1.amazonaws.com/123456/llm-jobs"
DLQ_URL = "https://sqs.ap-south-1.amazonaws.com/123456/llm-jobs-dlq"


def process_job(job: dict) -> None:
    try:
        text, source = call_with_fallback(job["prompt"])
    except Exception as e:
        send_to_dlq(job, str(e))
        return
    if source == "degraded":
        # call_with_fallback returns a canned reply instead of raising
        # when every model fails, so treat that as a failed job too.
        send_to_dlq(job, "all LLM fallbacks exhausted")
        return
    # store text...


def send_to_dlq(job: dict, error: str) -> None:
    # Copy the job so the caller's dict is not mutated.
    failed_job = {
        **job,
        "_failed_at": datetime.now(timezone.utc).isoformat(),
        "_error": error,
    }
    sqs.send_message(
        QueueUrl=DLQ_URL,
        MessageBody=json.dumps(failed_job),
    )
    logger.error(f"Job {job.get('id')} sent to DLQ: {error}")
```

Configure the main queue's redrive policy with maxReceiveCount: 3 so SQS automatically moves a message to the DLQ after 3 failed processing attempts.
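In SQS, maxReceiveCount is set in the RedrivePolicy attribute of the source queue, which points at the DLQ by ARN. A minimal sketch of applying it with boto3's `set_queue_attributes` follows; `build_redrive_attributes` and `attach_dlq` are hypothetical helper names, and the ARN in the usage comment is a placeholder built from the example account ID above.

```python
import json


def build_redrive_attributes(dlq_arn: str, max_receive_count: int = 3) -> dict:
    """Build the Attributes payload that attaches a DLQ to a source queue."""
    return {
        "RedrivePolicy": json.dumps(
            {"deadLetterTargetArn": dlq_arn, "maxReceiveCount": str(max_receive_count)}
        )
    }


def attach_dlq(sqs_client, main_queue_url: str, dlq_arn: str) -> None:
    """Apply the redrive policy to the main queue using a boto3 SQS client."""
    sqs_client.set_queue_attributes(
        QueueUrl=main_queue_url,
        Attributes=build_redrive_attributes(dlq_arn),
    )


# Usage (requires boto3 and AWS credentials; the ARN is a placeholder):
#   import boto3
#   sqs = boto3.client("sqs", region_name="ap-south-1")
#   attach_dlq(sqs, MAIN_QUEUE_URL, "arn:aws:sqs:ap-south-1:123456:llm-jobs-dlq")
```

The redrive policy covers workers that crash or time out before they can report a failure, while the explicit `send_to_dlq` call handles failures the worker detects itself.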
Putting It All Together
```python
def robust_llm_call(prompt: str) -> dict:
    try:
        text, source = call_with_fallback(prompt)
        # "degraded" means every model failed and text is the canned apology.
        return {"success": source != "degraded", "text": text, "source": source}
    except Exception as e:
        logger.exception("LLM call completely failed")
        return {
            "success": False,
            "text": "Service temporarily unavailable.",
            "source": "error",
            "error": str(e),
        }
```

Call robust_llm_call everywhere in your application. It always returns a dict, so callers never need to handle exceptions from LLM code.
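Because the result is always a dict, a caller branches on its fields instead of catching exceptions. As a rough illustration, an HTTP handler might map the result to a status code like this; `to_http_response` and the choice of 503 are assumptions for the example, not part of the code above.

```python
def to_http_response(result: dict) -> tuple[int, dict]:
    """Map a robust_llm_call-style result dict to (status_code, body).

    A result counts as failed if `success` is false or the text came
    from the static degraded fallback.
    """
    failed = not result["success"] or result["source"] == "degraded"
    status = 503 if failed else 200
    return status, {"message": result["text"], "source": result["source"]}
```

Returning the `source` field to the client makes it possible to see, per request, whether the answer came from the primary model, a fallback, or the cache.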
Monitoring
Track these metrics in Prometheus or Datadog:
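```python
from prometheus_client import Counter, Histogram

llm_calls = Counter("llm_calls_total", "Total LLM calls", ["model", "status"])
llm_latency = Histogram("llm_call_duration_seconds", "LLM call latency", ["model"])


def call_llm_instrumented(prompt: str, model: str) -> str:
    # Time every call, and count failures so the error-rate alert has data.
    with llm_latency.labels(model=model).time():
        try:
            result = call_llm(prompt, model)
        except Exception:
            llm_calls.labels(model=model, status="error").inc()
            raise
    llm_calls.labels(model=model, status="success").inc()
    return result
```

Alert when sum(rate(llm_calls_total{status="error"}[5m])) / sum(rate(llm_calls_total[5m])) > 0.1, that is, when more than 10% of calls fail over five minutes. The sum() on each side aggregates across labels; without it, the division matches series label for label and compares the error series only with itself.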