Files

45 lines
1.3 KiB
Python
Raw Permalink Normal View History

"""Optional persistence interface for state manager; in-memory is default."""
from abc import ABC, abstractmethod
from typing import Any
from fusionagi.schemas.task import Task, TaskState
class StateBackend(ABC):
"""
Abstract backend for task state and traces; replace StateManager internals for persistence.
Any backend used to replace StateManager storage must implement get_task_state and set_task_state
in addition to task and trace methods.
"""
@abstractmethod
def get_task(self, task_id: str) -> Task | None:
"""Load task by id."""
...
@abstractmethod
def set_task(self, task: Task) -> None:
"""Save task."""
...
@abstractmethod
def get_task_state(self, task_id: str) -> TaskState | None:
"""Return current task state or None if task unknown."""
...
@abstractmethod
def set_task_state(self, task_id: str, state: TaskState) -> None:
"""Update task state; creates no task if missing."""
...
@abstractmethod
def append_trace(self, task_id: str, entry: dict[str, Any]) -> None:
"""Append trace entry."""
...
@abstractmethod
def get_trace(self, task_id: str) -> list[dict[str, Any]]:
"""Load trace for task."""
...