Build an AI Kubernetes Troubleshooter with Claude (2026)

Build a CLI tool that automatically diagnoses Kubernetes issues — OOMKilled, CrashLoopBackOff, pending pods — by gathering cluster state and asking Claude what's wrong and how to fix it.

DevOpsBoys · Apr 27, 2026 · 7 min read

When a Kubernetes pod fails, diagnosis involves running 6–8 kubectl commands, reading logs, checking events, correlating resource usage — then figuring out the cause. This guide builds a CLI tool that does all of that automatically and gives you a diagnosis with fix steps.


What We're Building

$ k8s-ai diagnose pod payments-7d9f8b-xk2p -n production

Gathering cluster state...
Pod status, events, logs, node metrics collected

Claude's diagnosis:

ROOT CAUSE: OOMKilled
The payments pod was killed because it exceeded its memory limit of 512Mi.

EVIDENCE:
- Container exit code: 137 (OOMKilled)
- Memory usage peaked at 498Mi before kill
- 3 restarts in the last hour, increasing in frequency

IMMEDIATE FIX:
kubectl patch deployment payments -n production \
  -p '{"spec":{"template":{"spec":{"containers":[{"name":"payments","resources":{"limits":{"memory":"1Gi"}}}]}}}}'

INVESTIGATION:
Check for memory leaks in payments service. Recent deploy at 14:32 matches
when restarts began. Consider rolling back to v2.3.1.

Project Setup

bash
pip install anthropic kubernetes click rich pyyaml

Project layout:

k8s-ai/
├── k8s_ai/
│   ├── __init__.py
│   ├── cli.py           # Click CLI
│   ├── collector.py     # gathers Kubernetes data
│   └── analyzer.py      # Claude API calls
└── pyproject.toml
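
The layout references a pyproject.toml that the article never shows. A minimal sketch that wires up the `k8s-ai` console script used in the opening example — the exact metadata and the setuptools backend are assumptions:

```toml
[project]
name = "k8s-ai"
version = "0.1.0"
requires-python = ">=3.10"
dependencies = ["anthropic", "kubernetes", "click", "rich", "pyyaml"]

[project.scripts]
k8s-ai = "k8s_ai.cli:main"

[build-system]
requires = ["setuptools>=61"]
build-backend = "setuptools.build_meta"
```

With this in place, `pip install -e .` (Step 4) puts `k8s-ai` on your PATH.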

Step 1: Kubernetes Data Collector

python
# k8s_ai/collector.py
from kubernetes import client, config
from datetime import datetime, timezone

# Prefer in-cluster config (when running inside a pod); fall back to local kubeconfig
try:
    config.load_incluster_config()
except config.ConfigException:
    config.load_kube_config()

core_v1 = client.CoreV1Api()
apps_v1 = client.AppsV1Api()
metrics_api = client.CustomObjectsApi()  # for metrics.k8s.io queries (live usage; needs metrics-server)
 
 
def collect_pod_context(pod_name: str, namespace: str) -> dict:
    """Collect everything about a pod needed for diagnosis."""
    context = {}
 
    # 1. Pod spec and status
    try:
        pod = core_v1.read_namespaced_pod(name=pod_name, namespace=namespace)
        context["pod"] = {
            "name": pod.metadata.name,
            "namespace": pod.metadata.namespace,
            "phase": pod.status.phase,
            "conditions": [
                {"type": c.type, "status": c.status, "reason": c.reason, "message": c.message}
                for c in (pod.status.conditions or [])
            ],
            "containers": [],
            "node": pod.spec.node_name,
            "created_at": str(pod.metadata.creation_timestamp),
        }
        for cs in (pod.status.container_statuses or []):
            container_info = {
                "name": cs.name,
                "ready": cs.ready,
                "restart_count": cs.restart_count,
                "image": cs.image,
                "state": {},
                "last_state": {},
            }
            if cs.state.running:
                container_info["state"] = {"running": {"started_at": str(cs.state.running.started_at)}}
            elif cs.state.waiting:
                container_info["state"] = {
                    "waiting": {"reason": cs.state.waiting.reason, "message": cs.state.waiting.message}
                }
            elif cs.state.terminated:
                container_info["state"] = {
                    "terminated": {
                        "exit_code": cs.state.terminated.exit_code,
                        "reason": cs.state.terminated.reason,
                        "message": cs.state.terminated.message,
                        "finished_at": str(cs.state.terminated.finished_at),
                    }
                }
            if cs.last_state.terminated:
                container_info["last_state"] = {
                    "exit_code": cs.last_state.terminated.exit_code,
                    "reason": cs.last_state.terminated.reason,
                }
 
            # Resource requests/limits from spec
            for c in pod.spec.containers:
                if c.name == cs.name and c.resources:
                    container_info["resources"] = {
                        "requests": {
                            "cpu": c.resources.requests.get("cpu") if c.resources.requests else None,
                            "memory": c.resources.requests.get("memory") if c.resources.requests else None,
                        },
                        "limits": {
                            "cpu": c.resources.limits.get("cpu") if c.resources.limits else None,
                            "memory": c.resources.limits.get("memory") if c.resources.limits else None,
                        }
                    }
 
            context["pod"]["containers"].append(container_info)
    except client.exceptions.ApiException as e:
        context["pod_error"] = str(e)
 
    # 2. Recent events
    try:
        events = core_v1.list_namespaced_event(
            namespace=namespace,
            field_selector=f"involvedObject.name={pod_name}"
        )
        context["events"] = [
            {
                "type": e.type,
                "reason": e.reason,
                "message": e.message,
                "count": e.count,
                "last_time": str(e.last_timestamp),
            }
            for e in sorted(events.items, key=lambda x: x.last_timestamp or datetime.min.replace(tzinfo=timezone.utc), reverse=True)[:10]
        ]
    except Exception as e:
        context["events_error"] = str(e)
 
    # 3. Recent logs (last 100 lines)
    try:
        logs = core_v1.read_namespaced_pod_log(
            name=pod_name,
            namespace=namespace,
            tail_lines=100,
            timestamps=True,
        )
        context["logs"] = logs[-3000:] if len(logs) > 3000 else logs  # Limit size
    except Exception as e:
        # Try previous container logs
        try:
            logs = core_v1.read_namespaced_pod_log(
                name=pod_name,
                namespace=namespace,
                tail_lines=50,
                previous=True,
            )
            context["logs_previous"] = logs[-2000:] if len(logs) > 2000 else logs
        except Exception:
            context["logs_error"] = str(e)
 
    # 4. Node info (is the node under pressure?)
    if context.get("pod", {}).get("node"):
        node_name = context["pod"]["node"]
        try:
            node = core_v1.read_node(name=node_name)
            context["node"] = {
                "name": node_name,
                "conditions": [
                    {"type": c.type, "status": c.status, "message": c.message}
                    for c in (node.status.conditions or [])
                ],
                "allocatable": dict(node.status.allocatable or {}),
                "capacity": dict(node.status.capacity or {}),
            }
        except Exception:
            pass
 
    # 5. Deployment info if applicable
    try:
        # Try to find the owning deployment via labels
        if pod.metadata.labels:
            app_label = pod.metadata.labels.get("app") or pod.metadata.labels.get("app.kubernetes.io/name")
            if app_label:
                deps = apps_v1.list_namespaced_deployment(
                    namespace=namespace,
                    label_selector=f"app={app_label}"
                )
                if deps.items:
                    dep = deps.items[0]
                    context["deployment"] = {
                        "name": dep.metadata.name,
                        "replicas": dep.spec.replicas,
                        "available": dep.status.available_replicas,
                        "revision": (dep.metadata.annotations or {}).get("deployment.kubernetes.io/revision", "unknown"),
                    }
    except Exception:
        pass
 
    return context
 
 
def collect_namespace_context(namespace: str) -> dict:
    """Collect overview of a namespace for general troubleshooting."""
    context = {}
 
    try:
        pods = core_v1.list_namespaced_pod(namespace=namespace)
        context["pods"] = []
        for pod in pods.items:
            pod_summary = {
                "name": pod.metadata.name,
                "phase": pod.status.phase,
                "ready": False,
                "restarts": 0,
                "node": pod.spec.node_name,
            }
            if pod.status.container_statuses:
                pod_summary["ready"] = all(c.ready for c in pod.status.container_statuses)
                pod_summary["restarts"] = sum(c.restart_count for c in pod.status.container_statuses)
                # Check for non-running states
                for cs in pod.status.container_statuses:
                    if cs.state.waiting:
                        pod_summary["waiting_reason"] = cs.state.waiting.reason
            context["pods"].append(pod_summary)
    except Exception as e:
        context["error"] = str(e)
 
    return context
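
One practical wrinkle: the context dict can get large — a hundred log lines plus ten events is usually fine, but verbose apps blow past a sensible prompt size. Before handing it to the analyzer, it can help to trim toward a rough character budget. A minimal sketch, not part of the collector above; the 12000-character default and the choice to sacrifice logs first are assumptions:

```python
import json


def trim_context(context: dict, budget: int = 12000) -> dict:
    """Shrink a collected context dict until its JSON fits a character budget.

    Logs are the noisiest field, so they get halved (keeping the most recent
    portion) or dropped first; pod status and events survive intact.
    """
    trimmed = dict(context)  # shallow copy; the caller's dict stays untouched
    for field in ("logs_previous", "logs"):
        while field in trimmed and len(json.dumps(trimmed, default=str)) > budget:
            value = trimmed[field]
            if len(value) <= 500:
                del trimmed[field]  # too short to be worth keeping
            else:
                trimmed[field] = value[-(len(value) // 2):]  # keep the newest half
    return trimmed


# Usage: 50k characters of logs squeezed under a 2000-character budget
small = trim_context({"pod": {"name": "payments"}, "logs": "x" * 50000}, budget=2000)
```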

Step 2: Claude Analyzer

python
# k8s_ai/analyzer.py
import anthropic
import json
 
claude = anthropic.Anthropic()
 
SYSTEM_PROMPT = """You are an expert Kubernetes Site Reliability Engineer with deep knowledge of 
Kubernetes internals, common failure modes, and operational best practices.
 
When given Kubernetes cluster state data, you:
1. Identify the root cause of the issue clearly
2. Cite specific evidence from the data (exit codes, error messages, events)
3. Provide the exact kubectl commands to fix the immediate issue
4. Suggest next steps for root cause investigation
5. Keep explanations practical and actionable
 
Format your response with clear sections:
- Root Cause: one sentence
- Evidence: bullet points citing specific data
- Immediate Fix: exact commands the engineer should run
- Investigation: what to check next
"""
 
 
def diagnose_pod(pod_context: dict, question: str | None = None) -> str:
    """Diagnose a pod's issues using Claude."""
    context_json = json.dumps(pod_context, indent=2, default=str)
    user_message = (
        "Diagnose the following Kubernetes pod issue and provide a fix.\n\n"
        f"Cluster state data:\n{context_json}"
    )
    if question:
        user_message += f"\n\nSpecific question: {question}"
 
    response = claude.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=1500,
        system=SYSTEM_PROMPT,
        messages=[{"role": "user", "content": user_message}],
    )
    return response.content[0].text
 
 
def diagnose_namespace(namespace_context: dict, question: str) -> str:
    """Answer a general question about a namespace."""
    context_json = json.dumps(namespace_context, indent=2, default=str)
    response = claude.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=1500,
        system=SYSTEM_PROMPT,
        messages=[{
            "role": "user",
            "content": f"Question: {question}\n\nNamespace state:\n{context_json}"
        }],
    )
    return response.content[0].text
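
Because the system prompt pins down section labels, downstream automation (posting to Slack, opening a ticket) can split the response on those headings instead of treating it as an opaque blob. A minimal sketch, assuming the response follows the requested format — real responses can drift, so callers should tolerate missing sections:

```python
import re

SECTION_NAMES = ["Root Cause", "Evidence", "Immediate Fix", "Investigation"]


def split_diagnosis(text: str) -> dict:
    """Split a diagnosis into the sections the system prompt asks for.

    Only sections actually present in the text appear in the result.
    """
    names = "|".join(re.escape(n) for n in SECTION_NAMES)
    # Tolerate markdown-ish headings: "Root Cause:", "- Evidence:", "## Immediate Fix"
    pattern = re.compile(rf"^[#\-\*\s]*({names})\s*:?\s*", re.IGNORECASE | re.MULTILINE)
    matches = list(pattern.finditer(text))
    sections = {}
    for i, m in enumerate(matches):
        end = matches[i + 1].start() if i + 1 < len(matches) else len(text)
        sections[m.group(1).title()] = text[m.end():end].strip()
    return sections


sample = """Root Cause: Pod exceeded its memory limit.

Evidence:
- Exit code 137

Immediate Fix: raise the memory limit to 1Gi.

Investigation: check for leaks after the 14:32 deploy."""
parts = split_diagnosis(sample)
```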

Step 3: CLI

python
# k8s_ai/cli.py
import click
from rich.console import Console
from rich.markdown import Markdown
from .collector import collect_pod_context, collect_namespace_context
from .analyzer import diagnose_pod, diagnose_namespace
 
console = Console()
 
 
@click.group()
def cli():
    """AI-powered Kubernetes troubleshooter."""
    pass
 
 
@cli.command()
@click.argument("pod_name")
@click.option("-n", "--namespace", default="default", help="Kubernetes namespace")
@click.option("-q", "--question", help="Specific question about the pod")
def diagnose(pod_name: str, namespace: str, question: str):
    """Diagnose issues with a specific pod."""
    with console.status("[bold cyan]Gathering cluster state...", spinner="dots"):
        context = collect_pod_context(pod_name, namespace)
 
    console.print(f"[dim]✓ Collected: pod status, {len(context.get('events', []))} events, logs[/dim]")
    console.print()
 
    with console.status("[bold cyan]Claude is analyzing...", spinner="dots"):
        diagnosis = diagnose_pod(context, question)
 
    console.print(Markdown(diagnosis))
 
 
@cli.command()
@click.argument("namespace")
@click.argument("question")
def ask(namespace: str, question: str):
    """Ask a question about a namespace."""
    with console.status("[bold cyan]Gathering namespace state...", spinner="dots"):
        context = collect_namespace_context(namespace)
 
    with console.status("[bold cyan]Claude is analyzing...", spinner="dots"):
        answer = diagnose_namespace(context, question)
 
    console.print(Markdown(answer))
 
 
def main():
    cli()


if __name__ == "__main__":
    main()

Step 4: Install and Run

bash
# Install
pip install -e .
 
# Or run directly
python -m k8s_ai.cli diagnose payments-7d9f8b-xk2p -n production
 
# Ask a question about a namespace
python -m k8s_ai.cli ask production "Why are some pods using more CPU than expected?"
 
# With a specific question about a pod
python -m k8s_ai.cli diagnose api-server-xyz -n staging \
  -q "The pod restarted 10 times last night. What's causing it?"

Deploy as a Kubernetes Job (Run on Demand)

yaml
# troubleshoot-job.yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: k8s-diagnose
spec:
  template:
    spec:
      serviceAccountName: k8s-ai-sa  # Needs read access to pods, events, logs
      containers:
      - name: k8s-ai
        image: your-registry/k8s-ai:latest
        command: ["python", "-m", "k8s_ai.cli", "diagnose"]
        args: ["$(POD_NAME)", "-n", "$(NAMESPACE)"]
        env:
        - name: POD_NAME
          value: "payments-crashed-abc123"
        - name: NAMESPACE
          value: "production"
        - name: ANTHROPIC_API_KEY
          valueFrom:
            secretKeyRef:
              name: ai-secrets
              key: anthropic-api-key
      restartPolicy: Never
---
# RBAC — read-only access
apiVersion: v1
kind: ServiceAccount
metadata:
  name: k8s-ai-sa
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: k8s-ai-reader
rules:
- apiGroups: [""]
  resources: ["pods", "pods/log", "events", "nodes", "namespaces"]
  verbs: ["get", "list"]
- apiGroups: ["apps"]
  resources: ["deployments", "replicasets"]
  verbs: ["get", "list"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: k8s-ai-reader-binding
subjects:
- kind: ServiceAccount
  name: k8s-ai-sa
  namespace: default
roleRef:
  kind: ClusterRole
  name: k8s-ai-reader
  apiGroup: rbac.authorization.k8s.io

Real Diagnoses It Handles Well

What Claude identifies, by issue:

- OOMKilled: exit code 137, the memory limit, a suggested new limit
- CrashLoopBackOff: root cause from logs (missing env var, DB connection)
- ImagePullBackOff: wrong image tag or registry auth issue
- Pending pod: insufficient CPU/memory, taint mismatch, node selector
- Init container failure: which init container failed and why
- Liveness probe failure: response time vs. timeout, wrong health endpoint
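
Several of these diagnoses hinge on the container exit code. By Unix convention, codes above 128 mean the process died from a signal (code minus 128): 137 is 128 + 9 (SIGKILL, which the kernel OOM killer sends) and 143 is 128 + 15 (SIGTERM). A small standalone helper for decoding them — a general convention, not an API of the tool above:

```python
import signal


def describe_exit_code(code: int) -> str:
    """Turn a container exit code into a human-readable hint."""
    if code == 0:
        return "clean exit"
    if code > 128:
        signum = code - 128
        try:
            name = signal.Signals(signum).name  # e.g. SIGKILL, SIGTERM
        except ValueError:
            name = f"signal {signum}"
        hint = " (often OOMKilled)" if signum == signal.SIGKILL else ""
        return f"killed by {name}{hint}"
    return f"application error (exit code {code})"


print(describe_exit_code(137))  # killed by SIGKILL (often OOMKilled)
print(describe_exit_code(143))  # killed by SIGTERM
```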

The key insight: AI doesn't replace your kubectl knowledge — it eliminates the 10 minutes of mechanical data gathering and correlation that precede every diagnosis. You stay in control of whether to apply the fix.

For building more sophisticated DevOps agents, see the LLM function calling for DevOps guide and the Anthropic tool use documentation.
