"""Consolidation: distillation of experiences into knowledge; write/forget rules for AGI.""" from typing import Any, Callable, Protocol from fusionagi._logger import logger class EpisodicLike(Protocol): def get_lessons(self, limit: int) -> list[dict[str, Any]]: ... def get_recent(self, limit: int) -> list[dict[str, Any]]: ... class ReflectiveLike(Protocol): def get_lessons(self, limit: int) -> list[dict[str, Any]]: ... class SemanticLike(Protocol): def add_fact(self, fact_id: str, statement: str, source: str, domain: str, metadata: dict | None) -> None: ... class ConsolidationJob: """ Periodic distillation: take recent episodic/reflective lessons and write summarized facts into semantic memory. Write/forget rules are applied by the distiller callback. """ def __init__( self, episodic: EpisodicLike | None = None, reflective: ReflectiveLike | None = None, semantic: SemanticLike | None = None, distiller: Callable[[list[dict[str, Any]]], list[dict[str, Any]]] | None = None, ) -> None: self._episodic = episodic self._reflective = reflective self._semantic = semantic self._distiller = distiller or _default_distiller def run(self, episodic_limit: int = 100, reflective_limit: int = 50) -> int: """ Run consolidation: gather recent lessons, distill, write to semantic. Returns number of facts written. """ lessons: list[dict[str, Any]] = [] if self._episodic: try: lessons.extend(self._episodic.get_recent(episodic_limit) if hasattr(self._episodic, "get_recent") else []) except Exception: pass if self._reflective: try: lessons.extend(self._reflective.get_lessons(reflective_limit)) except Exception: pass if not lessons: return 0 facts = self._distiller(lessons) written = 0 if self._semantic and facts: for i, f in enumerate(facts[:50]): fact_id = f.get("fact_id", f"consolidated_{i}") statement = f.get("statement", str(f)) source = f.get("source", "consolidation") domain = f.get("domain", "general") try: self._semantic.add_fact(fact_id, statement, source=source, domain=domain, metadata=f) written += 1 except Exception: logger.exception("Consolidation: failed to add fact", extra={"fact_id": fact_id}) logger.info("Consolidation run", extra={"lessons": len(lessons), "facts_written": written}) return written def _default_distiller(lessons: list[dict[str, Any]]) -> list[dict[str, Any]]: """Default: turn each lesson into one fact (summary).""" out = [] for i, le in enumerate(lessons[-100:]): outcome = le.get("outcome", le.get("result", "")) task_id = le.get("task_id", "") out.append({ "fact_id": f"cons_{task_id}_{i}", "statement": f"Task {task_id} outcome: {outcome}", "source": "consolidation", "domain": "general", }) return out