Build an AI Log Anomaly Detector with Claude API + Grafana Loki
Your logs have the answers — but there are too many to read. Build an AI anomaly detector that queries Loki for unusual patterns and uses Claude API to explain what's wrong and what to do.
Production logs contain every incident before it becomes an incident. The problem is volume — a medium-sized Kubernetes cluster generates millions of log lines per hour. Nobody reads them until something breaks.
Let's build an anomaly detector that watches your Loki logs, identifies unusual patterns using statistical analysis, and then uses Claude API to explain what the anomaly means and suggest next steps.
Architecture
Loki API → Fetch recent logs → Statistical baseline comparison
↓
Anomalies found → Claude API
↓
Explanation + recommended actions
↓
Slack alert with context
Prerequisites
pip install anthropic requests python-dotenv slack-sdk numpyYou need:
- Grafana Loki running (accessible via HTTP)
- Anthropic API key
- Slack webhook URL (optional, for alerts)
Step 1: Loki Query Client
# loki_client.py
import requests
import json
from datetime import datetime, timedelta
from urllib.parse import urlencode
class LokiClient:
def __init__(self, url: str):
self.url = url.rstrip('/')
def query_range(self, logql: str, hours: int = 1, limit: int = 500) -> list[dict]:
end = datetime.utcnow()
start = end - timedelta(hours=hours)
params = {
"query": logql,
"start": int(start.timestamp() * 1e9), # Loki uses nanoseconds
"end": int(end.timestamp() * 1e9),
"limit": limit,
"direction": "backward"
}
resp = requests.get(
f"{self.url}/loki/api/v1/query_range",
params=params,
timeout=30
)
resp.raise_for_status()
data = resp.json()
logs = []
for stream in data.get("data", {}).get("result", []):
labels = stream["stream"]
for ts, line in stream["values"]:
logs.append({
"timestamp": int(ts) / 1e9, # convert back to seconds
"labels": labels,
"line": line
})
return sorted(logs, key=lambda x: x["timestamp"])
def get_error_rate(self, namespace: str, service: str, hours: int = 1) -> dict:
"""Get error rate as errors per minute for a service."""
error_query = f'count_over_time({{namespace="{namespace}", app="{service}"}} |= "error" [1m])'
total_query = f'count_over_time({{namespace="{namespace}", app="{service}"}} [1m])'
errors = self.query_range(error_query, hours=hours)
total = self.query_range(total_query, hours=hours)
return {
"service": service,
"namespace": namespace,
"error_count": len(errors),
"total_count": len(total),
"error_rate_pct": (len(errors) / len(total) * 100) if total else 0
}
def get_recent_errors(self, namespace: str, minutes: int = 15, limit: int = 50) -> list[str]:
"""Get recent error log lines for analysis."""
query = f'{{namespace="{namespace}"}} |= "error" | logfmt | level="error"'
logs = self.query_range(query, hours=minutes/60, limit=limit)
return [log["line"] for log in logs]
def get_service_logs(self, namespace: str, service: str, minutes: int = 30) -> list[str]:
"""Get all log lines for a specific service."""
query = f'{{namespace="{namespace}", app="{service}"}}'
logs = self.query_range(query, hours=minutes/60, limit=200)
return [log["line"] for log in logs]Step 2: Anomaly Detection
# anomaly_detector.py
import numpy as np
from collections import defaultdict
from datetime import datetime, timedelta
from loki_client import LokiClient
class AnomalyDetector:
def __init__(self, loki: LokiClient):
self.loki = loki
def detect_error_spike(self, namespace: str, services: list[str]) -> list[dict]:
"""Detect services with unusually high error rates."""
anomalies = []
for service in services:
# Get error rate for last hour vs last 24h baseline
current = self.loki.get_error_rate(namespace, service, hours=1)
baseline = self.loki.get_error_rate(namespace, service, hours=24)
# If current error rate is 3x the baseline, flag it
baseline_rate = baseline["error_rate_pct"] / 24 # hourly average
current_rate = current["error_rate_pct"]
if current_rate > 5 and current_rate > (baseline_rate * 3):
anomalies.append({
"type": "error_spike",
"service": service,
"namespace": namespace,
"current_error_rate": round(current_rate, 2),
"baseline_error_rate": round(baseline_rate, 2),
"spike_factor": round(current_rate / max(baseline_rate, 0.1), 1),
"recent_errors": self.loki.get_recent_errors(namespace, minutes=15, limit=10)
})
return anomalies
def detect_log_pattern_changes(self, namespace: str, service: str) -> dict:
"""Detect unusual log patterns — new error messages, repeated errors."""
recent_logs = self.loki.get_service_logs(namespace, service, minutes=15)
# Count message patterns (strip timestamps and IDs)
pattern_counts = defaultdict(int)
for line in recent_logs:
# Normalize — remove timestamps, UUIDs, numbers
import re
normalized = re.sub(r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}', 'TIMESTAMP', line)
normalized = re.sub(r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}', 'UUID', normalized)
normalized = re.sub(r'\b\d+\b', 'N', normalized)
pattern_counts[normalized[:100]] += 1
# Find patterns that appear more than 10 times — potential loops or storms
repeated = {p: c for p, c in pattern_counts.items() if c > 10}
# Sample of unique error messages for Claude to analyze
error_lines = [l for l in recent_logs if "error" in l.lower() or "exception" in l.lower()]
return {
"service": service,
"total_logs": len(recent_logs),
"unique_patterns": len(pattern_counts),
"repeated_patterns": repeated,
"error_sample": error_lines[:15],
"has_log_storm": len(recent_logs) > 150 # more than 150 logs in 15 min is unusual
}Step 3: Claude API Analysis
# analyzer.py
import os
import json
from anthropic import Anthropic
client = Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
def analyze_anomaly_with_claude(anomaly: dict) -> str:
if anomaly["type"] == "error_spike":
context = f"""
Service: {anomaly['service']} in namespace {anomaly['namespace']}
Current error rate: {anomaly['current_error_rate']}%
Baseline error rate: {anomaly['baseline_error_rate']}%
Spike factor: {anomaly['spike_factor']}x above baseline
Recent error log samples:
{chr(10).join(anomaly['recent_errors'][:10])}
"""
else:
context = f"""
Service: {anomaly['service']}
Total logs in last 15 min: {anomaly['total_logs']}
Unique log patterns: {anomaly['unique_patterns']}
Log storm detected: {anomaly['has_log_storm']}
Repeated patterns (possible loop/storm):
{json.dumps(anomaly['repeated_patterns'], indent=2)[:500]}
Sample error messages:
{chr(10).join(anomaly['error_sample'][:10])}
"""
response = client.messages.create(
model="claude-sonnet-4-6",
max_tokens=1024,
messages=[{
"role": "user",
"content": f"""You are a senior SRE analyzing a production log anomaly.
{context}
Provide:
1. Root cause hypothesis (most likely 2-3 causes based on the error messages)
2. Immediate action to take right now (specific kubectl or diagnostic command)
3. Whether this needs immediate escalation (yes/no and why)
4. One-line Slack-ready summary for the team
Be specific and concise. Reference the actual error messages in your analysis."""
}]
)
return response.content[0].text
def format_slack_alert(anomaly: dict, analysis: str) -> dict:
"""Format anomaly + analysis as a Slack message."""
emoji = "🔴" if anomaly.get("spike_factor", 0) > 5 else "🟡"
return {
"blocks": [
{
"type": "header",
"text": {
"type": "plain_text",
"text": f"{emoji} Log Anomaly Detected — {anomaly.get('service', 'unknown')}"
}
},
{
"type": "section",
"fields": [
{"type": "mrkdwn", "text": f"*Service:*\n{anomaly.get('service')}"},
{"type": "mrkdwn", "text": f"*Namespace:*\n{anomaly.get('namespace')}"},
{"type": "mrkdwn", "text": f"*Error Rate:*\n{anomaly.get('current_error_rate', 'N/A')}%"},
{"type": "mrkdwn", "text": f"*Spike:*\n{anomaly.get('spike_factor', 'N/A')}x baseline"}
]
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"*AI Analysis:*\n{analysis[:1000]}"
}
}
]
}Step 4: Main Runner
# main.py
import os
import requests
from loki_client import LokiClient
from anomaly_detector import AnomalyDetector
from analyzer import analyze_anomaly_with_claude, format_slack_alert
def run_anomaly_check():
loki = LokiClient(os.getenv("LOKI_URL", "http://loki.monitoring:3100"))
detector = AnomalyDetector(loki)
# Define what to monitor
namespace = os.getenv("TARGET_NAMESPACE", "production")
services = os.getenv("TARGET_SERVICES", "api-gateway,checkout,payment,auth").split(",")
print(f"Checking {len(services)} services in {namespace}...")
all_anomalies = detector.detect_error_spike(namespace, services)
if not all_anomalies:
print("No anomalies detected.")
return
print(f"Found {len(all_anomalies)} anomalies. Analyzing with Claude...")
slack_webhook = os.getenv("SLACK_WEBHOOK_URL")
for anomaly in all_anomalies:
analysis = analyze_anomaly_with_claude(anomaly)
print(f"\n{'='*50}")
print(f"ANOMALY: {anomaly['service']}")
print(f"{'='*50}")
print(analysis)
if slack_webhook:
alert = format_slack_alert(anomaly, analysis)
requests.post(slack_webhook, json=alert)
print(f"Alert sent to Slack for {anomaly['service']}")
if __name__ == "__main__":
run_anomaly_check()Kubernetes CronJob — Run Every 5 Minutes
apiVersion: batch/v1
kind: CronJob
metadata:
name: log-anomaly-detector
namespace: monitoring
spec:
schedule: "*/5 * * * *"
concurrencyPolicy: Forbid
jobTemplate:
spec:
template:
spec:
containers:
- name: detector
image: your-registry/log-anomaly-detector:latest
env:
- name: LOKI_URL
value: "http://loki.monitoring:3100"
- name: TARGET_NAMESPACE
value: "production"
- name: TARGET_SERVICES
value: "api-gateway,checkout,payment,auth,worker"
- name: ANTHROPIC_API_KEY
valueFrom:
secretKeyRef:
name: ai-secrets
key: anthropic-api-key
- name: SLACK_WEBHOOK_URL
valueFrom:
secretKeyRef:
name: ai-secrets
key: slack-webhook
restartPolicy: OnFailureSample Output
ANOMALY: checkout-service
==================================================
Root Cause Hypothesis:
1. Database connection pool exhaustion — the errors show
"connection timeout" occurring every ~3 seconds, suggesting
connections aren't being released properly after use.
2. Upstream dependency failure — "ServiceUnavailable" from
payment-gateway suggests the payment service may be the
actual origin.
Immediate Action:
kubectl exec -n production deploy/checkout-service -- \
curl localhost:8080/actuator/metrics/hikaricp.connections.active
# Check if connection pool is saturated
Escalation: YES — error rate 18% and rising, affects
checkout flow directly. Page on-call immediately.
Slack Summary: 🔴 checkout-service error spike (18%, 6x baseline)
— likely DB connection exhaustion or payment-gateway down,
investigate now.
The key insight: Claude isn't magic here — it's pattern matching on the error messages you give it. The quality of the analysis directly depends on the quality of your log sampling. Structured, contextual logs give dramatically better AI analysis than raw stack traces.
Set up Grafana Loki for log aggregation: Grafana Loki Log Aggregation Guide
Today I Fixed
Short real fixes from production — posted daily
Stay ahead of the curve
Get the latest DevOps, Kubernetes, AWS, and AI/ML guides delivered straight to your inbox. No spam — just practical engineering content.
Related Articles
AI-Powered Kubernetes Anomaly Detection: Beyond Static Thresholds
Static alerts miss 40% of real incidents. Learn how AI and ML-based anomaly detection — using tools like Prometheus + ML, Dynatrace, and custom LLM runbooks — catches what thresholds can't.
AI-Powered Log Analysis Is Replacing Manual Debugging in DevOps (2026)
How LLMs and AI are transforming log analysis, anomaly detection, and root cause analysis — and the tools DevOps engineers should know about in 2026.
AI-Powered Log Analysis — How LLMs Are Replacing grep for DevOps Engineers
How to use LLMs and AI tools for intelligent log analysis in DevOps. Covers practical workflows, open-source tools, prompt engineering for logs, and building custom log analysis agents.