Files
FusionAGI/fusionagi/agents/head_agent.py
defiQUG c052b07662
Some checks failed
Tests / test (3.10) (push) Has been cancelled
Tests / test (3.11) (push) Has been cancelled
Tests / test (3.12) (push) Has been cancelled
Tests / lint (push) Has been cancelled
Tests / docker (push) Has been cancelled
Initial commit: add .gitignore and README
2026-02-09 21:51:42 -08:00

233 lines
8.6 KiB
Python

"""Dvādaśa head agent base: structured output via LLM or native reasoning."""
from typing import Any, Protocol, runtime_checkable
from fusionagi.agents.base_agent import BaseAgent
from fusionagi.adapters.base import LLMAdapter
from fusionagi.schemas.messages import AgentMessage, AgentMessageEnvelope
from fusionagi.schemas.head import HeadId, HeadOutput, HeadClaim, HeadRisk
from fusionagi.schemas.grounding import Citation
from fusionagi._logger import logger
@runtime_checkable
class ReasoningProvider(Protocol):
    """Structural interface for native (offline) reasoning backends.

    Implementations produce a structured ``HeadOutput`` locally, with no
    calls to external LLM APIs. Marked ``runtime_checkable`` so adapters
    can be duck-type-verified with ``isinstance`` at wiring time.
    """

    def produce_head_output(self, head_id: HeadId, prompt: str) -> HeadOutput:
        """Return a structured ``HeadOutput`` for *head_id* given *prompt*."""
        ...
def _head_output_json_schema() -> dict[str, Any]:
    """Build the JSON schema handed to the LLM for structured HeadOutput generation.

    Mirrors the ``HeadOutput`` model: only ``head_id`` and ``summary`` are
    required; ``head_id`` is restricted to content heads (Witness excluded).
    """
    # Sub-schemas named for readability; each is referenced exactly once below.
    citation_schema = {
        "type": "object",
        "properties": {
            "source_id": {"type": "string"},
            "excerpt": {"type": "string"},
            "confidence": {"type": "number"},
        },
    }
    claim_schema = {
        "type": "object",
        "properties": {
            "claim_text": {"type": "string"},
            "confidence": {"type": "number", "minimum": 0, "maximum": 1},
            "evidence": {"type": "array", "items": citation_schema},
            "assumptions": {"type": "array", "items": {"type": "string"}},
        },
    }
    risk_schema = {
        "type": "object",
        "properties": {
            "description": {"type": "string"},
            "severity": {"type": "string"},
        },
    }
    return {
        "type": "object",
        "required": ["head_id", "summary"],
        "properties": {
            "head_id": {
                "type": "string",
                # Witness has its own agent and never produces HeadOutput.
                "enum": [h.value for h in HeadId if h != HeadId.WITNESS],
            },
            "summary": {"type": "string"},
            "claims": {"type": "array", "items": claim_schema},
            "risks": {"type": "array", "items": risk_schema},
            "questions": {"type": "array", "items": {"type": "string"}},
            "recommended_actions": {"type": "array", "items": {"type": "string"}},
            "tone_guidance": {"type": "string"},
        },
    }
class HeadAgent(BaseAgent):
"""
Dvādaśa head agent: produces structured HeadOutput from user prompt.
Uses LLMAdapter.complete_structured with JSON schema.
"""
def __init__(
    self,
    head_id: HeadId,
    role: str,
    objective: str,
    system_prompt: str,
    adapter: LLMAdapter | None = None,
    tool_permissions: list[str] | None = None,
    reasoning_provider: "ReasoningProvider | None" = None,
) -> None:
    """Initialize a content-head agent.

    Args:
        head_id: Which content head this agent embodies (not Witness).
        role: Human-readable role label.
        objective: The head's objective statement.
        system_prompt: System prompt sent to the LLM adapter.
        adapter: Optional LLM backend for structured completion.
        tool_permissions: Tool names this head may invoke.
        reasoning_provider: Optional native (offline) reasoning backend,
            preferred over the adapter when present.

    Raises:
        ValueError: If *head_id* is the Witness head, which is served by
            a dedicated WitnessAgent class instead.
    """
    # Guard clause: Witness is deliberately excluded from this class.
    if head_id == HeadId.WITNESS:
        raise ValueError("HeadAgent is for content heads only; use WitnessAgent for Witness")
    super().__init__(
        identity=head_id.value,
        role=role,
        objective=objective,
        memory_access=True,
        tool_permissions=tool_permissions or [],
    )
    self._head_id = head_id
    self._system_prompt = system_prompt
    self._adapter = adapter
    self._reasoning_provider = reasoning_provider
def handle_message(self, envelope: AgentMessageEnvelope) -> AgentMessageEnvelope | None:
    """Answer a ``head_request`` with ``head_output`` (or ``head_failed``).

    Any envelope with a different intent is ignored (returns None).
    """
    message = envelope.message
    if message.intent != "head_request":
        return None  # not addressed to this handler
    prompt = (message.payload or {}).get("prompt", "")
    logger.info(
        "HeadAgent handle_message",
        extra={"head_id": self._head_id.value, "intent": message.intent},
    )
    result = self._produce_output(prompt)
    if result is None:
        # Production failed outright; signal failure back to the requester.
        return envelope.create_response(
            "head_failed",
            payload={"error": "Failed to produce head output", "head_id": self._head_id.value},
        )
    reply = AgentMessage(
        sender=self.identity,
        recipient=message.sender,
        intent="head_output",
        payload={"head_output": result.model_dump()},
    )
    return AgentMessageEnvelope(
        message=reply,
        task_id=envelope.task_id,
        correlation_id=envelope.correlation_id,
    )
def _produce_output(self, user_prompt: str) -> HeadOutput | None:
    """Produce a HeadOutput, preferring native reasoning over the LLM adapter.

    Resolution order:
      1. Native reasoning provider (no external APIs); on error, fall through.
      2. LLM adapter with structured JSON output; on error or malformed
         output, degrade to the static fallback.
      3. Static fallback when neither backend is configured.

    Returns:
        A HeadOutput, or None only when the adapter returned a dict that
        subsequently failed to parse (see _parse_output).
    """
    prompt = user_prompt or "(No prompt provided)"
    # Prefer native reasoning when available (no external APIs)
    if self._reasoning_provider is not None:
        try:
            return self._reasoning_provider.produce_head_output(self._head_id, prompt)
        except Exception as e:
            logger.warning(
                "Native reasoning failed, falling back",
                extra={"head_id": self._head_id.value, "error": str(e)},
            )
    if not self._adapter:
        return self._fallback_output(user_prompt)
    messages = [
        {"role": "system", "content": self._system_prompt},
        {"role": "user", "content": prompt},
    ]
    # Fix: guard the adapter call so a transport/completion error degrades to
    # the fallback output (matching the reasoning-provider path above) instead
    # of propagating uncaught out of handle_message.
    try:
        raw = self._adapter.complete_structured(
            messages,
            schema=_head_output_json_schema(),
            temperature=0.3,
        )
    except Exception as e:
        logger.warning(
            "LLM structured completion failed",
            extra={"head_id": self._head_id.value, "error": str(e)},
        )
        return self._fallback_output(user_prompt)
    if not isinstance(raw, dict):
        logger.warning(
            "HeadAgent structured output invalid",
            extra={"head_id": self._head_id.value, "raw_type": type(raw).__name__},
        )
        return self._fallback_output(user_prompt)
    return self._parse_output(raw)
def _parse_output(self, raw: dict[str, Any]) -> HeadOutput | None:
    """Coerce the raw structured-output dict into a HeadOutput model.

    Missing fields get conservative defaults; any coercion/validation
    error is logged and yields None.
    """
    try:
        parsed_claims: list[HeadClaim] = []
        for claim_dict in raw.get("claims", []):
            citations: list[Citation] = []
            for ev in claim_dict.get("evidence", []):
                citations.append(
                    Citation(
                        source_id=ev.get("source_id", ""),
                        excerpt=ev.get("excerpt", ""),
                        confidence=ev.get("confidence", 1.0),
                    )
                )
            parsed_claims.append(
                HeadClaim(
                    claim_text=claim_dict.get("claim_text", ""),
                    confidence=float(claim_dict.get("confidence", 0.5)),
                    evidence=citations,
                    assumptions=claim_dict.get("assumptions", []),
                )
            )
        parsed_risks: list[HeadRisk] = []
        for risk_dict in raw.get("risks", []):
            parsed_risks.append(
                HeadRisk(
                    description=risk_dict.get("description", ""),
                    severity=risk_dict.get("severity", "medium"),
                )
            )
        return HeadOutput(
            head_id=self._head_id,
            summary=raw.get("summary", "No summary"),
            claims=parsed_claims,
            risks=parsed_risks,
            questions=raw.get("questions", []),
            recommended_actions=raw.get("recommended_actions", []),
            tone_guidance=raw.get("tone_guidance", ""),
        )
    except Exception as e:
        # Validation failures are expected with imperfect LLM output; log
        # with traceback and let the caller emit head_failed.
        logger.exception(
            "HeadAgent parse_output failed",
            extra={"head_id": self._head_id.value, "error": str(e)},
        )
        return None
def _fallback_output(self, user_prompt: str) -> HeadOutput:
    """Build a minimal, honest HeadOutput when no reasoning path succeeded.

    The prompt itself is unused here; the output only reports that no
    backend was available, with zero-confidence content.
    """
    placeholder_claim = HeadClaim(
        claim_text="Analysis requires reasoning provider or LLM adapter.",
        confidence=0.0,
        evidence=[],
        assumptions=[],
    )
    config_risk = HeadRisk(
        description="No reasoning provider or adapter configured",
        severity="high",
    )
    return HeadOutput(
        head_id=self._head_id,
        summary=f"{self.role} head: Unable to produce structured analysis for this prompt.",
        claims=[placeholder_claim],
        risks=[config_risk],
        questions=[],
        recommended_actions=["Configure NativeReasoningProvider or an LLM adapter for this head"],
        tone_guidance="",
    )