113 lines
4.5 KiB
Python
113 lines
4.5 KiB
Python
"""Planner agent: decomposes goals into plan graph; uses LLM adapter when provided."""
|
|
|
|
import json
|
|
import re
|
|
from typing import Any
|
|
|
|
from fusionagi.agents.base_agent import BaseAgent
|
|
from fusionagi.adapters.base import LLMAdapter
|
|
from fusionagi.schemas.messages import AgentMessage, AgentMessageEnvelope
|
|
from fusionagi._logger import logger
|
|
|
|
# System prompt sent to the LLM adapter with every planning request. It pins the
# JSON shape (a "steps" list plus "fallback_paths") that the planner reads back
# from the adapter's reply, so keep it in sync with the response parsing code.
PLAN_REQUEST_SYSTEM = """You are a planner. Given a goal and optional constraints, output a JSON object with this exact structure:
{"steps": [{"id": "step_1", "description": "...", "dependencies": []}, ...], "fallback_paths": []}
Each step has: id (string), description (string), dependencies (list of step ids that must complete first).
Output only valid JSON, no markdown or extra text."""
|
|
|
|
|
|
class PlannerAgent(BaseAgent):
    """Planner: responds to plan_request with a plan; uses adapter if set, else fixed plan.

    The agent only reacts to messages whose intent is ``"plan_request"``. It reads
    ``goal`` and ``constraints`` from the message payload, asks the LLM adapter
    (when one was supplied) for a JSON plan, and replies with a ``"plan_ready"``
    message. Whenever the adapter is missing, the goal is empty, the adapter
    raises, or its reply contains no usable ``steps`` list, a copy of
    ``DEFAULT_PLAN`` is returned instead.
    """

    # Fallback plan. Never hand this object out directly: it is shared by every
    # instance, so callers always receive a deep copy (see _default_plan).
    DEFAULT_PLAN: dict[str, Any] = {
        "steps": [
            {"id": "step_1", "description": "Analyze goal", "dependencies": []},
            {"id": "step_2", "description": "Execute primary action", "dependencies": ["step_1"]},
            {"id": "step_3", "description": "Verify result", "dependencies": ["step_2"]},
        ],
        "fallback_paths": [],
    }

    def __init__(
        self,
        identity: str = "planner",
        adapter: LLMAdapter | None = None,
    ) -> None:
        """Create a planner.

        Args:
            identity: Agent identity forwarded to ``BaseAgent``.
            adapter: Optional LLM adapter; when ``None`` the default plan is
                always used.
        """
        super().__init__(
            identity=identity,
            role="Planner",
            objective="Decompose goals into executable steps",
            memory_access=True,
            tool_permissions=[],
        )
        self._adapter = adapter

    def handle_message(self, envelope: AgentMessageEnvelope) -> AgentMessageEnvelope | None:
        """On plan_request, return plan_ready with plan from adapter or default.

        Args:
            envelope: Incoming message envelope.

        Returns:
            A ``plan_ready`` envelope addressed to the original sender and
            carrying the same ``task_id`` / ``correlation_id``, or ``None`` when
            the intent is not ``"plan_request"``.
        """
        message = envelope.message
        if message.intent != "plan_request":
            return None
        logger.info(
            "Planner handle_message",
            extra={"recipient": self.identity, "intent": message.intent},
        )
        # Tolerate a missing/None payload instead of failing on `.get`.
        payload = message.payload or {}
        goal = payload.get("goal", "")
        constraints = payload.get("constraints", [])
        plan_dict = self._get_plan(goal, constraints)
        logger.info(
            "Planner response",
            extra={"recipient": self.identity, "response_intent": "plan_ready"},
        )
        return AgentMessageEnvelope(
            message=AgentMessage(
                sender=self.identity,
                recipient=message.sender,
                intent="plan_ready",
                payload={"plan": plan_dict},
            ),
            task_id=envelope.task_id,
            correlation_id=envelope.correlation_id,
        )

    def _default_plan(self) -> dict[str, Any]:
        """Return a fresh deep copy of ``DEFAULT_PLAN``.

        The plan is embedded in outgoing message payloads; returning the shared
        class-level dict would let any consumer that mutates its plan corrupt
        the default for all later requests.
        """
        # Function-scope import keeps this block self-contained.
        import copy

        return copy.deepcopy(self.DEFAULT_PLAN)

    @staticmethod
    def _format_constraints(constraints: Any) -> str:
        """Render payload constraints as one comma-separated string.

        Accepts the expected list of strings but also survives what an
        unvalidated payload may contain: ``None``/empty (-> ``""``), a single
        string (used as-is rather than joined character by character), or
        non-string items (converted with ``str``).
        """
        if not constraints:
            return ""
        if isinstance(constraints, str):
            return constraints
        if isinstance(constraints, (list, tuple, set, frozenset)):
            return ", ".join(str(item) for item in constraints)
        return str(constraints)

    def _get_plan(self, goal: str, constraints: list[str]) -> dict[str, Any]:
        """Produce plan dict: use adapter if available and parsing succeeds, else default.

        Args:
            goal: Goal text from the request payload; empty means "use default".
            constraints: Constraints from the request payload.

        Returns:
            The adapter's plan when it is a JSON object with a non-empty
            ``steps`` list, otherwise a copy of ``DEFAULT_PLAN``.
        """
        if self._adapter is None or not goal:
            return self._default_plan()

        user_content = f"Goal: {goal}\n"
        constraint_text = self._format_constraints(constraints)
        if constraint_text:
            user_content += f"Constraints: {constraint_text}\n"
        user_content += "Output the plan as JSON only."
        messages = [
            {"role": "system", "content": PLAN_REQUEST_SYSTEM},
            {"role": "user", "content": user_content},
        ]

        # Deliberately broad: any adapter failure must degrade to the default
        # plan rather than break message handling. The failure is logged.
        try:
            raw = self._adapter.complete(messages)
            plan_dict = self._parse_plan_response(raw)
        except Exception:
            logger.exception(
                "Planner adapter or parse failed, using default plan",
                extra={"intent": "plan_request"},
            )
            return self._default_plan()

        if plan_dict is not None:
            steps = plan_dict.get("steps")
            if isinstance(steps, list) and steps:
                return plan_dict

        logger.warning(
            "Planner adapter response had no usable steps, using default plan",
            extra={"intent": "plan_request"},
        )
        return self._default_plan()

    def _parse_plan_response(self, raw: str) -> dict[str, Any] | None:
        """Extract JSON plan from raw response (handle code blocks).

        Scans for the first ``{`` from which a complete JSON object can be
        decoded. This skips Markdown fences and leading prose, and ignores any
        text that follows the object, including text containing braces.

        Args:
            raw: Raw adapter output.

        Returns:
            The first decodable JSON object, or ``None`` when ``raw`` is not a
            string or contains no valid JSON object.
        """
        if not isinstance(raw, str):
            return None

        text = raw.strip()
        decoder = json.JSONDecoder()
        last_error: json.JSONDecodeError | None = None

        start = text.find("{")
        while start != -1:
            try:
                # raw_decode parses one document and tolerates trailing data.
                candidate, _end = decoder.raw_decode(text[start:])
            except json.JSONDecodeError as exc:
                last_error = exc
            else:
                if isinstance(candidate, dict):
                    return candidate
            start = text.find("{", start + 1)

        logger.debug(
            "Planner JSON parse failed",
            extra={"error": str(last_error) if last_error else "no JSON object found"},
        )
        return None
|