Build a Terraform Drift Detection Bot with Claude
Infrastructure drift — when actual cloud state diverges from Terraform state — causes silent failures. Build a bot that detects drift daily and explains what changed using Claude AI.
Terraform drift happens when someone manually changes infrastructure outside of Terraform — adds a security group rule, resizes an instance, modifies a tag. Your next terraform apply might undo critical changes, or your state becomes wrong.
This bot runs terraform plan daily, detects drift, and uses Claude to explain what drifted and why it matters.
What We're Building
Daily cron → terraform plan → parse output → Claude analysis → Slack alert:
"🔄 DRIFT DETECTED in production/us-east-1
CHANGED (2):
aws_security_group.api - ingress rule added: port 3306 from 0.0.0.0/0
aws_instance.bastion - instance_type changed: t3.micro → t3.medium
⚠️ RISK: The open MySQL port (3306) is a critical security issue.
Someone may have opened database access for debugging and forgot to close it.
ACTION: Run 'terraform apply' to revert OR update Terraform code if change was intentional."
Setup
pip install anthropic boto3 slack-sdk python-dotenv
# Terraform must be installed and authenticated
Step 1: Run Terraform Plan and Capture Drift
# drift_detector.py
import subprocess
import json
import os
from pathlib import Path
def run_terraform_plan(working_dir: str) -> dict:
    """Run ``terraform plan -json`` in *working_dir* and collect planned changes.

    Args:
        working_dir: Directory that holds the Terraform configuration.

    Returns:
        A dict with three keys: ``has_drift`` (True when the plan exit code
        is 2), ``changes`` (the ``change`` payload of every
        ``planned_change`` event) and ``raw_output`` (the unparsed stdout).

    Raises:
        subprocess.CalledProcessError: If ``terraform init`` exits non-zero.
        RuntimeError: If ``terraform plan`` exits with anything other than
            0 (no changes) or 2 (changes present).
    """
    # Initialize if needed
    subprocess.run(
        ["terraform", "init", "-backend=true", "-no-color"],
        cwd=working_dir, capture_output=True, check=True
    )

    # Run plan with JSON output
    result = subprocess.run(
        ["terraform", "plan", "-json", "-no-color", "-detailed-exitcode"],
        cwd=working_dir,
        capture_output=True,
        text=True
    )

    # Parse the JSON-lines stream once, collecting both planned changes and
    # error diagnostics. In -json mode Terraform reports failures as JSON
    # events on stdout, so stderr alone is usually empty on error.
    changes = []
    errors = []
    for line in result.stdout.strip().split('\n'):
        if not line.strip():
            continue
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            # Non-JSON noise (e.g. wrapper banners) is ignored.
            continue
        if not isinstance(event, dict):
            continue

        level = event.get("@level")
        if level == "error":
            message = event.get("@message") or "unknown error"
            diagnostic = event.get("diagnostic")
            detail = diagnostic.get("detail") if isinstance(diagnostic, dict) else None
            errors.append(f"{message}: {detail}" if detail else message)
        elif level == "info" and event.get("type") == "planned_change":
            changes.append(event.get("change", {}))

    # Exit codes: 0=no changes, 1=error, 2=changes present. Anything else
    # (e.g. killed by a signal) is also treated as a failure.
    if result.returncode not in (0, 2):
        details = (
            "\n".join(errors)
            or result.stderr.strip()
            or f"exit code {result.returncode}"
        )
        raise RuntimeError(f"Terraform plan failed: {details}")

    return {
        "has_drift": result.returncode == 2,
        "changes": changes,
        "raw_output": result.stdout
    }
def parse_changes(changes: list) -> dict:
    """Group plan changes by action and record which attributes differ.

    Args:
        changes: ``change`` payloads as collected by ``run_terraform_plan``.
            Each is expected to carry ``action`` and ``resource``; ``before``
            and ``after`` are optional and may be ``None`` (e.g. for creates
            and deletes).

    Returns:
        A dict keyed by ``create``/``update``/``delete``/``replace``. Each
        value is a list of entries with ``resource`` (address), ``type`` and
        ``changed_attributes`` (attribute -> ``{"before", "after"}``).
        Changes with any other action are dropped.
    """
    # NOTE(review): ``planned_change`` events from ``terraform plan -json``
    # appear to carry only ``resource`` and ``action``; before/after values
    # would then need ``terraform show -json`` on a saved plan — confirm.
    categorized = {"create": [], "update": [], "delete": [], "replace": []}

    for change in changes:
        action = change.get("action", "")
        if action not in categorized:
            continue

        resource = change.get("resource") or {}

        # A present-but-null before/after must not crash the diff.
        before = change.get("before")
        after = change.get("after")
        before = before if isinstance(before, dict) else {}
        after = after if isinstance(after, dict) else {}

        # Sorted so the attribute order (and anything that truncates it
        # downstream) is stable from run to run.
        changed_attrs = {
            key: {"before": before.get(key), "after": after.get(key)}
            for key in sorted(before.keys() | after.keys(), key=str)
            if before.get(key) != after.get(key)
        }

        categorized[action].append({
            "resource": resource.get("addr", "unknown"),
            "type": resource.get("resource_type", ""),
            "changed_attributes": changed_attrs
        })

    return categorized


# Step 2: Claude Analysis
# analyzer.py
import anthropic
import json
import os
# Module-level Anthropic client used by analyze_drift. The API key is read
# from the ANTHROPIC_API_KEY environment variable when this module is imported.
client = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
def analyze_drift(changes: dict, environment: str) -> dict:
    """Use Claude to explain drift and assess risk.

    Args:
        changes: Categorized changes (action -> list of entries), as
            produced by ``parse_changes``.
        environment: Environment name interpolated into the prompt.

    Returns:
        ``{"has_drift": False}`` when every category is empty (no API call
        is made). Otherwise the analysis dict with ``summary``,
        ``risk_level``, ``risk_explanation``, ``likely_cause`` and
        ``recommended_action`` as returned by the model (or fallback values
        when the reply is not a JSON object), plus ``has_drift`` set to True
        and ``changes`` set to the input.
    """
    if not any(changes.values()):
        return {"has_drift": False}

    changes_summary = json.dumps(changes, indent=2)
    prompt = f"""You are a DevOps security and infrastructure expert analyzing Terraform drift.
Environment: {environment}
Detected infrastructure changes (diff between Terraform state and actual cloud):
{changes_summary}
Analyze this drift and provide:
1. SUMMARY: What changed in plain English (1-2 sentences)
2. RISK_LEVEL: critical/high/medium/low
3. RISK_EXPLANATION: Why this matters (security, cost, stability implications)
4. LIKELY_CAUSE: What probably caused this (manual change, auto-scaling, etc.)
5. RECOMMENDED_ACTION: Specific next steps
Focus on security issues (open ports, IAM changes, public access) — flag these as critical.
Return as JSON with keys: summary, risk_level, risk_explanation, likely_cause, recommended_action"""

    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=800,
        messages=[{"role": "user", "content": prompt}]
    )
    raw_text = response.content[0].text if response.content else ""

    # The reply is often wrapped in a Markdown code fence or surrounded by
    # prose; isolate the outermost {...} span before decoding.
    candidate = raw_text.strip()
    start, end = candidate.find("{"), candidate.rfind("}")
    if start != -1 and end > start:
        candidate = candidate[start:end + 1]

    try:
        analysis = json.loads(candidate)
    except json.JSONDecodeError:
        analysis = None

    # Valid JSON that is not an object (list, string, ...) is as unusable
    # as unparseable text, so both take the same fallback.
    if not isinstance(analysis, dict):
        analysis = {
            "summary": raw_text[:200],
            "risk_level": "medium",
            "risk_explanation": "Unable to parse detailed analysis",
            "likely_cause": "Unknown",
            "recommended_action": "Review terraform plan output manually"
        }

    # Normalize so "Critical" / " HIGH " match lower-case lookups downstream.
    analysis["risk_level"] = str(analysis.get("risk_level") or "medium").strip().lower()
    analysis["has_drift"] = True
    analysis["changes"] = changes
    return analysis


# Step 3: Slack Notification
# notifier.py
from slack_sdk import WebClient
import os
# Module-level Slack client used by send_drift_alert. The bot token is read
# from the SLACK_BOT_TOKEN environment variable when this module is imported.
slack = WebClient(token=os.getenv("SLACK_BOT_TOKEN"))
# Emoji shown in the alert header for each lower-case risk level.
RISK_EMOJI = {
"critical": "🚨",
"high": "⚠️",
"medium": "🔄",
"low": "ℹ️"
}
def send_drift_alert(analysis: dict, environment: str, workspace: str):
    """Post a Block Kit drift alert to Slack; do nothing when there is no drift.

    Args:
        analysis: Result of ``analyze_drift``. Reads ``has_drift``,
            ``risk_level``, ``summary``, ``changes``, ``risk_explanation``
            and ``recommended_action``.
        environment: Environment name shown in the header.
        workspace: Workspace name shown in the first section.

    The message goes to the channel named by the ``SLACK_CHANNEL``
    environment variable (default ``#infra-alerts``).
    """
    if not analysis.get("has_drift"):
        return

    # Slack rejects over-long text objects; limits per the Block Kit
    # reference (header plain_text 150 chars, section text 3000 chars).
    header_limit = 150
    section_limit = 3000

    def section(text):
        """Build a mrkdwn section block, clipped to Slack's length limit."""
        return {
            "type": "section",
            "text": {"type": "mrkdwn", "text": text[:section_limit]}
        }

    # risk_level may be missing, None, or mixed-case ("Critical").
    risk = str(analysis.get("risk_level") or "medium").strip().lower()
    emoji = RISK_EMOJI.get(risk, "🔄")
    summary = analysis.get("summary") or ""

    change_lines = []
    for action, resources in (analysis.get("changes") or {}).items():
        for r in resources:
            # Show at most three attribute names per resource.
            attrs = list((r.get("changed_attributes") or {}).keys())[:3]
            attrs_str = ", ".join(attrs) if attrs else "configuration"
            change_lines.append(f" `{r.get('resource', 'unknown')}` — {action}: {attrs_str}")

    header_text = f"{emoji} Terraform Drift — {environment}"
    blocks = [
        {
            "type": "header",
            "text": {"type": "plain_text", "text": header_text[:header_limit]}
        },
        section(f"*Workspace:* `{workspace}`\n*Risk:* {risk.upper()}\n\n{summary}"),
    ]
    if change_lines:
        # Cap the list at ten lines to keep the alert readable.
        blocks.append(section("*Changes detected:*\n" + "\n".join(change_lines[:10])))
    if analysis.get("risk_explanation"):
        blocks.append(section(f"*Why it matters:*\n{analysis['risk_explanation']}"))
    if analysis.get("recommended_action"):
        blocks.append(section(f"*Action:*\n{analysis['recommended_action']}"))

    slack.chat_postMessage(
        channel=os.getenv("SLACK_CHANNEL", "#infra-alerts"),
        # Plain-text fallback used for notifications and by clients that
        # cannot render blocks.
        text=f"{header_text} ({workspace}) — risk: {risk.upper()}. {summary}".strip(),
        blocks=blocks
    )


# Step 4: Main Runner
# main.py
import os
import sys
from dotenv import load_dotenv
from drift_detector import run_terraform_plan, parse_changes
from analyzer import analyze_drift
from notifier import send_drift_alert
# Called at import time, before the workspace checks run.
# NOTE(review): presumably populates os.environ from a local .env file
# (python-dotenv) — confirm the file location expected in deployment.
load_dotenv()
# Workspaces checked by main(), in order. Each entry gives the Terraform
# directory ("path"), the environment label used in output and alerts
# ("env"), and the workspace name shown in the Slack message ("workspace").
TERRAFORM_WORKSPACES = [
{"path": "/infra/production", "env": "production", "workspace": "prod"},
{"path": "/infra/staging", "env": "staging", "workspace": "staging"},
]
def main():
    """Check every configured workspace for drift and send alerts.

    A failure in one workspace is reported and does not stop the others
    from being checked.

    Returns:
        0 when every workspace was checked successfully (with or without
        drift), 1 when at least one check raised.
    """
    any_drift = False
    failed_envs = []

    for ws in TERRAFORM_WORKSPACES:
        print(f"Checking {ws['env']}...")
        try:
            plan_result = run_terraform_plan(ws["path"])
            if not plan_result["has_drift"]:
                print(f" ✅ No drift in {ws['env']}")
                continue
            any_drift = True
            changes = parse_changes(plan_result["changes"])
            analysis = analyze_drift(changes, ws["env"])
            send_drift_alert(analysis, ws["env"], ws["workspace"])
            print(f" ⚠️ Drift detected in {ws['env']} — risk: {analysis.get('risk_level')}")
        except Exception as e:
            # Top-level boundary: record the failure and keep going.
            failed_envs.append(ws["env"])
            print(f" ❌ Error checking {ws['env']}: {e}")

    # A workspace that could not be checked is not "in sync"; only claim
    # success when every check completed and none found drift.
    if failed_envs:
        print(f"Could not check: {', '.join(failed_envs)}")
    elif not any_drift:
        print("All workspaces in sync ✅")

    return 1 if failed_envs else 0


if __name__ == "__main__":
    # Propagate failures to the scheduler so a broken check is not silent.
    sys.exit(main())
# Schedule as GitHub Actions Cron
# .github/workflows/drift-detection.yml
name: Terraform Drift Detection

on:
  schedule:
    - cron: '0 8 * * 1-5' # 08:00 UTC, Monday-Friday (GitHub cron runs in UTC)
  workflow_dispatch: # Manual trigger

# role-to-assume without static keys uses GitHub OIDC, which requires the
# job to be allowed to request an ID token; checkout needs read access.
permissions:
  id-token: write
  contents: read

jobs:
  detect-drift:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: hashicorp/setup-terraform@v3
        with:
          terraform_version: "1.9.0"
          # The script parses terraform's stdout and exit code itself, so
          # call the real binary rather than the action's wrapper.
          terraform_wrapper: false
      - uses: aws-actions/configure-aws-credentials@v4
        with:
          role-to-assume: ${{ secrets.AWS_DRIFT_DETECTION_ROLE }}
          aws-region: us-east-1
      # python-dotenv is required: main.py imports load_dotenv.
      - run: pip install anthropic slack-sdk boto3 python-dotenv
      - name: Run drift detection
        env:
          ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
          SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
          SLACK_CHANNEL: "#infra-alerts"
        run: python main.py
# Drift detection catches the silent changes that cause incidents weeks later. Running it daily means you know within 24 hours when something drifts — before it becomes a problem.
Get your Anthropic API key to build this. The IAM role for drift detection needs
`terraform plan` permissions only — read-only, no apply.
Today I Fixed
Short real fixes from production — posted daily
Stay ahead of the curve
Get the latest DevOps, Kubernetes, AWS, and AI/ML guides delivered straight to your inbox. No spam — just practical engineering content.
Related Articles
Auto-Generate Terraform Modules Using OpenAI Function Calling
Build a tool that takes plain English descriptions and generates production-ready Terraform modules using OpenAI's function calling API. No more starting from scratch.
Build an AI Cloud Cost Anomaly Detector with Python
Cloud costs spike without warning. Build a Python bot that pulls AWS cost data daily, detects anomalies using statistical analysis, and explains the spike using Claude AI.
Build an AI-Powered CI/CD Pipeline Failure Analyzer with LangChain
Build a tool that automatically reads CI/CD failure logs, uses LangChain + Claude to diagnose the root cause, and posts a clear explanation with fix suggestions to your PR.