Current File : //usr/include/../src/../local/nagios/plugins/check_csf.py
#!/usr/bin/env python3
'''Nagios plugin to check the csf status and required rule
Please pay attention that it's necessary to add following access to sudoers file(s)
/usr/sbin/csf -c ===> not used anymore due to configserver.com retirement
/usr/sbin/csf -g "special_IP"
'''
debug = 0
CSF = '/usr/sbin/csf'
SUDO = '/usr/bin/sudo'
OK = 0
WARNING = 1
CRITICAL = 2
UNKNOWN = 3
import os, re, sys, argparse, ipaddress
from subprocess import Popen, PIPE, STDOUT, TimeoutExpired
def end(status, message, perfdata=""):
"""Exits the plugin with first arg as the return code and the second arg as the message to output."""
print(f"{message} | {perfdata}" if perfdata else message)
sys.exit(status if status in (OK, WARNING, CRITICAL, UNKNOWN) else UNKNOWN)
def check_program_usable(program, access=True):
"""Checks that a program path exists (and is executable if requested), otherwise exits with error."""
if not os.path.exists(program):
end(UNKNOWN, f"{program} cannot be found")
elif not os.path.isfile(program):
end(UNKNOWN, f"{program} is not a file")
elif access and not os.access(program, os.X_OK):
end(UNKNOWN, f"{program} is not executable")
parser = argparse.ArgumentParser(description='csf status')
parser.add_argument("-s", "--specialip", type=str, default="199.188.202.73",
help="IP to look for in ACCEPT or ALLOW rules (default: 199.188.202.73)")
parser.add_argument("--timeout", type=float, default=10.0,
help="Command timeout in seconds (default: 10)")
parser.add_argument("--no-sudo", action="store_true",
help="Do not use sudo (useful if running as root)")
args = parser.parse_args()
special_IP = args.specialip
# Validate IP early
try:
ipaddress.ip_address(special_IP)
except ValueError:
end(UNKNOWN, f"Invalid IP address: {special_IP}")
# Check binaries (require exec on both)
check_program_usable(CSF, False)
use_sudo = (os.geteuid() != 0) and (not args.no_sudo)
if use_sudo:
check_program_usable(SUDO, True)
# Regexes
re_status_disabled = re.compile(r'csf and lfd have been disabled', re.I)
re_status_checkIP = re.compile(rf'^\S.*\s*ACCEPT|ALLOW\b.*\b{re.escape(special_IP)}\b', re.M)
# Build command safely (no shell), pin locale for predictable output
cmd = ([SUDO] if use_sudo else []) + [CSF, '-g', special_IP]
env = os.environ.copy()
env.setdefault("LC_ALL", "C")
# Execute with timeout
try:
process = Popen(cmd, stdout=PIPE, stderr=STDOUT, encoding="utf-8", env=env)
stdout, _ = process.communicate(timeout=args.timeout)
except TimeoutExpired:
process.kill()
stdout, _ = process.communicate()
end(UNKNOWN, f"Timeout after {args.timeout}s running: {' '.join(cmd)}")
except Exception as e:
end(UNKNOWN, f"Failed to execute {' '.join(cmd)}: {e}")
if debug:
print("CMD:", " ".join(cmd))
print("RC:", process.returncode)
print("OUT:\n", stdout)
# Evaluate output
if re_status_disabled.search(stdout):
end(CRITICAL, "csf and lfd have been disabled")
if re_status_checkIP.search(stdout):
end(OK, f"CSF OK: required ACCEPT rule for {special_IP} found.")
# If csf errored, surface it explicitly
if process.returncode != 0:
end(UNKNOWN, f"csf -g returned {process.returncode}.\nOutput: {stdout.strip()}")
# Otherwise, rule not found but csf ran fine
end(CRITICAL, "Rule set isn't full. Check config and restart csf.\n" + (stdout or "").strip())