
Build an AI Kubernetes NetworkPolicy Generator with Claude API

Writing NetworkPolicy YAML by hand is error-prone. Build a tool that reads your namespace's actual traffic patterns and generates a least-privilege NetworkPolicy using the Claude API.

DevOpsBoys · Jun 15, 2026 · 4 min read

Most clusters run with no NetworkPolicies at all, because writing them correctly by hand is tedious — you have to know every legitimate connection a pod makes before you can safely deny everything else. Get it wrong and you break production; get it too loose and you've enforced nothing.

Let's build a tool that observes real traffic (via existing flow logs) and uses Claude to generate a tight, least-privilege NetworkPolicy from that observed behavior.

Architecture

Cilium/Hubble flow logs (or kubectl exec + tcpdump)
        ↓
  Parse into structured connection list
        ↓
  Claude API: generate NetworkPolicy YAML
        ↓
  Validate with kubectl apply --dry-run=server
        ↓
  Human review → apply

Step 1: Collect Observed Traffic

If you're running Cilium, Hubble gives you this for free:

bash
# Get all flows for a namespace over the last hour
# (--since takes a relative duration; --last takes a number of flows)
hubble observe --namespace production --since 1h -o json > flows.json

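python
# parse_flows.py
import json


def extract_connections(flows_file: str, target_namespace: str) -> list[dict]:
    """Return de-duplicated connections that start or end in target_namespace."""
    connections = []
    seen = set()

    with open(flows_file) as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            record = json.loads(line)
            # Some Hubble CLI versions wrap each flow as {"flow": {...}};
            # others emit the flow object directly. Handle both.
            flow = record.get("flow", record)
            src = flow.get("source", {})
            dst = flow.get("destination", {})

            # Keep ingress and egress: either end must be in the namespace.
            if target_namespace not in (src.get("namespace"), dst.get("namespace")):
                continue
            # Skip reply packets; their destination port is the client's
            # ephemeral port, not a service port.
            if flow.get("is_reply"):
                continue

            # Record the destination port for TCP or UDP flows.
            l4 = flow.get("l4", {})
            protocol = next((p for p in ("TCP", "UDP") if p in l4), None)
            if protocol is None:
                continue
            port = l4[protocol].get("destination_port")

            source_pod = src.get("pod_name", "external")
            dest_pod = dst.get("pod_name", "external")
            key = (source_pod, dest_pod, protocol, port)
            if key in seen:
                continue
            seen.add(key)

            connections.append({
                "source_pod": source_pod,
                "source_namespace": src.get("namespace", "external"),
                "dest_pod": dest_pod,
                "dest_namespace": dst.get("namespace", "external"),
                "protocol": protocol,
                "dest_port": port,
                "verdict": flow.get("verdict"),
            })

    return connections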
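
The parser only passes pod names along, so Claude has to guess label selectors from them. Hubble flows also carry each endpoint's labels, which are a safer basis for a podSelector. Here is a minimal sketch of pulling them out, assuming the labels arrive as strings prefixed with their source, such as "k8s:app=checkout-service". The helper name selector_labels and the default list of wanted keys are hypothetical, so adjust them to the labels your workloads actually use.

```python
def selector_labels(endpoint: dict, wanted=("app", "app.kubernetes.io/name")) -> dict:
    """Pick selector-friendly labels from a Hubble endpoint's label list.

    Assumption: each label is a string shaped like "<source>:<key>=<value>",
    for example "k8s:app=checkout-service". Only Kubernetes-sourced labels
    whose key is in `wanted` are returned.
    """
    found = {}
    for raw in endpoint.get("labels", []):
        source, _, rest = raw.partition(":")
        if source != "k8s":
            continue  # skip reserved/cilium-internal labels
        key, sep, value = rest.partition("=")
        if sep and key in wanted:
            found[key] = value
    return found


example_endpoint = {
    "pod_name": "checkout-service-7d9f8",
    "labels": [
        "k8s:app=checkout-service",
        "k8s:io.kubernetes.pod.namespace=production",
        "reserved:world",
    ],
}
print(selector_labels(example_endpoint))
```

You could attach the result to each connection entry as source_labels and dest_labels, then ask the model to use those labels verbatim instead of inferring them.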

If you're not running Cilium, the same idea works with any flow-logging tool, or even a week of kubectl logs plus app-level connection logs as a fallback — less precise, but workable.

Step 2: Generate the NetworkPolicy with Claude

python
import json
from anthropic import Anthropic
 
client = Anthropic()  # reads ANTHROPIC_API_KEY from the environment
 
def generate_network_policy(namespace: str, connections: list[dict]) -> str:
    connections_summary = json.dumps(connections, indent=2)
    
    prompt = f"""You are a Kubernetes security engineer writing a least-privilege NetworkPolicy.
 
Here are the OBSERVED connections in namespace "{namespace}" during the observation window:
 
{connections_summary}
 
Generate a Kubernetes NetworkPolicy (or set of policies, one per workload if connections 
differ significantly) that:
1. Denies all traffic by default (default-deny)
2. Explicitly allows ONLY the observed connections above
3. Uses podSelector matchLabels based on the pod names shown (infer reasonable label 
   selectors, e.g. app=checkout-service from pod name checkout-service-7d9f8)
4. Includes both ingress and egress rules where applicable
5. Adds a comment above each rule explaining which observed connection it covers
 
Output only the YAML, no explanation outside of YAML comments."""
 
    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=4096,
        messages=[{"role": "user", "content": prompt}]
    )
    
    return response.content[0].text
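
Even with "output only the YAML" in the prompt, a model may still wrap its answer in a Markdown code fence or add a sentence around it, which would make the validation step fail on otherwise valid YAML. Here is a small sketch of a cleanup step to run on the returned text before validating it. The name extract_yaml is hypothetical, and the approach (take the first fenced block if there is one, otherwise the whole text) is an assumption about how such responses usually look.

```python
import re

FENCE = "`" * 3  # three backticks, the Markdown code-fence marker
_FENCED_BLOCK = re.compile(
    re.escape(FENCE) + r"[a-zA-Z]*[ \t]*\n(.*?)\n" + re.escape(FENCE),
    re.DOTALL,
)


def extract_yaml(text: str) -> str:
    """Return the contents of the first fenced block, or the text itself."""
    match = _FENCED_BLOCK.search(text)
    return (match.group(1) if match else text).strip()


wrapped = (
    "Here is the policy:\n"
    + FENCE + "yaml\n"
    + "apiVersion: networking.k8s.io/v1\nkind: NetworkPolicy\n"
    + FENCE
)
print(extract_yaml(wrapped))
```

Unfenced output passes through unchanged apart from surrounding whitespace, so it is safe to apply unconditionally.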

Step 3: Validate Before Applying

Never apply AI-generated policy directly to production. Always dry-run and diff first.

python
import os
import subprocess
import tempfile


def validate_policy(policy_yaml: str) -> tuple[bool, str]:
    """Server-side dry-run the policy; return (ok, kubectl output)."""
    with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f:
        f.write(policy_yaml)
        path = f.name

    try:
        result = subprocess.run(
            ["kubectl", "apply", "--dry-run=server", "-f", path],
            capture_output=True, text=True,
        )
    finally:
        os.unlink(path)  # don't leave the temp file behind

    return result.returncode == 0, result.stdout + result.stderr
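
The dry-run covers the "does the API server accept it" half. For the diff half, kubectl diff shows what would change against the live cluster. Here is a sketch of wrapping it, relying on kubectl diff's documented exit codes (0 for no differences, 1 for differences found, greater than 1 for an error). The function names diff_policy and interpret_diff_exit are hypothetical.

```python
import subprocess


def interpret_diff_exit(returncode: int) -> str:
    """Map kubectl diff's exit status to a coarse outcome."""
    if returncode == 0:
        return "no-changes"
    if returncode == 1:
        return "changes"
    return "error"  # kubectl or the diff program itself failed


def diff_policy(path: str) -> tuple[str, str]:
    """Run kubectl diff on a manifest file; return (outcome, output)."""
    result = subprocess.run(
        ["kubectl", "diff", "-f", path],
        capture_output=True, text=True,
    )
    return interpret_diff_exit(result.returncode), result.stdout + result.stderr
```

Note that exit code 1 is the normal case for a new policy, so treat only "error" as a failure and show the diff output to whoever is reviewing.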

Step 4: Generate, Then Test in Audit Mode First

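Don't go straight to enforcement. A plain kubectl apply of a NetworkPolicy is enforced immediately, so if you're on Cilium, turn on policy audit mode first (the policy-audit-mode agent option; check the Cilium docs for your version). In audit mode, traffic the policy would deny is logged but still forwarded:

bash
# Assumes Cilium policy audit mode is already enabled; without it, this apply enforces immediately
kubectl apply -f generated-policy.yaml

# Watch what WOULD be denied: in audit mode these flows carry the AUDIT verdict
hubble observe --namespace production --verdict AUDIT --since 30m

If the audited flows include legitimate traffic you missed during the observation window (cron jobs that only run weekly, for example), add it manually before you disable audit mode and start enforcing.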
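
When reviewing the denied flows from this trial period, it helps to separate two cases: connections that were in the original observation list but are still denied (the generated policy is wrong, for example a bad label guess), and connections that never appeared before (the observation window was too short). Here is a rough sketch, assuming both lists have the shape returned by extract_connections in Step 1. The name triage_denied is hypothetical, and matching on pod names is a simplification that breaks when pods are recreated under new names.

```python
def _key(conn: dict) -> tuple:
    # Identify a connection by who talks to whom on which port.
    return (conn.get("source_pod"), conn.get("dest_pod"), conn.get("dest_port"))


def triage_denied(denied: list[dict], observed: list[dict]) -> tuple[list[dict], list[dict]]:
    """Split denied connections into (policy_gaps, new_traffic)."""
    observed_keys = {_key(c) for c in observed}
    policy_gaps, new_traffic = [], []
    for conn in denied:
        if _key(conn) in observed_keys:
            policy_gaps.append(conn)   # was observed, yet the policy denies it
        else:
            new_traffic.append(conn)   # never seen during observation
    return policy_gaps, new_traffic


observed = [
    {"source_pod": "api-gateway-abc12", "dest_pod": "checkout-service-7d9f8", "dest_port": 8080},
]
denied = [
    {"source_pod": "api-gateway-abc12", "dest_pod": "checkout-service-7d9f8", "dest_port": 8080},
    {"source_pod": "weekly-report-x1", "dest_pod": "checkout-service-7d9f8", "dest_port": 8080},
]
gaps, new = triage_denied(denied, observed)
print(len(gaps), "policy gaps,", len(new), "new connections")
```

Policy gaps point back at the generation step; new traffic is the list to take to the owning team before deciding whether to allow it.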

Example Output

yaml
# Generated from 7 days of observed traffic in namespace: production
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: checkout-service-policy
  namespace: production
spec:
  podSelector:
    matchLabels:
      app: checkout-service
  policyTypes:
  - Ingress
  - Egress
  ingress:
  # Observed: api-gateway -> checkout-service:8080 (147 connections/day avg)
  - from:
    - podSelector:
        matchLabels:
          app: api-gateway
    ports:
    - protocol: TCP
      port: 8080
  egress:
  # Observed: checkout-service -> payment-service:8443 (89 connections/day avg)
  - to:
    - podSelector:
        matchLabels:
          app: payment-service
    ports:
    - protocol: TCP
      port: 8443
  # Observed: checkout-service -> postgres:5432 (continuous)
  - to:
    - podSelector:
        matchLabels:
          app: postgres
    ports:
    - protocol: TCP
      port: 5432
  # Required: DNS resolution
  - to:
    - namespaceSelector: {}
    ports:
    - protocol: UDP
      port: 53

Notice that Claude adds the DNS egress rule even though it wasn't explicitly in the flow data. This kind of domain knowledge is what makes AI generation useful here rather than a YAML templating exercise. The rule still deserves scrutiny in review, since as written it allows port 53 to every namespace. A human also needs to confirm that the observation window captured every legitimate pattern, especially periodic batch jobs.
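
One detail worth checking in review: the DNS rule in the example allows only UDP, while DNS can also use TCP on port 53 (for large responses, for instance). Here is a sketch of a small check, assuming the generated YAML has already been loaded into a dict (for example with PyYAML's yaml.safe_load, a third-party package not shown here). The function name dns_protocols_allowed is hypothetical, and named ports such as "dns" are not handled.

```python
def dns_protocols_allowed(policy: dict) -> set:
    """Return the protocols for which the policy's egress rules allow port 53."""
    protocols = set()
    for rule in policy.get("spec", {}).get("egress") or []:
        for port in rule.get("ports") or []:
            if port.get("port") in (53, "53"):
                # NetworkPolicy defaults to TCP when protocol is omitted.
                protocols.add(port.get("protocol", "TCP"))
    return protocols


example_policy = {
    "spec": {
        "egress": [
            {"to": [{"namespaceSelector": {}}],
             "ports": [{"protocol": "UDP", "port": 53}]},
        ]
    }
}
missing = {"UDP", "TCP"} - dns_protocols_allowed(example_policy)
print("DNS protocols not allowed:", sorted(missing))
```

Whether you need TCP 53 depends on your resolvers and workloads, so treat a missing protocol as a prompt for review rather than an automatic failure.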

Understand the policies you're generating: What Is Kubernetes Network Policy — Explained
