Build an AI Deployment Health Checker with Claude API and Kubernetes

Step-by-step tutorial to build an AI-powered deployment health checker using Claude API and the Kubernetes Python client. Automatically diagnose failing pods, check resource limits, and get plain-English explanations of what's wrong.

Shubham · 8 min read

Every Kubernetes deployment failure tells a story. The problem is that the story is written in a mix of pod events, container logs, resource metrics, and status conditions, spread across multiple kubectl commands that you have to run, interpret, and connect yourself.

What if you could just ask "why is my deployment unhealthy?" and get a clear answer?

That's exactly what we're building here: a tool that pulls deployment state from Kubernetes, feeds it to the Claude API, and returns a plain-English diagnosis with specific fix recommendations.

What We're Building

A Python script that:

  1. Connects to your Kubernetes cluster
  2. Pulls deployment status, pod events, and container states
  3. Sends all of that context to the Claude API
  4. Returns a clear diagnosis: what's wrong, why it's wrong, and how to fix it

This is genuinely useful in CI/CD pipelines — run it after a deploy to automatically catch issues before they become incidents.

Prerequisites

bash
pip install anthropic kubernetes python-dotenv

You'll also need:

  • A running Kubernetes cluster (local or remote)
  • An Anthropic API key from console.anthropic.com
  • kubectl configured with access to the cluster

Project Structure

k8s-health-checker/
├── checker.py
├── k8s_client.py
├── .env
└── requirements.txt

Step 1: Kubernetes Data Collection

Create k8s_client.py to pull all the context we need:

python
from kubernetes import client, config
from kubernetes.client.rest import ApiException
 
 
def load_kube_config(in_cluster: bool = False):
    """Load kubeconfig — in-cluster for prod, local for dev."""
    if in_cluster:
        config.load_incluster_config()
    else:
        config.load_kube_config()
 
 
def get_deployment_health(namespace: str, deployment_name: str) -> dict:
    """
    Collect everything needed to diagnose a deployment:
    - Deployment spec and status
    - Pod states and conditions
    - Recent pod events
    - Container resource requests and limits
    """
    apps_v1 = client.AppsV1Api()
    core_v1 = client.CoreV1Api()
 
    result = {
        "deployment": {},
        "pods": [],
        "events": [],
        "errors": []
    }
 
    # Get deployment details
    try:
        deployment = apps_v1.read_namespaced_deployment(
            name=deployment_name,
            namespace=namespace
        )
        status = deployment.status
        spec = deployment.spec
 
        result["deployment"] = {
            "name": deployment_name,
            "namespace": namespace,
            "desired_replicas": spec.replicas,
            "ready_replicas": status.ready_replicas or 0,
            "available_replicas": status.available_replicas or 0,
            "updated_replicas": status.updated_replicas or 0,
            "conditions": [
                {
                    "type": c.type,
                    "status": c.status,
                    "reason": c.reason,
                    "message": c.message
                }
                for c in (status.conditions or [])
            ],
            "strategy": spec.strategy.type,
            "image": spec.template.spec.containers[0].image if spec.template.spec.containers else "unknown",
            "resources": _extract_resources(spec.template.spec.containers)
        }
    except ApiException as e:
        result["errors"].append(f"Could not fetch deployment: {e.reason}")
        return result
 
    # Get pods belonging to this deployment
    try:
        label_selector = _get_label_selector(deployment)
        pods = core_v1.list_namespaced_pod(
            namespace=namespace,
            label_selector=label_selector
        )
 
        for pod in pods.items:
            pod_info = {
                "name": pod.metadata.name,
                "phase": pod.status.phase,
                "conditions": [],
                "containers": [],
                "node": pod.spec.node_name
            }
 
            # Pod conditions
            for condition in (pod.status.conditions or []):
                pod_info["conditions"].append({
                    "type": condition.type,
                    "status": condition.status,
                    "reason": condition.reason,
                    "message": condition.message
                })
 
            # Container states
            for cs in (pod.status.container_statuses or []):
                container_info = {
                    "name": cs.name,
                    "ready": cs.ready,
                    "restart_count": cs.restart_count,
                    "image": cs.image,
                    "state": {}
                }
 
                if cs.state.running:
                    container_info["state"] = {"type": "running"}
                elif cs.state.waiting:
                    container_info["state"] = {
                        "type": "waiting",
                        "reason": cs.state.waiting.reason,
                        "message": cs.state.waiting.message
                    }
                elif cs.state.terminated:
                    container_info["state"] = {
                        "type": "terminated",
                        "reason": cs.state.terminated.reason,
                        "exit_code": cs.state.terminated.exit_code,
                        "message": cs.state.terminated.message
                    }
 
                # Last terminated state (useful for crash loops)
                if cs.last_state and cs.last_state.terminated:
                    lt = cs.last_state.terminated
                    container_info["last_terminated"] = {
                        "reason": lt.reason,
                        "exit_code": lt.exit_code,
                        "message": lt.message
                    }
 
                pod_info["containers"].append(container_info)
 
            result["pods"].append(pod_info)
 
    except ApiException as e:
        result["errors"].append(f"Could not fetch pods: {e.reason}")
 
    # Get recent events
    try:
        events = core_v1.list_namespaced_event(
            namespace=namespace,
            field_selector=f"involvedObject.name={deployment_name}"
        )
 
        # Also get pod events
        for pod in result["pods"]:
            pod_events = core_v1.list_namespaced_event(
                namespace=namespace,
                field_selector=f"involvedObject.name={pod['name']}"
            )
            events.items.extend(pod_events.items)
 
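        # Sort by time, take most recent 15. Timestamps are converted to
        # epoch seconds so events without one sort last instead of raising
        # a TypeError when a datetime is compared with a string.
        def _event_time(e):
            ts = e.last_timestamp or e.event_time
            return ts.timestamp() if ts else 0.0

        sorted_events = sorted(
            events.items,
            key=_event_time,
            reverse=True
        )[:15]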
 
        result["events"] = [
            {
                "type": e.type,
                "reason": e.reason,
                "message": e.message,
                "count": e.count,
                "source": e.source.component if e.source else "unknown"
            }
            for e in sorted_events
        ]
 
    except ApiException as e:
        result["errors"].append(f"Could not fetch events: {e.reason}")
 
    return result
 
 
def _get_label_selector(deployment) -> str:
    """Extract label selector from deployment spec."""
    selector = deployment.spec.selector.match_labels or {}
    return ",".join(f"{k}={v}" for k, v in selector.items())
 
 
def _extract_resources(containers) -> list:
    """Extract resource requests and limits from containers."""
    resources = []
    for container in (containers or []):
        if container.resources:
            r = container.resources
            resources.append({
                "container": container.name,
                "requests": {
                    "cpu": r.requests.get("cpu") if r.requests else None,
                    "memory": r.requests.get("memory") if r.requests else None
                },
                "limits": {
                    "cpu": r.limits.get("cpu") if r.limits else None,
                    "memory": r.limits.get("memory") if r.limits else None
                }
            })
    return resources
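
The dictionary returned by get_deployment_health is plain data, so it can be inspected locally before (or without) calling the API. The sketch below is one possible pre-check and is not part of the original script: summarize_container_problems is a hypothetical helper that assumes the pods / containers / state layout built above and lists the containers that are not running and ready.

```python
def summarize_container_problems(health_data: dict) -> list[str]:
    """Return one line per container that is not running and ready (hypothetical helper)."""
    problems = []
    for pod in health_data.get("pods", []):
        for container in pod.get("containers", []):
            state = container.get("state", {})
            if container.get("ready") and state.get("type") == "running":
                continue  # healthy container, nothing to report
            reason = state.get("reason") or state.get("type") or "unknown"
            problems.append(
                f"{pod['name']}/{container['name']}: {reason} "
                f"(restarts: {container.get('restart_count', 0)})"
            )
    return problems


# Sample data in the same shape get_deployment_health() produces.
sample = {
    "pods": [
        {
            "name": "my-api-abc",
            "containers": [
                {"name": "app", "ready": False, "restart_count": 12,
                 "state": {"type": "waiting", "reason": "CrashLoopBackOff"}},
                {"name": "sidecar", "ready": True, "restart_count": 0,
                 "state": {"type": "running"}},
            ],
        }
    ]
}
print(summarize_container_problems(sample))
```

An empty list here suggests every container is running and ready, which can be a cheap way to skip the API call for healthy deployments.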

Step 2: The AI Diagnosis Engine

Now create checker.py. This is where the Claude API does the actual diagnosis:

python
import anthropic
import json
import sys
from k8s_client import load_kube_config, get_deployment_health
from dotenv import load_dotenv
import os
 
load_dotenv()
 
 
def diagnose_deployment(namespace: str, deployment_name: str) -> str:
    """
    Main function: collect K8s data, send to Claude, return diagnosis.
    """
    print(f"Collecting health data for {namespace}/{deployment_name}...")
 
    # Load kubeconfig
    load_kube_config(in_cluster=False)
 
    # Collect all deployment context
    health_data = get_deployment_health(namespace, deployment_name)
 
    if health_data["errors"] and not health_data["deployment"]:
        return "Error: Could not fetch deployment data.\n" + "\n".join(health_data["errors"])
 
    # Build the prompt
    prompt = _build_diagnosis_prompt(deployment_name, namespace, health_data)
 
    # Send to Claude
    print("Sending to Claude API for diagnosis...")
    client = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
 
    message = client.messages.create(
        model="claude-sonnet-5",  # substitute a model ID your account can access
        max_tokens=1500,
        messages=[
            {
                "role": "user",
                "content": prompt
            }
        ]
    )
 
    return message.content[0].text
 
 
def _build_diagnosis_prompt(deployment_name: str, namespace: str, health_data: dict) -> str:
    """Build a structured prompt with all K8s context."""
 
    deployment = health_data["deployment"]
    pods = health_data["pods"]
    events = health_data["events"]
 
    # Determine health status
    # "or 0" also covers keys that are present but set to None
    desired = deployment.get("desired_replicas") or 0
    ready = deployment.get("ready_replicas") or 0
    is_healthy = desired > 0 and desired == ready
 
    prompt = f"""You are a Kubernetes expert helping diagnose a deployment issue.
 
## Deployment: {deployment_name} (namespace: {namespace})
 
### Status
- Desired replicas: {desired}
- Ready replicas: {ready}
- Available replicas: {deployment.get('available_replicas', 0)}
- Health: {'HEALTHY' if is_healthy else 'UNHEALTHY'}
- Strategy: {deployment.get('strategy', 'Unknown')}
- Image: {deployment.get('image', 'Unknown')}
 
### Deployment Conditions
{json.dumps(deployment.get('conditions', []), indent=2)}
 
### Resource Configuration
{json.dumps(deployment.get('resources', []), indent=2)}
 
### Pod States ({len(pods)} pods found)
{json.dumps(pods, indent=2)}
 
### Recent Events (most recent first)
{json.dumps(events, indent=2)}
 
---
 
Based on this data, provide:
 
1. **Health Assessment**: Is this deployment healthy or not? One sentence.
 
2. **Root Cause**: What is the specific problem (if any)? Be precise — reference exact pod names, container states, exit codes, or event messages.
 
3. **Explanation**: Why is this happening? Keep it concise and practical.
 
4. **Fix**: Exact kubectl commands or configuration changes to resolve the issue. If multiple fixes are possible, list them in order of likelihood.
 
5. **Prevention**: One practical tip to prevent this in the future.
 
Keep the response focused and actionable. Skip any issue that is clearly not present in the data."""
 
    return prompt
 
 
def format_output(diagnosis: str, deployment_name: str) -> str:
    """Format the output nicely for terminal."""
    separator = "=" * 60
    return f"""
{separator}
AI DEPLOYMENT HEALTH DIAGNOSIS
Deployment: {deployment_name}
{separator}
 
{diagnosis}
 
{separator}
"""
 
 
if __name__ == "__main__":
    if len(sys.argv) < 3:
        print("Usage: python checker.py <namespace> <deployment-name>")
        print("Example: python checker.py default my-api")
        sys.exit(1)
 
    namespace = sys.argv[1]
    deployment_name = sys.argv[2]
 
    diagnosis = diagnose_deployment(namespace, deployment_name)
    print(format_output(diagnosis, deployment_name))
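
With many replicas, json.dumps(pods, indent=2) can make the prompt large and repetitive. One option, sketched here under the assumption that unhealthy pods matter most for diagnosis, is to put not-ready pods first and cap how many are sent. select_pods_for_prompt and its limit parameter are hypothetical names, not part of the script above.

```python
def select_pods_for_prompt(pods: list[dict], limit: int = 5) -> list[dict]:
    """Keep at most `limit` pods, preferring those with a non-ready container."""
    def is_unhealthy(pod: dict) -> bool:
        containers = pod.get("containers", [])
        # A pod with no container statuses yet (e.g. Pending) counts as unhealthy.
        return not containers or any(not c.get("ready") for c in containers)

    # sorted() is stable, so pods keep their original order within each group.
    ordered = sorted(pods, key=lambda p: not is_unhealthy(p))
    return ordered[:limit]


pods = [
    {"name": "ok-1", "containers": [{"ready": True}]},
    {"name": "bad-1", "containers": [{"ready": False}]},
    {"name": "ok-2", "containers": [{"ready": True}]},
    {"name": "pending-1", "containers": []},
]
print([p["name"] for p in select_pods_for_prompt(pods, limit=3)])
```

In _build_diagnosis_prompt, a helper like this could wrap pods just before the json.dumps call.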

Step 3: Environment Setup

Create .env:

bash
ANTHROPIC_API_KEY=sk-ant-your-key-here

Create requirements.txt:

anthropic>=0.30.0
kubernetes>=28.1.0
python-dotenv>=1.0.0
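
If the variable is missing from .env, os.getenv("ANTHROPIC_API_KEY") returns None, and the resulting error may not point clearly at the configuration. A small guard, sketched here with a hypothetical require_api_key helper, fails earlier with a more direct message.

```python
import os


def require_api_key(env_var: str = "ANTHROPIC_API_KEY") -> str:
    """Return the API key from the environment or raise a clear error."""
    key = os.getenv(env_var, "").strip()
    if not key:
        raise RuntimeError(
            f"{env_var} is not set. Add it to .env or export it in your shell."
        )
    return key
```

In checker.py this could be called after load_dotenv() and its return value passed as api_key.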

Running the Checker

bash
# Check the my-api deployment in the production namespace
python checker.py production my-api
 
# Check the payment-service deployment in the staging namespace
python checker.py staging payment-service
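
The script reads its two positional arguments straight from sys.argv and hardcodes in_cluster=False. If you also want to run it from inside a cluster, a sketch using the standard library's argparse could expose that choice. The --in-cluster flag and the parse_args helper are assumptions for illustration, not part of checker.py as written.

```python
import argparse
from typing import Optional, Sequence


def parse_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    """Parse CLI arguments; argv=None means use sys.argv[1:]."""
    parser = argparse.ArgumentParser(
        description="Diagnose a Kubernetes deployment with Claude."
    )
    parser.add_argument("namespace", help="Namespace of the deployment")
    parser.add_argument("deployment_name", help="Name of the deployment")
    parser.add_argument(
        "--in-cluster",
        action="store_true",
        help="Use the pod's service account instead of a local kubeconfig",
    )
    return parser.parse_args(argv)


args = parse_args(["staging", "payment-service", "--in-cluster"])
print(args.namespace, args.deployment_name, args.in_cluster)
```

The parsed args.in_cluster value could then be passed through to load_kube_config.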

Example Output

When a deployment has CrashLoopBackOff:

============================================================
AI DEPLOYMENT HEALTH DIAGNOSIS
Deployment: payment-service
============================================================

**Health Assessment**: This deployment is unhealthy — 0 of 3 desired replicas are ready.

**Root Cause**: Container `payment-service` in pod `payment-service-7d9f8b-xk2p4`
is in CrashLoopBackOff with exit code 1. The last terminated state shows
"Error" with 47 restarts, indicating a persistent application crash on startup.

**Explanation**: Exit code 1 is a generic application error. Combined with
the event message "Back-off restarting failed container," this means the
container starts, crashes immediately, and Kubernetes keeps retrying with
exponential backoff. The application is likely failing to connect to a
dependency (database, external service) or missing a required environment variable.

**Fix**:
1. Check the container logs for the actual error:
   kubectl logs payment-service-7d9f8b-xk2p4 --previous -n staging
2. If it's a missing env var, check your ConfigMap/Secret:
   kubectl describe pod payment-service-7d9f8b-xk2p4 -n staging
3. If it's a connection issue, verify your database service is reachable:
   kubectl exec -it payment-service-7d9f8b-xk2p4 -n staging -- nslookup postgres-service

**Prevention**: Add a readiness probe that checks your database connection before the pod is marked ready. This prevents traffic routing to pods that can't serve requests and gives clearer failure signals.

============================================================

Adding This to Your CI/CD Pipeline

The real value comes from running this automatically after every deploy:

yaml
# GitHub Actions example
- name: Deploy to Kubernetes
  run: kubectl apply -f k8s/

- name: Wait for rollout
  run: kubectl rollout status deployment/my-api -n production --timeout=120s

- name: AI Health Check
  if: failure()  # Only run if rollout fails
  run: python checker.py production my-api
  env:
    ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}

Now when a rollout fails, your CI pipeline automatically runs a diagnosis and prints it to the job log, giving your team a clear starting point instead of raw kubectl output. Posting the result as a comment or notification takes an extra step.
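
One way to surface the diagnosis beyond the job log in GitHub Actions is to append it to the job summary file, whose path GitHub exposes in the GITHUB_STEP_SUMMARY environment variable. The write_step_summary helper below is a hypothetical addition to checker.py; outside GitHub Actions, where the variable is unset, it does nothing.

```python
import os


def write_step_summary(diagnosis: str, deployment_name: str) -> bool:
    """Append the diagnosis to the GitHub Actions job summary, if available."""
    summary_path = os.environ.get("GITHUB_STEP_SUMMARY")
    if not summary_path:
        return False  # not running in GitHub Actions
    with open(summary_path, "a", encoding="utf-8") as f:
        f.write(f"## AI health diagnosis: {deployment_name}\n\n{diagnosis}\n")
    return True
```

Because Claude's response is already Markdown, it should render reasonably in the summary without further conversion.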

What to Build Next

This is a solid foundation. From here you can:

  • Add log fetching — pull the last 50 lines of container logs and include them in the prompt
  • Slack integration — post diagnosis to a Slack channel automatically
  • Metrics context — pull CPU/memory from the metrics-server and include resource pressure signals
  • Multi-deployment checks — scan all deployments in a namespace and surface unhealthy ones
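
For the log-fetching idea, the Kubernetes Python client's CoreV1Api.read_namespaced_pod_log accepts tail_lines and previous arguments. The sketch below assumes you pass in the same core_v1 client used in k8s_client.py; fetch_recent_logs is a hypothetical helper, and the stand-in client at the bottom exists only so the example runs without a cluster.

```python
def fetch_recent_logs(core_v1, namespace: str, pod_name: str,
                      container: str, tail_lines: int = 50,
                      previous: bool = False) -> str:
    """Return the last `tail_lines` log lines for a container, or an error note."""
    try:
        return core_v1.read_namespaced_pod_log(
            name=pod_name,
            namespace=namespace,
            container=container,
            tail_lines=tail_lines,
            previous=previous,  # True reads the previous (crashed) instance
        )
    except Exception as exc:  # in real use, catch kubernetes ApiException
        return f"<could not fetch logs: {exc}>"


class FakeCoreV1:
    """Stand-in for client.CoreV1Api(), for demonstration only."""
    def read_namespaced_pod_log(self, name, namespace, container,
                                tail_lines, previous):
        return (f"{namespace}/{name}/{container}: "
                f"last {tail_lines} lines (previous={previous})")


print(fetch_recent_logs(FakeCoreV1(), "staging", "payment-service-7d9f8b-xk2p4",
                        "payment-service", previous=True))
```

For crash-looping containers, previous=True is usually the more useful setting, since the current instance may not have logged anything yet.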

The pattern — collect structured K8s data, send to Claude with a focused prompt, return actionable output — works for any Kubernetes diagnostic scenario.


Check out our other AI + DevOps projects: Build an AI log anomaly detector with Loki and Build an AI Kubernetes event analyzer.
