- Add scripts/it-ops (Proxmox collector, IPAM drift, export orchestrator) - Add sankofa-it-read-api stub with optional CORS and refresh - Add systemd examples for read API, weekly inventory export, timer - Add live-inventory-drift GitHub workflow (dispatch + weekly) - Add IT controller spec, runbooks, Keycloak ensure-it-admin-role script - Note IT_READ_API env on portal sync completion output Made-with: Cursor
204 lines
6.5 KiB
Python
Executable File
204 lines
6.5 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Merge live JSON with config/ip-addresses.conf; write live_inventory.json + drift.json."""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import re
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
IPV4_RE = re.compile(
|
|
r"(?<![0-9.])(?:[0-9]{1,3}\.){3}[0-9]{1,3}(?![0-9.])"
|
|
)
|
|
# VMID | IP | ... (optional ** markdown bold around cells)
|
|
MD_VMID_IP_ROW = re.compile(
|
|
r"^\|\s*\*{0,2}(\d+)\*{0,2}\s*\|\s*\*{0,2}((?:[0-9]{1,3}\.){3}[0-9]{1,3})\*{0,2}\s*\|"
|
|
)
|
|
|
|
|
|
def is_lan_11(ip: str) -> bool:
|
|
return ip.startswith("192.168.11.")
|
|
|
|
|
|
def parse_all_vmids_markdown(path: Path) -> tuple[set[str], dict[str, str]]:
|
|
"""Extract declared LAN IPs and vmid->ip from ALL_VMIDS pipe tables."""
|
|
ips: set[str] = set()
|
|
vmid_to_ip: dict[str, str] = {}
|
|
if not path.is_file():
|
|
return ips, vmid_to_ip
|
|
for line in path.read_text(encoding="utf-8", errors="replace").splitlines():
|
|
m = MD_VMID_IP_ROW.match(line.strip())
|
|
if not m:
|
|
continue
|
|
vmid, ip = m.group(1), m.group(2)
|
|
if is_lan_11(ip):
|
|
ips.add(ip)
|
|
vmid_to_ip[vmid] = ip
|
|
return ips, vmid_to_ip
|
|
|
|
|
|
def parse_ip_addresses_conf(path: Path) -> tuple[dict[str, str], set[str]]:
|
|
var_map: dict[str, str] = {}
|
|
all_ips: set[str] = set()
|
|
if not path.is_file():
|
|
return var_map, all_ips
|
|
for line in path.read_text(encoding="utf-8", errors="replace").splitlines():
|
|
s = line.strip()
|
|
if not s or s.startswith("#") or "=" not in s:
|
|
continue
|
|
key, _, val = s.partition("=")
|
|
key = key.strip()
|
|
val = val.strip()
|
|
if val.startswith('"') and val.endswith('"'):
|
|
val = val[1:-1]
|
|
elif val.startswith("'") and val.endswith("'"):
|
|
val = val[1:-1]
|
|
var_map[key] = val
|
|
for m in IPV4_RE.findall(val):
|
|
all_ips.add(m)
|
|
return var_map, all_ips
|
|
|
|
|
|
def hypervisor_related_keys(var_map: dict[str, str]) -> set[str]:
|
|
keys = set()
|
|
for k in var_map:
|
|
ku = k.upper()
|
|
if any(
|
|
x in ku
|
|
for x in (
|
|
"PROXMOX_HOST",
|
|
"PROXMOX_ML110",
|
|
"PROXMOX_R630",
|
|
"PROXMOX_R750",
|
|
"WAN_AGGREGATOR",
|
|
"NETWORK_GATEWAY",
|
|
"UDM_PRO",
|
|
"PUBLIC_IP_GATEWAY",
|
|
"PUBLIC_IP_ER605",
|
|
)
|
|
):
|
|
keys.add(k)
|
|
return keys
|
|
|
|
|
|
def main() -> None:
|
|
ap = argparse.ArgumentParser()
|
|
ap.add_argument("--live", type=Path, help="live JSON file (default stdin)")
|
|
ap.add_argument(
|
|
"--ip-conf",
|
|
type=Path,
|
|
default=Path("config/ip-addresses.conf"),
|
|
help="path to ip-addresses.conf",
|
|
)
|
|
ap.add_argument("--out-dir", type=Path, required=True)
|
|
ap.add_argument(
|
|
"--all-vmids-md",
|
|
type=Path,
|
|
default=None,
|
|
help="optional ALL_VMIDS_ENDPOINTS.md for declared VMID/IP tables",
|
|
)
|
|
args = ap.parse_args()
|
|
|
|
if args.live:
|
|
live_raw = args.live.read_text(encoding="utf-8")
|
|
else:
|
|
live_raw = sys.stdin.read()
|
|
|
|
try:
|
|
live = json.loads(live_raw)
|
|
except json.JSONDecodeError as e:
|
|
print(f"Invalid live JSON: {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
guests = live.get("guests") or []
|
|
var_map, conf_ips = parse_ip_addresses_conf(args.ip_conf)
|
|
doc_ips: set[str] = set()
|
|
vmid_to_ip_doc: dict[str, str] = {}
|
|
if args.all_vmids_md:
|
|
doc_ips, vmid_to_ip_doc = parse_all_vmids_markdown(args.all_vmids_md)
|
|
|
|
declared_union = conf_ips | doc_ips
|
|
hyp_keys = hypervisor_related_keys(var_map)
|
|
hyp_ips: set[str] = set()
|
|
for k in hyp_keys:
|
|
if k not in var_map:
|
|
continue
|
|
for m in IPV4_RE.findall(var_map[k]):
|
|
hyp_ips.add(m)
|
|
|
|
ip_to_vmids: dict[str, list[str]] = {}
|
|
vmid_to_ip_live: dict[str, str] = {}
|
|
for g in guests:
|
|
ip = (g.get("ip") or "").strip()
|
|
vmid = str(g.get("vmid", "")).strip()
|
|
if ip:
|
|
ip_to_vmids.setdefault(ip, []).append(vmid or "?")
|
|
if vmid and ip:
|
|
vmid_to_ip_live[vmid] = ip
|
|
|
|
duplicate_ips = {ip: vms for ip, vms in ip_to_vmids.items() if len(vms) > 1}
|
|
guest_ip_set = set(ip_to_vmids.keys())
|
|
|
|
conf_only = sorted(conf_ips - guest_ip_set - hyp_ips)
|
|
live_only_legacy = sorted(guest_ip_set - conf_ips)
|
|
|
|
declared_lan11 = {ip for ip in declared_union if is_lan_11(ip)}
|
|
guest_lan11 = {ip for ip in guest_ip_set if is_lan_11(ip)}
|
|
guest_lan_not_declared = sorted(
|
|
guest_lan11 - declared_union - hyp_ips
|
|
)
|
|
declared_lan11_not_on_guests = sorted(
|
|
declared_lan11 - guest_ip_set - hyp_ips
|
|
)
|
|
|
|
vmid_ip_mismatch: list[dict[str, str]] = []
|
|
for vmid, doc_ip in vmid_to_ip_doc.items():
|
|
lip = vmid_to_ip_live.get(vmid)
|
|
if lip and doc_ip and lip != doc_ip:
|
|
vmid_ip_mismatch.append(
|
|
{"vmid": vmid, "live_ip": lip, "all_vmids_doc_ip": doc_ip}
|
|
)
|
|
|
|
drift = {
|
|
"collected_at": live.get("collected_at"),
|
|
"guest_count": len(guests),
|
|
"duplicate_ips": duplicate_ips,
|
|
"guest_ips_not_in_ip_addresses_conf": live_only_legacy,
|
|
"ip_addresses_conf_ips_not_on_guests": conf_only,
|
|
"guest_lan_ips_not_in_declared_sources": guest_lan_not_declared,
|
|
"declared_lan11_ips_not_on_live_guests": declared_lan11_not_on_guests,
|
|
"vmid_ip_mismatch_live_vs_all_vmids_doc": vmid_ip_mismatch,
|
|
"hypervisor_and_infra_ips_excluded_from_guest_match": sorted(hyp_ips),
|
|
"declared_sources": {
|
|
"ip_addresses_conf_ipv4_count": len(conf_ips),
|
|
"all_vmids_md_lan11_count": len(doc_ips),
|
|
},
|
|
"notes": [],
|
|
}
|
|
if live.get("error"):
|
|
drift["notes"].append(str(live["error"]))
|
|
|
|
inv_out = {
|
|
"collected_at": live.get("collected_at"),
|
|
"source": "proxmox_cluster_pvesh_plus_config",
|
|
"guests": guests,
|
|
}
|
|
|
|
args.out_dir.mkdir(parents=True, exist_ok=True)
|
|
(args.out_dir / "live_inventory.json").write_text(
|
|
json.dumps(inv_out, indent=2), encoding="utf-8"
|
|
)
|
|
(args.out_dir / "drift.json").write_text(
|
|
json.dumps(drift, indent=2), encoding="utf-8"
|
|
)
|
|
print(f"Wrote {args.out_dir / 'live_inventory.json'}")
|
|
print(f"Wrote {args.out_dir / 'drift.json'}")
|
|
# Exit 2 only for duplicate guest IPs (hard failure). VMID vs ALL_VMIDS doc
|
|
# mismatches are informational — documentation often lags live `pct set`.
|
|
sys.exit(2 if duplicate_ips else 0)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|