243 lines
7.4 KiB
Python
243 lines
7.4 KiB
Python
|
|
"""Tests for memory modules."""
|
||
|
|
|
||
|
|
import pytest
|
||
|
|
import time
|
||
|
|
|
||
|
|
from fusionagi.memory.working import WorkingMemory
|
||
|
|
from fusionagi.memory.episodic import EpisodicMemory
|
||
|
|
from fusionagi.memory.reflective import ReflectiveMemory
|
||
|
|
|
||
|
|
|
||
|
|
class TestWorkingMemory:
    """Checks for the session-scoped key/value API of WorkingMemory."""

    def test_get_set(self):
        """A stored value is read back; an absent key gives None or the given default."""
        memory = WorkingMemory()
        session = "session1"

        memory.set(session, "key1", "value1")

        stored = memory.get(session, "key1")
        assert stored == "value1"

        absent = memory.get(session, "key2")
        assert absent is None

        fallback = memory.get(session, "key2", "default")
        assert fallback == "default"

    def test_append(self):
        """Three appends under one key are returned by get_list in insertion order."""
        memory = WorkingMemory()

        for letter in ("a", "b", "c"):
            memory.append("s1", "items", letter)

        assert memory.get_list("s1", "items") == ["a", "b", "c"]

    def test_append_converts_non_list(self):
        """Appending to a key that holds a scalar yields a list of old and new value."""
        memory = WorkingMemory()

        memory.set("s1", "val", "single")
        memory.append("s1", "val", "new")

        assert memory.get_list("s1", "val") == ["single", "new"]

    def test_has_and_keys(self):
        """has() reports membership and keys() lists exactly the keys that were set."""
        memory = WorkingMemory()

        for key, value in (("k1", "v1"), ("k2", "v2")):
            memory.set("s1", key, value)

        known = memory.has("s1", "k1")
        assert known is True

        unknown = memory.has("s1", "k3")
        assert unknown is False

        assert set(memory.keys("s1")) == {"k1", "k2"}

    def test_delete(self):
        """delete() returns True when it removes a key and False when the key is gone."""
        memory = WorkingMemory()

        memory.set("s1", "key", "value")
        assert memory.has("s1", "key")

        first_attempt = memory.delete("s1", "key")
        assert first_attempt is True
        assert not memory.has("s1", "key")

        # The key no longer exists, so a second delete must report False.
        second_attempt = memory.delete("s1", "key")
        assert second_attempt is False

    def test_clear_session(self):
        """clear_session() drops the named session and leaves the other one in place."""
        memory = WorkingMemory()

        entries = (
            ("s1", "k1", "v1"),
            ("s1", "k2", "v2"),
            ("s2", "k1", "v1"),
        )
        for session_id, key, value in entries:
            memory.set(session_id, key, value)

        memory.clear_session("s1")

        assert not memory.session_exists("s1")
        assert memory.session_exists("s2")

    def test_context_summary(self):
        """The summary keeps scalars as-is and describes lists and dicts by type/count."""
        memory = WorkingMemory()

        memory.set("s1", "scalar", "hello")
        memory.set("s1", "list_val", [1, 2, 3, 4, 5])
        memory.set("s1", "dict_val", {"a": 1, "b": 2})

        summary = memory.get_context_summary("s1")

        assert "scalar" in summary
        assert summary["scalar"] == "hello"

        list_info = summary["list_val"]
        assert list_info["type"] == "list"
        assert list_info["count"] == 5

        dict_info = summary["dict_val"]
        assert dict_info["type"] == "dict"

    def test_session_count(self):
        """session_count() is 0 on a fresh instance and 2 after writing to two sessions."""
        memory = WorkingMemory()

        assert memory.session_count() == 0

        for session_id in ("s1", "s2"):
            memory.set(session_id, "k", "v")

        assert memory.session_count() == 2
|
||
|
|
|
||
|
|
|
||
|
|
class TestEpisodicMemory:
    """Checks for appending to and querying EpisodicMemory."""

    def test_append_and_get_by_task(self):
        """Entries appended under a task id come back from get_by_task for that id."""
        episodes = EpisodicMemory()

        records = (
            ("task1", {"step": "s1", "result": "ok"}),
            ("task1", {"step": "s2", "result": "ok"}),
            ("task2", {"step": "s1", "result": "fail"}),
        )
        for task_id, payload in records:
            episodes.append(task_id, payload)

        for_task1 = episodes.get_by_task("task1")
        assert len(for_task1) == 2
        assert for_task1[0]["step"] == "s1"

        for_task2 = episodes.get_by_task("task2")
        assert len(for_task2) == 1

    def test_get_by_type(self):
        """get_by_type returns only the entries recorded with that event_type."""
        episodes = EpisodicMemory()

        events = (
            (1, "step_done"),
            (2, "step_done"),
            (3, "step_failed"),
        )
        for value, kind in events:
            episodes.append("t1", {"data": value}, event_type=kind)

        completed = episodes.get_by_type("step_done")
        assert len(completed) == 2

        failed = episodes.get_by_type("step_failed")
        assert len(failed) == 1

    def test_get_recent(self):
        """With ten entries stored, get_recent(limit=5) yields five, from n=5 to n=9."""
        episodes = EpisodicMemory()

        for counter in range(10):
            episodes.append("task", {"n": counter})

        latest = episodes.get_recent(limit=5)

        assert len(latest) == 5
        assert latest[0]["n"] == 5  # first of the returned window
        assert latest[4]["n"] == 9  # last entry that was appended

    def test_query_with_filter(self):
        """query() keeps only the entries accepted by the supplied predicate."""
        episodes = EpisodicMemory()

        for score, label in ((0.9, "a"), (0.5, "b"), (0.8, "a")):
            episodes.append("t1", {"score": score, "type": label})

        def scores_above_threshold(entry):
            return entry.get("score", 0) > 0.7

        matches = episodes.query(scores_above_threshold)
        assert len(matches) == 2

    def test_task_summary(self):
        """get_task_summary reports total, success and failure counts plus event types."""
        episodes = EpisodicMemory()

        events = (
            ({"success": True}, "step_done"),
            ({"success": True}, "step_done"),
            ({"error": "fail"}, "step_failed"),
        )
        for payload, kind in events:
            episodes.append("task1", payload, event_type=kind)

        summary = episodes.get_task_summary("task1")

        assert summary["count"] == 3
        assert summary["success_count"] == 2
        assert summary["failure_count"] == 1
        assert "step_done" in summary["event_types"]

    def test_statistics(self):
        """get_statistics counts entries, distinct tasks and distinct event types."""
        episodes = EpisodicMemory()

        for task_id, kind in (("t1", "type_a"), ("t2", "type_b")):
            episodes.append(task_id, {}, event_type=kind)

        stats = episodes.get_statistics()

        assert stats["total_entries"] == 2
        assert stats["task_count"] == 2
        assert stats["event_type_count"] == 2

    def test_clear(self):
        """After clear(), the statistics report zero total entries."""
        episodes = EpisodicMemory()

        for task_id in ("t1", "t2"):
            episodes.append(task_id, {})

        episodes.clear()

        remaining = episodes.get_statistics()["total_entries"]
        assert remaining == 0
|
||
|
|
|
||
|
|
|
||
|
|
class TestReflectiveMemory:
|
||
|
|
"""Test ReflectiveMemory functionality."""
|
||
|
|
|
||
|
|
def test_add_and_get_lessons(self):
    """Two added lessons are both returned, the first one added coming first."""
    reflective = ReflectiveMemory()

    for text in ("Don't repeat mistakes", "Plan before acting"):
        reflective.add_lesson({"content": text, "source": "critic"})

    stored = reflective.get_lessons()

    assert len(stored) == 2
    assert stored[0]["content"] == "Don't repeat mistakes"
|
||
|
|
|
||
|
|
def test_add_and_get_heuristics(self):
    """Heuristics set by name are counted by get_all_heuristics and read by name."""
    reflective = ReflectiveMemory()

    rules = (
        ("strategy1", "Check dependencies first"),
        ("strategy2", "Validate inputs early"),
    )
    for name, advice in rules:
        reflective.set_heuristic(name, advice)

    everything = reflective.get_all_heuristics()
    assert len(everything) == 2

    first_rule = reflective.get_heuristic("strategy1")
    assert first_rule == "Check dependencies first"
|
||
|
|
|
||
|
|
def test_get_recent_limits(self):
    """With ten lessons stored, get_lessons(limit=5) yields five, from id 5 to id 9."""
    reflective = ReflectiveMemory()

    for index in range(10):
        reflective.add_lesson({"id": index, "content": f"Lesson {index}"})

    latest = reflective.get_lessons(limit=5)

    assert len(latest) == 5
    # The window is expected to hold the five most recently added lessons.
    assert latest[0]["id"] == 5
    assert latest[4]["id"] == 9
|