🎉 DevOps Interview Prep Bundle is live — 1000+ Q&A across 20 topicsGet it →
All Articles

Build an AI SRE Runbook Agent with LangGraph and Kubernetes

Use LangGraph to build an agentic SRE assistant that reads Kubernetes state, queries Prometheus, and executes runbook steps autonomously using Claude API.

DevOpsBoys5 min read
Share:Tweet

Traditional runbooks are static documents. This tutorial shows how to build a LangGraph-based agent that reads live cluster state and executes runbook steps dynamically — a first step toward self-healing infrastructure.

Architecture

Alert (Prometheus/PagerDuty)
         ↓
LangGraph SRE Agent
    ├── Tool: kubectl (read cluster state)
    ├── Tool: Prometheus query (metrics)
    ├── Tool: Logs (Loki/CloudWatch)
    ├── Tool: Runbook lookup (vector search)
    └── Tool: Apply fix (with human approval)
         ↓
Slack notification + incident report

Setup

bash
pip install langgraph langchain-anthropic kubernetes prometheus-api-client
bash
export ANTHROPIC_API_KEY="sk-ant-..."

Step 1: Define the SRE Tools

python
from langchain_core.tools import tool
from kubernetes import client, config
from prometheus_api_client import PrometheusConnect
import subprocess
import json
 
# Load kubeconfig
config.load_incluster_config()  # in-cluster
# config.load_kube_config()  # local development
 
k8s_apps = client.AppsV1Api()
k8s_core = client.CoreV1Api()
prom = PrometheusConnect(url="http://prometheus.monitoring:9090")
 
 
@tool
def get_pod_status(namespace: str, label_selector: str = "") -> str:
    """Get status of pods in a namespace. Use label_selector like 'app=myapp'."""
    try:
        pods = k8s_core.list_namespaced_pod(
            namespace=namespace,
            label_selector=label_selector
        )
        result = []
        for pod in pods.items:
            status = pod.status.phase
            conditions = [c.type for c in (pod.status.conditions or []) if c.status == "True"]
            container_statuses = []
            for cs in (pod.status.container_statuses or []):
                container_statuses.append({
                    "name": cs.name,
                    "ready": cs.ready,
                    "restart_count": cs.restart_count,
                    "state": list(cs.state.to_dict().keys())[0] if cs.state else "unknown"
                })
            result.append({
                "name": pod.metadata.name,
                "namespace": namespace,
                "phase": status,
                "conditions": conditions,
                "containers": container_statuses
            })
        return json.dumps(result, indent=2)
    except Exception as e:
        return f"Error: {e}"
 
 
@tool
def get_pod_logs(namespace: str, pod_name: str, tail_lines: int = 100) -> str:
    """Get recent logs from a pod."""
    try:
        logs = k8s_core.read_namespaced_pod_log(
            name=pod_name,
            namespace=namespace,
            tail_lines=tail_lines
        )
        return logs[-5000:] if len(logs) > 5000 else logs  # limit output
    except Exception as e:
        return f"Error getting logs: {e}"
 
 
@tool
def query_prometheus(promql: str, time_range_minutes: int = 30) -> str:
    """Query Prometheus metrics. Returns the result of a PromQL query."""
    try:
        result = prom.custom_query(query=promql)
        return json.dumps(result, indent=2)[:3000]  # limit output
    except Exception as e:
        return f"Error querying Prometheus: {e}"
 
 
@tool
def get_deployment_info(namespace: str, deployment_name: str) -> str:
    """Get detailed information about a Kubernetes deployment."""
    try:
        deploy = k8s_apps.read_namespaced_deployment(
            name=deployment_name,
            namespace=namespace
        )
        return json.dumps({
            "name": deploy.metadata.name,
            "namespace": namespace,
            "replicas": {
                "desired": deploy.spec.replicas,
                "ready": deploy.status.ready_replicas,
                "available": deploy.status.available_replicas,
            },
            "image": deploy.spec.template.spec.containers[0].image,
            "resources": deploy.spec.template.spec.containers[0].resources.to_dict(),
            "conditions": [c.to_dict() for c in (deploy.status.conditions or [])],
        }, indent=2)
    except Exception as e:
        return f"Error: {e}"
 
 
@tool
def scale_deployment(namespace: str, deployment_name: str, replicas: int) -> str:
    """Scale a Kubernetes deployment to the specified number of replicas. Use with caution."""
    try:
        body = {"spec": {"replicas": replicas}}
        k8s_apps.patch_namespaced_deployment_scale(
            name=deployment_name,
            namespace=namespace,
            body=body
        )
        return f"Successfully scaled {deployment_name} to {replicas} replicas"
    except Exception as e:
        return f"Error scaling: {e}"
 
 
@tool
def restart_deployment(namespace: str, deployment_name: str) -> str:
    """Trigger a rolling restart of a deployment by updating an annotation."""
    try:
        import datetime
        body = {
            "spec": {
                "template": {
                    "metadata": {
                        "annotations": {
                            "kubectl.kubernetes.io/restartedAt": datetime.datetime.now().isoformat()
                        }
                    }
                }
            }
        }
        k8s_apps.patch_namespaced_deployment(
            name=deployment_name,
            namespace=namespace,
            body=body
        )
        return f"Rolling restart triggered for {deployment_name}"
    except Exception as e:
        return f"Error: {e}"

Step 2: Build the LangGraph Agent

python
from langgraph.prebuilt import create_react_agent
from langchain_anthropic import ChatAnthropic
 
llm = ChatAnthropic(model="claude-opus-4-8", temperature=0)
 
tools = [
    get_pod_status,
    get_pod_logs,
    query_prometheus,
    get_deployment_info,
    scale_deployment,
    restart_deployment,
]
 
SRE_SYSTEM_PROMPT = """You are an expert Site Reliability Engineer (SRE) agent.
 
You have access to tools to:
- Read Kubernetes cluster state (pods, deployments)
- Query Prometheus metrics
- Get pod logs
- Apply fixes (scale, restart deployments)
 
Your approach to incidents:
1. DIAGNOSE first — understand what's happening before taking action
2. Check multiple signals — pod state + metrics + logs
3. Hypothesize the root cause
4. Apply the minimal fix
5. Verify the fix worked
 
IMPORTANT:
- Always explain what you're doing and why
- Scale deployments conservatively (don't scale to 100 replicas)
- Only restart if you have evidence it will help
- Provide a clear summary of findings and actions taken
 
Start by gathering information before making changes."""
 
agent = create_react_agent(llm, tools, state_modifier=SRE_SYSTEM_PROMPT)

Step 3: Handle Alerts

python
from typing import TypedDict
import asyncio
 
class AlertPayload(TypedDict):
    alert_name: str
    namespace: str
    service: str
    severity: str
    description: str
    labels: dict
 
 
async def handle_alert(alert: AlertPayload) -> str:
    """Process an incoming alert with the SRE agent."""
    
    incident_message = f"""
ALERT: {alert['alert_name']}
Severity: {alert['severity']}
Service: {alert['service']}
Namespace: {alert['namespace']}
Description: {alert['description']}
Labels: {alert['labels']}
 
Please diagnose this incident and take appropriate remediation steps.
Start by checking the pod status in namespace {alert['namespace']} for service {alert['service']}.
"""
    
    result = await agent.ainvoke({
        "messages": [{"role": "user", "content": incident_message}]
    })
    
    # Extract the final response
    final_message = result["messages"][-1].content
    
    return final_message
 
 
# Example: Prometheus Alertmanager webhook receiver
from fastapi import FastAPI
from pydantic import BaseModel
 
app = FastAPI()
 
class PrometheusAlert(BaseModel):
    alerts: list[dict]
    commonLabels: dict
 
 
@app.post("/webhook/alertmanager")
async def receive_alert(payload: PrometheusAlert):
    for alert in payload.alerts:
        if alert.get("status") == "firing":
            incident = AlertPayload(
                alert_name=alert["labels"].get("alertname", "Unknown"),
                namespace=alert["labels"].get("namespace", "default"),
                service=alert["labels"].get("service", "unknown"),
                severity=alert["labels"].get("severity", "warning"),
                description=alert["annotations"].get("description", ""),
                labels=alert["labels"]
            )
            
            # Run agent in background
            asyncio.create_task(handle_alert(incident))
    
    return {"status": "processing"}

Step 4: Deploy as a Kubernetes Service

yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: sre-agent
  namespace: monitoring
spec:
  replicas: 1
  template:
    spec:
      serviceAccountName: sre-agent  # needs RBAC permissions
      containers:
        - name: sre-agent
          image: your-org/sre-agent:latest
          env:
            - name: ANTHROPIC_API_KEY
              valueFrom:
                secretKeyRef:
                  name: llm-secrets
                  key: anthropic_api_key
---
# RBAC for the agent
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: sre-agent
rules:
  - apiGroups: ["", "apps"]
    resources: ["pods", "deployments", "replicasets", "services"]
    verbs: ["get", "list", "watch", "patch", "update"]
  - apiGroups: [""]
    resources: ["pods/log"]
    verbs: ["get"]

Adding Human Approval Gates

For destructive operations, add a human-in-the-loop step:

python
from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import create_react_agent
 
# Agent with checkpointing for human interrupts
checkpointer = MemorySaver()
 
agent_with_approval = create_react_agent(
    llm, 
    tools, 
    state_modifier=SRE_SYSTEM_PROMPT,
    checkpointer=checkpointer,
    interrupt_before=["scale_deployment", "restart_deployment"]  # pause before these tools
)

When the agent wants to scale or restart, it pauses and sends a Slack message asking for approval. A human approves via slash command, which resumes the agent.

This pattern gives you autonomous diagnosis with supervised remediation — the sweet spot for production systems.

Stack: LangGraph, Claude API, Kubernetes Python client, FastAPI
LangGraph docs | Anthropic API

🔧

Today I Fixed

Short real fixes from production — posted daily

Browse fixes
Newsletter

Stay ahead of the curve

Get the latest DevOps, Kubernetes, AWS, and AI/ML guides delivered straight to your inbox. No spam — just practical engineering content.

Related Articles

Comments