Files
FusionAGI/fusionagi/maa/versioning.py
defiQUG c052b07662
Some checks failed
Tests / test (3.10) (push) Has been cancelled
Tests / test (3.11) (push) Has been cancelled
Tests / test (3.12) (push) Has been cancelled
Tests / lint (push) Has been cancelled
Tests / docker (push) Has been cancelled
Initial commit: add .gitignore and README
2026-02-09 21:51:42 -08:00

44 lines
1.7 KiB
Python

"""Logic tree and MPC versioning; changes require re-certification; historical preserved."""
from typing import Any
class VersionStore:
"""Immutable versioned store: logic trees and MPCs; historical read-only."""
def __init__(self) -> None:
self._versions: dict[str, list[tuple[int, Any]]] = {} # id -> [(version, payload), ...]
def put(self, id_key: str, version: int, payload: Any) -> None:
"""Store a new version; versions must be monotonic."""
if id_key not in self._versions:
self._versions[id_key] = []
existing = self._versions[id_key]
if existing and existing[-1][0] >= version:
raise ValueError(f"Version must be greater than {existing[-1][0]}")
existing.append((version, payload))
def get(self, id_key: str, version: int | None = None) -> Any | None:
"""Return payload for id; optional version (latest if omitted)."""
if id_key not in self._versions:
return None
versions = self._versions[id_key]
if not versions:
return None
if version is None:
return versions[-1][1]
for v, payload in versions:
if v == version:
return payload
return None
def get_latest_version(self, id_key: str) -> int | None:
"""Return latest version number for id or None."""
if id_key not in self._versions or not self._versions[id_key]:
return None
return self._versions[id_key][-1][0]
def history(self, id_key: str) -> list[tuple[int, Any]]:
"""Return full version history (read-only)."""
return list(self._versions.get(id_key, []))