
Build an AI Log Anomaly Detector with Claude API + Grafana Loki

Your logs have the answers, but there are too many to read. Build an AI anomaly detector that queries Loki for unusual patterns and uses the Claude API to explain what's wrong and what to do.

DevOpsBoys · Jun 14, 2026 · 7 min read

Production logs contain every incident before it becomes an incident. The problem is volume: a medium-sized Kubernetes cluster can easily generate millions of log lines per hour. Nobody reads them until something breaks.

Let's build an anomaly detector that watches your Loki logs, flags unusual patterns with simple statistical checks, and then uses the Claude API to explain what the anomaly means and suggest next steps.

Architecture

Loki API → Fetch recent logs → Statistical baseline comparison
                                        ↓
                           Anomalies found → Claude API
                                        ↓
                           Explanation + recommended actions
                                        ↓
                           Slack alert with context

Prerequisites

bash
pip install anthropic requests python-dotenv slack-sdk numpy

You need:

  • Grafana Loki running (accessible via HTTP)
  • Anthropic API key
  • Slack webhook URL (optional, for alerts)
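
Before wiring anything up, it's worth confirming the detector can actually reach Loki. Loki serves a `/ready` endpoint that returns HTTP 200 once it is ready to accept traffic. Here is a minimal sketch using only the standard library; the `loki_is_ready` helper is hypothetical, not part of any Loki client.

```python
import urllib.error
import urllib.request


def loki_is_ready(base_url: str, timeout: float = 5.0) -> bool:
    """Return True if Loki's /ready endpoint answers with HTTP 200 (hypothetical helper)."""
    url = base_url.rstrip("/") + "/ready"
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return resp.status == 200
    except (urllib.error.URLError, OSError, ValueError):
        # URLError covers refused connections, DNS failures and non-2xx responses
        # (HTTPError subclasses it); OSError covers timeouts; ValueError covers bad URLs
        return False
```

From a pod inside the cluster you would call it as `loki_is_ready("http://loki.monitoring:3100")`. From a laptop, point it at a `kubectl port-forward` to the Loki service instead.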

Step 1: Loki Query Client

python
# loki_client.py
import requests
from datetime import datetime, timedelta, timezone
 
class LokiClient:
    def __init__(self, url: str):
        self.url = url.rstrip('/')
    
    def query_range(self, logql: str, hours: float = 1, limit: int = 500) -> list[dict]:
        """Run a LogQL log query and return matching lines, oldest first."""
        # Timezone-aware "now": a naive utcnow().timestamp() is read as *local* time
        end = datetime.now(timezone.utc)
        start = end - timedelta(hours=hours)
        
        params = {
            "query": logql,
            "start": int(start.timestamp() * 1e9),  # Loki uses nanoseconds
            "end": int(end.timestamp() * 1e9),
            "limit": limit,
            "direction": "backward"  # newest first, so `limit` keeps the most recent lines
        }
        
        resp = requests.get(
            f"{self.url}/loki/api/v1/query_range",
            params=params,
            timeout=30
        )
        resp.raise_for_status()
        data = resp.json()
        
        # Log queries return resultType "streams": one entry per label set
        logs = []
        for stream in data.get("data", {}).get("result", []):
            labels = stream["stream"]
            for ts, line in stream["values"]:
                logs.append({
                    "timestamp": int(ts) / 1e9,  # convert back to seconds
                    "labels": labels,
                    "line": line
                })
        
        return sorted(logs, key=lambda x: x["timestamp"])
    
    def count_lines(self, expr: str, hours: float = 1) -> int:
        """Count log lines matching `expr` over the last `hours` with an instant metric query."""
        minutes = max(int(hours * 60), 1)
        resp = requests.get(
            f"{self.url}/loki/api/v1/query",
            params={"query": f'sum(count_over_time({expr} [{minutes}m]))'},
            timeout=30
        )
        resp.raise_for_status()
        # Metric queries return resultType "vector": [{"metric": {}, "value": [ts, "count"]}]
        result = resp.json().get("data", {}).get("result", [])
        return int(float(result[0]["value"][1])) if result else 0
    
    def get_error_rate(self, namespace: str, service: str, hours: float = 1) -> dict:
        """Share of a service's log lines that contain "error", as a percentage."""
        selector = f'{{namespace="{namespace}", app="{service}"}}'
        errors = self.count_lines(f'{selector} |= "error"', hours=hours)
        total = self.count_lines(selector, hours=hours)
        
        return {
            "service": service,
            "namespace": namespace,
            "error_count": errors,
            "total_count": total,
            "error_rate_pct": (errors / total * 100) if total else 0.0
        }
    
    def get_recent_errors(self, namespace: str, minutes: int = 15, limit: int = 50) -> list[str]:
        """Get recent error lines across the namespace (logfmt lines with level=error)."""
        query = f'{{namespace="{namespace}"}} |= "error" | logfmt | level="error"'
        logs = self.query_range(query, hours=minutes / 60, limit=limit)
        return [log["line"] for log in logs]
    
    def get_service_logs(self, namespace: str, service: str, minutes: int = 30) -> list[str]:
        """Get up to 200 of the most recent log lines for a specific service."""
        query = f'{{namespace="{namespace}", app="{service}"}}'
        logs = self.query_range(query, hours=minutes / 60, limit=200)
        return [log["line"] for log in logs]
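
`get_recent_errors` pipes lines through `| logfmt | level="error"`, so it only returns lines that parse as logfmt and carry `level=error`. Plain-text or JSON lines will generally not match. If your services log through Python's `logging`, a minimal sketch of a formatter that emits logfmt might look like this. `LogfmtFormatter` and its choice of keys are illustrative, not a standard library class.

```python
import logging


class LogfmtFormatter(logging.Formatter):
    """Render a record as one logfmt line (illustrative; tracebacks are not included)."""

    def format(self, record: logging.LogRecord) -> str:
        pairs = {
            "ts": self.formatTime(record, "%Y-%m-%dT%H:%M:%S"),  # local time by default
            "level": record.levelname.lower(),  # ERROR -> error, so level="error" matches
            "logger": record.name,
            "msg": record.getMessage(),
        }
        return " ".join(f"{key}={self._quote(value)}" for key, value in pairs.items())

    @staticmethod
    def _quote(value: str) -> str:
        # Bare values are fine unless they contain spaces, quotes, '=' or newlines
        if value and not any(ch in value for ch in ' "=\n'):
            return value
        escaped = value.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")
        return f'"{escaped}"'


# Hypothetical service logger wired to the formatter (writes to stderr)
handler = logging.StreamHandler()
handler.setFormatter(LogfmtFormatter())
log = logging.getLogger("checkout")
log.addHandler(handler)
log.error("connection timeout after %d ms", 3000)
# writes something like: ts=2026-06-14T10:00:00 level=error logger=checkout msg="connection timeout after 3000 ms"
```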

Step 2: Anomaly Detection

python
# anomaly_detector.py
import re
from collections import defaultdict
from loki_client import LokiClient
 
class AnomalyDetector:
    def __init__(self, loki: LokiClient):
        self.loki = loki
    
    def detect_error_spike(self, namespace: str, services: list[str]) -> list[dict]:
        """Detect services with unusually high error rates."""
        anomalies = []
        
        for service in services:
            # Error rate over the last hour vs. a 24h baseline (which includes that hour)
            current = self.loki.get_error_rate(namespace, service, hours=1)
            baseline = self.loki.get_error_rate(namespace, service, hours=24)
            
            # Both values are percentages of all lines, so compare them directly;
            # dividing the 24h percentage by 24 would understate the baseline 24x
            baseline_rate = baseline["error_rate_pct"]
            current_rate = current["error_rate_pct"]
            
            # Flag when errors exceed 5% of lines AND are at least 3x the baseline
            if current_rate > 5 and current_rate > (baseline_rate * 3):
                # Sample this service's own error lines, not the whole namespace's
                recent = self.loki.get_service_logs(namespace, service, minutes=15)
                error_lines = [line for line in recent if "error" in line.lower()]
                anomalies.append({
                    "type": "error_spike",
                    "service": service,
                    "namespace": namespace,
                    "current_error_rate": round(current_rate, 2),
                    "baseline_error_rate": round(baseline_rate, 2),
                    "spike_factor": round(current_rate / max(baseline_rate, 0.1), 1),
                    "recent_errors": error_lines[-10:]  # 10 most recent
                })
        
        return anomalies
    
    def detect_log_pattern_changes(self, namespace: str, service: str) -> dict:
        """Detect unusual log patterns: repeated messages and log storms."""
        recent_logs = self.loki.get_service_logs(namespace, service, minutes=15)
        
        # Count message patterns after normalizing timestamps, UUIDs and numbers
        pattern_counts = defaultdict(int)
        for line in recent_logs:
            normalized = re.sub(r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}', 'TIMESTAMP', line)
            normalized = re.sub(r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}', 'UUID', normalized, flags=re.IGNORECASE)
            normalized = re.sub(r'\b\d+\b', 'N', normalized)
            pattern_counts[normalized[:100]] += 1
        
        # Patterns seen more than 10 times in 15 minutes: potential loops or storms
        repeated = {p: c for p, c in pattern_counts.items() if c > 10}
        
        # Sample of error messages for Claude to analyze
        error_lines = [line for line in recent_logs
                       if "error" in line.lower() or "exception" in line.lower()]
        
        return {
            "type": "log_pattern",  # analyzer.py branches on this key
            "service": service,
            "namespace": namespace,
            "total_logs": len(recent_logs),
            "unique_patterns": len(pattern_counts),
            "repeated_patterns": repeated,
            "error_sample": error_lines[:15],
            # get_service_logs caps at 200 lines, so this only says volume is >150 per 15 min
            "has_log_storm": len(recent_logs) > 150
        }
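
The spike check above uses a fixed multiplier (3x) against a 24-hour baseline. Since numpy is already in the install list, a z-score over per-window error counts is one common alternative that is closer to a "statistical baseline": flag the latest window when it sits several standard deviations above the mean of the earlier windows. This is a minimal sketch of that swapped-in technique. The `zscore_spike` name, the threshold of 3, and how you bucket counts (e.g. 24 hourly error counts pulled from Loki with a `count_over_time(... [1h])` metric query) are all assumptions to tune.

```python
import numpy as np


def zscore_spike(counts: list[float], threshold: float = 3.0, min_std: float = 1.0) -> dict:
    """Flag the last window in `counts` if it is `threshold` std devs above the earlier ones.

    counts:  error counts per window, oldest first; the last entry is the current window
    min_std: floor for the standard deviation, so a perfectly flat history does not
             turn a one-error change into a huge (or infinite) z-score
    """
    if len(counts) < 3:
        raise ValueError("need at least two baseline windows plus the current one")
    history = np.asarray(counts[:-1], dtype=float)
    current = float(counts[-1])
    mean = float(history.mean())
    std = max(float(history.std(ddof=1)), min_std)  # sample standard deviation, floored
    z = (current - mean) / std
    return {
        "current": current,
        "baseline_mean": round(mean, 2),
        "baseline_std": round(std, 2),
        "z_score": round(z, 2),
        "is_spike": z > threshold,
    }


# Hypothetical hourly error counts: 23 quiet hours, then a burst in the current hour
hourly_errors = [4, 6, 5, 3, 5, 4, 6, 5, 4, 5, 6, 4, 5, 3, 4, 6, 5, 5, 4, 6, 5, 4, 5, 40]
print(zscore_spike(hourly_errors))
```

Unlike a ratio, the z-score adapts to how noisy each service normally is. A service that swings between 0 and 50 errors an hour needs a much bigger jump to trip it than one that sits steadily at 5.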

Step 3: Claude API Analysis

python
# analyzer.py
import os
import json
from anthropic import Anthropic
 
client = Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
 
def analyze_anomaly_with_claude(anomaly: dict) -> str:
    if anomaly.get("type") == "error_spike":
        context = f"""
Service: {anomaly['service']} in namespace {anomaly['namespace']}
Current error rate: {anomaly['current_error_rate']}%
Baseline error rate: {anomaly['baseline_error_rate']}%
Spike factor: {anomaly['spike_factor']}x above baseline
 
Recent error log samples:
{chr(10).join(anomaly['recent_errors'][:10])}
"""
    else:
        context = f"""
Service: {anomaly['service']}
Total logs in last 15 min: {anomaly['total_logs']}
Unique log patterns: {anomaly['unique_patterns']}
Log storm detected: {anomaly['has_log_storm']}
 
Repeated patterns (possible loop/storm):
{json.dumps(anomaly['repeated_patterns'], indent=2)[:500]}
 
Sample error messages:
{chr(10).join(anomaly['error_sample'][:10])}
"""
    
    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        messages=[{
            "role": "user",
            "content": f"""You are a senior SRE analyzing a production log anomaly.
 
{context}
 
Provide:
1. Root cause hypothesis (most likely 2-3 causes based on the error messages)
2. Immediate action to take right now (specific kubectl or diagnostic command)
3. Whether this needs immediate escalation (yes/no and why)
4. One-line Slack-ready summary for the team
 
Be specific and concise. Reference the actual error messages in your analysis."""
        }]
    )
    
    return response.content[0].text
 
def format_slack_alert(anomaly: dict, analysis: str) -> dict:
    """Format anomaly + analysis as a Slack message."""
    emoji = "🔴" if anomaly.get("spike_factor", 0) > 5 else "🟡"
    
    return {
        "blocks": [
            {
                "type": "header",
                "text": {
                    "type": "plain_text",
                    "text": f"{emoji} Log Anomaly Detected — {anomaly.get('service', 'unknown')}"
                }
            },
            {
                "type": "section",
                "fields": [
                    {"type": "mrkdwn", "text": f"*Service:*\n{anomaly.get('service')}"},
                    {"type": "mrkdwn", "text": f"*Namespace:*\n{anomaly.get('namespace')}"},
                    {"type": "mrkdwn", "text": "*Error Rate:*\n" + (f"{anomaly['current_error_rate']}%" if "current_error_rate" in anomaly else "N/A")},
                    {"type": "mrkdwn", "text": "*Spike:*\n" + (f"{anomaly['spike_factor']}x baseline" if "spike_factor" in anomaly else "N/A")}
                ]
            },
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f"*AI Analysis:*\n{analysis[:1000]}"
                }
            }
        ]
    }
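
The prompt asks Claude for a one-line Slack-ready summary, but `format_slack_alert` posts the first 1,000 characters of the whole reply. Slack also recommends a top-level `text` field as the notification fallback when a message is built from `blocks`. The sketch below pulls the summary line out of the reply. It assumes Claude labels that line with something like "Slack Summary:", as in the sample output further down. The model is not guaranteed to follow that format, so the sketch falls back to the first non-empty line. `extract_slack_summary` is a hypothetical helper.

```python
import re


def extract_slack_summary(analysis: str, max_len: int = 150) -> str:
    """Return the one-line summary from Claude's reply, for Slack's fallback `text`.

    Assumes the reply labels it with something like "Slack Summary:"; otherwise
    falls back to the first non-empty line. max_len defaults to 150 so the result
    also fits a Block Kit header block.
    """
    lines = [line.strip() for line in analysis.splitlines() if line.strip()]
    if not lines:
        return ""
    summary = lines[0]
    for i, line in enumerate(lines):
        if "summary" in line.lower():
            # Strip list numbering, Markdown bold/heading marks and the "...summary:" label
            rest = re.sub(r"^[\d.\s*#]*[^:]*summary[^:]*:\**\s*", "", line, flags=re.IGNORECASE)
            if rest:
                summary = rest
            elif i + 1 < len(lines):
                summary = lines[i + 1]  # label on its own line, text on the next
            break
    summary = summary.strip("* ")
    return summary if len(summary) <= max_len else summary[: max_len - 1] + "…"
```

In `run_anomaly_check` you could then set `alert["text"] = extract_slack_summary(analysis)` before posting, so phone notifications show the summary instead of a generic preview.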

Step 4: Main Runner

python
# main.py
import os
import requests
from loki_client import LokiClient
from anomaly_detector import AnomalyDetector
from analyzer import analyze_anomaly_with_claude, format_slack_alert
 
def run_anomaly_check():
    loki = LokiClient(os.getenv("LOKI_URL", "http://loki.monitoring:3100"))
    detector = AnomalyDetector(loki)
    
    # Define what to monitor
    namespace = os.getenv("TARGET_NAMESPACE", "production")
    services = [s.strip() for s in os.getenv("TARGET_SERVICES", "api-gateway,checkout,payment,auth").split(",") if s.strip()]
    
    print(f"Checking {len(services)} services in {namespace}...")
    
    all_anomalies = detector.detect_error_spike(namespace, services)
    
    if not all_anomalies:
        print("No anomalies detected.")
        return
    
    print(f"Found {len(all_anomalies)} anomalies. Analyzing with Claude...")
    
    slack_webhook = os.getenv("SLACK_WEBHOOK_URL")
    
    for anomaly in all_anomalies:
        analysis = analyze_anomaly_with_claude(anomaly)
        
        print(f"\n{'='*50}")
        print(f"ANOMALY: {anomaly['service']}")
        print(f"{'='*50}")
        print(analysis)
        
        if slack_webhook:
            alert = format_slack_alert(anomaly, analysis)
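            resp = requests.post(slack_webhook, json=alert, timeout=10)
            if resp.ok:
                print(f"Alert sent to Slack for {anomaly['service']}")
            else:
                print(f"Slack returned {resp.status_code} for {anomaly['service']}: {resp.text}")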
 
if __name__ == "__main__":
    run_anomaly_check()

Kubernetes CronJob — Run Every 5 Minutes

yaml
apiVersion: batch/v1
kind: CronJob
metadata:
  name: log-anomaly-detector
  namespace: monitoring
spec:
  schedule: "*/5 * * * *"
  concurrencyPolicy: Forbid
  jobTemplate:
    spec:
      template:
        spec:
          containers:
          - name: detector
            image: your-registry/log-anomaly-detector:latest
            env:
            - name: LOKI_URL
              value: "http://loki.monitoring:3100"
            - name: TARGET_NAMESPACE
              value: "production"
            - name: TARGET_SERVICES
              value: "api-gateway,checkout,payment,auth,worker"
            - name: ANTHROPIC_API_KEY
              valueFrom:
                secretKeyRef:
                  name: ai-secrets
                  key: anthropic-api-key
            - name: SLACK_WEBHOOK_URL
              valueFrom:
                secretKeyRef:
                  name: ai-secrets
                  key: slack-webhook
          restartPolicy: OnFailure
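
With a 5-minute schedule and a 1-hour detection window, a spike that persists is re-detected, re-analyzed by Claude and re-posted to Slack on every run, roughly every 5 minutes. One way to damp that is a per-key cooldown. CronJob pods don't keep local state between runs, so this minimal sketch assumes the state file lives on a mounted PersistentVolume. The `STATE_PATH` location, the 30-minute cooldown and the `should_alert` helper are all hypothetical.

```python
import json
import time
from pathlib import Path
from typing import Optional

# Hypothetical state file on a PersistentVolume mounted into the CronJob pod
STATE_PATH = Path("/var/lib/anomaly-detector/last_alerts.json")
COOLDOWN_SECONDS = 30 * 60  # assumption: at most one alert per key every 30 minutes


def should_alert(key: str, state_path: Path = STATE_PATH,
                 cooldown: float = COOLDOWN_SECONDS, now: Optional[float] = None) -> bool:
    """Return True and record the time if `key` has not alerted within `cooldown` seconds."""
    now = time.time() if now is None else now
    try:
        state = json.loads(state_path.read_text())
    except (FileNotFoundError, json.JSONDecodeError):
        state = {}  # first run or unreadable file: start with no history
    last = state.get(key)
    if last is not None and now - last < cooldown:
        return False  # still cooling down: skip Claude and Slack for this one
    state[key] = now
    state_path.parent.mkdir(parents=True, exist_ok=True)
    state_path.write_text(json.dumps(state))
    return True
```

Calling `if not should_alert(f"{namespace}/{anomaly['service']}/{anomaly['type']}"): continue` at the top of the loop in `run_anomaly_check` would skip both the Claude call and the Slack post for repeats. `concurrencyPolicy: Forbid` keeps two runs from writing the file at the same time.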

Sample Output

ANOMALY: checkout-service
==================================================
Root Cause Hypothesis:
1. Database connection pool exhaustion — the errors show 
   "connection timeout" occurring every ~3 seconds, suggesting 
   connections aren't being released properly after use.
2. Upstream dependency failure — "ServiceUnavailable" from 
   payment-gateway suggests the payment service may be the 
   actual origin.

Immediate Action:
kubectl exec -n production deploy/checkout-service -- \
  curl localhost:8080/actuator/metrics/hikaricp.connections.active
# Check if connection pool is saturated

Escalation: YES — error rate 18% and rising, affects 
checkout flow directly. Page on-call immediately.

Slack Summary: 🔴 checkout-service error spike (18%, 6x baseline) 
— likely DB connection exhaustion or payment-gateway down, 
investigate now.

The key insight: Claude isn't magic here. It is pattern matching on the error messages you give it, so the quality of the analysis depends directly on the quality of your log sampling. Structured logs that carry context (service, level, request ID, upstream dependency) give dramatically better AI analysis than raw stack traces.
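
Since the analysis is only as good as the sample, one cheap improvement is to avoid sending Claude ten copies of the same error. This minimal sketch reuses the normalization idea from `detect_log_pattern_changes` (timestamps, UUIDs and numbers replaced by placeholders, with the UUID match made case-insensitive here). It keeps one example per distinct pattern, most frequent first, within a character budget. `diverse_sample` is a hypothetical helper, and its budgets are assumptions rather than tuned values.

```python
import re
from collections import Counter

# Same placeholders as detect_log_pattern_changes (UUID match is case-insensitive here)
_TS = re.compile(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}")
_UUID = re.compile(r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", re.IGNORECASE)
_NUM = re.compile(r"\b\d+\b")


def normalize(line: str) -> str:
    line = _TS.sub("TIMESTAMP", line)
    line = _UUID.sub("UUID", line)
    return _NUM.sub("N", line)


def diverse_sample(lines: list[str], max_lines: int = 10,
                   max_line_chars: int = 300, max_total_chars: int = 3000) -> list[str]:
    """One representative line per normalized pattern, most frequent patterns first."""
    counts = Counter(normalize(line) for line in lines)
    first_seen = {}
    for line in lines:
        first_seen.setdefault(normalize(line), line)

    sample, total = [], 0
    for pattern, count in counts.most_common():
        line = first_seen[pattern][:max_line_chars]
        entry = f"[x{count}] {line}" if count > 1 else line  # keep the frequency visible
        if len(sample) >= max_lines or total + len(entry) > max_total_chars:
            break
        sample.append(entry)
        total += len(entry)
    return sample
```

Swapping `error_lines[-10:]` for `diverse_sample(error_lines)` in `detect_error_spike` would give Claude both the dominant error and the rarer ones next to it, such as an upstream `ServiceUnavailable`. Those rarer lines are often the real clue.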

Set up Grafana Loki for log aggregation: Grafana Loki Log Aggregation Guide
