From 7a4bda98a2e4b0ac74f278fd70867dba7952a589 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20Gruszczy=C5=84ski?= Date: Thu, 7 May 2026 09:06:47 +0200 Subject: [PATCH] automatyzacje-comit4 --- pytorrent/services/automation_rules.py | 123 ++++++++++++++++++++----- pytorrent/services/workers.py | 26 ++++-- 2 files changed, 118 insertions(+), 31 deletions(-) diff --git a/pytorrent/services/automation_rules.py b/pytorrent/services/automation_rules.py index a8c8fed..9a4a279 100644 --- a/pytorrent/services/automation_rules.py +++ b/pytorrent/services/automation_rules.py @@ -5,6 +5,10 @@ import json from ..db import connect, default_user_id, utcnow from . import rtorrent from .preferences import active_profile +from .workers import enqueue + +AUTOMATION_JOB_CHUNK_SIZE = 100 + def _loads(value: str | None, default: Any) -> Any: @@ -149,8 +153,59 @@ def _mark_rule_cooldown(conn, rule: dict[str, Any], profile_id: int, now: str) - conn.execute('INSERT INTO automation_rule_state(rule_id,profile_id,torrent_hash,last_applied_at,updated_at) VALUES(?,?,?,?,?) ON CONFLICT(rule_id,profile_id,torrent_hash) DO UPDATE SET last_applied_at=excluded.last_applied_at, updated_at=excluded.updated_at', (rule['id'], profile_id, '__rule__', now, now)) -def _apply_effects_bulk(c: Any, profile: dict[str, Any], torrents: list[dict[str, Any]], effects: list[dict[str, Any]]) -> list[dict[str, Any]]: +def _chunk_hashes(hashes: list[str], size: int = AUTOMATION_JOB_CHUNK_SIZE) -> list[list[str]]: + # Note: Automation jobs use the same small-batch idea as manual bulk jobs, so long move/remove/actions remain visible and recoverable. + safe_size = max(1, int(size or AUTOMATION_JOB_CHUNK_SIZE)) + return [hashes[index:index + safe_size] for index in range(0, len(hashes), safe_size)] + + +def _job_context(rule: dict[str, Any], eff_type: str, hashes: list[str], torrents_by_hash: dict[str, dict[str, Any]], extra: dict[str, Any] | None = None) -> dict[str, Any]: + # Note: Job context marks jobs created by automations, making the Jobs log explain what rule queued the work. + ctx = { + 'source': 'automation', + 'rule_id': rule.get('id'), + 'rule_name': str(rule.get('name') or ''), + 'effect': eff_type, + 'bulk': len(hashes) > 1, + 'hash_count': len(hashes), + 'requested_at': utcnow(), + 'items': [ + { + 'hash': h, + 'name': str((torrents_by_hash.get(h) or {}).get('name') or ''), + 'path': str((torrents_by_hash.get(h) or {}).get('path') or ''), + } + for h in hashes + ], + } + if extra: + ctx.update(extra) + return ctx + + +def _enqueue_automation_job(profile: dict[str, Any], rule: dict[str, Any], action_name: str, hashes: list[str], payload: dict[str, Any], torrents_by_hash: dict[str, dict[str, Any]], user_id: int | None = None, context_extra: dict[str, Any] | None = None) -> list[str]: + # Note: Every automation side effect is queued as a normal job instead of running inline, so it appears in Jobs and uses worker retries/ordering. + job_ids: list[str] = [] + chunks = _chunk_hashes(hashes) + for index, chunk in enumerate(chunks, start=1): + part_payload = dict(payload or {}) + part_payload['hashes'] = chunk + part_payload['automation_ordered'] = True + extra = dict(context_extra or {}) + if len(chunks) > 1: + extra.update({'bulk_label': f'automation-{index}', 'bulk_part': index, 'bulk_parts': len(chunks), 'parent_hash_count': len(hashes)}) + if action_name == 'move': + extra.update({'target_path': str(part_payload.get('path') or ''), 'move_data': bool(part_payload.get('move_data'))}) + if action_name == 'remove': + extra.update({'remove_data': bool(part_payload.get('remove_data'))}) + part_payload['job_context'] = _job_context(rule, str(context_extra.get('effect_type') if context_extra else action_name), chunk, torrents_by_hash, extra) + job_ids.append(enqueue(action_name, int(profile['id']), part_payload, user_id=user_id)) + return job_ids + + +def _apply_effects_bulk(c: Any, profile: dict[str, Any], torrents: list[dict[str, Any]], effects: list[dict[str, Any]], rule: dict[str, Any], user_id: int | None = None) -> list[dict[str, Any]]: hashes = [str(t.get('hash') or '') for t in torrents if str(t.get('hash') or '')] + torrents_by_hash = {str(t.get('hash') or ''): t for t in torrents if str(t.get('hash') or '')} labels_by_hash = {str(t.get('hash') or ''): _label_names(t.get('label')) for t in torrents} applied: list[dict[str, Any]] = [] if not hashes: return applied @@ -164,40 +219,64 @@ def _apply_effects_bulk(c: Any, profile: dict[str, Any], torrents: list[dict[str 'recheck': bool(eff.get('recheck', eff.get('move_data'))), 'keep_seeding': bool(eff.get('keep_seeding')), } - result = rtorrent.action(profile, hashes, 'move', payload) - applied.append({'type': 'move', 'path': path, 'count': len(hashes), 'target_hashes': hashes, 'move_data': payload['move_data'], 'recheck': payload['recheck'], 'keep_seeding': payload['keep_seeding'], 'result': result}) + job_ids = _enqueue_automation_job(profile, rule, 'move', hashes, payload, torrents_by_hash, user_id, {'effect_type': 'move'}) + applied.append({'type': 'move', 'path': path, 'count': len(hashes), 'target_hashes': hashes, 'move_data': payload['move_data'], 'recheck': payload['recheck'], 'keep_seeding': payload['keep_seeding'], 'job_ids': job_ids}) elif typ == 'add_label': label = str(eff.get('label') or '').strip() if label: - # Note: Add-label automations are idempotent; torrents that already have the label are ignored. - target_hashes = [h for h in hashes if label not in labels_by_hash.get(h, [])] - for h in target_hashes: - labels = labels_by_hash.setdefault(h, []) - labels.append(label); c.call('d.custom1.set', h, _label_value(labels)) + # Note: Add-label automations are idempotent and queue only torrents that need a changed label value. + grouped: dict[str, list[str]] = {} + for h in hashes: + labels = labels_by_hash.get(h, []) + if label in labels: + continue + new_labels = list(labels) + [label] + value = _label_value(new_labels) + labels_by_hash[h] = _label_names(value) + grouped.setdefault(value, []).append(h) + target_hashes = [h for group in grouped.values() for h in group] + job_ids: list[str] = [] + for value, group_hashes in grouped.items(): + job_ids.extend(_enqueue_automation_job(profile, rule, 'set_label', group_hashes, {'label': value}, torrents_by_hash, user_id, {'effect_type': 'add_label', 'label': label})) if target_hashes: - applied.append({'type': 'add_label', 'label': label, 'count': len(target_hashes), 'target_hashes': target_hashes}) + applied.append({'type': 'add_label', 'label': label, 'count': len(target_hashes), 'target_hashes': target_hashes, 'job_ids': job_ids}) elif typ == 'remove_label': label = str(eff.get('label') or '').strip() if label: - # Note: Remove-label automations run only on torrents that actually contain the label. - target_hashes = [h for h in hashes if label in labels_by_hash.get(h, [])] - for h in target_hashes: - labels = [x for x in labels_by_hash.get(h, []) if x != label] - labels_by_hash[h] = labels; c.call('d.custom1.set', h, _label_value(labels)) + # Note: Remove-label automations are queued only for torrents where the requested label exists. + grouped: dict[str, list[str]] = {} + for h in hashes: + labels = labels_by_hash.get(h, []) + if label not in labels: + continue + value = _label_value([x for x in labels if x != label]) + labels_by_hash[h] = _label_names(value) + grouped.setdefault(value, []).append(h) + target_hashes = [h for group in grouped.values() for h in group] + job_ids: list[str] = [] + for value, group_hashes in grouped.items(): + job_ids.extend(_enqueue_automation_job(profile, rule, 'set_label', group_hashes, {'label': value}, torrents_by_hash, user_id, {'effect_type': 'remove_label', 'label': label})) if target_hashes: - applied.append({'type': 'remove_label', 'label': label, 'count': len(target_hashes), 'target_hashes': target_hashes}) + applied.append({'type': 'remove_label', 'label': label, 'count': len(target_hashes), 'target_hashes': target_hashes, 'job_ids': job_ids}) elif typ == 'set_labels': value = _label_value(_label_names(eff.get('labels'))) target_labels = _label_names(value) - # Note: Set-labels skips torrents whose current label list already matches the requested list. + # Note: Set-labels queues a job only if the current labels differ from the requested exact list. target_hashes = [h for h in hashes if labels_by_hash.get(h, []) != target_labels] for h in target_hashes: - labels_by_hash[h] = list(target_labels); c.call('d.custom1.set', h, value) + labels_by_hash[h] = list(target_labels) if target_hashes: - applied.append({'type': 'set_labels', 'labels': value, 'count': len(target_hashes), 'target_hashes': target_hashes}) - elif typ in {'pause', 'stop', 'start', 'resume', 'recheck'}: - result = rtorrent.action(profile, hashes, typ, {}) - applied.append({'type': typ, 'count': len(hashes), 'target_hashes': hashes, 'result': result}) + job_ids = _enqueue_automation_job(profile, rule, 'set_label', target_hashes, {'label': value}, torrents_by_hash, user_id, {'effect_type': 'set_labels', 'labels': value}) + applied.append({'type': 'set_labels', 'labels': value, 'count': len(target_hashes), 'target_hashes': target_hashes, 'job_ids': job_ids}) + elif typ in {'pause', 'stop', 'start', 'resume', 'recheck', 'reannounce'}: + # Note: Runtime actions are queued as jobs too, so automation activity is visible in the Jobs panel. + job_ids = _enqueue_automation_job(profile, rule, typ, hashes, {}, torrents_by_hash, user_id, {'effect_type': typ}) + applied.append({'type': typ, 'count': len(hashes), 'target_hashes': hashes, 'job_ids': job_ids}) + elif typ == 'remove': + # Note: Remove is supported for automation payloads and still goes through ordered worker jobs. + payload = {'remove_data': bool(eff.get('remove_data'))} + job_ids = _enqueue_automation_job(profile, rule, 'remove', hashes, payload, torrents_by_hash, user_id, {'effect_type': 'remove'}) + applied.append({'type': 'remove', 'count': len(hashes), 'target_hashes': hashes, 'remove_data': payload['remove_data'], 'job_ids': job_ids}) return applied @@ -220,7 +299,7 @@ def check(profile: dict | None = None, user_id: int | None = None, force: bool = if not hashes: continue try: - actions = _apply_effects_bulk(c, profile, matched, rule.get('effects') or []) + actions = _apply_effects_bulk(c, profile, matched, rule.get('effects') or [], rule, user_id) except Exception as exc: actions = [{'error': str(exc), 'count': len(hashes), 'target_hashes': hashes}] changed_hashes = sorted({h for a in actions for h in (a.get('target_hashes') or [])}) diff --git a/pytorrent/services/workers.py b/pytorrent/services/workers.py index a9d0835..d587776 100644 --- a/pytorrent/services/workers.py +++ b/pytorrent/services/workers.py @@ -54,25 +54,33 @@ def _job_row(job_id: str): return conn.execute("SELECT rowid AS _rowid, * FROM jobs WHERE id=?", (job_id,)).fetchone() -def _is_ordered_action(action_name: str) -> bool: - return action_name in {"move", "remove"} +def _job_payload(row) -> dict: + try: + return json.loads((row or {}).get("payload_json") or "{}") + except Exception: + return {} + + +def _is_ordered_job(row) -> bool: + payload = _job_payload(row) + # Note: Move/remove remain ordered, and automation-created jobs can opt in so effect order is visible and predictable. + return str((row or {}).get("action") or "") in {"move", "remove"} or bool(payload.get("automation_ordered")) def _has_prior_ordered_jobs(profile_id: int, rowid: int) -> bool: with connect() as conn: - row = conn.execute( + rows = conn.execute( """ - SELECT 1 + SELECT rowid AS _rowid, action, payload_json FROM jobs WHERE profile_id=? AND rowid bool: @@ -140,7 +148,7 @@ def _run(job_id: str): return profile_id = int(profile["id"]) ordered_lock = None - if _is_ordered_action(str(job["action"])): + if _is_ordered_job(job): if not _wait_for_prior_ordered_jobs(job_id, profile_id, int(job["_rowid"])): return ordered_lock = _get_exclusive_lock(profile_id)