#!/usr/bin/env python3 """Validate the JVMTM transaction-grade compliance pack.""" from __future__ import annotations import csv import io import json import sys from pathlib import Path RAIL_MODES = {"chain138-primary", "swift", "hybrid", "internal-only"} BLOCKING_LEVELS = {"HARD_STOP", "ESCALATE", "POST_EVENT"} DECISION_STATUSES = {"READY", "BLOCKED", "ESCALATE"} CONTROL_STATUSES = {"PASS", "FAIL", "PENDING", "WAIVED"} EVIDENCE_REF_TYPES = {"repo-path", "runtime-slot", "archive-path", "external-ref"} REQUIRED_CONTROL_FIELDS = [ "control_id", "phase", "domain", "requirement", "validation_method", "blocking_level", "applies_to_rail", "source_audit_rows", "repo_evidence_artifacts", "validator_command", "failure_action", "high_value_override", "notes", ] CSV_FIELDNAMES = [ "control_id", "phase", "domain", "requirement", "validation_method", "blocking_level", "applies_to_rail", "source_audit_rows", "repo_evidence_artifacts", "validator_command", "failure_action", "high_value_override", "notes", ] def fail(message: str) -> None: raise SystemExit(f"error: {message}") def load_json(path: Path) -> dict: try: return json.loads(path.read_text(encoding="utf-8")) except FileNotFoundError: fail(f"missing JSON file: {path}") except json.JSONDecodeError as exc: fail(f"invalid JSON in {path}: {exc}") def format_artifacts(artifacts: list[dict[str, str]]) -> str: return " | ".join(f'{artifact["artifact_type"]}:{artifact["ref"]}' for artifact in artifacts) def render_csv(matrix: dict) -> str: buffer = io.StringIO(newline="") writer = csv.DictWriter(buffer, fieldnames=CSV_FIELDNAMES, lineterminator="\n") writer.writeheader() for control in matrix["controls"]: writer.writerow( { "control_id": control["control_id"], "phase": control["phase"], "domain": control["domain"], "requirement": control["requirement"], "validation_method": control["validation_method"], "blocking_level": control["blocking_level"], "applies_to_rail": " | ".join(control["applies_to_rail"]), "source_audit_rows": " | ".join(control["source_audit_rows"]), "repo_evidence_artifacts": format_artifacts(control["repo_evidence_artifacts"]), "validator_command": control["validator_command"], "failure_action": control["failure_action"], "high_value_override": control["high_value_override"], "notes": control["notes"], } ) return buffer.getvalue() def validate_evidence_ref(ref: dict, label: str) -> None: if not isinstance(ref, dict): fail(f"{label} must be an object") for key in ("artifact_type", "ref"): if key not in ref or not isinstance(ref[key], str) or not ref[key].strip(): fail(f"{label} missing non-empty {key}") if ref["artifact_type"] not in EVIDENCE_REF_TYPES: fail(f"{label} uses unsupported artifact_type {ref['artifact_type']}") if "sha256" in ref: sha256 = ref["sha256"] if not isinstance(sha256, str) or len(sha256) != 64 or any(c not in "0123456789abcdefABCDEF" for c in sha256): fail(f"{label} sha256 must be a 64-character hex string") def validate_pack_reference(ref: dict, label: str, repo_root: Path, slot_refs: set[str]) -> None: validate_evidence_ref(ref, label) artifact_type = ref["artifact_type"] target = ref["ref"] if artifact_type == "repo-path": if not (repo_root / target).exists(): fail(f"{label} repo-path does not exist: {target}") elif artifact_type == "runtime-slot": if target not in slot_refs: fail(f"{label} runtime-slot does not exist in the matrix: {target}") def validate_execution_example( path: Path, control_ids: set[str], expected_status: str, matrix_version: str, repo_root: Path, slot_refs: set[str], ) -> None: payload = load_json(path) required_top_level = [ "schema_version", "matrix_version", "transaction_id", "correlation_id", "rail_mode", "amount", "currency", "decision_status", "decision_reason", "validated_at", "approved_by", "instruction_ref", "control_results", ] for field in required_top_level: if field not in payload: fail(f"{path} missing required field {field}") if payload["decision_status"] not in DECISION_STATUSES: fail(f"{path} uses unsupported decision_status {payload['decision_status']}") if payload["rail_mode"] not in RAIL_MODES: fail(f"{path} uses unsupported rail_mode {payload['rail_mode']}") if payload["decision_status"] != expected_status: fail(f"{path} decision_status expected {expected_status} but found {payload['decision_status']}") if payload["matrix_version"] != matrix_version: fail(f"{path} matrix_version {payload['matrix_version']} does not match canonical matrix_version {matrix_version}") validate_pack_reference(payload["instruction_ref"], f"{path}:instruction_ref", repo_root, slot_refs) if "settlement_event_ref" in payload: validate_pack_reference(payload["settlement_event_ref"], f"{path}:settlement_event_ref", repo_root, slot_refs) if not isinstance(payload["control_results"], list) or not payload["control_results"]: fail(f"{path} control_results must be a non-empty array") seen = set() for index, result in enumerate(payload["control_results"]): label = f"{path}:control_results[{index}]" if not isinstance(result, dict): fail(f"{label} must be an object") for key in ("control_id", "status", "blocking", "validated_at", "validator_ref", "evidence_refs"): if key not in result: fail(f"{label} missing required field {key}") control_id = result["control_id"] if control_id not in control_ids: fail(f"{label} references unknown control_id {control_id}") if control_id in seen: fail(f"{path} repeats control_id {control_id}") seen.add(control_id) if result["status"] not in CONTROL_STATUSES: fail(f"{label} uses unsupported status {result['status']}") if result["blocking"] not in BLOCKING_LEVELS: fail(f"{label} uses unsupported blocking value {result['blocking']}") if not isinstance(result["evidence_refs"], list) or not result["evidence_refs"]: fail(f"{label} evidence_refs must be a non-empty array") for ref_index, evidence_ref in enumerate(result["evidence_refs"]): validate_pack_reference(evidence_ref, f"{label}:evidence_refs[{ref_index}]", repo_root, slot_refs) if expected_status == "READY": if "settlement_event_ref" not in payload: fail(f"{path} must include settlement_event_ref for the READY example") statuses = {result["control_id"]: result["status"] for result in payload["control_results"]} if statuses.get("PT-02") != "PASS" or statuses.get("TX-02") != "PASS": fail(f"{path} must show PT-02 and TX-02 as PASS for READY examples") if expected_status == "BLOCKED": if "settlement_event_ref" in payload: fail(f"{path} should omit settlement_event_ref for the BLOCKED pre-execution example") statuses = {result["control_id"]: result["status"] for result in payload["control_results"]} if statuses.get("PT-02") != "FAIL": fail(f"{path} must show PT-02 as FAIL for BLOCKED examples") if statuses.get("TX-02") not in {"FAIL", "PENDING"}: fail(f"{path} must show TX-02 as FAIL or PENDING for BLOCKED examples") def main() -> int: repo_root = Path(__file__).resolve().parents[2] config_dir = repo_root / "config/jvmtm-regulatory-closure" matrix_path = config_dir / "transaction-compliance-matrix.json" csv_path = config_dir / "transaction-compliance-matrix.csv" markdown_path = config_dir / "JVMTM_TRANSACTION_GRADE_COMPLIANCE_MATRIX.md" schema_path = config_dir / "schemas/transaction-compliance-execution.schema.json" ready_example_path = config_dir / "examples/transaction-compliance-execution.example.json" blocked_example_path = config_dir / "examples/transaction-compliance-execution.blocked.example.json" for path in (matrix_path, csv_path, markdown_path, schema_path, ready_example_path, blocked_example_path): if not path.exists(): fail(f"missing required pack file: {path}") matrix = load_json(matrix_path) if matrix.get("schema_version") != 1: fail(f"{matrix_path} schema_version must equal 1") if not isinstance(matrix.get("matrix_version"), str) or not matrix["matrix_version"]: fail(f"{matrix_path} matrix_version must be a non-empty string") if not isinstance(matrix.get("runtime_slots"), list) or not matrix["runtime_slots"]: fail(f"{matrix_path} runtime_slots must be a non-empty array") if not isinstance(matrix.get("controls"), list) or not matrix["controls"]: fail(f"{matrix_path} controls must be a non-empty array") if matrix.get("canonical_format") != "json": fail(f"{matrix_path} canonical_format must equal 'json'") if matrix.get("csv_export") != "config/jvmtm-regulatory-closure/transaction-compliance-matrix.csv": fail(f"{matrix_path} csv_export must point to the canonical CSV path") if not isinstance(matrix.get("source_baseline"), list) or not matrix["source_baseline"]: fail(f"{matrix_path} source_baseline must be a non-empty array") for baseline_ref in matrix["source_baseline"]: if not isinstance(baseline_ref, str) or not baseline_ref.strip(): fail(f"{matrix_path} contains an invalid source_baseline entry") if not (repo_root / baseline_ref).exists(): fail(f"{matrix_path} source_baseline path does not exist: {baseline_ref}") slot_refs: set[str] = set() for index, slot in enumerate(matrix["runtime_slots"]): if not isinstance(slot, dict): fail(f"{matrix_path} runtime_slots[{index}] must be an object") for key in ("slot", "source", "archive_path", "description"): if key not in slot or not isinstance(slot[key], str) or not slot[key].strip(): fail(f"{matrix_path} runtime_slots[{index}] missing non-empty {key}") if slot["slot"] in slot_refs: fail(f"{matrix_path} repeats runtime slot {slot['slot']}") slot_refs.add(slot["slot"]) control_ids: set[str] = set() for index, control in enumerate(matrix["controls"]): label = f"{matrix_path}:controls[{index}]" if not isinstance(control, dict): fail(f"{label} must be an object") for field in REQUIRED_CONTROL_FIELDS: if field not in control: fail(f"{label} missing field {field}") control_id = control["control_id"] if not isinstance(control_id, str) or not control_id.strip(): fail(f"{label} control_id must be a non-empty string") if control_id in control_ids: fail(f"{matrix_path} repeats control_id {control_id}") control_ids.add(control_id) if control["blocking_level"] not in BLOCKING_LEVELS: fail(f"{label} uses unsupported blocking_level {control['blocking_level']}") if not isinstance(control["applies_to_rail"], list) or not control["applies_to_rail"]: fail(f"{label} applies_to_rail must be a non-empty array") if any(rail not in RAIL_MODES for rail in control["applies_to_rail"]): fail(f"{label} uses unsupported rail mode") if not isinstance(control["source_audit_rows"], list) or not control["source_audit_rows"]: fail(f"{label} source_audit_rows must be a non-empty array") artifacts = control["repo_evidence_artifacts"] if not isinstance(artifacts, list) or not artifacts: fail(f"{label} repo_evidence_artifacts must be a non-empty array") for artifact_index, artifact in enumerate(artifacts): if not isinstance(artifact, dict): fail(f"{label}:repo_evidence_artifacts[{artifact_index}] must be an object") for key in ("artifact_type", "ref"): if key not in artifact or not isinstance(artifact[key], str) or not artifact[key].strip(): fail(f"{label}:repo_evidence_artifacts[{artifact_index}] missing non-empty {key}") artifact_type = artifact["artifact_type"] ref = artifact["ref"] if artifact_type == "repo-path": if not (repo_root / ref).exists(): fail(f"{label}:repo_evidence_artifacts[{artifact_index}] repo-path does not exist: {ref}") elif artifact_type == "runtime-slot": if ref not in slot_refs: fail(f"{label}:repo_evidence_artifacts[{artifact_index}] unknown runtime slot: {ref}") else: fail(f"{label}:repo_evidence_artifacts[{artifact_index}] unsupported artifact_type {artifact_type}") expected_csv = render_csv(matrix) actual_csv = csv_path.read_text(encoding="utf-8") if actual_csv != expected_csv: fail( "transaction-compliance-matrix.csv is out of date with transaction-compliance-matrix.json; " "run scripts/jvmtm/export-transaction-compliance-matrix-csv.py" ) actual_rows = [line for line in actual_csv.splitlines() if line.strip()] expected_row_count = len(matrix["controls"]) + 1 if len(actual_rows) != expected_row_count: fail( f"{csv_path} row count mismatch: expected {expected_row_count} including header, " f"found {len(actual_rows)}" ) markdown_text = markdown_path.read_text(encoding="utf-8") if matrix["title"] not in markdown_text: fail(f"{markdown_path} does not contain the canonical matrix title: {matrix['title']}") missing_markdown_controls = [control_id for control_id in control_ids if control_id not in markdown_text] if missing_markdown_controls: fail( f"{markdown_path} is missing control ids present in the canonical matrix: " f"{', '.join(sorted(missing_markdown_controls))}" ) validate_execution_example( ready_example_path, control_ids, "READY", matrix["matrix_version"], repo_root, slot_refs, ) validate_execution_example( blocked_example_path, control_ids, "BLOCKED", matrix["matrix_version"], repo_root, slot_refs, ) print( "OK jvmtm transaction-grade compliance pack " f"({len(control_ids)} controls, {len(slot_refs)} runtime slots, CSV synchronized)" ) return 0 if __name__ == "__main__": raise SystemExit(main())