"""Reasoner agent: reasons over step/subgoal + context; outputs recommendation via CoT. The Reasoner agent: - Processes reason_request messages - Uses Chain-of-Thought or Tree-of-Thought reasoning - Integrates with WorkingMemory for context - Records reasoning traces to EpisodicMemory """ from __future__ import annotations import json from typing import Any, TYPE_CHECKING from fusionagi.agents.base_agent import BaseAgent from fusionagi.adapters.base import LLMAdapter from fusionagi.schemas.messages import AgentMessage, AgentMessageEnvelope from fusionagi.reasoning import run_chain_of_thought from fusionagi._logger import logger if TYPE_CHECKING: from fusionagi.memory.working import WorkingMemory from fusionagi.memory.episodic import EpisodicMemory class ReasonerAgent(BaseAgent): """ Reasoner agent: runs Chain-of-Thought reasoning and returns recommendations. Features: - LLM-powered reasoning via CoT - WorkingMemory integration for context enrichment - EpisodicMemory integration for trace recording - Confidence scoring """ def __init__( self, identity: str = "reasoner", adapter: LLMAdapter | None = None, working_memory: WorkingMemory | None = None, episodic_memory: EpisodicMemory | None = None, ) -> None: """ Initialize the Reasoner agent. Args: identity: Agent identifier. adapter: LLM adapter for reasoning. working_memory: Working memory for context retrieval. episodic_memory: Episodic memory for trace recording. """ super().__init__( identity=identity, role="Reasoner", objective="Reason over steps and recommend next actions", memory_access=True, tool_permissions=[], ) self._adapter = adapter self._working_memory = working_memory self._episodic_memory = episodic_memory def handle_message(self, envelope: AgentMessageEnvelope) -> AgentMessageEnvelope | None: """On reason_request, run CoT and return recommendation_ready.""" if envelope.message.intent != "reason_request": return None logger.info( "Reasoner handle_message", extra={"recipient": self.identity, "intent": envelope.message.intent}, ) payload = envelope.message.payload task_id = envelope.task_id or "" step_id = payload.get("step_id") subgoal = payload.get("subgoal", "") context = payload.get("context", "") # Enrich context with working memory if available enriched_context = self._enrich_context(task_id, context) query = subgoal or f"Consider step: {step_id}. What should we do next?" if not self._adapter: return self._respond_without_llm(envelope, step_id) # Run chain-of-thought reasoning response, trace = run_chain_of_thought( self._adapter, query, context=enriched_context or None, ) # Calculate confidence based on trace quality confidence = self._calculate_confidence(trace) # Store reasoning in working memory if self._working_memory and task_id: self._working_memory.append( task_id, "reasoning_history", { "step_id": step_id, "query": query, "response": response[:500] if response else "", "confidence": confidence, }, ) # Record to episodic memory if self._episodic_memory and task_id: self._episodic_memory.append( task_id=task_id, event={ "type": "reasoning", "step_id": step_id, "query": query, "response_length": len(response) if response else 0, "trace_length": len(trace), "confidence": confidence, }, event_type="reasoning_complete", ) logger.info( "Reasoner response", extra={ "recipient": self.identity, "response_intent": "recommendation_ready", "confidence": confidence, }, ) return AgentMessageEnvelope( message=AgentMessage( sender=self.identity, recipient=envelope.message.sender, intent="recommendation_ready", payload={ "step_id": step_id, "recommendation": response, "trace": trace, "confidence": confidence, }, confidence=confidence, ), task_id=task_id, correlation_id=envelope.correlation_id, ) def _enrich_context(self, task_id: str, base_context: str) -> str: """Enrich context with working memory data.""" if not self._working_memory or not task_id: return base_context # Get context summary from working memory context_summary = self._working_memory.get_context_summary(task_id, max_items=5) if not context_summary: return base_context # Get recent reasoning history reasoning_history = self._working_memory.get_list(task_id, "reasoning_history") recent_reasoning = reasoning_history[-3:] if reasoning_history else [] enriched_parts = [base_context] if base_context else [] if context_summary: enriched_parts.append(f"\nWorking memory context: {json.dumps(context_summary, default=str)[:500]}") if recent_reasoning: recent_summaries = [ f"- Step {r.get('step_id', '?')}: {r.get('response', '')[:100]}" for r in recent_reasoning ] enriched_parts.append(f"\nRecent reasoning:\n" + "\n".join(recent_summaries)) return "\n".join(enriched_parts) def _calculate_confidence(self, trace: list[dict[str, Any]]) -> float: """Calculate confidence score based on reasoning trace.""" if not trace: return 0.5 # Default confidence without trace # Simple heuristic: more reasoning steps = more thorough = higher confidence # But diminishing returns after a point step_count = len(trace) if step_count == 0: return 0.3 elif step_count == 1: return 0.5 elif step_count == 2: return 0.7 elif step_count <= 4: return 0.8 else: return 0.9 def _respond_without_llm( self, envelope: AgentMessageEnvelope, step_id: str | None, ) -> AgentMessageEnvelope: """Generate response when no LLM is available.""" logger.info( "Reasoner response (no adapter)", extra={"recipient": self.identity, "response_intent": "recommendation_ready"}, ) return AgentMessageEnvelope( message=AgentMessage( sender=self.identity, recipient=envelope.message.sender, intent="recommendation_ready", payload={ "step_id": step_id, "recommendation": "Proceed with execution (no LLM available for reasoning).", "trace": [], "confidence": 0.5, }, confidence=0.5, ), task_id=envelope.task_id, correlation_id=envelope.correlation_id, )