#!/usr/bin/env python3
"""
Comprehensive Proxmox Inventory Script

Queries all Proxmox hosts, lists all VMIDs with IPs, endpoints, ports, FQDNs,
and identifies NPMplus instances with their configurations.
"""

import json
import subprocess
import sys
from pathlib import Path
from typing import Dict, List, Optional, Any
from collections import defaultdict

# Proxmox Hosts
PROXMOX_HOSTS = {
    "ml110": "192.168.11.10",
    "r630-01": "192.168.11.11",
    "r630-02": "192.168.11.12"
}

SSH_OPTS = "-o StrictHostKeyChecking=no -o ConnectTimeout=5"

# Known port mappings for common services
SERVICE_PORTS = {
    "80": "HTTP",
    "443": "HTTPS",
    "3000": "Node.js API",
    "4000": "GraphQL/Blockscout",
    "5432": "PostgreSQL",
    "6379": "Redis",
    "8006": "Proxmox Web UI",
    "8043": "Omada",
    "8200": "Vault",
    "8545": "Besu HTTP RPC",
    "8546": "Besu WebSocket RPC",
    "9000": "Web3Signer",
    "9545": "Prometheus Metrics",
    "30303": "P2P Networking",
}

# Base URL of the NPMplus API queried by get_npmplus_config().
_NPM_URL = "https://192.168.11.166:81"

# Seconds to wait for each NPMplus HTTP request before giving up.
_NPM_TIMEOUT = 10


def run_ssh_command(host: str, command: str) -> Optional[str]:
    """Run a command as root on a Proxmox host over SSH.

    Args:
        host: Address of the host to connect to.
        command: Command line passed to ssh for remote execution.

    Returns:
        The stripped stdout of the command when it exits with status 0,
        otherwise None (non-zero exit, timeout, or failure to start ssh).
    """
    try:
        result = subprocess.run(
            ["ssh", *SSH_OPTS.split(), f"root@{host}", command],
            capture_output=True,
            text=True,
            timeout=10,
        )
    except (subprocess.SubprocessError, OSError, ValueError) as e:
        # TimeoutExpired, a missing ssh binary and undecodable output all end
        # up here; the inventory is best-effort, so report and carry on.
        print(f"Error running SSH command on {host}: {e}", file=sys.stderr)
        return None
    if result.returncode == 0:
        return result.stdout.strip()
    return None


def _parse_json(output: Optional[str], what: str) -> Any:
    """Decode *output* as JSON, returning None (with a warning) on failure."""
    if not output:
        return None
    try:
        return json.loads(output)
    except json.JSONDecodeError as e:
        print(f"Warning: could not parse {what}: {e}", file=sys.stderr)
        return None


def _list_guests(host: str, node: str, api_type: str, label: str,
                 name_prefix: str) -> List[Dict[str, Any]]:
    """List guests of one kind ('qemu' or 'lxc') on a node as inventory dicts."""
    output = run_ssh_command(
        host, f"pvesh get /nodes/{node}/{api_type} --output-format json"
    )
    entries = _parse_json(output, f"{api_type} list from {host}")
    if not isinstance(entries, list):
        return []

    guests = []
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        vmid = str(entry.get('vmid', ''))
        guests.append({
            'vmid': vmid,
            'name': entry.get('name', f'{name_prefix}-{vmid}'),
            'type': label,
            'status': entry.get('status', 'unknown'),
            'host': host,
            'node': node,
        })
    return guests


def get_node_name(host: str) -> Optional[str]:
    """Get Proxmox node name (the output of `hostname` on the host)."""
    return run_ssh_command(host, "hostname")


def get_all_vms(host: str, node: str) -> List[Dict[str, Any]]:
    """Get all QEMU VMs from a Proxmox host.

    Returns:
        One dict per VM with the keys 'vmid', 'name', 'type' ('QEMU'),
        'status', 'host' and 'node'. Empty when the query or parsing fails.
    """
    return _list_guests(host, node, 'qemu', 'QEMU', 'VM')


def get_all_containers(host: str, node: str) -> List[Dict[str, Any]]:
    """Get all LXC containers from a Proxmox host.

    Returns:
        One dict per container with the keys 'vmid', 'name', 'type' ('LXC'),
        'status', 'host' and 'node'. Empty when the query or parsing fails.
    """
    return _list_guests(host, node, 'lxc', 'LXC', 'CT')


def get_vm_config(host: str, node: str, vmid: str, vm_type: str) -> Dict[str, Any]:
    """Get VM/container configuration.

    Args:
        host: Address of the Proxmox host to query.
        node: Proxmox node name used in the API path.
        vmid: Guest identifier.
        vm_type: API path segment for the guest kind ('qemu' or 'lxc').

    Returns:
        The decoded configuration mapping, or an empty dict when the query
        fails or does not return a JSON object.
    """
    output = run_ssh_command(
        host, f"pvesh get /nodes/{node}/{vm_type}/{vmid}/config --output-format json"
    )
    config = _parse_json(output, f"config of {vm_type}/{vmid} on {host}")
    return config if isinstance(config, dict) else {}


def _static_ip_from_net0(net_config: Any) -> Optional[str]:
    """Extract a static address from a net0 string such as '...,ip=10.0.0.5/24,...'."""
    net_config = str(net_config or '')
    if 'ip=' not in net_config:
        return None
    ip_part = net_config.split('ip=')[1].split(',')[0].split('/')[0]
    if ip_part and ip_part not in ('dhcp', 'auto'):
        return ip_part
    return None


def _first_agent_ipv4(data: Any) -> Optional[str]:
    """Return the first non-loopback IPv4 address from a guest-agent reply."""
    if not isinstance(data, dict):
        return None
    for iface in data.get('result') or []:
        if not isinstance(iface, dict):
            continue
        for ip_info in iface.get('ip-addresses') or []:
            if ip_info.get('ip-address-type') != 'ipv4':
                continue
            address = ip_info.get('ip-address') or ''
            # Skip entries without an address instead of raising KeyError.
            if address and not address.startswith('127.'):
                return address
    return None


def get_vm_ip(host: str, node: str, vmid: str, vm_type: str, *,
              status: Optional[str] = None,
              config: Optional[Dict[str, Any]] = None) -> Optional[str]:
    """Get VM IP address.

    For 'lxc' guests the static address in the `net0` config entry is used
    when present; otherwise, for a running container, `hostname -I` is run
    inside it. For any other type the QEMU guest agent is queried.

    Args:
        host: Address of the Proxmox host to query.
        node: Proxmox node name used in the API path.
        vmid: Guest identifier.
        vm_type: 'lxc' for containers; anything else is treated as QEMU.
        status: Runtime status from the guest listing. The config payload is
            not expected to carry a status, so without this the
            running-container fallback would normally never trigger; the
            config value is still consulted as a fallback.
        config: Already-fetched guest configuration, to avoid a second SSH
            round trip. Fetched on demand when omitted.

    Returns:
        The first usable IPv4 address found, or None.
    """
    if vm_type == 'lxc':
        if config is None:
            config = get_vm_config(host, node, vmid, vm_type)

        static_ip = _static_ip_from_net0(config.get('net0', ''))
        if static_ip:
            return static_ip

        # Try to get IP from running container
        if status == 'running' or config.get('status') == 'running':
            ip = run_ssh_command(
                host,
                f"pct exec {vmid} -- hostname -I 2>/dev/null | awk '{{print $1}}'",
            )
            if ip and not ip.startswith('127.'):
                return ip
        return None

    # For QEMU, try agent
    output = run_ssh_command(
        host,
        f"pvesh get /nodes/{node}/qemu/{vmid}/agent/network-get-interfaces"
        " --output-format json",
    )
    return _first_agent_ipv4(_parse_json(output, f"agent data of qemu/{vmid} on {host}"))


def get_vm_hostname(host: str, node: str, vmid: str, vm_type: str) -> Optional[str]:
    """Get VM hostname (the config's 'hostname', falling back to 'name')."""
    config = get_vm_config(host, node, vmid, vm_type)
    return config.get('hostname') or config.get('name')


def get_vm_description(host: str, node: str, vmid: str, vm_type: str) -> Optional[str]:
    """Get VM description from the guest configuration, if any."""
    config = get_vm_config(host, node, vmid, vm_type)
    return config.get('description')


def _read_env_value(line: str) -> str:
    """Return the value part of a KEY=value line with surrounding quotes removed."""
    return line.split("=", 1)[1].strip().strip('"').strip("'")


def get_npmplus_config() -> List[Dict[str, Any]]:
    """Get NPMplus configuration via API.

    Credentials are read from the NPM_EMAIL and NPM_PASSWORD entries of the
    `.env` file located one directory above this script's directory.

    Returns:
        The list of proxy-host objects returned by the API, or an empty list
        when credentials are missing, authentication yields no token, the
        response is not a list, or any request fails.
    """
    project_root = Path(__file__).parent.parent
    env_file = project_root / ".env"

    npm_email = None
    npm_password = None

    if env_file.exists():
        with open(env_file, encoding="utf-8") as f:
            for line in f:
                if line.startswith("NPM_EMAIL="):
                    npm_email = _read_env_value(line)
                elif line.startswith("NPM_PASSWORD="):
                    npm_password = _read_env_value(line)

    if not npm_email or not npm_password:
        return []

    try:
        import urllib.request
        import ssl

        # Authenticate
        auth_data = json.dumps({
            "identity": npm_email,
            "secret": npm_password
        }).encode()

        # NOTE(review): certificate verification is disabled here, presumably
        # because the endpoint uses a self-signed certificate - confirm.
        context = ssl._create_unverified_context()

        req = urllib.request.Request(
            f"{_NPM_URL}/api/tokens",
            data=auth_data,
            headers={"Content-Type": "application/json"}
        )
        # A timeout keeps the report from hanging when the API is unreachable.
        with urllib.request.urlopen(req, context=context, timeout=_NPM_TIMEOUT) as response:
            token_response = json.loads(response.read().decode())

        token = token_response.get("token")
        if not token:
            return []

        # Get proxy hosts
        req = urllib.request.Request(
            f"{_NPM_URL}/api/nginx/proxy-hosts",
            headers={"Authorization": f"Bearer {token}"}
        )
        with urllib.request.urlopen(req, context=context, timeout=_NPM_TIMEOUT) as response:
            proxy_hosts = json.loads(response.read().decode())
    except Exception as e:
        # Top-level boundary for an optional report section: log and degrade.
        print(f"Error fetching NPMplus config: {e}", file=sys.stderr)
        return []

    if not isinstance(proxy_hosts, list):
        return []
    # main() calls .get() on every entry, so keep only JSON objects.
    return [entry for entry in proxy_hosts if isinstance(entry, dict)]


def identify_ports_from_config(config: Dict[str, Any], vm_type: str) -> List[str]:
    """Identify ports from VM configuration.

    Ports are guessed from keywords found in the config's 'name' and
    'hostname' values (case-insensitive). Both fields are checked for every
    keyword so that guests described only by 'hostname' are matched too.

    Args:
        config: Guest configuration mapping.
        vm_type: Guest kind; accepted for interface compatibility, unused.

    Returns:
        The distinct guessed ports as strings, sorted numerically so the
        result is stable between runs.
    """
    name = str(config.get('name') or '').lower()
    hostname = str(config.get('hostname') or '').lower()

    def mentions(*keywords: str) -> bool:
        return any(word in name or word in hostname for word in keywords)

    ports = set()

    # Common ports based on service type
    if mentions('rpc', 'besu'):
        ports.update(['8545', '8546', '30303', '9545'])
    if mentions('postgres'):
        ports.add('5432')
    if mentions('redis'):
        ports.add('6379')
    if mentions('vault'):
        ports.add('8200')
    if mentions('web3signer'):
        ports.add('9000')
    if mentions('nginx', 'npm'):
        ports.update(['80', '81', '443'])
    if mentions('blockscout', 'explorer'):
        ports.update(['80', '4000'])
    if mentions('api'):
        ports.add('3000')
    if mentions('frontend', 'web'):
        ports.add('80')

    return sorted(ports, key=int)


def _print_banner(title: str) -> None:
    """Print a section title framed by two 100-character rules."""
    print("=" * 100)
    print(title)
    print("=" * 100)
    print()


def _enrich_guest(guest: Dict[str, Any], api_type: str) -> Dict[str, Any]:
    """Add ip/hostname/description/ports/fqdn to a listed guest, in place."""
    host, node, vmid = guest['host'], guest['node'], guest['vmid']
    # Fetch the config once and derive everything from it; each fetch is a
    # separate SSH round trip.
    config = get_vm_config(host, node, vmid, api_type)
    hostname_vm = config.get('hostname') or config.get('name')
    guest.update({
        'ip': get_vm_ip(host, node, vmid, api_type,
                        status=guest.get('status'), config=config),
        'hostname': hostname_vm,
        'description': config.get('description'),
        'ports': identify_ports_from_config(config, api_type),
        'fqdn': hostname_vm,
    })
    return guest


def _is_npmplus(guest: Dict[str, Any]) -> bool:
    """Tell whether the guest's name or hostname mentions 'npmplus'."""
    label = f"{guest.get('name') or ''} {guest.get('hostname') or ''}"
    return 'npmplus' in label.lower()


def _format_inventory_row(vm: Dict[str, Any]) -> str:
    """Format one guest as a fixed-width table row, showing 'N/A' for gaps."""
    port_list = vm.get('ports') or []
    vmid = str(vm.get('vmid', ''))
    vm_type = str(vm.get('type') or 'N/A')
    name = str(vm.get('name') or 'N/A')[:28]
    host = str(vm.get('host') or 'N/A')
    # The 'ip' key is always present but may hold None; formatting None with
    # a width spec raises TypeError, so substitute the placeholder.
    ip = str(vm.get('ip') or 'N/A')
    fqdn = str(vm.get('fqdn') or vm.get('hostname') or 'N/A')[:28]
    status = str(vm.get('status') or 'unknown')
    ports = ', '.join(port_list)[:28] if port_list else 'N/A'
    return (f"{vmid:<8} {vm_type:<6} {name:<30} {host:<12} {ip:<18} "
            f"{fqdn:<30} {status:<10} {ports:<30}")


def _print_inventory(all_vms: List[Dict[str, Any]]) -> None:
    """Print the table of all guests."""
    _print_banner("ALL VMIDs INVENTORY")

    # Print table header
    print(f"{'VMID':<8} {'Type':<6} {'Name':<30} {'Host':<12} {'IP Address':<18} "
          f"{'FQDN':<30} {'Status':<10} {'Ports':<30}")
    print("-" * 150)

    for vm in all_vms:
        print(_format_inventory_row(vm))
    print()


def _print_npmplus_instances(npmplus_guests: List[Dict[str, Any]]) -> None:
    """Print the guests identified as NPMplus instances."""
    _print_banner("NPMPLUS INSTANCES")

    if not npmplus_guests:
        print("No NPMplus instances found")
        print()
        return

    vmids = ', '.join(vm['vmid'] for vm in npmplus_guests)
    print(f"Found NPMplus running on VMIDs: {vmids}")
    print()

    for vm in npmplus_guests:
        print(f"VMID {vm['vmid']}:")
        print(f" Name: {vm.get('name') or 'N/A'}")
        print(f" Type: {vm.get('type') or 'N/A'}")
        print(f" Host: {vm.get('host') or 'N/A'}")
        print(f" IP: {vm.get('ip') or 'N/A'}")
        print(f" FQDN: {vm.get('fqdn') or 'N/A'}")
        print(f" Status: {vm.get('status') or 'N/A'}")
        print(f" Ports: {', '.join(vm.get('ports') or [])}")
        print()


def _domains_text(proxy_host: Dict[str, Any]) -> str:
    """Join a proxy host's domain names, tolerating a non-list value."""
    domain_names = proxy_host.get('domain_names', [])
    if isinstance(domain_names, list):
        return ', '.join(str(domain) for domain in domain_names)
    return str(domain_names)


def _print_npmplus_config(npmplus_config: List[Dict[str, Any]]) -> None:
    """Print the NPMplus proxy-host table and per-host details."""
    _print_banner("NPMPLUS CONFIGURATION")

    if not npmplus_config:
        print("Could not retrieve NPMplus configuration (check .env file for NPM_EMAIL and NPM_PASSWORD)")
        print()
        return

    ordered = sorted(npmplus_config, key=lambda x: x.get('id') or 0)

    print(f"Found {len(npmplus_config)} proxy host configurations in NPMplus")
    print()
    print(f"{'ID':<6} {'Domain(s)':<50} {'Target':<25} {'Port':<8} {'Scheme':<8} {'WebSocket':<10}")
    print("-" * 120)

    for host in ordered:
        host_id = str(host.get('id', 'N/A'))
        domains = _domains_text(host)
        # Values may be present but null; str()/or keeps width formatting safe.
        forward_host = str(host.get('forward_host') or 'N/A')
        forward_port = str(host.get('forward_port', 'N/A'))
        forward_scheme = str(host.get('forward_scheme') or 'http')
        websocket = 'Yes' if host.get('forward_websocket', False) else 'No'

        print(f"{host_id:<6} {domains[:48]:<50} {forward_host:<25} {forward_port:<8} "
              f"{forward_scheme:<8} {websocket:<10}")

    print()
    print("Detailed Configuration:")
    print()

    for host in ordered:
        print(f"Proxy Host ID: {host.get('id')}")
        print(f" Domain Names: {_domains_text(host)}")
        print(f" Forward Host: {host.get('forward_host')}")
        print(f" Forward Port: {host.get('forward_port')}")
        print(f" Forward Scheme: {host.get('forward_scheme')}")
        print(f" WebSocket Support: {host.get('forward_websocket', False)}")
        print(f" SSL Enabled: {host.get('ssl_enabled', False)}")
        print(f" Block Exploits: {host.get('block_exploits', False)}")
        print(f" Cache Assets: {host.get('cache_assets', False)}")
        print()


def _print_summary(all_vms: List[Dict[str, Any]], npmplus_count: int,
                   proxy_host_count: int) -> None:
    """Print the totals section."""
    _print_banner("SUMMARY")

    def count(key: str, value: str) -> int:
        return sum(1 for v in all_vms if v.get(key) == value)

    print(f"Total VMIDs: {len(all_vms)}")
    print(f" QEMU VMs: {count('type', 'QEMU')}")
    print(f" LXC Containers: {count('type', 'LXC')}")
    print(f" Running: {count('status', 'running')}")
    print(f" Stopped: {count('status', 'stopped')}")
    print(f"NPMplus Instances: {npmplus_count}")
    print(f"NPMplus Proxy Hosts: {proxy_host_count}")
    print()


def main():
    """Query every configured Proxmox host and print the inventory report.

    The report has five sections written to stdout: per-host progress, the
    table of all guests, the NPMplus instances, the NPMplus proxy-host
    configuration and a summary. Hosts that cannot be reached are skipped.
    """
    _print_banner("COMPREHENSIVE PROXMOX INVENTORY REPORT")

    all_vms = []
    npmplus_guests = []

    # Query all Proxmox hosts
    for hostname, host_ip in PROXMOX_HOSTS.items():
        print(f"📡 Querying {hostname} ({host_ip})...")

        node = get_node_name(host_ip)
        if not node:
            print(f" ⚠️ Could not connect to {hostname}")
            continue

        print(f" ✓ Connected to node: {node}")

        # Get VMs and containers
        vms = get_all_vms(host_ip, node)
        containers = get_all_containers(host_ip, node)

        print(f" ✓ Found {len(vms)} QEMU VMs and {len(containers)} LXC containers")

        # VMs first, then containers, each enriched from its own config.
        for api_type, guests in (('qemu', vms), ('lxc', containers)):
            for guest in guests:
                _enrich_guest(guest, api_type)
                all_vms.append(guest)

                # Check if NPMplus
                if _is_npmplus(guest):
                    npmplus_guests.append(guest)

    print()

    # Sort by VMID (non-numeric IDs last), then by host
    all_vms.sort(key=lambda x: (int(x['vmid']) if x['vmid'].isdigit() else 999999, x['host']))

    _print_inventory(all_vms)
    _print_npmplus_instances(npmplus_guests)

    npmplus_config = get_npmplus_config()
    _print_npmplus_config(npmplus_config)

    _print_summary(all_vms, len(npmplus_guests), len(npmplus_config))


if __name__ == "__main__":
    main()