Build an AI DevOps Assistant with LangChain + Real Tools (2026)
Build a DevOps AI agent that can actually run kubectl, check AWS costs, read logs, and create GitHub issues — using LangChain tool calling and the Claude API.
Chatbots answer questions. Agents take actions. Here's how to build a DevOps AI agent that can actually run kubectl commands, query AWS CloudWatch, check pod logs, and open GitHub issues — using LangChain's tool calling with the Claude API.
What You'll Build
You: "Check why the payments service is down and create a GitHub issue"
↓
Agent thinks: I need to check pod status, then check logs, then create issue
↓
Tool 1: kubectl get pods -n payments
Tool 2: kubectl logs payments-pod-abc --tail=50
Tool 3: GitHub API → create issue with findings
↓
Agent: "Found it — OOMKilled due to memory leak. Created issue #247."
Prerequisites
pip install langchain langchain-anthropic anthropic kubernetes boto3 PyGithub python-dotenv
Step 1: Define the Tools
Tools are functions the AI can call. Each tool has a name, description (tells the AI when to use it), and implementation.
# tools.py
import subprocess
import boto3
import json
from github import Github
from langchain.tools import tool
from kubernetes import client, config
import os
# Load K8s config: try the in-cluster service-account configuration first and
# fall back to the local kubeconfig. Only ConfigException is caught, so
# unrelated failures (and Ctrl-C) are no longer silently swallowed.
try:
    config.load_incluster_config()  # running inside cluster
except config.ConfigException:
    config.load_kube_config()  # local development

# Shared API clients used by the tools below.
v1 = client.CoreV1Api()
apps_v1 = client.AppsV1Api()
@tool
def kubectl_get_pods(namespace: str = "default") -> str:
    """Get all pods in a namespace and their status. Use this to check if pods are running, pending, or crashing."""
    # The docstring above is kept verbatim: it doubles as the tool description
    # the model reads when deciding whether to call this tool.
    #
    # Returns one "<pod>: <status> (restarts: N)" line per pod, "No pods found"
    # for an empty namespace, or "Error: ..." if the API call fails.
    try:
        pods = v1.list_namespaced_pod(namespace=namespace)
        result = []
        for pod in pods.items:
            container_statuses = pod.status.container_statuses or []
            restarts = sum(cs.restart_count for cs in container_statuses)

            # The pod phase stays "Running"/"Pending" while a container is
            # crash-looping, so prefer a container's waiting reason
            # (e.g. CrashLoopBackOff, ImagePullBackOff) when one is reported.
            status = pod.status.phase
            for cs in container_statuses:
                waiting = cs.state.waiting if cs.state else None
                if waiting and waiting.reason:
                    status = waiting.reason
                    break

            result.append(f"{pod.metadata.name}: {status} (restarts: {restarts})")
        return "\n".join(result) if result else "No pods found"
    except Exception as e:
        # Errors are returned as text so the agent can read and react to them.
        return f"Error: {str(e)}"
@tool
def kubectl_get_logs(pod_name: str, namespace: str = "default", lines: int = 50) -> str:
    """Get the last N lines of logs from a specific pod. Use this to investigate errors or crashes."""
    # The docstring above is kept verbatim: it doubles as the tool description
    # the model reads when deciding whether to call this tool.
    #
    # Returns the log text, "No logs found" when the API returns nothing, or
    # "Error getting logs: ..." if the API call fails.
    try:
        log_text = v1.read_namespaced_pod_log(
            name=pod_name,
            namespace=namespace,
            tail_lines=lines,
        )
    except Exception as e:
        return f"Error getting logs: {str(e)}"

    if log_text:
        return log_text
    return "No logs found"
@tool
def kubectl_describe_pod(pod_name: str, namespace: str = "default") -> str:
    """Get detailed info about a pod including events, resource limits, and error conditions. Use when a pod is failing."""
    # The docstring above is kept verbatim: it doubles as the tool description
    # the model reads when deciding whether to call this tool.
    #
    # Returns a newline-separated report (phase, node, per-container limits,
    # restart counts, last termination, Warning events) or "Error: ..." if an
    # API call fails.
    try:
        pod = v1.read_namespaced_pod(name=pod_name, namespace=namespace)
        events = v1.list_namespaced_event(
            namespace=namespace,
            field_selector=f"involvedObject.name={pod_name}",
        )
        info = [
            f"Status: {pod.status.phase}",
            f"Node: {pod.spec.node_name}",
        ]

        # The tool description promises resource limits, so report them; they
        # are what makes an OOMKilled exit interpretable.
        for spec in (pod.spec.containers or []):
            limits = spec.resources.limits if spec.resources else None
            info.append(f"Container {spec.name} limits: {limits or 'none set'}")

        for container in (pod.status.container_statuses or []):
            info.append(f"Container {container.name}: restarts={container.restart_count}")
            # last_state may be absent for a container that never restarted;
            # guard it so one missing field does not discard the whole report.
            last_state = container.last_state
            t = last_state.terminated if last_state else None
            if t:
                info.append(f" Last exit: reason={t.reason}, exitCode={t.exit_code}")

        for event in events.items:
            if event.type == "Warning":
                info.append(f"WARNING: {event.reason} - {event.message}")
        return "\n".join(info)
    except Exception as e:
        # Errors are returned as text so the agent can read and react to them.
        return f"Error: {str(e)}"
@tool
def get_aws_cloudwatch_errors(log_group: str, hours: int = 1) -> str:
    """Get ERROR-level logs from AWS CloudWatch for a specific log group in the last N hours."""
    # The docstring above is kept verbatim: it doubles as the tool description
    # the model reads when deciding whether to call this tool.
    #
    # Returns up to 20 matching log messages joined by newlines, a "No errors"
    # notice when nothing matched, or "Error querying CloudWatch: ..." on failure.
    try:
        import time

        logs_client = boto3.client("logs")

        # CloudWatch Logs timestamps are epoch milliseconds.
        now_ms = int(time.time() * 1000)
        window_start_ms = now_ms - (hours * 3600 * 1000)

        response = logs_client.filter_log_events(
            logGroupName=log_group,
            startTime=window_start_ms,
            endTime=now_ms,
            filterPattern="ERROR",
            limit=20,
        )
        matched = response.get("events", [])
        if matched:
            return "\n".join([entry["message"] for entry in matched])
        return f"No errors in {log_group} in the last {hours} hour(s)"
    except Exception as e:
        return f"Error querying CloudWatch: {str(e)}"
@tool
def get_deployment_status(deployment_name: str, namespace: str = "default") -> str:
    """Check if a Kubernetes deployment has the expected number of ready replicas."""
    # The docstring above is kept verbatim: it doubles as the tool description
    # the model reads when deciding whether to call this tool.
    #
    # Returns a five-line report (name, desired, ready, available, status) or
    # "Error: ..." if the API call fails.
    try:
        dep = apps_v1.read_namespaced_deployment(
            name=deployment_name,
            namespace=namespace,
        )
        desired = dep.spec.replicas
        # The status counters come back as None when zero, so normalise them.
        ready = dep.status.ready_replicas or 0
        available = dep.status.available_replicas or 0

        health = "HEALTHY" if ready == desired else "DEGRADED"
        report = [
            f"Deployment: {deployment_name}",
            f"Desired: {desired}",
            f"Ready: {ready}",
            f"Available: {available}",
            f"Status: {health}",
        ]
        return "\n".join(report)
    except Exception as e:
        return f"Error: {str(e)}"
@tool
def create_github_issue(title: str, body: str, labels: list = None) -> str:
    """Create a GitHub issue to track an incident or bug. Use this after investigating an issue to document findings."""
    # The docstring above is kept verbatim: it doubles as the tool description
    # the model reads when deciding whether to call this tool.
    #
    # Reads GITHUB_TOKEN and GITHUB_REPO ("owner/repo") from the environment.
    # Returns "Created issue #N: <url>" or "Error creating issue: ..." on failure.
    issue_labels = labels if labels else ["bug", "devops"]
    try:
        gh = Github(os.environ["GITHUB_TOKEN"])
        target_repo = gh.get_repo(os.environ["GITHUB_REPO"])  # "owner/repo"
        created = target_repo.create_issue(
            title=title,
            body=body,
            labels=issue_labels,
        )
        return f"Created issue #{created.number}: {created.html_url}"
    except Exception as e:
        return f"Error creating issue: {str(e)}"
@tool
def restart_deployment(deployment_name: str, namespace: str = "default") -> str:
    """Restart a Kubernetes deployment by triggering a rolling restart. Use only when approved."""
    # The docstring above is kept verbatim: it doubles as the tool description
    # the model reads when deciding whether to call this tool.
    #
    # NOTE(review): nothing in this function enforces the "only when approved"
    # rule; it relies on the system prompt. Consider a human confirmation step.
    #
    # Returns a confirmation line, or "Error: ..." if the patch is rejected.
    from datetime import datetime, timezone

    try:
        # Changing a pod-template annotation makes the deployment controller
        # roll the pods; the key is the one `kubectl rollout restart` writes.
        body = {
            "spec": {
                "template": {
                    "metadata": {
                        "annotations": {
                            "kubectl.kubernetes.io/restartedAt":
                                datetime.now(timezone.utc).isoformat()
                        }
                    }
                }
            }
        }
        apps_v1.patch_namespaced_deployment(
            name=deployment_name,
            namespace=namespace,
            body=body,
        )
        return f"Restart triggered for deployment/{deployment_name} in {namespace}"
    except Exception as e:
        return f"Error: {str(e)}"
# Step 2: Build the Agent
# agent.py
from langchain_anthropic import ChatAnthropic
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.messages import HumanMessage, AIMessage
from tools import (
kubectl_get_pods, kubectl_get_logs, kubectl_describe_pod,
get_aws_cloudwatch_errors, get_deployment_status,
create_github_issue, restart_deployment
)
import os
from dotenv import load_dotenv
# Load variables from a local .env file into the process environment.
load_dotenv()

# Every tool the model is allowed to call.
tools = [
    kubectl_get_pods,
    kubectl_get_logs,
    kubectl_describe_pod,
    get_aws_cloudwatch_errors,
    get_deployment_status,
    create_github_issue,
    restart_deployment,
]

llm = ChatAnthropic(
    model="claude-sonnet-4-6",
    temperature=0,  # deterministic for ops work
    max_tokens=4096,
)

# System instructions: investigation order plus the guard-rails on issue
# creation and restarts.
_SYSTEM_PROMPT = """You are a senior DevOps engineer AI assistant with access to
Kubernetes and AWS tools. When investigating issues:
1. First check deployment status and pod states
2. Check logs of failing pods
3. Look for patterns (OOMKilled, CrashLoop, connection errors)
4. Suggest root cause and fix
5. Create a GitHub issue with findings only when explicitly asked
Be concise. Focus on actionable findings. Never restart deployments
without explicit user approval."""

# Message order: system prompt, prior turns, the new user input, then the
# scratchpad placeholder the agent fills in.
prompt = ChatPromptTemplate.from_messages(
    [
        ("system", _SYSTEM_PROMPT),
        MessagesPlaceholder(variable_name="chat_history"),
        ("human", "{input}"),
        MessagesPlaceholder(variable_name="agent_scratchpad"),
    ]
)

agent = create_tool_calling_agent(llm, tools, prompt)

agent_executor = AgentExecutor(
    agent=agent,
    tools=tools,
    verbose=True,  # shows tool calls
    max_iterations=10,  # prevent infinite loops
    handle_parsing_errors=True,
)
def chat(message: str, history: list = None):
    """Run one user message through the agent and return its final answer.

    Args:
        message: The user's request for this turn.
        history: Earlier messages of the conversation; an empty list is
            used when omitted. The list is not modified here.

    Returns:
        The value stored under "output" in the executor's result.
    """
    payload = {
        "input": message,
        "chat_history": history or [],
    }
    outcome = agent_executor.invoke(payload)
    return outcome["output"]
# Step 3: Run the Agent
# main.py
# Interactive console loop: read a request, run it through the agent, print
# the answer, and carry the conversation forward between turns.
from langchain_core.messages import AIMessage, HumanMessage

from agent import chat

chat_history = []

print("DevOps AI Agent ready. Type your request.")
print("Examples:")
print(" - Check pods in the payments namespace")
print(" - Why is the auth service failing?")
print(" - Create a GitHub issue for the database connection errors\n")

while True:
    try:
        user_input = input("You: ").strip()
    except (EOFError, KeyboardInterrupt):
        # Ctrl-D, Ctrl-C or a closed stdin ends the session without a traceback.
        print()
        break

    if not user_input:
        continue
    if user_input.lower() in ["exit", "quit"]:
        break

    response = chat(user_input, chat_history)
    print(f"\nAgent: {response}\n")

    # Keep conversation history so follow-ups ("create an issue for this")
    # have the earlier findings as context.
    chat_history.extend([
        HumanMessage(content=user_input),
        AIMessage(content=response),
    ])
# Example Session
You: Why is the payments service degraded?
[Agent calls: get_deployment_status("payments", "payments")]
→ Desired: 3, Ready: 1, Status: DEGRADED
[Agent calls: kubectl_get_pods("payments")]
→ payments-abc12: Running
→ payments-def34: CrashLoopBackOff (restarts: 8)
→ payments-ghi56: CrashLoopBackOff (restarts: 7)
[Agent calls: kubectl_describe_pod("payments-def34", "payments")]
→ WARNING: OOMKilled - Container exceeded memory limit of 512Mi
[Agent calls: kubectl_get_logs("payments-def34", "payments")]
→ ... loading 1.2GB dataset into memory ...
Agent: The payments service is degraded (1/3 pods ready). 2 pods
are CrashLoopBackOff due to OOMKilled — they're loading a 1.2GB
dataset but only have a 512Mi memory limit.
Fix: Increase the memory limit to at least 2Gi in the deployment,
or optimize the dataset loading to use streaming/batching instead
of loading everything into memory.
You: Create a GitHub issue for this
[Agent calls: create_github_issue(...)]
Agent: Created issue #312: https://github.com/myorg/myapp/issues/312
Deploy as a Slack Bot
# slack_bot.py — add to Step 3
# Exposes the agent in Slack. `os` and `chat` are imported here so the snippet
# also runs as its own file.
import os

from slack_bolt import App

from agent import chat

app = App(token=os.environ["SLACK_BOT_TOKEN"])


@app.message("@devopsbot")
def handle_mention(message, say):
    """Answer a message that mentions the bot.

    Strips the "@devopsbot" text from the message, sends the remainder to
    the agent with no prior history, and replies addressing the sender.
    """
    user_text = message["text"].replace("@devopsbot", "").strip()
    response = chat(user_text)
    say(f"<@{message['user']}> {response}")


if __name__ == "__main__":
    app.start(port=3000)
# Security Notes
- Run with read-only Kubernetes RBAC — only get/list/watch permissions
- The restart_deployment tool should require human-in-the-loop confirmation
- Log all agent actions to an audit trail
- Never give the agent kubectl delete or kubectl exec permissions
# rbac.yaml ā read-only ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: devops-agent-reader
rules:
- apiGroups: ["", "apps", "batch"]
resources: ["pods", "deployments", "jobs", "events"]
verbs: ["get", "list", "watch"]
- apiGroups: [""]
resources: ["pods/log"]
verbs: ["get"]
Resources
- LangChain Docs — tool calling reference
- Claude API Docs — Anthropic API reference
- KodeKloud MLOps Path — ML + DevOps integration
Today I Fixed
Short real fixes from production ā posted daily
Stay ahead of the curve
Get the latest DevOps, Kubernetes, AWS, and AI/ML guides delivered straight to your inbox. No spam — just practical engineering content.
Related Articles
AWS CodePipeline vs GitHub Actions — Which CI/CD Tool to Use? (2026)
AWS CodePipeline and GitHub Actions both automate deployments. But they have very different strengths. Here's an honest comparison with real examples.
Build an AI Kubernetes Cost Optimizer with Python and Claude API
Use AI to automatically analyze your Kubernetes resource usage, detect waste, and generate optimization recommendations. Full Python project with Claude API.
Build an AI-Powered SLO Budget Tracker with Python + Claude (2026)
Track your error budget automatically and get AI-generated burn rate alerts and incident summaries. Build a real SLO monitoring tool with Python, Prometheus, and Claude API.