"""Shared application helpers.

Covers version stamping, client-IP checks for the login allow-list, amount
parsing, user/settings bootstrap helpers, cache-busting hashes for static
assets, and database initialisation with retries.
"""

import hashlib
import ipaddress
import os
import re
import socket
import threading
import time
from datetime import datetime, timezone
from decimal import Decimal, InvalidOperation
from functools import lru_cache
from pathlib import Path

from flask import current_app, request, url_for
from sqlalchemy import text
from sqlalchemy.exc import OperationalError

from .extensions import db
from .models import UstawieniaGlobalne, Uzytkownik

try:
    from zoneinfo import ZoneInfo
except ImportError:
    from backports.zoneinfo import ZoneInfo

# Time zone used when presenting timestamps (see ``to_local``).
LOCAL_TZ = ZoneInfo("Europe/Warsaw")

# Serialises ``ensure_database_ready`` across threads of this process.
_DB_INIT_LOCK = threading.Lock()


def read_commit_and_date(filename="version.txt", root_path=None):
    """Read the deployed commit id and deployment date from a version file.

    Args:
        filename: Name of the version file.
        root_path: Directory containing the file; defaults to the directory
            of this module.

    Returns:
        A ``(date_str, commit)`` tuple. ``date_str`` is the file's
        modification time formatted as ``YYYY.MM.DD`` and ``commit`` is the
        file content truncated to 12 characters. Both are ``None`` when the
        file does not exist; each is individually ``None`` when it cannot be
        determined. ``commit`` is an empty string for an empty file.
    """
    base = root_path or os.path.dirname(os.path.abspath(__file__))
    path = os.path.join(base, filename)
    if not os.path.exists(path):
        return None, None

    # Both lookups are deliberately best-effort: a broken version file must
    # never prevent the application from starting.
    try:
        # Context manager guarantees the handle is closed even if read fails.
        with open(path, "r", encoding="utf-8") as handle:
            commit = handle.read().strip()
        if commit:
            commit = commit[:12]
    except Exception:
        commit = None

    try:
        ts = os.path.getmtime(path)
        date_str = datetime.fromtimestamp(ts).strftime("%Y.%m.%d")
    except Exception:
        date_str = None

    return date_str, commit


def set_sqlite_pragma(dbapi_connection, connection_record):
    """Turn on foreign-key enforcement for sqlite3 DB-API connections.

    The two-argument signature suggests use as a connection event listener;
    ``connection_record`` is accepted but not used. Connections whose class
    does not come from a ``sqlite3`` module are left untouched.
    """
    if not dbapi_connection.__class__.__module__.startswith("sqlite3"):
        return
    try:
        cursor = dbapi_connection.cursor()
        try:
            cursor.execute("PRAGMA foreign_keys=ON")
        finally:
            # Release the cursor even when the PRAGMA itself fails.
            cursor.close()
    except Exception:
        # Best-effort: failing to set the PRAGMA must not break connecting.
        pass


def get_real_ip():
    """Return the client IP for the current request.

    Checks, in order, the ``CF-Connecting-IP``, ``X-Forwarded-For`` and
    ``X-Real-IP`` headers (first comma-separated entry for the first two)
    and falls back to ``request.remote_addr``.

    NOTE(review): these headers are taken from the request as-is; this is
    only trustworthy if a reverse proxy in front of the app overwrites them.
    Confirm against the deployment setup.
    """
    headers = request.headers

    cf_ip = headers.get("CF-Connecting-IP")
    if cf_ip:
        return cf_ip.split(",")[0].strip()

    xff = headers.get("X-Forwarded-For")
    if xff:
        return xff.split(",")[0].strip()

    x_real_ip = headers.get("X-Real-IP")
    if x_real_ip:
        return x_real_ip.strip()

    return request.remote_addr


def _resolve_allowed_ips(allowed_hosts_str):
    """Expand a newline/comma separated host list into a set of IP objects.

    Entries that are IP literals are added directly; anything else is
    resolved with ``socket.getaddrinfo``. Unresolvable entries are logged
    and skipped.
    """
    allowed_ips = set()
    for host in re.split(r"[\n,]+", allowed_hosts_str.strip()):
        host = host.strip()
        if not host:
            continue

        try:
            allowed_ips.add(ipaddress.ip_address(host))
            continue
        except ValueError:
            # Not an IP literal; treat it as a host name below.
            pass

        try:
            infos = socket.getaddrinfo(host, None)
            for _, _, _, _, sockaddr in infos:
                try:
                    allowed_ips.add(ipaddress.ip_address(sockaddr[0]))
                except ValueError:
                    continue
        except Exception as exc:
            current_app.logger.warning("Nie mozna rozwiazac hosta %s: %s", host, exc)

    return allowed_ips


def is_allowed_ip(remote_ip, allowed_hosts_str):
    """Check whether ``remote_ip`` is on the allow-list.

    Args:
        remote_ip: Client IP address as a string.
        allowed_hosts_str: Newline- or comma-separated IP addresses and/or
            host names.

    Returns:
        ``True`` if a file named ``emergency_access.txt`` exists in the
        current working directory (bypass), or if ``remote_ip`` equals one
        of the listed/resolved addresses. ``False`` for an empty allow-list
        or a ``remote_ip`` that is not a valid IP address.
    """
    if os.path.exists("emergency_access.txt"):
        return True

    if not allowed_hosts_str or not allowed_hosts_str.strip():
        return False

    # Validate the client address first so that a malformed value is
    # rejected without performing any (potentially slow) DNS lookups.
    try:
        remote_ip_obj = ipaddress.ip_address(remote_ip)
    except ValueError:
        current_app.logger.warning("Nieprawidlowe IP klienta: %s", remote_ip)
        return False

    allowed_ips = _resolve_allowed_ips(allowed_hosts_str)
    is_allowed = remote_ip_obj in allowed_ips
    current_app.logger.info("is_allowed_ip: %s -> %s (lista: %s)", remote_ip_obj, is_allowed, allowed_ips)
    return is_allowed


def to_local(dt):
    """Convert ``dt`` to ``LOCAL_TZ``; naive values are treated as UTC.

    Returns ``None`` when ``dt`` is ``None``.
    """
    if dt is None:
        return None
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return dt.astimezone(LOCAL_TZ)


def parse_amount(raw: str) -> Decimal:
    """Parse a user-entered, strictly positive amount.

    Spaces and non-breaking spaces are removed and a comma is accepted as
    the decimal separator.

    Raises:
        InvalidOperation: If the input is empty, not a number, not finite
            (``NaN``/``Infinity``), or not greater than zero.
    """
    if not raw or not str(raw).strip():
        raise InvalidOperation("empty amount")

    norm = str(raw).replace(" ", "").replace("\u00A0", "").replace(",", ".").strip()
    amount = Decimal(norm)

    # Decimal() happily parses "NaN" and "Infinity"; neither is an amount.
    if not amount.is_finite():
        raise InvalidOperation("amount must be finite")
    if amount <= 0:
        raise InvalidOperation("amount must be > 0")
    return amount


def safe_db_rollback() -> None:
    """Roll back the current session, ignoring any error raised by it."""
    try:
        db.session.rollback()
    except Exception:
        # Deliberate: this is called from error paths and must not raise.
        pass


def create_admin_account(force_reset_password: bool = False):
    """Ensure an administrator account exists and return it.

    If no user with ``czy_admin=True`` exists, one is created from the
    ``MAIN_ADMIN_USERNAME`` / ``MAIN_ADMIN_PASSWORD`` config values and
    committed. If one exists and ``force_reset_password`` is true, its
    password is reset to ``MAIN_ADMIN_PASSWORD`` and committed.
    """
    admin = Uzytkownik.query.filter_by(czy_admin=True).first()
    if not admin:
        admin = Uzytkownik(
            uzytkownik=current_app.config["MAIN_ADMIN_USERNAME"],
            czy_admin=True,
        )
        admin.set_password(current_app.config["MAIN_ADMIN_PASSWORD"])
        db.session.add(admin)
        db.session.commit()
        return admin

    if force_reset_password:
        admin.set_password(current_app.config["MAIN_ADMIN_PASSWORD"])
        db.session.commit()
    return admin


def get_or_create_user(username: str, is_admin: bool = False) -> Uzytkownik:
    """Return the user named ``username``, creating it if missing.

    A newly created user is added to the session and flushed but not
    committed; ``is_admin`` is only applied on creation.
    """
    user = Uzytkownik.query.filter_by(uzytkownik=username).first()
    if user is None:
        user = Uzytkownik(uzytkownik=username, czy_admin=is_admin)
        db.session.add(user)
        db.session.flush()
    return user


def set_user_password(username: str, password: str, is_admin: bool | None = None) -> Uzytkownik:
    """Set the password of ``username``, creating the user if needed.

    Args:
        username: Login of the user.
        password: New password, passed to ``set_password``.
        is_admin: For a new user, the admin flag (``None`` means ``False``).
            For an existing user the flag is only changed when this is not
            ``None``.

    Returns:
        The committed user.
    """
    user = Uzytkownik.query.filter_by(uzytkownik=username).first()
    if user is None:
        user = Uzytkownik(uzytkownik=username, czy_admin=bool(is_admin))
        db.session.add(user)
    elif is_admin is not None:
        user.czy_admin = bool(is_admin)

    user.set_password(password)
    db.session.commit()
    return user


def set_login_hosts(hosts: str | None) -> UstawieniaGlobalne:
    """Store the login allow-list in the global settings row and commit.

    The settings row is created (with empty ``numer_konta`` and
    ``numer_telefonu_blik``) if none exists. A blank or ``None`` value is
    stored as ``None``.
    """
    settings = UstawieniaGlobalne.query.first()
    if settings is None:
        settings = UstawieniaGlobalne(
            numer_konta="",
            numer_telefonu_blik="",
        )
        db.session.add(settings)

    settings.dozwolone_hosty_logowania = (hosts or "").strip() or None
    db.session.commit()
    return settings


def init_version(app):
    """Set ``app.config["APP_VERSION"]`` to ``"<date>+<commit>"``.

    Values come from ``version.txt`` in the parent directory of
    ``app.root_path``; missing values fall back to today's date and
    ``"dev"``.
    """
    root_path = os.path.dirname(app.root_path)
    deploy_date, commit = read_commit_and_date("version.txt", root_path=root_path)
    if not deploy_date:
        deploy_date = datetime.now().strftime("%Y.%m.%d")
    if not commit:
        commit = "dev"
    app.config["APP_VERSION"] = f"{deploy_date}+{commit}"


@lru_cache(maxsize=512)
def _md5_for_static_file(path_str: str, mtime_ns: int, size: int) -> str:
    """Return the first 12 hex digits of the file's MD5 digest.

    ``mtime_ns`` and ``size`` are not used in the computation; they are part
    of the cache key so that a changed file yields a fresh hash.
    """
    digest = hashlib.md5()
    with open(path_str, "rb") as handle:
        for chunk in iter(lambda: handle.read(65536), b""):
            digest.update(chunk)
    return digest.hexdigest()[:12]


def static_file_hash(filename: str) -> str:
    """Return a short content hash for a file in the app's static folder.

    Returns ``"invalid"`` if the resolved path falls outside the static
    folder and ``"missing"`` if it is not a regular file.
    """
    static_root = Path(current_app.static_folder).resolve()
    candidate = (static_root / filename).resolve()

    # Reject paths that resolve outside the static folder.
    try:
        candidate.relative_to(static_root)
    except ValueError:
        return "invalid"

    if not candidate.is_file():
        return "missing"

    stat = candidate.stat()
    return _md5_for_static_file(str(candidate), stat.st_mtime_ns, stat.st_size)


def asset_url(filename: str) -> str:
    """Build a static URL for ``filename`` with its hash as the ``h`` arg."""
    return url_for("static", filename=filename, h=static_file_hash(filename))


def is_database_available() -> bool:
    """Return whether ``SELECT 1`` succeeds; roll back the session if not."""
    try:
        db.session.execute(text("SELECT 1"))
        return True
    except Exception:
        safe_db_rollback()
        return False


def ensure_database_ready(create_schema: bool = True, create_admin: bool = True) -> bool:
    """Verify connectivity and optionally create the schema and admin user.

    Runs under ``_DB_INIT_LOCK`` and records the outcome in
    ``current_app.extensions["database_ready"]``.

    Returns:
        ``True`` on success.

    Raises:
        Exception: Whatever the database layer raised; the session is
            rolled back and ``database_ready`` set to ``False`` first.
    """
    with _DB_INIT_LOCK:
        try:
            db.session.execute(text("SELECT 1"))
            if create_schema:
                db.create_all()
            if create_admin:
                create_admin_account()
            current_app.extensions["database_ready"] = True
            return True
        except Exception:
            safe_db_rollback()
            current_app.extensions["database_ready"] = False
            raise


def init_database_with_retry(app, max_attempts: int = 20, delay: int = 3, raise_on_failure: bool = True) -> bool:
    """Initialise the database, retrying until it becomes available.

    Args:
        app: Application whose context is pushed for the attempts.
        max_attempts: Maximum number of attempts.
        delay: Seconds to sleep between attempts.
        raise_on_failure: Re-raise the last error when every attempt fails.

    Returns:
        ``True`` once initialisation succeeds; ``False`` if all attempts
        failed and ``raise_on_failure`` is false (or no attempt was made).

    Raises:
        Exception: The last error seen, when all attempts fail and
            ``raise_on_failure`` is true.
    """
    last_error = None
    with app.app_context():
        for attempt in range(1, max_attempts + 1):
            try:
                ensure_database_ready(create_schema=True, create_admin=True)
                current_app.logger.info("Baza danych gotowa po probie %s/%s", attempt, max_attempts)
                return True
            except OperationalError as exc:
                last_error = exc
                current_app.logger.warning(
                    "Baza danych niedostepna (proba %s/%s): %s",
                    attempt,
                    max_attempts,
                    exc,
                )
            except Exception as exc:
                last_error = exc
                current_app.logger.warning(
                    "Blad inicjalizacji bazy (proba %s/%s): %s",
                    attempt,
                    max_attempts,
                    exc,
                )

            # No point sleeping after the final attempt.
            if attempt < max_attempts:
                time.sleep(delay)

    if raise_on_failure and last_error is not None:
        raise last_error
    return False