fix queue

This commit is contained in:
Mateusz Gruszczyński
2026-05-05 18:52:34 +02:00
parent eedfce7207
commit 45cb6cbb3a

View File

@@ -395,6 +395,51 @@ def _is_waiting_download_candidate(t: dict[str, Any], manage_stopped: bool) -> b
return bool(manage_stopped) and not int(t.get('state') or 0) return bool(manage_stopped) and not int(t.get('state') or 0)
def _smart_queue_hold_state(profile_id: int, torrents: list[dict[str, Any]], stalled_seconds: int, now: str, now_ts: float) -> tuple[set[str], set[str], set[str]]:
"""Return pending, expired and all Smart Queue technical holds."""
technical_holds = {
str(t.get('hash') or '')
for t in torrents
if str(t.get('hash') or '') and str(t.get('label') or '') == SMART_QUEUE_LABEL and not int(t.get('complete') or 0)
}
pending: set[str] = set()
expired: set[str] = set()
if not technical_holds:
return pending, expired, technical_holds
with connect() as conn:
for h in technical_holds:
row = conn.execute(
'SELECT first_stalled_at FROM smart_queue_stalled WHERE profile_id=? AND torrent_hash=?',
(profile_id, h),
).fetchone()
if not row:
# Note: Label Smart Queue bez timera traktujemy jako świeżą próbę, żeby pierwszy cykl nie wymieniał go natychmiast.
conn.execute(
'INSERT OR REPLACE INTO smart_queue_stalled(profile_id,torrent_hash,first_stalled_at,updated_at) VALUES(?,?,?,?)',
(profile_id, h, now, now),
)
pending.add(h)
continue
first = row['first_stalled_at']
conn.execute('UPDATE smart_queue_stalled SET updated_at=? WHERE profile_id=? AND torrent_hash=?', (now, profile_id, h))
if now_ts - _ts(first) >= stalled_seconds:
expired.add(h)
else:
pending.add(h)
return pending, expired, technical_holds
def _remember_resume_attempt(profile_id: int, torrent_hash: str, now: str) -> None:
if not torrent_hash:
return
with connect() as conn:
# Note: Timer próby resume jest używany jako karencja przed wymianą nieaktywnego torrenta.
conn.execute(
'INSERT OR REPLACE INTO smart_queue_stalled(profile_id,torrent_hash,first_stalled_at,updated_at) VALUES(?,?,?,?)',
(profile_id, torrent_hash, now, now),
)
def check(profile: dict | None = None, user_id: int | None = None, force: bool = False) -> dict[str, Any]: def check(profile: dict | None = None, user_id: int | None = None, force: bool = False) -> dict[str, Any]:
profile = profile or active_profile() profile = profile or active_profile()
if not profile: if not profile:
@@ -460,13 +505,27 @@ def check(profile: dict | None = None, user_id: int | None = None, force: bool =
else: else:
conn.execute('DELETE FROM smart_queue_stalled WHERE profile_id=? AND torrent_hash=?', (profile_id, h)) conn.execute('DELETE FROM smart_queue_stalled WHERE profile_id=? AND torrent_hash=?', (profile_id, h))
max_active = max(1, int(settings.get('max_active_downloads') or 5))
pending_holds, expired_holds, technical_holds = _smart_queue_hold_state(profile_id, stopped, stalled_seconds, now, now_ts)
# Candidates with visible sources are preferred. Do not touch excluded torrents. # Candidates with visible sources are preferred. Do not touch excluded torrents.
candidates = sorted( fresh_candidates = sorted(
stopped, [
t for t in stopped
if str(t.get('hash') or '')
and str(t.get('hash') or '') not in pending_holds
and str(t.get('hash') or '') not in expired_holds
],
key=lambda t: (int(t.get('seeds') or 0), int(t.get('peers') or 0), int(t.get('down_rate') or 0)), key=lambda t: (int(t.get('seeds') or 0), int(t.get('peers') or 0), int(t.get('down_rate') or 0)),
reverse=True, reverse=True,
) )
max_active = max(1, int(settings.get('max_active_downloads') or 5)) expired_candidates = sorted(
[t for t in stopped if str(t.get('hash') or '') in expired_holds],
key=lambda t: (int(t.get('seeds') or 0), int(t.get('peers') or 0), int(t.get('down_rate') or 0)),
reverse=True,
)
# Note: Najpierw dobieramy nowe pozycje, a dopiero gdy nie ma alternatyw, ponawiamy stare nieaktywne próby.
candidates = fresh_candidates + expired_candidates
stalled_hashes = {str(t.get('hash') or '') for t in stalled} stalled_hashes = {str(t.get('hash') or '') for t in stalled}
# Enforce the hard active-download cap first. The previous logic only limited # Enforce the hard active-download cap first. The previous logic only limited
@@ -483,19 +542,25 @@ def check(profile: dict | None = None, user_id: int | None = None, force: bool =
to_pause: list[dict[str, Any]] = pause_rank[:max(0, len(downloading) - max_active)] to_pause: list[dict[str, Any]] = pause_rank[:max(0, len(downloading) - max_active)]
pause_hashes = {str(t.get('hash') or '') for t in to_pause} pause_hashes = {str(t.get('hash') or '') for t in to_pause}
# Note: Stalled jest wymieniany nie tylko przy pelnej kolejce. Najpierw wypelniamy wolne sloty, # Note: Nieaktywne próby resume trzymamy jako zajęte sloty przez stalled_seconds.
# a dopiero nadmiar kandydatow zuzywamy na rotacje martwych/stalled pobran. # Dzięki temu jeden cykl odpala pełną kolejkę, a wymiana następuje dopiero po czasie kontrolnym.
free_slots_before_pause = max(0, max_active - max(0, len(downloading) - len(to_pause))) protected_holds = {h for h in pending_holds if h and h not in pause_hashes and h not in excluded}
stalled_rotation_slots = max(0, len(candidates) - free_slots_before_pause) active_after_pause = max(0, len(downloading) - len(to_pause))
effective_slots_after_pause = active_after_pause + len(protected_holds)
# Note: Stalled wymieniamy tylko wtedy, gdy jest czym go zastąpić i po uwzględnieniu slotów w karencji.
replacement_capacity = max(0, len(candidates) - max(0, max_active - effective_slots_after_pause))
for t in stalled: for t in stalled:
h = str(t.get('hash') or '') h = str(t.get('hash') or '')
if not h or h in pause_hashes or stalled_rotation_slots <= 0: if not h or h in pause_hashes or replacement_capacity <= 0:
continue continue
to_pause.append(t) to_pause.append(t)
pause_hashes.add(h) pause_hashes.add(h)
stalled_rotation_slots -= 1 replacement_capacity -= 1
active_after_pause = max(0, len(downloading) - len(to_pause)) active_after_pause = max(0, len(downloading) - len(to_pause))
protected_holds = {h for h in protected_holds if h not in pause_hashes}
effective_slots_after_pause = active_after_pause + len(protected_holds)
c = rtorrent.client_for(profile) c = rtorrent.client_for(profile)
rtorrent_cap = _ensure_rtorrent_download_cap(c, max_active) rtorrent_cap = _ensure_rtorrent_download_cap(c, max_active)
@@ -520,73 +585,72 @@ def check(profile: dict | None = None, user_id: int | None = None, force: bool =
except Exception: except Exception:
pass pass
candidate_queue = [t for t in candidates if str(t.get('hash') or '') and str(t.get('hash') or '') not in pause_hashes] candidate_queue = [
active_slots = active_after_pause t for t in candidates
max_resume_attempts = max(len(candidate_queue), max_active * 3) if str(t.get('hash') or '')
and str(t.get('hash') or '') not in pause_hashes
and str(t.get('hash') or '') not in protected_holds
]
slots_left = max(0, max_active - effective_slots_after_pause)
batch = candidate_queue[:slots_left]
batch_requested: list[str] = []
# Note: Resume działa w rundach aż do pełnego limitu z ustawień. Po każdej rundzie # Note: Smart Queue wykonuje jeden masowy strzał do pełnego targetu. Nie dobiera kolejnych
# pobieramy świeży snapshot z rTorrent, bo masowe d.resume/d.start nie zawsze widać # w tym samym przebiegu tylko dlatego, że rTorrent nie pokazał od razu d.is_active=1.
# natychmiast w d.is_active na pojedynczym RPC. for t in batch:
while candidate_queue and active_slots < max_active and len(attempted_hashes) < max_resume_attempts: h = str(t.get('hash') or '')
slots_left = max_active - active_slots if not h or h in attempted_hashes:
# Note: Bierzemy mały nadmiar kandydatów tylko wtedy, gdy poprzednie resume nie zwiększyło
# liczby aktywnych slotów; to naprawia przypadek, gdy część pauzowanych nie wstaje po komendzie.
batch_size = min(len(candidate_queue), max(1, slots_left))
batch = candidate_queue[:batch_size]
candidate_queue = candidate_queue[batch_size:]
batch_requested: list[str] = []
for t in batch:
h = str(t.get('hash') or '')
if not h or h in attempted_hashes:
continue
attempted_hashes.add(h)
try:
result = _start_download(c, t)
start_results.append(result)
resume_requested.append(h)
batch_requested.append(h)
except Exception as exc:
start_failed.append({'hash': h, 'error': str(exc)})
time.sleep(0.03)
if not batch_requested:
continue continue
attempted_hashes.add(h)
try:
if not _mark_auto_paused(c, profile_id, t):
label_failed.append(h)
_remember_resume_attempt(profile_id, h, now)
result = _start_download(c, t)
start_results.append(result)
resume_requested.append(h)
batch_requested.append(h)
except Exception as exc:
start_failed.append({'hash': h, 'error': str(exc)})
time.sleep(0.03)
active_verified, batch_no_effect = _verify_started_downloads(c, batch_requested) if batch_requested:
# Note: Weryfikacja jest informacyjna i służy zdjęciu technicznego labela, ale nie uruchamia kolejnego batcha.
active_verified, batch_no_effect = _verify_started_downloads(c, batch_requested, attempts=4, delay=0.4)
start_no_effect.extend(batch_no_effect) start_no_effect.extend(batch_no_effect)
for h in active_verified: for h in active_verified:
if h not in resumed: if h not in resumed:
_restore_auto_label(c, profile_id, h, None) _restore_auto_label(c, profile_id, h, None)
resumed.append(h) resumed.append(h)
if active_verified:
with connect() as conn:
for h in active_verified:
conn.execute('DELETE FROM smart_queue_stalled WHERE profile_id=? AND torrent_hash=?', (profile_id, h))
fresh_active_slots, fresh_torrents = _refresh_active_slots(profile, excluded, manage_stopped) fresh_active_slots, fresh_torrents = _refresh_active_slots(profile, excluded, manage_stopped)
active_slots = max(active_slots, fresh_active_slots)
# Note: Jeżeli rTorrent wznowił torrent dopiero po odświeżeniu listy, dopisujemy go
# do resumed i zdejmujemy techniczny label Smart Queue.
fresh_by_hash = {str(t.get('hash') or ''): t for t in fresh_torrents} fresh_by_hash = {str(t.get('hash') or ''): t for t in fresh_torrents}
for h in batch_requested: for h in batch_requested:
live_t = fresh_by_hash.get(h) live_t = fresh_by_hash.get(h)
if live_t and _is_running_download_slot(live_t) and h not in resumed: if live_t and _is_running_download_slot(live_t) and h not in resumed:
_restore_auto_label(c, profile_id, h, None) _restore_auto_label(c, profile_id, h, None)
resumed.append(h) resumed.append(h)
if resumed:
if active_slots < max_active and not candidate_queue: with connect() as conn:
# Note: Ostatnia próba dla pozycji, które przyjęły start, ale jeszcze nie pokazały active=1. for h in resumed:
time.sleep(0.75) conn.execute('DELETE FROM smart_queue_stalled WHERE profile_id=? AND torrent_hash=?', (profile_id, h))
fresh_active_slots, fresh_torrents = _refresh_active_slots(profile, excluded, manage_stopped) active_slots = max(effective_slots_after_pause, fresh_active_slots)
active_slots = max(active_slots, fresh_active_slots) else:
active_slots = effective_slots_after_pause
resumed_set = set(resumed) resumed_set = set(resumed)
waiting_hashes = { waiting_hashes = (
str(t.get('hash') or '') (technical_holds | set(batch_requested) | {str(t.get('hash') or '') for t in candidate_queue})
for t in candidates - resumed_set
if str(t.get('hash') or '') and str(t.get('hash') or '') not in pause_hashes and str(t.get('hash') or '') not in resumed_set - pause_hashes
} )
waiting_hashes = {h for h in waiting_hashes if h}
# Note: Kazdy kandydat niewznowiony w tej rundzie zostaje oznaczony jako oczekujacy, # Note: Kandydaci niewznowieni zostają oznaczeni jako oczekujący; po stalled_seconds mogą zostać wymienieni na innych.
# dzieki czemu kolejne cykle nadal dobieraja go z pauzy/labela Smart Queue.
for t in candidates: for t in candidates:
h = str(t.get('hash') or '') h = str(t.get('hash') or '')
if not h or h not in waiting_hashes: if not h or h not in waiting_hashes:
@@ -597,12 +661,8 @@ def check(profile: dict | None = None, user_id: int | None = None, force: bool =
except Exception: except Exception:
label_failed.append(h) label_failed.append(h)
keep_labels = ( keep_labels = set(paused) | waiting_hashes | protected_holds | expired_holds
set(paused)
| waiting_hashes
| {str(t.get('hash') or '') for t in stopped if str(t.get('label') or '') == SMART_QUEUE_LABEL and str(t.get('hash') or '') not in resumed_set}
)
restored = _cleanup_auto_labels(c, profile_id, torrents, keep_labels, manage_stopped) restored = _cleanup_auto_labels(c, profile_id, torrents, keep_labels, manage_stopped)
details = {'excluded': len(excluded), 'enabled': bool(settings.get('enabled')), 'auto_label': SMART_QUEUE_LABEL, 'labels_restored': restored, 'labels_failed': label_failed, 'start_failed': start_failed, 'start_no_effect': start_no_effect, 'start_results': start_results, 'resume_requested': resume_requested, 'active_verified': resumed, 'attempted_count': len(attempted_hashes), 'waiting_labeled': len(waiting_hashes), 'manage_stopped': manage_stopped, 'max_active_downloads': max_active, 'active_before': len(downloading), 'active_after_expected': active_slots, 'paused_planned': len(to_pause), 'resumed_planned': len(attempted_hashes), 'stalled_detected': len(stalled), 'stalled_paused': len([h for h in paused if h in stalled_hashes]), 'rtorrent_cap': rtorrent_cap} details = {'excluded': len(excluded), 'enabled': bool(settings.get('enabled')), 'auto_label': SMART_QUEUE_LABEL, 'labels_restored': restored, 'labels_failed': label_failed, 'start_failed': start_failed, 'start_no_effect': start_no_effect, 'start_results': start_results, 'resume_requested': resume_requested, 'active_verified': resumed, 'attempted_count': len(attempted_hashes), 'waiting_labeled': len(waiting_hashes), 'pending_holds': len(protected_holds), 'expired_holds': len(expired_holds), 'manage_stopped': manage_stopped, 'max_active_downloads': max_active, 'active_before': len(downloading), 'active_after_expected': active_slots, 'paused_planned': len(to_pause), 'resumed_planned': len(attempted_hashes), 'stalled_detected': len(stalled), 'stalled_paused': len([h for h in paused if h in stalled_hashes]), 'rtorrent_cap': rtorrent_cap}
add_history(profile_id, 'force_check' if force else 'auto_check', paused, resumed, len(torrents), details, user_id) add_history(profile_id, 'force_check' if force else 'auto_check', paused, resumed, len(torrents), details, user_id)
return {'ok': True, 'enabled': bool(settings.get('enabled')), 'paused': paused, 'resumed': resumed, 'resume_requested': resume_requested, 'waiting_labeled': len(waiting_hashes), 'labels_restored': restored, 'labels_failed': label_failed, 'start_failed': start_failed, 'start_no_effect': start_no_effect, 'active_verified': resumed, 'rtorrent_cap': rtorrent_cap, 'checked': len(torrents), 'excluded': len(excluded), 'settings': settings} return {'ok': True, 'enabled': bool(settings.get('enabled')), 'paused': paused, 'resumed': resumed, 'resume_requested': resume_requested, 'waiting_labeled': len(waiting_hashes), 'labels_restored': restored, 'labels_failed': label_failed, 'start_failed': start_failed, 'start_no_effect': start_no_effect, 'active_verified': resumed, 'rtorrent_cap': rtorrent_cap, 'checked': len(torrents), 'excluded': len(excluded), 'settings': settings}