Deploying LLM Agents in Production — Tools, Memory, and Orchestration
Production LLM agents need more than a prompt loop. Learn how to architect tool use, persistent memory, multi-agent orchestration, and Kubernetes deployment for real agentic systems.
An LLM agent that works in a demo often fails in production, because a demo never exercises retries, state management, tool timeouts, or concurrency. Building production-grade agents means treating them like distributed systems.
What Makes an Agent "Production-Ready"
| Feature | Demo | Production |
|---|---|---|
| Tool calls | Happy path only | Retry, timeout, error handling |
| Memory | In-memory conversation | Persistent, retrievable, scoped |
| Concurrency | Single thread | Multiple users, rate limiting |
| Observability | print() | Traces, spans, token usage |
| Reliability | Crash = restart | Circuit breakers, fallbacks |
| State | Lost on restart | Persisted to database |
Agent Architecture
User Request
↓
Agent Orchestrator
↓
┌───────────────────────────────┐
│ Planning Layer (Claude) │
│ - Decide next action │
│ - Select tool │
└───────────────────────────────┘
↓
┌───────────────────────────────┐
│ Tool Executor │
│ - kubectl, terraform, API │
│ - Timeout + retry wrapper │
└───────────────────────────────┘
↓
┌───────────────────────────────┐
│ Memory Manager │
│ - Short-term: conversation │
│ - Long-term: vector store │
└───────────────────────────────┘
↓
Response to User
Tool Use with Anthropic SDK
# tools.py
import subprocess
import json
from typing import Any
# Tool definitions for Claude.
# Each entry pairs a tool name and description with a JSON Schema
# ("input_schema") describing the arguments the model may supply.

# Arguments accepted by the "kubectl_get" tool; only "resource" is required.
_KUBECTL_GET_SCHEMA = {
    "type": "object",
    "properties": {
        "resource": {
            "type": "string",
            "description": "e.g. pods, deployments, services",
        },
        "namespace": {"type": "string", "description": "Kubernetes namespace"},
        "name": {
            "type": "string",
            "description": "Optional specific resource name",
        },
    },
    "required": ["resource"],
}

# Arguments accepted by the "get_pod_logs" tool; pod and namespace are required.
_GET_POD_LOGS_SCHEMA = {
    "type": "object",
    "properties": {
        "pod_name": {"type": "string"},
        "namespace": {"type": "string"},
        "tail_lines": {"type": "integer", "default": 100},
        "container": {"type": "string"},
    },
    "required": ["pod_name", "namespace"],
}

DEVOPS_TOOLS = [
    {
        "name": "kubectl_get",
        "description": "Run kubectl get command to inspect Kubernetes resources",
        "input_schema": _KUBECTL_GET_SCHEMA,
    },
    {
        "name": "get_pod_logs",
        "description": "Fetch logs from a Kubernetes pod",
        "input_schema": _GET_POD_LOGS_SCHEMA,
    },
]
def execute_tool(tool_name: str, tool_input: dict, timeout: int = 30) -> str:
    """Execute a tool requested by the model and return its result as a string.

    Supported tools are "kubectl_get" and "get_pod_logs"; both shell out to
    kubectl with an argument list (no shell is involved).

    Args:
        tool_name: Name of the tool to run.
        tool_input: Arguments for the tool, as supplied by the model.
        timeout: Seconds to wait for kubectl before giving up.

    Returns:
        kubectl's stdout on success. Every failure (unknown tool, rejected
        argument, non-zero exit, timeout, any other exception) is reported as
        a descriptive string instead of being raised, so the caller can hand
        it back to the model as the tool result.
    """
    try:
        if tool_name == "kubectl_get":
            arg_keys = ("resource", "namespace", "name")
        elif tool_name == "get_pod_logs":
            arg_keys = ("pod_name", "namespace", "container")
        else:
            return f"Unknown tool: {tool_name}"

        # tool_input is model-generated. A value that starts with "-" would be
        # parsed by kubectl as an option rather than a name, so reject it
        # before building the command line.
        for key in arg_keys:
            value = tool_input.get(key)
            if isinstance(value, str) and value.startswith("-"):
                return f"Error: invalid value for {key!r}: must not start with '-'"

        if tool_name == "kubectl_get":
            cmd = ["kubectl", "get", tool_input["resource"], "-o", "json"]
            if tool_input.get("namespace"):
                cmd.extend(["-n", tool_input["namespace"]])
            if tool_input.get("name"):
                cmd.append(tool_input["name"])
        else:
            cmd = [
                "kubectl", "logs", tool_input["pod_name"],
                "-n", tool_input["namespace"],
                # int() rejects non-numeric values here instead of passing
                # them through to kubectl.
                "--tail", str(int(tool_input.get("tail_lines", 100))),
            ]
            if tool_input.get("container"):
                cmd.extend(["-c", tool_input["container"]])

        result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
        return result.stdout if result.returncode == 0 else f"Error: {result.stderr}"
    except subprocess.TimeoutExpired:
        return f"Tool {tool_name} timed out after {timeout} seconds"
    except Exception as e:
        # Deliberate catch-all: a missing required key or a missing kubectl
        # binary is reported to the model rather than crashing the agent loop.
        return f"Tool execution error: {str(e)}"


# Agentic Loop with Tool Use
# agent.py
import anthropic
from tools import DEVOPS_TOOLS, execute_tool
# Module-level Anthropic client shared by the functions below.
# NOTE(review): constructed with no arguments, so credentials presumably come
# from the ANTHROPIC_API_KEY environment variable — confirm against the SDK.
client = anthropic.Anthropic()
# System prompt passed as `system=` on every request; it tells the model the
# order in which to investigate and to explain each step.
SYSTEM_PROMPT = """You are a DevOps troubleshooting agent with access to kubectl.
When investigating issues:
1. Start by checking pod status
2. Get logs if pods are unhealthy
3. Check recent events
4. Provide a clear diagnosis and fix
Always explain what you're checking and why."""
def run_agent(user_request: str, max_iterations: int = 10) -> str:
    """Run the tool-use loop for one request and return the agent's answer.

    Each iteration sends the conversation so far to the model. If the model
    asks for tools, every requested tool is executed and the results are
    appended as the next user turn; otherwise the run ends.

    Args:
        user_request: The user's question or task.
        max_iterations: Maximum number of model calls before giving up.

    Returns:
        The first text block of the final response,
        "Agent completed without text response" if that response has no text,
        or "Agent reached max iterations without completing" when the
        iteration budget is exhausted.
    """
    messages = [{"role": "user", "content": user_request}]

    for _ in range(max_iterations):
        response = client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=4096,
            system=SYSTEM_PROMPT,
            tools=DEVOPS_TOOLS,
            messages=messages,
        )
        # Add assistant response to history
        messages.append({"role": "assistant", "content": response.content})

        if response.stop_reason == "tool_use":
            tool_results = []
            for block in response.content:
                if block.type != "tool_use":
                    continue
                print(f"[Agent] Calling tool: {block.name}({block.input})")
                result = execute_tool(block.name, block.input)
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": result,
                })
            messages.append({"role": "user", "content": tool_results})
            continue

        # Any other stop reason ("end_turn", but also e.g. "max_tokens") ends
        # the run. Looping again would resend a history that ends with an
        # unanswered assistant turn and burn the remaining iterations.
        for block in response.content:
            if hasattr(block, "text"):
                return block.text
        return "Agent completed without text response"

    return "Agent reached max iterations without completing"


# Persistent Memory with Redis
# memory.py
import json
import os
from typing import Optional

import anthropic
import redis
# Shared Redis connection. The address is read from REDIS_URL (the variable
# the Deployment manifest injects); the fallback is the in-cluster address
# that used to be hard-coded here.
r = redis.Redis.from_url(
    os.environ.get("REDIS_URL", "redis://redis.default.svc:6379"),
    decode_responses=True,
)
class AgentMemory:
    """Conversation history for one agent session, stored in Redis.

    The whole history is kept as a single JSON string under
    "agent:session:<session_id>" and rewritten on every add() with a
    one-hour TTL.

    NOTE(review): add() is a read-modify-write, so two concurrent writers to
    the same session can overwrite each other's message — confirm whether a
    session can be written from more than one request at a time.
    """

    def __init__(self, session_id: str, max_messages: int = 20):
        """Bind to session_id; at most max_messages recent messages are kept."""
        self.session_id = session_id
        self.key = f"agent:session:{session_id}"
        self.max_messages = max_messages

    @staticmethod
    def _encode(messages: list) -> str:
        """Serialize messages to JSON, converting SDK content blocks to dicts.

        Message content may be a list of model objects (for example the
        `content` of an API response) that json cannot encode on its own;
        anything exposing model_dump() is stored as the dict it returns.
        """
        def to_jsonable(obj):
            dump = getattr(obj, "model_dump", None)
            if callable(dump):
                return dump(exclude_none=True)
            raise TypeError(
                f"Object of type {type(obj).__name__} is not JSON serializable"
            )

        return json.dumps(messages, default=to_jsonable)

    def add(self, role: str, content):
        """Append one message and persist the trimmed history."""
        messages = self.get_all()
        messages.append({"role": role, "content": content})
        # Keep only recent messages.
        # NOTE(review): trimming by count can leave the history starting
        # mid-exchange (e.g. with an assistant turn) — confirm the consumer
        # tolerates that.
        if len(messages) > self.max_messages:
            messages = messages[-self.max_messages:]
        r.setex(self.key, 3600, self._encode(messages))  # 1hr TTL

    def get_all(self) -> list:
        """Return the stored messages, or an empty list if nothing is stored."""
        data = r.get(self.key)
        return json.loads(data) if data else []

    def clear(self):
        """Delete this session's history."""
        r.delete(self.key)
def run_agent_with_memory(user_request: str, session_id: str) -> str:
    """Answer user_request using the history stored for session_id.

    The request is appended to the session's AgentMemory, the full stored
    history is sent to the model, and the model's reply is appended to the
    same history before its text is returned.

    NOTE(review): client, SYSTEM_PROMPT, DEVOPS_TOOLS and extract_text are not
    defined or imported in this snippet. The first three presumably come from
    agent.py / tools.py; extract_text is not shown anywhere — confirm.
    """
    memory = AgentMemory(session_id)
    memory.add("user", user_request)
    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=4096,
        system=SYSTEM_PROMPT,
        tools=DEVOPS_TOOLS,
        messages=memory.get_all(),
    )
    # ... tool loop ...
    # NOTE(review): response.content holds SDK content-block objects rather
    # than plain dicts — confirm AgentMemory.add can JSON-serialize them.
    memory.add("assistant", response.content)
    return extract_text(response)


# Multi-Agent Orchestration
# orchestrator.py — route tasks to specialized agents
# Specialized agents the router can dispatch to, keyed by agent name.
# NOTE(review): most tool names listed here (e.g. "get_events",
# "get_aws_costs") have no definition in the tools shown in this article —
# presumably they are implemented elsewhere; confirm.
AGENT_REGISTRY = {
    "k8s_troubleshooter": {
        "description": "Diagnoses Kubernetes issues",
        "tools": ["kubectl_get", "get_pod_logs", "get_events"],
    },
    "cost_analyzer": {
        "description": "Analyzes cloud spend",
        "tools": ["get_aws_costs", "list_resources"],
    },
    "security_auditor": {
        "description": "Reviews security posture",
        "tools": ["check_rbac", "scan_images", "check_network_policies"],
    },
}

# System prompt for the routing call. The list of available agents is built
# from AGENT_REGISTRY so the two cannot drift apart when an agent is added.
ROUTER_PROMPT = (
    "You are a DevOps task router. Given a user request,\n"
    "determine which specialized agent should handle it.\n"
    'Return JSON: {"agent": "<agent_name>", "reason": "<why>"}\n'
    "Available agents: " + ", ".join(AGENT_REGISTRY)
)
def route_request(user_request: str) -> str:
    """Ask the routing model which specialized agent should handle a request.

    Args:
        user_request: The user's question or task.

    Returns:
        The name of the chosen agent; always a key of AGENT_REGISTRY.

    Raises:
        ValueError: If the reply contains no JSON object, the JSON is invalid
            (json.JSONDecodeError is a ValueError), or the named agent is not
            registered.
        KeyError: If the JSON object has no "agent" field.
    """
    import json
    import re

    response = client.messages.create(
        model="claude-haiku-4-5-20251001",  # fast model for routing
        max_tokens=256,
        system=ROUTER_PROMPT,
        messages=[{"role": "user", "content": user_request}],
    )
    text = response.content[0].text

    # The model may wrap the JSON in prose, so take the outermost {...} span.
    match = re.search(r"\{.*\}", text, re.DOTALL)
    if match is None:
        raise ValueError(f"Router reply contains no JSON object: {text!r}")

    agent = json.loads(match.group())["agent"]
    # Reject names the model invented instead of passing them downstream.
    if not isinstance(agent, str) or agent not in AGENT_REGISTRY:
        raise ValueError(f"Router chose an unknown agent: {agent!r}")
    return agent


# Kubernetes Deployment
# Deployment running three replicas of the agent service.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: devops-agent
  namespace: ai-platform
spec:
  replicas: 3
  selector:
    matchLabels:
      app: devops-agent
  template:
    metadata:
      labels:
        app: devops-agent  # must match spec.selector.matchLabels
    spec:
      serviceAccountName: devops-agent-sa  # RBAC for kubectl access
      containers:
        - name: agent
          # Placeholder image; pin an immutable tag or digest for real rollouts.
          image: your-registry/devops-agent:latest
          env:
            - name: ANTHROPIC_API_KEY
              valueFrom:
                secretKeyRef:
                  name: anthropic-secret
                  key: api-key
            - name: REDIS_URL
              value: "redis://redis.default.svc:6379"
          resources:
            requests:
              cpu: 200m
              memory: 256Mi
            limits:
              cpu: 1000m
              memory: 1Gi
---
# Identity the agent pods run as (referenced by serviceAccountName).
apiVersion: v1
kind: ServiceAccount
metadata:
  name: devops-agent-sa
  namespace: ai-platform
---
# Read-only access to the resources the agent's kubectl tools inspect.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: devops-agent-reader
rules:
  - apiGroups: ["", "apps", "batch"]
    resources: ["pods", "deployments", "services", "events", "nodes"]
    verbs: ["get", "list", "watch"]  # read-only!
  - apiGroups: [""]
    resources: ["pods/log"]
    verbs: ["get"]
---
# A ClusterRole grants nothing until it is bound to a subject; this binding
# gives the devops-agent-sa ServiceAccount the read-only rules above.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: devops-agent-reader
subjects:
  - kind: ServiceAccount
    name: devops-agent-sa
    namespace: ai-platform
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: devops-agent-reader

# Observability
# Add OpenTelemetry tracing to agent calls
from opentelemetry import trace
# Tracer for this module, obtained from the OpenTelemetry API.
tracer = trace.get_tracer(__name__)


def traced_agent(user_request: str, session_id: str) -> str:
    """Run run_agent_with_memory inside an "agent_run" span.

    The span is annotated with the session id, a preview of the request and
    the length of the result.

    Args:
        user_request: The user's question or task.
        session_id: Session whose stored history the agent should use.

    Returns:
        Whatever run_agent_with_memory returns for this request.
    """
    with tracer.start_as_current_span("agent_run") as span:
        span.set_attribute("session.id", session_id)
        # Only the first 100 characters of the request are attached.
        span.set_attribute("request.preview", user_request[:100])
        result = run_agent_with_memory(user_request, session_id)
        span.set_attribute("result.length", len(result))
        return result


# Key metrics to track: tool call latency per tool, iterations per agent run,
# token usage per session, tool error rate. Production agents need the same
# observability treatment as any microservice.
Today I Fixed
Short real fixes from production — posted daily
Stay ahead of the curve
Get the latest DevOps, Kubernetes, AWS, and AI/ML guides delivered straight to your inbox. No spam — just practical engineering content.
Related Articles
AI-Powered Kubernetes Anomaly Detection: Beyond Static Thresholds
Static alerts miss 40% of real incidents. Learn how AI and ML-based anomaly detection — using tools like Prometheus + ML, Dynatrace, and custom LLM runbooks — catches what thresholds can't.
Build an AI Kubernetes Runbook Generator with LLMs (2026)
Manual runbooks go stale. Build a system that watches your Kubernetes cluster, detects incidents, and generates step-by-step runbooks automatically using LLMs. Full implementation with Python, kubectl, and Ollama.
Build an AI-Powered SLO Breach Predictor with Claude and Prometheus
Build an SLO breach predictor that reads error budget burn rate from Prometheus, uses Claude to analyze patterns, and sends Slack alerts before SLOs breach — not after.