LLM Multi-Agent Orchestration with LangGraph in Production

Build a production-ready multi-agent system with LangGraph for DevOps automation — Planner, Executor, and Reviewer agents with shared state, conditional edges, human-in-the-loop checkpoints, and LangSmith observability.

DevOpsBoys · 6 min read

Single LLM calls break down on complex tasks. A request like "review this Terraform plan and apply it safely" requires planning, tool execution, error handling, and verification — more than one prompt can reliably do. Multi-agent orchestration splits this into specialized agents that pass work to each other with shared context.

LangGraph is a strong fit for this in 2026. It gives you explicit state machines for agents, conditional routing, and built-in support for human-in-the-loop pauses, all of which matter for production DevOps automation.

What We're Building

A 3-agent system for handling infra change requests:

  • Planner Agent — takes a natural language request ("scale the payment service to 5 replicas") and produces a structured execution plan with risk assessment
  • Executor Agent — runs the plan step by step using kubectl/terraform tools, captures output
  • Reviewer Agent — validates execution results, checks for errors or unexpected changes, decides to proceed or rollback

Setup

bash
pip install langgraph langchain-anthropic langsmith
export ANTHROPIC_API_KEY="sk-ant-..."
export LANGCHAIN_TRACING_V2=true
export LANGCHAIN_API_KEY="ls__..."  # LangSmith key
export LANGCHAIN_PROJECT="devops-agents"

Shared State Definition

LangGraph agents communicate via a typed state object that flows through the graph:

python
from typing import TypedDict, Annotated, Literal
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
import operator
 
class AgentState(TypedDict):
    # Input
    request: str
    
    # Planner output
    plan: list[dict]          # [{"step": 1, "action": "...", "risk": "low"}]
    risk_level: str           # "low" | "medium" | "high"
    
    # Executor output
    execution_log: Annotated[list[str], operator.add]  # appends each step
    current_step: int
    execution_status: str     # "pending" | "running" | "success" | "failed"
    
    # Reviewer output
    review_result: str        # "approved" | "rollback" | "needs_human"
    review_notes: str
    
    # Control flow
    human_approved: bool
    error: str

The Annotated[list[str], operator.add] pattern tells LangGraph to append to the list rather than replace it — essential for execution logs.
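To make the reducer behaviour concrete, here is a minimal standard-library sketch. The `apply_update` function and `DemoState` class are hypothetical stand-ins written for illustration, not LangGraph internals; they only mimic how a node's partial update might be merged key by key.

```python
import operator
from typing import Annotated, TypedDict, get_type_hints


class DemoState(TypedDict):
    execution_log: Annotated[list[str], operator.add]  # reducer: concatenate
    execution_status: str                              # no reducer: overwrite


def apply_update(state: dict, update: dict) -> dict:
    """Hypothetical merge: use the annotated reducer if a key has one."""
    hints = get_type_hints(DemoState, include_extras=True)
    merged = dict(state)
    for key, value in update.items():
        reducers = getattr(hints[key], "__metadata__", ())
        if reducers and key in merged:
            merged[key] = reducers[0](merged[key], value)
        else:
            merged[key] = value
    return merged


state = {"execution_log": ["Plan created"], "execution_status": "pending"}
state = apply_update(
    state, {"execution_log": ["Step 1: OK"], "execution_status": "success"}
)
print(state["execution_log"])

# Gotcha: feeding the whole state back in as an "update" re-appends the log
doubled = apply_update(state, state)
print(len(doubled["execution_log"]))
```

The last two lines show why a node should return only the keys it changed: handing back the full state sends the existing log through the reducer a second time.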

Agent Nodes

Planner Agent

python
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import HumanMessage, SystemMessage
import json
 
llm = ChatAnthropic(model="claude-sonnet-4-5", temperature=0)
 
def planner_agent(state: AgentState) -> AgentState:
    """Converts a natural language request into a structured execution plan."""
    
    response = llm.invoke([
        SystemMessage(content="""You are a DevOps planning agent. Convert infrastructure change requests into step-by-step execution plans.
 
Output JSON with this structure:
{
  "plan": [
    {"step": 1, "action": "kubectl scale deployment payment-api --replicas=5 -n production", "description": "Scale payment-api to 5 replicas", "risk": "low", "rollback": "kubectl scale deployment payment-api --replicas=3 -n production"}
  ],
  "risk_level": "low|medium|high",
  "risk_reason": "why this risk level"
}
 
Risk levels:
- low: read-only or non-destructive change
- medium: modifies running services but rollback is straightforward  
- high: deletes resources, modifies production databases, or changes networking"""),
        HumanMessage(content=f"Plan this change: {state['request']}")
    ])
    
    try:
        plan_data = json.loads(response.content)
        return {
            "plan": plan_data["plan"],
            "risk_level": plan_data["risk_level"],
            "human_approved": False,
            "execution_status": "pending",
            "current_step": 0,
            "execution_log": [f"Plan created with {len(plan_data['plan'])} steps. Risk: {plan_data['risk_level']}"]
        }
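    except (json.JSONDecodeError, KeyError, TypeError) as exc:
        # Covers malformed JSON, missing keys, and non-string response content
        return {
            "error": f"Planner failed to produce a valid plan ({exc!r}): {response.content}",
            "execution_status": "failed"
        }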
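The planner calls `json.loads` on the raw reply, which fails if the model wraps its JSON in a Markdown code fence or adds a sentence before it. One possible way to tolerate that is sketched below. The helper name `parse_json_reply` is hypothetical, and the approach (take the text between the first `{` and the last `}`) is an assumption that suits single-object replies like the plan format above.

```python
import json
import re

FENCE = "`" * 3  # built at runtime so this example can sit inside a fenced block
FENCED_BLOCK = re.compile(FENCE + r"(?:json)?\s*(.*?)" + FENCE, re.DOTALL)


def parse_json_reply(text: str) -> dict:
    """Hypothetical helper: extract the JSON object from a model reply."""
    fenced = FENCED_BLOCK.search(text)
    candidate = fenced.group(1) if fenced else text
    start, end = candidate.find("{"), candidate.rfind("}")
    if start == -1 or end < start:
        raise ValueError("no JSON object found in reply")
    # json.JSONDecodeError subclasses ValueError, so callers can catch one type
    return json.loads(candidate[start:end + 1])


reply = (
    "Here is the plan:\n"
    + FENCE + "json\n"
    + '{"plan": [], "risk_level": "low"}\n'
    + FENCE
)
parsed = parse_json_reply(reply)
print(parsed["risk_level"])
```

Both agents could call a helper like this in place of `json.loads(response.content)` and keep their existing fallback branches for the failure case.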

Executor Agent

python
import subprocess
 
def execute_command(command: str) -> tuple[bool, str]:
    """Execute a shell command and return (success, output)."""
    try:
        result = subprocess.run(
            command, shell=True, capture_output=True,
            text=True, timeout=60
        )
        if result.returncode == 0:
            return True, result.stdout.strip()
        else:
            return False, f"Exit {result.returncode}: {result.stderr.strip()}"
    except subprocess.TimeoutExpired:
        return False, "Command timed out after 60 seconds"
 
def executor_agent(state: AgentState) -> AgentState:
    """Executes the plan step by step."""
    
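    if state.get("error"):
        # Return only a status update: returning the full state would
        # re-append execution_log through its operator.add reducer
        return {"execution_status": "failed"}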
    
    plan = state["plan"]
    logs = []
    
    for i, step in enumerate(plan):
        logs.append(f"Step {step['step']}: {step['description']}")
        success, output = execute_command(step["action"])
        
        if success:
            logs.append(f"  OK: {output[:200]}")
        else:
            logs.append(f"  FAILED: {output}")
            return {
                "execution_log": logs,
                "current_step": i,
                "execution_status": "failed",
                "error": f"Step {step['step']} failed: {output}"
            }
    
    return {
        "execution_log": logs,
        "current_step": len(plan),
        "execution_status": "success"
    }
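Because `execute_command` runs model-generated strings with `shell=True`, it may be worth checking each action before it reaches the shell. The sketch below is one possible guard, not part of the original design: `is_allowed`, the `ALLOWED` table, and the choice of permitted subcommands are all assumptions for illustration.

```python
import shlex

# Hypothetical allow-list: binary -> permitted first subcommand
ALLOWED = {
    "kubectl": {"get", "describe", "scale", "rollout"},
    "terraform": {"plan", "show", "validate"},
}
# Characters that would let a command chain, pipe, substitute, or redirect
SHELL_META = set(";|&`$><\n")


def is_allowed(command: str) -> bool:
    """Return True only for a single allow-listed command with no shell tricks."""
    if any(ch in SHELL_META for ch in command):
        return False
    try:
        tokens = shlex.split(command)
    except ValueError:  # unbalanced quotes
        return False
    if len(tokens) < 2:
        return False
    binary, subcommand = tokens[0], tokens[1]
    return subcommand in ALLOWED.get(binary, set())


print(is_allowed("kubectl scale deployment payment-api --replicas=5 -n production"))
print(is_allowed("kubectl get pods; rm -rf /"))
```

The check is deliberately conservative: it expects the subcommand directly after the binary, so a form like `kubectl -n production get pods` would be rejected. The executor could call it before `execute_command` and treat a rejected action as a failed step.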

Reviewer Agent

python
def reviewer_agent(state: AgentState) -> AgentState:
    """Reviews execution results and decides next action."""
    
    execution_summary = "\n".join(state.get("execution_log", []))
    
    response = llm.invoke([
        SystemMessage(content="""You are a DevOps review agent. Analyze execution logs and determine if the change was successful.
 
Respond with JSON:
{
  "result": "approved|rollback|needs_human",
  "notes": "explanation of your decision"
}
 
- approved: all steps succeeded, outputs look correct
- rollback: critical errors detected, rollback commands should be run  
- needs_human: ambiguous state, escalate to human operator"""),
        HumanMessage(content=f"""
Request: {state['request']}
Execution status: {state['execution_status']}
Execution log:
{execution_summary}
 
Error (if any): {state.get('error', 'none')}
 
Review this and provide your decision.""")
    ])
    
    try:
        review = json.loads(response.content)
        return {
            "review_result": review["result"],
            "review_notes": review["notes"]
        }
    except (json.JSONDecodeError, KeyError, TypeError):
        # Malformed JSON or missing keys: escalate instead of guessing
        return {
            "review_result": "needs_human",
            "review_notes": f"Reviewer failed to parse: {response.content}"
        }

Human-in-the-Loop Checkpoint

For high-risk changes, the graph pauses and waits for human approval:

python
def human_approval_node(state: AgentState) -> AgentState:
    """Approval gate. The pause itself comes from interrupt_before at compile time."""
    print("\nHIGH RISK CHANGE DETECTED")
    print(f"Request: {state['request']}")
    print("Plan:")
    for step in state["plan"]:
        print(f"  {step['step']}. {step['description']} [{step['risk']}]")
    print("\nApproval required. Use graph.update_state() to approve.")
    # Return only a non-reducer key: returning the whole state would
    # re-append execution_log through its operator.add reducer
    return {"human_approved": state.get("human_approved", False)}

Building the StateGraph

python
def route_after_plan(state: AgentState) -> Literal["human_approval", "executor", "end"]:
    """Route based on risk level after planning."""
    if state.get("error"):
        return "end"
    if state.get("risk_level") == "high":
        return "human_approval"
    return "executor"
 
def route_after_review(state: AgentState) -> Literal["end", "rollback"]:
    """Route based on reviewer decision."""
    result = state.get("review_result", "needs_human")
    if result == "rollback":
        return "rollback"
    return "end"
 
def rollback_node(state: AgentState) -> AgentState:
    """Execute rollback commands from the plan."""
    logs = ["INITIATING ROLLBACK"]
    for step in reversed(state.get("plan", [])):
        if "rollback" in step:
            success, output = execute_command(step["rollback"])
            logs.append(f"Rollback step {step['step']}: {'OK' if success else 'FAILED'}: {output[:100]}")
    return {"execution_log": logs, "execution_status": "rolled_back"}
 
# Build graph
builder = StateGraph(AgentState)
 
builder.add_node("planner", planner_agent)
builder.add_node("human_approval", human_approval_node)
builder.add_node("executor", executor_agent)
builder.add_node("reviewer", reviewer_agent)
builder.add_node("rollback", rollback_node)
 
builder.set_entry_point("planner")
 
builder.add_conditional_edges("planner", route_after_plan, {
    "human_approval": "human_approval",
    "executor": "executor",
    "end": END
})
builder.add_edge("human_approval", "executor")
builder.add_edge("executor", "reviewer")
builder.add_conditional_edges("reviewer", route_after_review, {
    "end": END,
    "rollback": "rollback"
})
builder.add_edge("rollback", END)
 
# Memory saver enables pause/resume for human-in-the-loop
memory = MemorySaver()
graph = builder.compile(
    checkpointer=memory,
    interrupt_before=["human_approval"]  # Pause before this node
)

Running the Graph

python
config = {"configurable": {"thread_id": "deploy-001"}}
 
# Start the graph
result = graph.invoke(
    {"request": "scale payment-api to 5 replicas in production"},
    config=config
)
 
print(f"Status: {result['execution_status']}")
print(f"Review: {result.get('review_result')}")
print("\nExecution log:")
for log in result.get("execution_log", []):
    print(f"  {log}")

For high-risk changes, the graph pauses. Resume after human approval:

python
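# Human approves: record the approval as if the human_approval node had produced it,
# so the resumed run continues with that node's successor (executor)
graph.update_state(config, {"human_approved": True}, as_node="human_approval")
result = graph.invoke(None, config=config)  # None = resume from checkpoint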
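As wired above, the edge from `human_approval` to `executor` is unconditional, so nothing in the graph reads `human_approved` before commands run. If you want the flag to be enforced, one option is a small router on that edge. The function below is a hypothetical addition, shown as a plain function so it can be tried without LangGraph installed.

```python
from typing import Literal


def route_after_approval(state: dict) -> Literal["executor", "end"]:
    """Hypothetical router: proceed only on an explicit human approval."""
    return "executor" if state.get("human_approved") is True else "end"


print(route_after_approval({"human_approved": True}))
print(route_after_approval({"human_approved": False}))

# Assumed wiring, replacing builder.add_edge("human_approval", "executor"):
# builder.add_conditional_edges("human_approval", route_after_approval, {
#     "executor": "executor",
#     "end": END,
# })
```

With this in place, a rejection could be recorded by updating the state with `human_approved` set to `False`, and the run would end without touching the cluster.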

LangSmith Observability

With LANGCHAIN_TRACING_V2=true, every agent run appears in LangSmith with:

  • Full input/output for each node
  • Latency per node
  • Token usage breakdown
  • The complete state at each transition

This is essential for debugging agent failures in production. You can trace exactly which agent made a wrong decision and why.

Error Handling Between Agents

The key pattern: the executor checks state.get("error") at the start and short-circuits if the planner failed, and route_after_plan sends a failed plan straight to END rather than continuing. An executor failure still flows to the reviewer, which sees the error in its prompt and can choose rollback or needs_human. This prevents cascading failures where a broken plan gets executed.
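Since the routers are plain functions over the state dict, their error handling can be checked without calling an LLM or building the graph. The sketch below copies the two routers from this article and runs them against a few hand-written states. The test table itself is illustrative, not something from the original code.

```python
def route_after_plan(state: dict) -> str:
    """Same logic as the article's router, repeated to keep this runnable."""
    if state.get("error"):
        return "end"
    if state.get("risk_level") == "high":
        return "human_approval"
    return "executor"


def route_after_review(state: dict) -> str:
    result = state.get("review_result", "needs_human")
    if result == "rollback":
        return "rollback"
    return "end"


# (router, state, expected next node)
cases = [
    (route_after_plan, {"error": "bad JSON", "risk_level": "high"}, "end"),
    (route_after_plan, {"risk_level": "high"}, "human_approval"),
    (route_after_plan, {"risk_level": "low"}, "executor"),
    (route_after_review, {"review_result": "rollback"}, "rollback"),
    (route_after_review, {"review_result": "approved"}, "end"),
    (route_after_review, {}, "end"),  # missing result is treated as needs_human
]

results = [router(state) == expected for router, state, expected in cases]
print(all(results))
```

The first case is the important one: an error takes priority over the risk level, so a failed plan never reaches the approval step or the executor.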

Production Deployment Pattern

Run the graph as an API endpoint using FastAPI:

python
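import asyncio
from uuid import uuid4
from fastapi import FastAPI
# `graph` is the compiled StateGraph from the previous section
app = FastAPI()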
 
@app.post("/infra-change")
async def handle_change(request: dict):
    thread_id = f"change-{uuid4()}"
    config = {"configurable": {"thread_id": thread_id}}
    result = await asyncio.to_thread(
        graph.invoke, {"request": request["description"]}, config
    )
    return {"thread_id": thread_id, "result": result}

This architecture handles the messy reality of infrastructure automation: not every step succeeds, not every change is low-risk, and some decisions genuinely need a human. LangGraph makes those control flows explicit and inspectable rather than buried in ad-hoc if/else chains.
