FusionAGI API and Middleware Interface Specification
1. Dvādaśa HTTP/WebSocket API
The main programmatic entry point is the Dvādaśa API, a FastAPI application exposing session-based prompts and streaming responses.
Request Flow (Prompt → Response)
sequenceDiagram
participant Client
participant API as Dvādaśa API
participant Safety as SafetyPipeline
participant Orch as Orchestrator
participant Heads as Dvādaśa Heads
participant Witness as Witness
Client->>API: POST /v1/sessions/{id}/prompt
API->>Safety: pre_check(prompt)
Safety-->>API: ModerationResult
API->>Orch: submit_task(goal)
Orch->>Heads: run_heads_parallel
Heads->>Witness: head outputs
Witness->>Orch: final_answer
Orch->>API: FinalResponse
API->>Safety: post_check(final_answer)
Safety-->>API: OutputScanResult
API->>Client: 200 + FinalResponse
1.1 Application Factory
- Location: `fusionagi/api/app.py`
- Factory: `create_app(adapter: Any = None, cors_origins: list[str] | None = None) -> FastAPI`
- Creates FastAPI app with title "FusionAGI Dvādaśa API"
- Accepts optional `LLMAdapter` for head/Witness LLM calls
- Accepts optional `cors_origins` to enable CORS middleware (e.g. `["*"]` or `["https://example.com"]`)
- Mounts router at prefix `/v1` with tag `dvadasa`
- Uses startup event to initialize `Orchestrator`, `EventBus`, `SessionStore`, `SafetyPipeline`
1.2 Base URL and Router Structure
| Prefix | Router | Tag |
|---|---|---|
| `/v1` | `api_router` | `dvadasa` |
| `/v1/sessions` | `sessions_router` | `sessions` |
1.3 HTTP Endpoints
| Method | Path | Description |
|---|---|---|
| POST | `/v1/sessions` | Create a new session |
| POST | `/v1/sessions/{session_id}/prompt` | Submit prompt and receive sync FinalResponse |
| WebSocket | `/v1/sessions/{session_id}/stream` | Streaming Dvādaśa response |
POST /v1/sessions
Query params:
- `user_id` (optional, str): User identifier
Response (200):
{
"session_id": "uuid-string",
"user_id": "optional-user-id"
}
POST /v1/sessions/{session_id}/prompt
Path params:
- `session_id` (required): Session UUID
Body:
{
"prompt": "string (required)",
"use_all_heads": false
}
Pipeline:
- SafetyPipeline `pre_check(prompt)` - input moderation
- `parse_user_input(prompt)` - detect UserIntent (normal, head_strategy, show_dissent, etc.)
- Submit task via `orchestrator.submit_task(goal=prompt[:200])`
- `select_heads_for_complexity(prompt)` or explicit head from `/head <id>` command
- `run_dvadasa()` - parallel heads + Witness synthesis
- SafetyPipeline `post_check(final_answer)` - output scan (PII, blocked content)
- Append to session history
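The ordering of these steps can be sketched as one handler function. This is a minimal, self-contained illustration and not the code in `fusionagi/api/routes/sessions.py`: `handle_prompt`, `Session`, `PromptRejected`, and the stub bodies of `pre_check`, `post_check`, and `run_dvadasa` are hypothetical stand-ins, and intent parsing is left out. Only the step order and the `prompt[:200]` goal truncation come from the list above.

```python
from dataclasses import dataclass, field


class PromptRejected(Exception):
    """Hypothetical error; the real API reports these cases as HTTP 400."""


@dataclass
class Session:
    # Hypothetical session record; the real SessionStore returns a dict.
    session_id: str
    history: list = field(default_factory=list)


def pre_check(prompt: str) -> bool:
    # Stand-in for SafetyPipeline.pre_check: blocks one toy phrase.
    return "forbidden" not in prompt.lower()


def post_check(final_answer: str) -> bool:
    # Stand-in for SafetyPipeline.post_check: flags a toy marker.
    return "SSN:" not in final_answer


def run_dvadasa(goal: str, heads: list) -> str:
    # Stand-in for parallel heads + Witness synthesis.
    return f"answer to {goal!r} from {len(heads)} head(s)"


def handle_prompt(session: Session, prompt: str, use_all_heads: bool = False) -> dict:
    if not pre_check(prompt):                    # input moderation comes first
        raise PromptRejected("pre_check failed")
    goal = prompt[:200]                          # goal is the prompt cut to 200 chars
    heads = ["h1", "h2", "h3"] if use_all_heads else ["h1"]  # toy head selection
    final_answer = run_dvadasa(goal, heads)      # heads + Witness
    if not post_check(final_answer):             # output scan before anything is stored
        raise PromptRejected("post_check failed")
    session.history.append({"prompt": prompt, "final_answer": final_answer})
    return {"final_answer": final_answer}
```

In this sketch a rejected prompt never reaches the session history, because the append happens only after both checks pass.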
Response (200):
{
"task_id": "string",
"final_answer": "string",
"transparency_report": { "head_contributions": [...] },
"head_contributions": [...],
"confidence_score": 0.0
}
Errors:
- 400: missing prompt, pre_check failed, post_check failed
- 404: session not found
- 500: Dvādaśa failed
- 503: service not initialized
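A client-side sketch of this endpoint, using only the standard library, might look as follows. The base URL is an assumption (this document does not state a host or port), and `build_prompt_request` and `describe_status` are hypothetical helper names. The request body and the status meanings are taken from the sections above.

```python
import json
import urllib.request

# Assumption: host and port are not specified in this document.
BASE_URL = "http://localhost:8000"


def build_prompt_request(session_id: str, prompt: str,
                         use_all_heads: bool = False) -> urllib.request.Request:
    """Build (but do not send) the POST /v1/sessions/{session_id}/prompt request."""
    body = json.dumps({"prompt": prompt, "use_all_heads": use_all_heads}).encode("utf-8")
    return urllib.request.Request(
        url=f"{BASE_URL}/v1/sessions/{session_id}/prompt",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Status codes and meanings as listed under "Errors" above.
ERROR_MEANINGS = {
    400: "missing prompt, pre_check failed, or post_check failed",
    404: "session not found",
    500: "Dvādaśa failed",
    503: "service not initialized",
}


def describe_status(status: int) -> str:
    """Translate an HTTP status code into the meaning documented above."""
    if status == 200:
        return "ok"
    return ERROR_MEANINGS.get(status, f"unexpected status {status}")
```

To send the request, pass it to `urllib.request.urlopen` and catch `urllib.error.HTTPError`; the exception's `code` attribute can be handed to `describe_status`.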
WebSocket /v1/sessions/{session_id}/stream
Flow:
- Client connects; the server accepts the connection
- Client sends `{"prompt": "..."}` JSON
- Server streams events via `send_json`:
  - `heads_running`
  - `head_complete` (per head: `head_id`, `summary`)
  - `witness_running`
  - `complete` (final_answer, transparency_report, head_contributions, confidence_score)
  - `error` (message)
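On the client side, the event stream can be folded into a single result. The sketch below works on already-decoded event dicts so it needs no WebSocket library. It assumes that each event carries its name under a `"type"` key and that `complete` and `error` are terminal; neither detail is specified in this document, and `collect_stream` is a hypothetical helper.

```python
from typing import Iterable


def collect_stream(events: Iterable[dict]) -> dict:
    """Fold a sequence of stream events into one result.

    Assumption: each event is a dict whose "type" key holds the event name.
    """
    result = {"head_summaries": {}, "final_answer": None, "error": None}
    for event in events:
        kind = event.get("type")
        if kind == "head_complete":
            # One event per head, carrying head_id and summary.
            result["head_summaries"][event["head_id"]] = event["summary"]
        elif kind == "complete":
            result["final_answer"] = event["final_answer"]
            break  # assumed to be the last event of a successful run
        elif kind == "error":
            result["error"] = event["message"]
            break  # assumed to end the stream
        # "heads_running" and "witness_running" are progress markers only.
    return result
```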
1.4 Dependencies and App State
fusionagi/api/dependencies.py provides:
| Function | Returns | Purpose |
|---|---|---|
| `default_orchestrator(adapter)` | `(Orchestrator, EventBus)` | Build orchestrator with Dvādaśa heads + Witness |
| `get_orchestrator()` | Orchestrator | From app state |
| `get_event_bus()` | EventBus | From app state |
| `get_session_store()` | SessionStore | In-memory session store |
| `get_safety_pipeline()` | SafetyPipeline | Pre/post checks |
| `set_app_state(orchestrator, event_bus, session_store)` | - | Populate app state |
| `ensure_initialized(adapter)` | - | Lazy init for tests |
SessionStore interface:
- `create(session_id, user_id)` -> dict
- `get(session_id)` -> dict | None
- `append_history(session_id, entry)` -> None
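A store with this shape can be sketched in a few lines. The class below is an illustration of the three-method interface, not the FusionAGI implementation: the class name, the keys of the session dict, and the `KeyError` on unknown sessions are all assumptions.

```python
from typing import Optional


class InMemorySessionStore:
    """Hypothetical in-memory store matching the three methods listed above."""

    def __init__(self) -> None:
        self._sessions = {}  # session_id -> session dict

    def create(self, session_id: str, user_id: Optional[str] = None) -> dict:
        # Assumption: the session dict carries these three keys.
        session = {"session_id": session_id, "user_id": user_id, "history": []}
        self._sessions[session_id] = session
        return session

    def get(self, session_id: str) -> Optional[dict]:
        # Returns None for an unknown id, mirroring the `dict | None` signature.
        return self._sessions.get(session_id)

    def append_history(self, session_id: str, entry: dict) -> None:
        session = self._sessions.get(session_id)
        if session is None:
            # Assumption: the real store may handle unknown ids differently.
            raise KeyError(session_id)
        session["history"].append(entry)
```

A route handler would typically call `get` first and answer 404 when it returns `None`, which matches the "session not found" error listed for the prompt endpoint.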
2. Request Pipeline (SafetyPipeline - Inline "Middleware")
FusionAGI does not use HTTP middleware (CORS, auth, etc.) by default. The SafetyPipeline acts as an inline request/response pipeline invoked inside route handlers. CORS can be enabled via create_app(cors_origins=[...]).
2.1 Components
fusionagi/governance/safety_pipeline.py
| Class | Role |
|---|---|
| `InputModerator` | Pre-check: block/transform user input |
| `OutputScanner` | Post-check: PII, blocked patterns in final answer |
| `SafetyPipeline` | Combines moderator + scanner + optional audit log |
2.2 InputModerator Interface
def add_blocked_pattern(self, pattern: str) -> None # Regex
def add_blocked_phrase(self, phrase: str) -> None # Exact phrase
def moderate(self, text: str) -> ModerationResult
ModerationResult: allowed: bool, transformed: str | None, reason: str | None
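To make the interface concrete, here is a minimal sketch of a moderator with the same three methods. The matching rules are assumptions chosen for illustration (case-insensitive matching, phrases checked before patterns, whitespace trimming as the only transform); the actual behaviour of `fusionagi/governance/safety_pipeline.py` may differ.

```python
import re
from dataclasses import dataclass
from typing import Optional


@dataclass
class ModerationResult:
    allowed: bool
    transformed: Optional[str] = None
    reason: Optional[str] = None


class InputModerator:
    """Illustrative moderator; the matching rules here are assumptions."""

    def __init__(self) -> None:
        self._patterns = []  # compiled regexes
        self._phrases = []   # lower-cased exact phrases

    def add_blocked_pattern(self, pattern: str) -> None:
        self._patterns.append(re.compile(pattern, re.IGNORECASE))

    def add_blocked_phrase(self, phrase: str) -> None:
        self._phrases.append(phrase.lower())

    def moderate(self, text: str) -> ModerationResult:
        lowered = text.lower()
        for phrase in self._phrases:
            if phrase in lowered:
                return ModerationResult(allowed=False, reason=f"blocked phrase: {phrase}")
        for pattern in self._patterns:
            if pattern.search(text):
                return ModerationResult(allowed=False,
                                        reason=f"blocked pattern: {pattern.pattern}")
        # Assumption: allowed input passes through with outer whitespace trimmed.
        return ModerationResult(allowed=True, transformed=text.strip())
```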
2.3 OutputScanner Interface
def add_pii_pattern(self, name: str, pattern: str) -> None
def add_blocked_pattern(self, pattern: str) -> None
def scan(self, text: str) -> OutputScanResult
OutputScanResult: passed: bool, flags: list[str], sanitized: str | None
Default PII: SSN, credit card patterns.
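A scanner with this interface could be sketched as below. The two default regexes are rough approximations written for this example, not the patterns FusionAGI ships, and the flag strings and `[REDACTED ...]` replacement format are likewise assumptions.

```python
import re
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class OutputScanResult:
    passed: bool
    flags: list = field(default_factory=list)
    sanitized: Optional[str] = None


class OutputScanner:
    def __init__(self) -> None:
        # Assumption: simplified stand-ins for the default SSN and card checks.
        self._pii = {
            "ssn": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
            "credit_card": re.compile(r"\b(?:\d{4}[ -]?){3}\d{4}\b"),
        }
        self._blocked = []

    def add_pii_pattern(self, name: str, pattern: str) -> None:
        self._pii[name] = re.compile(pattern)

    def add_blocked_pattern(self, pattern: str) -> None:
        self._blocked.append(re.compile(pattern))

    def scan(self, text: str) -> OutputScanResult:
        flags = []
        sanitized = text
        for name, regex in self._pii.items():
            if regex.search(sanitized):
                flags.append(f"pii:{name}")
                sanitized = regex.sub(f"[REDACTED {name}]", sanitized)
        for regex in self._blocked:
            if regex.search(text):
                flags.append(f"blocked:{regex.pattern}")
        if not flags:
            return OutputScanResult(passed=True)
        # PII is redacted in `sanitized`; blocked content is only flagged.
        return OutputScanResult(passed=False, flags=flags, sanitized=sanitized)
```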
2.4 SafetyPipeline Interface
def pre_check(self, user_input: str) -> ModerationResult
def post_check(self, final_answer: str) -> OutputScanResult
Used in fusionagi/api/routes/sessions.py around prompt submission and final answer delivery.
3. Multi-Modal Interface Layer
The Interface layer provides abstractions for admin control and user interaction across modalities. It is not HTTP middleware; it is a conceptual middleware layer between end users and the core.
3.1 Base Abstractions
fusionagi/interfaces/base.py
| Type | Description |
|---|---|
| `ModalityType` | Enum: TEXT, VOICE, VISUAL, HAPTIC, GESTURE, BIOMETRIC |
| `InterfaceMessage` | Pydantic: id, modality, content, metadata, timestamp, user_id, session_id |
| `InterfaceCapabilities` | supported_modalities, supports_streaming, supports_interruption, supports_multimodal, latency_ms, max_concurrent_sessions |
InterfaceAdapter (ABC):
def capabilities(self) -> InterfaceCapabilities
async def send(self, message: InterfaceMessage) -> None
async def receive(self, timeout_seconds: float | None = None) -> InterfaceMessage | None
async def stream_send(self, messages: AsyncIterator[InterfaceMessage]) -> None # default impl
async def initialize(self) -> None
async def shutdown(self) -> None
def validate_message(self, message: InterfaceMessage) -> bool
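As an illustration of how an adapter might implement `send` and `receive`, the sketch below defines a loopback adapter backed by an `asyncio.Queue`. It uses reduced stand-ins so that it runs on its own: the `InterfaceMessage` here is a dataclass with only three of the fields, the ABC declares only two of the methods, and `LoopbackTextAdapter` is a hypothetical class that does not exist in FusionAGI.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional


@dataclass
class InterfaceMessage:
    # Stand-in for the Pydantic model; only a subset of its fields.
    id: str
    modality: str
    content: str


class InterfaceAdapter(ABC):
    # Reduced stand-in for the ABC above: only send/receive are declared.
    @abstractmethod
    async def send(self, message: InterfaceMessage) -> None: ...

    @abstractmethod
    async def receive(self, timeout_seconds: Optional[float] = None) -> Optional[InterfaceMessage]: ...


class LoopbackTextAdapter(InterfaceAdapter):
    """Hypothetical adapter: whatever is sent can be received back."""

    def __init__(self) -> None:
        self._queue = None

    async def initialize(self) -> None:
        # Create the queue inside the running event loop.
        self._queue = asyncio.Queue()

    async def send(self, message: InterfaceMessage) -> None:
        await self._queue.put(message)

    async def receive(self, timeout_seconds: Optional[float] = None) -> Optional[InterfaceMessage]:
        try:
            return await asyncio.wait_for(self._queue.get(), timeout=timeout_seconds)
        except asyncio.TimeoutError:
            return None  # no message arrived within the timeout


async def demo() -> list:
    adapter = LoopbackTextAdapter()
    await adapter.initialize()
    await adapter.send(InterfaceMessage(id="m1", modality="text", content="hello"))
    first = await adapter.receive(timeout_seconds=0.1)
    second = await adapter.receive(timeout_seconds=0.05)  # queue is empty now
    return [first.content, second]
```

Returning `None` on timeout follows the `InterfaceMessage | None` return type of `receive`; whether the real adapters do the same is not stated here.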
3.2 AdminControlPanel
fusionagi/interfaces/admin_panel.py
Constructor: AdminControlPanel(orchestrator, event_bus, state_manager, voice_library?, conversation_tuner?, policy_engine?, audit_log?, session_count_callback?)
| Area | Methods |
|---|---|
| Voice | add_voice_profile, list_voices, update_voice_profile, remove_voice_profile, set_default_voice |
| Conversation | register_conversation_style, list_conversation_styles, get_conversation_style, set_default_conversation_style |
| Agents | configure_agent, get_agent_config, list_agents, enable_agent, disable_agent |
| Monitoring | get_system_status, get_task_statistics, get_recent_events |
| Governance | get_audit_entries, update_policy |
| Config | export_configuration, import_configuration |
Models: SystemStatus, AgentConfig
3.3 MultiModalUI
fusionagi/interfaces/multimodal_ui.py
Constructor: MultiModalUI(orchestrator, conversation_manager, voice_interface?, llm_process_callback?)
| Area | Methods |
|---|---|
| Sessions | create_session, get_session, active_session_count, end_session |
| Modalities | register_interface, enable_modality, disable_modality, get_available_modalities |
| I/O | send_to_user, receive_from_user |
| Tasks | submit_task_interactive |
| Conversation | converse |
| Stats | get_session_statistics |
UserSession: session_id, user_id, conversation_session_id, active_modalities, preferences, accessibility_settings, started_at, last_activity_at
3.4 Voice Layer
fusionagi/interfaces/voice.py
Protocols:
- `TTSAdapter`: `async def synthesize(text, voice_id?, **kwargs) -> bytes | None`
- `STTAdapter`: `async def transcribe(audio_data?, timeout_seconds?, **kwargs) -> str | None`
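Because these are protocols, a provider only needs a method with a matching shape; no inheritance is required. The sketch below restates `TTSAdapter` with `typing.Protocol` and pairs it with a dummy provider. The parameter types (`str` for `text` and `voice_id`) are assumptions, since the signature above does not give them, and `SilentTTS` and `speak` are hypothetical names.

```python
import asyncio
from typing import Any, Optional, Protocol, runtime_checkable


@runtime_checkable
class TTSAdapter(Protocol):
    # Assumption: text and voice_id are strings; the source omits the types.
    async def synthesize(self, text: str, voice_id: Optional[str] = None,
                         **kwargs: Any) -> Optional[bytes]: ...


class SilentTTS:
    """Hypothetical provider: returns UTF-8 text bytes instead of real audio."""

    async def synthesize(self, text: str, voice_id: Optional[str] = None,
                         **kwargs: Any) -> Optional[bytes]:
        if not text:
            return None  # nothing to synthesize
        return text.encode("utf-8")


async def speak(tts: TTSAdapter, text: str) -> int:
    """Return the number of audio bytes produced, or 0 if synthesis yielded None."""
    audio = await tts.synthesize(text)
    return 0 if audio is None else len(audio)
```

`SilentTTS` satisfies the protocol structurally, so it can be passed to `speak` without subclassing `TTSAdapter`.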
VoiceLibrary: add_voice, remove_voice, get_voice, list_voices, set_default_voice, get_default_voice, update_voice
VoiceProfile: id, name, language, gender, age_range, style, pitch, speed, provider, provider_voice_id, metadata
VoiceInterface(InterfaceAdapter): implements send (TTS), receive (STT), set_active_voice, capabilities
3.5 Conversation Layer
fusionagi/interfaces/conversation.py
ConversationStyle: formality, verbosity, personality_traits, empathy_level, proactivity, humor_level, technical_depth
ConversationTuner: register_style, get_style, list_styles, set_default_style, get_default_style, tune_for_context
ConversationManager: create_session, get_session, add_turn, get_history, get_style_for_session, update_style, end_session, get_context_summary
ConversationTurn: turn_id, session_id, speaker (user|agent|system), content, intent, sentiment, confidence, timestamp
4. Architecture Summary
flowchart TB
subgraph clients [Clients]
HTTP[HTTP Client]
WS[WebSocket Client]
end
subgraph api [Dvādaśa API]
Sessions[POST /v1/sessions]
Prompt[POST /v1/sessions/id/prompt]
Stream[WS /v1/sessions/id/stream]
end
subgraph pipeline [Request Pipeline]
PreCheck[SafetyPipeline.pre_check]
PostCheck[SafetyPipeline.post_check]
end
subgraph core [Core]
Orch[Orchestrator]
Heads[Dvādaśa Heads]
Witness[Witness]
end
subgraph interfaces [Interface Layer]
Admin[AdminControlPanel]
UI[MultiModalUI]
Adapters[InterfaceAdapters]
end
HTTP --> Sessions
HTTP --> Prompt
WS --> Stream
Prompt --> PreCheck
PreCheck --> Orch
Orch --> Heads
Heads --> Witness
Witness --> PostCheck
PostCheck --> Prompt
Admin --> Orch
UI --> Orch
UI --> Adapters
5. Adding HTTP Middleware
To enable CORS when creating the app:
from fusionagi.api import create_app

app = create_app(
    cors_origins=["*"],  # or ["https://example.com", "https://app.example.com"]
)
For additional custom middleware (auth, logging, etc.), use app.add_middleware() after creating the app. FusionAGI relies on FastAPI/Starlette defaults (ServerErrorMiddleware, ExceptionMiddleware, AsyncExitStackMiddleware) when no custom middleware is configured.
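As one possible shape for such custom middleware, the sketch below is a pure ASGI middleware that adds a timing header to HTTP responses. It depends only on the standard library, so it can be read and tested without FastAPI installed. The class name `TimingMiddleware` and the `x-process-time` header are illustrative choices, not part of FusionAGI.

```python
import asyncio
import time


class TimingMiddleware:
    """Pure ASGI middleware: adds an x-process-time header to HTTP responses."""

    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            # Pass WebSocket and lifespan traffic through untouched.
            await self.app(scope, receive, send)
            return

        start = time.perf_counter()

        async def send_with_timing(message):
            if message["type"] == "http.response.start":
                # Headers can only be changed on the response-start message.
                elapsed = f"{time.perf_counter() - start:.4f}".encode()
                headers = list(message.get("headers", []))
                headers.append((b"x-process-time", elapsed))
                message = {**message, "headers": headers}
            await send(message)

        await self.app(scope, receive, send_with_timing)
```

With a real application this would be registered as `app.add_middleware(TimingMiddleware)` after `create_app(...)` returns. Because the non-HTTP branch forwards everything unchanged, the `/v1/sessions/{session_id}/stream` WebSocket route is unaffected by it.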