Complete markdown files cleanup and organization

- Organized 252 files across project
- Root directory: 187 → 2 files (98.9% reduction)
- Moved configuration guides to docs/04-configuration/
- Moved troubleshooting guides to docs/09-troubleshooting/
- Moved quick start guides to docs/01-getting-started/
- Moved reports to reports/ directory
- Archived temporary files
- Generated comprehensive reports and documentation
- Created maintenance scripts and guides

All files organized according to established standards.
This commit is contained in:
defiQUG
2026-01-06 01:46:25 -08:00
parent 1edcec953c
commit cb47cce074
1327 changed files with 217220 additions and 801 deletions

View File

@@ -0,0 +1,21 @@
The MIT License
Copyright (c) 2013 Oleg Butovich
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

View File

@@ -0,0 +1,139 @@
Metadata-Version: 2.1
Name: proxmoxer
Version: 2.2.0
Summary: Python Wrapper for the Proxmox 2.x API (HTTP and SSH)
Home-page: https://proxmoxer.github.io/docs/
Download-URL: http://pypi.python.org/pypi/proxmoxer
Author: Oleg Butovich
Author-email: obutovich@gmail.com
License: MIT
Keywords: proxmox,api
Classifier: Development Status :: 5 - Production/Stable
Classifier: Intended Audience :: Developers
Classifier: Intended Audience :: System Administrators
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: System :: Clustering
Classifier: Topic :: System :: Monitoring
Classifier: Topic :: System :: Systems Administration
Description-Content-Type: text/x-rst
License-File: LICENSE.txt
================================================
Proxmoxer: A Python wrapper for Proxmox REST API
================================================
master branch: |master_build_status| |master_coverage_status| |pypi_version| |pypi_downloads|
develop branch: |develop_build_status| |develop_coverage_status|
Proxmoxer is a python wrapper around the `Proxmox REST API v2 <https://pve.proxmox.com/pve-docs/api-viewer/index.html>`_.
It currently supports the Proxmox services of Proxmox Virtual Environment (PVE), Proxmox Mail Gateway (PMG), and Proxmox Backup Server (PBS).
It was inspired by slumber, but it is dedicated only to Proxmox. It allows not only REST API use over HTTPS, but
the same api over ssh and pvesh utility.
Like `Proxmoxia <https://github.com/baseblack/Proxmoxia>`_, it dynamically creates attributes which responds to the
attributes you've attempted to reach.
Full Documentation is available at https://proxmoxer.github.io/docs/
--------------------------------------------------------------------
Migrating to version 2
......................
Full instructions for the minimal steps needed to update to version 2 can be found in `Migration Docs <https://proxmoxer.github.io/docs/latest/v1_migration/>`_.
Installation
............
::
pip install proxmoxer
To use the 'https' backend, install requests
::
pip install requests
To use the 'ssh_paramiko' backend, install paramiko
::
pip install paramiko
To use the 'openssh' backend, install openssh_wrapper
::
pip install openssh_wrapper
Short usage information
.......................
The first thing to do is import the proxmoxer library and create ProxmoxAPI instance.
::
from proxmoxer import ProxmoxAPI
proxmox = ProxmoxAPI(
"proxmox_host", user="admin@pam", password="secret_word", verify_ssl=False
)
This will connect by default to PVE through the 'https' backend.
**Note: ensure you have the required libraries (listed above) for the connection method you are using**
Queries are exposed via the access methods **get**, **post**, **put** and **delete**. For convenience two
synonyms are available: **create** for **post**, and **set** for **put**.
Using the paths from the `PVE API v2 <https://pve.proxmox.com/pve-docs/api-viewer/index.html>`_, you can create
API calls using the access methods above.
::
>>> for node in proxmox.nodes.get():
... for vm in proxmox.nodes(node["node"]).qemu.get():
... print(f"{vm['vmid']}. {vm['name']} => {vm['status']}")
...
141. puppet-2.london.example.com => running
101. munki.london.example.com => running
102. redmine.london.example.com => running
140. dns-1.london.example.com => running
126. ns-3.london.example.com => running
113. rabbitmq.london.example.com => running
See Changelog in `CHANGELOG.md <https://github.com/proxmoxer/proxmoxer/blob/develop/CHANGELOG.md>`_
...................................................................................................
.. |master_build_status| image:: https://github.com/proxmoxer/proxmoxer/actions/workflows/ci.yaml/badge.svg?branch=master
:target: https://github.com/proxmoxer/proxmoxer/actions
.. |master_coverage_status| image:: https://img.shields.io/coveralls/github/proxmoxer/proxmoxer/master
:target: https://coveralls.io/github/proxmoxer/proxmoxer?branch=master
.. |develop_build_status| image:: https://github.com/proxmoxer/proxmoxer/actions/workflows/ci.yaml/badge.svg?branch=develop
:target: https://github.com/proxmoxer/proxmoxer/actions
.. |develop_coverage_status| image:: https://img.shields.io/coveralls/github/proxmoxer/proxmoxer/develop
:target: https://coveralls.io/github/proxmoxer/proxmoxer?branch=develop
.. |pypi_version| image:: https://img.shields.io/pypi/v/proxmoxer.svg
:target: https://pypi.python.org/pypi/proxmoxer
.. |pypi_downloads| image:: https://img.shields.io/pypi/dm/proxmoxer.svg
:target: https://pypi.python.org/pypi/proxmoxer

View File

@@ -0,0 +1,29 @@
proxmoxer-2.2.0.dist-info/INSTALLER,sha256=zuuue4knoyJ-UwPPXg8fezS7VCrXJQrAP7zeNuwvFQg,4
proxmoxer-2.2.0.dist-info/LICENSE.txt,sha256=FducsKC9FGCrJOa7AswLPXOGorWJLiS5YMmCC45sNxA,1073
proxmoxer-2.2.0.dist-info/METADATA,sha256=gLlwnYShElov5M-066wt5OsPZwFmBMHLrFCNA05KplA,5165
proxmoxer-2.2.0.dist-info/RECORD,,
proxmoxer-2.2.0.dist-info/REQUESTED,sha256=47DEQpj8HBSa-_TImW-5JCeuQeRkm5NMpJWZG3hSuFU,0
proxmoxer-2.2.0.dist-info/WHEEL,sha256=eOLhNAGa2EW3wWl_TU484h7q1UNgy0JXjjoqKoxAAQc,92
proxmoxer-2.2.0.dist-info/top_level.txt,sha256=ZsRL5geB-MjPcI0spxyb5QJ5X2LY81BA8F_WvWYcZ-U,10
proxmoxer/__init__.py,sha256=x7Ja4bao0qb0G8sn2R66H7_IGEhz_L_KjuYvLXM-sVA,146
proxmoxer/__pycache__/__init__.cpython-312.pyc,,
proxmoxer/__pycache__/core.cpython-312.pyc,,
proxmoxer/backends/__init__.py,sha256=aVwaNfo_mo4qWd2OTK_EbpqGpxFJuLUd-S9UClGYOmk,95
proxmoxer/backends/__pycache__/__init__.cpython-312.pyc,,
proxmoxer/backends/__pycache__/command_base.cpython-312.pyc,,
proxmoxer/backends/__pycache__/https.cpython-312.pyc,,
proxmoxer/backends/__pycache__/local.cpython-312.pyc,,
proxmoxer/backends/__pycache__/openssh.cpython-312.pyc,,
proxmoxer/backends/__pycache__/ssh_paramiko.cpython-312.pyc,,
proxmoxer/backends/command_base.py,sha256=o3TBwgHoghkmG5-bno2UVK-8DwBrfc0GAq7ozkhBZAA,5339
proxmoxer/backends/https.py,sha256=WLioH0uDCOewoJIxHrFmDHboxRDbmVuN-RgJ3inIEtI,12011
proxmoxer/backends/local.py,sha256=RnyTrdXNwvWjxkqLbe0lgKCh0WXnKp4Jj_c4GU1-kW8,775
proxmoxer/backends/openssh.py,sha256=9aPOT7-QrzzkavIiwnK8CiGb_txcx1125X_OTJoAjuE,1757
proxmoxer/backends/ssh_paramiko.py,sha256=ozERATZGBB2pDTKyJUtRPHOQsXLvx924oWmVBNcCn9o,2117
proxmoxer/core.py,sha256=-QO_Mmafndj6Cn3ltyMP1GhXTgBIuu35Ded59gDZFjk,7878
proxmoxer/tools/__init__.py,sha256=VXnix0lFPaGzw_IckUw2_PiZMoEvJykWDyL5YDd9twk,208
proxmoxer/tools/__pycache__/__init__.cpython-312.pyc,,
proxmoxer/tools/__pycache__/files.cpython-312.pyc,,
proxmoxer/tools/__pycache__/tasks.cpython-312.pyc,,
proxmoxer/tools/files.py,sha256=SB7peNp9l7G9WFOMPepO1xKp5jNndZvxXX5hTk6FFDg,10458
proxmoxer/tools/tasks.py,sha256=fem-EvPU0sgo_lGSUbtj71UgYHmlz1e8N3p_HJB9sEY,2717

View File

@@ -0,0 +1,5 @@
Wheel-Version: 1.0
Generator: bdist_wheel (0.44.0)
Root-Is-Purelib: true
Tag: py3-none-any

View File

@@ -0,0 +1 @@
proxmoxer

View File

@@ -0,0 +1,6 @@
__author__ = "Oleg Butovich"
__copyright__ = "(c) Oleg Butovich 2013-2024"
__version__ = "2.2.0"
__license__ = "MIT"
from .core import * # noqa

View File

@@ -0,0 +1,3 @@
__author__ = "Oleg Butovich"
__copyright__ = "(c) Oleg Butovich 2013-2017"
__license__ = "MIT"

View File

@@ -0,0 +1,171 @@
__author__ = "Oleg Butovich"
__copyright__ = "(c) Oleg Butovich 2013-2017"
__license__ = "MIT"
import json
import logging
import platform
import re
from itertools import chain
from shlex import split as shell_split
from proxmoxer.core import SERVICES
logger = logging.getLogger(__name__)
logger.setLevel(level=logging.WARNING)
try:
from shlex import join
def shell_join(args):
return join(args)
except ImportError:
from shlex import quote
def shell_join(args):
return " ".join([quote(arg) for arg in args])
class Response:
def __init__(self, content, status_code):
self.status_code = status_code
self.content = content
self.text = str(content)
self.headers = {"content-type": "application/json"}
def __str__(self):
return f"Response ({self.status_code}) {self.content}"
class CommandBaseSession:
def __init__(
self,
service="PVE",
timeout=5,
sudo=False,
):
self.service = service.lower()
self.timeout = timeout
self.sudo = sudo
def _exec(self, cmd):
raise NotImplementedError()
# noinspection PyUnusedLocal
def request(self, method, url, data=None, params=None, headers=None):
method = method.lower()
data = data or {}
params = params or {}
url = url.strip()
cmd = {"post": "create", "put": "set"}.get(method, method)
# separate out qemu exec commands to split into multiple argument pairs (issue#89)
data_command = None
if "/agent/exec" in url:
data_command = data.get("command")
if data_command is not None:
del data["command"]
# for 'upload' call some workaround
tmp_filename = ""
if url.endswith("upload"):
# copy file to temporary location on proxmox host
tmp_filename, _ = self._exec(
[
"python3",
"-c",
"import tempfile; import sys; tf = tempfile.NamedTemporaryFile(); sys.stdout.write(tf.name)",
]
)
tmp_filename = str(tmp_filename, "utf-8")
self.upload_file_obj(data["filename"], tmp_filename)
data["filename"] = data["filename"].name
data["tmpfilename"] = tmp_filename
command = [f"{self.service}sh", cmd, url]
# convert the options dict into a 2-tuple with the key formatted as a flag
option_pairs = []
for k, v in chain(data.items(), params.items()):
try:
option_pairs.append((f"-{k}", str(v, "utf-8")))
except TypeError:
option_pairs.append((f"-{k}", str(v)))
# add back in all the command arguments as their own pairs
if data_command is not None:
if isinstance(data_command, list):
command_arr = data_command
elif "Windows" not in platform.platform():
command_arr = shell_split(data_command)
for arg in command_arr:
option_pairs.append(("-command", arg))
# expand the list of 2-tuples into a flat list
options = [val for pair in option_pairs for val in pair]
additional_options = SERVICES[self.service.upper()].get("cli_additional_options", [])
full_cmd = command + options + additional_options
if self.sudo:
full_cmd = ["sudo"] + full_cmd
stdout, stderr = self._exec(full_cmd)
def is_http_status_string(s):
return re.match(r"\d\d\d [a-zA-Z]", str(s))
if stderr:
# assume if we got a task ID that the request was successful
task_id_pattern = re.compile(
r"UPID:[\w-]+:[0-9a-fA-F]{8}:[0-9a-fA-F]{8}:[0-9a-fA-F]{8}:\w+:[\w\._-]+:[\w\.@_-]+:\w*"
)
if task_id_pattern.search(str(stdout)) or task_id_pattern.search(str(stderr)):
status_code = 200
else:
# sometimes contains extra text like 'trying to acquire lock...OK'
status_code = next(
(
int(line.split()[0])
for line in stderr.splitlines()
if is_http_status_string(line)
),
500,
)
else:
status_code = 200
if stdout:
return Response(stdout, status_code)
return Response(stderr, status_code)
def upload_file_obj(self, file_obj, remote_path):
raise NotImplementedError()
class JsonSimpleSerializer:
def loads(self, response):
try:
return json.loads(response.content)
except (UnicodeDecodeError, ValueError):
return {"errors": response.content}
def loads_errors(self, response):
try:
return json.loads(response.text).get("errors")
except (UnicodeDecodeError, ValueError):
return {"errors": response.content}
class CommandBaseBackend:
def __init__(self):
self.session = None
self.target = None
def get_session(self):
return self.session
def get_base_url(self):
return ""
def get_serializer(self):
return JsonSimpleSerializer()

View File

@@ -0,0 +1,386 @@
__author__ = "Oleg Butovich"
__copyright__ = "(c) Oleg Butovich 2013-2017"
__license__ = "MIT"
import io
import json
import logging
import os
import platform
import sys
import time
from shlex import split as shell_split
from proxmoxer.core import SERVICES, AuthenticationError, config_failure
logger = logging.getLogger(__name__)
logger.setLevel(level=logging.WARNING)
STREAMING_SIZE_THRESHOLD = 10 * 1024 * 1024 # 10 MiB
SSL_OVERFLOW_THRESHOLD = 2147483135 # 2^31 - 1 - 512
try:
import requests
from requests.auth import AuthBase
from requests.cookies import cookiejar_from_dict
# Disable warnings about using untrusted TLS
requests.packages.urllib3.disable_warnings()
except ImportError:
logger.error("Chosen backend requires 'requests' module\n")
sys.exit(1)
class ProxmoxHTTPAuthBase(AuthBase):
def __call__(self, req):
return req
def get_cookies(self):
return cookiejar_from_dict({})
def get_tokens(self):
return None, None
def __init__(self, timeout=5, service="PVE", verify_ssl=False, cert=None):
self.timeout = timeout
self.service = service
self.verify_ssl = verify_ssl
self.cert = cert
class ProxmoxHTTPAuth(ProxmoxHTTPAuthBase):
# number of seconds between renewing access tickets (must be less than 7200 to function correctly)
# if calls are made less frequently than 2 hrs, using the API token auth is recommended
renew_age = 3600
def __init__(self, username, password, otp=None, base_url="", **kwargs):
super().__init__(**kwargs)
self.base_url = base_url
self.username = username
self.pve_auth_ticket = ""
self._get_new_tokens(password=password, otp=otp)
def _get_new_tokens(self, password=None, otp=None):
if password is None:
# refresh from existing (unexpired) ticket
password = self.pve_auth_ticket
data = {"username": self.username, "password": password}
if otp:
data["otp"] = otp
response_data = requests.post(
self.base_url + "/access/ticket",
verify=self.verify_ssl,
timeout=self.timeout,
data=data,
cert=self.cert,
).json()["data"]
if response_data is None:
raise AuthenticationError(
"Couldn't authenticate user: {0} to {1}".format(
self.username, self.base_url + "/access/ticket"
)
)
if response_data.get("NeedTFA") is not None:
raise AuthenticationError(
"Couldn't authenticate user: missing Two Factor Authentication (TFA)"
)
self.birth_time = time.monotonic()
self.pve_auth_ticket = response_data["ticket"]
self.csrf_prevention_token = response_data["CSRFPreventionToken"]
def get_cookies(self):
return cookiejar_from_dict({self.service + "AuthCookie": self.pve_auth_ticket})
def get_tokens(self):
return self.pve_auth_ticket, self.csrf_prevention_token
def __call__(self, req):
# refresh ticket if older than `renew_age`
time_diff = time.monotonic() - self.birth_time
if time_diff >= self.renew_age:
logger.debug(f"refreshing ticket (age {time_diff})")
self._get_new_tokens()
# only attach CSRF token if needed (reduce interception risk)
if req.method != "GET":
req.headers["CSRFPreventionToken"] = self.csrf_prevention_token
return req
class ProxmoxHTTPApiTokenAuth(ProxmoxHTTPAuthBase):
def __init__(self, username, token_name, token_value, **kwargs):
super().__init__(**kwargs)
self.username = username
self.token_name = token_name
self.token_value = token_value
def __call__(self, req):
req.headers["Authorization"] = "{0}APIToken={1}!{2}{3}{4}".format(
self.service,
self.username,
self.token_name,
SERVICES[self.service]["token_separator"],
self.token_value,
)
req.cert = self.cert
return req
class JsonSerializer:
content_types = [
"application/json",
"application/x-javascript",
"text/javascript",
"text/x-javascript",
"text/x-json",
]
def get_accept_types(self):
return ", ".join(self.content_types)
def loads(self, response):
try:
return json.loads(response.content.decode("utf-8"))["data"]
except (UnicodeDecodeError, ValueError):
return {"errors": response.content}
def loads_errors(self, response):
try:
return json.loads(response.text).get("errors")
except (UnicodeDecodeError, ValueError):
return {"errors": response.content}
# pylint:disable=arguments-renamed
class ProxmoxHttpSession(requests.Session):
def request(
self,
method,
url,
params=None,
data=None,
headers=None,
cookies=None,
files=None,
auth=None,
timeout=None,
allow_redirects=True,
proxies=None,
hooks=None,
stream=None,
verify=None,
cert=None,
serializer=None,
):
a = auth or self.auth
c = cookies or self.cookies
# set verify flag from auth if request does not have this parameter explicitly
if verify is None:
verify = a.verify_ssl
if timeout is None:
timeout = a.timeout
# pull cookies from auth if not present
if (not c) and a:
cookies = a.get_cookies()
# filter out streams
files = files or {}
data = data or {}
total_file_size = 0
for k, v in data.copy().items():
# split qemu exec commands for proper parsing by PVE (issue#89)
if k == "command" and url.endswith("agent/exec"):
if isinstance(v, list):
data[k] = v
elif "Windows" not in platform.platform():
data[k] = shell_split(v)
if isinstance(v, io.IOBase):
total_file_size += get_file_size(v)
# add in filename from file pointer (patch for https://github.com/requests/toolbelt/pull/316)
# add Content-Type since Proxmox requires it (https://bugzilla.proxmox.com/show_bug.cgi?id=4344)
files[k] = (requests.utils.guess_filename(v), v, "application/octet-stream")
del data[k]
# if there are any large files, send all data and files using streaming multipart encoding
if total_file_size > STREAMING_SIZE_THRESHOLD:
try:
# pylint:disable=import-outside-toplevel
from requests_toolbelt import MultipartEncoder
encoder = MultipartEncoder(fields={**data, **files})
data = encoder
files = None
headers = {"Content-Type": encoder.content_type}
except ImportError:
# if the files will cause issues with the SSL 2GiB limit (https://bugs.python.org/issue42853#msg384566)
if total_file_size > SSL_OVERFLOW_THRESHOLD:
logger.warning(
"Install 'requests_toolbelt' to add support for files larger than 2GiB"
)
raise OverflowError("Unable to upload a payload larger than 2 GiB")
else:
logger.info(
"Installing 'requests_toolbelt' will decrease memory used during upload"
)
return super().request(
method,
url,
params,
data,
headers,
cookies,
files,
auth,
timeout,
allow_redirects,
proxies,
hooks,
stream,
verify,
cert,
)
class Backend:
def __init__(
self,
host,
user=None,
password=None,
otp=None,
port=None,
verify_ssl=True,
mode="json",
timeout=5,
token_name=None,
token_value=None,
path_prefix=None,
service="PVE",
cert=None,
):
self.cert = cert
host_port = ""
if len(host.split(":")) > 2: # IPv6
if host.startswith("["):
if "]:" in host:
host, host_port = host.rsplit(":", 1)
else:
host = f"[{host}]"
elif ":" in host:
host, host_port = host.split(":")
port = host_port if host_port.isdigit() else port
# if a port is not specified, use the default port for this service
if not port:
port = SERVICES[service]["default_port"]
self.mode = mode
if path_prefix is not None:
self.base_url = f"https://{host}:{port}/{path_prefix}/api2/{mode}"
else:
self.base_url = f"https://{host}:{port}/api2/{mode}"
if token_name is not None:
if "token" not in SERVICES[service]["supported_https_auths"]:
config_failure("{} does not support API Token authentication", service)
self.auth = ProxmoxHTTPApiTokenAuth(
user,
token_name,
token_value,
verify_ssl=verify_ssl,
timeout=timeout,
service=service,
cert=self.cert,
)
elif password is not None:
if "password" not in SERVICES[service]["supported_https_auths"]:
config_failure("{} does not support password authentication", service)
self.auth = ProxmoxHTTPAuth(
user,
password,
otp,
base_url=self.base_url,
verify_ssl=verify_ssl,
timeout=timeout,
service=service,
cert=self.cert,
)
else:
config_failure("No valid authentication credentials were supplied")
def get_session(self):
session = ProxmoxHttpSession()
session.cert = self.cert
session.auth = self.auth
# cookies are taken from the auth
session.headers["Connection"] = "keep-alive"
session.headers["accept"] = self.get_serializer().get_accept_types()
return session
def get_base_url(self):
return self.base_url
def get_serializer(self):
assert self.mode == "json"
return JsonSerializer()
def get_tokens(self):
"""Return the in-use auth and csrf tokens if using user/password auth."""
return self.auth.get_tokens()
def get_file_size(file_obj):
"""Returns the number of bytes in the given file object in total
file cursor remains at the same location as when passed in
:param fileObj: file object of which the get size
:type fileObj: file object
:return: total bytes in file object
:rtype: int
"""
# store existing file cursor location
starting_cursor = file_obj.tell()
# seek to end of file
file_obj.seek(0, os.SEEK_END)
size = file_obj.tell()
# reset cursor
file_obj.seek(starting_cursor)
return size
def get_file_size_partial(file_obj):
"""Returns the number of bytes in the given file object from the current cursor to the end
:param fileObj: file object of which the get size
:type fileObj: file object
:return: remaining bytes in file object
:rtype: int
"""
# store existing file cursor location
starting_cursor = file_obj.tell()
file_obj.seek(0, os.SEEK_END)
# get number of byte between where the cursor was set and the end
size = file_obj.tell() - starting_cursor
# reset cursor
file_obj.seek(starting_cursor)
return size

View File

@@ -0,0 +1,25 @@
__author__ = "Markus Reiter"
__copyright__ = "(c) Markus Reiter 2022"
__license__ = "MIT"
import shutil
from subprocess import PIPE, Popen
from proxmoxer.backends.command_base import CommandBaseBackend, CommandBaseSession
class LocalSession(CommandBaseSession):
def _exec(self, cmd):
proc = Popen(cmd, stdout=PIPE, stderr=PIPE)
stdout, stderr = proc.communicate(timeout=self.timeout)
return stdout.decode(), stderr.decode()
def upload_file_obj(self, file_obj, remote_path):
with open(remote_path, "wb") as dest_fp:
shutil.copyfileobj(file_obj, dest_fp)
class Backend(CommandBaseBackend):
def __init__(self, *args, **kwargs):
self.session = LocalSession(*args, **kwargs)
self.target = "localhost"

View File

@@ -0,0 +1,67 @@
__author__ = "Oleg Butovich"
__copyright__ = "(c) Oleg Butovich 2013-2017"
__license__ = "MIT"
import logging
from proxmoxer.backends.command_base import (
CommandBaseBackend,
CommandBaseSession,
shell_join,
)
logger = logging.getLogger(__name__)
logger.setLevel(level=logging.WARNING)
try:
import openssh_wrapper
except ImportError:
import sys
logger.error("Chosen backend requires 'openssh_wrapper' module\n")
sys.exit(1)
class OpenSSHSession(CommandBaseSession):
def __init__(
self,
host,
user,
config_file=None,
port=22,
identity_file=None,
forward_ssh_agent=False,
**kwargs,
):
super().__init__(**kwargs)
self.host = host
self.user = user
self.config_file = config_file
self.port = port
self.forward_ssh_agent = forward_ssh_agent
self.identity_file = identity_file
self.ssh_client = self._connect()
def _connect(self):
return openssh_wrapper.SSHConnection(
self.host,
login=self.user,
port=str(self.port), # openssh_wrapper complains if this is an int
configfile=self.config_file,
identity_file=self.identity_file,
timeout=self.timeout,
)
def _exec(self, cmd):
ret = self.ssh_client.run(shell_join(cmd), forward_ssh_agent=self.forward_ssh_agent)
return ret.stdout, ret.stderr
def upload_file_obj(self, file_obj, remote_path):
self.ssh_client.scp((file_obj,), target=remote_path)
class Backend(CommandBaseBackend):
def __init__(self, *args, **kwargs):
self.session = OpenSSHSession(*args, **kwargs)
self.target = self.session.host

View File

@@ -0,0 +1,77 @@
__author__ = "Oleg Butovich"
__copyright__ = "(c) Oleg Butovich 2013-2017"
__license__ = "MIT"
# spell-checker:ignore putfo
import logging
import os
from proxmoxer.backends.command_base import (
CommandBaseBackend,
CommandBaseSession,
shell_join,
)
logger = logging.getLogger(__name__)
logger.setLevel(level=logging.WARNING)
try:
import paramiko
except ImportError:
import sys
logger.error("Chosen backend requires 'paramiko' module\n")
sys.exit(1)
class SshParamikoSession(CommandBaseSession):
def __init__(self, host, user, password=None, private_key_file=None, port=22, **kwargs):
super().__init__(**kwargs)
self.host = host
self.user = user
self.password = password
self.private_key_file = private_key_file
self.port = port
self.ssh_client = self._connect()
def _connect(self):
ssh_client = paramiko.SSHClient()
ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
if self.private_key_file:
key_filename = os.path.expanduser(self.private_key_file)
else:
key_filename = None
ssh_client.connect(
self.host,
username=self.user,
allow_agent=(not self.password),
look_for_keys=True,
key_filename=key_filename,
password=self.password,
timeout=self.timeout,
port=self.port,
)
return ssh_client
def _exec(self, cmd):
session = self.ssh_client.get_transport().open_session()
session.exec_command(shell_join(cmd))
stdout = session.makefile("rb", -1).read().decode()
stderr = session.makefile_stderr("rb", -1).read().decode()
return stdout, stderr
def upload_file_obj(self, file_obj, remote_path):
sftp = self.ssh_client.open_sftp()
sftp.putfo(file_obj, remote_path)
sftp.close()
class Backend(CommandBaseBackend):
def __init__(self, *args, **kwargs):
self.session = SshParamikoSession(*args, **kwargs)
self.target = self.session.host

View File

@@ -0,0 +1,231 @@
__author__ = "Oleg Butovich"
__copyright__ = "(c) Oleg Butovich 2013-2017"
__license__ = "MIT"
# spell-checker:ignore urlunsplit
import importlib
import logging
import posixpath
from http import client as httplib
from urllib import parse as urlparse
logger = logging.getLogger(__name__)
logger.setLevel(level=logging.WARNING)
# https://metacpan.org/pod/AnyEvent::HTTP
ANYEVENT_HTTP_STATUS_CODES = {
595: "Errors during connection establishment, proxy handshake",
596: "Errors during TLS negotiation, request sending and header processing",
597: "Errors during body receiving or processing",
598: "User aborted request via on_header or on_body",
599: "Other, usually nonretryable, errors (garbled URL etc.)",
}
SERVICES = {
"PVE": {
"supported_backends": ["local", "https", "openssh", "ssh_paramiko"],
"supported_https_auths": ["password", "token"],
"default_port": 8006,
"token_separator": "=",
"cli_additional_options": ["--output-format", "json"],
},
"PMG": {
"supported_backends": ["local", "https", "openssh", "ssh_paramiko"],
"supported_https_auths": ["password"],
"default_port": 8006,
},
"PBS": {
"supported_backends": ["https"],
"supported_https_auths": ["password", "token"],
"default_port": 8007,
"token_separator": ":",
},
}
def config_failure(message, *args):
raise NotImplementedError(message.format(*args))
class ResourceException(Exception):
"""
An Exception thrown when an Proxmox API call failed
"""
def __init__(self, status_code, status_message, content, errors=None):
"""
Create a new ResourceException
:param status_code: The HTTP status code (faked by non-HTTP backends)
:type status_code: int
:param status_message: HTTP Status code (faked by non-HTTP backends)
:type status_message: str
:param content: Extended information on what went wrong
:type content: str
:param errors: Any specific errors that were encountered (converted to string), defaults to None
:type errors: Optional[object], optional
"""
self.status_code = status_code
self.status_message = status_message
self.content = content
self.errors = errors
if errors is not None:
content += f" - {errors}"
message = f"{status_code} {status_message}: {content}".strip()
super().__init__(message)
class AuthenticationError(Exception):
pass
class ProxmoxResource:
def __init__(self, **kwargs):
self._store = kwargs
def __repr__(self):
return f"ProxmoxResource ({self._store.get('base_url')})"
def __getattr__(self, item):
if item.startswith("_"):
raise AttributeError(item)
kwargs = self._store.copy()
kwargs["base_url"] = self.url_join(self._store["base_url"], item)
return ProxmoxResource(**kwargs)
def url_join(self, base, *args):
scheme, netloc, path, query, fragment = urlparse.urlsplit(base)
path = path if len(path) else "/"
path = posixpath.join(path, *[str(x) for x in args])
return urlparse.urlunsplit([scheme, netloc, path, query, fragment])
def __call__(self, resource_id=None):
if resource_id in (None, ""):
return self
if isinstance(resource_id, (bytes, str)):
resource_id = resource_id.split("/")
elif not isinstance(resource_id, (tuple, list)):
resource_id = [str(resource_id)]
kwargs = self._store.copy()
if resource_id is not None:
kwargs["base_url"] = self.url_join(self._store["base_url"], *resource_id)
return ProxmoxResource(**kwargs)
def _request(self, method, data=None, params=None):
url = self._store["base_url"]
if data:
logger.info(f"{method} {url} {data}")
else:
logger.info(f"{method} {url}")
# passing None values to pvesh command breaks it, let's remove them just as requests library does
# helpful when dealing with function default values higher in the chain, no need to clean up in multiple places
if params:
# remove keys that are set to None
params_none_keys = [k for (k, v) in params.items() if v is None]
for key in params_none_keys:
del params[key]
if data:
# remove keys that are set to None
data_none_keys = [k for (k, v) in data.items() if v is None]
for key in data_none_keys:
del data[key]
resp = self._store["session"].request(method, url, data=data, params=params)
logger.debug(f"Status code: {resp.status_code}, output: {resp.content!r}")
if resp.status_code >= 400:
if hasattr(resp, "reason"):
raise ResourceException(
resp.status_code,
httplib.responses.get(
resp.status_code, ANYEVENT_HTTP_STATUS_CODES.get(resp.status_code)
),
resp.reason,
errors=(self._store["serializer"].loads_errors(resp)),
)
else:
raise ResourceException(
resp.status_code,
httplib.responses.get(
resp.status_code, ANYEVENT_HTTP_STATUS_CODES.get(resp.status_code)
),
resp.text,
)
elif 200 <= resp.status_code <= 299:
return self._store["serializer"].loads(resp)
def get(self, *args, **params):
return self(args)._request("GET", params=params)
def post(self, *args, **data):
return self(args)._request("POST", data=data)
def put(self, *args, **data):
return self(args)._request("PUT", data=data)
def delete(self, *args, **params):
return self(args)._request("DELETE", params=params)
def create(self, *args, **data):
return self.post(*args, **data)
def set(self, *args, **data):
return self.put(*args, **data)
class ProxmoxAPI(ProxmoxResource):
def __init__(self, host=None, backend="https", service="PVE", **kwargs):
super().__init__(**kwargs)
service = service.upper()
backend = backend.lower()
# throw error for unsupported services
if service not in SERVICES.keys():
config_failure("{} service is not supported", service)
# throw error for unsupported backend for service
if backend not in SERVICES[service]["supported_backends"]:
config_failure("{} service does not support {} backend", service, backend)
if host is not None:
if backend == "local":
config_failure("{} backend does not support host keyword", backend)
else:
kwargs["host"] = host
kwargs["service"] = service
# load backend module
self._backend = importlib.import_module(f".backends.{backend}", "proxmoxer").Backend(
**kwargs
)
self._backend_name = backend
self._store = {
"base_url": self._backend.get_base_url(),
"session": self._backend.get_session(),
"serializer": self._backend.get_serializer(),
}
def __repr__(self):
dest = getattr(self._backend, "target", self._store.get("base_url"))
return f"ProxmoxAPI ({self._backend_name} backend for {dest})"
def get_tokens(self):
"""Return the auth and csrf tokens.
Returns (None, None) if the backend is not https using password authentication.
"""
if self._backend_name != "https":
return None, None
return self._backend.get_tokens()

View File

@@ -0,0 +1,7 @@
__author__ = "John Hollowell"
__copyright__ = "(c) John Hollowell 2022"
__license__ = "MIT"
from . import * # noqa: F401 F403
from .files import * # noqa: F401 F403
from .tasks import * # noqa: F401 F403

View File

@@ -0,0 +1,279 @@
__author__ = "John Hollowell"
__copyright__ = "(c) John Hollowell 2023"
__license__ = "MIT"
import hashlib
import logging
import os
import sys
from enum import Enum
from pathlib import Path
from typing import Optional
from urllib.parse import urljoin, urlparse
from proxmoxer import ProxmoxResource, ResourceException
from proxmoxer.tools.tasks import Tasks
CHECKSUM_CHUNK_SIZE = 16384 # read 16k at a time while calculating the checksum for upload
logger = logging.getLogger(__name__)
logger.setLevel(level=logging.WARNING)
try:
import requests
except ImportError:
logger.error("Files tools requires 'requests' module\n")
sys.exit(1)
class ChecksumInfo:
def __init__(self, name: str, hex_size: int):
self.name = name
self.hex_size = hex_size
def __str__(self):
return self.name
def __repr__(self):
return f"{self.name} ({self.hex_size} digits)"
class SupportedChecksums(Enum):
"""
An Enum of the checksum types supported by Proxmox
"""
# ordered by preference for longer/stronger checksums first
SHA512 = ChecksumInfo("sha512", 128)
SHA256 = ChecksumInfo("sha256", 64)
SHA224 = ChecksumInfo("sha224", 56)
SHA384 = ChecksumInfo("sha384", 96)
MD5 = ChecksumInfo("md5", 32)
SHA1 = ChecksumInfo("sha1", 40)
class Files:
"""
Ease-of-use tools for interacting with the uploading/downloading files
in Proxmox VE
"""
def __init__(self, prox: ProxmoxResource, node: str, storage: str):
self._prox = prox
self._node = node
self._storage = storage
def __repr__(self):
return f"Files ({self._node}/{self._storage} at {self._prox})"
def upload_local_file_to_storage(
self,
filename: str,
do_checksum_check: bool = True,
blocking_status: bool = True,
):
file_path = Path(filename)
if not file_path.is_file():
logger.error(f'"{file_path.absolute()}" does not exist or is not a file')
return None
# init to None in case errors cause no values to be set
upid: str = ""
checksum: str = None
checksum_type: str = None
try:
with open(file_path.absolute(), "rb") as f_obj:
if do_checksum_check:
# iterate through SupportedChecksums and find the first one in hashlib.algorithms_available
for checksum_info in (v.value for v in SupportedChecksums):
if checksum_info.name in hashlib.algorithms_available:
checksum_type = checksum_info.name
break
if checksum_type is None:
logger.warning(
"There are no Proxmox supported checksums which are supported by hashlib. Skipping checksum validation"
)
else:
h = hashlib.new(checksum_type)
# Iterate through the file in CHECKSUM_CHUNK_SIZE size
for byte_block in iter(lambda: f_obj.read(CHECKSUM_CHUNK_SIZE), b""):
h.update(byte_block)
checksum = h.hexdigest()
logger.debug(
f"The {checksum_type} checksum of {file_path.absolute()} is {checksum}"
)
# reset to the start of the file so the upload can use the same file handle
f_obj.seek(0)
params = {
"content": "iso" if file_path.absolute().name.endswith("iso") else "vztmpl",
"checksum-algorithm": checksum_type,
"checksum": checksum,
"filename": f_obj,
}
upid = self._prox.nodes(self._node).storage(self._storage).upload.post(**params)
except OSError as e:
logger.error(e)
return None
if blocking_status:
return Tasks.blocking_status(self._prox, upid)
else:
return self._prox.nodes(self._node).tasks(upid).status.get()
def download_file_to_storage(
self,
url: str,
checksum: Optional[str] = None,
checksum_type: Optional[str] = None,
blocking_status: bool = True,
):
file_info = self.get_file_info(url)
filename = None
if file_info is not None:
filename = file_info.get("filename")
if checksum is None and checksum_type is None:
checksum, checksum_info = self.get_checksums_from_file_url(url, filename)
checksum_type = checksum_info.name if checksum_info else None
elif checksum is None or checksum_type is None:
logger.error(
"Must pass both checksum and checksum_type or leave both None for auto-discovery"
)
return None
if checksum is None or checksum_type is None:
logger.warning("Unable to discover checksum. Will not do checksum validation")
params = {
"checksum-algorithm": checksum_type,
"url": url,
"checksum": checksum,
"content": "iso" if url.endswith("iso") else "vztmpl",
"filename": filename,
}
upid = self._prox.nodes(self._node).storage(self._storage)("download-url").post(**params)
if blocking_status:
return Tasks.blocking_status(self._prox, upid)
else:
return self._prox.nodes(self._node).tasks(upid).status.get()
def get_file_info(self, url: str):
try:
return self._prox.nodes(self._node)("query-url-metadata").get(url=url)
except ResourceException as e:
logger.warning(f"Unable to get information for {url}: {e}")
return None
@staticmethod
def get_checksums_from_file_url(
url: str, filename: str = None, preferred_type=SupportedChecksums.SHA512.value
):
getters_by_quality = [
Files._get_checksum_from_sibling_file,
Files._get_checksum_from_extension,
Files._get_checksum_from_extension_upper,
]
# hacky way to try the preferred_type first while still trying all types with no duplicates
all_types_with_priority = list(
dict.fromkeys([preferred_type, *(map(lambda t: t.value, SupportedChecksums))])
)
for c_info in all_types_with_priority:
for getter in getters_by_quality:
checksum: str = getter(url, c_info, filename)
if checksum is not None:
logger.info(f"{getter} found {str(c_info)} checksum {checksum}")
return (checksum, c_info)
else:
logger.debug(f"{getter} found no {str(c_info)} checksum")
return (None, None)
@staticmethod
def _get_checksum_from_sibling_file(
url: str, checksum_info: ChecksumInfo, filename: Optional[str] = None
) -> Optional[str]:
"""
Uses a checksum file in the same path as the target file to discover the checksum
:param url: the URL string of the target file
:type url: str
:param checksum_info: the type of checksum to search for
:type checksum_info: ChecksumInfo
:param filename: the filename to use for finding the checksum. If None, it will be discovered from the url
:type filename: str | None
:return: a string of the checksum if found, else None
:rtype: str | None
"""
sumfile_url = urljoin(url, (checksum_info.name + "SUMS").upper())
filename = filename or os.path.basename(urlparse(url).path)
return Files._get_checksum_helper(sumfile_url, filename, checksum_info)
@staticmethod
def _get_checksum_from_extension(
url: str, checksum_info: ChecksumInfo, filename: Optional[str] = None
) -> Optional[str]:
"""
Uses a checksum file with a checksum extension added to the target file to discover the checksum
:param url: the URL string of the target file
:type url: str
:param checksum_info: the type of checksum to search for
:type checksum_info: ChecksumInfo
:param filename: the filename to use for finding the checksum. If None, it will be discovered from the url
:type filename: str | None
:return: a string of the checksum if found, else None
:rtype: str | None
"""
sumfile_url = url + "." + checksum_info.name
filename = filename or os.path.basename(urlparse(url).path)
return Files._get_checksum_helper(sumfile_url, filename, checksum_info)
@staticmethod
def _get_checksum_from_extension_upper(
url: str, checksum_info: ChecksumInfo, filename: Optional[str] = None
) -> Optional[str]:
"""
Uses a checksum file with a checksum extension added to the target file to discover the checksum
:param url: the URL string of the target file
:type url: str
:param checksum_info: the type of checksum to search for
:type checksum_info: ChecksumInfo
:param filename: the filename to use for finding the checksum. If None, it will be discovered from the url
:type filename: str | None
:return: a string of the checksum if found, else None
:rtype: str | None
"""
sumfile_url = url + "." + checksum_info.name.upper()
filename = filename or os.path.basename(urlparse(url).path)
return Files._get_checksum_helper(sumfile_url, filename, checksum_info)
@staticmethod
def _get_checksum_helper(sumfile_url: str, filename: str, checksum_info: ChecksumInfo):
logger.debug(f"getting {sumfile_url}")
try:
resp = requests.get(sumfile_url, timeout=10)
except (requests.exceptions.ConnectionError, requests.exceptions.ReadTimeout):
logger.info(f"Failed when trying to get {sumfile_url}")
return None
if resp.status_code == 200:
for line in resp.iter_lines():
line_str = line.decode("utf-8")
logger.debug(f"checking for '{filename}' in '{line_str}'")
if filename in str(line_str):
return line_str[0 : checksum_info.hex_size]
return None

View File

@@ -0,0 +1,84 @@
__author__ = "John Hollowell"
__copyright__ = "(c) John Hollowell 2022"
__license__ = "MIT"
import time
class Tasks:
"""
Ease-of-use tools for interacting with the tasks endpoints
in the Proxmox API.
"""
@staticmethod
def blocking_status(prox, task_id, timeout=300, polling_interval=1):
"""
Turns getting the status of a Proxmox task into a blocking call
by polling the API until the task completes
:param prox: The Proxmox object used to query for status
:type prox: ProxmoxAPI
:param task_id: the UPID of the task
:type task_id: str
:param timeout: If the task does not complete in this time (in seconds) return None, defaults to 300
:type timeout: int, optional
:param polling_interval: the time to wait between checking for status updates, defaults to 1
:type polling_interval: float, optional
:return: the status of the task
:rtype: dict
"""
node: str = Tasks.decode_upid(task_id)["node"]
start_time: float = time.monotonic()
data = {"status": ""}
while data["status"] != "stopped":
data = prox.nodes(node).tasks(task_id).status.get()
if start_time + timeout <= time.monotonic():
data = None # type: ignore
break
time.sleep(polling_interval)
return data
@staticmethod
def decode_upid(upid):
"""
Decodes the sections of a UPID into separate fields
:param upid: a UPID string
:type upid: str
:return: The decoded information from the UPID
:rtype: dict
"""
segments = upid.split(":")
if segments[0] != "UPID" or len(segments) != 9:
raise AssertionError("UPID is not in the correct format")
data = {
"upid": upid,
"node": segments[1],
"pid": int(segments[2], 16),
"pstart": int(segments[3], 16),
"starttime": int(segments[4], 16),
"type": segments[5],
"id": segments[6],
"user": segments[7].split("!")[0],
"comment": segments[8],
}
return data
@staticmethod
def decode_log(log_list):
"""
Takes in a task's log data and returns a multiline string representation
:param log_list: The log formatting returned by the Proxmox API
:type log_list: list of dicts
:return: a multiline string of the log
:rtype: str
"""
str_list = [""] * len(log_list)
for line in log_list:
str_list[line["n"] - 1] = line.get("t", "")
return "\n".join(str_list)