Files
FusionAGI/fusionagi/agents/planner.py
defiQUG c052b07662
Some checks failed
Tests / test (3.10) (push) Has been cancelled
Tests / test (3.11) (push) Has been cancelled
Tests / test (3.12) (push) Has been cancelled
Tests / lint (push) Has been cancelled
Tests / docker (push) Has been cancelled
Initial commit: add .gitignore and README
2026-02-09 21:51:42 -08:00

113 lines
4.5 KiB
Python

"""Planner agent: decomposes goals into plan graph; uses LLM adapter when provided."""
import json
import re
from typing import Any
from fusionagi.agents.base_agent import BaseAgent
from fusionagi.adapters.base import LLMAdapter
from fusionagi.schemas.messages import AgentMessage, AgentMessageEnvelope
from fusionagi._logger import logger
PLAN_REQUEST_SYSTEM = """You are a planner. Given a goal and optional constraints, output a JSON object with this exact structure:
{"steps": [{"id": "step_1", "description": "...", "dependencies": []}, ...], "fallback_paths": []}
Each step has: id (string), description (string), dependencies (list of step ids that must complete first).
Output only valid JSON, no markdown or extra text."""
class PlannerAgent(BaseAgent):
"""Planner: responds to plan_request with a plan; uses adapter if set, else fixed plan."""
DEFAULT_PLAN = {
"steps": [
{"id": "step_1", "description": "Analyze goal", "dependencies": []},
{"id": "step_2", "description": "Execute primary action", "dependencies": ["step_1"]},
{"id": "step_3", "description": "Verify result", "dependencies": ["step_2"]},
],
"fallback_paths": [],
}
def __init__(
self,
identity: str = "planner",
adapter: LLMAdapter | None = None,
) -> None:
super().__init__(
identity=identity,
role="Planner",
objective="Decompose goals into executable steps",
memory_access=True,
tool_permissions=[],
)
self._adapter = adapter
def handle_message(self, envelope: AgentMessageEnvelope) -> AgentMessageEnvelope | None:
"""On plan_request, return plan_ready with plan from adapter or default."""
if envelope.message.intent != "plan_request":
return None
logger.info(
"Planner handle_message",
extra={"recipient": self.identity, "intent": envelope.message.intent},
)
goal = envelope.message.payload.get("goal", "")
constraints = envelope.message.payload.get("constraints", [])
plan_dict = self._get_plan(goal, constraints)
logger.info(
"Planner response",
extra={"recipient": self.identity, "response_intent": "plan_ready"},
)
return AgentMessageEnvelope(
message=AgentMessage(
sender=self.identity,
recipient=envelope.message.sender,
intent="plan_ready",
payload={"plan": plan_dict},
),
task_id=envelope.task_id,
correlation_id=envelope.correlation_id,
)
def _get_plan(self, goal: str, constraints: list[str]) -> dict[str, Any]:
"""Produce plan dict: use adapter if available and parsing succeeds, else default."""
if not self._adapter or not goal:
return self.DEFAULT_PLAN
user_content = f"Goal: {goal}\n"
if constraints:
user_content += "Constraints: " + ", ".join(constraints) + "\n"
user_content += "Output the plan as JSON only."
messages = [
{"role": "system", "content": PLAN_REQUEST_SYSTEM},
{"role": "user", "content": user_content},
]
try:
raw = self._adapter.complete(messages)
plan_dict = self._parse_plan_response(raw)
if plan_dict and plan_dict.get("steps"):
return plan_dict
except Exception:
logger.exception(
"Planner adapter or parse failed, using default plan",
extra={"intent": "plan_request"},
)
return self.DEFAULT_PLAN
def _parse_plan_response(self, raw: str) -> dict[str, Any] | None:
"""Extract JSON plan from raw response (handle code blocks)."""
raw = raw.strip()
for start in ("```json", "```"):
if raw.startswith(start):
raw = raw[len(start) :].strip()
if raw.endswith("```"):
raw = raw[:-3].strip()
match = re.search(r"\{[\s\S]*\}", raw)
if match:
try:
return json.loads(match.group())
except json.JSONDecodeError as e:
logger.debug("Planner JSON parse failed (match)", extra={"error": str(e)})
try:
return json.loads(raw)
except json.JSONDecodeError as e:
logger.debug("Planner JSON parse failed (raw)", extra={"error": str(e)})
return None