Build an AI Cloud Cost Anomaly Detector with Python

Cloud costs spike without warning. Build a Python bot that pulls AWS cost data daily, detects anomalies using statistical analysis, and explains the spike using Claude AI.

DevOpsBoys · Jun 6, 2026 · 5 min read

AWS sends you a bill at the end of the month, and by then it's too late. This bot flags cost spikes within about a day and suggests the most likely cause for each one.


What We're Building

Daily cron → pull AWS Cost Explorer data → detect anomalies → Claude explains → Slack alert:

"🚨 COST SPIKE DETECTED — June 6, 2026

Service: Amazon EC2
Yesterday: $45.20
7-day average: $12.30
Spike: +267%

Claude Analysis:
This spike likely indicates EC2 instances were not stopped after
a load test. 3 x m5.2xlarge instances running ~18 hours at
$0.384/hr each = ~$20.70 unexpected cost. Check Auto Scaling
groups and instance scheduler in us-east-1."
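
The figures in the sample alert can be checked by hand. A minimal sketch, using only the numbers quoted in the alert (the variable names are illustrative):

```python
# Numbers taken from the sample alert.
yesterday_cost = 45.20
avg_cost = 12.30

# Percentage increase over the 7-day average.
pct_change = (yesterday_cost - avg_cost) / avg_cost * 100

# The estimate in the analysis text: 3 instances x ~18 hours x $0.384/hr.
load_test_cost = 3 * 18 * 0.384

print(f"Spike: +{pct_change:.0f}%")
print(f"Estimated load-test cost: ${load_test_cost:.2f}")
```

The load-test estimate covers only part of the $32.90 gap between yesterday's cost and the average, so the rest of the increase would still need investigating.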

Setup

bash
pip install anthropic boto3 slack-sdk pandas scipy python-dotenv

AWS permissions needed:

json
{
  "Version": "2012-10-17",
  "Statement": [{
    "Effect": "Allow",
    "Action": [
      "ce:GetCostAndUsage",
      "ce:GetDimensionValues"
    ],
    "Resource": "*"
  }]
}

Step 1: Pull Cost Data from AWS

python
# cost_fetcher.py
from datetime import datetime, timedelta, timezone

import boto3
import pandas as pd

# The Cost Explorer API endpoint lives in us-east-1.
ce = boto3.client('ce', region_name='us-east-1')


def _date_window(days: int) -> dict:
    """Return a Cost Explorer TimePeriod covering the last N full days (UTC)."""
    today = datetime.now(timezone.utc).date()
    # 'End' is exclusive, so ending today makes yesterday the last day returned.
    return {
        'Start': (today - timedelta(days=days)).strftime('%Y-%m-%d'),
        'End': today.strftime('%Y-%m-%d'),
    }


def get_daily_costs_by_service(days: int = 30) -> pd.DataFrame:
    """Get daily costs per service for the last N days."""
    # Note: only the first page of results is read (NextPageToken is ignored).
    response = ce.get_cost_and_usage(
        TimePeriod=_date_window(days),
        Granularity='DAILY',
        Metrics=['UnblendedCost'],
        GroupBy=[{'Type': 'DIMENSION', 'Key': 'SERVICE'}]
    )

    rows = []
    for result in response['ResultsByTime']:
        date = result['TimePeriod']['Start']
        for group in result['Groups']:
            service = group['Keys'][0]
            cost = float(group['Metrics']['UnblendedCost']['Amount'])
            rows.append({'date': date, 'service': service, 'cost': cost})

    # Explicit columns keep the frame well-formed even when no rows come back.
    df = pd.DataFrame(rows, columns=['date', 'service', 'cost'])
    df['date'] = pd.to_datetime(df['date'])
    return df


def get_total_daily_costs(days: int = 30) -> pd.DataFrame:
    """Get total daily costs (all services combined)."""
    response = ce.get_cost_and_usage(
        TimePeriod=_date_window(days),
        Granularity='DAILY',
        Metrics=['UnblendedCost']
    )

    rows = []
    for result in response['ResultsByTime']:
        date = result['TimePeriod']['Start']
        cost = float(result['Total']['UnblendedCost']['Amount'])
        rows.append({'date': date, 'cost': cost})

    df = pd.DataFrame(rows, columns=['date', 'cost'])
    df['date'] = pd.to_datetime(df['date'])
    return df
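
Both functions read a single response page. Cost Explorer can return a `NextPageToken` when a grouped result set is large, in which case later pages would be silently dropped. A minimal sketch of following that token, assuming `ce_client` is the boto3 `ce` client; `fetch_all_pages` is a hypothetical helper name, not part of boto3:

```python
def fetch_all_pages(ce_client, **request) -> list[dict]:
    """Collect ResultsByTime across every page of get_cost_and_usage."""
    results = []
    token = None
    while True:
        kwargs = dict(request)
        if token:
            # Ask for the page that follows the previous response.
            kwargs['NextPageToken'] = token
        response = ce_client.get_cost_and_usage(**kwargs)
        results.extend(response['ResultsByTime'])
        token = response.get('NextPageToken')
        if not token:
            return results
```

The returned list can be iterated in place of `response['ResultsByTime']`. Each extra page is a separate API request.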

Step 2: Anomaly Detection

python
# anomaly_detector.py
import pandas as pd
 
def detect_anomalies(df: pd.DataFrame, 
                     lookback_days: int = 7,
                     z_threshold: float = 2.0) -> list[dict]:
    """
    Detect cost anomalies using Z-score against rolling average.
    Returns list of anomalous services with spike details.
    """
    anomalies = []
    
    # Get yesterday's costs
    yesterday = df['date'].max()
    
    for service in df['service'].unique():
        service_df = df[df['service'] == service].sort_values('date')
        
        if len(service_df) < lookback_days + 1:
            continue
        
        # Yesterday's cost
        yesterday_row = service_df[service_df['date'] == yesterday]
        if yesterday_row.empty:
            continue
        
        yesterday_cost = yesterday_row['cost'].iloc[0]
        
        # Skip tiny costs (noise)
        if yesterday_cost < 1.0:
            continue
        
        # Rolling average (exclude yesterday)
        historical = service_df[service_df['date'] < yesterday].tail(lookback_days)
        avg_cost = historical['cost'].mean()
        std_cost = historical['cost'].std()
        
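        if avg_cost == 0:
            continue  # no baseline to compare against
        
        # Z-score; with a perfectly flat baseline (std 0) any increase counts as extreme
        if std_cost == 0:
            z_score = float('inf') if yesterday_cost > avg_cost else 0.0
        else:
            z_score = (yesterday_cost - avg_cost) / std_cost
        pct_change = ((yesterday_cost - avg_cost) / avg_cost) * 100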
        
        if z_score > z_threshold and pct_change > 50:
            anomalies.append({
                'service': service,
                'yesterday_cost': round(yesterday_cost, 2),
                'avg_cost': round(avg_cost, 2),
                'pct_change': round(pct_change, 1),
                'z_score': round(z_score, 2),
                'historical_costs': historical['cost'].tolist()
            })
    
    # Sort by severity
    return sorted(anomalies, key=lambda x: x['pct_change'], reverse=True)
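
To see why the detector requires both a high Z-score and a 50% increase, it helps to run the numbers on one service. The sketch below uses the standard library instead of pandas. The 7-day history is made up for illustration (chosen to average $12.30), and `score` is a hypothetical helper that mirrors the two conditions in `detect_anomalies`:

```python
from statistics import mean, stdev

# Hypothetical 7-day history for one service, chosen to average $12.30.
history = [11.8, 12.1, 12.9, 12.4, 12.0, 12.6, 12.3]


def score(history, cost, z_threshold=2.0, min_pct=50.0):
    """Return (z_score, pct_change, is_anomaly) for one day's cost."""
    avg = mean(history)
    std = stdev(history)  # sample standard deviation, like pandas' .std()
    z = (cost - avg) / std
    pct = (cost - avg) / avg * 100
    return z, pct, (z > z_threshold and pct > min_pct)


for cost in (45.20, 13.50):
    z, pct, flagged = score(history, cost)
    print(f"cost=${cost:.2f} z={z:.1f} pct={pct:+.0f}% anomaly={flagged}")
```

With a baseline this steady, a $13.50 day already clears the Z-score threshold, yet it is under 10% above average. The percentage condition keeps it from being flagged, while the $45.20 day passes both checks.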

Step 3: Claude AI Analysis

python
# analyzer.py
import anthropic
import json
import os
 
client = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
 
def analyze_cost_spike(anomaly: dict, account_context: str = "") -> str:
    """Use Claude to explain likely cause of cost spike."""
    
    prompt = f"""You are an AWS FinOps expert analyzing a cloud cost anomaly.
 
Service: {anomaly['service']}
Yesterday's cost: ${anomaly['yesterday_cost']}
7-day average cost: ${anomaly['avg_cost']}
Increase: +{anomaly['pct_change']}%
Z-score: {anomaly['z_score']}
Historical daily costs (last 7 days): {anomaly['historical_costs']}
 
{f"Account context: {account_context}" if account_context else ""}
 
Based on this spike pattern for {anomaly['service']}, provide:
1. Most likely cause (1-2 sentences — be specific about what AWS resources/actions cause this)
2. Where to look in AWS Console to investigate
3. Recommended action (stop, review, or accept?)
 
Keep response under 150 words. Be direct and actionable."""
 
    response = client.messages.create(
        model="claude-haiku-4-5-20251001",
        max_tokens=300,
        messages=[{"role": "user", "content": prompt}]
    )
    
    return response.content[0].text
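
If the Claude call raises (a network error, a rate limit, a missing API key), the exception would propagate and no alert would go out at all. A minimal sketch of a guard around the call; `analyze_with_fallback` is a hypothetical wrapper, and the fallback wording is only a suggestion:

```python
from typing import Callable


def analyze_with_fallback(anomaly: dict, analyze: Callable[[dict], str]) -> str:
    """Return the model's explanation, or a plain notice if the call fails."""
    try:
        return analyze(anomaly)
    except Exception as exc:  # e.g. network errors, rate limits, bad API key
        return (
            f"(Automatic analysis unavailable: {type(exc).__name__}. "
            f"Review {anomaly['service']} manually.)"
        )
```

It would be called as `analyze_with_fallback(anomaly, analyze_cost_spike)`, so the spike numbers still reach Slack when the explanation does not.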

Step 4: Slack Alert

python
# notifier.py
from slack_sdk import WebClient
import os
 
slack = WebClient(token=os.getenv("SLACK_BOT_TOKEN"))
 
def send_cost_alert(anomalies: list[dict], analyses: list[str], date: str):
    if not anomalies:
        return
    
    blocks = [{
        "type": "header",
        "text": {"type": "plain_text", "text": f"🚨 AWS Cost Anomaly — {date}"}
    }]
    
    for anomaly, analysis in zip(anomalies[:5], analyses[:5]):  # Top 5 only
        blocks.append({
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": (
                    f"*{anomaly['service']}*\n"
                    f"Yesterday: `${anomaly['yesterday_cost']}` | "
                    f"7-day avg: `${anomaly['avg_cost']}` | "
                    f"Spike: `+{anomaly['pct_change']}%`\n\n"
                    f"{analysis}"
                )
            }
        })
        blocks.append({"type": "divider"})
    
    slack.chat_postMessage(
        channel=os.getenv("SLACK_CHANNEL", "#cost-alerts"),
        text=f"AWS cost anomaly detected for {date}",  # fallback shown in notifications
        blocks=blocks
    )
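
Slack limits the text of a section block to 3,000 characters. With `max_tokens=300` the analysis should stay well under that, but a longer prompt budget or a long account context could exceed it. A small guard, sketched here as a hypothetical helper `truncate_for_slack`:

```python
SECTION_TEXT_LIMIT = 3000  # Slack's maximum length for a section block's text


def truncate_for_slack(text: str, limit: int = SECTION_TEXT_LIMIT) -> str:
    """Shorten text to fit a section block, marking the cut with an ellipsis."""
    if len(text) <= limit:
        return text
    # Reserve one character for the ellipsis so the result never exceeds limit.
    return text[:limit - 1].rstrip() + "…"
```

Applying it to the whole section string, not only the analysis, keeps the service name and figures inside the limit as well.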

Step 5: Main Runner

python
# main.py
from datetime import datetime
from dotenv import load_dotenv

# Load .env first: analyzer and notifier read their API tokens at import time.
load_dotenv()

from cost_fetcher import get_daily_costs_by_service
from anomaly_detector import detect_anomalies
from analyzer import analyze_cost_spike
from notifier import send_cost_alert
 
def main():
    print("Fetching AWS cost data...")
    df = get_daily_costs_by_service(days=30)
    
    print("Detecting anomalies...")
    anomalies = detect_anomalies(df, lookback_days=7, z_threshold=2.0)
    
    if not anomalies:
        print("✅ No cost anomalies detected")
        return
    
    print(f"⚠️  Found {len(anomalies)} anomalies — analyzing with Claude...")
    # send_cost_alert only posts the top 5, so skip Claude calls for the rest
    analyses = [analyze_cost_spike(a) for a in anomalies[:5]]
    
    date_str = datetime.today().strftime('%Y-%m-%d')
    send_cost_alert(anomalies, analyses, date_str)
    
    print(f"Alert sent for: {[a['service'] for a in anomalies]}")
 
if __name__ == "__main__":
    main()
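
When tuning thresholds it is convenient to run the pipeline without posting to Slack. One possible approach is a `--dry-run` flag. This is a sketch only: `parse_args` and `deliver` are hypothetical helpers, and `send` stands in for `send_cost_alert`:

```python
import argparse


def parse_args(argv=None) -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="AWS cost anomaly detector")
    parser.add_argument("--dry-run", action="store_true",
                        help="print the alert instead of posting to Slack")
    parser.add_argument("--z-threshold", type=float, default=2.0,
                        help="minimum Z-score to flag a service")
    return parser.parse_args(argv)


def deliver(anomalies, analyses, date_str, dry_run, send) -> bool:
    """Print alerts in dry-run mode, otherwise hand them to the real sender."""
    if dry_run:
        for anomaly, analysis in zip(anomalies, analyses):
            print(f"[dry-run] {date_str} {anomaly['service']}: "
                  f"+{anomaly['pct_change']}%\n{analysis}")
        return False
    send(anomalies, analyses, date_str)
    return True
```

Running `python main.py --dry-run --z-threshold 3` would then print what the alert would contain without touching the channel.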

Deploy as GitHub Actions Cron

yaml
name: Cost Anomaly Detection
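on:
  schedule:
    - cron: '0 9 * * *'   # 09:00 UTC daily (GitHub cron schedules run in UTC)
  workflow_dispatch:

# Needed so configure-aws-credentials can request an OIDC token for role-to-assume
permissions:
  id-token: write
  contents: read
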
jobs:
  detect:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: aws-actions/configure-aws-credentials@v4
        with:
          role-to-assume: ${{ secrets.AWS_COST_READER_ROLE }}
          aws-region: us-east-1
      - run: pip install anthropic boto3 slack-sdk pandas scipy python-dotenv
      - run: python main.py
        env:
          ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
          SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}

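Catch cost spikes within about a day instead of discovering them on the month-end bill. A service is flagged only when yesterday's cost is more than 2 standard deviations above its 7-day mean and also more than 50% higher, which filters out most normal day-to-day variation. Tune both thresholds to your own account.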

You need an Anthropic API key to power the analysis. Note that the Cost Explorer API is not free: AWS charges $0.01 per paginated request, so at one call per day the bot costs about $0.30 a month in API fees.
