Build a Self-Healing Kubernetes Cluster with AI Agents
Build an AI agent that monitors your Kubernetes cluster, detects issues, diagnoses root causes using Claude, and automatically applies safe fixes — without human intervention.
Self-healing infrastructure used to mean writing complex runbooks and automation scripts for every failure scenario. With AI agents, you can build a system that understands context and makes intelligent fix decisions.
Here's how to build one.
Architecture
Kubernetes Events + Metrics
→ Monitoring Agent (detects issues)
→ Diagnosis Agent (Claude analyzes root cause)
→ Action Agent (decides if safe to auto-fix)
→ Executor (applies kubectl fixes)
→ Audit Log (every action recorded)
The key principle: the agent auto-fixes only safe operations (restarting pods, scaling deployments, clearing stuck jobs). For anything destructive, it pages a human instead.
Safe vs Unsafe Actions
# Actions the agent may perform automatically, without asking anyone.
SAFE_ACTIONS = {
    # kubectl delete pod (pod restarts automatically)
    "restart_pod",
    # kubectl scale (within defined bounds)
    "scale_deployment",
    # kubectl delete job if stuck > 2 hours
    "clear_stuck_job",
    # cordon only (no force)
    "drain_node_soft",
}

# Actions that always require human approval.
UNSAFE_ACTIONS = {
    "delete_deployment",
    "modify_configmap",
    "change_resource_limits",
    "drain_node_force",
    "delete_pvc",
}


# Setup
pip install anthropic kubernetes prometheus-client slack-sdk python-dotenv
Step 1: Kubernetes Monitor
# monitor.py
from kubernetes import client, config, watch
import time
from dataclasses import dataclass
from typing import Optional
@dataclass
class ClusterIssue:
    """One problem detected in the cluster.

    Created by the monitor for each failing container or unavailable
    deployment, then handed to the diagnosis and healing steps.
    """

    issue_type: str  # e.g. "pod_failing", "deployment_unavailable"
    severity: str  # "critical", "warning", "info"
    namespace: str
    resource_name: str
    resource_kind: str  # e.g. "Pod", "Deployment"
    message: str  # human-readable summary of the problem
    context: dict  # extra structured details (reason, counts, image, ...)
def setup_k8s():
    """Load Kubernetes client configuration.

    Tries the in-cluster (service account) configuration first and falls
    back to the local kubeconfig when the process is not running in a pod.
    """
    try:
        config.load_incluster_config()
    except config.ConfigException:
        # Only the "not inside a cluster" failure triggers the fallback;
        # anything else (including KeyboardInterrupt) now propagates.
        config.load_kube_config()
# Container state reasons that are reported as issues. CrashLoopBackOff and
# ImagePullBackOff appear on the *waiting* state; OOMKilled and Error appear
# on the *terminated* state, so both states have to be inspected.
_FAILURE_REASONS = {"CrashLoopBackOff", "OOMKilled", "ImagePullBackOff", "Error"}


def _state_reason(container_state, phase: str) -> str:
    """Return the reason of the given phase ("waiting"/"terminated"), or ""."""
    detail = getattr(container_state, phase, None)
    return (detail.reason or "") if detail else ""


def _container_failure_reason(container) -> str:
    """Return the failure reason of a container status, or "" if healthy."""
    for phase in ("waiting", "terminated"):
        reason = _state_reason(container.state, phase)
        if reason in _FAILURE_REASONS:
            return reason
    return ""


def _pod_issues(v1) -> list[ClusterIssue]:
    """Collect one issue per container that is in a failing state."""
    found = []
    for pod in v1.list_pod_for_all_namespaces().items:
        for container in pod.status.container_statuses or []:
            reason = _container_failure_reason(container)
            if not reason:
                continue
            found.append(ClusterIssue(
                issue_type="pod_failing",
                severity="critical" if reason == "CrashLoopBackOff" else "warning",
                namespace=pod.metadata.namespace,
                resource_name=pod.metadata.name,
                resource_kind="Pod",
                message=f"Container {container.name} is in {reason} state",
                context={
                    "reason": reason,
                    "restart_count": container.restart_count,
                    "image": container.image,
                    # Why the previous instance died (e.g. OOMKilled behind
                    # a CrashLoopBackOff); None when there is no record.
                    "last_termination_reason": _state_reason(
                        container.last_state, "terminated"
                    ) or None,
                },
            ))
    return found


def _deployment_issues(apps_v1) -> list[ClusterIssue]:
    """Collect one issue per deployment that wants replicas but has none ready."""
    found = []
    for deploy in apps_v1.list_deployment_for_all_namespaces().items:
        spec_replicas = deploy.spec.replicas or 0
        ready_replicas = deploy.status.ready_replicas or 0
        if spec_replicas > 0 and ready_replicas == 0:
            found.append(ClusterIssue(
                issue_type="deployment_unavailable",
                severity="critical",
                namespace=deploy.metadata.namespace,
                resource_name=deploy.metadata.name,
                resource_kind="Deployment",
                message=f"Deployment has {spec_replicas} desired but 0 ready replicas",
                context={
                    "desired": spec_replicas,
                    "ready": ready_replicas,
                    "unavailable": deploy.status.unavailable_replicas or 0,
                },
            ))
    return found


def detect_issues() -> list[ClusterIssue]:
    """Detect current cluster issues.

    Returns:
        Pod issues (failing containers) followed by deployment issues
        (desired replicas > 0 but none ready), across all namespaces.
    """
    issues = _pod_issues(client.CoreV1Api())
    issues.extend(_deployment_issues(client.AppsV1Api()))
    return issues


# Step 2: AI Diagnosis Agent
# diagnosis_agent.py
import anthropic
import json
import os
from monitor import ClusterIssue
# Module-level Anthropic API client; the key is read from the
# ANTHROPIC_API_KEY environment variable.
client = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
def get_pod_logs(namespace: str, pod_name: str, lines: int = 50) -> str:
    """Get recent pod logs for context.

    Logs of the previous (crashed) container instance are preferred. If
    there is no previous instance, or its logs are empty, the current
    container's logs are used instead.

    Args:
        namespace: Namespace of the pod.
        pod_name: Name of the pod.
        lines: Number of trailing log lines to request.

    Returns:
        At most the last 3000 characters of the logs, an empty string when
        no logs exist, or a "Could not retrieve logs: ..." message when
        both lookups fail. Never raises.
    """
    from kubernetes import client as k8s_client

    v1 = k8s_client.CoreV1Api()
    any_succeeded = False
    last_error = None
    for previous in (True, False):
        try:
            logs = v1.read_namespaced_pod_log(
                name=pod_name,
                namespace=namespace,
                tail_lines=lines,
                previous=previous,
            )
        except Exception as e:  # best-effort: diagnosis proceeds without logs
            last_error = e
            continue
        any_succeeded = True
        if logs:
            return logs[-3000:]  # Last 3000 chars
    if not any_succeeded:
        return f"Could not retrieve logs: {last_error}"
    return ""
def _fallback_diagnosis(root_cause: str) -> dict:
    """Build the conservative diagnosis used when no usable answer exists."""
    return {
        "root_cause": root_cause,
        "recommended_action": "notify_human",
        "risk_level": "caution",
    }


def _parse_diagnosis(text: str) -> dict:
    """Extract the JSON object from a model reply.

    Tolerates replies that wrap the object in a code fence or surround it
    with prose. Raises ValueError when no JSON object can be decoded.
    """
    start, end = text.find("{"), text.rfind("}")
    if start == -1 or end <= start:
        raise ValueError("no JSON object in reply")
    parsed = json.loads(text[start:end + 1])
    if not isinstance(parsed, dict):
        raise ValueError("reply is not a JSON object")
    return parsed


def diagnose_issue(issue: ClusterIssue) -> dict:
    """Use Claude to diagnose the issue and recommend action.

    Args:
        issue: The detected cluster issue. For pods, recent logs are
            fetched and included in the prompt.

    Returns:
        The diagnosis dict requested in the prompt (root_cause, confidence,
        recommended_action, action_params, reasoning, risk_level). When the
        API call fails or the reply cannot be parsed, a fallback dict that
        recommends ``notify_human`` is returned instead.
    """
    # Get additional context
    logs = ""
    if issue.resource_kind == "Pod":
        logs = get_pod_logs(issue.namespace, issue.resource_name)
    # NOTE(review): pod logs are untrusted text placed inside the prompt;
    # callers must keep validating the recommended action against an allowlist.
    prompt = f"""You are an expert Kubernetes SRE. Diagnose this cluster issue and recommend action.
Issue:
- Type: {issue.issue_type}
- Severity: {issue.severity}
- Resource: {issue.resource_kind}/{issue.resource_name} in {issue.namespace}
- Message: {issue.message}
- Context: {json.dumps(issue.context, indent=2)}
Recent logs:
{logs if logs else "No logs available"}
Respond in JSON with:
{{
"root_cause": "brief root cause explanation",
"confidence": "high|medium|low",
"recommended_action": "restart_pod|scale_deployment|clear_stuck_job|notify_human|ignore",
"action_params": {{}},
"reasoning": "why this action",
"risk_level": "safe|caution|dangerous"
}}
Only recommend automated actions for safe, well-understood issues.
If uncertain, recommend notify_human."""
    try:
        response = client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=500,
            messages=[{"role": "user", "content": prompt}],
        )
    except anthropic.APIError as e:
        # Without a diagnosis the only safe outcome is to involve a human.
        return _fallback_diagnosis(f"Diagnosis request failed: {e}")
    try:
        return _parse_diagnosis(response.content[0].text)
    except (ValueError, IndexError, AttributeError):
        # ValueError covers json.JSONDecodeError; the others cover an empty
        # or non-text reply.
        return _fallback_diagnosis("Unable to parse diagnosis")


# Step 3: Action Executor
# executor.py
from kubernetes import client
import logging
from datetime import datetime
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Audit log: in-memory list to which execute_action appends its audit
# entry dicts (timestamp, action, namespace, resource, params, success).
audit_log = []
class _ActionRejected(Exception):
    """Raised internally when a requested action fails a safety check."""


def execute_action(action: str, namespace: str, resource_name: str, params: "dict | None" = None) -> bool:
    """Execute a safe Kubernetes action.

    Supported actions are ``restart_pod``, ``scale_deployment`` and
    ``clear_stuck_job``. Every call appends exactly one entry to
    ``audit_log``, including calls that are rejected by a safety check.

    Args:
        action: Name of the action to perform.
        namespace: Namespace of the target resource.
        resource_name: Name of the target pod, deployment or job.
        params: Action parameters; ``scale_deployment`` reads ``replicas``
            (default 1, must be an int from 1 to 20).

    Returns:
        True when the action was applied, False when it was rejected or
        the API call failed. Never raises.
    """
    from datetime import timezone

    params = {} if params is None else params
    audit_entry = {
        # Timezone-aware UTC timestamp (datetime.utcnow() is naive).
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "action": action,
        "namespace": namespace,
        "resource": resource_name,
        "params": params,
        "success": False,
    }
    try:
        if action == "restart_pod":
            v1 = client.CoreV1Api()
            # Deleting a pod only "restarts" it when a controller recreates
            # it; a bare pod would simply be gone.
            pod = v1.read_namespaced_pod(name=resource_name, namespace=namespace)
            if not pod.metadata.owner_references:
                raise _ActionRejected(
                    f"Pod {namespace}/{resource_name} has no owning controller; refusing to delete"
                )
            v1.delete_namespaced_pod(name=resource_name, namespace=namespace)
            logger.info(f"Restarted pod {namespace}/{resource_name}")
        elif action == "scale_deployment":
            apps_v1 = client.AppsV1Api()
            target_replicas = params.get("replicas", 1)
            # Safety check: never scale beyond bounds. The value comes from
            # model output, so its type is checked too (bool is an int).
            if (
                isinstance(target_replicas, bool)
                or not isinstance(target_replicas, int)
                or not 1 <= target_replicas <= 20
            ):
                raise _ActionRejected(f"Scale target {target_replicas} out of safe bounds")
            apps_v1.patch_namespaced_deployment_scale(
                name=resource_name,
                namespace=namespace,
                body={"spec": {"replicas": target_replicas}},
            )
        elif action == "clear_stuck_job":
            batch_v1 = client.BatchV1Api()
            batch_v1.delete_namespaced_job(
                name=resource_name,
                namespace=namespace,
                propagation_policy="Background",
            )
        else:
            raise _ActionRejected(f"Unknown or unsafe action: {action}")
        audit_entry["success"] = True
    except _ActionRejected as e:
        logger.warning(str(e))
        audit_entry["error"] = str(e)
    except Exception as e:
        logger.error(f"Action failed: {e}")
        audit_entry["error"] = str(e)
    audit_log.append(audit_entry)
    return audit_entry["success"]


# Step 4: Main Healing Loop
# healer.py
import time
import os
from dotenv import load_dotenv
from monitor import detect_issues, setup_k8s
from diagnosis_agent import diagnose_issue
from executor import execute_action, audit_log
# Load environment variables via python-dotenv, then configure the
# Kubernetes client (setup_k8s is imported from monitor).
load_dotenv()
setup_k8s()
# Track recent actions to prevent loops
recent_actions = {} # resource_name -> last_action_time (time.time() seconds)
def should_act(resource_name: str, cooldown_minutes: int = 10) -> bool:
    """Return False while *resource_name* is still inside its cooldown.

    A resource with no recorded action, or whose last recorded action is at
    least ``cooldown_minutes`` old, may be acted on again.
    """
    if resource_name not in recent_actions:
        return True
    seconds_since_last = time.time() - recent_actions[resource_name]
    return seconds_since_last >= cooldown_minutes * 60
# Actions the loop is willing to hand to the executor.
_AUTO_FIX_ACTIONS = ("restart_pod", "scale_deployment", "clear_stuck_job")


def notify_human(issue, diagnosis: dict) -> None:
    """Escalate an issue to a human, via Slack when a webhook is configured.

    Posts to the incoming-webhook URL in ``SLACK_WEBHOOK_URL``. When the
    variable is unset or the post fails, the alert is printed instead so it
    is never dropped silently.
    """
    import json
    import urllib.request

    text = (
        f"🚨 {issue.severity.upper()}: {issue.resource_kind} "
        f"{issue.namespace}/{issue.resource_name}\n"
        f"{issue.message}\n"
        f"Root cause: {diagnosis.get('root_cause', 'unknown')}\n"
        f"Recommended action: {diagnosis.get('recommended_action', 'n/a')}\n"
        f"Reasoning: {diagnosis.get('reasoning', 'n/a')}"
    )
    webhook_url = os.getenv("SLACK_WEBHOOK_URL")
    if webhook_url:
        request = urllib.request.Request(
            webhook_url,
            data=json.dumps({"text": text}).encode("utf-8"),
            headers={"Content-Type": "application/json"},
        )
        try:
            with urllib.request.urlopen(request, timeout=10):
                return
        except OSError as e:  # URLError/HTTPError are OSError subclasses
            print(f"Slack notification failed: {e}")
    print(f"Needs human attention:\n{text}")


def _handle_issue(issue) -> None:
    """Diagnose one issue, then auto-fix it, escalate it, or ignore it."""
    # Namespace and kind are part of the key so same-named resources in
    # different namespaces do not share a cooldown.
    cooldown_key = f"{issue.namespace}/{issue.resource_kind}/{issue.resource_name}"
    if not should_act(cooldown_key):
        return
    print(f"Analyzing: {issue.message}")
    diagnosis = diagnose_issue(issue)
    action = diagnosis.get("recommended_action")
    risk = diagnosis.get("risk_level", "caution")

    # Start the cooldown for every handled issue, not only successful fixes;
    # otherwise failures and escalations are re-diagnosed and re-sent each
    # minute.
    recent_actions[cooldown_key] = time.time()

    if action == "ignore" and risk != "dangerous":
        return
    if action not in _AUTO_FIX_ACTIONS or risk == "dangerous":
        # Covers notify_human, dangerous diagnoses and unrecognised actions.
        notify_human(issue, diagnosis)
        return

    print(f"Executing: {action} on {issue.resource_name}")
    print(f"Reason: {diagnosis.get('reasoning')}")
    success = execute_action(
        action=action,
        namespace=issue.namespace,
        resource_name=issue.resource_name,
        params=diagnosis.get("action_params") or {},
    )
    if success:
        print(f"✅ Fixed: {issue.resource_name}")
    else:
        # A failed auto-fix needs a human just as much as an unsafe one.
        notify_human(issue, diagnosis)


def healing_loop():
    """Run the detect -> diagnose -> act cycle forever, once per minute.

    Errors during detection or while handling a single issue are printed
    and do not stop the loop.
    """
    print("Self-healing agent started")
    while True:
        try:
            issues = detect_issues()
        except Exception as e:  # daemon boundary: report and retry next cycle
            print(f"Issue detection failed: {e}")
            issues = []
        for issue in issues:
            try:
                _handle_issue(issue)
            except Exception as e:  # one bad issue must not block the rest
                print(f"Failed to handle {issue.resource_name}: {e}")
        time.sleep(60)  # Check every minute


if __name__ == "__main__":
    healing_loop()


# Deploy to Kubernetes
apiVersion: apps/v1
kind: Deployment
metadata:
  name: k8s-healer
  namespace: monitoring
spec:
  replicas: 1
  # apps/v1 requires a selector, and it must match the pod template labels.
  selector:
    matchLabels:
      app: k8s-healer
  template:
    metadata:
      labels:
        app: k8s-healer
    spec:
      # Needs RBAC to list/read/delete pods, read pod logs, list and scale
      # deployments, and delete jobs.
      serviceAccountName: k8s-healer
      containers:
        - name: healer
          # NOTE: bare base image as a placeholder; use an image that
          # contains the healer code and its dependencies.
          image: python:3.11-slim
          env:
            - name: ANTHROPIC_API_KEY
              valueFrom:
                secretKeyRef:
                  name: healer-secrets
                  key: anthropic-key
# The self-healing agent handles 80% of common issues automatically — CrashLoopBackOff restarts, stuck jobs, zero-replica deployments. The other 20% that need human judgment get routed to Slack with a full diagnosis.
Get your Anthropic API key to build AI-powered infrastructure tools. Start with the diagnosis piece — even without auto-fix, AI diagnosis alone saves hours of debugging.
Today I Fixed
Short real fixes from production — posted daily
Stay ahead of the curve
Get the latest DevOps, Kubernetes, AWS, and AI/ML guides delivered straight to your inbox. No spam — just practical engineering content.
Related Articles
Build an AI-Powered DevOps Chatbot with Streamlit on Kubernetes
Build a DevOps assistant chatbot that answers infrastructure questions, generates kubectl commands, and explains errors — deployed as a Streamlit app on Kubernetes.
Build LLM-Powered Runbook Automation with Haystack and Kubernetes
Turn your static runbooks into an AI system that answers 'what do I do when X happens' with step-by-step instructions retrieved from your actual documentation.
Build a Natural Language kubectl — Ask Questions to Your Cluster
Build a CLI tool that lets you describe what you want in plain English and generates the correct kubectl command — powered by Claude API.