
Build a Self-Healing Kubernetes Cluster with AI Agents

Build an AI agent that monitors your Kubernetes cluster, detects issues, diagnoses root causes using Claude, and automatically applies safe fixes — without human intervention.

DevOpsBoys | May 31, 2026 | 5 min read

Self-healing infrastructure used to mean writing complex runbooks and automation scripts for every failure scenario. With AI agents, you can build a system that understands context and makes intelligent fix decisions.

Here's how to build one.


Architecture

Kubernetes Events + Metrics
    → Monitoring Agent (detects issues)
    → Diagnosis Agent (Claude analyzes root cause)
    → Action Agent (decides if safe to auto-fix)
    → Executor (applies kubectl fixes)
    → Audit Log (every action recorded)

The key principle: the agent auto-fixes only safe, easily reversible operations (restarting pods, scaling deployments within fixed bounds, clearing stuck jobs). For anything destructive, the agent pages a human instead.


Safe vs Unsafe Actions

python
# Define what the agent can do automatically
SAFE_ACTIONS = {
    "restart_pod",           # kubectl delete pod (its controller recreates it; a bare pod stays deleted)
    "scale_deployment",      # kubectl scale (within defined bounds)
    "clear_stuck_job",       # kubectl delete job if stuck > 2 hours
    "drain_node_soft",       # cordon only (no force); not implemented in executor.py below
}
 
# These require human approval
UNSAFE_ACTIONS = {
    "delete_deployment",
    "modify_configmap",
    "change_resource_limits",
    "drain_node_force",
    "delete_pvc",
}
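
One way to enforce this split is a small gate that every recommendation passes through before anything is executed. The sketch below is an illustration rather than part of the scripts in this article: `decide` and its three return labels are hypothetical names, and it is stricter than the main loop later on, since it only executes when the risk level is exactly "safe". It uses an allow-list, so anything not in `SAFE_ACTIONS` goes to a human whether or not it appears in `UNSAFE_ACTIONS`.

```python
# Hypothetical gate: maps a recommended action to "execute", "notify_human", or "ignore".
SAFE_ACTIONS = {"restart_pod", "scale_deployment", "clear_stuck_job", "drain_node_soft"}


def decide(action: str, risk_level: str = "caution") -> str:
    """Return what the agent should do with a recommended action."""
    if action == "ignore":
        return "ignore"
    # Allow-list: only known-safe actions rated "safe" run unattended.
    if action in SAFE_ACTIONS and risk_level == "safe":
        return "execute"
    # Unsafe, unknown, or risky recommendations all fall through to a human.
    return "notify_human"


print(decide("restart_pod", "safe"))
print(decide("delete_pvc", "safe"))
print(decide("restart_pod", "dangerous"))
```

Denying by default means a misspelled or newly invented action name from the model can never reach the executor.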

Setup

bash
pip install anthropic kubernetes prometheus-client slack-sdk python-dotenv

Step 1: Kubernetes Monitor

python
# monitor.py
from dataclasses import dataclass

from kubernetes import client, config
 
@dataclass
class ClusterIssue:
    issue_type: str
    severity: str  # "critical", "warning", "info"
    namespace: str
    resource_name: str
    resource_kind: str
    message: str
    context: dict
 
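def setup_k8s():
    """Load in-cluster config, falling back to the local kubeconfig."""
    try:
        config.load_incluster_config()
    except config.ConfigException:
        # Not running inside a pod, so use ~/.kube/config instead
        config.load_kube_config()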
 
def detect_issues() -> list[ClusterIssue]:
    """Detect current cluster issues."""
    v1 = client.CoreV1Api()
    apps_v1 = client.AppsV1Api()
    issues = []
    
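    # Check for pods in bad states
    waiting_reasons = {"CrashLoopBackOff", "ImagePullBackOff"}
    terminated_reasons = {"OOMKilled", "Error"}
    pods = v1.list_pod_for_all_namespaces()
    for pod in pods.items:
        containers = pod.status.container_statuses or []
        for container in containers:
            state, last = container.state, container.last_state
            # CrashLoopBackOff and ImagePullBackOff are waiting reasons; OOMKilled and
            # Error are termination reasons and never appear under state.waiting.
            waiting = state.waiting.reason if state and state.waiting else ""
            terminated = state.terminated.reason if state and state.terminated else ""
            last_terminated = last.terminated.reason if last and last.terminated else ""
            if waiting in waiting_reasons:
                reason = waiting
            elif terminated in terminated_reasons:
                reason = terminated
            else:
                continue
            issues.append(ClusterIssue(
                issue_type="pod_failing",
                severity="critical" if reason == "CrashLoopBackOff" else "warning",
                namespace=pod.metadata.namespace,
                resource_name=pod.metadata.name,
                resource_kind="Pod",
                message=f"Container {container.name} is in {reason} state",
                context={
                    "reason": reason,
                    "last_termination_reason": last_terminated,
                    "restart_count": container.restart_count,
                    "image": container.image
                }
            ))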
    
    # Check for stalled deployments
    deployments = apps_v1.list_deployment_for_all_namespaces()
    for deploy in deployments.items:
        spec_replicas = deploy.spec.replicas or 0
        ready_replicas = deploy.status.ready_replicas or 0
        
        if spec_replicas > 0 and ready_replicas == 0:
            issues.append(ClusterIssue(
                issue_type="deployment_unavailable",
                severity="critical",
                namespace=deploy.metadata.namespace,
                resource_name=deploy.metadata.name,
                resource_kind="Deployment",
                message=f"Deployment has {spec_replicas} desired but 0 ready replicas",
                context={
                    "desired": spec_replicas,
                    "ready": ready_replicas,
                    "unavailable": deploy.status.unavailable_replicas or 0
                }
            ))
    
    return issues
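
The monitor above covers pods and deployments, but not the stuck jobs that `clear_stuck_job` is meant to handle. A minimal sketch of that check might look like the following. The names `is_job_stuck` and `find_stuck_jobs` are hypothetical, the two-hour threshold is taken from the safe-actions list, and treating "still has active pods" as the sign of a stuck job is an assumption. The result is a list of plain dicts rather than `ClusterIssue` objects so the block stays self-contained.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

STUCK_AFTER = timedelta(hours=2)  # threshold taken from the safe-actions list


def is_job_stuck(start_time: Optional[datetime],
                 completion_time: Optional[datetime],
                 active: Optional[int],
                 now: datetime,
                 threshold: timedelta = STUCK_AFTER) -> bool:
    """Stuck = started, not completed, still has active pods, older than threshold."""
    if start_time is None or completion_time is not None:
        return False
    if not active:
        # No active pods: the job finished or failed, so it is not "stuck".
        return False
    return now - start_time > threshold


def find_stuck_jobs() -> list[dict]:
    """List stuck jobs cluster-wide (needs a configured kubernetes client)."""
    from kubernetes import client  # lazy import keeps is_job_stuck dependency-free

    now = datetime.now(timezone.utc)
    stuck = []
    for job in client.BatchV1Api().list_job_for_all_namespaces().items:
        status = job.status
        if is_job_stuck(status.start_time, status.completion_time, status.active, now):
            stuck.append({
                "namespace": job.metadata.namespace,
                "name": job.metadata.name,
                "started": status.start_time.isoformat(),
            })
    return stuck
```

Keeping the time comparison in a pure function makes the threshold easy to unit test without a cluster.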

Step 2: AI Diagnosis Agent

python
# diagnosis_agent.py
import anthropic
import json
import os
from monitor import ClusterIssue
 
client = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
 
def get_pod_logs(namespace: str, pod_name: str, lines: int = 50) -> str:
    """Get recent pod logs for context."""
    from kubernetes import client as k8s_client
    v1 = k8s_client.CoreV1Api()
    last_error = None
    # Prefer logs from the crashed (previous) container; fall back to the
    # current one when the container has not restarted yet.
    for previous in (True, False):
        try:
            logs = v1.read_namespaced_pod_log(
                name=pod_name,
                namespace=namespace,
                tail_lines=lines,
                previous=previous
            )
            return logs[-3000:]  # Last 3000 chars
        except Exception as e:
            last_error = e
    return f"Could not retrieve logs: {last_error}"
 
def diagnose_issue(issue: ClusterIssue) -> dict:
    """Use Claude to diagnose the issue and recommend action."""
    
    # Get additional context
    logs = ""
    if issue.resource_kind == "Pod":
        logs = get_pod_logs(issue.namespace, issue.resource_name)
    
    prompt = f"""You are an expert Kubernetes SRE. Diagnose this cluster issue and recommend action.
 
Issue:
- Type: {issue.issue_type}
- Severity: {issue.severity}
- Resource: {issue.resource_kind}/{issue.resource_name} in {issue.namespace}
- Message: {issue.message}
- Context: {json.dumps(issue.context, indent=2)}
 
Recent logs:
{logs if logs else "No logs available"}
 
Respond in JSON with:
{{
  "root_cause": "brief root cause explanation",
  "confidence": "high|medium|low",
  "recommended_action": "restart_pod|scale_deployment|clear_stuck_job|notify_human|ignore",
  "action_params": {{}},
  "reasoning": "why this action",
  "risk_level": "safe|caution|dangerous"
}}
 
Only recommend automated actions for safe, well-understood issues.
If uncertain, recommend notify_human."""
 
    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=500,
        messages=[{"role": "user", "content": prompt}]
    )
    
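    try:
        return json.loads(response.content[0].text)
    except (json.JSONDecodeError, IndexError, AttributeError):
        # Fail safe: if the reply is not valid JSON, hand off to a human
        return {
            "root_cause": "Unable to parse diagnosis",
            "confidence": "low",
            "recommended_action": "notify_human",
            "action_params": {},
            "reasoning": "Model response was not valid JSON",
            "risk_level": "caution"
        }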
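
Models sometimes wrap JSON in a markdown fence or add a sentence around it, in which case a plain `json.loads` fails and every diagnosis falls back to `notify_human`. A more tolerant parser could look like the sketch below. `parse_diagnosis`, `ALLOWED_ACTIONS`, and `FALLBACK` are hypothetical names, and the allowed actions are copied from the prompt above. It also rejects any action the prompt did not offer.

```python
import json

ALLOWED_ACTIONS = {"restart_pod", "scale_deployment", "clear_stuck_job",
                   "notify_human", "ignore"}
FALLBACK = {
    "root_cause": "Unable to parse diagnosis",
    "recommended_action": "notify_human",
    "risk_level": "caution",
}


def parse_diagnosis(text: str) -> dict:
    """Extract the outermost JSON object from a model reply, or fail safe."""
    start, end = text.find("{"), text.rfind("}")
    if start == -1 or end <= start:
        return dict(FALLBACK)
    try:
        data = json.loads(text[start:end + 1])
    except json.JSONDecodeError:
        return dict(FALLBACK)
    # Reject anything that is not an object with an action from the prompt.
    if not isinstance(data, dict) or data.get("recommended_action") not in ALLOWED_ACTIONS:
        return dict(FALLBACK)
    return data


# Example: a reply wrapped in a markdown fence still parses.
fence = "`" * 3
reply = fence + 'json\n{"recommended_action": "restart_pod", "risk_level": "safe"}\n' + fence
print(parse_diagnosis(reply)["recommended_action"])
```

In `diagnose_issue`, this would replace the `json.loads(...)` call, with `response.content[0].text` passed in as `text`.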

Step 3: Action Executor

python
# executor.py
from kubernetes import client
import logging
from datetime import datetime, timezone
from typing import Optional
 
logger = logging.getLogger(__name__)
 
# Audit log (in memory only; it is lost when the process restarts)
audit_log = []
 
def execute_action(action: str, namespace: str, resource_name: str, params: Optional[dict] = None) -> bool:
    """Execute a safe Kubernetes action and record the attempt in audit_log."""
    params = params or {}  # avoid a shared mutable default argument
    
    audit_entry = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "action": action,
        "namespace": namespace,
        "resource": resource_name,
        "params": params,
        "success": False
    }
    
    try:
        if action == "restart_pod":
            v1 = client.CoreV1Api()
            v1.delete_namespaced_pod(name=resource_name, namespace=namespace)
            logger.info(f"Restarted pod {namespace}/{resource_name}")
            audit_entry["success"] = True
            
        elif action == "scale_deployment":
            apps_v1 = client.AppsV1Api()
            target_replicas = params.get("replicas", 1)
            
            # Safety check: never scale beyond bounds (also rejects non-integers)
            if not isinstance(target_replicas, int) or not 1 <= target_replicas <= 20:
                raise ValueError(f"Scale target {target_replicas!r} out of safe bounds")
            
            apps_v1.patch_namespaced_deployment_scale(
                name=resource_name,
                namespace=namespace,
                body={"spec": {"replicas": target_replicas}}
            )
            audit_entry["success"] = True
            
        elif action == "clear_stuck_job":
            batch_v1 = client.BatchV1Api()
            batch_v1.delete_namespaced_job(
                name=resource_name,
                namespace=namespace,
                propagation_policy="Background"
            )
            audit_entry["success"] = True
            
        else:
            raise ValueError(f"Unknown or unsafe action: {action}")
            
    except Exception as e:
        logger.error(f"Action failed: {e}")
        audit_entry["error"] = str(e)
    
    # Rejected and failed actions are recorded too, not only successes
    audit_log.append(audit_entry)
    return audit_entry["success"]
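
Because `audit_log` is a plain list, its contents disappear when the process restarts. One simple way to keep the record is to append each entry to a JSON Lines file, sketched below. The function names and the default path `healer_audit.jsonl` are hypothetical, and inside a pod the file would need to live on a mounted volume to survive restarts.

```python
import json
from pathlib import Path

AUDIT_PATH = Path("healer_audit.jsonl")  # hypothetical location


def append_audit_entry(entry: dict, path: Path = AUDIT_PATH) -> None:
    """Append one audit entry as a single JSON line."""
    with path.open("a", encoding="utf-8") as f:
        f.write(json.dumps(entry, sort_keys=True) + "\n")


def read_audit_entries(path: Path = AUDIT_PATH) -> list[dict]:
    """Load all recorded entries, oldest first; empty list if no file yet."""
    if not path.exists():
        return []
    with path.open(encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]
```

Calling `append_audit_entry(audit_entry)` next to `audit_log.append(audit_entry)` would keep both the in-memory list and the file in step.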

Step 4: Main Healing Loop

python
# healer.py
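import time
import os
from dotenv import load_dotenv

# Load .env before importing diagnosis_agent, which reads ANTHROPIC_API_KEY at import time
load_dotenv()

from slack_sdk import WebClient
from monitor import detect_issues, setup_k8s
from diagnosis_agent import diagnose_issue
from executor import execute_action
 
setup_k8s()
 
# Slack settings; the SLACK_BOT_TOKEN and SLACK_CHANNEL variable names are assumptions
slack = WebClient(token=os.getenv("SLACK_BOT_TOKEN"))
SLACK_CHANNEL = os.getenv("SLACK_CHANNEL", "#k8s-alerts")
 
# Track recent actions to prevent loops
recent_actions = {}  # "namespace/resource_name" -> last_action_time
 
def should_act(key: str, cooldown_minutes: int = 10) -> bool:
    """Prevent acting on same resource repeatedly."""
    if key in recent_actions:
        elapsed = time.time() - recent_actions[key]
        if elapsed < cooldown_minutes * 60:
            return False
    return True
 
def notify_human(issue, diagnosis: dict) -> None:
    """Send the issue and its diagnosis to Slack for human review."""
    try:
        slack.chat_postMessage(
            channel=SLACK_CHANNEL,
            text=(f"{issue.severity.upper()}: {issue.message} "
                  f"({issue.namespace}/{issue.resource_name})\n"
                  f"Root cause: {diagnosis.get('root_cause')}")
        )
    except Exception as e:
        print(f"Slack notification failed: {e}")
 
def healing_loop():
    print("Self-healing agent started")
    
    while True:
        issues = detect_issues()
        
        for issue in issues:
            # Key by namespace too, since names can repeat across namespaces
            key = f"{issue.namespace}/{issue.resource_name}"
            if not should_act(key):
                continue
            
            print(f"Analyzing: {issue.message}")
            diagnosis = diagnose_issue(issue)
            # Start the cooldown now so alerts and failed fixes are not repeated every minute
            recent_actions[key] = time.time()
            
            action = diagnosis.get("recommended_action")
            risk = diagnosis.get("risk_level", "caution")
            
            if action == "notify_human" or risk == "dangerous":
                # Send Slack alert with diagnosis
                notify_human(issue, diagnosis)
                continue
            
            if action in ["restart_pod", "scale_deployment", "clear_stuck_job"]:
                print(f"Executing: {action} on {issue.resource_name}")
                print(f"Reason: {diagnosis.get('reasoning')}")
                
                success = execute_action(
                    action=action,
                    namespace=issue.namespace,
                    resource_name=issue.resource_name,
                    params=diagnosis.get("action_params", {})
                )
                
                if success:
                    print(f"✅ Fixed: {issue.resource_name}")
        
        time.sleep(60)  # Check every minute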
 
if __name__ == "__main__":
    healing_loop()
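
One limitation of a cooldown keyed on the pod name: when a Deployment's pod is deleted, its replacement comes back under a new name, so the cooldown never matches and a crash-looping workload can be restarted on every pass. A possible refinement, sketched below, keys the cooldown on the owning controller instead. `CooldownTracker` and `workload_key` are hypothetical names, and the owner name is assumed to come from the pod's `metadata.owner_references`. The clock is injectable so the logic can be tested without waiting.

```python
import time
from typing import Callable, Optional


def workload_key(namespace: str, pod_name: str, owner_name: Optional[str] = None) -> str:
    """Prefer the owning controller's name, which survives pod recreation."""
    return f"{namespace}/{owner_name or pod_name}"


class CooldownTracker:
    """Remembers when each key was last acted on."""

    def __init__(self, cooldown_seconds: float = 600,
                 clock: Callable[[], float] = time.monotonic):
        self.cooldown_seconds = cooldown_seconds
        self.clock = clock
        self._last: dict[str, float] = {}

    def should_act(self, key: str) -> bool:
        """True if the key was never acted on or its cooldown has elapsed."""
        last = self._last.get(key)
        return last is None or self.clock() - last >= self.cooldown_seconds

    def record(self, key: str) -> None:
        """Start (or restart) the cooldown for this key."""
        self._last[key] = self.clock()


# Two pods from the same ReplicaSet share one cooldown.
tracker = CooldownTracker(cooldown_seconds=600)
key = workload_key("default", "web-7d9f8c6b5-abcde", owner_name="web-7d9f8c6b5")
if tracker.should_act(key):
    tracker.record(key)
```

The ReplicaSet name changes on each rollout, which seems a reasonable point for the cooldown to reset, since a new rollout is effectively a new workload.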

Deploy to Kubernetes

yaml
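apiVersion: apps/v1
kind: Deployment
metadata:
  name: k8s-healer
  namespace: monitoring
spec:
  replicas: 1
  # apps/v1 requires a selector that matches the pod template labels
  selector:
    matchLabels:
      app: k8s-healer
  template:
    metadata:
      labels:
        app: k8s-healer
    spec:
      # Needs RBAC to list pods, deployments, and jobs, read pod logs,
      # delete pods and jobs, and patch deployment scale
      serviceAccountName: k8s-healer
      containers:
        - name: healer
          # Placeholder: use an image that bundles healer.py and its dependencies
          image: python:3.11-slim
          env:
            - name: ANTHROPIC_API_KEY
              valueFrom:
                secretKeyRef:
                  name: healer-secrets
                  key: anthropic-key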

The self-healing agent is designed to handle the bulk of common issues automatically: CrashLoopBackOff restarts, stuck jobs, and deployments with zero ready replicas. The rest, which need human judgment, get routed to Slack with a full diagnosis.

Get your Anthropic API key to build AI-powered infrastructure tools. Start with the diagnosis piece — even without auto-fix, AI diagnosis alone saves hours of debugging.
