"""Optional Postgres backend for memory. Requires: pip install fusionagi[memory].""" from abc import ABC, abstractmethod from typing import Any from fusionagi._logger import logger class MemoryBackend(ABC): """Abstract backend for persistent memory storage.""" @abstractmethod def store( self, id: str, tenant_id: str, user_id: str, session_id: str, type: str, content: dict[str, Any], metadata: dict[str, Any] | None = None, retention_policy: str = "session", ) -> None: """Store a memory item.""" ... @abstractmethod def get(self, id: str) -> dict[str, Any] | None: """Get a memory item by id.""" ... @abstractmethod def query( self, tenant_id: str, user_id: str | None = None, session_id: str | None = None, type: str | None = None, limit: int = 100, ) -> list[dict[str, Any]]: """Query memory items.""" ... class InMemoryBackend(MemoryBackend): """In-memory implementation for development.""" def __init__(self) -> None: self._store: dict[str, dict[str, Any]] = {} def store( self, id: str, tenant_id: str, user_id: str, session_id: str, type: str, content: dict[str, Any], metadata: dict[str, Any] | None = None, retention_policy: str = "session", ) -> None: self._store[id] = { "id": id, "tenant_id": tenant_id, "user_id": user_id, "session_id": session_id, "type": type, "content": content, "metadata": metadata or {}, "retention_policy": retention_policy, } def get(self, id: str) -> dict[str, Any] | None: return self._store.get(id) def query( self, tenant_id: str, user_id: str | None = None, session_id: str | None = None, type: str | None = None, limit: int = 100, ) -> list[dict[str, Any]]: out = [] for v in self._store.values(): if v["tenant_id"] != tenant_id: continue if user_id and v["user_id"] != user_id: continue if session_id and v["session_id"] != session_id: continue if type and v["type"] != type: continue out.append(v) if len(out) >= limit: break return out def create_postgres_backend(connection_string: str) -> MemoryBackend | None: """Create Postgres-backed MemoryBackend when psycopg is available.""" try: import psycopg except ImportError: logger.debug("psycopg not installed; use pip install fusionagi[memory]") return None return PostgresMemoryBackend(connection_string) class PostgresMemoryBackend(MemoryBackend): """Postgres-backed memory storage.""" def __init__(self, connection_string: str) -> None: self._conn_str = connection_string self._init_schema() def _init_schema(self) -> None: import psycopg with psycopg.connect(self._conn_str) as conn: with conn.cursor() as cur: cur.execute( """ CREATE TABLE IF NOT EXISTS memory_items ( id TEXT PRIMARY KEY, tenant_id TEXT NOT NULL, user_id TEXT NOT NULL, session_id TEXT NOT NULL, type TEXT NOT NULL, content JSONB NOT NULL, metadata JSONB DEFAULT '{}', retention_policy TEXT DEFAULT 'session', created_at TIMESTAMPTZ DEFAULT NOW() ) """ ) conn.commit() def store( self, id: str, tenant_id: str, user_id: str, session_id: str, type: str, content: dict[str, Any], metadata: dict[str, Any] | None = None, retention_policy: str = "session", ) -> None: import json import psycopg with psycopg.connect(self._conn_str) as conn: with conn.cursor() as cur: cur.execute( """ INSERT INTO memory_items (id, tenant_id, user_id, session_id, type, content, metadata, retention_policy) VALUES (%s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT (id) DO UPDATE SET content = EXCLUDED.content, metadata = EXCLUDED.metadata """, (id, tenant_id, user_id, session_id, type, json.dumps(content), json.dumps(metadata or {}), retention_policy), ) conn.commit() def get(self, id: str) -> dict[str, Any] | None: import json import psycopg with psycopg.connect(self._conn_str) as conn: with conn.cursor() as cur: cur.execute( "SELECT id, tenant_id, user_id, session_id, type, content, metadata, retention_policy FROM memory_items WHERE id = %s", (id,), ) row = cur.fetchone() if not row: return None return { "id": row[0], "tenant_id": row[1], "user_id": row[2], "session_id": row[3], "type": row[4], "content": json.loads(row[5]) if row[5] else {}, "metadata": json.loads(row[6]) if row[6] else {}, "retention_policy": row[7], } def query( self, tenant_id: str, user_id: str | None = None, session_id: str | None = None, type: str | None = None, limit: int = 100, ) -> list[dict[str, Any]]: import json import psycopg q = "SELECT id, tenant_id, user_id, session_id, type, content, metadata, retention_policy FROM memory_items WHERE tenant_id = %s" params: list[Any] = [tenant_id] if user_id: q += " AND user_id = %s" params.append(user_id) if session_id: q += " AND session_id = %s" params.append(session_id) if type: q += " AND type = %s" params.append(type) q += " ORDER BY created_at DESC LIMIT %s" params.append(limit) with psycopg.connect(self._conn_str) as conn: with conn.cursor() as cur: cur.execute(q, params) rows = cur.fetchall() return [ { "id": r[0], "tenant_id": r[1], "user_id": r[2], "session_id": r[3], "type": r[4], "content": json.loads(r[5]) if r[5] else {}, "metadata": json.loads(r[6]) if r[6] else {}, "retention_policy": r[7], } for r in rows ]