Build an AI Kubernetes Runbook Generator with LLMs (2026)
Manual runbooks go stale. Build a system that watches your Kubernetes cluster, detects incidents, and generates step-by-step runbooks automatically using LLMs. Full implementation with Python, kubectl, and Ollama.
Runbooks are supposed to be your first line of defense during incidents. In practice, they're outdated the moment they're written, ignored under pressure, and rarely match your actual cluster state.
This project builds an AI system that generates context-aware, actionable runbooks on demand — using the live state of your cluster as input to an LLM.
What We're Building
Kubernetes Cluster
↓ (kubectl/k8s API)
Event Detector (Python)
↓
Context Collector (namespace, resources, recent events)
↓
LLM (Ollama/OpenAI)
↓
Runbook (Markdown) → Slack/File/Web UI
When a pod crashes, a deployment fails, or a node goes NotReady, the system:
- Detects the event
- Collects relevant context (pod logs, events, resource specs)
- Sends it to an LLM
- Gets back a structured runbook with diagnosis steps and fixes
- Posts it to Slack (or saves as file)
Prerequisites
- Python 3.11+
- kubectl configured
- Kubernetes cluster (local k3s or cloud)
- Ollama running locally, or OpenAI API key
Step 1: Project Structure
k8s-runbook-generator/
├── main.py # Entry point / watcher
├── collector.py # K8s context collection
├── generator.py # LLM runbook generation
├── notifier.py # Slack/file output
├── prompts.py # LLM prompt templates
├── requirements.txt
└── config.yaml
mkdir k8s-runbook-generator && cd k8s-runbook-generator
python -m venv venv && source venv/bin/activate
pip install kubernetes openai ollama pyyaml slack-sdk rich
Step 2: Kubernetes Context Collector
# collector.py
from kubernetes import client, config
from datetime import datetime, timezone
import json
class K8sContextCollector:
    """Collect live cluster state describing a failing pod or deployment.

    Every collection step is best-effort: when an API call fails, the error
    text is stored in the returned dict under an ``*_error`` key and the
    remaining steps still run, so the caller always gets whatever context
    could be gathered.
    """

    # Number of trailing log lines requested from the API server.
    LOG_TAIL_LINES = 100
    # Maximum number of log characters kept (the tail end is kept).
    MAX_LOG_CHARS = 3000
    # Maximum number of (most recent) events returned per pod.
    MAX_EVENTS = 20
    # Maximum number of pods inspected per deployment.
    MAX_PODS_PER_DEPLOYMENT = 5

    def __init__(self):
        """Load in-cluster credentials, falling back to the local kubeconfig."""
        try:
            config.load_incluster_config()
        except config.ConfigException:
            # Not running inside a pod: use the developer's kubeconfig.
            config.load_kube_config()
        self.v1 = client.CoreV1Api()
        self.apps_v1 = client.AppsV1Api()

    def get_pod_context(self, namespace: str, pod_name: str) -> dict:
        """Collect full context for a failing pod.

        Returns a dict that may contain ``pod_spec``, ``recent_logs`` or
        ``previous_logs``, ``events`` and ``node_info``, plus ``pod_error``,
        ``log_error`` or ``events_error`` for any step that failed.
        """
        context = {}

        # Pod details
        try:
            pod = self.v1.read_namespaced_pod(pod_name, namespace)
            context['pod_spec'] = {
                'name': pod.metadata.name,
                'namespace': pod.metadata.namespace,
                'node': pod.spec.node_name,
                'phase': pod.status.phase,
                'conditions': [
                    {
                        'type': c.type,
                        'status': c.status,
                        'reason': c.reason,
                        'message': c.message,
                    }
                    for c in (pod.status.conditions or [])
                ],
                'containers': [
                    {
                        'name': c.name,
                        'image': c.image,
                        'state': self._parse_container_state(c.state),
                        'ready': c.ready,
                        'restart_count': c.restart_count,
                        'last_state': (
                            self._parse_container_state(c.last_state)
                            if c.last_state else None
                        ),
                    }
                    for c in (pod.status.container_statuses or [])
                ],
            }
        except Exception as e:
            context['pod_error'] = str(e)

        # Pod logs (last LOG_TAIL_LINES lines, capped at MAX_LOG_CHARS)
        try:
            logs = self.v1.read_namespaced_pod_log(
                pod_name, namespace,
                tail_lines=self.LOG_TAIL_LINES,
                previous=False,
            )
            context['recent_logs'] = logs[-self.MAX_LOG_CHARS:]
        except Exception as e:
            # Try previous container logs (for CrashLoopBackOff)
            try:
                logs = self.v1.read_namespaced_pod_log(
                    pod_name, namespace,
                    tail_lines=self.LOG_TAIL_LINES,
                    previous=True,
                )
                context['previous_logs'] = logs[-self.MAX_LOG_CHARS:]
            except Exception:
                # Report the first failure; it describes the current container.
                context['log_error'] = str(e)

        # Pod events: best-effort like every other step, so an events API
        # failure no longer discards the context gathered above.
        try:
            context['events'] = self._get_pod_events(namespace, pod_name)
        except Exception as e:
            context['events'] = []
            context['events_error'] = str(e)

        # Node info (if pod is scheduled)
        node_name = context.get('pod_spec', {}).get('node')
        if node_name:
            context['node_info'] = self._get_node_summary(node_name)

        return context

    def _parse_container_state(self, state) -> dict:
        """Flatten a container state object into a plain dict.

        Checks ``running``, then ``waiting``, then ``terminated`` and returns
        the fields of the first one that is set; returns ``{}`` when *state*
        is falsy or none of the three is set.
        """
        if not state:
            return {}
        if state.running:
            return {'status': 'running', 'started_at': str(state.running.started_at)}
        if state.waiting:
            return {
                'status': 'waiting',
                'reason': state.waiting.reason,
                'message': state.waiting.message,
            }
        if state.terminated:
            return {
                'status': 'terminated',
                'reason': state.terminated.reason,
                'exit_code': state.terminated.exit_code,
                'message': state.terminated.message,
            }
        return {}

    def _get_pod_events(self, namespace: str, pod_name: str) -> list:
        """Return up to MAX_EVENTS most recent events for the pod, oldest first.

        Raises whatever the API client raises; ``get_pod_context`` catches it.
        """
        events = self.v1.list_namespaced_event(
            namespace,
            # Restrict to Pod objects so a same-named object of another kind
            # does not leak its events into the pod's context.
            field_selector=(
                f"involvedObject.kind=Pod,involvedObject.name={pod_name}"
            ),
        )
        # Events without a last_timestamp sort first instead of raising.
        epoch = datetime.min.replace(tzinfo=timezone.utc)
        ordered = sorted(events.items, key=lambda ev: ev.last_timestamp or epoch)
        return [
            {
                'type': e.type,
                'reason': e.reason,
                'message': e.message,
                'count': e.count,
                'first_time': str(e.first_timestamp),
            }
            for e in ordered[-self.MAX_EVENTS:]
        ]

    def _get_node_summary(self, node_name: str) -> dict:
        """Return readiness, pressure conditions and capacity for a node.

        Falls back to ``{'name': node_name}`` when the node cannot be read.
        """
        try:
            node = self.v1.read_node(node_name)
            conditions = {c.type: c.status for c in (node.status.conditions or [])}
            capacity = node.status.capacity or {}
            return {
                'name': node_name,
                'ready': conditions.get('Ready', 'Unknown'),
                'memory_pressure': conditions.get('MemoryPressure', 'Unknown'),
                'disk_pressure': conditions.get('DiskPressure', 'Unknown'),
                'cpu_capacity': capacity.get('cpu'),
                'memory_capacity': capacity.get('memory'),
            }
        except Exception:
            return {'name': node_name}

    def get_deployment_context(self, namespace: str, deployment_name: str) -> dict:
        """Collect context for a failing deployment.

        Returns a dict with ``deployment`` (replica counts and conditions)
        and ``pods`` (pod contexts for up to MAX_PODS_PER_DEPLOYMENT pods),
        or ``deployment_error`` / ``pods_error`` for a step that failed.
        """
        context = {}
        # Fallback used only when the deployment (or its selector) is
        # unavailable; it assumes the common ``app=<name>`` labelling.
        label_selector = f"app={deployment_name}"

        try:
            dep = self.apps_v1.read_namespaced_deployment(deployment_name, namespace)
            context['deployment'] = {
                'name': dep.metadata.name,
                'desired_replicas': dep.spec.replicas,
                'ready_replicas': dep.status.ready_replicas,
                'available_replicas': dep.status.available_replicas,
                'conditions': [
                    {
                        'type': c.type,
                        'status': c.status,
                        'reason': c.reason,
                        'message': c.message,
                    }
                    for c in (dep.status.conditions or [])
                ],
            }
            # Use the deployment's own selector so pods are found regardless
            # of labelling convention (matchExpressions are not translated).
            match_labels = dep.spec.selector.match_labels if dep.spec.selector else None
            if match_labels:
                label_selector = ",".join(
                    f"{key}={value}" for key, value in match_labels.items()
                )
        except Exception as e:
            context['deployment_error'] = str(e)

        # Get pods for this deployment
        try:
            pods = self.v1.list_namespaced_pod(
                namespace,
                label_selector=label_selector,
            )
            context['pods'] = [
                self.get_pod_context(namespace, pod.metadata.name)
                for pod in pods.items[:self.MAX_PODS_PER_DEPLOYMENT]
            ]
        except Exception as e:
            context['pods_error'] = str(e)

        return context


# Step 3: LLM Prompt Templates
# prompts.py
# System prompt sent as the "system" message by both LLM backends. It sets
# the SRE persona and fixes the Markdown section layout of every runbook.
RUNBOOK_SYSTEM_PROMPT = """You are an expert Kubernetes SRE. Your job is to analyze
Kubernetes incidents and generate clear, actionable runbooks.
When given cluster context about a failing workload:
1. Identify the root cause (be specific, not generic)
2. List immediate diagnostic commands to verify your diagnosis
3. Provide step-by-step remediation commands
4. List preventive measures
5. Estimate severity and impact
Format your response as a structured Markdown runbook with these sections:
- ## Incident Summary
- ## Root Cause Analysis
- ## Immediate Diagnostics
- ## Remediation Steps
- ## Verification
- ## Prevention
- ## Severity: [Critical/High/Medium/Low]
Be specific. Include actual kubectl commands. Reference the exact pod names,
namespaces, and error messages from the context provided."""
def build_pod_incident_prompt(context: dict) -> str:
    """Build the user prompt for a failing-pod incident.

    Args:
        context: Collected pod context. Values that are not JSON-native
            (for example datetimes) are rendered with ``str``.

    Returns:
        A Markdown prompt embedding *context* as indented JSON.
    """
    # Function-scope import: this module has no top-level import section.
    import json

    context_json = json.dumps(context, indent=2, default=str)
    return (
        "## Kubernetes Pod Incident\n\n"
        "**Cluster Context:**\n"
        f"{context_json}\n\n"
        "Generate a complete incident runbook for this failing pod.\n"
        "Focus on the specific errors and conditions shown in the context."
    )
def build_deployment_incident_prompt(context: dict, dep_name: str) -> str:
    """Build the user prompt for a failing-deployment incident.

    Args:
        context: Collected deployment context. Values that are not
            JSON-native (for example datetimes) are rendered with ``str``.
        dep_name: Deployment name shown in the prompt heading.

    Returns:
        A Markdown prompt embedding *context* as indented JSON.
    """
    # Function-scope import: this module has no top-level import section.
    import json

    context_json = json.dumps(context, indent=2, default=str)
    return (
        f"## Kubernetes Deployment Incident: {dep_name}\n\n"
        "**Cluster Context:**\n"
        f"{context_json}\n\n"
        "Generate a complete incident runbook for this failing deployment."
    )


# Step 4: Runbook Generator
# generator.py
import ollama
from openai import OpenAI
from prompts import RUNBOOK_SYSTEM_PROMPT, build_pod_incident_prompt, build_deployment_incident_prompt
class RunbookGenerator:
    """Generate a Markdown runbook from cluster context via an LLM backend.

    Supported providers are ``"ollama"`` and ``"openai"``. Any other value
    is accepted by the constructor but makes ``generate`` raise
    ``ValueError``.
    """

    def __init__(self, provider: str = "ollama", model: str = "llama3.2"):
        """Store the provider and model; create an OpenAI client if needed."""
        self.provider = provider
        self.model = model
        # Only the OpenAI backend keeps a client object; the Ollama backend
        # is called through the module-level ``ollama.chat`` function.
        if provider == "openai":
            self.client = OpenAI()  # Uses OPENAI_API_KEY env var

    def generate(self, context: dict, incident_type: str = "pod", name: str = "") -> str:
        """Build the incident prompt and return the LLM's runbook text.

        Args:
            context: Collected cluster context for the incident.
            incident_type: ``"pod"`` selects the pod prompt; any other value
                selects the deployment prompt.
            name: Deployment name; only used by the deployment prompt.

        Raises:
            ValueError: If the configured provider is not supported.
        """
        if incident_type == "pod":
            prompt = build_pod_incident_prompt(context)
        else:
            prompt = build_deployment_incident_prompt(context, name)

        if self.provider == "ollama":
            return self._generate_ollama(prompt)
        elif self.provider == "openai":
            return self._generate_openai(prompt)
        else:
            raise ValueError(f"Unknown provider: {self.provider}")

    def _generate_ollama(self, prompt: str) -> str:
        """Send the prompt to Ollama and return the reply text."""
        response = ollama.chat(
            model=self.model,
            messages=[
                {"role": "system", "content": RUNBOOK_SYSTEM_PROMPT},
                {"role": "user", "content": prompt},
            ],
            options={
                "temperature": 0.3,  # Lower temp for more deterministic runbooks
                "num_ctx": 8192,
            },
        )
        return response['message']['content']

    def _generate_openai(self, prompt: str) -> str:
        """Send the prompt to the OpenAI chat API and return the reply text."""
        response = self.client.chat.completions.create(
            model=self.model,  # "gpt-4o" or "gpt-4o-mini"
            messages=[
                {"role": "system", "content": RUNBOOK_SYSTEM_PROMPT},
                {"role": "user", "content": prompt},
            ],
            temperature=0.3,
        )
        return response.choices[0].message.content


# Step 5: Event Watcher and Main Loop
# main.py
import time
import yaml
from kubernetes import client, config, watch
from collector import K8sContextCollector
from generator import RunbookGenerator
from rich.console import Console
from rich.markdown import Markdown
from datetime import datetime
import os
console = Console()
def load_config():
    """Parse ``config.yaml`` from the current working directory.

    Returns whatever ``yaml.safe_load`` produces for the file. The file is
    read as UTF-8 so the result does not depend on the platform's default
    encoding.
    """
    with open("config.yaml", encoding="utf-8") as f:
        return yaml.safe_load(f)
def save_runbook(runbook: str, pod_name: str, namespace: str):
    """Save runbook to file.

    Writes *runbook* to ``runbooks/<namespace>_<pod_name>_<timestamp>.md``
    relative to the current working directory, creating the ``runbooks``
    directory if needed.

    Returns:
        The relative path of the file that was written.
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"runbooks/{namespace}_{pod_name}_{timestamp}.md"
    os.makedirs("runbooks", exist_ok=True)
    # Explicit UTF-8: runbooks can contain non-ASCII text (LLM output, emoji),
    # which fails to encode under some platform default encodings.
    with open(filename, "w", encoding="utf-8") as f:
        f.write(runbook)
    return filename
# Container state reasons that mark a pod as failing.
_FAILURE_REASONS = frozenset(
    {'CrashLoopBackOff', 'OOMKilled', 'Error', 'CreateContainerError'}
)
# Seconds during which a pod that already got a runbook is ignored (10 min).
_REGENERATE_AFTER_SECONDS = 600


def _detect_failure(pod, restart_threshold: int) -> str:
    """Return a failure description for *pod*, or "" when none is detected.

    When several conditions match, the last one checked wins.
    """
    reason = ""
    if pod.status.phase == "Failed":
        reason = "Pod phase: Failed"
    for cs in (pod.status.container_statuses or []):
        state = cs.state
        if state:
            # CrashLoopBackOff/CreateContainerError are reported on the
            # waiting state; OOMKilled/Error on the terminated state.
            for detail in (state.waiting, state.terminated):
                if detail and detail.reason in _FAILURE_REASONS:
                    reason = f"Container {cs.name}: {detail.reason}"
        if cs.restart_count > restart_threshold:
            reason = f"Container {cs.name}: {cs.restart_count} restarts"
    return reason


def watch_pod_events(cfg: dict):
    """Watch for pod failure events and generate runbooks.

    Reads these keys from *cfg*, all optional: ``llm_provider``,
    ``llm_model``, ``namespace`` (empty means all namespaces),
    ``restart_threshold`` and ``include_system``.

    For each failing pod a runbook is generated, printed and saved. The same
    pod is then ignored for ``_REGENERATE_AFTER_SECONDS``. A failure while
    handling one pod is reported and does not stop the watcher.
    """
    try:
        config.load_incluster_config()
    except config.ConfigException:
        # Not running inside a pod: use the developer's kubeconfig.
        config.load_kube_config()

    v1 = client.CoreV1Api()
    collector = K8sContextCollector()
    generator = RunbookGenerator(
        provider=cfg.get('llm_provider', 'ollama'),
        model=cfg.get('llm_model', 'llama3.2')
    )
    namespace = cfg.get('namespace', '')  # Empty = all namespaces
    restart_threshold = cfg.get('restart_threshold', 3)
    include_system = cfg.get('include_system', False)
    # pod key -> time.monotonic() value when its runbook was last generated.
    last_handled = {}

    console.print("[bold green]🔍 Watching for pod failures...[/bold green]")
    w = watch.Watch()
    # NOTE(review): with timeout_seconds set, the stream may end when the
    # server closes the watch, which would make this function return —
    # confirm against the client's Watch.stream behaviour.
    watch_kwargs = {"timeout_seconds": 0}
    if namespace:
        watch_fn = v1.list_namespaced_pod
        watch_kwargs["namespace"] = namespace
    else:
        watch_fn = v1.list_pod_for_all_namespaces

    for event in w.stream(watch_fn, **watch_kwargs):
        # A deleted pod can no longer be inspected; nothing to collect.
        if event['type'] == 'DELETED':
            continue

        pod = event['object']
        pod_name = pod.metadata.name
        pod_namespace = pod.metadata.namespace

        # Skip system namespaces unless explicitly included
        if pod_namespace in ['kube-system', 'kube-public'] and not include_system:
            continue

        # Check for failure conditions
        failure_reason = _detect_failure(pod, restart_threshold)
        if not failure_reason:
            continue

        # Forget pods whose dedupe window has expired (allow re-generation);
        # this also keeps the map from growing without bound.
        now = time.monotonic()
        last_handled = {
            key: seen_at
            for key, seen_at in last_handled.items()
            if now - seen_at < _REGENERATE_AFTER_SECONDS
        }
        pod_key = f"{pod_namespace}/{pod_name}"
        if pod_key in last_handled:
            continue
        last_handled[pod_key] = now

        console.print(f"\n[bold red]⚠️ Failure detected:[/bold red] {pod_key}")
        console.print(f" Reason: {failure_reason}")
        try:
            console.print("[yellow] Collecting context...[/yellow]")
            context = collector.get_pod_context(pod_namespace, pod_name)
            console.print("[yellow] Generating runbook with LLM...[/yellow]")
            runbook = generator.generate(context, "pod", pod_name)
            # Display in terminal
            console.print("\n" + "="*60)
            console.print(Markdown(runbook))
            console.print("="*60)
            # Save to file
            filepath = save_runbook(runbook, pod_name, pod_namespace)
            console.print(f"\n[green]✓ Runbook saved to: {filepath}[/green]")
        except Exception as exc:
            # Top-level boundary of a long-running watcher: report and keep
            # watching. The pod is retried after the dedupe window expires.
            console.print(f"[bold red]✗ Runbook generation failed for:[/bold red] {pod_key}")
            # markup=False: the error text may contain square brackets.
            console.print(f" {exc}", markup=False)
# Script entry point: load config.yaml, then run the watcher.
if __name__ == "__main__":
    cfg = load_config()
    watch_pod_events(cfg)


# Step 6: Config File
# config.yaml
llm_provider: "ollama" # "ollama" or "openai"
llm_model: "llama3.2" # Or "gpt-4o-mini" for OpenAI
namespace: "" # Empty = watch all namespaces
restart_threshold: 3 # Generate runbook after N restarts
include_system: false # Include kube-system namespace
output:
  save_to_file: true
  slack_webhook: "" # Optional: Slack webhook URL
Step 7: Run Ollama Locally
# Install Ollama
curl -fsSL https://ollama.com/install.sh | sh
# Pull a model (llama3.2 is fast and good for structured output)
ollama pull llama3.2
# Or for better quality (needs 8GB+ RAM)
ollama pull llama3.1:8b
# Verify
ollama run llama3.2 "Hello"
Running the System
# Start the watcher
python main.py
# Test it by creating a crashing pod
kubectl run crash-test --image=alpine -- /bin/sh -c "exit 1"
# Watch the terminal — it should detect the failure and generate a runbook
Example runbook output:
## Incident Summary
Pod `crash-test` in namespace `default` has entered a CrashLoopBackOff state.
The container exits immediately with code 1.
## Root Cause Analysis
The container command `/bin/sh -c "exit 1"` is designed to exit immediately
with a non-zero exit code. This is a manual test pod, but the same pattern
appears in production when application startup fails.
## Immediate Diagnostics
kubectl describe pod crash-test -n default
kubectl logs crash-test -n default --previous
## Remediation Steps
1. Check the actual container command:
kubectl get pod crash-test -o jsonpath='{.spec.containers[0].command}'
2. If this is a test pod, delete it:
kubectl delete pod crash-test
3. If this is a real service, fix the exit code in the application startup...
Deploy to Kubernetes
# k8s-runbook-generator.yaml
# Deployment running the watcher as a single replica.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: runbook-generator
  namespace: monitoring
spec:
  replicas: 1
  selector:
    matchLabels:
      app: runbook-generator
  template:
    metadata:
      labels:
        # Must match spec.selector.matchLabels, otherwise the API server
        # rejects the Deployment.
        app: runbook-generator
    spec:
      serviceAccountName: runbook-generator-sa
      containers:
        - name: generator
          image: your-registry/runbook-generator:latest
          env:
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: openai-secret
                  key: api-key
          volumeMounts:
            - name: config
              mountPath: /app/config.yaml
              subPath: config.yaml
      volumes:
        - name: config
          configMap:
            name: runbook-generator-config
---
# Identity the watcher runs as.
apiVersion: v1
kind: ServiceAccount
metadata:
  name: runbook-generator-sa
  namespace: monitoring
---
# Read-only, cluster-wide access to the resources the collector inspects.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: runbook-generator
rules:
  - apiGroups: [""]
    resources: ["pods", "pods/log", "events", "nodes", "namespaces"]
    verbs: ["get", "list", "watch"]
  - apiGroups: ["apps"]
    resources: ["deployments", "replicasets"]
    verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: runbook-generator
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: runbook-generator
subjects:
  - kind: ServiceAccount
    name: runbook-generator-sa
    namespace: monitoring

# Extensions
Add Slack notifications:
from slack_sdk.webhook import WebhookClient
def notify_slack(runbook: str, pod_name: str, webhook_url: str):
    """Post a generated runbook to Slack through an incoming webhook.

    Args:
        runbook: Runbook text; only the first 2900 characters are sent.
        pod_name: Pod name shown in the message's fallback text.
        webhook_url: Slack incoming-webhook URL.
    """
    client = WebhookClient(webhook_url)
    client.send(
        text=f"🚨 Runbook generated for {pod_name}",
        blocks=[{
            "type": "section",
            "text": {"type": "mrkdwn", "text": runbook[:2900]}
        }]
    )


# Add vector store for historical incidents: Store generated runbooks in a
# vector database (Qdrant, ChromaDB). When a new incident occurs, retrieve
# similar past incidents to give the LLM more context — it improves runbook
# quality over time.
The full source is a starting point. In production, you'd add deduplication, incident correlation, and integration with your alerting system (PagerDuty, OpsGenie) to auto-attach runbooks to incidents when they fire.
Related: AI Powered Incident Response | NVIDIA GPU Operator on Kubernetes | Build Grafana AI Alert Classifier
Affiliate note: OpenAI API — GPT-4o Mini is cost-effective for runbook generation (~$0.001 per runbook). Ollama for running LLMs locally in your cluster with no API costs.
Stay ahead of the curve
Get the latest DevOps, Kubernetes, AWS, and AI/ML guides delivered straight to your inbox. No spam — just practical engineering content.
Related Articles
Build an AI Alert Classifier for Grafana Using LLMs (2026)
Tired of noisy Grafana alerts that wake you up for nothing? Build an AI layer that classifies incoming alerts as actionable or noise, enriches them with context, and routes them intelligently — using Claude or GPT-4 as the reasoning engine.
AI-Powered Kubernetes Anomaly Detection: Beyond Static Thresholds
Static alerts miss 40% of real incidents. Learn how AI and ML-based anomaly detection — using tools like Prometheus + ML, Dynatrace, and custom LLM runbooks — catches what thresholds can't.
Build an AI Kubernetes Troubleshooter with Claude (2026)
Build a CLI tool that automatically diagnoses Kubernetes issues — OOMKilled, CrashLoopBackOff, pending pods — by gathering cluster state and asking Claude what's wrong and how to fix it.