LLM Tool Calling and Parallel Function Execution in Production
Tool calling lets LLMs execute real functions. Parallel tool calling makes agents fast. Here's how to implement it properly with the Claude API and handle errors gracefully.
Tool calling (function calling) is what separates LLM chatbots from LLM agents. When Claude can call your functions — query databases, call APIs, run commands — it becomes genuinely useful for automation.
How Tool Calling Works
1. You define tools (JSON Schema describing functions)
2. User sends a message
3. Claude decides which tools to call and with what arguments
4. You execute the tools and return results
5. Claude synthesizes the results into a final answer
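Steps 3 to 5 boil down to one mapping: every `tool_use` block in Claude's reply must be answered by a `tool_result` block with the same id in the next user message. The sketch below shows that mapping on plain dicts so it runs without an API key. `next_user_message` and the `handlers` registry are hypothetical names, and the `toolu_...` ids are made up (real ones come from the API response); the field names match the ones used in the code later in this article.

```python
import json
from typing import Callable

# Hypothetical registry type: tool name -> function taking the tool's input dict
Handler = Callable[[dict], str]


def next_user_message(assistant_content: list[dict], handlers: dict[str, Handler]) -> dict:
    """Answer every tool_use block with a tool_result block carrying the same id."""
    results = []
    for block in assistant_content:
        if block.get("type") != "tool_use":
            continue  # text blocks need no reply
        handler = handlers.get(block["name"])
        if handler is None:
            results.append({"type": "tool_result", "tool_use_id": block["id"],
                            "content": f"Unknown tool: {block['name']}", "is_error": True})
            continue
        results.append({"type": "tool_result", "tool_use_id": block["id"],
                        "content": handler(block["input"])})
    return {"role": "user", "content": results}


# A reply shaped like Claude's when it stops with stop_reason == "tool_use"
assistant_content = [
    {"type": "text", "text": "Let me check the pods."},
    {"type": "tool_use", "id": "toolu_01", "name": "get_kubernetes_pod_status",
     "input": {"namespace": "production"}},
]
handlers = {
    "get_kubernetes_pod_status": lambda inp: json.dumps({"namespace": inp["namespace"], "pods": []}),
}
reply = next_user_message(assistant_content, handlers)
print(reply)
```

The assistant turn itself is appended to the history unchanged before this user message, which is exactly what the agent loops below do.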
Basic Tool Calling
```python
import json

from anthropic import Anthropic

client = Anthropic()  # reads ANTHROPIC_API_KEY from the environment

# Define tools: a name, a description, and a JSON Schema for the input
tools = [
    {
        "name": "get_kubernetes_pod_status",
        "description": "Get the current status of pods in a Kubernetes namespace",
        "input_schema": {
            "type": "object",
            "properties": {
                "namespace": {
                    "type": "string",
                    "description": "The Kubernetes namespace to check",
                },
                "label_selector": {
                    "type": "string",
                    "description": "Optional label selector like 'app=nginx'",
                },
            },
            "required": ["namespace"],
        },
    },
    {
        "name": "get_prometheus_metric",
        "description": "Query a Prometheus metric using PromQL",
        "input_schema": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "PromQL query string",
                },
                "time_range": {
                    "type": "string",
                    "description": "Time range like '5m', '1h', '24h'",
                },
            },
            "required": ["query"],
        },
    },
]


def execute_tool(tool_name: str, tool_input: dict) -> str:
    """Execute a tool and return its result as a string."""
    if tool_name == "get_kubernetes_pod_status":
        # Real implementation would call the Kubernetes API
        namespace = tool_input["namespace"]
        return json.dumps({
            "namespace": namespace,
            "pods": [
                {"name": "nginx-abc", "status": "Running", "restarts": 0},
                {"name": "api-xyz", "status": "CrashLoopBackOff", "restarts": 5},
            ],
        })
    elif tool_name == "get_prometheus_metric":
        # Real implementation would query Prometheus
        query = tool_input["query"]
        return json.dumps({
            "query": query,
            "result": [{"metric": {"pod": "api-xyz"}, "value": [1750000000, "0.95"]}],
        })
    return f"Unknown tool: {tool_name}"


def run_agent(user_message: str) -> str:
    """Run the agent loop until a final answer is produced."""
    messages = [{"role": "user", "content": user_message}]
    while True:
        response = client.messages.create(
            model="claude-opus-4-8",
            max_tokens=4096,
            tools=tools,
            messages=messages,
        )
        # Done: Claude produced its final answer
        if response.stop_reason == "end_turn":
            text_blocks = [b.text for b in response.content if b.type == "text"]
            return "\n".join(text_blocks)
        # Claude asked for one or more tool calls
        if response.stop_reason == "tool_use":
            tool_results = []
            for block in response.content:
                if block.type == "tool_use":
                    print(f"Calling tool: {block.name}({json.dumps(block.input, indent=2)})")
                    result = execute_tool(block.name, block.input)
                    print(f"Result: {result[:200]}...")
                    tool_results.append({
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": result,
                    })
            # Add the assistant turn (including its tool_use blocks) to the history
            messages.append({"role": "assistant", "content": response.content})
            # Answer every tool_use with a tool_result in a user turn
            messages.append({"role": "user", "content": tool_results})
        else:
            # Any other stop reason (e.g. "max_tokens"): stop instead of looping forever
            break
    return "No response generated"


# Usage
result = run_agent("What pods are having issues in the production namespace? Check their metrics too.")
print(result)
```

Parallel Tool Calling
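When Claude needs multiple independent pieces of information, it can request several tools in a single turn. Executing those calls concurrently is faster than running them one after another, because the tool phase then takes about as long as the slowest call instead of the sum of all of them.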
```python
import concurrent.futures


def run_tool_block(block) -> dict:
    """Run one tool_use block; turn exceptions into an error tool_result."""
    try:
        content, is_error = execute_tool(block.name, block.input), False
    except Exception as e:
        content, is_error = f"Error: {e}", True
    return {
        "type": "tool_result",
        "tool_use_id": block.id,
        "content": content,
        "is_error": is_error,
    }


def run_agent_with_parallel_tools(user_message: str) -> str:
    """Agent that executes all tool calls from a single response concurrently."""
    messages = [{"role": "user", "content": user_message}]
    while True:
        response = client.messages.create(
            model="claude-opus-4-8",
            max_tokens=4096,
            tools=tools,
            messages=messages,
        )
        if response.stop_reason == "end_turn":
            return "\n".join(b.text for b in response.content if b.type == "text")
        if response.stop_reason != "tool_use":
            return f"Stopped early: {response.stop_reason}"
        # Claude may request MULTIPLE tools in one response
        tool_use_blocks = [b for b in response.content if b.type == "tool_use"]
        print(f"Claude wants to call {len(tool_use_blocks)} tools in parallel")
        # Execute all tool calls concurrently; map() returns results in request order
        with concurrent.futures.ThreadPoolExecutor() as executor:
            tool_results = list(executor.map(run_tool_block, tool_use_blocks))
        messages.append({"role": "assistant", "content": response.content})
        messages.append({"role": "user", "content": tool_results})
```

When you ask "check the status of all 5 services", Claude can return five tool_use blocks in a single response, and the executor runs them concurrently instead of one after another. If each call takes about 5 seconds, the tool phase drops from roughly 25 seconds to roughly 5.
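The 25-to-5-second figure assumes five independent calls of roughly equal latency. That arithmetic can be checked locally with stand-in tools that only sleep; `fake_tool` is hypothetical and its 0.2 s delay stands in for a real I/O-bound call such as an HTTP request.

```python
import concurrent.futures
import time


def fake_tool(name: str) -> str:
    """Stand-in for an I/O-bound tool call."""
    time.sleep(0.2)
    return f"{name}: ok"


services = [f"svc-{i}" for i in range(5)]

# Sequential: total time is the sum of the individual latencies (about 5 x 0.2 s)
start = time.perf_counter()
sequential = [fake_tool(s) for s in services]
sequential_s = time.perf_counter() - start

# Concurrent: total time is roughly the slowest single call (about 0.2 s)
start = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor(max_workers=len(services)) as pool:
    concurrent_results = list(pool.map(fake_tool, services))  # map keeps input order
concurrent_s = time.perf_counter() - start

print(f"sequential {sequential_s:.2f}s, concurrent {concurrent_s:.2f}s")
```

Threads work here because the tools spend their time waiting on I/O; CPU-bound tools would not speed up this way under the GIL.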
Error Handling in Tool Results
Tools fail. Handle this gracefully:
```python
def execute_tool_safe(tool_name: str, tool_input: dict) -> str:
    """Execute a tool; failures come back to Claude as structured JSON instead of raising."""
    try:
        result = execute_tool(tool_name, tool_input)
        return json.dumps({"success": True, "data": result})
    except TimeoutError:
        return json.dumps({
            "success": False,
            "error": "Tool timed out",
            "tool": tool_name,
        })
    except PermissionError as e:
        return json.dumps({
            "success": False,
            "error": f"Permission denied: {e}",
            "tool": tool_name,
        })
    except Exception as e:
        return json.dumps({
            "success": False,
            "error": str(e),
            "tool": tool_name,
        })
```

Because the error comes back as an ordinary tool result, Claude can usually work around it and tell the user something like "I couldn't get X because of Y, but here's what I found from the other tools."
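Note that `execute_tool_safe` only catches a `TimeoutError`; nothing in it enforces a deadline. One way to add one, sketched here under the assumption that tools are synchronous Python functions, is to run each tool in a thread pool and wait with `Future.result(timeout=...)`. The sketch also sets the `is_error` flag on the `tool_result` block so Claude sees the failure explicitly. `tool_result_with_timeout` and `slow_tool` are hypothetical names.

```python
import concurrent.futures
import json
import time
from typing import Any, Callable

# Shared pool. A timed-out call keeps running in its worker thread:
# Python threads cannot be forcibly cancelled, so the deadline only stops waiting.
_POOL = concurrent.futures.ThreadPoolExecutor(max_workers=8)


def tool_result_with_timeout(
    tool_use_id: str,
    func: Callable[[dict], str],
    tool_input: dict,
    timeout_s: float = 30.0,
) -> dict[str, Any]:
    """Run func(tool_input) with a deadline and wrap the outcome in a tool_result block."""
    future = _POOL.submit(func, tool_input)
    try:
        content, is_error = future.result(timeout=timeout_s), False
    except concurrent.futures.TimeoutError:
        content, is_error = json.dumps({"error": f"timed out after {timeout_s}s"}), True
    except Exception as e:  # the tool itself raised
        content, is_error = json.dumps({"error": str(e)}), True
    return {
        "type": "tool_result",
        "tool_use_id": tool_use_id,
        "content": content,
        "is_error": is_error,  # marks this result as a failure for Claude
    }


# Usage with a hypothetical tool that is slower than the deadline
def slow_tool(tool_input: dict) -> str:
    time.sleep(0.5)
    return "done"


slow = tool_result_with_timeout("toolu_01", slow_tool, {}, timeout_s=0.1)
print(slow["is_error"], slow["content"])
```

Because the worker thread cannot be killed, tools that may hang should also set their own client-side timeouts (for example on HTTP requests) so the pool does not fill with stuck threads.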
Tool Result Caching
For tools that are called repeatedly with the same inputs:
```python
import functools
import hashlib
import json

import redis

cache = redis.Redis(host="redis", port=6379)


def cached_tool(ttl_seconds: int = 60):
    """Decorator to cache tool results in Redis (use only for read-only tools)."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(tool_name: str, tool_input: dict) -> str:
            # Cache key: tool name + hash of the key-sorted input (md5 is fine, not security)
            digest = hashlib.md5(json.dumps(tool_input, sort_keys=True).encode()).hexdigest()
            cache_key = f"tool:{tool_name}:{digest}"
            cached = cache.get(cache_key)
            if cached is not None:  # an empty-string result is still a cache hit
                return cached.decode()
            result = func(tool_name, tool_input)
            cache.setex(cache_key, ttl_seconds, result)
            return result
        return wrapper
    return decorator


@cached_tool(ttl_seconds=30)
def execute_tool_cached(tool_name: str, tool_input: dict) -> str:
    return execute_tool(tool_name, tool_input)
```

For Prometheus queries, a 30-second TTL means identical queries repeated within that window, whether from parallel tool calls, later turns, or other conversations, are served from Redis instead of hitting Prometheus again. Only cache read-only tools; caching a tool with side effects would silently skip the side effect.
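If Redis isn't available (local development, unit tests), the same idea can be backed by an in-process dict. This is a minimal sketch assuming a single process: entries are not shared across workers, and expired keys are overwritten on the next miss rather than evicted. `ttl_cached_tool` is a hypothetical name mirroring `cached_tool` above.

```python
import functools
import hashlib
import json
import time
from typing import Callable


def ttl_cached_tool(ttl_seconds: float = 60.0) -> Callable:
    """Cache (tool_name, tool_input) -> result in process memory for ttl_seconds."""
    def decorator(func: Callable[[str, dict], str]) -> Callable[[str, dict], str]:
        store: dict[str, tuple[float, str]] = {}  # key -> (expires_at, result)

        @functools.wraps(func)
        def wrapper(tool_name: str, tool_input: dict) -> str:
            digest = hashlib.sha256(json.dumps(tool_input, sort_keys=True).encode()).hexdigest()
            key = f"tool:{tool_name}:{digest}"
            now = time.monotonic()  # monotonic clock is immune to wall-clock jumps
            hit = store.get(key)
            if hit is not None and hit[0] > now:
                return hit[1]
            result = func(tool_name, tool_input)
            store[key] = (now + ttl_seconds, result)
            return result

        return wrapper

    return decorator


calls = []


@ttl_cached_tool(ttl_seconds=0.2)
def fake_query(tool_name: str, tool_input: dict) -> str:
    calls.append(tool_input)  # count real executions
    return json.dumps({"n": len(calls)})


first = fake_query("get_prometheus_metric", {"query": "up"})
second = fake_query("get_prometheus_metric", {"query": "up"})  # served from cache
```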
Streaming Tool Calls
For better UX, stream the response and tool calls as they happen:
```python
def run_agent_streaming(user_message: str):
    """Stream the agent's text and tool calls as they happen."""
    messages = [{"role": "user", "content": user_message}]
    while True:
        with client.messages.stream(
            model="claude-opus-4-8",
            max_tokens=4096,
            tools=tools,
            messages=messages,
        ) as stream:
            # Yield text deltas as they arrive instead of waiting for the full message
            for text in stream.text_stream:
                yield text
            full_response = stream.get_final_message()
        if full_response.stop_reason != "tool_use":
            break  # "end_turn" (answer already streamed) or another stop reason
        tool_use_blocks = [b for b in full_response.content if b.type == "tool_use"]
        tool_results = []
        for block in tool_use_blocks:
            yield f"\n🔧 Calling {block.name}...\n"
            result = execute_tool(block.name, block.input)
            yield "✅ Got result\n"
            tool_results.append({
                "type": "tool_result",
                "tool_use_id": block.id,
                "content": result,
            })
        messages.append({"role": "assistant", "content": full_response.content})
        messages.append({"role": "user", "content": tool_results})


for chunk in run_agent_streaming("Analyze the health of our production cluster"):
    print(chunk, end="", flush=True)
```

Production Best Practices
- Limit the size of each tool result: a tool that returns 50 MB of logs will overflow the context window.
- Write tool descriptions carefully: Claude picks tools based on their descriptions, so be specific.
- Use `required` fields: don't make a parameter optional if the tool needs it.
- Log every tool call, for debugging and cost tracking.
- Implement retries with backoff for flaky tools, but limit the total retries per conversation.
- If cost matters, use `claude-haiku-4-5-20251001` for tool selection and `claude-opus-4-8` for synthesis.
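Two of these practices, capping tool output and retrying with backoff, fit in a few lines each. The helpers below are a sketch: `truncate_result`, `call_with_retries`, the 20,000-character cap, and the retry limits are illustrative assumptions, not values from the Claude docs. Truncation counts characters rather than tokens, since exact token counts depend on the tokenizer.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def truncate_result(text: str, max_chars: int = 20_000) -> str:
    """Keep the head of an oversized tool result and say how much was cut."""
    if len(text) <= max_chars:
        return text
    omitted = len(text) - max_chars
    return text[:max_chars] + f"\n[truncated: {omitted} characters omitted]"


def call_with_retries(
    func: Callable[[], T],
    max_attempts: int = 3,
    base_delay_s: float = 0.5,
    retry_on: tuple[type[BaseException], ...] = (TimeoutError, ConnectionError),
) -> T:
    """Retry func on transient errors with exponential backoff plus jitter."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except retry_on:
            if attempt == max_attempts:
                raise  # out of attempts: let the caller report it as a tool error
            delay = base_delay_s * 2 ** (attempt - 1)
            time.sleep(delay + random.uniform(0, delay / 2))
    raise RuntimeError("unreachable")
```

A per-conversation retry budget could be a counter shared by all `call_with_retries` calls in one agent run, checked before each retry.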
Resources: Anthropic Tool Use docs | Tool use examples on GitHub