Build an AI-Powered Terraform Drift Detection System
Terraform drift happens silently. Here's how to build an automated drift detector using Terraform plan + Claude API that alerts your team and explains exactly what changed.
Infrastructure drift is when your actual cloud resources no longer match your Terraform state. Someone manually changed a security group rule. An auto-scaling policy got updated through the console. A team member ran terraform apply from a local checkout without committing the configuration change.
The problem is silent until it breaks something. Here's how to build a system that detects drift automatically, analyzes it with AI, and alerts your team with a clear explanation.
What You'll Build
Scheduled Job (every 6h)
↓
terraform plan -detailed-exitcode
↓ (exit code 2 = drift detected)
Parse plan output (JSON)
↓
Claude API analyzes drift
↓
Slack alert with:
- Resources drifted
- What changed
- Risk assessment
- Recommended action
Step 1 — Set Up Terraform Plan to Detect Drift
# Run plan and capture exit code
terraform plan -detailed-exitcode -out=plan.tfplan
EXIT_CODE=$?
# Exit codes:
# 0 = no changes (no drift)
# 1 = error
# 2 = changes detected (drift!)
Output the plan as JSON for programmatic parsing:
terraform show -json plan.tfplan > plan.json
Step 2 — The Drift Detector Script
# drift_detector.py
import subprocess
import json
import os
import anthropic
import requests
from datetime import datetime
def run_terraform_plan(working_dir: str) -> tuple[int, str]:
    """Run ``terraform plan`` in *working_dir* and report whether changes exist.

    Args:
        working_dir: Directory containing the Terraform configuration.

    Returns:
        ``(0, "")`` when the plan is empty, ``(2, plan_json)`` when changes
        are pending (``plan_json`` is the stdout of ``terraform show -json``),
        and ``(1, error_text)`` when ``init``, ``plan`` or ``show`` fails.
    """
    # Init first. A failed init (unreachable backend, missing credentials)
    # makes any plan result meaningless, so surface it as an error right away
    # instead of silently carrying on.
    init_result = subprocess.run(
        ["terraform", "init", "-input=false"],
        cwd=working_dir,
        capture_output=True,
        text=True,
    )
    if init_result.returncode != 0:
        return 1, init_result.stderr

    plan_result = subprocess.run(
        ["terraform", "plan", "-detailed-exitcode", "-out=plan.tfplan", "-input=false"],
        cwd=working_dir,
        capture_output=True,
        text=True,
    )
    exit_code = plan_result.returncode
    if exit_code == 0:
        return 0, ""
    if exit_code != 2:
        # With -detailed-exitcode only 0 and 2 mean success. Anything else
        # (1, or a negative code when the process is killed by a signal) must
        # not be mistaken for "no drift".
        return 1, plan_result.stderr

    # Exit code 2: render the saved plan as JSON for programmatic parsing.
    show_result = subprocess.run(
        ["terraform", "show", "-json", "plan.tfplan"],
        cwd=working_dir,
        capture_output=True,
        text=True,
    )
    if show_result.returncode != 0:
        return 1, show_result.stderr
    return 2, show_result.stdout
def _redact_sensitive(value, sensitivity):
    """Return *value* with every part flagged in *sensitivity* replaced by a placeholder.

    *sensitivity* follows the shape of Terraform's ``before_sensitive`` /
    ``after_sensitive`` fields: ``True`` marks a sensitive value, while dicts
    and lists mirror the structure of *value*.
    """
    if sensitivity is True:
        # Keep None as-is: "no value" reveals nothing and stays readable.
        return value if value is None else "(sensitive value)"
    if isinstance(value, dict) and isinstance(sensitivity, dict):
        return {
            key: _redact_sensitive(item, sensitivity.get(key))
            for key, item in value.items()
        }
    if isinstance(value, list) and isinstance(sensitivity, list):
        return [
            _redact_sensitive(item, sensitivity[index] if index < len(sensitivity) else None)
            for index, item in enumerate(value)
        ]
    return value


def extract_changes(plan_json: str) -> dict:
    """Extract the meaningful changes from ``terraform show -json`` output.

    Args:
        plan_json: JSON text of a Terraform plan.

    Returns:
        A dict with ``resource_changes`` (one entry per resource whose actions
        are not ``no-op``, carrying address, type, name, actions and the
        redacted before/after values) and ``summary`` (counts per action).
        A replacement is reported by Terraform as ``delete`` plus ``create``;
        it is counted under both of those and additionally under ``replace``.

    Raises:
        json.JSONDecodeError: If *plan_json* is not valid JSON.
    """
    plan = json.loads(plan_json)
    summary = {"create": 0, "update": 0, "delete": 0, "replace": 0}
    resource_changes = []

    # `or` guards against explicit nulls as well as missing keys.
    for change in plan.get("resource_changes") or []:
        detail = change.get("change") or {}
        actions = detail.get("actions") or []
        if "no-op" in actions:
            continue

        resource_changes.append({
            "address": change.get("address"),
            "type": change.get("type"),
            "name": change.get("name"),
            "actions": actions,
            # The JSON plan carries sensitive values in plain text; mask them
            # before this structure is logged or sent to third parties.
            "before": _redact_sensitive(detail.get("before"), detail.get("before_sensitive")),
            "after": _redact_sensitive(detail.get("after"), detail.get("after_sensitive")),
        })

        for action in actions:
            if action in summary:
                summary[action] += 1
        # Terraform has no "replace" action: a replacement shows up as a
        # delete/create pair (in either order).
        if "create" in actions and "delete" in actions:
            summary["replace"] += 1

    return {"resource_changes": resource_changes, "summary": summary}
def analyze_drift_with_ai(changes: dict, environment: str) -> str:
    """Ask Claude for a human-readable, Slack-formatted report on the drift.

    Args:
        changes: Structure describing the drifted resources; it is serialised
            to JSON and embedded in the prompt.
        environment: Environment name mentioned in the prompt.

    Returns:
        The text of the model's reply, or a short placeholder when the reply
        contains no text.

    Raises:
        KeyError: If ``ANTHROPIC_API_KEY`` is not set.
    """
    client = anthropic.Anthropic(api_key=os.environ["ANTHROPIC_API_KEY"])

    # A plan touching many resources can serialise to megabytes of JSON, which
    # the API would reject outright. Cap the payload so a large drift still
    # gets a (partial) analysis instead of an error.
    max_payload_chars = 100_000
    changes_text = json.dumps(changes, indent=2)
    if len(changes_text) > max_payload_chars:
        changes_text = changes_text[:max_payload_chars] + "\n... [truncated]"

    prompt = f"""You are a DevOps engineer reviewing infrastructure drift in a {environment} environment.
Analyze these Terraform plan changes and provide:
1. A 2-sentence summary of what drifted
2. Risk level: LOW / MEDIUM / HIGH (with reason)
3. Most likely cause of the drift
4. Recommended immediate action
Keep the response concise and actionable. Format for Slack (use *bold* and bullet points).
Changes detected:
{changes_text}"""

    message = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
    )

    # Join every text block rather than assuming the first block exists and
    # is text.
    analysis = "".join(
        block.text for block in message.content if getattr(block, "type", None) == "text"
    )
    return analysis or "_AI analysis returned no text._"
def send_slack_alert(changes: dict, ai_analysis: str, environment: str, repo: str):
    """Post a Block Kit drift alert to the Slack incoming webhook.

    Args:
        changes: Dict with ``summary`` (create/update/delete counts) and
            ``resource_changes`` (entries with ``address`` and ``actions``).
        ai_analysis: Slack-formatted analysis text to include in the alert.
        environment: Environment name shown in the header and fields.
        repo: ``owner/name`` of the GitHub repository, used for the button URL.

    Raises:
        KeyError: If ``SLACK_WEBHOOK_URL`` is not set.
        requests.RequestException: If the webhook cannot be reached in time
            or Slack rejects the payload.
    """
    # Local import: the module-level import only brings in `datetime`.
    from datetime import timezone

    webhook_url = os.environ["SLACK_WEBHOOK_URL"]
    summary = changes["summary"]
    resources = changes["resource_changes"]
    max_listed = 10
    section_text_limit = 3000  # Slack rejects longer section text

    def fit(text: str) -> str:
        """Trim *text* so it fits into a single section block."""
        if len(text) <= section_text_limit:
            return text
        return text[:section_text_limit - 1] + "…"

    resource_list = "\n".join(
        f"• `{r['address']}` — {', '.join(r['actions'])}"
        for r in resources[:max_listed]  # Show first 10
    )
    if len(resources) > max_listed:
        resource_list += f"\n_...and {len(resources) - max_listed} more_"

    # The label says UTC, so take the time in UTC instead of the local zone.
    detected_at = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")

    blocks = [
        {
            "type": "header",
            "text": {"type": "plain_text", "text": f"⚠️ Terraform Drift Detected — {environment.upper()}"},
        },
        {
            "type": "section",
            "fields": [
                {"type": "mrkdwn", "text": f"*Environment:*\n{environment}"},
                {"type": "mrkdwn", "text": f"*Repository:*\n{repo}"},
                {"type": "mrkdwn", "text": f"*Detected at:*\n{detected_at}"},
                {"type": "mrkdwn", "text": f"*Changes:*\n+{summary['create']} create ~{summary['update']} update -{summary['delete']} delete"},
            ],
        },
        {
            "type": "section",
            "text": {"type": "mrkdwn", "text": fit(f"*Drifted Resources:*\n{resource_list}")},
        },
        {
            "type": "divider",
        },
        {
            "type": "section",
            "text": {"type": "mrkdwn", "text": fit(f"*AI Analysis:*\n{ai_analysis}")},
        },
        {
            "type": "actions",
            "elements": [
                {
                    "type": "button",
                    "text": {"type": "plain_text", "text": "View Plan Details"},
                    "url": f"https://github.com/{repo}/actions",
                },
            ],
        },
    ]

    # Bound the call and check the status: a hung or rejected webhook must not
    # be reported as a sent alert.
    response = requests.post(webhook_url, json={"blocks": blocks}, timeout=10)
    response.raise_for_status()
    print("Slack alert sent")
def main():
    """Check one environment for drift and alert Slack when changes are found.

    Configuration comes from the environment: ``ENVIRONMENT`` (default
    ``production``), ``TERRAFORM_DIR`` (default ``.``) and
    ``GITHUB_REPOSITORY`` (default ``myorg/infra``).

    Raises:
        SystemExit: With status 1 when Terraform itself fails, so the
            scheduled job goes red instead of silently reporting nothing.
    """
    environment = os.environ.get("ENVIRONMENT", "production")
    terraform_dir = os.environ.get("TERRAFORM_DIR", ".")
    repo = os.environ.get("GITHUB_REPOSITORY", "myorg/infra")

    print(f"Checking drift in {environment}...")
    exit_code, output = run_terraform_plan(terraform_dir)

    if exit_code == 0:
        print("No drift detected. Infrastructure matches Terraform state.")
        return
    if exit_code == 1:
        print(f"Error running terraform plan: {output}")
        # A detector that cannot run is itself a failure worth noticing.
        raise SystemExit(1)

    # Drift detected!
    print("Drift detected! Analyzing...")
    changes = extract_changes(output)
    print(f"Found {len(changes['resource_changes'])} changed resources")

    try:
        ai_analysis = analyze_drift_with_ai(changes, environment)
    except Exception as exc:  # top-level boundary: the alert matters more than the analysis
        print(f"AI analysis failed: {exc}")
        ai_analysis = "_AI analysis unavailable — review the plan output manually._"

    send_slack_alert(changes, ai_analysis, environment, repo)
    # To fail the CI job whenever drift is found, add: raise SystemExit(1)
if __name__ == "__main__":
    main()
Step 3 — GitHub Actions Scheduled Job
# .github/workflows/drift-detection.yml
name: Terraform Drift Detection

on:
  schedule:
    - cron: '0 */6 * * *' # Every 6 hours
  workflow_dispatch: # Manual trigger

# Assuming an IAM role through GitHub's OIDC provider requires an ID token;
# checkout only needs read access to the repository.
permissions:
  id-token: write
  contents: read

jobs:
  detect-drift:
    runs-on: ubuntu-latest
    strategy:
      # Keep checking the other environment even when one matrix job fails.
      fail-fast: false
      matrix:
        environment: [staging, production]
    steps:
      - uses: actions/checkout@v4

      - name: Configure AWS Credentials
        uses: aws-actions/configure-aws-credentials@v4
        with:
          role-to-assume: arn:aws:iam::${{ secrets.AWS_ACCOUNT_ID }}:role/terraform-readonly
          aws-region: ap-south-1

      - name: Setup Terraform
        uses: hashicorp/setup-terraform@v3
        with:
          terraform_version: 1.8.0
          # Call the real binary directly so the Python script receives its
          # raw stdout and exit code.
          terraform_wrapper: false

      - name: Run Drift Detection
        env:
          ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
          SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
          ENVIRONMENT: ${{ matrix.environment }}
          TERRAFORM_DIR: environments/${{ matrix.environment }}
          GITHUB_REPOSITORY: ${{ github.repository }}
          # NOTE: drift_detector.py does not read this variable; pass it to
          # `terraform init` yourself if the backend needs extra configuration.
          TF_BACKEND_CONFIG: ${{ secrets[format('{0}_TF_BACKEND', matrix.environment)] }}
        run: |
          pip install anthropic requests
          python scripts/drift_detector.py
Example Slack Alert
⚠️ Terraform Drift Detected — PRODUCTION
Environment: production Repository: myorg/infra
Detected at: 2026-05-12 06:00 UTC Changes: +0 create ~3 update -0 delete
Drifted Resources:
• aws_security_group.api_sg — update
• aws_autoscaling_group.api_asg — update
• aws_iam_role_policy.lambda_policy — update
AI Analysis:
*Security group and autoscaling configuration changed outside of Terraform.*
Risk level: *HIGH* — security group changes in production may open unintended access.
Likely cause: Manual changes made via AWS Console during an incident response.
Recommended action: Run `terraform plan` locally to review exact changes, then either
`terraform apply` to revert or update the Terraform code to match the intended state.
[View Plan Details]
Making It Smarter
Add auto-remediation for low-risk drift:
def auto_remediate(changes: dict, risk_level: str, terraform_dir: str):
if risk_level == "LOW" and changes["summary"]["update"] <= 2:
print("Low-risk drift. Auto-applying...")
subprocess.run(
["terraform", "apply", "-auto-approve", "plan.tfplan"],
cwd=terraform_dir
)Track drift history in S3:
def save_drift_report(changes: dict, environment: str):
import boto3
s3 = boto3.client('s3')
key = f"drift-reports/{environment}/{datetime.now().isoformat()}.json"
s3.put_object(
Bucket="my-drift-reports",
Key=key,
Body=json.dumps(changes)
)Terraform drift is inevitable — people make emergency changes, auto-scaling updates launch configs, cloud providers modify defaults. The goal isn't zero drift; it's fast detection and clear resolution paths.
This system gives you both at minimal cost — a scheduled CI job plus a small amount of Claude API usage — with AI-generated explanations that even non-Terraform engineers can understand.
For Terraform labs and certification prep, KodeKloud has hands-on Terraform courses from beginner to associate-level certification.
Stay ahead of the curve
Get the latest DevOps, Kubernetes, AWS, and AI/ML guides delivered straight to your inbox. No spam — just practical engineering content.
Related Articles
AI Agents for Automated Terraform Code Review — The Future of IaC Quality
How AI agents are automating Terraform code review with security scanning, cost estimation, best practice enforcement, and drift prevention. Covers practical tools, custom LLM pipelines, and CI/CD integration.
Ansible vs Terraform: Which One Should You Use? (2026)
Ansible and Terraform are both called 'IaC tools' but they solve completely different problems. Here's when to use each — and when to use both.
Build a Complete AWS Infrastructure with Terraform from Scratch (2026)
Full project walkthrough: provision a production-grade AWS VPC, EKS cluster, RDS, S3, and IAM with Terraform. Real code, real architecture, ready to use.