# Multi-Agent Acceleration

FusionAGI provides multi-agent acceleration and scaling features: parallel step execution, agent pooling with load balancing, sub-task delegation, a supervisor agent, and batch and asynchronous message routing.

## Architecture Overview

```mermaid
flowchart TB
    subgraph supervisor [Supervisor Agent]
        S[Supervisor]
    end

    subgraph agents [Agents]
        P[Planner]
        R[Reasoner]
        EP[Executor Pool]
    end

    subgraph pool [Executor Pool]
        E1[Executor 1]
        E2[Executor 2]
        E3[Executor 3]
    end

    S --> P
    S --> EP
    S --> R
    EP --> E1
    EP --> E2
    EP --> E3
```

**Parallel step execution:** Independent plan steps run concurrently. **Agent pool:** Multiple executors sit behind one logical endpoint, selected by strategy (`round_robin`, `least_busy`, or `random`). **Supervisor:** Drives the plan-execute loop with parallel dispatch.

## Features

### 1. Parallel Step Execution

Independent plan steps (those with satisfied dependencies and no mutual dependencies) run **concurrently**.

```python
from fusionagi.planning import ready_steps
from fusionagi.multi_agent import execute_steps_parallel, execute_steps_parallel_wave

# `plan`, `task_id`, and `execute_fn` are assumed to be defined by the caller.
completed_step_ids = {"s1", "s2"}

# Get all steps that are ready to run in parallel
ready = ready_steps(plan, completed_step_ids=completed_step_ids)

# Execute them concurrently
results = execute_steps_parallel(execute_fn, task_id, plan, completed_step_ids, max_workers=4)

# Or run the full plan in waves (each wave is one parallel batch)
results = execute_steps_parallel_wave(execute_fn, task_id, plan, max_workers=4)
```

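To make the idea of a "wave" concrete, here is a minimal standalone sketch using only the standard library. It is not FusionAGI's implementation: the `PLAN` mapping, `ready`, and `run_in_waves` are hypothetical names chosen for illustration, and the plan is assumed to be a simple mapping from step id to the set of step ids it depends on.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical plan: step id -> set of step ids it depends on.
PLAN = {
    "s1": set(),
    "s2": set(),
    "s3": {"s1", "s2"},
    "s4": {"s1"},
    "s5": {"s3", "s4"},
}


def ready(plan, completed):
    """Return the steps not yet run whose dependencies are all completed."""
    return sorted(
        step for step, deps in plan.items()
        if step not in completed and deps <= completed
    )


def run_in_waves(plan, execute, max_workers=4):
    """Run the plan wave by wave; steps inside one wave run concurrently."""
    completed, results, waves = set(), {}, []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while len(completed) < len(plan):
            wave = ready(plan, completed)
            if not wave:
                raise ValueError("cycle or unsatisfiable dependency in plan")
            # Executor.map preserves input order, so zip pairs ids with results.
            for step_id, result in zip(wave, pool.map(execute, wave)):
                results[step_id] = result
            completed.update(wave)
            waves.append(wave)
    return waves, results


waves, results = run_in_waves(PLAN, lambda step_id: f"done:{step_id}")
print(waves)  # [['s1', 's2'], ['s3', 's4'], ['s5']]
```

Each wave waits for the slowest step in it before the next wave starts. That keeps the scheduling logic simple at the cost of some idle time when step durations differ widely.
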
### 2. Agent Pool & Load Balancing

Scale horizontally with multiple executors behind a single logical endpoint.

```python
from fusionagi.agents import ExecutorAgent
from fusionagi.multi_agent import PooledExecutorRouter, AgentPool
from fusionagi.core import StateManager
from fusionagi.tools import ToolRegistry

# Create the pool router (it will be registered as "executor")
registry = ToolRegistry()
state = StateManager()
pool = PooledExecutorRouter(identity="executor", pool=AgentPool(strategy="least_busy"))

# Add multiple executors that share the registry and state manager
for i in range(4):
    ex = ExecutorAgent(identity=f"executor_{i}", registry=registry, state_manager=state)
    pool.add_executor(f"executor_{i}", ex)

# Register the pool on an existing Orchestrator instance (`orch`)
orch.register_agent("executor", pool)
# All execute_step messages now load-balance across 4 executors
```

**Strategies:** `round_robin`, `least_busy`, `random`

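The three strategy names suggest the following selection rules. This sketch is one plausible reading, not FusionAGI's `AgentPool`: `TinyPool`, its `in_flight` counter, and the `pick` method are hypothetical and exist only to show how each strategy could choose an executor.

```python
import itertools
import random


class TinyPool:
    """Hypothetical stand-in for an agent pool; not FusionAGI's AgentPool."""

    def __init__(self, strategy="round_robin", seed=None):
        self.strategy = strategy
        self.in_flight = {}  # executor id -> number of steps currently running
        self._cursor = itertools.count()
        self._rng = random.Random(seed)

    def add(self, executor_id):
        self.in_flight[executor_id] = 0

    def pick(self):
        ids = list(self.in_flight)
        if not ids:
            raise RuntimeError("pool is empty")
        if self.strategy == "round_robin":
            # Cycle through executors in insertion order.
            return ids[next(self._cursor) % len(ids)]
        if self.strategy == "least_busy":
            # Choose the executor with the fewest running steps.
            return min(ids, key=self.in_flight.get)
        if self.strategy == "random":
            return self._rng.choice(ids)
        raise ValueError(f"unknown strategy: {self.strategy}")


names = ("executor_0", "executor_1", "executor_2")

rr = TinyPool("round_robin")
for name in names:
    rr.add(name)
rr_picks = [rr.pick() for _ in range(4)]
print(rr_picks)  # ['executor_0', 'executor_1', 'executor_2', 'executor_0']

lb = TinyPool("least_busy")
for name in names:
    lb.add(name)
lb.in_flight.update({"executor_0": 2, "executor_1": 0, "executor_2": 1})
lb_pick = lb.pick()
print(lb_pick)  # executor_1
```

As a rule of thumb, `round_robin` suits steps of similar cost, while `least_busy` helps when step durations vary, because it steers work away from executors that are still occupied.
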
### 3. Sub-Task Delegation (Fan-Out / Fan-In)

Decompose tasks and delegate to specialized sub-agents in parallel.

```python
from fusionagi.multi_agent import delegate_sub_tasks, SubTask, DelegationConfig

sub_tasks = [
    SubTask("st1", "Analyze requirements"),
    SubTask("st2", "Design solution"),
    SubTask("st3", "Validate constraints"),
]

# `run_sub_agent` is a caller-supplied function that handles one SubTask.
results = delegate_sub_tasks(
    sub_tasks,
    delegate_fn=run_sub_agent,
    config=DelegationConfig(max_parallel=3, fail_fast=False),
)
```

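The fan-out / fan-in pattern itself can be sketched with the standard library alone. The code below is an illustration under stated assumptions, not the library's `delegate_sub_tasks`: `fan_out` and `analyze` are hypothetical, sub-tasks are plain `(id, description)` tuples, and `fail_fast` is interpreted as "stop collecting after the first failure and cancel work that has not started".

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def fan_out(sub_tasks, delegate_fn, max_parallel=3, fail_fast=False):
    """Run delegate_fn on every sub-task in parallel and gather outcomes by id.

    Returns {task_id: ("ok", value)} or {task_id: ("error", message)} entries.
    """
    outcomes = {}
    with ThreadPoolExecutor(max_workers=max_parallel) as pool:
        # Fan-out: submit every sub-task and remember which future is which.
        futures = {pool.submit(delegate_fn, desc): tid for tid, desc in sub_tasks}
        # Fan-in: collect results as they finish, in completion order.
        for future in as_completed(futures):
            tid = futures[future]
            try:
                outcomes[tid] = ("ok", future.result())
            except Exception as exc:
                outcomes[tid] = ("error", str(exc))
                if fail_fast:
                    # Cancel anything that has not started yet, then stop.
                    for other in futures:
                        other.cancel()
                    break
    return outcomes


def analyze(description):
    """Hypothetical sub-agent: fails on one input to show error handling."""
    if "constraints" in description:
        raise ValueError("constraint check failed")
    return description.upper()


tasks = [
    ("st1", "Analyze requirements"),
    ("st2", "Design solution"),
    ("st3", "Validate constraints"),
]
outcomes = fan_out(tasks, analyze, max_parallel=3, fail_fast=False)
print(outcomes["st1"])  # ('ok', 'ANALYZE REQUIREMENTS')
print(outcomes["st3"])  # ('error', 'constraint check failed')
```

With `fail_fast=False`, one failing sub-task does not discard the results of the others, which is usually what you want when the parent agent can proceed with partial results.
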
### 4. Supervisor Agent

A single agent that drives the full plan-execute loop with **parallel dispatch**.

```python
from fusionagi.multi_agent import SupervisorAgent
from fusionagi.core import Orchestrator, EventBus, StateManager

orch = Orchestrator(EventBus(), StateManager())
supervisor = SupervisorAgent(
    identity="supervisor",
    orchestrator=orch,
    planner_id="planner",
    executor_id="executor",
    parallel_mode=True,  # Enable parallel step execution
    max_parallel_workers=4,
)

orch.register_agent("supervisor", supervisor)
# Send run_task → supervisor gets plan → executes steps in parallel waves
```

### 5. Batch Message Routing

Route multiple messages in parallel.

```python
responses = orch.route_messages_batch([env1, env2, env3, ...])
```

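A useful property for a batch API is that responses come back in the same order as the input messages, even when they finish out of order. Whether `route_messages_batch` guarantees this is not stated here, so treat the following as a sketch of that assumed contract. `route` and `route_batch` are hypothetical helpers, and envelopes are modeled as plain dictionaries.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def route(envelope):
    """Hypothetical single-message handler; sleeps to simulate work."""
    time.sleep(envelope["delay"])
    return f"handled:{envelope['id']}"


def route_batch(envelopes, max_workers=4):
    """Route all envelopes concurrently and return responses in input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Executor.map yields results in input order, regardless of finish order.
        return list(pool.map(route, envelopes))


envelopes = [
    {"id": "env1", "delay": 0.03},
    {"id": "env2", "delay": 0.01},
    {"id": "env3", "delay": 0.02},
]
responses = route_batch(envelopes)
print(responses)  # ['handled:env1', 'handled:env2', 'handled:env3']
```
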
### 6. Async Message Routing

Non-blocking dispatch with an optional callback.

```python
future = orch.route_message_async(envelope, callback=lambda resp: handle(resp))
# Or wait for the response: result = future.result()
```

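The future-plus-callback shape shown above can be reproduced with `concurrent.futures`. This is a sketch of the pattern only, assuming a thread-backed future. `route`, `route_async`, and the `received` list are hypothetical and do not describe how the `Orchestrator` is implemented.

```python
from concurrent.futures import ThreadPoolExecutor

_executor = ThreadPoolExecutor(max_workers=2)
received = []  # responses delivered through the callback


def route(envelope):
    """Hypothetical handler that produces a response for one envelope."""
    return {"reply_to": envelope["id"], "status": "ok"}


def route_async(envelope, callback=None):
    """Submit the envelope and return immediately with a Future."""
    future = _executor.submit(route, envelope)
    if callback is not None:
        # add_done_callback passes the Future itself, so unwrap its result.
        future.add_done_callback(lambda done: callback(done.result()))
    return future


future = route_async({"id": "env1"}, callback=received.append)

# The caller can keep working here; result() blocks only when it is called.
result = future.result()

# Shutting down waits for the worker thread, so the callback has run by now.
_executor.shutdown(wait=True)
print(result)    # {'reply_to': 'env1', 'status': 'ok'}
print(received)  # [{'reply_to': 'env1', 'status': 'ok'}]
```

Note that in this sketch the callback runs on the worker thread that completed the future, so a callback that touches shared state needs to be thread-safe.
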
## Scaling Checklist

| Capability | Module | Use Case |
|------------|--------|----------|
| Parallel step execution | `multi_agent.parallel` | Plans with independent steps |
| Agent pool | `multi_agent.pool` | Horizontal executor scaling |
| Sub-task delegation | `multi_agent.delegation` | Hierarchical task decomposition |
| Supervisor | `multi_agent.supervisor` | Automated parallel orchestration |
| Batch routing | `Orchestrator.route_messages_batch` | Multi-task throughput |
| Async routing | `Orchestrator.route_message_async` | Non-blocking pipelines |

## Architecture

```
                    ┌─────────────────┐
                    │   Supervisor    │
                    │ (parallel_mode) │
                    └────────┬────────┘
                             │
         ┌───────────────────┼───────────────────┐
         │                   │                   │
         ▼                   ▼                   ▼
    ┌──────────┐      ┌─────────────┐       ┌──────────┐
    │ Planner  │      │  Executor   │       │ Reasoner │
    │          │      │    Pool     │       │          │
    └──────────┘      └──────┬──────┘       └──────────┘
                             │
              ┌──────────────┼──────────────┐
              │              │              │
              ▼              ▼              ▼
        ┌──────────┐   ┌──────────┐   ┌──────────┐
        │Exec 1    │   │Exec 2    │   │Exec 3    │  (parallel steps)
        └──────────┘   └──────────┘   └──────────┘
```