from __future__ import annotations from ..db import connect, utcnow, default_user_id BOOTSTRAP_THEMES = { "default": "Default Bootstrap", "flatly": "Flatly", "litera": "Litera", "lumen": "Lumen", "minty": "Minty", "sketchy": "Sketchy", "solar": "Solar", "spacelab": "Spacelab", "united": "United", "zephyr": "Zephyr", } FONT_FAMILIES = { "default": "Theme default", "adwaita-mono": "Adwaita Mono", "inter": "Inter", "system-ui": "System UI", "source-sans-3": "Source Sans 3", "jetbrains-mono": "JetBrains Mono", } def bootstrap_css_url(theme: str | None) -> str: theme = theme if theme in BOOTSTRAP_THEMES else "default" if theme == "default": return "https://cdn.jsdelivr.net/npm/bootstrap@5.3.3/dist/css/bootstrap.min.css" return f"https://cdn.jsdelivr.net/npm/bootswatch@5.3.3/dist/{theme}/bootstrap.min.css" def list_profiles(user_id: int | None = None): user_id = user_id or default_user_id() with connect() as conn: return conn.execute( "SELECT * FROM rtorrent_profiles WHERE user_id=? ORDER BY is_default DESC, name COLLATE NOCASE", (user_id,), ).fetchall() def get_profile(profile_id: int, user_id: int | None = None): user_id = user_id or default_user_id() with connect() as conn: return conn.execute( "SELECT * FROM rtorrent_profiles WHERE id=? AND user_id=?", (profile_id, user_id), ).fetchone() def active_profile(user_id: int | None = None): user_id = user_id or default_user_id() with connect() as conn: pref = conn.execute("SELECT active_rtorrent_id FROM user_preferences WHERE user_id=?", (user_id,)).fetchone() if pref and pref.get("active_rtorrent_id"): row = conn.execute( "SELECT * FROM rtorrent_profiles WHERE id=? AND user_id=?", (pref["active_rtorrent_id"], user_id), ).fetchone() if row: return row row = conn.execute( "SELECT * FROM rtorrent_profiles WHERE user_id=? ORDER BY is_default DESC, id ASC LIMIT 1", (user_id,), ).fetchone() return row def save_profile(data: dict, user_id: int | None = None): user_id = user_id or default_user_id() now = utcnow() name = str(data.get("name") or "rTorrent").strip() scgi_url = str(data.get("scgi_url") or "").strip() timeout = int(data.get("timeout_seconds") or 5) max_parallel = int(data.get("max_parallel_jobs") or 5) is_remote = 1 if data.get("is_remote") else 0 is_default = 1 if data.get("is_default") else 0 if not scgi_url.startswith("scgi://"): raise ValueError("SCGI URL musi zaczynać się od scgi://") with connect() as conn: if is_default: conn.execute("UPDATE rtorrent_profiles SET is_default=0 WHERE user_id=?", (user_id,)) cur = conn.execute( "INSERT INTO rtorrent_profiles(user_id,name,scgi_url,is_default,timeout_seconds,max_parallel_jobs,is_remote,created_at,updated_at) VALUES(?,?,?,?,?,?,?,?,?)", (user_id, name, scgi_url, is_default, timeout, max_parallel, is_remote, now, now), ) profile_id = cur.lastrowid pref = conn.execute("SELECT active_rtorrent_id FROM user_preferences WHERE user_id=?", (user_id,)).fetchone() if not pref or not pref.get("active_rtorrent_id") or is_default: conn.execute( "UPDATE user_preferences SET active_rtorrent_id=?, updated_at=? WHERE user_id=?", (profile_id, now, user_id), ) return conn.execute("SELECT * FROM rtorrent_profiles WHERE id=? AND user_id=?", (profile_id, user_id)).fetchone() def update_profile(profile_id: int, data: dict, user_id: int | None = None): user_id = user_id or default_user_id() now = utcnow() name = str(data.get("name") or "rTorrent").strip() scgi_url = str(data.get("scgi_url") or "").strip() timeout = int(data.get("timeout_seconds") or 5) max_parallel = int(data.get("max_parallel_jobs") or 5) is_remote = 1 if data.get("is_remote") else 0 is_default = 1 if data.get("is_default") else 0 if not scgi_url.startswith("scgi://"): raise ValueError("SCGI URL musi zaczynać się od scgi://") with connect() as conn: row = conn.execute("SELECT id FROM rtorrent_profiles WHERE id=? AND user_id=?", (profile_id, user_id)).fetchone() if not row: raise ValueError("Profil nie istnieje") if is_default: conn.execute("UPDATE rtorrent_profiles SET is_default=0 WHERE user_id=?", (user_id,)) conn.execute( "UPDATE rtorrent_profiles SET name=?, scgi_url=?, is_default=?, timeout_seconds=?, max_parallel_jobs=?, is_remote=?, updated_at=? WHERE id=? AND user_id=?", (name, scgi_url, is_default, timeout, max_parallel, is_remote, now, profile_id, user_id), ) return conn.execute("SELECT * FROM rtorrent_profiles WHERE id=? AND user_id=?", (profile_id, user_id)).fetchone() def delete_profile(profile_id: int, user_id: int | None = None): user_id = user_id or default_user_id() with connect() as conn: conn.execute("DELETE FROM rtorrent_profiles WHERE id=? AND user_id=?", (profile_id, user_id)) active = active_profile(user_id) conn.execute( "UPDATE user_preferences SET active_rtorrent_id=?, updated_at=? WHERE user_id=?", (active["id"] if active else None, utcnow(), user_id), ) def activate_profile(profile_id: int, user_id: int | None = None): user_id = user_id or default_user_id() with connect() as conn: row = conn.execute("SELECT id FROM rtorrent_profiles WHERE id=? AND user_id=?", (profile_id, user_id)).fetchone() if not row: raise ValueError("Profil nie istnieje") conn.execute( "UPDATE user_preferences SET active_rtorrent_id=?, updated_at=? WHERE user_id=?", (profile_id, utcnow(), user_id), ) return get_profile(profile_id, user_id) def get_preferences(user_id: int | None = None): user_id = user_id or default_user_id() with connect() as conn: return conn.execute("SELECT * FROM user_preferences WHERE user_id=?", (user_id,)).fetchone() def save_preferences(data: dict, user_id: int | None = None): user_id = user_id or default_user_id() allowed_theme = data.get("theme") if data.get("theme") in {"light", "dark"} else None bootstrap_theme = data.get("bootstrap_theme") if data.get("bootstrap_theme") in BOOTSTRAP_THEMES else None font_family = data.get("font_family") if data.get("font_family") in FONT_FAMILIES else None table_columns_json = data.get("table_columns_json") peers_refresh_seconds = data.get("peers_refresh_seconds") port_check_enabled = data.get("port_check_enabled") with connect() as conn: now = utcnow() if allowed_theme: conn.execute("UPDATE user_preferences SET theme=?, updated_at=? WHERE user_id=?", (allowed_theme, now, user_id)) if bootstrap_theme: conn.execute("UPDATE user_preferences SET bootstrap_theme=?, updated_at=? WHERE user_id=?", (bootstrap_theme, now, user_id)) if font_family: conn.execute("UPDATE user_preferences SET font_family=?, updated_at=? WHERE user_id=?", (font_family, now, user_id)) if table_columns_json is not None: conn.execute("UPDATE user_preferences SET table_columns_json=?, updated_at=? WHERE user_id=?", (str(table_columns_json), now, user_id)) if peers_refresh_seconds is not None: sec = int(peers_refresh_seconds or 0) if sec not in {0, 10, 15, 30, 60}: sec = 0 conn.execute("UPDATE user_preferences SET peers_refresh_seconds=?, updated_at=? WHERE user_id=?", (sec, now, user_id)) if port_check_enabled is not None: conn.execute("UPDATE user_preferences SET port_check_enabled=?, updated_at=? WHERE user_id=?", (1 if port_check_enabled else 0, now, user_id)) return get_preferences(user_id)