- Institutional / JVMTM / reserve-provenance / GRU transport + standards JSON
- Validation and verify scripts (Blockscout labels, x402, GRU preflight, P1 local path)
- Wormhole wiring in AGENTS, MCP_SETUP, MASTER_INDEX, 04-configuration README
- Meta docs, integration gaps, live verification log, architecture updates
- CI validate-config workflow updates

Operator/LAN items, submodule working trees, and public token-aggregation edge routes remain follow-up (see TODOS_CONSOLIDATED P1).

Made-with: Cursor
342 lines
15 KiB
Python
Executable File
342 lines
15 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Validate the JVMTM transaction-grade compliance pack."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import csv
|
|
import io
|
|
import json
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
|
|
# Closed vocabularies that the validators below check values against.
RAIL_MODES = {
    "chain138-primary",
    "hybrid",
    "internal-only",
    "swift",
}
BLOCKING_LEVELS = {
    "ESCALATE",
    "HARD_STOP",
    "POST_EVENT",
}
DECISION_STATUSES = {
    "BLOCKED",
    "ESCALATE",
    "READY",
}
CONTROL_STATUSES = {
    "FAIL",
    "PASS",
    "PENDING",
    "WAIVED",
}
EVIDENCE_REF_TYPES = {
    "archive-path",
    "external-ref",
    "repo-path",
    "runtime-slot",
}

# Keys every control entry of the matrix must carry.
REQUIRED_CONTROL_FIELDS = [
    "control_id",
    "phase",
    "domain",
    "requirement",
    "validation_method",
    "blocking_level",
    "applies_to_rail",
    "source_audit_rows",
    "repo_evidence_artifacts",
    "validator_command",
    "failure_action",
    "high_value_override",
    "notes",
]

# The CSV export has one column per required control field, in the same
# order. Kept as an independent list object so that mutating one list never
# affects the other.
CSV_FIELDNAMES = list(REQUIRED_CONTROL_FIELDS)
|
|
|
|
|
|
def fail(message: str) -> None:
    """Abort by raising ``SystemExit`` carrying ``error: <message>``.

    This function always raises; it never returns normally.
    """
    exit_text = f"error: {message}"
    raise SystemExit(exit_text)
|
|
|
|
|
|
def load_json(path: Path) -> dict:
    """Read *path* as UTF-8 text and return the JSON object it contains.

    Aborts through ``fail`` when the file does not exist, when its content is
    not valid JSON, or when the document is valid JSON but its top level is
    not an object, so callers can rely on receiving a ``dict``.
    """
    try:
        payload = json.loads(path.read_text(encoding="utf-8"))
    except FileNotFoundError:
        fail(f"missing JSON file: {path}")
    except json.JSONDecodeError as exc:
        fail(f"invalid JSON in {path}: {exc}")
    # Callers use .get() and key lookups on the result. Reject arrays and
    # scalars here with a clear message instead of letting them surface later
    # as an AttributeError/TypeError traceback.
    if not isinstance(payload, dict):
        fail(f"{path} must contain a JSON object at the top level")
    return payload
|
|
|
|
|
|
def format_artifacts(artifacts: list[dict[str, str]]) -> str:
    """Flatten evidence artifacts into a single ``type:ref | type:ref`` string.

    Each artifact contributes ``<artifact_type>:<ref>``; the pieces are joined
    with ``" | "``. An empty list yields an empty string.
    """
    rendered = []
    for entry in artifacts:
        kind = entry["artifact_type"]
        target = entry["ref"]
        rendered.append(f"{kind}:{target}")
    return " | ".join(rendered)
|
|
|
|
|
|
def render_csv(matrix: dict) -> str:
    """Render the controls of *matrix* as CSV text.

    The result has a header row followed by one row per entry of
    ``matrix["controls"]``, each terminated by a single line feed. The two
    list-valued fields are flattened with ``" | "``, the evidence artifacts go
    through ``format_artifacts``, and every other field is written unchanged.
    """

    def unchanged(value):
        return value

    pipe_join = " | ".join
    # (column, renderer) pairs, listed in the order the values are read from
    # each control.
    column_renderers = (
        ("control_id", unchanged),
        ("phase", unchanged),
        ("domain", unchanged),
        ("requirement", unchanged),
        ("validation_method", unchanged),
        ("blocking_level", unchanged),
        ("applies_to_rail", pipe_join),
        ("source_audit_rows", pipe_join),
        ("repo_evidence_artifacts", format_artifacts),
        ("validator_command", unchanged),
        ("failure_action", unchanged),
        ("high_value_override", unchanged),
        ("notes", unchanged),
    )

    sink = io.StringIO(newline="")
    out = csv.DictWriter(sink, fieldnames=CSV_FIELDNAMES, lineterminator="\n")
    out.writeheader()
    for entry in matrix["controls"]:
        out.writerow({column: render(entry[column]) for column, render in column_renderers})
    return sink.getvalue()
|
|
|
|
|
|
def validate_evidence_ref(ref: dict, label: str) -> None:
    """Check the shape of a single evidence reference.

    *ref* must be an object whose ``artifact_type`` and ``ref`` are non-blank
    strings, and ``artifact_type`` must be one of ``EVIDENCE_REF_TYPES``. When
    a ``sha256`` key is present it must be a 64-character hexadecimal string.
    *label* prefixes every error message. Violations abort through ``fail``.
    """
    if not isinstance(ref, dict):
        fail(f"{label} must be an object")

    for key in ("artifact_type", "ref"):
        # A missing key yields None, which fails the string test exactly like
        # an explicit non-string value does.
        value = ref.get(key)
        if not (isinstance(value, str) and value.strip()):
            fail(f"{label} missing non-empty {key}")

    if ref["artifact_type"] not in EVIDENCE_REF_TYPES:
        fail(f"{label} uses unsupported artifact_type {ref['artifact_type']}")

    # The digest is optional; it is only validated when supplied.
    if "sha256" not in ref:
        return
    digest = ref["sha256"]
    is_hex_digest = (
        isinstance(digest, str)
        and len(digest) == 64
        and all(ch in "0123456789abcdefABCDEF" for ch in digest)
    )
    if not is_hex_digest:
        fail(f"{label} sha256 must be a 64-character hex string")
|
|
|
|
|
|
def validate_pack_reference(ref: dict, label: str, repo_root: Path, slot_refs: set[str]) -> None:
    """Validate an evidence reference and resolve it where that is possible.

    After the shape check done by ``validate_evidence_ref``, a ``repo-path``
    reference must exist under *repo_root* and a ``runtime-slot`` reference
    must be a member of *slot_refs*. Other artifact types get no further
    check. Violations abort through ``fail``.
    """
    validate_evidence_ref(ref, label)

    kind, target = ref["artifact_type"], ref["ref"]
    if kind == "repo-path" and not (repo_root / target).exists():
        fail(f"{label} repo-path does not exist: {target}")
    if kind == "runtime-slot" and target not in slot_refs:
        fail(f"{label} runtime-slot does not exist in the matrix: {target}")
|
|
|
|
|
|
def validate_execution_example(
    path: Path,
    control_ids: set[str],
    expected_status: str,
    matrix_version: str,
    repo_root: Path,
    slot_refs: set[str],
) -> None:
    """Validate one execution example file against the canonical matrix.

    Checks, in order: the required top-level fields; the ``decision_status``
    and ``rail_mode`` vocabularies; agreement with *expected_status* and
    *matrix_version*; the instruction and (optional) settlement references;
    and every entry of ``control_results``. A READY example must carry a
    ``settlement_event_ref`` and show PT-02 and TX-02 as PASS. A BLOCKED
    example must omit it and show PT-02 as FAIL and TX-02 as FAIL or PENDING.
    The first violation found aborts through ``fail``.
    """
    payload = load_json(path)

    top_level_fields = (
        "schema_version",
        "matrix_version",
        "transaction_id",
        "correlation_id",
        "rail_mode",
        "amount",
        "currency",
        "decision_status",
        "decision_reason",
        "validated_at",
        "approved_by",
        "instruction_ref",
        "control_results",
    )
    absent_field = next((name for name in top_level_fields if name not in payload), None)
    if absent_field is not None:
        fail(f"{path} missing required field {absent_field}")

    decision = payload["decision_status"]
    if decision not in DECISION_STATUSES:
        fail(f"{path} uses unsupported decision_status {payload['decision_status']}")
    if payload["rail_mode"] not in RAIL_MODES:
        fail(f"{path} uses unsupported rail_mode {payload['rail_mode']}")
    if decision != expected_status:
        fail(f"{path} decision_status expected {expected_status} but found {payload['decision_status']}")
    if payload["matrix_version"] != matrix_version:
        fail(f"{path} matrix_version {payload['matrix_version']} does not match canonical matrix_version {matrix_version}")

    has_settlement = "settlement_event_ref" in payload
    validate_pack_reference(payload["instruction_ref"], f"{path}:instruction_ref", repo_root, slot_refs)
    if has_settlement:
        validate_pack_reference(payload["settlement_event_ref"], f"{path}:settlement_event_ref", repo_root, slot_refs)

    results = payload["control_results"]
    if not (isinstance(results, list) and results):
        fail(f"{path} control_results must be a non-empty array")

    result_fields = ("control_id", "status", "blocking", "validated_at", "validator_ref", "evidence_refs")
    # Maps each control id seen so far to its reported status. It doubles as
    # the duplicate detector and as the lookup used by the final status rules.
    status_by_control = {}
    for index, result in enumerate(results):
        label = f"{path}:control_results[{index}]"
        if not isinstance(result, dict):
            fail(f"{label} must be an object")
        absent_key = next((key for key in result_fields if key not in result), None)
        if absent_key is not None:
            fail(f"{label} missing required field {absent_key}")

        control_id = result["control_id"]
        if control_id not in control_ids:
            fail(f"{label} references unknown control_id {control_id}")
        if control_id in status_by_control:
            fail(f"{path} repeats control_id {control_id}")
        status_by_control[control_id] = result["status"]

        if result["status"] not in CONTROL_STATUSES:
            fail(f"{label} uses unsupported status {result['status']}")
        if result["blocking"] not in BLOCKING_LEVELS:
            fail(f"{label} uses unsupported blocking value {result['blocking']}")

        evidence = result["evidence_refs"]
        if not (isinstance(evidence, list) and evidence):
            fail(f"{label} evidence_refs must be a non-empty array")
        for ref_index, evidence_ref in enumerate(evidence):
            validate_pack_reference(evidence_ref, f"{label}:evidence_refs[{ref_index}]", repo_root, slot_refs)

    if expected_status == "READY":
        if not has_settlement:
            fail(f"{path} must include settlement_event_ref for the READY example")
        if any(status_by_control.get(required) != "PASS" for required in ("PT-02", "TX-02")):
            fail(f"{path} must show PT-02 and TX-02 as PASS for READY examples")
    elif expected_status == "BLOCKED":
        if has_settlement:
            fail(f"{path} should omit settlement_event_ref for the BLOCKED pre-execution example")
        if status_by_control.get("PT-02") != "FAIL":
            fail(f"{path} must show PT-02 as FAIL for BLOCKED examples")
        if status_by_control.get("TX-02") not in {"FAIL", "PENDING"}:
            fail(f"{path} must show TX-02 as FAIL or PENDING for BLOCKED examples")
|
|
|
|
|
|
def main() -> int:
    """Validate the JVMTM transaction-grade compliance pack.

    Steps, in order:

    1. every pack file (matrix JSON, CSV, Markdown, schema, both examples)
       must exist;
    2. the matrix header fields, source baseline paths, runtime slots and
       controls are validated;
    3. the CSV file must equal a fresh rendering of the matrix;
    4. the Markdown document must mention the matrix title and every
       control id;
    5. the READY and BLOCKED execution examples are validated.

    Returns 0 after printing a one-line summary. The first violation found
    aborts through ``fail``.
    """
    # parents[2] is the directory two levels above the one holding this file.
    repo_root = Path(__file__).resolve().parents[2]
    config_dir = repo_root / "config/jvmtm-regulatory-closure"

    matrix_path = config_dir / "transaction-compliance-matrix.json"
    csv_path = config_dir / "transaction-compliance-matrix.csv"
    markdown_path = config_dir / "JVMTM_TRANSACTION_GRADE_COMPLIANCE_MATRIX.md"
    schema_path = config_dir / "schemas/transaction-compliance-execution.schema.json"
    ready_example_path = config_dir / "examples/transaction-compliance-execution.example.json"
    blocked_example_path = config_dir / "examples/transaction-compliance-execution.blocked.example.json"

    for path in (matrix_path, csv_path, markdown_path, schema_path, ready_example_path, blocked_example_path):
        if not path.exists():
            fail(f"missing required pack file: {path}")

    matrix = load_json(matrix_path)
    if matrix.get("schema_version") != 1:
        fail(f"{matrix_path} schema_version must equal 1")
    if not isinstance(matrix.get("matrix_version"), str) or not matrix["matrix_version"]:
        fail(f"{matrix_path} matrix_version must be a non-empty string")
    # The title is searched for in the Markdown document further down. A
    # missing or non-string title would raise there, and an empty one would
    # match any document, so it is validated up front.
    title = matrix.get("title")
    if not isinstance(title, str) or not title.strip():
        fail(f"{matrix_path} title must be a non-empty string")
    if not isinstance(matrix.get("runtime_slots"), list) or not matrix["runtime_slots"]:
        fail(f"{matrix_path} runtime_slots must be a non-empty array")
    if not isinstance(matrix.get("controls"), list) or not matrix["controls"]:
        fail(f"{matrix_path} controls must be a non-empty array")
    if matrix.get("canonical_format") != "json":
        fail(f"{matrix_path} canonical_format must equal 'json'")
    if matrix.get("csv_export") != "config/jvmtm-regulatory-closure/transaction-compliance-matrix.csv":
        fail(f"{matrix_path} csv_export must point to the canonical CSV path")
    if not isinstance(matrix.get("source_baseline"), list) or not matrix["source_baseline"]:
        fail(f"{matrix_path} source_baseline must be a non-empty array")
    for baseline_ref in matrix["source_baseline"]:
        if not isinstance(baseline_ref, str) or not baseline_ref.strip():
            fail(f"{matrix_path} contains an invalid source_baseline entry")
        if not (repo_root / baseline_ref).exists():
            fail(f"{matrix_path} source_baseline path does not exist: {baseline_ref}")

    # Collect the declared runtime slot names; controls and examples may only
    # reference slots from this set.
    slot_refs: set[str] = set()
    for index, slot in enumerate(matrix["runtime_slots"]):
        if not isinstance(slot, dict):
            fail(f"{matrix_path} runtime_slots[{index}] must be an object")
        for key in ("slot", "source", "archive_path", "description"):
            if key not in slot or not isinstance(slot[key], str) or not slot[key].strip():
                fail(f"{matrix_path} runtime_slots[{index}] missing non-empty {key}")
        if slot["slot"] in slot_refs:
            fail(f"{matrix_path} repeats runtime slot {slot['slot']}")
        slot_refs.add(slot["slot"])

    control_ids: set[str] = set()
    for index, control in enumerate(matrix["controls"]):
        label = f"{matrix_path}:controls[{index}]"
        if not isinstance(control, dict):
            fail(f"{label} must be an object")
        for field in REQUIRED_CONTROL_FIELDS:
            if field not in control:
                fail(f"{label} missing field {field}")
        control_id = control["control_id"]
        if not isinstance(control_id, str) or not control_id.strip():
            fail(f"{label} control_id must be a non-empty string")
        if control_id in control_ids:
            fail(f"{matrix_path} repeats control_id {control_id}")
        control_ids.add(control_id)
        if control["blocking_level"] not in BLOCKING_LEVELS:
            fail(f"{label} uses unsupported blocking_level {control['blocking_level']}")
        if not isinstance(control["applies_to_rail"], list) or not control["applies_to_rail"]:
            fail(f"{label} applies_to_rail must be a non-empty array")
        if any(rail not in RAIL_MODES for rail in control["applies_to_rail"]):
            fail(f"{label} uses unsupported rail mode")
        if not isinstance(control["source_audit_rows"], list) or not control["source_audit_rows"]:
            fail(f"{label} source_audit_rows must be a non-empty array")
        # render_csv() joins these entries with " | ", which raises on
        # anything that is not a string.
        if any(not isinstance(row, str) for row in control["source_audit_rows"]):
            fail(f"{label} source_audit_rows entries must be strings")
        artifacts = control["repo_evidence_artifacts"]
        if not isinstance(artifacts, list) or not artifacts:
            fail(f"{label} repo_evidence_artifacts must be a non-empty array")
        for artifact_index, artifact in enumerate(artifacts):
            artifact_label = f"{label}:repo_evidence_artifacts[{artifact_index}]"
            if not isinstance(artifact, dict):
                fail(f"{artifact_label} must be an object")
            for key in ("artifact_type", "ref"):
                if key not in artifact or not isinstance(artifact[key], str) or not artifact[key].strip():
                    fail(f"{artifact_label} missing non-empty {key}")
            artifact_type = artifact["artifact_type"]
            ref = artifact["ref"]
            # Matrix controls accept only these two artifact types.
            if artifact_type == "repo-path":
                if not (repo_root / ref).exists():
                    fail(f"{artifact_label} repo-path does not exist: {ref}")
            elif artifact_type == "runtime-slot":
                if ref not in slot_refs:
                    fail(f"{artifact_label} unknown runtime slot: {ref}")
            else:
                fail(f"{artifact_label} unsupported artifact_type {artifact_type}")

    expected_csv = render_csv(matrix)
    actual_csv = csv_path.read_text(encoding="utf-8")
    if actual_csv != expected_csv:
        fail(
            "transaction-compliance-matrix.csv is out of date with transaction-compliance-matrix.json; "
            "run scripts/jvmtm/export-transaction-compliance-matrix-csv.py"
        )

    # Count parsed CSV records rather than physical lines: a quoted field may
    # legitimately contain a line break, which str.splitlines() would count
    # as an extra row.
    actual_rows = [row for row in csv.reader(io.StringIO(actual_csv, newline="")) if row]
    expected_row_count = len(matrix["controls"]) + 1
    if len(actual_rows) != expected_row_count:
        fail(
            f"{csv_path} row count mismatch: expected {expected_row_count} including header, "
            f"found {len(actual_rows)}"
        )

    markdown_text = markdown_path.read_text(encoding="utf-8")
    if matrix["title"] not in markdown_text:
        fail(f"{markdown_path} does not contain the canonical matrix title: {matrix['title']}")
    missing_markdown_controls = [control_id for control_id in control_ids if control_id not in markdown_text]
    if missing_markdown_controls:
        fail(
            f"{markdown_path} is missing control ids present in the canonical matrix: "
            f"{', '.join(sorted(missing_markdown_controls))}"
        )

    validate_execution_example(
        ready_example_path,
        control_ids,
        "READY",
        matrix["matrix_version"],
        repo_root,
        slot_refs,
    )
    validate_execution_example(
        blocked_example_path,
        control_ids,
        "BLOCKED",
        matrix["matrix_version"],
        repo_root,
        slot_refs,
    )

    print(
        "OK jvmtm transaction-grade compliance pack "
        f"({len(control_ids)} controls, {len(slot_refs)} runtime slots, CSV synchronized)"
    )
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main())
|