"""API dependencies: orchestrator, session store, guardrails.""" import os from dataclasses import dataclass from typing import Any from fusionagi import Orchestrator, EventBus, StateManager from fusionagi.agents import WitnessAgent from fusionagi.agents.heads import create_all_content_heads from fusionagi.adapters.base import LLMAdapter from fusionagi.adapters.native_adapter import NativeAdapter from fusionagi.schemas.head import HeadId from fusionagi.governance import SafetyPipeline, AuditLog def _get_reasoning_provider() -> Any: """Return reasoning provider based on SUPER_BIG_BRAIN_ENABLED env.""" if os.environ.get("SUPER_BIG_BRAIN_ENABLED", "false").lower() in ("true", "1", "yes"): from fusionagi.core.super_big_brain import SuperBigBrainReasoningProvider from fusionagi.memory import SemanticGraphMemory return SuperBigBrainReasoningProvider(semantic_graph=SemanticGraphMemory()) return None # App state populated by lifespan or lazy init _app_state: dict[str, Any] = {} _default_adapter: Any = None def set_default_adapter(adapter: Any) -> None: global _default_adapter _default_adapter = adapter def default_orchestrator(adapter: LLMAdapter | None = None) -> tuple[Orchestrator, Any]: """Create default Orchestrator with Dvādaśa heads and Witness registered. When adapter is None, uses native reasoning throughout: heads use NativeReasoningProvider, Witness uses NativeAdapter for synthesis. No external LLM calls. """ bus = EventBus() state = StateManager() orch = Orchestrator(event_bus=bus, state_manager=state) # Heads: use native or Super Big Brain reasoning when no adapter reasoning_provider = _get_reasoning_provider() heads = create_all_content_heads( adapter=adapter, reasoning_provider=reasoning_provider, use_native_reasoning=reasoning_provider is None, ) for hid, agent in heads.items(): orch.register_agent(hid.value, agent) # Witness: use NativeAdapter when no adapter for native synthesis witness_adapter = adapter if adapter is not None else NativeAdapter() orch.register_agent(HeadId.WITNESS.value, WitnessAgent(adapter=witness_adapter)) return orch, bus class SessionStore: """In-memory session store for API sessions.""" def __init__(self) -> None: self._sessions: dict[str, dict[str, Any]] = {} def create(self, session_id: str, user_id: str | None = None) -> dict[str, Any]: sess = {"session_id": session_id, "user_id": user_id, "history": []} self._sessions[session_id] = sess return sess def get(self, session_id: str) -> dict[str, Any] | None: return self._sessions.get(session_id) def append_history(self, session_id: str, entry: dict[str, Any]) -> None: sess = self._sessions.get(session_id) if sess: sess.setdefault("history", []).append(entry) def get_orchestrator() -> Any: return _app_state.get("orchestrator") def get_event_bus() -> Any: return _app_state.get("event_bus") def get_session_store() -> SessionStore | None: return _app_state.get("session_store") def get_safety_pipeline() -> Any: return _app_state.get("safety_pipeline") def get_telemetry_tracer() -> Any: return _app_state.get("telemetry_tracer") def set_app_state(orchestrator: Any, event_bus: Any, session_store: SessionStore) -> None: _app_state["orchestrator"] = orchestrator _app_state["event_bus"] = event_bus _app_state["session_store"] = session_store if "safety_pipeline" not in _app_state: _app_state["safety_pipeline"] = SafetyPipeline(audit_log=AuditLog()) try: from fusionagi.telemetry import TelemetryTracer, set_tracer tracer = TelemetryTracer() tracer.subscribe(event_bus) set_tracer(tracer) _app_state["telemetry_tracer"] = tracer except Exception: pass def ensure_initialized(adapter: Any = None) -> None: """Lazy init: ensure orchestrator and store exist (for TestClient).""" if _app_state.get("orchestrator") is not None: return adj = adapter if adapter is not None else _default_adapter orch, bus = default_orchestrator(adj) set_app_state(orch, bus, SessionStore()) @dataclass class OpenAIBridgeConfig: """Configuration for OpenAI-compatible API bridge.""" model_id: str auth_enabled: bool api_key: str | None timeout_per_head: float @classmethod def from_env(cls) -> "OpenAIBridgeConfig": """Load config from environment variables.""" auth = os.environ.get("OPENAI_BRIDGE_AUTH", "disabled").lower() auth_enabled = auth not in ("disabled", "false", "0", "no") return cls( model_id=os.environ.get("OPENAI_BRIDGE_MODEL_ID", "fusionagi-dvadasa"), auth_enabled=auth_enabled, api_key=os.environ.get("OPENAI_BRIDGE_API_KEY") if auth_enabled else None, timeout_per_head=float(os.environ.get("OPENAI_BRIDGE_TIMEOUT_PER_HEAD", "60")), ) def get_openai_bridge_config() -> OpenAIBridgeConfig: """Return OpenAI bridge config from app state or env.""" cfg = _app_state.get("openai_bridge_config") if cfg is not None: return cfg return OpenAIBridgeConfig.from_env() def verify_openai_bridge_auth(authorization: str | None) -> None: """ Verify OpenAI bridge auth. Raises HTTPException(401) if auth enabled and invalid. Call from route dependencies. """ try: from fastapi import HTTPException cfg = get_openai_bridge_config() if not cfg.auth_enabled: return if not cfg.api_key: return # Auth enabled but no key configured: allow (misconfig) if not authorization or not authorization.startswith("Bearer "): raise HTTPException( status_code=401, detail={"error": {"message": "Missing or invalid Authorization header", "type": "authentication_error"}}, ) token = authorization[7:].strip() if token != cfg.api_key: raise HTTPException( status_code=401, detail={"error": {"message": "Invalid API key", "type": "authentication_error"}}, ) except HTTPException: raise except Exception: pass