Core Build: Full Orchestration System
Build a complete orchestration system with parallel execution, monitoring, and control in 50 minutes
Part 1: Orchestrator Design (15 minutes)
Task Decomposition Strategy
The orchestrator must intelligently split complex tasks into agent-specific subtasks.
Decomposition Patterns:
By Task Type:
- Analysis tasks → Research agents
- Writing tasks → Content agents
- Data tasks → Processing agents
By Data Scope:
- Split large datasets into chunks
- Assign chunks to parallel agents
- Merge results at the end
By Priority:
- Critical path tasks execute first
- Non-blocking tasks run in parallel
- Dependencies determine order
Example decomposition logic:
tasks = [Task(type="research", data=chunk, priority=1)
         for chunk in split_data(input_data)]
agent_map = {"research": ResearchAgent, "write": WriterAgent}
Agent Registry and Discovery
The orchestrator maintains a registry of available agents and their capabilities.
Registry Structure:
Agent Metadata:
- Agent ID and type
- Capabilities (tasks it can handle)
- Current status (idle, busy, failed)
- Resource requirements
Discovery Process:
- Register agents on startup
- Query capabilities for task matching
- Load balance across available agents
- Handle agent failures gracefully
Example registry implementation:
registry = {agent.id: {"type": agent.type, "status": "idle"}
            for agent in available_agents}
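A fuller sketch of registration and capability matching (the AgentInfo record and the method names are assumptions for illustration, not a required interface):
from dataclasses import dataclass

@dataclass
class AgentInfo:
    agent_id: str
    agent_type: str
    capabilities: set          # task types this agent can handle
    status: str = "idle"       # idle | busy | failed

class AgentRegistry:
    def __init__(self):
        self._agents = {}

    def register(self, info: AgentInfo) -> None:
        self._agents[info.agent_id] = info

    def find_idle(self, task_type: str):
        # Return the first idle agent that can handle this task type.
        for info in self._agents.values():
            if info.status == "idle" and task_type in info.capabilities:
                return info
        return None            # caller decides whether to queue or fail

    def mark_failed(self, agent_id: str) -> None:
        # Failed agents stay registered but are skipped by find_idle.
        self._agents[agent_id].status = "failed"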
Task Queue Management
Efficient task queuing ensures optimal resource utilization.
Queue Mechanics:
Priority Queue:
- High priority tasks execute first
- FIFO within same priority level
- Dynamic priority adjustment
Task States:
- Pending: In queue waiting
- Running: Currently executing
- Completed: Finished successfully
- Failed: Encountered error
Queue Operations:
await queue.put((priority, timestamp, task))
task = await queue.get()  # Blocks until a task is available
queue.task_done()
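A minimal working version of the queue with a small worker pool (a sketch; asyncio.sleep stands in for real agent work):
import asyncio
import time

async def worker(name: str, queue: asyncio.PriorityQueue):
    while True:
        priority, _ts, payload = await queue.get()
        try:
            print(f"{name} handling {payload} (priority {priority})")
            await asyncio.sleep(0.1)  # stand-in for real agent work
        finally:
            queue.task_done()

async def main():
    queue = asyncio.PriorityQueue()
    # The timestamp breaks ties so equal priorities stay FIFO.
    for i, prio in enumerate([2, 1, 1, 3]):
        await queue.put((prio, time.monotonic(), f"task-{i}"))
    workers = [asyncio.create_task(worker(f"w{n}", queue)) for n in range(2)]
    await queue.join()  # returns once every task has been marked done
    for w in workers:
        w.cancel()      # shut down the now-idle workers
    await asyncio.gather(*workers, return_exceptions=True)

asyncio.run(main())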
Result Aggregation Patterns
Collecting and synthesizing results from multiple agents requires careful design.
Aggregation Strategies:
Merge Pattern:
- Combine similar outputs
- Remove duplicates
- Sort by relevance
Reduce Pattern:
- Fold results into single output
- Apply transformations
- Maintain consistency
Example aggregation:
from functools import reduce
final = reduce(lambda a, b: merge(a, b), results)
return {"status": "complete", "data": final}
Design Principle: Always design for partial failures. Agents may fail individually without bringing down the entire workflow.
Part 2: Parallel Execution (15 minutes)
Concurrent Agent Execution with Asyncio
Python's asyncio enables efficient parallel execution for I/O-bound agent tasks.
Asyncio Fundamentals:
Event Loop:
- Single thread manages multiple coroutines
- Switches between tasks at await points
- Ideal for network calls and API requests
Coroutines:
- Functions defined with async def
- Use await to yield control
- Run concurrently, not in parallel
Example concurrent execution:
results = await asyncio.gather(
    agent1.process(task1), agent2.process(task2))
This gathers results from multiple agents running concurrently.
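A runnable sketch of the pattern (the Agent class is a stand-in; asyncio.sleep simulates I/O-bound work such as an API call):
import asyncio

class Agent:
    def __init__(self, name: str):
        self.name = name

    async def process(self, task: str) -> str:
        await asyncio.sleep(1)  # simulates an API call or other I/O wait
        return f"{self.name}: {task} done"

async def main():
    agent1, agent2 = Agent("research"), Agent("writer")
    # Both coroutines run concurrently; total time is ~1s, not ~2s.
    results = await asyncio.gather(
        agent1.process("analyze data"),
        agent2.process("draft summary"))
    print(results)

asyncio.run(main())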
Thread Pool vs Async/Await Patterns
Choose the right concurrency model based on task characteristics.
Async/Await (I/O-bound):
- Best for: API calls, file I/O, database queries
- Single thread, event-driven
- Low overhead, high throughput
- Example: Web scraping, data fetching
Thread/Process Pool (CPU-bound):
- Best for: Data processing, calculations, transformations
- True multi-core parallelism requires a process pool; a thread pool is limited by the GIL for pure-Python code (it helps when C extensions release the GIL)
- Higher per-task overhead than coroutines
- Example: Image processing, statistical analysis
Hybrid Approach:
loop = asyncio.get_running_loop()
cpu_result = await loop.run_in_executor(pool, cpu_task)
This runs CPU-bound tasks in a pool while maintaining the async interface.
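A fuller sketch of the hybrid approach (crunch is a placeholder for real CPU-bound work; swap in ProcessPoolExecutor for pure-Python number crunching):
import asyncio
from concurrent.futures import ThreadPoolExecutor

def crunch(n: int) -> int:
    # Placeholder CPU-bound work: a sum of squares.
    return sum(i * i for i in range(n))

async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=4) as pool:
        # Offload the blocking call so the event loop stays responsive.
        result = await loop.run_in_executor(pool, crunch, 10_000_000)
    print(result)

asyncio.run(main())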
Synchronization Points
Coordinate agent execution at critical workflow stages.
Synchronization Patterns:
Barrier Pattern:
- All agents must reach checkpoint
- Wait for slowest agent
- Proceed together after sync
Semaphore Pattern:
- Limit concurrent resource access
- Control rate of execution
- Prevent resource exhaustion
Lock Pattern:
- Ensure exclusive access to shared state
- Prevent race conditions
- Use sparingly to avoid deadlocks
Example synchronization:
async with semaphore:  # Limit to N concurrent agents
    result = await agent.process(task)
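Putting the semaphore pattern together, a sketch that caps execution at three concurrent agents (asyncio.sleep again stands in for real work):
import asyncio

async def run_agent(task_id: int, semaphore: asyncio.Semaphore) -> str:
    async with semaphore:            # blocks while 3 agents are already running
        await asyncio.sleep(0.5)     # stand-in for real agent work
        return f"task {task_id} done"

async def main():
    semaphore = asyncio.Semaphore(3)  # at most 3 agents run at once
    # 10 tasks are launched, but only 3 execute concurrently.
    results = await asyncio.gather(*(run_agent(i, semaphore) for i in range(10)))
    print(results)

asyncio.run(main())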
Error Handling in Parallel Workflows
Robust error handling prevents cascade failures in parallel execution.
Error Strategies:
Retry with Backoff:
- Retry failed tasks automatically
- Exponential backoff prevents overload
- Maximum retry limit
Fallback Agents:
- Switch to backup agent on failure
- Maintain workflow continuity
- Log original failure
Partial Success:
- Continue with available results
- Mark failed tasks explicitly
- Allow graceful degradation
Example error handling:
try:
    result = await agent.process(task)
except AgentError:
    result = await fallback_agent.process(task)
Performance Tip: Use async/await for I/O-bound tasks (API calls, file operations). Use thread pools for CPU-bound tasks (data processing, calculations). Mixing the two requires run_in_executor().
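The retry-with-backoff strategy listed above, sketched out (AgentError is the same hypothetical exception type as in the example):
import asyncio

class AgentError(Exception):
    pass

async def process_with_retry(agent, task, max_retries: int = 3):
    delay = 1.0
    for attempt in range(max_retries):
        try:
            return await agent.process(task)
        except AgentError:
            if attempt == max_retries - 1:
                raise                   # out of retries; let the caller decide
            await asyncio.sleep(delay)  # back off before retrying
            delay *= 2                  # exponential backoff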
Part 3: Workflow Control (10 minutes)
Start/Stop/Restart Workflow
Implement lifecycle management for orchestrated workflows.
Workflow States:
Start:
- Initialize agent registry
- Load task queue
- Begin execution loop
- Return workflow ID
Stop:
- Cancel pending tasks
- Wait for running tasks to complete
- Save state for resume
- Release resources
Restart:
- Load saved state
- Re-register agents
- Resume from checkpoint
- Handle failed tasks
Example workflow control:
workflow.start()  # Begin execution
workflow.stop()  # Graceful shutdown
workflow.restart(checkpoint_id)  # Resume from checkpoint
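A skeletal lifecycle implementation (a sketch only; the state fields and method names mirror the lists above rather than any fixed API):
import asyncio
import uuid

class Workflow:
    def __init__(self, tasks: list):
        self.workflow_id = str(uuid.uuid4())
        self.pending = list(tasks)
        self.completed: list = []
        self._running = False
        self._loop_task = None

    async def _run_loop(self):
        while self._running and self.pending:
            task = self.pending.pop(0)
            await asyncio.sleep(0.1)       # stand-in for dispatching to an agent
            self.completed.append(task)

    def start(self) -> str:
        # Must be called from inside a running event loop.
        self._running = True
        self._loop_task = asyncio.create_task(self._run_loop())
        return self.workflow_id

    async def stop(self) -> dict:
        self._running = False              # loop exits after the current task
        if self._loop_task:
            await self._loop_task
        # Whatever is still pending becomes the checkpoint for restart.
        return {"pending": self.pending, "completed": self.completed}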
Pause and Resume
Support workflow suspension for maintenance or resource constraints.
Pause Mechanism:
Checkpoint Creation:
- Save current task queue
- Record agent states
- Persist partial results
- Generate resume token
Resume Process:
- Load checkpoint data
- Restore agent states
- Continue from last task
- Validate consistency
Use Cases:
- System maintenance windows
- Resource quota management
- User-initiated suspension
- Error investigation
Example pause/resume:
checkpoint = workflow.pause()
# Later...
workflow.resume(checkpoint)
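Checkpoints are easiest to reason about as plain data. A sketch of saving and loading one as JSON (the field names follow the checkpoint list above):
import json
import time

def save_checkpoint(path: str, pending_tasks: list, partial_results: dict) -> None:
    # Persist everything needed to resume: queue contents and partial output.
    checkpoint = {
        "created_at": time.time(),
        "pending_tasks": pending_tasks,
        "partial_results": partial_results,
    }
    with open(path, "w") as f:
        json.dump(checkpoint, f)

def load_checkpoint(path: str) -> dict:
    with open(path) as f:
        return json.load(f)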
Cancel Individual Tasks
Allow granular task cancellation without disrupting the entire workflow.
Cancellation Process:
Task Selection:
- Cancel by task ID
- Cancel by agent ID
- Cancel by priority range
- Cancel by task type
Cleanup:
- Stop task execution immediately
- Release allocated resources
- Remove from queue
- Update status to cancelled
Dependencies:
- Check for dependent tasks
- Cancel dependents or reassign
- Maintain workflow integrity
Example cancellation:
workflow.cancel_task(task_id)
# Returns: {"status": "cancelled", "dependents": []}Monitor Progress
Monitor Progress
Real-time monitoring provides visibility into workflow execution.
Monitoring Metrics:
Task Metrics:
- Total tasks: pending, running, completed, failed
- Average execution time
- Throughput (tasks per second)
- Error rate
Agent Metrics:
- Active agents count
- Idle vs busy distribution
- Agent failure rate
- Resource utilization
Workflow Metrics:
- Overall progress percentage
- Estimated completion time
- Bottleneck identification
- Critical path status
Example progress query:
status = workflow.get_status()
# {"progress": 65, "active": 8, "completed": 130}Part 4: Result Aggregation (10 minutes)
Part 4: Result Aggregation (10 minutes)
Collect Results from Multiple Agents
Gather outputs from parallel agents efficiently.
Collection Strategies:
Streaming Collection:
- Collect results as they arrive
- Process incrementally
- Reduce memory footprint
- Enable early termination
Batch Collection:
- Wait for all agents to complete
- Collect results together
- Simplify synchronization
- Higher latency
Hybrid Collection:
- Process available results immediately
- Wait for critical results
- Balance throughput and latency
Example result collection:
async for result in workflow.results_stream():
    process_result(result)  # Handle each result as it becomes available
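With plain asyncio, streaming collection is what asyncio.as_completed provides; a sketch:
import asyncio
import random

async def agent_work(i: int) -> str:
    await asyncio.sleep(random.random())  # agents finish at different times
    return f"result-{i}"

async def main():
    coros = [agent_work(i) for i in range(5)]
    # Results are handled in completion order, not submission order.
    for future in asyncio.as_completed(coros):
        result = await future
        print("collected", result)

asyncio.run(main())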
Handle Partial Failures
Gracefully manage scenarios where some agents fail.
Failure Handling:
Mark Failed Tasks:
- Tag results with success/failure status
- Include error messages
- Preserve partial outputs
- Enable retry decision
Threshold-Based Decisions:
- Proceed if N% of tasks succeed
- Fail workflow if critical tasks fail
- Retry failed tasks automatically
- Escalate to human if threshold not met
Fallback Values:
- Use default values for failed tasks
- Interpolate from successful results
- Mark missing data explicitly
Example partial failure handling:
if success_rate > 0.8:
    return merge_results(successful_results)
else:
    retry_failed_tasks()
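One way to obtain success_rate is to gather with return_exceptions=True, so failures arrive as exception objects instead of aborting the whole batch; a sketch:
import asyncio

async def collect_with_failures(coros):
    outcomes = await asyncio.gather(*coros, return_exceptions=True)
    successes = [r for r in outcomes if not isinstance(r, Exception)]
    failures = [r for r in outcomes if isinstance(r, Exception)]
    for err in failures:
        print("failed task:", err)  # mark failures explicitly
    success_rate = len(successes) / len(outcomes) if outcomes else 0.0
    return successes, success_rate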
Synthesize Final Output
Combine individual agent outputs into a coherent final result.
Synthesis Strategies:
Merge Strategy:
- Combine similar outputs (e.g., concatenate text)
- Remove duplicates
- Sort by relevance or timestamp
Reduce Strategy:
- Apply reduction function across results
- Example: sum, average, max, min
- Preserve metadata
Transform Strategy:
- Convert results to target format
- Apply post-processing rules
- Add computed fields
Example synthesis:
final = {"summary": merge([r.text for r in results]),
"confidence": avg([r.score for r in results])}Return Structured Results
Return Structured Results
Provide well-formatted output with metadata.
Result Structure:
Core Fields:
status: "success" | "partial" | "failed"data: Aggregated outputmetadata: Execution statisticserrors: List of failures if any
Metadata Fields:
- Execution time
- Number of agents used
- Success rate
- Resource consumption
Error Information:
- Failed task IDs
- Error messages
- Suggested remediation
- Retry recommendations
Example structured result:
return {"status": "success", "data": final_output,
"metadata": {"time": elapsed, "agents": 12}}Critical Pattern: Always handle partial failures in parallel workflows. Design aggregation logic to work with incomplete result sets. Never assume all agents will succeed.
Final Verification
System Completeness Check:
Orchestrator Capabilities:
- Manages 4+ agents simultaneously
- Decomposes tasks intelligently
- Executes agents in parallel
- Handles individual agent failures
Execution Verification:
- Parallel execution completes faster than sequential
- Results aggregated correctly
- Partial failures handled gracefully
- Control panel responds to commands
Testing Commands:
# Test parallel execution
orchestrator.run(task_count=10, parallel=True)
# Verify aggregation
results = orchestrator.get_results()
assert results["status"] == "success"
# Test failure handling
orchestrator.simulate_agent_failure(agent_id=3)
assert orchestrator.recovery_status() == "recovered"
Expected Output:
- All 10 tasks complete
- Execution time < 50% of sequential
- Failed agent replaced with fallback
- Final results valid and complete
Success Criteria:
- Orchestrator launches successfully
- Parallel execution 2x faster than sequential
- Aggregation produces valid output
- Control panel functional (start/stop/pause)