🎉 DevOps Interview Prep Bundle is live — 1000+ Q&A across 20 topicsGet it →
All Articles

Build an AI Kubernetes Event Analyzer with Claude API

Build a Python tool that collects kubectl events and pod status, sends them to Claude API for AI analysis, identifies root causes, and posts alerts to Slack — deployable as a Kubernetes CronJob.

DevOpsBoys6 min read
Share:Tweet

Kubernetes events contain the most useful debugging information in your cluster — but there are hundreds of them, most are noise, and the signal is buried. This project builds a Python script that collects cluster events and pod status, sends them to Claude API for intelligent analysis, and posts a summary with root cause suggestions to Slack. You then run it as a Kubernetes CronJob every 15 minutes.

What It Does

  1. Runs kubectl get events and kubectl get pods across all namespaces
  2. Filters to Warning events and non-Running/Completed pods
  3. Sends the filtered output to Claude claude-haiku-4-5-20251001 for analysis
  4. Claude correlates pod failures with events, identifies patterns, suggests fixes
  5. Posts the summary to Slack if issues are found

Prerequisites

bash
pip install anthropic slack-sdk kubernetes

You need:

The Analyzer Script

Create k8s_event_analyzer.py:

python
#!/usr/bin/env python3
"""
Kubernetes Event Analyzer using Claude API
Collects cluster events, analyzes with AI, alerts on Slack
"""
 
import subprocess
import json
import os
import sys
from datetime import datetime
import anthropic
from slack_sdk.webhook import WebhookClient
 
ANTHROPIC_API_KEY = os.environ.get("ANTHROPIC_API_KEY")
SLACK_WEBHOOK_URL = os.environ.get("SLACK_WEBHOOK_URL", "")
CLUSTER_NAME = os.environ.get("CLUSTER_NAME", "my-cluster")
 
client = anthropic.Anthropic(api_key=ANTHROPIC_API_KEY)
 
 
def run_kubectl(args: list[str]) -> str:
    """Run a kubectl command and return stdout."""
    try:
        result = subprocess.run(
            ["kubectl"] + args,
            capture_output=True,
            text=True,
            timeout=30
        )
        if result.returncode != 0:
            print(f"kubectl error: {result.stderr}", file=sys.stderr)
            return ""
        return result.stdout
    except subprocess.TimeoutExpired:
        print(f"kubectl timed out: {args}", file=sys.stderr)
        return ""
 
 
def collect_cluster_state() -> dict:
    """Collect events and pod status from the cluster."""
    print("Collecting cluster state...")
 
    # Get Warning events only (last 1 hour via field selector is not supported well,
    # so we filter in Python)
    events_raw = run_kubectl([
        "get", "events",
        "--all-namespaces",
        "--field-selector", "type=Warning",
        "--sort-by=.lastTimestamp",
        "-o", "wide"
    ])
 
    # Get all pods that are not Running or Completed
    pods_raw = run_kubectl([
        "get", "pods",
        "--all-namespaces",
        "-o", "wide"
    ])
 
    # Filter pods to problem states
    problem_pods = []
    for line in pods_raw.splitlines():
        if any(status in line for status in [
            "CrashLoopBackOff", "Error", "OOMKilled",
            "Pending", "ImagePullBackOff", "ErrImagePull",
            "Terminating", "Unknown", "Init:Error"
        ]):
            problem_pods.append(line)
 
    # Get recent events count for context
    all_events = run_kubectl(["get", "events", "--all-namespaces", "--sort-by=.lastTimestamp"])
    event_lines = [l for l in all_events.splitlines() if l.strip()]
 
    return {
        "warning_events": events_raw,
        "problem_pods": "\n".join(problem_pods) if problem_pods else "None",
        "total_events_seen": len(event_lines),
        "timestamp": datetime.utcnow().isoformat() + "Z"
    }
 
 
def analyze_with_claude(cluster_state: dict) -> str:
    """Send cluster state to Claude for analysis."""
 
    # Build the prompt
    prompt = f"""You are a Kubernetes SRE expert. Analyze these cluster events and pod issues.
 
CLUSTER: {CLUSTER_NAME}
TIMESTAMP: {cluster_state['timestamp']}
TOTAL EVENTS IN CLUSTER: {cluster_state['total_events_seen']}
 
=== WARNING EVENTS ===
{cluster_state['warning_events'] or 'No warning events found'}
 
=== PROBLEM PODS (non-Running/Completed) ===
{cluster_state['problem_pods']}
 
Provide your analysis in this exact format:
 
SEVERITY: [CRITICAL / WARNING / OK]
 
SUMMARY:
One sentence summary of the cluster health.
 
ISSUES FOUND:
- Issue 1: [What is happening] → [Which pods/namespaces affected]
- Issue 2: ...
(If no issues, write "None detected")
 
ROOT CAUSES:
- [Most likely cause for each issue]
 
RECOMMENDED ACTIONS:
1. [Most urgent action with exact kubectl command if applicable]
2. [Next action]
3. ...
 
Keep the response concise and actionable. Focus on patterns, not individual events."""
 
    print("Sending to Claude API for analysis...")
    message = client.messages.create(
        model="claude-haiku-4-5-20251001",
        max_tokens=1024,
        messages=[
            {"role": "user", "content": prompt}
        ]
    )
 
    return message.content[0].text
 
 
def send_slack_alert(analysis: str, cluster_state: dict) -> None:
    """Post analysis to Slack if issues found and webhook is configured."""
    if not SLACK_WEBHOOK_URL:
        print("No Slack webhook configured, skipping alert")
        return
 
    # Only alert if severity is not OK
    if "SEVERITY: OK" in analysis:
        print("Cluster is healthy, no Slack alert needed")
        return
 
    severity = "CRITICAL" if "SEVERITY: CRITICAL" in analysis else "WARNING"
    emoji = ":red_circle:" if severity == "CRITICAL" else ":large_yellow_circle:"
 
    webhook = WebhookClient(SLACK_WEBHOOK_URL)
    response = webhook.send(
        text=f"{emoji} *K8s Cluster Alert — {CLUSTER_NAME}*",
        blocks=[
            {
                "type": "header",
                "text": {
                    "type": "plain_text",
                    "text": f"{emoji} Kubernetes Alert: {CLUSTER_NAME}"
                }
            },
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f"*Timestamp:* {cluster_state['timestamp']}\n*Severity:* {severity}"
                }
            },
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f"```{analysis[:2800]}```"
                }
            }
        ]
    )
 
    if response.status_code == 200:
        print("Slack alert sent successfully")
    else:
        print(f"Slack alert failed: {response.status_code} {response.body}")
 
 
def main():
    print(f"=== Kubernetes Event Analyzer ===")
    print(f"Cluster: {CLUSTER_NAME}")
    print(f"Time: {datetime.utcnow().isoformat()}Z")
    print()
 
    # Step 1: Collect cluster state
    cluster_state = collect_cluster_state()
 
    # Step 2: If no warning events and no problem pods, skip API call
    if (not cluster_state["warning_events"].strip() or
            cluster_state["warning_events"].strip() == "No resources found." or
            cluster_state["warning_events"].strip() == ""):
        if cluster_state["problem_pods"] == "None":
            print("Cluster is clean — no warning events, no problem pods. Exiting.")
            return
 
    # Step 3: Analyze with Claude
    analysis = analyze_with_claude(cluster_state)
 
    # Step 4: Print analysis
    print("\n=== AI ANALYSIS ===")
    print(analysis)
    print("==================\n")
 
    # Step 5: Send Slack alert if needed
    send_slack_alert(analysis, cluster_state)
 
 
if __name__ == "__main__":
    main()

Sample Output

Running on a cluster with a CrashLoopBackOff pod and OOM events:

=== Kubernetes Event Analyzer ===
Cluster: production-eks
Time: 2026-06-29T10:15:00Z

Collecting cluster state...
Sending to Claude API for analysis...

=== AI ANALYSIS ===
SEVERITY: CRITICAL

SUMMARY:
One pod in the payments namespace is in CrashLoopBackOff due to OOM kills, likely caused by a memory leak or missing resource limits.

ISSUES FOUND:
- Issue 1: payments-worker-7d9f-xk2p is CrashLoopBackOff → payments namespace
- Issue 2: Multiple OOMKilled events in the last 30 minutes for the same pod

ROOT CAUSES:
- Pod has no memory limit set, consuming unbounded memory until node kills it
- Application likely has a memory leak in the payment processing loop

RECOMMENDED ACTIONS:
1. Check recent logs: kubectl logs payments-worker-7d9f-xk2p -n payments --previous
2. Add memory limit to deployment: kubectl set resources deployment/payments-worker -n payments --limits=memory=512Mi
3. Monitor memory usage: kubectl top pod -n payments
==================

Deploy as a Kubernetes CronJob

Build and push the container:

dockerfile
FROM python:3.12-slim
RUN pip install anthropic slack-sdk kubernetes
COPY k8s_event_analyzer.py /app/
WORKDIR /app
CMD ["python", "k8s_event_analyzer.py"]
bash
docker build -t your-registry/k8s-analyzer:latest .
docker push your-registry/k8s-analyzer:latest

Create a ServiceAccount with read access:

yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: k8s-analyzer
  namespace: monitoring
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: k8s-analyzer-reader
rules:
- apiGroups: [""]
  resources: ["pods", "events", "namespaces", "nodes"]
  verbs: ["get", "list"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: k8s-analyzer-binding
subjects:
- kind: ServiceAccount
  name: k8s-analyzer
  namespace: monitoring
roleRef:
  kind: ClusterRole
  name: k8s-analyzer-reader
  apiGroup: rbac.authorization.k8s.io

Create a Secret for the API key:

bash
kubectl create secret generic k8s-analyzer-secrets \
  --from-literal=anthropic-api-key=sk-ant-your-key \
  --from-literal=slack-webhook-url=https://hooks.slack.com/your-webhook \
  -n monitoring

Deploy the CronJob:

yaml
apiVersion: batch/v1
kind: CronJob
metadata:
  name: k8s-event-analyzer
  namespace: monitoring
spec:
  schedule: "*/15 * * * *"
  concurrencyPolicy: Forbid
  successfulJobsHistoryLimit: 3
  failedJobsHistoryLimit: 3
  jobTemplate:
    spec:
      template:
        spec:
          serviceAccountName: k8s-analyzer
          restartPolicy: OnFailure
          containers:
          - name: analyzer
            image: your-registry/k8s-analyzer:latest
            env:
            - name: CLUSTER_NAME
              value: "production-eks"
            - name: ANTHROPIC_API_KEY
              valueFrom:
                secretKeyRef:
                  name: k8s-analyzer-secrets
                  key: anthropic-api-key
            - name: SLACK_WEBHOOK_URL
              valueFrom:
                secretKeyRef:
                  name: k8s-analyzer-secrets
                  key: slack-webhook-url
            resources:
              requests:
                memory: "128Mi"
                cpu: "100m"
              limits:
                memory: "256Mi"
                cpu: "500m"

Apply everything:

bash
kubectl apply -f serviceaccount.yaml
kubectl apply -f cronjob.yaml
 
# Trigger a manual run to test
kubectl create job --from=cronjob/k8s-event-analyzer test-run -n monitoring
kubectl logs -l job-name=test-run -n monitoring -f

Cost Estimate

claude-haiku-4-5-20251001 costs $0.80 per million input tokens and $4 per million output tokens. Each analysis run sends roughly 2,000 tokens and returns 500 tokens. At 4 runs/hour that is about $0.006/hour or roughly $4.50/month for continuous monitoring — cheaper than a single PagerDuty alert on most plans.

This analyzer catches real patterns that static alerting rules miss: correlating OOM events with specific deployments, identifying cascading failures, and translating cryptic Kubernetes messages into actionable steps.

🔧

Today I Fixed

Short real fixes from production — posted daily

Browse fixes
Newsletter

Stay ahead of the curve

Get the latest DevOps, Kubernetes, AWS, and AI/ML guides delivered straight to your inbox. No spam — just practical engineering content.

Related Articles

Comments