Files
FusionAGI/fusionagi/memory/postgres_backend.py
defiQUG c052b07662
Some checks failed
Tests / test (3.10) (push) Has been cancelled
Tests / test (3.11) (push) Has been cancelled
Tests / test (3.12) (push) Has been cancelled
Tests / lint (push) Has been cancelled
Tests / docker (push) Has been cancelled
Initial commit: add .gitignore and README
2026-02-09 21:51:42 -08:00

232 lines
6.9 KiB
Python

"""Optional Postgres backend for memory. Requires: pip install fusionagi[memory]."""
from abc import ABC, abstractmethod
from typing import Any
from fusionagi._logger import logger
class MemoryBackend(ABC):
"""Abstract backend for persistent memory storage."""
@abstractmethod
def store(
self,
id: str,
tenant_id: str,
user_id: str,
session_id: str,
type: str,
content: dict[str, Any],
metadata: dict[str, Any] | None = None,
retention_policy: str = "session",
) -> None:
"""Store a memory item."""
...
@abstractmethod
def get(self, id: str) -> dict[str, Any] | None:
"""Get a memory item by id."""
...
@abstractmethod
def query(
self,
tenant_id: str,
user_id: str | None = None,
session_id: str | None = None,
type: str | None = None,
limit: int = 100,
) -> list[dict[str, Any]]:
"""Query memory items."""
...
class InMemoryBackend(MemoryBackend):
"""In-memory implementation for development."""
def __init__(self) -> None:
self._store: dict[str, dict[str, Any]] = {}
def store(
self,
id: str,
tenant_id: str,
user_id: str,
session_id: str,
type: str,
content: dict[str, Any],
metadata: dict[str, Any] | None = None,
retention_policy: str = "session",
) -> None:
self._store[id] = {
"id": id,
"tenant_id": tenant_id,
"user_id": user_id,
"session_id": session_id,
"type": type,
"content": content,
"metadata": metadata or {},
"retention_policy": retention_policy,
}
def get(self, id: str) -> dict[str, Any] | None:
return self._store.get(id)
def query(
self,
tenant_id: str,
user_id: str | None = None,
session_id: str | None = None,
type: str | None = None,
limit: int = 100,
) -> list[dict[str, Any]]:
out = []
for v in self._store.values():
if v["tenant_id"] != tenant_id:
continue
if user_id and v["user_id"] != user_id:
continue
if session_id and v["session_id"] != session_id:
continue
if type and v["type"] != type:
continue
out.append(v)
if len(out) >= limit:
break
return out
def create_postgres_backend(connection_string: str) -> MemoryBackend | None:
"""Create Postgres-backed MemoryBackend when psycopg is available."""
try:
import psycopg
except ImportError:
logger.debug("psycopg not installed; use pip install fusionagi[memory]")
return None
return PostgresMemoryBackend(connection_string)
class PostgresMemoryBackend(MemoryBackend):
"""Postgres-backed memory storage."""
def __init__(self, connection_string: str) -> None:
self._conn_str = connection_string
self._init_schema()
def _init_schema(self) -> None:
import psycopg
with psycopg.connect(self._conn_str) as conn:
with conn.cursor() as cur:
cur.execute(
"""
CREATE TABLE IF NOT EXISTS memory_items (
id TEXT PRIMARY KEY,
tenant_id TEXT NOT NULL,
user_id TEXT NOT NULL,
session_id TEXT NOT NULL,
type TEXT NOT NULL,
content JSONB NOT NULL,
metadata JSONB DEFAULT '{}',
retention_policy TEXT DEFAULT 'session',
created_at TIMESTAMPTZ DEFAULT NOW()
)
"""
)
conn.commit()
def store(
self,
id: str,
tenant_id: str,
user_id: str,
session_id: str,
type: str,
content: dict[str, Any],
metadata: dict[str, Any] | None = None,
retention_policy: str = "session",
) -> None:
import json
import psycopg
with psycopg.connect(self._conn_str) as conn:
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO memory_items (id, tenant_id, user_id, session_id, type, content, metadata, retention_policy)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (id) DO UPDATE SET content = EXCLUDED.content, metadata = EXCLUDED.metadata
""",
(id, tenant_id, user_id, session_id, type, json.dumps(content), json.dumps(metadata or {}), retention_policy),
)
conn.commit()
def get(self, id: str) -> dict[str, Any] | None:
import json
import psycopg
with psycopg.connect(self._conn_str) as conn:
with conn.cursor() as cur:
cur.execute(
"SELECT id, tenant_id, user_id, session_id, type, content, metadata, retention_policy FROM memory_items WHERE id = %s",
(id,),
)
row = cur.fetchone()
if not row:
return None
return {
"id": row[0],
"tenant_id": row[1],
"user_id": row[2],
"session_id": row[3],
"type": row[4],
"content": json.loads(row[5]) if row[5] else {},
"metadata": json.loads(row[6]) if row[6] else {},
"retention_policy": row[7],
}
def query(
self,
tenant_id: str,
user_id: str | None = None,
session_id: str | None = None,
type: str | None = None,
limit: int = 100,
) -> list[dict[str, Any]]:
import json
import psycopg
q = "SELECT id, tenant_id, user_id, session_id, type, content, metadata, retention_policy FROM memory_items WHERE tenant_id = %s"
params: list[Any] = [tenant_id]
if user_id:
q += " AND user_id = %s"
params.append(user_id)
if session_id:
q += " AND session_id = %s"
params.append(session_id)
if type:
q += " AND type = %s"
params.append(type)
q += " ORDER BY created_at DESC LIMIT %s"
params.append(limit)
with psycopg.connect(self._conn_str) as conn:
with conn.cursor() as cur:
cur.execute(q, params)
rows = cur.fetchall()
return [
{
"id": r[0],
"tenant_id": r[1],
"user_id": r[2],
"session_id": r[3],
"type": r[4],
"content": json.loads(r[5]) if r[5] else {},
"metadata": json.loads(r[6]) if r[6] else {},
"retention_policy": r[7],
}
for r in rows
]