"""Semantic memory graph: nodes = AtomicSemanticUnit, edges = SemanticRelation.""" from __future__ import annotations from collections import defaultdict from typing import Any from fusionagi.schemas.atomic import ( AtomicSemanticUnit, AtomicUnitType, SemanticRelation, ) from fusionagi._logger import logger class SemanticGraphMemory: """ Graph-backed semantic memory: nodes = atomic units, edges = relations. Supports add_unit, add_relation, query_units, query_neighbors, query_by_type. In-memory implementation with dict + adjacency list. """ def __init__(self, max_units: int = 50000) -> None: self._units: dict[str, AtomicSemanticUnit] = {} self._by_type: dict[AtomicUnitType, list[str]] = defaultdict(list) self._outgoing: dict[str, list[SemanticRelation]] = defaultdict(list) self._incoming: dict[str, list[SemanticRelation]] = defaultdict(list) self._max_units = max_units def add_unit(self, unit: AtomicSemanticUnit) -> None: """Add an atomic semantic unit.""" if len(self._units) >= self._max_units and unit.unit_id not in self._units: self._evict_one() self._units[unit.unit_id] = unit self._by_type[unit.type].append(unit.unit_id) logger.debug("Semantic graph: unit added", extra={"unit_id": unit.unit_id, "type": unit.type.value}) def add_relation(self, relation: SemanticRelation) -> None: """Add a relation between units.""" if relation.from_id in self._units and relation.to_id in self._units: self._outgoing[relation.from_id].append(relation) self._incoming[relation.to_id].append(relation) def get_unit(self, unit_id: str) -> AtomicSemanticUnit | None: """Get unit by ID.""" return self._units.get(unit_id) def query_units( self, unit_ids: list[str] | None = None, unit_type: AtomicUnitType | None = None, limit: int = 100, ) -> list[AtomicSemanticUnit]: """Query units by IDs or type.""" if unit_ids: return [self._units[uid] for uid in unit_ids if uid in self._units][:limit] if unit_type: ids = self._by_type.get(unit_type, [])[-limit:] return [self._units[uid] for uid in ids if uid in self._units] return list(self._units.values())[-limit:] def query_neighbors( self, unit_id: str, direction: str = "outgoing", relation_type: str | None = None, ) -> list[tuple[AtomicSemanticUnit, SemanticRelation]]: """Get neighboring units and relations.""" edges = self._outgoing[unit_id] if direction == "outgoing" else self._incoming[unit_id] results: list[tuple[AtomicSemanticUnit, SemanticRelation]] = [] for rel in edges: if relation_type and rel.relation_type.value != relation_type: continue other_id = rel.to_id if direction == "outgoing" else rel.from_id other = self._units.get(other_id) if other: results.append((other, rel)) return results def query_by_type(self, unit_type: AtomicUnitType, limit: int = 100) -> list[AtomicSemanticUnit]: """Query units by type.""" return self.query_units(unit_type=unit_type, limit=limit) def ingest_decomposition( self, units: list[AtomicSemanticUnit], relations: list[SemanticRelation], ) -> None: """Ingest a DecompositionResult into the graph.""" for u in units: self.add_unit(u) for r in relations: self.add_relation(r) def _evict_one(self) -> None: """Evict oldest unit (simple FIFO on first key).""" if not self._units: return uid = next(iter(self._units)) unit = self._units.pop(uid, None) if unit: self._by_type[unit.type] = [x for x in self._by_type[unit.type] if x != uid] self._outgoing.pop(uid, None) self._incoming.pop(uid, None) logger.debug("Semantic graph: evicted unit", extra={"unit_id": uid})