"""Reasoner agent: reasons over step/subgoal + context; outputs recommendation via CoT.
|
|
|
|
The Reasoner agent:
|
|
- Processes reason_request messages
|
|
- Uses Chain-of-Thought or Tree-of-Thought reasoning
|
|
- Integrates with WorkingMemory for context
|
|
- Records reasoning traces to EpisodicMemory
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from typing import Any, TYPE_CHECKING
|
|
|
|
from fusionagi.agents.base_agent import BaseAgent
|
|
from fusionagi.adapters.base import LLMAdapter
|
|
from fusionagi.schemas.messages import AgentMessage, AgentMessageEnvelope
|
|
from fusionagi.reasoning import run_chain_of_thought
|
|
from fusionagi._logger import logger
|
|
|
|
if TYPE_CHECKING:
|
|
from fusionagi.memory.working import WorkingMemory
|
|
from fusionagi.memory.episodic import EpisodicMemory
|
|
|
|
|
|
class ReasonerAgent(BaseAgent):
    """
    Reasoner agent: runs Chain-of-Thought reasoning and returns recommendations.

    Features:
    - LLM-powered reasoning via CoT
    - WorkingMemory integration for context enrichment
    - EpisodicMemory integration for trace recording
    - Confidence scoring
    """

    def __init__(
        self,
        identity: str = "reasoner",
        adapter: LLMAdapter | None = None,
        working_memory: WorkingMemory | None = None,
        episodic_memory: EpisodicMemory | None = None,
    ) -> None:
        """
        Initialize the Reasoner agent.

        Args:
            identity: Agent identifier.
            adapter: LLM adapter for reasoning.
            working_memory: Working memory for context retrieval.
            episodic_memory: Episodic memory for trace recording.
        """
        super().__init__(
            identity=identity,
            role="Reasoner",
            objective="Reason over steps and recommend next actions",
            memory_access=True,
            tool_permissions=[],
        )
        self._adapter = adapter
        self._working_memory = working_memory
        self._episodic_memory = episodic_memory

    def handle_message(self, envelope: AgentMessageEnvelope) -> AgentMessageEnvelope | None:
        """On reason_request, run CoT and return recommendation_ready.

        Returns:
            A recommendation_ready envelope, or None for any other intent
            so unrelated messages are ignored.
        """
        if envelope.message.intent != "reason_request":
            return None

        logger.info(
            "Reasoner handle_message",
            extra={"recipient": self.identity, "intent": envelope.message.intent},
        )

        payload = envelope.message.payload
        task_id = envelope.task_id or ""
        step_id = payload.get("step_id")
        subgoal = payload.get("subgoal", "")
        context = payload.get("context", "")

        # Enrich context with working memory if available
        enriched_context = self._enrich_context(task_id, context)

        query = subgoal or f"Consider step: {step_id}. What should we do next?"

        # Without an adapter we cannot reason; fall back to a static reply.
        if not self._adapter:
            return self._respond_without_llm(envelope, step_id)

        # Run chain-of-thought reasoning
        response, trace = run_chain_of_thought(
            self._adapter,
            query,
            context=enriched_context or None,
        )

        # Calculate confidence based on trace quality
        confidence = self._calculate_confidence(trace)

        # Persist the reasoning outcome to both memory systems (no-op
        # when the corresponding memory or task_id is missing).
        self._record_reasoning(task_id, step_id, query, response, trace, confidence)

        logger.info(
            "Reasoner response",
            extra={
                "recipient": self.identity,
                "response_intent": "recommendation_ready",
                "confidence": confidence,
            },
        )

        return AgentMessageEnvelope(
            message=AgentMessage(
                sender=self.identity,
                recipient=envelope.message.sender,
                intent="recommendation_ready",
                payload={
                    "step_id": step_id,
                    "recommendation": response,
                    "trace": trace,
                    "confidence": confidence,
                },
                confidence=confidence,
            ),
            task_id=task_id,
            correlation_id=envelope.correlation_id,
        )

    def _record_reasoning(
        self,
        task_id: str,
        step_id: Any,
        query: str,
        response: str | None,
        trace: list[dict[str, Any]],
        confidence: float,
    ) -> None:
        """Persist a reasoning result to working and episodic memory.

        Each write is skipped when the corresponding memory is absent or
        task_id is empty.
        """
        if self._working_memory and task_id:
            self._working_memory.append(
                task_id,
                "reasoning_history",
                {
                    "step_id": step_id,
                    "query": query,
                    # Truncate to keep working-memory entries small.
                    "response": response[:500] if response else "",
                    "confidence": confidence,
                },
            )

        if self._episodic_memory and task_id:
            self._episodic_memory.append(
                task_id=task_id,
                event={
                    "type": "reasoning",
                    "step_id": step_id,
                    "query": query,
                    "response_length": len(response) if response else 0,
                    "trace_length": len(trace),
                    "confidence": confidence,
                },
                event_type="reasoning_complete",
            )

    def _enrich_context(self, task_id: str, base_context: str) -> str:
        """Enrich context with working memory data.

        Appends a truncated working-memory context summary and up to the
        three most recent reasoning-history entries to base_context.
        Returns base_context unchanged when no memory data is available.
        """
        if not self._working_memory or not task_id:
            return base_context

        # Get context summary from working memory
        context_summary = self._working_memory.get_context_summary(task_id, max_items=5)

        if not context_summary:
            return base_context

        # Get recent reasoning history
        reasoning_history = self._working_memory.get_list(task_id, "reasoning_history")
        recent_reasoning = reasoning_history[-3:] if reasoning_history else []

        enriched_parts = [base_context] if base_context else []

        if context_summary:
            # default=str keeps non-JSON-serializable values from raising;
            # cap the summary at 500 chars to bound prompt growth.
            enriched_parts.append(f"\nWorking memory context: {json.dumps(context_summary, default=str)[:500]}")

        if recent_reasoning:
            recent_summaries = [
                f"- Step {r.get('step_id', '?')}: {r.get('response', '')[:100]}"
                for r in recent_reasoning
            ]
            # Fixed: was a placeholder-free f-string.
            enriched_parts.append("\nRecent reasoning:\n" + "\n".join(recent_summaries))

        return "\n".join(enriched_parts)

    def _calculate_confidence(self, trace: list[dict[str, Any]]) -> float:
        """Calculate confidence score based on reasoning trace.

        Heuristic: more reasoning steps = more thorough = higher
        confidence, with diminishing returns past four steps. An empty
        trace yields the neutral default of 0.5.
        """
        if not trace:
            return 0.5  # Default confidence without trace

        # NOTE: the guard above makes len(trace) >= 1 here, so the old
        # unreachable step_count == 0 branch (0.3) has been removed.
        step_count = len(trace)

        if step_count == 1:
            return 0.5
        if step_count == 2:
            return 0.7
        if step_count <= 4:
            return 0.8
        return 0.9

    def _respond_without_llm(
        self,
        envelope: AgentMessageEnvelope,
        step_id: str | None,
    ) -> AgentMessageEnvelope:
        """Generate response when no LLM is available.

        Returns a recommendation_ready envelope with a static
        recommendation, an empty trace, and neutral 0.5 confidence.
        """
        logger.info(
            "Reasoner response (no adapter)",
            extra={"recipient": self.identity, "response_intent": "recommendation_ready"},
        )
        return AgentMessageEnvelope(
            message=AgentMessage(
                sender=self.identity,
                recipient=envelope.message.sender,
                intent="recommendation_ready",
                payload={
                    "step_id": step_id,
                    "recommendation": "Proceed with execution (no LLM available for reasoning).",
                    "trace": [],
                    "confidence": 0.5,
                },
                confidence=0.5,
            ),
            task_id=envelope.task_id,
            correlation_id=envelope.correlation_id,
        )