"""WebSocket streaming for Dvādaśa responses.""" import asyncio import json from concurrent.futures import ThreadPoolExecutor from typing import Any from fusionagi.api.dependencies import get_orchestrator, get_session_store, get_event_bus from fusionagi.core import run_heads_parallel, run_witness, select_heads_for_complexity from fusionagi.schemas.commands import parse_user_input from fusionagi.schemas.head import HeadId, HeadOutput async def handle_stream( session_id: str, prompt: str, send_fn: Any, ) -> None: """ Run Dvādaśa flow and stream events to WebSocket. Events: heads_running, head_complete, heads_done, witness_running, complete. """ from fusionagi.api.dependencies import ensure_initialized ensure_initialized() store = get_session_store() orch = get_orchestrator() bus = get_event_bus() if not store or not orch: await send_fn({"type": "error", "message": "Service not initialized"}) return sess = store.get(session_id) if not sess: await send_fn({"type": "error", "message": "Session not found"}) return if not prompt: await send_fn({"type": "error", "message": "prompt is required"}) return loop = asyncio.get_event_loop() executor = ThreadPoolExecutor(max_workers=1) parsed = parse_user_input(prompt) task_id = orch.submit_task(goal=prompt[:200]) head_ids = select_heads_for_complexity(prompt) if parsed.intent.value == "head_strategy" and parsed.head_id: head_ids = [parsed.head_id] await send_fn({"type": "heads_running", "message": "Heads running…"}) def run_heads(): return run_heads_parallel(orch, task_id, prompt, head_ids=head_ids) try: head_outputs = await loop.run_in_executor(executor, run_heads) except Exception as e: await send_fn({"type": "error", "message": str(e)}) return for ho in head_outputs: await send_fn({ "type": "head_complete", "head_id": ho.head_id.value, "summary": ho.summary, }) await send_fn({ "type": "head_speak", "head_id": ho.head_id.value, "summary": ho.summary, "audio_base64": None, }) await send_fn({"type": "witness_running", "message": "Witness composing…"}) def run_wit(): return run_witness(orch, task_id, head_outputs, prompt) try: final = await loop.run_in_executor(executor, run_wit) except Exception as e: await send_fn({"type": "error", "message": str(e)}) return if final: await send_fn({ "type": "complete", "final_answer": final.final_answer, "transparency_report": final.transparency_report.model_dump(), "head_contributions": final.head_contributions, "confidence_score": final.confidence_score, }) store.append_history(session_id, { "prompt": prompt, "final_answer": final.final_answer, "confidence_score": final.confidence_score, }) else: await send_fn({"type": "error", "message": "Failed to produce response"})