Files
zbiorki_app/zbiorka_app/utils.py
2026-03-20 10:43:40 +01:00

292 lines
8.3 KiB
Python

import hashlib
import ipaddress
import os
import re
import socket
import threading
import time
from datetime import datetime, timezone
from decimal import Decimal, InvalidOperation
from functools import lru_cache
from pathlib import Path
from flask import current_app, request, url_for
from sqlalchemy import text
from sqlalchemy.exc import OperationalError
from .extensions import db
from .models import UstawieniaGlobalne, Uzytkownik
try:
from zoneinfo import ZoneInfo
except ImportError:
from backports.zoneinfo import ZoneInfo
LOCAL_TZ = ZoneInfo("Europe/Warsaw")
_DB_INIT_LOCK = threading.Lock()
def read_commit_and_date(filename="version.txt", root_path=None):
base = root_path or os.path.dirname(os.path.abspath(__file__))
path = os.path.join(base, filename)
if not os.path.exists(path):
return None, None
try:
commit = open(path, "r", encoding="utf-8").read().strip()
if commit:
commit = commit[:12]
except Exception:
commit = None
try:
ts = os.path.getmtime(path)
date_str = datetime.fromtimestamp(ts).strftime("%Y.%m.%d")
except Exception:
date_str = None
return date_str, commit
def set_sqlite_pragma(dbapi_connection, connection_record):
if dbapi_connection.__class__.__module__.startswith("sqlite3"):
try:
cursor = dbapi_connection.cursor()
cursor.execute("PRAGMA foreign_keys=ON")
cursor.close()
except Exception:
pass
def get_real_ip():
headers = request.headers
cf_ip = headers.get("CF-Connecting-IP")
if cf_ip:
return cf_ip.split(",")[0].strip()
xff = headers.get("X-Forwarded-For")
if xff:
return xff.split(",")[0].strip()
x_real_ip = headers.get("X-Real-IP")
if x_real_ip:
return x_real_ip.strip()
return request.remote_addr
def is_allowed_ip(remote_ip, allowed_hosts_str):
if os.path.exists("emergency_access.txt"):
return True
if not allowed_hosts_str or not allowed_hosts_str.strip():
return False
allowed_ips = set()
hosts = re.split(r"[\n,]+", allowed_hosts_str.strip())
for host in hosts:
host = host.strip()
if not host:
continue
try:
allowed_ips.add(ipaddress.ip_address(host))
continue
except ValueError:
pass
try:
infos = socket.getaddrinfo(host, None)
for _, _, _, _, sockaddr in infos:
ip_str = sockaddr[0]
try:
allowed_ips.add(ipaddress.ip_address(ip_str))
except ValueError:
continue
except Exception as exc:
current_app.logger.warning("Nie mozna rozwiazac hosta %s: %s", host, exc)
try:
remote_ip_obj = ipaddress.ip_address(remote_ip)
except ValueError:
current_app.logger.warning("Nieprawidlowe IP klienta: %s", remote_ip)
return False
is_allowed = remote_ip_obj in allowed_ips
current_app.logger.info("is_allowed_ip: %s -> %s (lista: %s)", remote_ip_obj, is_allowed, allowed_ips)
return is_allowed
def to_local(dt):
if dt is None:
return None
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.astimezone(LOCAL_TZ)
def parse_amount(raw: str) -> Decimal:
if not raw or not str(raw).strip():
raise InvalidOperation("empty amount")
norm = str(raw).replace(" ", "").replace("\u00A0", "").replace(",", ".").strip()
d = Decimal(norm)
if d <= 0:
raise InvalidOperation("amount must be > 0")
return d
def safe_db_rollback() -> None:
try:
db.session.rollback()
except Exception:
pass
def create_admin_account(force_reset_password: bool = False):
admin = Uzytkownik.query.filter_by(czy_admin=True).first()
if not admin:
admin = Uzytkownik(
uzytkownik=current_app.config["MAIN_ADMIN_USERNAME"],
czy_admin=True,
)
admin.set_password(current_app.config["MAIN_ADMIN_PASSWORD"])
db.session.add(admin)
db.session.commit()
return admin
if force_reset_password:
admin.set_password(current_app.config["MAIN_ADMIN_PASSWORD"])
db.session.commit()
return admin
def get_or_create_user(username: str, is_admin: bool = False) -> Uzytkownik:
user = Uzytkownik.query.filter_by(uzytkownik=username).first()
if user is None:
user = Uzytkownik(uzytkownik=username, czy_admin=is_admin)
db.session.add(user)
db.session.flush()
return user
def set_user_password(username: str, password: str, is_admin: bool | None = None) -> Uzytkownik:
user = Uzytkownik.query.filter_by(uzytkownik=username).first()
if user is None:
user = Uzytkownik(uzytkownik=username, czy_admin=bool(is_admin))
db.session.add(user)
elif is_admin is not None:
user.czy_admin = bool(is_admin)
user.set_password(password)
db.session.commit()
return user
def set_login_hosts(hosts: str | None) -> UstawieniaGlobalne:
settings = UstawieniaGlobalne.query.first()
if settings is None:
settings = UstawieniaGlobalne(
numer_konta="",
numer_telefonu_blik="",
)
db.session.add(settings)
settings.dozwolone_hosty_logowania = (hosts or "").strip() or None
db.session.commit()
return settings
def init_version(app):
root_path = os.path.dirname(app.root_path)
deploy_date, commit = read_commit_and_date("version.txt", root_path=root_path)
if not deploy_date:
deploy_date = datetime.now().strftime("%Y.%m.%d")
if not commit:
commit = "dev"
app.config["APP_VERSION"] = f"{deploy_date}+{commit}"
@lru_cache(maxsize=512)
def _md5_for_static_file(path_str: str, mtime_ns: int, size: int) -> str:
digest = hashlib.md5()
with open(path_str, "rb") as handle:
for chunk in iter(lambda: handle.read(65536), b""):
digest.update(chunk)
return digest.hexdigest()[:12]
def static_file_hash(filename: str) -> str:
static_root = Path(current_app.static_folder).resolve()
candidate = (static_root / filename).resolve()
try:
candidate.relative_to(static_root)
except ValueError:
return "invalid"
if not candidate.is_file():
return "missing"
stat = candidate.stat()
return _md5_for_static_file(str(candidate), stat.st_mtime_ns, stat.st_size)
def asset_url(filename: str) -> str:
return url_for("static", filename=filename, h=static_file_hash(filename))
def is_database_available() -> bool:
try:
db.session.execute(text("SELECT 1"))
return True
except Exception:
safe_db_rollback()
return False
def ensure_database_ready(create_schema: bool = True, create_admin: bool = True) -> bool:
with _DB_INIT_LOCK:
try:
db.session.execute(text("SELECT 1"))
if create_schema:
db.create_all()
if create_admin:
create_admin_account()
current_app.extensions["database_ready"] = True
return True
except Exception:
safe_db_rollback()
current_app.extensions["database_ready"] = False
raise
def init_database_with_retry(app, max_attempts: int = 20, delay: int = 3, raise_on_failure: bool = True) -> bool:
last_error = None
with app.app_context():
for attempt in range(1, max_attempts + 1):
try:
ensure_database_ready(create_schema=True, create_admin=True)
current_app.logger.info("Baza danych gotowa po probie %s/%s", attempt, max_attempts)
return True
except OperationalError as exc:
last_error = exc
current_app.logger.warning(
"Baza danych niedostepna (proba %s/%s): %s",
attempt,
max_attempts,
exc,
)
except Exception as exc:
last_error = exc
current_app.logger.warning(
"Blad inicjalizacji bazy (proba %s/%s): %s",
attempt,
max_attempts,
exc,
)
if attempt < max_attempts:
time.sleep(delay)
if raise_on_failure and last_error:
raise last_error
return False