"""Multi-modal user interface for full sensory experience with FusionAGI. Supports: - Text (chat, commands) - Voice (speech input/output) - Visual (images, video, AR/VR) - Haptic (touch feedback) - Gesture (motion control) - Biometric (emotion detection, physiological signals) """ import asyncio import uuid from typing import Any, AsyncIterator, Callable from pydantic import BaseModel, Field from fusionagi._time import utc_now_iso from fusionagi.interfaces.base import ( InterfaceAdapter, InterfaceMessage, ModalityType, ) from fusionagi.interfaces.voice import VoiceInterface, VoiceLibrary from fusionagi.interfaces.conversation import ConversationManager, ConversationTurn from fusionagi.core import Orchestrator from fusionagi.schemas import Task, TaskState from fusionagi._logger import logger class UserSession(BaseModel): """User session with multi-modal interface.""" session_id: str = Field(default_factory=lambda: f"user_session_{uuid.uuid4().hex}") user_id: str | None = Field(default=None) conversation_session_id: str | None = Field(default=None) active_modalities: list[ModalityType] = Field(default_factory=list) preferences: dict[str, Any] = Field(default_factory=dict) accessibility_settings: dict[str, Any] = Field(default_factory=dict) started_at: str = Field(default_factory=utc_now_iso) last_activity_at: str = Field(default_factory=utc_now_iso) class MultiModalUI: """ Multi-modal user interface for FusionAGI. Provides a unified interface that supports multiple sensory modalities simultaneously, allowing users to interact through their preferred combination of text, voice, visual, haptic, gesture, and biometric inputs. Features: - Seamless switching between modalities - Simultaneous multi-modal input/output - Accessibility support - Context-aware modality selection - Real-time feedback across all active modalities """ def __init__( self, orchestrator: Orchestrator, conversation_manager: ConversationManager, voice_interface: VoiceInterface | None = None, llm_process_callback: Callable[[str, str, dict[str, Any], Any], str] | None = None, ) -> None: """ Initialize multi-modal UI. Args: orchestrator: FusionAGI orchestrator for task execution. conversation_manager: Conversation manager for natural language. voice_interface: Voice interface for speech interaction. llm_process_callback: Optional (session_id, user_input, context, style) -> response for converse(). """ self.orchestrator = orchestrator self.conversation_manager = conversation_manager self.voice_interface = voice_interface self._llm_process_callback = llm_process_callback self._sessions: dict[str, UserSession] = {} self._interface_adapters: dict[ModalityType, InterfaceAdapter] = {} self._receive_lock = asyncio.Lock() # Register voice interface if provided if voice_interface: self._interface_adapters[ModalityType.VOICE] = voice_interface logger.info("MultiModalUI initialized") # ========== Session Management ========== def create_session( self, user_id: str | None = None, preferred_modalities: list[ModalityType] | None = None, accessibility_settings: dict[str, Any] | None = None, ) -> str: """ Create a new user session. Args: user_id: Optional user identifier. preferred_modalities: Preferred interaction modalities. accessibility_settings: Accessibility preferences. Returns: Session ID. """ # Create conversation session conv_session_id = self.conversation_manager.create_session(user_id=user_id) session = UserSession( user_id=user_id, conversation_session_id=conv_session_id, active_modalities=preferred_modalities or [ModalityType.TEXT], accessibility_settings=accessibility_settings or {}, ) self._sessions[session.session_id] = session logger.info( "User session created", extra={ "session_id": session.session_id, "user_id": user_id, "modalities": [m.value for m in session.active_modalities], } ) return session.session_id def get_session(self, session_id: str) -> UserSession | None: """Get user session.""" return self._sessions.get(session_id) def active_session_count(self) -> int: """Return number of active user sessions (for admin panel session_count_callback).""" return len(self._sessions) def end_session(self, session_id: str) -> bool: """ End a user session. Args: session_id: Session identifier. Returns: True if ended, False if not found. """ session = self._sessions.get(session_id) if not session: return False # End conversation session if session.conversation_session_id: self.conversation_manager.end_session(session.conversation_session_id) del self._sessions[session_id] logger.info("User session ended", extra={"session_id": session_id}) return True # ========== Modality Management ========== def register_interface(self, modality: ModalityType, adapter: InterfaceAdapter) -> None: """ Register an interface adapter for a modality. Args: modality: Modality type. adapter: Interface adapter implementation. """ self._interface_adapters[modality] = adapter logger.info("Interface adapter registered", extra={"modality": modality.value}) def enable_modality(self, session_id: str, modality: ModalityType) -> bool: """ Enable a modality for a session. Args: session_id: Session identifier. modality: Modality to enable. Returns: True if enabled, False if session not found or modality unavailable. """ session = self._sessions.get(session_id) if not session: return False if modality not in self._interface_adapters: logger.warning( "Modality not available", extra={"modality": modality.value} ) return False if modality not in session.active_modalities: session.active_modalities.append(modality) logger.info( "Modality enabled", extra={"session_id": session_id, "modality": modality.value} ) return True def disable_modality(self, session_id: str, modality: ModalityType) -> bool: """ Disable a modality for a session. Args: session_id: Session identifier. modality: Modality to disable. Returns: True if disabled, False if session not found. """ session = self._sessions.get(session_id) if not session: return False if modality in session.active_modalities: session.active_modalities.remove(modality) logger.info( "Modality disabled", extra={"session_id": session_id, "modality": modality.value} ) return True # ========== User Interaction ========== async def send_to_user( self, session_id: str, content: Any, modalities: list[ModalityType] | None = None, metadata: dict[str, Any] | None = None, ) -> None: """ Send content to user through active modalities. Args: session_id: Session identifier. content: Content to send (will be adapted per modality). modalities: Specific modalities to use (uses active if None). metadata: Additional metadata for the message. """ session = self._sessions.get(session_id) if not session: logger.warning("Session not found", extra={"session_id": session_id}) return # Determine which modalities to use target_modalities = modalities or session.active_modalities # Send through each active modality for modality in target_modalities: adapter = self._interface_adapters.get(modality) if not adapter: continue # Create modality-specific message message = InterfaceMessage( id=f"msg_{uuid.uuid4().hex[:8]}", modality=modality, content=self._adapt_content(content, modality), metadata=metadata or {}, session_id=session_id, user_id=session.user_id, ) try: await adapter.send(message) except Exception as e: logger.error( "Failed to send through modality", extra={"modality": modality.value, "error": str(e)} ) async def receive_from_user( self, session_id: str, timeout_seconds: float | None = None, ) -> InterfaceMessage | None: """ Receive input from user through any active modality. Args: session_id: Session identifier. timeout_seconds: Optional timeout for receiving. Returns: Received message or None if timeout. """ session = self._sessions.get(session_id) if not session: return None # Listen on all active modalities (first to respond wins) # TODO: Implement proper async race condition handling for modality in session.active_modalities: adapter = self._interface_adapters.get(modality) if adapter: try: message = await adapter.receive(timeout_seconds) if message: # Update session activity session.last_activity_at = utc_now_iso() return message except Exception as e: logger.error( "Failed to receive from modality", extra={"modality": modality.value, "error": str(e)} ) return None # ========== Task Interaction ========== async def submit_task_interactive( self, session_id: str, goal: str, constraints: dict[str, Any] | None = None, ) -> str: """ Submit a task and provide interactive feedback. Args: session_id: Session identifier. goal: Task goal description. constraints: Optional task constraints. Returns: Task ID. """ session = self._sessions.get(session_id) if not session: raise ValueError(f"Session not found: {session_id}") # Submit task task_id = self.orchestrator.submit_task( goal=goal, constraints=constraints or {}, ) # Send confirmation to user await self.send_to_user( session_id, f"Task submitted: {goal}", metadata={"task_id": task_id, "type": "task_confirmation"}, ) # Subscribe to task events for real-time updates self._subscribe_to_task_updates(session_id, task_id) logger.info( "Interactive task submitted", extra={"session_id": session_id, "task_id": task_id} ) return task_id def _subscribe_to_task_updates(self, session_id: str, task_id: str) -> None: """ Subscribe to task updates and relay to user. Args: session_id: Session identifier. task_id: Task identifier. """ def on_task_update(event_type: str, data: dict[str, Any]) -> None: """Handle task update event.""" if data.get("task_id") != task_id: return # Format update message if event_type == "task_state_changed": state = data.get("new_state") message = f"Task {task_id[:8]}: {state}" else: message = f"Task update: {event_type}" # Send to user (async in background) import asyncio try: asyncio.create_task( self.send_to_user( session_id, message, metadata={"task_id": task_id, "event_type": event_type}, ) ) except Exception as e: logger.error("Failed to send task update", extra={"error": str(e)}) # Subscribe to events self.orchestrator._event_bus.subscribe("task_state_changed", on_task_update) self.orchestrator._event_bus.subscribe("task_step_completed", on_task_update) # ========== Conversation Integration ========== async def converse( self, session_id: str, user_input: str, ) -> str: """ Handle conversational interaction. Args: session_id: Session identifier. user_input: User's conversational input. Returns: Agent's response. """ session = self._sessions.get(session_id) if not session or not session.conversation_session_id: return "Session not found" # Add user turn user_turn = ConversationTurn( session_id=session.conversation_session_id, speaker="user", content=user_input, ) self.conversation_manager.add_turn(user_turn) context = self.conversation_manager.get_context_summary(session.conversation_session_id) style = self.conversation_manager.get_style_for_session(session.conversation_session_id) if self._llm_process_callback is not None: response = self._llm_process_callback(session_id, user_input, context, style) else: response = f"I understand you said: {user_input}" # Add agent turn agent_turn = ConversationTurn( session_id=session.conversation_session_id, speaker="agent", content=response, ) self.conversation_manager.add_turn(agent_turn) return response # ========== Utility Methods ========== def _adapt_content(self, content: Any, modality: ModalityType) -> Any: """ Adapt content for a specific modality. Args: content: Original content. modality: Target modality. Returns: Adapted content. """ # Convert content to appropriate format for modality if modality == ModalityType.TEXT: return str(content) elif modality == ModalityType.VOICE: # For voice, ensure it's text that can be synthesized return str(content) elif modality == ModalityType.VISUAL: # For visual, might need to generate images or format for display return {"type": "text", "content": str(content)} elif modality == ModalityType.HAPTIC: # For haptic, might need to generate vibration patterns return {"pattern": "notification", "intensity": 0.5} else: return content def get_available_modalities(self) -> list[ModalityType]: """Get list of available modalities.""" return list(self._interface_adapters.keys()) def get_session_statistics(self, session_id: str) -> dict[str, Any]: """ Get statistics for a session. Args: session_id: Session identifier. Returns: Dictionary with session statistics. """ session = self._sessions.get(session_id) if not session: return {} # Get conversation history history = [] if session.conversation_session_id: history = self.conversation_manager.get_history(session.conversation_session_id) return { "session_id": session_id, "user_id": session.user_id, "active_modalities": [m.value for m in session.active_modalities], "conversation_turns": len(history), "started_at": session.started_at, "last_activity_at": session.last_activity_at, }