69 lines
2.1 KiB
Python
69 lines
2.1 KiB
Python
import base64
|
|
import secrets
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional
|
|
|
|
from cryptography.fernet import Fernet
|
|
from itsdangerous import URLSafeTimedSerializer, BadSignature, SignatureExpired
|
|
from passlib.context import CryptContext
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.core.config import settings
|
|
from app.models.user import User, UserRole
|
|
|
|
# Shared passlib context used by the password helpers in this module.
# Two schemes are configured, "argon2" first and "bcrypt" second.
# NOTE(review): per passlib, the first scheme should be the default for new
# hashes and deprecated="auto" should mark the others deprecated -- confirm.
pwd_context = CryptContext(
    schemes=["argon2", "bcrypt"],
    deprecated="auto",
)
|
|
|
|
def hash_password(password: str) -> str:
    """Return a hash of ``password`` produced by the module-level ``pwd_context``.

    Args:
        password: The plaintext password to hash.

    Returns:
        The value returned by ``pwd_context.hash``.
    """
    hashed = pwd_context.hash(password)
    return hashed
|
|
|
|
def verify_password(password: str, password_hash: str) -> bool:
    """Check a plaintext password against a stored hash.

    Args:
        password: The plaintext password supplied by the caller.
        password_hash: The previously stored hash to compare against.

    Returns:
        Whatever ``pwd_context.verify`` reports for the pair.
    """
    is_match = pwd_context.verify(password, password_hash)
    return is_match
|
|
|
|
def session_serializer() -> URLSafeTimedSerializer:
    """Build the serializer used to sign and verify session tokens.

    A new ``URLSafeTimedSerializer`` is constructed on every call, keyed by
    ``settings.SESSION_SECRET`` and using the fixed salt ``"session"``.
    """
    secret = settings.SESSION_SECRET
    return URLSafeTimedSerializer(secret, salt="session")
|
|
|
|
# Cookie-name constants.
# NOTE(review): presumably the names of the session and CSRF cookies set by
# the web layer; they are not referenced elsewhere in this section -- confirm.
SESSION_COOKIE = "mt_session"
CSRF_COOKIE = "mt_csrf"
|
|
|
|
def create_session_token(user_id: int) -> str:
    """Create a signed session token carrying ``user_id``.

    The payload is the dict ``{"uid": user_id}``, serialized with the
    serializer returned by ``session_serializer()``.

    Args:
        user_id: Identifier stored under the ``"uid"`` key of the payload.

    Returns:
        The token produced by the serializer's ``dumps``.
    """
    payload = {"uid": user_id}
    return session_serializer().dumps(payload)
|
|
|
|
def read_session_token(token: str, max_age_seconds: int = 60 * 60 * 24 * 7) -> Optional[int]:
    """Validate a signed session token and return the user id it carries.

    Args:
        token: Token previously produced by ``create_session_token``.
        max_age_seconds: Maximum accepted token age in seconds; defaults to
            seven days.

    Returns:
        The integer stored under the ``"uid"`` key of the payload, or
        ``None`` when the token is empty, not a string, fails signature or
        age validation, cannot be decoded, or does not carry a usable
        ``"uid"`` value.
    """
    # Reject obviously unusable input up front instead of relying on a
    # catch-all handler around the serializer.
    if not isinstance(token, (str, bytes)) or not token:
        return None

    # Local import so the block does not depend on a new file-level import.
    # BadData is the common base class of the itsdangerous validation errors
    # (bad signature, expired signature, undecodable payload).
    from itsdangerous import BadData

    try:
        data = session_serializer().loads(token, max_age=max_age_seconds)
    except BadData:
        return None

    # A validly signed payload that is not the expected mapping is treated
    # the same as an invalid token.
    if not isinstance(data, dict):
        return None

    try:
        return int(data.get("uid"))
    except (TypeError, ValueError, OverflowError):
        # "uid" missing (None) or not convertible to an integer.
        return None
|
|
|
|
def new_csrf_token() -> str:
    """Generate a fresh random token for CSRF protection.

    Returns:
        A URL-safe text token built from 32 random bytes via
        ``secrets.token_urlsafe``.
    """
    token = secrets.token_urlsafe(nbytes=32)
    return token
|
|
|
|
def fernet() -> Fernet:
    """Build a ``Fernet`` instance keyed by ``settings.CREDENTIALS_MASTER_KEY``.

    A new instance is constructed on every call.
    """
    # CREDENTIALS_MASTER_KEY should be the URL-safe base64 encoding of 32 bytes.
    master_key = settings.CREDENTIALS_MASTER_KEY
    return Fernet(master_key.encode("utf-8"))
|
|
|
|
def encrypt_secret(plain: str) -> str:
    """Encrypt a text secret with the cipher returned by ``fernet()``.

    Args:
        plain: The plaintext secret; it is UTF-8 encoded before encryption.

    Returns:
        The encrypted token decoded to text as UTF-8.
    """
    cipher = fernet()
    token = cipher.encrypt(plain.encode("utf-8"))
    return token.decode("utf-8")
|
|
|
|
def decrypt_secret(enc: str) -> str:
    """Decrypt a token produced by ``encrypt_secret``.

    Args:
        enc: The encrypted token as text; it is UTF-8 encoded before
            decryption.

    Returns:
        The decrypted bytes decoded to text as UTF-8.
    """
    cipher = fernet()
    plaintext = cipher.decrypt(enc.encode("utf-8"))
    return plaintext.decode("utf-8")
|
|
|
|
async def bootstrap_admin_if_needed(session: AsyncSession) -> None:
    """Create the initial admin account when no user is found.

    Queries for a single ``User`` row. If the query yields one, nothing is
    changed. Otherwise an active user with role ``UserRole.ADMIN`` is added,
    using ``settings.ADMIN_BOOTSTRAP_EMAIL`` and a hash of
    ``settings.ADMIN_BOOTSTRAP_PASSWORD``, and the session is committed.

    Args:
        session: The async database session used for the query and commit.
    """
    # LIMIT 1 keeps the existence probe to at most one row.
    probe = select(User).limit(1)
    existing_user = (await session.execute(probe)).scalar_one_or_none()

    if not existing_user:
        session.add(
            User(
                email=settings.ADMIN_BOOTSTRAP_EMAIL,
                password_hash=hash_password(settings.ADMIN_BOOTSTRAP_PASSWORD),
                role=UserRole.ADMIN,
                is_active=True,
            )
        )
        await session.commit()
|