Production Deployment and Operations

Overview

Building an AI agent is like creating a prototype sports car in your garage—it might work perfectly in controlled conditions, but taking it to a racetrack requires entirely different considerations. You need robust safety systems, reliable performance monitoring, fuel efficiency for long races, and pit crew coordination for maintenance.

Similarly, deploying AI agents to production means transforming development prototypes into enterprise-grade systems that can handle real users, unexpected edge cases, security threats, and scale demands. This lesson focuses on the architectural foundations and scaling strategies essential for production agent deployments.

Learning Objectives

After completing this lesson, you will be able to:

  • Design production-ready architectures for AI agent systems
  • Choose appropriate scaling strategies for different workload patterns
  • Implement microservices architectures for agent systems
  • Design robust deployment patterns with load balancing and fault tolerance
  • Plan capacity and infrastructure requirements for agent workloads

Production Architecture Patterns


From Development to Production

The transition from development to production represents a fundamental shift in priorities and constraints:

Development Environment:

  • Single agent instances running locally
  • Synchronous processing with immediate responses
  • Local file-based state storage
  • Manual testing and debugging workflows
  • Direct API access without intermediate layers

Production Environment:

  • Horizontally scaled agent fleets with load balancing
  • Asynchronous, fault-tolerant processing pipelines
  • Distributed state management across multiple nodes
  • Automated monitoring and alerting systems
  • API gateways with authentication and rate limiting
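One concrete difference between the two environments is retry handling: a production pipeline assumes downstream calls will sometimes fail and retries them with backoff rather than surfacing every transient error. Here is a minimal sketch of that pattern; the `flaky` dependency and the specific delay values are illustrative assumptions, not part of any particular framework.

```python
import asyncio
import random

async def call_with_retries(coro_factory, max_attempts=3, base_delay=0.1):
    """Retry an async call with exponential backoff plus a little jitter."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await coro_factory()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: surface the error
            # Exponential backoff: 0.1s, 0.2s, 0.4s... with up to 10% jitter
            delay = base_delay * (2 ** (attempt - 1))
            await asyncio.sleep(delay * (1 + random.random() * 0.1))

# Demo: a hypothetical flaky dependency that fails twice, then succeeds
attempts = {"n": 0}

async def flaky():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise ConnectionError("transient failure")
    return "ok"

result = asyncio.run(call_with_retries(flaky))
print(result)  # "ok", after two retried failures
```

The same idea generalizes to circuit breakers and dead-letter queues; retries are simply the first layer of fault tolerance.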


Scaling Strategies Comparison


Different scaling approaches suit different workload characteristics and business requirements:

| Strategy | Complexity | Cost | Throughput | Fault Tolerance | Best For |
|---|---|---|---|---|---|
| Vertical Scaling | Low | High | Limited | Low | Simple workloads, quick scaling |
| Horizontal Scaling | Medium | Medium | High | High | Variable workloads, high availability |
| Auto-scaling | High | Variable | Very High | Very High | Unpredictable traffic patterns |
| Serverless | Low | Usage-based | High | High | Event-driven, sporadic usage |
| Container Orchestration | Very High | Medium | Very High | Very High | Complex microservices, enterprise |
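To make the auto-scaling row concrete: Kubernetes' Horizontal Pod Autoscaler computes its target as `desired = ceil(current * currentMetric / targetMetric)`, clamped to the configured replica bounds. A small sketch of that rule (the bounds and example numbers are assumptions for illustration):

```python
import math

def desired_replicas(current_replicas, current_metric, target_metric,
                     min_replicas=1, max_replicas=20):
    """HPA-style scaling rule: scale replicas proportionally to how far the
    observed metric is from its target, clamped to [min, max]."""
    desired = math.ceil(current_replicas * current_metric / target_metric)
    return max(min_replicas, min(max_replicas, desired))

# 3 replicas at 90% CPU against a 70% target -> scale out to 4
print(desired_replicas(3, 90, 70))   # 4
# 10 replicas at 20% CPU against a 70% target -> scale in to 3
print(desired_replicas(10, 20, 70))  # 3
```

The proportional form means a fleet twice as far over target scales out twice as aggressively, which is why a sensible target utilization (e.g. 70% rather than 95%) leaves headroom for spikes.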

Microservices Architecture for Agents

```python
# Production-Ready Agent Architecture
import asyncio
import json
import logging
import time
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict

# Configure production logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)


class ServiceStatus(Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"
    STARTING = "starting"
    STOPPING = "stopping"


@dataclass
class HealthCheck:
    """Health check result for a service"""
    service_name: str
    status: ServiceStatus
    details: Dict[str, Any] = field(default_factory=dict)
    timestamp: float = field(default_factory=time.time)

    def to_dict(self) -> Dict[str, Any]:
        return {
            "service": self.service_name,
            "status": self.status.value,
            "details": self.details,
            "timestamp": self.timestamp
        }


class ProductionService(ABC):
    """Base class for production services"""

    def __init__(self, service_name: str):
        self.service_name = service_name
        self.logger = logging.getLogger(service_name)
        self.status = ServiceStatus.STARTING
        self.start_time = time.time()
        self.request_count = 0
        self.error_count = 0

    @abstractmethod
    async def initialize(self):
        """Initialize the service"""
        pass

    @abstractmethod
    async def shutdown(self):
        """Graceful shutdown"""
        pass

    @abstractmethod
    async def health_check(self) -> HealthCheck:
        """Perform health check"""
        pass

    async def process_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Process a request with monitoring"""
        request_id = str(uuid.uuid4())
        start_time = time.time()
        self.logger.info(f"Processing request {request_id}")
        self.request_count += 1
        try:
            result = await self._handle_request(request)
            duration = time.time() - start_time
            self.logger.info(f"Request {request_id} completed in {duration:.3f}s")
            return {
                "request_id": request_id,
                "result": result,
                "duration": duration,
                "status": "success"
            }
        except Exception as e:
            self.error_count += 1
            duration = time.time() - start_time
            self.logger.error(f"Request {request_id} failed: {str(e)}")
            return {
                "request_id": request_id,
                "error": str(e),
                "duration": duration,
                "status": "error"
            }

    @abstractmethod
    async def _handle_request(self, request: Dict[str, Any]) -> Any:
        """Handle the actual request"""
        pass

    def get_metrics(self) -> Dict[str, Any]:
        """Get service metrics"""
        uptime = time.time() - self.start_time
        error_rate = self.error_count / max(self.request_count, 1)
        return {
            "service_name": self.service_name,
            "status": self.status.value,
            "uptime_seconds": uptime,
            "request_count": self.request_count,
            "error_count": self.error_count,
            "error_rate": error_rate,
            "timestamp": time.time()
        }


class AgentService(ProductionService):
    """Production agent service"""

    def __init__(self, service_name: str, agent_type: str):
        super().__init__(service_name)
        self.agent_type = agent_type
        self.active_sessions: Dict[str, Dict] = {}
        self.tool_registry = None
        self.max_concurrent_sessions = 100

    async def initialize(self):
        """Initialize the agent service"""
        self.status = ServiceStatus.STARTING
        self.logger.info(f"Initializing {self.agent_type} agent service")
        # Initialize agent components
        await self._initialize_tools()
        await self._initialize_memory()
        await self._initialize_planning()
        self.status = ServiceStatus.HEALTHY
        self.logger.info("Agent service initialized successfully")

    async def shutdown(self):
        """Graceful shutdown"""
        self.status = ServiceStatus.STOPPING
        self.logger.info("Shutting down agent service")
        # Complete active sessions before exiting
        for session_id in list(self.active_sessions.keys()):
            await self._complete_session(session_id)
        self.logger.info("Agent service shutdown complete")

    async def health_check(self) -> HealthCheck:
        """Perform comprehensive health check"""
        details = {
            "agent_type": self.agent_type,
            "active_sessions": len(self.active_sessions),
            "max_sessions": self.max_concurrent_sessions,
            "uptime": time.time() - self.start_time
        }
        # Check the hard limit first; otherwise the "approaching" branch
        # would shadow it and the UNHEALTHY state could never be reached.
        if len(self.active_sessions) >= self.max_concurrent_sessions:
            status = ServiceStatus.UNHEALTHY
            details["error"] = "Session limit exceeded"
        elif len(self.active_sessions) > self.max_concurrent_sessions * 0.9:
            status = ServiceStatus.DEGRADED
            details["warning"] = "Approaching session limit"
        else:
            status = ServiceStatus.HEALTHY
        return HealthCheck(self.service_name, status, details)

    async def _handle_request(self, request: Dict[str, Any]) -> Any:
        """Handle agent request"""
        request_type = request.get("type", "chat")
        if request_type == "chat":
            return await self._handle_chat_request(request)
        elif request_type == "task":
            return await self._handle_task_request(request)
        elif request_type == "session_start":
            return await self._start_session(request)
        elif request_type == "session_end":
            return await self._end_session(request)
        else:
            raise ValueError(f"Unknown request type: {request_type}")

    async def _handle_chat_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Handle chat interaction"""
        session_id = request.get("session_id")
        message = request.get("message")
        if not session_id or not message:
            raise ValueError("session_id and message are required")
        # Simulate agent processing
        await asyncio.sleep(0.1)  # Simulate thinking time
        return {
            "response": f"Agent processed: {message}",
            "session_id": session_id,
            "type": "chat_response"
        }

    async def _handle_task_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Handle task execution"""
        task = request.get("task")
        priority = request.get("priority", "normal")
        if not task:
            raise ValueError("task is required")
        # Simulate task processing
        processing_time = 0.5 if priority == "high" else 1.0
        await asyncio.sleep(processing_time)
        return {
            "result": f"Task completed: {task}",
            "priority": priority,
            "type": "task_result"
        }

    async def _start_session(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Start a new agent session"""
        if len(self.active_sessions) >= self.max_concurrent_sessions:
            raise ValueError("Maximum concurrent sessions reached")
        session_id = str(uuid.uuid4())
        session_data = {
            "created_at": time.time(),
            "user_id": request.get("user_id"),
            "context": request.get("context", {}),
            "message_count": 0
        }
        self.active_sessions[session_id] = session_data
        return {
            "session_id": session_id,
            "status": "started",
            "type": "session_created"
        }

    async def _end_session(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """End an agent session"""
        session_id = request.get("session_id")
        if session_id in self.active_sessions:
            session_data = self.active_sessions.pop(session_id)
            duration = time.time() - session_data["created_at"]
            return {
                "session_id": session_id,
                "status": "ended",
                "duration": duration,
                "message_count": session_data["message_count"],
                "type": "session_ended"
            }
        else:
            raise ValueError(f"Session {session_id} not found")

    async def _complete_session(self, session_id: str):
        """Complete a session during shutdown"""
        if session_id in self.active_sessions:
            self.active_sessions.pop(session_id)
            self.logger.info(f"Completed session {session_id} during shutdown")

    async def _initialize_tools(self):
        """Initialize tool registry"""
        # Placeholder for tool initialization
        self.logger.info("Tools initialized")

    async def _initialize_memory(self):
        """Initialize memory systems"""
        # Placeholder for memory initialization
        self.logger.info("Memory systems initialized")

    async def _initialize_planning(self):
        """Initialize planning components"""
        # Placeholder for planning initialization
        self.logger.info("Planning components initialized")


# Service orchestration
class ServiceOrchestrator:
    """Orchestrates multiple agent services"""

    def __init__(self):
        self.services: Dict[str, ProductionService] = {}
        self.health_check_interval = 30  # seconds
        self.running = False

    def register_service(self, service: ProductionService):
        """Register a service with the orchestrator"""
        self.services[service.service_name] = service

    async def start_all(self):
        """Start all registered services"""
        self.running = True
        # Initialize all services
        for service in self.services.values():
            await service.initialize()
        # Start health check monitoring in the background
        asyncio.create_task(self._health_check_loop())

    async def stop_all(self):
        """Stop all services gracefully"""
        self.running = False
        # Shutdown all services
        for service in self.services.values():
            await service.shutdown()

    async def _health_check_loop(self):
        """Continuous health checking"""
        while self.running:
            for service in self.services.values():
                try:
                    health = await service.health_check()
                    if health.status != ServiceStatus.HEALTHY:
                        logging.warning(
                            f"Service {service.service_name} health: {health.to_dict()}"
                        )
                except Exception as e:
                    logging.error(f"Health check failed for {service.service_name}: {e}")
            await asyncio.sleep(self.health_check_interval)

    def get_system_status(self) -> Dict[str, Any]:
        """Get overall system status"""
        service_statuses = {}
        for name, service in self.services.items():
            service_statuses[name] = service.get_metrics()
        return {
            "timestamp": time.time(),
            "services": service_statuses,
            "total_services": len(self.services),
            "healthy_services": sum(
                1 for s in self.services.values()
                if s.status == ServiceStatus.HEALTHY
            )
        }


# Example usage
async def main():
    # Create services
    chat_agent = AgentService("chat-agent", "conversational")
    task_agent = AgentService("task-agent", "task-oriented")
    planning_agent = AgentService("planning-agent", "planning")

    # Create orchestrator
    orchestrator = ServiceOrchestrator()
    orchestrator.register_service(chat_agent)
    orchestrator.register_service(task_agent)
    orchestrator.register_service(planning_agent)

    try:
        # Start all services
        await orchestrator.start_all()
        print("All services started successfully")

        # Simulate some requests
        for i in range(5):
            result = await chat_agent.process_request({
                "type": "chat",
                "session_id": "test-session",
                "message": f"Hello {i}"
            })
            print(f"Chat result: {result}")

        # Get system status
        status = orchestrator.get_system_status()
        print(f"System status: {json.dumps(status, indent=2)}")
    except KeyboardInterrupt:
        print("Shutting down...")
    finally:
        await orchestrator.stop_all()

# if __name__ == "__main__":
#     asyncio.run(main())
```

Container Orchestration with Kubernetes

For enterprise deployments, Kubernetes provides sophisticated orchestration capabilities:

Kubernetes Deployment Strategy

```yaml
# Kubernetes deployment configuration for agent services
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ai-agent-deployment
  labels:
    app: ai-agent
spec:
  replicas: 3
  selector:
    matchLabels:
      app: ai-agent
  template:
    metadata:
      labels:
        app: ai-agent
    spec:
      containers:
        - name: ai-agent
          image: your-registry/ai-agent:latest
          ports:
            - containerPort: 8000
          env:
            - name: AGENT_TYPE
              value: "conversational"
            - name: MAX_CONCURRENT_SESSIONS
              value: "100"
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: db-secret
                  key: url
          resources:
            requests:
              memory: "512Mi"
              cpu: "500m"
            limits:
              memory: "1Gi"
              cpu: "1000m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
          lifecycle:
            preStop:
              exec:
                command: ["/bin/sh", "-c", "sleep 15"]
---
apiVersion: v1
kind: Service
metadata:
  name: ai-agent-service
spec:
  selector:
    app: ai-agent
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8000
  type: ClusterIP
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: ai-agent-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: ai-agent-deployment
  minReplicas: 3
  maxReplicas: 20
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: ai-agent-ingress
  annotations:
    nginx.ingress.kubernetes.io/rewrite-target: /
    nginx.ingress.kubernetes.io/rate-limit: "100"
spec:
  rules:
    - host: api.yourdomain.com
      http:
        paths:
          - path: /agents
            pathType: Prefix
            backend:
              service:
                name: ai-agent-service
                port:
                  number: 80
```

Load Balancing Strategies

Different load balancing approaches optimize for different agent characteristics:

| Strategy | Description | Best For | Pros | Cons |
|---|---|---|---|---|
| Round Robin | Distribute requests evenly | Stateless agents | Simple, even distribution | Ignores agent load |
| Least Connections | Route to agent with fewest active connections | Session-based agents | Load awareness | More complex |
| Weighted | Route based on agent capacity | Heterogeneous agents | Capacity optimization | Requires tuning |
| Session Affinity | Route same user to same agent | Stateful conversations | Consistency | Uneven distribution |
| Geographic | Route based on user location | Global deployments | Latency optimization | Complex configuration |
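Session affinity is often implemented with consistent hashing: the same user id always maps to the same agent instance, and adding or removing an instance remaps only a small fraction of users. A minimal sketch (the instance names, virtual-node count, and MD5 hash choice are illustrative assumptions):

```python
import hashlib
from bisect import bisect

class ConsistentHashRing:
    """Map user ids to agent instances so the same user always reaches the
    same instance. Virtual nodes smooth out the distribution across the ring."""

    def __init__(self, nodes, vnodes=100):
        self._ring = []
        for node in nodes:
            for i in range(vnodes):
                # Each instance occupies many points on the hash ring
                self._ring.append((self._hash(f"{node}#{i}"), node))
        self._ring.sort()
        self._keys = [h for h, _ in self._ring]

    @staticmethod
    def _hash(key):
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def route(self, user_id):
        # First ring point clockwise from the user's hash owns the session
        idx = bisect(self._keys, self._hash(user_id)) % len(self._ring)
        return self._ring[idx][1]

ring = ConsistentHashRing(["agent-1", "agent-2", "agent-3"])
target = ring.route("user-42")
assert target == ring.route("user-42")  # same user, same instance every time
```

The "uneven distribution" drawback in the table shows up here too: a handful of very chatty users can still overload one instance, which is why affinity is usually combined with per-instance session caps.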

Infrastructure as Code

Terraform Configuration for Agent Infrastructure

```hcl
# Terraform configuration for AI agent infrastructure
terraform {
  required_version = ">= 1.0"

  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.0"
    }
    kubernetes = {
      source  = "hashicorp/kubernetes"
      version = "~> 2.0"
    }
  }
}

# VPC Configuration
resource "aws_vpc" "agent_vpc" {
  cidr_block           = "10.0.0.0/16"
  enable_dns_hostnames = true
  enable_dns_support   = true

  tags = {
    Name        = "ai-agent-vpc"
    Environment = var.environment
  }
}

# Subnets for high availability
resource "aws_subnet" "agent_subnet" {
  count                   = 3
  vpc_id                  = aws_vpc.agent_vpc.id
  cidr_block              = "10.0.${count.index + 1}.0/24"
  availability_zone       = data.aws_availability_zones.available.names[count.index]
  map_public_ip_on_launch = true

  tags = {
    Name        = "ai-agent-subnet-${count.index + 1}"
    Environment = var.environment
  }
}

# EKS Cluster for container orchestration
resource "aws_eks_cluster" "agent_cluster" {
  name     = "ai-agent-cluster"
  role_arn = aws_iam_role.cluster_role.arn
  version  = "1.28"

  vpc_config {
    subnet_ids              = aws_subnet.agent_subnet[*].id
    endpoint_private_access = true
    endpoint_public_access  = true
  }

  enabled_cluster_log_types = ["api", "audit", "authenticator", "controllerManager", "scheduler"]

  depends_on = [
    aws_iam_role_policy_attachment.cluster_AmazonEKSClusterPolicy,
  ]

  tags = {
    Environment = var.environment
  }
}

# EKS Node Group
resource "aws_eks_node_group" "agent_nodes" {
  cluster_name    = aws_eks_cluster.agent_cluster.name
  node_group_name = "ai-agent-nodes"
  node_role_arn   = aws_iam_role.node_role.arn
  subnet_ids      = aws_subnet.agent_subnet[*].id
  instance_types  = ["t3.large"]

  scaling_config {
    desired_size = 3
    max_size     = 10
    min_size     = 1
  }

  update_config {
    max_unavailable = 1
  }

  depends_on = [
    aws_iam_role_policy_attachment.node_AmazonEKSWorkerNodePolicy,
    aws_iam_role_policy_attachment.node_AmazonEKS_CNI_Policy,
    aws_iam_role_policy_attachment.node_AmazonEC2ContainerRegistryReadOnly,
  ]

  tags = {
    Environment = var.environment
  }
}

# RDS for agent state storage
resource "aws_db_instance" "agent_db" {
  identifier     = "ai-agent-db"
  engine         = "postgres"
  engine_version = "15.4"
  instance_class = "db.t3.medium"

  allocated_storage = 100
  db_name           = "agentdb"
  username          = var.db_username
  password          = var.db_password

  vpc_security_group_ids = [aws_security_group.db_sg.id]
  db_subnet_group_name   = aws_db_subnet_group.agent_db_subnet_group.name

  backup_retention_period = 7
  backup_window           = "03:00-04:00"
  maintenance_window      = "sun:04:00-sun:05:00"

  skip_final_snapshot = true
  deletion_protection = false

  tags = {
    Environment = var.environment
  }
}

# ElastiCache for session storage and caching
resource "aws_elasticache_subnet_group" "agent_cache_subnet_group" {
  name       = "ai-agent-cache-subnet-group"
  subnet_ids = aws_subnet.agent_subnet[*].id
}

resource "aws_elasticache_cluster" "agent_cache" {
  cluster_id           = "ai-agent-cache"
  engine               = "redis"
  node_type            = "cache.t3.micro"
  num_cache_nodes      = 1
  parameter_group_name = "default.redis7"
  port                 = 6379
  subnet_group_name    = aws_elasticache_subnet_group.agent_cache_subnet_group.name
  security_group_ids   = [aws_security_group.cache_sg.id]

  tags = {
    Environment = var.environment
  }
}

# Application Load Balancer
resource "aws_lb" "agent_alb" {
  name               = "ai-agent-alb"
  internal           = false
  load_balancer_type = "application"
  security_groups    = [aws_security_group.alb_sg.id]
  subnets            = aws_subnet.agent_subnet[*].id

  enable_deletion_protection = false

  tags = {
    Environment = var.environment
  }
}

# Variables
variable "environment" {
  description = "Environment name"
  type        = string
  default     = "production"
}

variable "db_username" {
  description = "Database username"
  type        = string
  sensitive   = true
}

variable "db_password" {
  description = "Database password"
  type        = string
  sensitive   = true
}

# Data sources
data "aws_availability_zones" "available" {
  state = "available"
}

# Outputs
output "cluster_endpoint" {
  description = "EKS cluster endpoint"
  value       = aws_eks_cluster.agent_cluster.endpoint
}

output "cluster_name" {
  description = "EKS cluster name"
  value       = aws_eks_cluster.agent_cluster.name
}

output "database_endpoint" {
  description = "RDS database endpoint"
  value       = aws_db_instance.agent_db.endpoint
  sensitive   = true
}

output "redis_endpoint" {
  description = "ElastiCache Redis endpoint"
  value       = aws_elasticache_cluster.agent_cache.cache_nodes[0].address
}
```

Capacity Planning and Performance

Resource Requirements Analysis

Planning capacity for agent workloads requires understanding resource consumption patterns:

| Agent Type | CPU (cores) | Memory (GB) | Storage (GB) | Network (Mbps) |
|---|---|---|---|---|
| Simple Chat | 0.5-1.0 | 1-2 | 10-20 | 10-50 |
| Tool-Using | 1.0-2.0 | 2-4 | 20-50 | 50-100 |
| Planning Agent | 2.0-4.0 | 4-8 | 50-100 | 100-200 |
| Multi-Modal | 4.0-8.0 | 8-16 | 100-500 | 200-500 |
| Research Agent | 2.0-4.0 | 4-8 | 100-200 | 500-1000 |
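Turning per-agent figures like these into a fleet size is simple arithmetic: estimate peak concurrent sessions, divide by sessions per instance, and add headroom for spikes and rolling updates. A sketch, where the session counts and 30% headroom are illustrative assumptions rather than benchmarked values:

```python
import math

def pods_needed(expected_sessions, sessions_per_pod, headroom=0.3):
    """Fleet size = peak concurrent sessions, padded with headroom for
    traffic spikes and rolling updates, divided by per-pod capacity."""
    return math.ceil(expected_sessions * (1 + headroom) / sessions_per_pod)

# e.g. 2,000 concurrent chat sessions, ~100 sessions per pod, 30% headroom
pods = pods_needed(2000, 100)
cpu_cores = pods * 1.0  # upper bound of the Simple Chat CPU range
memory_gb = pods * 2    # upper bound of the Simple Chat memory range
print(pods, cpu_cores, memory_gb)  # 26 pods, 26.0 cores, 52 GB
```

Running the same calculation against the Planning Agent row would roughly quadruple the CPU and memory budget for the same session count, which is why capacity planning starts from the agent type, not just request volume.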

Performance Optimization Strategies

```python
# Performance optimization for production agents
import asyncio
import json
import time
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

import aiohttp
import redis.asyncio as redis


@dataclass
class PerformanceMetrics:
    """Track performance metrics"""
    requests_per_second: float
    average_response_time: float
    p95_response_time: float
    cache_hit_rate: float
    concurrent_connections: int
    memory_usage_mb: float
    cpu_usage_percent: float


class OptimizedAgentService:
    """High-performance agent service with optimizations"""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.session = None
        self.redis_pool = None
        self.redis_client = None
        self.response_times = []
        self.request_count = 0
        self.start_time = time.time()

        # Connection pooling
        self.max_connections = config.get("max_connections", 100)
        self.pool_timeout = config.get("pool_timeout", 30)

        # Caching configuration
        self.cache_ttl = config.get("cache_ttl", 300)  # 5 minutes
        self.cache_enabled = config.get("cache_enabled", True)

        # Performance thresholds
        self.max_response_time = config.get("max_response_time", 5.0)
        self.max_queue_size = config.get("max_queue_size", 1000)

    async def initialize(self):
        """Initialize with connection pooling"""
        # HTTP connection pool for external APIs
        connector = aiohttp.TCPConnector(
            limit=self.max_connections,
            limit_per_host=50,
            ttl_dns_cache=300,
            use_dns_cache=True,
            keepalive_timeout=30,
            enable_cleanup_closed=True
        )
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=aiohttp.ClientTimeout(total=self.pool_timeout)
        )

        # Redis connection pool for caching
        if self.cache_enabled:
            self.redis_pool = redis.ConnectionPool.from_url(
                self.config.get("redis_url", "redis://localhost:6379"),
                max_connections=20,
                retry_on_timeout=True
            )
            self.redis_client = redis.Redis(connection_pool=self.redis_pool)

    async def shutdown(self):
        """Clean shutdown of connections"""
        if self.session:
            await self.session.close()
        if self.redis_pool:
            await self.redis_pool.disconnect()

    @asynccontextmanager
    async def performance_tracking(self, operation: str):
        """Context manager for performance tracking"""
        start_time = time.time()
        try:
            yield
        finally:
            duration = time.time() - start_time
            self.response_times.append(duration)
            # Keep only recent measurements
            if len(self.response_times) > 1000:
                self.response_times = self.response_times[-1000:]
            # Alert on slow operations
            if duration > self.max_response_time:
                print(f"SLOW OPERATION: {operation} took {duration:.2f}s")

    async def cached_api_call(self, url: str, cache_key: str,
                              ttl: Optional[int] = None) -> Dict[str, Any]:
        """Make API call with caching"""
        ttl = ttl or self.cache_ttl

        # Try cache first
        if self.cache_enabled:
            cached_result = await self.redis_client.get(cache_key)
            if cached_result:
                return json.loads(cached_result)

        # Make actual API call
        async with self.performance_tracking(f"API call: {url}"):
            async with self.session.get(url) as response:
                result = await response.json()

        # Cache the result
        if self.cache_enabled:
            await self.redis_client.setex(cache_key, ttl, json.dumps(result))

        return result

    async def batch_process(self, requests: List[Dict[str, Any]],
                            batch_size: int = 10) -> List[Dict[str, Any]]:
        """Process requests in batches for better throughput"""
        results = []
        for i in range(0, len(requests), batch_size):
            batch = requests[i:i + batch_size]
            # Process batch concurrently
            async with self.performance_tracking(f"Batch processing {len(batch)} requests"):
                batch_results = await asyncio.gather(
                    *[self._process_single_request(req) for req in batch],
                    return_exceptions=True
                )
            results.extend(batch_results)
        return results

    async def _process_single_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Process a single request with optimization"""
        request_id = request.get("id", "unknown")
        try:
            # Simulate processing with potential optimizations
            processing_time = request.get("complexity", 0.1)

            # Use caching for expensive operations
            if processing_time > 1.0:
                cache_key = f"request:{hash(str(request))}"
                result = await self.cached_api_call(
                    "http://api.example.com/process", cache_key
                )
            else:
                # Quick processing
                await asyncio.sleep(processing_time)
                result = {"processed": True, "request_id": request_id}

            self.request_count += 1
            return result
        except Exception as e:
            return {"error": str(e), "request_id": request_id}

    def get_performance_metrics(self) -> PerformanceMetrics:
        """Calculate current performance metrics"""
        if not self.response_times:
            return PerformanceMetrics(0, 0, 0, 0, 0, 0, 0)

        # Calculate response time statistics
        sorted_times = sorted(self.response_times)
        avg_time = sum(sorted_times) / len(sorted_times)
        p95_index = int(len(sorted_times) * 0.95)
        p95_time = sorted_times[min(p95_index, len(sorted_times) - 1)]

        # Calculate requests per second
        elapsed_time = time.time() - self.start_time
        rps = self.request_count / elapsed_time if elapsed_time > 0 else 0

        # Mock other metrics (in production, get from the system)
        cache_hit_rate = 0.85        # Mock value
        concurrent_connections = 50  # Mock value
        memory_usage_mb = 512        # Mock value
        cpu_usage_percent = 45       # Mock value

        return PerformanceMetrics(
            requests_per_second=rps,
            average_response_time=avg_time,
            p95_response_time=p95_time,
            cache_hit_rate=cache_hit_rate,
            concurrent_connections=concurrent_connections,
            memory_usage_mb=memory_usage_mb,
            cpu_usage_percent=cpu_usage_percent
        )

    async def health_check(self) -> Dict[str, Any]:
        """Comprehensive health check"""
        metrics = self.get_performance_metrics()

        # Determine health status
        health_issues = []
        if metrics.average_response_time > self.max_response_time:
            health_issues.append(f"High response time: {metrics.average_response_time:.2f}s")
        if metrics.requests_per_second < 1.0 and self.request_count > 10:
            health_issues.append("Low throughput detected")
        if metrics.cache_hit_rate < 0.5:
            health_issues.append("Low cache hit rate")

        return {
            "status": "healthy" if not health_issues else "degraded",
            "issues": health_issues,
            "metrics": {
                "rps": metrics.requests_per_second,
                "avg_response_time": metrics.average_response_time,
                "p95_response_time": metrics.p95_response_time,
                "cache_hit_rate": metrics.cache_hit_rate,
                "total_requests": self.request_count
            }
        }


# Load testing utility
async def load_test(service: OptimizedAgentService,
                    concurrent_requests: int = 50, duration: int = 60):
    """Simple load testing function"""
    print(f"Starting load test: {concurrent_requests} concurrent requests for {duration} seconds")
    start_time = time.time()
    completed_requests = 0

    async def make_request():
        nonlocal completed_requests
        while time.time() - start_time < duration:
            request = {
                "id": f"req_{completed_requests}",
                "complexity": 0.1,  # Light processing
                "data": "test data"
            }
            try:
                await service._process_single_request(request)
                completed_requests += 1
            except Exception as e:
                print(f"Request failed: {e}")
            await asyncio.sleep(0.01)  # Small delay between requests

    # Start concurrent request generators
    tasks = [asyncio.create_task(make_request()) for _ in range(concurrent_requests)]

    # Wait for test duration
    await asyncio.sleep(duration)

    # Cancel remaining tasks
    for task in tasks:
        task.cancel()

    # Get final metrics
    metrics = service.get_performance_metrics()
    print("Load test completed:")
    print(f"  Total requests: {completed_requests}")
    print(f"  RPS: {metrics.requests_per_second:.2f}")
    print(f"  Avg response time: {metrics.average_response_time:.3f}s")
    print(f"  P95 response time: {metrics.p95_response_time:.3f}s")


# Example usage
async def main():
    config = {
        "max_connections": 100,
        "cache_enabled": True,
        "cache_ttl": 300,
        "max_response_time": 2.0
    }
    service = OptimizedAgentService(config)
    try:
        await service.initialize()
        # Run load test
        await load_test(service, concurrent_requests=20, duration=30)
        # Check health
        health = await service.health_check()
        print(f"Health status: {health}")
    finally:
        await service.shutdown()

# if __name__ == "__main__":
#     asyncio.run(main())
```

Summary and Best Practices

Production Deployment Checklist

  • Architecture: Microservices design with clear service boundaries
  • Scaling: Horizontal scaling with load balancing configured
  • Infrastructure: Container orchestration (Kubernetes) set up
  • Networking: API gateway with rate limiting and authentication
  • Storage: Distributed databases and caching layers configured
  • Performance: Connection pooling and optimization implemented
  • Health Checks: Comprehensive health monitoring configured

Key Design Principles

  1. Design for Failure: Assume components will fail and plan accordingly
  2. Horizontal Scaling: Scale out, not up, for better fault tolerance
  3. Stateless Services: Keep services stateless for easier scaling
  4. Resource Efficiency: Optimize for both performance and cost
  5. Monitoring First: Build observability from the beginning

Next Steps

You now understand how to architect and deploy AI agent systems for production. In the next lesson, we'll explore monitoring and observability patterns that help you understand, debug, and optimize your agent systems in production environments.

Practice Exercises

  1. Architecture Design: Design a production architecture for a specific agent use case
  2. Kubernetes Deployment: Create complete Kubernetes manifests for an agent service
  3. Load Testing: Implement comprehensive load testing for agent services
  4. Infrastructure as Code: Write Terraform configuration for a complete agent infrastructure

Additional Resources