Build an AI Kubernetes Runbook Generator with LLMs (2026)

Manual runbooks go stale. Build a system that watches your Kubernetes cluster, detects incidents, and generates step-by-step runbooks automatically using LLMs. Full implementation with Python, kubectl, and Ollama.

DevOpsBoys · May 22, 2026 · 9 min read

Runbooks are supposed to be your first line of defense during incidents. In practice, they're outdated the moment they're written, ignored under pressure, and rarely match your actual cluster state.

This project builds an AI system that generates context-aware, actionable runbooks on demand — using the live state of your cluster as input to an LLM.


What We're Building

Kubernetes Cluster
      ↓ (kubectl/k8s API)
Event Detector (Python)
      ↓
Context Collector (namespace, resources, recent events)
      ↓
LLM (Ollama/OpenAI)
      ↓
Runbook (Markdown) → Slack/File/Web UI

When a pod crashes, a deployment fails, or a node goes NotReady, the system:

  1. Detects the event
  2. Collects relevant context (pod logs, events, resource specs)
  3. Sends it to an LLM
  4. Gets back a structured runbook with diagnosis steps and fixes
  5. Posts it to Slack or saves it to a file
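
The five steps above can be sketched as one function with pluggable stages. This is only an illustration of the data flow: `handle_incident` and its parameter names are hypothetical and do not appear in the project files below.

```python
from typing import Callable


def handle_incident(
    incident: dict,
    collect: Callable[[dict], dict],
    generate: Callable[[dict], str],
    publish: Callable[[str], None],
) -> str:
    """Run steps 2-5 for one incident that step 1 has already detected."""
    context = collect(incident)   # step 2: gather logs, events, resource specs
    runbook = generate(context)   # steps 3-4: prompt the LLM, get Markdown back
    publish(runbook)              # step 5: Slack, file, or web UI
    return runbook


# Stand-in stages, so the flow can be exercised without a cluster or an LLM.
published: list[str] = []
result = handle_incident(
    {"namespace": "default", "pod": "crash-test"},
    collect=lambda inc: {"pod": inc["pod"], "events": []},
    generate=lambda ctx: f"## Incident Summary\nPod {ctx['pod']} is failing.",
    publish=published.append,
)
```

Keeping each stage behind a plain callable makes it easy to swap Ollama for OpenAI, or a file for Slack, without touching the rest.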

Prerequisites

  • Python 3.11+
  • kubectl configured
  • Kubernetes cluster (local k3s or cloud)
  • Ollama running locally, or OpenAI API key

Step 1: Project Structure

k8s-runbook-generator/
├── main.py              # Entry point / watcher
├── collector.py         # K8s context collection
├── generator.py         # LLM runbook generation
├── notifier.py          # Slack/file output
├── prompts.py           # LLM prompt templates
├── requirements.txt
└── config.yaml
bash
mkdir k8s-runbook-generator && cd k8s-runbook-generator
python -m venv venv && source venv/bin/activate
pip install kubernetes openai ollama pyyaml slack-sdk rich

Step 2: Kubernetes Context Collector

python
# collector.py
from kubernetes import client, config
from datetime import datetime, timezone
import json
 
class K8sContextCollector:
    def __init__(self):
        # Prefer in-cluster credentials; fall back to the local kubeconfig
        try:
            config.load_incluster_config()
        except config.ConfigException:
            config.load_kube_config()
        
        self.v1 = client.CoreV1Api()
        self.apps_v1 = client.AppsV1Api()
    
    def get_pod_context(self, namespace: str, pod_name: str) -> dict:
        """Collect full context for a failing pod."""
        context = {}
        
        # Pod details
        try:
            pod = self.v1.read_namespaced_pod(pod_name, namespace)
            context['pod_spec'] = {
                'name': pod.metadata.name,
                'namespace': pod.metadata.namespace,
                'node': pod.spec.node_name,
                'phase': pod.status.phase,
                'conditions': [
                    {
                        'type': c.type,
                        'status': c.status,
                        'reason': c.reason,
                        'message': c.message
                    }
                    for c in (pod.status.conditions or [])
                ],
                'containers': [
                    {
                        'name': c.name,
                        'image': c.image,
                        'state': self._parse_container_state(c.state),
                        'ready': c.ready,
                        'restart_count': c.restart_count,
                        'last_state': self._parse_container_state(c.last_state) if c.last_state else None
                    }
                    for c in (pod.status.container_statuses or [])
                ]
            }
        except Exception as e:
            context['pod_error'] = str(e)
        
        # Pod logs (last 100 lines, capped at the final 3000 characters)
        try:
            logs = self.v1.read_namespaced_pod_log(
                pod_name, namespace,
                tail_lines=100,
                previous=False
            )
            context['recent_logs'] = logs[-3000:]
        except Exception as e:
            # Current container has no readable logs: try the previous
            # container instance instead (typical for CrashLoopBackOff)
            try:
                logs = self.v1.read_namespaced_pod_log(
                    pod_name, namespace,
                    tail_lines=100,
                    previous=True
                )
                context['previous_logs'] = logs[-3000:]
            except Exception:
                context['log_error'] = str(e)
        
        # Pod events
        context['events'] = self._get_pod_events(namespace, pod_name)
        
        # Node info (if pod is scheduled)
        if context.get('pod_spec', {}).get('node'):
            context['node_info'] = self._get_node_summary(context['pod_spec']['node'])
        
        return context
    
    def _parse_container_state(self, state) -> dict:
        if not state:
            return {}
        if state.running:
            return {'status': 'running', 'started_at': str(state.running.started_at)}
        if state.waiting:
            return {'status': 'waiting', 'reason': state.waiting.reason, 'message': state.waiting.message}
        if state.terminated:
            return {
                'status': 'terminated',
                'reason': state.terminated.reason,
                'exit_code': state.terminated.exit_code,
                'message': state.terminated.message
            }
        return {}
    
    def _get_pod_events(self, namespace: str, pod_name: str) -> list:
        events = self.v1.list_namespaced_event(
            namespace,
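            # Restrict to Pod objects so a same-named Deployment or Job is not mixed in
            field_selector=f"involvedObject.kind=Pod,involvedObject.name={pod_name}"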
        )
        return [
            {
                'type': e.type,
                'reason': e.reason,
                'message': e.message,
                'count': e.count,
                'first_time': str(e.first_timestamp)
            }
            for e in sorted(events.items, key=lambda x: x.last_timestamp or datetime.min.replace(tzinfo=timezone.utc))[-20:]
        ]
    
    def _get_node_summary(self, node_name: str) -> dict:
        try:
            node = self.v1.read_node(node_name)
            conditions = {c.type: c.status for c in node.status.conditions}
            capacity = node.status.capacity
            return {
                'name': node_name,
                'ready': conditions.get('Ready', 'Unknown'),
                'memory_pressure': conditions.get('MemoryPressure', 'Unknown'),
                'disk_pressure': conditions.get('DiskPressure', 'Unknown'),
                'cpu_capacity': capacity.get('cpu'),
                'memory_capacity': capacity.get('memory')
            }
        except Exception:
            return {'name': node_name}
    
    def get_deployment_context(self, namespace: str, deployment_name: str) -> dict:
        """Collect context for a failing deployment."""
        context = {}
        
        try:
            dep = self.apps_v1.read_namespaced_deployment(deployment_name, namespace)
            context['deployment'] = {
                'name': dep.metadata.name,
                'desired_replicas': dep.spec.replicas,
                'ready_replicas': dep.status.ready_replicas,
                'available_replicas': dep.status.available_replicas,
                'conditions': [
                    {'type': c.type, 'status': c.status, 'reason': c.reason, 'message': c.message}
                    for c in (dep.status.conditions or [])
                ]
            }
        except Exception as e:
            context['deployment_error'] = str(e)
        
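        # Get pods for this deployment. This assumes the pods are labelled
        # app=<deployment name>; adjust the selector if your manifests differ.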
        try:
            pods = self.v1.list_namespaced_pod(
                namespace,
                label_selector=f"app={deployment_name}"
            )
            context['pods'] = []
            for pod in pods.items[:5]:  # Limit to 5 pods
                pod_ctx = self.get_pod_context(namespace, pod.metadata.name)
                context['pods'].append(pod_ctx)
        except Exception as e:
            context['pods_error'] = str(e)
        
        return context
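
`get_deployment_context` finds pods with `app=<deployment name>`, which only works when manifests follow that labelling convention. A more general option is to build the selector from the Deployment's own `spec.selector.matchLabels`. The helper below is a minimal sketch of that idea: `label_selector_from` is a hypothetical name, and it deliberately ignores `matchExpressions`.

```python
def label_selector_from(match_labels: dict[str, str]) -> str:
    """Turn a matchLabels mapping into a label_selector query string."""
    if not match_labels:
        # An empty selector string would match every pod in the namespace.
        raise ValueError("deployment has no matchLabels to select pods with")
    return ",".join(f"{key}={value}" for key, value in sorted(match_labels.items()))


selector = label_selector_from({"tier": "api", "app": "web"})
```

In the collector this would be called as `label_selector_from(dep.spec.selector.match_labels or {})` and passed to `list_namespaced_pod` in place of the hard-coded `app=` selector.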

Step 3: LLM Prompt Templates

python
# prompts.py
import json

RUNBOOK_SYSTEM_PROMPT = """You are an expert Kubernetes SRE. Your job is to analyze 
Kubernetes incidents and generate clear, actionable runbooks.

When given cluster context about a failing workload:
1. Identify the root cause (be specific, not generic)
2. List immediate diagnostic commands to verify your diagnosis
3. Provide step-by-step remediation commands
4. List preventive measures
5. Estimate severity and impact

Format your response as a structured Markdown runbook with these sections:
- ## Incident Summary
- ## Root Cause Analysis
- ## Immediate Diagnostics
- ## Remediation Steps
- ## Verification
- ## Prevention
- ## Severity: [Critical/High/Medium/Low]

Be specific. Include actual kubectl commands. Reference the exact pod names, 
namespaces, and error messages from the context provided."""

def build_pod_incident_prompt(context: dict) -> str:
    # default=str keeps datetimes and other non-JSON values from raising
    context_json = json.dumps(context, indent=2, default=str)
    return (
        "## Kubernetes Pod Incident\n\n"
        "**Cluster Context:**\n"
        f"{context_json}\n\n"
        "Generate a complete incident runbook for this failing pod.\n"
        "Focus on the specific errors and conditions shown in the context."
    )

def build_deployment_incident_prompt(context: dict, dep_name: str) -> str:
    context_json = json.dumps(context, indent=2, default=str)
    return (
        f"## Kubernetes Deployment Incident: {dep_name}\n\n"
        "**Cluster Context:**\n"
        f"{context_json}\n\n"
        "Generate a complete incident runbook for this failing deployment."
    )
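
Both prompt builders embed the whole context as JSON, so a deployment with several chatty pods can outgrow the model's context window. One way to guard against that is to trim the log fields before building the prompt. The sketch below is an assumption, not part of the project: `fit_context` is a hypothetical helper and the 20000-character default is an arbitrary budget that you would tune to your model.

```python
import json


def fit_context(context: dict, max_chars: int = 20000) -> dict:
    """Best-effort copy of context whose JSON form fits max_chars.

    Only the log fields are trimmed, oldest characters first, so the most
    recent log lines survive. Other fields are left untouched.
    """
    trimmed = dict(context)
    for key in ("recent_logs", "previous_logs"):
        size = len(json.dumps(trimmed, indent=2, default=str))
        if size <= max_chars:
            break
        logs = trimmed.get(key)
        if isinstance(logs, str):
            excess = size - max_chars
            # Every raw character is at least one JSON character, so cutting
            # `excess` raw characters shrinks the JSON by at least that much.
            trimmed[key] = logs[excess:] if excess < len(logs) else ""
    return trimmed


sample = {"pod_spec": {"name": "crash-test"}, "recent_logs": "x" * 5000}
fitted = fit_context(sample, max_chars=1000)
```

If the non-log fields alone exceed the budget, the result is still too large, so treat this as a first line of defence rather than a guarantee.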

Step 4: Runbook Generator

python
# generator.py
import ollama
from openai import OpenAI
from prompts import RUNBOOK_SYSTEM_PROMPT, build_pod_incident_prompt, build_deployment_incident_prompt
 
class RunbookGenerator:
    def __init__(self, provider: str = "ollama", model: str = "llama3.2"):
        self.provider = provider
        self.model = model
        
        if provider == "openai":
            self.client = OpenAI()  # Uses OPENAI_API_KEY env var
    
    def generate(self, context: dict, incident_type: str = "pod", name: str = "") -> str:
        if incident_type == "pod":
            prompt = build_pod_incident_prompt(context)
        else:
            prompt = build_deployment_incident_prompt(context, name)
        
        if self.provider == "ollama":
            return self._generate_ollama(prompt)
        elif self.provider == "openai":
            return self._generate_openai(prompt)
        else:
            raise ValueError(f"Unknown provider: {self.provider}")
    
    def _generate_ollama(self, prompt: str) -> str:
        response = ollama.chat(
            model=self.model,
            messages=[
                {"role": "system", "content": RUNBOOK_SYSTEM_PROMPT},
                {"role": "user", "content": prompt}
            ],
            options={
                "temperature": 0.3,  # Lower temp for more deterministic runbooks
                "num_ctx": 8192
            }
        )
        return response['message']['content']
    
    def _generate_openai(self, prompt: str) -> str:
        response = self.client.chat.completions.create(
            model=self.model,  # "gpt-4o" or "gpt-4o-mini"
            messages=[
                {"role": "system", "content": RUNBOOK_SYSTEM_PROMPT},
                {"role": "user", "content": prompt}
            ],
            temperature=0.3
        )
        return response.choices[0].message.content
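
Smaller local models do not always follow the section layout requested in the system prompt. A cheap check before publishing is to confirm that every requested heading is present. This is a minimal sketch: `missing_sections` is a hypothetical helper, and the heading list is copied from `RUNBOOK_SYSTEM_PROMPT`.

```python
REQUIRED_SECTIONS = [
    "## Incident Summary",
    "## Root Cause Analysis",
    "## Immediate Diagnostics",
    "## Remediation Steps",
    "## Verification",
    "## Prevention",
    "## Severity",
]


def missing_sections(runbook: str) -> list[str]:
    """Return the required headings that do not start any line of the runbook."""
    lines = [line.strip() for line in runbook.splitlines()]
    return [
        heading
        for heading in REQUIRED_SECTIONS
        if not any(line.startswith(heading) for line in lines)
    ]


partial = "## Incident Summary\nPod is crashing.\n## Severity: High\n"
gaps = missing_sections(partial)
```

A non-empty result could trigger one retry, or simply be appended to the runbook as a warning so the on-call engineer knows the output is incomplete.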

Step 5: Event Watcher and Main Loop

python
# main.py
import time
import yaml
from kubernetes import client, config, watch
from collector import K8sContextCollector
from generator import RunbookGenerator
from rich.console import Console
from rich.markdown import Markdown
from datetime import datetime
import os
 
console = Console()
 
def load_config():
    with open("config.yaml") as f:
        return yaml.safe_load(f)
 
def save_runbook(runbook: str, pod_name: str, namespace: str):
    """Save runbook to file."""
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"runbooks/{namespace}_{pod_name}_{timestamp}.md"
    os.makedirs("runbooks", exist_ok=True)
    with open(filename, "w") as f:
        f.write(runbook)
    return filename
 
def watch_pod_events(cfg: dict):
    """Watch for pod failure events and generate runbooks."""
    try:
        config.load_incluster_config()
    except config.ConfigException:
        config.load_kube_config()
    
    v1 = client.CoreV1Api()
    collector = K8sContextCollector()
    generator = RunbookGenerator(
        provider=cfg.get('llm_provider', 'ollama'),
        model=cfg.get('llm_model', 'llama3.2')
    )
    
    namespace = cfg.get('namespace', '')  # Empty = all namespaces
    processed_pods = set()
    
    console.print("[bold green]🔍 Watching for pod failures...[/bold green]")
    
    w = watch.Watch()
    watch_kwargs = {"timeout_seconds": 0}
    if namespace:
        watch_fn = v1.list_namespaced_pod
        watch_kwargs["namespace"] = namespace
    else:
        watch_fn = v1.list_pod_for_all_namespaces
    
    for event in w.stream(watch_fn, **watch_kwargs):
        pod = event['object']
        pod_name = pod.metadata.name
        pod_namespace = pod.metadata.namespace
        event_type = event['type']
        
        # Skip system namespaces unless explicitly included
        if pod_namespace in ['kube-system', 'kube-public'] and not cfg.get('include_system', False):
            continue
        
        # Check for failure conditions
        is_failing = False
        failure_reason = ""
        
        if pod.status.phase == "Failed":
            is_failing = True
            failure_reason = "Pod phase: Failed"
        
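        for cs in (pod.status.container_statuses or []):
            state = cs.state
            # CrashLoopBackOff and CreateContainerError are "waiting" reasons
            if state and state.waiting and state.waiting.reason in [
                'CrashLoopBackOff', 'CreateContainerError'
            ]:
                is_failing = True
                failure_reason = f"Container {cs.name}: {state.waiting.reason}"
            
            # OOMKilled and Error are "terminated" reasons, never "waiting" ones
            if state and state.terminated and state.terminated.reason in ['OOMKilled', 'Error']:
                is_failing = True
                failure_reason = f"Container {cs.name}: {state.terminated.reason}"
            
            if cs.restart_count > cfg.get('restart_threshold', 3):
                is_failing = True
                failure_reason = f"Container {cs.name}: {cs.restart_count} restarts"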
        
        pod_key = f"{pod_namespace}/{pod_name}"
        
        if is_failing and pod_key not in processed_pods:
            processed_pods.add(pod_key)
            
            console.print(f"\n[bold red]⚠️  Failure detected:[/bold red] {pod_key}")
            console.print(f"   Reason: {failure_reason}")
            console.print("[yellow]   Collecting context...[/yellow]")
            
            context = collector.get_pod_context(pod_namespace, pod_name)
            
            console.print("[yellow]   Generating runbook with LLM...[/yellow]")
            runbook = generator.generate(context, "pod", pod_name)
            
            # Display in terminal
            console.print("\n" + "="*60)
            console.print(Markdown(runbook))
            console.print("="*60)
            
            # Save to file
            filepath = save_runbook(runbook, pod_name, pod_namespace)
            console.print(f"\n[green]✓ Runbook saved to: {filepath}[/green]")
            
            # Brief pause between runbooks. Note that processed_pods is never
            # cleared, so each pod triggers at most one runbook per watcher run.
            time.sleep(1)
 
if __name__ == "__main__":
    cfg = load_config()
    watch_pod_events(cfg)
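
The watcher only ever adds to `processed_pods`, so a pod that fails again an hour later is silently ignored. A time-based cache is one way to allow re-generation after a cooldown such as 10 minutes. The class below is a sketch under that assumption: `RecentlySeen` is a hypothetical name, and the injectable `clock` exists only to make it testable.

```python
import time
from typing import Callable


class RecentlySeen:
    """Remember keys for ttl_seconds so a pod can trigger a new runbook later."""

    def __init__(self, ttl_seconds: float = 600.0,
                 clock: Callable[[], float] = time.monotonic):
        self.ttl_seconds = ttl_seconds
        self.clock = clock
        self._seen: dict[str, float] = {}

    def should_process(self, key: str) -> bool:
        """Return True and record the key unless it was recorded within the TTL."""
        now = self.clock()
        # Drop expired entries so the dict does not grow without bound.
        self._seen = {
            k: t for k, t in self._seen.items() if now - t < self.ttl_seconds
        }
        if key in self._seen:
            return False
        self._seen[key] = now
        return True


# Fake clock so the cooldown can be demonstrated without waiting.
fake_now = [0.0]
seen = RecentlySeen(ttl_seconds=600, clock=lambda: fake_now[0])
first = seen.should_process("default/crash-test")
second = seen.should_process("default/crash-test")
fake_now[0] = 601.0
third = seen.should_process("default/crash-test")
```

In `watch_pod_events`, the `pod_key not in processed_pods` check and the `processed_pods.add(pod_key)` call would collapse into a single `seen.should_process(pod_key)`.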

Step 6: Config File

yaml
# config.yaml
llm_provider: "ollama"        # "ollama" or "openai"
llm_model: "llama3.2"         # Or "gpt-4o-mini" for OpenAI
namespace: ""                  # Empty = watch all namespaces
restart_threshold: 3           # Generate runbook once restarts exceed N
include_system: false          # Include kube-system and kube-public
output:
  save_to_file: true
  slack_webhook: ""            # Optional: Slack webhook URL

Step 7: Run Ollama Locally

bash
# Install Ollama
curl -fsSL https://ollama.com/install.sh | sh
 
# Pull a model (llama3.2 is fast and good for structured output)
ollama pull llama3.2
 
# Or for better quality (needs 8GB+ RAM)
ollama pull llama3.1:8b
 
# Verify
ollama run llama3.2 "Hello"

Running the System

bash
# Start the watcher
python main.py
 
# Test it by creating a crashing pod
kubectl run crash-test --image=alpine -- /bin/sh -c "exit 1"
 
# Watch the terminal — it should detect the failure and generate a runbook

Example runbook output:

markdown
## Incident Summary
Pod `crash-test` in namespace `default` has entered a CrashLoopBackOff state.
The container exits immediately with code 1.
 
## Root Cause Analysis
The container command `/bin/sh -c "exit 1"` is designed to exit immediately 
with a non-zero exit code. This is a manual test pod, but the same pattern 
appears in production when application startup fails.
 
## Immediate Diagnostics
kubectl describe pod crash-test -n default
kubectl logs crash-test -n default --previous
 
## Remediation Steps
1. Check the actual container command:
   kubectl get pod crash-test -o jsonpath='{.spec.containers[0].command}'
2. If this is a test pod, delete it:
   kubectl delete pod crash-test
3. If this is a real service, fix the exit code in the application startup...

Deploy to Kubernetes

yaml
# k8s-runbook-generator.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: runbook-generator
  namespace: monitoring
spec:
  replicas: 1
  selector:
    matchLabels:
      app: runbook-generator
  template:
    metadata:
      labels:
        app: runbook-generator   # must match spec.selector.matchLabels
    spec:
      serviceAccountName: runbook-generator-sa
      containers:
        - name: generator
          image: your-registry/runbook-generator:latest
          env:
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: openai-secret
                  key: api-key
          volumeMounts:
            - name: config
              mountPath: /app/config.yaml
              subPath: config.yaml
      volumes:
        - name: config
          configMap:
            name: runbook-generator-config
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: runbook-generator-sa
  namespace: monitoring
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: runbook-generator
rules:
  - apiGroups: [""]
    resources: ["pods", "pods/log", "events", "nodes", "namespaces"]
    verbs: ["get", "list", "watch"]
  - apiGroups: ["apps"]
    resources: ["deployments", "replicasets"]
    verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: runbook-generator
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: runbook-generator
subjects:
  - kind: ServiceAccount
    name: runbook-generator-sa
    namespace: monitoring

Extensions

Add Slack notifications:

python
from slack_sdk.webhook import WebhookClient
 
def notify_slack(runbook: str, pod_name: str, webhook_url: str):
    client = WebhookClient(webhook_url)
    client.send(
        text=f"🚨 Runbook generated for {pod_name}",
        blocks=[{
            "type": "section",
            "text": {"type": "mrkdwn", "text": runbook[:2900]}
        }]
    )
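
`runbook[:2900]` keeps a single section block under Slack's 3000-character text limit, but it also cuts off everything after that point. One alternative is to split the runbook across several blocks. The helper below is a sketch: `runbook_to_blocks` is a hypothetical name, and the cap of 50 reflects Slack's per-message block limit.

```python
def runbook_to_blocks(runbook: str, limit: int = 2900, max_blocks: int = 50) -> list[dict]:
    """Split a runbook on line boundaries into mrkdwn section blocks."""
    chunks: list[str] = []
    current = ""
    for line in runbook.splitlines(keepends=True):
        # Hard-split any single line that is itself longer than the limit.
        while len(line) > limit:
            if current:
                chunks.append(current)
                current = ""
            chunks.append(line[:limit])
            line = line[limit:]
        if len(current) + len(line) > limit:
            chunks.append(current)
            current = ""
        current += line
    if current:
        chunks.append(current)
    return [
        {"type": "section", "text": {"type": "mrkdwn", "text": chunk}}
        for chunk in chunks[:max_blocks]
        if chunk.strip()  # skip whitespace-only chunks
    ]


long_runbook = "line\n" * 1000  # 5000 characters
blocks = runbook_to_blocks(long_runbook)
```

The result can be passed straight to `client.send(text=..., blocks=blocks)` in `notify_slack`.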

Add a vector store for historical incidents: Store generated runbooks in a vector database (Qdrant, ChromaDB). When a new incident occurs, retrieve similar past incidents and include them in the prompt. This gives the LLM precedent to draw on and can improve runbook quality as the history grows.
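
To make the retrieval step concrete without standing up a database, the sketch below ranks past incidents by bag-of-words cosine similarity. This is a deliberately simple stand-in for real embeddings and a vector store such as Qdrant or ChromaDB. All names here are hypothetical.

```python
import math
import re
from collections import Counter


def _vector(text: str) -> Counter:
    """Lowercased word counts: a crude substitute for an embedding."""
    return Counter(re.findall(r"[a-z0-9]+", text.lower()))


def cosine(a: Counter, b: Counter) -> float:
    dot = sum(a[token] * b[token] for token in a.keys() & b.keys())
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(
        sum(v * v for v in b.values())
    )
    return dot / norm if norm else 0.0


def most_similar(query: str, past: dict[str, str], top_k: int = 2) -> list[str]:
    """Return the ids of the top_k past incidents most similar to the query."""
    q = _vector(query)
    ranked = sorted(past, key=lambda name: cosine(q, _vector(past[name])), reverse=True)
    return ranked[:top_k]


past_incidents = {
    "oom": "container killed OOMKilled memory limit exceeded",
    "image": "ErrImagePull image not found in registry",
}
matches = most_similar("pod OOMKilled memory limit", past_incidents, top_k=1)
```

Swapping `_vector` and `cosine` for an embedding model and a vector database query keeps the same shape: embed the new incident, fetch the nearest past runbooks, and append them to the prompt.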


The full source is a starting point. In production, you'd add deduplication, incident correlation, and integration with your alerting system (PagerDuty, OpsGenie) to auto-attach runbooks to incidents when they fire.

Related: AI Powered Incident Response | NVIDIA GPU Operator on Kubernetes | Build Grafana AI Alert Classifier

Affiliate note: With the OpenAI API, GPT-4o Mini is cost-effective for runbook generation (~$0.001 per runbook). Ollama runs LLMs locally in your cluster with no API costs.
