
Build a Terraform Drift Detection Bot with Claude

Infrastructure drift — when actual cloud state diverges from Terraform state — causes silent failures. Build a bot that detects drift daily and explains what changed using Claude AI.

DevOpsBoys · Jun 4, 2026 · 5 min read

Terraform drift happens when someone changes infrastructure outside of Terraform: adding a security group rule, resizing an instance, modifying a tag. The next terraform apply may silently revert a change that something now depends on, and until then the Terraform state no longer describes what is actually running.

This bot runs terraform plan daily, detects drift, and uses Claude to explain what drifted and why it matters.
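
One caveat: a plain terraform plan also reports configuration changes that were merged but not yet applied, so a non-empty plan is not always drift in the strict sense. The sketch below assumes Terraform 0.15.4 or later, where the -refresh-only planning mode is available, and shows how the plan command and the -detailed-exitcode result could be handled. The helper names build_plan_command and interpret_exit_code are hypothetical and not part of the bot that follows.

```python
def build_plan_command(refresh_only: bool = False) -> list[str]:
    """Return the terraform plan argv for an unattended drift check."""
    cmd = ["terraform", "plan", "-json", "-input=false", "-no-color", "-detailed-exitcode"]
    if refresh_only:
        # Assumption: compare state against real infrastructure only,
        # ignoring configuration changes that have not been applied yet
        cmd.append("-refresh-only")
    return cmd


def interpret_exit_code(code: int) -> str:
    """Map a -detailed-exitcode return code to a status string."""
    statuses = {0: "in_sync", 1: "error", 2: "changes_present"}
    return statuses.get(code, "unknown")
```

Whether to use refresh-only mode depends on what you want alerted: it narrows the report to out-of-band changes, while the default mode also surfaces unapplied code.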


What We're Building

Daily cron → terraform plan → parse output → Claude analysis → Slack alert:

"🔄 DRIFT DETECTED in production/us-east-1

CHANGED (2):
  aws_security_group.api - ingress rule added: port 3306 from 0.0.0.0/0
  aws_instance.bastion - instance_type changed: t3.micro → t3.medium

⚠️ RISK: The open MySQL port (3306) is a critical security issue.
   Someone may have opened database access for debugging and forgotten to close it.

ACTION: Run 'terraform apply' to revert, OR update the Terraform code if the change was intentional."

Setup

bash
pip install anthropic boto3 slack-sdk python-dotenv
# Terraform must be installed and authenticated

Step 1: Run Terraform Plan and Capture Drift

python
# drift_detector.py
import subprocess
import json
import os
from pathlib import Path
 
def run_terraform_plan(working_dir: str) -> dict:
    """Run terraform plan -json and capture changes."""
    
    # Initialize if needed; -input=false keeps unattended runs from blocking on prompts
    subprocess.run(
        ["terraform", "init", "-backend=true", "-input=false", "-no-color"],
        cwd=working_dir, capture_output=True, check=True
    )
    
    # Run plan with JSON output
    result = subprocess.run(
        ["terraform", "plan", "-json", "-input=false", "-no-color", "-detailed-exitcode"],
        cwd=working_dir,
        capture_output=True,
        text=True
    )
    
    # Exit codes: 0=no changes, 1=error, 2=changes present
    if result.returncode == 1:
        # With -json, diagnostics are usually on stdout, so fall back to its tail
        detail = result.stderr.strip() or result.stdout[-2000:]
        raise RuntimeError(f"Terraform plan failed: {detail}")
    
    has_changes = result.returncode == 2
    
    # Parse JSON lines output
    changes = []
    for line in result.stdout.strip().split('\n'):
        if not line:
            continue
        try:
            event = json.loads(line)
            if event.get("@level") == "info" and event.get("type") == "planned_change":
                changes.append(event.get("change", {}))
        except json.JSONDecodeError:
            continue
    
    return {
        "has_drift": has_changes,
        "changes": changes,
        "raw_output": result.stdout
    }
 
def parse_changes(changes: list) -> dict:
    """Categorize changes by type."""
    categorized = {"create": [], "update": [], "delete": [], "replace": []}
    
    for change in changes:
        action = change.get("action", "")
        resource = change.get("resource", {})
        resource_addr = resource.get("addr", "unknown")
        
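        # Get what actually changed. Streamed planned_change events carry only the
        # resource and action, so before/after are usually absent here; "or {}"
        # also guards against explicit nulls.
        before = change.get("before") or {}
        after = change.get("after") or {}
        
        changed_attrs = {}
        for key in sorted(before.keys() | after.keys()):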
            if before.get(key) != after.get(key):
                changed_attrs[key] = {
                    "before": before.get(key),
                    "after": after.get(key)
                }
        
        entry = {
            "resource": resource_addr,
            "type": resource.get("resource_type", ""),
            "changed_attributes": changed_attrs
        }
        
        if action in categorized:
            categorized[action].append(entry)
    
    return categorized
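
The streamed planned_change events name the resource and the action but do not include attribute values. One way to get attribute-level diffs, sketched here under assumptions, is to save the plan with terraform plan -out=tfplan, render it with terraform show -json tfplan, and read the resource_changes list from that document. The function name changed_attributes_from_plan and the sample data are illustrative, not taken from real output.

```python
import json


def changed_attributes_from_plan(plan: dict) -> list[dict]:
    """Summarize resource_changes from `terraform show -json <planfile>` output."""
    summary = []
    for rc in plan.get("resource_changes", []):
        change = rc.get("change", {})
        actions = change.get("actions", [])
        if actions == ["no-op"]:
            continue  # resource is unchanged
        before = change.get("before") or {}  # null when the resource is being created
        after = change.get("after") or {}    # null when the resource is being deleted
        changed = sorted(
            key for key in before.keys() | after.keys()
            if before.get(key) != after.get(key)
        )
        summary.append({
            "resource": rc.get("address", "unknown"),
            "type": rc.get("type", ""),
            "actions": actions,
            "changed_attributes": changed,
        })
    return summary


# Hand-written sample in the shape of the plan JSON, not real Terraform output
sample_plan = json.loads("""
{"resource_changes": [
  {"address": "aws_instance.bastion", "type": "aws_instance",
   "change": {"actions": ["update"],
              "before": {"instance_type": "t3.micro", "ami": "ami-123"},
              "after": {"instance_type": "t3.medium", "ami": "ami-123"}}},
  {"address": "aws_s3_bucket.logs", "type": "aws_s3_bucket",
   "change": {"actions": ["no-op"], "before": {}, "after": {}}}
]}
""")
print(changed_attributes_from_plan(sample_plan))
```

The sketch keeps attribute names only. Plan values can include secrets, so it is worth deciding deliberately which values are forwarded to an external API.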

Step 2: Claude Analysis

python
# analyzer.py
import anthropic
import json
import os
 
client = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
 
def analyze_drift(changes: dict, environment: str) -> dict:
    """Use Claude to explain drift and assess risk."""
    
    if not any(changes.values()):
        return {"has_drift": False}
    
    changes_summary = json.dumps(changes, indent=2)
    
    prompt = f"""You are a DevOps security and infrastructure expert analyzing Terraform drift.
 
Environment: {environment}
 
Detected infrastructure changes (diff between Terraform state and actual cloud):
{changes_summary}
 
Analyze this drift and provide:
1. SUMMARY: What changed in plain English (1-2 sentences)
2. RISK_LEVEL: critical/high/medium/low
3. RISK_EXPLANATION: Why this matters (security, cost, stability implications)
4. LIKELY_CAUSE: What probably caused this (manual change, auto-scaling, etc.)
5. RECOMMENDED_ACTION: Specific next steps
 
Focus on security issues (open ports, IAM changes, public access) — flag these as critical.
 
Return as JSON with keys: summary, risk_level, risk_explanation, likely_cause, recommended_action"""
 
    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=800,
        messages=[{"role": "user", "content": prompt}]
    )
    
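    raw_text = response.content[0].text if response.content else ""
    try:
        analysis = json.loads(raw_text)
    except json.JSONDecodeError:
        # The reply was not plain JSON; fall back to a generic medium-risk alert
        analysis = {
            "summary": raw_text[:200],
            "risk_level": "medium",
            "risk_explanation": "Unable to parse detailed analysis",
            "likely_cause": "unknown",
            "recommended_action": "Review terraform plan output manually"
        }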
    
    analysis["has_drift"] = True
    analysis["changes"] = changes
    return analysis
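
Model replies sometimes wrap the JSON in a Markdown code fence or add a sentence before it, in which case json.loads on the raw text fails and the fallback branch is used. A minimal sketch of a more tolerant parser, assuming the reply contains exactly one top-level JSON object; extract_json_object is a hypothetical helper name.

```python
import json


def extract_json_object(text: str) -> dict:
    """Parse the JSON object embedded in model output.

    Takes the span from the first "{" to the last "}", which skips any
    surrounding code fence or prose. Raises ValueError (json.JSONDecodeError
    is a subclass of it) when no valid object is found.
    """
    start, end = text.find("{"), text.rfind("}")
    if start == -1 or end < start:
        raise ValueError("no JSON object found in model output")
    return json.loads(text[start:end + 1])
```

A caller could try this first and catch ValueError to fall back to the generic alert.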

Step 3: Slack Notification

python
# notifier.py
from slack_sdk import WebClient
import os
 
slack = WebClient(token=os.getenv("SLACK_BOT_TOKEN"))
 
RISK_EMOJI = {
    "critical": "🚨",
    "high": "⚠️",
    "medium": "🔄",
    "low": "ℹ️"
}
 
def send_drift_alert(analysis: dict, environment: str, workspace: str):
    if not analysis.get("has_drift"):
        return
    
    # Normalize case so a reply such as "Critical" still maps to the right emoji
    risk = str(analysis.get("risk_level") or "medium").lower()
    emoji = RISK_EMOJI.get(risk, "🔄")
    
    changes = analysis.get("changes", {})
    change_lines = []
    
    for action, resources in changes.items():
        for r in resources:
            attrs = list(r.get("changed_attributes", {}).keys())[:3]
            attrs_str = ", ".join(attrs) if attrs else "configuration"
            change_lines.append(f"  `{r['resource']}` — {action}: {attrs_str}")
    
    blocks = [
        {
            "type": "header",
            "text": {"type": "plain_text", "text": f"{emoji} Terraform Drift — {environment}"}
        },
        {
            "type": "section",
            "text": {"type": "mrkdwn", "text": f"*Workspace:* `{workspace}`\n*Risk:* {risk.upper()}\n\n{analysis.get('summary', '')}"}
        },
    ]
    
    if change_lines:
        blocks.append({
            "type": "section",
            "text": {"type": "mrkdwn", "text": "*Changes detected:*\n" + "\n".join(change_lines[:10])}
        })
    
    if analysis.get("risk_explanation"):
        blocks.append({
            "type": "section",
            "text": {"type": "mrkdwn", "text": f"*Why it matters:*\n{analysis['risk_explanation']}"}
        })
    
    if analysis.get("recommended_action"):
        blocks.append({
            "type": "section",
            "text": {"type": "mrkdwn", "text": f"*Action:*\n{analysis['recommended_action']}"}
        })
    
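    slack.chat_postMessage(
        channel=os.getenv("SLACK_CHANNEL", "#infra-alerts"),
        # Fallback text for notifications and clients that cannot render blocks
        text=f"{emoji} Terraform drift in {environment} ({risk.upper()})",
        blocks=blocks
    )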
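
Slack limits the text of a section block to 3000 characters and the text of a header block to 150, and a long risk explanation or summary can exceed that. A small sketch of guarding against it; the helper names truncate and section are assumptions, not part of slack_sdk.

```python
SECTION_TEXT_LIMIT = 3000  # Slack's documented maximum for a section block's text
HEADER_TEXT_LIMIT = 150    # Slack's documented maximum for a header block's text


def truncate(text: str, limit: int, suffix: str = "…") -> str:
    """Cut text to at most `limit` characters, marking the cut with a suffix."""
    if len(text) <= limit:
        return text
    return text[: limit - len(suffix)] + suffix


def section(text: str) -> dict:
    """Build a mrkdwn section block whose text fits within Slack's limit."""
    return {
        "type": "section",
        "text": {"type": "mrkdwn", "text": truncate(text, SECTION_TEXT_LIMIT)},
    }
```

The section dictionaries built in send_drift_alert could be produced through a helper like this so that an unusually long reply does not cause the post to be rejected.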

Step 4: Main Runner

python
# main.py
import os
import sys
from dotenv import load_dotenv
from drift_detector import run_terraform_plan, parse_changes
from analyzer import analyze_drift
from notifier import send_drift_alert
 
load_dotenv()
 
TERRAFORM_WORKSPACES = [
    {"path": "/infra/production", "env": "production", "workspace": "prod"},
    {"path": "/infra/staging", "env": "staging", "workspace": "staging"},
]
 
def main():
    any_drift = False
    failed = []  # environments whose check raised an error
    
    for ws in TERRAFORM_WORKSPACES:
        print(f"Checking {ws['env']}...")
        
        try:
            plan_result = run_terraform_plan(ws["path"])
            
            if not plan_result["has_drift"]:
                print(f"  ✅ No drift in {ws['env']}")
                continue
            
            any_drift = True
            changes = parse_changes(plan_result["changes"])
            analysis = analyze_drift(changes, ws["env"])
            send_drift_alert(analysis, ws["env"], ws["workspace"])
            print(f"  ⚠️  Drift detected in {ws['env']} (risk: {analysis.get('risk_level')})")
            
        except Exception as e:
            failed.append(ws["env"])
            print(f"  ❌ Error checking {ws['env']}: {e}")
    
    if failed:
        # A failed check is not the same as "no drift"; exit non-zero so the job is flagged
        sys.exit(1)
    if not any_drift:
        print("All workspaces in sync ✅")
 
if __name__ == "__main__":
    main()

Schedule as GitHub Actions Cron

yaml
# .github/workflows/drift-detection.yml
name: Terraform Drift Detection
 
on:
  schedule:
    - cron: '0 8 * * 1-5'   # 08:00 UTC, Monday to Friday
  workflow_dispatch:          # Manual trigger
 
jobs:
  detect-drift:
    runs-on: ubuntu-latest
    permissions:
      id-token: write   # needed to assume the role via OIDC
      contents: read
    steps:
      - uses: actions/checkout@v4
      
      - uses: hashicorp/setup-terraform@v3
        with:
          terraform_version: "1.9.0"
          terraform_wrapper: false   # call the real binary so stdout and exit codes reach Python unmodified
      
      - uses: aws-actions/configure-aws-credentials@v4
        with:
          role-to-assume: ${{ secrets.AWS_DRIFT_DETECTION_ROLE }}
          aws-region: us-east-1
      
      - run: pip install anthropic slack-sdk boto3 python-dotenv
      
      - name: Run drift detection
        env:
          ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
          SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
          SLACK_CHANNEL: "#infra-alerts"
        run: python main.py
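
The absolute paths in TERRAFORM_WORKSPACES assume the Terraform directories live under /infra on the machine running the bot. On a GitHub Actions runner the repository is checked out under the directory named by the GITHUB_WORKSPACE environment variable, so the paths may need to be resolved relative to that. A sketch under that assumption; resolve_workspace_path and the relative layout infra/production are hypothetical.

```python
import os
from pathlib import Path
from typing import Optional


def resolve_workspace_path(relative_path: str, base: Optional[str] = None) -> Path:
    """Resolve a Terraform directory against the repository checkout.

    Uses `base` if given, then GITHUB_WORKSPACE, then the current directory.
    """
    root = Path(base or os.environ.get("GITHUB_WORKSPACE") or os.getcwd())
    resolved = (root / relative_path).resolve()
    if not resolved.is_dir():
        raise FileNotFoundError(f"Terraform directory not found: {resolved}")
    return resolved
```

With a helper like this, an entry's path could be written as "infra/production" and resolved just before calling run_terraform_plan.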

Drift detection catches the silent changes that cause incidents weeks later. With the weekday schedule above, a change surfaces at the next scheduled run, usually within 24 hours and before it becomes a problem.

You need an Anthropic API key to build this. The IAM role used for drift detection needs only the read-only permissions that terraform plan requires; it should not be able to apply changes.
