Files
Mateusz Gruszczyński 138059945e poprawki i zmiany ux
2026-03-26 09:30:39 +01:00

197 lines
7.4 KiB
Python

from __future__ import annotations
from datetime import timedelta
from typing import Any
from flask import session
from werkzeug.security import check_password_hash, generate_password_hash
from app.core_settings import AppSettings, get_settings
from app.storage.auth_users import AuthUser, SQLiteAuthUserRepository
SESSION_USER_KEY = "auth_user"
SESSION_DISPLAY_NAME_KEY = "auth_display_name"
SESSION_ROLE_KEY = "auth_role"
VALID_ROLES = {"admin", "user"}
class AuthService:
def __init__(self, settings: AppSettings | None = None) -> None:
self.settings = settings or get_settings()
self.user_repository = SQLiteAuthUserRepository(self.settings.storage["sqlite_path"])
@property
def enabled(self) -> bool:
return bool(self.settings.auth["enabled"])
def status(self) -> dict[str, Any]:
if not self.enabled:
return {
"enabled": False,
"authenticated": True,
"user": None,
"display_name": None,
"role": None,
}
return {
"enabled": True,
"authenticated": SESSION_USER_KEY in session,
"user": session.get(SESSION_USER_KEY),
"display_name": session.get(SESSION_DISPLAY_NAME_KEY),
"role": session.get(SESSION_ROLE_KEY),
}
def login(self, username: str, password: str) -> dict[str, Any]:
if not self.enabled:
return self.status()
username = (username or "").strip()
password = password or ""
user = self.user_repository.get_by_username(username)
if user is None:
self._login_legacy_user(username, password)
else:
if not user.is_active:
raise ValueError("Account is inactive")
if not check_password_hash(user.password_hash, password):
raise ValueError("Invalid username or password")
self._set_session(user.username, user.display_name, user.role)
return self.status()
def logout(self) -> dict[str, Any]:
session.clear()
return self.status()
def list_users(self) -> list[dict[str, Any]]:
users = self.user_repository.list_users()
return [
{
"username": user.username,
"display_name": user.display_name,
"role": user.role,
"is_active": user.is_active,
"created_at": user.created_at,
"updated_at": user.updated_at,
}
for user in users
]
def require_admin(self) -> None:
if not self.enabled:
return
if session.get(SESSION_ROLE_KEY) != "admin":
raise PermissionError("Administrator permissions are required")
def configure_app(self, app) -> None:
max_age = int(self.settings.auth["session_max_age_seconds"])
app.secret_key = self.settings.auth["secret_key"]
app.config["PERMANENT_SESSION_LIFETIME"] = timedelta(seconds=max_age)
app.config["SESSION_COOKIE_NAME"] = self.settings.auth["session_cookie_name"]
app.config["SESSION_COOKIE_HTTPONLY"] = True
app.config["SESSION_COOKIE_SAMESITE"] = self.settings.auth.get("cookie_samesite", "Lax")
app.config["SESSION_COOKIE_SECURE"] = bool(self.settings.auth.get("cookie_secure", False))
def create_user(self, *, username: str, password: str, role: str, display_name: str | None = None) -> AuthUser:
normalized_username = self._normalize_username(username)
normalized_role = self._normalize_role(role)
clean_password = self._validate_password(password)
resolved_display_name = (display_name or normalized_username).strip()
if not resolved_display_name:
raise ValueError("Display name cannot be empty")
return self.user_repository.upsert_user(
username=normalized_username,
password_hash=generate_password_hash(clean_password),
role=normalized_role,
display_name=resolved_display_name,
is_active=True,
)
def reset_password(self, *, username: str, new_password: str) -> AuthUser:
normalized_username = self._normalize_username(username)
clean_password = self._validate_password(new_password)
user = self.user_repository.update_password(
normalized_username,
generate_password_hash(clean_password),
)
if user is None:
raise ValueError(f"User '{normalized_username}' does not exist")
return user
def update_role(self, *, username: str, role: str) -> AuthUser:
normalized_username = self._normalize_username(username)
normalized_role = self._normalize_role(role)
user = self.user_repository.get_by_username(normalized_username)
if user is None:
raise ValueError(f"User '{normalized_username}' does not exist")
if user.role == normalized_role:
return user
if user.role == 'admin' and normalized_role != 'admin' and self.user_repository.count_admin_users() <= 1:
raise ValueError('At least one active admin user must remain')
updated = self.user_repository.update_role(normalized_username, normalized_role)
if updated is None:
raise ValueError(f"User '{normalized_username}' does not exist")
if session.get(SESSION_USER_KEY) == updated.username:
session[SESSION_ROLE_KEY] = updated.role
return updated
def _login_legacy_user(self, username: str, password: str) -> None:
expected_username = self.settings.auth["username"]
expected_password = self.settings.auth["password"]
expected_password_hash = self.settings.auth.get("password_hash")
if username != expected_username:
raise ValueError("Invalid username or password")
if expected_password_hash:
password_ok = check_password_hash(expected_password_hash, password)
else:
password_ok = password == expected_password
if not password_ok:
raise ValueError("Invalid username or password")
self._set_session(
expected_username,
self.settings.auth.get("display_name") or expected_username,
self.settings.auth.get("role", "admin"),
)
def _set_session(self, username: str, display_name: str, role: str) -> None:
session.clear()
session.permanent = True
session[SESSION_USER_KEY] = username
session[SESSION_DISPLAY_NAME_KEY] = display_name
session[SESSION_ROLE_KEY] = role
def _normalize_username(self, username: str) -> str:
normalized = (username or "").strip()
if not normalized:
raise ValueError("Username cannot be empty")
return normalized
def _normalize_role(self, role: str) -> str:
normalized = (role or "").strip().lower()
if normalized not in VALID_ROLES:
raise ValueError("Role must be one of: admin, user")
return normalized
def _validate_password(self, password: str) -> str:
clean_password = password or ""
if len(clean_password) < 8:
raise ValueError("Password must be at least 8 characters long")
return clean_password
_auth_service: AuthService | None = None
def get_auth_service() -> AuthService:
global _auth_service
if _auth_service is None:
_auth_service = AuthService()
return _auth_service