"""Dvādaśa head agent base: structured output via LLM or native reasoning."""
|
|
|
|
from typing import Any, Protocol, runtime_checkable
|
|
|
|
from fusionagi.agents.base_agent import BaseAgent
|
|
from fusionagi.adapters.base import LLMAdapter
|
|
from fusionagi.schemas.messages import AgentMessage, AgentMessageEnvelope
|
|
from fusionagi.schemas.head import HeadId, HeadOutput, HeadClaim, HeadRisk
|
|
from fusionagi.schemas.grounding import Citation
|
|
from fusionagi._logger import logger
|
|
|
|
|
|
@runtime_checkable
class ReasoningProvider(Protocol):
    """Structural interface for native reasoning backends.

    An implementation produces a HeadOutput without external APIs. Because the
    protocol is ``runtime_checkable``, ``isinstance`` checks only verify that a
    ``produce_head_output`` attribute exists, not its signature.
    """

    def produce_head_output(self, head_id: HeadId, prompt: str) -> HeadOutput:
        """Return the structured HeadOutput for ``head_id`` given ``prompt``."""
        ...
|
|
|
|
|
|
def _head_output_json_schema() -> dict[str, Any]:
    """Build the JSON schema for HeadOutput used in structured LLM generation.

    Only ``head_id`` and ``summary`` are required. ``head_id`` is limited to
    the values of every HeadId member except ``HeadId.WITNESS``. A brand-new
    dict tree is assembled on every call, so callers may mutate the result.
    """

    # Tiny builders return fresh dicts so no sub-schema object is shared.
    def text() -> dict[str, Any]:
        return {"type": "string"}

    def text_list() -> dict[str, Any]:
        return {"type": "array", "items": text()}

    def object_list(properties: dict[str, Any]) -> dict[str, Any]:
        return {
            "type": "array",
            "items": {"type": "object", "properties": properties},
        }

    content_head_ids = [
        member.value for member in HeadId if member != HeadId.WITNESS
    ]

    evidence_fields = {
        "source_id": text(),
        "excerpt": text(),
        "confidence": {"type": "number"},
    }
    claim_fields = {
        "claim_text": text(),
        "confidence": {"type": "number", "minimum": 0, "maximum": 1},
        "evidence": object_list(evidence_fields),
        "assumptions": text_list(),
    }
    risk_fields = {
        "description": text(),
        "severity": text(),
    }

    return {
        "type": "object",
        "required": ["head_id", "summary"],
        "properties": {
            "head_id": {"type": "string", "enum": content_head_ids},
            "summary": text(),
            "claims": object_list(claim_fields),
            "risks": object_list(risk_fields),
            "questions": text_list(),
            "recommended_actions": text_list(),
            "tone_guidance": text(),
        },
    }
|
|
|
|
|
|
class HeadAgent(BaseAgent):
    """
    Dvādaśa head agent: produces structured HeadOutput from user prompt.

    Output sources are tried in this order:

    1. the optional native ``reasoning_provider``;
    2. ``LLMAdapter.complete_structured`` with the HeadOutput JSON schema;
    3. a static fallback HeadOutput when no adapter is configured, the adapter
       raises, or the adapter returns something that is not a dict.
    """

    def __init__(
        self,
        head_id: HeadId,
        role: str,
        objective: str,
        system_prompt: str,
        adapter: LLMAdapter | None = None,
        tool_permissions: list[str] | None = None,
        reasoning_provider: "ReasoningProvider | None" = None,
    ) -> None:
        """Create a content head.

        Args:
            head_id: Which head this agent represents; its ``value`` becomes
                the agent identity.
            role: Role passed to BaseAgent; also used in the fallback summary.
            objective: Objective passed to BaseAgent.
            system_prompt: System message sent to the LLM adapter.
            adapter: Optional LLM adapter used for structured completion.
            tool_permissions: Tool permissions passed to BaseAgent; ``None``
                becomes an empty list.
            reasoning_provider: Optional native reasoning backend, preferred
                over the adapter when present.

        Raises:
            ValueError: If ``head_id`` is ``HeadId.WITNESS``.
        """
        if head_id == HeadId.WITNESS:
            raise ValueError("HeadAgent is for content heads only; use WitnessAgent for Witness")
        super().__init__(
            identity=head_id.value,
            role=role,
            objective=objective,
            memory_access=True,
            tool_permissions=tool_permissions or [],
        )
        self._head_id = head_id
        self._system_prompt = system_prompt
        self._adapter = adapter
        self._reasoning_provider = reasoning_provider

    def handle_message(self, envelope: AgentMessageEnvelope) -> AgentMessageEnvelope | None:
        """On head_request, produce HeadOutput and return head_output envelope.

        Returns:
            ``None`` for any intent other than ``"head_request"``; a
            ``"head_failed"`` response when no output could be produced;
            otherwise a ``"head_output"`` envelope addressed to the original
            sender, carrying the dumped HeadOutput under ``"head_output"``.
        """
        if envelope.message.intent != "head_request":
            return None

        payload = envelope.message.payload or {}
        user_prompt = payload.get("prompt", "")

        logger.info(
            "HeadAgent handle_message",
            extra={"head_id": self._head_id.value, "intent": envelope.message.intent},
        )

        output = self._produce_output(user_prompt)
        if output is None:
            return envelope.create_response(
                "head_failed",
                payload={"error": "Failed to produce head output", "head_id": self._head_id.value},
            )

        return AgentMessageEnvelope(
            message=AgentMessage(
                sender=self.identity,
                recipient=envelope.message.sender,
                intent="head_output",
                payload={"head_output": output.model_dump()},
            ),
            task_id=envelope.task_id,
            correlation_id=envelope.correlation_id,
        )

    def _produce_output(self, user_prompt: str) -> HeadOutput | None:
        """Produce HeadOutput via native reasoning or LLM adapter.

        Returns:
            The provider's result, the parsed adapter result, or the fallback
            output. ``None`` is returned when the provider itself returns
            ``None`` or when the adapter's dict cannot be parsed.
        """
        # Prefer native reasoning when available (no external APIs)
        if self._reasoning_provider is not None:
            try:
                return self._reasoning_provider.produce_head_output(
                    self._head_id, user_prompt or "(No prompt provided)"
                )
            except Exception as e:
                logger.warning(
                    "Native reasoning failed, falling back",
                    extra={"head_id": self._head_id.value, "error": str(e)},
                )

        if not self._adapter:
            return self._fallback_output(user_prompt)

        messages = [
            {"role": "system", "content": self._system_prompt},
            {"role": "user", "content": user_prompt or "(No prompt provided)"},
        ]
        schema = _head_output_json_schema()

        # Guard the adapter like the reasoning provider above, so a failing
        # adapter yields the fallback output instead of an exception escaping
        # handle_message.
        try:
            raw = self._adapter.complete_structured(
                messages,
                schema=schema,
                temperature=0.3,
            )
        except Exception as e:
            logger.warning(
                "HeadAgent adapter call failed, falling back",
                extra={"head_id": self._head_id.value, "error": str(e)},
            )
            return self._fallback_output(user_prompt)

        if not isinstance(raw, dict):
            logger.warning(
                "HeadAgent structured output invalid",
                extra={"head_id": self._head_id.value, "raw_type": type(raw).__name__},
            )
            return self._fallback_output(user_prompt)

        return self._parse_output(raw)

    @staticmethod
    def _value_or(mapping: dict[str, Any], key: str, default: Any) -> Any:
        """Return ``mapping[key]``, using ``default`` when it is missing or ``None``."""
        value = mapping.get(key)
        return default if value is None else value

    def _parse_output(self, raw: dict[str, Any]) -> HeadOutput | None:
        """Parse raw dict into HeadOutput.

        An explicit ``null`` for any optional field is treated the same as a
        missing key. ``head_id`` in ``raw`` is ignored; this agent's own head
        id is always used.

        Returns:
            The parsed HeadOutput, or ``None`` (after logging the exception)
            when the data cannot be converted.
        """
        field = self._value_or
        try:
            claims = []
            for c in field(raw, "claims", []):
                evidence = [
                    Citation(
                        source_id=field(e, "source_id", ""),
                        excerpt=field(e, "excerpt", ""),
                        confidence=field(e, "confidence", 1.0),
                    )
                    for e in field(c, "evidence", [])
                ]
                claims.append(
                    HeadClaim(
                        claim_text=field(c, "claim_text", ""),
                        confidence=float(field(c, "confidence", 0.5)),
                        evidence=evidence,
                        assumptions=field(c, "assumptions", []),
                    )
                )

            risks = [
                HeadRisk(
                    description=field(r, "description", ""),
                    severity=field(r, "severity", "medium"),
                )
                for r in field(raw, "risks", [])
            ]

            return HeadOutput(
                head_id=self._head_id,
                summary=field(raw, "summary", "No summary"),
                claims=claims,
                risks=risks,
                questions=field(raw, "questions", []),
                recommended_actions=field(raw, "recommended_actions", []),
                tone_guidance=field(raw, "tone_guidance", ""),
            )
        except Exception as e:
            # Deliberately broad: malformed items (e.g. a non-dict claim) and
            # any error raised by the schema constructors both count as a
            # failed parse.
            logger.exception(
                "HeadAgent parse_output failed",
                extra={"head_id": self._head_id.value, "error": str(e)},
            )
            return None

    def _fallback_output(self, user_prompt: str) -> HeadOutput:
        """Fallback when both reasoning provider and adapter fail or are absent.

        ``user_prompt`` is accepted but not used; the output is a fixed
        zero-confidence claim with a high-severity risk.
        """
        return HeadOutput(
            head_id=self._head_id,
            summary=f"{self.role} head: Unable to produce structured analysis for this prompt.",
            claims=[
                HeadClaim(
                    claim_text="Analysis requires reasoning provider or LLM adapter.",
                    confidence=0.0,
                    evidence=[],
                    assumptions=[],
                ),
            ],
            risks=[HeadRisk(description="No reasoning provider or adapter configured", severity="high")],
            questions=[],
            recommended_actions=["Configure NativeReasoningProvider or an LLM adapter for this head"],
            tone_guidance="",
        )
|