fix queue

This commit is contained in:
Mateusz Gruszczyński
2026-05-05 19:19:47 +02:00
parent 45cb6cbb3a
commit aea3c92830
3 changed files with 161 additions and 213 deletions

View File

@@ -226,11 +226,14 @@ def _ensure_rtorrent_download_cap(client: Any, max_active: int) -> dict[str, Any
def _start_download(client: Any, torrent: dict[str, Any]) -> dict[str, Any]:
"""Resume paused torrents and open/start stopped torrents with the same path as manual Start."""
"""Resume paused torrents through rTorrent's pause model."""
h = str(torrent.get('hash') or '')
if not h:
return {'hash': h, 'ok': False, 'error': 'missing hash'}
# Note: Smart Queue używa tej samej sekwencji co ręczny Start, żeby Paused nie zostawał w pauzie po samym d.start.
if bool(torrent.get('paused')) or str(torrent.get('status') or '').lower() == 'paused' or int(torrent.get('state') or 0):
# Note: Kandydaci Smart Queue po d.pause mają być wznawiani przez d.resume, bez d.start/d.stop.
return rtorrent.resume_paused_hash(client, h)
# Note: Tylko opcjonalne manage_stopped korzysta ze ścieżki start dla całkowicie zatrzymanych torrentów.
return rtorrent.start_or_resume_hash(client, h)
@@ -274,25 +277,11 @@ def _read_live_start_state(client: Any, torrent_hash: str) -> dict[str, Any]:
result[key] = int(value or 0) if key in {'state', 'active', 'open', 'priority'} else str(value or '')
except Exception as exc:
result[f'{key}_error'] = str(exc)
# Note: Realny slot liczymy po d.is_active=1. Dodatkowo zwracamy state/open/priority,
# bo przy masowym resume rTorrent czasem przyjmuje start, ale aktywuje transfer dopiero w kolejnym ticku.
# Note: Nie uznajemy d.is_open ani state=1 za wznowienie; Paused też potrafi mieć te wartości.
# Smart Queue zalicza start dopiero po d.is_active=1, czyli po realnym zdjęciu pauzy.
result['started'] = bool(int(result.get('active') or 0))
result['start_accepted'] = bool(int(result.get('state') or 0) or int(result.get('open') or 0))
return result
def _refresh_active_slots(profile: dict, excluded: set[str], manage_stopped: bool) -> tuple[int, list[dict[str, Any]]]:
"""Read a fresh torrent snapshot and count real active Smart Queue slots."""
fresh = rtorrent.list_torrents(profile)
active = [
t for t in fresh
if str(t.get('hash') or '') not in excluded
and _is_running_download_slot(t)
]
# Note: Po batchowym resume nie ufamy staremu snapshotowi; odświeżenie z rTorrent
# pozwala dobić kolejkę także wtedy, gdy aktywacja nastąpiła z opóźnieniem.
return len(active), fresh
def _set_smart_queue_label(client: Any, torrent_hash: str, attempts: int = 3) -> bool:
for attempt in range(max(1, attempts)):
try:
@@ -395,51 +384,6 @@ def _is_waiting_download_candidate(t: dict[str, Any], manage_stopped: bool) -> b
return bool(manage_stopped) and not int(t.get('state') or 0)
def _smart_queue_hold_state(profile_id: int, torrents: list[dict[str, Any]], stalled_seconds: int, now: str, now_ts: float) -> tuple[set[str], set[str], set[str]]:
"""Return pending, expired and all Smart Queue technical holds."""
technical_holds = {
str(t.get('hash') or '')
for t in torrents
if str(t.get('hash') or '') and str(t.get('label') or '') == SMART_QUEUE_LABEL and not int(t.get('complete') or 0)
}
pending: set[str] = set()
expired: set[str] = set()
if not technical_holds:
return pending, expired, technical_holds
with connect() as conn:
for h in technical_holds:
row = conn.execute(
'SELECT first_stalled_at FROM smart_queue_stalled WHERE profile_id=? AND torrent_hash=?',
(profile_id, h),
).fetchone()
if not row:
# Note: Label Smart Queue bez timera traktujemy jako świeżą próbę, żeby pierwszy cykl nie wymieniał go natychmiast.
conn.execute(
'INSERT OR REPLACE INTO smart_queue_stalled(profile_id,torrent_hash,first_stalled_at,updated_at) VALUES(?,?,?,?)',
(profile_id, h, now, now),
)
pending.add(h)
continue
first = row['first_stalled_at']
conn.execute('UPDATE smart_queue_stalled SET updated_at=? WHERE profile_id=? AND torrent_hash=?', (now, profile_id, h))
if now_ts - _ts(first) >= stalled_seconds:
expired.add(h)
else:
pending.add(h)
return pending, expired, technical_holds
def _remember_resume_attempt(profile_id: int, torrent_hash: str, now: str) -> None:
if not torrent_hash:
return
with connect() as conn:
# Note: Timer próby resume jest używany jako karencja przed wymianą nieaktywnego torrenta.
conn.execute(
'INSERT OR REPLACE INTO smart_queue_stalled(profile_id,torrent_hash,first_stalled_at,updated_at) VALUES(?,?,?,?)',
(profile_id, torrent_hash, now, now),
)
def check(profile: dict | None = None, user_id: int | None = None, force: bool = False) -> dict[str, Any]:
profile = profile or active_profile()
if not profile:
@@ -505,27 +449,13 @@ def check(profile: dict | None = None, user_id: int | None = None, force: bool =
else:
conn.execute('DELETE FROM smart_queue_stalled WHERE profile_id=? AND torrent_hash=?', (profile_id, h))
max_active = max(1, int(settings.get('max_active_downloads') or 5))
pending_holds, expired_holds, technical_holds = _smart_queue_hold_state(profile_id, stopped, stalled_seconds, now, now_ts)
# Candidates with visible sources are preferred. Do not touch excluded torrents.
fresh_candidates = sorted(
[
t for t in stopped
if str(t.get('hash') or '')
and str(t.get('hash') or '') not in pending_holds
and str(t.get('hash') or '') not in expired_holds
],
candidates = sorted(
stopped,
key=lambda t: (int(t.get('seeds') or 0), int(t.get('peers') or 0), int(t.get('down_rate') or 0)),
reverse=True,
)
expired_candidates = sorted(
[t for t in stopped if str(t.get('hash') or '') in expired_holds],
key=lambda t: (int(t.get('seeds') or 0), int(t.get('peers') or 0), int(t.get('down_rate') or 0)),
reverse=True,
)
# Note: Najpierw dobieramy nowe pozycje, a dopiero gdy nie ma alternatyw, ponawiamy stare nieaktywne próby.
candidates = fresh_candidates + expired_candidates
max_active = max(1, int(settings.get('max_active_downloads') or 5))
stalled_hashes = {str(t.get('hash') or '') for t in stalled}
# Enforce the hard active-download cap first. The previous logic only limited
@@ -542,25 +472,19 @@ def check(profile: dict | None = None, user_id: int | None = None, force: bool =
to_pause: list[dict[str, Any]] = pause_rank[:max(0, len(downloading) - max_active)]
pause_hashes = {str(t.get('hash') or '') for t in to_pause}
# Note: Nieaktywne próby resume trzymamy jako zajęte sloty przez stalled_seconds.
# Dzięki temu jeden cykl odpala pełną kolejkę, a wymiana następuje dopiero po czasie kontrolnym.
protected_holds = {h for h in pending_holds if h and h not in pause_hashes and h not in excluded}
active_after_pause = max(0, len(downloading) - len(to_pause))
effective_slots_after_pause = active_after_pause + len(protected_holds)
# Note: Stalled wymieniamy tylko wtedy, gdy jest czym go zastąpić i po uwzględnieniu slotów w karencji.
replacement_capacity = max(0, len(candidates) - max(0, max_active - effective_slots_after_pause))
for t in stalled:
h = str(t.get('hash') or '')
if not h or h in pause_hashes or replacement_capacity <= 0:
continue
to_pause.append(t)
pause_hashes.add(h)
replacement_capacity -= 1
# Note: Rotacja stalled działa tylko przy pełnej kolejce. Gdy brakuje slotów, Smart Queue ma
# najpierw dobrać brakujące pozycje, a nie pauzować już istniejące lub błędnie uznane za stalled.
if candidates and len(downloading) >= max_active:
replaceable_stalled = [t for t in stalled if str(t.get('hash') or '') not in pause_hashes]
for t in replaceable_stalled[:max(0, len(candidates) - len(to_pause))]:
to_pause.append(t)
pause_hashes.add(str(t.get('hash') or ''))
active_after_pause = max(0, len(downloading) - len(to_pause))
protected_holds = {h for h in protected_holds if h not in pause_hashes}
effective_slots_after_pause = active_after_pause + len(protected_holds)
available_slots = max(0, max_active - active_after_pause)
to_resume = candidates[:available_slots]
# Note: Pozycje poza bieżącą pulą startu zostają jawnie oznaczone jako oczekujące Smart Queue.
to_label_waiting = candidates[available_slots:]
c = rtorrent.client_for(profile)
rtorrent_cap = _ensure_rtorrent_download_cap(c, max_active)
@@ -571,89 +495,21 @@ def check(profile: dict | None = None, user_id: int | None = None, force: bool =
start_no_effect: list[dict[str, Any]] = []
resume_requested: list[str] = []
start_results: list[dict[str, Any]] = []
attempted_hashes: set[str] = set()
for t in to_pause:
h = str(t.get('hash') or '')
if not h:
continue
try:
c.call('d.pause', h)
pause_result = rtorrent.pause_hash(c, t['hash'])
if not pause_result.get('ok'):
raise RuntimeError(pause_result.get('error') or 'pause failed')
if not _mark_auto_paused(c, profile_id, t):
label_failed.append(h)
paused.append(h)
label_failed.append(t['hash'])
paused.append(t['hash'])
except Exception:
pass
candidate_queue = [
t for t in candidates
if str(t.get('hash') or '')
and str(t.get('hash') or '') not in pause_hashes
and str(t.get('hash') or '') not in protected_holds
]
slots_left = max(0, max_active - effective_slots_after_pause)
batch = candidate_queue[:slots_left]
batch_requested: list[str] = []
# Note: Smart Queue wykonuje jeden masowy strzał do pełnego targetu. Nie dobiera kolejnych
# w tym samym przebiegu tylko dlatego, że rTorrent nie pokazał od razu d.is_active=1.
for t in batch:
for t in to_label_waiting:
h = str(t.get('hash') or '')
if not h or h in attempted_hashes:
continue
attempted_hashes.add(h)
try:
if not _mark_auto_paused(c, profile_id, t):
label_failed.append(h)
_remember_resume_attempt(profile_id, h, now)
result = _start_download(c, t)
start_results.append(result)
resume_requested.append(h)
batch_requested.append(h)
except Exception as exc:
start_failed.append({'hash': h, 'error': str(exc)})
time.sleep(0.03)
if batch_requested:
# Note: Weryfikacja jest informacyjna i służy zdjęciu technicznego labela, ale nie uruchamia kolejnego batcha.
active_verified, batch_no_effect = _verify_started_downloads(c, batch_requested, attempts=4, delay=0.4)
start_no_effect.extend(batch_no_effect)
for h in active_verified:
if h not in resumed:
_restore_auto_label(c, profile_id, h, None)
resumed.append(h)
if active_verified:
with connect() as conn:
for h in active_verified:
conn.execute('DELETE FROM smart_queue_stalled WHERE profile_id=? AND torrent_hash=?', (profile_id, h))
fresh_active_slots, fresh_torrents = _refresh_active_slots(profile, excluded, manage_stopped)
fresh_by_hash = {str(t.get('hash') or ''): t for t in fresh_torrents}
for h in batch_requested:
live_t = fresh_by_hash.get(h)
if live_t and _is_running_download_slot(live_t) and h not in resumed:
_restore_auto_label(c, profile_id, h, None)
resumed.append(h)
if resumed:
with connect() as conn:
for h in resumed:
conn.execute('DELETE FROM smart_queue_stalled WHERE profile_id=? AND torrent_hash=?', (profile_id, h))
active_slots = max(effective_slots_after_pause, fresh_active_slots)
else:
active_slots = effective_slots_after_pause
resumed_set = set(resumed)
waiting_hashes = (
(technical_holds | set(batch_requested) | {str(t.get('hash') or '') for t in candidate_queue})
- resumed_set
- pause_hashes
)
waiting_hashes = {h for h in waiting_hashes if h}
# Note: Kandydaci niewznowieni zostają oznaczeni jako oczekujący; po stalled_seconds mogą zostać wymienieni na innych.
for t in candidates:
h = str(t.get('hash') or '')
if not h or h not in waiting_hashes:
if not h or h in pause_hashes:
continue
try:
if not _mark_auto_paused(c, profile_id, t):
@@ -661,8 +517,30 @@ def check(profile: dict | None = None, user_id: int | None = None, force: bool =
except Exception:
label_failed.append(h)
keep_labels = set(paused) | waiting_hashes | protected_holds | expired_holds
# Note: Startujemy całą pulę kandydatów w jednej rundzie. Label zdejmujemy po zaakceptowanym RPC,
# bo rTorrent może trzymać część pozycji w swojej kolejce z active=0 mimo poprawnego d.start/d.resume.
for t in to_resume:
h = str(t.get('hash') or '')
if not h:
continue
try:
result = _start_download(c, t)
start_results.append(result)
resume_requested.append(h)
except Exception as exc:
start_failed.append({'hash': h, 'error': str(exc)})
active_verified, start_no_effect = _verify_started_downloads(c, resume_requested)
for h in active_verified:
_restore_auto_label(c, profile_id, h, None)
# Note: Historia pokazuje tylko torrenty faktycznie zdjęte z pauzy, a nie samą liczbę wysłanych komend.
resumed = list(active_verified)
keep_labels = (
set(paused)
| {str(t.get('hash') or '') for t in to_label_waiting}
| {str(t.get('hash') or '') for t in stopped if str(t.get('label') or '') == SMART_QUEUE_LABEL and str(t.get('hash') or '') not in set(resumed)}
)
restored = _cleanup_auto_labels(c, profile_id, torrents, keep_labels, manage_stopped)
details = {'excluded': len(excluded), 'enabled': bool(settings.get('enabled')), 'auto_label': SMART_QUEUE_LABEL, 'labels_restored': restored, 'labels_failed': label_failed, 'start_failed': start_failed, 'start_no_effect': start_no_effect, 'start_results': start_results, 'resume_requested': resume_requested, 'active_verified': resumed, 'attempted_count': len(attempted_hashes), 'waiting_labeled': len(waiting_hashes), 'pending_holds': len(protected_holds), 'expired_holds': len(expired_holds), 'manage_stopped': manage_stopped, 'max_active_downloads': max_active, 'active_before': len(downloading), 'active_after_expected': active_slots, 'paused_planned': len(to_pause), 'resumed_planned': len(attempted_hashes), 'stalled_detected': len(stalled), 'stalled_paused': len([h for h in paused if h in stalled_hashes]), 'rtorrent_cap': rtorrent_cap}
details = {'excluded': len(excluded), 'enabled': bool(settings.get('enabled')), 'auto_label': SMART_QUEUE_LABEL, 'labels_restored': restored, 'labels_failed': label_failed, 'start_failed': start_failed, 'start_no_effect': start_no_effect, 'start_results': start_results, 'resume_requested': resume_requested, 'active_verified': active_verified, 'waiting_labeled': len(to_label_waiting), 'manage_stopped': manage_stopped, 'max_active_downloads': max_active, 'active_before': len(downloading), 'active_after_expected': active_after_pause + len(resumed), 'paused_planned': len(to_pause), 'resumed_planned': len(to_resume), 'rtorrent_cap': rtorrent_cap}
add_history(profile_id, 'force_check' if force else 'auto_check', paused, resumed, len(torrents), details, user_id)
return {'ok': True, 'enabled': bool(settings.get('enabled')), 'paused': paused, 'resumed': resumed, 'resume_requested': resume_requested, 'waiting_labeled': len(waiting_hashes), 'labels_restored': restored, 'labels_failed': label_failed, 'start_failed': start_failed, 'start_no_effect': start_no_effect, 'active_verified': resumed, 'rtorrent_cap': rtorrent_cap, 'checked': len(torrents), 'excluded': len(excluded), 'settings': settings}
return {'ok': True, 'enabled': bool(settings.get('enabled')), 'paused': paused, 'resumed': resumed, 'resume_requested': resume_requested, 'waiting_labeled': len(to_label_waiting), 'labels_restored': restored, 'labels_failed': label_failed, 'start_failed': start_failed, 'start_no_effect': start_no_effect, 'active_verified': active_verified, 'rtorrent_cap': rtorrent_cap, 'checked': len(torrents), 'excluded': len(excluded), 'settings': settings}