import base64
import secrets
from datetime import datetime, timedelta
from typing import Optional

from cryptography.fernet import Fernet
from itsdangerous import URLSafeTimedSerializer, BadSignature, SignatureExpired
from passlib.context import CryptContext
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.config import settings
from app.models.user import User, UserRole

# Names of the cookies carrying the signed session token and the CSRF token.
SESSION_COOKIE = "mt_session"
CSRF_COOKIE = "mt_csrf"

# Shared passlib context, configured with the argon2 and bcrypt schemes.
pwd_context = CryptContext(schemes=["argon2", "bcrypt"], deprecated="auto")


def hash_password(password: str) -> str:
    """Return the hash of ``password`` produced by the module's ``pwd_context``."""
    return pwd_context.hash(password)


def verify_password(password: str, password_hash: str) -> bool:
    """Return whether ``password`` verifies against ``password_hash``."""
    return pwd_context.verify(password, password_hash)


def session_serializer() -> URLSafeTimedSerializer:
    """Build the timed serializer used to sign and read session tokens.

    The serializer is keyed with ``settings.SESSION_SECRET`` and uses the
    ``"session"`` salt.
    """
    return URLSafeTimedSerializer(settings.SESSION_SECRET, salt="session")


def create_session_token(user_id: int) -> str:
    """Return a signed token whose payload is ``{"uid": user_id}``."""
    payload = {"uid": user_id}
    return session_serializer().dumps(payload)


def read_session_token(token: str, max_age_seconds: int = 60 * 60 * 24 * 7) -> Optional[int]:
    """Extract the user id from a session token.

    Args:
        token: Token string as produced by ``create_session_token``.
        max_age_seconds: Maximum accepted token age in seconds, passed to the
            serializer as ``max_age``. Defaults to seven days.

    Returns:
        The ``uid`` stored in the token as an ``int``, or ``None`` when
        loading the token or converting the uid raises any exception.
    """
    serializer = session_serializer()
    try:
        payload = serializer.loads(token, max_age=max_age_seconds)
        return int(payload.get("uid"))
    except Exception:
        # Deliberately broad: signature and expiry errors as well as a
        # malformed payload (missing or non-numeric uid) all yield None.
        return None


def new_csrf_token() -> str:
    """Return a fresh random URL-safe token generated from 32 random bytes."""
    return secrets.token_urlsafe(32)


def fernet() -> Fernet:
    """Build a ``Fernet`` instance from ``settings.CREDENTIALS_MASTER_KEY``."""
    # CREDENTIALS_MASTER_KEY should be a URL-safe base64-encoded 32-byte key.
    master_key = settings.CREDENTIALS_MASTER_KEY.encode("utf-8")
    return Fernet(master_key)


def encrypt_secret(plain: str) -> str:
    """Encrypt ``plain`` with the master key and return the token as text."""
    token = fernet().encrypt(plain.encode("utf-8"))
    return token.decode("utf-8")


def decrypt_secret(enc: str) -> str:
    """Decrypt a token produced by ``encrypt_secret`` and return the plain text."""
    plain = fernet().decrypt(enc.encode("utf-8"))
    return plain.decode("utf-8")


async def bootstrap_admin_if_needed(session: AsyncSession) -> None:
    """Create the bootstrap admin user when the query finds no ``User`` row.

    Selects at most one ``User``; if one is returned the function does
    nothing. Otherwise it adds an active ``UserRole.ADMIN`` user built from
    ``settings.ADMIN_BOOTSTRAP_EMAIL`` and the hash of
    ``settings.ADMIN_BOOTSTRAP_PASSWORD``, then commits the session.

    Args:
        session: Async database session used for the lookup and the insert.
    """
    result = await session.execute(select(User).limit(1))
    if result.scalar_one_or_none():
        # A user was found, so there is nothing to bootstrap.
        return

    session.add(
        User(
            email=settings.ADMIN_BOOTSTRAP_EMAIL,
            password_hash=hash_password(settings.ADMIN_BOOTSTRAP_PASSWORD),
            role=UserRole.ADMIN,
            is_active=True,
        )
    )
    await session.commit()