88 lines
3.2 KiB
Python
88 lines
3.2 KiB
Python
"""Consolidation: distillation of experiences into knowledge; write/forget rules for AGI."""
|
|
|
|
from typing import Any, Callable, Protocol
|
|
|
|
from fusionagi._logger import logger
|
|
|
|
|
|
class EpisodicLike(Protocol):
|
|
def get_lessons(self, limit: int) -> list[dict[str, Any]]: ...
|
|
def get_recent(self, limit: int) -> list[dict[str, Any]]: ...
|
|
|
|
|
|
class ReflectiveLike(Protocol):
|
|
def get_lessons(self, limit: int) -> list[dict[str, Any]]: ...
|
|
|
|
|
|
class SemanticLike(Protocol):
|
|
def add_fact(self, fact_id: str, statement: str, source: str, domain: str, metadata: dict | None) -> None: ...
|
|
|
|
|
|
class ConsolidationJob:
|
|
"""
|
|
Periodic distillation: take recent episodic/reflective lessons and
|
|
write summarized facts into semantic memory. Write/forget rules
|
|
are applied by the distiller callback.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
episodic: EpisodicLike | None = None,
|
|
reflective: ReflectiveLike | None = None,
|
|
semantic: SemanticLike | None = None,
|
|
distiller: Callable[[list[dict[str, Any]]], list[dict[str, Any]]] | None = None,
|
|
) -> None:
|
|
self._episodic = episodic
|
|
self._reflective = reflective
|
|
self._semantic = semantic
|
|
self._distiller = distiller or _default_distiller
|
|
|
|
def run(self, episodic_limit: int = 100, reflective_limit: int = 50) -> int:
|
|
"""
|
|
Run consolidation: gather recent lessons, distill, write to semantic.
|
|
Returns number of facts written.
|
|
"""
|
|
lessons: list[dict[str, Any]] = []
|
|
if self._episodic:
|
|
try:
|
|
lessons.extend(self._episodic.get_recent(episodic_limit) if hasattr(self._episodic, "get_recent") else [])
|
|
except Exception:
|
|
pass
|
|
if self._reflective:
|
|
try:
|
|
lessons.extend(self._reflective.get_lessons(reflective_limit))
|
|
except Exception:
|
|
pass
|
|
if not lessons:
|
|
return 0
|
|
facts = self._distiller(lessons)
|
|
written = 0
|
|
if self._semantic and facts:
|
|
for i, f in enumerate(facts[:50]):
|
|
fact_id = f.get("fact_id", f"consolidated_{i}")
|
|
statement = f.get("statement", str(f))
|
|
source = f.get("source", "consolidation")
|
|
domain = f.get("domain", "general")
|
|
try:
|
|
self._semantic.add_fact(fact_id, statement, source=source, domain=domain, metadata=f)
|
|
written += 1
|
|
except Exception:
|
|
logger.exception("Consolidation: failed to add fact", extra={"fact_id": fact_id})
|
|
logger.info("Consolidation run", extra={"lessons": len(lessons), "facts_written": written})
|
|
return written
|
|
|
|
|
|
def _default_distiller(lessons: list[dict[str, Any]]) -> list[dict[str, Any]]:
|
|
"""Default: turn each lesson into one fact (summary)."""
|
|
out = []
|
|
for i, le in enumerate(lessons[-100:]):
|
|
outcome = le.get("outcome", le.get("result", ""))
|
|
task_id = le.get("task_id", "")
|
|
out.append({
|
|
"fact_id": f"cons_{task_id}_{i}",
|
|
"statement": f"Task {task_id} outcome: {outcome}",
|
|
"source": "consolidation",
|
|
"domain": "general",
|
|
})
|
|
return out
|