diff --git a/pytorrent/services/smart_queue.py b/pytorrent/services/smart_queue.py index f2f8190..4c2e6c2 100644 --- a/pytorrent/services/smart_queue.py +++ b/pytorrent/services/smart_queue.py @@ -417,6 +417,59 @@ def _ensure_stalled_label(client: Any, torrent_hash: str, current_label: str = ' except Exception: return False + +def _without_stalled_label(value: str | None) -> str: + """Return labels without Smart Queue's Stalled marker.""" + # Note: This keeps user labels intact while clearing only the automatic stalled state. + return _label_value([label for label in _label_names(value) if label.casefold() != SMART_QUEUE_STALLED_LABEL.casefold()]) + + +def _clear_stalled_label(client: Any, torrent_hash: str, current_label: str = '') -> bool: + """Remove the Stalled marker from a torrent that is active again.""" + labels = _label_names(current_label) + if not any(label.casefold() == SMART_QUEUE_STALLED_LABEL.casefold() for label in labels): + return False + try: + # Note: Active downloads must not keep the Stalled marker after they resume transferring. + client.call('d.custom1.set', torrent_hash, _without_stalled_label(current_label)) + return True + except Exception: + return False + + +def _read_int_rpc(client: Any, method: str, torrent_hash: str, default: int = 0) -> int: + """Read an integer rTorrent field without failing the queue pass.""" + # Note: Smart Queue uses this for last-second safety checks before stopping torrents. + try: + return int(client.call(method, torrent_hash) or 0) + except Exception: + return int(default) + + +def _read_live_transfer_activity(client: Any, torrent_hash: str, stalled_seconds: int) -> dict[str, Any]: + """Read live transfer activity directly from rTorrent before a stop decision.""" + # Note: The torrent list can be a stale snapshot; this live guard prevents stopping a torrent that is currently downloading. + down_rate = _read_int_rpc(client, 'd.down.rate', torrent_hash) + up_rate = _read_int_rpc(client, 'd.up.rate', torrent_hash) + last_activity = _read_int_rpc(client, 'd.timestamp.last_active', torrent_hash) + state = _read_int_rpc(client, 'd.state', torrent_hash) + active = _read_int_rpc(client, 'd.is_active', torrent_hash) + if not last_activity and (down_rate > 0 or up_rate > 0): + last_activity = int(time.time()) + recent = bool(last_activity and time.time() - last_activity < max(1, int(stalled_seconds or 0))) + transferring = bool(down_rate > 0 or up_rate > 0) + return { + 'hash': torrent_hash, + 'state': state, + 'active': active, + 'down_rate': down_rate, + 'up_rate': up_rate, + 'last_activity': last_activity, + 'recent_activity': recent, + 'transferring': transferring, + 'protected': bool((state or active) and (transferring or recent)), + } + def _remember_auto_label(profile_id: int, torrent_hash: str, previous_label: str) -> None: now = utcnow() with connect() as conn: @@ -1515,6 +1568,9 @@ def check(profile: dict | None = None, user_id: int | None = None, force: bool = ignored_seed_peer_count = 0 ignored_speed_count = 0 + live_activity_protected: list[dict[str, Any]] = [] + live_activity_protected_hashes: set[str] = set() + with connect() as conn: for t in downloading: # Note: Ignore switches keep matching criteria from advancing stalled cleanup while preserving diagnostics. @@ -1597,6 +1653,18 @@ def check(profile: dict | None = None, user_id: int | None = None, force: bool = c = rtorrent.client_for(profile) rtorrent_cap = _ensure_rtorrent_download_cap(c, max_active) + for t in downloading: + h = str(t.get('hash') or '') + if not h or h in live_activity_protected_hashes or not _has_stalled_label(str(t.get('label') or '')): + continue + live_activity = _read_live_transfer_activity(c, h, stalled_seconds) + if live_activity.get('protected'): + # Note: A torrent that resumes real transfer is removed from Stalled without waiting for another manual action. + live_activity_protected.append(live_activity) + live_activity_protected_hashes.add(h) + _clear_stalled_label(c, h, _read_label(c, h, str(t.get('label') or ''))) + with connect() as conn: + conn.execute('DELETE FROM smart_queue_stalled WHERE profile_id=? AND torrent_hash=?', (profile_id, h)) stopped_by_queue: list[str] = [] started_by_queue: list[str] = [] label_failed: list[str] = [] @@ -1610,6 +1678,17 @@ def check(profile: dict | None = None, user_id: int | None = None, force: bool = for t in to_stop: h = str(t.get('hash') or '') try: + if h in live_activity_protected_hashes: + continue + live_activity = _read_live_transfer_activity(c, h, stalled_seconds) + if live_activity.get('protected'): + # Note: A live rTorrent re-check wins over the stale queue snapshot, so active downloads are never stopped as Stalled. + live_activity_protected.append(live_activity) + live_activity_protected_hashes.add(h) + _clear_stalled_label(c, h, _read_label(c, h, str(t.get('label') or ''))) + with connect() as conn: + conn.execute('DELETE FROM smart_queue_stalled WHERE profile_id=? AND torrent_hash=?', (profile_id, h)) + continue # Note: Smart Queue stops with the same low-level d.stop command used by the manual Stop action. # This avoids extra pre-check RPCs and keeps large queues from failing after only a few items. c.call('d.stop', h) @@ -1694,6 +1773,8 @@ def check(profile: dict | None = None, user_id: int | None = None, force: bool = 'stalled_stopped_hashes': _hash_sample(stalled_stopped_hashes), 'stalled_labeled': stalled_labeled, 'protected_stalled': protected_stalled, + 'live_activity_protected': len(live_activity_protected), + 'live_activity_protected_hashes': _hash_sample([item.get('hash') for item in live_activity_protected]), 'stalled_replacement_allowed': stalled_replacement_allowed, 'excluded': len(user_excluded), 'excluded_stalled': len(stalled_label_hashes), @@ -1729,6 +1810,7 @@ def check(profile: dict | None = None, user_id: int | None = None, force: bool = 'stopped': len(stopped_by_queue), 'stalled': len(stalled), 'protected_stalled': protected_stalled, + 'live_activity_protected': len(live_activity_protected), 'stalled_stopped': len(stalled_stopped_hashes), 'stalled_stopped_hashes': _hash_sample(stalled_stopped_hashes, 20), 'stop_eligible': len(stop_eligible), @@ -1761,6 +1843,7 @@ def check(profile: dict | None = None, user_id: int | None = None, force: bool = }, 'rtorrent_cap': rtorrent_cap, 'to_stop': _diagnostics_torrents(to_stop), + 'live_activity_protected': _diagnostics_sample(live_activity_protected), 'stalled': _diagnostics_torrents(stalled), 'stop_eligible': _diagnostics_torrents(stop_eligible), 'to_start': _diagnostics_torrents(to_start), @@ -1778,4 +1861,4 @@ def check(profile: dict | None = None, user_id: int | None = None, force: bool = mark_run(profile_id, user_id) settings = get_settings(profile_id, user_id) remaining = cooldown_remaining(settings) - return {'ok': True, 'enabled': bool(settings.get('enabled')), 'paused': stopped_by_queue, 'resumed': started_by_queue, 'stopped': stopped_by_queue, 'started': started_by_queue, 'start_requested': start_requested, 'start_batch_size': start_summary['start_batch_size'], 'start_verify_attempts': start_summary['start_verify_attempts'], 'start_verify_delay_seconds': start_summary['start_verify_delay_seconds'], 'waiting_labeled': len(to_label_waiting), 'stalled_labeled': stalled_labeled, 'excluded_stalled': len(stalled_label_hashes), 'manual_labeled_running': len(manual_labeled_running), 'labels_restored': restored, 'labels_failed': label_failed, 'stop_failed': stop_failed, 'start_failed': start_failed, 'start_no_effect': start_no_effect, 'start_pending_confirmation': start_pending_confirmation, 'active_verified': active_verified, 'active_before': len(downloading), 'active_after_stop': active_after_stop, 'over_limit': over_limit, 'stop_eligible': len(stop_eligible), 'start_source_skipped': len(source_skipped), 'ignore_seed_peer': ignore_seed_peer, 'ignore_speed': ignore_speed, 'ignored_seed_peer_count': ignored_seed_peer_count if ignore_seed_peer else 0, 'ignored_speed_count': ignored_speed_count if ignore_speed else 0, 'stalled_seconds': stalled_seconds, 'stalled_timer_key': timer_key, 'stop_batch_size': stop_batch_size, 'start_grace_seconds': start_grace_seconds, 'protect_active_below_cap': protect_active_below_cap, 'prefer_partial_progress': prefer_partial_progress, 'auto_stop_idle': bool(int(settings.get('auto_stop_idle') or 0)), 'stalled_replacement_allowed': stalled_replacement_allowed, 'start_grace_protected': len(start_grace_hashes), 'replacement_capacity': replacement_capacity, 'protected_stalled': protected_stalled, 'healthy_active_protected': 0, 'rtorrent_cap': rtorrent_cap, 'checked': len(torrents), 'excluded': len(user_excluded), 'settings': settings, 'cooldown_remaining_seconds': remaining, 'surge_refill_remaining_seconds': surge_refill_remaining(settings)} + return {'ok': True, 'enabled': bool(settings.get('enabled')), 'paused': stopped_by_queue, 'resumed': started_by_queue, 'stopped': stopped_by_queue, 'started': started_by_queue, 'start_requested': start_requested, 'start_batch_size': start_summary['start_batch_size'], 'start_verify_attempts': start_summary['start_verify_attempts'], 'start_verify_delay_seconds': start_summary['start_verify_delay_seconds'], 'waiting_labeled': len(to_label_waiting), 'stalled_labeled': stalled_labeled, 'excluded_stalled': len(stalled_label_hashes), 'manual_labeled_running': len(manual_labeled_running), 'labels_restored': restored, 'labels_failed': label_failed, 'stop_failed': stop_failed, 'start_failed': start_failed, 'start_no_effect': start_no_effect, 'start_pending_confirmation': start_pending_confirmation, 'active_verified': active_verified, 'active_before': len(downloading), 'active_after_stop': active_after_stop, 'over_limit': over_limit, 'stop_eligible': len(stop_eligible), 'start_source_skipped': len(source_skipped), 'ignore_seed_peer': ignore_seed_peer, 'ignore_speed': ignore_speed, 'ignored_seed_peer_count': ignored_seed_peer_count if ignore_seed_peer else 0, 'ignored_speed_count': ignored_speed_count if ignore_speed else 0, 'stalled_seconds': stalled_seconds, 'stalled_timer_key': timer_key, 'stop_batch_size': stop_batch_size, 'start_grace_seconds': start_grace_seconds, 'protect_active_below_cap': protect_active_below_cap, 'prefer_partial_progress': prefer_partial_progress, 'auto_stop_idle': bool(int(settings.get('auto_stop_idle') or 0)), 'stalled_replacement_allowed': stalled_replacement_allowed, 'start_grace_protected': len(start_grace_hashes), 'replacement_capacity': replacement_capacity, 'protected_stalled': protected_stalled, 'healthy_active_protected': len(live_activity_protected), 'live_activity_protected': live_activity_protected, 'rtorrent_cap': rtorrent_cap, 'checked': len(torrents), 'excluded': len(user_excluded), 'settings': settings, 'cooldown_remaining_seconds': remaining, 'surge_refill_remaining_seconds': surge_refill_remaining(settings)}