🎉 DevOps Interview Prep Bundle is live — 1000+ Q&A across 20 topics. Get it →
All Articles

Build an AI Multi-Cloud Infrastructure Drift Detector with LangChain

Use LangChain and the Claude API to detect drift between your Terraform state and actual AWS/GCP/Azure resources, then generate a plain-English remediation report. The walkthrough below implements the AWS side (EC2 and RDS).

DevOpsBoys · 6 min read
Share: Tweet

Infrastructure drift — when real cloud resources diverge from your Terraform state — is a silent killer. Someone clicked in the AWS console, a resource was auto-modified, or a manual hotfix was applied and never committed. This tool detects drift and uses Claude to explain what changed and why it matters.

Architecture

Terraform state (S3) ──┐
                        ├── Drift Detector → Claude API → Report + Slack
Real AWS resources ─────┘

Setup

bash
# Install the packages named below: langchain-anthropic, langchain, boto3 and python-terraform.
pip install langchain-anthropic langchain boto3 python-terraform
bash
# Placeholder values — replace the elided parts with your own credentials
# and never commit real keys to version control.
export ANTHROPIC_API_KEY="sk-ant-..."
export AWS_ACCESS_KEY_ID="..."
export AWS_SECRET_ACCESS_KEY="..."
export AWS_DEFAULT_REGION="us-east-1"

Step 1: Read Terraform State and Real Resources

python
import boto3
import json
import subprocess
from typing import Optional
 
def get_terraform_state(state_bucket: str, state_key: str) -> dict:
    """Download a Terraform state file from S3 and decode it as JSON.

    Args:
        state_bucket: Name of the S3 bucket that holds the state file.
        state_key: Object key of the state file inside that bucket.

    Returns:
        The parsed state document.
    """
    client = boto3.client("s3")
    state_object = client.get_object(Bucket=state_bucket, Key=state_key)
    raw_state = state_object["Body"].read()
    return json.loads(raw_state)
 
 
def get_state_resources(state: dict) -> dict:
    """Extract the managed resource instances from a Terraform state document.

    Every instance gets its own entry, keyed by its Terraform-style address:
    ``<type>.<name>`` for a plain resource, with a ``[<index>]`` /
    ``["<key>"]`` suffix for ``count`` / ``for_each`` instances and a
    ``<module>.`` prefix for resources that live inside a module. Without the
    suffix and prefix, sibling instances would share one key and overwrite
    each other.

    Args:
        state: Parsed Terraform state (the ``resources`` list is read; a
            missing list is treated as empty).

    Returns:
        Mapping of instance address -> dict with ``type``, ``name``,
        ``provider`` and ``attributes`` entries. Data sources and any other
        non-``managed`` resources are skipped.
    """
    resources = {}
    for resource in state.get("resources", []):
        if resource.get("mode") != "managed":
            continue

        address = f"{resource['type']}.{resource['name']}"
        module = resource.get("module")
        if module:
            address = f"{module}.{address}"

        for instance in resource.get("instances", []):
            index_key = instance.get("index_key")
            if index_key is None:
                key = address
            elif isinstance(index_key, str):
                # for_each instances are addressed by a quoted string key
                key = f'{address}["{index_key}"]'
            else:
                # count instances are addressed by their numeric index
                key = f"{address}[{index_key}]"

            resources[key] = {
                "type": resource["type"],
                "name": resource["name"],
                "provider": resource.get("provider", ""),
                "attributes": instance.get("attributes", {}),
            }
    return resources
 
 
def get_real_ec2_instances(region: str) -> dict:
    """List the non-terminated EC2 instances in a region.

    Args:
        region: AWS region name passed to the EC2 client.

    Returns:
        Mapping of instance ID -> summary dict (``id``, ``type``, ``state``,
        ``az``, ``private_ip``, ``public_ip``, ``tags``, ``security_groups``,
        ``vpc_id``, ``subnet_id``). Optional fields default to an empty
        string (or an empty collection) when AWS does not return them.
    """
    client = boto3.client("ec2", region_name=region)
    pages = client.get_paginator("describe_instances").paginate()

    # Flatten pages -> reservations -> instances, dropping terminated ones.
    live_instances = (
        inst
        for page in pages
        for reservation in page["Reservations"]
        for inst in reservation["Instances"]
        if inst["State"]["Name"] != "terminated"
    )

    found = {}
    for inst in live_instances:
        tag_map = {}
        for tag in inst.get("Tags", []):
            tag_map[tag["Key"]] = tag["Value"]

        ec2_id = inst["InstanceId"]
        found[ec2_id] = {
            "id": ec2_id,
            "type": inst["InstanceType"],
            "state": inst["State"]["Name"],
            "az": inst["Placement"]["AvailabilityZone"],
            "private_ip": inst.get("PrivateIpAddress", ""),
            "public_ip": inst.get("PublicIpAddress", ""),
            "tags": tag_map,
            "security_groups": [group["GroupId"] for group in inst.get("SecurityGroups", [])],
            "vpc_id": inst.get("VpcId", ""),
            "subnet_id": inst.get("SubnetId", ""),
        }

    return found
 
 
def get_real_rds_instances(region: str) -> dict:
    """Get real RDS instances from AWS.

    Results are read through a paginator so that every page of DB instances
    is collected, not just the first one.

    Args:
        region: AWS region name passed to the RDS client.

    Returns:
        Mapping of DB instance identifier -> summary dict (``id``, ``class``,
        ``engine``, ``engine_version``, ``status``, ``allocated_storage``,
        ``multi_az``, ``publicly_accessible``, ``vpc_id``,
        ``deletion_protection``).
    """
    rds = boto3.client("rds", region_name=region)
    paginator = rds.get_paginator("describe_db_instances")

    real_dbs = {}
    for page in paginator.paginate():
        for db in page["DBInstances"]:
            db_id = db["DBInstanceIdentifier"]
            real_dbs[db_id] = {
                "id": db_id,
                "class": db["DBInstanceClass"],
                "engine": db["Engine"],
                "engine_version": db["EngineVersion"],
                "status": db["DBInstanceStatus"],
                "allocated_storage": db["AllocatedStorage"],
                "multi_az": db["MultiAZ"],
                "publicly_accessible": db["PubliclyAccessible"],
                # The VPC is only reported through the subnet group, when present.
                "vpc_id": db.get("DBSubnetGroup", {}).get("VpcId", ""),
                "deletion_protection": db.get("DeletionProtection", False),
            }

    return real_dbs

Step 2: Detect Drift

python
from dataclasses import dataclass
 
@dataclass
class DriftItem:
    """One detected difference between Terraform state and the live resource."""

    # Terraform resource type, e.g. "aws_instance" or "aws_db_instance".
    resource_type: str
    # Identifier of the affected resource (a cloud ID or a state resource key).
    resource_id: str
    # Name of the attribute that differs (e.g. "instance_type", "existence").
    field: str
    # Value recorded in Terraform state, rendered as text.
    state_value: str
    # Value observed on the live resource, rendered as text.
    actual_value: str
    severity: str  # "critical", "high", "medium", "low"
 
 
def detect_ec2_drift(state_resources: dict, real_instances: dict) -> list[DriftItem]:
    """Compare Terraform EC2 state with real instances.

    Args:
        state_resources: Mapping of resource key -> dict with at least
            ``type`` and ``attributes`` entries. Only ``aws_instance``
            entries are inspected.
        real_instances: Mapping of instance ID -> dict with at least
            ``type`` and ``state`` entries describing the live instance.

    Returns:
        One DriftItem for each difference found: an instance that is in
        state but not in ``real_instances`` (critical), a changed instance
        type (high), or an instance that state expects to be up but that is
        neither running nor pending (critical).
    """
    # Live states that count as "up" when state expects a running instance.
    up_states = ("running", "pending")
    drift_items = []

    for resource_key, resource in state_resources.items():
        if resource["type"] != "aws_instance":
            continue

        attrs = resource["attributes"]
        instance_id = attrs.get("id", "")
        if not instance_id:
            # No ID recorded in state, so there is nothing to look up.
            continue

        if instance_id not in real_instances:
            drift_items.append(DriftItem(
                resource_type="aws_instance",
                resource_id=resource_key,
                field="existence",
                state_value=f"Instance {instance_id} exists in state",
                actual_value="Instance not found in AWS",
                severity="critical"
            ))
            continue

        real = real_instances[instance_id]

        # Check instance type
        if attrs.get("instance_type") != real["type"]:
            drift_items.append(DriftItem(
                resource_type="aws_instance",
                resource_id=instance_id,
                field="instance_type",
                state_value=attrs.get("instance_type", ""),
                actual_value=real["type"],
                severity="high"
            ))

        # Check power state. Prefer the state Terraform actually recorded, so
        # that an instance Terraform already knows to be stopped is not
        # reported as drift.
        recorded_state = attrs.get("instance_state")
        if recorded_state is None:
            # Fallback for state without ``instance_state``: keep the
            # previous heuristic, which assumes "running" whenever the
            # public-IP association attribute is populated.
            if attrs.get("associate_public_ip_address") is not None:
                recorded_state = "running"
            else:
                recorded_state = "unknown"

        if recorded_state in up_states and real["state"] not in up_states:
            drift_items.append(DriftItem(
                resource_type="aws_instance",
                resource_id=instance_id,
                field="state",
                state_value=recorded_state,
                actual_value=real["state"],
                severity="critical"
            ))

    return drift_items
 
 
def detect_rds_drift(state_resources: dict, real_dbs: dict) -> list[DriftItem]:
    """Compare Terraform RDS state with real instances.

    Args:
        state_resources: Mapping of resource key -> dict with at least
            ``type`` and ``attributes`` entries. Only ``aws_db_instance``
            entries are inspected.
        real_dbs: Mapping of DB instance identifier -> dict with at least
            ``class``, ``multi_az`` and ``deletion_protection`` entries.

    Returns:
        One DriftItem for each difference found: a DB that is in state but
        not in ``real_dbs`` (critical), a changed instance class (high), a
        changed Multi-AZ setting (high), or deletion protection that state
        has enabled but the live DB has disabled (critical).
    """
    drift_items = []

    for resource_key, resource in state_resources.items():
        if resource["type"] != "aws_db_instance":
            continue

        attrs = resource["attributes"]
        # ``real_dbs`` is keyed by the DB instance identifier. Newer AWS
        # provider versions keep that value in ``identifier`` and store a
        # different ID in ``id``, so prefer ``identifier`` and fall back to
        # ``id`` for state that lacks it.
        db_id = attrs.get("identifier") or attrs.get("id", "")
        if not db_id:
            # No identifier recorded in state, so there is nothing to look up.
            continue

        if db_id not in real_dbs:
            # Mirror the EC2 check: a managed DB that is gone is drift.
            drift_items.append(DriftItem(
                resource_type="aws_db_instance",
                resource_id=resource_key,
                field="existence",
                state_value=f"DB instance {db_id} exists in state",
                actual_value="DB instance not found in AWS",
                severity="critical"
            ))
            continue

        real = real_dbs[db_id]

        # Instance class changes
        if attrs.get("instance_class") != real["class"]:
            drift_items.append(DriftItem(
                resource_type="aws_db_instance",
                resource_id=db_id,
                field="instance_class",
                state_value=attrs.get("instance_class", ""),
                actual_value=real["class"],
                severity="high"
            ))

        # Multi-AZ changes
        state_multi_az = attrs.get("multi_az", False)
        if state_multi_az != real["multi_az"]:
            drift_items.append(DriftItem(
                resource_type="aws_db_instance",
                resource_id=db_id,
                field="multi_az",
                state_value=str(state_multi_az),
                actual_value=str(real["multi_az"]),
                severity="high"
            ))

        # Deletion protection: only the risky direction (enabled in state,
        # disabled in AWS) is reported.
        state_deletion_protection = attrs.get("deletion_protection", False)
        if state_deletion_protection and not real["deletion_protection"]:
            drift_items.append(DriftItem(
                resource_type="aws_db_instance",
                resource_id=db_id,
                field="deletion_protection",
                state_value="true",
                actual_value="false",
                severity="critical"
            ))

    return drift_items

Step 3: AI Analysis with LangChain + Claude

python
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import HumanMessage, SystemMessage
 
# Module-level chat model client, invoked by analyze_drift_with_ai below.
# NOTE(review): confirm the model identifier is one your Anthropic account can access.
llm = ChatAnthropic(model="claude-opus-4-8", max_tokens=2000)
 
def analyze_drift_with_ai(drift_items: list[DriftItem], account_id: str, region: str) -> str:
    """Ask the chat model for a prioritized remediation report on the drift.

    When ``drift_items`` is empty, a fixed "no drift" message is returned and
    the model is not called. Otherwise the items are rendered as a bullet
    list, sent with the account and region, and the content of the model's
    reply is returned.
    """
    if not drift_items:
        return "No drift detected. Infrastructure matches Terraform state."

    # One bullet per drift item for the prompt body.
    bullet_lines = []
    for item in drift_items:
        bullet_lines.append(
            f"- {item.resource_type} '{item.resource_id}': {item.field} changed from '{item.state_value}' to '{item.actual_value}' (severity: {item.severity})"
        )
    drift_summary = "\n".join(bullet_lines)

    # Headline counts quoted in the prompt.
    critical_count = sum(item.severity == "critical" for item in drift_items)
    high_count = sum(item.severity == "high" for item in drift_items)

    system_prompt = SystemMessage(content="""You are a senior DevOps engineer and infrastructure security expert. 
        Analyze infrastructure drift reports and provide actionable, specific remediation guidance.
        Be direct and prioritize by risk.""")
    user_prompt = HumanMessage(content=f"""I detected infrastructure drift in AWS account {account_id}, region {region}.
 
DRIFT SUMMARY:
{drift_summary}
 
Total: {len(drift_items)} drift items ({critical_count} critical, {high_count} high)
 
Please provide:
1. RISK ASSESSMENT: What are the top 3 risks from this drift?
2. ROOT CAUSE: What most likely caused each type of drift?
3. REMEDIATION STEPS: Specific steps to fix each item (with terraform/AWS CLI commands)
4. PREVENTION: How to prevent this drift from occurring again
5. PRIORITY ORDER: Which items to fix first?
 
Be specific — include actual terraform commands and AWS console paths where relevant.""")

    reply = llm.invoke([system_prompt, user_prompt])
    return reply.content
 
 
def generate_drift_report(
    state_bucket: str,
    state_key: str,
    region: str,
    account_id: str,
) -> dict:
    """Run the full pipeline: load state, list AWS resources, diff, analyze.

    Progress is printed at each stage.

    Returns:
        A dict with the account and region, the total / critical / high
        drift counts, every drift item as a plain dict, and the AI-written
        analysis text.
    """
    print("Reading Terraform state...")
    state_resources = get_state_resources(get_terraform_state(state_bucket, state_key))

    print("Fetching real AWS resources...")
    real_ec2 = get_real_ec2_instances(region)
    real_rds = get_real_rds_instances(region)

    print("Detecting drift...")
    all_drift = [
        *detect_ec2_drift(state_resources, real_ec2),
        *detect_rds_drift(state_resources, real_rds),
    ]

    print(f"Found {len(all_drift)} drift items. Analyzing with AI...")
    ai_analysis = analyze_drift_with_ai(all_drift, account_id, region)

    severities = [entry.severity for entry in all_drift]
    # Fields copied from each drift item into the report, in output order.
    item_fields = (
        "resource_type",
        "resource_id",
        "field",
        "state_value",
        "actual_value",
        "severity",
    )
    serialized_items = [
        {name: getattr(entry, name) for name in item_fields}
        for entry in all_drift
    ]

    return {
        "account_id": account_id,
        "region": region,
        "total_drift_items": len(all_drift),
        "critical": severities.count("critical"),
        "high": severities.count("high"),
        "drift_items": serialized_items,
        "ai_analysis": ai_analysis,
    }
 
 
# Run it
if __name__ == "__main__":
    import os

    # Read settings from the environment so the same script runs unchanged
    # as a scheduled job; each default is the example value used for a
    # local run. AWS_REGION wins over AWS_DEFAULT_REGION when both are set.
    report = generate_drift_report(
        state_bucket=os.environ.get("STATE_BUCKET", "my-terraform-state"),
        state_key=os.environ.get("STATE_KEY", "production/terraform.tfstate"),
        region=os.environ.get("AWS_REGION")
        or os.environ.get("AWS_DEFAULT_REGION", "us-east-1"),
        account_id=os.environ.get("AWS_ACCOUNT_ID", "123456789012"),
    )

    # Print a short summary followed by the full AI-written analysis.
    print(f"\n{'='*60}")
    print(f"DRIFT REPORT: {report['total_drift_items']} items detected")
    print(f"Critical: {report['critical']}, High: {report['high']}")
    print(f"\nAI Analysis:\n{report['ai_analysis']}")

Schedule as Kubernetes CronJob

yaml
# CronJob that runs the drift detector container on a fixed schedule.
apiVersion: batch/v1
kind: CronJob
metadata:
  name: drift-detector
  namespace: platform
spec:
  schedule: "0 */6 * * *"  # Every 6 hours
  jobTemplate:
    spec:
      template:
        spec:
          serviceAccountName: drift-detector  # needs AWS IRSA
          containers:
            - name: detector
              # Placeholder image reference — replace with your own registry path.
              image: your-org/drift-detector:latest
              env:
                # API key is injected from the `ai-secrets` Secret instead of
                # being written into the manifest.
                - name: ANTHROPIC_API_KEY
                  valueFrom:
                    secretKeyRef:
                      name: ai-secrets
                      key: anthropic_api_key
                # Non-secret settings passed as plain environment variables.
                - name: STATE_BUCKET
                  value: "my-terraform-state"
                - name: AWS_REGION
                  value: "us-east-1"
          restartPolicy: OnFailure

This gives you scheduled, AI-explained infrastructure drift detection without having to pay for Terraform Cloud or Spacelift just for drift detection.

Resources: LangChain docs | AWS Boto3 | Anthropic API

🔧

Today I Fixed

Short real fixes from production — posted daily

Browse fixes
Newsletter

Stay ahead of the curve

Get the latest DevOps, Kubernetes, AWS, and AI/ML guides delivered straight to your inbox. No spam — just practical engineering content.

Related Articles

Comments