"""Admin control panel for FusionAGI system management. Provides administrative interface for: - Voice library management - Conversation tuning - Agent configuration - System monitoring - Governance policies - Manufacturing authority """ from typing import Any, Callable, Literal from pydantic import BaseModel, Field from fusionagi._time import utc_now, utc_now_iso from fusionagi.interfaces.voice import VoiceLibrary, VoiceProfile from fusionagi.interfaces.conversation import ConversationTuner, ConversationStyle from fusionagi.core import Orchestrator, EventBus, StateManager from fusionagi.governance import PolicyEngine, AuditLog from fusionagi._logger import logger class SystemStatus(BaseModel): """System status information.""" status: Literal["healthy", "degraded", "offline"] = Field(description="Overall system status") uptime_seconds: float = Field(description="System uptime in seconds") active_tasks: int = Field(description="Number of active tasks") active_agents: int = Field(description="Number of registered agents") active_sessions: int = Field(description="Number of active user sessions") memory_usage_mb: float | None = Field(default=None, description="Memory usage in MB") cpu_usage_percent: float | None = Field(default=None, description="CPU usage percentage") timestamp: str = Field(default_factory=utc_now_iso) class AgentConfig(BaseModel): """Configuration for an agent.""" agent_id: str agent_type: str enabled: bool = Field(default=True) max_concurrent_tasks: int = Field(default=10) timeout_seconds: float = Field(default=300.0) retry_policy: dict[str, Any] = Field(default_factory=dict) metadata: dict[str, Any] = Field(default_factory=dict) class AdminControlPanel: """ Administrative control panel for FusionAGI. Provides centralized management interface for: - Voice libraries and TTS/STT configuration - Conversation styles and natural language tuning - Agent configuration and monitoring - System health and performance metrics - Governance policies and audit logs - Manufacturing authority (MAA) settings """ def __init__( self, orchestrator: Orchestrator, event_bus: EventBus, state_manager: StateManager, voice_library: VoiceLibrary | None = None, conversation_tuner: ConversationTuner | None = None, policy_engine: PolicyEngine | None = None, audit_log: AuditLog | None = None, session_count_callback: Callable[[], int] | None = None, ) -> None: """ Initialize admin control panel. Args: orchestrator: FusionAGI orchestrator. event_bus: Event bus for system events (use EventBus(history_size=N) for event history). state_manager: State manager for task state. voice_library: Voice library for TTS management. conversation_tuner: Conversation tuner for NL configuration. policy_engine: Policy engine for governance. audit_log: Audit log for compliance tracking. session_count_callback: Optional callback returning active user session count (e.g. from MultiModalUI). """ self.orchestrator = orchestrator self.event_bus = event_bus self.state_manager = state_manager self.voice_library = voice_library or VoiceLibrary() self.conversation_tuner = conversation_tuner or ConversationTuner() self.policy_engine = policy_engine self.audit_log = audit_log self._session_count_callback = session_count_callback self._agent_configs: dict[str, AgentConfig] = {} self._start_time = utc_now() logger.info("AdminControlPanel initialized") # ========== Voice Management ========== def add_voice_profile(self, profile: VoiceProfile) -> str: """ Add a voice profile to the library. Args: profile: Voice profile to add. Returns: Voice ID. """ voice_id = self.voice_library.add_voice(profile) self._log_admin_action("voice_added", {"voice_id": voice_id, "name": profile.name}) return voice_id def list_voices( self, language: str | None = None, gender: str | None = None, style: str | None = None, ) -> list[VoiceProfile]: """List voice profiles with optional filtering.""" return self.voice_library.list_voices(language=language, gender=gender, style=style) def update_voice_profile(self, voice_id: str, updates: dict[str, Any]) -> bool: """ Update a voice profile. Args: voice_id: Voice ID to update. updates: Dictionary of fields to update. Returns: True if updated, False if not found. """ success = self.voice_library.update_voice(voice_id, updates) if success: self._log_admin_action("voice_updated", {"voice_id": voice_id, "fields": list(updates.keys())}) return success def remove_voice_profile(self, voice_id: str) -> bool: """Remove a voice profile.""" success = self.voice_library.remove_voice(voice_id) if success: self._log_admin_action("voice_removed", {"voice_id": voice_id}) return success def set_default_voice(self, voice_id: str) -> bool: """Set the default voice.""" success = self.voice_library.set_default_voice(voice_id) if success: self._log_admin_action("default_voice_set", {"voice_id": voice_id}) return success # ========== Conversation Tuning ========== def register_conversation_style(self, name: str, style: ConversationStyle) -> None: """ Register a conversation style. Args: name: Style name. style: Conversation style configuration. """ self.conversation_tuner.register_style(name, style) self._log_admin_action("conversation_style_registered", {"name": name}) def list_conversation_styles(self) -> list[str]: """List all registered conversation style names.""" return self.conversation_tuner.list_styles() def get_conversation_style(self, name: str) -> ConversationStyle | None: """Get a conversation style by name.""" return self.conversation_tuner.get_style(name) def set_default_conversation_style(self, style: ConversationStyle) -> None: """Set the default conversation style.""" self.conversation_tuner.set_default_style(style) self._log_admin_action("default_conversation_style_set", {}) # ========== Agent Management ========== def configure_agent(self, config: AgentConfig) -> None: """ Configure an agent. Args: config: Agent configuration. """ self._agent_configs[config.agent_id] = config self._log_admin_action("agent_configured", {"agent_id": config.agent_id}) logger.info("Agent configured", extra={"agent_id": config.agent_id}) def get_agent_config(self, agent_id: str) -> AgentConfig | None: """Get agent configuration.""" return self._agent_configs.get(agent_id) def list_agents(self) -> list[str]: """List all registered agent IDs.""" return list(self.orchestrator._agents.keys()) def enable_agent(self, agent_id: str) -> bool: """Enable an agent.""" config = self._agent_configs.get(agent_id) if config: config.enabled = True self._log_admin_action("agent_enabled", {"agent_id": agent_id}) return True return False def disable_agent(self, agent_id: str) -> bool: """Disable an agent.""" config = self._agent_configs.get(agent_id) if config: config.enabled = False self._log_admin_action("agent_disabled", {"agent_id": agent_id}) return True return False # ========== System Monitoring ========== def get_system_status(self) -> SystemStatus: """ Get current system status. Returns: System status information. """ uptime = (utc_now() - self._start_time).total_seconds() # Count active tasks active_tasks = 0 failed_count = 0 for task_id in self.state_manager._tasks.keys(): task = self.state_manager.get_task(task_id) if task: if task.state.value in ("pending", "active"): active_tasks += 1 elif task.state.value == "failed": failed_count += 1 active_agents = len(self.orchestrator._agents) active_sessions = self._session_count_callback() if self._session_count_callback else 0 # Health: healthy under normal load; degraded if high task count or many failures if active_tasks > 1000 or (failed_count > 50 and active_tasks > 100): status: Literal["healthy", "degraded", "offline"] = "degraded" else: status = "healthy" return SystemStatus( status=status, uptime_seconds=uptime, active_tasks=active_tasks, active_agents=active_agents, active_sessions=active_sessions, ) def get_task_statistics(self) -> dict[str, Any]: """ Get task execution statistics. Returns: Dictionary with task statistics. """ stats = { "total_tasks": len(self.state_manager._tasks), "by_state": {}, "by_priority": {}, } for task_id in self.state_manager._tasks.keys(): task = self.state_manager.get_task(task_id) if task: # Count by state state_key = task.state.value stats["by_state"][state_key] = stats["by_state"].get(state_key, 0) + 1 # Count by priority priority_key = task.priority.value stats["by_priority"][priority_key] = stats["by_priority"].get(priority_key, 0) + 1 return stats def get_recent_events(self, limit: int = 50) -> list[dict[str, Any]]: """ Get recent system events from the event bus. Requires EventBus(history_size=N) at construction for non-empty results. Args: limit: Maximum number of events to return. Returns: List of recent events (event_type, payload, timestamp). """ if hasattr(self.event_bus, "get_recent_events"): return self.event_bus.get_recent_events(limit=limit) return [] # ========== Governance & Audit ========== def get_audit_entries( self, limit: int = 100, action_type: str | None = None, ) -> list[dict[str, Any]]: """ Get audit log entries. Args: limit: Maximum number of entries to return. action_type: Optional filter by action type. Returns: List of audit entries. """ if not self.audit_log: return [] entries = self.audit_log.query(limit=limit) if action_type: entries = [e for e in entries if e.get("action") == action_type] return entries def update_policy(self, policy_id: str, policy_data: dict[str, Any]) -> bool: """ Update a governance policy. Args: policy_id: Policy identifier. policy_data: Policy configuration. Returns: True if updated, False if policy engine not available. """ if not self.policy_engine: return False rule_id = policy_data.get("rule_id", policy_id) if self.policy_engine.get_rule(rule_id) is None: return False updates = {k: v for k, v in policy_data.items() if k in ("condition", "effect", "reason", "priority")} ok = self.policy_engine.update_rule(rule_id, updates) if ok: self._log_admin_action("policy_updated", {"policy_id": policy_id, "rule_id": rule_id}) return ok # ========== Utility Methods ========== def _log_admin_action(self, action: str, details: dict[str, Any]) -> None: """ Log an administrative action. Args: action: Action type. details: Action details. """ logger.info(f"Admin action: {action}", extra=details) if self.audit_log: self.audit_log.log( action=action, actor="admin", details=details, timestamp=utc_now_iso(), ) def export_configuration(self) -> dict[str, Any]: """ Export system configuration. Returns: Dictionary with full system configuration. """ return { "voices": [v.model_dump() for v in self.voice_library.list_voices()], "conversation_styles": { name: self.conversation_tuner.get_style(name).model_dump() for name in self.conversation_tuner.list_styles() }, "agent_configs": { agent_id: config.model_dump() for agent_id, config in self._agent_configs.items() }, "exported_at": utc_now_iso(), } def import_configuration(self, config: dict[str, Any]) -> bool: """ Import system configuration. Args: config: Configuration dictionary to import. Returns: True if successful, False otherwise. """ try: # Import voices if "voices" in config: for voice_data in config["voices"]: profile = VoiceProfile(**voice_data) self.voice_library.add_voice(profile) # Import conversation styles if "conversation_styles" in config: for name, style_data in config["conversation_styles"].items(): style = ConversationStyle(**style_data) self.conversation_tuner.register_style(name, style) # Import agent configs if "agent_configs" in config: for agent_id, config_data in config["agent_configs"].items(): agent_config = AgentConfig(**config_data) self._agent_configs[agent_id] = agent_config self._log_admin_action("configuration_imported", {"source": "file"}) return True except Exception as e: logger.error("Configuration import failed", extra={"error": str(e)}) return False