
Build an AI Cloud Cost Spike Detector with Claude API and Prometheus

Automatically detect unusual cloud cost spikes, identify the cause, and get an AI-generated explanation and recommended fix using Claude API and Prometheus metrics.

DevOpsBoys · 6 min read

Cloud cost spikes are often discovered too late, when the monthly bill arrives. This tool pulls daily AWS costs from the Cost Explorer API (optionally exported to Prometheus), flags unusual day-over-day increases, and uses the Claude API to explain what likely happened and suggest a fix.

Architecture

AWS Cost Explorer API → Prometheus (via exporter) → Cost Spike Detector
                                                            ↓
                                              Claude API (analysis)
                                                            ↓
                                              Slack alert with explanation

Setup

bash
pip install anthropic boto3 requests prometheus-api-client slack-sdk python-dotenv
bash
# .env
ANTHROPIC_API_KEY=sk-ant-...
AWS_ACCESS_KEY_ID=...
AWS_SECRET_ACCESS_KEY=...
AWS_REGION=us-east-1
SLACK_WEBHOOK_URL=https://hooks.slack.com/services/...
PROMETHEUS_URL=http://prometheus.monitoring:9090
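
Before running anything, it may help to fail fast when a setting is missing. The sketch below is one way to check the variables from the .env file above. The helper name `missing_settings` and the choice of which variables count as required are assumptions for illustration, not part of the original tool (for example, AWS keys are unnecessary when credentials come from an IAM role).

```python
import os
from typing import Mapping

# Assumption: these three are treated as required. The Slack webhook is left
# out because the runner falls back to printing the analysis without it.
REQUIRED_SETTINGS = ("ANTHROPIC_API_KEY", "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")


def missing_settings(env: Mapping[str, str] = os.environ) -> list[str]:
    """Return the names of required settings that are unset or blank."""
    return [name for name in REQUIRED_SETTINGS if not env.get(name, "").strip()]
```

Calling `missing_settings()` after `load_dotenv()` and exiting when the returned list is non-empty would surface configuration problems before any API call is made.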

Step 1: Fetch Cost Data from AWS

python
import boto3
from datetime import datetime, timedelta
import json
 
ce_client = boto3.client('ce', region_name='us-east-1')
 
def get_daily_costs(days_back: int = 30) -> list[dict]:
    """Get daily cost breakdown by service for the past N days."""
    end = datetime.now().date()
    start = (datetime.now() - timedelta(days=days_back)).date()
    
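    request = {
        'TimePeriod': {
            'Start': str(start),
            'End': str(end)  # End date is exclusive
        },
        'Granularity': 'DAILY',
        'Metrics': ['UnblendedCost'],
        'GroupBy': [
            {'Type': 'DIMENSION', 'Key': 'SERVICE'},
        ]
    }

    # Grouped results can span several pages; follow NextPageToken to the end
    results = []
    while True:
        response = ce_client.get_cost_and_usage(**request)
        results.extend(response['ResultsByTime'])
        if not response.get('NextPageToken'):
            break
        request['NextPageToken'] = response['NextPageToken']

    # Merge by date in case one day's groups arrive on different pages
    services_by_date = {}
    for result in results:
        services = services_by_date.setdefault(result['TimePeriod']['Start'], {})
        for group in result['Groups']:
            service = group['Keys'][0]
            amount = float(group['Metrics']['UnblendedCost']['Amount'])
            services[service] = services.get(service, 0.0) + amount

    daily_costs = []
    for date, services in sorted(services_by_date.items()):
        daily_costs.append({
            'date': date,
            'total': round(sum(services.values()), 2),
            'services': services
        })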
    
    return daily_costs
 
 
def detect_spikes(costs: list[dict], threshold_pct: float = 30.0) -> list[dict]:
    """Detect days where cost increased more than threshold_pct vs previous day."""
    spikes = []
    
    for i in range(1, len(costs)):
        prev = costs[i-1]
        curr = costs[i]
        
        if prev['total'] == 0:
            continue
        
        pct_change = ((curr['total'] - prev['total']) / prev['total']) * 100
        
        if pct_change >= threshold_pct:
            # Find which services caused the spike
            service_deltas = {}
            for service, amount in curr['services'].items():
                prev_amount = prev['services'].get(service, 0)
                delta = amount - prev_amount
                if delta > 1.0:  # only show services with >$1 increase
                    service_deltas[service] = {
                        'prev': round(prev_amount, 2),
                        'curr': round(amount, 2),
                        'delta': round(delta, 2),
                        'pct_change': round(((amount - prev_amount) / max(prev_amount, 0.01)) * 100, 1)
                    }
            
            spikes.append({
                'date': curr['date'],
                'prev_cost': prev['total'],
                'curr_cost': curr['total'],
                'pct_increase': round(pct_change, 1),
                'absolute_increase': round(curr['total'] - prev['total'], 2),
                'service_deltas': dict(sorted(service_deltas.items(), 
                                               key=lambda x: x[1]['delta'], 
                                               reverse=True)[:5])
            })
    
    return spikes
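
The functions above read Cost Explorer directly. If the daily costs are also exported to Prometheus, as in the architecture diagram, the detector could read them from there instead. The sketch below assumes a hypothetical gauge named `aws_daily_cost_usd` with a `service` label (the real metric name depends on the exporter you run) and uses Prometheus's `/api/v1/query_range` HTTP endpoint. It converts the response into the same list-of-dicts shape that `get_daily_costs` returns, so `detect_spikes` could consume it unchanged.

```python
import json
import urllib.parse
import urllib.request
from datetime import datetime, timezone


def fetch_cost_matrix(prometheus_url: str, start: int, end: int,
                      metric: str = "aws_daily_cost_usd") -> dict:
    """Query one sample per day between two Unix timestamps.

    The default metric name is hypothetical; replace it with your exporter's.
    """
    params = urllib.parse.urlencode(
        {"query": metric, "start": start, "end": end, "step": "1d"}
    )
    url = f"{prometheus_url}/api/v1/query_range?{params}"
    with urllib.request.urlopen(url, timeout=10) as resp:
        return json.load(resp)


def matrix_to_daily_costs(payload: dict) -> list[dict]:
    """Convert a Prometheus matrix result into get_daily_costs-style records."""
    services_by_date: dict[str, dict[str, float]] = {}
    for series in payload["data"]["result"]:
        # Assumption: the exporter attaches the AWS service as a 'service' label
        service = series["metric"].get("service", "unknown")
        for timestamp, value in series["values"]:
            day = datetime.fromtimestamp(float(timestamp), tz=timezone.utc).date().isoformat()
            services_by_date.setdefault(day, {})[service] = float(value)
    return [
        {"date": day, "total": round(sum(services.values()), 2), "services": services}
        for day, services in sorted(services_by_date.items())
    ]
```

With this in place, `detect_spikes(matrix_to_daily_costs(fetch_cost_matrix(url, start, end)))` would run the same day-over-day comparison on Prometheus data.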

Step 2: AI Analysis with Claude

python
from anthropic import Anthropic
import re
 
client = Anthropic()
 
ANALYSIS_PROMPT = """You are a FinOps expert analyzing AWS cloud cost spikes.
 
I have detected an unusual cost increase in our AWS account. Analyze this data and provide:
 
1. ROOT CAUSE: What most likely caused this spike?
2. SEVERITY: How urgent is this? (P1-Critical, P2-High, P3-Medium, P4-Low)  
3. RECOMMENDATIONS: 3-5 specific actions to investigate and potentially fix this
4. PREVENTION: How to prevent this from happening again
 
Cost Data:
{cost_data}
 
Additional Context:
- Company: DevOps startup
- Environment: Production + Development
- Typical daily spend: ${typical_spend}/day
 
Respond in a clear, actionable format. Be specific about which AWS services and features to check.
Include specific AWS console navigation paths or CLI commands where helpful."""
 
 
def analyze_spike_with_ai(spike: dict, typical_daily_spend: float = 100.0) -> dict:
    """Use Claude to analyze a cost spike and generate recommendations."""
    
    cost_data = json.dumps({
        'date': spike['date'],
        'previous_day_cost': f"${spike['prev_cost']:.2f}",
        'spike_cost': f"${spike['curr_cost']:.2f}",
        'increase': f"${spike['absolute_increase']:.2f} ({spike['pct_increase']}% increase)",
        'top_contributors': {
            service: f"+${data['delta']:.2f} ({data['pct_change']}% change, from ${data['prev']:.2f} to ${data['curr']:.2f})"
            for service, data in spike['service_deltas'].items()
        }
    }, indent=2)
    
    prompt = ANALYSIS_PROMPT.format(
        cost_data=cost_data,
        typical_spend=typical_daily_spend
    )
    
    response = client.messages.create(
        model="claude-opus-4-8",
        max_tokens=1500,
        messages=[{"role": "user", "content": prompt}]
    )
    
    analysis_text = response.content[0].text
    
    # Extract severity
    severity_match = re.search(r'P[1-4][-\s]?(Critical|High|Medium|Low)', analysis_text)
    severity = severity_match.group(0) if severity_match else "P3-Medium"
    
    return {
        'analysis': analysis_text,
        'severity': severity,
        'input_tokens': response.usage.input_tokens,
        'output_tokens': response.usage.output_tokens,
    }
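
The severity regex above only matches when the model writes the label in one exact shape, such as `P2-High`; anything else silently falls back to `P3-Medium`. A slightly more tolerant parser is sketched below. The function name `extract_severity` and the normalized `Pn-Name` output are assumptions for illustration.

```python
import re

SEVERITY_NAMES = {"P1": "Critical", "P2": "High", "P3": "Medium", "P4": "Low"}


def extract_severity(text: str, default: str = "P3-Medium") -> str:
    """Return the first P1-P4 rating as 'Pn-Name', preferring the SEVERITY line."""
    # Look at the text right after a 'SEVERITY' heading if there is one,
    # otherwise fall back to scanning the whole response.
    section = re.search(r"SEVERITY\W*(.*)", text, flags=re.IGNORECASE)
    haystack = section.group(1) if section else text

    # \b avoids matching inside longer tokens such as 'P10' or 'HTTP2'
    match = re.search(r"\bP([1-4])\b", haystack)
    if not match:
        return default
    code = f"P{match.group(1)}"
    return f"{code}-{SEVERITY_NAMES[code]}"
```

Because the result is always normalized to the same four strings, the `severity[:2]` lookup used for the Slack emoji keeps working whatever punctuation the model chose.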

Step 3: Slack Notification

python
import requests
 
def send_slack_alert(spike: dict, analysis: dict, webhook_url: str):
    """Send formatted Slack alert with cost spike analysis."""
    
    severity = analysis['severity']
    severity_emoji = {
        'P1': '🔴', 'P2': '🟠', 'P3': '🟡', 'P4': '🟢'
    }.get(severity[:2], '🟡')
    
    # Top service contributors
    top_services = "\n".join([
        f"  • *{service}*: +${data['delta']:.2f} ({data['pct_change']}% ↑)"
        for service, data in list(spike['service_deltas'].items())[:3]
    ])
    
    # Trim analysis to fit Slack
    short_analysis = analysis['analysis'][:1200] + "..." if len(analysis['analysis']) > 1200 else analysis['analysis']
    
    message = {
        "blocks": [
            {
                "type": "header",
                "text": {
                    "type": "plain_text",
                    "text": f"{severity_emoji} AWS Cost Spike Detected — {spike['date']}"
                }
            },
            {
                "type": "section",
                "fields": [
                    {"type": "mrkdwn", "text": f"*Severity:*\n{severity}"},
                    {"type": "mrkdwn", "text": f"*Increase:*\n${spike['absolute_increase']:.2f} (+{spike['pct_increase']}%)"},
                    {"type": "mrkdwn", "text": f"*Previous Day:*\n${spike['prev_cost']:.2f}"},
                    {"type": "mrkdwn", "text": f"*Spike Day:*\n${spike['curr_cost']:.2f}"},
                ]
            },
            {
                "type": "section",
                "text": {"type": "mrkdwn", "text": f"*Top Cost Contributors:*\n{top_services}"}
            },
            {
                "type": "section",
                "text": {"type": "mrkdwn", "text": f"*AI Analysis:*\n{short_analysis}"}
            }
        ]
    }
    
    # A timeout keeps the job from hanging if Slack is unreachable
    response = requests.post(webhook_url, json=message, timeout=10)
    return response.status_code == 200
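
Claude's analysis often contains characters such as `<`, `>` and `&` (for example in CLI commands), which Slack treats as control characters in message text. Slack's formatting guidance is to replace them with HTML entities. The sketch below truncates first and escapes afterwards, so an entity is never cut in half. The helper name `to_slack_text` is an assumption for illustration.

```python
def to_slack_text(text: str, limit: int = 1200) -> str:
    """Truncate text to `limit` characters, then escape Slack control characters."""
    if len(text) > limit:
        text = text[:limit] + "..."
    # '&' must be replaced first so the entities added next are not re-escaped
    return text.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
```

It could stand in for the manual slicing, for example `short_analysis = to_slack_text(analysis['analysis'])`. Escaping can lengthen the text slightly, so the limit is best kept well below Slack's per-block maximum.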

Step 4: Main Runner

python
import os
from dotenv import load_dotenv
 
load_dotenv()
 
def run_cost_spike_detector():
    """Main function to detect and alert on cost spikes."""
    print("Fetching AWS cost data...")
    costs = get_daily_costs(days_back=14)
    
    if len(costs) < 2:
        print("Not enough data")
        return
    
    # Calculate typical spend (7-day average excluding last 2 days)
    recent_costs = costs[-9:-2]
    typical_spend = sum(c['total'] for c in recent_costs) / len(recent_costs) if recent_costs else 100
    
    print(f"Typical daily spend: ${typical_spend:.2f}")
    
    # Detect spikes in last 7 days
    recent = costs[-8:]
    spikes = detect_spikes(recent, threshold_pct=25.0)
    
    if not spikes:
        print("No significant cost spikes detected")
        return
    
    print(f"Found {len(spikes)} cost spike(s)")
    
    webhook_url = os.environ.get("SLACK_WEBHOOK_URL")
    
    for spike in spikes:
        print(f"\nAnalyzing spike on {spike['date']} (+{spike['pct_increase']}%)...")
        
        analysis = analyze_spike_with_ai(spike, typical_spend)
        
        print(f"Severity: {analysis['severity']}")
        
        if webhook_url:
            sent = send_slack_alert(spike, analysis, webhook_url)
            print(f"Slack alert sent: {sent}")
        else:
            print("\nAI Analysis:")
            print(analysis['analysis'])
 
 
if __name__ == "__main__":
    run_cost_spike_detector()
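
Because the runner rescans the last seven days on every run, a daily schedule would report the same spike several days in a row. One way to avoid that is to remember which dates have already been alerted. The sketch below keeps them in a small JSON file. The file path and helper names are hypothetical, and in a container the file would need to live on persistent storage.

```python
import json
from pathlib import Path


def load_alerted_dates(state_file: Path) -> set[str]:
    """Read the set of already-alerted dates; empty if the file is absent."""
    if not state_file.exists():
        return set()
    return set(json.loads(state_file.read_text()))


def filter_new_spikes(spikes: list[dict], state_file: Path) -> list[dict]:
    """Return spikes whose date was not alerted before, and record those dates.

    Note: dates are recorded before the alert is sent, so a failed Slack
    call would not be retried on the next run.
    """
    seen = load_alerted_dates(state_file)
    new_spikes = [spike for spike in spikes if spike["date"] not in seen]
    if new_spikes:
        seen.update(spike["date"] for spike in new_spikes)
        state_file.write_text(json.dumps(sorted(seen)))
    return new_spikes
```

In the runner, `spikes = filter_new_spikes(spikes, Path("alerted_dates.json"))` placed right after `detect_spikes` would limit each spike to a single alert.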

Kubernetes CronJob to Run Daily

yaml
apiVersion: batch/v1
kind: CronJob
metadata:
  name: cost-spike-detector
  namespace: finops
spec:
  schedule: "0 9 * * *"  # Run daily at 09:00 (kube-controller-manager time zone unless spec.timeZone is set)
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: detector
              image: your-org/cost-spike-detector:latest
              envFrom:
                - secretRef:
                    name: cost-detector-secrets
          restartPolicy: OnFailure

Example Output

Found 1 cost spike(s)

Analyzing spike on 2026-06-20 (+47.3%)...

ROOT CAUSE: The spike is primarily driven by EC2 ($234 increase) and 
Data Transfer ($89 increase). The EC2 spike suggests either:
1. A new instance was launched and not yet tagged (check EC2 console for 
   instances launched on June 20)
2. A development environment was accidentally left running over the weekend
3. Auto Scaling scaled out but didn't scale back in

SEVERITY: P2-High

RECOMMENDATIONS:
1. Run: aws ec2 describe-instances --filters "Name=launch-time,Values=2026-06-20*" 
   to find instances launched that day
2. Check EC2 Cost Allocation tags — look for untagged or dev instances in production
3. Review Auto Scaling activity log for scale-out events
4. Check for any scheduled ECS tasks or Lambda invocations that ran unexpectedly
5. Review Data Transfer costs: high transfer often correlates with cross-AZ traffic 
   from new instances in a different AZ

PREVENTION:
- Set AWS Budgets alert at 120% of daily average
- Enforce tagging with AWS Config rule: required-tags
- Use Instance Scheduler to stop dev environments outside work hours

Cost spike detection plus AI analysis gives you a first answer to the "what happened?" question before anyone opens the console. For teams spending more than $1,000/month on AWS, the time saved on a single incident investigation can pay for weeks of Claude API usage.
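
To put a rough number on the API cost per analysis, the token counts returned by `analyze_spike_with_ai` can be multiplied by the per-token prices. The prices in the sketch below are placeholders, not real Anthropic pricing; substitute the current rates for the model you use. The function name is likewise an assumption for illustration.

```python
def estimate_analysis_cost(input_tokens: int, output_tokens: int,
                           input_price_per_mtok: float,
                           output_price_per_mtok: float) -> float:
    """Estimate the USD cost of one call from token counts and per-million-token prices."""
    return (input_tokens * input_price_per_mtok
            + output_tokens * output_price_per_mtok) / 1_000_000


# Placeholder prices for illustration only; check the provider's pricing page
cost = estimate_analysis_cost(600, 1500, input_price_per_mtok=10.0, output_price_per_mtok=50.0)
print(f"Estimated cost per analysis: ${cost:.4f}")
```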

Resources: AWS Cost Explorer API | Anthropic API | AWS Budgets
