From 00247c5c8e7104ff225d3282135c925756496fb3 Mon Sep 17 00:00:00 2001 From: gru Date: Wed, 1 Apr 2026 09:07:41 +0200 Subject: [PATCH] Add grafana_update.py --- grafana_update.py | 259 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 259 insertions(+) create mode 100644 grafana_update.py diff --git a/grafana_update.py b/grafana_update.py new file mode 100644 index 0000000..85b9ec1 --- /dev/null +++ b/grafana_update.py @@ -0,0 +1,259 @@ +#!/usr/bin/env python3 +import hashlib +import os +import re +import shutil +import socket +import subprocess +import sys +import tempfile +import urllib.parse +import urllib.request + + +GITHUB_RELEASES_URL = "https://github.com/grafana/grafana/releases" +DOWNLOAD_PAGE_TEMPLATE = "https://grafana.com/grafana/download/{version}?edition=oss" + +PUSHOVER_TOKEN = os.getenv("PUSHOVER_TOKEN", "") +PUSHOVER_USER = os.getenv("PUSHOVER_USER", "") +PUSHOVER_PRIORITY = os.getenv("PUSHOVER_PRIORITY", "0") + +LOG_FILE = "/var/log/grafana-update.log" +USER_AGENT = "grafana-opensuse-updater/3.0" + + +def log(msg: str) -> None: + line = msg.rstrip() + print(line) + try: + with open(LOG_FILE, "a", encoding="utf-8") as f: + f.write(line + "\n") + except Exception: + pass + + +def run(cmd: list[str], check: bool = True) -> subprocess.CompletedProcess: + log(f"$ {' '.join(cmd)}") + proc = subprocess.run(cmd, capture_output=True, text=True) + if proc.stdout.strip(): + log(proc.stdout.strip()) + if proc.stderr.strip(): + log(proc.stderr.strip()) + if check and proc.returncode != 0: + raise RuntimeError(f"Command failed ({proc.returncode}): {' '.join(cmd)}") + return proc + + +def send_pushover(title: str, message: str, priority: str = "0") -> None: + if not PUSHOVER_TOKEN or not PUSHOVER_USER: + log("Pushover pominięty: brak PUSHOVER_TOKEN lub PUSHOVER_USER") + return + + data = urllib.parse.urlencode({ + "token": PUSHOVER_TOKEN, + "user": PUSHOVER_USER, + "title": title, + "message": message, + "priority": priority, + }).encode("utf-8") + + req = urllib.request.Request( + "https://api.pushover.net/1/messages.json", + data=data, + headers={"User-Agent": USER_AGENT}, + method="POST", + ) + + try: + with urllib.request.urlopen(req, timeout=20) as resp: + body = resp.read().decode("utf-8", errors="replace") + log(f"Pushover OK: {body}") + except Exception as e: + log(f"Pushover error: {e}") + + +def fetch_text(url: str) -> str: + req = urllib.request.Request(url, headers={"User-Agent": USER_AGENT}) + with urllib.request.urlopen(req, timeout=30) as resp: + return resp.read().decode("utf-8", errors="replace") + + +def download_file(url: str, dst: str) -> None: + req = urllib.request.Request(url, headers={"User-Agent": USER_AGENT}) + with urllib.request.urlopen(req, timeout=300) as resp, open(dst, "wb") as f: + shutil.copyfileobj(resp, f) + + +def sha256sum(path: str) -> str: + h = hashlib.sha256() + with open(path, "rb") as f: + for chunk in iter(lambda: f.read(1024 * 1024), b""): + h.update(chunk) + return h.hexdigest() + + +def get_installed_version() -> str | None: + proc = subprocess.run( + ["rpm", "-q", "--qf", "%{VERSION}\n", "grafana"], + capture_output=True, + text=True, + ) + if proc.returncode != 0: + return None + return proc.stdout.strip() or None + + +def normalize_version(v: str) -> list[int | str]: + parts = re.split(r"([0-9]+)", v) + out = [] + for p in parts: + if not p: + continue + out.append(int(p) if p.isdigit() else p) + return out + + +def get_latest_version_from_github() -> str: + html = fetch_text(GITHUB_RELEASES_URL) + + patterns = [ + r'/releases/tag/v(\d+\.\d+\.\d+(?:\+security-\d+)?)"', + r'/tag/v(\d+\.\d+\.\d+(?:\+security-\d+)?)"', + r'>v?(\d+\.\d+\.\d+(?:\+security-\d+)?)<', + ] + + for pattern in patterns: + m = re.search(pattern, html, re.IGNORECASE) + if m: + return m.group(1) + + raise RuntimeError("Nie znaleziono najnowszej wersji na GitHub Releases.") + + +def get_exact_rpm_url(version: str) -> str: + url = DOWNLOAD_PAGE_TEMPLATE.format(version=version) + html = fetch_text(url) + + patterns = [ + rf'https://dl\.grafana\.com/grafana/release/{re.escape(version)}/grafana_{re.escape(version)}_[0-9]+_linux_amd64\.rpm', + rf'https://dl\.grafana\.com/grafana/release/{re.escape(version)}/grafana_{re.escape(version)}_linux_amd64\.rpm', + ] + + for pattern in patterns: + m = re.search(pattern, html, re.IGNORECASE) + if m: + return m.group(0) + + raise RuntimeError(f"Nie znaleziono dokładnego URL RPM dla wersji {version}.") + + +def get_sha256_for_rpm(html: str, rpm_url: str) -> str | None: + filename = rpm_url.rsplit("/", 1)[-1] + + patterns = [ + rf'{re.escape(filename)}.*?([a-fA-F0-9]{{64}})', + rf'([a-fA-F0-9]{{64}}).*?{re.escape(filename)}', + ] + + for pattern in patterns: + m = re.search(pattern, html, re.IGNORECASE | re.DOTALL) + if m: + return m.group(1) + + return None + + +def get_exact_rpm_info(version: str) -> tuple[str, str | None]: + url = DOWNLOAD_PAGE_TEMPLATE.format(version=version) + html = fetch_text(url) + rpm_url = None + + patterns = [ + rf'https://dl\.grafana\.com/grafana/release/{re.escape(version)}/grafana_{re.escape(version)}_[0-9]+_linux_amd64\.rpm', + rf'https://dl\.grafana\.com/grafana/release/{re.escape(version)}/grafana_{re.escape(version)}_linux_amd64\.rpm', + ] + + for pattern in patterns: + m = re.search(pattern, html, re.IGNORECASE) + if m: + rpm_url = m.group(0) + break + + if not rpm_url: + raise RuntimeError(f"Nie znaleziono dokładnego URL RPM dla wersji {version}.") + + sha256 = get_sha256_for_rpm(html, rpm_url) + return rpm_url, sha256 + + +def restart_grafana_if_active() -> None: + active = subprocess.run( + ["systemctl", "is-active", "--quiet", "grafana-server"] + ).returncode == 0 + + if active: + run(["systemctl", "restart", "grafana-server"]) + else: + log("grafana-server nie jest aktywna, restart pominięty") + + +def ensure_root() -> None: + if os.geteuid() != 0: + raise RuntimeError("Uruchom skrypt jako root.") + + +def main() -> int: + host = socket.gethostname() + + try: + ensure_root() + + installed = get_installed_version() + latest = get_latest_version_from_github() + rpm_url, expected_sha = get_exact_rpm_info(latest) + rpm_filename = rpm_url.rsplit("/", 1)[-1] + + log(f"Zainstalowana wersja: {installed or 'brak'}") + log(f"Najnowsza wersja z GitHub: {latest}") + log(f"RPM URL: {rpm_url}") + log(f"SHA256 ze strony download: {expected_sha or 'brak'}") + + if installed and normalize_version(installed) == normalize_version(latest): + msg = f"{host}: Grafana już aktualna ({installed})" + log(msg) + send_pushover("Grafana update", msg, PUSHOVER_PRIORITY) + return 0 + + with tempfile.TemporaryDirectory(prefix="grafana-update-") as tmpdir: + rpm_path = os.path.join(tmpdir, rpm_filename) + + log(f"Pobieranie do: {rpm_path}") + download_file(rpm_url, rpm_path) + + actual_sha = sha256sum(rpm_path) + log(f"SHA256 pobranego pliku: {actual_sha}") + + if expected_sha and actual_sha.lower() != expected_sha.lower(): + raise RuntimeError( + f"Błędny SHA256: expected={expected_sha}, got={actual_sha}" + ) + + #run(["rpm", "-Uvh", "--nosignature", "--nogpgcheck", rpm_path]) + run(["rpm", "-Uvh", "--nosignature", rpm_path]) + restart_grafana_if_active() + + new_ver = get_installed_version() or latest + msg = f"{host}: Grafana zaktualizowana {installed or 'brak'} -> {new_ver}" + log(msg) + send_pushover("Grafana update OK", msg, PUSHOVER_PRIORITY) + return 0 + + except Exception as e: + msg = f"{host}: Grafana update FAILED: {e}" + log(msg) + send_pushover("Grafana update FAILED", msg, "1") + return 1 + + +if __name__ == "__main__": + sys.exit(main())