
LLM Error Handling: Fallbacks, Retries, and Circuit Breakers in Production

Build production-grade LLM error handling in Python. Covers exponential backoff, fallback chains, circuit breaker pattern, timeout budgets, and dead letter queues using tenacity.

DevOpsBoys · 6 min read

Your LLM call works perfectly in development. Then you deploy to production, hit 100 concurrent users, and start seeing a mix of 429s, timeouts, and 500s. Here is how to build LLM calls that stay up even when the API does not.

The Error Landscape

LLM APIs fail in specific, predictable ways:

Error            | Status          | Cause                  | Strategy
Rate limit       | 429             | Too many requests      | Exponential backoff
Server error     | 500/503         | API overloaded         | Retry with backoff
Timeout          | N/A             | Response too slow      | Timeout budget
Context too long | 400             | Input over token limit | Truncate or chunk
Auth error       | 401             | Bad key                | Alert, do not retry
Overloaded       | 529 (Anthropic) | API at capacity        | Retry + fallback

Never retry a 401, and never retry a 400 unless it is a context-length error you can fix by truncating. Always retry 429, 500, 503, and 529, with a cap on the number of attempts.
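
The rule above can be written down as a small predicate. This is only a sketch: should_retry and the fixable_context_error flag are hypothetical names, not part of any SDK, and the set of status codes is taken straight from the table.

```python
# Status codes the table above marks as worth retrying.
RETRYABLE_STATUS = frozenset({429, 500, 503, 529})


def should_retry(status_code: int, fixable_context_error: bool = False) -> bool:
    """Return True when a failed call is worth another attempt.

    fixable_context_error is a hypothetical flag the caller sets when a 400
    was caused by an over-long input that it can truncate before retrying.
    """
    if status_code in RETRYABLE_STATUS:
        return True
    if status_code == 400 and fixable_context_error:
        return True
    # 401, 403, and any other 4xx: retrying cannot change the outcome.
    return False
```

Keeping the decision in one place means the backoff loop, the fallback chain, and the circuit breaker can all agree on what counts as a transient failure.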

Step 1: Exponential Backoff with Jitter

The worst thing you can do with a 429 is retry immediately. Every client doing that simultaneously creates a thundering herd that makes the rate limit worse.

Exponential backoff with jitter: wait (2^attempt + random_float) * base_seconds.

python
import time
import random
import anthropic
 
client = anthropic.Anthropic()
 
def call_with_backoff(
    prompt: str,
    model: str = "claude-haiku-4-5-20251001",
    max_retries: int = 4,
    base_delay: float = 1.0,
) -> str:
    for attempt in range(max_retries):
        try:
            response = client.messages.create(
                model=model,
                max_tokens=1024,
                messages=[{"role": "user", "content": prompt}],
            )
            return response.content[0].text
 
        except anthropic.APIStatusError as e:
            # RateLimitError (429) is a subclass of APIStatusError, so one
            # handler covers rate limits and retryable server errors.
            retryable = e.status_code in (429, 500, 503, 529)
            if not retryable or attempt == max_retries - 1:
                raise  # 400, 401, 403, or out of attempts
            delay = (2 ** attempt + random.random()) * base_delay
            print(f"Status {e.status_code}. Retrying in {delay:.1f}s (attempt {attempt + 1}/{max_retries} failed)")
            time.sleep(delay)
 
    # Only reachable when max_retries < 1; avoids silently returning None.
    raise ValueError("max_retries must be at least 1")
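
To see what the formula yields in practice, the sketch below isolates the delay calculation and prints the range each attempt can fall in. The max_delay cap is an assumption added here for safety with large attempt numbers; it is not part of the function above.

```python
import random


def backoff_delay(attempt: int, base_delay: float = 1.0, max_delay: float = 60.0) -> float:
    """Delay before retry number `attempt` (0-based): (2^attempt + jitter) * base."""
    jitter = random.random()  # uniform in [0, 1)
    # max_delay is a hypothetical cap, not present in the original loop.
    return min((2 ** attempt + jitter) * base_delay, max_delay)


# Print the possible range for each of the first four attempts.
for attempt in range(4):
    low = (2 ** attempt) * 1.0
    high = (2 ** attempt + 1) * 1.0
    print(f"attempt {attempt}: between {low:.0f}s and {high:.0f}s")
```

With base_delay=1.0 the four waits fall in 1-2s, 2-3s, 4-5s, and 8-9s. The jitter term is what keeps many clients that failed at the same moment from retrying at the same moment.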

Step 2: Using tenacity for Cleaner Retry Logic

tenacity is a widely used Python library for retry logic. It handles backoff, jitter, stop conditions, and before/after hooks, so you do not have to hand-roll the loop from Step 1.

bash
pip install tenacity
python
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential_jitter,
    retry_if_exception,
    retry_if_exception_type,
    before_sleep_log,
    after_log,
)
import logging
import anthropic
 
logger = logging.getLogger(__name__)
client = anthropic.Anthropic()
 
RETRYABLE_ERRORS = (
    anthropic.RateLimitError,
    anthropic.APIConnectionError,
    anthropic.APITimeoutError,
)
 
def is_retryable_status(exc: Exception) -> bool:
    if isinstance(exc, anthropic.APIStatusError):
        return exc.status_code in (500, 503, 529)
    return False
 
 
@retry(
    retry=(
        retry_if_exception_type(RETRYABLE_ERRORS) |
        retry_if_exception(is_retryable_status)
    ),
    wait=wait_exponential_jitter(initial=1, max=60, jitter=2),
    stop=stop_after_attempt(5),
    before_sleep=before_sleep_log(logger, logging.WARNING),
    after=after_log(logger, logging.INFO),
    reraise=True,
)
def call_llm(prompt: str, model: str = "claude-haiku-4-5-20251001") -> str:
    response = client.messages.create(
        model=model,
        max_tokens=1024,
        timeout=30.0,
        messages=[{"role": "user", "content": prompt}],
    )
    return response.content[0].text

Step 3: Fallback Chain

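Check for a cached response first, since it is the cheapest path. On a miss, try the primary model, then a cheaper/simpler one. If every model fails after its retries, return a static message as graceful degradation.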

python
import hashlib
from typing import Optional
 
 
FALLBACK_CHAIN = [
    "claude-sonnet-4-5",         # primary
    "claude-haiku-4-5-20251001", # cheaper fallback
]
 
 
def get_cached_response(prompt_hash: str) -> Optional[str]:
    """Stub that always misses. In production, replace with a Redis lookup.

    Do not wrap this in lru_cache: that would memoize the misses too.
    """
    return None
 
 
def call_with_fallback(prompt: str) -> tuple[str, str]:
    """
    Returns (response_text, source) where source is the model used
    or 'cache' or 'degraded'.
    """
    # Try cache first
    prompt_hash = hashlib.sha256(prompt.encode()).hexdigest()[:16]
    cached = get_cached_response(prompt_hash)
    if cached is not None:
        return cached, "cache"
 
    # Try each model in fallback chain
    last_error = None
    for model in FALLBACK_CHAIN:
        try:
            result = call_llm(prompt, model=model)
            return result, model
        except Exception as e:
            last_error = e
            logger.warning(f"Model {model} failed: {e}. Trying next fallback.")
            continue
 
    # All models failed — graceful degradation
    logger.error(f"All LLM fallbacks exhausted. Last error: {last_error}")
    return (
        "I am unable to process this request right now. Please try again in a few minutes.",
        "degraded",
    )
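
The behavior of the chain is easier to check with the network taken out. The sketch below is a stripped-down stand-in, not the function above: first_success, fake_call, and the model names primary-model and cheap-model are all hypothetical, and the stub simply raises for the primary model.

```python
from typing import Callable


def first_success(
    models: list[str],
    call: Callable[[str, str], str],
    prompt: str,
    degraded_message: str = "Please try again in a few minutes.",
) -> tuple[str, str]:
    """Try each model in order; return (text, source) like call_with_fallback."""
    for model in models:
        try:
            return call(prompt, model), model
        except Exception:
            continue  # this model failed, move on to the next one
    return degraded_message, "degraded"


def fake_call(prompt: str, model: str) -> str:
    """Stand-in for call_llm: the primary model always fails."""
    if model == "primary-model":
        raise RuntimeError("simulated 529 overloaded")
    return f"[{model}] answer to: {prompt}"


text, source = first_success(["primary-model", "cheap-model"], fake_call, "hello")
print(source)  # cheap-model
```

Returning the source alongside the text lets callers and dashboards see how often traffic is actually being served by the fallback model or the degraded message.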

Step 4: Circuit Breaker Pattern

A circuit breaker stops sending requests once failures cross a threshold. This prevents hammering an API that is already down and burning through your retry budget on requests that will fail anyway.

States:

  • Closed — requests pass through normally
  • Open — requests fail immediately (circuit is "open" = broken)
  • Half-open — one probe request allowed to test if service recovered
python
import time
from dataclasses import dataclass, field
from threading import Lock
 
 
@dataclass
class CircuitBreaker:
    failure_threshold: int = 5        # open after 5 failures
    recovery_timeout: float = 60.0    # wait 60s before half-open
    _failures: int = field(default=0, init=False)
    _state: str = field(default="closed", init=False)
    _opened_at: float = field(default=0.0, init=False)
    _lock: Lock = field(default_factory=Lock, init=False)
 
    def call(self, func, *args, **kwargs):
        with self._lock:
            if self._state == "open":
                # monotonic() is immune to wall-clock adjustments
                if time.monotonic() - self._opened_at > self.recovery_timeout:
                    self._state = "half-open"
                    logger.info("Circuit breaker: half-open, probing...")
                else:
                    raise RuntimeError("Circuit breaker is OPEN, request blocked")
 
        try:
            result = func(*args, **kwargs)
        except Exception:
            with self._lock:
                self._failures += 1
                if self._failures >= self.failure_threshold:
                    self._state = "open"
                    self._opened_at = time.monotonic()
                    logger.error(
                        f"Circuit breaker OPENED after {self._failures} failures"
                    )
            raise
 
        with self._lock:
            if self._state == "half-open":
                logger.info("Circuit breaker: probe succeeded, closing")
                self._state = "closed"
            if self._state == "closed":
                # Any success resets the count, so only consecutive
                # failures can trip the breaker.
                self._failures = 0
        return result
 
 
# Singleton circuit breaker per model
_breakers: dict[str, CircuitBreaker] = {}
 
def get_breaker(model: str) -> CircuitBreaker:
    if model not in _breakers:
        _breakers[model] = CircuitBreaker()
    return _breakers[model]
 
 
def call_llm_with_breaker(prompt: str, model: str = "claude-haiku-4-5-20251001") -> str:
    breaker = get_breaker(model)
    return breaker.call(call_llm, prompt, model=model)
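
The state transitions can be walked through without waiting 60 seconds by injecting a clock. The following is a single-threaded sketch under that assumption: FakeClock and TinyBreaker are hypothetical names, the breaker is deliberately simpler than the class above (no lock, and a failed probe reopens the circuit immediately), and the threshold is lowered to 2 to keep the example short.

```python
class FakeClock:
    """Manually advanced clock so the example needs no real waiting."""

    def __init__(self) -> None:
        self.now = 0.0

    def __call__(self) -> float:
        return self.now


class TinyBreaker:
    """Single-threaded sketch of the closed / open / half-open cycle."""

    def __init__(self, clock, failure_threshold: int = 2, recovery_timeout: float = 60.0):
        self.clock = clock
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.state = "closed"
        self.opened_at = 0.0

    def call(self, func):
        if self.state == "open":
            if self.clock() - self.opened_at > self.recovery_timeout:
                self.state = "half-open"  # let one probe through
            else:
                raise RuntimeError("circuit open")
        try:
            result = func()
        except Exception:
            self.failures += 1
            if self.state == "half-open" or self.failures >= self.failure_threshold:
                self.state = "open"
                self.opened_at = self.clock()
            raise
        self.failures = 0
        self.state = "closed"
        return result


def failing():
    raise ValueError("simulated API error")


clock = FakeClock()
breaker = TinyBreaker(clock)
states = []
for _ in range(2):
    try:
        breaker.call(failing)
    except ValueError:
        pass
states.append(breaker.state)   # open after two failures

clock.now = 61.0               # recovery timeout has elapsed
breaker.call(lambda: "ok")     # probe succeeds
states.append(breaker.state)   # closed again
print(states)                  # ['open', 'closed']
```

Passing the clock in as a parameter is also what makes a breaker like this unit-testable in general.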

Step 5: Timeout Budgets

Never let an LLM call run without a timeout. The Anthropic SDK supports per-request timeouts:

python
import httpx
 
response = client.messages.create(
    model="claude-haiku-4-5-20251001",
    max_tokens=1024,
    timeout=httpx.Timeout(
        connect=5.0,   # connection timeout
        read=30.0,     # time to receive response
        write=5.0,
        pool=5.0,
    ),
    messages=[{"role": "user", "content": prompt}],
)

To enforce a hard wall-clock budget on the whole call, regardless of how the SDK's own timeouts are set, wrap the async client call in asyncio.wait_for:

python
import asyncio
 
async def call_with_timeout(prompt: str, timeout_seconds: float = 20.0) -> str:
    async with anthropic.AsyncAnthropic() as aclient:
        try:
            response = await asyncio.wait_for(
                aclient.messages.create(
                    model="claude-haiku-4-5-20251001",
                    max_tokens=1024,
                    messages=[{"role": "user", "content": prompt}],
                ),
                timeout=timeout_seconds,
            )
            return response.content[0].text
        except asyncio.TimeoutError:
            raise TimeoutError(f"LLM call exceeded {timeout_seconds}s budget")
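
A per-call timeout does not bound total latency once retries and fallbacks are involved: five attempts at 30 seconds each can keep a user waiting for minutes. One way to handle that is a shared deadline that each attempt draws from. The sketch below assumes a hypothetical Deadline helper; it is not part of the Anthropic SDK, and the clock is injectable only so the example runs instantly.

```python
import time


class Deadline:
    """Tracks how much of a total time budget is left across several attempts."""

    def __init__(self, budget_seconds: float, clock=time.monotonic) -> None:
        self._clock = clock
        self._expires_at = clock() + budget_seconds

    def remaining(self) -> float:
        return max(0.0, self._expires_at - self._clock())

    def timeout_for_attempt(self, per_attempt_cap: float) -> float:
        """Timeout to pass to the next call; raises once the budget is spent."""
        left = self.remaining()
        if left <= 0:
            raise TimeoutError("overall LLM time budget exhausted")
        return min(per_attempt_cap, left)


# Simulated clock: a one-element list the lambda reads from.
fake_now = [0.0]
deadline = Deadline(45.0, clock=lambda: fake_now[0])

first = deadline.timeout_for_attempt(30.0)    # full cap is available
fake_now[0] = 30.0                            # pretend the first attempt used 30s
second = deadline.timeout_for_attempt(30.0)   # only 15s of budget left
print(first, second)                          # 30.0 15.0
```

The value returned by timeout_for_attempt would be passed as the timeout argument of each retry or fallback call, so the request as a whole never exceeds the budget.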

Step 6: Dead Letter Queue for Failed Jobs

For background LLM jobs (document processing, batch summarization), failed requests should not be silently dropped. Use a dead letter queue.

python
import json
import boto3
from datetime import datetime, timezone
 
sqs = boto3.client("sqs", region_name="ap-south-1")
 
MAIN_QUEUE_URL = "https://sqs.ap-south-1.amazonaws.com/123456/llm-jobs"
DLQ_URL = "https://sqs.ap-south-1.amazonaws.com/123456/llm-jobs-dlq"
 
 
def process_job(job: dict) -> None:
    try:
        text, source = call_with_fallback(job["prompt"])
    except Exception as e:
        send_to_dlq(job, str(e))
        return
    if source == "degraded":
        # call_with_fallback swallows errors and returns a static message,
        # so treat that as a failure here instead of storing it.
        send_to_dlq(job, "all LLM fallbacks exhausted")
        return
    # store text...
 
 
def send_to_dlq(job: dict, error: str) -> None:
    # Copy so the caller's job dict is not mutated.
    failed = {
        **job,
        "_failed_at": datetime.now(timezone.utc).isoformat(),
        "_error": error,
    }
    sqs.send_message(
        QueueUrl=DLQ_URL,
        MessageBody=json.dumps(failed),
    )
    logger.error(f"Job {job.get('id')} sent to DLQ: {error}")

Also attach a redrive policy with maxReceiveCount: 3 to the main queue, pointing at the DLQ, so SQS automatically moves a message there after 3 failed processing attempts.
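
In SQS, maxReceiveCount lives in a RedrivePolicy attribute on the source queue, which names the DLQ by ARN. A minimal sketch of building that attribute follows; redrive_attributes is a hypothetical helper and the ARN is a placeholder derived from the example queue URLs, so substitute your own. The boto3 call is shown as a comment because it needs real credentials.

```python
import json


def redrive_attributes(dlq_arn: str, max_receive_count: int = 3) -> dict[str, str]:
    """Build the Attributes argument for sqs.set_queue_attributes on the main queue."""
    policy = {
        "deadLetterTargetArn": dlq_arn,
        "maxReceiveCount": str(max_receive_count),
    }
    # SQS expects the policy as a JSON string, not a nested dict.
    return {"RedrivePolicy": json.dumps(policy)}


# Hypothetical ARN for the llm-jobs-dlq queue; substitute your own.
attrs = redrive_attributes("arn:aws:sqs:ap-south-1:123456:llm-jobs-dlq")

# Applied with boto3 (not executed here):
# sqs.set_queue_attributes(QueueUrl=MAIN_QUEUE_URL, Attributes=attrs)
```

The automatic redrive covers workers that crash before they can call send_to_dlq themselves, so the two mechanisms complement each other.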

Putting It All Together

python
def robust_llm_call(prompt: str) -> dict:
    try:
        text, source = call_with_fallback(prompt)
        return {"success": True, "text": text, "source": source}
    except Exception as e:
        logger.exception("LLM call completely failed")
        return {
            "success": False,
            "text": "Service temporarily unavailable.",
            "source": "error",
            "error": str(e),
        }

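Call robust_llm_call everywhere in your application. It always returns a dict, so callers never need to handle exceptions from LLM code. Because call_with_fallback degrades instead of raising, check whether source is "degraded" before treating the text as a real model answer.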

Monitoring

Track these metrics in Prometheus or Datadog:

python
from prometheus_client import Counter, Histogram
 
llm_calls = Counter("llm_calls_total", "Total LLM calls", ["model", "status"])
llm_latency = Histogram("llm_call_duration_seconds", "LLM call latency", ["model"])
 
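# Wrap the call so both outcomes are counted.
# (call_llm_instrumented is an illustrative name.)
def call_llm_instrumented(prompt: str, model: str) -> str:
    with llm_latency.labels(model=model).time():
        try:
            result = call_llm(prompt, model)
        except Exception:
            llm_calls.labels(model=model, status="error").inc()
            raise
    llm_calls.labels(model=model, status="success").inc()
    return result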

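Alert when sum(rate(llm_calls_total{status="error"}[5m])) / sum(rate(llm_calls_total[5m])) > 0.1, that is, when more than 10% of calls over the last 5 minutes failed. The sum() on both sides is needed because the error and success series carry different status labels and would not match in a plain division.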

Affiliate Tools

  • Anthropic API — sign up for API access
  • Upstash Redis — serverless Redis for response caching in the fallback chain
  • Grafana Cloud — free hosted Prometheus for LLM call metrics