
LLM Tool Calling and Parallel Function Execution in Production

Tool calling lets LLMs execute real functions. Parallel tool calling makes agents fast. Here's how to implement both properly with the Claude API and handle errors gracefully.

DevOpsBoys · 6 min read

Tool calling (function calling) is what separates LLM chatbots from LLM agents. When Claude can call your functions — query databases, call APIs, run commands — it becomes genuinely useful for automation.

How Tool Calling Works

1. You define tools (JSON Schema describing functions)
2. User sends a message
3. Claude decides which tools to call and with what arguments
4. You execute the tools and return results
5. Claude synthesizes the results into a final answer
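
The steps above map onto a small set of message shapes. Here is a minimal sketch using plain dictionaries instead of SDK objects. The `toolu_01` id and the `fake_pod_status` helper are made up for illustration; the point is that each `tool_result` must echo the id of the `tool_use` block it answers.

```python
import json

# Step 3: the shape of a tool_use block in Claude's response (id is illustrative).
tool_use_block = {
    "type": "tool_use",
    "id": "toolu_01",
    "name": "get_kubernetes_pod_status",
    "input": {"namespace": "production"},
}


def fake_pod_status(namespace: str) -> str:
    """Hypothetical stand-in for a real Kubernetes lookup."""
    return json.dumps({"namespace": namespace, "pods": []})


def build_tool_result(block: dict) -> dict:
    """Step 4: run the tool and wrap its output in a tool_result block."""
    output = fake_pod_status(**block["input"])
    return {
        "type": "tool_result",
        "tool_use_id": block["id"],  # must echo the id from the tool_use block
        "content": output,
    }


tool_result = build_tool_result(tool_use_block)
# The result goes back to Claude inside a user message.
follow_up_message = {"role": "user", "content": [tool_result]}
```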

Basic Tool Calling

python
from anthropic import Anthropic
import json
 
client = Anthropic()
 
# Define tools
tools = [
    {
        "name": "get_kubernetes_pod_status",
        "description": "Get the current status of pods in a Kubernetes namespace",
        "input_schema": {
            "type": "object",
            "properties": {
                "namespace": {
                    "type": "string",
                    "description": "The Kubernetes namespace to check"
                },
                "label_selector": {
                    "type": "string",
                    "description": "Optional label selector like 'app=nginx'"
                }
            },
            "required": ["namespace"]
        }
    },
    {
        "name": "get_prometheus_metric",
        "description": "Query a Prometheus metric using PromQL",
        "input_schema": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "PromQL query string"
                },
                "time_range": {
                    "type": "string",
                    "description": "Time range like '5m', '1h', '24h'"
                }
            },
            "required": ["query"]
        }
    }
]
 
 
def execute_tool(tool_name: str, tool_input: dict) -> str:
    """Execute a tool and return result as string."""
    if tool_name == "get_kubernetes_pod_status":
        # Real implementation would call k8s API
        namespace = tool_input["namespace"]
        return json.dumps({
            "namespace": namespace,
            "pods": [
                {"name": "nginx-abc", "status": "Running", "restarts": 0},
                {"name": "api-xyz", "status": "CrashLoopBackOff", "restarts": 5}
            ]
        })
    
    elif tool_name == "get_prometheus_metric":
        # Real implementation would query Prometheus
        query = tool_input["query"]
        return json.dumps({
            "query": query,
            "result": [{"metric": {"pod": "api-xyz"}, "value": [1750000000, "0.95"]}]
        })
    
    return f"Unknown tool: {tool_name}"
 
 
def run_agent(user_message: str) -> str:
    """Run the agent loop until a final answer is produced."""
    messages = [{"role": "user", "content": user_message}]
    
    while True:
        response = client.messages.create(
            model="claude-opus-4-8",
            max_tokens=4096,
            tools=tools,
            messages=messages,
        )
        
        # Check if we're done
        if response.stop_reason == "end_turn":
            # Extract text from response
            text_blocks = [b.text for b in response.content if hasattr(b, "text")]
            return "\n".join(text_blocks)
        
        # Handle tool calls
        if response.stop_reason == "tool_use":
            tool_results = []
            
            for block in response.content:
                if block.type == "tool_use":
                    print(f"Calling tool: {block.name}({json.dumps(block.input, indent=2)})")
                    result = execute_tool(block.name, block.input)
                    print(f"Result: {result[:200]}...")
                    
                    tool_results.append({
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": result,
                    })
            
            # Add assistant's tool call message
            messages.append({"role": "assistant", "content": response.content})
            # Add tool results
            messages.append({"role": "user", "content": tool_results})
        else:
            break
    
    return "No response generated"
 
 
# Usage
result = run_agent("What pods are having issues in the production namespace? Check their metrics too.")
print(result)

Parallel Tool Calling

When Claude needs multiple independent pieces of information, it can request several tool calls in a single turn. Running those calls concurrently, rather than one at a time, saves both model round-trips and tool wait time.

python
def run_agent_with_parallel_tools(user_message: str) -> str:
    """Agent that handles multiple tool calls in a single response."""
    messages = [{"role": "user", "content": user_message}]
    
    while True:
        response = client.messages.create(
            model="claude-opus-4-8",
            max_tokens=4096,
            tools=tools,
            messages=messages,
        )
        
        if response.stop_reason == "end_turn":
            # Join all text blocks; a response may contain none or several
            return "\n".join(b.text for b in response.content if b.type == "text")
        
        if response.stop_reason == "tool_use":
            # Claude may call MULTIPLE tools at once
            tool_use_blocks = [b for b in response.content if b.type == "tool_use"]
            
            print(f"Claude wants to call {len(tool_use_blocks)} tools in parallel")
            
            # Execute all tool calls in parallel
            import concurrent.futures
            tool_results = []
            
            with concurrent.futures.ThreadPoolExecutor() as executor:
                future_to_block = {
                    executor.submit(execute_tool, block.name, block.input): block
                    for block in tool_use_blocks
                }
                
                for future in concurrent.futures.as_completed(future_to_block):
                    block = future_to_block[future]
                    try:
                        result = future.result()
                    except Exception as e:
                        result = f"Error: {e}"
                    
                    tool_results.append({
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": result,
                    })
            
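            messages.append({"role": "assistant", "content": response.content})
            messages.append({"role": "user", "content": tool_results})
        else:
            # Any other stop reason (for example max_tokens) would otherwise loop forever
            return "No response generated"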

When you ask "check the status of all 5 services", Claude can request all 5 tool calls in a single response, and the thread pool runs them concurrently instead of one after another. If each call takes about 5 seconds, that cuts tool execution time from roughly 25 seconds to roughly 5.
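
To see the effect without calling any API, here is a small timing sketch. It assumes a hypothetical `slow_status_check` tool that sleeps for 0.2 seconds and compares a sequential loop against a thread pool. The service names are invented.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_status_check(service: str) -> str:
    """Hypothetical tool that takes about 0.2 seconds per call."""
    time.sleep(0.2)
    return f"{service}: ok"


def run_sequential(services: list[str]) -> list[str]:
    return [slow_status_check(s) for s in services]


def run_concurrent(services: list[str]) -> list[str]:
    # executor.map returns results in input order, even if calls finish out of order
    with ThreadPoolExecutor(max_workers=max(1, len(services))) as executor:
        return list(executor.map(slow_status_check, services))


services = ["auth", "billing", "cart", "search", "web"]

start = time.perf_counter()
sequential_results = run_sequential(services)
sequential_seconds = time.perf_counter() - start

start = time.perf_counter()
concurrent_results = run_concurrent(services)
concurrent_seconds = time.perf_counter() - start

print(f"sequential: {sequential_seconds:.2f}s, concurrent: {concurrent_seconds:.2f}s")
```

Threads suit this workload because the tools spend their time waiting on I/O (here, `time.sleep`); CPU-bound tools would not see the same gain from a thread pool.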

Error Handling in Tool Results

Tools fail. Handle this gracefully:

python
def execute_tool_safe(tool_name: str, tool_input: dict) -> str:
    """Execute tool with error handling."""
    try:
        result = execute_tool(tool_name, tool_input)
        return json.dumps({"success": True, "data": result})
    except TimeoutError:
        return json.dumps({
            "success": False,
            "error": "Tool timed out after 30 seconds",
            "tool": tool_name
        })
    except PermissionError as e:
        return json.dumps({
            "success": False,
            "error": f"Permission denied: {e}",
            "tool": tool_name
        })
    except Exception as e:
        return json.dumps({
            "success": False,
            "error": str(e),
            "tool": tool_name
        })

Claude generally copes well with a failed tool when the error comes back as a tool result: it can tell the user "I couldn't get X because of Y, but here's what I found from the other tools."
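
Besides describing the failure in the content, a `tool_result` block can carry an `is_error` flag so the model knows the call failed. The sketch below is one possible wrapper; `to_tool_result`, `failing_tool`, and `working_tool` are hypothetical names, and the tools take only their input dict to keep the example short.

```python
import json
from typing import Callable


def to_tool_result(tool_use_id: str, tool: Callable[[dict], str], tool_input: dict) -> dict:
    """Run a tool and build a tool_result block, flagging failures with is_error."""
    result = {"type": "tool_result", "tool_use_id": tool_use_id}
    try:
        result["content"] = tool(tool_input)
    except Exception as exc:
        # Include the exception type so the model can explain what went wrong
        result["content"] = f"{type(exc).__name__}: {exc}"
        result["is_error"] = True
    return result


def failing_tool(tool_input: dict) -> str:
    """Hypothetical tool that always fails."""
    raise PermissionError("cannot list pods in namespace 'production'")


def working_tool(tool_input: dict) -> str:
    """Hypothetical tool that always succeeds."""
    return json.dumps({"status": "Running"})


failed = to_tool_result("toolu_01", failing_tool, {})
succeeded = to_tool_result("toolu_02", working_tool, {})
```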

Tool Result Caching

For tools that are called repeatedly with the same inputs:

python
import functools
import hashlib
import json
import redis
 
cache = redis.Redis(host="redis", port=6379)
 
def cached_tool(ttl_seconds: int = 60):
    """Decorator to cache tool results in Redis."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(tool_name: str, tool_input: dict) -> str:
            # Create cache key from tool name + input
            cache_key = f"tool:{tool_name}:{hashlib.md5(json.dumps(tool_input, sort_keys=True).encode()).hexdigest()}"
            
            cached = cache.get(cache_key)
            if cached is not None:
                return cached.decode()
            
            result = func(tool_name, tool_input)
            cache.setex(cache_key, ttl_seconds, result)
            return result
        return wrapper
    return decorator
 
 
@cached_tool(ttl_seconds=30)
def execute_tool_cached(tool_name: str, tool_input: dict) -> str:
    return execute_tool(tool_name, tool_input)

For Prometheus queries, a 30-second cache means that identical queries repeated within that window are served from Redis instead of hitting Prometheus again each time Claude asks for the same metric.
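
For local development or unit tests where Redis is not available, the same idea can be sketched with an in-process dictionary. Everything here is an assumption for illustration: the `InMemoryToolCache` class, the injectable `clock`, and the `counting_tool` helper are not part of any library, and this sketch hashes the key with SHA-256 rather than MD5.

```python
import hashlib
import json
import time
from typing import Callable


class InMemoryToolCache:
    """Tiny TTL cache keyed by tool name and input; not shared across processes."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self.ttl_seconds = ttl_seconds
        self.clock = clock
        self._entries: dict[str, tuple[float, str]] = {}

    @staticmethod
    def key(tool_name: str, tool_input: dict) -> str:
        # sort_keys makes the key independent of argument order
        payload = json.dumps(tool_input, sort_keys=True).encode()
        return f"tool:{tool_name}:{hashlib.sha256(payload).hexdigest()}"

    def get_or_call(self, tool_name: str, tool_input: dict,
                    func: Callable[[str, dict], str]) -> str:
        cache_key = self.key(tool_name, tool_input)
        entry = self._entries.get(cache_key)
        if entry is not None and entry[0] > self.clock():
            return entry[1]  # still fresh, skip the real call
        result = func(tool_name, tool_input)
        self._entries[cache_key] = (self.clock() + self.ttl_seconds, result)
        return result


calls = []


def counting_tool(tool_name: str, tool_input: dict) -> str:
    """Hypothetical tool that records each real invocation."""
    calls.append(tool_name)
    return json.dumps({"value": len(calls)})


fake_now = [0.0]  # fake clock so the example does not need to sleep
tool_cache = InMemoryToolCache(ttl_seconds=30, clock=lambda: fake_now[0])
first = tool_cache.get_or_call("get_prometheus_metric", {"query": "up", "time_range": "5m"}, counting_tool)
second = tool_cache.get_or_call("get_prometheus_metric", {"time_range": "5m", "query": "up"}, counting_tool)
fake_now[0] = 31.0  # advance the fake clock past the TTL
third = tool_cache.get_or_call("get_prometheus_metric", {"query": "up", "time_range": "5m"}, counting_tool)
```

The second call reuses the first result even though its arguments are in a different order, and the third call runs the tool again because the entry has expired.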

Streaming Tool Calls

For better UX, stream the response and tool calls as they happen:

python
def run_agent_streaming(user_message: str):
    """Stream the agent's thinking and tool calls."""
    messages = [{"role": "user", "content": user_message}]
    
    while True:
        with client.messages.stream(
            model="claude-opus-4-8",
            max_tokens=4096,
            tools=tools,
            messages=messages,
        ) as stream:
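            # Yield text deltas as they arrive so the caller sees output immediately
            for text in stream.text_stream:
                yield text
            full_response = stream.get_final_message()
        
        # Stop on anything other than a tool request (end_turn, max_tokens, ...)
        if full_response.stop_reason != "tool_use":
            break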
        
        if full_response.stop_reason == "tool_use":
            tool_use_blocks = [b for b in full_response.content if b.type == "tool_use"]
            tool_results = []
            
            for block in tool_use_blocks:
                yield f"\n🔧 Calling {block.name}...\n"
                result = execute_tool(block.name, block.input)
                yield f"✅ Got result\n"
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": result,
                })
            
            messages.append({"role": "assistant", "content": full_response.content})
            messages.append({"role": "user", "content": tool_results})
 
 
for chunk in run_agent_streaming("Analyze the health of our production cluster"):
    print(chunk, end="", flush=True)

Production Best Practices

  1. Set size limits on tool results: a tool returning 50MB of logs will overflow the context window
  2. Write tool descriptions carefully: Claude picks tools based on their descriptions, so be specific
  3. Use required fields — don't make everything optional if the tool needs it
  4. Log every tool call — for debugging and cost tracking
  5. Implement retries for flaky tools (with backoff) but limit total retries per conversation
  6. Use claude-haiku-4-5-20251001 for tool selection, claude-opus-4-8 for synthesis if cost matters
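
Practices 1 and 5 can be sketched in a few lines. The character budget, the `RetryBudget` class, and the `flaky_tool` helper below are assumptions made for illustration; a character cap is only a rough proxy for a token limit.

```python
import time
from typing import Callable

MAX_RESULT_CHARS = 8_000  # assumed budget; tune to your context window


def truncate_result(result: str, limit: int = MAX_RESULT_CHARS) -> str:
    """Cap a tool result and tell the model that output was cut."""
    if len(result) <= limit:
        return result
    omitted = len(result) - limit
    return result[:limit] + f"\n[truncated {omitted} characters]"


class RetryBudget:
    """Shared counter that caps total retries across one conversation."""

    def __init__(self, max_retries: int):
        self.remaining = max_retries

    def take(self) -> bool:
        if self.remaining <= 0:
            return False
        self.remaining -= 1
        return True


def call_with_retries(tool: Callable[[], str], budget: RetryBudget,
                      max_attempts: int = 3, base_delay: float = 0.5,
                      sleep: Callable[[float], None] = time.sleep) -> str:
    """Retry a flaky tool with exponential backoff until attempts or budget run out."""
    attempt = 0
    while True:
        attempt += 1
        try:
            return truncate_result(tool())
        except Exception:
            if attempt >= max_attempts or not budget.take():
                raise  # give up and let the caller report the error to the model
            sleep(base_delay * 2 ** (attempt - 1))


attempts = {"count": 0}


def flaky_tool() -> str:
    """Hypothetical tool that fails twice, then succeeds."""
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise TimeoutError("upstream timed out")
    return "pods healthy"


delays = []  # record delays instead of sleeping, to keep the example fast
budget = RetryBudget(max_retries=5)
outcome = call_with_retries(flaky_tool, budget, sleep=delays.append)
```

Sharing one `RetryBudget` across every tool call in a conversation keeps a single misbehaving dependency from multiplying latency and cost.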

Resources: Anthropic Tool Use docs | Tool use examples on GitHub
