Build an AI Kubernetes Troubleshooter with Claude (2026)
Build a CLI tool that automatically diagnoses Kubernetes issues — OOMKilled, CrashLoopBackOff, pending pods — by gathering cluster state and asking Claude what's wrong and how to fix it.
When a Kubernetes pod fails, diagnosis involves running 6–8 kubectl commands, reading logs, checking events, correlating resource usage — then figuring out the cause. This guide builds a CLI tool that does all of that automatically and gives you a diagnosis with fix steps.
What We're Building
$ k8s-ai diagnose pod payments-7d9f8b-xk2p -n production
Gathering cluster state...
Pod status, events, logs, node metrics collected
Claude's diagnosis:
ROOT CAUSE: OOMKilled
The payments pod was killed because it exceeded its memory limit of 512Mi.
EVIDENCE:
- Container exit code: 137 (OOMKilled)
- Memory usage peaked at 498Mi before kill
- 3 restarts in the last hour, increasing in frequency
IMMEDIATE FIX:
kubectl patch deployment payments -n production \
-p '{"spec":{"template":{"spec":{"containers":[{"name":"payments","resources":{"limits":{"memory":"1Gi"}}}]}}}}'
INVESTIGATION:
Check for memory leaks in payments service. Recent deploy at 14:32 matches
when restarts began. Consider rolling back to v2.3.1.
Project Setup
pip install anthropic kubernetes click rich pyyaml
k8s-ai/
├── k8s_ai/
│ ├── __init__.py
│ ├── cli.py # Click CLI
│ ├── collector.py # Gather k8s data
│ └── analyzer.py # Claude API calls
└── pyproject.toml
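The tree references a `pyproject.toml` that the guide never shows. A minimal version might look like this; the version number, Python floor, and build backend are assumptions you can adjust:

```toml
[project]
name = "k8s-ai"
version = "0.1.0"
requires-python = ">=3.10"
dependencies = ["anthropic", "kubernetes", "click", "rich", "pyyaml"]

[project.scripts]
k8s-ai = "k8s_ai.cli:main"

[build-system]
requires = ["setuptools>=68"]
build-backend = "setuptools.build_meta"
```

The `[project.scripts]` entry is what makes the bare `k8s-ai diagnose ...` invocation from the intro work after `pip install -e .`.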
Step 1: Kubernetes Data Collector
# k8s_ai/collector.py
from kubernetes import client, config
from datetime import datetime, timezone
import json
try:
config.load_incluster_config()
except config.ConfigException:  # not running inside a cluster; fall back to local kubeconfig
config.load_kube_config()
core_v1 = client.CoreV1Api()
apps_v1 = client.AppsV1Api()
metrics_api = client.CustomObjectsApi()  # reserved for metrics.k8s.io queries if you extend the collector
def collect_pod_context(pod_name: str, namespace: str) -> dict:
"""Collect everything about a pod needed for diagnosis."""
context = {}
# 1. Pod spec and status
try:
pod = core_v1.read_namespaced_pod(name=pod_name, namespace=namespace)
context["pod"] = {
"name": pod.metadata.name,
"namespace": pod.metadata.namespace,
"phase": pod.status.phase,
"conditions": [
{"type": c.type, "status": c.status, "reason": c.reason, "message": c.message}
for c in (pod.status.conditions or [])
],
"containers": [],
"node": pod.spec.node_name,
"created_at": str(pod.metadata.creation_timestamp),
}
for cs in (pod.status.container_statuses or []):
container_info = {
"name": cs.name,
"ready": cs.ready,
"restart_count": cs.restart_count,
"image": cs.image,
"state": {},
"last_state": {},
}
if cs.state.running:
container_info["state"] = {"running": {"started_at": str(cs.state.running.started_at)}}
elif cs.state.waiting:
container_info["state"] = {
"waiting": {"reason": cs.state.waiting.reason, "message": cs.state.waiting.message}
}
elif cs.state.terminated:
container_info["state"] = {
"terminated": {
"exit_code": cs.state.terminated.exit_code,
"reason": cs.state.terminated.reason,
"message": cs.state.terminated.message,
"finished_at": str(cs.state.terminated.finished_at),
}
}
if cs.last_state.terminated:
container_info["last_state"] = {
"exit_code": cs.last_state.terminated.exit_code,
"reason": cs.last_state.terminated.reason,
}
# Resource requests/limits from spec
for c in pod.spec.containers:
if c.name == cs.name and c.resources:
container_info["resources"] = {
"requests": {
"cpu": c.resources.requests.get("cpu") if c.resources.requests else None,
"memory": c.resources.requests.get("memory") if c.resources.requests else None,
},
"limits": {
"cpu": c.resources.limits.get("cpu") if c.resources.limits else None,
"memory": c.resources.limits.get("memory") if c.resources.limits else None,
}
}
context["pod"]["containers"].append(container_info)
except client.exceptions.ApiException as e:
context["pod_error"] = str(e)
# 2. Recent events
try:
events = core_v1.list_namespaced_event(
namespace=namespace,
field_selector=f"involvedObject.name={pod_name}"
)
context["events"] = [
{
"type": e.type,
"reason": e.reason,
"message": e.message,
"count": e.count,
"last_time": str(e.last_timestamp),
}
for e in sorted(events.items, key=lambda x: x.last_timestamp or datetime.min.replace(tzinfo=timezone.utc), reverse=True)[:10]
]
except Exception as e:
context["events_error"] = str(e)
# 3. Recent logs (last 100 lines)
try:
logs = core_v1.read_namespaced_pod_log(
name=pod_name,
namespace=namespace,
tail_lines=100,
timestamps=True,
)
context["logs"] = logs[-3000:] if len(logs) > 3000 else logs # Limit size
except Exception as e:
# Try previous container logs
try:
logs = core_v1.read_namespaced_pod_log(
name=pod_name,
namespace=namespace,
tail_lines=50,
previous=True,
)
context["logs_previous"] = logs[-2000:] if len(logs) > 2000 else logs
        except Exception:
            context["logs_error"] = str(e)
# 4. Node info (is the node under pressure?)
if context.get("pod", {}).get("node"):
node_name = context["pod"]["node"]
try:
node = core_v1.read_node(name=node_name)
context["node"] = {
"name": node_name,
"conditions": [
{"type": c.type, "status": c.status, "message": c.message}
for c in (node.status.conditions or [])
],
"allocatable": dict(node.status.allocatable or {}),
"capacity": dict(node.status.capacity or {}),
}
except Exception:
pass
# 5. Deployment info if applicable
try:
# Try to find the owning deployment via labels
if pod.metadata.labels:
app_label = pod.metadata.labels.get("app") or pod.metadata.labels.get("app.kubernetes.io/name")
if app_label:
deps = apps_v1.list_namespaced_deployment(
namespace=namespace,
label_selector=f"app={app_label}"
)
if deps.items:
dep = deps.items[0]
context["deployment"] = {
"name": dep.metadata.name,
"replicas": dep.spec.replicas,
"available": dep.status.available_replicas,
                        "revision": (dep.metadata.annotations or {}).get("deployment.kubernetes.io/revision", "unknown"),
}
except Exception:
pass
return context
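One practical wrinkle: a chatty pod's logs can dominate the payload you send to Claude. Before calling the analyzer, it can help to trim the largest free-text fields down to a budget. `shrink_context` is a hypothetical helper (not part of the kubernetes client), and the character budget is an assumption to tune against your model's context window:

```python
import json

MAX_CONTEXT_CHARS = 12_000  # assumption: rough budget; tune for your model


def shrink_context(context: dict, max_chars: int = MAX_CONTEXT_CHARS) -> dict:
    """Trim log fields until the serialized context fits the budget.

    Logs dominate the payload, so they are halved first (keeping the most
    recent half each time); structured fields stay intact.
    """
    trimmed = dict(context)
    for key in ("logs", "logs_previous"):
        while len(json.dumps(trimmed, default=str)) > max_chars and trimmed.get(key):
            trimmed[key] = trimmed[key][len(trimmed[key]) // 2:]  # keep newest half
            if len(trimmed[key]) < 200:  # stop below a useful minimum
                break
    return trimmed
```

Call it on the result of `collect_pod_context` before passing the dict to the analyzer.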
def collect_namespace_context(namespace: str) -> dict:
"""Collect overview of a namespace for general troubleshooting."""
context = {}
try:
pods = core_v1.list_namespaced_pod(namespace=namespace)
context["pods"] = []
for pod in pods.items:
pod_summary = {
"name": pod.metadata.name,
"phase": pod.status.phase,
"ready": False,
"restarts": 0,
"node": pod.spec.node_name,
}
if pod.status.container_statuses:
pod_summary["ready"] = all(c.ready for c in pod.status.container_statuses)
pod_summary["restarts"] = sum(c.restart_count for c in pod.status.container_statuses)
# Check for non-running states
for cs in pod.status.container_statuses:
if cs.state.waiting:
pod_summary["waiting_reason"] = cs.state.waiting.reason
context["pods"].append(pod_summary)
except Exception as e:
context["error"] = str(e)
    return context

Step 2: Claude Analyzer
# k8s_ai/analyzer.py
import anthropic
import json
claude = anthropic.Anthropic()
SYSTEM_PROMPT = """You are an expert Kubernetes Site Reliability Engineer with deep knowledge of
Kubernetes internals, common failure modes, and operational best practices.
When given Kubernetes cluster state data, you:
1. Identify the root cause of the issue clearly
2. Cite specific evidence from the data (exit codes, error messages, events)
3. Provide the exact kubectl commands to fix the immediate issue
4. Suggest next steps for root cause investigation
5. Keep explanations practical and actionable
Format your response with clear sections:
- Root Cause: one sentence
- Evidence: bullet points citing specific data
- Immediate Fix: exact commands the engineer should run
- Investigation: what to check next
"""
def diagnose_pod(pod_context: dict, question: str | None = None) -> str:
"""Diagnose a pod's issues using Claude."""
context_json = json.dumps(pod_context, indent=2, default=str)
user_message = (
"Diagnose the following Kubernetes pod issue and provide a fix.\n\n"
f"Cluster state data:\n{context_json}"
)
if question:
user_message += f"\n\nSpecific question: {question}"
response = claude.messages.create(
model="claude-sonnet-4-6",
max_tokens=1500,
system=SYSTEM_PROMPT,
messages=[{"role": "user", "content": user_message}],
)
return response.content[0].text
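Claude API calls can fail transiently (rate limits, network blips), and a diagnosis run that dies halfway through a pager incident is worse than useless. A small generic retry wrapper keeps the CLI resilient; `with_retries` is a hypothetical helper sketched here, not part of the anthropic SDK:

```python
import random
import time


def with_retries(fn, retries: int = 3, base_delay: float = 1.0, retry_on=(Exception,)):
    """Call fn(), retrying on the given exception types with exponential backoff and jitter."""
    for attempt in range(retries + 1):
        try:
            return fn()
        except retry_on:
            if attempt == retries:
                raise  # out of attempts; surface the last error
            delay = base_delay * (2 ** attempt)
            time.sleep(delay * (1 + random.random() * 0.25))  # jitter avoids thundering herds
```

With the anthropic SDK you would retry its transient error classes, e.g. `with_retries(lambda: diagnose_pod(context), retry_on=(anthropic.RateLimitError, anthropic.APIConnectionError))`.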
def diagnose_namespace(namespace_context: dict, question: str) -> str:
"""Answer a general question about a namespace."""
context_json = json.dumps(namespace_context, indent=2, default=str)
response = claude.messages.create(
model="claude-sonnet-4-6",
max_tokens=1500,
system=SYSTEM_PROMPT,
messages=[{
"role": "user",
"content": f"Question: {question}\n\nNamespace state:\n{context_json}"
}],
)
    return response.content[0].text

Step 3: CLI
# k8s_ai/cli.py
import click
from rich.console import Console
from rich.markdown import Markdown
from .collector import collect_pod_context, collect_namespace_context
from .analyzer import diagnose_pod, diagnose_namespace
console = Console()
@click.group()
def cli():
"""AI-powered Kubernetes troubleshooter."""
pass
@cli.command()
@click.argument("pod_name")
@click.option("-n", "--namespace", default="default", help="Kubernetes namespace")
@click.option("-q", "--question", help="Specific question about the pod")
def diagnose(pod_name: str, namespace: str, question: str):
"""Diagnose issues with a specific pod."""
with console.status("[bold cyan]Gathering cluster state...", spinner="dots"):
context = collect_pod_context(pod_name, namespace)
console.print(f"[dim]✓ Collected: pod status, {len(context.get('events', []))} events, logs[/dim]")
console.print()
with console.status("[bold cyan]Claude is analyzing...", spinner="dots"):
diagnosis = diagnose_pod(context, question)
console.print(Markdown(diagnosis))
@cli.command()
@click.argument("namespace")
@click.argument("question")
def ask(namespace: str, question: str):
"""Ask a question about a namespace."""
with console.status("[bold cyan]Gathering namespace state...", spinner="dots"):
context = collect_namespace_context(namespace)
with console.status("[bold cyan]Claude is analyzing...", spinner="dots"):
answer = diagnose_namespace(context, question)
console.print(Markdown(answer))
def main():
    cli()

Step 4: Install and Run
# Install
pip install -e .
# Or run directly
python -m k8s_ai.cli diagnose payments-7d9f8b-xk2p -n production
# Ask a question about a namespace
python -m k8s_ai.cli ask production "Why are some pods using more CPU than expected?"
# With a specific question about a pod
python -m k8s_ai.cli diagnose api-server-xyz -n staging \
  -q "The pod restarted 10 times last night. What's causing it?"

Deploy as a Kubernetes Job (Run on Demand)
# troubleshoot-job.yaml
apiVersion: batch/v1
kind: Job
metadata:
name: k8s-diagnose
spec:
template:
spec:
serviceAccountName: k8s-ai-sa # Needs read access to pods, events, logs
containers:
- name: k8s-ai
image: your-registry/k8s-ai:latest
command: ["python", "-m", "k8s_ai.cli", "diagnose"]
args: ["$(POD_NAME)", "-n", "$(NAMESPACE)"]
env:
- name: POD_NAME
value: "payments-crashed-abc123"
- name: NAMESPACE
value: "production"
- name: ANTHROPIC_API_KEY
valueFrom:
secretKeyRef:
name: ai-secrets
key: anthropic-api-key
restartPolicy: Never
---
# RBAC — read-only access
apiVersion: v1
kind: ServiceAccount
metadata:
name: k8s-ai-sa
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: k8s-ai-reader
rules:
- apiGroups: [""]
resources: ["pods", "pods/log", "events", "nodes", "namespaces"]
verbs: ["get", "list"]
- apiGroups: ["apps"]
resources: ["deployments", "replicasets"]
verbs: ["get", "list"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: k8s-ai-reader-binding
subjects:
- kind: ServiceAccount
name: k8s-ai-sa
namespace: default
roleRef:
kind: ClusterRole
name: k8s-ai-reader
  apiGroup: rbac.authorization.k8s.io

Real Diagnoses It Handles Well
| Issue | What Claude identifies |
|---|---|
| OOMKilled | Exit code 137, memory limit, suggested new limit |
| CrashLoopBackOff | Root cause from logs (missing env var, DB connection) |
| ImagePullBackOff | Wrong image tag or registry auth issue |
| Pending pod | Insufficient CPU/memory, taint mismatch, node selector |
| Init container failure | Which init container failed and why |
| Liveness probe failure | Response time vs timeout, wrong health endpoint |
The key insight: AI doesn't replace your kubectl knowledge — it eliminates the 10 minutes of mechanical data gathering and correlation that precede every diagnosis. You stay in control of whether to apply the fix.
For building more sophisticated DevOps agents, see the LLM function calling for DevOps guide and the Anthropic tool use documentation.