Files
FusionAGI/fusionagi/agents/reasoner.py
defiQUG c052b07662
Some checks failed
Tests / test (3.10) (push) Has been cancelled
Tests / test (3.11) (push) Has been cancelled
Tests / test (3.12) (push) Has been cancelled
Tests / lint (push) Has been cancelled
Tests / docker (push) Has been cancelled
Initial commit: add .gitignore and README
2026-02-09 21:51:42 -08:00

227 lines
7.8 KiB
Python

"""Reasoner agent: reasons over step/subgoal + context; outputs recommendation via CoT.
The Reasoner agent:
- Processes reason_request messages
- Uses Chain-of-Thought or Tree-of-Thought reasoning
- Integrates with WorkingMemory for context
- Records reasoning traces to EpisodicMemory
"""
from __future__ import annotations
import json
from typing import Any, TYPE_CHECKING
from fusionagi.agents.base_agent import BaseAgent
from fusionagi.adapters.base import LLMAdapter
from fusionagi.schemas.messages import AgentMessage, AgentMessageEnvelope
from fusionagi.reasoning import run_chain_of_thought
from fusionagi._logger import logger
if TYPE_CHECKING:
from fusionagi.memory.working import WorkingMemory
from fusionagi.memory.episodic import EpisodicMemory
class ReasonerAgent(BaseAgent):
"""
Reasoner agent: runs Chain-of-Thought reasoning and returns recommendations.
Features:
- LLM-powered reasoning via CoT
- WorkingMemory integration for context enrichment
- EpisodicMemory integration for trace recording
- Confidence scoring
"""
def __init__(
    self,
    identity: str = "reasoner",
    adapter: LLMAdapter | None = None,
    working_memory: WorkingMemory | None = None,
    episodic_memory: EpisodicMemory | None = None,
) -> None:
    """Set up the Reasoner agent and wire in its optional collaborators.

    Args:
        identity: Agent identifier.
        adapter: LLM adapter used for chain-of-thought reasoning; when
            ``None`` the agent falls back to a canned recommendation.
        working_memory: Optional working memory used for context enrichment.
        episodic_memory: Optional episodic memory used for trace recording.
    """
    super().__init__(
        identity=identity,
        role="Reasoner",
        objective="Reason over steps and recommend next actions",
        memory_access=True,
        tool_permissions=[],
    )
    # Collaborators are all optional; each code path checks before use.
    self._episodic_memory = episodic_memory
    self._working_memory = working_memory
    self._adapter = adapter
def handle_message(self, envelope: AgentMessageEnvelope) -> AgentMessageEnvelope | None:
    """On reason_request, run CoT and return recommendation_ready."""
    msg = envelope.message
    # This agent only reacts to reasoning requests; everything else is ignored.
    if msg.intent != "reason_request":
        return None
    logger.info(
        "Reasoner handle_message",
        extra={"recipient": self.identity, "intent": msg.intent},
    )
    data = msg.payload
    task_id = envelope.task_id or ""
    step_id = data.get("step_id")
    subgoal = data.get("subgoal", "")
    # Fold working-memory context into whatever context the caller supplied.
    full_context = self._enrich_context(task_id, data.get("context", ""))
    question = subgoal if subgoal else f"Consider step: {step_id}. What should we do next?"
    # Without an adapter we cannot reason; fall back to a canned reply.
    if not self._adapter:
        return self._respond_without_llm(envelope, step_id)
    # Run chain-of-thought reasoning over the (possibly enriched) context.
    answer, trace = run_chain_of_thought(
        self._adapter,
        question,
        context=full_context if full_context else None,
    )
    # Heuristic confidence derived from trace depth.
    score = self._calculate_confidence(trace)
    # Persist a truncated record of this reasoning round for later enrichment.
    if self._working_memory and task_id:
        history_entry = {
            "step_id": step_id,
            "query": question,
            "response": answer[:500] if answer else "",
            "confidence": score,
        }
        self._working_memory.append(task_id, "reasoning_history", history_entry)
    # Record a summary event to episodic memory (lengths only, not full text).
    if self._episodic_memory and task_id:
        self._episodic_memory.append(
            task_id=task_id,
            event={
                "type": "reasoning",
                "step_id": step_id,
                "query": question,
                "response_length": len(answer) if answer else 0,
                "trace_length": len(trace),
                "confidence": score,
            },
            event_type="reasoning_complete",
        )
    logger.info(
        "Reasoner response",
        extra={
            "recipient": self.identity,
            "response_intent": "recommendation_ready",
            "confidence": score,
        },
    )
    reply = AgentMessage(
        sender=self.identity,
        recipient=msg.sender,
        intent="recommendation_ready",
        payload={
            "step_id": step_id,
            "recommendation": answer,
            "trace": trace,
            "confidence": score,
        },
        confidence=score,
    )
    return AgentMessageEnvelope(
        message=reply,
        task_id=task_id,
        correlation_id=envelope.correlation_id,
    )
def _enrich_context(self, task_id: str, base_context: str) -> str:
    """Enrich the caller-supplied context with working-memory data.

    Args:
        task_id: Task identifier used as the working-memory key.
        base_context: Context string supplied by the caller (may be empty).

    Returns:
        ``base_context`` augmented with a truncated working-memory summary
        and up to three recent reasoning entries, or ``base_context``
        unchanged when no working memory is configured, ``task_id`` is
        empty, or the memory holds no summary for this task.
    """
    if not self._working_memory or not task_id:
        return base_context
    context_summary = self._working_memory.get_context_summary(task_id, max_items=5)
    if not context_summary:
        return base_context
    reasoning_history = self._working_memory.get_list(task_id, "reasoning_history")
    recent_reasoning = reasoning_history[-3:] if reasoning_history else []
    enriched_parts = [base_context] if base_context else []
    # context_summary is guaranteed truthy here (early return above), so the
    # previous redundant `if context_summary:` guard has been removed.
    # Summary is truncated to 500 chars to keep prompt growth bounded.
    enriched_parts.append(
        f"\nWorking memory context: {json.dumps(context_summary, default=str)[:500]}"
    )
    if recent_reasoning:
        recent_summaries = [
            f"- Step {r.get('step_id', '?')}: {r.get('response', '')[:100]}"
            for r in recent_reasoning
        ]
        # Plain literal: the old code used an f-string with no placeholders.
        enriched_parts.append("\nRecent reasoning:\n" + "\n".join(recent_summaries))
    return "\n".join(enriched_parts)
def _calculate_confidence(self, trace: list[dict[str, Any]]) -> float:
    """Calculate a confidence score from the reasoning trace.

    Heuristic: more reasoning steps suggest more thorough reasoning, with
    diminishing returns past four steps.

    Args:
        trace: Reasoning-step records produced by chain-of-thought.

    Returns:
        A confidence value in [0.5, 0.9]; 0.5 when the trace is empty.
    """
    # NOTE: the previous implementation had an unreachable branch — it
    # returned 0.5 for an empty trace up front, then mapped step_count == 0
    # to 0.3 in dead code (contradicting the early return). The dead branch
    # is removed; behavior for every reachable input is unchanged.
    step_count = len(trace)
    if step_count <= 1:
        return 0.5
    if step_count == 2:
        return 0.7
    if step_count <= 4:
        return 0.8
    return 0.9
def _respond_without_llm(
    self,
    envelope: AgentMessageEnvelope,
    step_id: str | None,
) -> AgentMessageEnvelope:
    """Build a canned recommendation_ready reply when no LLM adapter is configured."""
    logger.info(
        "Reasoner response (no adapter)",
        extra={"recipient": self.identity, "response_intent": "recommendation_ready"},
    )
    # Fixed fallback: middling confidence, empty trace, advise proceeding.
    fallback_payload = {
        "step_id": step_id,
        "recommendation": "Proceed with execution (no LLM available for reasoning).",
        "trace": [],
        "confidence": 0.5,
    }
    fallback_message = AgentMessage(
        sender=self.identity,
        recipient=envelope.message.sender,
        intent="recommendation_ready",
        payload=fallback_payload,
        confidence=0.5,
    )
    return AgentMessageEnvelope(
        message=fallback_message,
        task_id=envelope.task_id,
        correlation_id=envelope.correlation_id,
    )