TOMODACHI SHELL
Free Palestine !! - Free Gaza !!
Current File : //opt/managed_servers/app/utils/__init__.py |
import subprocess
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Optional
from app.config import ALERTS_DIR, BASH_UTILS, MAIN_SCRIPT
class DataUnits(Enum):
    """Binary size units, each valued as the number of KB it contains."""

    KB = 1
    MB = 1 << 10  # 1024 KB
    GB = 1 << 20  # 1024**2 KB
    TB = 1 << 30  # 1024**3 KB
class ServiceStatus(Enum):
    """Service status identifiers; each member's value is its string codename."""

    OK = "ok"
    DOWN = "down"
    CRITICAL = "critical"
    WARNING = "warning"
    DISABLED = "disabled"
    BACKUP_IN_PROGRESS = "backup_in_progress"
    BACKUP_STARTED = "backup_started"

    @property
    def codename(self) -> str:
        """Return the string value of this member."""
        text = self.value
        return text
class CPULoadReason(Enum):
    """Labels for the cause of CPU load; ``OTHER`` carries the value "unknown"."""

    OTHER = "unknown"
    MYSQL = "mysql"
def convert_data(value: int, use_bytes: bool = False) -> str:
    """Format a data size as a human-readable string such as ``"1.5 MB"``.

    Args:
        value: The size to format, expressed in KB unless ``use_bytes`` is set.
        use_bytes: When true, ``value`` is in bytes and is first divided by
            1024 to obtain KB.

    Returns:
        The size scaled to the largest fitting unit of ``DataUnits``, rounded
        to two decimals and followed by the unit name. Anything of 1024**3 KB
        or more is reported in TB.
    """
    amount = value * 1.00 / 1024 if use_bytes else value

    # Pick the first unit whose upper bound (1024 of that unit) exceeds the
    # amount; fall back to the largest unit when none does.
    unit = DataUnits.TB
    for candidate in (DataUnits.KB, DataUnits.MB, DataUnits.GB):
        if amount < candidate.value * 1024:
            unit = candidate
            break

    scaled = round(amount * 1.00 / unit.value, 2)
    return f"{scaled} {unit.name}"
@dataclass
class Result:
    """Outcome container returned by the helpers in this module."""

    # True unless the producer explicitly marks the operation as failed.
    success: bool = True
    # Payload of the operation; its type depends on the producer.
    data: Any = None
    # Free-form text, used by producers to describe a failure.
    message: str = ""
    # Per-instance list (built by a factory so instances never share it).
    warnings: list = field(default_factory=list)
class ScriptStatus(Enum):
    """Script status identifiers; each member's value is its string codename."""

    TIMEOUT = "timeout"
    STARTED = "started"
    IN_PROGRESS = "progress"
    CANNOT_START = "start_failed"
    SKIP = "skipped"

    @property
    def codename(self) -> str:
        """Return the string value of this member."""
        text = self.value
        return text
class ManagementType(Enum):
    """Management type identifiers; each member's value is its string codename."""

    BASIC = "basic"
    COMPLETE = "complete"

    @property
    def codename(self) -> str:
        """Return the string value of this member."""
        text = self.value
        return text
class Virtualization(Enum):
    """Virtualization identifiers; each member's value is its string codename."""

    NONE = "none"
    KVM = "kvm"
    XEN = "xen"

    @property
    def codename(self) -> str:
        """Return the string value of this member."""
        text = self.value
        return text
def check_management() -> Result:
    """Run the ``management_type_check`` bash helper and parse its output.

    The command sources ``BASH_UTILS`` and calls ``management_type_check``
    under ``/bin/bash``; the stripped stdout is looked up in
    ``ManagementType``.

    Returns:
        ``Result(data=<ManagementType member>)`` when the command exits with
        status 0 and prints a known value. Otherwise a ``Result`` with
        ``success=False`` whose ``message`` holds either the command's
        stderr/stdout (non-zero exit) or the unrecognised output.
    """
    cmd = f"source {BASH_UTILS} ; management_type_check"
    res = subprocess.run(
        cmd,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        executable="/bin/bash",
    )
    if res.returncode != 0:
        return Result(success=False, message=f"{res.stderr}\n{res.stdout}")

    reported = res.stdout.strip()
    try:
        management = ManagementType(reported)
    except ValueError:
        # Unexpected output is reported through Result, like every other
        # failure of this function, instead of escaping as an exception.
        return Result(
            success=False,
            message=f"Unknown management type: {reported!r}",
        )
    return Result(data=management)
# Lower-case spellings that str_to_bool maps to True and False respectively.
truthy_values = {"yes", "true", "1"}
falsy_values = {"no", "false", "0"}


def str_to_bool(value: str) -> Optional[bool]:
    """Interpret ``value`` as a boolean, case-insensitively.

    The argument is passed through ``str()`` and lower-cased before lookup,
    so non-string inputs such as ``True`` or ``1`` are accepted as well.

    Returns:
        ``True`` for a spelling in ``truthy_values``, ``False`` for one in
        ``falsy_values``, and ``None`` for anything else.
    """
    normalized = str(value).lower()
    for verdict, spellings in ((True, truthy_values), (False, falsy_values)):
        if normalized in spellings:
            return verdict
    return None
def is_vps() -> Result:
    """Report whether the ``QEMU`` variable set by the main script names a hypervisor.

    Sources ``{ALERTS_DIR}/{MAIN_SCRIPT}`` in bash, echoes ``$QEMU`` and
    compares the stripped output with the ``Virtualization`` codenames.

    Returns:
        A ``Result`` whose ``data`` is ``True`` for "kvm" or "xen" and
        ``False`` for "none". For any other output ``success`` is ``False``,
        ``data`` stays ``False`` and ``message`` contains the raw output.
    """
    # data is a plain bool on every path, so a failed check never looks truthy.
    result = Result(data=False)
    cmd_res = subprocess.run(
        f"source {ALERTS_DIR}/{MAIN_SCRIPT}; echo $QEMU",
        shell=True,
        stdout=subprocess.PIPE,
        # `source` is a bash builtin; the default /bin/sh may not provide it.
        executable="/bin/bash",
    )
    stdout = cmd_res.stdout.decode().strip()
    if stdout in (Virtualization.KVM.codename, Virtualization.XEN.codename):
        result.data = True
    elif stdout != Virtualization.NONE.codename:
        result.success = False
        result.message = f"Failed to check server type: {stdout}"
    return result
TOMODACHI | Tempest Hacker