Files
FusionAGI/fusionagi/core/json_file_backend.py
defiQUG c052b07662
Some checks failed
Tests / test (3.10) (push) Has been cancelled
Tests / test (3.11) (push) Has been cancelled
Tests / test (3.12) (push) Has been cancelled
Tests / lint (push) Has been cancelled
Tests / docker (push) Has been cancelled
Initial commit: add .gitignore and README
2026-02-09 21:51:42 -08:00

70 lines
2.3 KiB
Python

"""JSON file persistence backend for StateManager."""
import json
from pathlib import Path
from typing import Any
from fusionagi.schemas.task import Task, TaskState
from fusionagi.core.persistence import StateBackend
from fusionagi._logger import logger
class JsonFileBackend(StateBackend):
"""
StateBackend that persists tasks and traces to a JSON file.
Use with StateManager(backend=JsonFileBackend(path="state.json")).
File is created on first write; directory must exist or be creatable.
"""
def __init__(self, path: str | Path) -> None:
self._path = Path(path)
self._tasks: dict[str, dict[str, Any]] = {}
self._traces: dict[str, list[dict[str, Any]]] = {}
self._load()
def _load(self) -> None:
if not self._path.exists():
return
try:
data = json.loads(self._path.read_text(encoding="utf-8"))
self._tasks = data.get("tasks", {})
self._traces = data.get("traces", {})
except Exception as e:
logger.warning("JsonFileBackend load failed", extra={"path": str(self._path), "error": str(e)})
def _save(self) -> None:
self._path.parent.mkdir(parents=True, exist_ok=True)
data = {"tasks": self._tasks, "traces": self._traces}
self._path.write_text(json.dumps(data, indent=2), encoding="utf-8")
def get_task(self, task_id: str) -> Task | None:
raw = self._tasks.get(task_id)
if raw is None:
return None
try:
return Task.model_validate(raw)
except Exception:
return None
def set_task(self, task: Task) -> None:
self._tasks[task.task_id] = task.model_dump(mode="json")
self._save()
def get_task_state(self, task_id: str) -> TaskState | None:
task = self.get_task(task_id)
return task.state if task else None
def set_task_state(self, task_id: str, state: TaskState) -> None:
task = self.get_task(task_id)
if task:
updated = task.model_copy(update={"state": state})
self.set_task(updated)
def append_trace(self, task_id: str, entry: dict[str, Any]) -> None:
self._traces.setdefault(task_id, []).append(entry)
self._save()
def get_trace(self, task_id: str) -> list[dict[str, Any]]:
return list(self._traces.get(task_id, []))