194 lines
9.3 KiB
Python
194 lines
9.3 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
|
|
from ..db import connect, utcnow, default_user_id
|
|
from . import auth
|
|
|
|
BOOTSTRAP_THEMES = {
|
|
"default": "Default Bootstrap",
|
|
"flatly": "Flatly",
|
|
"litera": "Litera",
|
|
"lumen": "Lumen",
|
|
"minty": "Minty",
|
|
"sketchy": "Sketchy",
|
|
"solar": "Solar",
|
|
"spacelab": "Spacelab",
|
|
"united": "United",
|
|
"zephyr": "Zephyr",
|
|
}
|
|
|
|
FONT_FAMILIES = {
|
|
"default": "Theme default",
|
|
"adwaita-mono": "Adwaita Mono",
|
|
"inter": "Inter",
|
|
"system-ui": "System UI",
|
|
"source-sans-3": "Source Sans 3",
|
|
"jetbrains-mono": "JetBrains Mono",
|
|
}
|
|
|
|
def bootstrap_css_url(theme: str | None) -> str:
|
|
# Notatka: zachowana funkcja zwraca aktualny adres motywu, ale źródło wybiera konfiguracja offline.
|
|
from .frontend_assets import bootstrap_css_path
|
|
|
|
return bootstrap_css_path(theme)
|
|
|
|
def list_profiles(user_id: int | None = None):
|
|
user_id = user_id or auth.current_user_id() or default_user_id()
|
|
visible = auth.visible_profile_ids(user_id)
|
|
with connect() as conn:
|
|
if visible is None:
|
|
return conn.execute(
|
|
"SELECT * FROM rtorrent_profiles ORDER BY is_default DESC, name COLLATE NOCASE"
|
|
).fetchall()
|
|
if not visible:
|
|
return []
|
|
placeholders = ",".join("?" for _ in visible)
|
|
return conn.execute(
|
|
f"SELECT * FROM rtorrent_profiles WHERE id IN ({placeholders}) ORDER BY is_default DESC, name COLLATE NOCASE",
|
|
tuple(visible),
|
|
).fetchall()
|
|
|
|
|
|
def get_profile(profile_id: int, user_id: int | None = None):
|
|
user_id = user_id or auth.current_user_id() or default_user_id()
|
|
if not auth.can_access_profile(profile_id, user_id):
|
|
return None
|
|
with connect() as conn:
|
|
return conn.execute("SELECT * FROM rtorrent_profiles WHERE id=?", (profile_id,)).fetchone()
|
|
|
|
|
|
def active_profile(user_id: int | None = None):
|
|
user_id = user_id or auth.current_user_id() or default_user_id()
|
|
with connect() as conn:
|
|
pref = conn.execute("SELECT active_rtorrent_id FROM user_preferences WHERE user_id=?", (user_id,)).fetchone()
|
|
if pref and pref.get("active_rtorrent_id") and auth.can_access_profile(int(pref["active_rtorrent_id"]), user_id):
|
|
row = conn.execute("SELECT * FROM rtorrent_profiles WHERE id=?", (pref["active_rtorrent_id"],)).fetchone()
|
|
if row:
|
|
return row
|
|
profiles = list_profiles(user_id)
|
|
return profiles[0] if profiles else None
|
|
|
|
|
|
def save_profile(data: dict, user_id: int | None = None):
|
|
user_id = user_id or auth.current_user_id() or default_user_id()
|
|
now = utcnow()
|
|
name = str(data.get("name") or "rTorrent").strip()
|
|
scgi_url = str(data.get("scgi_url") or "").strip()
|
|
timeout = int(data.get("timeout_seconds") or 5)
|
|
max_parallel = int(data.get("max_parallel_jobs") or 5)
|
|
is_remote = 1 if data.get("is_remote") else 0
|
|
is_default = 1 if data.get("is_default") else 0
|
|
if not scgi_url.startswith("scgi://"):
|
|
raise ValueError("SCGI URL must start with scgi://")
|
|
with connect() as conn:
|
|
if is_default:
|
|
conn.execute("UPDATE rtorrent_profiles SET is_default=0 WHERE user_id=?", (user_id,))
|
|
cur = conn.execute(
|
|
"INSERT INTO rtorrent_profiles(user_id,name,scgi_url,is_default,timeout_seconds,max_parallel_jobs,is_remote,created_at,updated_at) VALUES(?,?,?,?,?,?,?,?,?)",
|
|
(user_id, name, scgi_url, is_default, timeout, max_parallel, is_remote, now, now),
|
|
)
|
|
profile_id = cur.lastrowid
|
|
pref = conn.execute("SELECT active_rtorrent_id FROM user_preferences WHERE user_id=?", (user_id,)).fetchone()
|
|
if not pref or not pref.get("active_rtorrent_id") or is_default:
|
|
conn.execute(
|
|
"UPDATE user_preferences SET active_rtorrent_id=?, updated_at=? WHERE user_id=?",
|
|
(profile_id, now, user_id),
|
|
)
|
|
return conn.execute("SELECT * FROM rtorrent_profiles WHERE id=?", (profile_id,)).fetchone()
|
|
|
|
|
|
def update_profile(profile_id: int, data: dict, user_id: int | None = None):
|
|
user_id = user_id or auth.current_user_id() or default_user_id()
|
|
now = utcnow()
|
|
name = str(data.get("name") or "rTorrent").strip()
|
|
scgi_url = str(data.get("scgi_url") or "").strip()
|
|
timeout = int(data.get("timeout_seconds") or 5)
|
|
max_parallel = int(data.get("max_parallel_jobs") or 5)
|
|
is_remote = 1 if data.get("is_remote") else 0
|
|
is_default = 1 if data.get("is_default") else 0
|
|
if not scgi_url.startswith("scgi://"):
|
|
raise ValueError("SCGI URL must start with scgi://")
|
|
with connect() as conn:
|
|
row = conn.execute("SELECT id FROM rtorrent_profiles WHERE id=?", (profile_id,)).fetchone()
|
|
if not row or not auth.can_write_profile(profile_id, user_id):
|
|
raise ValueError("Profil nie istnieje")
|
|
if is_default:
|
|
conn.execute("UPDATE rtorrent_profiles SET is_default=0 WHERE user_id=?", (user_id,))
|
|
conn.execute(
|
|
"UPDATE rtorrent_profiles SET name=?, scgi_url=?, is_default=?, timeout_seconds=?, max_parallel_jobs=?, is_remote=?, updated_at=? WHERE id=?",
|
|
(name, scgi_url, is_default, timeout, max_parallel, is_remote, now, profile_id),
|
|
)
|
|
return conn.execute("SELECT * FROM rtorrent_profiles WHERE id=?", (profile_id,)).fetchone()
|
|
|
|
|
|
def delete_profile(profile_id: int, user_id: int | None = None):
|
|
user_id = user_id or auth.current_user_id() or default_user_id()
|
|
auth.require_profile_write(profile_id)
|
|
with connect() as conn:
|
|
conn.execute("DELETE FROM rtorrent_profiles WHERE id=?", (profile_id,))
|
|
active = active_profile(user_id)
|
|
conn.execute(
|
|
"UPDATE user_preferences SET active_rtorrent_id=?, updated_at=? WHERE user_id=?",
|
|
(active["id"] if active else None, utcnow(), user_id),
|
|
)
|
|
|
|
|
|
def activate_profile(profile_id: int, user_id: int | None = None):
|
|
user_id = user_id or auth.current_user_id() or default_user_id()
|
|
with connect() as conn:
|
|
row = conn.execute("SELECT id FROM rtorrent_profiles WHERE id=?", (profile_id,)).fetchone()
|
|
if not row or not auth.can_access_profile(profile_id, user_id):
|
|
raise ValueError("Profil nie istnieje")
|
|
conn.execute(
|
|
"UPDATE user_preferences SET active_rtorrent_id=?, updated_at=? WHERE user_id=?",
|
|
(profile_id, utcnow(), user_id),
|
|
)
|
|
return get_profile(profile_id, user_id)
|
|
|
|
|
|
def get_preferences(user_id: int | None = None):
|
|
user_id = user_id or auth.current_user_id() or default_user_id()
|
|
with connect() as conn:
|
|
pref = conn.execute("SELECT * FROM user_preferences WHERE user_id=?", (user_id,)).fetchone()
|
|
if not pref:
|
|
now = utcnow()
|
|
conn.execute("INSERT INTO user_preferences(user_id, theme, created_at, updated_at) VALUES(?, 'dark', ?, ?)", (user_id, now, now))
|
|
pref = conn.execute("SELECT * FROM user_preferences WHERE user_id=?", (user_id,)).fetchone()
|
|
return pref
|
|
|
|
|
|
def save_preferences(data: dict, user_id: int | None = None):
|
|
user_id = user_id or auth.current_user_id() or default_user_id()
|
|
allowed_theme = data.get("theme") if data.get("theme") in {"light", "dark"} else None
|
|
bootstrap_theme = data.get("bootstrap_theme") if data.get("bootstrap_theme") in BOOTSTRAP_THEMES else None
|
|
font_family = data.get("font_family") if data.get("font_family") in FONT_FAMILIES else None
|
|
table_columns_json = data.get("table_columns_json")
|
|
peers_refresh_seconds = data.get("peers_refresh_seconds")
|
|
port_check_enabled = data.get("port_check_enabled")
|
|
footer_items_json = data.get("footer_items_json")
|
|
with connect() as conn:
|
|
now = utcnow()
|
|
if allowed_theme:
|
|
conn.execute("UPDATE user_preferences SET theme=?, updated_at=? WHERE user_id=?", (allowed_theme, now, user_id))
|
|
if bootstrap_theme:
|
|
conn.execute("UPDATE user_preferences SET bootstrap_theme=?, updated_at=? WHERE user_id=?", (bootstrap_theme, now, user_id))
|
|
if font_family:
|
|
conn.execute("UPDATE user_preferences SET font_family=?, updated_at=? WHERE user_id=?", (font_family, now, user_id))
|
|
if table_columns_json is not None:
|
|
conn.execute("UPDATE user_preferences SET table_columns_json=?, updated_at=? WHERE user_id=?", (str(table_columns_json), now, user_id))
|
|
if peers_refresh_seconds is not None:
|
|
sec = int(peers_refresh_seconds or 0)
|
|
if sec not in {0, 10, 15, 30, 60}: sec = 0
|
|
conn.execute("UPDATE user_preferences SET peers_refresh_seconds=?, updated_at=? WHERE user_id=?", (sec, now, user_id))
|
|
if port_check_enabled is not None:
|
|
conn.execute("UPDATE user_preferences SET port_check_enabled=?, updated_at=? WHERE user_id=?", (1 if port_check_enabled else 0, now, user_id))
|
|
if footer_items_json is not None:
|
|
# Note: Store only JSON objects so footer visibility can be extended without schema churn.
|
|
value = footer_items_json if isinstance(footer_items_json, str) else json.dumps(footer_items_json)
|
|
parsed = json.loads(value or "{}")
|
|
if not isinstance(parsed, dict):
|
|
parsed = {}
|
|
conn.execute("UPDATE user_preferences SET footer_items_json=?, updated_at=? WHERE user_id=?", (json.dumps(parsed), now, user_id))
|
|
return get_preferences(user_id)
|