"""Dvādaśa head agent base: structured output via LLM or native reasoning.""" from typing import Any, Protocol, runtime_checkable from fusionagi.agents.base_agent import BaseAgent from fusionagi.adapters.base import LLMAdapter from fusionagi.schemas.messages import AgentMessage, AgentMessageEnvelope from fusionagi.schemas.head import HeadId, HeadOutput, HeadClaim, HeadRisk from fusionagi.schemas.grounding import Citation from fusionagi._logger import logger @runtime_checkable class ReasoningProvider(Protocol): """Protocol for native reasoning: produce HeadOutput without external APIs.""" def produce_head_output(self, head_id: HeadId, prompt: str) -> HeadOutput: """Produce structured HeadOutput for the given head and prompt.""" ... def _head_output_json_schema() -> dict[str, Any]: """JSON schema for HeadOutput for LLM structured generation.""" return { "type": "object", "required": ["head_id", "summary"], "properties": { "head_id": { "type": "string", "enum": [h.value for h in HeadId if h != HeadId.WITNESS], }, "summary": {"type": "string"}, "claims": { "type": "array", "items": { "type": "object", "properties": { "claim_text": {"type": "string"}, "confidence": {"type": "number", "minimum": 0, "maximum": 1}, "evidence": { "type": "array", "items": { "type": "object", "properties": { "source_id": {"type": "string"}, "excerpt": {"type": "string"}, "confidence": {"type": "number"}, }, }, }, "assumptions": {"type": "array", "items": {"type": "string"}}, }, }, }, "risks": { "type": "array", "items": { "type": "object", "properties": { "description": {"type": "string"}, "severity": {"type": "string"}, }, }, }, "questions": {"type": "array", "items": {"type": "string"}}, "recommended_actions": {"type": "array", "items": {"type": "string"}}, "tone_guidance": {"type": "string"}, }, } class HeadAgent(BaseAgent): """ Dvādaśa head agent: produces structured HeadOutput from user prompt. Uses LLMAdapter.complete_structured with JSON schema. """ def __init__( self, head_id: HeadId, role: str, objective: str, system_prompt: str, adapter: LLMAdapter | None = None, tool_permissions: list[str] | None = None, reasoning_provider: "ReasoningProvider | None" = None, ) -> None: if head_id == HeadId.WITNESS: raise ValueError("HeadAgent is for content heads only; use WitnessAgent for Witness") super().__init__( identity=head_id.value, role=role, objective=objective, memory_access=True, tool_permissions=tool_permissions or [], ) self._head_id = head_id self._system_prompt = system_prompt self._adapter = adapter self._reasoning_provider = reasoning_provider def handle_message(self, envelope: AgentMessageEnvelope) -> AgentMessageEnvelope | None: """On head_request, produce HeadOutput and return head_output envelope.""" if envelope.message.intent != "head_request": return None payload = envelope.message.payload or {} user_prompt = payload.get("prompt", "") logger.info( "HeadAgent handle_message", extra={"head_id": self._head_id.value, "intent": envelope.message.intent}, ) output = self._produce_output(user_prompt) if output is None: return envelope.create_response( "head_failed", payload={"error": "Failed to produce head output", "head_id": self._head_id.value}, ) return AgentMessageEnvelope( message=AgentMessage( sender=self.identity, recipient=envelope.message.sender, intent="head_output", payload={"head_output": output.model_dump()}, ), task_id=envelope.task_id, correlation_id=envelope.correlation_id, ) def _produce_output(self, user_prompt: str) -> HeadOutput | None: """Produce HeadOutput via native reasoning or LLM adapter.""" # Prefer native reasoning when available (no external APIs) if self._reasoning_provider is not None: try: return self._reasoning_provider.produce_head_output( self._head_id, user_prompt or "(No prompt provided)" ) except Exception as e: logger.warning( "Native reasoning failed, falling back", extra={"head_id": self._head_id.value, "error": str(e)}, ) if not self._adapter: return self._fallback_output(user_prompt) messages = [ {"role": "system", "content": self._system_prompt}, {"role": "user", "content": user_prompt or "(No prompt provided)"}, ] raw = self._adapter.complete_structured( messages, schema=_head_output_json_schema(), temperature=0.3, ) if not isinstance(raw, dict): logger.warning( "HeadAgent structured output invalid", extra={"head_id": self._head_id.value, "raw_type": type(raw).__name__}, ) return self._fallback_output(user_prompt) return self._parse_output(raw) def _parse_output(self, raw: dict[str, Any]) -> HeadOutput | None: """Parse raw dict into HeadOutput.""" try: claims = [] for c in raw.get("claims", []): evidence = [ Citation( source_id=e.get("source_id", ""), excerpt=e.get("excerpt", ""), confidence=e.get("confidence", 1.0), ) for e in c.get("evidence", []) ] claims.append( HeadClaim( claim_text=c.get("claim_text", ""), confidence=float(c.get("confidence", 0.5)), evidence=evidence, assumptions=c.get("assumptions", []), ) ) risks = [ HeadRisk( description=r.get("description", ""), severity=r.get("severity", "medium"), ) for r in raw.get("risks", []) ] return HeadOutput( head_id=self._head_id, summary=raw.get("summary", "No summary"), claims=claims, risks=risks, questions=raw.get("questions", []), recommended_actions=raw.get("recommended_actions", []), tone_guidance=raw.get("tone_guidance", ""), ) except Exception as e: logger.exception( "HeadAgent parse_output failed", extra={"head_id": self._head_id.value, "error": str(e)}, ) return None def _fallback_output(self, user_prompt: str) -> HeadOutput: """Fallback when both reasoning provider and adapter fail or are absent.""" return HeadOutput( head_id=self._head_id, summary=f"{self.role} head: Unable to produce structured analysis for this prompt.", claims=[ HeadClaim( claim_text="Analysis requires reasoning provider or LLM adapter.", confidence=0.0, evidence=[], assumptions=[], ), ], risks=[HeadRisk(description="No reasoning provider or adapter configured", severity="high")], questions=[], recommended_actions=["Configure NativeReasoningProvider or an LLM adapter for this head"], tone_guidance="", )