"""Planner agent: decomposes goals into plan graph; uses LLM adapter when provided.""" import json import re from typing import Any from fusionagi.agents.base_agent import BaseAgent from fusionagi.adapters.base import LLMAdapter from fusionagi.schemas.messages import AgentMessage, AgentMessageEnvelope from fusionagi._logger import logger PLAN_REQUEST_SYSTEM = """You are a planner. Given a goal and optional constraints, output a JSON object with this exact structure: {"steps": [{"id": "step_1", "description": "...", "dependencies": []}, ...], "fallback_paths": []} Each step has: id (string), description (string), dependencies (list of step ids that must complete first). Output only valid JSON, no markdown or extra text.""" class PlannerAgent(BaseAgent): """Planner: responds to plan_request with a plan; uses adapter if set, else fixed plan.""" DEFAULT_PLAN = { "steps": [ {"id": "step_1", "description": "Analyze goal", "dependencies": []}, {"id": "step_2", "description": "Execute primary action", "dependencies": ["step_1"]}, {"id": "step_3", "description": "Verify result", "dependencies": ["step_2"]}, ], "fallback_paths": [], } def __init__( self, identity: str = "planner", adapter: LLMAdapter | None = None, ) -> None: super().__init__( identity=identity, role="Planner", objective="Decompose goals into executable steps", memory_access=True, tool_permissions=[], ) self._adapter = adapter def handle_message(self, envelope: AgentMessageEnvelope) -> AgentMessageEnvelope | None: """On plan_request, return plan_ready with plan from adapter or default.""" if envelope.message.intent != "plan_request": return None logger.info( "Planner handle_message", extra={"recipient": self.identity, "intent": envelope.message.intent}, ) goal = envelope.message.payload.get("goal", "") constraints = envelope.message.payload.get("constraints", []) plan_dict = self._get_plan(goal, constraints) logger.info( "Planner response", extra={"recipient": self.identity, "response_intent": "plan_ready"}, ) return AgentMessageEnvelope( message=AgentMessage( sender=self.identity, recipient=envelope.message.sender, intent="plan_ready", payload={"plan": plan_dict}, ), task_id=envelope.task_id, correlation_id=envelope.correlation_id, ) def _get_plan(self, goal: str, constraints: list[str]) -> dict[str, Any]: """Produce plan dict: use adapter if available and parsing succeeds, else default.""" if not self._adapter or not goal: return self.DEFAULT_PLAN user_content = f"Goal: {goal}\n" if constraints: user_content += "Constraints: " + ", ".join(constraints) + "\n" user_content += "Output the plan as JSON only." messages = [ {"role": "system", "content": PLAN_REQUEST_SYSTEM}, {"role": "user", "content": user_content}, ] try: raw = self._adapter.complete(messages) plan_dict = self._parse_plan_response(raw) if plan_dict and plan_dict.get("steps"): return plan_dict except Exception: logger.exception( "Planner adapter or parse failed, using default plan", extra={"intent": "plan_request"}, ) return self.DEFAULT_PLAN def _parse_plan_response(self, raw: str) -> dict[str, Any] | None: """Extract JSON plan from raw response (handle code blocks).""" raw = raw.strip() for start in ("```json", "```"): if raw.startswith(start): raw = raw[len(start) :].strip() if raw.endswith("```"): raw = raw[:-3].strip() match = re.search(r"\{[\s\S]*\}", raw) if match: try: return json.loads(match.group()) except json.JSONDecodeError as e: logger.debug("Planner JSON parse failed (match)", extra={"error": str(e)}) try: return json.loads(raw) except json.JSONDecodeError as e: logger.debug("Planner JSON parse failed (raw)", extra={"error": str(e)}) return None