107 lines
3.7 KiB
Python
107 lines
3.7 KiB
Python
from datetime import timedelta
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Request, status
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.api.deps import get_current_user, get_db
|
|
from app.core.config import settings
|
|
from app.core.security import create_access_token, get_password_hash, verify_password
|
|
from app.models.user import User
|
|
from app.schemas.auth import (
|
|
ChangePasswordRequest,
|
|
RegisterRequest,
|
|
TokenResponse,
|
|
UpdateUserPreferencesRequest,
|
|
UserResponse,
|
|
)
|
|
|
|
# Router that the endpoint decorators in this module register their handlers on.
router = APIRouter()
|
|
|
|
|
|
@router.post("/register", response_model=UserResponse)
def register(payload: RegisterRequest, db: Session = Depends(get_db)):
    """Create a new user account from the submitted username and password.

    Args:
        payload: Registration data; ``username`` and ``password`` are read.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The newly persisted ``User``, serialised through ``UserResponse``.

    Raises:
        HTTPException: 403 when ``settings.allow_registration`` is falsy;
            409 when the username is already taken, whether detected by the
            up-front lookup or by the database at commit time.
    """
    # Function-scope import so this block stays self-contained; sqlalchemy is
    # already a dependency of this module.
    from sqlalchemy.exc import IntegrityError

    if not settings.allow_registration:
        raise HTTPException(status_code=403, detail="Registration is disabled")

    existing = db.query(User).filter(User.username == payload.username).first()
    if existing:
        raise HTTPException(status_code=409, detail="Username already exists")

    user = User(
        username=payload.username,
        password_hash=get_password_hash(payload.password),
    )
    db.add(user)
    try:
        db.commit()
    except IntegrityError as exc:
        # The lookup above and this commit are not atomic: a concurrent request
        # can insert the same username in between. Roll back so the session
        # stays usable and report the conflict instead of a 500.
        # NOTE(review): assumes the only constraint that can fail on this
        # insert is uniqueness of the username - confirm against the User model.
        db.rollback()
        raise HTTPException(status_code=409, detail="Username already exists") from exc
    db.refresh(user)
    return user
|
|
|
|
|
|
@router.post("/login", response_model=TokenResponse)
async def login(request: Request, db: Session = Depends(get_db)):
    """Authenticate a user and issue an access token.

    Credentials are read from the request body: as a JSON object when the
    ``Content-Type`` header contains ``application/json``, otherwise as form
    fields. Both sources use the keys ``username`` and ``password``.

    Args:
        request: Incoming request whose headers and body are inspected.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A ``TokenResponse`` carrying the access token and the user data.

    Raises:
        HTTPException: 422 when username or password is missing, empty, or
            not a string; 401 when the user does not exist or the password
            does not verify.
    """
    content_type = (request.headers.get("content-type") or "").lower()

    if "application/json" in content_type:
        try:
            payload = await request.json()
        except Exception:
            # Deliberate best effort: an unreadable body is handled like an
            # empty one and rejected with 422 below.
            payload = {}
        # Valid JSON is not necessarily an object (it may be a list, string or
        # number); anything without keys cannot carry credentials.
        if not isinstance(payload, dict):
            payload = {}
        credentials = payload
    else:
        credentials = await request.form()

    username = credentials.get("username")
    password = credentials.get("password")

    # JSON values may be numbers or containers, and multipart fields may be
    # file uploads; only non-empty strings are usable as credentials.
    if not (isinstance(username, str) and username and isinstance(password, str) and password):
        raise HTTPException(status_code=422, detail="Username and password are required")

    user = db.query(User).filter(User.username == username).first()
    if not user or not verify_password(password, user.password_hash):
        # Same response for unknown user and wrong password.
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials")

    token = create_access_token(
        subject=user.username,
        expires_delta=timedelta(minutes=settings.access_token_expire_minutes),
    )
    return TokenResponse(access_token=token, user=UserResponse.model_validate(user))
|
|
|
|
|
|
@router.get("/me", response_model=UserResponse)
def me(
    current_user: User = Depends(get_current_user),
):
    """Return the user resolved by the ``get_current_user`` dependency."""
    return current_user
|
|
|
|
|
|
|
|
|
|
@router.put("/preferences", response_model=UserResponse)
def update_preferences(
    payload: UpdateUserPreferencesRequest,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Store the caller's language and font preferences.

    A falsy ``preferred_language`` falls back to ``'pl'`` and a falsy
    ``preferred_font`` to ``'default'``. Both values are stripped and
    lower-cased before being checked against the supported options.

    Returns:
        The updated user, refreshed from the database.

    Raises:
        HTTPException: 422 when the language or the font is not supported.
    """
    supported_languages = ('pl', 'en', 'es', 'no')
    supported_fonts = ('default', 'adwaita_mono', 'hack')

    # Normalise both inputs first, then validate language before font.
    language = payload.preferred_language or 'pl'
    language = language.strip().lower()
    font = payload.preferred_font or 'default'
    font = font.strip().lower()

    if language not in supported_languages:
        raise HTTPException(status_code=422, detail='Unsupported language')
    if font not in supported_fonts:
        raise HTTPException(status_code=422, detail='Unsupported font')

    current_user.preferred_language = language
    current_user.preferred_font = font

    db.add(current_user)
    db.commit()
    db.refresh(current_user)
    return current_user
|
|
|
|
|
|
@router.post("/change-password")
def change_password(
    payload: ChangePasswordRequest,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Replace the caller's password after verifying the current one.

    Returns:
        A dict with a single ``message`` key confirming the change.

    Raises:
        HTTPException: 400 when ``payload.current_password`` does not verify
            against the stored hash.
    """
    current_password_ok = verify_password(
        payload.current_password,
        current_user.password_hash,
    )
    if not current_password_ok:
        raise HTTPException(status_code=400, detail="Current password is invalid")

    new_hash = get_password_hash(payload.new_password)
    current_user.password_hash = new_hash

    db.add(current_user)
    db.commit()
    return {"message": "Password changed successfully"}
|