147 lines
5.3 KiB
Python
147 lines
5.3 KiB
Python
"""Phase 2/3: end-to-end flow with stub adapter, tools, executor, critic, reflection, governance."""
|
|
|
|
from fusionagi.core import EventBus, StateManager, Orchestrator
|
|
from fusionagi.agents import PlannerAgent, ReasonerAgent, ExecutorAgent, CriticAgent
|
|
from fusionagi.adapters import StubAdapter
|
|
from fusionagi.tools import ToolRegistry, ToolDef
|
|
from fusionagi.memory import WorkingMemory, EpisodicMemory, ReflectiveMemory
|
|
from fusionagi.reflection import run_reflection
|
|
from fusionagi.governance import Guardrails, RateLimiter, OverrideHooks, AccessControl, PolicyEngine
|
|
from fusionagi.schemas import TaskState, AgentMessage, AgentMessageEnvelope
|
|
from fusionagi.schemas.policy import PolicyRule, PolicyEffect
|
|
|
|
|
|
def test_planner_with_stub_adapter() -> None:
    """Check that PlannerAgent answers a plan_request with a plan_ready message.

    The StubAdapter is seeded with a JSON plan holding a single step ("s1");
    the test asserts that exactly that step appears in the reply payload.
    """
    canned_plan = '{"steps":[{"id":"s1","description":"Step 1","dependencies":[]}],"fallback_paths":[]}'
    agent = PlannerAgent(adapter=StubAdapter(canned_plan))

    request = AgentMessage(
        sender="o",
        recipient="planner",
        intent="plan_request",
        payload={"goal": "Test"},
    )
    reply = agent.handle_message(AgentMessageEnvelope(message=request, task_id="t1"))

    assert reply is not None
    assert reply.message.intent == "plan_ready"

    plan_steps = reply.message.payload["plan"]["steps"]
    assert len(plan_steps) == 1
    assert plan_steps[0]["id"] == "s1"
|
|
|
|
|
|
def test_executor_runs_tool_and_appends_trace() -> None:
    """Check that ExecutorAgent runs a registered tool and records it in the trace.

    A "noop" tool returning "ok" is registered and an execute_step message for
    step "s1" is handled. The test asserts the reply intent is step_done and
    that the StateManager trace for "task-1" holds exactly one entry naming
    the tool and its result.
    """
    state_manager = StateManager()
    registry = ToolRegistry()
    noop_tool = ToolDef(name="noop", description="No-op", fn=lambda: "ok", permission_scope=["*"])
    registry.register(noop_tool)
    agent = ExecutorAgent(registry=registry, state_manager=state_manager)

    # The same tool name/args are given both inside the plan step and at the
    # top level of the payload, exactly as the message is shaped here.
    single_step = {"id": "s1", "description": "No-op", "dependencies": [], "tool_name": "noop", "tool_args": {}}
    step_request = AgentMessage(
        sender="o",
        recipient="executor",
        intent="execute_step",
        payload={
            "step_id": "s1",
            "plan": {"steps": [single_step], "fallback_paths": []},
            "tool_name": "noop",
            "tool_args": {},
        },
    )
    reply = agent.handle_message(AgentMessageEnvelope(message=step_request, task_id="task-1"))

    assert reply is not None
    assert reply.message.intent == "step_done"

    entries = state_manager.get_trace("task-1")
    assert len(entries) == 1
    entry = entries[0]
    assert entry.get("tool") == "noop"
    assert entry.get("result") == "ok"
|
|
|
|
|
|
def test_critic_returns_evaluation() -> None:
    """Check that a CriticAgent built without an adapter evaluates an outcome.

    For an evaluate_request with outcome "completed", an empty trace and no
    plan, the reply must carry the evaluation_ready intent and an evaluation
    that contains a "score" key and reports success as True.
    """
    agent = CriticAgent(adapter=None)

    request = AgentMessage(
        sender="o",
        recipient="critic",
        intent="evaluate_request",
        payload={"outcome": "completed", "trace": [], "plan": None},
    )
    reply = agent.handle_message(AgentMessageEnvelope(message=request, task_id="t1"))

    assert reply is not None
    assert reply.message.intent == "evaluation_ready"

    evaluation = reply.message.payload["evaluation"]
    assert "score" in evaluation
    assert evaluation["success"] is True
|
|
|
|
|
|
def test_reflection_writes_to_reflective_memory() -> None:
    """Check that run_reflection returns an evaluation and stores one lesson.

    After reflecting on task "t1", the ReflectiveMemory that was passed in
    must yield exactly one lesson, and that lesson must carry the task id.
    """
    agent = CriticAgent(adapter=None)
    memory = ReflectiveMemory()

    evaluation = run_reflection(agent, "t1", "completed", [], None, memory)
    assert evaluation is not None

    stored = memory.get_lessons(limit=5)
    assert len(stored) == 1
    assert stored[0]["task_id"] == "t1"
|
|
|
|
|
|
def test_guardrails_block_path() -> None:
    """Check that a blocked path prefix is rejected and other paths pass.

    With "/etc" blocked, a file_read of "/etc/passwd" must be disallowed with
    a non-empty error message, while "/tmp/foo" must be allowed.
    """
    guardrails = Guardrails()
    guardrails.block_path_prefix("/etc")

    denied = guardrails.pre_check("file_read", {"path": "/etc/passwd"})
    assert denied.allowed is False
    assert denied.error_message

    permitted = guardrails.pre_check("file_read", {"path": "/tmp/foo"})
    assert permitted.allowed is True
|
|
|
|
|
|
def test_rate_limiter() -> None:
    """Check that RateLimiter(max_calls=2) allows two calls and rejects the third.

    Only the first element of each ``allow`` result is inspected.
    """
    # Rate limiter is not yet wired to executor/orchestrator; tested in isolation here.
    limiter = RateLimiter(max_calls=2, window_seconds=10.0)

    # Three consecutive calls for the same key: the third exceeds max_calls.
    for expected in (True, True, False):
        assert limiter.allow("agent1")[0] is expected
|
|
|
|
|
|
def test_override_hooks() -> None:
    """Check that a registered hook is invoked by ``fire`` and its True is returned.

    The hook records its two arguments; the test asserts it was called exactly
    once and that the first argument is the fired event name.
    """
    calls = []

    def record(e, p):
        # Remember what the hook was called with, then return True.
        calls.append((e, p))
        return True

    hooks = OverrideHooks()
    hooks.register(record)

    assert hooks.fire("task_paused_for_approval", {"task_id": "t1"}) is True
    assert len(calls) == 1
    assert calls[0][0] == "task_paused_for_approval"
|
|
|
|
|
|
def test_access_control_deny() -> None:
    """Check that ``deny`` blocks only the given (agent, tool) pair.

    After denying "noop" to "executor", that pair must be refused while the
    same agent with another tool, and another agent with the same tool, are
    still allowed.
    """
    access = AccessControl()
    access.deny("executor", "noop")

    expectations = (
        ("executor", "noop", False),
        ("executor", "other_tool", True),
        ("planner", "noop", True),
    )
    for agent_name, tool_name, expected in expectations:
        assert access.allowed(agent_name, tool_name) is expected
|
|
|
|
|
|
def test_policy_engine_update_rule() -> None:
    """Exercise add, get, update and remove of a rule on PolicyEngine.

    A DENY rule "r1" is added and read back; its reason and then its priority
    are updated and re-read; finally the rule is removed, after which it can
    no longer be fetched and a second removal returns False.
    """
    engine = PolicyEngine()
    deny_noop = PolicyRule(
        rule_id="r1",
        effect=PolicyEffect.DENY,
        condition={"tool_name": "noop"},
        reason="blocked",
        priority=1,
    )
    engine.add_rule(deny_noop)

    assert engine.get_rule("r1") is not None
    assert engine.get_rule("r1").reason == "blocked"

    # Each update must report True and be visible on the next lookup.
    for field, new_value in (("reason", "updated"), ("priority", 5)):
        assert engine.update_rule("r1", {field: new_value}) is True
        assert getattr(engine.get_rule("r1"), field) == new_value

    assert engine.remove_rule("r1") is True
    assert engine.get_rule("r1") is None
    # Removing an already-removed rule reports False.
    assert engine.remove_rule("r1") is False
|
|
|
|
|
|
if __name__ == "__main__":
    # Allow running this module directly: call every test in declaration
    # order; the success line is printed only if none of them raised.
    for run_test in (
        test_planner_with_stub_adapter,
        test_executor_runs_tool_and_appends_trace,
        test_critic_returns_evaluation,
        test_reflection_writes_to_reflective_memory,
        test_guardrails_block_path,
        test_rate_limiter,
        test_override_hooks,
        test_access_control_deny,
        test_policy_engine_update_rule,
    ):
        run_test()
    print("Phase 2/3 tests OK")
|