Build a DevOps AI Agent with LangGraph on Kubernetes (2026)
Build a stateful DevOps agent using LangGraph that can plan multi-step infrastructure tasks, use tools, handle errors, and maintain conversation context — deployed on Kubernetes with a FastAPI interface.
Single LLM calls are useful. Agents that plan, use tools, observe results, and retry are transformative. This guide builds a DevOps agent using LangGraph — a framework for building stateful, multi-step AI workflows.
Unlike a simple function-calling bot, this agent can:
- Break complex tasks into steps
- Use multiple tools in sequence
- Handle errors and retry with a different approach
- Maintain conversation memory across a session
What LangGraph Adds Over Plain Function Calling
Plain function calling:
User → LLM decides tool → You run tool → LLM responds
LangGraph agent:
User → Agent "thinks" (plan) → Runs tool 1 → Observes result
→ Decides next step → Runs tool 2 → Observes result
→ Synthesizes final answer (or loops if needed)
The agent is a graph where nodes are actions (call LLM, run tool) and edges are conditional transitions (if tool succeeded → next step, if failed → retry or ask user).
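That loop can be sketched without any framework. The sketch below is illustrative only: `fake_llm` and `fake_tool` are hypothetical stand-ins, not part of LangGraph, and exist purely to show the control flow that LangGraph formalizes as nodes and conditional edges.

```python
# Framework-free sketch of the plan -> act -> observe loop.
# fake_llm and fake_tool are illustrative stand-ins, not real APIs.

def fake_llm(messages: list[dict]) -> dict:
    # Stand-in "agent" node: requests a tool until a tool result exists.
    if any(m["role"] == "tool" for m in messages):
        return {"role": "assistant", "content": "3 pods are Running", "tool_calls": []}
    return {"role": "assistant", "content": "", "tool_calls": [{"name": "get_pod_status"}]}

def fake_tool(call: dict) -> dict:
    # Stand-in "tools" node: returns an observation for the agent.
    return {"role": "tool", "content": "pod-a Running, pod-b Running, pod-c Running"}

def run_agent(user_input: str, max_steps: int = 5) -> str:
    messages = [{"role": "user", "content": user_input}]
    for _ in range(max_steps):
        reply = fake_llm(messages)            # node: agent
        messages.append(reply)
        if not reply["tool_calls"]:           # conditional edge: done?
            return reply["content"]
        for call in reply["tool_calls"]:      # node: tools
            messages.append(fake_tool(call))  # edge: tools -> agent
    return "gave up after max_steps"

print(run_agent("how many pods are running?"))  # -> 3 pods are Running
```

LangGraph's value is making this loop explicit, persistent, and inspectable rather than buried in a `while` loop.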
Architecture
User (FastAPI) → LangGraph Agent → Tools
                      │              ├── kubectl (k8s ops)
                      │              ├── aws_cli (cloud ops)
                      │              ├── log_analyzer (parse logs)
                      │              └── web_search (look up docs)
                      │
                    Redis (state persistence)
Setup
pip install langgraph langchain-anthropic langchain-core \
  redis fastapi uvicorn kubernetes python-dotenv rich

devops-agent/
├── agent/
│ ├── graph.py # LangGraph state machine
│ ├── tools.py # Tool implementations
│ ├── nodes.py # Graph nodes (LLM call, tool execution)
│ └── state.py # Agent state schema
├── api/
│ └── main.py # FastAPI server
└── Dockerfile
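The Dockerfile in the tree above is not shown elsewhere in this guide; here is a minimal sketch. The base image, kubectl version, and the existence of a `requirements.txt` are assumptions — adjust to your setup. kubectl is installed because the `run_kubectl` tool shells out to it.

```dockerfile
# Sketch only — pin versions to match your cluster and dependency list.
FROM python:3.12-slim
WORKDIR /app

# kubectl binary for the run_kubectl tool (version is an assumption)
RUN apt-get update && apt-get install -y --no-install-recommends curl \
    && curl -LO "https://dl.k8s.io/release/v1.31.0/bin/linux/amd64/kubectl" \
    && install -m 0755 kubectl /usr/local/bin/kubectl && rm kubectl \
    && apt-get purge -y curl && rm -rf /var/lib/apt/lists/*

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY agent/ agent/
COPY api/ api/

EXPOSE 8000
CMD ["uvicorn", "api.main:app", "--host", "0.0.0.0", "--port", "8000"]
```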
Step 1: Define Agent State
# agent/state.py
from typing import Annotated, Sequence
from langchain_core.messages import BaseMessage
from langgraph.graph.message import add_messages
from typing_extensions import TypedDict
class AgentState(TypedDict):
"""State that flows through the entire agent graph."""
messages: Annotated[Sequence[BaseMessage], add_messages]
# add_messages reducer appends new messages to the list
# (rather than replacing the whole list)

Step 2: Define DevOps Tools
# agent/tools.py
from langchain_core.tools import tool
from kubernetes import client, config
import subprocess
try:
    config.load_incluster_config()  # running inside the cluster
except config.ConfigException:
    config.load_kube_config()  # fall back to local kubeconfig
core_v1 = client.CoreV1Api()
apps_v1 = client.AppsV1Api()
@tool
def get_pod_status(namespace: str, label_selector: str = "") -> str:
"""Get status of all pods in a namespace. Use label_selector like 'app=payments' to filter."""
try:
pods = core_v1.list_namespaced_pod(namespace=namespace, label_selector=label_selector)
result = []
for pod in pods.items:
restarts = 0
status = pod.status.phase or "Unknown"
if pod.status.container_statuses:
restarts = sum(cs.restart_count for cs in pod.status.container_statuses)
for cs in pod.status.container_statuses:
if cs.state.waiting:
status = cs.state.waiting.reason
result.append(f"{pod.metadata.name}: {status} (restarts: {restarts})")
return "\n".join(result) if result else "No pods found"
except Exception as e:
return f"Error: {e}"
@tool
def get_pod_logs(pod_name: str, namespace: str, tail: int = 50) -> str:
"""Get recent logs from a pod. Returns last N lines."""
try:
logs = core_v1.read_namespaced_pod_log(
name=pod_name, namespace=namespace, tail_lines=tail
)
return logs[-3000:] if len(logs) > 3000 else logs
except Exception as e:
return f"Error getting logs: {e}"
@tool
def scale_deployment(deployment_name: str, namespace: str, replicas: int) -> str:
"""Scale a Kubernetes deployment. replicas must be between 0 and 20."""
if not 0 <= replicas <= 20:
return f"Error: replica count {replicas} out of safe range (0-20)"
try:
apps_v1.patch_namespaced_deployment_scale(
name=deployment_name,
namespace=namespace,
body={"spec": {"replicas": replicas}}
)
return f"Successfully scaled {namespace}/{deployment_name} to {replicas} replicas"
except Exception as e:
return f"Error scaling: {e}"
@tool
def get_node_resources() -> str:
"""Get CPU and memory usage across all cluster nodes."""
try:
nodes = core_v1.list_node()
result = []
for node in nodes.items:
allocatable = node.status.allocatable or {}
result.append(
f"{node.metadata.name}: "
f"CPU={allocatable.get('cpu', '?')} "
f"Memory={allocatable.get('memory', '?')}"
)
return "\n".join(result)
except Exception as e:
return f"Error: {e}"
@tool
def run_kubectl(command: str) -> str:
"""Run a read-only kubectl command. Only get/describe/logs commands are allowed."""
# Safety: only allow read operations
allowed_verbs = ["get", "describe", "logs", "top", "explain", "version", "cluster-info"]
parts = command.strip().split()
if not parts or parts[0] != "kubectl":
return "Error: command must start with kubectl"
if len(parts) < 2 or parts[1] not in allowed_verbs:
return f"Error: only read-only kubectl commands allowed ({', '.join(allowed_verbs)})"
try:
result = subprocess.run(
parts,
capture_output=True, text=True, timeout=30
)
output = result.stdout or result.stderr
return output[:3000] if len(output) > 3000 else output
except subprocess.TimeoutExpired:
return "Error: command timed out after 30 seconds"
except Exception as e:
return f"Error: {e}"
TOOLS = [get_pod_status, get_pod_logs, scale_deployment, get_node_resources, run_kubectl]

Step 3: Build the LangGraph Agent
# agent/graph.py
from langchain_anthropic import ChatAnthropic
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode
from langchain_core.messages import HumanMessage, SystemMessage
from .state import AgentState
from .tools import TOOLS
model = ChatAnthropic(model="claude-sonnet-4-6").bind_tools(TOOLS)
SYSTEM_MESSAGE = SystemMessage(content="""You are an expert DevOps SRE assistant with access to
Kubernetes cluster tools.
For complex requests, break them down step by step:
1. Gather information first (check pod status, logs, events)
2. Analyze what you found
3. Take action only when you understand the root cause
4. Verify the action worked
Be cautious with write operations (scaling). Always check current state before modifying it.
Report findings clearly with specific details (pod names, error messages, metrics).""")
def should_continue(state: AgentState) -> str:
"""Decide whether to continue using tools or end."""
last_message = state["messages"][-1]
if hasattr(last_message, "tool_calls") and last_message.tool_calls:
return "tools"
return END
def call_model(state: AgentState) -> AgentState:
"""Call the LLM with current message history."""
messages = [SYSTEM_MESSAGE] + list(state["messages"])
response = model.invoke(messages)
return {"messages": [response]}
# Build the graph
def build_agent():
graph = StateGraph(AgentState)
# Nodes
graph.add_node("agent", call_model)
graph.add_node("tools", ToolNode(TOOLS))
# Edges
graph.set_entry_point("agent")
graph.add_conditional_edges("agent", should_continue)
graph.add_edge("tools", "agent") # After tools, always go back to agent
return graph.compile()
agent = build_agent()

Step 4: FastAPI Interface
# api/main.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from langchain_core.messages import HumanMessage
import uuid
from typing import Optional
from agent.graph import agent
app = FastAPI(title="DevOps AI Agent")
# In-memory session storage (use Redis for production)
sessions: dict = {}
class ChatRequest(BaseModel):
message: str
session_id: Optional[str] = None
class ChatResponse(BaseModel):
session_id: str
response: str
tool_calls_made: int
@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
session_id = request.session_id or str(uuid.uuid4())
# Get or create session history
history = sessions.get(session_id, [])
# Add user message
history.append(HumanMessage(content=request.message))
try:
# Run agent
result = await agent.ainvoke({"messages": history})
# Save updated history
sessions[session_id] = result["messages"]
# Get final response text
final_message = result["messages"][-1]
response_text = final_message.content
if isinstance(response_text, list):
# Handle content blocks (Anthropic format)
response_text = " ".join(
block.get("text", "") if isinstance(block, dict) else str(block)
for block in response_text
)
        # Count individual tool calls across all assistant turns
        tool_calls = sum(
            len(msg.tool_calls) for msg in result["messages"]
            if hasattr(msg, "tool_calls") and msg.tool_calls
        )
return ChatResponse(
session_id=session_id,
response=str(response_text),
tool_calls_made=tool_calls,
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.delete("/session/{session_id}")
async def clear_session(session_id: str):
sessions.pop(session_id, None)
return {"cleared": True}
@app.get("/health")
async def health():
    return {"status": "ok"}

Example Interaction
# Start the server
uvicorn api.main:app --host 0.0.0.0 --port 8000
# Multi-step task
curl -X POST http://localhost:8000/chat \
-H "Content-Type: application/json" \
-d '{
"message": "Check the payments namespace. If any pods have high restart counts, investigate why and tell me if it is safe to restart them.",
"session_id": "session-001"
  }'

The agent:

1. Calls get_pod_status(namespace="payments")
2. Sees payments-7d9f-abc: CrashLoopBackOff (restarts: 15)
3. Calls get_pod_logs(pod_name="payments-7d9f-abc", namespace="payments")
4. Finds ERROR: Cannot connect to database: connection refused
5. Calls run_kubectl("kubectl describe pod payments-7d9f-abc -n payments") to check events
6. Returns: "The payments pod is crashing due to database connection failure. The DB_HOST environment variable points to postgres-svc but no such service exists in the namespace. This is likely a misconfigured environment variable, not a transient error. Restarting will not fix it — you need to correct the DB_HOST value in the deployment."
Deploy to Kubernetes
# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: devops-agent
namespace: platform
spec:
replicas: 1
selector:
matchLabels:
app: devops-agent
template:
metadata:
labels:
app: devops-agent
spec:
serviceAccountName: devops-agent-sa
containers:
- name: agent
image: your-registry/devops-agent:latest
ports:
- containerPort: 8000
env:
- name: ANTHROPIC_API_KEY
valueFrom:
secretKeyRef:
name: ai-secrets
key: anthropic-api-key
resources:
requests:
memory: "512Mi"
cpu: "250m"
limits:
memory: "1Gi"
            cpu: "500m"

# RBAC — read + limited write access
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: devops-agent
rules:
- apiGroups: [""]
resources: ["pods", "pods/log", "events", "nodes", "services"]
verbs: ["get", "list"]
- apiGroups: ["apps"]
resources: ["deployments"]
  verbs: ["get", "list", "patch"]  # patch for scaling

LangGraph's graph-based state machine makes it natural to build agents that think in steps, handle failures gracefully, and maintain session context. For production deployments, add Redis for session persistence and rate limiting per user.
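One gap worth closing before deploying: the Deployment references serviceAccountName: devops-agent-sa, but no ServiceAccount or binding is defined above. A sketch of the missing pieces, with names chosen to match the Deployment and ClusterRole shown earlier:

```yaml
# ServiceAccount referenced by the Deployment, plus a binding to the ClusterRole
apiVersion: v1
kind: ServiceAccount
metadata:
  name: devops-agent-sa
  namespace: platform
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: devops-agent
subjects:
- kind: ServiceAccount
  name: devops-agent-sa
  namespace: platform
roleRef:
  kind: ClusterRole
  name: devops-agent
  apiGroup: rbac.authorization.k8s.io
```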
Full LangGraph documentation at langchain-ai.github.io/langgraph — the concepts section is particularly good for understanding when graphs > chains.