Files
pyTorrent/pytorrent/services/preferences.py
Mateusz Gruszczyński dc1cac4e6f add auth support
2026-05-06 08:38:07 +02:00

195 lines
9.3 KiB
Python

from __future__ import annotations
import json
from ..db import connect, utcnow, default_user_id
from . import auth
# Selectable Bootstrap/Bootswatch themes: theme key -> human-readable label.
# The "default" key stands for stock Bootstrap; every other key is used as a
# Bootswatch path segment by bootstrap_css_url().
BOOTSTRAP_THEMES: dict[str, str] = {
    "default": "Default Bootstrap",
    "flatly": "Flatly",
    "litera": "Litera",
    "lumen": "Lumen",
    "minty": "Minty",
    "sketchy": "Sketchy",
    "solar": "Solar",
    "spacelab": "Spacelab",
    "united": "United",
    "zephyr": "Zephyr",
}
# Selectable UI font families: font key -> human-readable label.
# save_preferences() only accepts a font_family that is a key of this mapping.
FONT_FAMILIES: dict[str, str] = {
    "default": "Theme default",
    "adwaita-mono": "Adwaita Mono",
    "inter": "Inter",
    "system-ui": "System UI",
    "source-sans-3": "Source Sans 3",
    "jetbrains-mono": "JetBrains Mono",
}
def bootstrap_css_url(theme: str | None) -> str:
    """Return the CDN stylesheet URL for a Bootstrap theme key.

    Args:
        theme: A key of ``BOOTSTRAP_THEMES``. ``None`` and unknown keys are
            treated like ``"default"``.

    Returns:
        The stock Bootstrap 5.3.3 stylesheet URL for the default theme,
        otherwise the matching Bootswatch 5.3.3 stylesheet URL.
    """
    # Unknown keys and the explicit "default" key both map to stock Bootstrap.
    if theme not in BOOTSTRAP_THEMES or theme == "default":
        return "https://cdn.jsdelivr.net/npm/bootstrap@5.3.3/dist/css/bootstrap.min.css"
    return f"https://cdn.jsdelivr.net/npm/bootswatch@5.3.3/dist/{theme}/bootstrap.min.css"
def list_profiles(user_id: int | None = None):
    """List the rTorrent profiles a user is allowed to see.

    Args:
        user_id: User to list profiles for. When falsy, the authenticated
            user is used, and failing that the default user.

    Returns:
        Rows from ``rtorrent_profiles`` ordered with default profiles first,
        then by case-insensitive name. An empty list when the user may see
        no profiles.
    """
    user_id = user_id or auth.current_user_id() or default_user_id()
    allowed_ids = auth.visible_profile_ids(user_id)
    with connect() as conn:
        if allowed_ids is None:
            # None from visible_profile_ids is treated as "no restriction".
            cursor = conn.execute(
                "SELECT * FROM rtorrent_profiles ORDER BY is_default DESC, name COLLATE NOCASE"
            )
        elif allowed_ids:
            # One "?" per id keeps the IN (...) list fully parameterized.
            params = tuple(allowed_ids)
            placeholders = ",".join("?" for _ in params)
            cursor = conn.execute(
                f"SELECT * FROM rtorrent_profiles WHERE id IN ({placeholders}) ORDER BY is_default DESC, name COLLATE NOCASE",
                params,
            )
        else:
            return []
        return cursor.fetchall()
def get_profile(profile_id: int, user_id: int | None = None):
    """Fetch a single rTorrent profile if the user may access it.

    Args:
        profile_id: Primary key of the profile in ``rtorrent_profiles``.
        user_id: User performing the lookup. When falsy, the authenticated
            user is used, and failing that the default user.

    Returns:
        The profile row, or ``None`` when access is denied or no such row
        exists.
    """
    user_id = user_id or auth.current_user_id() or default_user_id()
    if auth.can_access_profile(profile_id, user_id):
        with connect() as conn:
            cursor = conn.execute("SELECT * FROM rtorrent_profiles WHERE id=?", (profile_id,))
            return cursor.fetchone()
    return None
def active_profile(user_id: int | None = None):
    """Return the profile currently active for a user.

    The profile id stored in ``user_preferences.active_rtorrent_id`` wins when
    it is set, accessible to the user and still present in
    ``rtorrent_profiles``. Otherwise the first profile returned by
    ``list_profiles`` is used.

    Args:
        user_id: User to resolve the active profile for. When falsy, the
            authenticated user is used, and failing that the default user.

    Returns:
        A profile row, or ``None`` when the user has no visible profiles.
    """
    user_id = user_id or auth.current_user_id() or default_user_id()
    with connect() as conn:
        pref = conn.execute(
            "SELECT active_rtorrent_id FROM user_preferences WHERE user_id=?",
            (user_id,),
        ).fetchone()
        stored_id = pref.get("active_rtorrent_id") if pref else None
        if stored_id and auth.can_access_profile(int(stored_id), user_id):
            row = conn.execute("SELECT * FROM rtorrent_profiles WHERE id=?", (stored_id,)).fetchone()
            if row:
                return row
    # Stored choice is missing, inaccessible or stale: fall back to the first
    # visible profile.
    candidates = list_profiles(user_id)
    if candidates:
        return candidates[0]
    return None
def save_profile(data: dict, user_id: int | None = None):
    """Create a new rTorrent profile owned by a user.

    Args:
        data: Profile fields. Recognised keys: ``name`` (defaults to
            ``"rTorrent"``), ``scgi_url`` (required, must start with
            ``scgi://``), ``timeout_seconds`` (default 5),
            ``max_parallel_jobs`` (default 5), ``is_remote`` and
            ``is_default`` (truthy/falsy flags).
        user_id: Owner of the new profile. When falsy, the authenticated user
            is used, and failing that the default user.

    Returns:
        The freshly inserted ``rtorrent_profiles`` row.

    Raises:
        ValueError: If ``scgi_url`` does not start with ``scgi://`` or a
            numeric field cannot be converted to ``int``.
    """
    user_id = user_id or auth.current_user_id() or default_user_id()
    now = utcnow()
    # Strip before applying the default so a whitespace-only name does not end
    # up stored as an empty string.
    name = str(data.get("name") or "").strip() or "rTorrent"
    scgi_url = str(data.get("scgi_url") or "").strip()
    timeout = int(data.get("timeout_seconds") or 5)
    max_parallel = int(data.get("max_parallel_jobs") or 5)
    is_remote = 1 if data.get("is_remote") else 0
    is_default = 1 if data.get("is_default") else 0
    if not scgi_url.startswith("scgi://"):
        raise ValueError("SCGI URL must start with scgi://")

    # get_preferences() inserts the user's preferences row when it is missing.
    # Without it the UPDATE below matches zero rows and a first profile would
    # never become the active one.
    get_preferences(user_id)

    with connect() as conn:
        if is_default:
            # Only one default per owner: clear the flag on existing profiles.
            conn.execute("UPDATE rtorrent_profiles SET is_default=0 WHERE user_id=?", (user_id,))
        cur = conn.execute(
            "INSERT INTO rtorrent_profiles(user_id,name,scgi_url,is_default,timeout_seconds,max_parallel_jobs,is_remote,created_at,updated_at) VALUES(?,?,?,?,?,?,?,?,?)",
            (user_id, name, scgi_url, is_default, timeout, max_parallel, is_remote, now, now),
        )
        profile_id = cur.lastrowid
        pref = conn.execute(
            "SELECT active_rtorrent_id FROM user_preferences WHERE user_id=?",
            (user_id,),
        ).fetchone()
        current_active = pref.get("active_rtorrent_id") if pref else None
        # Activate the new profile when it is the default or when the user has
        # no active profile yet.
        if is_default or not current_active:
            conn.execute(
                "UPDATE user_preferences SET active_rtorrent_id=?, updated_at=? WHERE user_id=?",
                (profile_id, now, user_id),
            )
        return conn.execute("SELECT * FROM rtorrent_profiles WHERE id=?", (profile_id,)).fetchone()
def update_profile(profile_id: int, data: dict, user_id: int | None = None):
    """Overwrite the editable fields of an existing rTorrent profile.

    All fields are rewritten on every call; keys missing from ``data`` fall
    back to the same defaults used when creating a profile.

    Args:
        profile_id: Primary key of the profile to update.
        data: Profile fields. Recognised keys: ``name`` (defaults to
            ``"rTorrent"``), ``scgi_url`` (required, must start with
            ``scgi://``), ``timeout_seconds`` (default 5),
            ``max_parallel_jobs`` (default 5), ``is_remote`` and
            ``is_default`` (truthy/falsy flags).
        user_id: User performing the update. When falsy, the authenticated
            user is used, and failing that the default user.

    Returns:
        The updated ``rtorrent_profiles`` row.

    Raises:
        ValueError: If ``scgi_url`` does not start with ``scgi://``, a numeric
            field cannot be converted to ``int``, or the profile does not
            exist / is not writable by the user.
    """
    user_id = user_id or auth.current_user_id() or default_user_id()
    now = utcnow()
    # Strip before applying the default so a whitespace-only name does not end
    # up stored as an empty string.
    name = str(data.get("name") or "").strip() or "rTorrent"
    scgi_url = str(data.get("scgi_url") or "").strip()
    timeout = int(data.get("timeout_seconds") or 5)
    max_parallel = int(data.get("max_parallel_jobs") or 5)
    is_remote = 1 if data.get("is_remote") else 0
    is_default = 1 if data.get("is_default") else 0
    if not scgi_url.startswith("scgi://"):
        raise ValueError("SCGI URL must start with scgi://")
    with connect() as conn:
        row = conn.execute("SELECT id FROM rtorrent_profiles WHERE id=?", (profile_id,)).fetchone()
        # Missing and non-writable profiles are reported identically so the
        # error does not reveal whether the id exists.
        if not row or not auth.can_write_profile(profile_id, user_id):
            raise ValueError("Profil nie istnieje")
        if is_default:
            # NOTE(review): this clears the flag for the acting user's
            # profiles; if a profile can be edited by someone other than its
            # owner, the owner's previous default is left untouched — confirm.
            conn.execute("UPDATE rtorrent_profiles SET is_default=0 WHERE user_id=?", (user_id,))
        conn.execute(
            "UPDATE rtorrent_profiles SET name=?, scgi_url=?, is_default=?, timeout_seconds=?, max_parallel_jobs=?, is_remote=?, updated_at=? WHERE id=?",
            (name, scgi_url, is_default, timeout, max_parallel, is_remote, now, profile_id),
        )
        return conn.execute("SELECT * FROM rtorrent_profiles WHERE id=?", (profile_id,)).fetchone()
def delete_profile(profile_id: int, user_id: int | None = None):
    """Delete an rTorrent profile and re-point the user's active profile.

    Args:
        profile_id: Primary key of the profile to delete.
        user_id: User whose active-profile preference is refreshed afterwards.
            When falsy, the authenticated user is used, and failing that the
            default user.

    Raises:
        Whatever ``auth.require_profile_write`` raises when the profile may
        not be modified.
    """
    user_id = user_id or auth.current_user_id() or default_user_id()
    # NOTE(review): the write check does not receive user_id, so an explicit
    # user_id argument is presumably not what gets authorised — confirm.
    auth.require_profile_write(profile_id)

    # Finish the delete before resolving the fallback. active_profile() opens
    # its own connection; if it ran while the DELETE was still pending here it
    # could still see the removed row and hand its id straight back.
    # Assumes connect() commits when the with-block exits — TODO confirm.
    with connect() as conn:
        conn.execute("DELETE FROM rtorrent_profiles WHERE id=?", (profile_id,))

    fallback = active_profile(user_id)
    with connect() as conn:
        conn.execute(
            "UPDATE user_preferences SET active_rtorrent_id=?, updated_at=? WHERE user_id=?",
            (fallback["id"] if fallback else None, utcnow(), user_id),
        )
def activate_profile(profile_id: int, user_id: int | None = None):
    """Make a profile the active one for a user.

    Args:
        profile_id: Primary key of the profile to activate.
        user_id: User whose preference is changed. When falsy, the
            authenticated user is used, and failing that the default user.

    Returns:
        The activated profile row as returned by ``get_profile``.

    Raises:
        ValueError: If the profile does not exist or the user may not access
            it.
    """
    user_id = user_id or auth.current_user_id() or default_user_id()
    # get_preferences() inserts the user's preferences row when it is missing.
    # Without it the UPDATE below matches zero rows and the activation is
    # silently lost.
    get_preferences(user_id)
    with connect() as conn:
        row = conn.execute("SELECT id FROM rtorrent_profiles WHERE id=?", (profile_id,)).fetchone()
        # Missing and inaccessible profiles are reported identically.
        if not row or not auth.can_access_profile(profile_id, user_id):
            raise ValueError("Profil nie istnieje")
        conn.execute(
            "UPDATE user_preferences SET active_rtorrent_id=?, updated_at=? WHERE user_id=?",
            (profile_id, utcnow(), user_id),
        )
    # Read back only after the connection block has finished, so the lookup
    # does not run against a still-open write.
    return get_profile(profile_id, user_id)
def get_preferences(user_id: int | None = None):
    """Return a user's preferences row, creating it on first access.

    Args:
        user_id: User whose preferences are requested. When falsy, the
            authenticated user is used, and failing that the default user.

    Returns:
        The ``user_preferences`` row for the user. A missing row is inserted
        with the ``'dark'`` theme and then re-read.
    """
    user_id = user_id or auth.current_user_id() or default_user_id()
    select_sql = "SELECT * FROM user_preferences WHERE user_id=?"
    with connect() as conn:
        existing = conn.execute(select_sql, (user_id,)).fetchone()
        if existing:
            return existing
        # First access for this user: seed the row, then read it back so the
        # caller gets every column as stored.
        stamp = utcnow()
        conn.execute(
            "INSERT INTO user_preferences(user_id, theme, created_at, updated_at) VALUES(?, 'dark', ?, ?)",
            (user_id, stamp, stamp),
        )
        return conn.execute(select_sql, (user_id,)).fetchone()
def save_preferences(data: dict, user_id: int | None = None):
    """Persist the recognised preference fields found in ``data``.

    Only keys that are present (and valid) are written; everything else is
    left untouched. Recognised keys:

    * ``theme`` - ``"light"`` or ``"dark"``; other values are ignored.
    * ``bootstrap_theme`` - a key of ``BOOTSTRAP_THEMES``; others are ignored.
    * ``font_family`` - a key of ``FONT_FAMILIES``; others are ignored.
    * ``table_columns_json`` - stored as given when it is a string, otherwise
      serialised with ``json.dumps``.
    * ``peers_refresh_seconds`` - one of 0, 10, 15, 30, 60; any other number
      is stored as 0.
    * ``port_check_enabled`` - stored as 1 or 0 by truthiness.
    * ``footer_items_json`` - a JSON object (string or already-parsed value);
      anything that does not decode to an object is stored as ``{}``.

    Args:
        data: Mapping with any subset of the keys above.
        user_id: User whose preferences are changed. When falsy, the
            authenticated user is used, and failing that the default user.

    Returns:
        The user's preferences row after the update.

    Raises:
        ValueError: If ``peers_refresh_seconds`` is not numeric or
            ``footer_items_json`` is a string that is not valid JSON.
        TypeError: If a value cannot be converted or JSON-serialised.
    """
    user_id = user_id or auth.current_user_id() or default_user_id()

    # Validate and normalise everything first so that bad input fails before
    # anything is written. Keys are fixed column names, never user input.
    updates: dict[str, object] = {}

    theme = data.get("theme")
    if theme in {"light", "dark"}:
        updates["theme"] = theme

    bootstrap_theme = data.get("bootstrap_theme")
    if bootstrap_theme in BOOTSTRAP_THEMES:
        updates["bootstrap_theme"] = bootstrap_theme

    font_family = data.get("font_family")
    if font_family in FONT_FAMILIES:
        updates["font_family"] = font_family

    table_columns = data.get("table_columns_json")
    if table_columns is not None:
        # str() on a list/dict would store a Python repr, not JSON; serialise
        # non-string values the same way footer_items_json is handled.
        updates["table_columns_json"] = (
            table_columns if isinstance(table_columns, str) else json.dumps(table_columns)
        )

    peers_refresh = data.get("peers_refresh_seconds")
    if peers_refresh is not None:
        seconds = int(peers_refresh or 0)
        updates["peers_refresh_seconds"] = seconds if seconds in {0, 10, 15, 30, 60} else 0

    port_check = data.get("port_check_enabled")
    if port_check is not None:
        updates["port_check_enabled"] = 1 if port_check else 0

    footer_items = data.get("footer_items_json")
    if footer_items is not None:
        # Note: Store only JSON objects so footer visibility can be extended without schema churn.
        raw = footer_items if isinstance(footer_items, str) else json.dumps(footer_items)
        parsed = json.loads(raw or "{}")
        updates["footer_items_json"] = json.dumps(parsed if isinstance(parsed, dict) else {})

    if updates:
        # get_preferences() inserts the row when it is missing; otherwise the
        # UPDATE would match nothing and the settings would be dropped.
        get_preferences(user_id)
        assignments = ", ".join(f"{column}=?" for column in updates)
        with connect() as conn:
            conn.execute(
                f"UPDATE user_preferences SET {assignments}, updated_at=? WHERE user_id=?",
                (*updates.values(), utcnow(), user_id),
            )
    # Read back after the write connection has been released.
    return get_preferences(user_id)