
LLM Batch Inference and Async Queue Patterns in Production

When you have thousands of LLM requests to process, synchronous API calls don't scale. Here's how to build async batch inference pipelines with Celery, Redis, and Anthropic's Message Batches API.

DevOpsBoys · 5 min read

Most LLM tutorials show you a single synchronous call. Production looks different. You need to process 10,000 documents overnight, serve thousands of user requests without timeouts, and handle rate limits gracefully.

When to Use Async Batch Processing

Use sync (direct API call) when:

  • The user is waiting and expects a response in under 10 seconds
  • You're processing a single item
  • You're building real-time chat or an assistant

Use async batch when:

  • You're processing documents in bulk (reports, emails, logs)
  • The user can wait, and you'll notify them when the job is done
  • You need to manage rate limits without blocking requests
  • Cost matters (batch endpoints are discounted)

Architecture

User Request → API → Redis Queue → Workers (Celery) → Claude API
                          ↓                 ↓
                    Job Status DB    Results Storage
                          ↓
               User polls status / webhook
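You can prototype the same flow in a single process before bringing in Redis and Celery. The sketch below is a minimal version built on these assumptions:

- A hypothetical `fake_llm_call` coroutine stands in for the Claude API.
- An `asyncio.Queue` stands in for Redis.
- A fixed pool of asyncio tasks stands in for Celery workers.
- Plain dicts stand in for the job-status DB and results storage.

```python
import asyncio


async def fake_llm_call(text: str) -> str:
    """Hypothetical stand-in for a Claude API call."""
    await asyncio.sleep(0.01)  # simulate network latency
    return text.upper()


async def worker(queue: asyncio.Queue, status: dict, results: dict) -> None:
    """Pull jobs off the queue until cancelled, recording status and results."""
    while True:
        job_id, text = await queue.get()
        status[job_id] = "processing"
        try:
            results[job_id] = await fake_llm_call(text)
            status[job_id] = "complete"
        except Exception:
            status[job_id] = "failed"
        finally:
            queue.task_done()


async def run_batch(items: dict[str, str], concurrency: int = 4) -> tuple[dict, dict]:
    """Enqueue every item, drain the queue with `concurrency` workers, return (status, results)."""
    queue: asyncio.Queue = asyncio.Queue()
    status: dict[str, str] = {}
    results: dict[str, str] = {}

    # "API" step: accept the jobs and mark them queued
    for job_id, text in items.items():
        status[job_id] = "queued"
        queue.put_nowait((job_id, text))

    # "Workers" step: a fixed-size pool processes jobs concurrently
    workers = [asyncio.create_task(worker(queue, status, results)) for _ in range(concurrency)]
    await queue.join()  # returns once every job has been marked done
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return status, results


if __name__ == "__main__":
    status, results = asyncio.run(run_batch({"doc-1": "hello", "doc-2": "world"}))
    print(status)
    print(results)
```

The `concurrency` argument plays the same role as the Celery worker count. It bounds how many LLM calls are in flight at once, no matter how many items are queued.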

Anthropic's Official Batch API

Anthropic offers a native Message Batches API for bulk, non-interactive workloads. You submit many requests in one call, poll until processing ends, then fetch the results:

```python
import time

import anthropic

client = anthropic.Anthropic()  # reads ANTHROPIC_API_KEY from the environment

documents = ["...first document...", "...second document..."]  # replace with your own texts

# Create a batch of requests
# Per-batch limit: 100,000 requests or 256 MB, whichever is reached first
batch = client.messages.batches.create(
    requests=[
        {
            "custom_id": f"doc-{i}",
            "params": {
                "model": "claude-haiku-4-5-20251001",
                "max_tokens": 500,
                "messages": [
                    {
                        "role": "user",
                        "content": f"Summarize this document in 3 bullet points:\n\n{doc}",
                    }
                ],
            },
        }
        for i, doc in enumerate(documents)
    ]
)

print(f"Batch ID: {batch.id}")
print(f"Status: {batch.processing_status}")

# Poll until processing has ended
while True:
    batch = client.messages.batches.retrieve(batch.id)

    if batch.processing_status == "ended":
        break

    print(f"In progress: {batch.request_counts.processing} requests still processing")
    time.sleep(60)  # poll every minute

# Get results (they can arrive in any order, so match them by custom_id)
for entry in client.messages.batches.results(batch.id):
    outcome = entry.result
    if outcome.type == "succeeded":
        print(f"{entry.custom_id}: {outcome.message.content[0].text}")
    elif outcome.type == "errored":
        print(f"{entry.custom_id}: error {outcome.error}")
    else:  # "canceled" or "expired"
        print(f"{entry.custom_id}: {outcome.type}")
```

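The Batch API costs 50% less than the standard Messages API. Most batches finish within an hour, and any request still unprocessed after 24 hours expires. Use it for any workload that doesn't need a real-time response.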
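A corpus larger than one batch allows has to be split across several batches. The sketch below handles this under two assumptions:

- `build_batch_requests` is a hypothetical helper that yields request lists in the same shape as the example above.
- The `max_per_batch` default of 100,000 reflects the documented request-count limit at the time of writing.

The helper does not check the 256 MB size limit.

```python
from typing import Iterator


def build_batch_requests(
    documents: list[str],
    model: str = "claude-haiku-4-5-20251001",
    max_tokens: int = 500,
    max_per_batch: int = 100_000,
) -> Iterator[list[dict]]:
    """Yield lists of Message Batches request dicts, at most max_per_batch per list.

    custom_id values are unique across all chunks (doc-0, doc-1, ...), so results
    from different batches can be joined back to the original documents.
    """
    if max_per_batch < 1:
        raise ValueError("max_per_batch must be positive")
    for start in range(0, len(documents), max_per_batch):
        chunk = documents[start:start + max_per_batch]
        yield [
            {
                "custom_id": f"doc-{start + offset}",
                "params": {
                    "model": model,
                    "max_tokens": max_tokens,
                    "messages": [
                        {
                            "role": "user",
                            "content": f"Summarize this document in 3 bullet points:\n\n{doc}",
                        }
                    ],
                },
            }
            for offset, doc in enumerate(chunk)
        ]
```

Each yielded list can be passed as `requests=` to `client.messages.batches.create`, one call per chunk. Keep the returned batch IDs so you can poll each batch separately.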

Custom Queue with Celery + Redis

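For more control (priority queues, custom retries, monitoring), build your own queue on top of the standard Messages API. You give up the batch discount, and you become responsible for staying under your rate limits: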

```python
# tasks.py
import json
import os

import anthropic
import redis
from celery import Celery

BROKER_URL = os.environ.get("CELERY_BROKER_URL", "redis://redis:6379/0")
DEFAULT_MODEL = "claude-haiku-4-5-20251001"
RESULT_TTL = 86400  # keep results and batch metadata for 24 hours

celery = Celery("llm_tasks", broker=BROKER_URL, backend="redis://redis:6379/1")
# The SDK retries some errors twice by default; disable that so Celery owns retries
client = anthropic.Anthropic(max_retries=0)
redis_client = redis.Redis(host="redis", port=6379)  # shared per worker process


def _store(job_id: str, record: dict) -> None:
    """Persist a job record under llm:result:<job_id> with a 24-hour TTL."""
    redis_client.setex(f"llm:result:{job_id}", RESULT_TTL, json.dumps(record))


def _retry_or_fail(task, job_id: str, exc: Exception) -> None:
    """Retry with exponential backoff (30s, 60s, 120s, capped at 300s),
    or record a failure once max_retries is exhausted."""
    if task.request.retries >= task.max_retries:
        _store(job_id, {"job_id": job_id, "status": "failed", "error": str(exc)})
        raise exc
    raise task.retry(exc=exc, countdown=min(30 * 2 ** task.request.retries, 300))


# Example account limit: 1,000 requests/minute (~16.7/second); cap at 800/m for headroom.
# Celery enforces rate_limit per worker instance, not globally, so divide your
# account limit by the maximum number of worker pods you run.
@celery.task(bind=True, max_retries=3, rate_limit="800/m")
def process_with_llm(self, job_id: str, text: str, system_prompt: str, model: str = DEFAULT_MODEL):
    """Process a single item with the Claude API."""
    try:
        message = client.messages.create(
            model=model,
            max_tokens=1000,
            system=system_prompt,
            messages=[{"role": "user", "content": text}],
        )
    except (anthropic.RateLimitError, anthropic.APIConnectionError) as e:
        # 429s and network errors are transient: back off and retry
        _retry_or_fail(self, job_id, e)
    except anthropic.APIStatusError as e:
        if e.status_code >= 500:
            _retry_or_fail(self, job_id, e)
        # Other 4xx errors (bad request, auth) won't succeed on retry: record and re-raise
        _store(job_id, {"job_id": job_id, "status": "failed", "error": str(e)})
        raise

    result = {
        "job_id": job_id,
        "status": "complete",
        "result": message.content[0].text,
        "input_tokens": message.usage.input_tokens,
        "output_tokens": message.usage.output_tokens,
        "model": model,
    }
    _store(job_id, result)
    return result


@celery.task
def process_batch(batch_id: str, items: list[dict], system_prompt: str, model: str = DEFAULT_MODEL):
    """Fan a batch out into one process_with_llm task per item."""
    job_ids = [f"{batch_id}:{item['id']}" for item in items]

    # Store batch metadata before dispatching, so no result can exist without it
    redis_client.setex(
        f"llm:batch:{batch_id}",
        RESULT_TTL,
        json.dumps({"job_ids": job_ids, "total": len(items), "status": "processing"}),
    )

    for job_id, item in zip(job_ids, items):
        process_with_llm.apply_async(
            args=[job_id, item["text"], system_prompt, model],
            priority=item.get("priority", 5),  # Redis broker: 0 = highest, 9 = lowest
        )

    return job_ids
```

FastAPI Endpoint for Batch Submission

```python
# api.py
import json
import uuid
from typing import Optional

import redis
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

from tasks import DEFAULT_MODEL, process_batch

app = FastAPI()
redis_client = redis.Redis(host="redis", port=6379)


class BatchRequest(BaseModel):
    items: list[dict]  # [{"id": ..., "text": ..., "priority": optional int}]
    system_prompt: str
    model: str = DEFAULT_MODEL
    callback_url: Optional[str] = None  # webhook target; not wired up in this example


class JobStatus(BaseModel):
    batch_id: str
    total: int
    completed: int
    failed: int
    status: str


# Plain `def` endpoints run in FastAPI's threadpool, so the blocking
# Redis and broker calls below don't stall the event loop.
@app.post("/batch/submit")
def submit_batch(request: BatchRequest):
    batch_id = str(uuid.uuid4())

    # Queue the fan-out task; the API returns without waiting for any LLM call
    process_batch.delay(batch_id, request.items, request.system_prompt, request.model)

    return {
        "batch_id": batch_id,
        "total_items": len(request.items),
        "status_url": f"/batch/{batch_id}/status",
    }


def _load_batch(batch_id: str) -> tuple[list[str], list[dict]]:
    """Return the batch's job IDs and the result records stored so far."""
    batch_data = redis_client.get(f"llm:batch:{batch_id}")
    if not batch_data:
        # Also returned until a worker has picked up process_batch for a new batch
        raise HTTPException(status_code=404, detail="Batch not found")
    job_ids = json.loads(batch_data)["job_ids"]
    # One MGET instead of a round trip per job (skipped for an empty batch)
    raw = redis_client.mget([f"llm:result:{jid}" for jid in job_ids]) if job_ids else []
    return job_ids, [json.loads(r) for r in raw if r]


@app.get("/batch/{batch_id}/status")
def get_batch_status(batch_id: str) -> JobStatus:
    job_ids, results = _load_batch(batch_id)
    failed = sum(1 for r in results if r.get("status") == "failed")

    return JobStatus(
        batch_id=batch_id,
        total=len(job_ids),
        completed=len(results) - failed,
        failed=failed,
        status="complete" if len(results) == len(job_ids) else "processing",
    )


@app.get("/batch/{batch_id}/results")
def get_batch_results(batch_id: str):
    _, results = _load_batch(batch_id)
    return {"batch_id": batch_id, "results": results}
```
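Celery's `rate_limit` is enforced per worker instance, so running several worker pods multiplies the effective request rate. One way to hold a single account-wide limit is a fixed-window counter in Redis that every worker shares. This is a minimal sketch under these assumptions:

- `acquire_slot` and `wait_for_slot` are hypothetical helpers, not part of Celery or the Anthropic SDK.
- Your code calls one of them before each API request.
- They rely only on redis-py's `incr` and `expire`.

```python
import time
from typing import Callable


def acquire_slot(
    r,  # a redis.Redis client, or any object with incr(key) and expire(key, seconds)
    limit: int,
    window_seconds: int = 60,
    key_prefix: str = "llm:ratelimit",
    now: Callable[[], float] = time.time,
) -> bool:
    """Fixed-window limiter: return True if this call fits under `limit`
    requests in the current window, counted across every worker sharing Redis."""
    window = int(now()) // window_seconds
    key = f"{key_prefix}:{window}"
    count = r.incr(key)  # INCR is atomic, so concurrent workers never double-count
    if count == 1:
        # First request in this window: let the key clean itself up afterwards
        r.expire(key, window_seconds * 2)
    return count <= limit


def wait_for_slot(r, limit: int, window_seconds: int = 60, poll: float = 0.5) -> None:
    """Block until acquire_slot succeeds."""
    while not acquire_slot(r, limit, window_seconds):
        time.sleep(poll)
```

In a Celery task you would usually call `acquire_slot` and, when it returns False, retry with a short countdown rather than block a worker process with `wait_for_slot`. Each worker derives the window from its own clock, so keep clocks in sync (NTP). A fixed window also allows short bursts of up to twice the limit across a window boundary.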

Kubernetes Deployment

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: llm-worker
  namespace: ai
spec:
  replicas: 5  # starting point; KEDA manages the count (2 to 20) once the ScaledObject is applied
  selector:
    matchLabels:
      app: llm-worker
  template:
    metadata:
      labels:
        app: llm-worker  # must match spec.selector.matchLabels
    spec:
      containers:
        - name: worker
          image: your-org/llm-worker:latest
          command: ["celery", "-A", "tasks", "worker", "--loglevel=info", "-c", "4"]
          env:
            - name: ANTHROPIC_API_KEY
              valueFrom:
                secretKeyRef:
                  name: llm-secrets
                  key: anthropic_api_key
            - name: CELERY_BROKER_URL
              value: redis://redis:6379/0
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "1000m"
---
# Scale workers based on queue depth
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: llm-worker-scaler
  namespace: ai  # must be in the same namespace as the Deployment
spec:
  scaleTargetRef:
    name: llm-worker
  minReplicaCount: 2
  maxReplicaCount: 20
  triggers:
    - type: redis
      metadata:
        address: redis:6379
        # Celery's default queue list. With Redis priorities, tasks sent with a
        # non-zero priority land in separate lists that this trigger does not count.
        listName: celery
        listLength: "10"  # target ~10 queued tasks per replica
```
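KEDA feeds the list length to a Horizontal Pod Autoscaler as an average-value target. The replica count therefore works out to roughly the queue length divided by `listLength`, clamped to the min/max bounds. This back-of-the-envelope sketch of that arithmetic uses a hypothetical `desired_replicas` helper. It ignores HPA tolerance, stabilization windows, and KEDA's activation threshold.

```python
import math


def desired_replicas(queue_length: int, list_length: int = 10,
                     min_replicas: int = 2, max_replicas: int = 20) -> int:
    """Approximate replica target for a KEDA Redis list trigger."""
    raw = math.ceil(queue_length / list_length)
    return max(min_replicas, min(max_replicas, raw))


for depth in (0, 15, 95, 500):
    print(depth, desired_replicas(depth))
# → 0 2 / 15 2 / 95 10 / 500 20 (one pair per line)
```

With `-c 4`, 20 pods means up to 80 concurrent Celery processes. Size any per-worker rate limit for the maximum replica count, not the initial five.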

Cost Optimization: Smart Model Routing

Use cheap models for bulk work and reserve expensive ones for complex tasks:

```python
def get_optimal_model(text_length: int, task_type: str) -> str:
    """Route to a cost-optimal model based on task type and input length (e.g. len(text))."""
    # List input prices at the time of writing; check Anthropic's pricing page
    if task_type in ["classification", "extraction", "simple_summary"]:
        return "claude-haiku-4-5-20251001"  # $1/M input tokens
    elif task_type in ["analysis", "detailed_summary"] and text_length < 5000:
        return "claude-sonnet-4-6"  # $3/M input tokens
    else:
        return "claude-opus-4-8"  # $15/M input tokens


# Processing 10,000 documents at ~1,000 input tokens each (10M input tokens):
# With Haiku: ~$10
# With Opus: ~$150
# Savings from sending everything to Haiku instead of Opus: ~93%
# (input tokens only; output tokens are billed separately)
```
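You can rerun this arithmetic with your own volumes. The sketch below uses a hypothetical `estimate_input_cost` helper, and its assumptions are:

- You supply the per-million-token prices. The example uses $1/M, Claude Haiku 4.5's list input price, and $15/M, the Opus figure above.
- Output tokens are ignored, as in the estimate above.
- `batch_discount` models the 50% Message Batches discount.

```python
def estimate_input_cost(num_docs: int, avg_input_tokens: int,
                        price_per_million: float, batch_discount: bool = False) -> float:
    """Estimated input-token cost in dollars (output tokens not included)."""
    total_tokens = num_docs * avg_input_tokens
    cost = total_tokens / 1_000_000 * price_per_million
    return cost * 0.5 if batch_discount else cost


haiku = estimate_input_cost(10_000, 1_000, price_per_million=1.00)
opus = estimate_input_cost(10_000, 1_000, price_per_million=15.00)
print(f"Haiku: ${haiku:.2f}, Opus: ${opus:.2f}, savings: {1 - haiku / opus:.0%}")
# → Haiku: $10.00, Opus: $150.00, savings: 93%
print(f"Haiku via Batch API: ${estimate_input_cost(10_000, 1_000, 1.00, batch_discount=True):.2f}")
# → Haiku via Batch API: $5.00
```

Routing and the Batch API discount stack. The cheapest path for non-urgent bulk work is a small model submitted through the batch endpoint.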

Async queues, model routing, and Anthropic's native Batch API together give you a pipeline that absorbs large document volumes without request timeouts, stays under rate limits, and keeps per-document cost low.

Resources: Anthropic Batch API docs | Celery docs | KEDA
