Build an AI-Powered AWS Cost Anomaly Detector

AWS Cost Anomaly Detection catches spikes but gives no context. Build a system that detects anomalies, uses Claude to explain what caused them, and posts actionable Slack alerts with a fix recommendation.

DevOpsBoys · May 16, 2026 · 5 min read

AWS Cost Anomaly Detection tells you "your EC2 costs spiked by $1,200." It doesn't tell you which instances, which team, why it happened, or what to do about it. Here's how to build that context automatically.


Architecture

AWS Cost Explorer API (daily costs per service/tag)
        ↓
Python script: detect anomalies (z-score / % change)
        ↓ (if anomaly found)
Claude API: analyze cost data + explain the spike
        ↓
Slack alert with:
  - Which service/account spiked
  - Likely cause (scaling event, new resource, config change)
  - Estimated monthly impact
  - Recommended investigation steps
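
The script in this article groups costs by service only. To attribute a spike to a team, the same Cost Explorer call can group by a cost-allocation tag instead. The sketch below only builds the request arguments and parses the group keys Cost Explorer returns for tags (`key$value`). The tag key `team` and both helper names are hypothetical, and the tag must already be activated as a cost-allocation tag in the Billing console for any data to come back.

```python
from datetime import date, timedelta
from typing import Optional


def build_tag_cost_query(tag_key: str, days: int = 30, today: Optional[date] = None) -> dict:
    """Build keyword arguments for get_cost_and_usage, grouped by one tag key."""
    today = today or date.today()
    return {
        "TimePeriod": {
            "Start": (today - timedelta(days=days)).isoformat(),
            # End is exclusive, so the last day returned is yesterday
            "End": today.isoformat(),
        },
        "Granularity": "DAILY",
        "Metrics": ["UnblendedCost"],
        "GroupBy": [{"Type": "TAG", "Key": tag_key}],
    }


def tag_value_from_group_key(group_key: str) -> str:
    """Tag groups come back as 'key$value'; an empty value means untagged spend."""
    _, _, value = group_key.partition("$")
    return value or "(untagged)"


# "team" is an assumed tag key; substitute whatever tag your organization uses.
query = build_tag_cost_query("team", days=30, today=date(2026, 5, 16))
# With AWS credentials available:
#   boto3.client("ce", region_name="us-east-1").get_cost_and_usage(**query)
```

The per-tag series can then go through the same anomaly check as the per-service series, since both are just lists of daily costs.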

Setup

bash
pip install anthropic boto3 requests numpy
export ANTHROPIC_API_KEY=sk-ant-your-key
export SLACK_WEBHOOK_URL=https://hooks.slack.com/services/...

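IAM permissions needed (the script below only calls `ce:GetCostAndUsage`; the other three actions are optional and only matter if you extend it):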

json
{
  "Version": "2012-10-17",
  "Statement": [{
    "Effect": "Allow",
    "Action": [
      "ce:GetCostAndUsage",
      "ce:GetDimensionValues",
      "ce:GetAnomalies",
      "ce:GetCostCategories"
    ],
    "Resource": "*"
  }]
}

The Cost Anomaly Detector

python
# cost_anomaly_detector.py
import boto3
import anthropic
import requests
import os
import json
import numpy as np
from datetime import datetime, timedelta
from typing import Optional
 
def get_daily_costs(days: int = 30) -> dict:
    """Fetch daily cost breakdown by service from Cost Explorer"""
    ce = boto3.client('ce', region_name='us-east-1')
    
    end_date = datetime.now().strftime('%Y-%m-%d')
    start_date = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d')
    
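    params = {
        'TimePeriod': {'Start': start_date, 'End': end_date},  # End is exclusive
        'Granularity': 'DAILY',
        'Metrics': ['UnblendedCost'],
        'GroupBy': [
            {'Type': 'DIMENSION', 'Key': 'SERVICE'},
        ],
    }
    
    # Results can be paginated; follow NextPageToken until it is absent
    results = []
    while True:
        response = ce.get_cost_and_usage(**params)
        results.extend(response['ResultsByTime'])
        if not response.get('NextPageToken'):
            break
        params['NextPageToken'] = response['NextPageToken']
    
    # Collect {service: {date: cost}}
    dates = sorted({r['TimePeriod']['Start'] for r in results})
    cost_lookup = {}
    for result in results:
        date = result['TimePeriod']['Start']
        for group in result['Groups']:
            service = group['Keys'][0]
            cost = float(group['Metrics']['UnblendedCost']['Amount'])
            by_date = cost_lookup.setdefault(service, {})
            by_date[date] = by_date.get(date, 0.0) + cost
    
    # A service can be missing from a day's groups when it had no charges.
    # Fill those days with 0.0 so every series is aligned by date and the
    # last entry is always yesterday.
    costs_by_service = {
        service: [{'date': d, 'cost': by_date.get(d, 0.0)} for d in dates]
        for service, by_date in cost_lookup.items()
    }
    
    return costs_by_service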
 
 
def detect_anomalies(costs_by_service: dict, threshold_multiplier: float = 2.5) -> list:
    """Find services with unusual cost spikes using z-score"""
    anomalies = []
    
    for service, daily_costs in costs_by_service.items():
        if len(daily_costs) < 8:  # Need yesterday plus a full 7-day baseline
            continue
        
        costs = [d['cost'] for d in daily_costs]
        yesterday_cost = costs[-1]
        baseline_costs = costs[-8:-1]  # Previous 7 days
        
        if sum(baseline_costs) == 0:
            continue
        
        mean = np.mean(baseline_costs)
        std = np.std(baseline_costs)
        
        if std == 0:
            continue
        
        z_score = (yesterday_cost - mean) / std
        change_pct = ((yesterday_cost - mean) / mean) * 100
        
        # Anomaly if z-score > threshold AND change > 20%
        if z_score > threshold_multiplier and change_pct > 20:
            anomalies.append({
                'service': service,
                'yesterday_cost': yesterday_cost,
                'baseline_mean': mean,
                'change_pct': change_pct,
                'z_score': z_score,
                'history': daily_costs[-14:],  # Last 2 weeks
                'estimated_monthly_impact': (yesterday_cost - mean) * 30
            })
    
    # Sort by impact
    anomalies.sort(key=lambda x: x['estimated_monthly_impact'], reverse=True)
    return anomalies
 
 
def analyze_anomaly_with_ai(anomaly: dict, account_id: str) -> str:
    """Use Claude to explain the cost anomaly"""
    client = anthropic.Anthropic(api_key=os.environ["ANTHROPIC_API_KEY"])
    
    history_summary = "\n".join([
        f"  {d['date']}: ${d['cost']:.2f}"
        for d in anomaly['history']
    ])
    
    message = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=800,
        messages=[{
            "role": "user",
            "content": f"""You are an AWS FinOps specialist analyzing a cost anomaly.
 
Service: {anomaly['service']}
AWS Account: {account_id}
Yesterday's cost: ${anomaly['yesterday_cost']:.2f}
7-day baseline average: ${anomaly['baseline_mean']:.2f}
Change: +{anomaly['change_pct']:.1f}%
Estimated monthly impact: +${anomaly['estimated_monthly_impact']:.0f}
 
Cost history (last 2 weeks):
{history_summary}
 
Based on this data:
1. What is the most likely cause of this spike? (Be specific to the AWS service)
2. What specific AWS resources or configurations should be investigated first?
3. What's the fastest way to confirm the root cause? (Specific AWS console/CLI commands)
4. Is this likely a one-time spike or ongoing increase?
 
Format for Slack with *bold* for key items. Keep it under 200 words total."""
        }]
    )
    
    return message.content[0].text
 
 
def send_slack_alert(anomalies: list, account_id: str):
    """Send formatted Slack alert with AI analysis"""
    webhook_url = os.environ["SLACK_WEBHOOK_URL"]
    date_str = datetime.now().strftime('%Y-%m-%d')
    
    blocks = [
        {
            "type": "header",
            "text": {
                "type": "plain_text",
                "text": f"💸 AWS Cost Anomalies Detected — {date_str}"
            }
        },
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": f"*Account:* `{account_id}`\n*Anomalies found:* {len(anomalies)}\n*Total estimated monthly impact:* +${sum(a['estimated_monthly_impact'] for a in anomalies):,.0f}"
            }
        },
        {"type": "divider"}
    ]
    
    for i, anomaly in enumerate(anomalies[:3]):  # Top 3 anomalies
        ai_analysis = analyze_anomaly_with_ai(anomaly, account_id)
        
        blocks.extend([
            {
                "type": "section",
                "fields": [
                    {"type": "mrkdwn", "text": f"*Service:*\n{anomaly['service']}"},
                    {"type": "mrkdwn", "text": f"*Yesterday:*\n${anomaly['yesterday_cost']:.2f}"},
                    {"type": "mrkdwn", "text": f"*Baseline avg:*\n${anomaly['baseline_mean']:.2f}"},
                    {"type": "mrkdwn", "text": f"*Change:*\n+{anomaly['change_pct']:.0f}%  (~+${anomaly['estimated_monthly_impact']:,.0f}/mo)"},
                ]
            },
            {
                "type": "section",
                "text": {"type": "mrkdwn", "text": f"*AI Analysis:*\n{ai_analysis}"}
            },
            {"type": "divider"}
        ])
    
    response = requests.post(webhook_url, json={"blocks": blocks}, timeout=10)
    response.raise_for_status()  # Fail loudly if Slack rejects the payload
    print(f"Slack alert sent: {response.status_code}")
 
 
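def main(event=None, context=None):
    """Entry point. Accepts Lambda's (event, context) arguments so it can be used as the handler."""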
    account_id = boto3.client('sts').get_caller_identity()['Account']
    print(f"Analyzing costs for account: {account_id}")
    
    print("Fetching 30 days of cost data...")
    costs_by_service = get_daily_costs(days=30)
    
    print(f"Analyzing {len(costs_by_service)} services for anomalies...")
    anomalies = detect_anomalies(costs_by_service)
    
    if not anomalies:
        print("No anomalies detected. Costs look normal.")
        return
    
    print(f"Found {len(anomalies)} anomalies!")
    for a in anomalies:
        print(f"  {a['service']}: +{a['change_pct']:.0f}% (${a['yesterday_cost']:.2f})")
    
    send_slack_alert(anomalies, account_id)
 
 
if __name__ == "__main__":
    main()
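
To see why the detector requires both a high z-score and a 20% change, it helps to run the same arithmetic on two small series. The following is a minimal sketch using only the standard library (`statistics.pstdev` is the population standard deviation, which matches `np.std`'s default). The function name `score_latest` and the daily cost figures are made up for illustration.

```python
from statistics import mean, pstdev


def score_latest(costs, z_threshold=2.5, min_change_pct=20.0):
    """Score the last value in `costs` against the 7 values before it."""
    if len(costs) < 8:
        return None  # Not enough history for a 7-day baseline
    latest, baseline = costs[-1], costs[-8:-1]
    mu = mean(baseline)
    sigma = pstdev(baseline)
    if mu == 0 or sigma == 0:
        return None  # Z-score and % change are undefined
    z_score = (latest - mu) / sigma
    change_pct = (latest - mu) / mu * 100
    return {
        "z_score": z_score,
        "change_pct": change_pct,
        "monthly_impact": (latest - mu) * 30,
        "is_anomaly": z_score > z_threshold and change_pct > min_change_pct,
    }


# Hypothetical daily costs in USD: a steady week followed by a jump.
steady_then_spike = [40.0, 42.0, 41.0, 43.0, 42.0, 40.0, 46.0, 90.0]
# Hypothetical noisy service: the last day is 24% above the mean,
# but well within its normal day-to-day variation.
noisy = [40.0, 60.0, 35.0, 70.0, 45.0, 65.0, 35.0, 62.0]

spike_result = score_latest(steady_then_spike)
noisy_result = score_latest(noisy)
print(spike_result["is_anomaly"], noisy_result["is_anomaly"])
```

The steady series is flagged, while the noisy one is not: its 24% change clears the percentage check, but its z-score stays below 1. The z-score condition is what keeps naturally volatile services from paging someone every morning.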

Schedule It Daily (Lambda + EventBridge)

yaml
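# CloudFormation template (this is not serverless.yml or CDK syntax).
# Role and Code are required on AWS::Lambda::Function; they are omitted here for brevity.
# Package anthropic, requests, and numpy with the function; the Lambda Python runtime only bundles boto3.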
AWSTemplateFormatVersion: '2010-09-09'
 
Resources:
  CostAnomalyDetector:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: cost-anomaly-detector
      Runtime: python3.12
      Handler: cost_anomaly_detector.main
      Timeout: 120
      Environment:
        Variables:
          SLACK_WEBHOOK_URL: !Sub "{{resolve:ssm:/alerts/slack-webhook}}"
          ANTHROPIC_API_KEY: !Sub "{{resolve:ssm:/ai/anthropic-key}}"
 
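  DailyTrigger:
    Type: AWS::Events::Rule
    Properties:
      ScheduleExpression: "cron(0 9 * * ? *)"   # 9am UTC daily
      Targets:
        - Arn: !GetAtt CostAnomalyDetector.Arn
          Id: DailyCostCheck
 
  # Without this permission, EventBridge cannot invoke the function
  DailyTriggerPermission:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !Ref CostAnomalyDetector
      Action: lambda:InvokeFunction
      Principal: events.amazonaws.com
      SourceArn: !GetAtt DailyTrigger.Arn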

Example Slack Output

💸 AWS Cost Anomalies Detected — 2026-05-16

Account: 123456789012
Anomalies found: 2
Total estimated monthly impact: +$7,799

Service: Amazon EC2 - Other
Yesterday: $287.40 | Baseline avg: $42.10 | Change: +583% (~+$7,359/mo)

AI Analysis:
*Likely cause: New EC2 instances launched without lifecycle management.*
This spike pattern (flat baseline → sudden jump → sustained) typically 
indicates new instances started without auto-shutdown policy.

*Investigate first:*
- `aws ec2 describe-instances --filters "Name=launch-time,Values=2026-05-15*"`
- Check for instances without termination protection in non-production accounts
- Look for instances in us-east-1 launched by developers directly (not Terraform)

*Likely ongoing* — instances still running. Check immediately.
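
The prompt asks Claude to stay under 200 words, but nothing in the script enforces that, and Slack's Block Kit documentation lists a 3,000-character maximum for the text of a section block. A small guard before building the blocks avoids a rejected webhook call. This is a sketch rather than part of the original script; `fit_section_text` is a hypothetical helper name.

```python
SLACK_SECTION_TEXT_LIMIT = 3000  # Documented maximum for section block text


def fit_section_text(text: str, limit: int = SLACK_SECTION_TEXT_LIMIT) -> str:
    """Return `text` unchanged if it fits, otherwise cut it and mark the cut."""
    if len(text) <= limit:
        return text
    suffix = "... (truncated)"
    return text[: limit - len(suffix)].rstrip() + suffix


short_text = fit_section_text("*Likely cause:* new instances")
long_text = fit_section_text("x" * 5000)
print(len(short_text), len(long_text))
```

In `send_slack_alert`, the guard would wrap the whole string passed to the AI Analysis section, including its `*AI Analysis:*` prefix, so the combined text stays within the limit.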

This system pays for itself with the first alert it catches. A single undetected dev EC2 instance left running can cost $200–500/month — far more than a few Claude API calls per day.

For FinOps and AWS cost optimization hands-on labs, KodeKloud has cloud cost management courses covering Cost Explorer, budgets, and Karpenter for Kubernetes cost savings.
