LLM Token Budget Management and Cost Control in Production
How to control LLM costs at scale — token counting, prompt compression, semantic caching with Redis, tiered model routing, and cost attribution dashboards. Python code included.
Deploying an LLM feature is easy. Keeping the cost predictable at scale is not. A feature that costs $50/day in staging can hit $3,000/day in production — not because of bugs, but because of token waste that nobody measured. Here is a systematic approach to LLM cost control.
The Token Cost Problem
Costs scale with input tokens + output tokens. The math is simple but the sources of waste are not obvious:
- Long system prompts repeated on every request (500 tokens × 1M requests = 500M tokens/month)
- Full conversation history sent every turn (chat apps balloon fast)
- Verbose output when short output would do
- A premium model used for everything when a cheap model could handle 80% of requests
- No caching for identical or near-identical prompts
A proper cost control system addresses all of these.
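To make the first bullet concrete, the arithmetic can be sketched in a few lines. The function name is hypothetical, and the price of $3.00 per million input tokens is an illustrative assumption rather than a quote for any particular model.

```python
def monthly_prompt_overhead_usd(
    prompt_tokens: int,
    requests_per_month: int,
    usd_per_million_input_tokens: float,
) -> float:
    """Cost of resending the same system prompt on every request."""
    total_tokens = prompt_tokens * requests_per_month
    return total_tokens / 1_000_000 * usd_per_million_input_tokens


# 500-token system prompt, 1M requests/month, assumed $3.00 per 1M input tokens
overhead = monthly_prompt_overhead_usd(500, 1_000_000, 3.00)
print(f"${overhead:,.2f} per month")
```

Under these assumptions the repeated system prompt alone accounts for 500M input tokens a month, before any user content or output is counted.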
Step 1: Count Tokens Before Sending
The Anthropic API provides a token counting endpoint that returns the input token count for a request before you send it. Treat the number as a close estimate rather than a billing guarantee, and use it for budget enforcement and logging.
```python
import anthropic
from typing import Optional  # used by routed_llm_call in Step 4

client = anthropic.Anthropic()  # reads ANTHROPIC_API_KEY from the environment


def count_tokens(
    system_prompt: str,
    messages: list[dict],
    model: str = "claude-haiku-4-5-20251001"
) -> int:
    """Count tokens for a request without sending it."""
    response = client.messages.count_tokens(
        model=model,
        system=system_prompt,
        messages=messages
    )
    return response.input_tokens


def check_budget_and_send(
    system_prompt: str,
    messages: list[dict],
    model: str = "claude-haiku-4-5-20251001",
    max_input_tokens: int = 4000,
    max_output_tokens: int = 1024
) -> str:
    """Count tokens, check budget, then send if within limits."""
    token_count = count_tokens(system_prompt, messages, model)
    if token_count > max_input_tokens:
        raise ValueError(
            f"Request exceeds token budget: {token_count} > {max_input_tokens}. "
            "Compress the prompt or truncate conversation history."
        )
    print(f"Token count: {token_count} (budget: {max_input_tokens})")
    response = client.messages.create(
        model=model,
        system=system_prompt,
        messages=messages,
        max_tokens=max_output_tokens
    )
    return response.content[0].text
```
Step 2: Prompt Compression Techniques
Remove redundant whitespace and formatting:
```python
import re


def compress_prompt(text: str) -> str:
    """Remove excess whitespace while preserving code blocks."""
    # Split on code fences; the capture group keeps the fenced blocks,
    # which land at the odd indices of the result
    parts = re.split(r'(```[\s\S]*?```)', text)
    compressed_parts = []
    for i, part in enumerate(parts):
        if i % 2 == 0:  # Not a code block
            # Collapse runs of blank lines and runs of spaces
            part = re.sub(r'\n{3,}', '\n\n', part)
            part = re.sub(r' {2,}', ' ', part)
            part = part.strip()
        if part:  # Skip empty segments so no stray blank lines are added
            compressed_parts.append(part)
    return '\n'.join(compressed_parts)
```
Summarize long conversation history:
```python
def compress_conversation_history(
    messages: list[dict],
    keep_recent: int = 4,
    summary_model: str = "claude-haiku-4-5-20251001"
) -> list[dict]:
    """Keep roughly the last N messages, summarize older ones.

    Assumes each message's "content" is a plain string.
    """
    if keep_recent <= 0 or len(messages) <= keep_recent:
        return messages
    split = len(messages) - keep_recent
    # The summary pair below ends with an assistant turn, so start the
    # recent window on a user turn to keep roles alternating
    if messages[split]["role"] != "user":
        split -= 1
    if split <= 0:
        return messages
    older_messages = messages[:split]
    recent_messages = messages[split:]
    # Summarize older messages (each truncated to 500 characters)
    history_text = "\n".join([
        f"{m['role'].upper()}: {m['content'][:500]}"
        for m in older_messages
    ])
    summary_response = client.messages.create(
        model=summary_model,
        max_tokens=300,
        messages=[{
            "role": "user",
            "content": f"Summarize this conversation history in 3-4 sentences, preserving key facts and decisions:\n\n{history_text}"
        }]
    )
    summary = summary_response.content[0].text
    compressed = [
        {"role": "user", "content": f"[Earlier conversation summary: {summary}]"},
        {"role": "assistant", "content": "Understood. I have the context from our earlier conversation."}
    ] + recent_messages
    return compressed
```
Step 3: Prompt Caching with Redis
Identical prompts should never hit the API twice. A two-level cache checks for an exact match first, then for semantic similarity. The code below implements the exact-match level with Redis.
```python
import hashlib
import json

import redis

redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)
CACHE_TTL = 3600  # 1 hour


def get_cache_key(
    system_prompt: str,
    messages: list[dict],
    model: str,
    max_tokens: int
) -> str:
    """Generate a deterministic cache key."""
    # Include the model and output limit so a response produced by one
    # model (or cut off at a lower limit) is never served for another
    content = json.dumps({
        "system": system_prompt,
        "messages": messages,
        "model": model,
        "max_tokens": max_tokens
    }, sort_keys=True)
    return f"llm:exact:{hashlib.sha256(content.encode()).hexdigest()}"


def cached_llm_call(
    system_prompt: str,
    messages: list[dict],
    model: str = "claude-haiku-4-5-20251001",
    max_tokens: int = 1024
) -> dict:
    """LLM call with exact-match Redis caching."""
    cache_key = get_cache_key(system_prompt, messages, model, max_tokens)
    # Check cache
    cached = redis_client.get(cache_key)
    if cached:
        result = json.loads(cached)
        result["cache_hit"] = True
        return result
    # Cache miss: call the API
    response = client.messages.create(
        model=model,
        system=system_prompt,
        messages=messages,
        max_tokens=max_tokens
    )
    result = {
        "text": response.content[0].text,
        "input_tokens": response.usage.input_tokens,
        "output_tokens": response.usage.output_tokens,
        "model": model,
        "cache_hit": False
    }
    # Store in cache
    redis_client.setex(cache_key, CACHE_TTL, json.dumps(result))
    return result
```
Cache hit rates to aim for:
- Static content (documentation Q&A, FAQ): 60–80% hit rate
- Dynamic user queries: 15–30% hit rate (still meaningful at scale)
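The semantic-similarity level mentioned at the start of this step could look roughly like the sketch below. Everything in it is an assumption for illustration: `toy_embed` is a bag-of-words stand-in for a real embedding model, `SemanticCache` is a hypothetical in-memory class rather than a Redis-backed one, and the 0.8 threshold is arbitrary.

```python
import math
import re
from collections import Counter
from typing import Optional


def toy_embed(text: str) -> Counter:
    """Stand-in embedding: word counts. Swap in a real embedding model."""
    return Counter(re.findall(r"[a-z0-9]+", text.lower()))


def cosine_similarity(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse count vectors."""
    dot = sum(a[t] * b[t] for t in a.keys() & b.keys())
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


class SemanticCache:
    """Hypothetical second-level cache keyed by prompt similarity."""

    def __init__(self, threshold: float = 0.9):
        self.threshold = threshold
        self.entries: list[tuple[Counter, str]] = []

    def get(self, prompt: str) -> Optional[str]:
        """Return the best stored response if it clears the threshold."""
        query = toy_embed(prompt)
        best_score, best_response = 0.0, None
        for vector, response in self.entries:
            score = cosine_similarity(query, vector)
            if score > best_score:
                best_score, best_response = score, response
        return best_response if best_score >= self.threshold else None

    def put(self, prompt: str, response: str) -> None:
        self.entries.append((toy_embed(prompt), response))


cache = SemanticCache(threshold=0.8)
cache.put("How do I reset my password?", "Go to Settings > Security.")
print(cache.get("how do i reset my password"))   # near-duplicate wording
print(cache.get("What is your refund policy?"))  # unrelated question
```

The linear scan is only reasonable for small caches; a larger deployment would likely need a vector index. The threshold deserves care, since a value that is too low serves a cached answer to a question that only looks similar.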
Step 4: Tiered Model Routing
Use cheap models for simple requests, escalate to powerful models only when needed:
```python
def classify_request_complexity(user_message: str) -> str:
    """
    Classify request complexity to choose the right model tier.
    Returns: 'simple', 'medium', or 'complex'
    """
    # Simple heuristics first (no API call)
    word_count = len(user_message.split())
    simple_patterns = [
        "what is", "define", "how many", "when was",
        "yes or no", "true or false", "summarize in one"
    ]
    complex_patterns = [
        "analyze", "compare", "design", "architecture",
        "debug", "refactor", "write a", "implement"
    ]
    text_lower = user_message.lower()
    # The simple check runs first, so a short "what is ..." question stays
    # on the cheap tier even if it also contains a complex keyword
    if word_count < 20 and any(p in text_lower for p in simple_patterns):
        return "simple"
    elif any(p in text_lower for p in complex_patterns) or word_count > 100:
        return "complex"
    else:
        return "medium"


MODEL_TIERS = {
    "simple": "claude-haiku-4-5-20251001",  # Fastest, cheapest
    "medium": "claude-sonnet-4-5",          # Balanced
    "complex": "claude-opus-4-5"            # Most capable
}

# Approximate list price in USD per 1M tokens (input/output).
# Prices change; check the provider's pricing page before relying on these.
TIER_COSTS = {
    "claude-haiku-4-5-20251001": {"input": 1.00, "output": 5.00},
    "claude-sonnet-4-5": {"input": 3.00, "output": 15.00},
    "claude-opus-4-5": {"input": 5.00, "output": 25.00}
}
```
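A quick way to see what routing is worth is to compute the traffic-weighted price across tiers and compare it with sending everything to the top tier. The sketch below is self-contained and uses hypothetical inputs: the traffic split and the relative prices (cheapest tier = 1.0) are illustrative assumptions, not measurements or quotes.

```python
def blended_price(mix: dict[str, float], prices: dict[str, float]) -> float:
    """Traffic-weighted average price across model tiers."""
    if abs(sum(mix.values()) - 1.0) > 1e-9:
        raise ValueError("Traffic shares must sum to 1.0")
    return sum(share * prices[tier] for tier, share in mix.items())


# Hypothetical share of tokens handled by each tier
traffic_mix = {"simple": 0.6, "medium": 0.3, "complex": 0.1}
# Hypothetical prices relative to the cheapest tier
relative_price = {"simple": 1.0, "medium": 3.0, "complex": 5.0}

blended = blended_price(traffic_mix, relative_price)
all_premium = relative_price["complex"]
savings = 1 - blended / all_premium
print(f"blended={blended:.2f} savings_vs_all_premium={savings:.0%}")
```

The shares should be measured in tokens rather than request counts, because complex requests tend to carry longer prompts and outputs than simple ones.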
```python
def routed_llm_call(
    system_prompt: str,
    user_message: str,
    force_tier: Optional[str] = None
) -> dict:
    """Route to appropriate model tier based on request complexity."""
    complexity = force_tier or classify_request_complexity(user_message)
    model = MODEL_TIERS[complexity]
    messages = [{"role": "user", "content": user_message}]
    result = cached_llm_call(system_prompt, messages, model=model)
    result["complexity_tier"] = complexity
    result["model_used"] = model
    return result
```
Step 5: Cost Attribution and Budget Alerts
Track spend per feature, per user, and per model to catch runaway costs early:
```python
from datetime import date


def track_llm_spend(
    feature: str,
    user_id: str,
    result: dict
) -> float:
    """Track cost in Redis and return cost for this call."""
    # A cache hit never reached the API, so it costs nothing; the token
    # counts stored in a cached result belong to the original call
    if result.get("cache_hit"):
        return 0.0
    model = result["model"]
    # Unknown models fall back to mid-tier pricing
    costs = TIER_COSTS.get(model, {"input": 3.0, "output": 15.0})
    cost_usd = (
        result["input_tokens"] / 1_000_000 * costs["input"] +
        result["output_tokens"] / 1_000_000 * costs["output"]
    )
    today = date.today().isoformat()
    keys = [
        f"llm:cost:daily:{today}",
        f"llm:cost:feature:{feature}:{today}",
        f"llm:cost:user:{user_id}:{today}"
    ]
    for key in keys:
        # Increment the daily spend counter
        redis_client.incrbyfloat(key, cost_usd)
        # Set TTL so old data expires
        redis_client.expire(key, 86400 * 30)  # 30 days
    return cost_usd
```
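The per-feature counters written by the tracking function can be rolled up into a simple attribution view. The sketch below is a hypothetical helper that works on a plain dict of key/value pairs, assuming they were already fetched from Redis (for example with redis-py's `scan_iter(match="llm:cost:feature:*")` followed by `get`). The sample values are made up for illustration.

```python
from collections import defaultdict


def spend_by_feature(counters: dict[str, str]) -> dict[str, float]:
    """Sum 'llm:cost:feature:<feature>:<date>' counters per feature."""
    prefix = "llm:cost:feature:"
    totals: dict[str, float] = defaultdict(float)
    for key, value in counters.items():
        if not key.startswith(prefix):
            continue  # ignore daily and per-user counters
        # The ISO date is the last colon-separated field; the feature
        # name is everything between the prefix and the date
        feature, _, _day = key[len(prefix):].rpartition(":")
        totals[feature] += float(value)
    # Most expensive feature first
    return dict(sorted(totals.items(), key=lambda kv: kv[1], reverse=True))


# Made-up sample data in the same key format
sample = {
    "llm:cost:feature:search:2026-01-01": "12.50",
    "llm:cost:feature:search:2026-01-02": "7.50",
    "llm:cost:feature:chat:2026-01-01": "41.25",
    "llm:cost:daily:2026-01-01": "53.75",
}
print(spend_by_feature(sample))
```

The same roll-up applies to the per-user keys by changing the prefix, which is usually enough to feed a basic dashboard.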
```python
def check_budget_alert(daily_budget_usd: float = 100.0) -> None:
    """Alert once daily spend passes 80% of the budget."""
    today = date.today().isoformat()
    current_spend = float(redis_client.get(f"llm:cost:daily:{today}") or 0)
    if current_spend > daily_budget_usd * 0.8:
        print(f"BUDGET ALERT: Daily LLM spend at ${current_spend:.2f} "
              f"({current_spend/daily_budget_usd*100:.0f}% of ${daily_budget_usd} budget)")
        # Send to your alerting system (PagerDuty, Slack, etc.)
```
Putting It Together
```python
def production_llm_call(
    feature: str,
    user_id: str,
    system_prompt: str,
    user_message: str
) -> str:
    """Production LLM call combining compression, routing, caching, and spend tracking."""
    # 1. Compress prompt
    compressed_system = compress_prompt(system_prompt)
    # 2. Route to right model tier (uses the exact-match cache internally)
    result = routed_llm_call(compressed_system, user_message)
    # 3. Track spend
    cost = track_llm_spend(feature, user_id, result)
    # 4. Check budget
    check_budget_alert(daily_budget_usd=200.0)
    print(f"[{feature}] user={user_id} model={result['model_used']} "
          f"tokens={result['input_tokens']}+{result['output_tokens']} "
          f"cost=${cost:.5f} cache={'HIT' if result['cache_hit'] else 'MISS'}")
    return result["text"]
```
Expected Savings
Applying these techniques on a typical production LLM feature handling 100k requests/day:
| Optimization | Token reduction | Cost reduction |
|---|---|---|
| Prompt compression | 10–20% | 10–20% |
| Exact-match caching | 20–40% cache hit | 20–40% |
| Tiered model routing | 60% simple requests → Haiku | 50–70% |
| History summarization | 30–50% on chat features | 30–50% |
| Combined | — | 60–80% total |
Start with tiered model routing — it gives the biggest impact with the least code.
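The per-technique figures in the table do not simply add up, because each optimization applies to whatever cost the previous ones left behind. A rough way to combine them, assuming the techniques act independently (a simplification), is to multiply the remaining fractions. The function name and the three input percentages below are illustrative assumptions.

```python
def combined_reduction(reductions: list[float]) -> float:
    """Combine independent fractional cost reductions multiplicatively."""
    remaining = 1.0
    for r in reductions:
        if not 0.0 <= r <= 1.0:
            raise ValueError("Each reduction must be between 0 and 1")
        remaining *= 1.0 - r  # cost left after applying this technique
    return 1.0 - remaining


# Assumed: 15% from compression, 30% from caching, 50% from routing
total = combined_reduction([0.15, 0.30, 0.50])
print(f"combined reduction: {total:.1%}")
```

With these inputs the result lands inside the 60–80% combined range in the table, even though the individual figures sum to 95%.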