Files
defiQUG cb47cce074 Complete markdown files cleanup and organization
- Organized 252 files across project
- Root directory: 187 → 2 files (98.9% reduction)
- Moved configuration guides to docs/04-configuration/
- Moved troubleshooting guides to docs/09-troubleshooting/
- Moved quick start guides to docs/01-getting-started/
- Moved reports to reports/ directory
- Archived temporary files
- Generated comprehensive reports and documentation
- Created maintenance scripts and guides

All files organized according to established standards.
2026-01-06 01:46:25 -08:00

280 lines
10 KiB
Python

# Module metadata: author, copyright notice, and license identifier
__author__ = "John Hollowell"
__copyright__ = "(c) John Hollowell 2023"
__license__ = "MIT"
import hashlib
import logging
import os
import sys
from enum import Enum
from pathlib import Path
from typing import Optional
from urllib.parse import urljoin, urlparse
from proxmoxer import ProxmoxResource, ResourceException
from proxmoxer.tools.tasks import Tasks
# Number of bytes read per iteration while calculating the checksum for upload (16k)
CHECKSUM_CHUNK_SIZE = 16384

logger = logging.getLogger(__name__)
logger.setLevel(logging.WARNING)

# The file tools cannot work without `requests`: log the problem and exit
# if the module cannot be imported.
try:
    import requests
except ImportError:
    logger.error("Files tools requires 'requests' module\n")
    sys.exit(1)
class ChecksumInfo:
    """
    Describes one checksum algorithm: its name and the number of hex digits
    in a digest of that type.
    """

    def __init__(self, name: str, hex_size: int):
        """
        :param name: the name of the checksum algorithm (e.g. "sha256")
        :type name: str
        :param hex_size: the number of hexadecimal digits in a digest
        :type hex_size: int
        """
        self.name, self.hex_size = name, hex_size

    def __str__(self):
        # the plain algorithm name is the string form
        return self.name

    def __repr__(self):
        return "{} ({} digits)".format(self.name, self.hex_size)
class SupportedChecksums(Enum):
    """
    An Enum of the checksum types supported by Proxmox

    Each member's value is a ChecksumInfo holding the algorithm name and the
    number of hex digits in its digest. Iterating the Enum yields members in
    definition order, so the order below is the order of preference.
    """

    # ordered by preference for longer/stronger checksums first
    SHA512 = ChecksumInfo("sha512", 128)
    SHA384 = ChecksumInfo("sha384", 96)
    SHA256 = ChecksumInfo("sha256", 64)
    SHA224 = ChecksumInfo("sha224", 56)
    SHA1 = ChecksumInfo("sha1", 40)
    MD5 = ChecksumInfo("md5", 32)
class Files:
    """
    Ease-of-use tools for interacting with the uploading/downloading files
    in Proxmox VE
    """

    def __init__(self, prox: ProxmoxResource, node: str, storage: str):
        """
        :param prox: the Proxmox API object used for all requests
        :type prox: ProxmoxResource
        :param node: the name of the node the storage belongs to
        :type node: str
        :param storage: the name of the storage files are sent to
        :type storage: str
        """
        self._prox = prox
        self._node = node
        self._storage = storage

    def __repr__(self):
        return f"Files ({self._node}/{self._storage} at {self._prox})"

    def upload_local_file_to_storage(
        self,
        filename: str,
        do_checksum_check: bool = True,
        blocking_status: bool = True,
    ):
        """
        Upload a file from the local filesystem to the configured storage

        The content type sent is "iso" when the file name ends with "iso",
        otherwise "vztmpl".

        :param filename: the path of the local file to upload
        :type filename: str
        :param do_checksum_check: calculate a checksum of the file and send it along with the upload
        :type do_checksum_check: bool
        :param blocking_status: if True, return the result of Tasks.blocking_status for the upload
            task, otherwise return the task status fetched once right after the upload
        :type blocking_status: bool
        :return: the task status, or None if the path is not a file or an OSError occurred
        """
        file_path = Path(filename)

        if not file_path.is_file():
            logger.error(f'"{file_path.absolute()}" does not exist or is not a file')
            return None

        # init to None in case errors cause no values to be set
        upid: str = ""
        checksum: Optional[str] = None
        checksum_type: Optional[str] = None

        try:
            with open(file_path.absolute(), "rb") as f_obj:
                if do_checksum_check:
                    # iterate through SupportedChecksums and find the first one in hashlib.algorithms_available
                    for checksum_info in (v.value for v in SupportedChecksums):
                        if checksum_info.name in hashlib.algorithms_available:
                            checksum_type = checksum_info.name
                            break

                    if checksum_type is None:
                        logger.warning(
                            "There are no Proxmox supported checksums which are supported by hashlib. Skipping checksum validation"
                        )
                    else:
                        h = hashlib.new(checksum_type)

                        # Iterate through the file in CHECKSUM_CHUNK_SIZE size
                        for byte_block in iter(lambda: f_obj.read(CHECKSUM_CHUNK_SIZE), b""):
                            h.update(byte_block)
                        checksum = h.hexdigest()
                        logger.debug(
                            f"The {checksum_type} checksum of {file_path.absolute()} is {checksum}"
                        )

                        # reset to the start of the file so the upload can use the same file handle
                        f_obj.seek(0)

                params = {
                    "content": "iso" if file_path.absolute().name.endswith("iso") else "vztmpl",
                    "checksum-algorithm": checksum_type,
                    "checksum": checksum,
                    "filename": f_obj,
                }
                # the upload must happen while the file handle is still open
                upid = self._prox.nodes(self._node).storage(self._storage).upload.post(**params)
        except OSError as e:
            logger.error(e)
            return None

        if blocking_status:
            return Tasks.blocking_status(self._prox, upid)
        else:
            return self._prox.nodes(self._node).tasks(upid).status.get()

    def download_file_to_storage(
        self,
        url: str,
        checksum: Optional[str] = None,
        checksum_type: Optional[str] = None,
        blocking_status: bool = True,
    ):
        """
        Ask the node to download a file from a URL into the configured storage

        If neither checksum nor checksum_type is given, checksum discovery is
        attempted with get_checksums_from_file_url. If discovery finds nothing,
        a warning is logged and the download is requested without a checksum.

        :param url: the URL of the file to download
        :type url: str
        :param checksum: the expected checksum of the file (requires checksum_type)
        :type checksum: str | None
        :param checksum_type: the name of the checksum algorithm (requires checksum)
        :type checksum_type: str | None
        :param blocking_status: if True, return the result of Tasks.blocking_status for the download
            task, otherwise return the task status fetched once right after the request
        :type blocking_status: bool
        :return: the task status, or None if only one of checksum/checksum_type was passed
        """
        file_info = self.get_file_info(url)
        filename = None

        if file_info is not None:
            filename = file_info.get("filename")

        if checksum is None and checksum_type is None:
            checksum, checksum_info = self.get_checksums_from_file_url(url, filename)
            checksum_type = checksum_info.name if checksum_info else None
        elif checksum is None or checksum_type is None:
            logger.error(
                "Must pass both checksum and checksum_type or leave both None for auto-discovery"
            )
            return None

        if checksum is None or checksum_type is None:
            logger.warning("Unable to discover checksum. Will not do checksum validation")

        params = {
            "checksum-algorithm": checksum_type,
            "url": url,
            "checksum": checksum,
            "content": "iso" if url.endswith("iso") else "vztmpl",
            "filename": filename,
        }
        upid = self._prox.nodes(self._node).storage(self._storage)("download-url").post(**params)

        if blocking_status:
            return Tasks.blocking_status(self._prox, upid)
        else:
            return self._prox.nodes(self._node).tasks(upid).status.get()

    def get_file_info(self, url: str):
        """
        Query the node's "query-url-metadata" endpoint for information about a URL

        :param url: the URL to get information about
        :type url: str
        :return: the API response, or None if the API raised a ResourceException
        """
        try:
            return self._prox.nodes(self._node)("query-url-metadata").get(url=url)
        except ResourceException as e:
            logger.warning(f"Unable to get information for {url}: {e}")
            return None

    @staticmethod
    def get_checksums_from_file_url(
        url: str, filename: Optional[str] = None, preferred_type=SupportedChecksums.SHA512.value
    ):
        """
        Attempt to discover a checksum for the file at the given URL

        Every supported checksum type is tried (preferred_type first), and for
        each type every discovery method is tried in turn. The first checksum
        found is returned.

        :param url: the URL string of the target file
        :type url: str
        :param filename: the filename to use for finding the checksum. If None, it will be discovered from the url
        :type filename: str | None
        :param preferred_type: the checksum type to try before all others
        :type preferred_type: ChecksumInfo
        :return: a tuple of (checksum, ChecksumInfo) if found, else (None, None)
        :rtype: tuple
        """
        getters_by_quality = [
            Files._get_checksum_from_sibling_file,
            Files._get_checksum_from_extension,
            Files._get_checksum_from_extension_upper,
        ]

        # hacky way to try the preferred_type first while still trying all types with no duplicates
        all_types_with_priority = list(
            dict.fromkeys([preferred_type, *(map(lambda t: t.value, SupportedChecksums))])
        )
        for c_info in all_types_with_priority:
            for getter in getters_by_quality:
                checksum: Optional[str] = getter(url, c_info, filename)
                if checksum is not None:
                    logger.info(f"{getter} found {str(c_info)} checksum {checksum}")
                    return (checksum, c_info)
                else:
                    logger.debug(f"{getter} found no {str(c_info)} checksum")

        return (None, None)

    @staticmethod
    def _get_checksum_from_sibling_file(
        url: str, checksum_info: ChecksumInfo, filename: Optional[str] = None
    ) -> Optional[str]:
        """
        Uses a checksum file in the same path as the target file to discover the checksum
        :param url: the URL string of the target file
        :type url: str
        :param checksum_info: the type of checksum to search for
        :type checksum_info: ChecksumInfo
        :param filename: the filename to use for finding the checksum. If None, it will be discovered from the url
        :type filename: str | None
        :return: a string of the checksum if found, else None
        :rtype: str | None
        """
        # e.g. ".../SHA512SUMS" next to the target file
        sumfile_url = urljoin(url, (checksum_info.name + "SUMS").upper())
        filename = filename or os.path.basename(urlparse(url).path)

        return Files._get_checksum_helper(sumfile_url, filename, checksum_info)

    @staticmethod
    def _get_checksum_from_extension(
        url: str, checksum_info: ChecksumInfo, filename: Optional[str] = None
    ) -> Optional[str]:
        """
        Uses a checksum file with a checksum extension added to the target file to discover the checksum
        :param url: the URL string of the target file
        :type url: str
        :param checksum_info: the type of checksum to search for
        :type checksum_info: ChecksumInfo
        :param filename: the filename to use for finding the checksum. If None, it will be discovered from the url
        :type filename: str | None
        :return: a string of the checksum if found, else None
        :rtype: str | None
        """
        # e.g. "<url>.sha512"
        sumfile_url = url + "." + checksum_info.name
        filename = filename or os.path.basename(urlparse(url).path)

        return Files._get_checksum_helper(sumfile_url, filename, checksum_info)

    @staticmethod
    def _get_checksum_from_extension_upper(
        url: str, checksum_info: ChecksumInfo, filename: Optional[str] = None
    ) -> Optional[str]:
        """
        Uses a checksum file with an upper-case checksum extension added to the target file to discover the checksum
        :param url: the URL string of the target file
        :type url: str
        :param checksum_info: the type of checksum to search for
        :type checksum_info: ChecksumInfo
        :param filename: the filename to use for finding the checksum. If None, it will be discovered from the url
        :type filename: str | None
        :return: a string of the checksum if found, else None
        :rtype: str | None
        """
        # e.g. "<url>.SHA512"
        sumfile_url = url + "." + checksum_info.name.upper()
        filename = filename or os.path.basename(urlparse(url).path)

        return Files._get_checksum_helper(sumfile_url, filename, checksum_info)

    @staticmethod
    def _looks_like_hex_digest(candidate: str, hex_size: int) -> bool:
        """
        Check that a string is exactly hex_size hexadecimal digits
        :param candidate: the string to check
        :type candidate: str
        :param hex_size: the required number of hex digits
        :type hex_size: int
        :return: True if candidate has the right length and only hex digits
        :rtype: bool
        """
        return len(candidate) == hex_size and all(
            char in "0123456789abcdefABCDEF" for char in candidate
        )

    @staticmethod
    def _get_checksum_helper(sumfile_url: str, filename: str, checksum_info: ChecksumInfo):
        """
        Fetches a checksum file and extracts the checksum listed for the given filename

        The checksum is expected at the start of the first line mentioning the
        filename. Lines whose leading characters are not a hex digest of the
        expected length are skipped.

        :param sumfile_url: the URL of the checksum file
        :type sumfile_url: str
        :param filename: the filename to look for in the checksum file
        :type filename: str
        :param checksum_info: the type of checksum expected in the file
        :type checksum_info: ChecksumInfo
        :return: a string of the checksum if found, else None
        :rtype: str | None
        """
        if not filename:
            # an empty filename would "match" every line and return an unrelated checksum
            logger.debug(f"no filename to look for in {sumfile_url}")
            return None

        logger.debug(f"getting {sumfile_url}")
        try:
            resp = requests.get(sumfile_url, timeout=10)
        except requests.exceptions.RequestException:
            # discovery is best-effort: any request failure only means no checksum was found here
            logger.info(f"Failed when trying to get {sumfile_url}")
            return None

        if resp.status_code != 200:
            return None

        for line in resp.iter_lines():
            # tolerate non-UTF-8 bytes (e.g. an unexpected binary response) instead of raising
            line_str = line.decode("utf-8", errors="replace")
            logger.debug(f"checking for '{filename}' in '{line_str}'")
            if filename not in line_str:
                continue

            candidate = line_str[0 : checksum_info.hex_size]
            if Files._looks_like_hex_digest(candidate, checksum_info.hex_size):
                return candidate
            # e.g. an HTML error page served with status 200 that mentions the filename
            logger.debug(f"ignoring line which does not start with a {checksum_info} checksum")

        return None