from __future__ import annotations from datetime import datetime, timezone from typing import Any import json import time from ..config import SMART_QUEUE_LABEL, SMART_QUEUE_STALLED_LABEL from ..db import connect, default_user_id, utcnow from . import rtorrent from .preferences import active_profile, get_profile def _ts(value: str | None) -> float: if not value: return 0.0 try: return datetime.fromisoformat(value.replace('Z', '+00:00')).timestamp() except Exception: return 0.0 def _int_setting(data: dict[str, Any], current: dict[str, Any], key: str, default: int, minimum: int = 0) -> int: raw = data.get(key) if key in data else current.get(key) try: return max(minimum, int(raw if raw is not None and raw != '' else default)) except (TypeError, ValueError): return max(minimum, int(default)) def _default_settings(user_id: int, profile_id: int) -> dict[str, Any]: return { 'user_id': user_id, 'profile_id': profile_id, 'enabled': 0, 'max_active_downloads': 5, 'stalled_seconds': 300, 'min_speed_bytes': 1024, 'min_seeds': 1, 'min_peers': 0, 'ignore_seed_peer': 0, 'ignore_speed': 0, 'manage_stopped': 1, 'updated_at': utcnow(), } def get_settings(profile_id: int, user_id: int | None = None) -> dict[str, Any]: user_id = user_id or default_user_id() with connect() as conn: row = conn.execute( 'SELECT * FROM smart_queue_settings WHERE user_id=? AND profile_id=?', (user_id, profile_id), ).fetchone() return row or _default_settings(user_id, profile_id) def save_settings(profile_id: int, data: dict[str, Any], user_id: int | None = None) -> dict[str, Any]: user_id = user_id or default_user_id() current = get_settings(profile_id, user_id) settings = { 'enabled': 1 if data.get('enabled', current.get('enabled')) else 0, 'max_active_downloads': _int_setting(data, current, 'max_active_downloads', 5, 1), 'stalled_seconds': _int_setting(data, current, 'stalled_seconds', 300, 30), 'min_speed_bytes': _int_setting(data, current, 'min_speed_bytes', 0, 0), 'min_seeds': _int_setting(data, current, 'min_seeds', 0, 0), # Note: Min peers is optional; when set, stalled detection requires low speed, low seeds and low peers. 'min_peers': _int_setting(data, current, 'min_peers', 0, 0), # Note: Ignore seed/peer removes source counts from stalled detection, useful when sources appear rarely. 'ignore_seed_peer': 1 if data.get('ignore_seed_peer', current.get('ignore_seed_peer')) else 0, # Note: Ignore speed removes low transfer rate from stalled detection; with both ignores enabled only stalled_seconds matters. 'ignore_speed': 1 if data.get('ignore_speed', current.get('ignore_speed')) else 0, # Note: Compatibility field retained; enabled Smart Queue always manages stopped torrents and never manages user-paused torrents. 'manage_stopped': 1, } now = utcnow() with connect() as conn: conn.execute( '''INSERT INTO smart_queue_settings(user_id,profile_id,enabled,max_active_downloads,stalled_seconds,min_speed_bytes,min_seeds,min_peers,ignore_seed_peer,ignore_speed,manage_stopped,updated_at) VALUES(?,?,?,?,?,?,?,?,?,?,?,?) ON CONFLICT(user_id, profile_id) DO UPDATE SET enabled=excluded.enabled, max_active_downloads=excluded.max_active_downloads, stalled_seconds=excluded.stalled_seconds, min_speed_bytes=excluded.min_speed_bytes, min_seeds=excluded.min_seeds, min_peers=excluded.min_peers, ignore_seed_peer=excluded.ignore_seed_peer, ignore_speed=excluded.ignore_speed, manage_stopped=excluded.manage_stopped, updated_at=excluded.updated_at''', (user_id, profile_id, settings['enabled'], settings['max_active_downloads'], settings['stalled_seconds'], settings['min_speed_bytes'], settings['min_seeds'], settings['min_peers'], settings['ignore_seed_peer'], settings['ignore_speed'], settings['manage_stopped'], now), ) return get_settings(profile_id, user_id) def list_exclusions(profile_id: int, user_id: int | None = None) -> list[dict[str, Any]]: user_id = user_id or default_user_id() with connect() as conn: return conn.execute( 'SELECT * FROM smart_queue_exclusions WHERE user_id=? AND profile_id=? ORDER BY created_at DESC', (user_id, profile_id), ).fetchall() def set_exclusion(profile_id: int, torrent_hash: str, excluded: bool, reason: str = '', user_id: int | None = None) -> None: user_id = user_id or default_user_id() now = utcnow() with connect() as conn: if excluded: conn.execute( 'INSERT OR REPLACE INTO smart_queue_exclusions(user_id,profile_id,torrent_hash,reason,created_at) VALUES(?,?,?,?,?)', (user_id, profile_id, torrent_hash, reason, now), ) else: conn.execute( 'DELETE FROM smart_queue_exclusions WHERE user_id=? AND profile_id=? AND torrent_hash=?', (user_id, profile_id, torrent_hash), ) def add_history(profile_id: int, event: str, paused: list[str] | None = None, resumed: list[str] | None = None, checked: int = 0, details: dict[str, Any] | None = None, user_id: int | None = None) -> None: user_id = user_id or default_user_id() paused = paused or [] resumed = resumed or [] details = details or {} with connect() as conn: conn.execute( 'INSERT INTO smart_queue_history(user_id,profile_id,event,paused_count,resumed_count,checked_count,details_json,created_at) VALUES(?,?,?,?,?,?,?,?)', (user_id, profile_id, event, len(paused), len(resumed), int(checked or 0), json.dumps({**details, 'paused': paused, 'resumed': resumed}), utcnow()), ) def list_history(profile_id: int, user_id: int | None = None, limit: int = 30) -> list[dict[str, Any]]: user_id = user_id or default_user_id() with connect() as conn: return conn.execute( 'SELECT * FROM smart_queue_history WHERE user_id=? AND profile_id=? ORDER BY created_at DESC LIMIT ?', (user_id, profile_id, max(1, min(int(limit or 30), 100))), ).fetchall() def count_history(profile_id: int, user_id: int | None = None) -> int: user_id = user_id or default_user_id() with connect() as conn: row = conn.execute( 'SELECT COUNT(*) AS count FROM smart_queue_history WHERE user_id=? AND profile_id=?', (user_id, profile_id), ).fetchone() return int((row or {}).get('count') or 0) def _excluded_hashes(profile_id: int, user_id: int) -> set[str]: return {r['torrent_hash'] for r in list_exclusions(profile_id, user_id)} def _label_names(value: str | None) -> list[str]: names: list[str] = [] for part in str(value or '').replace(';', ',').replace('|', ',').split(','): label = part.strip() if label and label not in names: names.append(label) return names def _label_value(labels: list[str]) -> str: output: list[str] = [] for label in labels: item = str(label or '').strip() if item and item not in output: output.append(item) return ', '.join(output) def _has_smart_queue_label(value: str | None) -> bool: return SMART_QUEUE_LABEL in _label_names(value) def _without_smart_queue_label(value: str | None) -> str: return _label_value([label for label in _label_names(value) if label != SMART_QUEUE_LABEL]) def _has_stalled_label(value: str | None) -> bool: # Note: Stalled is treated case-insensitively so manually edited labels still block Smart Queue. target = SMART_QUEUE_STALLED_LABEL.casefold() return any(label.casefold() == target for label in _label_names(value)) def _without_queue_technical_labels(value: str | None) -> str: return _label_value([label for label in _label_names(value) if label != SMART_QUEUE_LABEL]) def _ensure_stalled_label(client: Any, torrent_hash: str, current_label: str = '') -> bool: labels = [label for label in _label_names(current_label) if label != SMART_QUEUE_LABEL] changed = False if not any(label.casefold() == SMART_QUEUE_STALLED_LABEL.casefold() for label in labels): labels.append(SMART_QUEUE_STALLED_LABEL) changed = True if SMART_QUEUE_LABEL in _label_names(current_label): changed = True if not changed: return True try: # Note: Stalled marking is idempotent; it adds Stalled and removes only the Smart Queue technical marker. client.call('d.custom1.set', torrent_hash, _label_value(labels)) return True except Exception: return False def _remember_auto_label(profile_id: int, torrent_hash: str, previous_label: str) -> None: now = utcnow() with connect() as conn: row = conn.execute( 'SELECT previous_label FROM smart_queue_auto_labels WHERE profile_id=? AND torrent_hash=?', (profile_id, torrent_hash), ).fetchone() if row: conn.execute( 'UPDATE smart_queue_auto_labels SET updated_at=? WHERE profile_id=? AND torrent_hash=?', (now, profile_id, torrent_hash), ) else: conn.execute( 'INSERT INTO smart_queue_auto_labels(profile_id,torrent_hash,previous_label,created_at,updated_at) VALUES(?,?,?,?,?)', (profile_id, torrent_hash, previous_label, now, now), ) def _read_label(client: Any, torrent_hash: str, fallback: str = '') -> str: try: return str(client.call('d.custom1', torrent_hash) or '') except Exception: return fallback def _restore_auto_label(client: Any, profile_id: int, torrent_hash: str, current_label: str | None = None) -> bool: with connect() as conn: row = conn.execute( 'SELECT previous_label FROM smart_queue_auto_labels WHERE profile_id=? AND torrent_hash=?', (profile_id, torrent_hash), ).fetchone() live_label = _read_label(client, torrent_hash, current_label or '') if not row: if not _has_smart_queue_label(live_label): return False try: # Note: Remove only the Smart Queue technical label and keep every user label untouched. client.call('d.custom1.set', torrent_hash, _without_smart_queue_label(live_label)) return True except Exception: return False try: # Note: Starting a torrent removes only Smart Queue's technical marker, so labels added while stopped stay untouched. if _has_smart_queue_label(live_label): client.call('d.custom1.set', torrent_hash, _without_smart_queue_label(live_label)) conn.execute('DELETE FROM smart_queue_auto_labels WHERE profile_id=? AND torrent_hash=?', (profile_id, torrent_hash)) return True except Exception: return False def _call_rtorrent_setter(client: Any, method: str, value: int) -> bool: """Set a scalar rTorrent setting while tolerating XMLRPC signature differences.""" for args in ((int(value),), ('', int(value))): try: client.call(method, *args) return True except Exception: continue return False def _ensure_rtorrent_download_cap(client: Any, max_active: int) -> dict[str, Any]: """Raise rTorrent download caps that can silently limit Smart Queue to one item.""" result: dict[str, Any] = {'checked': False, 'updated': False, 'items': []} # Note: rTorrent may have separate global and per-throttle limits. When div=1, # starts can effectively stop at one active torrent even when the target is 100. for key in ('throttle.max_downloads.global', 'throttle.max_downloads.div'): item: dict[str, Any] = {'key': key, 'checked': False, 'updated': False} try: current = int(client.call(key) or 0) item.update({'checked': True, 'current': current, 'target': int(max_active)}) result['checked'] = True # Note: 0 means unlimited; raise only positive limits lower than the target. if 0 < current < max_active: ok = _call_rtorrent_setter(client, f'{key}.set', int(max_active)) item['updated'] = ok if ok: result['updated'] = True item['new'] = int(max_active) result.setdefault('current', current) result['new'] = int(max_active) except Exception as exc: item.update({'error': str(exc)}) result['items'].append(item) return result def _start_download(client: Any, torrent: dict[str, Any]) -> dict[str, Any]: """Start only stopped Smart Queue candidates; paused torrents are a user decision.""" h = str(torrent.get('hash') or '') if not h: return {'hash': h, 'ok': False, 'error': 'missing hash'} if _is_user_paused(torrent): # Note: Smart Queue never unpauses user-paused torrents; it manages only stopped items. return {'hash': h, 'ok': False, 'skipped': 'user_paused'} # Note: This is the same helper used by the manual Start action, so queue starts follow the UI path. # Note: Smart Queue uses the same helper as the manual Start action, so start behavior stays identical. return rtorrent.start_or_resume_hash(client, h) def _verify_started_downloads(client: Any, hashes: list[str], attempts: int = 10, delay: float = 0.5) -> tuple[list[str], list[dict[str, Any]]]: """Verify starts after rTorrent has time to process manual-equivalent start commands.""" pending = [h for h in hashes if h] started: list[str] = [] no_effect: list[dict[str, Any]] = [] seen_started: set[str] = set() last_state: dict[str, dict[str, Any]] = {} for attempt in range(max(1, attempts)): if attempt: time.sleep(delay) for h in list(pending): live = _read_live_start_state(client, h) last_state[h] = live if live.get('started'): seen_started.add(h) pending.remove(h) if not pending: break started = [h for h in hashes if h in seen_started] no_effect = [last_state.get(h, {'hash': h, 'started': False}) for h in hashes if h and h not in seen_started] return started, no_effect def _read_live_start_state(client: Any, torrent_hash: str) -> dict[str, Any]: result: dict[str, Any] = {'hash': torrent_hash} fields = ( ('state', 'd.state'), ('active', 'd.is_active'), ('open', 'd.is_open'), ('priority', 'd.priority'), ('message', 'd.message'), ('label', 'd.custom1'), ) for key, method in fields: try: value = client.call(method, torrent_hash) result[key] = int(value or 0) if key in {'state', 'active', 'open', 'priority'} else str(value or '') except Exception as exc: result[f'{key}_error'] = str(exc) # Note: Manual Start in rTorrent is successful when d.state becomes 1. # d.is_active can stay 0 for queued/idle downloads, so it must not be used as the only success check. result['started'] = bool(int(result.get('state') or 0) or int(result.get('active') or 0)) return result def _is_user_paused(torrent: dict[str, Any]) -> bool: """Return True for torrents paused by the user; Smart Queue must not touch them.""" status = str(torrent.get('status') or '').lower() return bool(torrent.get('paused')) or status == 'paused' def _set_smart_queue_label(client: Any, torrent_hash: str, current_label: str = '', attempts: int = 3) -> bool: labels = _label_names(current_label) if SMART_QUEUE_LABEL in labels: return True labels.append(SMART_QUEUE_LABEL) value = _label_value(labels) for attempt in range(max(1, attempts)): try: # Note: Smart Queue appends its technical label instead of overwriting existing torrent labels. client.call('d.custom1.set', torrent_hash, value) return True except Exception: if attempt < attempts - 1: time.sleep(0.05) return False def _mark_auto_stopped(client: Any, profile_id: int, torrent: dict[str, Any]) -> bool: torrent_hash = str(torrent.get('hash') or '') if not torrent_hash: return False previous = str(torrent.get('label') or '') if not _has_smart_queue_label(previous): _remember_auto_label(profile_id, torrent_hash, previous) return _set_smart_queue_label(client, torrent_hash, previous) def _is_started_download_slot(torrent: dict[str, Any] | None) -> bool: """Return True for incomplete torrents already started in rTorrent, including manual starts.""" if not torrent or int(torrent.get('complete') or 0): return False status = str(torrent.get('status') or '').lower() if status == 'checking': return False # Note: Manual Start changes d.state first; d.is_active may stay 0 while rTorrent is queued or idle. return bool(int(torrent.get('state') or 0) or int(torrent.get('active') or 0)) def _is_smart_queue_hold(torrent: dict[str, Any] | None, manage_stopped: bool = True) -> bool: if not torrent or int(torrent.get('complete') or 0): return False if _is_started_download_slot(torrent): # Note: A manual start can leave the Smart Queue label behind; started items are active slots, not holds. return False if _has_stalled_label(str(torrent.get('label') or '')): return False if _is_user_paused(torrent): # Note: Paused torrents are always treated as user-controlled and are not Smart Queue holds. return False if _has_smart_queue_label(str(torrent.get('label') or '')): return True # Note: Smart Queue manages stopped torrents by default; the old manage_stopped flag is ignored for compatibility. return not int(torrent.get('state') or 0) def _clear_untracked_smart_queue_label(client: Any, torrent_hash: str, current_label: str) -> bool: if not _has_smart_queue_label(current_label): return False try: # Note: Clear only the orphaned Smart Queue marker and keep unrelated labels intact. client.call('d.custom1.set', torrent_hash, _without_smart_queue_label(current_label)) return True except Exception: return False def _cleanup_auto_labels(client: Any, profile_id: int, torrents: list[dict[str, Any]], keep_hashes: set[str], manage_stopped: bool = True) -> list[str]: by_hash = {str(t.get('hash') or ''): t for t in torrents} restored: list[str] = [] with connect() as conn: rows = conn.execute('SELECT torrent_hash FROM smart_queue_auto_labels WHERE profile_id=?', (profile_id,)).fetchall() tracked_hashes = {str(row.get('torrent_hash') or '') for row in rows if row.get('torrent_hash')} for row in rows: h = str(row.get('torrent_hash') or '') t = by_hash.get(h) if not h or h in keep_hashes: continue current_label = '' if t is None else str(t.get('label') or '') if not _is_smart_queue_hold(t, manage_stopped): if _restore_auto_label(client, profile_id, h, None if t is None else current_label): restored.append(h) continue if not _has_smart_queue_label(current_label): _set_smart_queue_label(client, h, current_label) for h, t in by_hash.items(): if not h or h in keep_hashes or h in tracked_hashes or _is_smart_queue_hold(t, manage_stopped): continue if _clear_untracked_smart_queue_label(client, h, str(t.get('label') or '')): restored.append(h) return restored def _is_running_download_slot(t: dict[str, Any]) -> bool: """Return True for incomplete torrents that already occupy a Smart Queue slot.""" # Note: Do not exclude Smart Queue/Stalled labels here. Manual Start can leave old labels, # and those torrents still must count toward the global Smart Queue limit. return _is_started_download_slot(t) def _is_stalled_download(t: dict[str, Any], min_speed: int, min_seeds: int, min_peers: int, ignore_seed_peer: bool, ignore_speed: bool) -> bool: """Return True when a started torrent should begin or continue the stalled timer.""" # Note: Each ignore switch only removes its own criterion; the stalled timer is still respected after criteria match. speed_ok = True if ignore_speed else int(t.get('down_rate') or 0) <= max(0, int(min_speed or 0)) source_ok = True if ignore_seed_peer else int(t.get('seeds') or 0) <= max(0, int(min_seeds or 0)) and (min_peers <= 0 or int(t.get('peers') or 0) <= min_peers) return speed_ok and source_ok def _stalled_timer_key(min_speed: int, min_seeds: int, min_peers: int, stalled_seconds: int, ignore_seed_peer: bool, ignore_speed: bool) -> str: """Return a stable key for the stalled rules that started the current timer.""" # Note: Changing ignore switches or thresholds restarts existing stalled timers instead of reusing old rows. return f"v2|speed={int(min_speed or 0)}|seeds={int(min_seeds or 0)}|peers={int(min_peers or 0)}|seconds={int(stalled_seconds or 0)}|ignore_sources={int(bool(ignore_seed_peer))}|ignore_speed={int(bool(ignore_speed))}" def _is_low_activity_download(t: dict[str, Any], min_speed: int, min_seeds: int, min_peers: int, ignore_seed_peer: bool = False, ignore_speed: bool = False) -> bool: """Return True when a started torrent is weak and should be stopped first.""" # Note: Stop priority uses only criteria that are not ignored, so disabled criteria cannot stop torrents earlier. low_speed = False if ignore_speed else int(t.get('down_rate') or 0) <= max(0, int(min_speed or 0)) low_seeds = False if ignore_seed_peer else int(t.get('seeds') or 0) <= max(0, int(min_seeds or 0)) low_peers = False if ignore_seed_peer or min_peers <= 0 else int(t.get('peers') or 0) <= max(0, int(min_peers or 0)) return low_speed or low_seeds or low_peers def _is_waiting_download_candidate(t: dict[str, Any], manage_stopped: bool) -> bool: """Return True for stopped torrents Smart Queue may start later.""" if int(t.get('complete') or 0): return False if str(t.get('status') or '').lower() == 'checking': # Note: Torrents still being checked must finish post-check handling before Smart Queue may start them. return False if _has_stalled_label(str(t.get('label') or '')): return False if _is_user_paused(t): # Note: User-paused torrents are never candidates, even when they have no Smart Queue label. return False if _has_smart_queue_label(str(t.get('label') or '')): return True # Note: Enabled Smart Queue manages all stopped torrents; no separate stopped-torrent switch is needed. return not int(t.get('state') or 0) def check(profile: dict | None = None, user_id: int | None = None, force: bool = False) -> dict[str, Any]: profile = profile or active_profile() if not profile: return {'ok': False, 'error': 'No active rTorrent profile'} user_id = user_id or default_user_id() profile_id = int(profile['id']) settings = get_settings(profile_id, user_id) if not force and not int(settings.get('enabled') or 0): restored: list[str] = [] try: # Note: When Smart Queue is disabled, only technical labels are cleaned up, without starting or pausing torrents. torrents = rtorrent.list_torrents(profile) restored = _cleanup_auto_labels(rtorrent.client_for(profile), profile_id, torrents, set(), True) except Exception: restored = [] add_history(profile_id, 'skipped_disabled', [], [], 0, {'enabled': False, 'labels_restored': restored}, user_id) return {'ok': True, 'enabled': False, 'paused': [], 'resumed': [], 'stopped': [], 'started': [], 'labels_restored': restored, 'message': 'Smart Queue disabled'} torrents = rtorrent.list_torrents(profile) # Note: Stalled labels block automatic starting only; a manually started Stalled item still counts as a running slot. stalled_label_hashes = {str(t.get('hash') or '') for t in torrents if _has_stalled_label(str(t.get('label') or '')) and t.get('hash')} user_excluded = _excluded_hashes(profile_id, user_id) manage_stopped = True # Note: Count every started incomplete torrent, including items started manually and items with old Smart Queue labels. downloading = [ t for t in torrents if _is_running_download_slot(t) and str(t.get('hash') or '') not in user_excluded ] # Note: Waiting candidates are stopped queue holds only; Stalled labels are not auto-started again. stopped = [ t for t in torrents if str(t.get('hash') or '') not in user_excluded and str(t.get('hash') or '') not in stalled_label_hashes and _is_waiting_download_candidate(t, manage_stopped) and not _is_running_download_slot(t) ] manual_labeled_running = [ str(t.get('hash') or '') for t in downloading if str(t.get('hash') or '') and _has_smart_queue_label(str(t.get('label') or '')) ] min_speed = int(settings.get('min_speed_bytes') or 0) min_seeds = int(settings.get('min_seeds') or 0) min_peers = int(settings.get('min_peers') or 0) ignore_seed_peer = bool(int(settings.get('ignore_seed_peer') or 0)) ignore_speed = bool(int(settings.get('ignore_speed') or 0)) stalled_seconds = int(settings.get('stalled_seconds') or 300) timer_key = _stalled_timer_key(min_speed, min_seeds, min_peers, stalled_seconds, ignore_seed_peer, ignore_speed) now = utcnow() now_ts = datetime.now(timezone.utc).timestamp() stalled: list[dict[str, Any]] = [] stop_eligible: list[dict[str, Any]] = [] # Note: Toast diagnostics count active torrents whose ignored criteria would otherwise match during this check. ignored_seed_peer_count = 0 ignored_speed_count = 0 with connect() as conn: for t in downloading: # Note: Stalled detection respects seed/peer and speed ignore switches before starting the timer. if ignore_seed_peer and (int(t.get('seeds') or 0) <= max(0, int(min_seeds or 0)) or (min_peers > 0 and int(t.get('peers') or 0) <= max(0, int(min_peers or 0)))): ignored_seed_peer_count += 1 if ignore_speed and int(t.get('down_rate') or 0) <= max(0, int(min_speed or 0)): ignored_speed_count += 1 is_stalled = _is_stalled_download(t, min_speed, min_seeds, min_peers, ignore_seed_peer, ignore_speed) # Note: Hard-limit enforcement respects the same ignore switches before choosing weak items. if _is_low_activity_download(t, min_speed, min_seeds, min_peers, ignore_seed_peer, ignore_speed): stop_eligible.append(t) h = t.get('hash') if not h: continue if is_stalled: row = conn.execute('SELECT first_stalled_at, timer_key FROM smart_queue_stalled WHERE profile_id=? AND torrent_hash=?', (profile_id, h)).fetchone() if row and str(row.get('timer_key') or '') == timer_key: conn.execute('UPDATE smart_queue_stalled SET updated_at=? WHERE profile_id=? AND torrent_hash=?', (now, profile_id, h)) first = row['first_stalled_at'] else: # Note: A changed stalled rule starts a fresh timer, so old rows cannot instantly mark torrents as Stalled. first = now conn.execute('INSERT OR REPLACE INTO smart_queue_stalled(profile_id,torrent_hash,first_stalled_at,updated_at,timer_key) VALUES(?,?,?,?,?)', (profile_id, h, first, now, timer_key)) if now_ts - _ts(first) >= stalled_seconds: stalled.append(t) else: conn.execute('DELETE FROM smart_queue_stalled WHERE profile_id=? AND torrent_hash=?', (profile_id, h)) # Candidates with visible sources are preferred. Do not touch excluded torrents. candidates = sorted( stopped, key=lambda t: (int(t.get('seeds') or 0), int(t.get('peers') or 0), int(t.get('down_rate') or 0)), reverse=True, ) max_active = max(1, int(settings.get('max_active_downloads') or 5)) stalled_hashes = {str(t.get('hash') or '') for t in stalled} # Enforce the hard active-download cap across the whole started queue, including manual starts. # Note: Weak/no-source torrents are stopped first, but the cap is still enforced when the overflow is larger. over_limit = max(0, len(downloading) - max_active) stop_eligible_hashes = {str(t.get('hash') or '') for t in stop_eligible} stop_rank = sorted( downloading, key=lambda t: ( 0 if str(t.get('hash') or '') in stalled_hashes else 1, 0 if str(t.get('hash') or '') in stop_eligible_hashes else 1, int(t.get('down_rate') or 0), int(t.get('seeds') or 0), int(t.get('peers') or 0), ), ) to_stop: list[dict[str, Any]] = stop_rank[:over_limit] stop_hashes = {str(t.get('hash') or '') for t in to_stop} # Note: Confirmed stalled downloads are removed from the active queue immediately, then new candidates can fill those slots. for t in stalled: h = str(t.get('hash') or '') if h and h not in stop_hashes: to_stop.append(t) stop_hashes.add(h) c = rtorrent.client_for(profile) rtorrent_cap = _ensure_rtorrent_download_cap(c, max_active) stopped_by_queue: list[str] = [] started_by_queue: list[str] = [] label_failed: list[str] = [] stalled_labeled: list[str] = [] stop_failed: list[dict[str, str]] = [] start_failed: list[dict[str, str]] = [] start_no_effect: list[dict[str, Any]] = [] start_requested: list[str] = [] start_results: list[dict[str, Any]] = [] for t in to_stop: h = str(t.get('hash') or '') try: # Note: Smart Queue stops with the same low-level d.stop command used by the manual Stop action. # This avoids extra pre-check RPCs and keeps large queues from failing after only a few items. c.call('d.stop', h) if h in stalled_hashes: if _ensure_stalled_label(c, h, _read_label(c, h, str(t.get('label') or ''))): stalled_labeled.append(h) else: label_failed.append(h) elif not _mark_auto_stopped(c, profile_id, t): label_failed.append(h) stopped_by_queue.append(h) except Exception as exc: # Note: Stop failures are stored in history instead of being swallowed, so queue drift is visible. stop_failed.append({'hash': h, 'error': str(exc)}) active_after_stop = max(0, len(downloading) - len(stopped_by_queue)) # Note: Starts are planned only after confirmed stops, so failed stops cannot push the queue above the cap. available_slots = max(0, max_active - active_after_stop) to_start = candidates[:available_slots] # Note: Items outside the current start batch are explicitly marked as pending Smart Queue items. to_label_waiting = candidates[available_slots:] for t in to_label_waiting: h = str(t.get('hash') or '') if not h or h in stop_hashes: continue try: if not _mark_auto_stopped(c, profile_id, t): label_failed.append(h) except Exception: label_failed.append(h) # Note: Start the whole candidate batch in one round. Remove the label after an accepted RPC, # because rTorrent may keep some items in its own queue with active=0 despite a valid d.start/d.resume. for t in to_start: h = str(t.get('hash') or '') if not h: continue try: result = _start_download(c, t) start_results.append(result) start_requested.append(h) except Exception as exc: start_failed.append({'hash': h, 'error': str(exc)}) active_verified, start_no_effect = _verify_started_downloads(c, start_requested) for h in active_verified: _restore_auto_label(c, profile_id, h, None) try: # Note: Once Smart Queue starts a post-check torrent, its temporary download-after-check label is no longer needed. rtorrent.clear_post_check_download_label(c, h, None) except Exception: label_failed.append(h) # Note: History shows only torrents actually started, not just the number of sent commands. started_by_queue = list(active_verified) keep_labels = ( set(stopped_by_queue) | {str(t.get('hash') or '') for t in to_label_waiting} | {str(t.get('hash') or '') for t in stopped if _has_smart_queue_label(str(t.get('label') or '')) and str(t.get('hash') or '') not in set(started_by_queue)} ) restored = _cleanup_auto_labels(c, profile_id, torrents, keep_labels, manage_stopped) details = {'excluded': len(user_excluded), 'excluded_stalled': len(stalled_label_hashes), 'manual_labeled_running': len(manual_labeled_running), 'manual_labeled_running_hashes': manual_labeled_running[:100], 'enabled': bool(settings.get('enabled')), 'auto_label': SMART_QUEUE_LABEL, 'stalled_label': SMART_QUEUE_STALLED_LABEL, 'stalled_labeled': stalled_labeled, 'labels_restored': restored, 'labels_failed': label_failed, 'stop_failed': stop_failed, 'start_failed': start_failed, 'start_no_effect': start_no_effect, 'start_results': start_results, 'start_requested': start_requested, 'active_verified': active_verified, 'waiting_labeled': len(to_label_waiting), 'manage_stopped': True, 'max_active_downloads': max_active, 'active_before': len(downloading), 'active_after_stop': active_after_stop, 'active_after_expected': active_after_stop + len(started_by_queue), 'over_limit': over_limit, 'stop_eligible': len(stop_eligible), 'ignore_seed_peer': ignore_seed_peer, 'ignore_speed': ignore_speed, 'ignored_seed_peer_count': ignored_seed_peer_count if ignore_seed_peer else 0, 'ignored_speed_count': ignored_speed_count if ignore_speed else 0, 'stalled_seconds': stalled_seconds, 'stalled_timer_key': timer_key, 'healthy_active_protected': 0, 'stopped_planned': len(to_stop), 'started_planned': len(to_start), 'paused_planned': len(to_stop), 'resumed_planned': len(to_start), 'rtorrent_cap': rtorrent_cap} add_history(profile_id, 'force_check' if force else 'auto_check', stopped_by_queue, started_by_queue, len(torrents), {**details, 'stopped': stopped_by_queue, 'started': started_by_queue}, user_id) return {'ok': True, 'enabled': bool(settings.get('enabled')), 'paused': stopped_by_queue, 'resumed': started_by_queue, 'stopped': stopped_by_queue, 'started': started_by_queue, 'start_requested': start_requested, 'waiting_labeled': len(to_label_waiting), 'stalled_labeled': stalled_labeled, 'excluded_stalled': len(stalled_label_hashes), 'manual_labeled_running': len(manual_labeled_running), 'labels_restored': restored, 'labels_failed': label_failed, 'stop_failed': stop_failed, 'start_failed': start_failed, 'start_no_effect': start_no_effect, 'active_verified': active_verified, 'active_before': len(downloading), 'active_after_stop': active_after_stop, 'over_limit': over_limit, 'stop_eligible': len(stop_eligible), 'ignore_seed_peer': ignore_seed_peer, 'ignore_speed': ignore_speed, 'ignored_seed_peer_count': ignored_seed_peer_count if ignore_seed_peer else 0, 'ignored_speed_count': ignored_speed_count if ignore_speed else 0, 'stalled_seconds': stalled_seconds, 'stalled_timer_key': timer_key, 'healthy_active_protected': 0, 'rtorrent_cap': rtorrent_cap, 'checked': len(torrents), 'excluded': len(user_excluded), 'settings': settings}