Build an AI Multi-Cloud Infrastructure Drift Detector with LangChain
Use LangChain and Claude API to detect drift between your Terraform state and actual AWS/GCP/Azure resources, then generate a plain-English remediation report.
Infrastructure drift — when real cloud resources diverge from your Terraform state — is a silent killer. Someone clicked in the AWS console, a resource was auto-modified, a manual hotfix was applied and never committed. This tool detects drift and uses Claude to explain what changed and why it matters.
Architecture
Terraform state (S3) ───┐
                        ├── Drift Detector → Claude API → Report + Slack
Real AWS resources ─────┘
Setup
pip install langchain-anthropic langchain boto3 python-terraform
export ANTHROPIC_API_KEY="sk-ant-..."
export AWS_ACCESS_KEY_ID="..."
export AWS_SECRET_ACCESS_KEY="..."
export AWS_DEFAULT_REGION="us-east-1"

Step 1: Read Terraform State and Real Resources
import boto3
import json
import subprocess
from typing import Optional
def get_terraform_state(state_bucket: str, state_key: str) -> dict:
    """Fetch a Terraform state file from S3 and return it as parsed JSON.

    Args:
        state_bucket: Name of the S3 bucket that holds the state file.
        state_key: Object key of the state file inside that bucket.

    Returns:
        The decoded JSON document stored in the object.
    """
    s3_client = boto3.client("s3")
    state_object = s3_client.get_object(Bucket=state_bucket, Key=state_key)
    raw_state = state_object["Body"].read()
    return json.loads(raw_state)
def get_state_resources(state: dict) -> dict:
    """Index every managed resource instance in a Terraform state document.

    Only resources whose ``mode`` is ``"managed"`` are included; data sources
    and anything else are skipped.

    Each instance gets its own key so that instances created with ``count`` or
    ``for_each``, and same-named resources living in different modules, do not
    overwrite one another. The key follows Terraform address syntax:

    * ``aws_instance.web`` for a single, root-module instance
    * ``aws_instance.web[0]`` / ``aws_instance.web["blue"]`` when the instance
      carries an ``index_key``
    * ``module.app.aws_instance.web`` when the resource carries a ``module``
      path

    Args:
        state: Parsed Terraform state (the JSON document as a dict).

    Returns:
        A dict mapping the resource address to a dict with the keys
        ``type``, ``name``, ``provider`` and ``attributes``.
    """
    resources = {}
    for resource in state.get("resources", []):
        if resource.get("mode") != "managed":
            continue

        base_address = f"{resource['type']}.{resource['name']}"
        module_path = resource.get("module")
        if module_path:
            base_address = f"{module_path}.{base_address}"

        for instance in resource.get("instances", []):
            index_key = instance.get("index_key")
            if index_key is None:
                key = base_address
            else:
                # json.dumps renders ints bare and strings double-quoted,
                # which is exactly how Terraform writes instance indexes.
                key = f"{base_address}[{json.dumps(index_key)}]"
            resources[key] = {
                "type": resource["type"],
                "name": resource["name"],
                "provider": resource.get("provider", ""),
                "attributes": instance.get("attributes", {}),
            }
    return resources
def get_real_ec2_instances(region: str) -> dict:
    """List the live (non-terminated) EC2 instances in one AWS region.

    Args:
        region: AWS region name passed to the EC2 client.

    Returns:
        A dict keyed by instance ID. Each value summarises one instance:
        ``id``, ``type``, ``state``, ``az``, ``private_ip``, ``public_ip``,
        ``tags`` (a key/value dict), ``security_groups`` (list of group IDs),
        ``vpc_id`` and ``subnet_id``. Optional fields default to ``""``.
    """
    client = boto3.client("ec2", region_name=region)
    pages = client.get_paginator("describe_instances").paginate()

    found = {}
    for page in pages:
        for reservation in page["Reservations"]:
            for machine in reservation["Instances"]:
                lifecycle = machine["State"]["Name"]
                # Terminated instances no longer exist for drift purposes.
                if lifecycle == "terminated":
                    continue

                tag_map = dict(
                    (tag["Key"], tag["Value"]) for tag in machine.get("Tags", [])
                )
                machine_id = machine["InstanceId"]

                summary = {
                    "id": machine_id,
                    "type": machine["InstanceType"],
                    "state": machine["State"]["Name"],
                    "az": machine["Placement"]["AvailabilityZone"],
                    "private_ip": machine.get("PrivateIpAddress", ""),
                    "public_ip": machine.get("PublicIpAddress", ""),
                    "tags": tag_map,
                }
                group_ids = []
                for group in machine.get("SecurityGroups", []):
                    group_ids.append(group["GroupId"])
                summary["security_groups"] = group_ids
                summary["vpc_id"] = machine.get("VpcId", "")
                summary["subnet_id"] = machine.get("SubnetId", "")

                found[machine_id] = summary
    return found
def get_real_rds_instances(region: str) -> dict:
    """List the live RDS DB instances in one AWS region.

    The listing goes through a paginator so that every DB instance is
    collected; a single ``describe_db_instances`` call returns only one page
    of results and would silently drop the rest.

    Args:
        region: AWS region name passed to the RDS client.

    Returns:
        A dict keyed by ``DBInstanceIdentifier``. Each value summarises one
        DB instance: ``id``, ``class``, ``engine``, ``engine_version``,
        ``status``, ``allocated_storage``, ``multi_az``,
        ``publicly_accessible``, ``vpc_id`` (``""`` when no subnet group is
        reported) and ``deletion_protection`` (``False`` when not reported).
    """
    rds = boto3.client("rds", region_name=region)
    paginator = rds.get_paginator("describe_db_instances")
    real_dbs = {}
    for page in paginator.paginate():
        for db in page["DBInstances"]:
            db_id = db["DBInstanceIdentifier"]
            real_dbs[db_id] = {
                "id": db_id,
                "class": db["DBInstanceClass"],
                "engine": db["Engine"],
                "engine_version": db["EngineVersion"],
                "status": db["DBInstanceStatus"],
                "allocated_storage": db["AllocatedStorage"],
                "multi_az": db["MultiAZ"],
                "publicly_accessible": db["PubliclyAccessible"],
                "vpc_id": db.get("DBSubnetGroup", {}).get("VpcId", ""),
                "deletion_protection": db.get("DeletionProtection", False),
            }
    return real_dbs

# Step 2: Detect Drift
from dataclasses import dataclass
@dataclass
class DriftItem:
    """One detected difference between Terraform state and a live resource."""

    # Terraform resource type, e.g. "aws_instance" or "aws_db_instance".
    resource_type: str
    # Identifier of the affected resource (cloud ID or state resource key).
    resource_id: str
    # Name of the attribute that differs, e.g. "instance_type".
    field: str
    # Value recorded in Terraform state, rendered as text.
    state_value: str
    # Value observed on the live resource, rendered as text.
    actual_value: str
    # One of "critical", "high", "medium", "low".
    severity: str
def detect_ec2_drift(state_resources: dict, real_instances: dict) -> list[DriftItem]:
    """Compare ``aws_instance`` resources in Terraform state with live EC2.

    Three checks are made for every ``aws_instance`` entry that has an ``id``:

    * existence - the instance ID is absent from ``real_instances``
      (severity ``critical``; reported against the state resource key)
    * ``instance_type`` differs (severity ``high``)
    * the instance was running according to state but is now neither
      ``running`` nor ``pending`` (severity ``critical``)

    Args:
        state_resources: Mapping of resource key to a dict with at least
            ``type`` and ``attributes``.
        real_instances: Mapping of instance ID to a dict with at least
            ``type`` and ``state``.

    Returns:
        The drift items found, in the iteration order of ``state_resources``.
    """
    drift_items = []
    for resource_key, resource in state_resources.items():
        if resource["type"] != "aws_instance":
            continue

        attrs = resource["attributes"]
        instance_id = attrs.get("id", "")
        if not instance_id:
            # Nothing to look up without an ID.
            continue

        if instance_id not in real_instances:
            drift_items.append(DriftItem(
                resource_type="aws_instance",
                resource_id=resource_key,
                field="existence",
                state_value=f"Instance {instance_id} exists in state",
                actual_value="Instance not found in AWS",
                severity="critical"
            ))
            continue

        real = real_instances[instance_id]

        # Check instance type
        if attrs.get("instance_type") != real["type"]:
            drift_items.append(DriftItem(
                resource_type="aws_instance",
                resource_id=instance_id,
                field="instance_type",
                state_value=attrs.get("instance_type", ""),
                actual_value=real["type"],
                severity="high"
            ))

        # Check power state. Prefer the state actually recorded by Terraform
        # so that an instance that was already stopped at the last apply is
        # not reported as drift. Only when the state file carries no
        # ``instance_state`` do we fall back to the legacy heuristic of
        # assuming "running" whenever ``associate_public_ip_address`` is set.
        recorded_state = attrs.get("instance_state")
        if recorded_state is None:
            if attrs.get("associate_public_ip_address") is not None:
                recorded_state = "running"
            else:
                recorded_state = "unknown"

        if recorded_state == "running" and real["state"] not in ("running", "pending"):
            drift_items.append(DriftItem(
                resource_type="aws_instance",
                resource_id=instance_id,
                field="state",
                state_value="running",
                actual_value=real["state"],
                severity="critical"
            ))
    return drift_items
def detect_rds_drift(state_resources: dict, real_dbs: dict) -> list[DriftItem]:
    """Compare ``aws_db_instance`` resources in Terraform state with live RDS.

    ``real_dbs`` is looked up by DB instance identifier. The state's
    ``identifier`` attribute is tried first and ``id`` second, because ``id``
    is not guaranteed to hold the DB instance identifier (newer AWS provider
    releases store the DBI resource ID there). State entries that match no
    live DB instance under either value are skipped.

    Checks performed on each matched DB instance:

    * ``instance_class`` differs (severity ``high``)
    * ``multi_az`` differs (severity ``high``)
    * ``deletion_protection`` is on in state but off in AWS
      (severity ``critical``)

    Args:
        state_resources: Mapping of resource key to a dict with at least
            ``type`` and ``attributes``.
        real_dbs: Mapping of DB instance identifier to a dict with at least
            ``class``, ``multi_az`` and ``deletion_protection``.

    Returns:
        The drift items found, in the iteration order of ``state_resources``.
    """
    drift_items = []
    for resource in state_resources.values():
        if resource["type"] != "aws_db_instance":
            continue

        attrs = resource["attributes"]
        candidates = (attrs.get("identifier"), attrs.get("id"))
        db_id = next((c for c in candidates if c and c in real_dbs), None)
        if db_id is None:
            continue

        real = real_dbs[db_id]

        # Instance class changes
        if attrs.get("instance_class") != real["class"]:
            drift_items.append(DriftItem(
                resource_type="aws_db_instance",
                resource_id=db_id,
                field="instance_class",
                state_value=attrs.get("instance_class", ""),
                actual_value=real["class"],
                severity="high"
            ))

        # Multi-AZ changes. A null/missing value in state is treated as
        # False so it does not register as drift against a live False.
        state_multi_az = bool(attrs.get("multi_az", False))
        if state_multi_az != real["multi_az"]:
            drift_items.append(DriftItem(
                resource_type="aws_db_instance",
                resource_id=db_id,
                field="multi_az",
                state_value=str(state_multi_az),
                actual_value=str(real["multi_az"]),
                severity="high"
            ))

        # Deletion protection: only the loss of protection is reported.
        state_deletion_protection = attrs.get("deletion_protection", False)
        if state_deletion_protection and not real["deletion_protection"]:
            drift_items.append(DriftItem(
                resource_type="aws_db_instance",
                resource_id=db_id,
                field="deletion_protection",
                state_value="true",
                actual_value="false",
                severity="critical"
            ))
    return drift_items

# Step 3: AI Analysis with LangChain + Claude
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import HumanMessage, SystemMessage
# Module-level Claude chat model; analyze_drift_with_ai sends its prompt
# through this instance via llm.invoke().
llm = ChatAnthropic(model="claude-opus-4-8", max_tokens=2000)
def analyze_drift_with_ai(drift_items: list[DriftItem], account_id: str, region: str) -> str:
    """Ask the chat model for a remediation report covering the given drift.

    Args:
        drift_items: Drift findings to describe in the prompt.
        account_id: AWS account ID mentioned in the prompt.
        region: AWS region mentioned in the prompt.

    Returns:
        A fixed "no drift" sentence when ``drift_items`` is empty (the model
        is not called in that case); otherwise the ``content`` of the model's
        response.
    """
    if not drift_items:
        return "No drift detected. Infrastructure matches Terraform state."

    # One bullet line per finding.
    bullet_lines = []
    for item in drift_items:
        bullet_lines.append(
            f"- {item.resource_type} '{item.resource_id}': {item.field} changed from '{item.state_value}' to '{item.actual_value}' (severity: {item.severity})"
        )
    drift_summary = "\n".join(bullet_lines)

    critical_count = len([d for d in drift_items if d.severity == "critical"])
    high_count = len([d for d in drift_items if d.severity == "high"])

    system_prompt = """You are a senior DevOps engineer and infrastructure security expert.
Analyze infrastructure drift reports and provide actionable, specific remediation guidance.
Be direct and prioritize by risk."""

    user_prompt = f"""I detected infrastructure drift in AWS account {account_id}, region {region}.
DRIFT SUMMARY:
{drift_summary}
Total: {len(drift_items)} drift items ({critical_count} critical, {high_count} high)
Please provide:
1. RISK ASSESSMENT: What are the top 3 risks from this drift?
2. ROOT CAUSE: What most likely caused each type of drift?
3. REMEDIATION STEPS: Specific steps to fix each item (with terraform/AWS CLI commands)
4. PREVENTION: How to prevent this drift from occurring again
5. PRIORITY ORDER: Which items to fix first?
Be specific — include actual terraform commands and AWS console paths where relevant."""

    conversation = [
        SystemMessage(content=system_prompt),
        HumanMessage(content=user_prompt),
    ]
    reply = llm.invoke(conversation)
    return reply.content
def generate_drift_report(
    state_bucket: str,
    state_key: str,
    region: str,
    account_id: str,
) -> dict:
    """Run the full pipeline: load state, query AWS, diff, and analyse.

    Progress messages are printed to stdout as each stage starts.

    Args:
        state_bucket: S3 bucket holding the Terraform state file.
        state_key: Object key of the state file.
        region: AWS region whose EC2 and RDS resources are inspected.
        account_id: AWS account ID, echoed in the report and the AI prompt.

    Returns:
        A dict with ``account_id``, ``region``, ``total_drift_items``,
        ``critical`` and ``high`` counts, ``drift_items`` (one plain dict per
        finding) and ``ai_analysis``.
    """
    print("Reading Terraform state...")
    tracked = get_state_resources(get_terraform_state(state_bucket, state_key))

    print("Fetching real AWS resources...")
    live_ec2 = get_real_ec2_instances(region)
    live_rds = get_real_rds_instances(region)

    print("Detecting drift...")
    findings = detect_ec2_drift(tracked, live_ec2) + detect_rds_drift(tracked, live_rds)

    print(f"Found {len(findings)} drift items. Analyzing with AI...")
    narrative = analyze_drift_with_ai(findings, account_id, region)

    def count_with_severity(level):
        # Number of findings carrying the given severity label.
        return len([f for f in findings if f.severity == level])

    exported_fields = (
        "resource_type",
        "resource_id",
        "field",
        "state_value",
        "actual_value",
        "severity",
    )
    serialized = []
    for finding in findings:
        serialized.append({name: getattr(finding, name) for name in exported_fields})

    report = {}
    report["account_id"] = account_id
    report["region"] = region
    report["total_drift_items"] = len(findings)
    report["critical"] = count_with_severity("critical")
    report["high"] = count_with_severity("high")
    report["drift_items"] = serialized
    report["ai_analysis"] = narrative
    return report
# Run it
if __name__ == "__main__":
    import os

    # The CronJob manifest injects STATE_BUCKET and AWS_REGION, so honour
    # them here; the literals remain as defaults for local runs.
    report = generate_drift_report(
        state_bucket=os.environ.get("STATE_BUCKET", "my-terraform-state"),
        state_key="production/terraform.tfstate",
        region=(
            os.environ.get("AWS_REGION")
            or os.environ.get("AWS_DEFAULT_REGION")
            or "us-east-1"
        ),
        account_id="123456789012",
    )
    print(f"\n{'='*60}")
    print(f"DRIFT REPORT: {report['total_drift_items']} items detected")
    print(f"Critical: {report['critical']}, High: {report['high']}")
    print(f"\nAI Analysis:\n{report['ai_analysis']}")

# Schedule as Kubernetes CronJob
apiVersion: batch/v1
kind: CronJob
metadata:
  name: drift-detector
  namespace: platform
spec:
  schedule: "0 */6 * * *" # Every 6 hours
  jobTemplate:
    spec:
      template:
        spec:
          serviceAccountName: drift-detector # needs AWS IRSA
          containers:
            - name: detector
              image: your-org/drift-detector:latest
              env:
                - name: ANTHROPIC_API_KEY
                  valueFrom:
                    secretKeyRef:
                      name: ai-secrets
                      key: anthropic_api_key
                - name: STATE_BUCKET
                  value: "my-terraform-state"
                - name: AWS_REGION
                  value: "us-east-1"
          restartPolicy: OnFailure

This gives you scheduled, AI-explained infrastructure drift detection without having to pay for Terraform Cloud or Spacelift just for drift detection.
Resources: LangChain docs | AWS Boto3 | Anthropic API
Today I Fixed
Short real fixes from production — posted daily
Stay ahead of the curve
Get the latest DevOps, Kubernetes, AWS, and AI/ML guides delivered straight to your inbox. No spam — just practical engineering content.
Related Articles
Build an AI Terraform Plan Reviewer with Claude API
Automatically review terraform plan output with Claude API to catch risky changes, unintended destroys, and security issues before they hit production.
Use an LLM to Generate Terraform from Natural Language in 2026
Build a tool that converts plain English infrastructure descriptions into valid Terraform code using Claude AI — with validation, state awareness, and GitOps integration.
Build an AI-Powered Terraform Drift Detection System
Terraform drift happens silently. Here's how to build an automated drift detector using Terraform plan + Claude API that alerts your team and explains exactly what changed.