LLM Batch Inference and Async Queue Patterns in Production
When you have thousands of LLM requests to process, synchronous API calls don't scale. Here's how to build async batch inference pipelines with Celery, Redis, and the Anthropic API.
Most LLM tutorials show you a single synchronous call. Production looks different — you need to process 10,000 documents overnight, handle thousands of user requests without timeouts, and manage rate limits gracefully.
When to Use Async Batch Processing
Use sync (direct API call) when:
- User is waiting and expects a response < 10 seconds
- Processing a single item
- Real-time chat or assistant
Use async batch when:
- Processing bulk documents (reports, emails, logs)
- User can wait and you'll notify them when done
- You need to manage rate limits without blocking
- Processing cost matters (can use cheaper batch endpoints)
Architecture
User Request → API → Redis Queue → Workers (Celery) → Claude API
                ↓                        ↓
          Job Status DB           Results Storage
                ↓
    User polls status / webhook
Anthropic's Official Batch API
Anthropic has a native Message Batches API that's designed for bulk processing:
import anthropic
import json
from pathlib import Path
client = anthropic.Anthropic()
# Create a batch of requests
batch = client.messages.batches.create(
requests=[
{
"custom_id": f"doc-{i}",
"params": {
"model": "claude-haiku-4-5-20251001",
"max_tokens": 500,
"messages": [
{
"role": "user",
"content": f"Summarize this document in 3 bullet points:\n\n{doc}"
}
]
}
}
for i, doc in enumerate(documents) # up to 10,000 requests per batch
]
)
print(f"Batch ID: {batch.id}")
print(f"Status: {batch.processing_status}")
# Poll until complete
import time
while True:
batch = client.messages.batches.retrieve(batch.id)
if batch.processing_status == "ended":
break
print(f"In progress: {batch.request_counts.in_progress} remaining")
time.sleep(60) # poll every minute
# Get results
for result in client.messages.batches.results(batch.id):
print(f"ID: {result.custom_id}")
if result.result.type == "succeeded":
print(f"Response: {result.result.message.content[0].text}")
else:
print(f"Error: {result.result.error}")The batch API is 50% cheaper than real-time API and processes within 24 hours. Use it for any non-realtime workload.
Custom Queue with Celery + Redis
For more control (priority queues, retries, monitoring), build your own:
# tasks.py
from celery import Celery
from anthropic import Anthropic
import redis
import json
import time
import os

celery = Celery("llm_tasks", broker="redis://redis:6379/0", backend="redis://redis:6379/1")
client = Anthropic()

# Rate limiting via Celery's per-task `rate_limit`.
#
# Celery enforces `rate_limit` per worker instance, NOT across the cluster:
# N workers each configured with "800/m" can together send N * 800 requests
# per minute. Derive the per-worker limit from a cluster-wide budget divided
# by the largest number of workers that can run at once (e.g. the KEDA
# ScaledObject's maxReplicaCount, 20 in the deployment manifest). Keep the
# budget below your account's API rate limit for headroom (e.g. 800 of a
# 1000 requests/minute tier). This is conservative: with fewer workers
# running, the cluster uses less than the full budget.
LLM_GLOBAL_RATE_LIMIT_PER_MIN = int(os.environ.get("LLM_GLOBAL_RATE_LIMIT_PER_MIN", "800"))
LLM_MAX_WORKERS = max(1, int(os.environ.get("LLM_MAX_WORKERS", "20")))
LLM_WORKER_RATE_LIMIT_PER_MIN = max(1, LLM_GLOBAL_RATE_LIMIT_PER_MIN // LLM_MAX_WORKERS)

celery.conf.task_annotations = {
    "tasks.process_with_llm": {"rate_limit": f"{LLM_WORKER_RATE_LIMIT_PER_MIN}/m"}
}
def _store_llm_result(job_id: str, result: dict) -> None:
    """Store *result* as JSON under ``llm:result:<job_id>`` with a 24-hour TTL."""
    redis_client = redis.Redis(host="redis", port=6379)
    redis_client.setex(f"llm:result:{job_id}", 86400, json.dumps(result))


def _failure_record(job_id: str, model: str, exc: Exception) -> dict:
    """Build the ``"failed"`` result record stored when a job gives up."""
    return {
        "job_id": job_id,
        "status": "failed",
        "error": f"{type(exc).__name__}: {exc}",
        "model": model,
    }


def _retry_or_record_failure(task, job_id: str, model: str, exc: Exception, countdown: int) -> Exception:
    """Schedule a retry of *task*, or record a permanent failure once retries run out.

    When another attempt is allowed, ``task.retry`` raises celery's ``Retry``
    itself. When retries are exhausted, a ``"failed"`` record is stored and
    *exc* is returned so the caller can ``raise`` it.
    """
    # Mirrors celery's own check in Task.retry: give up once the next attempt
    # number would exceed max_retries.
    if task.max_retries is not None and task.request.retries >= task.max_retries:
        _store_llm_result(job_id, _failure_record(job_id, model, exc))
        return exc
    return task.retry(exc=exc, countdown=countdown)


@celery.task(
    bind=True,
    max_retries=3,
    default_retry_delay=30,
)
def process_with_llm(self, job_id: str, text: str, system_prompt: str, model: str = "claude-haiku-4-5-20251001"):
    """Process a single item with the Claude API and store the result in Redis.

    On success, a ``"complete"`` record (text, token usage, model, stop reason)
    is stored under ``llm:result:<job_id>`` with a 24-hour TTL and returned.

    Retry policy (at most ``max_retries`` retries):
      * 429 rate limit   -> retry after 60 s
      * 5xx server error -> retry after 30 s
      * connection error / timeout -> retry with exponential backoff (30 s, 60 s, ... capped at 300 s)
      * other 4xx errors -> not retried
    Whenever the task gives up, a ``"failed"`` record is stored under the same
    key so batch status can count it and still reach "complete".

    Raises:
        The underlying ``anthropic`` API error once the job has permanently failed.
    """
    # The exception classes live on the `anthropic` module; the Anthropic
    # client instance has no `.anthropic` attribute.
    import anthropic

    try:
        message = client.messages.create(
            model=model,
            max_tokens=1000,
            system=system_prompt,
            messages=[{"role": "user", "content": text}],
        )
    except anthropic.RateLimitError as e:
        # Must precede APIStatusError: RateLimitError is a subclass of it.
        raise _retry_or_record_failure(self, job_id, model, e, countdown=60)
    except anthropic.APIStatusError as e:
        if e.status_code >= 500:
            raise _retry_or_record_failure(self, job_id, model, e, countdown=30)
        # Other 4xx errors (bad request, auth, ...) will not succeed on retry.
        _store_llm_result(job_id, _failure_record(job_id, model, e))
        raise
    except anthropic.APIConnectionError as e:
        # Network failures and timeouts are transient: back off exponentially.
        backoff = min(300, 30 * 2 ** self.request.retries)
        raise _retry_or_record_failure(self, job_id, model, e, countdown=backoff)

    result = {
        "job_id": job_id,
        "status": "complete",
        # Join every text block rather than assuming content[0] exists and is text.
        "result": "".join(block.text for block in message.content if block.type == "text"),
        "input_tokens": message.usage.input_tokens,
        "output_tokens": message.usage.output_tokens,
        "model": model,
        # "max_tokens" here means the answer was truncated at max_tokens=1000.
        "stop_reason": message.stop_reason,
    }
    _store_llm_result(job_id, result)
    return result
@celery.task
def process_batch(batch_id: str, items: list[dict], system_prompt: str):
"""Submit a batch of items for processing."""
job_ids = []
for item in items:
job_id = f"{batch_id}:{item['id']}"
process_with_llm.apply_async(
args=[job_id, item['text'], system_prompt],
priority=item.get('priority', 5), # 0=highest, 9=lowest
)
job_ids.append(job_id)
# Store batch metadata
redis_client = redis.Redis(host="redis", port=6379)
redis_client.setex(
f"llm:batch:{batch_id}",
86400,
json.dumps({"job_ids": job_ids, "total": len(items), "status": "processing"})
)
return job_idsFastAPI Endpoint for Batch Submission
import json
import uuid
from typing import Optional

import redis
from fastapi import BackgroundTasks, FastAPI, HTTPException
from pydantic import BaseModel
app = FastAPI()

# Redis connection the status/results endpoints use to read the
# llm:batch:* and llm:result:* keys written by the Celery tasks.
redis_client = redis.Redis(port=6379, host="redis")


class BatchRequest(BaseModel):
    # Request body for POST /batch/submit.
    # Each item is a dict shaped like {"id": ..., "text": ..., "priority": <optional>}.
    items: list[dict]
    system_prompt: str
    model: Optional[str] = "claude-haiku-4-5-20251001"
    callback_url: Optional[str] = None


class JobStatus(BaseModel):
    # Progress summary returned by GET /batch/{batch_id}/status.
    batch_id: str
    total: int
    completed: int
    failed: int
    status: str
@app.post("/batch/submit")
def submit_batch(request: BatchRequest):
    """Validate a batch request and enqueue it for background processing.

    Returns:
        A dict with the new ``batch_id``, ``total_items`` and the
        ``status_url`` to poll.

    Raises:
        HTTPException: 422 if any item lacks an "id" or "text" key.
    """
    # Declared with plain `def` because `.delay()` is a blocking broker call;
    # FastAPI runs sync handlers in its threadpool instead of on the event loop.

    # process_batch reads item["id"] and item["text"] on the worker. Reject
    # bad items here rather than letting the background task crash part-way
    # through the batch.
    invalid = [
        index for index, item in enumerate(request.items)
        if "id" not in item or "text" not in item
    ]
    if invalid:
        raise HTTPException(
            status_code=422,
            detail=f"Items missing 'id' or 'text' at positions: {invalid[:20]}",
        )

    batch_id = str(uuid.uuid4())
    # NOTE: request.model and request.callback_url are accepted but not used
    # here. Forwarding the model requires process_batch to accept a `model`
    # argument, and no webhook delivery is implemented.
    # NOTE(review): process_batch is defined in tasks.py; this module must
    # import it (e.g. `from tasks import process_batch`).
    process_batch.delay(batch_id, request.items, request.system_prompt)

    return {
        "batch_id": batch_id,
        "total_items": len(request.items),
        "status_url": f"/batch/{batch_id}/status",
    }
@app.get("/batch/{batch_id}/status")
def get_batch_status(batch_id: str) -> JobStatus:
    """Summarize the progress of a batch.

    Reads the batch metadata at ``llm:batch:<batch_id>`` and the per-job
    records at ``llm:result:<job_id>``. ``completed`` counts every job that
    has a stored record (successful or failed); ``failed`` counts records
    whose status is ``"failed"``. The batch is ``"complete"`` once every job
    has a record.

    Raises:
        HTTPException: 404 if the batch metadata is missing (unknown id or
            expired after its 24-hour TTL).
    """
    # Plain `def`: redis-py calls block, so FastAPI runs this handler in its
    # threadpool rather than stalling the event loop.
    batch_data = redis_client.get(f"llm:batch:{batch_id}")
    if not batch_data:
        # FastAPI does not interpret Flask-style (body, status) tuples;
        # raising is the supported way to return a 404.
        raise HTTPException(status_code=404, detail="Batch not found")

    job_ids = json.loads(batch_data)["job_ids"]
    # One MGET round-trip instead of two commands per job. MGET rejects an
    # empty key list, hence the guard.
    raw_records = redis_client.mget([f"llm:result:{jid}" for jid in job_ids]) if job_ids else []
    records = [json.loads(raw) for raw in raw_records if raw]

    completed = len(records)
    failed = sum(1 for record in records if record.get("status") == "failed")

    return JobStatus(
        batch_id=batch_id,
        total=len(job_ids),
        completed=completed,
        failed=failed,
        status="complete" if completed == len(job_ids) else "processing",
    )
@app.get("/batch/{batch_id}/results")
async def get_batch_results(batch_id: str):
batch_data = redis_client.get(f"llm:batch:{batch_id}")
if not batch_data:
return {"error": "Not found"}, 404
batch = json.loads(batch_data)
results = []
for jid in batch["job_ids"]:
result = redis_client.get(f"llm:result:{jid}")
if result:
results.append(json.loads(result))
return {"batch_id": batch_id, "results": results}Kubernetes Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: llm-worker
  namespace: ai
spec:
  replicas: 5  # initial worker count; KEDA's HPA manages replicas once the ScaledObject is active
  # apps/v1 Deployments require a selector matching the pod template labels.
  selector:
    matchLabels:
      app: llm-worker
  template:
    metadata:
      labels:
        app: llm-worker
    spec:
      containers:
        - name: worker
          image: your-org/llm-worker:latest  # pin a version tag or digest in production
          command: ["celery", "-A", "tasks", "worker", "--loglevel=info", "-c", "4"]
          env:
            - name: ANTHROPIC_API_KEY
              valueFrom:
                secretKeyRef:
                  name: llm-secrets
                  key: anthropic_api_key
            - name: CELERY_BROKER_URL
              value: redis://redis:6379/0
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "1000m"
---
# Scale workers based on queue depth
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: llm-worker-scaler
  # A ScaledObject must live in the same namespace as its scale target.
  namespace: ai
spec:
  scaleTargetRef:
    name: llm-worker
  minReplicaCount: 2
  maxReplicaCount: 20  # keep in sync with the worker rate-limit budget (Celery rate limits are per worker)
  triggers:
    - type: redis
      metadata:
        # Fully qualified: the KEDA operator resolves this from its own
        # namespace, where the short name "redis" would not find this service.
        address: redis.ai.svc.cluster.local:6379
        listName: celery  # Celery's default queue list on the broker DB
        listLength: "10"  # target ~10 queued messages per replica
        # NOTE(review): Celery's Redis transport emulates priorities with
        # separate per-priority lists; confirm this trigger sees prioritized
        # messages (process_batch sends priority 5 by default).
Cost Optimization: Smart Model Routing
Use cheap models for bulk work and reserve expensive models for complex tasks:
def get_optimal_model(text_length: int, task_type: str, complex_length_threshold: int = 5000) -> str:
    """Route a task to the cheapest model expected to handle it.

    Args:
        text_length: Size of the input. NOTE(review): the unit (characters
            vs. tokens) is not fixed here; keep it consistent with
            ``complex_length_threshold``.
        task_type: Task category. "classification", "extraction" and
            "simple_summary" go to Haiku; "analysis" and "detailed_summary"
            go to Sonnet when the input is shorter than the threshold.
        complex_length_threshold: Inputs of at least this length for the
            analysis-style tasks are escalated to Opus. Defaults to 5000.

    Returns:
        A Claude model id. Unrecognized task types fall through to Opus,
        the most expensive option.
    """
    if task_type in {"classification", "extraction", "simple_summary"}:
        return "claude-haiku-4-5-20251001"  # Haiku 4.5: $1/M input tokens
    if task_type in {"analysis", "detailed_summary"} and text_length < complex_length_threshold:
        return "claude-sonnet-4-6"  # $3/M input tokens
    # NOTE(review): Opus 4/4.1 list at $15/M input tokens and Opus 4.5 at
    # $5/M -- confirm current pricing for this model id.
    return "claude-opus-4-8"
# Processing 10,000 documents (assuming ~1,000 input tokens each = 10M input tokens; input-token cost only):
# With Haiku 4.5 ($1/M input): ~$10
# With Opus at $15/M input: ~$150
# Savings from routing: ~93%
Async batch processing + smart routing + Anthropic's native Batch API = a cost-efficient LLM pipeline that scales to millions of documents.
Resources: Anthropic Batch API docs | Celery docs | KEDA
Today I Fixed
Short real fixes from production — posted daily
Stay ahead of the curve
Get the latest DevOps, Kubernetes, AWS, and AI/ML guides delivered straight to your inbox. No spam — just practical engineering content.
Related Articles
AI-Driven Capacity Planning for Kubernetes Clusters (2026)
How to use AI and machine learning for Kubernetes capacity planning. Covers predictive autoscaling, cost optimization, tools like StormForge and Kubecost, and building custom ML models for resource forecasting.
AI-Powered Kubernetes Anomaly Detection: Beyond Static Thresholds
Static alerts miss 40% of real incidents. Learn how AI and ML-based anomaly detection — using tools like Prometheus + ML, Dynatrace, and custom LLM runbooks — catches what thresholds can't.
Argo Workflows vs Prefect vs Airflow — Best for ML Pipelines 2026
Choosing a workflow orchestrator for your ML pipelines? Argo Workflows, Prefect, and Apache Airflow each have distinct strengths. Here's which to pick for your use case.