Building Production-Ready MCP Agent Services: Eight Patterns That Matter

The Model Context Protocol (MCP) is the new standard for AI agent communication. The specification is elegant, the tooling is maturing, and the promise is real: a unified way for AI systems to discover and invoke capabilities across distributed services.

But here’s what the tutorials won’t tell you: the gap between a working demo and a production system is measured in patterns, not features.

I recently architected a multi-agent system using FastMCP for inter-agent communication. The system works. It scales. It handles failures gracefully. Here’s what it took to get there.

Why MCP Changes the Game

Before MCP, every team built custom integration layers. REST with hand-rolled schemas. GraphQL with agent-specific queries. gRPC with protobuf definitions. WebSocket connections with JSON messages. Every integration was a snowflake.

MCP standardizes this chaos:

  • Unified protocol for tool discovery and execution
  • Native streaming support for real-time updates
  • Built-in session management for stateful interactions
  • Standardized error handling with defined error codes
  • Type safety through JSON Schema validation

FastMCP takes this further by providing a production-ready Python implementation with minimal boilerplate. It’s the FastAPI of agent communication.

The Eight Production Patterns

Pattern 1: Connection Pooling

Creating a new client for every request is the fastest way to kill performance. Connection overhead, TCP handshakes, TLS negotiation: it all adds up.

The solution: connection pooling.

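import asyncio
import logging
from collections import defaultdict
from contextlib import asynccontextmanager
from typing import Dict, Optional

logger = logging.getLogger(__name__)


class PoolExhaustedError(Exception):
    """Raised when every client is busy and the pool is already at max size."""


class MCPClientPool:
    """High-performance connection pool for MCP clients."""

    def __init__(self, min_size: int = 2, max_size: int = 10):
        self._pools: Dict[str, asyncio.Queue] = defaultdict(
            lambda: asyncio.Queue(maxsize=max_size)
        )
        # Clients created per URL, counting both idle and checked-out ones.
        # pool.qsize() only counts idle clients, so it cannot enforce max_size.
        self._created: Dict[str, int] = defaultdict(int)
        self._min_size = min_size
        self._max_size = max_size
        self._lock = asyncio.Lock()
        self._initialized_pools = set()

    # _create_client(url, headers) builds and connects one client; omitted here.

    async def initialize_pool(self, url: str, headers: Optional[Dict] = None):
        """Pre-populate pool with minimum number of clients."""
        async with self._lock:
            if url in self._initialized_pools:
                return

            pool = self._pools[url]

            for i in range(self._min_size):
                try:
                    client = await self._create_client(url, headers)
                    self._created[url] += 1
                    await pool.put(client)
                except Exception as e:
                    logger.error(f"Failed to initialize client {i+1}: {e}")
                    break

            self._initialized_pools.add(url)

    @asynccontextmanager
    async def get_client(self, url: str, headers: Optional[Dict] = None):
        """Get a client from the pool with automatic return."""
        await self.initialize_pool(url, headers)

        pool = self._pools[url]
        client = None

        try:
            # Grow the pool only when no client is idle and we are under max
            async with self._lock:
                if pool.empty() and self._created[url] < self._max_size:
                    client = await self._create_client(url, headers)
                    self._created[url] += 1

            if client is None:
                # Otherwise wait for a busy client to be returned
                try:
                    client = await asyncio.wait_for(pool.get(), timeout=15.0)
                except asyncio.TimeoutError:
                    raise PoolExhaustedError(f"Pool full for {url}") from None

            yield client

        finally:
            if client is not None:
                # Cannot block: total clients never exceed the queue's maxsize
                pool.put_nowait(client)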

This pattern reduced our latency by 60% and eliminated timeout errors under load.
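
To make the reuse behavior concrete, here is a minimal, self-contained sketch you can run without an MCP server. `FakeClient`, `BoundedPool`, and `run_calls` are illustrative names invented for this example, not FastMCP classes, and the semaphore-based design is one possible approach rather than the implementation used in our system.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeClient:
    """Stand-in for a real MCP client; counts how many were ever created."""

    created = 0

    def __init__(self) -> None:
        FakeClient.created += 1

    async def call_tool(self, name: str, args: dict) -> dict:
        await asyncio.sleep(0.01)  # simulate network latency
        return {"tool": name, "args": args}


class BoundedPool:
    """Hand out at most max_size clients, creating them lazily."""

    def __init__(self, max_size: int = 3) -> None:
        self._idle: asyncio.Queue = asyncio.Queue()
        self._slots = asyncio.Semaphore(max_size)

    @asynccontextmanager
    async def get_client(self):
        await self._slots.acquire()  # waits here while max_size clients are in use
        try:
            try:
                client = self._idle.get_nowait()  # reuse an idle client
            except asyncio.QueueEmpty:
                client = FakeClient()  # none idle: create one
            try:
                yield client
            finally:
                self._idle.put_nowait(client)  # return it for reuse
        finally:
            self._slots.release()


async def run_calls(n: int, max_size: int = 3) -> int:
    """Run n concurrent tool calls through one pool; return clients created."""
    FakeClient.created = 0
    pool = BoundedPool(max_size)

    async def one_call(i: int) -> dict:
        async with pool.get_client() as client:
            return await client.call_tool("echo", {"i": i})

    await asyncio.gather(*(one_call(i) for i in range(n)))
    return FakeClient.created
```

Calling `asyncio.run(run_calls(20, max_size=3))` pushes 20 concurrent calls through the pool while only three clients are ever constructed. The client is returned to the idle queue before the slot is released, so a caller that gets a slot always finds a client to reuse once the pool is full.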

Pattern 2: Streaming with Progress Updates

Long-running operations need progress feedback. FastMCP’s Context object provides this through the report_progress and logging methods.

import asyncio
from typing import Any, Dict, List

from fastmcp import FastMCP, Context

mcp = FastMCP("Data Processing Service")

@mcp.tool
async def process_dataset(
    correlation_id: str,
    records: List[Dict[str, Any]],
    ctx: Context = None
) -> Dict[str, Any]:
    """Process dataset with streaming progress updates."""
    
    total_records = len(records)
    processed_records = []
    
    for i, record in enumerate(records):
        # Stream progress update
        if ctx:
            await ctx.report_progress(
                progress=i + 1,
                total=total_records,
                message=f"Processing record {i+1} of {total_records}"
            )

        # Perform processing
        result = await process_single_record(record)
        processed_records.append(result)

        # Prevent overwhelming the client
        await asyncio.sleep(0.1)

    # Stream completion
    if ctx:
        await ctx.info(f"Completed processing {total_records} records")
    
    return {
        "status": "success",
        "processed_records": processed_records,
        "total_processed": len(processed_records)
    }

The supervisor receives real-time updates and can show progress to end users.
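
A fixed sleep per record protects the client but also adds 100 ms to every record. One alternative is to throttle the reports instead of the work. The sketch below is an assumption-laden illustration: `ProgressThrottle`, `process_all`, and the `report` callback (a stand-in for something like `ctx.report_progress`) are hypothetical names, and the 0.5 second interval is arbitrary.

```python
import asyncio
import time
from typing import Awaitable, Callable, List

# Hypothetical callback shape: (progress, total) -> awaitable
ReportFn = Callable[[int, int], Awaitable[None]]


class ProgressThrottle:
    """Forward progress updates at most once per min_interval_s seconds.

    The final update (progress >= total) is always forwarded so the
    receiver sees completion.
    """

    def __init__(self, report: ReportFn, min_interval_s: float = 0.5) -> None:
        self._report = report
        self._min_interval_s = min_interval_s
        self._last_sent = float("-inf")  # guarantees the first update is sent

    async def update(self, progress: int, total: int) -> bool:
        """Report progress if enough time has passed; return True if sent."""
        now = time.monotonic()
        is_final = progress >= total
        if not is_final and now - self._last_sent < self._min_interval_s:
            return False
        self._last_sent = now
        await self._report(progress, total)
        return True


async def process_all(records: List[dict], report: ReportFn) -> int:
    """Process records with throttled progress; return the count processed."""
    throttle = ProgressThrottle(report, min_interval_s=0.5)
    for i, _record in enumerate(records):
        # Real per-record work would go here.
        await throttle.update(i + 1, len(records))
        await asyncio.sleep(0)  # yield to the event loop without a fixed delay
    return len(records)
```

With this shape, a fast batch sends only a handful of updates, while a slow batch still reports roughly twice per second.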

Pattern 3: Retry Logic with Exponential Backoff

Network calls fail. Your client needs retry logic.

async def call_tool_with_retry(
    self,
    tool_name: str,
    args: Dict[str, Any],
    max_retries: int = 3,
    timeout_s: float = 30.0
) -> Any:
    """Call MCP tool with exponential backoff retry."""
    
    last_error = None
    
    for attempt in range(max_retries):
        try:
            async with self._pool.get_client(self.base_url, self._headers) as client:
                result = await asyncio.wait_for(
                    client.call_tool(tool_name, args),
                    timeout=timeout_s
                )
                return result
        
        except asyncio.TimeoutError:
            last_error = f"Timeout after {timeout_s}s"
            logger.warning(f"Attempt {attempt + 1}/{max_retries} timed out")
        
        except Exception as e:
            last_error = str(e)
            logger.error(f"Attempt {attempt + 1}/{max_retries} failed: {e}")
        
        # Exponential backoff
        if attempt < max_retries - 1:
            wait_time = 2 ** attempt
            await asyncio.sleep(wait_time)
    
    raise ToolCallError(f"Failed after {max_retries} attempts: {last_error}")

This handles transient failures without crashing your system.
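
Two refinements are worth considering: retrying only errors that are plausibly transient, and adding jitter so many clients do not retry in lockstep. The following is a minimal sketch under those assumptions. `retry_transient` and `backoff_delay` are hypothetical helpers, and which exception types count as transient depends on your client library.

```python
import asyncio
import random
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def backoff_delay(attempt: int, base_s: float = 1.0, cap_s: float = 30.0) -> float:
    """Full-jitter delay: uniform in [0, min(cap_s, base_s * 2**attempt)]."""
    return random.uniform(0.0, min(cap_s, base_s * (2 ** attempt)))


async def retry_transient(
    func: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    base_s: float = 1.0,
    retry_on: Tuple[Type[BaseException], ...] = (
        asyncio.TimeoutError,
        ConnectionError,
    ),
) -> T:
    """Call func, retrying only the exception types listed in retry_on."""
    for attempt in range(max_retries):
        try:
            return await func()
        except retry_on:
            if attempt == max_retries - 1:
                raise  # out of attempts: surface the last error
            await asyncio.sleep(backoff_delay(attempt, base_s))
    raise ValueError("max_retries must be at least 1")
```

Anything outside `retry_on`, such as a `ValueError` from bad input, propagates on the first attempt instead of burning through the retry budget.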

Pattern 4: Dynamic Tool Discovery

Workers should advertise their capabilities. This enables intelligent routing and dynamic delegation.

@mcp.tool
async def get_capabilities() -> Dict[str, Any]:
    """Return service capabilities for intelligent routing."""
    return {
        "service_name": "data-processing-service",
        "version": "1.0.0",
        "capabilities": {
            "data_validation": {
                "description": "Validate data against schemas",
                "input_schema": {
                    "type": "object",
                    "properties": {
                        "data": {"type": "object"},
                        "schema": {"type": "object"}
                    }
                },
                "estimated_duration_ms": 500
            },
            "data_transformation": {
                "description": "Transform data using rules",
                "input_schema": {
                    "type": "object",
                    "properties": {
                        "data": {"type": "array"},
                        "rules": {"type": "object"}
                    }
                },
                "estimated_duration_ms": 2000
            }
        },
        "supported_features": [
            "streaming_progress",
            "batch_processing",
            "idempotent_operations"
        ],
        "health_status": "healthy"
    }

Supervisors can query capabilities and route requests intelligently.
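
As an illustration of what routing on these responses might look like, here is a small sketch. `pick_worker` is a hypothetical helper, and the selection rule (healthy workers only, lowest `estimated_duration_ms` wins) is an assumption for the example rather than a rule from MCP or FastMCP.

```python
from typing import Any, Dict, List, Optional


def pick_worker(workers: List[Dict[str, Any]], capability: str) -> Optional[str]:
    """Return the healthy service with the lowest estimated duration.

    Each entry in workers is a dict shaped like a get_capabilities() response.
    Returns None when no healthy worker advertises the capability.
    """
    candidates = [
        (w["capabilities"][capability]["estimated_duration_ms"], w["service_name"])
        for w in workers
        if w.get("health_status") == "healthy"
        and capability in w.get("capabilities", {})
    ]
    if not candidates:
        return None
    # Tuples sort by duration first, then by name to break ties
    return min(candidates)[1]


# Example responses collected from three hypothetical workers
WORKERS = [
    {
        "service_name": "data-processing-service",
        "health_status": "healthy",
        "capabilities": {"data_validation": {"estimated_duration_ms": 500}},
    },
    {
        "service_name": "fast-validator",
        "health_status": "healthy",
        "capabilities": {"data_validation": {"estimated_duration_ms": 120}},
    },
    {
        "service_name": "degraded-validator",
        "health_status": "degraded",
        "capabilities": {"data_validation": {"estimated_duration_ms": 50}},
    },
]
```

Here `pick_worker(WORKERS, "data_validation")` selects `fast-validator`: the degraded worker is skipped even though it advertises the lowest duration.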

Pattern 5: Health Checks for Orchestration

Container orchestration platforms need health endpoints. FastMCP’s custom_route decorator makes this trivial.

from datetime import datetime, timezone

from starlette.requests import Request
from starlette.responses import JSONResponse

@mcp.custom_route("/health", methods=["GET"])
async def health_check(request: Request) -> JSONResponse:
    """Health check endpoint for container monitoring."""
    try:
        health_status = {
            "status": "healthy",
            "service": "data-processing-service",
            "version": "1.0.0",
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "components": {
                "database": await check_database(),
                "cache": await check_cache(),
                "external_apis": await check_external_apis()
            }
        }

        # Check if all components are healthy
        all_healthy = all(health_status["components"].values())

        if all_healthy:
            return JSONResponse(health_status, status_code=200)
        else:
            health_status["status"] = "degraded"
            return JSONResponse(health_status, status_code=503)

    except Exception as e:
        error_status = {
            "status": "unhealthy",
            "service": "data-processing-service",
            "error": str(e),
            "timestamp": datetime.now(timezone.utc).isoformat()
        }
        return JSONResponse(error_status, status_code=503)

This enables proper container orchestration with Kubernetes, Docker Swarm, or ECS.
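
Orchestrators usually apply their own probe timeout, so a single slow dependency check can make the whole endpoint look dead. One way to guard against that is to run the component checks concurrently, each with its own deadline. This is a sketch only: `run_checks` is a hypothetical helper, and it assumes each check is an async callable returning a boolean.

```python
import asyncio
from typing import Awaitable, Callable, Dict

Check = Callable[[], Awaitable[bool]]


async def run_checks(
    checks: Dict[str, Check], timeout_s: float = 2.0
) -> Dict[str, bool]:
    """Run all checks concurrently; a slow or failing check counts as unhealthy."""

    async def guarded(check: Check) -> bool:
        try:
            return bool(await asyncio.wait_for(check(), timeout=timeout_s))
        except Exception:
            # Covers asyncio.TimeoutError and any error raised by the check
            return False

    names = list(checks)
    results = await asyncio.gather(*(guarded(checks[name]) for name in names))
    return dict(zip(names, results))
```

The returned dict can be used directly as the `components` value, and the total latency is bounded by `timeout_s` rather than by the sum of all checks.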

Pattern 6: Correlation IDs for Distributed Tracing

Track requests across multiple services with correlation IDs.

@mcp.tool
async def process_request(
    correlation_id: str,
    data: Dict[str, Any],
    session_id: Optional[str] = None,
    ctx: Context = None
) -> Dict[str, Any]:
    """Process request with correlation ID for distributed tracing."""

    # Set correlation ID in logging context
    logger = get_logger(__name__)
    logger = logger.bind(
        correlation_id=correlation_id,
        session_id=session_id,
        tool="process_request"
    )

    logger.info("Processing request")

    try:
        # Process request
        result = await process_workflow(data, correlation_id)

        logger.info("Request processed successfully", result_id=result["id"])

        return {
            "correlation_id": correlation_id,
            "result_id": result["id"],
            "status": "success"
        }

    except Exception as e:
        logger.error("Request processing failed", error=str(e))
        raise

Now you can trace requests through your entire distributed system.
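
Passing the ID into every helper gets tedious as the call chain grows. A common alternative is to store it in a context variable and let the logging layer pick it up. The sketch below uses only the standard library. `correlation_id_var`, `CorrelationIdFilter`, and `handle_request` are names invented for this example.

```python
import asyncio
import contextvars
import logging

# Holds the correlation ID for the current task; "-" means "not set"
correlation_id_var = contextvars.ContextVar("correlation_id", default="-")


class CorrelationIdFilter(logging.Filter):
    """Attach the current correlation ID to every log record."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.correlation_id = correlation_id_var.get()
        return True


async def handle_request(correlation_id: str, log: logging.Logger) -> None:
    """Set the ID once; log calls further down the chain pick it up."""
    token = correlation_id_var.set(correlation_id)
    try:
        log.info("Processing request")
        await asyncio.sleep(0)  # the ID survives awaits within this task
        log.info("Request processed")
    finally:
        correlation_id_var.reset(token)


def configure_logging() -> None:
    """Print the correlation ID in front of every message."""
    handler = logging.StreamHandler()
    handler.addFilter(CorrelationIdFilter())
    handler.setFormatter(logging.Formatter("%(correlation_id)s %(message)s"))
    logging.basicConfig(level=logging.INFO, handlers=[handler])
```

Because asyncio gives each task its own copy of the context, concurrent requests do not overwrite each other's IDs.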

Pattern 7: Idempotency for Reliability

Network failures cause retries. Make operations idempotent to prevent duplicate processing.

# Use Redis in production, in-memory for demo
idempotency_cache = {}

@mcp.tool
async def submit_transaction(
    correlation_id: str,
    transaction: Dict[str, Any],
    idempotency_key: Optional[str] = None,
    ctx: Context = None
) -> Dict[str, Any]:
    """Submit transaction with idempotency support."""

    # Check idempotency cache
    if idempotency_key:
        if idempotency_key in idempotency_cache:
            logger.info(f"Returning cached result for key: {idempotency_key}")
            return idempotency_cache[idempotency_key]

    # Process transaction
    result = await process_transaction(transaction, correlation_id)

    # Cache result
    if idempotency_key:
        idempotency_cache[idempotency_key] = result

        # Set expiration (clean up after 1 hour)
        asyncio.create_task(expire_cache_entry(idempotency_key, 3600))

    return result

Retries are now safe: a duplicate request that arrives after the first one has completed returns the cached result instead of running the transaction again.
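
One gap in a check-then-write cache is the duplicate that arrives while the first request is still running: both see an empty cache and both process. A per-key lock closes that window. This is a minimal in-memory sketch, assuming a single process. `IdempotencyCache` and `run_once` are hypothetical names, and a shared store such as Redis would need its own locking scheme.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Dict, Tuple


class IdempotencyCache:
    """In-memory idempotency store with a TTL and per-key locking."""

    def __init__(self, ttl_s: float = 3600.0) -> None:
        self._ttl_s = ttl_s
        # key -> (expiry timestamp from time.monotonic(), stored result)
        self._results: Dict[str, Tuple[float, Any]] = {}
        self._locks: Dict[str, asyncio.Lock] = {}

    async def run_once(self, key: str, work: Callable[[], Awaitable[Any]]) -> Any:
        """Run work at most once per key within the TTL.

        Later callers, including ones that arrive while work is still
        running, receive the stored result.
        """
        lock = self._locks.setdefault(key, asyncio.Lock())
        async with lock:  # concurrent duplicates wait here instead of re-running
            entry = self._results.get(key)
            if entry is not None and entry[0] > time.monotonic():
                return entry[1]
            result = await work()
            # Only successful results are stored; a failure can be retried
            self._results[key] = (time.monotonic() + self._ttl_s, result)
            return result
```

Expiry is checked lazily on read, so no background task is needed. The sketch never evicts old keys or locks, which a long-running service would have to handle.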

Pattern 8: Structured Logging

Production systems need queryable, analyzable logs.

import time
from typing import Any, Dict, List

import structlog

logger = structlog.get_logger()

@mcp.tool
async def analyze_data(
    correlation_id: str,
    dataset: List[Dict[str, Any]],
    ctx: Context = None
) -> Dict[str, Any]:
    """Analyze data with structured logging."""

    logger.info(
        "analysis_started",
        correlation_id=correlation_id,
        dataset_size=len(dataset),
        tool="analyze_data"
    )

    start_time = time.time()

    try:
        result = await perform_analysis(dataset)

        duration_ms = (time.time() - start_time) * 1000

        logger.info(
            "analysis_completed",
            correlation_id=correlation_id,
            dataset_size=len(dataset),
            duration_ms=duration_ms,
            status="success"
        )

        return result

    except Exception as e:
        duration_ms = (time.time() - start_time) * 1000

        logger.error(
            "analysis_failed",
            correlation_id=correlation_id,
            dataset_size=len(dataset),
            duration_ms=duration_ms,
            error=str(e),
            error_type=type(e).__name__
        )

        raise

Structured logs integrate seamlessly with observability platforms like Datadog, Splunk, or ELK.
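
If you cannot add a dependency, the same idea can be approximated with the standard library. The sketch below is not how structlog works internally. `JsonFormatter` and the `fields` convention are assumptions made for this example.

```python
import json
import logging
from typing import Any, Dict


class JsonFormatter(logging.Formatter):
    """Render each log record as one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        payload: Dict[str, Any] = {
            "event": record.getMessage(),
            "level": record.levelname.lower(),
            "logger": record.name,
        }
        # Fields passed via extra={"fields": {...}} become top-level keys
        payload.update(getattr(record, "fields", {}))
        # default=str keeps non-JSON types (datetimes, UUIDs) from raising
        return json.dumps(payload, default=str)


def build_logger(name: str) -> logging.Logger:
    """Return a logger that writes JSON lines to stderr."""
    handler = logging.StreamHandler()
    handler.setFormatter(JsonFormatter())
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.addHandler(handler)
    logger.propagate = False  # avoid duplicate output via the root logger
    return logger
```

A call such as `build_logger("svc").info("analysis_completed", extra={"fields": {"correlation_id": "abc-123", "duration_ms": 12.5}})` then emits a single JSON line that a log pipeline can index by field.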

Common Pitfalls to Avoid

Not Handling Timeouts

# Wrong - no timeout
result = await client.call_tool(tool_name, args)

# Right - with timeout
result = await asyncio.wait_for(
    client.call_tool(tool_name, args),
    timeout=30.0
)

Forgetting Connection Cleanup

# Wrong - connection leak
client = MCPClient(url)
result = await client.call_tool(tool_name, args)

# Right - proper cleanup
async with pool.get_client(url) as client:
    result = await client.call_tool(tool_name, args)

Skipping Input Validation

# Wrong - no validation
@mcp.tool
async def process_data(data: dict) -> dict:
    return await process(data)

# Right - with validation
@mcp.tool
async def process_data(data: dict) -> dict:
    if not data.get("required_field"):
        raise ValueError("Missing required_field")

    return await process(data)
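
A presence check catches missing keys but not wrong types or out-of-range values. One way to go further, sketched here with only the standard library, is to parse the raw dict into a typed object before doing any work. `ProcessDataInput` and its `limit` field are hypothetical and exist only to illustrate the idea.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass(frozen=True)
class ProcessDataInput:
    """Hypothetical validated input for a process_data tool."""

    required_field: str
    limit: int = 100

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "ProcessDataInput":
        """Validate a raw dict and return a typed object, or raise ValueError."""
        value = data.get("required_field")
        if not isinstance(value, str) or not value.strip():
            raise ValueError("required_field must be a non-empty string")

        limit = data.get("limit", 100)
        # bool is a subclass of int, so it has to be excluded explicitly
        if isinstance(limit, bool) or not isinstance(limit, int):
            raise ValueError("limit must be an integer")
        if not 1 <= limit <= 1000:
            raise ValueError("limit must be between 1 and 1000")

        return cls(required_field=value, limit=limit)
```

Inside a tool, `ProcessDataInput.from_dict(data)` would run first, so everything after it can rely on the types and ranges being correct.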

The Bottom Line

MCP is the future of agent communication. But production systems require more than the protocol specification.

You need:

  • Connection pooling for performance
  • Streaming for long operations
  • Retry logic for reliability
  • Dynamic tool discovery for routing
  • Health checks for orchestration
  • Correlation IDs for tracing
  • Idempotency for safety
  • Structured logging for observability
  • Input validation for security

Get these patterns right from the start. Your future self will thank you.

Stop Comparing. Start Architecting.

The AI ecosystem is moving toward modular, agentic architectures. MCP is the connective tissue that makes this possible. The question isn’t whether to adopt MCP; it’s how to build it right.

These eight patterns are your foundation. They’re battle-tested, production-proven, and ready to scale.

The future is agentic. The future is modular. The future is MCP.

Build it right.

