Files
pyTorrent/pytorrent/services/smart_queue.py
Mateusz Gruszczyński a72b6eb364 labels and automatizations
2026-05-06 22:13:52 +02:00

579 lines
26 KiB
Python

from __future__ import annotations
from datetime import datetime, timezone
from typing import Any
import json
import time
from ..config import SMART_QUEUE_LABEL
from ..db import connect, default_user_id, utcnow
from . import rtorrent
from .preferences import active_profile, get_profile
def _ts(value: str | None) -> float:
if not value:
return 0.0
try:
return datetime.fromisoformat(value.replace('Z', '+00:00')).timestamp()
except Exception:
return 0.0
def _default_settings(user_id: int, profile_id: int) -> dict[str, Any]:
return {
'user_id': user_id,
'profile_id': profile_id,
'enabled': 0,
'max_active_downloads': 5,
'stalled_seconds': 300,
'min_speed_bytes': 1024,
'min_seeds': 1,
'manage_stopped': 0,
'updated_at': utcnow(),
}
def get_settings(profile_id: int, user_id: int | None = None) -> dict[str, Any]:
user_id = user_id or default_user_id()
with connect() as conn:
row = conn.execute(
'SELECT * FROM smart_queue_settings WHERE user_id=? AND profile_id=?',
(user_id, profile_id),
).fetchone()
return row or _default_settings(user_id, profile_id)
def save_settings(profile_id: int, data: dict[str, Any], user_id: int | None = None) -> dict[str, Any]:
user_id = user_id or default_user_id()
current = get_settings(profile_id, user_id)
settings = {
'enabled': 1 if data.get('enabled', current.get('enabled')) else 0,
'max_active_downloads': max(1, int(data.get('max_active_downloads') or current.get('max_active_downloads') or 5)),
'stalled_seconds': max(30, int(data.get('stalled_seconds') or current.get('stalled_seconds') or 300)),
'min_speed_bytes': max(0, int(data.get('min_speed_bytes') or current.get('min_speed_bytes') or 0)),
'min_seeds': max(0, int(data.get('min_seeds') or current.get('min_seeds') or 0)),
# Note: This switch protects fully stopped torrents from automatic starts; by default Smart Queue manages only paused items.
'manage_stopped': 1 if data.get('manage_stopped', current.get('manage_stopped')) else 0,
}
now = utcnow()
with connect() as conn:
conn.execute(
'''INSERT INTO smart_queue_settings(user_id,profile_id,enabled,max_active_downloads,stalled_seconds,min_speed_bytes,min_seeds,manage_stopped,updated_at)
VALUES(?,?,?,?,?,?,?,?,?)
ON CONFLICT(user_id, profile_id) DO UPDATE SET
enabled=excluded.enabled,
max_active_downloads=excluded.max_active_downloads,
stalled_seconds=excluded.stalled_seconds,
min_speed_bytes=excluded.min_speed_bytes,
min_seeds=excluded.min_seeds,
manage_stopped=excluded.manage_stopped,
updated_at=excluded.updated_at''',
(user_id, profile_id, settings['enabled'], settings['max_active_downloads'], settings['stalled_seconds'], settings['min_speed_bytes'], settings['min_seeds'], settings['manage_stopped'], now),
)
return get_settings(profile_id, user_id)
def list_exclusions(profile_id: int, user_id: int | None = None) -> list[dict[str, Any]]:
user_id = user_id or default_user_id()
with connect() as conn:
return conn.execute(
'SELECT * FROM smart_queue_exclusions WHERE user_id=? AND profile_id=? ORDER BY created_at DESC',
(user_id, profile_id),
).fetchall()
def set_exclusion(profile_id: int, torrent_hash: str, excluded: bool, reason: str = '', user_id: int | None = None) -> None:
user_id = user_id or default_user_id()
now = utcnow()
with connect() as conn:
if excluded:
conn.execute(
'INSERT OR REPLACE INTO smart_queue_exclusions(user_id,profile_id,torrent_hash,reason,created_at) VALUES(?,?,?,?,?)',
(user_id, profile_id, torrent_hash, reason, now),
)
else:
conn.execute(
'DELETE FROM smart_queue_exclusions WHERE user_id=? AND profile_id=? AND torrent_hash=?',
(user_id, profile_id, torrent_hash),
)
def add_history(profile_id: int, event: str, paused: list[str] | None = None, resumed: list[str] | None = None, checked: int = 0, details: dict[str, Any] | None = None, user_id: int | None = None) -> None:
user_id = user_id or default_user_id()
paused = paused or []
resumed = resumed or []
details = details or {}
with connect() as conn:
conn.execute(
'INSERT INTO smart_queue_history(user_id,profile_id,event,paused_count,resumed_count,checked_count,details_json,created_at) VALUES(?,?,?,?,?,?,?,?)',
(user_id, profile_id, event, len(paused), len(resumed), int(checked or 0), json.dumps({**details, 'paused': paused, 'resumed': resumed}), utcnow()),
)
def list_history(profile_id: int, user_id: int | None = None, limit: int = 30) -> list[dict[str, Any]]:
user_id = user_id or default_user_id()
with connect() as conn:
return conn.execute(
'SELECT * FROM smart_queue_history WHERE user_id=? AND profile_id=? ORDER BY created_at DESC LIMIT ?',
(user_id, profile_id, max(1, min(int(limit or 30), 100))),
).fetchall()
def count_history(profile_id: int, user_id: int | None = None) -> int:
user_id = user_id or default_user_id()
with connect() as conn:
row = conn.execute(
'SELECT COUNT(*) AS count FROM smart_queue_history WHERE user_id=? AND profile_id=?',
(user_id, profile_id),
).fetchone()
return int((row or {}).get('count') or 0)
def _excluded_hashes(profile_id: int, user_id: int) -> set[str]:
return {r['torrent_hash'] for r in list_exclusions(profile_id, user_id)}
def _label_names(value: str | None) -> list[str]:
names: list[str] = []
for part in str(value or '').replace(';', ',').replace('|', ',').split(','):
label = part.strip()
if label and label not in names:
names.append(label)
return names
def _label_value(labels: list[str]) -> str:
out: list[str] = []
for label in labels:
label = str(label or '').strip()
if label and label not in out:
out.append(label)
return ', '.join(out)
def _has_smart_queue_label(value: str | None) -> bool:
return SMART_QUEUE_LABEL in _label_names(value)
def _without_smart_queue_label(value: str | None) -> str:
return _label_value([label for label in _label_names(value) if label != SMART_QUEUE_LABEL])
def _with_smart_queue_label(value: str | None) -> str:
labels = _label_names(value)
if SMART_QUEUE_LABEL not in labels:
labels.append(SMART_QUEUE_LABEL)
return _label_value(labels)
def _remember_auto_label(profile_id: int, torrent_hash: str, previous_label: str) -> None:
now = utcnow()
with connect() as conn:
row = conn.execute(
'SELECT previous_label FROM smart_queue_auto_labels WHERE profile_id=? AND torrent_hash=?',
(profile_id, torrent_hash),
).fetchone()
if row:
conn.execute(
'UPDATE smart_queue_auto_labels SET updated_at=? WHERE profile_id=? AND torrent_hash=?',
(now, profile_id, torrent_hash),
)
else:
conn.execute(
'INSERT INTO smart_queue_auto_labels(profile_id,torrent_hash,previous_label,created_at,updated_at) VALUES(?,?,?,?,?)',
(profile_id, torrent_hash, previous_label, now, now),
)
def _read_label(client: Any, torrent_hash: str, fallback: str = '') -> str:
try:
return str(client.call('d.custom1', torrent_hash) or '')
except Exception:
return fallback
def _restore_auto_label(client: Any, profile_id: int, torrent_hash: str, current_label: str | None = None) -> bool:
with connect() as conn:
row = conn.execute(
'SELECT previous_label FROM smart_queue_auto_labels WHERE profile_id=? AND torrent_hash=?',
(profile_id, torrent_hash),
).fetchone()
live_label = _read_label(client, torrent_hash, current_label or '')
if not row and not _has_smart_queue_label(live_label):
return False
restored = _without_smart_queue_label(live_label)
previous = _without_smart_queue_label((row or {}).get('previous_label') or '')
if not restored and previous:
restored = previous
try:
# Note: Smart Queue now removes only its technical label, preserving labels added manually while the torrent was waiting in the queue.
if _has_smart_queue_label(live_label) or current_label is None:
client.call('d.custom1.set', torrent_hash, restored)
if row:
conn.execute('DELETE FROM smart_queue_auto_labels WHERE profile_id=? AND torrent_hash=?', (profile_id, torrent_hash))
return True
except Exception:
return False
def _call_rtorrent_setter(client: Any, method: str, value: int) -> bool:
"""Set a scalar rTorrent setting while tolerating XMLRPC signature differences."""
for args in ((int(value),), ('', int(value))):
try:
client.call(method, *args)
return True
except Exception:
continue
return False
def _ensure_rtorrent_download_cap(client: Any, max_active: int) -> dict[str, Any]:
"""Raise rTorrent download caps that can silently limit Smart Queue to one item."""
result: dict[str, Any] = {'checked': False, 'updated': False, 'items': []}
# Note: rTorrent may have separate global and per-throttle limits. When div=1,
# starts can effectively stop at one active torrent even when the target is 100.
for key in ('throttle.max_downloads.global', 'throttle.max_downloads.div'):
item: dict[str, Any] = {'key': key, 'checked': False, 'updated': False}
try:
current = int(client.call(key) or 0)
item.update({'checked': True, 'current': current, 'target': int(max_active)})
result['checked'] = True
# Note: 0 means unlimited; raise only positive limits lower than the target.
if 0 < current < max_active:
ok = _call_rtorrent_setter(client, f'{key}.set', int(max_active))
item['updated'] = ok
if ok:
result['updated'] = True
item['new'] = int(max_active)
result.setdefault('current', current)
result['new'] = int(max_active)
except Exception as exc:
item.update({'error': str(exc)})
result['items'].append(item)
return result
def _start_download(client: Any, torrent: dict[str, Any]) -> dict[str, Any]:
"""Resume paused torrents through rTorrent's pause model."""
h = str(torrent.get('hash') or '')
if not h:
return {'hash': h, 'ok': False, 'error': 'missing hash'}
if bool(torrent.get('paused')) or str(torrent.get('status') or '').lower() == 'paused' or int(torrent.get('state') or 0):
# Note: Smart Queue candidates paused with d.pause must be resumed with d.resume, without d.start/d.stop.
return rtorrent.resume_paused_hash(client, h)
# Note: Only optional manage_stopped uses the start path for fully stopped torrents.
return rtorrent.start_or_resume_hash(client, h)
def _verify_started_downloads(client: Any, hashes: list[str], attempts: int = 10, delay: float = 0.5) -> tuple[list[str], list[dict[str, Any]]]:
"""Verify starts after rTorrent has time to process resume/start commands."""
pending = [h for h in hashes if h]
started: list[str] = []
no_effect: list[dict[str, Any]] = []
seen_started: set[str] = set()
last_state: dict[str, dict[str, Any]] = {}
for attempt in range(max(1, attempts)):
if attempt:
time.sleep(delay)
for h in list(pending):
live = _read_live_start_state(client, h)
last_state[h] = live
if live.get('started'):
seen_started.add(h)
pending.remove(h)
if not pending:
break
started = [h for h in hashes if h in seen_started]
no_effect = [last_state.get(h, {'hash': h, 'started': False}) for h in hashes if h and h not in seen_started]
return started, no_effect
def _read_live_start_state(client: Any, torrent_hash: str) -> dict[str, Any]:
result: dict[str, Any] = {'hash': torrent_hash}
fields = (
('state', 'd.state'),
('active', 'd.is_active'),
('open', 'd.is_open'),
('priority', 'd.priority'),
('message', 'd.message'),
('label', 'd.custom1'),
)
for key, method in fields:
try:
value = client.call(method, torrent_hash)
result[key] = int(value or 0) if key in {'state', 'active', 'open', 'priority'} else str(value or '')
except Exception as exc:
result[f'{key}_error'] = str(exc)
# Note: Do not treat d.is_open or state=1 as resumed; Paused can also have those values.
# Smart Queue counts a start only after d.is_active=1, meaning the pause was actually removed.
result['started'] = bool(int(result.get('active') or 0))
return result
def _set_smart_queue_label(client: Any, torrent_hash: str, current_label: str | None = None, attempts: int = 3) -> bool:
# Note: The queue label is appended as a technical label instead of replacing the user's labels.
target = _with_smart_queue_label(current_label if current_label is not None else _read_label(client, torrent_hash, ''))
for attempt in range(max(1, attempts)):
try:
client.call('d.custom1.set', torrent_hash, target)
return True
except Exception:
if attempt < attempts - 1:
time.sleep(0.05)
return False
def _mark_auto_paused(client: Any, profile_id: int, torrent: dict[str, Any]) -> bool:
torrent_hash = str(torrent.get('hash') or '')
if not torrent_hash:
return False
previous = str(torrent.get('label') or '')
if not _has_smart_queue_label(previous):
_remember_auto_label(profile_id, torrent_hash, _without_smart_queue_label(previous))
return _set_smart_queue_label(client, torrent_hash, previous)
def _is_smart_queue_hold(torrent: dict[str, Any] | None, manage_stopped: bool = True) -> bool:
if not torrent or int(torrent.get('complete') or 0):
return False
if _has_smart_queue_label(torrent.get('label')):
return True
# Note: Paused in rTorrent usually has state=1 and active=0, so state=0 must not be required.
# This lets Smart Queue treat paused torrents as pending and fill the queue target later.
if bool(torrent.get('paused')):
return True
# Note: Fully stopped items are managed only when Use stopped torrents is enabled.
if not manage_stopped:
return False
return not int(torrent.get('state') or 0)
def _clear_untracked_smart_queue_label(client: Any, torrent_hash: str, current_label: str) -> bool:
if not _has_smart_queue_label(current_label):
return False
try:
# Note: Orphan cleanup removes only the Smart Queue technical label and keeps manual labels intact.
client.call('d.custom1.set', torrent_hash, _without_smart_queue_label(current_label))
return True
except Exception:
return False
def _cleanup_auto_labels(client: Any, profile_id: int, torrents: list[dict[str, Any]], keep_hashes: set[str], manage_stopped: bool = True) -> list[str]:
by_hash = {str(t.get('hash') or ''): t for t in torrents}
restored: list[str] = []
with connect() as conn:
rows = conn.execute('SELECT torrent_hash FROM smart_queue_auto_labels WHERE profile_id=?', (profile_id,)).fetchall()
tracked_hashes = {str(row.get('torrent_hash') or '') for row in rows if row.get('torrent_hash')}
for row in rows:
h = str(row.get('torrent_hash') or '')
t = by_hash.get(h)
if not h or h in keep_hashes:
continue
current_label = '' if t is None else str(t.get('label') or '')
if not _is_smart_queue_hold(t, manage_stopped):
if _restore_auto_label(client, profile_id, h, None if t is None else current_label):
restored.append(h)
continue
if not _has_smart_queue_label(current_label):
_set_smart_queue_label(client, h)
for h, t in by_hash.items():
if not h or h in keep_hashes or h in tracked_hashes or _is_smart_queue_hold(t, manage_stopped):
continue
if _clear_untracked_smart_queue_label(client, h, str(t.get('label') or '')):
restored.append(h)
return restored
def _is_running_download_slot(t: dict[str, Any]) -> bool:
"""Return True for incomplete torrents that already occupy a Smart Queue slot."""
# Note: The Smart Queue limit means the target number of actually active slots.
# Paused can have state=1/open=1, so a slot is counted only after d.is_active=1.
if int(t.get('complete') or 0):
return False
if _has_smart_queue_label(t.get('label')):
return False
status = str(t.get('status') or '').lower()
if status == 'checking' or status == 'paused' or bool(t.get('paused')):
return False
return bool(int(t.get('active') or 0))
def _is_waiting_download_candidate(t: dict[str, Any], manage_stopped: bool) -> bool:
"""Return True for paused/held torrents Smart Queue may resume later."""
if int(t.get('complete') or 0):
return False
if _has_smart_queue_label(t.get('label')):
return True
# Note: Paused items are the primary source for filling the queue, regardless of manage_stopped.
if bool(t.get('paused')) or str(t.get('status') or '').lower() == 'paused':
return True
# Note: Stopped items are added only when the user enabled Use stopped torrents.
return bool(manage_stopped) and not int(t.get('state') or 0)
def check(profile: dict | None = None, user_id: int | None = None, force: bool = False) -> dict[str, Any]:
profile = profile or active_profile()
if not profile:
return {'ok': False, 'error': 'No active rTorrent profile'}
user_id = user_id or default_user_id()
profile_id = int(profile['id'])
settings = get_settings(profile_id, user_id)
if not force and not int(settings.get('enabled') or 0):
restored: list[str] = []
try:
# Note: When Smart Queue is disabled, only technical labels are cleaned up, without starting or pausing torrents.
torrents = rtorrent.list_torrents(profile)
restored = _cleanup_auto_labels(rtorrent.client_for(profile), profile_id, torrents, set(), bool(settings.get('manage_stopped')))
except Exception:
restored = []
add_history(profile_id, 'skipped_disabled', [], [], 0, {'enabled': False, 'labels_restored': restored}, user_id)
return {'ok': True, 'enabled': False, 'paused': [], 'resumed': [], 'labels_restored': restored, 'message': 'Smart Queue disabled'}
torrents = rtorrent.list_torrents(profile)
excluded = _excluded_hashes(profile_id, user_id)
manage_stopped = bool(settings.get('manage_stopped'))
def is_managed_hold(t: dict[str, Any]) -> bool:
return _has_smart_queue_label(t.get('label'))
# Note: Count Smart Queue slots by d.is_active because Paused can have state=1/open=1 and must not occupy the limit.
downloading = [
t for t in torrents
if _is_running_download_slot(t)
and not is_managed_hold(t)
and t.get('hash') not in excluded
]
# Note: Candidates also include regular Paused items without a label. Otherwise the queue sees only one or two items
# and cannot fill the configured target of 100.
stopped = [
t for t in torrents
if t.get('hash') not in excluded
and _is_waiting_download_candidate(t, manage_stopped)
and not _is_running_download_slot(t)
]
min_speed = int(settings.get('min_speed_bytes') or 0)
min_seeds = int(settings.get('min_seeds') or 0)
stalled_seconds = int(settings.get('stalled_seconds') or 300)
now = utcnow()
now_ts = datetime.now(timezone.utc).timestamp()
stalled: list[dict[str, Any]] = []
with connect() as conn:
for t in downloading:
is_stalled = int(t.get('down_rate') or 0) <= min_speed and int(t.get('seeds') or 0) <= min_seeds
h = t.get('hash')
if not h:
continue
if is_stalled:
row = conn.execute('SELECT first_stalled_at FROM smart_queue_stalled WHERE profile_id=? AND torrent_hash=?', (profile_id, h)).fetchone()
if row:
conn.execute('UPDATE smart_queue_stalled SET updated_at=? WHERE profile_id=? AND torrent_hash=?', (now, profile_id, h))
first = row['first_stalled_at']
else:
first = now
conn.execute('INSERT OR REPLACE INTO smart_queue_stalled(profile_id,torrent_hash,first_stalled_at,updated_at) VALUES(?,?,?,?)', (profile_id, h, first, now))
if now_ts - _ts(first) >= stalled_seconds:
stalled.append(t)
else:
conn.execute('DELETE FROM smart_queue_stalled WHERE profile_id=? AND torrent_hash=?', (profile_id, h))
# Candidates with visible sources are preferred. Do not touch excluded torrents.
candidates = sorted(
stopped,
key=lambda t: (int(t.get('seeds') or 0), int(t.get('peers') or 0), int(t.get('down_rate') or 0)),
reverse=True,
)
max_active = max(1, int(settings.get('max_active_downloads') or 5))
stalled_hashes = {str(t.get('hash') or '') for t in stalled}
# Enforce the hard active-download cap first. The previous logic only limited
# newly resumed torrents, so already-active downloads could stay above the limit.
pause_rank = sorted(
downloading,
key=lambda t: (
0 if str(t.get('hash') or '') in stalled_hashes else 1,
int(t.get('down_rate') or 0),
int(t.get('seeds') or 0),
int(t.get('peers') or 0),
),
)
to_pause: list[dict[str, Any]] = pause_rank[:max(0, len(downloading) - max_active)]
pause_hashes = {str(t.get('hash') or '') for t in to_pause}
# Note: Stalled rotation runs only when the queue is full. When slots are missing, Smart Queue should
# first add missing items instead of pausing existing or incorrectly detected stalled items.
if candidates and len(downloading) >= max_active:
replaceable_stalled = [t for t in stalled if str(t.get('hash') or '') not in pause_hashes]
for t in replaceable_stalled[:max(0, len(candidates) - len(to_pause))]:
to_pause.append(t)
pause_hashes.add(str(t.get('hash') or ''))
active_after_pause = max(0, len(downloading) - len(to_pause))
available_slots = max(0, max_active - active_after_pause)
to_resume = candidates[:available_slots]
# Note: Items outside the current start batch are explicitly marked as pending Smart Queue items.
to_label_waiting = candidates[available_slots:]
c = rtorrent.client_for(profile)
rtorrent_cap = _ensure_rtorrent_download_cap(c, max_active)
paused: list[str] = []
resumed: list[str] = []
label_failed: list[str] = []
start_failed: list[dict[str, str]] = []
start_no_effect: list[dict[str, Any]] = []
resume_requested: list[str] = []
start_results: list[dict[str, Any]] = []
for t in to_pause:
try:
pause_result = rtorrent.pause_hash(c, t['hash'])
if not pause_result.get('ok'):
raise RuntimeError(pause_result.get('error') or 'pause failed')
if not _mark_auto_paused(c, profile_id, t):
label_failed.append(t['hash'])
paused.append(t['hash'])
except Exception:
pass
for t in to_label_waiting:
h = str(t.get('hash') or '')
if not h or h in pause_hashes:
continue
try:
if not _mark_auto_paused(c, profile_id, t):
label_failed.append(h)
except Exception:
label_failed.append(h)
# Note: Start the whole candidate batch in one round. Remove the label after an accepted RPC,
# because rTorrent may keep some items in its own queue with active=0 despite a valid d.start/d.resume.
for t in to_resume:
h = str(t.get('hash') or '')
if not h:
continue
try:
result = _start_download(c, t)
start_results.append(result)
resume_requested.append(h)
except Exception as exc:
start_failed.append({'hash': h, 'error': str(exc)})
active_verified, start_no_effect = _verify_started_downloads(c, resume_requested)
for h in active_verified:
_restore_auto_label(c, profile_id, h, None)
# Note: History shows only torrents actually unpaused, not just the number of sent commands.
resumed = list(active_verified)
keep_labels = (
set(paused)
| {str(t.get('hash') or '') for t in to_label_waiting}
| {str(t.get('hash') or '') for t in stopped if _has_smart_queue_label(t.get('label')) and str(t.get('hash') or '') not in set(resumed)}
)
restored = _cleanup_auto_labels(c, profile_id, torrents, keep_labels, manage_stopped)
details = {'excluded': len(excluded), 'enabled': bool(settings.get('enabled')), 'auto_label': SMART_QUEUE_LABEL, 'labels_restored': restored, 'labels_failed': label_failed, 'start_failed': start_failed, 'start_no_effect': start_no_effect, 'start_results': start_results, 'resume_requested': resume_requested, 'active_verified': active_verified, 'waiting_labeled': len(to_label_waiting), 'manage_stopped': manage_stopped, 'max_active_downloads': max_active, 'active_before': len(downloading), 'active_after_expected': active_after_pause + len(resumed), 'paused_planned': len(to_pause), 'resumed_planned': len(to_resume), 'rtorrent_cap': rtorrent_cap}
add_history(profile_id, 'force_check' if force else 'auto_check', paused, resumed, len(torrents), details, user_id)
return {'ok': True, 'enabled': bool(settings.get('enabled')), 'paused': paused, 'resumed': resumed, 'resume_requested': resume_requested, 'waiting_labeled': len(to_label_waiting), 'labels_restored': restored, 'labels_failed': label_failed, 'start_failed': start_failed, 'start_no_effect': start_no_effect, 'active_verified': active_verified, 'rtorrent_cap': rtorrent_cap, 'checked': len(torrents), 'excluded': len(excluded), 'settings': settings}