#!/usr/bin/env python3
"""Merge live JSON with config/ip-addresses.conf; write live_inventory.json + drift.json."""

from __future__ import annotations

import argparse
import json
import re
import sys
from pathlib import Path

# NOTE(review): the original pattern text was lost when this file was
# extracted (everything between "(?" and the is_lan_11 return annotation is
# missing).  This is a reconstruction: a dotted quad with octets 0-255 that
# is not embedded in a longer run of digits/dots.  Confirm against history.
IPV4_RE = re.compile(
    r"(?<![0-9.])"
    r"(?:(?:25[0-5]|2[0-4][0-9]|1?[0-9]{1,2})\.){3}"
    r"(?:25[0-5]|2[0-4][0-9]|1?[0-9]{1,2})"
    r"(?![0-9.])"
)

# NOTE(review): also reconstructed.  The only facts visible in the code are
# that the pattern is applied with .match() to a stripped line, group(1) is
# the VMID and group(2) is the IP.  Assumes rows shaped like
# "| <vmid> | <ipv4> | ..." -- confirm against the real ALL_VMIDS document.
MD_VMID_IP_ROW = re.compile(
    r"^\|\s*(\d+)\s*\|\s*((?:\d{1,3}\.){3}\d{1,3})\s*\|"
)

# Substrings (compared against the upper-cased variable name) that mark a
# conf variable as hypervisor/infrastructure rather than a guest address.
_HYPERVISOR_KEY_MARKERS = (
    "PROXMOX_HOST",
    "PROXMOX_ML110",
    "PROXMOX_R630",
    "PROXMOX_R750",
    "WAN_AGGREGATOR",
    "NETWORK_GATEWAY",
    "UDM_PRO",
    "PUBLIC_IP_GATEWAY",
    "PUBLIC_IP_ER605",
)


def is_lan_11(ip: str) -> bool:
    """Return True when *ip* starts with the ``192.168.11.`` prefix."""
    return ip.startswith("192.168.11.")


def parse_all_vmids_markdown(path: Path) -> tuple[set[str], dict[str, str]]:
    """Extract declared LAN IPs and vmid->ip from ALL_VMIDS pipe tables.

    Returns ``(ips, vmid_to_ip)``.  Only rows matching ``MD_VMID_IP_ROW``
    whose IP passes ``is_lan_11`` are kept; when a VMID appears on several
    rows the last row wins.  A missing file yields two empty containers.
    """
    ips: set[str] = set()
    vmid_to_ip: dict[str, str] = {}
    if not path.is_file():
        return ips, vmid_to_ip

    text = path.read_text(encoding="utf-8", errors="replace")
    for line in text.splitlines():
        m = MD_VMID_IP_ROW.match(line.strip())
        if not m:
            continue
        vmid, ip = m.group(1), m.group(2)
        if is_lan_11(ip):
            ips.add(ip)
            vmid_to_ip[vmid] = ip
    return ips, vmid_to_ip


def parse_ip_addresses_conf(path: Path) -> tuple[dict[str, str], set[str]]:
    """Parse ``KEY=value`` lines from *path*.

    Returns ``(var_map, all_ips)`` where ``var_map`` maps each key to its
    value (one pair of matching surrounding quotes removed) and ``all_ips``
    is every ``IPV4_RE`` match found in any value.  Blank lines, lines
    starting with ``#`` and lines without ``=`` are ignored.  A missing file
    yields two empty containers.
    """
    var_map: dict[str, str] = {}
    all_ips: set[str] = set()
    if not path.is_file():
        return var_map, all_ips

    text = path.read_text(encoding="utf-8", errors="replace")
    for line in text.splitlines():
        s = line.strip()
        if not s or s.startswith("#") or "=" not in s:
            continue
        key, _, val = s.partition("=")
        key = key.strip()
        val = val.strip()
        # Drop one pair of matching quotes; mismatched quotes are left alone.
        if val.startswith('"') and val.endswith('"'):
            val = val[1:-1]
        elif val.startswith("'") and val.endswith("'"):
            val = val[1:-1]
        var_map[key] = val
        all_ips.update(IPV4_RE.findall(val))
    return var_map, all_ips


def hypervisor_related_keys(var_map: dict[str, str]) -> set[str]:
    """Return the keys of *var_map* whose upper-cased name contains one of
    the hypervisor/infrastructure marker substrings."""
    return {
        key
        for key in var_map
        if any(marker in key.upper() for marker in _HYPERVISOR_KEY_MARKERS)
    }


def _load_live(path: Path | None) -> dict:
    """Read and decode the live JSON document, exiting with status 1 on error.

    Reads *path* when given, otherwise stdin.  The document must decode to a
    JSON object because the caller looks fields up by key.
    """
    try:
        raw = path.read_text(encoding="utf-8") if path else sys.stdin.read()
    except OSError as e:
        print(f"Cannot read live JSON: {e}", file=sys.stderr)
        sys.exit(1)
    try:
        live = json.loads(raw)
    except json.JSONDecodeError as e:
        print(f"Invalid live JSON: {e}", file=sys.stderr)
        sys.exit(1)
    if not isinstance(live, dict):
        print(
            "Invalid live JSON: expected a top-level object, "
            f"got {type(live).__name__}",
            file=sys.stderr,
        )
        sys.exit(1)
    return live


def _index_guests(
    guests: list,
) -> tuple[dict[str, list[str]], dict[str, str], int]:
    """Build ip->vmids and vmid->ip maps from the guest records.

    Returns ``(ip_to_vmids, vmid_to_ip, skipped)`` where ``skipped`` counts
    entries that are not JSON objects and therefore could not be indexed.
    """
    ip_to_vmids: dict[str, list[str]] = {}
    vmid_to_ip: dict[str, str] = {}
    skipped = 0
    for g in guests:
        if not isinstance(g, dict):
            skipped += 1
            continue
        # str() keeps a non-string "ip" value from crashing .strip().
        ip = str(g.get("ip") or "").strip()
        vmid = str(g.get("vmid", "")).strip()
        if ip:
            ip_to_vmids.setdefault(ip, []).append(vmid or "?")
        if vmid and ip:
            vmid_to_ip[vmid] = ip
    return ip_to_vmids, vmid_to_ip, skipped


def main(argv: list[str] | None = None) -> None:
    """Compare live guest data with declared IPs and write the two reports.

    Args:
        argv: Command-line arguments; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.

    Writes ``live_inventory.json`` and ``drift.json`` into ``--out-dir``.
    Always terminates through ``sys.exit``: status 1 when the live JSON
    cannot be read or has the wrong shape, 2 when two or more guests share
    an IP, 0 otherwise.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--live", type=Path, help="live JSON file (default stdin)")
    ap.add_argument(
        "--ip-conf",
        type=Path,
        default=Path("config/ip-addresses.conf"),
        help="path to ip-addresses.conf",
    )
    ap.add_argument("--out-dir", type=Path, required=True)
    ap.add_argument(
        "--all-vmids-md",
        type=Path,
        default=None,
        help="optional ALL_VMIDS_ENDPOINTS.md for declared VMID/IP tables",
    )
    args = ap.parse_args(argv)

    live = _load_live(args.live)
    guests = live.get("guests") or []
    if not isinstance(guests, list):
        print(
            "Invalid live JSON: 'guests' must be a list, "
            f"got {type(guests).__name__}",
            file=sys.stderr,
        )
        sys.exit(1)

    var_map, conf_ips = parse_ip_addresses_conf(args.ip_conf)
    doc_ips: set[str] = set()
    vmid_to_ip_doc: dict[str, str] = {}
    if args.all_vmids_md:
        doc_ips, vmid_to_ip_doc = parse_all_vmids_markdown(args.all_vmids_md)
    declared_union = conf_ips | doc_ips

    # Addresses of hypervisors/gateways are excluded from guest matching.
    hyp_ips: set[str] = {
        ip
        for key in hypervisor_related_keys(var_map)
        for ip in IPV4_RE.findall(var_map[key])
    }

    ip_to_vmids, vmid_to_ip_live, skipped_guests = _index_guests(guests)
    duplicate_ips = {ip: vms for ip, vms in ip_to_vmids.items() if len(vms) > 1}
    guest_ip_set = set(ip_to_vmids)

    conf_only = sorted(conf_ips - guest_ip_set - hyp_ips)
    live_only_legacy = sorted(guest_ip_set - conf_ips)

    declared_lan11 = {ip for ip in declared_union if is_lan_11(ip)}
    guest_lan11 = {ip for ip in guest_ip_set if is_lan_11(ip)}
    guest_lan_not_declared = sorted(guest_lan11 - declared_union - hyp_ips)
    declared_lan11_not_on_guests = sorted(
        declared_lan11 - guest_ip_set - hyp_ips
    )

    vmid_ip_mismatch: list[dict[str, str]] = []
    for vmid, doc_ip in vmid_to_ip_doc.items():
        lip = vmid_to_ip_live.get(vmid)
        if lip and doc_ip and lip != doc_ip:
            vmid_ip_mismatch.append(
                {"vmid": vmid, "live_ip": lip, "all_vmids_doc_ip": doc_ip}
            )

    drift = {
        "collected_at": live.get("collected_at"),
        "guest_count": len(guests),
        "duplicate_ips": duplicate_ips,
        "guest_ips_not_in_ip_addresses_conf": live_only_legacy,
        "ip_addresses_conf_ips_not_on_guests": conf_only,
        "guest_lan_ips_not_in_declared_sources": guest_lan_not_declared,
        "declared_lan11_ips_not_on_live_guests": declared_lan11_not_on_guests,
        "vmid_ip_mismatch_live_vs_all_vmids_doc": vmid_ip_mismatch,
        "hypervisor_and_infra_ips_excluded_from_guest_match": sorted(hyp_ips),
        "declared_sources": {
            "ip_addresses_conf_ipv4_count": len(conf_ips),
            "all_vmids_md_lan11_count": len(doc_ips),
        },
        "notes": [],
    }
    if live.get("error"):
        drift["notes"].append(str(live["error"]))
    if skipped_guests:
        drift["notes"].append(
            f"Skipped {skipped_guests} guest entries that are not JSON objects"
        )

    inv_out = {
        "collected_at": live.get("collected_at"),
        "source": "proxmox_cluster_pvesh_plus_config",
        "guests": guests,
    }

    args.out_dir.mkdir(parents=True, exist_ok=True)
    (args.out_dir / "live_inventory.json").write_text(
        json.dumps(inv_out, indent=2), encoding="utf-8"
    )
    (args.out_dir / "drift.json").write_text(
        json.dumps(drift, indent=2), encoding="utf-8"
    )
    print(f"Wrote {args.out_dir / 'live_inventory.json'}")
    print(f"Wrote {args.out_dir / 'drift.json'}")
    # Exit 2 only for duplicate guest IPs (hard failure). VMID vs ALL_VMIDS doc
    # mismatches are informational — documentation often lags live `pct set`.
    sys.exit(2 if duplicate_ips else 0)


if __name__ == "__main__":
    main()