Build an AI Kubernetes Event Analyzer with Claude API
Build a Python tool that collects kubectl events and pod status, sends them to Claude API for AI analysis, identifies root causes, and posts alerts to Slack — deployable as a Kubernetes CronJob.
Kubernetes events contain the most useful debugging information in your cluster — but there are hundreds of them, most are noise, and the signal is buried. This project builds a Python script that collects cluster events and pod status, sends them to Claude API for intelligent analysis, and posts a summary with root cause suggestions to Slack. You then run it as a Kubernetes CronJob every 15 minutes.
What It Does
- Runs `kubectl get events` and `kubectl get pods` across all namespaces
- Filters to Warning events and non-Running/Completed pods
- Sends the filtered output to Claude claude-haiku-4-5-20251001 for analysis
- Claude correlates pod failures with events, identifies patterns, suggests fixes
- Posts the summary to Slack if issues are found
Prerequisites
pip install anthropic slack-sdk kubernetes
You need:
- A running Kubernetes cluster with kubectl configured
- An Anthropic API key (get one at console.anthropic.com)
- A Slack webhook URL (optional but recommended)
The Analyzer Script
Create k8s_event_analyzer.py:
#!/usr/bin/env python3
"""
Kubernetes Event Analyzer using Claude API
Collects cluster events, analyzes with AI, alerts on Slack
"""
import subprocess
import json
import os
import sys
from datetime import datetime
import anthropic
from slack_sdk.webhook import WebhookClient
# Runtime configuration, read once from the environment when the module loads.
# CLUSTER_NAME is the label shown in the prompt and in Slack alerts.
CLUSTER_NAME = os.environ.get("CLUSTER_NAME", "my-cluster")
# An empty webhook URL means Slack alerting is skipped.
SLACK_WEBHOOK_URL = os.environ.get("SLACK_WEBHOOK_URL", "")
# None when the variable is not set.
ANTHROPIC_API_KEY = os.environ.get("ANTHROPIC_API_KEY")

# Single Anthropic client shared by every analysis request.
client = anthropic.Anthropic(api_key=ANTHROPIC_API_KEY)
def run_kubectl(args: list[str]) -> str:
    """Run a kubectl command and return its stdout.

    Args:
        args: Arguments passed to ``kubectl`` (without the leading
            ``kubectl`` itself).

    Returns:
        The command's stdout. An empty string is returned when kubectl
        cannot be started (e.g. the binary is not installed), exits with a
        non-zero status, or does not finish within 30 seconds. The reason
        is written to stderr only, so callers cannot distinguish a failure
        from genuinely empty output.
    """
    try:
        result = subprocess.run(
            ["kubectl", *args],
            capture_output=True,
            text=True,
            timeout=30,
        )
    except subprocess.TimeoutExpired:
        print(f"kubectl timed out: {args}", file=sys.stderr)
        return ""
    except OSError as exc:
        # Raised when the kubectl binary is missing or not executable;
        # treat it like any other kubectl failure instead of crashing.
        print(f"kubectl could not be started: {exc}", file=sys.stderr)
        return ""

    if result.returncode != 0:
        print(f"kubectl error: {result.stderr}", file=sys.stderr)
        return ""
    return result.stdout
def collect_cluster_state() -> dict:
    """Collect Warning events and problem pods from the cluster.

    Returns:
        A dict with four keys:

        * ``warning_events`` - raw ``kubectl get events`` table limited to
          ``type=Warning`` (empty string when kubectl returned nothing).
        * ``problem_pods`` - newline-joined ``kubectl get pods`` rows that
          contain one of the problem status markers, or the string
          ``"None"`` when no row matched.
        * ``total_events_seen`` - number of event rows of any type.
        * ``timestamp`` - collection time in UTC, ISO 8601 with a ``Z``
          suffix.
    """
    # Local import: the file's top-level import only brings in `datetime`.
    from datetime import timezone

    print("Collecting cluster state...")

    # Warning events across all namespaces, sorted by last-seen time.
    # No time-window filtering is applied; everything kubectl returns is kept.
    events_raw = run_kubectl([
        "get", "events",
        "--all-namespaces",
        "--field-selector", "type=Warning",
        "--sort-by=.lastTimestamp",
        "-o", "wide",
    ])

    # All pods; the problem ones are picked out below
    pods_raw = run_kubectl([
        "get", "pods",
        "--all-namespaces",
        "-o", "wide",
    ])

    # Keep only rows that mention one of these status markers
    problem_markers = (
        "CrashLoopBackOff", "Error", "OOMKilled",
        "Pending", "ImagePullBackOff", "ErrImagePull",
        "Terminating", "Unknown", "Init:Error",
    )
    problem_pods = [
        row for row in pods_raw.splitlines()
        if any(marker in row for marker in problem_markers)
    ]

    # Total event count for context. --no-headers keeps the column header
    # row out of the count; sorting is unnecessary when only counting.
    all_events = run_kubectl(["get", "events", "--all-namespaces", "--no-headers"])
    total_events = sum(1 for row in all_events.splitlines() if row.strip())

    # datetime.utcnow() is deprecated since Python 3.12; build the same
    # naive-UTC ISO string from an aware "now" instead.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)

    return {
        "warning_events": events_raw,
        "problem_pods": "\n".join(problem_pods) if problem_pods else "None",
        "total_events_seen": total_events,
        "timestamp": now_utc.isoformat() + "Z",
    }
def analyze_with_claude(
    cluster_state: dict,
    model: str = "claude-haiku-4-5-20251001",
    max_tokens: int = 1024,
    max_event_lines: int = 300,
) -> str:
    """Send cluster state to Claude and return its analysis text.

    Args:
        cluster_state: Dict with the keys ``timestamp``,
            ``total_events_seen``, ``warning_events`` and ``problem_pods``.
        model: Model identifier passed to the Messages API.
        max_tokens: Upper bound on the length of the generated reply.
        max_event_lines: Maximum number of warning-event rows placed in the
            prompt. When the table is longer, the first line and the last
            ``max_event_lines`` rows are kept. A value of 0 or less
            disables the cap.

    Returns:
        The text of the first content block of Claude's reply.
    """
    events_text = cluster_state["warning_events"]
    rows = events_text.splitlines()
    # Cap the table so a noisy cluster cannot produce an oversized (and
    # expensive) request. rows[0] is kept as the column header - this
    # assumes the table was fetched without --no-headers.
    if max_event_lines > 0 and len(rows) - 1 > max_event_lines:
        dropped = len(rows) - 1 - max_event_lines
        rows = [
            rows[0],
            f"... {dropped} earlier rows omitted ...",
            *rows[-max_event_lines:],
        ]
        events_text = "\n".join(rows)

    # Build the prompt. The lines below are flush-left on purpose: they are
    # part of the string literal that is sent to the model.
    prompt = f"""You are a Kubernetes SRE expert. Analyze these cluster events and pod issues.
CLUSTER: {CLUSTER_NAME}
TIMESTAMP: {cluster_state['timestamp']}
TOTAL EVENTS IN CLUSTER: {cluster_state['total_events_seen']}
=== WARNING EVENTS ===
{events_text or 'No warning events found'}
=== PROBLEM PODS (non-Running/Completed) ===
{cluster_state['problem_pods']}
Provide your analysis in this exact format:
SEVERITY: [CRITICAL / WARNING / OK]
SUMMARY:
One sentence summary of the cluster health.
ISSUES FOUND:
- Issue 1: [What is happening] → [Which pods/namespaces affected]
- Issue 2: ...
(If no issues, write "None detected")
ROOT CAUSES:
- [Most likely cause for each issue]
RECOMMENDED ACTIONS:
1. [Most urgent action with exact kubectl command if applicable]
2. [Next action]
3. ...
Keep the response concise and actionable. Focus on patterns, not individual events."""

    print("Sending to Claude API for analysis...")
    message = client.messages.create(
        model=model,
        max_tokens=max_tokens,
        messages=[
            {"role": "user", "content": prompt}
        ],
    )
    return message.content[0].text
def send_slack_alert(analysis: str, cluster_state: dict) -> None:
    """Post the analysis to Slack when a webhook is set and severity is not OK.

    Args:
        analysis: Analysis text; expected to contain a ``SEVERITY`` line
            with one of CRITICAL, WARNING or OK.
        cluster_state: Dict providing the ``timestamp`` shown in the alert.

    Nothing is sent when ``SLACK_WEBHOOK_URL`` is empty or the reported
    severity is OK. When no severity can be found, the alert is sent as
    WARNING. A failed post is reported on stderr and does not raise.
    """
    # Local import: `re` is not among the file's top-level imports.
    import re

    if not SLACK_WEBHOOK_URL:
        print("No Slack webhook configured, skipping alert")
        return

    # The prompt template shows the severity in brackets, so the model may
    # answer "SEVERITY: [OK]" or add markdown such as "**SEVERITY:** OK".
    # Skip any punctuation between the label and the level instead of
    # relying on an exact "SEVERITY: OK" substring.
    found = re.search(
        r"SEVERITY\W*(CRITICAL|WARNING|OK)\b", analysis, re.IGNORECASE
    )
    severity = found.group(1).upper() if found else "WARNING"

    # Only alert if severity is not OK
    if severity == "OK":
        print("Cluster is healthy, no Slack alert needed")
        return

    emoji = ":red_circle:" if severity == "CRITICAL" else ":large_yellow_circle:"
    webhook = WebhookClient(SLACK_WEBHOOK_URL)
    response = webhook.send(
        text=f"{emoji} *K8s Cluster Alert — {CLUSTER_NAME}*",
        blocks=[
            {
                "type": "header",
                "text": {
                    "type": "plain_text",
                    "text": f"{emoji} Kubernetes Alert: {CLUSTER_NAME}"
                }
            },
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f"*Timestamp:* {cluster_state['timestamp']}\n*Severity:* {severity}"
                }
            },
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    # Truncated to 2800 characters before being wrapped in a code fence
                    "text": f"```{analysis[:2800]}```"
                }
            }
        ]
    )
    if response.status_code == 200:
        print("Slack alert sent successfully")
    else:
        # Errors go to stderr, consistent with the kubectl error reporting
        print(
            f"Slack alert failed: {response.status_code} {response.body}",
            file=sys.stderr,
        )
def main() -> None:
    """Collect cluster state, analyze it with Claude, and alert on Slack.

    The Claude call is skipped entirely when there are no warning events
    and no problem pods.
    """
    # Local import: the file's top-level import only brings in `datetime`.
    from datetime import timezone

    # datetime.utcnow() is deprecated since Python 3.12; derive the same
    # naive-UTC value from an aware "now".
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)

    print("=== Kubernetes Event Analyzer ===")
    print(f"Cluster: {CLUSTER_NAME}")
    print(f"Time: {now_utc.isoformat()}Z")
    print()

    # Step 1: Collect cluster state
    cluster_state = collect_cluster_state()

    # Step 2: If no warning events and no problem pods, skip API call
    warnings = cluster_state["warning_events"].strip()
    no_warnings = not warnings or warnings == "No resources found."
    if no_warnings and cluster_state["problem_pods"] == "None":
        print("Cluster is clean — no warning events, no problem pods. Exiting.")
        return

    # Step 3: Analyze with Claude
    analysis = analyze_with_claude(cluster_state)

    # Step 4: Print analysis
    print("\n=== AI ANALYSIS ===")
    print(analysis)
    print("==================\n")

    # Step 5: Send Slack alert if needed
    send_slack_alert(analysis, cluster_state)
if __name__ == "__main__":
    main()

Sample Output
Running on a cluster with a CrashLoopBackOff pod and OOM events:
=== Kubernetes Event Analyzer ===
Cluster: production-eks
Time: 2026-06-29T10:15:00Z
Collecting cluster state...
Sending to Claude API for analysis...
=== AI ANALYSIS ===
SEVERITY: CRITICAL
SUMMARY:
One pod in the payments namespace is in CrashLoopBackOff due to OOM kills, likely caused by a memory leak or missing resource limits.
ISSUES FOUND:
- Issue 1: payments-worker-7d9f-xk2p is CrashLoopBackOff → payments namespace
- Issue 2: Multiple OOMKilled events in the last 30 minutes for the same pod
ROOT CAUSES:
- Pod has no memory limit set, consuming unbounded memory until node kills it
- Application likely has a memory leak in the payment processing loop
RECOMMENDED ACTIONS:
1. Check recent logs: kubectl logs payments-worker-7d9f-xk2p -n payments --previous
2. Add memory limit to deployment: kubectl set resources deployment/payments-worker -n payments --limits=memory=512Mi
3. Monitor memory usage: kubectl top pod -n payments
==================
Deploy as a Kubernetes CronJob
Build and push the container:
FROM python:3.12-slim
# The analyzer shells out to kubectl, which python:3.12-slim does not include.
# Download the current stable kubectl binary for the image's architecture.
RUN apt-get update \
    && apt-get install -y --no-install-recommends ca-certificates curl \
    && curl -fsSLo /usr/local/bin/kubectl \
        "https://dl.k8s.io/release/$(curl -fsSL https://dl.k8s.io/release/stable.txt)/bin/linux/$(dpkg --print-architecture)/kubectl" \
    && chmod +x /usr/local/bin/kubectl \
    && rm -rf /var/lib/apt/lists/*
RUN pip install anthropic slack-sdk kubernetes
COPY k8s_event_analyzer.py /app/
WORKDIR /app
CMD ["python", "k8s_event_analyzer.py"]

docker build -t your-registry/k8s-analyzer:latest .
docker push your-registry/k8s-analyzer:latest

Create a ServiceAccount with read access:
# serviceaccount.yaml
# Identity the analyzer CronJob runs as.
apiVersion: v1
kind: ServiceAccount
metadata:
  name: k8s-analyzer
  namespace: monitoring
---
# Cluster-wide read-only access (get/list) to the listed core resources.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: k8s-analyzer-reader
rules:
  - apiGroups: [""]
    resources: ["pods", "events", "namespaces", "nodes"]
    verbs: ["get", "list"]
---
# Grants the reader role to the analyzer's ServiceAccount.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: k8s-analyzer-binding
subjects:
  - kind: ServiceAccount
    name: k8s-analyzer
    namespace: monitoring
roleRef:
  kind: ClusterRole
  name: k8s-analyzer-reader
  apiGroup: rbac.authorization.k8s.io

Create a Secret for the API key:
kubectl create secret generic k8s-analyzer-secrets \
--from-literal=anthropic-api-key=sk-ant-your-key \
--from-literal=slack-webhook-url=https://hooks.slack.com/your-webhook \
  -n monitoring

Deploy the CronJob:
# cronjob.yaml
# Runs the analyzer every 15 minutes; overlapping runs are not allowed.
apiVersion: batch/v1
kind: CronJob
metadata:
  name: k8s-event-analyzer
  namespace: monitoring
spec:
  schedule: "*/15 * * * *"
  concurrencyPolicy: Forbid
  successfulJobsHistoryLimit: 3
  failedJobsHistoryLimit: 3
  jobTemplate:
    spec:
      template:
        spec:
          serviceAccountName: k8s-analyzer
          restartPolicy: OnFailure
          containers:
            - name: analyzer
              image: your-registry/k8s-analyzer:latest
              env:
                - name: CLUSTER_NAME
                  value: "production-eks"
                # Credentials come from the k8s-analyzer-secrets Secret
                - name: ANTHROPIC_API_KEY
                  valueFrom:
                    secretKeyRef:
                      name: k8s-analyzer-secrets
                      key: anthropic-api-key
                - name: SLACK_WEBHOOK_URL
                  valueFrom:
                    secretKeyRef:
                      name: k8s-analyzer-secrets
                      key: slack-webhook-url
              resources:
                requests:
                  memory: "128Mi"
                  cpu: "100m"
                limits:
                  memory: "256Mi"
                  cpu: "500m"

Apply everything:
kubectl apply -f serviceaccount.yaml
kubectl apply -f cronjob.yaml
# Trigger a manual run to test
kubectl create job --from=cronjob/k8s-event-analyzer test-run -n monitoring
kubectl logs -l job-name=test-run -n monitoring -f

Cost Estimate
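claude-haiku-4-5-20251001 is priced here at $0.80 per million input tokens and $4 per million output tokens (verify against Anthropic's current pricing page before budgeting). Each analysis run sends roughly 2,000 tokens and returns 500 tokens, which works out to about $0.0036 per run. At 4 runs/hour that is about $0.014/hour, or roughly $10/month for continuous monitoring if every run calls the API — less in practice, because runs that find no warning events and no problem pods exit before calling Claude. That is still cheaper than a single PagerDuty alert on most plans.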
This analyzer catches real patterns that static alerting rules miss: correlating OOM events with specific deployments, identifying cascading failures, and translating cryptic Kubernetes messages into actionable steps.
Today I Fixed
Short real fixes from production — posted daily
Stay ahead of the curve
Get the latest DevOps, Kubernetes, AWS, and AI/ML guides delivered straight to your inbox. No spam — just practical engineering content.
Related Articles
AI-Powered Kubernetes Anomaly Detection: Beyond Static Thresholds
Static alerts miss 40% of real incidents. Learn how AI and ML-based anomaly detection — using tools like Prometheus + ML, Dynatrace, and custom LLM runbooks — catches what thresholds can't.
Build an AI-Powered SLO Breach Predictor with Claude and Prometheus
Build an SLO breach predictor that reads error budget burn rate from Prometheus, uses Claude to analyze patterns, and sends Slack alerts before SLOs breach — not after.
Build an AI SRE Runbook Agent with LangGraph and Kubernetes
Use LangGraph to build an agentic SRE assistant that reads Kubernetes state, queries Prometheus, and executes runbook steps autonomously using Claude API.