auth providers

This commit is contained in:
Mateusz Gruszczyński
2026-05-25 09:09:41 +02:00
parent 352c53617c
commit 93aaca553b
4 changed files with 198 additions and 48 deletions

View File

@@ -13,11 +13,8 @@ from ..config import (
AUTH_ENABLE,
AUTH_PROVIDER,
AUTH_PROXY_AUTO_CREATE,
AUTH_PROXY_DEFAULT_ACCESS,
AUTH_PROXY_DEFAULT_ROLE,
AUTH_PROXY_EMAIL_HEADER,
AUTH_PROXY_NAME_HEADER,
AUTH_PROXY_SUBJECT_HEADER,
AUTH_PROXY_AUTO_CREATE_PERMISSION,
AUTH_PROXY_AUTO_CREATE_ROLE,
AUTH_PROXY_USER_HEADER,
)
from ..db import connect, default_user_id, utcnow
@@ -254,29 +251,42 @@ def _safe_username(value: str, fallback: str = "external-user") -> str:
def _external_identity_from_headers() -> dict[str, str] | None:
# Note: Tinyauth and generic proxy auth use a single trusted username header.
username = _clean_header_value(AUTH_PROXY_USER_HEADER)
email = _clean_header_value(AUTH_PROXY_EMAIL_HEADER)
display_name = _clean_header_value(AUTH_PROXY_NAME_HEADER)
subject = _clean_header_value(AUTH_PROXY_SUBJECT_HEADER) if AUTH_PROXY_SUBJECT_HEADER else ""
if not username and email:
username = email.split("@", 1)[0]
if not username:
return None
safe_username = _safe_username(username)
return {
"provider": provider(),
"username": _safe_username(username),
"email": email[:254],
"display_name": display_name[:160],
"subject": (subject or username or email)[:254],
"username": safe_username,
"subject": safe_username,
}
def _grant_default_external_permissions(conn, user_id: int, now: str) -> None:
if AUTH_PROXY_DEFAULT_ACCESS == "none" or AUTH_PROXY_DEFAULT_ROLE == "admin":
# Note: Admins can see and write all profiles through role-based access.
if AUTH_PROXY_AUTO_CREATE_PERMISSION == "none" or AUTH_PROXY_AUTO_CREATE_ROLE == "admin":
return
conn.execute(
"INSERT OR IGNORE INTO user_profile_permissions(user_id,profile_id,access_level,created_at,updated_at) VALUES(?,?,?,?,?)",
(user_id, 0, AUTH_PROXY_DEFAULT_ACCESS, now, now),
(user_id, 0, AUTH_PROXY_AUTO_CREATE_PERMISSION, now, now),
)
def _sync_external_auto_created_user(conn, user: dict[str, Any], now: str) -> None:
# Note: Passwordless external users follow the external auto-create defaults on login.
if not AUTH_PROXY_AUTO_CREATE or user.get("password_hash"):
return
if user.get("external_auth_provider") and user.get("external_auth_provider") != provider():
return
user_id = int(user["id"])
conn.execute("UPDATE users SET role=?, updated_at=? WHERE id=?", (AUTH_PROXY_AUTO_CREATE_ROLE, now, user_id))
if AUTH_PROXY_AUTO_CREATE_ROLE == "admin" or AUTH_PROXY_AUTO_CREATE_PERMISSION == "none":
conn.execute("DELETE FROM user_profile_permissions WHERE user_id=?", (user_id,))
return
conn.execute(
"INSERT OR REPLACE INTO user_profile_permissions(user_id,profile_id,access_level,created_at,updated_at) VALUES(?,?,?,?,?)",
(user_id, 0, AUTH_PROXY_AUTO_CREATE_PERMISSION, now, now),
)
@@ -296,8 +306,6 @@ def authenticate_external_user() -> dict[str, Any] | None:
).fetchone()
if not user:
user = conn.execute("SELECT * FROM users WHERE username=?", (identity["username"],)).fetchone()
if not user and identity["email"]:
user = conn.execute("SELECT * FROM users WHERE lower(email)=lower(?)", (identity["email"],)).fetchone()
if not user:
if not AUTH_PROXY_AUTO_CREATE:
return None
@@ -307,11 +315,11 @@ def authenticate_external_user() -> dict[str, Any] | None:
(
identity["username"],
None,
identity["email"] or None,
identity["display_name"] or None,
None,
None,
identity["provider"],
identity["subject"] or identity["username"],
AUTH_PROXY_DEFAULT_ROLE,
AUTH_PROXY_AUTO_CREATE_ROLE,
1,
now,
now,
@@ -324,15 +332,16 @@ def authenticate_external_user() -> dict[str, Any] | None:
user_id = int(user["id"])
conn.execute(
"""UPDATE users
SET email=COALESCE(NULLIF(?, ''), email),
display_name=COALESCE(NULLIF(?, ''), display_name),
external_auth_provider=?,
SET external_auth_provider=?,
external_subject=COALESCE(NULLIF(?, ''), external_subject),
updated_at=?
WHERE id=?""",
(identity["email"], identity["display_name"], identity["provider"], identity["subject"], now, user_id),
(identity["provider"], identity["subject"], now, user_id),
)
user = conn.execute("SELECT * FROM users WHERE id=?", (user_id,)).fetchone()
if user:
_sync_external_auto_created_user(conn, user, now)
user = conn.execute("SELECT * FROM users WHERE id=?", (user_id,)).fetchone()
if not user or not int(user.get("is_active") or 0):
return None
g.external_user_id = int(user["id"])