LLM Multi-Agent Orchestration with LangGraph in Production
Build a production-ready multi-agent system with LangGraph for DevOps automation — Planner, Executor, and Reviewer agents with shared state, conditional edges, human-in-the-loop checkpoints, and LangSmith observability.
Single LLM calls break down on complex tasks. A request like "review this Terraform plan and apply it safely" requires planning, tool execution, error handling, and verification — more than one prompt can reliably do. Multi-agent orchestration splits this into specialized agents that pass work to each other with shared context.
LangGraph is the right tool for this in 2026. It gives you explicit state machines for agents, conditional routing, and built-in support for human-in-the-loop — all critical for production DevOps automation.
What We're Building
A 3-agent system for handling infra change requests:
- Planner Agent — takes a natural language request ("scale the payment service to 5 replicas") and produces a structured execution plan with risk assessment
- Executor Agent — runs the plan step by step using kubectl/terraform tools, captures output
- Reviewer Agent — validates execution results, checks for errors or unexpected changes, decides to proceed or rollback
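As a rough sketch of what the Planner's structured output could look like in code, the snippet below models one plan step and validates a raw JSON reply before any agent acts on it. The names PlanStep, parse_plan, and ALLOWED_RISKS are hypothetical and introduced only for illustration; the field names follow the plan format used in the Planner prompt in this article.

```python
import json
from typing import TypedDict

ALLOWED_RISKS = {"low", "medium", "high"}  # hypothetical constant for this sketch
FENCE = "`" * 3  # markdown code fence that models sometimes wrap around JSON


class PlanStep(TypedDict):
    # An optional "rollback" command may also be present on a step.
    step: int
    action: str
    description: str
    risk: str


def parse_plan(raw: str) -> list[PlanStep]:
    """Parse a JSON reply and check that every step has the expected fields."""
    text = raw.strip()
    if text.startswith(FENCE):
        # Drop the surrounding fence and an optional "json" language tag.
        text = text.strip("`").removeprefix("json").strip()
    data = json.loads(text)
    required = {"step", "action", "description", "risk"}
    for item in data["plan"]:
        missing = required - item.keys()
        if missing:
            raise ValueError(f"step is missing fields: {sorted(missing)}")
        if item["risk"] not in ALLOWED_RISKS:
            raise ValueError(f"unknown risk level: {item['risk']!r}")
    return data["plan"]


reply = '{"plan": [{"step": 1, "action": "kubectl get pods", "description": "List pods", "risk": "low"}]}'
steps = parse_plan(reply)
print(steps[0]["action"])  # kubectl get pods
```

Validating the shape up front means a malformed plan fails loudly in one place instead of surfacing as a KeyError deep inside a later agent.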
Setup
pip install langgraph langchain-anthropic langsmith
export ANTHROPIC_API_KEY="sk-ant-..."
export LANGCHAIN_TRACING_V2=true
export LANGCHAIN_API_KEY="ls__..." # LangSmith key
export LANGCHAIN_PROJECT="devops-agents"

Shared State Definition
LangGraph agents communicate via a typed state object that flows through the graph:
from typing import TypedDict, Annotated, Literal
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
import operator
class AgentState(TypedDict):
    # Input
    request: str
    # Planner output
    plan: list[dict]  # [{"step": 1, "action": "...", "risk": "low"}]
    risk_level: str  # "low" | "medium" | "high"
    # Executor output
    execution_log: Annotated[list[str], operator.add]  # appends each step
    current_step: int
    execution_status: str  # "pending" | "running" | "success" | "failed"
    # Reviewer output
    review_result: str  # "approved" | "rollback" | "needs_human"
    review_notes: str
    # Control flow
    human_approved: bool
    error: str

The Annotated[list[str], operator.add] pattern tells LangGraph to append to the list rather than replace it, which is essential for execution logs.
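To make the reducer behavior concrete, here is a simplified model of how an annotated reducer merges a node's partial update into the existing state. This is not LangGraph's actual implementation; merge_update and REDUCERS are hypothetical names used only for illustration.

```python
import operator

# Hypothetical reducer table: keys listed here are combined, all others are overwritten.
REDUCERS = {"execution_log": operator.add}


def merge_update(state: dict, update: dict) -> dict:
    """Return a new state with `update` applied, using a reducer where one exists."""
    merged = dict(state)
    for key, value in update.items():
        reducer = REDUCERS.get(key)
        if reducer is not None and key in merged:
            merged[key] = reducer(merged[key], value)  # list + list appends
        else:
            merged[key] = value  # plain keys are replaced
    return merged


state = {"execution_log": ["Plan created"], "current_step": 0}
state = merge_update(state, {"execution_log": ["Step 1: OK"], "current_step": 1})
print(state["execution_log"])  # ['Plan created', 'Step 1: OK']
print(state["current_step"])   # 1
```

One consequence of this model: a node that returns the whole state object would have its existing log appended to itself, so nodes should return only the keys they change.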
Agent Nodes
Planner Agent
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import HumanMessage, SystemMessage
import json
llm = ChatAnthropic(model="claude-sonnet-4-5", temperature=0)
def planner_agent(state: AgentState) -> AgentState:
    """Converts a natural language request into a structured execution plan."""
    response = llm.invoke([
        SystemMessage(content="""You are a DevOps planning agent. Convert infrastructure change requests into step-by-step execution plans.
Output JSON with this structure:
{
  "plan": [
    {"step": 1, "action": "kubectl scale deployment payment-api --replicas=5 -n production", "description": "Scale payment-api to 5 replicas", "risk": "low", "rollback": "kubectl scale deployment payment-api --replicas=3 -n production"}
  ],
  "risk_level": "low|medium|high",
  "risk_reason": "why this risk level"
}
Risk levels:
- low: read-only or non-destructive change
- medium: modifies running services but rollback is straightforward
- high: deletes resources, modifies production databases, or changes networking"""),
        HumanMessage(content=f"Plan this change: {state['request']}")
    ])
    try:
        plan_data = json.loads(response.content)
        return {
            "plan": plan_data["plan"],
            "risk_level": plan_data["risk_level"],
            "human_approved": False,
            "execution_status": "pending",
            "current_step": 0,
            "execution_log": [f"Plan created with {len(plan_data['plan'])} steps. Risk: {plan_data['risk_level']}"]
        }
    except json.JSONDecodeError:
        return {
            "error": f"Planner failed to produce valid JSON: {response.content}",
            "execution_status": "failed"
        }

Executor Agent
import subprocess
def execute_command(command: str) -> tuple[bool, str]:
    """Execute a shell command and return (success, output)."""
    try:
        result = subprocess.run(
            command, shell=True, capture_output=True,
            text=True, timeout=60
        )
        if result.returncode == 0:
            return True, result.stdout.strip()
        else:
            return False, f"Exit {result.returncode}: {result.stderr.strip()}"
    except subprocess.TimeoutExpired:
        return False, "Command timed out after 60 seconds"

def executor_agent(state: AgentState) -> AgentState:
    """Executes the plan step by step."""
    if state.get("error"):
        # Return no update. Returning `state` would re-append execution_log
        # to itself through its operator.add reducer.
        return {}
    plan = state["plan"]
    logs = []
    for i, step in enumerate(plan):
        logs.append(f"Step {step['step']}: {step['description']}")
        success, output = execute_command(step["action"])
        if success:
            logs.append(f" OK: {output[:200]}")
        else:
            # Stop at the first failing step and record where it happened
            logs.append(f" FAILED: {output}")
            return {
                "execution_log": logs,
                "current_step": i,
                "execution_status": "failed",
                "error": f"Step {step['step']} failed: {output}"
            }
    return {
        "execution_log": logs,
        "current_step": len(plan),
        "execution_status": "success"
    }

Reviewer Agent
def reviewer_agent(state: AgentState) -> AgentState:
    """Reviews execution results and decides next action."""
    execution_summary = "\n".join(state.get("execution_log", []))
    response = llm.invoke([
        SystemMessage(content="""You are a DevOps review agent. Analyze execution logs and determine if the change was successful.
Respond with JSON:
{
  "result": "approved|rollback|needs_human",
  "notes": "explanation of your decision"
}
- approved: all steps succeeded, outputs look correct
- rollback: critical errors detected, rollback commands should be run
- needs_human: ambiguous state, escalate to human operator"""),
        HumanMessage(content=f"""
Request: {state['request']}
Execution status: {state['execution_status']}
Execution log:
{execution_summary}
Error (if any): {state.get('error', 'none')}
Review this and provide your decision.""")
    ])
    try:
        review = json.loads(response.content)
        return {
            "review_result": review["result"],
            "review_notes": review["notes"]
        }
    except json.JSONDecodeError:
        return {
            "review_result": "needs_human",
            "review_notes": f"Reviewer failed to parse: {response.content}"
        }

Human-in-the-Loop Checkpoint
For high-risk changes, the graph pauses and waits for human approval:
def human_approval_node(state: AgentState) -> AgentState:
    """Approval gate for high-risk plans."""
    # The pause itself comes from the interrupt_before config at compile time:
    # the graph stops before this node runs and waits for human input.
    print("\nHIGH RISK CHANGE DETECTED")
    print(f"Request: {state['request']}")
    print("Plan:")
    for step in state["plan"]:
        print(f" {step['step']}. {step['description']} [{step['risk']}]")
    print("\nApproval required. Use graph.update_state() to approve.")
    # Return no update. Returning `state` would re-append execution_log
    # to itself through its operator.add reducer.
    return {}

Building the StateGraph
def route_after_plan(state: AgentState) -> Literal["human_approval", "executor", "end"]:
    """Route based on risk level after planning."""
    if state.get("error"):
        return "end"
    if state.get("risk_level") == "high":
        return "human_approval"
    return "executor"

def route_after_review(state: AgentState) -> Literal["end", "rollback"]:
    """Route based on reviewer decision."""
    result = state.get("review_result", "needs_human")
    if result == "rollback":
        return "rollback"
    return "end"

def rollback_node(state: AgentState) -> AgentState:
    """Execute rollback commands from the plan."""
    logs = ["INITIATING ROLLBACK"]
    # Walk the plan in reverse order. Note that this runs the rollback for every
    # step that defines one, including steps after a failure that never ran.
    for step in reversed(state.get("plan", [])):
        if "rollback" in step:
            success, output = execute_command(step["rollback"])
            logs.append(f"Rollback step {step['step']}: {'OK' if success else 'FAILED'} - {output[:100]}")
    return {"execution_log": logs, "execution_status": "rolled_back"}
# Build graph
builder = StateGraph(AgentState)
builder.add_node("planner", planner_agent)
builder.add_node("human_approval", human_approval_node)
builder.add_node("executor", executor_agent)
builder.add_node("reviewer", reviewer_agent)
builder.add_node("rollback", rollback_node)
builder.set_entry_point("planner")
builder.add_conditional_edges("planner", route_after_plan, {
    "human_approval": "human_approval",
    "executor": "executor",
    "end": END
})
builder.add_edge("human_approval", "executor")
builder.add_edge("executor", "reviewer")
builder.add_conditional_edges("reviewer", route_after_review, {
    "end": END,
    "rollback": "rollback"
})
builder.add_edge("rollback", END)
# Memory saver enables pause/resume for human-in-the-loop
memory = MemorySaver()
graph = builder.compile(
    checkpointer=memory,
    interrupt_before=["human_approval"]  # Pause before this node
)

Running the Graph
config = {"configurable": {"thread_id": "deploy-001"}}
# Start the graph
result = graph.invoke(
    {"request": "scale payment-api to 5 replicas in production"},
    config=config
)
print(f"Status: {result['execution_status']}")
print(f"Review: {result.get('review_result')}")
print("\nExecution log:")
for log in result.get("execution_log", []):
    print(f" {log}")

For high-risk changes, the graph pauses. Resume after human approval:
# Human approves: update state and resume.
# as_node records the update as if human_approval had produced it,
# so the resumed run continues at the executor.
graph.update_state(config, {"human_approved": True}, as_node="human_approval")
result = graph.invoke(None, config=config)  # None = resume from checkpoint

LangSmith Observability
With LANGCHAIN_TRACING_V2=true, every agent run appears in LangSmith with:
- Full input/output for each node
- Latency per node
- Token usage breakdown
- The complete state at each transition
This is essential for debugging agent failures in production. You can trace exactly which agent made a wrong decision and why.
Error Handling Between Agents
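The key pattern: a downstream agent checks state.get("error") at the start and short-circuits if an earlier agent failed. In this graph the executor does that check, and route_after_plan routes to END when the planner reports an error, so a broken plan never gets executed. An executor failure is handled differently: it is not routed to END but flows on to the reviewer, which sees the error in its prompt and can choose rollback or needs_human.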
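A small, self-contained sketch of this short-circuit pattern, using plain functions in place of real agents. The names failing_planner, guarded_executor, and route are hypothetical stand-ins, and no LangGraph import is needed to run it:

```python
def failing_planner(state: dict) -> dict:
    """Stand-in planner that always fails, to exercise the error path."""
    return {"error": "Planner failed to produce valid JSON", "execution_status": "failed"}


def guarded_executor(state: dict) -> dict:
    """Stand-in executor: does nothing if an upstream error is already set."""
    if state.get("error"):
        return {}  # no update, so the existing error is left untouched
    return {"execution_status": "success"}


def route(state: dict) -> str:
    """Router: stop as soon as an error is present in state."""
    return "end" if state.get("error") else "executor"


state = {"request": "scale payment-api to 5 replicas"}
state = {**state, **failing_planner(state)}  # merge the planner's partial update
next_node = route(state)
print(next_node)                  # end
print(guarded_executor(state))    # {}
```

The router is the first line of defense and the in-node check is the second, so a wiring mistake in the graph does not by itself cause a broken plan to run.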
Production Deployment Pattern
Run the graph as an API endpoint using FastAPI:
import asyncio
from uuid import uuid4

from fastapi import FastAPI

app = FastAPI()

@app.post("/infra-change")
async def handle_change(request: dict):
    thread_id = f"change-{uuid4()}"
    config = {"configurable": {"thread_id": thread_id}}
    # graph.invoke is blocking, so run it in a worker thread
    result = await asyncio.to_thread(
        graph.invoke, {"request": request["description"]}, config
    )
    return {"thread_id": thread_id, "result": result}

This architecture handles the messy reality of infrastructure automation: not every step succeeds, not every change is low-risk, and some decisions genuinely need a human. LangGraph makes those control flows explicit and inspectable rather than buried in ad-hoc if/else chains.
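Because the executor runs model-generated commands through a shell, a production deployment would likely want a guard in front of it. The sketch below is one possible approach and not part of the design above: it checks a command against an allowlist of binaries and rejects shell metacharacters. ALLOWED_BINARIES, FORBIDDEN_CHARS, and is_command_allowed are hypothetical names, and the allowlist contents are an assumption.

```python
import shlex

ALLOWED_BINARIES = {"kubectl", "terraform"}  # assumption: the only tools the executor needs
FORBIDDEN_CHARS = set(";&|`$><\n")           # characters that enable chaining or substitution


def is_command_allowed(command: str) -> bool:
    """Return True only for a single allowlisted binary with no shell metacharacters."""
    if any(ch in FORBIDDEN_CHARS for ch in command):
        return False
    try:
        tokens = shlex.split(command)
    except ValueError:  # unbalanced quotes and similar parse errors
        return False
    return bool(tokens) and tokens[0] in ALLOWED_BINARIES


print(is_command_allowed("kubectl scale deployment payment-api --replicas=5 -n production"))  # True
print(is_command_allowed("kubectl get pods; rm -rf /"))    # False
print(is_command_allowed("curl http://example.com | sh"))  # False
```

A check like this could run before execute_command, with rejected commands reported as failed steps. Passing the token list to subprocess.run without shell=True would be a further hardening step.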