first commit
This commit is contained in:
173
pytorrent/services/automation_rules.py
Normal file
173
pytorrent/services/automation_rules.py
Normal file
@@ -0,0 +1,173 @@
|
||||
from __future__ import annotations
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any
|
||||
import json
|
||||
from ..db import connect, default_user_id, utcnow
|
||||
from . import rtorrent
|
||||
from .preferences import active_profile
|
||||
|
||||
|
||||
def _loads(value: str | None, default: Any) -> Any:
|
||||
try: return json.loads(value or '')
|
||||
except Exception: return default
|
||||
|
||||
|
||||
def _ts(value: str | None) -> float:
|
||||
if not value: return 0.0
|
||||
try: return datetime.fromisoformat(str(value).replace('Z', '+00:00')).timestamp()
|
||||
except Exception: return 0.0
|
||||
|
||||
|
||||
def _now_ts() -> float:
|
||||
return datetime.now(timezone.utc).timestamp()
|
||||
|
||||
|
||||
def _label_names(value: str | None) -> list[str]:
|
||||
seen = []
|
||||
for part in str(value or '').replace(';', ',').replace('|', ',').split(','):
|
||||
item = part.strip()
|
||||
if item and item not in seen: seen.append(item)
|
||||
return seen
|
||||
|
||||
|
||||
def _label_value(labels: list[str]) -> str:
|
||||
out = []
|
||||
for label in labels:
|
||||
label = str(label or '').strip()
|
||||
if label and label not in out: out.append(label)
|
||||
return ', '.join(out)
|
||||
|
||||
|
||||
def _rule_row(row: dict[str, Any]) -> dict[str, Any]:
    """Copy a DB row, decoding its *_json columns into 'conditions'/'effects' lists."""
    item = dict(row)
    item['conditions'] = _loads(item.pop('conditions_json', '[]'), [])
    item['effects'] = _loads(item.pop('effects_json', '[]'), [])
    return item
|
||||
|
||||
|
||||
def list_rules(profile_id: int | None = None, user_id: int | None = None) -> list[dict[str, Any]]:
    """Return the user's rules for *profile_id* (default: the active profile).

    Rules stored with a NULL profile_id (global) are included too; enabled
    rules sort first, then case-insensitively by name.
    """
    user_id = user_id or default_user_id()
    if profile_id is None:
        profile = active_profile()
        profile_id = int(profile['id']) if profile else None
    with connect() as conn:
        rows = conn.execute(
            'SELECT * FROM automation_rules WHERE user_id=? AND (profile_id=? OR profile_id IS NULL) ORDER BY enabled DESC, name COLLATE NOCASE',
            (user_id, profile_id),
        ).fetchall()
    return [_rule_row(row) for row in rows]
|
||||
|
||||
|
||||
def get_rule(rule_id: int, profile_id: int, user_id: int | None = None) -> dict[str, Any]:
    """Fetch one rule by id, raising ValueError when it does not exist."""
    user_id = user_id or default_user_id()
    with connect() as conn:
        row = conn.execute(
            'SELECT * FROM automation_rules WHERE id=? AND user_id=? AND profile_id=?',
            (rule_id, user_id, profile_id),
        ).fetchone()
    if not row:
        raise ValueError('Rule not found')
    return _rule_row(row)
|
||||
|
||||
|
||||
def save_rule(profile_id: int, data: dict[str, Any], user_id: int | None = None) -> dict[str, Any]:
    """Create a rule, or update it when data['id'] is set; returns the saved rule.

    Raises ValueError when conditions or effects are missing or not lists.
    """
    user_id = user_id or default_user_id()
    name = str(data.get('name') or 'Automation rule').strip() or 'Automation rule'
    conditions = data.get('conditions') or []
    effects = data.get('effects') or []
    if not isinstance(conditions, list) or not conditions:
        raise ValueError('Rule needs at least one condition')
    if not isinstance(effects, list) or not effects:
        raise ValueError('Rule needs at least one effect')
    cooldown = max(0, int(data.get('cooldown_minutes') or 0))
    enabled = 1 if data.get('enabled', True) else 0
    now = utcnow()
    rule_id = int(data.get('id') or 0)
    with connect() as conn:
        if rule_id:
            conn.execute(
                'UPDATE automation_rules SET name=?, enabled=?, conditions_json=?, effects_json=?, cooldown_minutes=?, updated_at=? WHERE id=? AND user_id=? AND profile_id=?',
                (name, enabled, json.dumps(conditions), json.dumps(effects), cooldown, now, rule_id, user_id, profile_id),
            )
        else:
            cur = conn.execute(
                'INSERT INTO automation_rules(user_id,profile_id,name,enabled,conditions_json,effects_json,cooldown_minutes,created_at,updated_at) VALUES(?,?,?,?,?,?,?,?,?)',
                (user_id, profile_id, name, enabled, json.dumps(conditions), json.dumps(effects), cooldown, now, now),
            )
            rule_id = int(cur.lastrowid)
    return get_rule(rule_id, profile_id, user_id)
|
||||
|
||||
|
||||
def delete_rule(rule_id: int, profile_id: int, user_id: int | None = None) -> None:
    """Remove a rule together with its per-torrent tracking state."""
    user_id = user_id or default_user_id()
    with connect() as conn:
        conn.execute(
            'DELETE FROM automation_rules WHERE id=? AND user_id=? AND profile_id=?',
            (rule_id, user_id, profile_id),
        )
        conn.execute(
            'DELETE FROM automation_rule_state WHERE rule_id=? AND profile_id=?',
            (rule_id, profile_id),
        )
|
||||
|
||||
|
||||
def list_history(profile_id: int, user_id: int | None = None, limit: int = 30) -> list[dict[str, Any]]:
    """Most recent automation history rows, newest first (limit clamped to 1..100)."""
    user_id = user_id or default_user_id()
    capped = max(1, min(int(limit or 30), 100))
    with connect() as conn:
        return conn.execute(
            'SELECT * FROM automation_history WHERE user_id=? AND profile_id=? ORDER BY created_at DESC LIMIT ?',
            (user_id, profile_id, capped),
        ).fetchall()
|
||||
|
||||
|
||||
def _condition_true(t: dict[str, Any], cond: dict[str, Any]) -> bool:
|
||||
typ = str(cond.get('type') or '')
|
||||
if typ == 'completed': return bool(int(t.get('complete') or 0))
|
||||
if typ == 'no_seeds': return int(t.get('seeds') or 0) <= int(cond.get('seeds') or 0)
|
||||
if typ == 'ratio_gte': return float(t.get('ratio') or 0) >= float(cond.get('ratio') or 0)
|
||||
if typ == 'label_missing': return str(cond.get('label') or '').strip() not in _label_names(t.get('label'))
|
||||
if typ == 'label_has': return str(cond.get('label') or '').strip() in _label_names(t.get('label'))
|
||||
if typ == 'status': return str(t.get('status') or '').lower() == str(cond.get('status') or '').lower()
|
||||
if typ == 'path_contains': return str(cond.get('text') or '').lower() in str(t.get('path') or '').lower()
|
||||
return False
|
||||
|
||||
|
||||
def _conditions_match(conn, rule: dict[str, Any], profile_id: int, t: dict[str, Any]) -> bool:
    """Check every condition of *rule* against torrent *t*, tracking delayed ones.

    A 'no_seeds' condition with a positive 'minutes' value only counts as
    matched once it has held continuously for that long; the start of the
    window is persisted in automation_rule_state (COALESCE keeps the earliest
    start). All other condition types are evaluated immediately.
    """
    h = str(t.get('hash') or '')
    if not h:
        return False
    immediate_ok = True
    delayed_ok = True
    now = utcnow()
    now_ts = _now_ts()
    for cond in rule.get('conditions') or []:
        ok = _condition_true(t, cond)
        if cond.get('type') == 'no_seeds' and int(cond.get('minutes') or 0) > 0:
            row = conn.execute(
                'SELECT condition_since_at FROM automation_rule_state WHERE rule_id=? AND profile_id=? AND torrent_hash=?',
                (rule['id'], profile_id, h),
            ).fetchone()
            if ok:
                # Reuse the recorded window start when one exists.
                since = row['condition_since_at'] if row and row.get('condition_since_at') else now
                conn.execute(
                    'INSERT INTO automation_rule_state(rule_id,profile_id,torrent_hash,condition_since_at,last_matched_at,updated_at) VALUES(?,?,?,?,?,?) ON CONFLICT(rule_id,profile_id,torrent_hash) DO UPDATE SET condition_since_at=COALESCE(automation_rule_state.condition_since_at, excluded.condition_since_at), last_matched_at=excluded.last_matched_at, updated_at=excluded.updated_at',
                    (rule['id'], profile_id, h, since, now, now),
                )
                delayed_ok = delayed_ok and (now_ts - _ts(since) >= int(cond.get('minutes') or 0) * 60)
            else:
                # Condition broke: reset the window so the delay starts over.
                conn.execute(
                    'UPDATE automation_rule_state SET condition_since_at=NULL, updated_at=? WHERE rule_id=? AND profile_id=? AND torrent_hash=?',
                    (now, rule['id'], profile_id, h),
                )
                delayed_ok = False
        else:
            immediate_ok = immediate_ok and ok
    return immediate_ok and delayed_ok
|
||||
|
||||
|
||||
def _cooldown_ok(conn, rule: dict[str, Any], profile_id: int, torrent_hash: str) -> bool:
    """True when the rule's per-torrent cooldown has expired or it never applied."""
    cooldown = int(rule.get('cooldown_minutes') or 0)
    row = conn.execute(
        'SELECT last_applied_at FROM automation_rule_state WHERE rule_id=? AND profile_id=? AND torrent_hash=?',
        (rule['id'], profile_id, torrent_hash),
    ).fetchone()
    if not row or not row.get('last_applied_at'):
        return True
    elapsed = _now_ts() - _ts(row['last_applied_at'])
    return elapsed >= cooldown * 60
|
||||
|
||||
|
||||
def _apply_effects(c: Any, profile: dict[str, Any], torrent: dict[str, Any], effects: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Execute rule effects on one torrent via the rTorrent client *c*.

    Supported effect types: move, add_label, remove_label, set_labels and the
    state commands pause/stop/start/resume/recheck. Returns a list describing
    each action actually performed.
    """
    h = str(torrent.get('hash') or '')
    labels = _label_names(torrent.get('label'))
    applied: list[dict[str, Any]] = []
    state_methods = {'pause': 'd.pause', 'stop': 'd.stop', 'start': 'd.start', 'resume': 'd.resume', 'recheck': 'd.check_hash'}
    for eff in effects:
        typ = str(eff.get('type') or '')
        if typ == 'move':
            # Fall back to the profile's default download path when no path given.
            path = str(eff.get('path') or '').strip() or rtorrent.default_download_path(profile)
            if path:
                c.call('d.directory.set', h, path)
                applied.append({'type': 'move', 'path': path})
        elif typ == 'add_label':
            label = str(eff.get('label') or '').strip()
            if label and label not in labels:
                labels.append(label)
                c.call('d.custom1.set', h, _label_value(labels))
            if label:
                applied.append({'type': 'add_label', 'label': label})
        elif typ == 'remove_label':
            label = str(eff.get('label') or '').strip()
            labels = [x for x in labels if x != label]
            c.call('d.custom1.set', h, _label_value(labels))
            applied.append({'type': 'remove_label', 'label': label})
        elif typ == 'set_labels':
            value = _label_value(_label_names(eff.get('labels')))
            c.call('d.custom1.set', h, value)
            labels = _label_names(value)
            applied.append({'type': 'set_labels', 'labels': value})
        elif typ in state_methods:
            c.call(state_methods[typ], h)
            applied.append({'type': typ})
    return applied
|
||||
|
||||
|
||||
def check(profile: dict | None = None, user_id: int | None = None, force: bool = False) -> dict[str, Any]:
    """Run every automation rule once against the profile's torrents.

    With force=True disabled rules run too and cooldowns are ignored. Each
    application (including failures) is recorded in automation_history.
    """
    profile = profile or active_profile()
    if not profile:
        return {'ok': False, 'error': 'No active rTorrent profile'}
    user_id = user_id or default_user_id()
    profile_id = int(profile['id'])
    rules = [r for r in list_rules(profile_id, user_id) if force or int(r.get('enabled') or 0)]
    if not rules:
        return {'ok': True, 'checked': 0, 'applied': [], 'rules': 0}
    torrents = rtorrent.list_torrents(profile)
    c = rtorrent.client_for(profile)
    applied = []
    now = utcnow()
    with connect() as conn:
        for rule in rules:
            for t in torrents:
                h = str(t.get('hash') or '')
                if not _conditions_match(conn, rule, profile_id, t):
                    continue
                if not force and not _cooldown_ok(conn, rule, profile_id, h):
                    continue
                try:
                    actions = _apply_effects(c, profile, t, rule.get('effects') or [])
                except Exception as exc:
                    # Record the failure instead of aborting the whole sweep.
                    actions = [{'error': str(exc)}]
                conn.execute(
                    'INSERT INTO automation_rule_state(rule_id,profile_id,torrent_hash,last_matched_at,last_applied_at,updated_at) VALUES(?,?,?,?,?,?) ON CONFLICT(rule_id,profile_id,torrent_hash) DO UPDATE SET last_matched_at=excluded.last_matched_at, last_applied_at=excluded.last_applied_at, updated_at=excluded.updated_at',
                    (rule['id'], profile_id, h, now, now, now),
                )
                conn.execute(
                    'INSERT INTO automation_history(user_id,profile_id,rule_id,torrent_hash,torrent_name,rule_name,actions_json,created_at) VALUES(?,?,?,?,?,?,?,?)',
                    (user_id, profile_id, rule['id'], h, str(t.get('name') or ''), str(rule.get('name') or ''), json.dumps(actions), now),
                )
                applied.append({'rule_id': rule['id'], 'rule_name': rule.get('name'), 'hash': h, 'name': t.get('name'), 'actions': actions})
    return {'ok': True, 'checked': len(torrents), 'rules': len(rules), 'applied': applied}
|
||||
38
pytorrent/services/geoip.py
Normal file
38
pytorrent/services/geoip.py
Normal file
@@ -0,0 +1,38 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from functools import lru_cache
|
||||
from pathlib import Path
|
||||
from ..config import GEOIP_DB
|
||||
|
||||
try:
|
||||
import geoip2.database
|
||||
except Exception: # pragma: no cover
|
||||
geoip2 = None
|
||||
|
||||
_reader = None
|
||||
|
||||
|
||||
def _get_reader():
    """Lazily open and memoize the MaxMind city reader; None when unavailable.

    Returns None when the geoip2 package is not installed or the DB file is
    missing; the negative result is deliberately not cached so a DB dropped in
    later is picked up.
    """
    global _reader
    if _reader is not None:
        return _reader
    if geoip2 is None or not GEOIP_DB.exists():
        return None
    _reader = geoip2.database.Reader(str(GEOIP_DB))
    return _reader
|
||||
|
||||
|
||||
@lru_cache(maxsize=50000)
def lookup_ip(ip: str) -> dict:
    """Geo-locate *ip*; all fields blank when the DB is absent or lookup fails.

    Results (including failures) are memoized per address.
    """
    reader = _get_reader()
    if not reader:
        return {"country_iso": "", "country": "", "city": ""}
    try:
        hit = reader.city(ip)
        return {
            "country_iso": (hit.country.iso_code or "").lower(),
            "country": hit.country.name or "",
            "city": hit.city.name or "",
        }
    except Exception:
        # Invalid addresses / addresses missing from the DB fall back to blanks.
        return {"country_iso": "", "country": "", "city": ""}
|
||||
176
pytorrent/services/preferences.py
Normal file
176
pytorrent/services/preferences.py
Normal file
@@ -0,0 +1,176 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from ..db import connect, utcnow, default_user_id
|
||||
|
||||
# Bootswatch theme slugs supported by the UI, mapped to their display names.
BOOTSTRAP_THEMES = {
    "default": "Default Bootstrap",
    "flatly": "Flatly",
    "litera": "Litera",
    "lumen": "Lumen",
    "minty": "Minty",
    "sketchy": "Sketchy",
    "solar": "Solar",
    "spacelab": "Spacelab",
    "united": "United",
    "zephyr": "Zephyr",
}

# Selectable UI font families (slug -> display name).
FONT_FAMILIES = {
    "default": "Theme default",
    "adwaita-mono": "Adwaita Mono",
    "inter": "Inter",
    "system-ui": "System UI",
    "source-sans-3": "Source Sans 3",
    "jetbrains-mono": "JetBrains Mono",
}


def bootstrap_css_url(theme: str | None) -> str:
    """Return the CDN stylesheet URL for *theme*.

    Unknown themes (and None) fall back to stock Bootstrap.
    """
    if theme not in BOOTSTRAP_THEMES or theme == "default":
        return "https://cdn.jsdelivr.net/npm/bootstrap@5.3.3/dist/css/bootstrap.min.css"
    return f"https://cdn.jsdelivr.net/npm/bootswatch@5.3.3/dist/{theme}/bootstrap.min.css"
|
||||
|
||||
|
||||
def list_profiles(user_id: int | None = None):
    """All rTorrent profiles for the user, default first, then by name."""
    user_id = user_id or default_user_id()
    query = "SELECT * FROM rtorrent_profiles WHERE user_id=? ORDER BY is_default DESC, name COLLATE NOCASE"
    with connect() as conn:
        return conn.execute(query, (user_id,)).fetchall()
|
||||
|
||||
|
||||
def get_profile(profile_id: int, user_id: int | None = None):
    """Fetch one profile row by id, or None when it does not exist."""
    user_id = user_id or default_user_id()
    query = "SELECT * FROM rtorrent_profiles WHERE id=? AND user_id=?"
    with connect() as conn:
        return conn.execute(query, (profile_id, user_id)).fetchone()
|
||||
|
||||
|
||||
def active_profile(user_id: int | None = None):
    """Resolve the user's active rTorrent profile.

    Prefers the profile referenced by user_preferences.active_rtorrent_id;
    when missing or stale, falls back to the default profile (then lowest id).
    Returns None when the user has no profiles at all.
    """
    user_id = user_id or default_user_id()
    with connect() as conn:
        pref = conn.execute(
            "SELECT active_rtorrent_id FROM user_preferences WHERE user_id=?",
            (user_id,),
        ).fetchone()
        if pref and pref.get("active_rtorrent_id"):
            chosen = conn.execute(
                "SELECT * FROM rtorrent_profiles WHERE id=? AND user_id=?",
                (pref["active_rtorrent_id"], user_id),
            ).fetchone()
            if chosen:
                return chosen
        return conn.execute(
            "SELECT * FROM rtorrent_profiles WHERE user_id=? ORDER BY is_default DESC, id ASC LIMIT 1",
            (user_id,),
        ).fetchone()
|
||||
|
||||
|
||||
def save_profile(data: dict, user_id: int | None = None):
    """Create a new rTorrent profile and return its row.

    Marking the profile default unsets any previous default. The new profile
    also becomes the active one when the user has no active profile yet or when
    it is the default. Raises ValueError for a URL not starting with scgi://.
    """
    user_id = user_id or default_user_id()
    now = utcnow()
    name = str(data.get("name") or "rTorrent").strip()
    scgi_url = str(data.get("scgi_url") or "").strip()
    timeout = int(data.get("timeout_seconds") or 5)
    max_parallel = int(data.get("max_parallel_jobs") or 5)
    is_remote = 1 if data.get("is_remote") else 0
    is_default = 1 if data.get("is_default") else 0
    if not scgi_url.startswith("scgi://"):
        raise ValueError("SCGI URL musi zaczynać się od scgi://")
    with connect() as conn:
        if is_default:
            conn.execute("UPDATE rtorrent_profiles SET is_default=0 WHERE user_id=?", (user_id,))
        cur = conn.execute(
            "INSERT INTO rtorrent_profiles(user_id,name,scgi_url,is_default,timeout_seconds,max_parallel_jobs,is_remote,created_at,updated_at) VALUES(?,?,?,?,?,?,?,?,?)",
            (user_id, name, scgi_url, is_default, timeout, max_parallel, is_remote, now, now),
        )
        # int() cast for consistency with the other services (e.g. save_rule).
        profile_id = int(cur.lastrowid)
        pref = conn.execute(
            "SELECT active_rtorrent_id FROM user_preferences WHERE user_id=?",
            (user_id,),
        ).fetchone()
        if not pref or not pref.get("active_rtorrent_id") or is_default:
            # NOTE(review): assumes a user_preferences row already exists for the
            # user — this UPDATE is a silent no-op otherwise. Confirm bootstrap
            # creates the row.
            conn.execute(
                "UPDATE user_preferences SET active_rtorrent_id=?, updated_at=? WHERE user_id=?",
                (profile_id, now, user_id),
            )
        return conn.execute(
            "SELECT * FROM rtorrent_profiles WHERE id=? AND user_id=?",
            (profile_id, user_id),
        ).fetchone()
|
||||
|
||||
|
||||
def update_profile(profile_id: int, data: dict, user_id: int | None = None):
    """Update an existing profile and return its refreshed row.

    Raises ValueError when the profile does not belong to the user or the
    SCGI URL is invalid.
    """
    user_id = user_id or default_user_id()
    now = utcnow()
    name = str(data.get("name") or "rTorrent").strip()
    scgi_url = str(data.get("scgi_url") or "").strip()
    timeout = int(data.get("timeout_seconds") or 5)
    max_parallel = int(data.get("max_parallel_jobs") or 5)
    is_remote = 1 if data.get("is_remote") else 0
    is_default = 1 if data.get("is_default") else 0
    if not scgi_url.startswith("scgi://"):
        raise ValueError("SCGI URL musi zaczynać się od scgi://")
    with connect() as conn:
        existing = conn.execute(
            "SELECT id FROM rtorrent_profiles WHERE id=? AND user_id=?",
            (profile_id, user_id),
        ).fetchone()
        if not existing:
            raise ValueError("Profil nie istnieje")
        if is_default:
            # Only one default profile per user.
            conn.execute("UPDATE rtorrent_profiles SET is_default=0 WHERE user_id=?", (user_id,))
        conn.execute(
            "UPDATE rtorrent_profiles SET name=?, scgi_url=?, is_default=?, timeout_seconds=?, max_parallel_jobs=?, is_remote=?, updated_at=? WHERE id=? AND user_id=?",
            (name, scgi_url, is_default, timeout, max_parallel, is_remote, now, profile_id, user_id),
        )
        return conn.execute(
            "SELECT * FROM rtorrent_profiles WHERE id=? AND user_id=?",
            (profile_id, user_id),
        ).fetchone()
|
||||
|
||||
|
||||
def delete_profile(profile_id: int, user_id: int | None = None):
    """Delete a profile and repoint the user's active profile at a survivor.

    active_rtorrent_id becomes NULL when no profiles remain.
    """
    user_id = user_id or default_user_id()
    with connect() as conn:
        conn.execute(
            "DELETE FROM rtorrent_profiles WHERE id=? AND user_id=?",
            (profile_id, user_id),
        )
        # NOTE(review): active_profile() opens a second connection while this
        # one is still open — confirm connect() tolerates nested connections.
        active = active_profile(user_id)
        conn.execute(
            "UPDATE user_preferences SET active_rtorrent_id=?, updated_at=? WHERE user_id=?",
            (active["id"] if active else None, utcnow(), user_id),
        )
|
||||
|
||||
|
||||
def activate_profile(profile_id: int, user_id: int | None = None):
    """Mark *profile_id* as the user's active profile and return it.

    Raises ValueError when the profile does not belong to the user.
    """
    user_id = user_id or default_user_id()
    with connect() as conn:
        owned = conn.execute(
            "SELECT id FROM rtorrent_profiles WHERE id=? AND user_id=?",
            (profile_id, user_id),
        ).fetchone()
        if not owned:
            raise ValueError("Profil nie istnieje")
        conn.execute(
            "UPDATE user_preferences SET active_rtorrent_id=?, updated_at=? WHERE user_id=?",
            (profile_id, utcnow(), user_id),
        )
    return get_profile(profile_id, user_id)
|
||||
|
||||
|
||||
def get_preferences(user_id: int | None = None):
    """The user's preferences row, or None when none exists."""
    user_id = user_id or default_user_id()
    with connect() as conn:
        return conn.execute(
            "SELECT * FROM user_preferences WHERE user_id=?",
            (user_id,),
        ).fetchone()
|
||||
|
||||
|
||||
def save_preferences(data: dict, user_id: int | None = None):
    """Apply validated preference updates and return the refreshed row.

    Only whitelisted values are written; absent or invalid fields leave the
    stored preference untouched.
    """
    user_id = user_id or default_user_id()
    theme = data.get("theme") if data.get("theme") in {"light", "dark"} else None
    bootstrap_theme = data.get("bootstrap_theme") if data.get("bootstrap_theme") in BOOTSTRAP_THEMES else None
    font_family = data.get("font_family") if data.get("font_family") in FONT_FAMILIES else None
    table_columns_json = data.get("table_columns_json")
    peers_refresh_seconds = data.get("peers_refresh_seconds")
    port_check_enabled = data.get("port_check_enabled")
    with connect() as conn:
        now = utcnow()
        if theme:
            conn.execute("UPDATE user_preferences SET theme=?, updated_at=? WHERE user_id=?", (theme, now, user_id))
        if bootstrap_theme:
            conn.execute("UPDATE user_preferences SET bootstrap_theme=?, updated_at=? WHERE user_id=?", (bootstrap_theme, now, user_id))
        if font_family:
            conn.execute("UPDATE user_preferences SET font_family=?, updated_at=? WHERE user_id=?", (font_family, now, user_id))
        if table_columns_json is not None:
            conn.execute("UPDATE user_preferences SET table_columns_json=?, updated_at=? WHERE user_id=?", (str(table_columns_json), now, user_id))
        if peers_refresh_seconds is not None:
            sec = int(peers_refresh_seconds or 0)
            if sec not in {0, 10, 15, 30, 60}:
                sec = 0  # anything outside the allowed set disables auto-refresh
            conn.execute("UPDATE user_preferences SET peers_refresh_seconds=?, updated_at=? WHERE user_id=?", (sec, now, user_id))
        if port_check_enabled is not None:
            conn.execute("UPDATE user_preferences SET port_check_enabled=?, updated_at=? WHERE user_id=?", (1 if port_check_enabled else 0, now, user_id))
    return get_preferences(user_id)
|
||||
47
pytorrent/services/retention.py
Normal file
47
pytorrent/services/retention.py
Normal file
@@ -0,0 +1,47 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime, timedelta, timezone
|
||||
|
||||
from ..config import JOBS_RETENTION_DAYS, LOG_RETENTION_DAYS, SMART_QUEUE_HISTORY_RETENTION_DAYS, TRAFFIC_HISTORY_RETENTION_DAYS
|
||||
from ..db import connect
|
||||
|
||||
# Unix timestamp of the most recent cleanup pass; 0.0 means cleanup never ran.
_LAST_CLEANUP = 0.0
# Throttle: automatic (non-forced) cleanup runs at most once per hour.
CLEANUP_EVERY_SECONDS = 3600
|
||||
|
||||
|
||||
def _cutoff(days: int) -> str:
|
||||
return (datetime.now(timezone.utc) - timedelta(days=max(1, int(days or 1)))).isoformat(timespec="seconds")
|
||||
|
||||
|
||||
def _table_exists(conn, table: str) -> bool:
|
||||
row = conn.execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?", (table,)).fetchone()
|
||||
return bool(row)
|
||||
|
||||
|
||||
def cleanup(force: bool = False) -> dict[str, int]:
    """Purge aged rows from history/job/log tables, at most once per hour.

    Returns {table: deleted_row_count}; an empty dict means the run was
    throttled. Rows in 'jobs' are only removed once finished
    (done/failed/cancelled).
    """
    global _LAST_CLEANUP
    now_ts = datetime.now(timezone.utc).timestamp()
    if not force and now_ts - _LAST_CLEANUP < CLEANUP_EVERY_SECONDS:
        return {}
    _LAST_CLEANUP = now_ts

    # table -> (timestamp column, retention window in days)
    targets = {
        "traffic_history": ("created_at", TRAFFIC_HISTORY_RETENTION_DAYS),
        "smart_queue_history": ("created_at", SMART_QUEUE_HISTORY_RETENTION_DAYS),
        "jobs": ("updated_at", JOBS_RETENTION_DAYS),
        "logs": ("created_at", LOG_RETENTION_DAYS),
    }
    deleted: dict[str, int] = {}
    with connect() as conn:
        for table, (column, days) in targets.items():
            if not _table_exists(conn, table):
                continue
            if table == "jobs":
                # Never delete jobs that are still queued or running.
                cur = conn.execute(
                    f"DELETE FROM {table} WHERE {column} < ? AND status IN ('done','failed','cancelled')",
                    (_cutoff(days),),
                )
            else:
                cur = conn.execute(f"DELETE FROM {table} WHERE {column} < ?", (_cutoff(days),))
            deleted[table] = int(cur.rowcount or 0)
    return deleted
|
||||
1079
pytorrent/services/rtorrent.py
Normal file
1079
pytorrent/services/rtorrent.py
Normal file
File diff suppressed because it is too large
Load Diff
312
pytorrent/services/smart_queue.py
Normal file
312
pytorrent/services/smart_queue.py
Normal file
@@ -0,0 +1,312 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any
|
||||
import json
|
||||
import time
|
||||
|
||||
from ..config import SMART_QUEUE_LABEL
|
||||
from ..db import connect, default_user_id, utcnow
|
||||
from . import rtorrent
|
||||
from .preferences import active_profile, get_profile
|
||||
|
||||
|
||||
def _ts(value: str | None) -> float:
|
||||
if not value:
|
||||
return 0.0
|
||||
try:
|
||||
return datetime.fromisoformat(value.replace('Z', '+00:00')).timestamp()
|
||||
except Exception:
|
||||
return 0.0
|
||||
|
||||
|
||||
def _default_settings(user_id: int, profile_id: int) -> dict[str, Any]:
    """Fallback Smart Queue settings used when no row exists yet (disabled)."""
    return dict(
        user_id=user_id,
        profile_id=profile_id,
        enabled=0,
        max_active_downloads=5,
        stalled_seconds=300,
        min_speed_bytes=1024,
        min_seeds=1,
        updated_at=utcnow(),
    )
|
||||
|
||||
|
||||
def get_settings(profile_id: int, user_id: int | None = None) -> dict[str, Any]:
    """Load Smart Queue settings for the profile, or in-memory defaults."""
    user_id = user_id or default_user_id()
    with connect() as conn:
        stored = conn.execute(
            'SELECT * FROM smart_queue_settings WHERE user_id=? AND profile_id=?',
            (user_id, profile_id),
        ).fetchone()
    return stored or _default_settings(user_id, profile_id)
|
||||
|
||||
|
||||
def save_settings(profile_id: int, data: dict[str, Any], user_id: int | None = None) -> dict[str, Any]:
    """Upsert Smart Queue settings and return the stored row.

    Missing fields fall back to the current (or default) values; numeric
    fields are clamped to sane minimums.
    """
    user_id = user_id or default_user_id()
    current = get_settings(profile_id, user_id)
    enabled = 1 if data.get('enabled', current.get('enabled')) else 0
    max_active = max(1, int(data.get('max_active_downloads') or current.get('max_active_downloads') or 5))
    stalled = max(30, int(data.get('stalled_seconds') or current.get('stalled_seconds') or 300))
    min_speed = max(0, int(data.get('min_speed_bytes') or current.get('min_speed_bytes') or 0))
    min_seeds = max(0, int(data.get('min_seeds') or current.get('min_seeds') or 0))
    now = utcnow()
    with connect() as conn:
        conn.execute(
            '''INSERT INTO smart_queue_settings(user_id,profile_id,enabled,max_active_downloads,stalled_seconds,min_speed_bytes,min_seeds,updated_at)
               VALUES(?,?,?,?,?,?,?,?)
               ON CONFLICT(user_id, profile_id) DO UPDATE SET
                   enabled=excluded.enabled,
                   max_active_downloads=excluded.max_active_downloads,
                   stalled_seconds=excluded.stalled_seconds,
                   min_speed_bytes=excluded.min_speed_bytes,
                   min_seeds=excluded.min_seeds,
                   updated_at=excluded.updated_at''',
            (user_id, profile_id, enabled, max_active, stalled, min_speed, min_seeds, now),
        )
    return get_settings(profile_id, user_id)
|
||||
|
||||
|
||||
def list_exclusions(profile_id: int, user_id: int | None = None) -> list[dict[str, Any]]:
    """Torrents the user excluded from Smart Queue management, newest first."""
    user_id = user_id or default_user_id()
    with connect() as conn:
        return conn.execute(
            'SELECT * FROM smart_queue_exclusions WHERE user_id=? AND profile_id=? ORDER BY created_at DESC',
            (user_id, profile_id),
        ).fetchall()
|
||||
|
||||
|
||||
def set_exclusion(profile_id: int, torrent_hash: str, excluded: bool, reason: str = '', user_id: int | None = None) -> None:
    """Add or remove a Smart Queue exclusion for one torrent."""
    user_id = user_id or default_user_id()
    now = utcnow()
    with connect() as conn:
        if excluded:
            conn.execute(
                'INSERT OR REPLACE INTO smart_queue_exclusions(user_id,profile_id,torrent_hash,reason,created_at) VALUES(?,?,?,?,?)',
                (user_id, profile_id, torrent_hash, reason, now),
            )
        else:
            conn.execute(
                'DELETE FROM smart_queue_exclusions WHERE user_id=? AND profile_id=? AND torrent_hash=?',
                (user_id, profile_id, torrent_hash),
            )
|
||||
|
||||
|
||||
|
||||
def add_history(profile_id: int, event: str, paused: list[str] | None = None, resumed: list[str] | None = None, checked: int = 0, details: dict[str, Any] | None = None, user_id: int | None = None) -> None:
    """Append one Smart Queue event row; hash lists are folded into details_json."""
    user_id = user_id or default_user_id()
    paused = paused or []
    resumed = resumed or []
    details = details or {}
    payload = json.dumps({**details, 'paused': paused, 'resumed': resumed})
    with connect() as conn:
        conn.execute(
            'INSERT INTO smart_queue_history(user_id,profile_id,event,paused_count,resumed_count,checked_count,details_json,created_at) VALUES(?,?,?,?,?,?,?,?)',
            (user_id, profile_id, event, len(paused), len(resumed), int(checked or 0), payload, utcnow()),
        )
|
||||
|
||||
def list_history(profile_id: int, user_id: int | None = None, limit: int = 30) -> list[dict[str, Any]]:
    """Most recent Smart Queue history rows, newest first (limit clamped to 1..100)."""
    user_id = user_id or default_user_id()
    capped = max(1, min(int(limit or 30), 100))
    with connect() as conn:
        return conn.execute(
            'SELECT * FROM smart_queue_history WHERE user_id=? AND profile_id=? ORDER BY created_at DESC LIMIT ?',
            (user_id, profile_id, capped),
        ).fetchall()
|
||||
|
||||
def count_history(profile_id: int, user_id: int | None = None) -> int:
    """Total number of Smart Queue history rows for the profile."""
    user_id = user_id or default_user_id()
    with connect() as conn:
        row = conn.execute(
            'SELECT COUNT(*) AS count FROM smart_queue_history WHERE user_id=? AND profile_id=?',
            (user_id, profile_id),
        ).fetchone()
    return int((row or {}).get('count') or 0)
|
||||
|
||||
def _excluded_hashes(profile_id: int, user_id: int) -> set[str]:
    """Hashes the user has excluded from Smart Queue management."""
    return {row['torrent_hash'] for row in list_exclusions(profile_id, user_id)}
|
||||
|
||||
|
||||
def _remember_auto_label(profile_id: int, torrent_hash: str, previous_label: str) -> None:
    """Record the label a torrent had before Smart Queue relabelled it.

    An existing record only gets its timestamp refreshed — the original
    pre-Smart-Queue label is never overwritten.
    """
    now = utcnow()
    with connect() as conn:
        existing = conn.execute(
            'SELECT previous_label FROM smart_queue_auto_labels WHERE profile_id=? AND torrent_hash=?',
            (profile_id, torrent_hash),
        ).fetchone()
        if existing:
            conn.execute(
                'UPDATE smart_queue_auto_labels SET updated_at=? WHERE profile_id=? AND torrent_hash=?',
                (now, profile_id, torrent_hash),
            )
        else:
            conn.execute(
                'INSERT INTO smart_queue_auto_labels(profile_id,torrent_hash,previous_label,created_at,updated_at) VALUES(?,?,?,?,?)',
                (profile_id, torrent_hash, previous_label, now, now),
            )
|
||||
|
||||
|
||||
def _restore_auto_label(client: Any, profile_id: int, torrent_hash: str, current_label: str | None = None) -> bool:
    """Put back a torrent's pre-Smart-Queue label and drop the bookkeeping row.

    The label is only rewritten when the current label is unknown or still the
    Smart Queue marker, so a manual relabel by the user is preserved; the
    bookkeeping row is removed either way. Returns True on success.
    """
    with connect() as conn:
        record = conn.execute(
            'SELECT previous_label FROM smart_queue_auto_labels WHERE profile_id=? AND torrent_hash=?',
            (profile_id, torrent_hash),
        ).fetchone()
        if not record:
            return False
        previous = record.get('previous_label') or ''
        try:
            if current_label is None or current_label == SMART_QUEUE_LABEL:
                client.call('d.custom1.set', torrent_hash, previous)
            conn.execute(
                'DELETE FROM smart_queue_auto_labels WHERE profile_id=? AND torrent_hash=?',
                (profile_id, torrent_hash),
            )
            return True
        except Exception:
            # XML-RPC failure: keep the record so a later pass can retry.
            return False
|
||||
|
||||
|
||||
def _set_smart_queue_label(client: Any, torrent_hash: str, attempts: int = 3) -> bool:
    """Apply the Smart Queue marker label, retrying briefly on XML-RPC errors."""
    for attempt in range(max(1, attempts)):
        try:
            client.call('d.custom1.set', torrent_hash, SMART_QUEUE_LABEL)
            return True
        except Exception:
            if attempt < attempts - 1:
                time.sleep(0.05)  # short back-off before the next try
    return False
|
||||
|
||||
|
||||
def _mark_auto_paused(client: Any, profile_id: int, torrent: dict[str, Any]) -> bool:
    """Remember the torrent's current label, then tag it with the Smart Queue marker."""
    torrent_hash = str(torrent.get('hash') or '')
    if not torrent_hash:
        return False
    current = str(torrent.get('label') or '')
    if current != SMART_QUEUE_LABEL:
        # Don't record the marker itself as the "previous" label.
        _remember_auto_label(profile_id, torrent_hash, current)
    return _set_smart_queue_label(client, torrent_hash)
|
||||
|
||||
|
||||
def _cleanup_auto_labels(client: Any, profile_id: int, torrents: list[dict[str, Any]], keep_hashes: set[str]) -> list[str]:
    """Reconcile stored Smart Queue auto-labels with the live torrent list.

    Torrents that are gone, finished, or running again get their original label
    restored; torrents still paused/stopped keep (or regain) the marker label.
    Hashes in *keep_hashes* are left untouched. Returns the restored hashes.
    """
    by_hash = {str(t.get('hash') or ''): t for t in torrents}
    restored: list[str] = []
    with connect() as conn:
        rows = conn.execute(
            'SELECT torrent_hash FROM smart_queue_auto_labels WHERE profile_id=?',
            (profile_id,),
        ).fetchall()
    for row in rows:
        h = str(row.get('torrent_hash') or '')
        if not h or h in keep_hashes:
            continue
        t = by_hash.get(h)
        if t is None or int(t.get('complete') or 0):
            # Removed or finished: give the original label back.
            if _restore_auto_label(client, profile_id, h, None if t is None else str(t.get('label') or '')):
                restored.append(h)
            continue
        is_inactive = bool(t.get('paused')) or not int(t.get('active') or 0) or not int(t.get('state') or 0)
        current_label = str(t.get('label') or '')
        if is_inactive:
            # Still auto-paused: make sure the marker label is present.
            if current_label != SMART_QUEUE_LABEL:
                _set_smart_queue_label(client, h)
            continue
        # Downloading again: restore the pre-Smart-Queue label.
        if _restore_auto_label(client, profile_id, h, current_label):
            restored.append(h)
    return restored
|
||||
|
||||
|
||||
def check(profile: dict | None = None, user_id: int | None = None, force: bool = False) -> dict[str, Any]:
    """Run one Smart Queue pass: pause stalled downloads, resume promising ones.

    All steps are scoped to incomplete, non-excluded torrents:
      1. Track how long each active download has been at or below the
         configured speed/seed thresholds (``smart_queue_stalled`` table).
      2. Pause enough active downloads to enforce ``max_active_downloads``
         (preferring stalled ones), then rotate remaining stalled downloads
         out one-for-one for better stopped candidates.
      3. Resume the best stopped candidates into the freed slots.

    ``force=True`` runs the pass even when Smart Queue is disabled in
    settings. Returns a result dict describing what was paused/resumed.
    """
    profile = profile or active_profile()
    if not profile:
        return {'ok': False, 'error': 'No active rTorrent profile'}
    user_id = user_id or default_user_id()
    profile_id = int(profile['id'])
    settings = get_settings(profile_id, user_id)
    if not force and not int(settings.get('enabled') or 0):
        add_history(profile_id, 'skipped_disabled', [], [], 0, {'enabled': False}, user_id)
        return {'ok': True, 'enabled': False, 'paused': [], 'resumed': [], 'message': 'Smart Queue disabled'}

    torrents = rtorrent.list_torrents(profile)
    excluded = _excluded_hashes(profile_id, user_id)
    # Active, incomplete, non-excluded downloads vs. stopped/paused candidates.
    downloading = [t for t in torrents if not int(t.get('complete') or 0) and int(t.get('state') or 0) and not t.get('paused') and t.get('hash') not in excluded]
    stopped = [t for t in torrents if not int(t.get('complete') or 0) and (not int(t.get('state') or 0) or t.get('paused')) and t.get('hash') not in excluded]
    min_speed = int(settings.get('min_speed_bytes') or 0)
    min_seeds = int(settings.get('min_seeds') or 0)
    stalled_seconds = int(settings.get('stalled_seconds') or 300)
    now = utcnow()
    now_ts = datetime.now(timezone.utc).timestamp()
    stalled: list[dict[str, Any]] = []

    # Persist per-torrent "first seen stalled" timestamps so that only downloads
    # stalled for at least `stalled_seconds` of wall time are acted on.
    with connect() as conn:
        for t in downloading:
            is_stalled = int(t.get('down_rate') or 0) <= min_speed and int(t.get('seeds') or 0) <= min_seeds
            h = t.get('hash')
            if not h:
                continue
            if is_stalled:
                row = conn.execute('SELECT first_stalled_at FROM smart_queue_stalled WHERE profile_id=? AND torrent_hash=?', (profile_id, h)).fetchone()
                if row:
                    conn.execute('UPDATE smart_queue_stalled SET updated_at=? WHERE profile_id=? AND torrent_hash=?', (now, profile_id, h))
                    first = row['first_stalled_at']
                else:
                    first = now
                    conn.execute('INSERT OR REPLACE INTO smart_queue_stalled(profile_id,torrent_hash,first_stalled_at,updated_at) VALUES(?,?,?,?)', (profile_id, h, first, now))
                if now_ts - _ts(first) >= stalled_seconds:
                    stalled.append(t)
            else:
                # Back above the thresholds: reset the stall timer.
                conn.execute('DELETE FROM smart_queue_stalled WHERE profile_id=? AND torrent_hash=?', (profile_id, h))

    # Candidates with visible sources are preferred. Do not touch excluded torrents.
    candidates = sorted(
        stopped,
        key=lambda t: (int(t.get('seeds') or 0), int(t.get('peers') or 0), int(t.get('down_rate') or 0)),
        reverse=True,
    )
    max_active = max(1, int(settings.get('max_active_downloads') or 5))
    stalled_hashes = {str(t.get('hash') or '') for t in stalled}

    # Enforce the hard active-download cap first. The previous logic only limited
    # newly resumed torrents, so already-active downloads could stay above the limit.
    pause_rank = sorted(
        downloading,
        key=lambda t: (
            0 if str(t.get('hash') or '') in stalled_hashes else 1,
            int(t.get('down_rate') or 0),
            int(t.get('seeds') or 0),
            int(t.get('peers') or 0),
        ),
    )
    to_pause: list[dict[str, Any]] = pause_rank[:max(0, len(downloading) - max_active)]
    pause_hashes = {str(t.get('hash') or '') for t in to_pause}

    # When the cap is not exceeded, stalled downloads can still be rotated out
    # one-for-one with better stopped candidates while staying within max_active.
    if candidates:
        replaceable_stalled = [t for t in stalled if str(t.get('hash') or '') not in pause_hashes]
        for t in replaceable_stalled[:max(0, len(candidates) - len(to_pause))]:
            to_pause.append(t)
            pause_hashes.add(str(t.get('hash') or ''))

    active_after_pause = max(0, len(downloading) - len(to_pause))
    available_slots = max(0, max_active - active_after_pause)
    to_resume = candidates[:available_slots]

    # Apply the plan against rTorrent; individual failures are best-effort.
    c = rtorrent.client_for(profile)
    paused: list[str] = []
    resumed: list[str] = []
    label_failed: list[str] = []
    for t in to_pause:
        try:
            c.call('d.pause', t['hash'])
            if not _mark_auto_paused(c, profile_id, t):
                label_failed.append(t['hash'])
            paused.append(t['hash'])
        except Exception:
            pass
    for t in to_resume:
        try:
            _restore_auto_label(c, profile_id, t['hash'], str(t.get('label') or ''))
            c.call('d.resume', t['hash'])
            c.call('d.start', t['hash'])
            resumed.append(t['hash'])
        except Exception:
            pass
    restored = _cleanup_auto_labels(c, profile_id, torrents, set(paused))
    add_history(profile_id, 'force_check' if force else 'auto_check', paused, resumed, len(torrents), {'excluded': len(excluded), 'enabled': bool(settings.get('enabled')), 'auto_label': SMART_QUEUE_LABEL, 'labels_restored': restored, 'labels_failed': label_failed, 'max_active_downloads': max_active, 'active_before': len(downloading), 'active_after': active_after_pause + len(resumed)}, user_id)
    return {'ok': True, 'enabled': bool(settings.get('enabled')), 'paused': paused, 'resumed': resumed, 'labels_restored': restored, 'labels_failed': label_failed, 'checked': len(torrents), 'excluded': len(excluded), 'settings': settings}
|
||||
26
pytorrent/services/startup_config.py
Normal file
26
pytorrent/services/startup_config.py
Normal file
@@ -0,0 +1,26 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from time import sleep
|
||||
from . import preferences, rtorrent
|
||||
|
||||
_started = False
|
||||
|
||||
|
||||
def schedule_startup_config_apply(socketio, delay_seconds: int = 60) -> None:
    """Apply saved rTorrent UI overrides after pyTorrent has been running for a moment."""
    global _started
    if _started:
        return
    _started = True

    def _apply_after_delay():
        # Give the app (and rTorrent) time to settle before pushing overrides.
        sleep(max(0, int(delay_seconds)))
        try:
            for prof in preferences.list_profiles():
                outcome = rtorrent.apply_startup_overrides(prof)
                if outcome.get("skipped"):
                    continue
                socketio.emit("rtorrent_config_applied", {"profile_id": prof["id"], "result": outcome})
        except Exception as exc:
            # Surface the failure to connected clients instead of dying silently.
            socketio.emit("rtorrent_config_applied", {"ok": False, "error": str(exc)})

    socketio.start_background_task(_apply_after_delay)
|
||||
55
pytorrent/services/torrent_cache.py
Normal file
55
pytorrent/services/torrent_cache.py
Normal file
@@ -0,0 +1,55 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from threading import RLock
|
||||
from time import time
|
||||
from . import rtorrent
|
||||
|
||||
_VOLATILE = {"down_rate", "down_rate_h", "up_rate", "up_rate_h", "progress", "completed_bytes", "peers", "seeds", "ratio", "state", "status", "message", "down_total", "down_total_h", "up_total", "up_total_h"}
|
||||
|
||||
|
||||
class TorrentCache:
    """In-memory per-profile snapshot of torrent rows, with diffing on refresh."""

    def __init__(self):
        self._lock = RLock()
        # profile_id -> {torrent_hash: torrent_row}
        self._data: dict[int, dict[str, dict]] = {}
        # profile_id -> last refresh error message ("" when healthy)
        self._errors: dict[int, str] = {}
        # profile_id -> epoch seconds of the last successful refresh
        self._updated_at: dict[int, float] = {}

    def snapshot(self, profile_id: int) -> list[dict]:
        """Return the cached torrent rows for *profile_id* (empty when unknown)."""
        with self._lock:
            cached = self._data.get(profile_id, {})
            return list(cached.values())

    def error(self, profile_id: int) -> str:
        """Return the last refresh error for *profile_id*, or '' when none."""
        with self._lock:
            return self._errors.get(profile_id, "")

    def refresh(self, profile: dict) -> dict:
        """Re-fetch torrents for *profile* and return an added/updated/removed diff."""
        profile_id = int(profile["id"])
        try:
            fetched = rtorrent.list_torrents(profile)
            fresh = {row["hash"]: row for row in fetched}
            with self._lock:
                previous = self._data.get(profile_id, {})
                added = [row for key, row in fresh.items() if key not in previous]
                removed = [key for key in previous if key not in fresh]
                updated = []
                for key, row in fresh.items():
                    before = previous.get(key)
                    if not before:
                        continue
                    # Build a sparse patch: the hash plus only the changed fields.
                    patch = {"hash": key}
                    patch.update({field: value for field, value in row.items() if before.get(field) != value})
                    if len(patch) > 1:
                        updated.append(patch)
                self._data[profile_id] = fresh
                self._errors[profile_id] = ""
                self._updated_at[profile_id] = time()
            return {"ok": True, "profile_id": profile_id, "added": added, "updated": updated, "removed": removed}
        except Exception as exc:
            with self._lock:
                self._errors[profile_id] = str(exc)
            return {"ok": False, "profile_id": profile_id, "error": str(exc), "added": [], "updated": [], "removed": []}


torrent_cache = TorrentCache()
|
||||
130
pytorrent/services/torrent_summary.py
Normal file
130
pytorrent/services/torrent_summary.py
Normal file
@@ -0,0 +1,130 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from copy import deepcopy
|
||||
from threading import RLock
|
||||
from time import time
|
||||
|
||||
SUMMARY_CACHE_TTL_SECONDS = 60
|
||||
|
||||
_ERROR_PATTERNS = (
|
||||
"error",
|
||||
"failed",
|
||||
"failure",
|
||||
"timeout",
|
||||
"timed out",
|
||||
"tracker",
|
||||
"could not",
|
||||
"cannot",
|
||||
"refused",
|
||||
"unreachable",
|
||||
"denied",
|
||||
)
|
||||
_SUMMARY_TYPES = ("all", "downloading", "seeding", "paused", "checking", "error", "stopped")
|
||||
_summary_cache: dict[int, dict] = {}
|
||||
_summary_lock = RLock()
|
||||
|
||||
|
||||
def _number(row: dict, key: str) -> int:
|
||||
try:
|
||||
return int(float(row.get(key) or 0))
|
||||
except (TypeError, ValueError):
|
||||
return 0
|
||||
|
||||
|
||||
def _has_error(row: dict) -> bool:
    """True when the torrent's status message matches one of the error markers."""
    text = str(row.get("message") or "").strip().lower()
    if not text:
        return False
    return any(marker in text for marker in _ERROR_PATTERNS)
|
||||
|
||||
|
||||
def _matches(row: dict, summary_type: str) -> bool:
    """Decide whether *row* belongs to the given summary filter bucket."""
    status = str(row.get("status") or "")
    complete = bool(row.get("complete"))
    active = bool(row.get("state"))
    paused = bool(row.get("paused"))
    if summary_type == "all":
        return True
    if summary_type == "downloading":
        return active and not complete and not paused
    if summary_type == "seeding":
        # Hash-checking torrents report status "Checking" and are excluded here.
        return complete and active and not paused and status != "Checking"
    if summary_type == "paused":
        return paused or status == "Paused"
    if summary_type == "checking":
        return status == "Checking" or _number(row, "hashing") > 0
    if summary_type == "error":
        return _has_error(row)
    if summary_type == "stopped":
        return not active
    return False
|
||||
|
||||
|
||||
def _empty_bucket() -> dict:
|
||||
return {
|
||||
"count": 0,
|
||||
"size": 0,
|
||||
"disk_bytes": 0,
|
||||
"completed_bytes": 0,
|
||||
"remaining_bytes": 0,
|
||||
"progress_percent": 0.0,
|
||||
"remaining_percent": 100.0,
|
||||
# Kept for backward compatibility with older clients; not used by the filters UI.
|
||||
"down_total": 0,
|
||||
"up_total": 0,
|
||||
}
|
||||
|
||||
|
||||
def build_summary(rows: list[dict]) -> dict:
    """Aggregate torrent rows into per-filter totals for the sidebar counters."""
    filters = {name: _empty_bucket() for name in _SUMMARY_TYPES}
    for row in rows:
        size = _number(row, "size")
        done = _number(row, "completed_bytes")
        # Guard against completed bytes exceeding the reported size.
        if size:
            done = min(size, done)
        for name in _SUMMARY_TYPES:
            if not _matches(row, name):
                continue
            bucket = filters[name]
            bucket["count"] += 1
            bucket["size"] += size
            bucket["completed_bytes"] += done
            bucket["disk_bytes"] += done
            bucket["down_total"] += _number(row, "down_total")
            bucket["up_total"] += _number(row, "up_total")
    for bucket in filters.values():
        bucket["remaining_bytes"] = max(0, bucket["size"] - bucket["completed_bytes"])
        if bucket["size"] > 0:
            bucket["progress_percent"] = round((bucket["completed_bytes"] / bucket["size"]) * 100, 1)
            bucket["remaining_percent"] = round(100 - bucket["progress_percent"], 1)
        else:
            bucket["progress_percent"] = 0.0
            bucket["remaining_percent"] = 0.0
    return {
        "filters": filters,
        "cache_ttl_seconds": SUMMARY_CACHE_TTL_SECONDS,
        "generated_at_epoch": time(),
        "cached": False,
    }
|
||||
|
||||
|
||||
def cached_summary(profile_id: int, rows: list[dict], force: bool = False) -> dict:
    """Return the summary for *rows*, served from a short-lived per-profile cache.

    ``force=True`` always rebuilds (and re-caches) the summary.
    """
    key = int(profile_id)
    now = time()
    with _summary_lock:
        entry = _summary_cache.get(key)
        row_total = len(rows or [])
        if entry and not force:
            age = now - float(entry.get("generated_at_epoch") or 0)
            entry_count = int(((entry or {}).get("filters") or {}).get("all", {}).get("count") or 0)
            # A fresh-but-empty entry is suspect when live rows exist (cold-start race).
            if age < SUMMARY_CACHE_TTL_SECONDS and not (entry_count == 0 and row_total > 0):
                served = deepcopy(entry)
                served["cached"] = True
                return served
        fresh = build_summary(rows or [])
        # Do not cache an empty cold-start snapshot. On first connection the cache may
        # be populated before rTorrent refresh finishes, which would otherwise show
        # zeros for the full TTL.
        if row_total > 0 or force:
            _summary_cache[key] = deepcopy(fresh)
        return fresh
|
||||
|
||||
|
||||
def invalidate_summary(profile_id: int | None = None) -> None:
    """Drop the cached summary for one profile, or for every profile when None."""
    with _summary_lock:
        if profile_id is not None:
            _summary_cache.pop(int(profile_id), None)
        else:
            _summary_cache.clear()
|
||||
117
pytorrent/services/traffic_history.py
Normal file
117
pytorrent/services/traffic_history.py
Normal file
@@ -0,0 +1,117 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from typing import Any
|
||||
|
||||
from ..config import TRAFFIC_HISTORY_RETENTION_DAYS
|
||||
from ..db import connect, utcnow
|
||||
from . import retention
|
||||
|
||||
_LAST_WRITE: dict[int, float] = {}
|
||||
WRITE_EVERY_SECONDS = 60
|
||||
|
||||
|
||||
def _now_ts() -> float:
|
||||
return datetime.now(timezone.utc).timestamp()
|
||||
|
||||
|
||||
def record(profile_id: int, down_rate: int = 0, up_rate: int = 0, total_down: int = 0, total_up: int = 0, force: bool = False) -> None:
    """Store compact transfer samples. One sample per minute per profile keeps SQLite small.

    Writes are throttled in-process via ``_LAST_WRITE`` unless ``force=True``.
    Each accepted sample also triggers a retention cleanup pass.
    """
    profile_id = int(profile_id)
    now_ts = _now_ts()
    # Throttle: skip the sample when the last write for this profile is recent.
    if not force and now_ts - _LAST_WRITE.get(profile_id, 0.0) < WRITE_EVERY_SECONDS:
        return
    # NOTE(review): the timestamp is bumped before the insert, so a failed write
    # still suppresses samples for the next WRITE_EVERY_SECONDS.
    _LAST_WRITE[profile_id] = now_ts
    with connect() as conn:
        conn.execute(
            "INSERT INTO traffic_history(profile_id,down_rate,up_rate,total_down,total_up,created_at) VALUES(?,?,?,?,?,?)",
            (profile_id, int(down_rate or 0), int(up_rate or 0), int(total_down or 0), int(total_up or 0), utcnow()),
        )
    # Prune rows beyond the retention window.
    retention.cleanup()
|
||||
|
||||
|
||||
def _range_to_cutoff(range_name: str) -> datetime:
|
||||
now = datetime.now(timezone.utc)
|
||||
if range_name == "15m":
|
||||
return now - timedelta(minutes=15)
|
||||
if range_name == "1h":
|
||||
return now - timedelta(hours=1)
|
||||
if range_name == "3h":
|
||||
return now - timedelta(hours=3)
|
||||
if range_name == "6h":
|
||||
return now - timedelta(hours=6)
|
||||
if range_name == "24h":
|
||||
return now - timedelta(hours=24)
|
||||
if range_name == "30d":
|
||||
return now - timedelta(days=30)
|
||||
if range_name == "90d":
|
||||
return now - timedelta(days=90)
|
||||
return now - timedelta(days=7)
|
||||
|
||||
|
||||
def _bucket_for(range_name: str) -> str:
|
||||
if range_name in {"15m", "1h", "3h"}:
|
||||
return "%Y-%m-%d %H:%M"
|
||||
if range_name in {"6h", "24h"}:
|
||||
return "%Y-%m-%d %H:00"
|
||||
return "%Y-%m-%d"
|
||||
|
||||
|
||||
def _row_value(row: Any, key: str, index: int, default: Any = 0) -> Any:
|
||||
# connect() uses dict_factory, so SQLite rows are dicts. The fallback keeps
|
||||
# this function compatible with tuple/list rows in tests or future refactors.
|
||||
if isinstance(row, dict):
|
||||
return row.get(key, default)
|
||||
try:
|
||||
return row[index]
|
||||
except (IndexError, KeyError, TypeError):
|
||||
return default
|
||||
|
||||
|
||||
def history(profile_id: int, range_name: str = "7d") -> dict[str, Any]:
    """Aggregate stored traffic samples into chart buckets for the given range.

    Average rates are the per-bucket mean of the sampled rates. Downloaded and
    uploaded totals are derived from deltas of the total counters; negative
    deltas (e.g. counters reset after an rTorrent restart) are skipped.
    """
    cutoff = _range_to_cutoff(range_name)
    bucket = _bucket_for(range_name)
    cutoff_s = cutoff.isoformat(timespec="seconds")
    bucket_name = "minute" if range_name in {"15m", "1h", "3h"} else ("hour" if range_name in {"6h", "24h"} else "day")
    with connect() as conn:
        raw = conn.execute(
            """
            SELECT down_rate, up_rate, total_down, total_up, created_at
            FROM traffic_history
            WHERE profile_id=? AND created_at >= ?
            ORDER BY created_at ASC
            """,
            (int(profile_id), cutoff_s),
        ).fetchall()

    rows_by_bucket: dict[str, dict[str, Any]] = {}
    prev_down = prev_up = None
    for r in raw:
        created = str(_row_value(r, "created_at", 4, ""))
        try:
            dt = datetime.fromisoformat(created.replace("Z", "+00:00"))
        except Exception:
            # Skip rows whose timestamp cannot be parsed.
            continue
        b = dt.strftime(bucket)
        item = rows_by_bucket.setdefault(b, {"bucket": b, "avg_down_rate": 0, "avg_up_rate": 0, "downloaded": 0, "uploaded": 0, "samples": 0})
        down_rate = int(_row_value(r, "down_rate", 0, 0) or 0)
        up_rate = int(_row_value(r, "up_rate", 1, 0) or 0)
        total_down = int(_row_value(r, "total_down", 2, 0) or 0)
        total_up = int(_row_value(r, "total_up", 3, 0) or 0)
        # Sum rates now; divide by the sample count after the loop.
        item["avg_down_rate"] += down_rate
        item["avg_up_rate"] += up_rate
        item["samples"] += 1
        # Only count forward movement of the totals as transferred bytes.
        if prev_down is not None and total_down >= prev_down:
            item["downloaded"] += total_down - prev_down
        if prev_up is not None and total_up >= prev_up:
            item["uploaded"] += total_up - prev_up
        prev_down, prev_up = total_down, total_up

    rows = []
    for item in rows_by_bucket.values():
        samples = max(1, int(item["samples"] or 1))
        item["avg_down_rate"] = round(item["avg_down_rate"] / samples)
        item["avg_up_rate"] = round(item["avg_up_rate"] / samples)
        rows.append(item)
    rows.sort(key=lambda x: x["bucket"])
    return {"range": range_name, "bucket": bucket_name, "retention_days": TRAFFIC_HISTORY_RETENTION_DAYS, "rows": rows}
|
||||
84
pytorrent/services/websocket.py
Normal file
84
pytorrent/services/websocket.py
Normal file
@@ -0,0 +1,84 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import psutil
|
||||
from flask_socketio import emit
|
||||
from ..config import POLL_INTERVAL
|
||||
from .preferences import active_profile, get_profile
|
||||
from .torrent_cache import torrent_cache
|
||||
from .torrent_summary import cached_summary
|
||||
from . import rtorrent, smart_queue, traffic_history, automation_rules
|
||||
|
||||
_started = False
|
||||
|
||||
|
||||
def register_socketio_handlers(socketio):
    """Register WebSocket handlers and the background polling loop.

    The poller is started lazily on the first client connect and then loops
    forever: refresh the torrent cache, emit diffs and system stats, and on
    roughly a 30-second cadence run the Smart Queue and automation checks.
    """
    global _started

    def poller():
        # tick counts poll iterations; used to schedule the periodic checks below.
        tick = 0
        while True:
            profile = active_profile()
            if profile:
                diff = torrent_cache.refresh(profile)
                heartbeat = {"ok": bool(diff.get("ok")), "profile_id": profile["id"], "tick": tick, "error": diff.get("error", "")}
                # Only broadcast a patch when something actually changed.
                if diff.get("ok") and (diff["added"] or diff["updated"] or diff["removed"]):
                    socketio.emit("torrent_patch", {**diff, "summary": cached_summary(profile["id"], torrent_cache.snapshot(profile["id"]), force=True)})
                elif not diff.get("ok"):
                    socketio.emit("rtorrent_error", diff)
                try:
                    status = rtorrent.system_status(profile)
                    if bool(profile.get("is_remote")):
                        # CPU/RAM of a remote host cannot be measured locally.
                        status["usage_source"] = "remote-hidden"
                        status["usage_available"] = False
                    else:
                        status["cpu"] = psutil.cpu_percent(interval=None)
                        status["ram"] = psutil.virtual_memory().percent
                        status["usage_source"] = "local"
                        status["usage_available"] = True
                    status["profile_id"] = profile["id"]
                    traffic_history.record(profile["id"], status.get("down_rate", 0), status.get("up_rate", 0), status.get("total_down", 0), status.get("total_up", 0))
                    socketio.emit("system_stats", status)
                    heartbeat["ok"] = True
                except Exception as exc:
                    heartbeat["ok"] = False
                    heartbeat["error"] = str(exc)
                    socketio.emit("rtorrent_error", {"profile_id": profile["id"], "error": str(exc)})
                # Run the heavier checks roughly every 30 seconds of wall time.
                if tick % max(1, int(30 / POLL_INTERVAL)) == 0:
                    try:
                        result = smart_queue.check(profile, force=False)
                        if result.get("enabled"):
                            socketio.emit("smart_queue_update", result)
                    except Exception as exc:
                        socketio.emit("smart_queue_update", {"ok": False, "error": str(exc)})
                    try:
                        auto_result = automation_rules.check(profile, force=False)
                        if auto_result.get("applied"):
                            socketio.emit("automation_update", auto_result)
                    except Exception as exc:
                        socketio.emit("automation_update", {"ok": False, "error": str(exc)})
                socketio.emit("heartbeat", heartbeat)
            tick += 1
            socketio.sleep(POLL_INTERVAL)

    @socketio.on("connect")
    def handle_connect():
        # Start the shared poller exactly once, on the first connection.
        global _started
        if not _started:
            socketio.start_background_task(poller)
            _started = True
        profile = active_profile()
        emit("connected", {"ok": True, "profile": profile})
        if profile:
            rows = torrent_cache.snapshot(profile["id"])
            emit("torrent_snapshot", {"profile_id": profile["id"], "torrents": rows, "summary": cached_summary(profile["id"], rows)})

    @socketio.on("select_profile")
    def handle_select_profile(data):
        profile_id = int((data or {}).get("profile_id") or 0)
        profile = get_profile(profile_id)
        if not profile:
            emit("rtorrent_error", {"error": "Profile does not exist"})
            return
        # Refresh synchronously so the snapshot reflects the newly selected profile.
        diff = torrent_cache.refresh(profile)
        rows = torrent_cache.snapshot(profile_id)
        emit("torrent_snapshot", {"profile_id": profile_id, "torrents": rows, "summary": cached_summary(profile_id, rows, force=True), "error": diff.get("error", "")})
|
||||
250
pytorrent/services/workers.py
Normal file
250
pytorrent/services/workers.py
Normal file
@@ -0,0 +1,250 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from . import rtorrent
|
||||
from .preferences import get_profile
|
||||
from ..config import WORKERS
|
||||
from ..db import connect, utcnow, default_user_id
|
||||
|
||||
_executor = ThreadPoolExecutor(max_workers=WORKERS, thread_name_prefix="pytorrent-job")
|
||||
_socketio = None
|
||||
_semaphores: dict[int, threading.Semaphore] = {}
|
||||
_exclusive_locks: dict[int, threading.Lock] = {}
|
||||
_sem_lock = threading.Lock()
|
||||
|
||||
|
||||
def set_socketio(socketio):
    """Attach the Socket.IO server used by ``_emit`` for job progress broadcasts."""
    global _socketio
    _socketio = socketio
|
||||
|
||||
|
||||
def _emit(name: str, payload: dict):
    """Broadcast *payload* on event *name*; silently a no-op before set_socketio()."""
    if _socketio:
        _socketio.emit(name, payload)
|
||||
|
||||
|
||||
def _get_sem(profile: dict) -> threading.Semaphore:
    """Per-profile semaphore bounding how many jobs run concurrently.

    The limit comes from the profile's ``max_parallel_jobs`` (default 3) the
    first time the profile is seen; an existing semaphore is never resized.
    """
    profile_id = int(profile["id"])
    with _sem_lock:
        sem = _semaphores.get(profile_id)
        if sem is None:
            limit = max(1, int(profile.get("max_parallel_jobs") or 3))
            sem = threading.Semaphore(limit)
            _semaphores[profile_id] = sem
    return sem
|
||||
|
||||
|
||||
def _get_exclusive_lock(profile_id: int) -> threading.Lock:
    """Per-profile mutex serializing ordered (move/remove) jobs."""
    with _sem_lock:
        return _exclusive_locks.setdefault(profile_id, threading.Lock())
|
||||
|
||||
|
||||
def _job_row(job_id: str):
    """Fetch one job row by id, including its SQLite rowid (used for FIFO ordering)."""
    with connect() as conn:
        return conn.execute("SELECT rowid AS _rowid, * FROM jobs WHERE id=?", (job_id,)).fetchone()
|
||||
|
||||
|
||||
def _is_ordered_action(action_name: str) -> bool:
|
||||
return action_name in {"move", "remove"}
|
||||
|
||||
|
||||
def _has_prior_ordered_jobs(profile_id: int, rowid: int) -> bool:
    """True when an earlier-queued move/remove job for this profile is still pending or running."""
    with connect() as conn:
        row = conn.execute(
            """
            SELECT 1
            FROM jobs
            WHERE profile_id=?
            AND rowid<?
            AND action IN ('move', 'remove')
            AND status IN ('pending', 'running')
            LIMIT 1
            """,
            (profile_id, rowid),
        ).fetchone()
    return bool(row)
|
||||
|
||||
|
||||
def _wait_for_prior_ordered_jobs(job_id: str, profile_id: int, rowid: int) -> bool:
    """Block until all earlier ordered jobs finish.

    Polls the jobs table twice per second. Returns False when this job
    disappeared or was cancelled while waiting, True otherwise.
    """
    while _has_prior_ordered_jobs(profile_id, rowid):
        fresh = _job_row(job_id)
        # Bail out if the job vanished or was cancelled while we were waiting.
        if not fresh or fresh["status"] == "cancelled":
            return False
        time.sleep(0.5)
    return True
|
||||
|
||||
|
||||
def _set_job(job_id: str, status: str, error: str = "", result: dict | None = None, started: bool = False, finished: bool = False):
    """Update a job row's status/error columns; optionally stamp result, start, or finish times."""
    now = utcnow()
    assignments = ["status=?", "error=?", "updated_at=?"]
    params: list = [status, error, now]
    if result is not None:
        assignments.append("result_json=?")
        params.append(json.dumps(result))
    if started:
        assignments.append("started_at=?")
        params.append(now)
    if finished:
        assignments.append("finished_at=?")
        params.append(now)
    params.append(job_id)
    sql = f"UPDATE jobs SET {', '.join(assignments)} WHERE id=?"
    with connect() as conn:
        conn.execute(sql, params)
|
||||
|
||||
|
||||
def enqueue(action_name: str, profile_id: int, payload: dict, user_id: int | None = None, max_attempts: int = 2) -> str:
    """Persist a new job row and submit it to the worker pool; returns the job id."""
    user_id = user_id or default_user_id()
    job_id = uuid.uuid4().hex
    now = utcnow()
    with connect() as conn:
        conn.execute(
            "INSERT INTO jobs(id,user_id,profile_id,action,payload_json,status,attempts,max_attempts,created_at,updated_at) VALUES(?,?,?,?,?,?,?,?,?,?)",
            (job_id, user_id, profile_id, action_name, json.dumps(payload), "pending", 0, max_attempts, now, now),
        )
    # Notify clients first, then hand off to the executor.
    _emit("job_update", {"id": job_id, "action": action_name, "profile_id": profile_id, "status": "pending"})
    _executor.submit(_run, job_id)
    return job_id
|
||||
|
||||
|
||||
def _execute(profile: dict, action_name: str, payload: dict):
    """Dispatch one job payload to the matching rtorrent call and return its result."""
    start = bool(payload.get("start", True))
    directory = str(payload.get("directory") or "")
    label = str(payload.get("label") or "")
    if action_name == "add_magnet":
        return rtorrent.add_magnet(profile, payload["uri"], start, directory, label)
    if action_name == "add_torrent_raw":
        import base64
        return rtorrent.add_torrent_raw(profile, base64.b64decode(payload["data_b64"]), start, directory, label)
    if action_name == "set_limits":
        return rtorrent.set_limits(profile, payload.get("down"), payload.get("up"))
    # Everything else is a per-torrent action applied to the supplied hashes.
    return rtorrent.action(profile, payload.get("hashes") or [], action_name, payload)
|
||||
|
||||
|
||||
def _run(job_id: str):
    """Worker entry point: claim, execute, and finalize one queued job.

    Ordered actions (move/remove) first wait for earlier ordered jobs and
    hold a per-profile exclusive lock so they cannot interleave; every job is
    also throttled by the per-profile semaphore. On failure the job is
    retried by re-submitting this function until ``max_attempts`` is reached.
    """
    job = _job_row(job_id)
    if not job or job["status"] == "cancelled":
        return
    profile = get_profile(int(job["profile_id"]), int(job["user_id"]))
    if not profile:
        _set_job(job_id, "failed", "rTorrent profile does not exist", finished=True)
        _emit("job_update", {"id": job_id, "status": "failed", "error": "profile not found"})
        return
    profile_id = int(profile["id"])
    ordered_lock = None
    if _is_ordered_action(str(job["action"])):
        if not _wait_for_prior_ordered_jobs(job_id, profile_id, int(job["_rowid"])):
            return
        ordered_lock = _get_exclusive_lock(profile_id)
        ordered_lock.acquire()
    sem = _get_sem(profile)
    sem.acquire()
    # Fix: payload must exist before the try block. The except handler reads
    # payload.get("hashes"); an exception raised before json.loads runs (e.g.
    # a failing _job_row re-read) would otherwise become an UnboundLocalError
    # inside the handler and mask the real error.
    payload: dict = {}
    try:
        # Re-check the job after acquiring locks; it may have been cancelled.
        job = _job_row(job_id)
        if not job or job["status"] == "cancelled":
            return
        payload = json.loads(job.get("payload_json") or "{}")
        attempts = int(job.get("attempts") or 0) + 1
        with connect() as conn:
            conn.execute("UPDATE jobs SET status='running', attempts=?, started_at=COALESCE(started_at, ?), updated_at=? WHERE id=?", (attempts, utcnow(), utcnow(), job_id))
        _emit("operation_started", {"job_id": job_id, "action": job["action"], "profile_id": profile["id"], "hashes": payload.get("hashes") or [], "hash_count": len(payload.get("hashes") or []), "bulk": len(payload.get("hashes") or []) > 1})
        _emit("job_update", {"id": job_id, "status": "running", "attempts": attempts})
        result = _execute(profile, job["action"], payload)
        _set_job(job_id, "done", result=result, finished=True)
        _emit("operation_finished", {"job_id": job_id, "action": job["action"], "profile_id": profile["id"], "hashes": payload.get("hashes") or [], "hash_count": len(payload.get("hashes") or []), "bulk": len(payload.get("hashes") or []) > 1, "result": result})
        _emit("job_update", {"id": job_id, "status": "done", "result": result})
    except Exception as exc:
        # Re-read the row for the authoritative attempt counters.
        fresh = _job_row(job_id) or {}
        attempts = int(fresh.get("attempts") or 1)
        max_attempts = int(fresh.get("max_attempts") or 2)
        status = "pending" if attempts < max_attempts else "failed"
        _set_job(job_id, status, str(exc), finished=(status == "failed"))
        _emit("operation_failed", {"job_id": job_id, "action": job.get("action"), "profile_id": job.get("profile_id"), "hashes": payload.get("hashes") or [], "error": str(exc)})
        _emit("job_update", {"id": job_id, "status": status, "error": str(exc), "attempts": attempts})
        if status == "pending":
            # Retry by re-queueing onto the executor.
            _executor.submit(_run, job_id)
    finally:
        sem.release()
        if ordered_lock:
            ordered_lock.release()
|
||||
|
||||
|
||||
def _safe_json(value, fallback):
|
||||
try:
|
||||
return json.loads(value or "")
|
||||
except Exception:
|
||||
return fallback
|
||||
|
||||
|
||||
def _job_summary(row: dict, payload: dict, result: dict) -> str:
|
||||
ctx = payload.get("job_context") or {}
|
||||
count = int(ctx.get("hash_count") or len(payload.get("hashes") or []) or result.get("count") or 0)
|
||||
parts = []
|
||||
if count:
|
||||
parts.append(("bulk " if count > 1 else "single ") + f"{count} torrent(s)")
|
||||
if ctx.get("target_path"):
|
||||
parts.append(f"target: {ctx.get('target_path')}")
|
||||
if ctx.get("remove_data"):
|
||||
parts.append("remove data")
|
||||
if ctx.get("move_data"):
|
||||
parts.append("move data")
|
||||
if result.get("count") is not None:
|
||||
parts.append(f"done: {result.get('count')}")
|
||||
if result.get("errors"):
|
||||
parts.append(f"errors: {len(result.get('errors') or [])}")
|
||||
return "; ".join(parts)
|
||||
|
||||
|
||||
def _public_job(row) -> dict:
    """Shape a raw job row for API/WebSocket consumers (parsed payload, derived fields)."""
    job = dict(row)
    payload = _safe_json(job.get("payload_json"), {})
    result = _safe_json(job.get("result_json"), {})
    ctx = payload.get("job_context") or {}
    job["payload"] = payload
    job["result"] = result
    job["hash_count"] = int(ctx.get("hash_count") or len(payload.get("hashes") or []) or result.get("count") or 0)
    job["is_bulk"] = bool(ctx.get("bulk") or job["hash_count"] > 1)
    job["summary"] = _job_summary(job, payload, result)
    items = ctx.get("items") or []
    if job["is_bulk"]:
        job["items_preview"] = ""
    else:
        # Single-item jobs show the torrent name (or hash) inline in the list.
        first = [str((x or {}).get("name") or (x or {}).get("hash") or "") for x in items[:1] if x]
        job["items_preview"] = ", ".join(first)
    return job
|
||||
|
||||
|
||||
def list_jobs(limit: int = 200, offset: int = 0):
    """Page through the jobs table, newest first, with parsed/derived fields attached."""
    page_size = max(1, min(int(limit or 50), 500))
    start = max(0, int(offset or 0))
    with connect() as conn:
        rows = conn.execute("SELECT * FROM jobs ORDER BY created_at DESC LIMIT ? OFFSET ?", (page_size, start)).fetchall()
        total = conn.execute("SELECT COUNT(*) AS n FROM jobs").fetchone()["n"]
    jobs = [_public_job(r) for r in rows]
    return {"rows": jobs, "total": total, "limit": page_size, "offset": start}
|
||||
|
||||
|
||||
def cancel_job(job_id: str) -> bool:
    """Cancel a job that has not started running; returns False when not cancellable."""
    row = _job_row(job_id)
    cancellable = bool(row) and row["status"] in {"pending", "failed"}
    if not cancellable:
        return False
    _set_job(job_id, "cancelled", finished=True)
    _emit("job_update", {"id": job_id, "status": "cancelled"})
    return True
|
||||
|
||||
|
||||
def clear_jobs() -> int:
    """Delete every finished job (anything not pending/running); returns the removed count."""
    with connect() as conn:
        cursor = conn.execute("DELETE FROM jobs WHERE status NOT IN ('pending', 'running')")
        return int(cursor.rowcount or 0)
|
||||
|
||||
|
||||
def retry_job(job_id: str) -> bool:
    """Re-queue a failed or cancelled job and submit it to the worker pool."""
    row = _job_row(job_id)
    if row is None or row["status"] not in {"failed", "cancelled"}:
        return False
    with connect() as conn:
        conn.execute("UPDATE jobs SET status='pending', error='', finished_at=NULL, updated_at=? WHERE id=?", (utcnow(), job_id))
    _emit("job_update", {"id": job_id, "status": "pending"})
    _executor.submit(_run, job_id)
    return True
|
||||
Reference in New Issue
Block a user