Files
pyTorrent/pytorrent/services/auth.py
Mateusz Gruszczyński dc1cac4e6f add auth support
2026-05-06 08:38:07 +02:00

345 lines
13 KiB
Python

from __future__ import annotations
from functools import wraps
from typing import Any
from urllib.parse import urlparse
from flask import abort, jsonify, redirect, request, session, url_for
from werkzeug.security import check_password_hash, generate_password_hash
from ..config import AUTH_ENABLE
from ..db import connect, default_user_id, utcnow
# Flask endpoint names that the request guard lets through without a session.
PUBLIC_ENDPOINTS = {
    "main.login",
    "main.logout",
    "api.auth_login",
    "api.auth_me",
    "static",
}

# Path prefixes where unsafe (non GET/HEAD/OPTIONS) requests require write
# access to the targeted profile.
RTORRENT_WRITE_PREFIXES = (
    "/api/torrents/",
    "/api/speed/limits",
    "/api/labels",
    "/api/ratio-groups",
    "/api/rss",
    "/api/smart-queue",
    "/api/automations",
    "/api/jobs",
)

# Path prefixes for rTorrent configuration; unsafe requests are checked for
# write access separately from RTORRENT_WRITE_PREFIXES.
RTORRENT_CONFIG_PREFIXES = ("/api/rtorrent-config",)

# NOTE(review): not referenced by the guard code visible in this module, which
# spells out "/api/auth/users" and "/api/profiles" inline instead.
ADMIN_PREFIXES = ("/api/auth/users", "/api/profiles")

# Note: API reads that expose rTorrent/profile data must also respect profile permissions.
PROFILE_READ_PREFIXES = (
    "/api/torrents",
    "/api/torrent-stats",
    "/api/system/status",
    "/api/app/status",
    "/api/port-check",
    "/api/path",
    "/api/labels",
    "/api/ratio-groups",
    "/api/rss",
    "/api/rtorrent-config",
    "/api/smart-queue",
    "/api/traffic/history",
    "/api/automations",
)
def enabled() -> bool:
    """Report whether authentication is switched on (truthiness of ``AUTH_ENABLE``)."""
    return True if AUTH_ENABLE else False
def password_hash(password: str) -> str:
    """Return the werkzeug hash of *password*; a falsy value is hashed as ``""``."""
    plaintext = password if password else ""
    return generate_password_hash(plaintext)
def current_user_id() -> int:
    """Return the id of the user the current request acts as.

    With authentication disabled this is ``default_user_id()``. Otherwise it
    is the ``user_id`` stored in the session, or 0 when that value is missing,
    falsy, or cannot be converted to an int.
    """
    if enabled():
        try:
            stored = session.get("user_id")
            return int(stored) if stored else 0
        except Exception:
            return 0
    return default_user_id()
def current_user() -> dict[str, Any] | None:
    """Fetch the ``users`` row (without the password hash) for the current user id.

    Returns ``None`` when there is no current user id, or whatever
    ``fetchone()`` yields when no row matches.
    """
    uid = current_user_id()
    if uid:
        query = "SELECT id, username, role, is_active, created_at, updated_at FROM users WHERE id=?"
        with connect() as conn:
            return conn.execute(query, (uid,)).fetchone()
    return None
def is_admin(user: dict[str, Any] | None = None) -> bool:
    """Tell whether *user* (default: the current user) is an active admin.

    Always ``True`` when authentication is disabled.
    """
    if not enabled():
        return True
    account = user or current_user()
    if not account:
        return False
    if account.get("role") != "admin":
        return False
    return bool(int(account.get("is_active") or 0))
def _permissions(user_id: int | None = None) -> list[dict[str, Any]]:
    """List the ``profile_id``/``access_level`` grants for a user.

    With authentication disabled a single synthetic grant
    (``profile_id`` 0, ``"full"``) is returned. Without a resolvable user id
    the result is an empty list.
    """
    if not enabled():
        return [{"profile_id": 0, "access_level": "full"}]
    uid = user_id or current_user_id()
    if uid:
        query = "SELECT profile_id, access_level FROM user_profile_permissions WHERE user_id=?"
        with connect() as conn:
            return conn.execute(query, (uid,)).fetchall()
    return []
def can_access_profile(profile_id: int | None, user_id: int | None = None) -> bool:
    """Decide whether a user may read the given profile.

    Access is granted when authentication is disabled, when the user is an
    active admin, or when the user holds any permission row for either
    ``profile_id`` 0 or the requested profile. Unknown or inactive users are
    always refused.
    """
    if not enabled():
        return True
    uid = user_id or current_user_id()
    if not uid:
        return False
    with connect() as conn:
        account = conn.execute("SELECT role, is_active FROM users WHERE id=?", (uid,)).fetchone()
        active = bool(account) and bool(int(account.get("is_active") or 0))
        if not active:
            return False
        if account.get("role") == "admin":
            return True
        wanted = int(profile_id or 0)
        grant = conn.execute(
            "SELECT 1 FROM user_profile_permissions WHERE user_id=? AND (profile_id=0 OR profile_id=?) LIMIT 1",
            (uid, wanted),
        ).fetchone()
    return True if grant else False
def can_write_profile(profile_id: int | None, user_id: int | None = None) -> bool:
    """Decide whether a user may modify the given profile.

    Write access is granted when authentication is disabled, when the user is
    an active admin, or when the selected permission row has access level
    ``"full"``. Unknown or inactive users are always refused.
    """
    if not enabled():
        return True
    uid = user_id or current_user_id()
    if not uid:
        return False
    with connect() as conn:
        account = conn.execute("SELECT role, is_active FROM users WHERE id=?", (uid,)).fetchone()
        active = bool(account) and bool(int(account.get("is_active") or 0))
        if not active:
            return False
        if account.get("role") == "admin":
            return True
        wanted = int(profile_id or 0)
        # Only the row with the highest profile_id among {0, wanted} is used,
        # so (for positive ids) a profile-specific row wins over the
        # profile_id=0 row.
        grant = conn.execute(
            "SELECT access_level FROM user_profile_permissions WHERE user_id=? AND (profile_id=0 OR profile_id=?) ORDER BY profile_id DESC LIMIT 1",
            (uid, wanted),
        ).fetchone()
    if not grant:
        return False
    return grant.get("access_level") == "full"
def visible_profile_ids(user_id: int | None = None) -> set[int] | None:
    """Collect the profile ids a user holds any permission row for.

    Returns ``None`` when no explicit id set applies: authentication is
    disabled, the user is an active admin, or one of the user's rows has
    ``profile_id`` 0. Returns an empty set for a missing, unknown, or
    inactive user.
    """
    if not enabled():
        return None
    uid = user_id or current_user_id()
    if not uid:
        return set()
    with connect() as conn:
        account = conn.execute("SELECT role, is_active FROM users WHERE id=?", (uid,)).fetchone()
        if not account or not int(account.get("is_active") or 0):
            return set()
        if account.get("role") == "admin":
            return None
        grants = conn.execute("SELECT profile_id FROM user_profile_permissions WHERE user_id=?", (uid,)).fetchall()
    granted = {int(grant.get("profile_id") or 0) for grant in grants}
    return None if 0 in granted else granted
def same_origin_request() -> bool:
    """Return False only when an unsafe request clearly comes from another origin.

    The ``Origin`` header is consulted first, then ``Referer``. A request
    carrying neither is treated as same-origin. Otherwise the header's scheme
    and netloc must equal the request's scheme and host.
    """
    source = request.headers.get("Origin") or request.headers.get("Referer")
    if not source:
        return True
    try:
        parts = urlparse(source)
        return (parts.scheme, parts.netloc) == (request.scheme, request.host)
    except Exception:
        return False
def writable_profile_ids(user_id: int | None = None) -> set[int] | None:
    """Collect the profile ids a user holds a ``"full"`` permission row for.

    Returns ``None`` when no explicit id set applies: authentication is
    disabled, the user is an active admin, or one of the user's ``"full"``
    rows has ``profile_id`` 0. Returns an empty set for a missing, unknown,
    or inactive user.
    """
    if not enabled():
        return None
    uid = user_id or current_user_id()
    if not uid:
        return set()
    with connect() as conn:
        account = conn.execute("SELECT role, is_active FROM users WHERE id=?", (uid,)).fetchone()
        if not account or not int(account.get("is_active") or 0):
            return set()
        if account.get("role") == "admin":
            return None
        grants = conn.execute("SELECT profile_id FROM user_profile_permissions WHERE user_id=? AND access_level='full'", (uid,)).fetchall()
    granted = {int(grant.get("profile_id") or 0) for grant in grants}
    return None if 0 in granted else granted
def require_admin() -> None:
    """Abort the request with HTTP 403 unless the current user is an admin.

    Does nothing when authentication is disabled.
    """
    if not enabled():
        return
    if not is_admin():
        abort(403)
def require_profile_read(profile_id: int | None) -> None:
    """Abort the request with HTTP 403 unless the current user may read *profile_id*.

    Does nothing when authentication is disabled.
    """
    if not enabled():
        return
    if not can_access_profile(profile_id):
        abort(403)
def require_profile_write(profile_id: int | None) -> None:
    """Abort the request with HTTP 403 unless the current user may modify *profile_id*.

    Does nothing when authentication is disabled.
    """
    if not enabled():
        return
    if not can_write_profile(profile_id):
        abort(403)
def login_user(username: str, password: str) -> dict[str, Any] | None:
    """Verify credentials and, on success, start a fresh session for the user.

    With authentication disabled a synthetic admin record for the default
    user is returned and the session is left untouched. Returns ``None`` when
    the username is unknown, the account is inactive, no password hash is
    stored, or the password does not match.
    """
    if not enabled():
        return {"id": default_user_id(), "username": "default", "role": "admin", "is_active": 1}

    with connect() as conn:
        account = conn.execute("SELECT * FROM users WHERE username=?", (username.strip(),)).fetchone()

    if not account or not int(account.get("is_active") or 0):
        return None
    stored_hash = account.get("password_hash")
    if not stored_hash:
        return None
    if not check_password_hash(stored_hash, password or ""):
        return None

    # Drop any previous session contents before recording the new identity.
    session.clear()
    session["user_id"] = int(account["id"])
    session["username"] = account["username"]
    session["role"] = account.get("role") or "user"
    return current_user()
def logout_user() -> None:
    """End the login by wiping every key from the session."""
    session.clear()
def ensure_admin_user() -> None:
    """Make sure an active ``admin`` account with the admin role exists.

    Does nothing when authentication is disabled. A missing account is
    created; an existing one has its role and active flag reset to
    admin/active (its password is left alone).
    """
    if not enabled():
        return
    now = utcnow()
    with connect() as conn:
        existing = conn.execute("SELECT id FROM users WHERE username='admin'").fetchone()
        if existing:
            conn.execute("UPDATE users SET role='admin', is_active=1, updated_at=? WHERE username='admin'", (now,))
            return
        # NOTE(review): the bootstrap account is created with the password
        # "admin"; it should be changed right after the first login.
        conn.execute(
            "INSERT INTO users(username,password_hash,role,is_active,created_at,updated_at) VALUES(?,?,?,?,?,?)",
            ("admin", password_hash("admin"), "admin", 1, now, now),
        )
def list_users() -> list[dict[str, Any]]:
    """Return every user row (admin only), each with a ``permissions`` list attached.

    Each permission entry carries ``profile_id`` (0 when missing or falsy) and
    ``access_level`` (``"ro"`` when missing or falsy). Users without
    permission rows get an empty list.
    """
    require_admin()
    with connect() as conn:
        accounts = conn.execute(
            "SELECT id, username, role, is_active, created_at, updated_at FROM users ORDER BY username COLLATE NOCASE"
        ).fetchall()
        grants = conn.execute(
            "SELECT user_id, profile_id, access_level FROM user_profile_permissions ORDER BY user_id, profile_id"
        ).fetchall()

    # Group permission rows by owning user id.
    grouped: dict[int, list[dict[str, Any]]] = {}
    for grant in grants:
        owner = int(grant["user_id"])
        entry = {
            "profile_id": int(grant.get("profile_id") or 0),
            "access_level": grant.get("access_level") or "ro",
        }
        grouped.setdefault(owner, []).append(entry)

    for account in accounts:
        account["permissions"] = grouped.get(int(account["id"]), [])
    return accounts
def save_user(data: dict[str, Any], user_id: int | None = None) -> dict[str, Any]:
    """Create a user, or update the one identified by *user_id* (admin only).

    Args:
        data: Submitted fields. ``username`` is required. ``role`` is stored
            as ``"admin"`` only when it equals ``"admin"``, otherwise
            ``"user"``. ``is_active`` defaults to true. ``password`` is
            optional. ``permissions`` is an optional list of mappings with
            ``profile_id`` and ``access_level``; any level other than
            ``"full"`` is stored as ``"ro"``.
        user_id: Id of the user to update; a falsy value creates a new user.

    Returns:
        The saved ``users`` row, without the password hash.

    Raises:
        ValueError: If the username is blank, or *user_id* matches no user.
    """
    require_admin()
    now = utcnow()
    username = str(data.get("username") or "").strip()
    role = "admin" if data.get("role") == "admin" else "user"
    is_active = 1 if data.get("is_active", True) else 0
    if not username:
        raise ValueError("Username is required")
    new_password = data.get("password")

    with connect() as conn:
        if user_id:
            existing = conn.execute("SELECT id FROM users WHERE id=?", (user_id,)).fetchone()
            if not existing:
                raise ValueError("User does not exist")
            conn.execute(
                "UPDATE users SET username=?, role=?, is_active=?, updated_at=? WHERE id=?",
                (username, role, is_active, now, user_id),
            )
            # The password of an existing user changes only when one is supplied.
            if new_password:
                conn.execute(
                    "UPDATE users SET password_hash=?, updated_at=? WHERE id=?",
                    (password_hash(str(new_password)), now, user_id),
                )
        else:
            # The INSERT already stores the password hash, so a new user is
            # hashed exactly once (hashing is deliberately expensive).
            # NOTE(review): without a password the username is used as the
            # initial password.
            cur = conn.execute(
                "INSERT INTO users(username,password_hash,role,is_active,created_at,updated_at) VALUES(?,?,?,?,?,?)",
                (username, password_hash(str(new_password or username)), role, is_active, now, now),
            )
            user_id = int(cur.lastrowid)

        # Permissions are replaced wholesale on every save. Admin accounts
        # keep no per-profile rows.
        conn.execute("DELETE FROM user_profile_permissions WHERE user_id=?", (user_id,))
        if role != "admin":
            for item in data.get("permissions") or []:
                profile_id = int(item.get("profile_id") or 0)
                access = "full" if item.get("access_level") == "full" else "ro"
                conn.execute(
                    "INSERT OR REPLACE INTO user_profile_permissions(user_id,profile_id,access_level,created_at,updated_at) VALUES(?,?,?,?,?)",
                    (user_id, profile_id, access, now, now),
                )
        return conn.execute(
            "SELECT id, username, role, is_active, created_at, updated_at FROM users WHERE id=?",
            (user_id,),
        ).fetchone()
def delete_user(user_id: int) -> None:
    """Delete a user and their permission rows (admin only).

    The ``users`` row named ``admin`` is never removed by the DELETE.

    Raises:
        ValueError: If *user_id* is the user making the request.
    """
    require_admin()
    target = int(user_id)
    if target == current_user_id():
        raise ValueError("Cannot delete current user")
    statements = (
        "DELETE FROM user_profile_permissions WHERE user_id=?",
        "DELETE FROM users WHERE id=? AND username <> 'admin'",
    )
    with connect() as conn:
        for sql in statements:
            conn.execute(sql, (user_id,))
def install_guards(app) -> None:
    """Register a ``before_request`` hook on *app* that enforces login and access rules.

    The hook is a no-op while authentication is disabled. Otherwise it returns
    an early response (401/403 JSON, or a redirect to the login page) when the
    request is not allowed, and ``None`` to let the request proceed.
    """

    @app.before_request
    def _auth_guard():
        """Check the current request; return a response to block it or None to allow it."""
        if not enabled():
            return None
        endpoint = request.endpoint or ""
        if endpoint in PUBLIC_ENDPOINTS or endpoint.startswith("static"):
            return None

        is_api = request.path.startswith("/api/")

        # No session: API callers get a 401, browsers go to the login page and
        # come back to the requested URL afterwards.
        if not current_user_id():
            if is_api:
                return jsonify({"ok": False, "error": "Authentication required"}), 401
            return redirect(url_for("main.login", next=request.full_path if request.query_string else request.path))

        # A session whose user no longer exists or was deactivated is ended.
        user = current_user()
        if not user or not int(user.get("is_active") or 0):
            logout_user()
            # These are two separate returns on purpose: written as
            # `return a, 401 if cond else b`, the conditional binds tighter
            # than the comma and yields the tuple (a, b) for non-API paths.
            if is_api:
                return jsonify({"ok": False, "error": "Authentication required"}), 401
            return redirect(url_for("main.login"))

        if request.path.startswith("/api/auth/users") and not is_admin(user):
            return jsonify({"ok": False, "error": "Admin only"}), 403

        # Reads that expose profile data require access to the targeted profile.
        if request.path.startswith(PROFILE_READ_PREFIXES):
            profile_id = _request_profile_id()
            if profile_id and not can_access_profile(profile_id):
                return jsonify({"ok": False, "error": "Profile access denied"}), 403

        # State-changing requests get the additional checks below.
        if request.method not in {"GET", "HEAD", "OPTIONS"}:
            if is_api and not same_origin_request():
                return jsonify({"ok": False, "error": "Cross-origin API request blocked"}), 403
            # Profile management is admin-only, except for ".../activate".
            if request.path.startswith("/api/profiles") and not request.path.endswith("/activate") and not is_admin(user):
                return jsonify({"ok": False, "error": "Admin only"}), 403
            profile_id = _request_profile_id()
            if request.path.startswith(RTORRENT_CONFIG_PREFIXES) and not can_write_profile(profile_id):
                return jsonify({"ok": False, "error": "Read-only profile access"}), 403
            if request.path.startswith(RTORRENT_WRITE_PREFIXES) and not can_write_profile(profile_id):
                return jsonify({"ok": False, "error": "Read-only profile access"}), 403
        return None
def _request_profile_id() -> int | None:
    """Work out which profile the current request targets.

    Sources are tried in order: the ``profile_id`` route argument, the
    ``profile_id`` field of the JSON body, then the active profile from
    ``preferences``. Returns ``None`` when none of them yields a profile.
    """
    route_args = request.view_args
    if route_args and route_args.get("profile_id"):
        return int(route_args["profile_id"])

    # Best effort: a missing, non-mapping, or non-numeric JSON body falls
    # through to the active profile.
    try:
        body = request.get_json(silent=True) or {}
        if body.get("profile_id"):
            return int(body.get("profile_id"))
    except Exception:
        pass

    from . import preferences

    active = preferences.active_profile()
    if not active:
        return None
    return int(active["id"])