TOMODACHI SHELL

Free Palestine !! - Free Gaza !!


 
OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON
Directory (0755) :  /usr/include/../src/../local/nagios/plugins/

 Home   ☍ Command   ☍ Upload File   ☍Info Server   ☍ Buat File   ☍ Mass deface   ☍ Jumping   ☍ Config   ☍ Symlink   ☍ About 

Current File : //usr/include/../src/../local/nagios/plugins/check_csf.py
#!/usr/bin/env python3

'''Nagios plugin to check the csf status and a required rule.

Note: unless the plugin runs as root or with --no-sudo, the following commands
must be allowed in the sudoers file(s):
/usr/sbin/csf -c ===> no longer used since configserver.com was retired
/usr/sbin/csf -g "special_IP"
'''

# Set to a truthy value to print the executed command, its return code and
# its raw output before the result is evaluated.
debug = 0

# Absolute paths of the external binaries this plugin invokes.
SUDO = '/usr/bin/sudo'
CSF = '/usr/sbin/csf'

# Plugin exit codes: OK=0, WARNING=1, CRITICAL=2, UNKNOWN=3.
OK, WARNING, CRITICAL, UNKNOWN = range(4)

import os, re, sys, argparse, ipaddress
from subprocess import Popen, PIPE, STDOUT, TimeoutExpired

def end(status, message, perfdata=""):
    """Print the plugin output and terminate the process.

    ``message`` is printed as given; when ``perfdata`` is non-empty it is
    appended after a `` | `` separator. ``status`` becomes the process exit
    code, except that any value other than OK, WARNING, CRITICAL or UNKNOWN
    is replaced by UNKNOWN. This function never returns.
    """
    output = message
    if perfdata:
        output = f"{message} | {perfdata}"
    print(output)

    if status not in (OK, WARNING, CRITICAL, UNKNOWN):
        status = UNKNOWN
    sys.exit(status)

def check_program_usable(program, access=True):
    """Verify that ``program`` can be used, exiting with UNKNOWN otherwise.

    The path must exist and be a regular file. When ``access`` is true it
    must additionally be executable by the current process. Returns ``None``
    when every requested check passes.
    """
    problem = None
    if not os.path.exists(program):
        problem = f"{program} cannot be found"
    elif not os.path.isfile(program):
        problem = f"{program} is not a file"
    elif access and not os.access(program, os.X_OK):
        problem = f"{program} is not executable"

    # Only the first failing check is reported.
    if problem is not None:
        end(UNKNOWN, problem)

# --- Command-line interface -------------------------------------------------
parser = argparse.ArgumentParser(description='csf status')
parser.add_argument("-s", "--specialip", type=str, default="199.188.202.73",
                    help="IP to look for in ACCEPT or ALLOW rules (default: 199.188.202.73)")
parser.add_argument("--timeout", type=float, default=10.0,
                    help="Command timeout in seconds (default: 10)")
parser.add_argument("--no-sudo", action="store_true",
                    help="Do not use sudo (useful if running as root)")
args = parser.parse_args()

special_IP = args.specialip

# Validate the IP early so nothing malformed reaches sudo/csf or the regex.
try:
    ipaddress.ip_address(special_IP)
except ValueError:
    end(UNKNOWN, f"Invalid IP address: {special_IP}")

# sudo is only needed when we are not already root and the user did not opt out.
use_sudo = (os.geteuid() != 0) and (not args.no_sudo)

# --- Check binaries ---------------------------------------------------------
# When csf is launched through sudo the calling user does not need execute
# permission on it, so only existence is checked; when it is run directly the
# execute bit is required as well.
check_program_usable(CSF, not use_sudo)
if use_sudo:
    check_program_usable(SUDO, True)

# --- Regexes ----------------------------------------------------------------
re_status_disabled = re.compile(r'csf and lfd have been disabled', re.I)

# The IP must not be embedded in a longer address. Lookarounds are used
# instead of \b because \b cannot match next to an IPv6 address that starts
# with ':' (e.g. "::1").
ip_pattern = rf'(?<![\w.:]){re.escape(special_IP)}(?![\w:]|\.\d)'

# The alternation is grouped: without the group the pattern parses as
# "<anything> ACCEPT" OR "ALLOW ... <ip>", so any ACCEPT line would be
# reported as OK even if it does not mention the requested IP.
re_status_checkIP = re.compile(rf'^\S.*\b(?:ACCEPT|ALLOW)\b.*{ip_pattern}', re.M)

# --- Build command ----------------------------------------------------------
# No shell is involved. "sudo -n" fails immediately instead of prompting for a
# password when the sudoers rule is missing.
cmd = ([SUDO, '-n'] if use_sudo else []) + [CSF, '-g', special_IP]

# Force (not merely default) the C locale so the output is predictable even
# when the caller already exports LC_ALL.
env = os.environ.copy()
env["LC_ALL"] = "C"

# --- Execute with timeout ---------------------------------------------------
try:
    # errors="replace" keeps a stray non-UTF-8 byte from aborting the check.
    process = Popen(cmd, stdout=PIPE, stderr=STDOUT, encoding="utf-8",
                    errors="replace", env=env)
    stdout, _ = process.communicate(timeout=args.timeout)
except TimeoutExpired:
    process.kill()
    process.communicate()  # reap the killed child
    end(UNKNOWN, f"Timeout after {args.timeout}s running: {' '.join(cmd)}")
except Exception as e:
    end(UNKNOWN, f"Failed to execute {' '.join(cmd)}: {e}")

if debug:
    print("CMD:", " ".join(cmd))
    print("RC:", process.returncode)
    print("OUT:\n", stdout)

# --- Evaluate output --------------------------------------------------------
if re_status_disabled.search(stdout):
    end(CRITICAL, "csf and lfd have been disabled")

if re_status_checkIP.search(stdout):
    end(OK, f"CSF OK: required ACCEPT rule for {special_IP} found.")

# If csf errored, surface it explicitly
if process.returncode != 0:
    end(UNKNOWN, f"csf -g returned {process.returncode}.\nOutput: {stdout.strip()}")

# Otherwise, rule not found but csf ran fine
end(CRITICAL, "Rule set isn't full. Check config and restart csf.\n" + stdout.strip())


TOMODACHI | Tempest Hacker