From 45cb6cbb3a9fca4ceb1eae882018117221a383e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20Gruszczy=C5=84ski?= Date: Tue, 5 May 2026 18:52:34 +0200 Subject: [PATCH] fix queue --- pytorrent/services/smart_queue.py | 186 ++++++++++++++++++++---------- 1 file changed, 123 insertions(+), 63 deletions(-) diff --git a/pytorrent/services/smart_queue.py b/pytorrent/services/smart_queue.py index 5d3480d..42e658d 100644 --- a/pytorrent/services/smart_queue.py +++ b/pytorrent/services/smart_queue.py @@ -395,6 +395,51 @@ def _is_waiting_download_candidate(t: dict[str, Any], manage_stopped: bool) -> b return bool(manage_stopped) and not int(t.get('state') or 0) +def _smart_queue_hold_state(profile_id: int, torrents: list[dict[str, Any]], stalled_seconds: int, now: str, now_ts: float) -> tuple[set[str], set[str], set[str]]: + """Return pending, expired and all Smart Queue technical holds.""" + technical_holds = { + str(t.get('hash') or '') + for t in torrents + if str(t.get('hash') or '') and str(t.get('label') or '') == SMART_QUEUE_LABEL and not int(t.get('complete') or 0) + } + pending: set[str] = set() + expired: set[str] = set() + if not technical_holds: + return pending, expired, technical_holds + with connect() as conn: + for h in technical_holds: + row = conn.execute( + 'SELECT first_stalled_at FROM smart_queue_stalled WHERE profile_id=? AND torrent_hash=?', + (profile_id, h), + ).fetchone() + if not row: + # Note: Label Smart Queue bez timera traktujemy jako świeżą próbę, żeby pierwszy cykl nie wymieniał go natychmiast. + conn.execute( + 'INSERT OR REPLACE INTO smart_queue_stalled(profile_id,torrent_hash,first_stalled_at,updated_at) VALUES(?,?,?,?)', + (profile_id, h, now, now), + ) + pending.add(h) + continue + first = row['first_stalled_at'] + conn.execute('UPDATE smart_queue_stalled SET updated_at=? WHERE profile_id=? AND torrent_hash=?', (now, profile_id, h)) + if now_ts - _ts(first) >= stalled_seconds: + expired.add(h) + else: + pending.add(h) + return pending, expired, technical_holds + + +def _remember_resume_attempt(profile_id: int, torrent_hash: str, now: str) -> None: + if not torrent_hash: + return + with connect() as conn: + # Note: Timer próby resume jest używany jako karencja przed wymianą nieaktywnego torrenta. + conn.execute( + 'INSERT OR REPLACE INTO smart_queue_stalled(profile_id,torrent_hash,first_stalled_at,updated_at) VALUES(?,?,?,?)', + (profile_id, torrent_hash, now, now), + ) + + def check(profile: dict | None = None, user_id: int | None = None, force: bool = False) -> dict[str, Any]: profile = profile or active_profile() if not profile: @@ -460,13 +505,27 @@ def check(profile: dict | None = None, user_id: int | None = None, force: bool = else: conn.execute('DELETE FROM smart_queue_stalled WHERE profile_id=? AND torrent_hash=?', (profile_id, h)) + max_active = max(1, int(settings.get('max_active_downloads') or 5)) + pending_holds, expired_holds, technical_holds = _smart_queue_hold_state(profile_id, stopped, stalled_seconds, now, now_ts) + # Candidates with visible sources are preferred. Do not touch excluded torrents. - candidates = sorted( - stopped, + fresh_candidates = sorted( + [ + t for t in stopped + if str(t.get('hash') or '') + and str(t.get('hash') or '') not in pending_holds + and str(t.get('hash') or '') not in expired_holds + ], key=lambda t: (int(t.get('seeds') or 0), int(t.get('peers') or 0), int(t.get('down_rate') or 0)), reverse=True, ) - max_active = max(1, int(settings.get('max_active_downloads') or 5)) + expired_candidates = sorted( + [t for t in stopped if str(t.get('hash') or '') in expired_holds], + key=lambda t: (int(t.get('seeds') or 0), int(t.get('peers') or 0), int(t.get('down_rate') or 0)), + reverse=True, + ) + # Note: Najpierw dobieramy nowe pozycje, a dopiero gdy nie ma alternatyw, ponawiamy stare nieaktywne próby. + candidates = fresh_candidates + expired_candidates stalled_hashes = {str(t.get('hash') or '') for t in stalled} # Enforce the hard active-download cap first. The previous logic only limited @@ -483,19 +542,25 @@ def check(profile: dict | None = None, user_id: int | None = None, force: bool = to_pause: list[dict[str, Any]] = pause_rank[:max(0, len(downloading) - max_active)] pause_hashes = {str(t.get('hash') or '') for t in to_pause} - # Note: Stalled jest wymieniany nie tylko przy pelnej kolejce. Najpierw wypelniamy wolne sloty, - # a dopiero nadmiar kandydatow zuzywamy na rotacje martwych/stalled pobran. - free_slots_before_pause = max(0, max_active - max(0, len(downloading) - len(to_pause))) - stalled_rotation_slots = max(0, len(candidates) - free_slots_before_pause) + # Note: Nieaktywne próby resume trzymamy jako zajęte sloty przez stalled_seconds. + # Dzięki temu jeden cykl odpala pełną kolejkę, a wymiana następuje dopiero po czasie kontrolnym. + protected_holds = {h for h in pending_holds if h and h not in pause_hashes and h not in excluded} + active_after_pause = max(0, len(downloading) - len(to_pause)) + effective_slots_after_pause = active_after_pause + len(protected_holds) + + # Note: Stalled wymieniamy tylko wtedy, gdy jest czym go zastąpić i po uwzględnieniu slotów w karencji. + replacement_capacity = max(0, len(candidates) - max(0, max_active - effective_slots_after_pause)) for t in stalled: h = str(t.get('hash') or '') - if not h or h in pause_hashes or stalled_rotation_slots <= 0: + if not h or h in pause_hashes or replacement_capacity <= 0: continue to_pause.append(t) pause_hashes.add(h) - stalled_rotation_slots -= 1 + replacement_capacity -= 1 active_after_pause = max(0, len(downloading) - len(to_pause)) + protected_holds = {h for h in protected_holds if h not in pause_hashes} + effective_slots_after_pause = active_after_pause + len(protected_holds) c = rtorrent.client_for(profile) rtorrent_cap = _ensure_rtorrent_download_cap(c, max_active) @@ -520,73 +585,72 @@ def check(profile: dict | None = None, user_id: int | None = None, force: bool = except Exception: pass - candidate_queue = [t for t in candidates if str(t.get('hash') or '') and str(t.get('hash') or '') not in pause_hashes] - active_slots = active_after_pause - max_resume_attempts = max(len(candidate_queue), max_active * 3) + candidate_queue = [ + t for t in candidates + if str(t.get('hash') or '') + and str(t.get('hash') or '') not in pause_hashes + and str(t.get('hash') or '') not in protected_holds + ] + slots_left = max(0, max_active - effective_slots_after_pause) + batch = candidate_queue[:slots_left] + batch_requested: list[str] = [] - # Note: Resume działa w rundach aż do pełnego limitu z ustawień. Po każdej rundzie - # pobieramy świeży snapshot z rTorrent, bo masowe d.resume/d.start nie zawsze widać - # natychmiast w d.is_active na pojedynczym RPC. - while candidate_queue and active_slots < max_active and len(attempted_hashes) < max_resume_attempts: - slots_left = max_active - active_slots - # Note: Bierzemy mały nadmiar kandydatów tylko wtedy, gdy poprzednie resume nie zwiększyło - # liczby aktywnych slotów; to naprawia przypadek, gdy część pauzowanych nie wstaje po komendzie. - batch_size = min(len(candidate_queue), max(1, slots_left)) - batch = candidate_queue[:batch_size] - candidate_queue = candidate_queue[batch_size:] - batch_requested: list[str] = [] - - for t in batch: - h = str(t.get('hash') or '') - if not h or h in attempted_hashes: - continue - attempted_hashes.add(h) - try: - result = _start_download(c, t) - start_results.append(result) - resume_requested.append(h) - batch_requested.append(h) - except Exception as exc: - start_failed.append({'hash': h, 'error': str(exc)}) - time.sleep(0.03) - - if not batch_requested: + # Note: Smart Queue wykonuje jeden masowy strzał do pełnego targetu. Nie dobiera kolejnych + # w tym samym przebiegu tylko dlatego, że rTorrent nie pokazał od razu d.is_active=1. + for t in batch: + h = str(t.get('hash') or '') + if not h or h in attempted_hashes: continue + attempted_hashes.add(h) + try: + if not _mark_auto_paused(c, profile_id, t): + label_failed.append(h) + _remember_resume_attempt(profile_id, h, now) + result = _start_download(c, t) + start_results.append(result) + resume_requested.append(h) + batch_requested.append(h) + except Exception as exc: + start_failed.append({'hash': h, 'error': str(exc)}) + time.sleep(0.03) - active_verified, batch_no_effect = _verify_started_downloads(c, batch_requested) + if batch_requested: + # Note: Weryfikacja jest informacyjna i służy zdjęciu technicznego labela, ale nie uruchamia kolejnego batcha. + active_verified, batch_no_effect = _verify_started_downloads(c, batch_requested, attempts=4, delay=0.4) start_no_effect.extend(batch_no_effect) for h in active_verified: if h not in resumed: _restore_auto_label(c, profile_id, h, None) resumed.append(h) + if active_verified: + with connect() as conn: + for h in active_verified: + conn.execute('DELETE FROM smart_queue_stalled WHERE profile_id=? AND torrent_hash=?', (profile_id, h)) fresh_active_slots, fresh_torrents = _refresh_active_slots(profile, excluded, manage_stopped) - active_slots = max(active_slots, fresh_active_slots) - - # Note: Jeżeli rTorrent wznowił torrent dopiero po odświeżeniu listy, dopisujemy go - # do resumed i zdejmujemy techniczny label Smart Queue. fresh_by_hash = {str(t.get('hash') or ''): t for t in fresh_torrents} for h in batch_requested: live_t = fresh_by_hash.get(h) if live_t and _is_running_download_slot(live_t) and h not in resumed: _restore_auto_label(c, profile_id, h, None) resumed.append(h) - - if active_slots < max_active and not candidate_queue: - # Note: Ostatnia próba dla pozycji, które przyjęły start, ale jeszcze nie pokazały active=1. - time.sleep(0.75) - fresh_active_slots, fresh_torrents = _refresh_active_slots(profile, excluded, manage_stopped) - active_slots = max(active_slots, fresh_active_slots) + if resumed: + with connect() as conn: + for h in resumed: + conn.execute('DELETE FROM smart_queue_stalled WHERE profile_id=? AND torrent_hash=?', (profile_id, h)) + active_slots = max(effective_slots_after_pause, fresh_active_slots) + else: + active_slots = effective_slots_after_pause resumed_set = set(resumed) - waiting_hashes = { - str(t.get('hash') or '') - for t in candidates - if str(t.get('hash') or '') and str(t.get('hash') or '') not in pause_hashes and str(t.get('hash') or '') not in resumed_set - } + waiting_hashes = ( + (technical_holds | set(batch_requested) | {str(t.get('hash') or '') for t in candidate_queue}) + - resumed_set + - pause_hashes + ) + waiting_hashes = {h for h in waiting_hashes if h} - # Note: Kazdy kandydat niewznowiony w tej rundzie zostaje oznaczony jako oczekujacy, - # dzieki czemu kolejne cykle nadal dobieraja go z pauzy/labela Smart Queue. + # Note: Kandydaci niewznowieni zostają oznaczeni jako oczekujący; po stalled_seconds mogą zostać wymienieni na innych. for t in candidates: h = str(t.get('hash') or '') if not h or h not in waiting_hashes: @@ -597,12 +661,8 @@ def check(profile: dict | None = None, user_id: int | None = None, force: bool = except Exception: label_failed.append(h) - keep_labels = ( - set(paused) - | waiting_hashes - | {str(t.get('hash') or '') for t in stopped if str(t.get('label') or '') == SMART_QUEUE_LABEL and str(t.get('hash') or '') not in resumed_set} - ) + keep_labels = set(paused) | waiting_hashes | protected_holds | expired_holds restored = _cleanup_auto_labels(c, profile_id, torrents, keep_labels, manage_stopped) - details = {'excluded': len(excluded), 'enabled': bool(settings.get('enabled')), 'auto_label': SMART_QUEUE_LABEL, 'labels_restored': restored, 'labels_failed': label_failed, 'start_failed': start_failed, 'start_no_effect': start_no_effect, 'start_results': start_results, 'resume_requested': resume_requested, 'active_verified': resumed, 'attempted_count': len(attempted_hashes), 'waiting_labeled': len(waiting_hashes), 'manage_stopped': manage_stopped, 'max_active_downloads': max_active, 'active_before': len(downloading), 'active_after_expected': active_slots, 'paused_planned': len(to_pause), 'resumed_planned': len(attempted_hashes), 'stalled_detected': len(stalled), 'stalled_paused': len([h for h in paused if h in stalled_hashes]), 'rtorrent_cap': rtorrent_cap} + details = {'excluded': len(excluded), 'enabled': bool(settings.get('enabled')), 'auto_label': SMART_QUEUE_LABEL, 'labels_restored': restored, 'labels_failed': label_failed, 'start_failed': start_failed, 'start_no_effect': start_no_effect, 'start_results': start_results, 'resume_requested': resume_requested, 'active_verified': resumed, 'attempted_count': len(attempted_hashes), 'waiting_labeled': len(waiting_hashes), 'pending_holds': len(protected_holds), 'expired_holds': len(expired_holds), 'manage_stopped': manage_stopped, 'max_active_downloads': max_active, 'active_before': len(downloading), 'active_after_expected': active_slots, 'paused_planned': len(to_pause), 'resumed_planned': len(attempted_hashes), 'stalled_detected': len(stalled), 'stalled_paused': len([h for h in paused if h in stalled_hashes]), 'rtorrent_cap': rtorrent_cap} add_history(profile_id, 'force_check' if force else 'auto_check', paused, resumed, len(torrents), details, user_id) return {'ok': True, 'enabled': bool(settings.get('enabled')), 'paused': paused, 'resumed': resumed, 'resume_requested': resume_requested, 'waiting_labeled': len(waiting_hashes), 'labels_restored': restored, 'labels_failed': label_failed, 'start_failed': start_failed, 'start_no_effect': start_no_effect, 'active_verified': resumed, 'rtorrent_cap': rtorrent_cap, 'checked': len(torrents), 'excluded': len(excluded), 'settings': settings}