"""Phase 2/3: end-to-end flow with stub adapter, tools, executor, critic, reflection, governance.""" from fusionagi.core import EventBus, StateManager, Orchestrator from fusionagi.agents import PlannerAgent, ReasonerAgent, ExecutorAgent, CriticAgent from fusionagi.adapters import StubAdapter from fusionagi.tools import ToolRegistry, ToolDef from fusionagi.memory import WorkingMemory, EpisodicMemory, ReflectiveMemory from fusionagi.reflection import run_reflection from fusionagi.governance import Guardrails, RateLimiter, OverrideHooks, AccessControl, PolicyEngine from fusionagi.schemas import TaskState, AgentMessage, AgentMessageEnvelope from fusionagi.schemas.policy import PolicyRule, PolicyEffect def test_planner_with_stub_adapter() -> None: adapter = StubAdapter('{"steps":[{"id":"s1","description":"Step 1","dependencies":[]}],"fallback_paths":[]}') planner = PlannerAgent(adapter=adapter) env = AgentMessageEnvelope( message=AgentMessage(sender="o", recipient="planner", intent="plan_request", payload={"goal": "Test"}), task_id="t1", ) out = planner.handle_message(env) assert out is not None assert out.message.intent == "plan_ready" steps = out.message.payload["plan"]["steps"] assert len(steps) == 1 assert steps[0]["id"] == "s1" def test_executor_runs_tool_and_appends_trace() -> None: state = StateManager() reg = ToolRegistry() reg.register(ToolDef(name="noop", description="No-op", fn=lambda: "ok", permission_scope=["*"])) executor = ExecutorAgent(registry=reg, state_manager=state) env = AgentMessageEnvelope( message=AgentMessage( sender="o", recipient="executor", intent="execute_step", payload={ "step_id": "s1", "plan": {"steps": [{"id": "s1", "description": "No-op", "dependencies": [], "tool_name": "noop", "tool_args": {}}], "fallback_paths": []}, "tool_name": "noop", "tool_args": {}, }, ), task_id="task-1", ) out = executor.handle_message(env) assert out is not None assert out.message.intent == "step_done" trace = state.get_trace("task-1") assert len(trace) == 1 assert trace[0].get("tool") == "noop" assert trace[0].get("result") == "ok" def test_critic_returns_evaluation() -> None: critic = CriticAgent(adapter=None) env = AgentMessageEnvelope( message=AgentMessage( sender="o", recipient="critic", intent="evaluate_request", payload={"outcome": "completed", "trace": [], "plan": None}, ), task_id="t1", ) out = critic.handle_message(env) assert out is not None assert out.message.intent == "evaluation_ready" ev = out.message.payload["evaluation"] assert "score" in ev assert ev["success"] is True def test_reflection_writes_to_reflective_memory() -> None: critic = CriticAgent(adapter=None) reflective = ReflectiveMemory() ev = run_reflection(critic, "t1", "completed", [], None, reflective) assert ev is not None lessons = reflective.get_lessons(limit=5) assert len(lessons) == 1 assert lessons[0]["task_id"] == "t1" def test_guardrails_block_path() -> None: g = Guardrails() g.block_path_prefix("/etc") result = g.pre_check("file_read", {"path": "/etc/passwd"}) assert result.allowed is False assert result.error_message result = g.pre_check("file_read", {"path": "/tmp/foo"}) assert result.allowed is True def test_rate_limiter() -> None: # Rate limiter is not yet wired to executor/orchestrator; tested in isolation here. r = RateLimiter(max_calls=2, window_seconds=10.0) assert r.allow("agent1")[0] is True assert r.allow("agent1")[0] is True assert r.allow("agent1")[0] is False def test_override_hooks() -> None: h = OverrideHooks() seen = [] h.register(lambda e, p: (seen.append((e, p)), True)[1]) assert h.fire("task_paused_for_approval", {"task_id": "t1"}) is True assert len(seen) == 1 assert seen[0][0] == "task_paused_for_approval" def test_access_control_deny() -> None: ac = AccessControl() ac.deny("executor", "noop") assert ac.allowed("executor", "noop") is False assert ac.allowed("executor", "other_tool") is True assert ac.allowed("planner", "noop") is True def test_policy_engine_update_rule() -> None: pe = PolicyEngine() r = PolicyRule(rule_id="r1", effect=PolicyEffect.DENY, condition={"tool_name": "noop"}, reason="blocked", priority=1) pe.add_rule(r) assert pe.get_rule("r1") is not None assert pe.get_rule("r1").reason == "blocked" assert pe.update_rule("r1", {"reason": "updated"}) is True assert pe.get_rule("r1").reason == "updated" assert pe.update_rule("r1", {"priority": 5}) is True assert pe.get_rule("r1").priority == 5 assert pe.remove_rule("r1") is True assert pe.get_rule("r1") is None assert pe.remove_rule("r1") is False if __name__ == "__main__": test_planner_with_stub_adapter() test_executor_runs_tool_and_appends_trace() test_critic_returns_evaluation() test_reflection_writes_to_reflective_memory() test_guardrails_block_path() test_rate_limiter() test_override_hooks() test_access_control_deny() test_policy_engine_update_rule() print("Phase 2/3 tests OK")