#!/usr/bin/env python3
"""Nagios-style check for Docker containers.

Reads an INI file (one section per container plus a ``[global]`` section),
inspects every enabled container through the ``docker`` CLI and reports
liveness, restart counts and CPU/memory usage.  The process exit code
follows the Nagios plugin convention: 0 OK, 1 WARNING, 2 CRITICAL, 3 UNKNOWN.

Command-line options:
    -c, --config PATH     use PATH instead of CONFIG_PATH
    --init, --init-only   (re)generate the config from existing containers
    -v, --verbose         also print the per-container OK lines
"""

import configparser
import json
import os
import subprocess
import sys
from typing import Any, Dict, List, NoReturn, Optional, Tuple

CONFIG_PATH = "/etc/docker-monitoring/containers.ini"

# Nagios plugin exit codes.
STATE_OK = 0
STATE_WARNING = 1
STATE_CRITICAL = 2
STATE_UNKNOWN = 3

# Values used for the [global] section, both when a config file is generated
# and when an existing file has no [global] section.
# NOTE(review): "skip_types" is written to the config but nothing in this file
# reads it -- confirm whether compose/stack containers were meant to be skipped.
DEFAULT_GLOBAL_SETTINGS = {
    "monitor_resources": "true",
    "default_cpu_warn": "80",
    "default_cpu_crit": "95",
    "default_mem_warn": "80",
    "default_mem_crit": "95",
    "default_restart_warn": "5",
    "default_restart_crit": "20",
    "skip_types": "compose,stack",
}


def run(cmd: List[str]) -> Tuple[int, str, str]:
    """Run *cmd* and return ``(returncode, stdout, stderr)``, both stripped.

    If the executable cannot be started at all (missing binary, permission
    denied, ...) no exception escapes; ``(127, "", <error text>)`` is returned
    so callers can treat it like any other failed command.
    """
    try:
        proc = subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
    except OSError as exc:
        # 127 mirrors the shell's "command not found" status.
        return 127, "", str(exc)
    return proc.returncode, proc.stdout.strip(), proc.stderr.strip()


def nagios_exit(code: int, msg: str) -> NoReturn:
    """Print *msg* and terminate the process with exit status *code*."""
    print(msg)
    sys.exit(code)


def ensure_dir(path: str) -> None:
    """Create the parent directory of *path* if it does not exist yet."""
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)


def parse_bool(v: Optional[str], default: bool = False) -> bool:
    """Interpret *v* as a boolean.

    ``None`` yields *default*; otherwise the value is true only when it is one
    of ``1``, ``true``, ``yes`` or ``on`` (case-insensitive, whitespace
    ignored).
    """
    if v is None:
        return default
    return str(v).strip().lower() in ("1", "true", "yes", "on")


def parse_float(v: Any, default: float = 0.0) -> float:
    """Convert *v* to ``float``, returning *default* when it is not numeric."""
    try:
        return float(str(v).strip())
    except (TypeError, ValueError):
        return default


def docker_available() -> bool:
    """Return True when ``docker info`` succeeds."""
    rc, _, _ = run(["docker", "info", "--format", "{{json .}}"])
    return rc == 0


def list_containers_all() -> List[str]:
    """Return the names of all containers, running or not.

    Exits with UNKNOWN (3) when ``docker ps`` fails.
    """
    rc, out, err = run(["docker", "ps", "-a", "--format", "{{.Names}}"])
    if rc != 0:
        nagios_exit(STATE_UNKNOWN, f"UNKNOWN - cannot list containers: {err}")
    return [line.strip() for line in out.splitlines() if line.strip()]


def inspect_container(name: str) -> Dict[str, Any]:
    """Return the ``docker inspect`` record for container *name*.

    An empty dict is returned when the command fails or its output is not the
    expected JSON list of objects.
    """
    # --type container keeps a same-named image/volume/network from matching.
    rc, out, _ = run(["docker", "inspect", "--type", "container", name])
    if rc != 0:
        return {}
    try:
        parsed = json.loads(out)
    except ValueError:
        return {}
    if isinstance(parsed, list) and parsed and isinstance(parsed[0], dict):
        return parsed[0]
    return {}


def detect_container_type(inspect: Dict[str, Any]) -> str:
    """Classify a container as ``compose``, ``stack`` or ``standalone``.

    The decision is based on the presence of the
    ``com.docker.compose.project`` / ``com.docker.stack.namespace`` labels.
    """
    labels = (inspect.get("Config") or {}).get("Labels") or {}
    if "com.docker.compose.project" in labels:
        return "compose"
    if "com.docker.stack.namespace" in labels:
        return "stack"
    return "standalone"


def get_ip(inspect: Dict[str, Any]) -> str:
    """Return the first non-empty network IP address, or ``"-"`` if none."""
    networks = (inspect.get("NetworkSettings") or {}).get("Networks") or {}
    for net in networks.values():
        ip = (net or {}).get("IPAddress")
        if ip:
            return ip
    return "-"


def get_started_at(inspect: Dict[str, Any]) -> str:
    """Return ``State.StartedAt`` or ``"-"`` when it is missing/empty."""
    return (inspect.get("State") or {}).get("StartedAt") or "-"


def get_restart_count(inspect: Dict[str, Any]) -> int:
    """Return ``RestartCount`` as an int, or 0 when missing or not numeric."""
    try:
        return int(inspect.get("RestartCount", 0))
    except (TypeError, ValueError):
        return 0


def get_running(inspect: Dict[str, Any]) -> bool:
    """Return the truthiness of ``State.Running``."""
    return bool((inspect.get("State") or {}).get("Running"))


def get_status_text(inspect: Dict[str, Any]) -> str:
    """Return ``State.Status`` or ``"unknown"`` when it is missing/empty."""
    return (inspect.get("State") or {}).get("Status") or "unknown"


def create_default_config(path: str) -> None:
    """Write a fresh config file at *path* describing every known container.

    The file gets a ``[global]`` section with the default thresholds and one
    enabled section per container; per-container thresholds are left empty so
    that the global defaults apply.  An existing file is overwritten.
    """
    ensure_dir(path)
    cfg = configparser.ConfigParser()
    cfg["global"] = dict(DEFAULT_GLOBAL_SETTINGS)

    for name in list_containers_all():
        info = inspect_container(name)
        ctype = detect_container_type(info)
        labels = (info.get("Config") or {}).get("Labels") or {}
        section = {
            "enabled": "true",
            "type": ctype,
            "monitor_liveness": "true",
            "monitor_resources": "true",
            "cpu_warn": "",
            "cpu_crit": "",
            "mem_warn": "",
            "mem_crit": "",
            "restart_warn": "",
            "restart_crit": "",
        }
        if ctype == "compose":
            section["note"] = (
                "auto-detected compose project: "
                f"{labels.get('com.docker.compose.project', '-')}"
            )
        elif ctype == "stack":
            section["note"] = (
                "auto-detected stack: "
                f"{labels.get('com.docker.stack.namespace', '-')}"
            )
        cfg[name] = section

    with open(path, "w", encoding="utf-8") as f:
        cfg.write(f)


def load_or_create_config(path: str) -> configparser.ConfigParser:
    """Load the config at *path*, generating it first when it does not exist.

    A ``[global]`` section with the default thresholds is injected when the
    file lacks one.  ``OSError`` / ``configparser.Error`` propagate to the
    caller.
    """
    if not os.path.exists(path):
        create_default_config(path)

    cfg = configparser.ConfigParser()
    cfg.read(path, encoding="utf-8")
    if "global" not in cfg:
        cfg["global"] = dict(DEFAULT_GLOBAL_SETTINGS)
    return cfg


def get_stats_one_shot() -> Dict[str, Dict[str, str]]:
    """Return one ``docker stats`` sample keyed by container name.

    Each value holds ``cpu`` and ``mem_perc`` (percent sign removed) and
    ``mem_usage`` as raw strings.  An empty dict is returned when the command
    fails; malformed lines are skipped.
    """
    cmd = [
        "docker", "stats", "--no-stream", "--format",
        "{{.Name}}|{{.CPUPerc}}|{{.MemPerc}}|{{.MemUsage}}",
    ]
    rc, out, _ = run(cmd)
    if rc != 0:
        return {}

    stats: Dict[str, Dict[str, str]] = {}
    for line in out.splitlines():
        parts = line.split("|", 3)
        if len(parts) != 4:
            continue
        name, cpu, memp, memu = parts
        stats[name.strip()] = {
            "cpu": cpu.strip().replace("%", ""),
            "mem_perc": memp.strip().replace("%", ""),
            "mem_usage": memu.strip(),
        }
    return stats


def state_max(a: int, b: int) -> int:
    """Return the more severe (numerically larger) of two state codes."""
    return max(a, b)


def _threshold(
    section: configparser.SectionProxy,
    global_cfg: configparser.SectionProxy,
    key: str,
    fallback: float,
) -> float:
    """Resolve threshold *key*: section value, else global default, else *fallback*."""
    raw = section.get(key) or global_cfg.get(f"default_{key}", str(fallback))
    return parse_float(raw, fallback)


def evaluate_inspect(
    name: str,
    info: Dict[str, Any],
    section: configparser.SectionProxy,
    global_cfg: configparser.SectionProxy,
    stats: Dict[str, Dict[str, str]],
) -> Tuple[int, str]:
    """Evaluate an already-fetched inspect record against the thresholds.

    Args:
        name: container name (used in messages and as the key into *stats*).
        info: ``docker inspect`` record of the container.
        section: the container's config section.
        global_cfg: the ``[global]`` config section providing defaults.
        stats: output of :func:`get_stats_one_shot`.

    Returns:
        ``(code, message)`` where code is 0 (OK), 1 (WARNING) or 2 (CRITICAL).
    """
    running = get_running(info)
    status_txt = get_status_text(info)
    ip = get_ip(info)
    started = get_started_at(info)
    restarts = get_restart_count(info)

    code = STATE_OK
    messages: List[str] = []

    monitor_liveness = parse_bool(section.get("monitor_liveness", "true"), True)
    monitor_resources = parse_bool(
        section.get("monitor_resources", global_cfg.get("monitor_resources", "true")),
        True,
    )

    if monitor_liveness:
        if running:
            messages.append(f"OK - {name} is running. IP: {ip}, StartedAt: {started}")
        else:
            code = STATE_CRITICAL
            messages.append(f"CRITICAL - {name} is not running (status: {status_txt})")

    restart_warn = _threshold(section, global_cfg, "restart_warn", 5)
    restart_crit = _threshold(section, global_cfg, "restart_crit", 20)
    if restarts >= restart_crit:
        code = state_max(code, STATE_CRITICAL)
        messages.append(f"CRITICAL - {name} restart count {restarts} >= {restart_crit}")
    elif restarts >= restart_warn:
        code = state_max(code, STATE_WARNING)
        messages.append(f"WARNING - {name} restart count {restarts} >= {restart_warn}")

    # Resource checks only apply when a stats sample exists for the container.
    if monitor_resources and name in stats:
        sample = stats[name]
        cpu = parse_float(sample.get("cpu", "0"), 0)
        memp = parse_float(sample.get("mem_perc", "0"), 0)
        memu = sample.get("mem_usage", "-")

        cpu_warn = _threshold(section, global_cfg, "cpu_warn", 80)
        cpu_crit = _threshold(section, global_cfg, "cpu_crit", 95)
        mem_warn = _threshold(section, global_cfg, "mem_warn", 80)
        mem_crit = _threshold(section, global_cfg, "mem_crit", 95)

        if cpu >= cpu_crit:
            code = state_max(code, STATE_CRITICAL)
            messages.append(f"CRITICAL - {name} CPU {cpu:.1f}% >= {cpu_crit}%")
        elif cpu >= cpu_warn:
            code = state_max(code, STATE_WARNING)
            messages.append(f"WARNING - {name} CPU {cpu:.1f}% >= {cpu_warn}%")

        if memp >= mem_crit:
            code = state_max(code, STATE_CRITICAL)
            messages.append(f"CRITICAL - {name} MEM {memp:.1f}% >= {mem_crit}% ({memu})")
        elif memp >= mem_warn:
            code = state_max(code, STATE_WARNING)
            messages.append(f"WARNING - {name} MEM {memp:.1f}% >= {mem_warn}% ({memu})")

    if code == STATE_OK:
        if running:
            return STATE_OK, f"OK - {name} is running. IP: {ip}, StartedAt: {started}"
        # Only reachable with liveness monitoring off: do not claim the
        # container is running when it is not.
        return (
            STATE_OK,
            f"OK - {name} is not running (status: {status_txt}); "
            "liveness monitoring disabled",
        )
    return code, " ; ".join(messages)


def evaluate_container(
    name: str,
    section: configparser.SectionProxy,
    global_cfg: configparser.SectionProxy,
    stats: Dict[str, Dict[str, str]],
) -> Tuple[int, str]:
    """Inspect container *name* and evaluate it against its thresholds.

    Returns ``(2, "CRITICAL - ... container not found")`` when the container
    cannot be inspected; otherwise the result of :func:`evaluate_inspect`.
    """
    info = inspect_container(name)
    if not info:
        return STATE_CRITICAL, f"CRITICAL - {name} container not found"
    return evaluate_inspect(name, info, section, global_cfg, stats)


def main() -> None:
    """Parse the command line, run all checks and exit with the Nagios state."""
    config_path = CONFIG_PATH
    verbose = False
    init_only = False

    # Unknown arguments are ignored; argparse is avoided because its error
    # path exits with status 2, which Nagios would read as CRITICAL.
    args = sys.argv[1:]
    idx = 0
    while idx < len(args):
        arg = args[idx]
        if arg in ("-c", "--config"):
            if idx + 1 >= len(args):
                nagios_exit(STATE_UNKNOWN, f"UNKNOWN - option {arg} requires a path")
            config_path = args[idx + 1]
            idx += 2
            continue
        if arg in ("--init", "--init-only"):
            init_only = True
        elif arg in ("-v", "--verbose"):
            verbose = True
        idx += 1

    # Handled after parsing so that "--init -c PATH" honours PATH.
    if init_only:
        try:
            create_default_config(config_path)
        except OSError as exc:
            nagios_exit(STATE_UNKNOWN, f"UNKNOWN - cannot write {config_path}: {exc}")
        print(f"Created {config_path}")
        sys.exit(0)

    if not docker_available():
        nagios_exit(STATE_UNKNOWN, "UNKNOWN - docker is not available or permission denied")

    try:
        cfg = load_or_create_config(config_path)
    except (OSError, configparser.Error) as exc:
        nagios_exit(STATE_UNKNOWN, f"UNKNOWN - cannot load config {config_path}: {exc}")
    global_cfg = cfg["global"]
    stats = get_stats_one_shot()

    enabled = [
        sec
        for sec in cfg.sections()
        if sec != "global" and parse_bool(cfg[sec].get("enabled", "false"), False)
    ]
    if not enabled:
        nagios_exit(STATE_UNKNOWN, f"UNKNOWN - no enabled containers in {config_path}")

    overall = STATE_OK
    ok_msgs: List[str] = []
    warn_msgs: List[str] = []
    crit_msgs: List[str] = []

    for name in enabled:
        code, txt = evaluate_container(name, cfg[name], global_cfg, stats)
        overall = state_max(overall, code)
        if code == STATE_OK:
            ok_msgs.append(txt)
        elif code == STATE_WARNING:
            warn_msgs.append(txt)
        elif code == STATE_CRITICAL:
            crit_msgs.append(txt)

    ok_count = len(ok_msgs)
    warn_count = len(warn_msgs)
    crit_count = len(crit_msgs)
    total = len(enabled)
    perfdata = (
        f"containers_ok={ok_count} containers_warning={warn_count} "
        f"containers_critical={crit_count}"
    )

    if overall == STATE_OK:
        print(f"OK - checked {total} container(s), all running | {perfdata}")
        if verbose:
            print("\n".join(ok_msgs))
        sys.exit(0)

    prefix = "WARNING" if overall == STATE_WARNING else "CRITICAL"
    print(
        f"{prefix} - checked {total} container(s): {crit_count} critical, "
        f"{warn_count} warning, {ok_count} ok | {perfdata}"
    )
    details = crit_msgs + warn_msgs
    if verbose:
        details += ok_msgs
    if details:
        print("\n".join(details))
    sys.exit(overall)


if __name__ == "__main__":
    main()