Multi-Agent System: Enabling AI Collaboration
A single AI model has limits. When multiple agents collaborate, they can solve far more complex problems. This guide shows how to build efficient multi-agent systems.
Multi-agent Architecture Design
System Architecture Overview
/**
 * Multi-agent system framework.
 *
 * Holds the agent registry plus one coordinator, one message queue and one
 * shared memory instance. Every registered agent is attached to the same
 * queue and the same memory.
 */
class MultiAgentSystem {
  private agents: Map<string, Agent> = new Map();
  private coordinator: Coordinator = new Coordinator();
  private messageQueue: MessageQueue = new MessageQueue();
  private sharedMemory: SharedMemory = new SharedMemory();

  /**
   * Adds an agent to the registry (keyed by `agent.id`), hands it the shared
   * message queue and shared memory, then registers it with the coordinator.
   */
  registerAgent(agent: Agent) {
    const { messageQueue, sharedMemory, coordinator } = this;

    this.agents.set(agent.id, agent);
    agent.setMessageQueue(messageQueue);
    agent.setSharedMemory(sharedMemory);
    coordinator.register(agent);
  }

  /**
   * Runs a task end to end: the coordinator decomposes it and assigns the
   * subtasks, all assignments are started together, and the collected results
   * are passed back to the coordinator for aggregation.
   */
  async executeTask(task: Task): Promise<TaskResult> {
    const subtasks = await this.coordinator.decomposeTask(task);
    const assignments = await this.coordinator.assignTasks(subtasks, this.agents);

    // Each assignment runs inside its own async wrapper so that every agent is
    // started before any result is awaited.
    const runAssignment = async (assignment: (typeof assignments)[number]) =>
      assignment.agent.execute(assignment.subtask);
    const results = await Promise.all(
      assignments.map((assignment) => runAssignment(assignment))
    );

    return this.coordinator.aggregateResults(results);
  }
}
/**
 * Agent base class.
 *
 * An agent has a fixed id and capability list. Its message queue and shared
 * memory are injected after construction through `setMessageQueue` and
 * `setSharedMemory`; messaging and memory helpers throw a descriptive error
 * until the matching setter has been called.
 */
abstract class Agent {
  readonly id: string;
  readonly capabilities: string[];
  // Injected after construction, hence optional until the setters run.
  private messageQueue?: MessageQueue;
  private memory?: SharedMemory;

  constructor(id: string, capabilities: string[]) {
    this.id = id;
    this.capabilities = capabilities;
  }

  /** Attaches the queue used by `sendMessage` and `receiveMessages`. */
  setMessageQueue(messageQueue: MessageQueue): void {
    this.messageQueue = messageQueue;
  }

  /** Attaches the store used by `readMemory` and `writeMemory`. */
  setSharedMemory(memory: SharedMemory): void {
    this.memory = memory;
  }

  /**
   * Executes a subtask. Implementations are free to declare this `async`;
   * the `async` modifier itself is not permitted on an abstract member.
   */
  abstract execute(task: Subtask): Promise<any>;

  /**
   * Sends a message to another agent. The payload is wrapped in an envelope
   * carrying this agent's id, the recipient id and the current time.
   *
   * @throws Error if no message queue has been attached.
   */
  async sendMessage(recipientId: string, message: Message) {
    await this.requireMessageQueue().send({
      from: this.id,
      to: recipientId,
      content: message,
      timestamp: Date.now()
    });
  }

  /**
   * Returns whatever the queue yields for this agent's id.
   *
   * @throws Error if no message queue has been attached.
   */
  async receiveMessages(): Promise<Message[]> {
    return this.requireMessageQueue().receive(this.id);
  }

  /**
   * Reads a value from shared memory.
   *
   * @throws Error if no shared memory has been attached.
   */
  async readMemory(key: string): Promise<any> {
    return this.requireMemory().get(key);
  }

  /**
   * Writes a value to shared memory.
   *
   * @throws Error if no shared memory has been attached.
   */
  async writeMemory(key: string, value: any) {
    await this.requireMemory().set(key, value);
  }

  // Returns the attached queue or fails with a message naming the missing setup step.
  private requireMessageQueue(): MessageQueue {
    if (!this.messageQueue) {
      throw new Error(
        `Agent "${this.id}" has no message queue; call setMessageQueue() first`
      );
    }
    return this.messageQueue;
  }

  // Returns the attached shared memory or fails with a message naming the missing setup step.
  private requireMemory(): SharedMemory {
    if (!this.memory) {
      throw new Error(
        `Agent "${this.id}" has no shared memory; call setSharedMemory() first`
      );
    }
    return this.memory;
  }
}
/**
 * Specialized agent implementation: research.
 *
 * Registers under the id 'research-agent' with the 'research' and 'analysis'
 * capabilities and owns its own LLM client.
 */
class ResearchAgent extends Agent {
  private llm: LLMClient;

  constructor() {
    super('research-agent', ['research', 'analysis']);
    this.llm = new LLMClient({ model: 'gpt-4' });
  }

  /**
   * Research workflow: understand the task, gather information for its
   * keywords, analyze it, generate a report, store the report in shared
   * memory under `research_<task id>` and return it.
   */
  async execute(task: Subtask): Promise<ResearchResult> {
    const { keywords } = await this.understandTask(task);
    const findings = await this.gatherInformation(keywords);
    const analysis = await this.analyzeInformation(findings);
    const report = await this.generateReport(analysis);

    // Publish the report so other agents can read it from shared memory.
    await this.writeMemory(`research_${task.id}`, report);
    return report;
  }

  /** Searches for the keywords, then has the LLM extract the key information from the hits. */
  private async gatherInformation(keywords: string[]) {
    const hits = await this.searchWeb(keywords);
    return this.llm.extract({
      prompt: "Extract key information from search results",
      data: hits
    });
  }
}
Task Decomposition and Coordination
Intelligent Task Scheduler
/**
 * Coordinator implementation.
 *
 * Decomposes tasks into ordered subtasks, assigns subtasks to agents and
 * aggregates the agents' results.
 */
class Coordinator {
  private planner: TaskPlanner;
  private scheduler: TaskScheduler;
  private monitor: TaskMonitor;

  constructor() {
    this.planner = new TaskPlanner();
    this.scheduler = new TaskScheduler();
    this.monitor = new TaskMonitor();
  }

  /**
   * Decomposes a task.
   *
   * The planner splits the task, a dependency graph is built from the
   * resulting subtasks and sorted topologically, and each node is turned into
   * a `Subtask` with a freshly generated id.
   *
   * @returns Subtasks in the order produced by the topological sort.
   */
  async decomposeTask(task: Task): Promise<Subtask[]> {
    const decomposition = await this.planner.decompose({
      task: task.description,
      constraints: task.constraints,
      resources: task.resources
    });

    const dependencyGraph = this.createDependencyGraph(decomposition.subtasks);
    const optimizedOrder = this.topologicalSort(dependencyGraph);

    return optimizedOrder.map(node => ({
      id: generateId(),
      name: node.name,
      description: node.description,
      dependencies: node.dependencies,
      requiredCapabilities: node.capabilities,
      estimatedDuration: node.duration,
      priority: node.priority
    }));
  }

  /**
   * Task assignment algorithm.
   *
   * Visits subtasks from highest to lowest priority and gives each one to the
   * agent chosen by `findBestAgent`, adding the subtask's estimated duration
   * to that agent's running workload.
   *
   * @param subtasks Subtasks to assign; the array is not modified.
   * @param agents Candidate agents keyed by id.
   * @returns One assignment per subtask, in descending priority order.
   * @throws Error if `findBestAgent` selects no agent for a subtask.
   */
  async assignTasks(
    subtasks: Subtask[],
    agents: Map<string, Agent>
  ): Promise<Assignment[]> {
    const assignments: Assignment[] = [];
    const agentWorkload = new Map<string, number>();

    for (const id of agents.keys()) {
      agentWorkload.set(id, 0);
    }

    // Sort a copy: Array.prototype.sort works in place, and sorting the
    // caller's array would discard the order it was handed over in.
    const sortedTasks = [...subtasks].sort((a, b) => b.priority - a.priority);

    for (const subtask of sortedTasks) {
      const bestAgent = this.findBestAgent(subtask, agents, agentWorkload);

      if (!bestAgent) {
        throw new Error(`No agent available for task ${subtask.id}`);
      }

      assignments.push({
        agent: bestAgent,
        subtask: subtask
      });

      const currentLoad = agentWorkload.get(bestAgent.id) ?? 0;
      agentWorkload.set(
        bestAgent.id,
        currentLoad + subtask.estimatedDuration
      );
    }

    return assignments;
  }

  /**
   * Selects the best agent for a subtask.
   *
   * Only agents that have every required capability are considered; among
   * them the one with the highest fitness score wins, and the first one seen
   * wins a tie.
   *
   * @returns The chosen agent, or `null` if none was selected.
   */
  private findBestAgent(
    subtask: Subtask,
    agents: Map<string, Agent>,
    workload: Map<string, number>
  ): Agent | null {
    let bestAgent: Agent | null = null;
    let bestScore = -Infinity;

    // A plain loop (rather than a forEach callback) keeps the assignment to
    // `bestAgent` visible to TypeScript's control-flow analysis.
    for (const agent of agents.values()) {
      const hasCapabilities = subtask.requiredCapabilities.every(
        cap => agent.capabilities.includes(cap)
      );
      if (!hasCapabilities) continue;

      const score = this.calculateFitness(
        agent,
        subtask,
        workload.get(agent.id) ?? 0
      );

      if (score > bestScore) {
        bestScore = score;
        bestAgent = agent;
      }
    }

    return bestAgent;
  }

  /**
   * Aggregates results.
   *
   * Drops results that fail validation, merges the rest and assesses the
   * merged output. `success` is true when the quality score is above 0.8.
   */
  async aggregateResults(results: any[]): Promise<TaskResult> {
    const validResults = results.filter(r => this.validateResult(r));
    const merged = this.mergeResults(validResults);
    const quality = await this.assessQuality(merged);

    return {
      success: quality.score > 0.8,
      data: merged,
      quality: quality,
      metadata: {
        totalSubtasks: results.length,
        successfulSubtasks: validResults.length,
        executionTime: this.monitor.getTotalTime(),
        agentPerformance: this.monitor.getAgentStats()
      }
    };
  }
}
Agent Communication
Message-passing System
Message Queue Implementation
/**
 * Message queue: one queue of messages per recipient id, plus per-recipient
 * subscriber callbacks.
 */
class MessageQueue {
  private queues: Map<string, Message[]> = new Map();
  private subscribers: Map<string, Subscriber[]> = new Map();

  /**
   * Sends a message: appends it to the recipient's queue, notifies that
   * recipient's subscribers, then logs the message.
   */
  async send(message: Message) {
    this.getOrCreateQueue(message.to).push(message);
    await this.notifySubscribers(message.to, message);
    this.logMessage(message);
  }

  /**
   * Broadcast: sends `content` to every known agent except the sender, one
   * recipient at a time. When a filter is given, agents it rejects are skipped.
   */
  async broadcast(
    from: string,
    content: any,
    filter?: (agent: Agent) => boolean
  ) {
    for (const recipientId of this.getAllAgents()) {
      // Never echo a broadcast back to its sender.
      if (recipientId === from) {
        continue;
      }
      if (filter && !filter(this.getAgent(recipientId))) {
        continue;
      }

      await this.send({
        from,
        to: recipientId,
        content,
        type: 'broadcast',
        timestamp: Date.now()
      });
    }
  }

  /** Subscribe: adds a callback to the list kept for the given agent id. */
  subscribe(
    agentId: string,
    callback: (message: Message) => void
  ) {
    let registered = this.subscribers.get(agentId);
    if (!registered) {
      registered = [];
    }
    registered.push({ callback });
    this.subscribers.set(agentId, registered);
  }
}
Protocol Definition
/**
 * Communication protocol: the collaboration operations an agent exposes.
 */
interface Protocol {
  /** Request help with a task; resolves with the helper's response. */
  requestHelp(task: Task): Promise<HelpResponse>;
  /** Share information; returns nothing. */
  shareInformation(info: Information): void;
  /** Negotiate a decision: submit a proposal and resolve with the decision. */
  negotiate(proposal: Proposal): Promise<Decision>;
  /** State sync: push an agent state; returns nothing. */
  syncState(state: AgentState): void;
}
/**
 * Message types: string-valued tags identifying the kind of a message.
 */
enum MessageType {
  REQUEST = 'request',
  RESPONSE = 'response',
  BROADCAST = 'broadcast',
  NOTIFICATION = 'notification',
  NEGOTIATION = 'negotiation'
}
/**
 * Message format: the envelope exchanged between agents.
 */
interface Message {
  /** Identifier of this message. */
  id: string;
  /** Id of the sending agent. */
  from: string;
  /** Id of the receiving agent. */
  to: string;
  /** Kind of message. */
  type: MessageType;
  /** Payload; its shape is not constrained by this interface. */
  content: any;
  /** Creation time; presumably epoch milliseconds (e.g. from Date.now()) — TODO confirm. */
  timestamp: number;
  /** Optional priority. NOTE(review): ordering direction is not defined here — confirm. */
  priority?: number;
  /** Optional flag indicating that the sender expects a response. */
  requiresResponse?: boolean;
  /** Optional timeout. NOTE(review): unit is not defined here — confirm. */
  timeout?: number;
}
Case Study: Customer Support
Multi-agent Customer Support System
/**
 * Customer support system implementation: a multi-agent system preloaded
 * with the specialized support agents.
 */
class CustomerServiceSystem extends MultiAgentSystem {
  constructor() {
    super();

    // Each agent is constructed and registered before the next one is built.
    const agentTypes = [
      IntentClassifierAgent,
      FAQAgent,
      TechnicalSupportAgent,
      OrderProcessingAgent,
      EscalationAgent,
      SentimentAnalysisAgent
    ];
    for (const AgentType of agentTypes) {
      this.registerAgent(new AgentType());
    }
  }

  /**
   * Handles one customer query: classify the intent, analyze the sentiment,
   * route the query, quality-check the response and hand over to a human
   * when the check asks for escalation.
   */
  async handleCustomerQuery(query: CustomerQuery): Promise<Response> {
    const intent = await this.classifyIntent(query);
    const sentiment = await this.analyzeSentiment(query);
    const response = await this.routeQuery(query, intent, sentiment);
    const qualityCheck = await this.checkResponseQuality(response);

    return qualityCheck.needsEscalation
      ? this.escalateToHuman(query, response)
      : response;
  }
}
/**
 * Intent classifier agent.
 *
 * Registers under the id 'intent-classifier' with the 'classification' and
 * 'nlp' capabilities.
 */
class IntentClassifierAgent extends Agent {
  // Declared and initialised here because `execute` calls `this.llm` and the
  // base class does not provide one.
  private llm: LLMClient;

  constructor() {
    super('intent-classifier', ['classification', 'nlp']);
    this.llm = new LLMClient({ model: 'gpt-4' });
  }

  /**
   * Classifies the query found in `task.data.query` into one of the fixed
   * categories, extracts entities, announces the result to other agents and
   * returns the intent.
   *
   * @returns Category, confidence, entities and keywords for the query.
   */
  async execute(task: Subtask): Promise<Intent> {
    const query = task.data.query;

    // Use LLM for intent classification
    const classification = await this.llm.classify({
      text: query,
      categories: [
        'technical_support',
        'order_inquiry',
        'product_question',
        'complaint',
        'general_inquiry'
      ]
    });

    // Extract entities
    const entities = await this.extractEntities(query);

    // Announce the intent.
    // NOTE(review): 'broadcast' is passed as a recipient id, so this is
    // addressed to a recipient literally named 'broadcast' — confirm intent.
    await this.sendMessage('broadcast', {
      type: 'intent_classified',
      intent: classification.category,
      confidence: classification.confidence,
      entities: entities
    });

    return {
      category: classification.category,
      confidence: classification.confidence,
      entities: entities,
      keywords: this.extractKeywords(query)
    };
  }
}
/**
 * FAQ agent.
 *
 * Registers under the id 'faq-agent' with the 'faq' and
 * 'knowledge_retrieval' capabilities and answers from its knowledge base.
 */
class FAQAgent extends Agent {
  private knowledgeBase: KnowledgeBase;

  constructor() {
    super('faq-agent', ['faq', 'knowledge_retrieval']);
    this.knowledgeBase = new KnowledgeBase();
  }

  /**
   * Answers the query in `task.data` from the knowledge base.
   *
   * @returns The generated answer, or `null` when this agent produces none:
   *   either the intent is not one it handles, or the knowledge base had no
   *   match (in which case a help request is sent first).
   */
  async execute(task: Subtask): Promise<FAQResponse | null> {
    const { query, intent } = task.data;

    // Check if FAQ-type question
    if (!this.canHandle(intent)) {
      return null;
    }

    // Search KB
    const relevantDocs = await this.knowledgeBase.search(query, {
      topK: 5,
      threshold: 0.8
    });

    if (relevantDocs.length === 0) {
      // Hand the query over to another agent. Sending a message yields no
      // reply value, so there is no answer to return from here.
      await this.requestHelp({
        query,
        reason: 'No FAQ matches found'
      });
      return null;
    }

    // Generate answer
    const answer = await this.generateAnswer(query, relevantDocs);

    // Store in shared memory for others
    await this.writeMemory('last_faq_answer', {
      query,
      answer,
      confidence: answer.confidence
    });

    return answer;
  }

  /**
   * Sends a help request to the technical support agent. Resolves once the
   * message has been sent; it does not wait for, or return, any reply.
   */
  private async requestHelp(context: any): Promise<void> {
    await this.sendMessage('technical-support-agent', {
      type: 'help_request',
      context,
      sender: this.id
    });
  }
}
Collaboration Patterns
Common Collaboration Modes
Pipeline
/**
 * Sequential processing: the input passes through the preprocess, main and
 * postprocess agents in that order, each stage receiving the previous
 * stage's output.
 */
class Pipeline {
  async execute(input: any) {
    const preprocessed = await this.preprocessAgent.process(input);
    const processed = await this.mainAgent.process(preprocessed);
    return this.postprocessAgent.process(processed);
  }
}
Star
/**
 * Centralized coordination: a central agent splits the task, worker agents
 * run the pieces in parallel, and the central agent combines the results.
 */
class StarTopology {
  /**
   * @throws Error if a subtask's `type` has no matching worker agent.
   */
  async execute(task: Task) {
    // `await` is a no-op for a plain value and also tolerates a central agent
    // whose `distribute` is asynchronous.
    const subtasks = await this.centralAgent.distribute(task);

    // Execute in parallel; results keep the order of `subtasks`.
    const results = await Promise.all(
      subtasks.map(st => {
        const worker = this.workerAgents.get(st.type);
        if (!worker) {
          throw new Error(`No worker agent registered for subtask type "${st.type}"`);
        }
        return worker.execute(st);
      })
    );

    // Central aggregation
    return this.centralAgent.aggregate(results);
  }
}
🕸️ Mesh
/**
 * Peer-to-peer collaboration: every agent is asked to start collaborating on
 * the task, then the topology waits for consensus.
 */
class MeshTopology {
  async execute(task: Task) {
    // Kick off collaboration on every agent without waiting on any of them.
    for (const participant of this.getAllAgents()) {
      participant.startCollaboration(task);
    }

    return this.waitForConsensus();
  }
}
🏗️ Hierarchical
/**
 * Layered management: the manager plans, the supervisors turn the plan into
 * assignments, and the workers execute those assignments.
 */
class HierarchicalTopology {
  async execute(task: Task) {
    const plan = await this.managerAgent.plan(task);
    const workItems = await this.supervisorAgents.assign(plan);
    return this.workerAgents.execute(workItems);
  }
}
Performance Optimization
System Optimization Strategies
⚡ Concurrency control
/**
 * Runs tasks while holding a semaphore permit per task. The semaphore is
 * created with `maxConcurrency`; presumably that caps the number of tasks in
 * flight — confirm against the Semaphore implementation.
 */
class ConcurrencyController {
  private semaphore: Semaphore;
  private taskQueue: PriorityQueue<Task>;

  constructor(maxConcurrency: number) {
    this.semaphore = new Semaphore(maxConcurrency);
    this.taskQueue = new PriorityQueue();
  }

  /**
   * Executes every task, acquiring the semaphore before each one and
   * releasing it afterwards even if the task fails.
   *
   * @returns Results in the same order as `tasks` (result `i` belongs to
   *   task `i`), regardless of which task finishes first.
   */
  async executeWithLimit(tasks: Task[]): Promise<Result[]> {
    return Promise.all(
      tasks.map(async (task) => {
        await this.semaphore.acquire();
        try {
          // `return await` keeps the permit held until the task has settled.
          return await this.executeTask(task);
        } finally {
          this.semaphore.release();
        }
      })
    );
  }
}
💾 Resource management
Memory optimization
- Shared memory pool
- Periodic cleanup
- Smart caching
Compute optimization
- Load balancing
- Task batching
- Result reuse
Debugging and Monitoring
Multi-agent Debugging Tools
/**
 * Debug and monitoring system: thin facade over a tracer, a visualizer and a
 * profiler.
 */
class MultiAgentDebugger {
  private tracer: DistributedTracer;
  private visualizer: AgentVisualizer;
  private profiler: PerformanceProfiler;

  /** Trace message flow: returns the tracer's message flow for a session. */
  traceMessageFlow(sessionId: string) {
    const flow = this.tracer.getMessageFlow(sessionId);
    return flow;
  }

  /** Visualize agent states: collects the current states and renders them. */
  visualizeAgentStates() {
    return this.visualizer.render(this.collectAgentStates());
  }

  /**
   * Performance analysis for one task: timeline, bottlenecks and resource
   * usage from the profiler, plus generated optimization tips.
   */
  profileExecution(taskId: string) {
    const timeline = this.profiler.getTimeline(taskId);
    const bottlenecks = this.profiler.findBottlenecks(taskId);
    const resourceUsage = this.profiler.getResourceUsage(taskId);
    const recommendations = this.generateOptimizationTips();

    return { timeline, bottlenecks, resourceUsage, recommendations };
  }

  /** Real-time dashboard: a snapshot of the headline system metrics. */
  getDashboard() {
    const activeAgents = this.getActiveAgentCount();
    const messageRate = this.getMessageRate();
    const taskQueue = this.getQueueDepth();
    const errorRate = this.getErrorRate();
    const avgResponseTime = this.getAverageResponseTime();

    return { activeAgents, messageRate, taskQueue, errorRate, avgResponseTime };
  }
}
Build Intelligent Collaborative Systems
Master multi-agent techniques to unleash collective intelligence and solve complex business challenges.
Get Started