from __future__ import annotations from datetime import timedelta from typing import Any from flask import session from werkzeug.security import check_password_hash, generate_password_hash from app.core_settings import AppSettings, get_settings from app.storage.auth_users import AuthUser, SQLiteAuthUserRepository SESSION_USER_KEY = "auth_user" SESSION_DISPLAY_NAME_KEY = "auth_display_name" SESSION_ROLE_KEY = "auth_role" VALID_ROLES = {"admin", "user"} class AuthService: def __init__(self, settings: AppSettings | None = None) -> None: self.settings = settings or get_settings() self.user_repository = SQLiteAuthUserRepository(self.settings.storage["sqlite_path"]) @property def enabled(self) -> bool: return bool(self.settings.auth["enabled"]) def status(self) -> dict[str, Any]: if not self.enabled: return { "enabled": False, "authenticated": True, "user": None, "display_name": None, "role": None, } return { "enabled": True, "authenticated": SESSION_USER_KEY in session, "user": session.get(SESSION_USER_KEY), "display_name": session.get(SESSION_DISPLAY_NAME_KEY), "role": session.get(SESSION_ROLE_KEY), } def login(self, username: str, password: str) -> dict[str, Any]: if not self.enabled: return self.status() username = (username or "").strip() password = password or "" user = self.user_repository.get_by_username(username) if user is None: self._login_legacy_user(username, password) else: if not user.is_active: raise ValueError("Konto jest nieaktywne") if not check_password_hash(user.password_hash, password): raise ValueError("Niepoprawny login lub haslo") self._set_session(user.username, user.display_name, user.role) return self.status() def logout(self) -> dict[str, Any]: session.clear() return self.status() def list_users(self) -> list[dict[str, Any]]: users = self.user_repository.list_users() return [ { "username": user.username, "display_name": user.display_name, "role": user.role, "is_active": user.is_active, "created_at": user.created_at, "updated_at": user.updated_at, } for user in users ] def require_admin(self) -> None: if not self.enabled: return if session.get(SESSION_ROLE_KEY) != "admin": raise PermissionError("Brak uprawnien administratora") def configure_app(self, app) -> None: max_age = int(self.settings.auth["session_max_age_seconds"]) app.secret_key = self.settings.auth["secret_key"] app.config["PERMANENT_SESSION_LIFETIME"] = timedelta(seconds=max_age) app.config["SESSION_COOKIE_NAME"] = self.settings.auth["session_cookie_name"] app.config["SESSION_COOKIE_HTTPONLY"] = True app.config["SESSION_COOKIE_SAMESITE"] = self.settings.auth.get("cookie_samesite", "Lax") app.config["SESSION_COOKIE_SECURE"] = bool(self.settings.auth.get("cookie_secure", False)) def create_user(self, *, username: str, password: str, role: str, display_name: str | None = None) -> AuthUser: normalized_username = self._normalize_username(username) normalized_role = self._normalize_role(role) clean_password = self._validate_password(password) resolved_display_name = (display_name or normalized_username).strip() if not resolved_display_name: raise ValueError("Display name nie moze byc pusty") return self.user_repository.upsert_user( username=normalized_username, password_hash=generate_password_hash(clean_password), role=normalized_role, display_name=resolved_display_name, is_active=True, ) def reset_password(self, *, username: str, new_password: str) -> AuthUser: normalized_username = self._normalize_username(username) clean_password = self._validate_password(new_password) user = self.user_repository.update_password( normalized_username, generate_password_hash(clean_password), ) if user is None: raise ValueError(f"Uzytkownik '{normalized_username}' nie istnieje") return user def _login_legacy_user(self, username: str, password: str) -> None: expected_username = self.settings.auth["username"] expected_password = self.settings.auth["password"] expected_password_hash = self.settings.auth.get("password_hash") if username != expected_username: raise ValueError("Niepoprawny login lub haslo") if expected_password_hash: password_ok = check_password_hash(expected_password_hash, password) else: password_ok = password == expected_password if not password_ok: raise ValueError("Niepoprawny login lub haslo") self._set_session( expected_username, self.settings.auth.get("display_name") or expected_username, self.settings.auth.get("role", "admin"), ) def _set_session(self, username: str, display_name: str, role: str) -> None: session.clear() session.permanent = True session[SESSION_USER_KEY] = username session[SESSION_DISPLAY_NAME_KEY] = display_name session[SESSION_ROLE_KEY] = role def _normalize_username(self, username: str) -> str: normalized = (username or "").strip() if not normalized: raise ValueError("Username nie moze byc pusty") return normalized def _normalize_role(self, role: str) -> str: normalized = (role or "").strip().lower() if normalized not in VALID_ROLES: raise ValueError("Rola musi byc jedna z: admin, user") return normalized def _validate_password(self, password: str) -> str: clean_password = password or "" if len(clean_password) < 8: raise ValueError("Haslo musi miec co najmniej 8 znakow") return clean_password _auth_service: AuthService | None = None def get_auth_service() -> AuthService: global _auth_service if _auth_service is None: _auth_service = AuthService() return _auth_service