Files
FusionAGI/fusionagi/interfaces/admin_panel.py
defiQUG c052b07662
Some checks failed
Tests / test (3.10) (push) Has been cancelled
Tests / test (3.11) (push) Has been cancelled
Tests / test (3.12) (push) Has been cancelled
Tests / lint (push) Has been cancelled
Tests / docker (push) Has been cancelled
Initial commit: add .gitignore and README
2026-02-09 21:51:42 -08:00

426 lines
15 KiB
Python

"""Admin control panel for FusionAGI system management.
Provides an administrative interface for:
- Voice library management
- Conversation tuning
- Agent configuration
- System monitoring
- Governance policies
- Manufacturing authority
"""
import logging
from typing import Any, Callable, Literal

from pydantic import BaseModel, Field

from fusionagi._time import utc_now, utc_now_iso
from fusionagi.interfaces.voice import VoiceLibrary, VoiceProfile
from fusionagi.interfaces.conversation import ConversationTuner, ConversationStyle
from fusionagi.core import Orchestrator, EventBus, StateManager
from fusionagi.governance import PolicyEngine, AuditLog
from fusionagi._logger import logger
class SystemStatus(BaseModel):
    """System status information."""

    # Coarse health classification; one of the three literal values below.
    status: Literal["healthy", "degraded", "offline"] = Field(
        description="Overall system status",
    )

    # Load figures; all four are required when constructing the model.
    uptime_seconds: float = Field(
        description="System uptime in seconds",
    )
    active_tasks: int = Field(
        description="Number of active tasks",
    )
    active_agents: int = Field(
        description="Number of registered agents",
    )
    active_sessions: int = Field(
        description="Number of active user sessions",
    )

    # Optional resource metrics; left as None when the producer does not
    # supply them.
    memory_usage_mb: float | None = Field(
        default=None,
        description="Memory usage in MB",
    )
    cpu_usage_percent: float | None = Field(
        default=None,
        description="CPU usage percentage",
    )

    # Filled in by utc_now_iso() at construction time unless given explicitly.
    timestamp: str = Field(default_factory=utc_now_iso)
class AgentConfig(BaseModel):
    """Configuration for an agent."""

    # Identity of the agent this configuration applies to (both required).
    agent_id: str
    agent_type: str

    # Whether the agent is switched on; new configurations default to enabled.
    enabled: bool = True

    # Execution limits with their defaults.
    max_concurrent_tasks: int = 10
    timeout_seconds: float = 300.0

    # Free-form dictionaries; each instance gets its own fresh dict.
    retry_policy: dict[str, Any] = Field(default_factory=dict)
    metadata: dict[str, Any] = Field(default_factory=dict)
class AdminControlPanel:
    """
    Administrative control panel for FusionAGI.

    Provides a centralized management interface for:
    - Voice libraries and TTS/STT configuration
    - Conversation styles and natural language tuning
    - Agent configuration and monitoring
    - System health and performance metrics
    - Governance policies and audit logs
    - Manufacturing authority (MAA) settings

    Every mutating operation is reported through ``_log_admin_action``, which
    writes to the module logger and, when one was supplied, to the audit log.
    """

    # Rule fields that update_policy() is allowed to forward to the engine.
    _POLICY_UPDATE_FIELDS = ("condition", "effect", "reason", "priority")

    # Thresholds used by get_system_status() to classify the system as degraded.
    _MAX_HEALTHY_ACTIVE_TASKS = 1000
    _FAILED_TASKS_THRESHOLD = 50
    _LOADED_ACTIVE_TASKS = 100

    # Keys the standard logging module refuses to accept inside ``extra``
    # (it raises KeyError when one would overwrite a LogRecord attribute).
    _RESERVED_LOG_KEYS = frozenset(
        vars(logging.LogRecord("", 0, "", 0, "", (), None))
    ) | {"message", "asctime"}

    def __init__(
        self,
        orchestrator: Orchestrator,
        event_bus: EventBus,
        state_manager: StateManager,
        voice_library: VoiceLibrary | None = None,
        conversation_tuner: ConversationTuner | None = None,
        policy_engine: PolicyEngine | None = None,
        audit_log: AuditLog | None = None,
        session_count_callback: Callable[[], int] | None = None,
    ) -> None:
        """
        Initialize admin control panel.

        Args:
            orchestrator: FusionAGI orchestrator.
            event_bus: Event bus for system events (use EventBus(history_size=N) for event history).
            state_manager: State manager for task state.
            voice_library: Voice library for TTS management; a new VoiceLibrary is
                created when omitted.
            conversation_tuner: Conversation tuner for NL configuration; a new
                ConversationTuner is created when omitted.
            policy_engine: Policy engine for governance.
            audit_log: Audit log for compliance tracking.
            session_count_callback: Optional callback returning active user session count (e.g. from MultiModalUI).
        """
        self.orchestrator = orchestrator
        self.event_bus = event_bus
        self.state_manager = state_manager
        # Compare against None explicitly: a caller-supplied collaborator that
        # happens to be falsy (e.g. an empty container-like object) must be
        # kept, not silently replaced by a fresh instance.
        self.voice_library = voice_library if voice_library is not None else VoiceLibrary()
        self.conversation_tuner = (
            conversation_tuner if conversation_tuner is not None else ConversationTuner()
        )
        self.policy_engine = policy_engine
        self.audit_log = audit_log
        self._session_count_callback = session_count_callback
        self._agent_configs: dict[str, AgentConfig] = {}
        self._start_time = utc_now()
        logger.info("AdminControlPanel initialized")

    # ========== Voice Management ==========

    def add_voice_profile(self, profile: VoiceProfile) -> str:
        """
        Add a voice profile to the library and record the action.

        Args:
            profile: Voice profile to add.

        Returns:
            Voice ID as returned by the voice library.
        """
        voice_id = self.voice_library.add_voice(profile)
        self._log_admin_action("voice_added", {"voice_id": voice_id, "name": profile.name})
        return voice_id

    def list_voices(
        self,
        language: str | None = None,
        gender: str | None = None,
        style: str | None = None,
    ) -> list[VoiceProfile]:
        """List voice profiles, forwarding the optional filters to the voice library."""
        return self.voice_library.list_voices(language=language, gender=gender, style=style)

    def update_voice_profile(self, voice_id: str, updates: dict[str, Any]) -> bool:
        """
        Update a voice profile.

        Args:
            voice_id: Voice ID to update.
            updates: Dictionary of fields to update.

        Returns:
            The voice library's result; the action is logged only when it is truthy.
        """
        success = self.voice_library.update_voice(voice_id, updates)
        if success:
            # Only the field names are logged, not the new values.
            self._log_admin_action("voice_updated", {"voice_id": voice_id, "fields": list(updates.keys())})
        return success

    def remove_voice_profile(self, voice_id: str) -> bool:
        """Remove a voice profile; the action is logged only on success."""
        success = self.voice_library.remove_voice(voice_id)
        if success:
            self._log_admin_action("voice_removed", {"voice_id": voice_id})
        return success

    def set_default_voice(self, voice_id: str) -> bool:
        """Set the default voice; the action is logged only on success."""
        success = self.voice_library.set_default_voice(voice_id)
        if success:
            self._log_admin_action("default_voice_set", {"voice_id": voice_id})
        return success

    # ========== Conversation Tuning ==========

    def register_conversation_style(self, name: str, style: ConversationStyle) -> None:
        """
        Register a conversation style and record the action.

        Args:
            name: Style name.
            style: Conversation style configuration.
        """
        self.conversation_tuner.register_style(name, style)
        self._log_admin_action("conversation_style_registered", {"name": name})

    def list_conversation_styles(self) -> list[str]:
        """List all registered conversation style names."""
        return self.conversation_tuner.list_styles()

    def get_conversation_style(self, name: str) -> ConversationStyle | None:
        """Get a conversation style by name."""
        return self.conversation_tuner.get_style(name)

    def set_default_conversation_style(self, style: ConversationStyle) -> None:
        """Set the default conversation style and record the action."""
        self.conversation_tuner.set_default_style(style)
        self._log_admin_action("default_conversation_style_set", {})

    # ========== Agent Management ==========

    def configure_agent(self, config: AgentConfig) -> None:
        """
        Store (or replace) the configuration for ``config.agent_id``.

        Args:
            config: Agent configuration.
        """
        self._agent_configs[config.agent_id] = config
        self._log_admin_action("agent_configured", {"agent_id": config.agent_id})
        logger.info("Agent configured", extra={"agent_id": config.agent_id})

    def get_agent_config(self, agent_id: str) -> AgentConfig | None:
        """Get the stored agent configuration, or None if none was configured."""
        return self._agent_configs.get(agent_id)

    def list_agents(self) -> list[str]:
        """List the IDs of all agents held by the orchestrator."""
        # NOTE(review): reads the orchestrator's private ``_agents`` mapping;
        # switch to a public accessor if Orchestrator offers one.
        return list(self.orchestrator._agents)

    def enable_agent(self, agent_id: str) -> bool:
        """Enable an agent. Returns False when the agent has no stored configuration."""
        return self._set_agent_enabled(agent_id, True)

    def disable_agent(self, agent_id: str) -> bool:
        """Disable an agent. Returns False when the agent has no stored configuration."""
        return self._set_agent_enabled(agent_id, False)

    def _set_agent_enabled(self, agent_id: str, enabled: bool) -> bool:
        """Set the ``enabled`` flag on a stored agent configuration and log the change."""
        config = self._agent_configs.get(agent_id)
        if config is None:
            return False
        config.enabled = enabled
        action = "agent_enabled" if enabled else "agent_disabled"
        self._log_admin_action(action, {"agent_id": agent_id})
        return True

    # ========== System Monitoring ==========

    def _collect_tasks(self) -> list[Any]:
        """Return every task the state manager can currently resolve."""
        # NOTE(review): reads the state manager's private ``_tasks`` mapping.
        # The IDs are snapshotted first so that a lookup which changes the
        # mapping cannot break the iteration.
        tasks = []
        for task_id in tuple(self.state_manager._tasks):
            task = self.state_manager.get_task(task_id)
            if task is not None:
                tasks.append(task)
        return tasks

    def get_system_status(self) -> SystemStatus:
        """
        Get current system status.

        The status is "degraded" when more than 1000 tasks are pending/active,
        or when more than 50 tasks have failed while more than 100 are
        pending/active; otherwise it is "healthy". This method never reports
        "offline" and does not populate the memory/CPU fields.

        Returns:
            System status information.
        """
        uptime = (utc_now() - self._start_time).total_seconds()

        active_tasks = 0
        failed_count = 0
        for task in self._collect_tasks():
            state = task.state.value
            if state in ("pending", "active"):
                active_tasks += 1
            elif state == "failed":
                failed_count += 1

        active_agents = len(self.orchestrator._agents)
        active_sessions = (
            self._session_count_callback() if self._session_count_callback is not None else 0
        )

        overloaded = active_tasks > self._MAX_HEALTHY_ACTIVE_TASKS
        failing_under_load = (
            failed_count > self._FAILED_TASKS_THRESHOLD
            and active_tasks > self._LOADED_ACTIVE_TASKS
        )
        status: Literal["healthy", "degraded", "offline"] = (
            "degraded" if overloaded or failing_under_load else "healthy"
        )

        return SystemStatus(
            status=status,
            uptime_seconds=uptime,
            active_tasks=active_tasks,
            active_agents=active_agents,
            active_sessions=active_sessions,
        )

    def get_task_statistics(self) -> dict[str, Any]:
        """
        Get task execution statistics.

        Returns:
            Dictionary with ``total_tasks`` (size of the state manager's task
            mapping), ``by_state`` and ``by_priority`` (counts keyed by the
            ``.value`` of each task's state and priority).
        """
        by_state: dict[Any, int] = {}
        by_priority: dict[Any, int] = {}
        for task in self._collect_tasks():
            state_key = task.state.value
            by_state[state_key] = by_state.get(state_key, 0) + 1
            priority_key = task.priority.value
            by_priority[priority_key] = by_priority.get(priority_key, 0) + 1

        return {
            "total_tasks": len(self.state_manager._tasks),
            "by_state": by_state,
            "by_priority": by_priority,
        }

    def get_recent_events(self, limit: int = 50) -> list[dict[str, Any]]:
        """
        Get recent system events from the event bus.

        Requires EventBus(history_size=N) at construction for non-empty results.

        Args:
            limit: Maximum number of events to return.

        Returns:
            List of recent events (event_type, payload, timestamp), or an empty
            list when the event bus has no ``get_recent_events`` method.
        """
        if hasattr(self.event_bus, "get_recent_events"):
            return self.event_bus.get_recent_events(limit=limit)
        return []

    # ========== Governance & Audit ==========

    def get_audit_entries(
        self,
        limit: int = 100,
        action_type: str | None = None,
    ) -> list[dict[str, Any]]:
        """
        Get audit log entries.

        Args:
            limit: Maximum number of entries to fetch from the audit log.
            action_type: Optional filter by action type.

        Returns:
            List of audit entries; empty when no audit log is configured.
        """
        if self.audit_log is None:
            return []
        entries = self.audit_log.query(limit=limit)
        if action_type:
            # NOTE(review): the filter runs after ``limit`` has been applied,
            # so fewer than ``limit`` matching entries may be returned even
            # when more exist. Push the filter into the query if AuditLog
            # supports it.
            entries = [e for e in entries if e.get("action") == action_type]
        return entries

    def update_policy(self, policy_id: str, policy_data: dict[str, Any]) -> bool:
        """
        Update a governance policy rule.

        Args:
            policy_id: Policy identifier; used as the rule ID unless
                ``policy_data`` carries its own ``"rule_id"``.
            policy_data: Policy configuration. Only the ``condition``,
                ``effect``, ``reason`` and ``priority`` keys are forwarded.

        Returns:
            The policy engine's update result; False if no policy engine is
            available or the rule does not exist.
        """
        if self.policy_engine is None:
            return False
        rule_id = policy_data.get("rule_id", policy_id)
        if self.policy_engine.get_rule(rule_id) is None:
            return False
        updates = {k: v for k, v in policy_data.items() if k in self._POLICY_UPDATE_FIELDS}
        ok = self.policy_engine.update_rule(rule_id, updates)
        if ok:
            self._log_admin_action("policy_updated", {"policy_id": policy_id, "rule_id": rule_id})
        return ok

    # ========== Utility Methods ==========

    def _log_admin_action(self, action: str, details: dict[str, Any]) -> None:
        """
        Log an administrative action to the logger and, if present, the audit log.

        Args:
            action: Action type.
            details: Action details.
        """
        # Detail keys such as "name" collide with LogRecord attributes, and the
        # standard logging module raises KeyError for those. Prefix only the
        # colliding keys so every other key stays as the caller supplied it.
        safe_extra = {
            (f"detail_{key}" if key in self._RESERVED_LOG_KEYS else key): value
            for key, value in details.items()
        }
        logger.info(f"Admin action: {action}", extra=safe_extra)

        if self.audit_log is not None:
            # The audit log receives the original, unmodified details.
            self.audit_log.log(
                action=action,
                actor="admin",
                details=details,
                timestamp=utc_now_iso(),
            )

    def export_configuration(self) -> dict[str, Any]:
        """
        Export system configuration.

        Returns:
            Dictionary with ``voices``, ``conversation_styles``,
            ``agent_configs`` (each dumped via ``model_dump()``) and an
            ``exported_at`` timestamp.
        """
        styles: dict[str, Any] = {}
        for name in self.conversation_tuner.list_styles():
            style = self.conversation_tuner.get_style(name)
            # get_style may return None; skip such names instead of crashing.
            if style is not None:
                styles[name] = style.model_dump()

        return {
            "voices": [v.model_dump() for v in self.voice_library.list_voices()],
            "conversation_styles": styles,
            "agent_configs": {
                agent_id: config.model_dump()
                for agent_id, config in self._agent_configs.items()
            },
            "exported_at": utc_now_iso(),
        }

    def import_configuration(self, config: dict[str, Any]) -> bool:
        """
        Import system configuration.

        All entries are validated before anything is applied, so a malformed
        entry does not leave a half-imported configuration behind.

        Args:
            config: Configuration dictionary to import; the ``voices``,
                ``conversation_styles`` and ``agent_configs`` keys are each
                optional.

        Returns:
            True if successful, False otherwise (the failure is logged).
        """
        try:
            # Phase 1: build every object first; any invalid entry raises here.
            voices = [VoiceProfile(**voice_data) for voice_data in config.get("voices", [])]
            styles = {
                name: ConversationStyle(**style_data)
                for name, style_data in config.get("conversation_styles", {}).items()
            }
            agent_configs = {
                agent_id: AgentConfig(**config_data)
                for agent_id, config_data in config.get("agent_configs", {}).items()
            }

            # Phase 2: apply the validated objects.
            for profile in voices:
                self.voice_library.add_voice(profile)
            for name, style in styles.items():
                self.conversation_tuner.register_style(name, style)
            self._agent_configs.update(agent_configs)

            self._log_admin_action("configuration_imported", {"source": "file"})
            return True
        except Exception as e:
            # Deliberate catch-all: callers rely on a boolean result.
            logger.error("Configuration import failed", extra={"error": str(e)})
            return False