All Articles

Build an AI DevOps Assistant with LangChain + Real Tools (2026)

Build a DevOps AI agent that can actually run kubectl, check AWS costs, read logs, and create GitHub issues — using LangChain tool calling and Claude API.

DevOpsBoys · Apr 16, 2026 · 6 min read
Share: Tweet

Chatbots answer questions. Agents take actions. Here's how to build a DevOps AI agent that can actually run kubectl commands, query AWS CloudWatch, check pod logs, and open GitHub issues — using LangChain's tool calling with Claude API.

What You'll Build

You: "Check why the payments service is down and create a GitHub issue"
                    ↓
Agent thinks: I need to check pod status, then check logs, then create issue
                    ↓
Tool 1: kubectl get pods -n payments
Tool 2: kubectl logs payments-pod-abc --tail=50
Tool 3: GitHub API → create issue with findings
                    ↓
Agent: "Found it — OOMKilled due to memory leak. Created issue #247."

Prerequisites

bash
pip install langchain langchain-anthropic anthropic kubernetes boto3 PyGithub python-dotenv

Step 1: Define the Tools

Tools are functions the AI can call. Each tool has a name, description (tells the AI when to use it), and implementation.

python
# tools.py
import subprocess
import boto3
import json
from github import Github
from langchain.tools import tool
from kubernetes import client, config
import os
 
# Load K8s config: prefer the in-cluster service account, fall back to
# the local ~/.kube/config for development.
try:
    config.load_incluster_config()    # running inside cluster
except config.ConfigException:
    # Not running in a pod — a bare `except:` here would also swallow
    # KeyboardInterrupt/SystemExit; catch only the config failure.
    config.load_kube_config()         # local development

# Shared API clients used by all tools below.
v1 = client.CoreV1Api()          # core resources: pods, logs, events
apps_v1 = client.AppsV1Api()     # apps resources: deployments
 
@tool
def kubectl_get_pods(namespace: str = "default") -> str:
    """Get all pods in a namespace and their status. Use this to check if pods are running, pending, or crashing."""
    # List pods via the API and summarize each one's phase plus the total
    # restart count across all of its containers.
    try:
        pod_list = v1.list_namespaced_pod(namespace=namespace)
        lines = []
        for p in pod_list.items:
            total_restarts = 0
            # container_statuses is None until the pod has been scheduled.
            for cs in (p.status.container_statuses or []):
                total_restarts += cs.restart_count
            lines.append(f"{p.metadata.name}: {p.status.phase} (restarts: {total_restarts})")
        if not lines:
            return "No pods found"
        return "\n".join(lines)
    except Exception as e:
        return f"Error: {str(e)}"
 
@tool
def kubectl_get_logs(pod_name: str, namespace: str = "default", lines: int = 50) -> str:
    """Get the last N lines of logs from a specific pod. Use this to investigate errors or crashes."""
    # Fetch only the tail so the tool result stays small enough for the LLM.
    try:
        log_text = v1.read_namespaced_pod_log(
            name=pod_name, namespace=namespace, tail_lines=lines
        )
        if not log_text:
            return "No logs found"
        return log_text
    except Exception as e:
        return f"Error getting logs: {str(e)}"
 
@tool
def kubectl_describe_pod(pod_name: str, namespace: str = "default") -> str:
    """Get detailed info about a pod including events, resource limits, and error conditions. Use when a pod is failing."""
    try:
        pod = v1.read_namespaced_pod(name=pod_name, namespace=namespace)
        # Events live in their own resource, not on the pod object itself;
        # filter to events that reference this pod.
        events = v1.list_namespaced_event(
            namespace=namespace,
            field_selector=f"involvedObject.name={pod_name}"
        )
        
        info = [
            f"Status: {pod.status.phase}",
            f"Node: {pod.spec.node_name}",
        ]
        
        for container in (pod.status.container_statuses or []):
            info.append(f"Container {container.name}: restarts={container.restart_count}")
            # last_state is Optional in the python client — it can be None
            # for a container that never restarted, so guard before reading
            # .terminated to avoid an AttributeError.
            last_state = container.last_state
            if last_state and last_state.terminated:
                t = last_state.terminated
                info.append(f"  Last exit: reason={t.reason}, exitCode={t.exit_code}")
        
        # Surface Warning events (OOMKilled, FailedScheduling, probe failures…).
        for event in events.items:
            if event.type == "Warning":
                info.append(f"WARNING: {event.reason} - {event.message}")
        
        return "\n".join(info)
    except Exception as e:
        return f"Error: {str(e)}"
 
@tool
def get_aws_cloudwatch_errors(log_group: str, hours: int = 1) -> str:
    """Get ERROR-level logs from AWS CloudWatch for a specific log group in the last N hours."""
    try:
        import time

        logs_client = boto3.client("logs")
        # CloudWatch Logs expects epoch timestamps in milliseconds.
        now_ms = int(time.time() * 1000)
        window_ms = hours * 3600 * 1000

        response = logs_client.filter_log_events(
            logGroupName=log_group,
            startTime=now_ms - window_ms,
            endTime=now_ms,
            filterPattern="ERROR",
            limit=20,
        )

        matched = response.get("events", [])
        if not matched:
            return f"No errors in {log_group} in the last {hours} hour(s)"
        return "\n".join(e["message"] for e in matched)
    except Exception as e:
        return f"Error querying CloudWatch: {str(e)}"
 
@tool
def get_deployment_status(deployment_name: str, namespace: str = "default") -> str:
    """Check if a Kubernetes deployment has the expected number of ready replicas."""
    try:
        dep = apps_v1.read_namespaced_deployment(
            name=deployment_name, namespace=namespace
        )
        desired = dep.spec.replicas
        # ready/available counters may be None mid-rollout; treat as 0.
        ready = dep.status.ready_replicas or 0
        avail = dep.status.available_replicas or 0
        health = "HEALTHY" if ready == desired else "DEGRADED"

        return (
            f"Deployment: {deployment_name}\n"
            f"Desired: {desired}\n"
            f"Ready: {ready}\n"
            f"Available: {avail}\n"
            f"Status: {health}"
        )
    except Exception as e:
        return f"Error: {str(e)}"
 
@tool
def create_github_issue(title: str, body: str, labels: list = None) -> str:
    """Create a GitHub issue to track an incident or bug. Use this after investigating an issue to document findings."""
    try:
        # Credentials and target repo come from the environment; a missing
        # variable raises KeyError, reported via the error string below.
        gh = Github(os.environ["GITHUB_TOKEN"])
        repo = gh.get_repo(os.environ["GITHUB_REPO"])  # "owner/repo"

        # Empty or missing labels fall back to the defaults.
        if not labels:
            labels = ["bug", "devops"]
        issue = repo.create_issue(title=title, body=body, labels=labels)
        return f"Created issue #{issue.number}: {issue.html_url}"
    except Exception as e:
        return f"Error creating issue: {str(e)}"
 
@tool
def restart_deployment(deployment_name: str, namespace: str = "default") -> str:
    """Restart a Kubernetes deployment by triggering a rolling restart. Use only when approved."""
    try:
        from datetime import datetime, timezone  # json import removed: it was never used
        
        # Same mechanism `kubectl rollout restart` uses: bump a pod-template
        # annotation so the controller rolls out fresh pods.
        body = {
            "spec": {
                "template": {
                    "metadata": {
                        "annotations": {
                            "kubectl.kubernetes.io/restartedAt": 
                                datetime.now(timezone.utc).isoformat()
                        }
                    }
                }
            }
        }
        apps_v1.patch_namespaced_deployment(
            name=deployment_name,
            namespace=namespace,
            body=body
        )
        return f"Restart triggered for deployment/{deployment_name} in {namespace}"
    except Exception as e:
        return f"Error: {str(e)}"

Step 2: Build the Agent

python
# agent.py
from langchain_anthropic import ChatAnthropic
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.messages import HumanMessage, AIMessage
from tools import (
    kubectl_get_pods, kubectl_get_logs, kubectl_describe_pod,
    get_aws_cloudwatch_errors, get_deployment_status,
    create_github_issue, restart_deployment
)
import os
from dotenv import load_dotenv
 
load_dotenv()
 
# Every tool exposed to the agent; the LLM picks among them by reading
# each tool's docstring, so the list order does not matter.
tools = [
    kubectl_get_pods,
    kubectl_get_logs,
    kubectl_describe_pod,
    get_aws_cloudwatch_errors,
    get_deployment_status,
    create_github_issue,
    restart_deployment,
]

llm = ChatAnthropic(
    model="claude-sonnet-4-6",
    temperature=0,    # deterministic for ops work
    max_tokens=4096
)

# System prompt encodes the investigation playbook and the guardrails
# (no unprompted issues, no unapproved restarts). chat_history and
# agent_scratchpad are placeholder slots filled in by the executor.
prompt = ChatPromptTemplate.from_messages([
    ("system", """You are a senior DevOps engineer AI assistant with access to 
Kubernetes and AWS tools. When investigating issues:
1. First check deployment status and pod states
2. Check logs of failing pods  
3. Look for patterns (OOMKilled, CrashLoop, connection errors)
4. Suggest root cause and fix
5. Create a GitHub issue with findings only when explicitly asked
 
Be concise. Focus on actionable findings. Never restart deployments 
without explicit user approval."""),
    MessagesPlaceholder(variable_name="chat_history"),
    ("human", "{input}"),
    MessagesPlaceholder(variable_name="agent_scratchpad"),
])

agent = create_tool_calling_agent(llm, tools, prompt)
agent_executor = AgentExecutor(
    agent=agent,
    tools=tools,
    verbose=True,          # shows tool calls
    max_iterations=10,     # prevent infinite loops
    handle_parsing_errors=True
)
 
def chat(message: str, history: list = None):
    """Run one user message through the agent and return its final answer."""
    # A missing or empty history becomes a fresh empty list.
    result = agent_executor.invoke({
        "input": message,
        "chat_history": history if history else [],
    })
    return result["output"]

Step 3: Run the Agent

python
# main.py
from agent import chat
 
# Hoisted out of the loop: the original re-ran this import on every turn.
from langchain_core.messages import HumanMessage, AIMessage

chat_history = []

print("DevOps AI Agent ready. Type your request.")
print("Examples:")
print("  - Check pods in the payments namespace")
print("  - Why is the auth service failing?")
print("  - Create a GitHub issue for the database connection errors\n")

while True:
    user_input = input("You: ").strip()
    if not user_input:
        continue
    if user_input.lower() in ["exit", "quit"]:
        break
    
    response = chat(user_input, chat_history)
    print(f"\nAgent: {response}\n")
    
    # Keep conversation history so follow-up questions have context.
    chat_history.extend([
        HumanMessage(content=user_input),
        AIMessage(content=response)
    ])

Example Session

You: Why is the payments service degraded?

[Agent calls: get_deployment_status("payments", "payments")]
→ Desired: 3, Ready: 1, Status: DEGRADED

[Agent calls: kubectl_get_pods("payments")]
→ payments-abc12: Running
→ payments-def34: CrashLoopBackOff (restarts: 8)
→ payments-ghi56: CrashLoopBackOff (restarts: 7)

[Agent calls: kubectl_describe_pod("payments-def34", "payments")]
→ WARNING: OOMKilled - Container exceeded memory limit of 512Mi

[Agent calls: kubectl_get_logs("payments-def34", "payments")]
→ ... loading 1.2GB dataset into memory ...

Agent: The payments service is degraded (1/3 pods ready). 2 pods 
are CrashLoopBackOff due to OOMKilled — they're loading a 1.2GB 
dataset but only have a 512Mi memory limit. 

Fix: Increase the memory limit to at least 2Gi in the deployment, 
or optimize the dataset loading to use streaming/batching instead 
of loading everything into memory.

You: Create a GitHub issue for this

[Agent calls: create_github_issue(...)]

Agent: Created issue #312: https://github.com/myorg/myapp/issues/312

Deploy as a Slack Bot

python
# slack_bot.py — add to Step 3
import os  # original snippet used os.environ without importing os

from slack_bolt import App

from agent import chat  # reuse the same agent entry point as the CLI

app = App(token=os.environ["SLACK_BOT_TOKEN"])

@app.message("@devopsbot")
def handle_mention(message, say):
    """Strip the bot mention, run the agent, and reply mentioning the user."""
    user_text = message["text"].replace("@devopsbot", "").strip()
    response = chat(user_text)
    say(f"<@{message['user']}> {response}")

if __name__ == "__main__":
    app.start(port=3000)

Security Notes

  • Run with read-only Kubernetes RBAC — only get/list/watch permissions
  • restart_deployment tool should require human-in-the-loop confirmation
  • Log all agent actions to audit trail
  • Never give the agent kubectl delete or kubectl exec permissions
yaml
# rbac.yaml — read-only ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: devops-agent-reader
rules:
# Read-only access to the workloads the agent inspects.
- apiGroups: ["", "apps", "batch"]
  resources: ["pods", "deployments", "jobs", "events"]
  verbs: ["get", "list", "watch"]
# Pod logs are a subresource and need their own rule.
- apiGroups: [""]
  resources: ["pods/log"]
  verbs: ["get"]

Resources

Newsletter

Stay ahead of the curve

Get the latest DevOps, Kubernetes, AWS, and AI/ML guides delivered straight to your inbox. No spam — just practical engineering content.

Related Articles

Comments