197 lines
7.4 KiB
Python
197 lines
7.4 KiB
Python
from __future__ import annotations
|
|
|
|
from datetime import timedelta
|
|
from typing import Any
|
|
|
|
from flask import session
|
|
from werkzeug.security import check_password_hash, generate_password_hash
|
|
|
|
from app.core_settings import AppSettings, get_settings
|
|
from app.storage.auth_users import AuthUser, SQLiteAuthUserRepository
|
|
|
|
|
|
SESSION_USER_KEY = "auth_user"
|
|
SESSION_DISPLAY_NAME_KEY = "auth_display_name"
|
|
SESSION_ROLE_KEY = "auth_role"
|
|
VALID_ROLES = {"admin", "user"}
|
|
|
|
|
|
class AuthService:
|
|
def __init__(self, settings: AppSettings | None = None) -> None:
|
|
self.settings = settings or get_settings()
|
|
self.user_repository = SQLiteAuthUserRepository(self.settings.storage["sqlite_path"])
|
|
|
|
@property
|
|
def enabled(self) -> bool:
|
|
return bool(self.settings.auth["enabled"])
|
|
|
|
def status(self) -> dict[str, Any]:
|
|
if not self.enabled:
|
|
return {
|
|
"enabled": False,
|
|
"authenticated": True,
|
|
"user": None,
|
|
"display_name": None,
|
|
"role": None,
|
|
}
|
|
|
|
return {
|
|
"enabled": True,
|
|
"authenticated": SESSION_USER_KEY in session,
|
|
"user": session.get(SESSION_USER_KEY),
|
|
"display_name": session.get(SESSION_DISPLAY_NAME_KEY),
|
|
"role": session.get(SESSION_ROLE_KEY),
|
|
}
|
|
|
|
def login(self, username: str, password: str) -> dict[str, Any]:
|
|
if not self.enabled:
|
|
return self.status()
|
|
|
|
username = (username or "").strip()
|
|
password = password or ""
|
|
user = self.user_repository.get_by_username(username)
|
|
|
|
if user is None:
|
|
self._login_legacy_user(username, password)
|
|
else:
|
|
if not user.is_active:
|
|
raise ValueError("Account is inactive")
|
|
if not check_password_hash(user.password_hash, password):
|
|
raise ValueError("Invalid username or password")
|
|
self._set_session(user.username, user.display_name, user.role)
|
|
|
|
return self.status()
|
|
|
|
def logout(self) -> dict[str, Any]:
|
|
session.clear()
|
|
return self.status()
|
|
|
|
def list_users(self) -> list[dict[str, Any]]:
|
|
users = self.user_repository.list_users()
|
|
return [
|
|
{
|
|
"username": user.username,
|
|
"display_name": user.display_name,
|
|
"role": user.role,
|
|
"is_active": user.is_active,
|
|
"created_at": user.created_at,
|
|
"updated_at": user.updated_at,
|
|
}
|
|
for user in users
|
|
]
|
|
|
|
def require_admin(self) -> None:
|
|
if not self.enabled:
|
|
return
|
|
if session.get(SESSION_ROLE_KEY) != "admin":
|
|
raise PermissionError("Administrator permissions are required")
|
|
|
|
def configure_app(self, app) -> None:
|
|
max_age = int(self.settings.auth["session_max_age_seconds"])
|
|
app.secret_key = self.settings.auth["secret_key"]
|
|
app.config["PERMANENT_SESSION_LIFETIME"] = timedelta(seconds=max_age)
|
|
app.config["SESSION_COOKIE_NAME"] = self.settings.auth["session_cookie_name"]
|
|
app.config["SESSION_COOKIE_HTTPONLY"] = True
|
|
app.config["SESSION_COOKIE_SAMESITE"] = self.settings.auth.get("cookie_samesite", "Lax")
|
|
app.config["SESSION_COOKIE_SECURE"] = bool(self.settings.auth.get("cookie_secure", False))
|
|
|
|
def create_user(self, *, username: str, password: str, role: str, display_name: str | None = None) -> AuthUser:
|
|
normalized_username = self._normalize_username(username)
|
|
normalized_role = self._normalize_role(role)
|
|
clean_password = self._validate_password(password)
|
|
resolved_display_name = (display_name or normalized_username).strip()
|
|
if not resolved_display_name:
|
|
raise ValueError("Display name cannot be empty")
|
|
return self.user_repository.upsert_user(
|
|
username=normalized_username,
|
|
password_hash=generate_password_hash(clean_password),
|
|
role=normalized_role,
|
|
display_name=resolved_display_name,
|
|
is_active=True,
|
|
)
|
|
|
|
def reset_password(self, *, username: str, new_password: str) -> AuthUser:
|
|
normalized_username = self._normalize_username(username)
|
|
clean_password = self._validate_password(new_password)
|
|
user = self.user_repository.update_password(
|
|
normalized_username,
|
|
generate_password_hash(clean_password),
|
|
)
|
|
if user is None:
|
|
raise ValueError(f"User '{normalized_username}' does not exist")
|
|
return user
|
|
|
|
def update_role(self, *, username: str, role: str) -> AuthUser:
|
|
normalized_username = self._normalize_username(username)
|
|
normalized_role = self._normalize_role(role)
|
|
user = self.user_repository.get_by_username(normalized_username)
|
|
if user is None:
|
|
raise ValueError(f"User '{normalized_username}' does not exist")
|
|
if user.role == normalized_role:
|
|
return user
|
|
if user.role == 'admin' and normalized_role != 'admin' and self.user_repository.count_admin_users() <= 1:
|
|
raise ValueError('At least one active admin user must remain')
|
|
updated = self.user_repository.update_role(normalized_username, normalized_role)
|
|
if updated is None:
|
|
raise ValueError(f"User '{normalized_username}' does not exist")
|
|
if session.get(SESSION_USER_KEY) == updated.username:
|
|
session[SESSION_ROLE_KEY] = updated.role
|
|
return updated
|
|
|
|
def _login_legacy_user(self, username: str, password: str) -> None:
|
|
expected_username = self.settings.auth["username"]
|
|
expected_password = self.settings.auth["password"]
|
|
expected_password_hash = self.settings.auth.get("password_hash")
|
|
|
|
if username != expected_username:
|
|
raise ValueError("Invalid username or password")
|
|
|
|
if expected_password_hash:
|
|
password_ok = check_password_hash(expected_password_hash, password)
|
|
else:
|
|
password_ok = password == expected_password
|
|
|
|
if not password_ok:
|
|
raise ValueError("Invalid username or password")
|
|
|
|
self._set_session(
|
|
expected_username,
|
|
self.settings.auth.get("display_name") or expected_username,
|
|
self.settings.auth.get("role", "admin"),
|
|
)
|
|
|
|
def _set_session(self, username: str, display_name: str, role: str) -> None:
|
|
session.clear()
|
|
session.permanent = True
|
|
session[SESSION_USER_KEY] = username
|
|
session[SESSION_DISPLAY_NAME_KEY] = display_name
|
|
session[SESSION_ROLE_KEY] = role
|
|
|
|
def _normalize_username(self, username: str) -> str:
|
|
normalized = (username or "").strip()
|
|
if not normalized:
|
|
raise ValueError("Username cannot be empty")
|
|
return normalized
|
|
|
|
def _normalize_role(self, role: str) -> str:
|
|
normalized = (role or "").strip().lower()
|
|
if normalized not in VALID_ROLES:
|
|
raise ValueError("Role must be one of: admin, user")
|
|
return normalized
|
|
|
|
def _validate_password(self, password: str) -> str:
|
|
clean_password = password or ""
|
|
if len(clean_password) < 8:
|
|
raise ValueError("Password must be at least 8 characters long")
|
|
return clean_password
|
|
|
|
|
|
_auth_service: AuthService | None = None
|
|
|
|
|
|
def get_auth_service() -> AuthService:
|
|
global _auth_service
|
|
if _auth_service is None:
|
|
_auth_service = AuthService()
|
|
return _auth_service
|