Files
proxmox/scripts/validation/validate-jvmtm-transaction-compliance-pack.py
defiQUG 7ac74f432b chore: sync docs, config schemas, scripts, and meta task alignment
- Institutional / JVMTM / reserve-provenance / GRU transport + standards JSON
- Validation and verify scripts (Blockscout labels, x402, GRU preflight, P1 local path)
- Wormhole wiring in AGENTS, MCP_SETUP, MASTER_INDEX, 04-configuration README
- Meta docs, integration gaps, live verification log, architecture updates
- CI validate-config workflow updates

Operator/LAN items, submodule working trees, and public token-aggregation edge
routes remain follow-up (see TODOS_CONSOLIDATED P1).

Made-with: Cursor
2026-03-31 22:31:39 -07:00

342 lines
15 KiB
Python
Executable File

#!/usr/bin/env python3
"""Validate the JVMTM transaction-grade compliance pack."""
from __future__ import annotations

import csv
import io
import json
import sys
from pathlib import Path
from typing import NoReturn
# Closed vocabularies enforced throughout the pack.
RAIL_MODES = {"chain138-primary", "swift", "hybrid", "internal-only"}
BLOCKING_LEVELS = {"HARD_STOP", "ESCALATE", "POST_EVENT"}
DECISION_STATUSES = {"READY", "BLOCKED", "ESCALATE"}
CONTROL_STATUSES = {"PASS", "FAIL", "PENDING", "WAIVED"}
EVIDENCE_REF_TYPES = {"repo-path", "runtime-slot", "archive-path", "external-ref"}

# Fields every control entry in the canonical JSON matrix must carry.
REQUIRED_CONTROL_FIELDS = [
    "control_id",
    "phase",
    "domain",
    "requirement",
    "validation_method",
    "blocking_level",
    "applies_to_rail",
    "source_audit_rows",
    "repo_evidence_artifacts",
    "validator_command",
    "failure_action",
    "high_value_override",
    "notes",
]

# The CSV export uses the same columns in the same order as the required
# control fields; derive it instead of duplicating the list so the two
# can never drift apart.
CSV_FIELDNAMES = list(REQUIRED_CONTROL_FIELDS)
def fail(message: str) -> NoReturn:
    """Abort the validation run with a non-zero exit status.

    Annotated ``NoReturn`` (it always raises) so type checkers know
    callers never proceed past a ``fail(...)`` call.

    Raises:
        SystemExit: always, carrying ``error: <message>`` as its payload.
    """
    raise SystemExit(f"error: {message}")
def load_json(path: Path) -> dict:
    """Read *path* as UTF-8 JSON, aborting the run if it is missing or malformed."""
    try:
        raw = path.read_text(encoding="utf-8")
    except FileNotFoundError:
        fail(f"missing JSON file: {path}")
    try:
        return json.loads(raw)
    except json.JSONDecodeError as exc:
        fail(f"invalid JSON in {path}: {exc}")
def format_artifacts(artifacts: list[dict[str, str]]) -> str:
    """Render evidence artifacts as ``type:ref`` entries joined by `` | ``."""
    rendered = []
    for entry in artifacts:
        rendered.append(f"{entry['artifact_type']}:{entry['ref']}")
    return " | ".join(rendered)
def render_csv(matrix: dict) -> str:
    """Serialize the matrix controls to the canonical CSV text (LF line endings)."""
    out = io.StringIO(newline="")
    writer = csv.DictWriter(out, fieldnames=CSV_FIELDNAMES, lineterminator="\n")
    writer.writeheader()
    for control in matrix["controls"]:
        # Start from the raw control fields, then flatten the three
        # list-valued columns into the " | "-joined CSV representation.
        row = {field: control[field] for field in CSV_FIELDNAMES}
        row["applies_to_rail"] = " | ".join(control["applies_to_rail"])
        row["source_audit_rows"] = " | ".join(control["source_audit_rows"])
        row["repo_evidence_artifacts"] = format_artifacts(control["repo_evidence_artifacts"])
        writer.writerow(row)
    return out.getvalue()
def validate_evidence_ref(ref: dict, label: str) -> None:
    """Check the structural shape of a single evidence-reference object.

    Requires non-empty string ``artifact_type`` and ``ref`` fields, an
    ``artifact_type`` drawn from EVIDENCE_REF_TYPES, and — when present —
    a 64-character hex ``sha256``. Aborts via fail() on any violation.
    """
    if not isinstance(ref, dict):
        fail(f"{label} must be an object")
    for key in ("artifact_type", "ref"):
        value = ref[key] if key in ref else None
        if not isinstance(value, str) or not value.strip():
            fail(f"{label} missing non-empty {key}")
    if ref["artifact_type"] not in EVIDENCE_REF_TYPES:
        fail(f"{label} uses unsupported artifact_type {ref['artifact_type']}")
    if "sha256" in ref:
        digest = ref["sha256"]
        is_hex = (
            isinstance(digest, str)
            and len(digest) == 64
            and all(ch in "0123456789abcdefABCDEF" for ch in digest)
        )
        if not is_hex:
            fail(f"{label} sha256 must be a 64-character hex string")
def validate_pack_reference(ref: dict, label: str, repo_root: Path, slot_refs: set[str]) -> None:
    """Validate an evidence reference and resolve its target.

    After the shape check, ``repo-path`` refs must exist on disk under
    *repo_root* and ``runtime-slot`` refs must name a declared slot.
    Other artifact types carry no resolvable target here.
    """
    validate_evidence_ref(ref, label)
    kind = ref["artifact_type"]
    target = ref["ref"]
    if kind == "repo-path" and not (repo_root / target).exists():
        fail(f"{label} repo-path does not exist: {target}")
    if kind == "runtime-slot" and target not in slot_refs:
        fail(f"{label} runtime-slot does not exist in the matrix: {target}")
def validate_execution_example(
    path: Path,
    control_ids: set[str],
    expected_status: str,
    matrix_version: str,
    repo_root: Path,
    slot_refs: set[str],
) -> None:
    """Validate one execution-example JSON file against the canonical matrix.

    Args:
        path: example file to load and validate.
        control_ids: control ids declared by the canonical matrix.
        expected_status: decision_status the example must carry
            ("READY" or "BLOCKED" for the shipped examples).
        matrix_version: canonical matrix_version the example must echo.
        repo_root: repository root for resolving repo-path evidence refs.
        slot_refs: runtime-slot names declared by the matrix.

    Aborts via fail() on the first violation found.
    """
    payload = load_json(path)
    required_top_level = [
        "schema_version",
        "matrix_version",
        "transaction_id",
        "correlation_id",
        "rail_mode",
        "amount",
        "currency",
        "decision_status",
        "decision_reason",
        "validated_at",
        "approved_by",
        "instruction_ref",
        "control_results",
    ]
    for field in required_top_level:
        if field not in payload:
            fail(f"{path} missing required field {field}")
    if payload["decision_status"] not in DECISION_STATUSES:
        fail(f"{path} uses unsupported decision_status {payload['decision_status']}")
    if payload["rail_mode"] not in RAIL_MODES:
        fail(f"{path} uses unsupported rail_mode {payload['rail_mode']}")
    if payload["decision_status"] != expected_status:
        fail(f"{path} decision_status expected {expected_status} but found {payload['decision_status']}")
    if payload["matrix_version"] != matrix_version:
        fail(f"{path} matrix_version {payload['matrix_version']} does not match canonical matrix_version {matrix_version}")
    # instruction_ref is mandatory; settlement_event_ref is optional here and
    # its presence is policed per decision status further below.
    validate_pack_reference(payload["instruction_ref"], f"{path}:instruction_ref", repo_root, slot_refs)
    if "settlement_event_ref" in payload:
        validate_pack_reference(payload["settlement_event_ref"], f"{path}:settlement_event_ref", repo_root, slot_refs)
    if not isinstance(payload["control_results"], list) or not payload["control_results"]:
        fail(f"{path} control_results must be a non-empty array")
    seen = set()  # control_ids already reported in this example (duplicates are rejected)
    for index, result in enumerate(payload["control_results"]):
        label = f"{path}:control_results[{index}]"
        if not isinstance(result, dict):
            fail(f"{label} must be an object")
        for key in ("control_id", "status", "blocking", "validated_at", "validator_ref", "evidence_refs"):
            if key not in result:
                fail(f"{label} missing required field {key}")
        control_id = result["control_id"]
        if control_id not in control_ids:
            fail(f"{label} references unknown control_id {control_id}")
        if control_id in seen:
            fail(f"{path} repeats control_id {control_id}")
        seen.add(control_id)
        if result["status"] not in CONTROL_STATUSES:
            fail(f"{label} uses unsupported status {result['status']}")
        if result["blocking"] not in BLOCKING_LEVELS:
            fail(f"{label} uses unsupported blocking value {result['blocking']}")
        if not isinstance(result["evidence_refs"], list) or not result["evidence_refs"]:
            fail(f"{label} evidence_refs must be a non-empty array")
        for ref_index, evidence_ref in enumerate(result["evidence_refs"]):
            validate_pack_reference(evidence_ref, f"{label}:evidence_refs[{ref_index}]", repo_root, slot_refs)
    # Status-specific invariants. PT-02 and TX-02 act as the gating
    # controls for READY/BLOCKED examples; their exact domain semantics
    # live in the matrix JSON — presumably preflight/transaction gates,
    # confirm there if extending this logic.
    if expected_status == "READY":
        # A READY example must prove settlement evidence and passing gates.
        if "settlement_event_ref" not in payload:
            fail(f"{path} must include settlement_event_ref for the READY example")
        statuses = {result["control_id"]: result["status"] for result in payload["control_results"]}
        if statuses.get("PT-02") != "PASS" or statuses.get("TX-02") != "PASS":
            fail(f"{path} must show PT-02 and TX-02 as PASS for READY examples")
    if expected_status == "BLOCKED":
        # A BLOCKED example is pre-execution: no settlement event may exist,
        # and the gating controls must reflect the block.
        if "settlement_event_ref" in payload:
            fail(f"{path} should omit settlement_event_ref for the BLOCKED pre-execution example")
        statuses = {result["control_id"]: result["status"] for result in payload["control_results"]}
        if statuses.get("PT-02") != "FAIL":
            fail(f"{path} must show PT-02 as FAIL for BLOCKED examples")
        if statuses.get("TX-02") not in {"FAIL", "PENDING"}:
            fail(f"{path} must show TX-02 as FAIL or PENDING for BLOCKED examples")
def main() -> int:
    """Validate the full JVMTM transaction-grade compliance pack.

    Checks, in order: presence of all pack files, matrix structural
    invariants, runtime slots, control definitions, CSV-export
    synchronization, markdown coverage of every control id, and the
    READY/BLOCKED execution examples. Any violation aborts the process
    via fail(); returns 0 on success.
    """
    # This script lives at scripts/validation/<name>.py, so parents[2]
    # resolves to the repository root.
    repo_root = Path(__file__).resolve().parents[2]
    config_dir = repo_root / "config/jvmtm-regulatory-closure"
    matrix_path = config_dir / "transaction-compliance-matrix.json"
    csv_path = config_dir / "transaction-compliance-matrix.csv"
    markdown_path = config_dir / "JVMTM_TRANSACTION_GRADE_COMPLIANCE_MATRIX.md"
    schema_path = config_dir / "schemas/transaction-compliance-execution.schema.json"
    ready_example_path = config_dir / "examples/transaction-compliance-execution.example.json"
    blocked_example_path = config_dir / "examples/transaction-compliance-execution.blocked.example.json"
    # Existence gate first: every pack file must be present before content
    # checks run. Note schema_path is only checked for existence here; the
    # examples are validated structurally below, not against the schema.
    for path in (matrix_path, csv_path, markdown_path, schema_path, ready_example_path, blocked_example_path):
        if not path.exists():
            fail(f"missing required pack file: {path}")
    matrix = load_json(matrix_path)
    # Top-level matrix invariants.
    if matrix.get("schema_version") != 1:
        fail(f"{matrix_path} schema_version must equal 1")
    if not isinstance(matrix.get("matrix_version"), str) or not matrix["matrix_version"]:
        fail(f"{matrix_path} matrix_version must be a non-empty string")
    if not isinstance(matrix.get("runtime_slots"), list) or not matrix["runtime_slots"]:
        fail(f"{matrix_path} runtime_slots must be a non-empty array")
    if not isinstance(matrix.get("controls"), list) or not matrix["controls"]:
        fail(f"{matrix_path} controls must be a non-empty array")
    if matrix.get("canonical_format") != "json":
        fail(f"{matrix_path} canonical_format must equal 'json'")
    if matrix.get("csv_export") != "config/jvmtm-regulatory-closure/transaction-compliance-matrix.csv":
        fail(f"{matrix_path} csv_export must point to the canonical CSV path")
    if not isinstance(matrix.get("source_baseline"), list) or not matrix["source_baseline"]:
        fail(f"{matrix_path} source_baseline must be a non-empty array")
    # Every baseline entry is a repo-relative path that must exist.
    for baseline_ref in matrix["source_baseline"]:
        if not isinstance(baseline_ref, str) or not baseline_ref.strip():
            fail(f"{matrix_path} contains an invalid source_baseline entry")
        if not (repo_root / baseline_ref).exists():
            fail(f"{matrix_path} source_baseline path does not exist: {baseline_ref}")
    # Collect runtime-slot names; controls and examples may reference them.
    slot_refs: set[str] = set()
    for index, slot in enumerate(matrix["runtime_slots"]):
        if not isinstance(slot, dict):
            fail(f"{matrix_path} runtime_slots[{index}] must be an object")
        for key in ("slot", "source", "archive_path", "description"):
            if key not in slot or not isinstance(slot[key], str) or not slot[key].strip():
                fail(f"{matrix_path} runtime_slots[{index}] missing non-empty {key}")
        if slot["slot"] in slot_refs:
            fail(f"{matrix_path} repeats runtime slot {slot['slot']}")
        slot_refs.add(slot["slot"])
    # Validate each control definition and collect its id for later
    # markdown-coverage and example checks.
    control_ids: set[str] = set()
    for index, control in enumerate(matrix["controls"]):
        label = f"{matrix_path}:controls[{index}]"
        if not isinstance(control, dict):
            fail(f"{label} must be an object")
        for field in REQUIRED_CONTROL_FIELDS:
            if field not in control:
                fail(f"{label} missing field {field}")
        control_id = control["control_id"]
        if not isinstance(control_id, str) or not control_id.strip():
            fail(f"{label} control_id must be a non-empty string")
        if control_id in control_ids:
            fail(f"{matrix_path} repeats control_id {control_id}")
        control_ids.add(control_id)
        if control["blocking_level"] not in BLOCKING_LEVELS:
            fail(f"{label} uses unsupported blocking_level {control['blocking_level']}")
        if not isinstance(control["applies_to_rail"], list) or not control["applies_to_rail"]:
            fail(f"{label} applies_to_rail must be a non-empty array")
        if any(rail not in RAIL_MODES for rail in control["applies_to_rail"]):
            fail(f"{label} uses unsupported rail mode")
        if not isinstance(control["source_audit_rows"], list) or not control["source_audit_rows"]:
            fail(f"{label} source_audit_rows must be a non-empty array")
        artifacts = control["repo_evidence_artifacts"]
        if not isinstance(artifacts, list) or not artifacts:
            fail(f"{label} repo_evidence_artifacts must be a non-empty array")
        for artifact_index, artifact in enumerate(artifacts):
            if not isinstance(artifact, dict):
                fail(f"{label}:repo_evidence_artifacts[{artifact_index}] must be an object")
            for key in ("artifact_type", "ref"):
                if key not in artifact or not isinstance(artifact[key], str) or not artifact[key].strip():
                    fail(f"{label}:repo_evidence_artifacts[{artifact_index}] missing non-empty {key}")
            artifact_type = artifact["artifact_type"]
            ref = artifact["ref"]
            # Only repo-path and runtime-slot artifact types are allowed for
            # control evidence (narrower than EVIDENCE_REF_TYPES, which the
            # execution examples may use).
            if artifact_type == "repo-path":
                if not (repo_root / ref).exists():
                    fail(f"{label}:repo_evidence_artifacts[{artifact_index}] repo-path does not exist: {ref}")
            elif artifact_type == "runtime-slot":
                if ref not in slot_refs:
                    fail(f"{label}:repo_evidence_artifacts[{artifact_index}] unknown runtime slot: {ref}")
            else:
                fail(f"{label}:repo_evidence_artifacts[{artifact_index}] unsupported artifact_type {artifact_type}")
    # The committed CSV must be byte-identical to what render_csv produces
    # from the canonical JSON.
    expected_csv = render_csv(matrix)
    actual_csv = csv_path.read_text(encoding="utf-8")
    if actual_csv != expected_csv:
        fail(
            "transaction-compliance-matrix.csv is out of date with transaction-compliance-matrix.json; "
            "run scripts/jvmtm/export-transaction-compliance-matrix-csv.py"
        )
    # Secondary sanity check on row count.
    # NOTE(review): splitlines() would under-count rows if any CSV field
    # contained an embedded newline; the byte-equality check above already
    # guarantees synchronization, so this only guards the export shape.
    actual_rows = [line for line in actual_csv.splitlines() if line.strip()]
    expected_row_count = len(matrix["controls"]) + 1
    if len(actual_rows) != expected_row_count:
        fail(
            f"{csv_path} row count mismatch: expected {expected_row_count} including header, "
            f"found {len(actual_rows)}"
        )
    # Markdown must mention the canonical title and every control id.
    # NOTE(review): matrix["title"] is not validated earlier; a matrix
    # missing "title" raises KeyError here rather than a clean fail().
    markdown_text = markdown_path.read_text(encoding="utf-8")
    if matrix["title"] not in markdown_text:
        fail(f"{markdown_path} does not contain the canonical matrix title: {matrix['title']}")
    missing_markdown_controls = [control_id for control_id in control_ids if control_id not in markdown_text]
    if missing_markdown_controls:
        fail(
            f"{markdown_path} is missing control ids present in the canonical matrix: "
            f"{', '.join(sorted(missing_markdown_controls))}"
        )
    # Both shipped examples must validate against the matrix just checked.
    validate_execution_example(
        ready_example_path,
        control_ids,
        "READY",
        matrix["matrix_version"],
        repo_root,
        slot_refs,
    )
    validate_execution_example(
        blocked_example_path,
        control_ids,
        "BLOCKED",
        matrix["matrix_version"],
        repo_root,
        slot_refs,
    )
    print(
        "OK jvmtm transaction-grade compliance pack "
        f"({len(control_ids)} controls, {len(slot_refs)} runtime slots, CSV synchronized)"
    )
    return 0
if __name__ == "__main__":
    # sys.exit raises SystemExit with main()'s return code, same as
    # `raise SystemExit(main())`.
    sys.exit(main())