
Build a DevOps AI Agent with LangGraph on Kubernetes (2026)

Build a stateful DevOps agent using LangGraph that can plan multi-step infrastructure tasks, use tools, handle errors, and maintain conversation context — deployed on Kubernetes with a FastAPI interface.

DevOpsBoys · Apr 28, 2026 · 7 min read

Single LLM calls are useful. Agents that plan, use tools, observe results, and retry are transformative. This guide builds a DevOps agent using LangGraph — a framework for building stateful, multi-step AI workflows.

Unlike a simple function-calling bot, this agent can:

  • Break complex tasks into steps
  • Use multiple tools in sequence
  • Handle errors and retry with a different approach
  • Maintain conversation memory across a session

What LangGraph Adds Over Plain Function Calling

Plain function calling:

User → LLM decides tool → You run tool → LLM responds

LangGraph agent:

User → Agent "thinks" (plan) → Runs tool 1 → Observes result 
     → Decides next step → Runs tool 2 → Observes result
     → Synthesizes final answer (or loops if needed)

The agent is a graph where nodes are actions (call LLM, run tool) and edges are conditional transitions (if tool succeeded → next step, if failed → retry or ask user).
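The loop above can be sketched without any framework. This is a plain-Python illustration of the node/edge idea — the node names and the fake tool are illustrative, not LangGraph's API:

```python
# Minimal sketch of the plan -> act -> observe loop (no LangGraph required).
# "agent" decides, "tools" acts; the return value of each node is the edge taken.

def agent_node(state):
    """Decide: request a tool call until we have an observation, then answer."""
    if "observation" not in state:
        state["next_tool"] = ("get_pod_status", {"namespace": "payments"})
        return "tools"          # conditional edge: a tool call is pending
    state["answer"] = f"Pods report: {state['observation']}"
    return "end"                # conditional edge: nothing left to do

def tools_node(state):
    """Run the requested tool and record the observation."""
    name, args = state.pop("next_tool")
    fake_tools = {"get_pod_status": lambda namespace: f"3 pods Running in {namespace}"}
    state["observation"] = fake_tools[name](**args)
    return "agent"              # fixed edge: always hand results back to the agent

def run(state):
    node = "agent"
    while node != "end":
        node = {"agent": agent_node, "tools": tools_node}[node](state)
    return state

result = run({})
print(result["answer"])  # -> Pods report: 3 pods Running in payments
```

LangGraph gives you the same shape declaratively, plus persistence and streaming, which is what the rest of this guide builds.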


Architecture

User (FastAPI) → LangGraph Agent → Tools
                      │
                      ├── kubectl (k8s ops)
                      ├── aws_cli (cloud ops)
                      ├── log_analyzer (parse logs)
                      └── web_search (look up docs)
                      │
                 Redis (state persistence)

Setup

bash
pip install langgraph langchain-anthropic langchain-core \
  redis fastapi uvicorn kubernetes python-dotenv rich

Project layout:

devops-agent/
├── agent/
│   ├── graph.py        # LangGraph state machine
│   ├── tools.py        # Tool implementations
│   ├── nodes.py        # Graph nodes (LLM call, tool execution)
│   └── state.py        # Agent state schema
├── api/
│   └── main.py         # FastAPI server
└── Dockerfile
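The Dockerfile in the tree isn't shown elsewhere in this guide; a minimal sketch (base image and paths are assumptions) could look like:

```dockerfile
FROM python:3.12-slim
WORKDIR /app
COPY . .
RUN pip install --no-cache-dir langgraph langchain-anthropic langchain-core \
    redis fastapi uvicorn kubernetes python-dotenv rich
EXPOSE 8000
CMD ["uvicorn", "api.main:app", "--host", "0.0.0.0", "--port", "8000"]
```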

Step 1: Define Agent State

python
# agent/state.py
from typing import Annotated, Sequence
from langchain_core.messages import BaseMessage
from langgraph.graph.message import add_messages
from typing_extensions import TypedDict
 
 
class AgentState(TypedDict):
    """State that flows through the entire agent graph."""
    messages: Annotated[Sequence[BaseMessage], add_messages]
    # add_messages reducer appends new messages to the list
    # (rather than replacing the whole list)
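The append-vs-replace distinction matters: without a reducer, each node's return value would overwrite `messages` and the agent would forget everything. A framework-free sketch of what an `add_messages`-style reducer does (plain tuples stand in for `BaseMessage` objects):

```python
# Sketch of reducer semantics: LangGraph merges each node's returned state into
# the existing state via the field's reducer. No reducer means replace;
# an append-style reducer accumulates history.

def replace(old, new):
    return new

def add_messages_like(old, new):
    # Simplified: the real add_messages also deduplicates by message ID
    return list(old) + list(new)

history = [("human", "Check the payments pods")]
node_output = [("ai", "Calling get_pod_status...")]

assert replace(history, node_output) == node_output           # history lost
merged = add_messages_like(history, node_output)
assert merged[0] == ("human", "Check the payments pods")      # history kept
assert merged[1] == ("ai", "Calling get_pod_status...")
```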

Step 2: Define DevOps Tools

python
# agent/tools.py
from langchain_core.tools import tool
from kubernetes import client, config
import subprocess
 
try:
    # In-cluster: use the mounted service-account token; otherwise fall back
    # to the local kubeconfig (e.g. when developing against a remote cluster)
    config.load_incluster_config()
except config.ConfigException:
    config.load_kube_config()
 
core_v1 = client.CoreV1Api()
apps_v1 = client.AppsV1Api()
 
 
@tool
def get_pod_status(namespace: str, label_selector: str = "") -> str:
    """Get status of all pods in a namespace. Use label_selector like 'app=payments' to filter."""
    try:
        pods = core_v1.list_namespaced_pod(namespace=namespace, label_selector=label_selector)
        result = []
        for pod in pods.items:
            restarts = 0
            status = pod.status.phase or "Unknown"
            if pod.status.container_statuses:
                restarts = sum(cs.restart_count for cs in pod.status.container_statuses)
                for cs in pod.status.container_statuses:
                    if cs.state.waiting:
                        status = cs.state.waiting.reason
            result.append(f"{pod.metadata.name}: {status} (restarts: {restarts})")
        return "\n".join(result) if result else "No pods found"
    except Exception as e:
        return f"Error: {e}"
 
 
@tool
def get_pod_logs(pod_name: str, namespace: str, tail: int = 50) -> str:
    """Get recent logs from a pod. Returns last N lines."""
    try:
        logs = core_v1.read_namespaced_pod_log(
            name=pod_name, namespace=namespace, tail_lines=tail
        )
        return logs[-3000:] if len(logs) > 3000 else logs
    except Exception as e:
        return f"Error getting logs: {e}"
 
 
@tool
def scale_deployment(deployment_name: str, namespace: str, replicas: int) -> str:
    """Scale a Kubernetes deployment. replicas must be between 0 and 20."""
    if not 0 <= replicas <= 20:
        return f"Error: replica count {replicas} out of safe range (0-20)"
    try:
        apps_v1.patch_namespaced_deployment_scale(
            name=deployment_name,
            namespace=namespace,
            body={"spec": {"replicas": replicas}}
        )
        return f"Successfully scaled {namespace}/{deployment_name} to {replicas} replicas"
    except Exception as e:
        return f"Error scaling: {e}"
 
 
@tool
def get_node_resources() -> str:
    """Get CPU and memory usage across all cluster nodes."""
    try:
        nodes = core_v1.list_node()
        result = []
        for node in nodes.items:
            allocatable = node.status.allocatable or {}
            result.append(
                f"{node.metadata.name}: "
                f"CPU={allocatable.get('cpu', '?')} "
                f"Memory={allocatable.get('memory', '?')}"
            )
        return "\n".join(result)
    except Exception as e:
        return f"Error: {e}"
 
 
@tool
def run_kubectl(command: str) -> str:
    """Run a read-only kubectl command. Only get/describe/logs commands are allowed."""
    # Safety: only allow read operations
    allowed_verbs = ["get", "describe", "logs", "top", "explain", "version", "cluster-info"]
    parts = command.strip().split()
    
    if not parts or parts[0] != "kubectl":
        return "Error: command must start with kubectl"
    
    if len(parts) < 2 or parts[1] not in allowed_verbs:
        return f"Error: only read-only kubectl commands allowed ({', '.join(allowed_verbs)})"
    
    try:
        result = subprocess.run(
            parts,
            capture_output=True, text=True, timeout=30
        )
        output = result.stdout or result.stderr
        return output[:3000] if len(output) > 3000 else output
    except subprocess.TimeoutExpired:
        return "Error: command timed out after 30 seconds"
    except Exception as e:
        return f"Error: {e}"
 
 
TOOLS = [get_pod_status, get_pod_logs, scale_deployment, get_node_resources, run_kubectl]

Step 3: Build the LangGraph Agent

python
# agent/graph.py
from langchain_anthropic import ChatAnthropic
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode
from langchain_core.messages import HumanMessage, SystemMessage
from .state import AgentState
from .tools import TOOLS
 
model = ChatAnthropic(model="claude-sonnet-4-6").bind_tools(TOOLS)
 
SYSTEM_MESSAGE = SystemMessage(content="""You are an expert DevOps SRE assistant with access to 
Kubernetes cluster tools. 
 
For complex requests, break them down step by step:
1. Gather information first (check pod status, logs, events)
2. Analyze what you found
3. Take action only when you understand the root cause
4. Verify the action worked
 
Be cautious with write operations (scaling). Always check current state before modifying it.
Report findings clearly with specific details (pod names, error messages, metrics).""")
 
 
def should_continue(state: AgentState) -> str:
    """Decide whether to continue using tools or end."""
    last_message = state["messages"][-1]
    if hasattr(last_message, "tool_calls") and last_message.tool_calls:
        return "tools"
    return END
 
 
def call_model(state: AgentState) -> AgentState:
    """Call the LLM with current message history."""
    messages = [SYSTEM_MESSAGE] + list(state["messages"])
    response = model.invoke(messages)
    return {"messages": [response]}
 
 
# Build the graph
def build_agent():
    graph = StateGraph(AgentState)
 
    # Nodes
    graph.add_node("agent", call_model)
    graph.add_node("tools", ToolNode(TOOLS))
 
    # Edges
    graph.set_entry_point("agent")
    graph.add_conditional_edges("agent", should_continue)
    graph.add_edge("tools", "agent")  # After tools, always go back to agent
 
    return graph.compile()
 
 
agent = build_agent()

Step 4: FastAPI Interface

python
# api/main.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from langchain_core.messages import HumanMessage
import uuid
from typing import Optional
 
from agent.graph import agent
 
app = FastAPI(title="DevOps AI Agent")
 
# In-memory session storage (use Redis for production)
sessions: dict = {}
 
 
class ChatRequest(BaseModel):
    message: str
    session_id: Optional[str] = None
 
 
class ChatResponse(BaseModel):
    session_id: str
    response: str
    tool_calls_made: int
 
 
@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    session_id = request.session_id or str(uuid.uuid4())
    
    # Get or create session history
    history = sessions.get(session_id, [])
    
    # Add user message
    history.append(HumanMessage(content=request.message))
    
    try:
        # Run agent
        result = await agent.ainvoke({"messages": history})
        
        # Save updated history
        sessions[session_id] = result["messages"]
        
        # Get final response text
        final_message = result["messages"][-1]
        response_text = final_message.content
        if isinstance(response_text, list):
            # Handle content blocks (Anthropic format)
            response_text = " ".join(
                block.get("text", "") if isinstance(block, dict) else str(block)
                for block in response_text
            )
        
        # Count tool calls made
        tool_calls = sum(
            1 for msg in result["messages"]
            if hasattr(msg, "tool_calls") and msg.tool_calls
        )
        
        return ChatResponse(
            session_id=session_id,
            response=str(response_text),
            tool_calls_made=tool_calls,
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
 
 
@app.delete("/session/{session_id}")
async def clear_session(session_id: str):
    sessions.pop(session_id, None)
    return {"cleared": True}
 
 
@app.get("/health")
async def health():
    return {"status": "ok"}

Example Interaction

bash
# Start the server
uvicorn api.main:app --host 0.0.0.0 --port 8000
 
# Multi-step task
curl -X POST http://localhost:8000/chat \
  -H "Content-Type: application/json" \
  -d '{
    "message": "Check the payments namespace. If any pods have high restart counts, investigate why and tell me if it is safe to restart them.",
    "session_id": "session-001"
  }'

The agent:

  1. Calls get_pod_status(namespace="payments")
  2. Sees payments-7d9f-abc: CrashLoopBackOff (restarts: 15)
  3. Calls get_pod_logs(pod_name="payments-7d9f-abc", namespace="payments")
  4. Finds ERROR: Cannot connect to database: connection refused
  5. Calls run_kubectl("kubectl describe pod payments-7d9f-abc -n payments") to check events
  6. Returns: "The payments pod is crashing due to database connection failure. The DB_HOST environment variable points to postgres-svc but no such service exists in the namespace. This is likely a misconfigured environment variable, not a transient error. Restarting will not fix it — you need to correct the DB_HOST value in the deployment."

Deploy to Kubernetes

yaml
# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: devops-agent
  namespace: platform
spec:
  replicas: 1
  selector:
    matchLabels:
      app: devops-agent
  template:
    metadata:
      labels:
        app: devops-agent
    spec:
      serviceAccountName: devops-agent-sa
      containers:
      - name: agent
        image: your-registry/devops-agent:latest
        ports:
        - containerPort: 8000
        env:
        - name: ANTHROPIC_API_KEY
          valueFrom:
            secretKeyRef:
              name: ai-secrets
              key: anthropic-api-key
        resources:
          requests:
            memory: "512Mi"
            cpu: "250m"
          limits:
            memory: "1Gi"
            cpu: "500m"
yaml
# RBAC — read + limited write access
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: devops-agent
rules:
- apiGroups: [""]
  resources: ["pods", "pods/log", "events", "nodes", "services"]
  verbs: ["get", "list"]
- apiGroups: ["apps"]
  resources: ["deployments"]
  verbs: ["get", "list", "patch"]  # patch for scaling

LangGraph's graph-based state machine makes it natural to build agents that think in steps, handle failures gracefully, and maintain session context. For production, persist sessions in Redis — the in-memory dict above loses all state on every restart — and add per-user rate limiting.
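A minimal sketch of what that session layer could look like, with a plain dict standing in for the Redis client (the class and key names are hypothetical; swap in `redis.Redis(...)` with a TTL in production):

```python
# Sketch of a Redis-backed session store. The serialization logic is the
# interesting part; `store` is a dict here standing in for a Redis client.
import json

class SessionStore:
    def __init__(self, store=None, ttl_seconds=3600):
        self.store = store if store is not None else {}
        self.ttl = ttl_seconds  # with real Redis, pass ex=self.ttl to set()

    def save(self, session_id, messages):
        # Messages as (role, content) pairs — real code would serialize
        # BaseMessage objects, e.g. via their type and content fields
        self.store[f"session:{session_id}"] = json.dumps(messages)

    def load(self, session_id):
        raw = self.store.get(f"session:{session_id}")
        return [tuple(m) for m in json.loads(raw)] if raw else []

store = SessionStore()
store.save("session-001", [("human", "Check payments pods"), ("ai", "3 pods Running")])
print(store.load("session-001")[0])  # -> ('human', 'Check payments pods')
```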

Full LangGraph documentation at langchain-ai.github.io/langgraph — the concepts section is particularly good for understanding when graphs > chains.
