
LLM Token Budget Management and Cost Control in Production

How to control LLM costs at scale — token counting, prompt compression, semantic caching with Redis, tiered model routing, and cost attribution dashboards. Python code included.

DevOpsBoys · 6 min read

Deploying an LLM feature is easy. Keeping the cost predictable at scale is not. A feature that costs $50/day in staging can hit $3,000/day in production — not because of bugs, but because of token waste that nobody measured. Here is a systematic approach to LLM cost control.

The Token Cost Problem

Costs scale with input tokens + output tokens. The math is simple but the sources of waste are not obvious:

  • Long system prompts repeated on every request (500 tokens × 1M requests/month = 500M tokens/month)
  • Full conversation history sent every turn (chat apps balloon fast)
  • Verbose output when short output would do
  • A premium model used for everything when a cheaper model could handle 80% of requests
  • No caching for identical or near-identical prompts

A proper cost control system addresses all of these.
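
To make the first bullet concrete, a back-of-the-envelope estimate helps. The sketch below is only illustrative: `estimate_cost_usd` is a hypothetical helper, not part of any SDK, and the $3.00 per million input tokens is an assumed price, so substitute the real rate for your model.

```python
def estimate_cost_usd(tokens: int, price_per_million_usd: float) -> float:
    """Convert a token count into dollars at a per-million-token price."""
    return tokens / 1_000_000 * price_per_million_usd


# A 500-token system prompt repeated on 1M requests per month
system_prompt_tokens = 500
requests_per_month = 1_000_000
wasted_tokens = system_prompt_tokens * requests_per_month

# ASSUMPTION: $3.00 per 1M input tokens, for illustration only
monthly_cost = estimate_cost_usd(wasted_tokens, price_per_million_usd=3.00)
print(f"{wasted_tokens:,} tokens -> ${monthly_cost:,.2f}/month")
```

Running the same calculation for each bullet above is a quick way to decide which source of waste to tackle first.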

Step 1: Count Tokens Before Sending

The Anthropic API provides a token counting endpoint that returns the input token count for a request before you send it. Treat the number as a close estimate rather than a billing guarantee, and use it for budget enforcement and logging.

python
import anthropic
from typing import Optional
 
client = anthropic.Anthropic()
 
def count_tokens(
    system_prompt: str,
    messages: list[dict],
    model: str = "claude-haiku-4-5-20251001"
) -> int:
    """Count tokens for a request without sending it."""
    response = client.messages.count_tokens(
        model=model,
        system=system_prompt,
        messages=messages
    )
    return response.input_tokens
 
 
def check_budget_and_send(
    system_prompt: str,
    messages: list[dict],
    model: str = "claude-haiku-4-5-20251001",
    max_input_tokens: int = 4000,
    max_output_tokens: int = 1024
) -> Optional[str]:
    """Count tokens, check budget, then send if within limits."""
 
    token_count = count_tokens(system_prompt, messages, model)
 
    if token_count > max_input_tokens:
        raise ValueError(
            f"Request exceeds token budget: {token_count} > {max_input_tokens}. "
            "Compress the prompt or truncate conversation history."
        )
 
    print(f"Token count: {token_count} (budget: {max_input_tokens})")
 
    response = client.messages.create(
        model=model,
        system=system_prompt,
        messages=messages,
        max_tokens=max_output_tokens
    )
 
    return response.content[0].text
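
When a request is over budget, one simple recovery is to drop the oldest turns until it fits. The following is a minimal sketch under stated assumptions: `truncate_history` and `rough_count` are hypothetical names, and the word-based counter is only a stand-in so the example runs without an API key. In production you would pass a function that wraps the real token counter.

```python
from typing import Callable


def truncate_history(
    messages: list[dict],
    max_tokens: int,
    count_fn: Callable[[list[dict]], int],
    keep_min: int = 1,
) -> list[dict]:
    """Drop the oldest messages until count_fn(messages) fits max_tokens.

    At least `keep_min` of the most recent messages are always kept, so the
    result can still exceed the budget if a single message is too large.
    """
    trimmed = list(messages)
    while len(trimmed) > keep_min and count_fn(trimmed) > max_tokens:
        trimmed.pop(0)  # discard the oldest message first
    # Avoid starting the history with an assistant turn
    while len(trimmed) > keep_min and trimmed[0]["role"] != "user":
        trimmed.pop(0)
    return trimmed


# ASSUMPTION: stand-in counter for local testing, roughly one token per word
def rough_count(messages: list[dict]) -> int:
    return sum(len(m["content"].split()) for m in messages)


history = [
    {"role": "user", "content": "first question about deployment pipelines"},
    {"role": "assistant", "content": "a long answer " * 50},
    {"role": "user", "content": "short follow-up"},
]
trimmed = truncate_history(history, max_tokens=20, count_fn=rough_count)
print(len(trimmed), trimmed[-1]["content"])
```

Truncation loses context outright, so it works best as a fallback when summarizing older turns is not worth an extra model call.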

Step 2: Prompt Compression Techniques

Remove redundant whitespace and formatting:

python
import re
 
def compress_prompt(text: str) -> str:
    """Remove excess whitespace while preserving code blocks."""
    # Split on code fences to avoid compressing code
    parts = re.split(r'(```[\s\S]*?```)', text)
 
    compressed_parts = []
    for i, part in enumerate(parts):
        if i % 2 == 0:  # Not a code block
            # Collapse multiple spaces/newlines
            part = re.sub(r'\n{3,}', '\n\n', part)
            part = re.sub(r' {2,}', ' ', part)
            part = part.strip()
        if part:  # Skip empty fragments so the join adds no stray blank lines
            compressed_parts.append(part)
 
    return '\n'.join(compressed_parts)

Summarize long conversation history:

python
def compress_conversation_history(
    messages: list[dict],
    keep_recent: int = 4,
    summary_model: str = "claude-haiku-4-5-20251001"
) -> list[dict]:
    """Keep last N messages, summarize older ones."""
    if len(messages) <= keep_recent:
        return messages
 
    older_messages = messages[:-keep_recent]
    recent_messages = messages[-keep_recent:]
 
    # Summarize older messages
    history_text = "\n".join([
        f"{m['role'].upper()}: {m['content'][:500]}"
        for m in older_messages
    ])
 
    summary_response = client.messages.create(
        model=summary_model,
        max_tokens=300,
        messages=[{
            "role": "user",
            "content": f"Summarize this conversation history in 3-4 sentences, preserving key facts and decisions:\n\n{history_text}"
        }]
    )
 
    summary = summary_response.content[0].text
    compressed = [
        {"role": "user", "content": f"[Earlier conversation summary: {summary}]"},
        {"role": "assistant", "content": "Understood. I have the context from our earlier conversation."}
    ] + recent_messages
 
    return compressed

Step 3: Prompt Caching with Redis

Identical prompts should never hit the API twice. A two-level cache checks for an exact match first, then falls back to semantic similarity. The code in this step implements the exact-match level.

python
import hashlib
import json
import redis
 
redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)
CACHE_TTL = 3600  # 1 hour
 
 
def get_cache_key(system_prompt: str, messages: list[dict], model: str) -> str:
    """Generate a deterministic cache key."""
    # Include the model so different tiers never share cached responses
    content = json.dumps({
        "model": model,
        "system": system_prompt,
        "messages": messages
    }, sort_keys=True)
    return f"llm:exact:{hashlib.sha256(content.encode()).hexdigest()}"
 
 
def cached_llm_call(
    system_prompt: str,
    messages: list[dict],
    model: str = "claude-haiku-4-5-20251001",
    max_tokens: int = 1024
) -> dict:
    """LLM call with exact-match Redis caching."""
    cache_key = get_cache_key(system_prompt, messages, model)
 
    # Check cache
    cached = redis_client.get(cache_key)
    if cached:
        result = json.loads(cached)
        result["cache_hit"] = True
        return result
 
    # Cache miss — call API
    response = client.messages.create(
        model=model,
        system=system_prompt,
        messages=messages,
        max_tokens=max_tokens
    )
 
    result = {
        "text": response.content[0].text,
        "input_tokens": response.usage.input_tokens,
        "output_tokens": response.usage.output_tokens,
        "model": model,
        "cache_hit": False
    }
 
    # Store in cache
    redis_client.setex(cache_key, CACHE_TTL, json.dumps(result))
    return result
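
The second level, semantic similarity, can be sketched as a lookup over stored embeddings. This is an in-memory illustration, not a production design: `SemanticCache`, `toy_embed`, and the 0.95 threshold are all assumptions made for the example, and the toy embedding only counts words from a fixed vocabulary. A real deployment would call an embedding model and store vectors in a vector index instead of scanning a list.

```python
import math
import re
from typing import Callable, Optional


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors; 0.0 if either is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


class SemanticCache:
    """Return a stored response when a new prompt is close enough to an old one."""

    def __init__(self, embed_fn: Callable[[str], list[float]], threshold: float = 0.95):
        self.embed_fn = embed_fn
        self.threshold = threshold  # ASSUMPTION: tune per workload
        self.entries: list[tuple[list[float], str]] = []

    def get(self, prompt: str) -> Optional[str]:
        query = self.embed_fn(prompt)
        best_score, best_response = 0.0, None
        for vector, response in self.entries:  # linear scan, fine for a sketch
            score = cosine_similarity(query, vector)
            if score > best_score:
                best_score, best_response = score, response
        return best_response if best_score >= self.threshold else None

    def put(self, prompt: str, response: str) -> None:
        self.entries.append((self.embed_fn(prompt), response))


# ASSUMPTION: toy embedding for demonstration, counts words from a tiny vocabulary
VOCAB = ["what", "is", "kubernetes", "explain", "database", "sharding"]


def toy_embed(text: str) -> list[float]:
    words = re.findall(r"[a-z]+", text.lower())
    return [float(words.count(term)) for term in VOCAB]


cache = SemanticCache(toy_embed)
cache.put("What is Kubernetes?", "Kubernetes is a container orchestrator.")
hit = cache.get("what is kubernetes")
miss = cache.get("Explain database sharding")
print(hit, miss)
```

A threshold that is too low returns answers to questions the user did not ask, so it is safer to start strict and loosen it only after reviewing logged near-misses.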

Cache hit rates to aim for:

  • Static content (documentation Q&A, FAQ): 60–80% hit rate
  • Dynamic user queries: 15–30% hit rate (still meaningful at scale)
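
To know whether you are reaching those targets, count hits and misses. The sketch below assumes each result dict carries a `cache_hit` flag, as in the caching code above; `CacheStats` is a hypothetical helper that keeps counts in process memory, so a multi-instance service would need shared counters instead.

```python
from collections import Counter


class CacheStats:
    """Tally cache hits and misses and report the hit rate."""

    def __init__(self) -> None:
        self.counts: Counter = Counter()

    def record(self, result: dict) -> None:
        # A missing flag is treated as a miss
        self.counts["hit" if result.get("cache_hit") else "miss"] += 1

    @property
    def hit_rate(self) -> float:
        total = self.counts["hit"] + self.counts["miss"]
        return self.counts["hit"] / total if total else 0.0


stats = CacheStats()
for was_hit in [True, False, False, True, False, False, False, True, False, False]:
    stats.record({"cache_hit": was_hit})
print(f"hit rate: {stats.hit_rate:.0%}")
```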

Step 4: Tiered Model Routing

Use cheap models for simple requests, escalate to powerful models only when needed:

python
def classify_request_complexity(user_message: str) -> str:
    """
    Classify request complexity to choose the right model tier.
    Returns: 'simple', 'medium', or 'complex'
    """
    # Simple heuristics first (no API call)
    word_count = len(user_message.split())
 
    simple_patterns = [
        "what is", "define", "how many", "when was",
        "yes or no", "true or false", "summarize in one"
    ]
    complex_patterns = [
        "analyze", "compare", "design", "architecture",
        "debug", "refactor", "write a", "implement"
    ]
 
    text_lower = user_message.lower()
 
    if word_count < 20 and any(p in text_lower for p in simple_patterns):
        return "simple"
    elif any(p in text_lower for p in complex_patterns) or word_count > 100:
        return "complex"
    else:
        return "medium"
 
 
MODEL_TIERS = {
    "simple": "claude-haiku-4-5-20251001",    # Fastest, cheapest
    "medium": "claude-sonnet-4-5",             # Balanced
    "complex": "claude-opus-4-5"               # Most capable
}
 
# Approximate cost in USD per 1M tokens (input/output); verify against current pricing
TIER_COSTS = {
    "claude-haiku-4-5-20251001":  {"input": 1.00,   "output": 5.00},
    "claude-sonnet-4-5":          {"input": 3.00,   "output": 15.00},
    "claude-opus-4-5":            {"input": 5.00,   "output": 25.00}
}
 
 
def routed_llm_call(
    system_prompt: str,
    user_message: str,
    force_tier: Optional[str] = None
) -> dict:
    """Route to appropriate model tier based on request complexity."""
    complexity = force_tier or classify_request_complexity(user_message)
    model = MODEL_TIERS[complexity]
    messages = [{"role": "user", "content": user_message}]
 
    result = cached_llm_call(system_prompt, messages, model=model)
    result["complexity_tier"] = complexity
    result["model_used"] = model
 
    return result
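
It is worth estimating what routing saves before building it. The sketch below compares a blended per-request cost against sending everything to the top tier. Every number in it is an assumption chosen for illustration: the prices, the 60/30/10 traffic mix, and the average request size are not measurements.

```python
# ASSUMPTION: illustrative (input, output) prices in USD per 1M tokens
PRICES = {
    "simple": (1.00, 5.00),
    "medium": (3.00, 15.00),
    "complex": (5.00, 25.00),
}

# ASSUMPTION: hypothetical traffic mix and average request size
MIX = {"simple": 0.6, "medium": 0.3, "complex": 0.1}
AVG_INPUT_TOKENS = 1000
AVG_OUTPUT_TOKENS = 300


def cost_per_request(tier: str) -> float:
    """Cost in USD of one average-sized request on the given tier."""
    input_price, output_price = PRICES[tier]
    return (
        AVG_INPUT_TOKENS / 1_000_000 * input_price
        + AVG_OUTPUT_TOKENS / 1_000_000 * output_price
    )


def blended_cost(mix: dict[str, float]) -> float:
    """Expected cost per request when traffic is split across tiers."""
    return sum(share * cost_per_request(tier) for tier, share in mix.items())


routed = blended_cost(MIX)
unrouted = cost_per_request("complex")
savings = 1 - routed / unrouted
print(f"routed=${routed:.4f} unrouted=${unrouted:.4f} savings={savings:.0%}")
```

The result is only as good as the mix estimate, so log the `complexity_tier` field for a week and rerun the numbers with observed shares.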

Step 5: Cost Attribution and Budget Alerts

Track spend per feature, per user, and per model to catch runaway costs early:

python
from datetime import date
 
def track_llm_spend(
    feature: str,
    user_id: str,
    result: dict
) -> float:
    """Track cost in Redis and return cost for this call."""
    # Cache hits made no API call, so they cost nothing
    if result.get("cache_hit"):
        return 0.0
 
    model = result["model"]
    costs = TIER_COSTS.get(model, {"input": 3.0, "output": 15.0})
 
    cost_usd = (
        result["input_tokens"] / 1_000_000 * costs["input"] +
        result["output_tokens"] / 1_000_000 * costs["output"]
    )
 
    today = date.today().isoformat()
 
    # Increment daily spend counters
    redis_client.incrbyfloat(f"llm:cost:daily:{today}", cost_usd)
    redis_client.incrbyfloat(f"llm:cost:feature:{feature}:{today}", cost_usd)
    redis_client.incrbyfloat(f"llm:cost:user:{user_id}:{today}", cost_usd)
 
    # Set TTL so old data expires
    for key in [
        f"llm:cost:daily:{today}",
        f"llm:cost:feature:{feature}:{today}",
        f"llm:cost:user:{user_id}:{today}"
    ]:
        redis_client.expire(key, 86400 * 30)  # 30 days
 
    return cost_usd
 
 
def check_budget_alert(daily_budget_usd: float = 100.0) -> None:
    """Alert once daily spend passes 80% of the budget."""
    today = date.today().isoformat()
    current_spend = float(redis_client.get(f"llm:cost:daily:{today}") or 0)
 
    if current_spend > daily_budget_usd * 0.8:
        print(f"BUDGET ALERT: Daily LLM spend at ${current_spend:.2f} "
              f"({current_spend/daily_budget_usd*100:.0f}% of ${daily_budget_usd} budget)")
        # Send to your alerting system (PagerDuty, Slack, etc.)
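
The per-feature counters can be read back to feed a dashboard. This is a minimal sketch that assumes the `llm:cost:feature:<feature>:<day>` key layout used above and a client created with `decode_responses=True`, so keys and values arrive as strings. `spend_by_feature` is a hypothetical helper, and `FakeRedis` exists only so the example runs without a server; with redis-py you would pass the real client, which provides `scan_iter` and `get`.

```python
import fnmatch
from collections import defaultdict


def spend_by_feature(redis_client, day: str) -> dict[str, float]:
    """Sum per-feature spend for one day, highest spender first."""
    prefix = "llm:cost:feature:"
    suffix = f":{day}"
    totals: dict[str, float] = defaultdict(float)
    for key in redis_client.scan_iter(match=f"{prefix}*{suffix}"):
        feature = key[len(prefix):-len(suffix)]
        totals[feature] += float(redis_client.get(key) or 0)
    return dict(sorted(totals.items(), key=lambda item: item[1], reverse=True))


class FakeRedis:
    """Stand-in for a Redis client, for local demonstration only."""

    def __init__(self, data: dict[str, str]) -> None:
        self.data = data

    def scan_iter(self, match: str):
        return (k for k in self.data if fnmatch.fnmatchcase(k, match))

    def get(self, key: str):
        return self.data.get(key)


fake = FakeRedis({
    "llm:cost:feature:search:2025-01-15": "12.5",
    "llm:cost:feature:chat:2025-01-15": "40.25",
    "llm:cost:feature:chat:2025-01-14": "3.0",
    "llm:cost:daily:2025-01-15": "52.75",
})
report = spend_by_feature(fake, "2025-01-15")
print(report)
```

Scanning keys is acceptable for an occasional report; for a dashboard that refreshes often, writing the counters into a single hash per day would avoid the scan.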

Putting It Together

python
def production_llm_call(
    feature: str,
    user_id: str,
    system_prompt: str,
    user_message: str
) -> str:
    """Full production LLM call with all cost controls applied."""
 
    # 1. Compress prompt
    compressed_system = compress_prompt(system_prompt)
 
    # 2. Route to right model tier
    result = routed_llm_call(compressed_system, user_message)
 
    # 3. Track spend
    cost = track_llm_spend(feature, user_id, result)
 
    # 4. Check budget
    check_budget_alert(daily_budget_usd=200.0)
 
    print(f"[{feature}] user={user_id} model={result['model_used']} "
          f"tokens={result['input_tokens']}+{result['output_tokens']} "
          f"cost=${cost:.5f} cache={'HIT' if result['cache_hit'] else 'MISS'}")
 
    return result["text"]

Expected Savings

Applying these techniques on a typical production LLM feature handling 100k requests/day:

Optimization | Token reduction | Cost reduction
Prompt compression | 10–20% | 10–20%
Exact-match caching | 20–40% cache hit | 20–40%
Tiered model routing | 60% simple requests → Haiku | 50–70%
History summarization | 30–50% on chat features | 30–50%
Combined | | 60–80% total

Start with tiered model routing — it gives the biggest impact with the least code.
