Files
FusionAGI/fusionagi/memory/semantic_graph.py
defiQUG c052b07662
Some checks failed
Tests / test (3.10) (push) Has been cancelled
Tests / test (3.11) (push) Has been cancelled
Tests / test (3.12) (push) Has been cancelled
Tests / lint (push) Has been cancelled
Tests / docker (push) Has been cancelled
Initial commit: add .gitignore and README
2026-02-09 21:51:42 -08:00

107 lines
4.1 KiB
Python

"""Semantic memory graph: nodes = AtomicSemanticUnit, edges = SemanticRelation."""
from __future__ import annotations
from collections import defaultdict
from typing import Any
from fusionagi.schemas.atomic import (
AtomicSemanticUnit,
AtomicUnitType,
SemanticRelation,
)
from fusionagi._logger import logger
class SemanticGraphMemory:
    """
    Graph-backed semantic memory: nodes = atomic units, edges = relations.

    Supports add_unit, add_relation, query_units, query_neighbors, query_by_type.
    In-memory implementation with dict + adjacency list.

    Units are kept in insertion order; once ``max_units`` is reached the
    oldest unit is evicted (FIFO) together with every relation touching it.
    """

    def __init__(self, max_units: int = 50000) -> None:
        """Create an empty graph that holds at most ``max_units`` units."""
        # unit_id -> unit; dict insertion order doubles as the FIFO eviction order.
        self._units: dict[str, AtomicSemanticUnit] = {}
        # unit type -> unit ids of that type, oldest first, no duplicates.
        self._by_type: dict[AtomicUnitType, list[str]] = defaultdict(list)
        # Adjacency lists keyed by the source / target unit id respectively.
        self._outgoing: dict[str, list[SemanticRelation]] = defaultdict(list)
        self._incoming: dict[str, list[SemanticRelation]] = defaultdict(list)
        self._max_units = max_units

    def add_unit(self, unit: AtomicSemanticUnit) -> None:
        """Add an atomic semantic unit, replacing any unit with the same ID.

        A replaced unit keeps its position in the eviction order and its
        existing relations; the type index is updated if its type changed.
        """
        previous = self._units.get(unit.unit_id)
        # Only a genuinely new unit can push the graph over capacity.
        if previous is None and len(self._units) >= self._max_units:
            self._evict_one()
        self._units[unit.unit_id] = unit
        if previous is None:
            self._by_type[unit.type].append(unit.unit_id)
        elif previous.type != unit.type:
            # Move the id between type buckets instead of leaving a stale
            # entry behind (or a duplicate, as a blind append would).
            self._unindex(previous.type, unit.unit_id)
            self._by_type[unit.type].append(unit.unit_id)
        logger.debug("Semantic graph: unit added", extra={"unit_id": unit.unit_id, "type": unit.type.value})

    def add_relation(self, relation: SemanticRelation) -> None:
        """Add a relation between units.

        The relation is stored only when both endpoints are already present
        in the graph; otherwise it is skipped (and logged at debug level).
        """
        if relation.from_id in self._units and relation.to_id in self._units:
            self._outgoing[relation.from_id].append(relation)
            self._incoming[relation.to_id].append(relation)
            return
        logger.debug(
            "Semantic graph: relation skipped (unknown endpoint)",
            extra={"from_id": relation.from_id, "to_id": relation.to_id},
        )

    def get_unit(self, unit_id: str) -> AtomicSemanticUnit | None:
        """Get unit by ID, or None when the ID is unknown."""
        return self._units.get(unit_id)

    def query_units(
        self,
        unit_ids: list[str] | None = None,
        unit_type: AtomicUnitType | None = None,
        limit: int = 100,
    ) -> list[AtomicSemanticUnit]:
        """Query units by IDs or type.

        Args:
            unit_ids: When non-empty, return the known units among these IDs
                in the given order (takes precedence over ``unit_type``).
                ``None`` or an empty list means "no ID filter".
            unit_type: When given, return the most recently added units of
                this type.
            limit: Maximum number of units to return; a value <= 0 yields
                an empty list.

        Returns:
            Up to ``limit`` units. Without any filter, the most recently
            added units overall (oldest first).
        """
        # Guard explicitly: ``seq[-0:]`` is the whole sequence, not an empty one.
        if limit <= 0:
            return []
        if unit_ids:
            found = [self._units[uid] for uid in unit_ids if uid in self._units]
            return found[:limit]
        # ``is not None`` so a falsy enum member is still honoured as a filter.
        if unit_type is not None:
            recent_ids = self._by_type.get(unit_type, [])[-limit:]
            return [self._units[uid] for uid in recent_ids if uid in self._units]
        return list(self._units.values())[-limit:]

    def query_neighbors(
        self,
        unit_id: str,
        direction: str = "outgoing",
        relation_type: str | None = None,
    ) -> list[tuple[AtomicSemanticUnit, SemanticRelation]]:
        """Get neighboring units and relations.

        Args:
            unit_id: ID of the unit whose neighbors are requested.
            direction: ``"outgoing"`` follows relations starting at
                ``unit_id``; any other value follows relations ending at it.
            relation_type: When given, keep only relations whose
                ``relation_type.value`` equals this string.

        Returns:
            ``(neighbor, relation)`` pairs; empty for an unknown ``unit_id``.
        """
        follow_outgoing = direction == "outgoing"
        adjacency = self._outgoing if follow_outgoing else self._incoming
        # ``.get`` rather than indexing: indexing a defaultdict would insert an
        # empty list for every unknown id that is merely looked up.
        edges = adjacency.get(unit_id, [])
        results: list[tuple[AtomicSemanticUnit, SemanticRelation]] = []
        for rel in edges:
            if relation_type and rel.relation_type.value != relation_type:
                continue
            other_id = rel.to_id if follow_outgoing else rel.from_id
            other = self._units.get(other_id)
            if other is not None:
                results.append((other, rel))
        return results

    def query_by_type(self, unit_type: AtomicUnitType, limit: int = 100) -> list[AtomicSemanticUnit]:
        """Query units by type (shorthand for ``query_units(unit_type=...)``)."""
        return self.query_units(unit_type=unit_type, limit=limit)

    def ingest_decomposition(
        self,
        units: list[AtomicSemanticUnit],
        relations: list[SemanticRelation],
    ) -> None:
        """Ingest a DecompositionResult into the graph.

        Units are added before relations so that relations between the
        newly added units pass the endpoint check in ``add_relation``.
        """
        for u in units:
            self.add_unit(u)
        for r in relations:
            self.add_relation(r)

    def _evict_one(self) -> None:
        """Evict oldest unit (simple FIFO on first key) and all its relations."""
        if not self._units:
            return
        uid = next(iter(self._units))
        unit = self._units.pop(uid)
        self._unindex(unit.type, uid)
        self._drop_relations(uid)
        logger.debug("Semantic graph: evicted unit", extra={"unit_id": uid})

    def _unindex(self, unit_type: AtomicUnitType, unit_id: str) -> None:
        """Remove ``unit_id`` from the type index bucket of ``unit_type``."""
        ids = self._by_type.get(unit_type)
        if ids is None:
            return
        self._store(self._by_type, unit_type, [x for x in ids if x != unit_id])

    def _drop_relations(self, unit_id: str) -> None:
        """Remove every relation that starts or ends at ``unit_id``."""
        outgoing = self._outgoing.pop(unit_id, [])
        incoming = self._incoming.pop(unit_id, [])
        # Also purge the mirrored entries held by the peers; otherwise they
        # would leak and resurface as one-sided edges if the id is re-added.
        for peer_id in {rel.to_id for rel in outgoing}:
            kept = [r for r in self._incoming.get(peer_id, []) if r.from_id != unit_id]
            self._store(self._incoming, peer_id, kept)
        for peer_id in {rel.from_id for rel in incoming}:
            kept = [r for r in self._outgoing.get(peer_id, []) if r.to_id != unit_id]
            self._store(self._outgoing, peer_id, kept)

    @staticmethod
    def _store(index: dict, key: Any, values: list) -> None:
        """Set ``index[key]`` to ``values``, dropping the key when empty."""
        if values:
            index[key] = values
        else:
            index.pop(key, None)