Files
proxmox/services/sankofa-it-read-api/server.py
defiQUG dbd517b279 Sync workspace: config, docs, scripts, CI, operator rules, and submodule pointers.
- Update dbis_core, cross-chain-pmm-lps, explorer-monorepo, metamask-integration, pr-workspace/chains
- Omit embedded publish git dirs and empty placeholders from index

Made-with: Cursor
2026-04-12 06:12:20 -07:00

337 lines
12 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Read-only HTTP API for IT inventory JSON (Phase 0 BFF stub).
Serves latest reports/status/live_inventory.json and drift.json from the repo tree.
Optional IT_READ_API_KEY: when set, /v1/* requires header X-API-Key (GET and POST).
Usage (from repo root):
IT_READ_API_KEY=secret python3 services/sankofa-it-read-api/server.py
# or
python3 services/sankofa-it-read-api/server.py # open /v1 without key
Env:
IT_READ_API_HOST (default 127.0.0.1)
IT_READ_API_PORT (default 8787)
IT_READ_API_KEY (optional)
IT_READ_API_CORS_ORIGINS (optional, comma-separated; enables CORS for browser direct calls)
"""
from __future__ import annotations
import json
import os
import subprocess
import sys
from datetime import datetime, timezone
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
def _project_root() -> Path:
here = Path(__file__).resolve()
for p in [here.parent.parent.parent, *here.parents]:
if (p / "config" / "ip-addresses.conf").is_file():
return p
return Path.cwd()
ROOT = _project_root()
REPORTS = ROOT / "reports" / "status"
EXPORT_SCRIPT = ROOT / "scripts" / "it-ops" / "export-live-inventory-and-drift.sh"
COLLECTOR_CONTRACT = (
ROOT / "config" / "it-operations" / "live-collectors-contract.json"
)
API_KEY = os.environ.get("IT_READ_API_KEY", "").strip()
HOST = os.environ.get("IT_READ_API_HOST", "127.0.0.1")
PORT = int(os.environ.get("IT_READ_API_PORT", "8787"))
# Comma-separated origins for Access-Control-Allow-Origin (optional; portal should proxy via Next.js).
_CORS_RAW = os.environ.get("IT_READ_API_CORS_ORIGINS", "").strip()
CORS_ORIGINS = {o.strip() for o in _CORS_RAW.split(",") if o.strip()}
def _file_meta(path: Path) -> dict:
if not path.is_file():
return {"path": str(path), "exists": False}
st = path.stat()
return {
"path": str(path),
"exists": True,
"size_bytes": st.st_size,
"mtime_utc": datetime.fromtimestamp(st.st_mtime, tz=timezone.utc).strftime(
"%Y-%m-%dT%H:%M:%SZ"
),
}
def _load_json_file(path: Path) -> tuple[dict | list | None, str | None]:
if not path.is_file():
return None, "file_missing"
try:
return json.loads(path.read_text(encoding="utf-8")), None
except json.JSONDecodeError as e:
return None, str(e)
def _declared_git_head(repo: Path) -> str | None:
"""Best-effort declared config revision (repo checkout on read API host)."""
if os.environ.get("IT_SKIP_GIT_HEAD", "").strip().lower() in ("1", "yes", "true"):
return None
try:
proc = subprocess.run(
["git", "-C", str(repo), "rev-parse", "--short", "HEAD"],
capture_output=True,
text=True,
timeout=5,
check=False,
)
if proc.returncode == 0 and proc.stdout.strip():
return proc.stdout.strip()
except (OSError, subprocess.TimeoutExpired):
pass
return None
class Handler(BaseHTTPRequestHandler):
server_version = "SankofaITReadAPI/0.1"
def log_message(self, fmt: str, *args) -> None:
sys.stderr.write("%s - %s\n" % (self.address_string(), fmt % args))
def _maybe_cors(self) -> None:
if not CORS_ORIGINS:
return
origin = (self.headers.get("Origin") or "").strip()
if origin in CORS_ORIGINS:
self.send_header("Access-Control-Allow-Origin", origin)
self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
self.send_header(
"Access-Control-Allow-Headers",
"Content-Type, X-API-Key",
)
self.send_header("Vary", "Origin")
def do_OPTIONS(self) -> None:
if CORS_ORIGINS:
self.send_response(204)
self._maybe_cors()
self.end_headers()
return
self._text(404, "Not found\n")
def _json(self, code: int, obj: object) -> None:
body = json.dumps(obj, indent=2).encode("utf-8")
self.send_response(code)
self.send_header("Content-Type", "application/json")
self._maybe_cors()
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def _text(self, code: int, text: str, ctype: str = "text/plain") -> None:
body = text.encode("utf-8")
self.send_response(code)
self.send_header("Content-Type", f"{ctype}; charset=utf-8")
self._maybe_cors()
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def _auth_ok(self) -> bool:
if not API_KEY:
return True
return self.headers.get("X-API-Key") == API_KEY
def do_GET(self) -> None:
path = self.path.split("?", 1)[0].rstrip("/") or "/"
if path == "/health":
oidc_issuer = os.environ.get("IT_BFF_OIDC_ISSUER", "").strip()
self._json(
200,
{
"ok": True,
"service": "sankofa-it-read-api",
"project_root": str(ROOT),
"auth_required_for_v1": bool(API_KEY),
"oidc_issuer_configured": bool(oidc_issuer),
},
)
return
if path.startswith("/v1"):
if not self._auth_ok():
self._json(401, {"error": "unauthorized"})
return
if path == "/v1/collector-contract":
data, err = _load_json_file(COLLECTOR_CONTRACT)
if err == "file_missing":
self._json(404, {"error": "file_missing", "path": str(COLLECTOR_CONTRACT)})
return
if err:
self._json(500, {"error": "invalid_json", "detail": err})
return
assert isinstance(data, (dict, list))
self._json(200, data)
return
if path == "/v1/portmap/joined":
now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
self._json(
200,
{
"implementation": "stub",
"collected_at": now,
"stale": True,
"rows": [],
"note": "UniFi/NPM live collectors not wired; see docs/02-architecture/IT_PORT_MAP_LAYERS_SPEC.md",
},
)
return
if path == "/v1/summary":
live_p = REPORTS / "live_inventory.json"
drift_p = REPORTS / "drift.json"
live_d, le = _load_json_file(live_p)
drift_d, de = _load_json_file(drift_p)
if le == "file_missing" and de == "file_missing":
self._json(404, {"error": "no_inventory_artifacts"})
return
dup = (
len(drift_d.get("duplicate_ips", {}))
if isinstance(drift_d, dict)
else 0
)
notes = drift_d.get("notes", []) if isinstance(drift_d, dict) else []
seed_err = any(
"seed_unreachable" in str(n) for n in notes
) or (
isinstance(live_d, dict) and live_d.get("error") == "seed_unreachable"
)
vmid_doc_missing: list[str] = []
vmid_live_extra: dict = {}
if isinstance(drift_d, dict):
raw_missing = drift_d.get("vmids_in_all_vmids_doc_not_on_cluster")
if isinstance(raw_missing, list):
vmid_doc_missing = [str(x) for x in raw_missing]
extra = drift_d.get("vmids_on_cluster_not_in_all_vmids_table")
if isinstance(extra, dict):
vmid_live_extra = extra
self._json(
200,
{
"service": "sankofa-it-read-api",
"envelope_at": datetime.now(timezone.utc).strftime(
"%Y-%m-%dT%H:%M:%SZ"
),
"declared_git_head": _declared_git_head(ROOT),
"artifacts": {
"live_inventory": _file_meta(live_p),
"drift": _file_meta(drift_p),
},
"live_collected_at": (live_d or {}).get("collected_at")
if isinstance(live_d, dict)
else None,
"drift_collected_at": (drift_d or {}).get("collected_at")
if isinstance(drift_d, dict)
else None,
"guest_count": (drift_d or {}).get("guest_count")
if isinstance(drift_d, dict)
else None,
"duplicate_ip_bucket_count": dup,
"seed_unreachable": bool(seed_err),
"drift_notes": notes if isinstance(notes, list) else [],
"vmids_in_all_vmids_doc_not_on_cluster": vmid_doc_missing,
"vmids_on_cluster_not_in_all_vmids_table": vmid_live_extra,
},
)
return
if path == "/v1/inventory/live":
f = REPORTS / "live_inventory.json"
elif path == "/v1/inventory/drift":
f = REPORTS / "drift.json"
else:
self._json(404, {"error": "not_found"})
return
data, err = _load_json_file(f)
if err == "file_missing":
self._json(404, {"error": "file_missing", "path": str(f)})
return
if err:
self._json(500, {"error": "invalid_json", "detail": err})
return
assert data is not None
self._json(200, data)
return
self._text(
404,
"Not found. GET /health, /v1/summary, /v1/collector-contract, /v1/inventory/*\n",
)
def do_POST(self) -> None:
path = self.path.split("?", 1)[0].rstrip("/") or "/"
if path != "/v1/inventory/refresh":
self._json(404, {"error": "not_found"})
return
if not API_KEY or not self._auth_ok():
self._json(401, {"error": "unauthorized"})
return
if not EXPORT_SCRIPT.is_file():
self._json(500, {"error": "export_script_missing"})
return
try:
proc = subprocess.run(
["bash", str(EXPORT_SCRIPT)],
cwd=str(ROOT),
check=False,
timeout=600,
capture_output=True,
text=True,
)
except subprocess.TimeoutExpired:
self._json(500, {"error": "export_timeout"})
return
if proc.returncode not in (0, 2):
self._json(
500,
{
"error": "export_failed",
"returncode": proc.returncode,
"stderr": (proc.stderr or "")[-4000:],
},
)
return
self._json(
200,
{
"ok": True,
"refreshed": True,
"drift_exit_code": proc.returncode,
"duplicate_guest_ip_conflict": proc.returncode == 2,
},
)
def main() -> None:
httpd = ThreadingHTTPServer((HOST, PORT), Handler)
print(f"sankofa-it-read-api listening on http://{HOST}:{PORT}", file=sys.stderr)
print(f" project_root={ROOT}", file=sys.stderr)
print(
" GET /health /v1/summary /v1/collector-contract /v1/portmap/joined",
file=sys.stderr,
)
print(" GET /v1/inventory/live /v1/inventory/drift POST /v1/inventory/refresh", file=sys.stderr)
if API_KEY:
print(" X-API-Key required for /v1/*", file=sys.stderr)
if CORS_ORIGINS:
print(f" CORS origins: {sorted(CORS_ORIGINS)}", file=sys.stderr)
try:
httpd.serve_forever()
except KeyboardInterrupt:
print("\nshutdown", file=sys.stderr)
if __name__ == "__main__":
main()