first commit
This commit is contained in:
68
backend/app/core/security.py
Normal file
68
backend/app/core/security.py
Normal file
@@ -0,0 +1,68 @@
|
||||
import base64
|
||||
import secrets
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Optional
|
||||
|
||||
from cryptography.fernet import Fernet
|
||||
from itsdangerous import URLSafeTimedSerializer, BadSignature, SignatureExpired
|
||||
from passlib.context import CryptContext
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.core.config import settings
|
||||
from app.models.user import User, UserRole
|
||||
|
||||
pwd_context = CryptContext(schemes=["argon2", "bcrypt"], deprecated="auto")
|
||||
|
||||
def hash_password(password: str) -> str:
|
||||
return pwd_context.hash(password)
|
||||
|
||||
def verify_password(password: str, password_hash: str) -> bool:
|
||||
return pwd_context.verify(password, password_hash)
|
||||
|
||||
def session_serializer() -> URLSafeTimedSerializer:
|
||||
return URLSafeTimedSerializer(settings.SESSION_SECRET, salt="session")
|
||||
|
||||
SESSION_COOKIE = "mt_session"
|
||||
CSRF_COOKIE = "mt_csrf"
|
||||
|
||||
def create_session_token(user_id: int) -> str:
|
||||
s = session_serializer()
|
||||
return s.dumps({"uid": user_id})
|
||||
|
||||
def read_session_token(token: str, max_age_seconds: int = 60 * 60 * 24 * 7) -> Optional[int]:
|
||||
s = session_serializer()
|
||||
try:
|
||||
data = s.loads(token, max_age=max_age_seconds)
|
||||
uid = int(data.get("uid"))
|
||||
return uid
|
||||
except (BadSignature, SignatureExpired, Exception):
|
||||
return None
|
||||
|
||||
def new_csrf_token() -> str:
|
||||
return secrets.token_urlsafe(32)
|
||||
|
||||
def fernet() -> Fernet:
|
||||
# CREDENTIALS_MASTER_KEY powinien być base64 urlsafe 32 bytes
|
||||
key = settings.CREDENTIALS_MASTER_KEY.encode("utf-8")
|
||||
return Fernet(key)
|
||||
|
||||
def encrypt_secret(plain: str) -> str:
|
||||
return fernet().encrypt(plain.encode("utf-8")).decode("utf-8")
|
||||
|
||||
def decrypt_secret(enc: str) -> str:
|
||||
return fernet().decrypt(enc.encode("utf-8")).decode("utf-8")
|
||||
|
||||
async def bootstrap_admin_if_needed(session: AsyncSession) -> None:
|
||||
res = await session.execute(select(User).limit(1))
|
||||
first = res.scalar_one_or_none()
|
||||
if first:
|
||||
return
|
||||
admin = User(
|
||||
email=settings.ADMIN_BOOTSTRAP_EMAIL,
|
||||
password_hash=hash_password(settings.ADMIN_BOOTSTRAP_PASSWORD),
|
||||
role=UserRole.ADMIN,
|
||||
is_active=True,
|
||||
)
|
||||
session.add(admin)
|
||||
await session.commit()
|
||||
Reference in New Issue
Block a user