Files
proxmox/scripts/it-ops/compute_ipam_drift.py
defiQUG dbd517b279 Sync workspace: config, docs, scripts, CI, operator rules, and submodule pointers.
- Update dbis_core, cross-chain-pmm-lps, explorer-monorepo, metamask-integration, pr-workspace/chains
- Omit embedded publish git dirs and empty placeholders from index

Made-with: Cursor
2026-04-12 06:12:20 -07:00

251 lines
8.6 KiB
Python
Executable File

#!/usr/bin/env python3
"""Merge live JSON with config/ip-addresses.conf; write live_inventory.json + drift.json."""
from __future__ import annotations
import argparse
import json
import re
import sys
from pathlib import Path
# Finds dotted-quad IPv4-looking tokens anywhere inside a string.
# The leading lookbehind and trailing lookahead refuse a match that is directly
# preceded or followed by a digit or a dot, so a slice of a longer dotted
# number is not picked up. Each octet is only required to be 1-3 digits;
# values above 255 are not rejected. The pattern has no capturing groups, so
# findall() returns the matched address strings themselves.
IPV4_RE = re.compile(
r"(?<![0-9.])(?:[0-9]{1,3}\.){3}[0-9]{1,3}(?![0-9.])"
)
# Markdown pipe-table row whose first two cells are: VMID | IP | ...
# Each of the two cells may be wrapped in optional ** markdown bold.
# Group 1 captures the VMID digits, group 2 the dotted-quad IP text (octets are
# 1-3 digits each; their numeric range is not validated). The pattern is
# anchored at the start of the string it is matched against and only requires
# the pipe that closes the IP cell, so any further columns are ignored.
MD_VMID_IP_ROW = re.compile(
r"^\|\s*\*{0,2}(\d+)\*{0,2}\s*\|\s*\*{0,2}((?:[0-9]{1,3}\.){3}[0-9]{1,3})\*{0,2}\s*\|"
)
def is_lan_11(ip: str) -> bool:
    """Return True when *ip* is written with the ``192.168.11.`` prefix.

    This is a purely textual prefix comparison; whatever follows the prefix
    is not inspected.
    """
    lan_prefix = "192.168.11."
    return ip[: len(lan_prefix)] == lan_prefix
def parse_all_vmids_markdown(path: Path) -> tuple[set[str], dict[str, str]]:
    """Read VMID/IP pairs out of the pipe tables of an ALL_VMIDS markdown file.

    Returns ``(lan_ips, vmid_to_ip)``. ``lan_ips`` holds the IPs accepted by
    ``is_lan_11``; ``vmid_to_ip`` maps each VMID string to the IP from its
    table row (a later row for the same VMID overwrites an earlier one).
    When *path* is not an existing file, both containers come back empty.
    """
    lan_ips: set[str] = set()
    table: dict[str, str] = {}
    if path.is_file():
        # Undecodable bytes are replaced rather than raising.
        text = path.read_text(encoding="utf-8", errors="replace")
        for raw_line in text.splitlines():
            row = MD_VMID_IP_ROW.match(raw_line.strip())
            if row is None:
                continue
            vmid = row.group(1)
            ip = row.group(2)
            # NOTE(review): the file's indentation was lost, so the nesting of
            # this assignment is inferred; every matched row is recorded and
            # only the IP set is restricted to the LAN prefix. Confirm.
            table[vmid] = ip
            if is_lan_11(ip):
                lan_ips.add(ip)
    return lan_ips, table
def parse_ip_addresses_conf(path: Path) -> tuple[dict[str, str], set[str]]:
    """Parse ``KEY=value`` lines from an ip-addresses.conf style file.

    Returns ``(var_map, all_ips)``: ``var_map`` maps each key to its value
    with one pair of surrounding matching quotes removed, and ``all_ips`` is
    every IPv4-looking token that ``IPV4_RE`` finds in any value.
    When *path* is not an existing file, both containers come back empty.
    """
    assignments: dict[str, str] = {}
    found_ips: set[str] = set()
    if not path.is_file():
        return assignments, found_ips

    text = path.read_text(encoding="utf-8", errors="replace")
    for raw_line in text.splitlines():
        entry = raw_line.strip()
        # Ignore blank lines, '#' comment lines, and lines without an '='.
        if not entry or entry.startswith("#") or "=" not in entry:
            continue
        # Split on the first '=' only, so the value may itself contain '='.
        name, _, value = entry.partition("=")
        name = name.strip()
        value = value.strip()
        # Drop one layer of matching quotes; double quotes are tried first.
        for quote in ('"', "'"):
            if value.startswith(quote) and value.endswith(quote):
                value = value[1:-1]
                break
        assignments[name] = value
        found_ips.update(IPV4_RE.findall(value))
    return assignments, found_ips
def hypervisor_related_keys(var_map: dict[str, str]) -> set[str]:
    """Return the keys of *var_map* that name hypervisor or network-infra entries.

    A key is selected when its upper-cased form contains any of the marker
    substrings below; the keys are returned with their original spelling.
    """
    markers = (
        "PROXMOX_HOST",
        "PROXMOX_ML110",
        "PROXMOX_R630",
        "PROXMOX_R750",
        "WAN_AGGREGATOR",
        "NETWORK_GATEWAY",
        "UDM_PRO",
        "PUBLIC_IP_GATEWAY",
        "PUBLIC_IP_ER605",
    )
    selected: set[str] = set()
    for name in var_map:
        upper_name = name.upper()
        for marker in markers:
            if marker in upper_name:
                selected.add(name)
                break
    return selected
def _vmid_sort_key(vmid: str) -> int:
    """Sort key that orders VMIDs numerically; non-numeric labels (e.g. "?") sort as 0."""
    # isdecimal() rather than isdigit(): isdigit() also accepts characters such
    # as superscript digits, which int() cannot parse.
    return int(vmid) if vmid.isdecimal() else 0


def _load_live(path: "Path | None") -> dict:
    """Read the live JSON from *path* (stdin when None) and return it as a dict.

    Prints a message to stderr and exits with status 1 when the input cannot
    be read, is not valid JSON, or is not a JSON object.
    """
    try:
        live_raw = sys.stdin.read() if path is None else path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as e:
        print(f"Cannot read live JSON: {e}", file=sys.stderr)
        sys.exit(1)
    try:
        live = json.loads(live_raw)
    except json.JSONDecodeError as e:
        print(f"Invalid live JSON: {e}", file=sys.stderr)
        sys.exit(1)
    if not isinstance(live, dict):
        print("Invalid live JSON: top-level value must be an object", file=sys.stderr)
        sys.exit(1)
    return live


def _index_guests(
    guests: list,
) -> tuple[dict[str, list[dict]], dict[str, str], set[str]]:
    """Group guest rows by IP and collect the VMIDs present in the live data.

    Returns ``(ip_to_rows, vmid_to_ip, vmids)``. Guests without an IP are not
    grouped; guests without a VMID are not added to ``vmids``.
    """
    ip_to_rows: dict[str, list[dict]] = {}
    vmid_to_ip: dict[str, str] = {}
    vmids: set[str] = set()
    for g in guests:
        ip = (g.get("ip") or "").strip()
        raw_vmid = g.get("vmid")
        # A JSON null VMID must not be recorded as the literal string "None".
        vmid = "" if raw_vmid is None else str(raw_vmid).strip()
        if vmid:
            vmids.add(vmid)
        if ip:
            ip_to_rows.setdefault(ip, []).append(g)
        if vmid and ip:
            vmid_to_ip[vmid] = ip
    return ip_to_rows, vmid_to_ip, vmids


def _split_shared_ips(
    ip_to_rows: dict,
) -> tuple[dict[str, list[str]], dict[str, list[str]]]:
    """Classify IPs used by two or more guest rows.

    Returns ``(duplicate_ips, same_name_duplicate_ip)``. An IP whose rows
    carry exactly one distinct non-empty name (case-insensitive) is
    informational and goes to the second mapping; every other shared IP is
    treated as a conflict. Values are the VMID labels of the rows, with "?"
    standing in for a missing VMID.
    """
    duplicate_ips: dict[str, list[str]] = {}
    same_name_duplicate_ip: dict[str, list[str]] = {}
    for ip, rows in ip_to_rows.items():
        if len(rows) < 2:
            continue
        names = {str(r.get("name") or "").strip().lower() for r in rows}
        names.discard("")
        vmids = [str(r.get("vmid", "") or "?").strip() or "?" for r in rows]
        if len(names) == 1:
            # Same guest name on multiple VMIDs (e.g. clone/migration) — informational only.
            same_name_duplicate_ip[ip] = sorted(vmids, key=_vmid_sort_key)
        else:
            duplicate_ips[ip] = vmids
    return duplicate_ips, same_name_duplicate_ip


def main() -> None:
    """Compare live guest data with the declared IP sources and write the reports.

    Reads the live JSON (``--live`` file or stdin), ``--ip-conf`` and the
    optional ``--all-vmids-md`` table, then writes ``live_inventory.json`` and
    ``drift.json`` into ``--out-dir`` (created if missing).

    Exit status: 0 on success, 1 when the live JSON is unreadable or
    malformed, 2 when an IP is shared by guests that do not all have the same
    name.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--live", type=Path, help="live JSON file (default stdin)")
    ap.add_argument(
        "--ip-conf",
        type=Path,
        default=Path("config/ip-addresses.conf"),
        help="path to ip-addresses.conf",
    )
    ap.add_argument("--out-dir", type=Path, required=True)
    ap.add_argument(
        "--all-vmids-md",
        type=Path,
        default=None,
        help="optional ALL_VMIDS_ENDPOINTS.md for declared VMID/IP tables",
    )
    args = ap.parse_args()

    live = _load_live(args.live)
    guests = live.get("guests") or []
    # Fail with a clear message instead of an AttributeError traceback later on.
    if not isinstance(guests, list) or not all(isinstance(g, dict) for g in guests):
        print("Invalid live JSON: 'guests' must be a list of objects", file=sys.stderr)
        sys.exit(1)

    # Declared sources: the conf file plus, optionally, the markdown tables.
    var_map, conf_ips = parse_ip_addresses_conf(args.ip_conf)
    doc_ips: set[str] = set()
    vmid_to_ip_doc: dict[str, str] = {}
    if args.all_vmids_md is not None:
        doc_ips, vmid_to_ip_doc = parse_all_vmids_markdown(args.all_vmids_md)
    declared_union = conf_ips | doc_ips

    # IPs of hypervisors / network gear; excluded from the guest comparisons.
    hyp_ips: set[str] = {
        ip
        for key in hypervisor_related_keys(var_map)
        for ip in IPV4_RE.findall(var_map[key])
    }

    ip_to_rows, vmid_to_ip_live, live_vmids_all = _index_guests(guests)

    doc_vmids = set(vmid_to_ip_doc)
    vmids_in_all_vmids_doc_not_on_cluster = sorted(
        doc_vmids - live_vmids_all, key=_vmid_sort_key
    )
    only_live_not_in_doc = live_vmids_all - doc_vmids
    # Only the first 100 VMIDs are reported; the full size goes into "count".
    vmids_on_cluster_not_in_all_vmids_table_sample = sorted(
        only_live_not_in_doc, key=_vmid_sort_key
    )[:100]

    duplicate_ips, same_name_duplicate_ip = _split_shared_ips(ip_to_rows)

    guest_ip_set = set(ip_to_rows)
    conf_only = sorted(conf_ips - guest_ip_set - hyp_ips)
    live_only_legacy = sorted(guest_ip_set - conf_ips)
    declared_lan11 = {ip for ip in declared_union if is_lan_11(ip)}
    guest_lan11 = {ip for ip in guest_ip_set if is_lan_11(ip)}
    guest_lan_not_declared = sorted(guest_lan11 - declared_union - hyp_ips)
    declared_lan11_not_on_guests = sorted(declared_lan11 - guest_ip_set - hyp_ips)

    # VMIDs present in both sources whose documented IP differs from the live one.
    vmid_ip_mismatch: list[dict[str, str]] = []
    for vmid, doc_ip in vmid_to_ip_doc.items():
        lip = vmid_to_ip_live.get(vmid)
        if lip and doc_ip and lip != doc_ip:
            vmid_ip_mismatch.append(
                {"vmid": vmid, "live_ip": lip, "all_vmids_doc_ip": doc_ip}
            )

    drift = {
        "collected_at": live.get("collected_at"),
        "guest_count": len(guests),
        "duplicate_ips": duplicate_ips,
        "same_name_duplicate_ip_guests": same_name_duplicate_ip,
        "guest_ips_not_in_ip_addresses_conf": live_only_legacy,
        "ip_addresses_conf_ips_not_on_guests": conf_only,
        "guest_lan_ips_not_in_declared_sources": guest_lan_not_declared,
        "declared_lan11_ips_not_on_live_guests": declared_lan11_not_on_guests,
        "vmid_ip_mismatch_live_vs_all_vmids_doc": vmid_ip_mismatch,
        "vmids_in_all_vmids_doc_not_on_cluster": vmids_in_all_vmids_doc_not_on_cluster,
        "vmids_on_cluster_not_in_all_vmids_table": {
            "count": len(only_live_not_in_doc),
            "sample_vmids": vmids_on_cluster_not_in_all_vmids_table_sample,
            "note": "ALL_VMIDS_ENDPOINTS pipe tables do not list every guest; large count is normal.",
        },
        "hypervisor_and_infra_ips_excluded_from_guest_match": sorted(hyp_ips),
        "declared_sources": {
            "ip_addresses_conf_ipv4_count": len(conf_ips),
            "all_vmids_md_lan11_count": len(doc_ips),
            "all_vmids_md_row_count": len(doc_vmids),
        },
        "notes": [],
    }
    if live.get("error"):
        drift["notes"].append(str(live["error"]))
    if same_name_duplicate_ip:
        drift["notes"].append(
            "same_name_duplicate_ip_guests: multiple VMIDs share an IP but identical "
            "guest name — resolve duplicate CTs/VMs in Proxmox; drift exit code not raised."
        )

    inv_out = {
        "collected_at": live.get("collected_at"),
        "source": "proxmox_cluster_pvesh_plus_config",
        "guests": guests,
    }
    # The neighbour sample is passed through only when it is a JSON object.
    neigh = live.get("ip_neigh_vmbr0_sample")
    if isinstance(neigh, dict):
        inv_out["ip_neigh_vmbr0_sample"] = neigh

    args.out_dir.mkdir(parents=True, exist_ok=True)
    inventory_path = args.out_dir / "live_inventory.json"
    drift_path = args.out_dir / "drift.json"
    inventory_path.write_text(json.dumps(inv_out, indent=2), encoding="utf-8")
    drift_path.write_text(json.dumps(drift, indent=2), encoding="utf-8")
    print(f"Wrote {inventory_path}")
    print(f"Wrote {drift_path}")

    # Exit 2 only when the same IP is claimed by guests with different names
    # (likely address conflict). Same-name clones are in same_name_duplicate_ip_guests only.
    sys.exit(2 if duplicate_ips else 0)
# Script entry point: run main() only when the file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()