The Model Context Protocol (MCP) is the new standard for AI agent communication. The specification is elegant, the tooling is maturing, and the promise is real: a unified way for AI systems to discover and invoke capabilities across distributed services.
But here’s what the tutorials won’t tell you: the gap between a working demo and a production system is measured in patterns, not features.
I recently architected a multi-agent system using FastMCP for inter-agent communication. The system works. It scales. It handles failures gracefully. Here’s what it took to get there.
Why MCP Changes the Game
Before MCP, every team built custom integration layers. REST with hand-rolled schemas. GraphQL with agent-specific queries. gRPC with protobuf definitions. WebSocket connections with JSON messages. Every integration was a snowflake.
MCP standardizes this chaos:
- Unified protocol for tool discovery and execution
- Native streaming support for real-time updates
- Built-in session management for stateful interactions
- Standardized error handling with defined error codes
- Type safety through JSON Schema validation
FastMCP takes this further by providing a production-ready Python implementation with minimal boilerplate. It’s the FastAPI of agent communication.
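For a sense of scale, a complete FastMCP service is only a few lines. The sketch below is illustrative (the service name and tool are made up, and transport options vary by FastMCP version):

from fastmcp import FastMCP

mcp = FastMCP("Echo Service")

@mcp.tool
async def echo(message: str) -> dict:
    """Return the caller's message unchanged."""
    return {"echo": message}

if __name__ == "__main__":
    mcp.run()  # defaults to stdio; pass a transport argument to serve over HTTP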
The Eight Production Patterns
Pattern 1: Connection Pooling
Creating a new client for every request is the fastest way to kill performance. Connection overhead, TCP handshakes, TLS negotiation: it all adds up.
The solution: connection pooling.
import asyncio
import logging
from collections import defaultdict
from contextlib import asynccontextmanager
from typing import Dict, Optional

logger = logging.getLogger(__name__)


class PoolExhaustedError(Exception):
    """Raised when no client is available and the pool is at capacity."""


class MCPClientPool:
    """High-performance connection pool for MCP clients."""

    def __init__(self, min_size: int = 2, max_size: int = 10):
        self._pools: Dict[str, asyncio.Queue] = defaultdict(
            lambda: asyncio.Queue(maxsize=max_size)
        )
        self._min_size = min_size
        self._max_size = max_size
        self._lock = asyncio.Lock()
        self._initialized_pools = set()

    async def initialize_pool(self, url: str, headers: Optional[Dict] = None):
        """Pre-populate pool with minimum number of clients."""
        async with self._lock:
            if url in self._initialized_pools:
                return
            pool = self._pools[url]
            for i in range(self._min_size):
                try:
                    client = await self._create_client(url, headers)
                    await pool.put(client)
                except Exception as e:
                    logger.error(f"Failed to initialize client {i+1}: {e}")
                    break
            self._initialized_pools.add(url)

    @asynccontextmanager
    async def get_client(self, url: str, headers: Optional[Dict] = None):
        """Get a client from the pool with automatic return."""
        await self.initialize_pool(url, headers)
        pool = self._pools[url]
        client = None
        try:
            # Try to get a client with a timeout
            try:
                client = await asyncio.wait_for(pool.get(), timeout=15.0)
            except asyncio.TimeoutError:
                # Pool exhausted - create a new client if under max
                async with self._lock:
                    if pool.qsize() < self._max_size:
                        client = await self._create_client(url, headers)
                    else:
                        raise PoolExhaustedError(f"Pool full for {url}")
            yield client
        finally:
            if client:
                await pool.put(client)
This pattern reduced our latency by 60% and eliminated timeout errors under load.
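The `_create_client` helper is elided above; in a FastMCP deployment it would construct and open a client against the worker's URL. A minimal usage sketch, with an illustrative worker URL and tool name:

pool = MCPClientPool(min_size=2, max_size=10)

async def validate_payload(payload: dict):
    # Worker URL and tool name are illustrative
    async with pool.get_client("http://data-processing-service:8000/mcp") as client:
        return await client.call_tool("data_validation", {"data": payload, "schema": {}})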
Pattern 2: Streaming with Progress Updates
Long-running operations need progress feedback. FastMCP’s Context object provides this through the report_progress and logging methods.
import asyncio
from typing import Any, Dict, List

from fastmcp import FastMCP, Context

mcp = FastMCP("Data Processing Service")


@mcp.tool
async def process_dataset(
    correlation_id: str,
    records: List[Dict[str, Any]],
    ctx: Context = None
) -> Dict[str, Any]:
    """Process dataset with streaming progress updates."""
    total_records = len(records)
    processed_records = []
    for i, record in enumerate(records):
        # Stream progress update
        if ctx:
            await ctx.report_progress(
                progress=i + 1,
                total=total_records,
                message=f"Processing record {i+1} of {total_records}"
            )
        # Perform processing
        result = await process_single_record(record)
        processed_records.append(result)
        # Prevent overwhelming the client
        await asyncio.sleep(0.1)
    # Stream completion
    if ctx:
        await ctx.info(f"Completed processing {total_records} records")
    return {
        "status": "success",
        "processed_records": processed_records,
        "total_processed": len(processed_records)
    }
The supervisor receives real-time updates and can show progress to end users.
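On the calling side, the updates arrive through a progress callback. A sketch assuming a FastMCP client; the `progress_handler` argument and its handler signature follow recent FastMCP releases and may differ in yours:

from fastmcp import Client

async def on_progress(progress: float, total: float | None, message: str | None) -> None:
    # Forward to a UI, websocket, or log line
    print(f"[{progress}/{total}] {message}")

async def run_processing(records: list[dict]):
    async with Client("http://data-processing-service:8000/mcp") as client:
        return await client.call_tool(
            "process_dataset",
            {"correlation_id": "abc-123", "records": records},
            progress_handler=on_progress
        )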
Pattern 3: Retry Logic with Exponential Backoff
Network calls fail. Your client needs retry logic.
async def call_tool_with_retry(
    self,
    tool_name: str,
    args: Dict[str, Any],
    max_retries: int = 3,
    timeout_s: float = 30.0
) -> Any:
    """Call MCP tool with exponential backoff retry."""
    last_error = None
    for attempt in range(max_retries):
        try:
            async with self._pool.get_client(self.base_url, self._headers) as client:
                result = await asyncio.wait_for(
                    client.call_tool(tool_name, args),
                    timeout=timeout_s
                )
                return result
        except asyncio.TimeoutError:
            last_error = f"Timeout after {timeout_s}s"
            logger.warning(f"Attempt {attempt + 1}/{max_retries} timed out")
        except Exception as e:
            last_error = str(e)
            logger.error(f"Attempt {attempt + 1}/{max_retries} failed: {e}")
        # Exponential backoff before the next attempt
        if attempt < max_retries - 1:
            wait_time = 2 ** attempt
            await asyncio.sleep(wait_time)
    raise ToolCallError(f"Failed after {max_retries} attempts: {last_error}")
This handles transient failures without crashing your system.
Pattern 4: Dynamic Tool Discovery
Workers should advertise their capabilities. This enables intelligent routing and dynamic delegation.
@mcp.tool
async def get_capabilities() -> Dict[str, Any]:
    """Return service capabilities for intelligent routing."""
    return {
        "service_name": "data-processing-service",
        "version": "1.0.0",
        "capabilities": {
            "data_validation": {
                "description": "Validate data against schemas",
                "input_schema": {
                    "type": "object",
                    "properties": {
                        "data": {"type": "object"},
                        "schema": {"type": "object"}
                    }
                },
                "estimated_duration_ms": 500
            },
            "data_transformation": {
                "description": "Transform data using rules",
                "input_schema": {
                    "type": "object",
                    "properties": {
                        "data": {"type": "array"},
                        "rules": {"type": "object"}
                    }
                },
                "estimated_duration_ms": 2000
            }
        },
        "supported_features": [
            "streaming_progress",
            "batch_processing",
            "idempotent_operations"
        ],
        "health_status": "healthy"
    }
Supervisors can query capabilities and route requests intelligently.
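A minimal routing sketch, assuming the pool from Pattern 1 and that `call_tool` hands back the tool's dict result (depending on your client version you may need to unwrap a result object first); the worker URLs are illustrative:

WORKER_URLS = ["http://worker-a:8000/mcp", "http://worker-b:8000/mcp"]

async def find_worker_for(capability: str) -> Optional[str]:
    """Return the first healthy worker advertising the requested capability."""
    for url in WORKER_URLS:
        async with pool.get_client(url) as client:
            caps = await client.call_tool("get_capabilities", {})
            if caps.get("health_status") == "healthy" and capability in caps.get("capabilities", {}):
                return url
    return None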
Pattern 5: Health Checks for Orchestration
Container orchestration platforms need health endpoints. FastMCP’s custom_route decorator makes this trivial.
from datetime import datetime, timezone

from starlette.requests import Request
from starlette.responses import JSONResponse


@mcp.custom_route("/health", methods=["GET"])
async def health_check(request: Request) -> JSONResponse:
    """Health check endpoint for container monitoring."""
    try:
        health_status = {
            "status": "healthy",
            "service": "data-processing-service",
            "version": "1.0.0",
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "components": {
                "database": await check_database(),
                "cache": await check_cache(),
                "external_apis": await check_external_apis()
            }
        }
        # Check if all components are healthy
        all_healthy = all(health_status["components"].values())
        if all_healthy:
            return JSONResponse(health_status, status_code=200)
        else:
            health_status["status"] = "degraded"
            return JSONResponse(health_status, status_code=503)
    except Exception as e:
        error_status = {
            "status": "unhealthy",
            "service": "data-processing-service",
            "error": str(e),
            "timestamp": datetime.now(timezone.utc).isoformat()
        }
        return JSONResponse(error_status, status_code=503)
This enables proper container orchestration with Kubernetes, Docker Swarm, or ECS.
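The `check_database`, `check_cache`, and `check_external_apis` helpers are elided above; each is just a cheap probe that returns a boolean. A sketch of one, assuming an asyncpg-style connection pool named `db_pool`:

async def check_database() -> bool:
    """Cheap connectivity probe; True only if the database answers quickly."""
    try:
        await asyncio.wait_for(db_pool.fetchval("SELECT 1"), timeout=2.0)
        return True
    except Exception:
        return False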
Pattern 6: Correlation IDs for Distributed Tracing
Track requests across multiple services with correlation IDs.
from structlog import get_logger


@mcp.tool
async def process_request(
    correlation_id: str,
    data: Dict[str, Any],
    session_id: Optional[str] = None,
    ctx: Context = None
) -> Dict[str, Any]:
    """Process request with correlation ID for distributed tracing."""
    # Set correlation ID in logging context
    logger = get_logger(__name__)
    logger = logger.bind(
        correlation_id=correlation_id,
        session_id=session_id,
        tool="process_request"
    )
    logger.info("Processing request")
    try:
        # Process request
        result = await process_workflow(data, correlation_id)
        logger.info("Request processed successfully", result_id=result["id"])
        return {
            "correlation_id": correlation_id,
            "result_id": result["id"],
            "status": "success"
        }
    except Exception as e:
        logger.error("Request processing failed", error=str(e))
        raise
Now you can trace requests through your entire distributed system.
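The supervisor only has to mint the ID once at the system boundary and pass it with every downstream call; a small sketch, reusing the illustrative worker URL from earlier:

import uuid

async def handle_user_request(data: Dict[str, Any]) -> Dict[str, Any]:
    correlation_id = str(uuid.uuid4())  # minted once at the edge
    async with pool.get_client("http://data-processing-service:8000/mcp") as client:
        # The same ID travels with every hop so logs can be joined later
        return await client.call_tool(
            "process_request",
            {"correlation_id": correlation_id, "data": data}
        )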
Pattern 7: Idempotency for Reliability
Network failures cause retries. Make operations idempotent to prevent duplicate processing.
# Use Redis in production, in-memory for demo
idempotency_cache = {}


@mcp.tool
async def submit_transaction(
    correlation_id: str,
    transaction: Dict[str, Any],
    idempotency_key: Optional[str] = None,
    ctx: Context = None
) -> Dict[str, Any]:
    """Submit transaction with idempotency support."""
    # Check idempotency cache
    if idempotency_key and idempotency_key in idempotency_cache:
        logger.info(f"Returning cached result for key: {idempotency_key}")
        return idempotency_cache[idempotency_key]
    # Process transaction
    result = await process_transaction(transaction, correlation_id)
    # Cache result
    if idempotency_key:
        idempotency_cache[idempotency_key] = result
        # Set expiration (clean up after 1 hour)
        asyncio.create_task(expire_cache_entry(idempotency_key, 3600))
    return result
Retries are now safe. Duplicate requests return cached results.
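The `expire_cache_entry` helper referenced above is elided; for the in-memory demo it can be a delayed delete, while a Redis-backed version would simply set a TTL on the key. A sketch of the in-memory variant:

async def expire_cache_entry(idempotency_key: str, ttl_seconds: int) -> None:
    """Drop a cached idempotency result once its time-to-live elapses."""
    await asyncio.sleep(ttl_seconds)
    idempotency_cache.pop(idempotency_key, None)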
Pattern 8: Structured Logging
Production systems need queryable, analyzable logs.
import time

import structlog

logger = structlog.get_logger()


@mcp.tool
async def analyze_data(
    correlation_id: str,
    dataset: List[Dict[str, Any]],
    ctx: Context = None
) -> Dict[str, Any]:
    """Analyze data with structured logging."""
    logger.info(
        "analysis_started",
        correlation_id=correlation_id,
        dataset_size=len(dataset),
        tool="analyze_data"
    )
    start_time = time.time()
    try:
        result = await perform_analysis(dataset)
        duration_ms = (time.time() - start_time) * 1000
        logger.info(
            "analysis_completed",
            correlation_id=correlation_id,
            dataset_size=len(dataset),
            duration_ms=duration_ms,
            status="success"
        )
        return result
    except Exception as e:
        duration_ms = (time.time() - start_time) * 1000
        logger.error(
            "analysis_failed",
            correlation_id=correlation_id,
            dataset_size=len(dataset),
            duration_ms=duration_ms,
            error=str(e),
            error_type=type(e).__name__
        )
        raise
Structured logs integrate seamlessly with observability platforms like Datadog, Splunk, or ELK.
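For those platforms to ingest the events, structlog needs to be configured once at startup to emit JSON; a minimal sketch:

import structlog

structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ]
)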
Common Pitfalls to Avoid
Not Handling Timeouts
# Wrong - no timeout
result = await client.call_tool(tool_name, args)

# Right - with timeout
result = await asyncio.wait_for(
    client.call_tool(tool_name, args),
    timeout=30.0
)
Forgetting Connection Cleanup
# Wrong - connection leak
client = MCPClient(url)
result = await client.call_tool(tool_name, args)

# Right - proper cleanup
async with pool.get_client(url) as client:
    result = await client.call_tool(tool_name, args)
Skipping Input Validation
# Wrong - no validation
@mcp.tool
async def process_data(data: dict) -> dict:
    return await process(data)

# Right - with validation
@mcp.tool
async def process_data(data: dict) -> dict:
    if not data.get("required_field"):
        raise ValueError("Missing required_field")
    return await process(data)
The Bottom Line
MCP is the future of agent communication. But production systems require more than the protocol specification.
You need:
- Connection pooling for performance
- Streaming for long operations
- Retry logic for reliability
- Health checks for orchestration
- Correlation IDs for tracing
- Idempotency for safety
- Structured logging for observability
- Input validation for security
Get these patterns right from the start. Your future self will thank you.
Stop Comparing. Start Architecting.
The AI ecosystem is moving toward modular, agentic architectures. MCP is the connective tissue that makes this possible. The question isn't whether to adopt MCP; it's how to build it right.
These eight patterns are your foundation. They’re battle-tested, production-proven, and ready to scale.
The future is agentic. The future is modular. The future is MCP.
Build it right.