#!/usr/bin/env python3 import argparse import re import sys import requests from urllib.parse import quote from bs4 import BeautifulSoup def debug_print(debug, message): if debug: print(f"DEBUG: {message}") def extract_csrf_token(session, html, debug=False): soup = BeautifulSoup(html, "html.parser") csrf_input = soup.find("input", {"name": "_csrf"}) if csrf_input and csrf_input.get("value"): debug_print(debug, "CSRF found in input name=_csrf") return csrf_input["value"] csrf_meta = soup.find("meta", {"name": "_csrf"}) if csrf_meta and csrf_meta.get("content"): debug_print(debug, "CSRF found in meta name=_csrf") return csrf_meta["content"] csrf_meta_alt = soup.find("meta", {"name": "csrf-token"}) if csrf_meta_alt and csrf_meta_alt.get("content"): debug_print(debug, "CSRF found in meta name=csrf-token") return csrf_meta_alt["content"] patterns = [ r'csrfToken\s*[:=]\s*["\']([^"\']+)["\']', r'_csrf\s*[:=]\s*["\']([^"\']+)["\']', r'"csrf"\s*:\s*"([^"]+)"', r'"csrfToken"\s*:\s*"([^"]+)"', ] for pattern in patterns: match = re.search(pattern, html) if match: debug_print(debug, "CSRF found in JavaScript") return match.group(1) for cookie_name in ["csrf_token", "_csrf", "csrf", "csrf-token"]: value = session.cookies.get(cookie_name) if value: debug_print(debug, f"CSRF found in cookie {cookie_name}") return value return None def login_failed(response): text = response.text.lower() failed_markers = [ "invalid username or password", "incorrect username or password", "authentication failed", "login failed", "nieprawidłowa nazwa użytkownika lub hasło", "nieprawidłowy użytkownik lub hasło", "niepoprawna nazwa użytkownika lub hasło", ] if any(marker in text for marker in failed_markers): return True if "/user/login" in response.url: return True return False def check_gitea(server_address, repo_owner, repo_name, username, password, file_path, branch, debug=False): try: base_url = f"https://{server_address}".rstrip("/") login_url = f"{base_url}/user/login" file_path = file_path.lstrip("/") session = requests.Session() session.headers.update({ "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) Nagios check_gitea", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Accept-Language": "en-US,en;q=0.9,pl;q=0.8", "Connection": "close", }) debug_print(debug, f"Base URL: {base_url}") try: response = session.get(base_url, timeout=10, allow_redirects=True) response.raise_for_status() debug_print(debug, f"Server reachable HTTP {response.status_code}") debug_print(debug, f"Server final URL: {response.url}") except requests.exceptions.RequestException as e: print(f"CRITICAL - Could not connect to Gitea server: {e}") return 2 try: response = session.get(login_url, timeout=10, allow_redirects=True) response.raise_for_status() debug_print(debug, f"Login page HTTP {response.status_code}") debug_print(debug, f"Login page final URL: {response.url}") except requests.exceptions.RequestException as e: print(f"CRITICAL - Could not load login page: {e}") return 2 csrf_token = extract_csrf_token(session, response.text, debug) login_data = { "user_name": username, "password": password, "remember": "on", } if csrf_token: login_data["_csrf"] = csrf_token debug_print(debug, "Using CSRF token") else: debug_print(debug, "No CSRF token found, trying login without _csrf") headers = { "Referer": login_url, "Origin": base_url, "Content-Type": "application/x-www-form-urlencoded", } debug_print(debug, f"Trying login as {username}") try: response = session.post( login_url, data=login_data, headers=headers, timeout=10, allow_redirects=True, ) response.raise_for_status() debug_print(debug, f"After login HTTP {response.status_code}") debug_print(debug, f"After login URL: {response.url}") except requests.exceptions.RequestException as e: print(f"CRITICAL - Login error: {e}") return 2 if login_failed(response): print("CRITICAL - Login failed") if debug: print("DEBUG: Cookies after login:") for c in session.cookies: print(f"DEBUG: cookie {c.name}={c.value[:80]}") print("DEBUG: First 3000 chars after login:") print(response.text[:3000]) return 2 debug_print(debug, "Login successful") repo_url = f"{base_url}/{quote(repo_owner, safe='')}/{quote(repo_name, safe='')}" debug_print(debug, f"Checking repository: {repo_url}") try: response = session.get(repo_url, timeout=10, allow_redirects=True) debug_print(debug, f"Repository HTTP {response.status_code}") debug_print(debug, f"Repository final URL: {response.url}") if response.status_code == 404: print(f"CRITICAL - Repository {repo_owner}/{repo_name} not found") return 2 if response.status_code in [401, 403]: print(f"CRITICAL - No permission to access repository HTTP {response.status_code}") return 2 response.raise_for_status() except requests.exceptions.RequestException as e: print(f"CRITICAL - Repository access error: {e}") return 2 encoded_file_path = quote(file_path, safe="/") file_url = ( f"{base_url}/" f"{quote(repo_owner, safe='')}/" f"{quote(repo_name, safe='')}/raw/branch/" f"{quote(branch, safe='')}/" f"{encoded_file_path}" ) debug_print(debug, f"Checking file: {file_url}") try: response = session.get(file_url, timeout=10, allow_redirects=True) debug_print(debug, f"File HTTP {response.status_code}") debug_print(debug, f"File final URL: {response.url}") if response.status_code == 200: if response.content: print(f"OK - Successfully read file {file_path} from {repo_owner}/{repo_name} on branch {branch}") return 0 print(f"WARNING - File {file_path} exists but is empty") return 1 if response.status_code == 404: print(f"WARNING - File {file_path} not found in repository on branch {branch}") return 1 if response.status_code in [401, 403]: print(f"CRITICAL - No permission to read file {file_path} HTTP {response.status_code}") return 2 print(f"CRITICAL - File read error HTTP {response.status_code}") return 2 except requests.exceptions.RequestException as e: print(f"CRITICAL - File access error: {e}") return 2 except Exception as e: print(f"CRITICAL - Unexpected error: {e}") return 2 if __name__ == "__main__": parser = argparse.ArgumentParser( description="Nagios check for Gitea repository file access via HTML login" ) parser.add_argument("--server", required=True, help="Gitea server address, e.g. git.example.com") parser.add_argument("--repo-owner", required=True, help="Repository owner or organization") parser.add_argument("--repo-name", required=True, help="Repository name") parser.add_argument("--username", required=True, help="Login username") parser.add_argument("--password", required=True, help="Login password") parser.add_argument("--file", required=True, help="File path in repository") parser.add_argument("--branch", default="master", help="Branch name, default: master") parser.add_argument("--debug", action="store_true", help="Enable debug output") args = parser.parse_args() sys.exit(check_gitea( server_address=args.server, repo_owner=args.repo_owner, repo_name=args.repo_name, username=args.username, password=args.password, file_path=args.file, branch=args.branch, debug=args.debug, ))