TOMODACHI SHELL

Free Palestine !! - Free Gaza !!


 
OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON
Directory (0755) :  /opt/managed_servers/app/

 Home   ☍ Command   ☍ Upload File   ☍Info Server   ☍ Buat File   ☍ Mass deface   ☍ Jumping   ☍ Config   ☍ Symlink   ☍ About 

Current File : //opt/managed_servers/app/main.py
import os
from argparse import ArgumentParser

from app.api.utils import AutoFixStatus, Service
from app.config import FILE_LOGGING, load_settings, logger, setup_file_logging
from app.host_check import run_service_check
from app.utils import ManagementType, check_management, is_vps, str_to_bool
from app.utils.check_processing import (
    process_auto_fix_results,
    process_report,
    process_script_status,
)
from app.utils.logging import AutofixLogger


def process_service_check(s: Service):
    acknowledged = None
    mgmt_type = ManagementType(os.getenv("SERVER_MANAGEMENT_TYPE"))
    # Skip  checks not applicable for Basic management
    if mgmt_type == ManagementType.BASIC and s in [
        Service.BACKUP_DISK,
        Service.BACKUP_AGE,
    ]:
        logger.info("Service check not applicable - skipping")
        return

    # Skip checks not applicable for VPS
    if s == Service.SWAP and str_to_bool(os.getenv("IS_VPS")):
        logger.info("Service check not applicable - skipping")
        return

    # If a resource-intensive check has been successfully reported to API less than 24 hours ago, skip it
    if s in [Service.DISK, Service.BACKUP_DISK]:
        log_check_result = AutofixLogger.check_logged_api_reports(s.codename)
        for warn in log_check_result.warnings:
            logger.warning(warn)
        if log_check_result.success:
            if log_check_result.data["already_sent"]:
                logger.info(
                    f"Service {s.codename} was in critical state less that 24 hours ago at {log_check_result.data['timestamp']} and has been reported. Skipping this check.",
                )
                return
        else:
            logger.error(log_check_result.message)

    result = run_service_check(s)

    if not result.success:
        logger.error(
            f"Failed to run result check for service {s.codename}: {result.message}",
        )
        return

    if result.warnings:
        for warn in result.warnings:
            logger.warning(warn)

    data = result.data

    script_run_status = process_script_status(s.codename, data)
    if script_run_status.data.get("abort_execution", False):
        return

    auto_fix_results_check = process_auto_fix_results(s, data)
    if auto_fix_results_check.data["abort_execution"]:
        return
    data.update(auto_fix_results_check.data)

    if script_run_status.data.get("report_error", False):
        data["payload"]["error"] = script_run_status.data.get("error", "N/A")

    auto_fix_status = data.get("auto_fix_status")
    client_notified = data.get("client_notified")

    if not auto_fix_status:
        auto_fix_status = AutoFixStatus.PROCESS_MANUALLY.codename

    process_report(acknowledged, client_notified, data, result, s, auto_fix_status)


def main():
    parser = ArgumentParser(description="Run monitoring checks.")
    services = [s.codename for s in Service]
    parser.add_argument(
        "--service",
        choices=services,
        help="Run monitoring checks for a specific service. Available options: " + ", ".join(services),
    )
    args = parser.parse_args()

    if FILE_LOGGING:
        setup_file_logging()
        load_settings()

    mgmt_check = check_management()
    if not mgmt_check.success:
        logger.error(f"Management check failed: {mgmt_check.message}")
        return
    os.environ["SERVER_MANAGEMENT_TYPE"] = mgmt_check.data.codename

    vps_check = is_vps()
    if not vps_check.success:
        logger.error(f"Virtualization check failed: {vps_check.message}")
        return
    os.environ["IS_VPS"] = str(vps_check.data).lower()

    if args.service:
        logger.info(f"> Starting monitoring check for service: {args.service}")
        service_obj = next((s for s in Service if s.codename == args.service), None)
        if service_obj:
            process_service_check(service_obj)
        else:
            logger.error(f"Service '{args.service}' not found.")
    else:
        logger.info("> Starting monitoring checks processing")
        for s in Service:
            process_service_check(s)

    logger.info("+ Done")


if __name__ == "__main__":
    main()

TOMODACHI | Tempest Hacker