Overview
Think about how a surgical team operates during a complex procedure. The surgeon leads the operation, but success depends on seamless coordination: the anesthesiologist monitors vital signs, nurses anticipate instrument needs, residents assist with specific tasks, and the team communicates constantly to adapt to changing conditions. Each member has specialized skills, but they must work together as a unified system.
Multi-agent systems bring this same collaborative power to AI. While individual agents can accomplish impressive tasks, teams of specialized agents can tackle problems far beyond what any single agent could handle. But coordination is key—without proper protocols, multiple agents can interfere with each other, duplicate work, or fail to leverage their collective intelligence.
Learning Objectives
After completing this lesson, you will be able to:
- Design coordination protocols that prevent conflicts and ensure cooperation
- Implement communication systems for agent collaboration
- Build distributed problem-solving systems with multiple specialized agents
- Handle negotiation and consensus-building between agents
- Create fault-tolerant multi-agent architectures
Fundamentals of Multi-Agent Coordination
From Single Agents to Agent Teams
Single Agent Limitations:
- Bounded processing capacity
- Limited domain knowledge
- Single point of failure
- Cannot parallelize complex tasks
Multi-Agent Advantages:
- Distributed computation and parallel processing
- Specialized expertise for different domains
- Fault tolerance through redundancy
- Scalable problem-solving capacity
Communication Protocols Visualization
Types of Multi-Agent Coordination
| Type | Communication | Goal Alignment | Complexity | Best Use Cases |
|---|---|---|---|---|
| Cooperative | High | Shared goals | Medium | Research teams, collaborative writing |
| Competitive | Limited | Conflicting goals | High | Trading systems, game playing |
| Coopetitive | Selective | Mixed goals | Very High | Supply chains, marketplaces |
| Hierarchical | Structured | Aligned via hierarchy | Medium | Organizations, command systems |
| Peer-to-Peer | Direct | Negotiated alignment | High | Distributed systems, consensus |
Cooperative Systems: Agents share common goals
- Example: Research team agents gathering information for a report
- Challenge: Avoiding duplication of effort
Competitive Systems: Agents have conflicting goals
- Example: Trading agents in financial markets
- Challenge: Strategic behavior and game theory
Mixed Systems: Some cooperation, some competition
- Example: Auction systems where agents bid cooperatively within teams
- Challenge: Balancing cooperation and competition
# Foundational Multi-Agent System Architecture from typing import Dict, List, Any, Optional, Callable, Set from dataclasses import dataclass, field from enum import Enum from abc import ABC, abstractmethod import asyncio import time import uuid class MessageType(Enum): REQUEST = "request" RESPONSE = "response" BROADCAST = "broadcast" COORDINATION = "coordination" STATUS_UPDATE = "status" @dataclass class Message: """Message passed between agents""" id: str = field(default_factory=lambda: str(uuid.uuid4())) sender_id: str = "" receiver_id: str = "" # Empty for broadcast message_type: MessageType = MessageType.REQUEST content: Dict[str, Any] = field(default_factory=dict) timestamp: float = field(default_factory=time.time) requires_response: bool = False def to_dict(self) -> Dict[str, Any]: return { "id": self.id, "sender": self.sender_id, "receiver": self.receiver_id, "type": self.message_type.value, "content": self.content, "timestamp": self.timestamp, "requires_response": self.requires_response } class CommunicationHub: """Central communication system for multi-agent coordination""" def __init__(self): self.agents: Dict[str, 'AgentBase'] = {} self.message_queue: List[Message] = [] self.message_history: List[Message] = [] self.subscriptions: Dict[str, Set[str]] = {} # topic -> set of agent_ids def register_agent(self, agent: 'AgentBase'): """Register an agent with the communication hub""" self.agents[agent.agent_id] = agent agent.communication_hub = self def send_message(self, message: Message) -> bool: """Send a message through the hub""" self.message_history.append(message) if message.receiver_id: # Direct message if message.receiver_id in self.agents: self.agents[message.receiver_id].receive_message(message) return True return False else: # Broadcast message for agent_id, agent in self.agents.items(): if agent_id != message.sender_id: agent.receive_message(message) return True def subscribe_to_topic(self, agent_id: str, topic: str): """Subscribe agent to a topic for targeted broadcasts""" if topic not in self.subscriptions: self.subscriptions[topic] = set() self.subscriptions[topic].add(agent_id) def broadcast_to_topic(self, message: Message, topic: str): """Broadcast message to agents subscribed to a topic""" if topic in self.subscriptions: for agent_id in self.subscriptions[topic]: if agent_id != message.sender_id and agent_id in self.agents: self.agents[agent_id].receive_message(message) class AgentBase(ABC): """Base class for agents in multi-agent system""" def __init__(self, agent_id: str, capabilities: List[str] = None): self.agent_id = agent_id self.capabilities = capabilities or [] self.communication_hub: Optional[CommunicationHub] = None self.inbox: List[Message] = [] self.knowledge_base: Dict[str, Any] = {} self.current_tasks: List[Dict[str, Any]] = [] self.status = "idle" self.cooperation_history: Dict[str, List[Dict]] = {} def receive_message(self, message: Message): """Receive a message from another agent""" self.inbox.append(message) self.process_message(message) def send_message(self, receiver_id: str, content: Dict[str, Any], message_type: MessageType = MessageType.REQUEST, requires_response: bool = False) -> bool: """Send a message to another agent""" if not self.communication_hub: return False message = Message( sender_id=self.agent_id, receiver_id=receiver_id, message_type=message_type, content=content, requires_response=requires_response ) return self.communication_hub.send_message(message) def broadcast_message(self, content: Dict[str, Any], message_type: MessageType = MessageType.BROADCAST): """Broadcast a message to all agents""" if not self.communication_hub: return False message = Message( sender_id=self.agent_id, receiver_id="", # Empty for broadcast message_type=message_type, content=content ) return self.communication_hub.send_message(message) @abstractmethod def process_message(self, message: Message): """Process an incoming message""" pass @abstractmethod def execute_task(self, task: Dict[str, Any]) -> Dict[str, Any]: """Execute a task""" pass def update_status(self, new_status: str): """Update agent status and broadcast to others""" self.status = new_status self.broadcast_message({ "agent_id": self.agent_id, "status": new_status, "capabilities": self.capabilities }, MessageType.STATUS_UPDATE) # Example: Research Team Multi-Agent System class ResearchAgent(AgentBase): """Agent specialized in research tasks""" def __init__(self, agent_id: str, research_domain: str): super().__init__(agent_id, ["research", "analysis", "summarization"]) self.research_domain = research_domain self.research_cache: Dict[str, Dict[str, Any]] = {} def process_message(self, message: Message): """Process incoming messages""" if message.message_type == MessageType.REQUEST: if message.content.get("task_type") == "research": self.handle_research_request(message) elif message.content.get("task_type") == "collaborate": self.handle_collaboration_request(message) elif message.message_type == MessageType.RESPONSE: self.handle_response(message) def handle_research_request(self, message: Message): """Handle research task requests""" query = message.content.get("query", "") requester = message.sender_id # Check if we have cached results if query in self.research_cache: self.send_response(requester, message.id, self.research_cache[query]) return # Perform research (simulated) self.status = "researching" research_result = self.simulate_research(query) self.research_cache[query] = research_result # Send response self.send_response(requester, message.id, research_result) self.status = "idle" def handle_collaboration_request(self, message: Message): """Handle collaboration requests from other agents""" collaboration_type = message.content.get("collaboration_type") if collaboration_type == "knowledge_sharing": # Share relevant knowledge domain_knowledge = { "domain": self.research_domain, "recent_findings": list(self.research_cache.keys())[-5:], "agent_id": self.agent_id } self.send_response(message.sender_id, message.id, domain_knowledge) def handle_response(self, message: Message): """Handle responses from other agents""" # Store collaboration results if message.sender_id not in self.cooperation_history: self.cooperation_history[message.sender_id] = [] self.cooperation_history[message.sender_id].append({ "timestamp": message.timestamp, "content": message.content }) def send_response(self, receiver_id: str, original_message_id: str, result: Dict[str, Any]): """Send response to a request""" self.send_message(receiver_id, { "original_message_id": original_message_id, "result": result, "research_domain": self.research_domain }, MessageType.RESPONSE) def simulate_research(self, query: str) -> Dict[str, Any]: """Simulate research process""" # In real implementation, this would call actual research tools return { "query": query, "domain": self.research_domain, "findings": f"Research findings for '{query}' in {self.research_domain}", "confidence": 0.8, "sources": [f"source1_{self.research_domain}", f"source2_{self.research_domain}"], "timestamp": time.time() } def execute_task(self, task: Dict[str, Any]) -> Dict[str, Any]: """Execute a research task""" query = task.get("query", "") return self.simulate_research(query) # Demonstration def create_research_team_demo(): """Create a demonstration of multi-agent research coordination""" # Create communication hub hub = CommunicationHub() # Create specialized research agents ai_researcher = ResearchAgent("ai_researcher", "artificial_intelligence") bio_researcher = ResearchAgent("bio_researcher", "biology") physics_researcher = ResearchAgent("physics_researcher", "physics") # Register agents hub.register_agent(ai_researcher) hub.register_agent(bio_researcher) hub.register_agent(physics_researcher) print("Multi-Agent Research Team Demo:") print("=" * 40) # Agent 1 requests research from Agent 2 print("1. AI Researcher requests biology research...") ai_researcher.send_message( "bio_researcher", { "task_type": "research", "query": "neural networks in biological systems" }, requires_response=True ) # Agent 3 broadcasts collaboration request print("2. Physics Researcher broadcasts collaboration request...") physics_researcher.broadcast_message({ "task_type": "collaborate", "collaboration_type": "knowledge_sharing", "topic": "quantum computing applications" }) # Simulate message processing print("3. Processing messages...") time.sleep(0.1) # Allow message processing # Check collaboration results print("4. Collaboration Results:") for agent in [ai_researcher, bio_researcher, physics_researcher]: print(f" {agent.agent_id}: {len(agent.cooperation_history)} collaborations") if agent.cooperation_history: for partner, history in agent.cooperation_history.items(): print(f" - Collaborated with {partner}: {len(history)} interactions") return hub, [ai_researcher, bio_researcher, physics_researcher] # Run demonstration hub, agents = create_research_team_demo()
Coordination Protocols
Task Allocation and Work Distribution
# Advanced Coordination Protocols class TaskAllocationProtocol: """Protocol for allocating tasks among multiple agents""" def __init__(self, agents: List[AgentBase]): self.agents = {agent.agent_id: agent for agent in agents} self.task_queue: List[Dict[str, Any]] = [] self.active_tasks: Dict[str, Dict[str, Any]] = {} self.allocation_strategy = "capability_match" def submit_task(self, task: Dict[str, Any]) -> str: """Submit a task for allocation""" task_id = str(uuid.uuid4()) task["task_id"] = task_id task["status"] = "queued" task["submitted_at"] = time.time() self.task_queue.append(task) self.allocate_tasks() return task_id def allocate_tasks(self): """Allocate queued tasks to available agents""" available_agents = [ agent for agent in self.agents.values() if agent.status == "idle" ] while self.task_queue and available_agents: task = self.task_queue.pop(0) best_agent = self.select_best_agent(task, available_agents) if best_agent: self.assign_task_to_agent(task, best_agent) available_agents.remove(best_agent) def select_best_agent(self, task: Dict[str, Any], available_agents: List[AgentBase]) -> Optional[AgentBase]: """Select the best agent for a task based on capabilities""" required_capabilities = task.get("required_capabilities", []) if not required_capabilities: return available_agents[0] if available_agents else None # Score agents based on capability match best_agent = None best_score = -1 for agent in available_agents: score = len(set(agent.capabilities) & set(required_capabilities)) if score > best_score: best_score = score best_agent = agent return best_agent if best_score > 0 else None def assign_task_to_agent(self, task: Dict[str, Any], agent: AgentBase): """Assign a specific task to an agent""" task_id = task["task_id"] task["assigned_to"] = agent.agent_id task["assigned_at"] = time.time() task["status"] = "assigned" self.active_tasks[task_id] = task agent.current_tasks.append(task) agent.update_status("busy") # Send task to agent agent.receive_message(Message( sender_id="task_allocator", receiver_id=agent.agent_id, message_type=MessageType.REQUEST, content=task )) class ConsensusProtocol: """Protocol for reaching consensus among agents""" def __init__(self, agents: List[AgentBase]): self.agents = {agent.agent_id: agent for agent in agents} self.consensus_sessions: Dict[str, Dict[str, Any]] = {} def initiate_consensus(self, proposal: Dict[str, Any], proposer_id: str) -> str: """Initiate a consensus session""" session_id = str(uuid.uuid4()) session = { "session_id": session_id, "proposal": proposal, "proposer": proposer_id, "votes": {}, "status": "voting", "created_at": time.time(), "voting_deadline": time.time() + 30.0 # 30 second voting window } self.consensus_sessions[session_id] = session # Broadcast proposal to all agents for agent_id, agent in self.agents.items(): if agent_id != proposer_id: agent.receive_message(Message( sender_id="consensus_coordinator", receiver_id=agent_id, message_type=MessageType.REQUEST, content={ "action": "vote", "session_id": session_id, "proposal": proposal, "deadline": session["voting_deadline"] } )) return session_id def cast_vote(self, session_id: str, agent_id: str, vote: str, reasoning: str = ""): """Cast a vote in a consensus session""" if session_id not in self.consensus_sessions: return False session = self.consensus_sessions[session_id] if session["status"] != "voting": return False session["votes"][agent_id] = { "vote": vote, # "approve", "reject", "abstain" "reasoning": reasoning, "timestamp": time.time() } # Check if we have enough votes to make a decision self.check_consensus(session_id) return True def check_consensus(self, session_id: str): """Check if consensus has been reached""" session = self.consensus_sessions[session_id] votes = session["votes"] total_agents = len(self.agents) votes_cast = len(votes) # Check if voting deadline passed or all votes are in current_time = time.time() deadline_passed = current_time > session["voting_deadline"] all_votes_in = votes_cast >= total_agents - 1 # Exclude proposer if deadline_passed or all_votes_in: # Calculate consensus approve_votes = sum(1 for v in votes.values() if v["vote"] == "approve") reject_votes = sum(1 for v in votes.values() if v["vote"] == "reject") # Simple majority rule if approve_votes > reject_votes: session["status"] = "approved" session["result"] = "consensus_reached" else: session["status"] = "rejected" session["result"] = "consensus_failed" session["concluded_at"] = current_time # Notify all agents of the result self.broadcast_consensus_result(session_id) def broadcast_consensus_result(self, session_id: str): """Broadcast consensus result to all agents""" session = self.consensus_sessions[session_id] for agent_id, agent in self.agents.items(): agent.receive_message(Message( sender_id="consensus_coordinator", receiver_id=agent_id, message_type=MessageType.BROADCAST, content={ "action": "consensus_result", "session_id": session_id, "result": session["status"], "proposal": session["proposal"], "vote_summary": { "total_votes": len(session["votes"]), "approve": sum(1 for v in session["votes"].values() if v["vote"] == "approve"), "reject": sum(1 for v in session["votes"].values() if v["vote"] == "reject") } } )) # Enhanced Research Agent with Coordination Capabilities class CoordinatedResearchAgent(ResearchAgent): """Research agent with advanced coordination capabilities""" def __init__(self, agent_id: str, research_domain: str): super().__init__(agent_id, research_domain) self.collaboration_preferences = { "willingness_to_share": 0.8, "preferred_partners": [], "trust_scores": {} } def process_message(self, message: Message): """Enhanced message processing with coordination support""" super().process_message(message) # Handle coordination-specific messages if message.content.get("action") == "vote": self.handle_vote_request(message) elif message.content.get("action") == "consensus_result": self.handle_consensus_result(message) elif message.message_type == MessageType.REQUEST and "task_id" in message.content: self.handle_task_assignment(message) def handle_vote_request(self, message: Message): """Handle voting requests in consensus protocols""" proposal = message.content.get("proposal", {}) session_id = message.content.get("session_id") # Simple voting logic based on domain expertise vote = self.evaluate_proposal(proposal) reasoning = self.generate_vote_reasoning(proposal, vote) # In a real system, this would interact with the consensus protocol print(f"{self.agent_id} votes '{vote}' on proposal: {proposal.get('title', 'Unknown')}") print(f"Reasoning: {reasoning}") def evaluate_proposal(self, proposal: Dict[str, Any]) -> str: """Evaluate a proposal and decide how to vote""" proposal_domain = proposal.get("domain", "") # Vote based on domain expertise if proposal_domain == self.research_domain: return "approve" # Approve proposals in our domain elif proposal_domain in ["artificial_intelligence", "biology", "physics"]: return "approve" # Approve related scientific domains else: return "abstain" # Abstain from unfamiliar domains def generate_vote_reasoning(self, proposal: Dict[str, Any], vote: str) -> str: """Generate reasoning for the vote""" if vote == "approve": return f"Proposal aligns with {self.research_domain} expertise" elif vote == "reject": return f"Proposal conflicts with {self.research_domain} principles" else: return f"Outside {self.research_domain} domain of expertise" def handle_consensus_result(self, message: Message): """Handle consensus results""" result = message.content.get("result") proposal = message.content.get("proposal", {}) if result == "approved": print(f"{self.agent_id}: Consensus reached! Implementing: {proposal.get('title', 'proposal')}") else: print(f"{self.agent_id}: Consensus failed for: {proposal.get('title', 'proposal')}") def handle_task_assignment(self, message: Message): """Handle task assignments from coordination protocol""" task = message.content task_type = task.get("type", "unknown") print(f"{self.agent_id}: Received task assignment - {task_type}") # Execute the task result = self.execute_task(task) # Report completion self.send_message("task_allocator", { "action": "task_completed", "task_id": task.get("task_id"), "result": result }, MessageType.RESPONSE) self.update_status("idle") # Demonstration of Coordination Protocols def demonstrate_coordination_protocols(): """Demonstrate task allocation and consensus protocols""" print("Multi-Agent Coordination Protocol Demo:") print("=" * 45) # Create agents hub = CommunicationHub() agents = [ CoordinatedResearchAgent("ai_agent", "artificial_intelligence"), CoordinatedResearchAgent("bio_agent", "biology"), CoordinatedResearchAgent("physics_agent", "physics") ] for agent in agents: hub.register_agent(agent) # Task Allocation Demo print("1. Task Allocation Protocol:") allocator = TaskAllocationProtocol(agents) tasks = [ { "type": "research", "title": "Machine Learning Applications", "required_capabilities": ["research", "analysis"], "domain": "artificial_intelligence" }, { "type": "analysis", "title": "Protein Folding Study", "required_capabilities": ["research", "analysis"], "domain": "biology" } ] for task in tasks: task_id = allocator.submit_task(task) print(f" Submitted task: {task['title']} (ID: {task_id[:8]}...)") # Consensus Protocol Demo print("\n2. Consensus Protocol:") consensus = ConsensusProtocol(agents) proposal = { "title": "Interdisciplinary AI-Biology Research Initiative", "domain": "interdisciplinary", "description": "Collaborative project combining AI and biology research" } session_id = consensus.initiate_consensus(proposal, "system") print(f" Initiated consensus for: {proposal['title']}") # Simulate voting consensus.cast_vote(session_id, "ai_agent", "approve", "AI expertise valuable") consensus.cast_vote(session_id, "bio_agent", "approve", "Biology component essential") consensus.cast_vote(session_id, "physics_agent", "abstain", "Outside expertise area") return hub, agents, allocator, consensus # Run demonstration hub, agents, allocator, consensus = demonstrate_coordination_protocols()
Distributed Problem-Solving
Hierarchical vs Flat Coordination Structures
# Distributed Problem-Solving Architectures class HierarchicalCoordinator(AgentBase): """Coordinator agent for hierarchical multi-agent systems""" def __init__(self, coordinator_id: str): super().__init__(coordinator_id, ["coordination", "planning", "monitoring"]) self.subordinate_agents: Dict[str, AgentBase] = {} self.active_projects: Dict[str, Dict[str, Any]] = {} self.delegation_history: List[Dict[str, Any]] = [] def add_subordinate(self, agent: AgentBase): """Add an agent as a subordinate""" self.subordinate_agents[agent.agent_id] = agent # Subscribe to their status updates if self.communication_hub: self.communication_hub.subscribe_to_topic(self.agent_id, f"status_{agent.agent_id}") def initiate_project(self, project: Dict[str, Any]) -> str: """Initiate a complex project requiring multiple agents""" project_id = str(uuid.uuid4()) project["project_id"] = project_id project["status"] = "planning" project["created_at"] = time.time() project["coordinator"] = self.agent_id self.active_projects[project_id] = project # Decompose project into tasks tasks = self.decompose_project(project) project["tasks"] = tasks # Plan task allocation allocation_plan = self.plan_task_allocation(tasks) project["allocation_plan"] = allocation_plan # Execute allocation plan self.execute_allocation_plan(project_id, allocation_plan) return project_id def decompose_project(self, project: Dict[str, Any]) -> List[Dict[str, Any]]: """Decompose a project into manageable tasks""" project_type = project.get("type", "research") if project_type == "research": return self.decompose_research_project(project) elif project_type == "analysis": return self.decompose_analysis_project(project) else: return [{"type": "general", "description": project.get("description", "")}] def decompose_research_project(self, project: Dict[str, Any]) -> List[Dict[str, Any]]: """Decompose a research project into specialized tasks""" tasks = [] domains = project.get("domains", ["general"]) for domain in domains: tasks.extend([ { "type": "literature_review", "domain": domain, "required_capabilities": ["research"], "estimated_duration": 2.0, "dependencies": [] }, { "type": "data_collection", "domain": domain, "required_capabilities": ["research", "analysis"], "estimated_duration": 3.0, "dependencies": ["literature_review"] }, { "type": "analysis", "domain": domain, "required_capabilities": ["analysis"], "estimated_duration": 4.0, "dependencies": ["data_collection"] } ]) # Add synthesis task tasks.append({ "type": "synthesis", "domain": "interdisciplinary", "required_capabilities": ["analysis", "summarization"], "estimated_duration": 2.0, "dependencies": [f"analysis_{domain}" for domain in domains] }) return tasks def decompose_analysis_project(self, project: Dict[str, Any]) -> List[Dict[str, Any]]: """Decompose an analysis project""" return [ { "type": "data_preprocessing", "required_capabilities": ["analysis"], "estimated_duration": 1.0, "dependencies": [] }, { "type": "statistical_analysis", "required_capabilities": ["analysis"], "estimated_duration": 2.0, "dependencies": ["data_preprocessing"] }, { "type": "visualization", "required_capabilities": ["analysis"], "estimated_duration": 1.0, "dependencies": ["statistical_analysis"] } ] def plan_task_allocation(self, tasks: List[Dict[str, Any]]) -> Dict[str, List[str]]: """Plan how to allocate tasks to subordinate agents""" allocation = {} for task in tasks: required_caps = task.get("required_capabilities", []) best_agents = self.find_capable_agents(required_caps) if best_agents: task_id = f"{task['type']}_{task.get('domain', 'general')}" allocation[task_id] = best_agents[:1] # Assign to best agent return allocation def find_capable_agents(self, required_capabilities: List[str]) -> List[str]: """Find agents capable of handling required capabilities""" capable_agents = [] for agent_id, agent in self.subordinate_agents.items(): if agent.status == "idle": overlap = set(agent.capabilities) & set(required_capabilities) if overlap: capable_agents.append((agent_id, len(overlap))) # Sort by capability overlap (descending) capable_agents.sort(key=lambda x: x[1], reverse=True) return [agent_id for agent_id, _ in capable_agents] def execute_allocation_plan(self, project_id: str, allocation_plan: Dict[str, List[str]]): """Execute the task allocation plan""" project = self.active_projects[project_id] for task_id, agent_ids in allocation_plan.items(): if agent_ids: agent_id = agent_ids[0] # Take first (best) agent # Find the corresponding task task = next((t for t in project["tasks"] if f"{t['type']}_{t.get('domain', 'general')}" == task_id), None) if task: self.delegate_task(agent_id, task, project_id) def delegate_task(self, agent_id: str, task: Dict[str, Any], project_id: str): """Delegate a specific task to an agent""" task["task_id"] = str(uuid.uuid4()) task["project_id"] = project_id task["assigned_to"] = agent_id task["assigned_at"] = time.time() # Record delegation delegation_record = { "task_id": task["task_id"], "agent_id": agent_id, "project_id": project_id, "task_type": task["type"], "delegated_at": time.time() } self.delegation_history.append(delegation_record) # Send task to agent self.send_message(agent_id, { "action": "task_assignment", "task": task, "project_id": project_id, "coordinator": self.agent_id }, MessageType.REQUEST, requires_response=True) print(f"Coordinator: Delegated {task['type']} task to {agent_id}") def process_message(self, message: Message): """Process messages from subordinate agents""" if message.message_type == MessageType.RESPONSE: action = message.content.get("action") if action == "task_completed": self.handle_task_completion(message) elif action == "task_progress": self.handle_progress_update(message) elif action == "request_assistance": self.handle_assistance_request(message) elif message.message_type == MessageType.STATUS_UPDATE: self.handle_agent_status_update(message) def handle_task_completion(self, message: Message): """Handle task completion notifications""" task_id = message.content.get("task_id") result = message.content.get("result", {}) agent_id = message.sender_id print(f"Coordinator: Task {task_id} completed by {agent_id}") # Update project status project_id = message.content.get("project_id") if project_id in self.active_projects: self.update_project_progress(project_id, task_id, result) def handle_progress_update(self, message: Message): """Handle progress updates from agents""" progress = message.content.get("progress", 0) task_id = message.content.get("task_id") print(f"Coordinator: Progress update for {task_id}: {progress}%") def handle_assistance_request(self, message: Message): """Handle requests for assistance from agents""" requester = message.sender_id assistance_type = message.content.get("assistance_type") print(f"Coordinator: Assistance request from {requester}: {assistance_type}") # Find available agents who can help helper_agents = self.find_available_helpers(assistance_type) if helper_agents: # Assign helper helper_id = helper_agents[0] self.facilitate_collaboration(requester, helper_id, message.content) def find_available_helpers(self, assistance_type: str) -> List[str]: """Find agents available to provide assistance""" available = [] for agent_id, agent in self.subordinate_agents.items(): if agent.status in ["idle", "available"]: if assistance_type in agent.capabilities: available.append(agent_id) return available def facilitate_collaboration(self, requester_id: str, helper_id: str, request_details: Dict[str, Any]): """Facilitate collaboration between two agents""" # Notify helper about collaboration request self.send_message(helper_id, { "action": "collaboration_request", "requester": requester_id, "details": request_details, "facilitated_by": self.agent_id }) print(f"Coordinator: Facilitating collaboration between {requester_id} and {helper_id}") def update_project_progress(self, project_id: str, completed_task_id: str, result: Dict[str, Any]): """Update overall project progress""" project = self.active_projects[project_id] # Mark task as completed for task in project.get("tasks", []): if task.get("task_id") == completed_task_id: task["status"] = "completed" task["result"] = result task["completed_at"] = time.time() break # Check if project is complete completed_tasks = sum(1 for task in project.get("tasks", []) if task.get("status") == "completed") total_tasks = len(project.get("tasks", [])) if completed_tasks == total_tasks: project["status"] = "completed" project["completed_at"] = time.time() print(f"Coordinator: Project {project_id} completed!") self.finalize_project(project_id) def finalize_project(self, project_id: str): """Finalize a completed project""" project = self.active_projects[project_id] # Compile final results final_results = { "project_id": project_id, "title": project.get("title", "Unnamed Project"), "results": [task.get("result", {}) for task in project.get("tasks", [])], "duration": project.get("completed_at", 0) - project.get("created_at", 0), "participating_agents": list(set(task.get("assigned_to") for task in project.get("tasks", []) if task.get("assigned_to"))) } # Broadcast project completion self.broadcast_message({ "action": "project_completed", "project_results": final_results }) def execute_task(self, task: Dict[str, Any]) -> Dict[str, Any]: """Execute coordination tasks""" return {"status": "coordination_task_completed"} # Demonstration def demonstrate_hierarchical_coordination(): """Demonstrate hierarchical coordination""" print("Hierarchical Multi-Agent Coordination Demo:") print("=" * 45) # Create communication hub hub = CommunicationHub() # Create coordinator coordinator = HierarchicalCoordinator("project_coordinator") hub.register_agent(coordinator) # Create subordinate agents subordinates = [ CoordinatedResearchAgent("ai_specialist", "artificial_intelligence"), CoordinatedResearchAgent("bio_specialist", "biology"), CoordinatedResearchAgent("data_analyst", "data_analysis") ] for agent in subordinates: hub.register_agent(agent) coordinator.add_subordinate(agent) # Initiate a complex project project = { "title": "AI-Driven Biological Data Analysis", "type": "research", "description": "Analyze biological datasets using AI techniques", "domains": ["artificial_intelligence", "biology"], "deadline": time.time() + 86400 # 24 hours } project_id = coordinator.initiate_project(project) print(f"Initiated project: {project['title']}") print(f"Project ID: {project_id}") # Simulate some task completions time.sleep(0.1) return hub, coordinator, subordinates # Run demonstration hub, coordinator, subordinates = demonstrate_hierarchical_coordination()
Looking Ahead
Perfect! We've completed Lesson 8 on Multi-Agent Systems and Coordination.
Status Update:
- ✅ 8 lessons completed out of 12 total
- 4 lessons remaining:
- Deployment and Production
- Performance Optimization
- Ethics and Safety
- Future Directions
The next logical step is Lesson 9: Deployment and Production - moving from development to real-world deployment of AI agent systems. This will cover scalability, monitoring, reliability, and production best practices.
Shall I continue with creating the Deployment and Production lesson next?