- Organized 252 files across project - Root directory: 187 → 2 files (98.9% reduction) - Moved configuration guides to docs/04-configuration/ - Moved troubleshooting guides to docs/09-troubleshooting/ - Moved quick start guides to docs/01-getting-started/ - Moved reports to reports/ directory - Archived temporary files - Generated comprehensive reports and documentation - Created maintenance scripts and guides All files organized according to established standards.
172 lines
5.2 KiB
Python
172 lines
5.2 KiB
Python
__author__ = "Oleg Butovich"
|
|
__copyright__ = "(c) Oleg Butovich 2013-2017"
|
|
__license__ = "MIT"
|
|
|
|
|
|
import json
|
|
import logging
|
|
import platform
|
|
import re
|
|
from itertools import chain
|
|
from shlex import split as shell_split
|
|
|
|
from proxmoxer.core import SERVICES
|
|
|
|
logger = logging.getLogger(__name__)
|
|
logger.setLevel(level=logging.WARNING)
|
|
|
|
|
|
try:
|
|
from shlex import join
|
|
|
|
def shell_join(args):
|
|
return join(args)
|
|
|
|
except ImportError:
|
|
from shlex import quote
|
|
|
|
def shell_join(args):
|
|
return " ".join([quote(arg) for arg in args])
|
|
|
|
|
|
class Response:
|
|
def __init__(self, content, status_code):
|
|
self.status_code = status_code
|
|
self.content = content
|
|
self.text = str(content)
|
|
self.headers = {"content-type": "application/json"}
|
|
|
|
def __str__(self):
|
|
return f"Response ({self.status_code}) {self.content}"
|
|
|
|
|
|
class CommandBaseSession:
|
|
def __init__(
|
|
self,
|
|
service="PVE",
|
|
timeout=5,
|
|
sudo=False,
|
|
):
|
|
self.service = service.lower()
|
|
self.timeout = timeout
|
|
self.sudo = sudo
|
|
|
|
def _exec(self, cmd):
|
|
raise NotImplementedError()
|
|
|
|
# noinspection PyUnusedLocal
|
|
def request(self, method, url, data=None, params=None, headers=None):
|
|
method = method.lower()
|
|
data = data or {}
|
|
params = params or {}
|
|
url = url.strip()
|
|
|
|
cmd = {"post": "create", "put": "set"}.get(method, method)
|
|
|
|
# separate out qemu exec commands to split into multiple argument pairs (issue#89)
|
|
data_command = None
|
|
if "/agent/exec" in url:
|
|
data_command = data.get("command")
|
|
if data_command is not None:
|
|
del data["command"]
|
|
|
|
# for 'upload' call some workaround
|
|
tmp_filename = ""
|
|
if url.endswith("upload"):
|
|
# copy file to temporary location on proxmox host
|
|
tmp_filename, _ = self._exec(
|
|
[
|
|
"python3",
|
|
"-c",
|
|
"import tempfile; import sys; tf = tempfile.NamedTemporaryFile(); sys.stdout.write(tf.name)",
|
|
]
|
|
)
|
|
tmp_filename = str(tmp_filename, "utf-8")
|
|
self.upload_file_obj(data["filename"], tmp_filename)
|
|
data["filename"] = data["filename"].name
|
|
data["tmpfilename"] = tmp_filename
|
|
|
|
command = [f"{self.service}sh", cmd, url]
|
|
# convert the options dict into a 2-tuple with the key formatted as a flag
|
|
option_pairs = []
|
|
for k, v in chain(data.items(), params.items()):
|
|
try:
|
|
option_pairs.append((f"-{k}", str(v, "utf-8")))
|
|
except TypeError:
|
|
option_pairs.append((f"-{k}", str(v)))
|
|
# add back in all the command arguments as their own pairs
|
|
if data_command is not None:
|
|
if isinstance(data_command, list):
|
|
command_arr = data_command
|
|
elif "Windows" not in platform.platform():
|
|
command_arr = shell_split(data_command)
|
|
for arg in command_arr:
|
|
option_pairs.append(("-command", arg))
|
|
# expand the list of 2-tuples into a flat list
|
|
options = [val for pair in option_pairs for val in pair]
|
|
additional_options = SERVICES[self.service.upper()].get("cli_additional_options", [])
|
|
full_cmd = command + options + additional_options
|
|
|
|
if self.sudo:
|
|
full_cmd = ["sudo"] + full_cmd
|
|
|
|
stdout, stderr = self._exec(full_cmd)
|
|
|
|
def is_http_status_string(s):
|
|
return re.match(r"\d\d\d [a-zA-Z]", str(s))
|
|
|
|
if stderr:
|
|
# assume if we got a task ID that the request was successful
|
|
task_id_pattern = re.compile(
|
|
r"UPID:[\w-]+:[0-9a-fA-F]{8}:[0-9a-fA-F]{8}:[0-9a-fA-F]{8}:\w+:[\w\._-]+:[\w\.@_-]+:\w*"
|
|
)
|
|
if task_id_pattern.search(str(stdout)) or task_id_pattern.search(str(stderr)):
|
|
status_code = 200
|
|
else:
|
|
# sometimes contains extra text like 'trying to acquire lock...OK'
|
|
status_code = next(
|
|
(
|
|
int(line.split()[0])
|
|
for line in stderr.splitlines()
|
|
if is_http_status_string(line)
|
|
),
|
|
500,
|
|
)
|
|
else:
|
|
status_code = 200
|
|
if stdout:
|
|
return Response(stdout, status_code)
|
|
return Response(stderr, status_code)
|
|
|
|
def upload_file_obj(self, file_obj, remote_path):
|
|
raise NotImplementedError()
|
|
|
|
|
|
class JsonSimpleSerializer:
|
|
def loads(self, response):
|
|
try:
|
|
return json.loads(response.content)
|
|
except (UnicodeDecodeError, ValueError):
|
|
return {"errors": response.content}
|
|
|
|
def loads_errors(self, response):
|
|
try:
|
|
return json.loads(response.text).get("errors")
|
|
except (UnicodeDecodeError, ValueError):
|
|
return {"errors": response.content}
|
|
|
|
|
|
class CommandBaseBackend:
|
|
def __init__(self):
|
|
self.session = None
|
|
self.target = None
|
|
|
|
def get_session(self):
|
|
return self.session
|
|
|
|
def get_base_url(self):
|
|
return ""
|
|
|
|
def get_serializer(self):
|
|
return JsonSimpleSerializer()
|