"""Multi-modal user interface for full sensory experience with FusionAGI.

Supports:
- Text (chat, commands)
- Voice (speech input/output)
- Visual (images, video, AR/VR)
- Haptic (touch feedback)
- Gesture (motion control)
- Biometric (emotion detection, physiological signals)
"""
import asyncio
import inspect
import uuid
from typing import Any, AsyncIterator, Awaitable, Callable

from pydantic import BaseModel, Field

from fusionagi._time import utc_now_iso
from fusionagi.interfaces.base import (
    InterfaceAdapter,
    InterfaceMessage,
    ModalityType,
)
from fusionagi.interfaces.voice import VoiceInterface, VoiceLibrary
from fusionagi.interfaces.conversation import ConversationManager, ConversationTurn
from fusionagi.core import Orchestrator
from fusionagi.schemas import Task, TaskState
from fusionagi._logger import logger
class UserSession(BaseModel):
    """State record for one user's multi-modal session.

    Every field has a default, so ``UserSession()`` is valid; identifiers and
    timestamps are generated at construction time.
    """

    # Generated as "user_session_<uuid4 hex>" unless supplied explicitly.
    session_id: str = Field(default_factory=lambda: f"user_session_{uuid.uuid4().hex}")
    # Optional identifier of the user owning this session.
    user_id: str | None = None
    # Identifier of the linked conversation session, if one exists.
    conversation_session_id: str | None = None
    # Modalities currently enabled for this session.
    active_modalities: list[ModalityType] = Field(default_factory=list)
    # Free-form user preferences.
    preferences: dict[str, Any] = Field(default_factory=dict)
    # Free-form accessibility settings.
    accessibility_settings: dict[str, Any] = Field(default_factory=dict)
    # Timestamps produced by utc_now_iso(); each field calls it independently.
    started_at: str = Field(default_factory=utc_now_iso)
    last_activity_at: str = Field(default_factory=utc_now_iso)
class MultiModalUI:
    """
    Multi-modal user interface for FusionAGI.

    Provides a unified interface that supports multiple sensory modalities
    simultaneously, allowing users to interact through their preferred
    combination of text, voice, visual, haptic, gesture, and biometric inputs.

    Features:
    - Seamless switching between modalities
    - Simultaneous multi-modal input/output
    - Accessibility support
    - Context-aware modality selection
    - Real-time feedback across all active modalities
    """

    def __init__(
        self,
        orchestrator: Orchestrator,
        conversation_manager: ConversationManager,
        voice_interface: VoiceInterface | None = None,
        llm_process_callback: (
            Callable[[str, str, dict[str, Any], Any], str | Awaitable[str]] | None
        ) = None,
    ) -> None:
        """
        Initialize multi-modal UI.

        Args:
            orchestrator: FusionAGI orchestrator for task execution.
            conversation_manager: Conversation manager for natural language.
            voice_interface: Voice interface for speech interaction. When given,
                it is registered as the adapter for ``ModalityType.VOICE``.
            llm_process_callback: Optional
                ``(session_id, user_input, context, style) -> response`` used by
                ``converse()``. It may return the response directly or an
                awaitable resolving to it.
        """
        self.orchestrator = orchestrator
        self.conversation_manager = conversation_manager
        self.voice_interface = voice_interface
        self._llm_process_callback = llm_process_callback

        self._sessions: dict[str, UserSession] = {}
        self._interface_adapters: dict[ModalityType, InterfaceAdapter] = {}
        # Not acquired by any method of this class; kept so existing external
        # users of the attribute keep working.
        self._receive_lock = asyncio.Lock()
        # Strong references to in-flight relay tasks: the event loop only keeps
        # weak references, so an unreferenced task may be garbage-collected
        # before it finishes.
        self._background_tasks: set[asyncio.Task[None]] = set()

        # Identity check rather than truthiness, so an adapter object that
        # happens to evaluate as falsy is still registered.
        if voice_interface is not None:
            self._interface_adapters[ModalityType.VOICE] = voice_interface

        logger.info("MultiModalUI initialized")

    # ========== Session Management ==========

    def create_session(
        self,
        user_id: str | None = None,
        preferred_modalities: list[ModalityType] | None = None,
        accessibility_settings: dict[str, Any] | None = None,
    ) -> str:
        """
        Create a new user session together with its conversation session.

        Args:
            user_id: Optional user identifier.
            preferred_modalities: Preferred interaction modalities; falls back
                to ``[ModalityType.TEXT]`` when omitted or empty.
            accessibility_settings: Accessibility preferences.

        Returns:
            Session ID.
        """
        conv_session_id = self.conversation_manager.create_session(user_id=user_id)

        session = UserSession(
            user_id=user_id,
            conversation_session_id=conv_session_id,
            active_modalities=preferred_modalities or [ModalityType.TEXT],
            accessibility_settings=accessibility_settings or {},
        )
        self._sessions[session.session_id] = session

        logger.info(
            "User session created",
            extra={
                "session_id": session.session_id,
                "user_id": user_id,
                "modalities": [m.value for m in session.active_modalities],
            },
        )
        return session.session_id

    def get_session(self, session_id: str) -> UserSession | None:
        """Return the session registered under ``session_id``, or None."""
        return self._sessions.get(session_id)

    def active_session_count(self) -> int:
        """Return number of active user sessions (for admin panel session_count_callback)."""
        return len(self._sessions)

    def end_session(self, session_id: str) -> bool:
        """
        End a user session and its linked conversation session.

        Args:
            session_id: Session identifier.

        Returns:
            True if ended, False if not found.
        """
        session = self._sessions.get(session_id)
        if not session:
            return False

        # Close the conversation first; the user session is only dropped once
        # that call has returned.
        if session.conversation_session_id:
            self.conversation_manager.end_session(session.conversation_session_id)

        del self._sessions[session_id]
        logger.info("User session ended", extra={"session_id": session_id})
        return True

    # ========== Modality Management ==========

    def register_interface(self, modality: ModalityType, adapter: InterfaceAdapter) -> None:
        """
        Register an interface adapter for a modality.

        An adapter already registered for the same modality is replaced.

        Args:
            modality: Modality type.
            adapter: Interface adapter implementation.
        """
        self._interface_adapters[modality] = adapter
        logger.info("Interface adapter registered", extra={"modality": modality.value})

    def enable_modality(self, session_id: str, modality: ModalityType) -> bool:
        """
        Enable a modality for a session.

        Args:
            session_id: Session identifier.
            modality: Modality to enable.

        Returns:
            True if enabled (or already enabled), False if session not found or
            modality unavailable.
        """
        session = self._sessions.get(session_id)
        if not session:
            return False

        if modality not in self._interface_adapters:
            logger.warning(
                "Modality not available",
                extra={"modality": modality.value},
            )
            return False

        if modality not in session.active_modalities:
            session.active_modalities.append(modality)
            logger.info(
                "Modality enabled",
                extra={"session_id": session_id, "modality": modality.value},
            )
        return True

    def disable_modality(self, session_id: str, modality: ModalityType) -> bool:
        """
        Disable a modality for a session.

        Args:
            session_id: Session identifier.
            modality: Modality to disable.

        Returns:
            True if the session exists (whether or not the modality was
            active), False if session not found.
        """
        session = self._sessions.get(session_id)
        if not session:
            return False

        if modality in session.active_modalities:
            session.active_modalities.remove(modality)
            logger.info(
                "Modality disabled",
                extra={"session_id": session_id, "modality": modality.value},
            )
        return True

    # ========== User Interaction ==========

    async def send_to_user(
        self,
        session_id: str,
        content: Any,
        modalities: list[ModalityType] | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> None:
        """
        Send content to user through active modalities.

        Modalities without a registered adapter are skipped. A failure in one
        adapter is logged and does not stop delivery through the others.

        Args:
            session_id: Session identifier.
            content: Content to send (will be adapted per modality).
            modalities: Specific modalities to use (uses the session's active
                modalities if None or empty).
            metadata: Additional metadata for the message.
        """
        session = self._sessions.get(session_id)
        if not session:
            logger.warning("Session not found", extra={"session_id": session_id})
            return

        # Snapshot: the session's list may be changed by enable/disable calls
        # while this coroutine is suspended in adapter.send().
        target_modalities = list(modalities or session.active_modalities)

        for modality in target_modalities:
            adapter = self._interface_adapters.get(modality)
            if not adapter:
                continue

            message = InterfaceMessage(
                id=f"msg_{uuid.uuid4().hex[:8]}",
                modality=modality,
                content=self._adapt_content(content, modality),
                metadata=metadata or {},
                session_id=session_id,
                user_id=session.user_id,
            )

            try:
                await adapter.send(message)
            except Exception as e:
                logger.error(
                    "Failed to send through modality",
                    extra={"modality": modality.value, "error": str(e)},
                )

    async def receive_from_user(
        self,
        session_id: str,
        timeout_seconds: float | None = None,
    ) -> InterfaceMessage | None:
        """
        Receive input from user through any active modality.

        All active modalities with a registered adapter are listened to
        concurrently and the first one to produce a message wins; the remaining
        receive calls are cancelled. Adapters that fail are logged and ignored.

        NOTE(review): adapters are assumed to tolerate cancellation of
        ``receive()``. If several adapters finish in the same loop iteration,
        only the first one (in ``active_modalities`` order) is returned and the
        other results are discarded.

        Args:
            session_id: Session identifier.
            timeout_seconds: Optional timeout, passed through to each adapter's
                ``receive()``.

        Returns:
            Received message, or None if the session is unknown or no adapter
            produced a message.
        """
        session = self._sessions.get(session_id)
        if not session:
            return None

        # One receive task per usable modality; dict order keeps the priority
        # order of session.active_modalities.
        pending: dict[asyncio.Future[Any], ModalityType] = {}
        for modality in session.active_modalities:
            adapter = self._interface_adapters.get(modality)
            if not adapter:
                continue
            try:
                pending[asyncio.ensure_future(adapter.receive(timeout_seconds))] = modality
            except Exception as e:
                logger.error(
                    "Failed to receive from modality",
                    extra={"modality": modality.value, "error": str(e)},
                )

        try:
            while pending:
                done, _ = await asyncio.wait(
                    list(pending), return_when=asyncio.FIRST_COMPLETED
                )
                for task in [t for t in pending if t in done]:
                    modality = pending.pop(task)
                    if task.cancelled():
                        continue
                    try:
                        message = task.result()
                    except Exception as e:
                        logger.error(
                            "Failed to receive from modality",
                            extra={"modality": modality.value, "error": str(e)},
                        )
                        continue
                    if message:
                        # Update session activity
                        session.last_activity_at = utc_now_iso()
                        return message
            return None
        finally:
            # Stop the listeners that lost the race (also runs when this
            # coroutine itself is cancelled) and wait for them to unwind.
            for task in pending:
                task.cancel()
            if pending:
                await asyncio.gather(*pending, return_exceptions=True)

    # ========== Task Interaction ==========

    async def submit_task_interactive(
        self,
        session_id: str,
        goal: str,
        constraints: dict[str, Any] | None = None,
    ) -> str:
        """
        Submit a task and provide interactive feedback.

        The user receives a confirmation message, and later task events are
        relayed to the session.

        Args:
            session_id: Session identifier.
            goal: Task goal description.
            constraints: Optional task constraints.

        Returns:
            Task ID.

        Raises:
            ValueError: If the session does not exist.
        """
        session = self._sessions.get(session_id)
        if not session:
            raise ValueError(f"Session not found: {session_id}")

        task_id = self.orchestrator.submit_task(
            goal=goal,
            constraints=constraints or {},
        )

        await self.send_to_user(
            session_id,
            f"Task submitted: {goal}",
            metadata={"task_id": task_id, "type": "task_confirmation"},
        )

        # Subscribe to task events for real-time updates
        self._subscribe_to_task_updates(session_id, task_id)

        logger.info(
            "Interactive task submitted",
            extra={"session_id": session_id, "task_id": task_id},
        )
        return task_id

    def _subscribe_to_task_updates(self, session_id: str, task_id: str) -> None:
        """
        Subscribe to task updates and relay to user.

        Registers a handler for "task_state_changed" and "task_step_completed"
        events on the orchestrator's event bus. The handler is never
        unsubscribed here; it ignores events for other tasks and for sessions
        that no longer exist.

        Args:
            session_id: Session identifier.
            task_id: Task identifier.
        """

        def on_task_update(event_type: str, data: dict[str, Any]) -> None:
            """Handle task update event."""
            if data.get("task_id") != task_id:
                return
            # The session may have ended since the subscription was made.
            if session_id not in self._sessions:
                return

            if event_type == "task_state_changed":
                state = data.get("new_state")
                message = f"Task {task_id[:8]}: {state}"
            else:
                message = f"Task update: {event_type}"

            # Look up the loop before building the coroutine, so that nothing
            # is left un-awaited when the event fires outside a running loop.
            try:
                loop = asyncio.get_running_loop()
            except RuntimeError as e:
                logger.error("Failed to send task update", extra={"error": str(e)})
                return

            relay = loop.create_task(
                self.send_to_user(
                    session_id,
                    message,
                    metadata={"task_id": task_id, "event_type": event_type},
                )
            )
            # Hold a reference until the task completes.
            self._background_tasks.add(relay)
            relay.add_done_callback(self._background_tasks.discard)

        # Subscribe to events
        self.orchestrator._event_bus.subscribe("task_state_changed", on_task_update)
        self.orchestrator._event_bus.subscribe("task_step_completed", on_task_update)

    # ========== Conversation Integration ==========

    async def converse(
        self,
        session_id: str,
        user_input: str,
    ) -> str:
        """
        Handle conversational interaction.

        Records the user turn, obtains a response (from the LLM callback when
        one is configured, otherwise a canned echo), records the agent turn and
        returns the response.

        Args:
            session_id: Session identifier.
            user_input: User's conversational input.

        Returns:
            Agent's response, or the string "Session not found" when the
            session (or its conversation session) is missing.
        """
        session = self._sessions.get(session_id)
        if not session or not session.conversation_session_id:
            return "Session not found"

        # Conversational input counts as user activity, just like a message
        # obtained through receive_from_user().
        session.last_activity_at = utc_now_iso()

        user_turn = ConversationTurn(
            session_id=session.conversation_session_id,
            speaker="user",
            content=user_input,
        )
        self.conversation_manager.add_turn(user_turn)

        context = self.conversation_manager.get_context_summary(session.conversation_session_id)
        style = self.conversation_manager.get_style_for_session(session.conversation_session_id)
        if self._llm_process_callback is not None:
            response = self._llm_process_callback(session_id, user_input, context, style)
            # Allow coroutine-based callbacks in addition to plain functions.
            if inspect.isawaitable(response):
                response = await response
        else:
            response = f"I understand you said: {user_input}"

        agent_turn = ConversationTurn(
            session_id=session.conversation_session_id,
            speaker="agent",
            content=response,
        )
        self.conversation_manager.add_turn(agent_turn)

        return response

    # ========== Utility Methods ==========

    def _adapt_content(self, content: Any, modality: ModalityType) -> Any:
        """
        Adapt content for a specific modality.

        Args:
            content: Original content.
            modality: Target modality.

        Returns:
            ``str(content)`` for TEXT and VOICE, a ``{"type": "text", ...}``
            dict for VISUAL, a fixed notification pattern for HAPTIC (the
            content itself is not used), and the content unchanged otherwise.
        """
        if modality in (ModalityType.TEXT, ModalityType.VOICE):
            # Voice needs plain text that can be synthesized.
            return str(content)
        if modality == ModalityType.VISUAL:
            # For visual, might need to generate images or format for display
            return {"type": "text", "content": str(content)}
        if modality == ModalityType.HAPTIC:
            # For haptic, might need to generate vibration patterns
            return {"pattern": "notification", "intensity": 0.5}
        return content

    def get_available_modalities(self) -> list[ModalityType]:
        """Get list of modalities that have a registered adapter."""
        return list(self._interface_adapters)

    def get_session_statistics(self, session_id: str) -> dict[str, Any]:
        """
        Get statistics for a session.

        Args:
            session_id: Session identifier.

        Returns:
            Dictionary with session statistics, or an empty dict when the
            session does not exist.
        """
        session = self._sessions.get(session_id)
        if not session:
            return {}

        history = []
        if session.conversation_session_id:
            history = self.conversation_manager.get_history(session.conversation_session_id)

        return {
            "session_id": session_id,
            "user_id": session.user_id,
            "active_modalities": [m.value for m in session.active_modalities],
            "conversation_turns": len(history),
            "started_at": session.started_at,
            "last_activity_at": session.last_activity_at,
        }