"""Optional Postgres backend for memory. Requires: pip install fusionagi[memory]."""
|
|
|
|
from abc import ABC, abstractmethod
|
|
from typing import Any
|
|
|
|
from fusionagi._logger import logger
|
|
|
|
|
|
class MemoryBackend(ABC):
|
|
"""Abstract backend for persistent memory storage."""
|
|
|
|
@abstractmethod
|
|
def store(
|
|
self,
|
|
id: str,
|
|
tenant_id: str,
|
|
user_id: str,
|
|
session_id: str,
|
|
type: str,
|
|
content: dict[str, Any],
|
|
metadata: dict[str, Any] | None = None,
|
|
retention_policy: str = "session",
|
|
) -> None:
|
|
"""Store a memory item."""
|
|
...
|
|
|
|
@abstractmethod
|
|
def get(self, id: str) -> dict[str, Any] | None:
|
|
"""Get a memory item by id."""
|
|
...
|
|
|
|
@abstractmethod
|
|
def query(
|
|
self,
|
|
tenant_id: str,
|
|
user_id: str | None = None,
|
|
session_id: str | None = None,
|
|
type: str | None = None,
|
|
limit: int = 100,
|
|
) -> list[dict[str, Any]]:
|
|
"""Query memory items."""
|
|
...
|
|
|
|
|
|
class InMemoryBackend(MemoryBackend):
|
|
"""In-memory implementation for development."""
|
|
|
|
def __init__(self) -> None:
|
|
self._store: dict[str, dict[str, Any]] = {}
|
|
|
|
def store(
|
|
self,
|
|
id: str,
|
|
tenant_id: str,
|
|
user_id: str,
|
|
session_id: str,
|
|
type: str,
|
|
content: dict[str, Any],
|
|
metadata: dict[str, Any] | None = None,
|
|
retention_policy: str = "session",
|
|
) -> None:
|
|
self._store[id] = {
|
|
"id": id,
|
|
"tenant_id": tenant_id,
|
|
"user_id": user_id,
|
|
"session_id": session_id,
|
|
"type": type,
|
|
"content": content,
|
|
"metadata": metadata or {},
|
|
"retention_policy": retention_policy,
|
|
}
|
|
|
|
def get(self, id: str) -> dict[str, Any] | None:
|
|
return self._store.get(id)
|
|
|
|
def query(
|
|
self,
|
|
tenant_id: str,
|
|
user_id: str | None = None,
|
|
session_id: str | None = None,
|
|
type: str | None = None,
|
|
limit: int = 100,
|
|
) -> list[dict[str, Any]]:
|
|
out = []
|
|
for v in self._store.values():
|
|
if v["tenant_id"] != tenant_id:
|
|
continue
|
|
if user_id and v["user_id"] != user_id:
|
|
continue
|
|
if session_id and v["session_id"] != session_id:
|
|
continue
|
|
if type and v["type"] != type:
|
|
continue
|
|
out.append(v)
|
|
if len(out) >= limit:
|
|
break
|
|
return out
|
|
|
|
|
|
def create_postgres_backend(connection_string: str) -> MemoryBackend | None:
    """Build a Postgres-backed ``MemoryBackend`` if psycopg can be imported.

    Args:
        connection_string: Passed unchanged to ``PostgresMemoryBackend``.

    Returns:
        A ``PostgresMemoryBackend`` instance, or ``None`` (after a debug log
        message) when importing ``psycopg`` raises ``ImportError``.
    """
    try:
        # Imported only to find out whether the optional dependency exists.
        import psycopg
    except ImportError:
        logger.debug("psycopg not installed; use pip install fusionagi[memory]")
        backend = None
    else:
        backend = PostgresMemoryBackend(connection_string)
    return backend
|
|
|
|
|
|
class PostgresMemoryBackend(MemoryBackend):
|
|
"""Postgres-backed memory storage."""
|
|
|
|
def __init__(self, connection_string: str) -> None:
|
|
self._conn_str = connection_string
|
|
self._init_schema()
|
|
|
|
def _init_schema(self) -> None:
|
|
import psycopg
|
|
|
|
with psycopg.connect(self._conn_str) as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS memory_items (
|
|
id TEXT PRIMARY KEY,
|
|
tenant_id TEXT NOT NULL,
|
|
user_id TEXT NOT NULL,
|
|
session_id TEXT NOT NULL,
|
|
type TEXT NOT NULL,
|
|
content JSONB NOT NULL,
|
|
metadata JSONB DEFAULT '{}',
|
|
retention_policy TEXT DEFAULT 'session',
|
|
created_at TIMESTAMPTZ DEFAULT NOW()
|
|
)
|
|
"""
|
|
)
|
|
conn.commit()
|
|
|
|
def store(
|
|
self,
|
|
id: str,
|
|
tenant_id: str,
|
|
user_id: str,
|
|
session_id: str,
|
|
type: str,
|
|
content: dict[str, Any],
|
|
metadata: dict[str, Any] | None = None,
|
|
retention_policy: str = "session",
|
|
) -> None:
|
|
import json
|
|
import psycopg
|
|
|
|
with psycopg.connect(self._conn_str) as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
INSERT INTO memory_items (id, tenant_id, user_id, session_id, type, content, metadata, retention_policy)
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
|
|
ON CONFLICT (id) DO UPDATE SET content = EXCLUDED.content, metadata = EXCLUDED.metadata
|
|
""",
|
|
(id, tenant_id, user_id, session_id, type, json.dumps(content), json.dumps(metadata or {}), retention_policy),
|
|
)
|
|
conn.commit()
|
|
|
|
def get(self, id: str) -> dict[str, Any] | None:
|
|
import json
|
|
import psycopg
|
|
|
|
with psycopg.connect(self._conn_str) as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"SELECT id, tenant_id, user_id, session_id, type, content, metadata, retention_policy FROM memory_items WHERE id = %s",
|
|
(id,),
|
|
)
|
|
row = cur.fetchone()
|
|
if not row:
|
|
return None
|
|
return {
|
|
"id": row[0],
|
|
"tenant_id": row[1],
|
|
"user_id": row[2],
|
|
"session_id": row[3],
|
|
"type": row[4],
|
|
"content": json.loads(row[5]) if row[5] else {},
|
|
"metadata": json.loads(row[6]) if row[6] else {},
|
|
"retention_policy": row[7],
|
|
}
|
|
|
|
def query(
|
|
self,
|
|
tenant_id: str,
|
|
user_id: str | None = None,
|
|
session_id: str | None = None,
|
|
type: str | None = None,
|
|
limit: int = 100,
|
|
) -> list[dict[str, Any]]:
|
|
import json
|
|
import psycopg
|
|
|
|
q = "SELECT id, tenant_id, user_id, session_id, type, content, metadata, retention_policy FROM memory_items WHERE tenant_id = %s"
|
|
params: list[Any] = [tenant_id]
|
|
if user_id:
|
|
q += " AND user_id = %s"
|
|
params.append(user_id)
|
|
if session_id:
|
|
q += " AND session_id = %s"
|
|
params.append(session_id)
|
|
if type:
|
|
q += " AND type = %s"
|
|
params.append(type)
|
|
q += " ORDER BY created_at DESC LIMIT %s"
|
|
params.append(limit)
|
|
|
|
with psycopg.connect(self._conn_str) as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(q, params)
|
|
rows = cur.fetchall()
|
|
return [
|
|
{
|
|
"id": r[0],
|
|
"tenant_id": r[1],
|
|
"user_id": r[2],
|
|
"session_id": r[3],
|
|
"type": r[4],
|
|
"content": json.loads(r[5]) if r[5] else {},
|
|
"metadata": json.loads(r[6]) if r[6] else {},
|
|
"retention_policy": r[7],
|
|
}
|
|
for r in rows
|
|
]
|