308 lines
13 KiB
Python
308 lines
13 KiB
Python
|
|
#!/usr/bin/env python3
|
||
|
|
"""
|
||
|
|
List all Proxmox VMs with VMID, Name, IP Address, FQDN, and Description.
|
||
|
|
|
||
|
|
This script connects to a Proxmox cluster and retrieves comprehensive
|
||
|
|
information about all virtual machines (both QEMU VMs and LXC containers).
|
||
|
|
"""
|
||
|
|
|
||
|
|
import os
|
||
|
|
import sys
|
||
|
|
import json
|
||
|
|
from typing import Dict, List, Optional, Any
|
||
|
|
from proxmoxer import ProxmoxAPI
|
||
|
|
|
||
|
|
def load_env_file(env_path: str = None) -> dict:
|
||
|
|
"""Load environment variables from .env file."""
|
||
|
|
if env_path is None:
|
||
|
|
env_path = os.path.expanduser('~/.env')
|
||
|
|
|
||
|
|
env_vars = {}
|
||
|
|
if os.path.exists(env_path):
|
||
|
|
try:
|
||
|
|
with open(env_path, 'r') as f:
|
||
|
|
for line in f:
|
||
|
|
line = line.strip()
|
||
|
|
# Skip comments and empty lines
|
||
|
|
if not line or line.startswith('#'):
|
||
|
|
continue
|
||
|
|
# Parse KEY=VALUE format
|
||
|
|
if '=' in line:
|
||
|
|
key, value = line.split('=', 1)
|
||
|
|
key = key.strip()
|
||
|
|
value = value.strip().strip('"').strip("'")
|
||
|
|
env_vars[key] = value
|
||
|
|
except Exception as e:
|
||
|
|
print(f"Warning: Could not load {env_path}: {e}", file=sys.stderr)
|
||
|
|
return env_vars
|
||
|
|
|
||
|
|
def get_proxmox_connection() -> ProxmoxAPI:
|
||
|
|
"""Initialize Proxmox API connection from environment variables or config file."""
|
||
|
|
# Load from ~/.env file first
|
||
|
|
env_vars = load_env_file()
|
||
|
|
|
||
|
|
# Try environment variables first (they override .env file)
|
||
|
|
host = os.getenv('PROXMOX_HOST', env_vars.get('PROXMOX_HOST', '192.168.6.247'))
|
||
|
|
port = int(os.getenv('PROXMOX_PORT', env_vars.get('PROXMOX_PORT', '8006')))
|
||
|
|
user = os.getenv('PROXMOX_USER', env_vars.get('PROXMOX_USER', 'root@pam'))
|
||
|
|
token_name = os.getenv('PROXMOX_TOKEN_NAME', env_vars.get('PROXMOX_TOKEN_NAME', 'mcpserver'))
|
||
|
|
token_value = os.getenv('PROXMOX_TOKEN_VALUE', env_vars.get('PROXMOX_TOKEN_VALUE'))
|
||
|
|
password = os.getenv('PROXMOX_PASSWORD', env_vars.get('PROXMOX_PASSWORD'))
|
||
|
|
verify_ssl = os.getenv('PROXMOX_VERIFY_SSL', env_vars.get('PROXMOX_VERIFY_SSL', 'false')).lower() == 'true'
|
||
|
|
|
||
|
|
# If no token value, try to load from JSON config file
|
||
|
|
if not token_value and not password:
|
||
|
|
config_path = os.getenv('PROXMOX_MCP_CONFIG')
|
||
|
|
if config_path and os.path.exists(config_path):
|
||
|
|
with open(config_path) as f:
|
||
|
|
config = json.load(f)
|
||
|
|
host = config.get('proxmox', {}).get('host', host)
|
||
|
|
port = config.get('proxmox', {}).get('port', port)
|
||
|
|
verify_ssl = config.get('proxmox', {}).get('verify_ssl', verify_ssl)
|
||
|
|
user = config.get('auth', {}).get('user', user)
|
||
|
|
token_name = config.get('auth', {}).get('token_name', token_name)
|
||
|
|
token_value = config.get('auth', {}).get('token_value')
|
||
|
|
password = config.get('auth', {}).get('password')
|
||
|
|
|
||
|
|
if not token_value and not password:
|
||
|
|
print("Error: PROXMOX_TOKEN_VALUE or PROXMOX_PASSWORD required", file=sys.stderr)
|
||
|
|
print("\nCredentials can be provided via:", file=sys.stderr)
|
||
|
|
print(" 1. Environment variables", file=sys.stderr)
|
||
|
|
print(" 2. ~/.env file (automatically loaded)", file=sys.stderr)
|
||
|
|
print(" 3. JSON config file (set PROXMOX_MCP_CONFIG)", file=sys.stderr)
|
||
|
|
print("\nExample ~/.env file:", file=sys.stderr)
|
||
|
|
print(" PROXMOX_HOST=your-proxmox-host", file=sys.stderr)
|
||
|
|
print(" PROXMOX_USER=root@pam", file=sys.stderr)
|
||
|
|
print(" PROXMOX_TOKEN_NAME=your-token-name", file=sys.stderr)
|
||
|
|
print(" PROXMOX_TOKEN_VALUE=your-token-value", file=sys.stderr)
|
||
|
|
sys.exit(1)
|
||
|
|
|
||
|
|
try:
|
||
|
|
if token_value:
|
||
|
|
proxmox = ProxmoxAPI(
|
||
|
|
host=host,
|
||
|
|
port=port,
|
||
|
|
user=user,
|
||
|
|
token_name=token_name,
|
||
|
|
token_value=token_value,
|
||
|
|
verify_ssl=verify_ssl,
|
||
|
|
service='PVE'
|
||
|
|
)
|
||
|
|
else:
|
||
|
|
# Use password authentication
|
||
|
|
proxmox = ProxmoxAPI(
|
||
|
|
host=host,
|
||
|
|
port=port,
|
||
|
|
user=user,
|
||
|
|
password=password,
|
||
|
|
verify_ssl=verify_ssl,
|
||
|
|
service='PVE'
|
||
|
|
)
|
||
|
|
# Test connection
|
||
|
|
proxmox.version.get()
|
||
|
|
return proxmox
|
||
|
|
except Exception as e:
|
||
|
|
error_msg = str(e)
|
||
|
|
if "ConnectTimeoutError" in error_msg or "timed out" in error_msg:
|
||
|
|
print(f"Error: Connection to Proxmox host '{host}:{port}' timed out", file=sys.stderr)
|
||
|
|
print(f" - Verify the host is reachable: ping {host}", file=sys.stderr)
|
||
|
|
print(f" - Check firewall allows port {port}", file=sys.stderr)
|
||
|
|
print(f" - Verify PROXMOX_HOST in ~/.env is correct", file=sys.stderr)
|
||
|
|
elif "401" in error_msg or "authentication" in error_msg.lower():
|
||
|
|
print(f"Error: Authentication failed", file=sys.stderr)
|
||
|
|
print(f" - Verify PROXMOX_TOKEN_VALUE or PROXMOX_PASSWORD in ~/.env", file=sys.stderr)
|
||
|
|
print(f" - Check user permissions in Proxmox", file=sys.stderr)
|
||
|
|
else:
|
||
|
|
print(f"Error connecting to Proxmox: {e}", file=sys.stderr)
|
||
|
|
sys.exit(1)
|
||
|
|
|
||
|
|
def get_vm_ip_address(proxmox: ProxmoxAPI, node: str, vmid: str, vm_type: str) -> str:
|
||
|
|
"""Get IP address of a VM."""
|
||
|
|
ip_addresses = []
|
||
|
|
|
||
|
|
try:
|
||
|
|
# Try to get IP from network interfaces via guest agent (for QEMU) or config
|
||
|
|
if vm_type == 'qemu':
|
||
|
|
# Try QEMU guest agent first
|
||
|
|
try:
|
||
|
|
interfaces = proxmox.nodes(node).qemu(vmid).agent('network-get-interfaces').get()
|
||
|
|
if interfaces and 'result' in interfaces:
|
||
|
|
for iface in interfaces['result']:
|
||
|
|
if 'ip-addresses' in iface:
|
||
|
|
for ip_info in iface['ip-addresses']:
|
||
|
|
if ip_info.get('ip-address-type') == 'ipv4' and not ip_info.get('ip-address', '').startswith('127.'):
|
||
|
|
ip_addresses.append(ip_info['ip-address'])
|
||
|
|
except:
|
||
|
|
pass
|
||
|
|
|
||
|
|
# Try to get from config (for static IPs or if guest agent not available)
|
||
|
|
try:
|
||
|
|
config = proxmox.nodes(node).qemu(vmid).config.get() if vm_type == 'qemu' else proxmox.nodes(node).lxc(vmid).config.get()
|
||
|
|
# Check for IP in network config
|
||
|
|
for key, value in config.items():
|
||
|
|
if key.startswith('net') and isinstance(value, str):
|
||
|
|
# Parse network config like "virtio=00:11:22:33:44:55,bridge=vmbr0,ip=192.168.1.100/24"
|
||
|
|
if 'ip=' in value:
|
||
|
|
ip_part = value.split('ip=')[1].split(',')[0].split('/')[0]
|
||
|
|
if ip_part and ip_part not in ip_addresses:
|
||
|
|
ip_addresses.append(ip_part)
|
||
|
|
except:
|
||
|
|
pass
|
||
|
|
|
||
|
|
# For LXC, try to execute hostname -I command
|
||
|
|
if vm_type == 'lxc' and not ip_addresses:
|
||
|
|
try:
|
||
|
|
result = proxmox.nodes(node).lxc(vmid).exec.post(command='hostname -I')
|
||
|
|
if result and 'out' in result:
|
||
|
|
ips = result['out'].strip().split()
|
||
|
|
ip_addresses.extend([ip for ip in ips if not ip.startswith('127.')])
|
||
|
|
except:
|
||
|
|
pass
|
||
|
|
|
||
|
|
except Exception:
|
||
|
|
pass
|
||
|
|
|
||
|
|
return ', '.join(ip_addresses) if ip_addresses else 'N/A'
|
||
|
|
|
||
|
|
def get_vm_fqdn(proxmox: ProxmoxAPI, node: str, vmid: str, vm_type: str) -> str:
|
||
|
|
"""Get FQDN of a VM."""
|
||
|
|
fqdn = None
|
||
|
|
|
||
|
|
try:
|
||
|
|
# Get hostname from config
|
||
|
|
config = proxmox.nodes(node).qemu(vmid).config.get() if vm_type == 'qemu' else proxmox.nodes(node).lxc(vmid).config.get()
|
||
|
|
hostname = config.get('hostname') or config.get('name', '').split('.')[0]
|
||
|
|
|
||
|
|
if hostname:
|
||
|
|
# Try to get full FQDN by executing hostname -f
|
||
|
|
try:
|
||
|
|
if vm_type == 'qemu':
|
||
|
|
result = proxmox.nodes(node).qemu(vmid).agent('exec').post(command={'command': 'hostname -f'})
|
||
|
|
else:
|
||
|
|
result = proxmox.nodes(node).lxc(vmid).exec.post(command='hostname -f')
|
||
|
|
|
||
|
|
if result:
|
||
|
|
if vm_type == 'qemu' and 'result' in result and 'out-data' in result['result']:
|
||
|
|
fqdn = result['result']['out-data'].strip()
|
||
|
|
elif vm_type == 'lxc' and 'out' in result:
|
||
|
|
fqdn = result['out'].strip()
|
||
|
|
except:
|
||
|
|
# Fallback to hostname from config
|
||
|
|
if hostname:
|
||
|
|
fqdn = hostname
|
||
|
|
except Exception:
|
||
|
|
pass
|
||
|
|
|
||
|
|
return fqdn if fqdn else 'N/A'
|
||
|
|
|
||
|
|
def get_vm_description(proxmox: ProxmoxAPI, node: str, vmid: str, vm_type: str) -> str:
|
||
|
|
"""Get description of a VM."""
|
||
|
|
try:
|
||
|
|
config = proxmox.nodes(node).qemu(vmid).config.get() if vm_type == 'qemu' else proxmox.nodes(node).lxc(vmid).config.get()
|
||
|
|
return config.get('description', 'N/A')
|
||
|
|
except Exception:
|
||
|
|
return 'N/A'
|
||
|
|
|
||
|
|
def list_all_vms(proxmox: ProxmoxAPI) -> List[Dict[str, Any]]:
|
||
|
|
"""List all VMs across all nodes."""
|
||
|
|
all_vms = []
|
||
|
|
|
||
|
|
try:
|
||
|
|
nodes = proxmox.nodes.get()
|
||
|
|
|
||
|
|
for node in nodes:
|
||
|
|
node_name = node['node']
|
||
|
|
|
||
|
|
# Get QEMU VMs
|
||
|
|
try:
|
||
|
|
qemu_vms = proxmox.nodes(node_name).qemu.get()
|
||
|
|
for vm in qemu_vms:
|
||
|
|
vmid = str(vm['vmid'])
|
||
|
|
name = vm.get('name', f'VM-{vmid}')
|
||
|
|
|
||
|
|
# Get additional info
|
||
|
|
description = get_vm_description(proxmox, node_name, vmid, 'qemu')
|
||
|
|
ip_address = get_vm_ip_address(proxmox, node_name, vmid, 'qemu')
|
||
|
|
fqdn = get_vm_fqdn(proxmox, node_name, vmid, 'qemu')
|
||
|
|
|
||
|
|
all_vms.append({
|
||
|
|
'vmid': vmid,
|
||
|
|
'name': name,
|
||
|
|
'type': 'QEMU',
|
||
|
|
'node': node_name,
|
||
|
|
'status': vm.get('status', 'unknown'),
|
||
|
|
'ip_address': ip_address,
|
||
|
|
'fqdn': fqdn,
|
||
|
|
'description': description
|
||
|
|
})
|
||
|
|
except Exception as e:
|
||
|
|
print(f"Warning: Could not get QEMU VMs from node {node_name}: {e}", file=sys.stderr)
|
||
|
|
|
||
|
|
# Get LXC containers
|
||
|
|
try:
|
||
|
|
lxc_vms = proxmox.nodes(node_name).lxc.get()
|
||
|
|
for vm in lxc_vms:
|
||
|
|
vmid = str(vm['vmid'])
|
||
|
|
name = vm.get('name', f'CT-{vmid}')
|
||
|
|
|
||
|
|
# Get additional info
|
||
|
|
description = get_vm_description(proxmox, node_name, vmid, 'lxc')
|
||
|
|
ip_address = get_vm_ip_address(proxmox, node_name, vmid, 'lxc')
|
||
|
|
fqdn = get_vm_fqdn(proxmox, node_name, vmid, 'lxc')
|
||
|
|
|
||
|
|
all_vms.append({
|
||
|
|
'vmid': vmid,
|
||
|
|
'name': name,
|
||
|
|
'type': 'LXC',
|
||
|
|
'node': node_name,
|
||
|
|
'status': vm.get('status', 'unknown'),
|
||
|
|
'ip_address': ip_address,
|
||
|
|
'fqdn': fqdn,
|
||
|
|
'description': description
|
||
|
|
})
|
||
|
|
except Exception as e:
|
||
|
|
print(f"Warning: Could not get LXC containers from node {node_name}: {e}", file=sys.stderr)
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
print(f"Error listing VMs: {e}", file=sys.stderr)
|
||
|
|
sys.exit(1)
|
||
|
|
|
||
|
|
# Sort by VMID
|
||
|
|
all_vms.sort(key=lambda x: int(x['vmid']))
|
||
|
|
return all_vms
|
||
|
|
|
||
|
|
def print_vm_table(vms: List[Dict[str, Any]]):
|
||
|
|
"""Print VMs in a formatted table."""
|
||
|
|
if not vms:
|
||
|
|
print("No VMs found.")
|
||
|
|
return
|
||
|
|
|
||
|
|
# Calculate column widths
|
||
|
|
col_widths = {
|
||
|
|
'vmid': max(len('VMID'), max(len(vm['vmid']) for vm in vms)),
|
||
|
|
'name': max(len('Name'), max(len(vm['name']) for vm in vms)),
|
||
|
|
'type': max(len('Type'), max(len(vm['type']) for vm in vms)),
|
||
|
|
'ip_address': max(len('IP Address'), max(len(vm['ip_address']) for vm in vms)),
|
||
|
|
'fqdn': max(len('FQDN'), max(len(vm['fqdn']) for vm in vms)),
|
||
|
|
'description': max(len('Description'), max(len(vm['description']) for vm in vms if vm['description'] != 'N/A'))
|
||
|
|
}
|
||
|
|
|
||
|
|
# Print header
|
||
|
|
header = f"{'VMID':<{col_widths['vmid']}} | {'Name':<{col_widths['name']}} | {'Type':<{col_widths['type']}} | {'IP Address':<{col_widths['ip_address']}} | {'FQDN':<{col_widths['fqdn']}} | {'Description':<{col_widths['description']}}"
|
||
|
|
print(header)
|
||
|
|
print('-' * len(header))
|
||
|
|
|
||
|
|
# Print VMs
|
||
|
|
for vm in vms:
|
||
|
|
row = f"{vm['vmid']:<{col_widths['vmid']}} | {vm['name']:<{col_widths['name']}} | {vm['type']:<{col_widths['type']}} | {vm['ip_address']:<{col_widths['ip_address']}} | {vm['fqdn']:<{col_widths['fqdn']}} | {vm['description']:<{col_widths['description']}}"
|
||
|
|
print(row)
|
||
|
|
|
||
|
|
def main():
|
||
|
|
"""Main function."""
|
||
|
|
proxmox = get_proxmox_connection()
|
||
|
|
vms = list_all_vms(proxmox)
|
||
|
|
print_vm_table(vms)
|
||
|
|
|
||
|
|
if __name__ == '__main__':
|
||
|
|
main()
|