Files
FusionAGI/docs/multi_agent_acceleration.md
defiQUG c052b07662
Some checks failed
Tests / test (3.10) (push) Has been cancelled
Tests / test (3.11) (push) Has been cancelled
Tests / test (3.12) (push) Has been cancelled
Tests / lint (push) Has been cancelled
Tests / docker (push) Has been cancelled
Initial commit: add .gitignore and README
2026-02-09 21:51:42 -08:00

174 lines
5.7 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Multi-Agent Acceleration
FusionAGI includes multi-agent accelerations, enhancements, optimizations, and scaling capabilities.
## Architecture Overview
```mermaid
flowchart TB
subgraph supervisor [Supervisor Agent]
S[Supervisor]
end
subgraph agents [Agents]
P[Planner]
R[Reasoner]
EP[Executor Pool]
end
subgraph pool [Executor Pool]
E1[Executor 1]
E2[Executor 2]
E3[Executor 3]
end
S --> P
S --> EP
S --> R
EP --> E1
EP --> E2
EP --> E3
```
**Parallel step execution:** Independent plan steps run concurrently. **Agent pool:** Multiple executors behind one logical endpoint (round_robin, least_busy, random). **Supervisor:** Drives planexecute loop with parallel dispatch.
## Features
### 1. Parallel Step Execution
Independent plan steps (those with satisfied dependencies and no mutual dependencies) run **concurrently**.
```python
from fusionagi.planning import ready_steps
from fusionagi.multi_agent import execute_steps_parallel, execute_steps_parallel_wave
# Get all steps ready to run in parallel
ready = ready_steps(plan, completed_step_ids={"s1", "s2"})
# Execute them concurrently
results = execute_steps_parallel(execute_fn, task_id, plan, completed_step_ids, max_workers=4)
# Or run full plan in waves (each wave = parallel batch)
results = execute_steps_parallel_wave(execute_fn, task_id, plan, max_workers=4)
```
### 2. Agent Pool & Load Balancing
Scale horizontally with multiple executors behind a single logical endpoint.
```python
from fusionagi.agents import ExecutorAgent
from fusionagi.multi_agent import PooledExecutorRouter, AgentPool
from fusionagi.core import StateManager
from fusionagi.tools import ToolRegistry
# Create pool router (register as "executor")
registry = ToolRegistry()
state = StateManager()
pool = PooledExecutorRouter(identity="executor", pool=AgentPool(strategy="least_busy"))
# Add multiple executors
for i in range(4):
ex = ExecutorAgent(identity=f"executor_{i}", registry=registry, state_manager=state)
pool.add_executor(f"executor_{i}", ex)
orch.register_agent("executor", pool)
# All execute_step messages now load-balance across 4 executors
```
**Strategies:** `round_robin`, `least_busy`, `random`
### 3. Sub-Task Delegation (Fan-Out / Fan-In)
Decompose tasks and delegate to specialized sub-agents in parallel.
```python
from fusionagi.multi_agent import delegate_sub_tasks, SubTask, DelegationConfig
sub_tasks = [
SubTask("st1", "Analyze requirements"),
SubTask("st2", "Design solution"),
SubTask("st3", "Validate constraints"),
]
results = delegate_sub_tasks(
sub_tasks,
delegate_fn=lambda st: run_sub_agent(st),
config=DelegationConfig(max_parallel=3, fail_fast=False),
)
```
### 4. Supervisor Agent
Single agent that drives the full plan-execute loop with **parallel dispatch**.
```python
from fusionagi.multi_agent import SupervisorAgent
from fusionagi.core import Orchestrator, EventBus, StateManager
orch = Orchestrator(EventBus(), StateManager())
supervisor = SupervisorAgent(
identity="supervisor",
orchestrator=orch,
planner_id="planner",
executor_id="executor",
parallel_mode=True, # Enable parallel step execution
max_parallel_workers=4,
)
orch.register_agent("supervisor", supervisor)
# Send run_task → supervisor gets plan → executes steps in parallel waves
```
### 5. Batch Message Routing
Route multiple messages in parallel.
```python
responses = orch.route_messages_batch([env1, env2, env3, ...])
```
### 6. Async Message Routing
Non-blocking dispatch with optional callback.
```python
future = orch.route_message_async(envelope, callback=lambda resp: handle(resp))
# Or await: result = future.result()
```
## Scaling Checklist
| Capability | Module | Use Case |
|------------|--------|----------|
| Parallel step execution | `multi_agent.parallel` | Plans with independent steps |
| Agent pool | `multi_agent.pool` | Horizontal executor scaling |
| Sub-task delegation | `multi_agent.delegation` | Hierarchical task decomposition |
| Supervisor | `multi_agent.supervisor` | Automated parallel orchestration |
| Batch routing | `Orchestrator.route_messages_batch` | Multi-task throughput |
| Async routing | `Orchestrator.route_message_async` | Non-blocking pipelines |
## Architecture
```
┌─────────────────┐
│ Supervisor │
│ (parallel_mode) │
└────────┬────────┘
┌───────────────────┼───────────────────┐
│ │ │
▼ ▼ ▼
┌──────────┐ ┌─────────────┐ ┌──────────┐
│ Planner │ │ Executor │ │ Reasoner │
│ │ │ Pool │ │ │
└──────────┘ └──────┬──────┘ └──────────┘
┌──────────────┼──────────────┐
│ │ │
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│Exec 1 │ │Exec 2 │ │Exec 3 │ (parallel steps)
└──────────┘ └──────────┘ └──────────┘
```