"""Logic tree and MPC versioning; changes require re-certification; history is preserved."""

from typing import Any


class VersionStore:
|
|
"""Immutable versioned store: logic trees and MPCs; historical read-only."""
|
|
|
|
def __init__(self) -> None:
|
|
self._versions: dict[str, list[tuple[int, Any]]] = {} # id -> [(version, payload), ...]
|
|
|
|
def put(self, id_key: str, version: int, payload: Any) -> None:
|
|
"""Store a new version; versions must be monotonic."""
|
|
if id_key not in self._versions:
|
|
self._versions[id_key] = []
|
|
existing = self._versions[id_key]
|
|
if existing and existing[-1][0] >= version:
|
|
raise ValueError(f"Version must be greater than {existing[-1][0]}")
|
|
existing.append((version, payload))
|
|
|
|
def get(self, id_key: str, version: int | None = None) -> Any | None:
|
|
"""Return payload for id; optional version (latest if omitted)."""
|
|
if id_key not in self._versions:
|
|
return None
|
|
versions = self._versions[id_key]
|
|
if not versions:
|
|
return None
|
|
if version is None:
|
|
return versions[-1][1]
|
|
for v, payload in versions:
|
|
if v == version:
|
|
return payload
|
|
return None
|
|
|
|
def get_latest_version(self, id_key: str) -> int | None:
|
|
"""Return latest version number for id or None."""
|
|
if id_key not in self._versions or not self._versions[id_key]:
|
|
return None
|
|
return self._versions[id_key][-1][0]
|
|
|
|
def history(self, id_key: str) -> list[tuple[int, Any]]:
|
|
"""Return full version history (read-only)."""
|
|
return list(self._versions.get(id_key, []))
|