from __future__ import annotations from datetime import datetime, timezone from typing import Any import json from ..db import connect, default_user_id, utcnow from . import rtorrent from .preferences import active_profile from .workers import enqueue AUTOMATION_JOB_CHUNK_SIZE = 100 def _loads(value: str | None, default: Any) -> Any: try: return json.loads(value or '') except Exception: return default def _ts(value: str | None) -> float: if not value: return 0.0 try: return datetime.fromisoformat(str(value).replace('Z', '+00:00')).timestamp() except Exception: return 0.0 def _now_ts() -> float: return datetime.now(timezone.utc).timestamp() def _label_names(value: str | None) -> list[str]: seen = [] for part in str(value or '').replace(';', ',').replace('|', ',').split(','): item = part.strip() if item and item not in seen: seen.append(item) return seen def _label_value(labels: list[str]) -> str: out = [] for label in labels: label = str(label or '').strip() if label and label not in out: out.append(label) return ', '.join(out) def _rule_row(row: dict[str, Any]) -> dict[str, Any]: item = dict(row) item['conditions'] = _loads(item.pop('conditions_json', '[]'), []) item['effects'] = _loads(item.pop('effects_json', '[]'), []) return item def list_rules(profile_id: int | None = None, user_id: int | None = None) -> list[dict[str, Any]]: user_id = user_id or default_user_id() if profile_id is None: profile = active_profile(); profile_id = int(profile['id']) if profile else None with connect() as conn: rows = conn.execute('SELECT * FROM automation_rules WHERE user_id=? AND (profile_id=? OR profile_id IS NULL) ORDER BY enabled DESC, name COLLATE NOCASE', (user_id, profile_id)).fetchall() return [_rule_row(r) for r in rows] def get_rule(rule_id: int, profile_id: int, user_id: int | None = None) -> dict[str, Any]: user_id = user_id or default_user_id() with connect() as conn: row = conn.execute('SELECT * FROM automation_rules WHERE id=? AND user_id=? AND profile_id=?', (rule_id, user_id, profile_id)).fetchone() if not row: raise ValueError('Rule not found') return _rule_row(row) def save_rule(profile_id: int, data: dict[str, Any], user_id: int | None = None) -> dict[str, Any]: user_id = user_id or default_user_id() name = str(data.get('name') or 'Automation rule').strip() or 'Automation rule' conditions = data.get('conditions') or [] effects = data.get('effects') or [] if not isinstance(conditions, list) or not conditions: raise ValueError('Rule needs at least one condition') if not isinstance(effects, list) or not effects: raise ValueError('Rule needs at least one effect') cooldown = max(0, int(data.get('cooldown_minutes') or 0)) enabled = 1 if data.get('enabled', True) else 0 now = utcnow(); rule_id = int(data.get('id') or 0) with connect() as conn: if rule_id: conn.execute('UPDATE automation_rules SET name=?, enabled=?, conditions_json=?, effects_json=?, cooldown_minutes=?, updated_at=? WHERE id=? AND user_id=? AND profile_id=?', (name, enabled, json.dumps(conditions), json.dumps(effects), cooldown, now, rule_id, user_id, profile_id)) else: cur = conn.execute('INSERT INTO automation_rules(user_id,profile_id,name,enabled,conditions_json,effects_json,cooldown_minutes,created_at,updated_at) VALUES(?,?,?,?,?,?,?,?,?)', (user_id, profile_id, name, enabled, json.dumps(conditions), json.dumps(effects), cooldown, now, now)) rule_id = int(cur.lastrowid) return get_rule(rule_id, profile_id, user_id) def delete_rule(rule_id: int, profile_id: int, user_id: int | None = None) -> None: user_id = user_id or default_user_id() with connect() as conn: conn.execute('DELETE FROM automation_rules WHERE id=? AND user_id=? AND profile_id=?', (rule_id, user_id, profile_id)) conn.execute('DELETE FROM automation_rule_state WHERE rule_id=? AND profile_id=?', (rule_id, profile_id)) def list_history(profile_id: int, user_id: int | None = None, limit: int = 30) -> list[dict[str, Any]]: user_id = user_id or default_user_id() with connect() as conn: return conn.execute('SELECT * FROM automation_history WHERE user_id=? AND profile_id=? ORDER BY created_at DESC LIMIT ?', (user_id, profile_id, max(1, min(int(limit or 30), 100)))).fetchall() def clear_history(profile_id: int, user_id: int | None = None) -> int: user_id = user_id or default_user_id() with connect() as conn: # Note: Manual automation log cleanup is scoped to the active profile and current user. cur = conn.execute('DELETE FROM automation_history WHERE user_id=? AND profile_id=?', (user_id, profile_id)) return int(cur.rowcount or 0) def _condition_true(t: dict[str, Any], cond: dict[str, Any]) -> bool: typ = str(cond.get('type') or '') if typ == 'completed': return bool(int(t.get('complete') or 0)) if typ == 'no_seeds': return int(t.get('seeds') or 0) <= int(cond.get('seeds') or 0) if typ == 'ratio_gte': return float(t.get('ratio') or 0) >= float(cond.get('ratio') or 0) if typ == 'label_missing': return str(cond.get('label') or '').strip() not in _label_names(t.get('label')) if typ == 'label_has': return str(cond.get('label') or '').strip() in _label_names(t.get('label')) if typ == 'status': return str(t.get('status') or '').lower() == str(cond.get('status') or '').lower() if typ == 'path_contains': return str(cond.get('text') or '').lower() in str(t.get('path') or '').lower() return False def _conditions_match(conn, rule: dict[str, Any], profile_id: int, t: dict[str, Any]) -> bool: h = str(t.get('hash') or '') if not h: return False immediate_ok = True; delayed_ok = True; now = utcnow(); now_ts = _now_ts() for cond in rule.get('conditions') or []: raw_ok = _condition_true(t, cond) negated = bool(cond.get('negate')) # Note: Negation is applied in the backend, so UI and API only store the condition flag. ok = (not raw_ok) if negated else raw_ok if cond.get('type') == 'no_seeds' and int(cond.get('minutes') or 0) > 0 and not negated: row = conn.execute('SELECT condition_since_at FROM automation_rule_state WHERE rule_id=? AND profile_id=? AND torrent_hash=?', (rule['id'], profile_id, h)).fetchone() if ok: since = row['condition_since_at'] if row and row.get('condition_since_at') else now conn.execute('INSERT INTO automation_rule_state(rule_id,profile_id,torrent_hash,condition_since_at,last_matched_at,updated_at) VALUES(?,?,?,?,?,?) ON CONFLICT(rule_id,profile_id,torrent_hash) DO UPDATE SET condition_since_at=COALESCE(automation_rule_state.condition_since_at, excluded.condition_since_at), last_matched_at=excluded.last_matched_at, updated_at=excluded.updated_at', (rule['id'], profile_id, h, since, now, now)) delayed_ok = delayed_ok and (now_ts - _ts(since) >= int(cond.get('minutes') or 0) * 60) else: conn.execute('UPDATE automation_rule_state SET condition_since_at=NULL, updated_at=? WHERE rule_id=? AND profile_id=? AND torrent_hash=?', (now, rule['id'], profile_id, h)); delayed_ok = False else: immediate_ok = immediate_ok and ok return immediate_ok and delayed_ok def _cooldown_ok(conn, rule: dict[str, Any], profile_id: int, torrent_hash: str = '__rule__') -> bool: cooldown = int(rule.get('cooldown_minutes') or 0) if cooldown <= 0: return True row = conn.execute('SELECT last_applied_at FROM automation_rule_state WHERE rule_id=? AND profile_id=? AND torrent_hash=?', (rule['id'], profile_id, torrent_hash)).fetchone() if not row or not row.get('last_applied_at'): return True return _now_ts() - _ts(row['last_applied_at']) >= cooldown * 60 def _mark_rule_cooldown(conn, rule: dict[str, Any], profile_id: int, now: str) -> None: # Note: Cooldown is rule-level, so one batch execution blocks the whole automation until the cooldown expires. conn.execute('INSERT INTO automation_rule_state(rule_id,profile_id,torrent_hash,last_applied_at,updated_at) VALUES(?,?,?,?,?) ON CONFLICT(rule_id,profile_id,torrent_hash) DO UPDATE SET last_applied_at=excluded.last_applied_at, updated_at=excluded.updated_at', (rule['id'], profile_id, '__rule__', now, now)) def _chunk_hashes(hashes: list[str], size: int = AUTOMATION_JOB_CHUNK_SIZE) -> list[list[str]]: # Note: Automation jobs use the same small-batch idea as manual bulk jobs, so long move/remove/actions remain visible and recoverable. safe_size = max(1, int(size or AUTOMATION_JOB_CHUNK_SIZE)) return [hashes[index:index + safe_size] for index in range(0, len(hashes), safe_size)] def _job_context(rule: dict[str, Any], eff_type: str, hashes: list[str], torrents_by_hash: dict[str, dict[str, Any]], extra: dict[str, Any] | None = None) -> dict[str, Any]: # Note: Job context marks jobs created by automations, making the Jobs log explain what rule queued the work. ctx = { 'source': 'automation', 'rule_id': rule.get('id'), 'rule_name': str(rule.get('name') or ''), 'effect': eff_type, 'bulk': len(hashes) > 1, 'hash_count': len(hashes), 'requested_at': utcnow(), 'items': [ { 'hash': h, 'name': str((torrents_by_hash.get(h) or {}).get('name') or ''), 'path': str((torrents_by_hash.get(h) or {}).get('path') or ''), } for h in hashes ], } if extra: ctx.update(extra) return ctx def _enqueue_automation_job(profile: dict[str, Any], rule: dict[str, Any], action_name: str, hashes: list[str], payload: dict[str, Any], torrents_by_hash: dict[str, dict[str, Any]], user_id: int | None = None, context_extra: dict[str, Any] | None = None) -> list[str]: # Note: Every automation side effect is queued as a normal job instead of running inline, so it appears in Jobs and uses worker retries/ordering. job_ids: list[str] = [] chunks = _chunk_hashes(hashes) for index, chunk in enumerate(chunks, start=1): part_payload = dict(payload or {}) part_payload['hashes'] = chunk part_payload['automation_ordered'] = True extra = dict(context_extra or {}) if len(chunks) > 1: extra.update({'bulk_label': f'automation-{index}', 'bulk_part': index, 'bulk_parts': len(chunks), 'parent_hash_count': len(hashes)}) if action_name == 'move': extra.update({'target_path': str(part_payload.get('path') or ''), 'move_data': bool(part_payload.get('move_data'))}) if action_name == 'remove': extra.update({'remove_data': bool(part_payload.get('remove_data'))}) part_payload['job_context'] = _job_context(rule, str(context_extra.get('effect_type') if context_extra else action_name), chunk, torrents_by_hash, extra) job_ids.append(enqueue(action_name, int(profile['id']), part_payload, user_id=user_id)) return job_ids def _apply_effects_bulk(c: Any, profile: dict[str, Any], torrents: list[dict[str, Any]], effects: list[dict[str, Any]], rule: dict[str, Any], user_id: int | None = None) -> list[dict[str, Any]]: hashes = [str(t.get('hash') or '') for t in torrents if str(t.get('hash') or '')] torrents_by_hash = {str(t.get('hash') or ''): t for t in torrents if str(t.get('hash') or '')} labels_by_hash = {str(t.get('hash') or ''): _label_names(t.get('label')) for t in torrents} applied: list[dict[str, Any]] = [] if not hashes: return applied for eff in effects: typ = str(eff.get('type') or '') if typ == 'move': path = str(eff.get('path') or '').strip() or rtorrent.default_download_path(profile) payload = { 'path': path, 'move_data': bool(eff.get('move_data')), 'recheck': bool(eff.get('recheck', eff.get('move_data'))), 'keep_seeding': bool(eff.get('keep_seeding')), } job_ids = _enqueue_automation_job(profile, rule, 'move', hashes, payload, torrents_by_hash, user_id, {'effect_type': 'move'}) applied.append({'type': 'move', 'path': path, 'count': len(hashes), 'target_hashes': hashes, 'move_data': payload['move_data'], 'recheck': payload['recheck'], 'keep_seeding': payload['keep_seeding'], 'job_ids': job_ids}) elif typ == 'add_label': label = str(eff.get('label') or '').strip() if label: # Note: Add-label automations are idempotent and queue only torrents that need a changed label value. grouped: dict[str, list[str]] = {} for h in hashes: labels = labels_by_hash.get(h, []) if label in labels: continue new_labels = list(labels) + [label] value = _label_value(new_labels) labels_by_hash[h] = _label_names(value) grouped.setdefault(value, []).append(h) target_hashes = [h for group in grouped.values() for h in group] job_ids: list[str] = [] for value, group_hashes in grouped.items(): job_ids.extend(_enqueue_automation_job(profile, rule, 'set_label', group_hashes, {'label': value}, torrents_by_hash, user_id, {'effect_type': 'add_label', 'label': label})) if target_hashes: applied.append({'type': 'add_label', 'label': label, 'count': len(target_hashes), 'target_hashes': target_hashes, 'job_ids': job_ids}) elif typ == 'remove_label': label = str(eff.get('label') or '').strip() if label: # Note: Remove-label automations are queued only for torrents where the requested label exists. grouped: dict[str, list[str]] = {} for h in hashes: labels = labels_by_hash.get(h, []) if label not in labels: continue value = _label_value([x for x in labels if x != label]) labels_by_hash[h] = _label_names(value) grouped.setdefault(value, []).append(h) target_hashes = [h for group in grouped.values() for h in group] job_ids: list[str] = [] for value, group_hashes in grouped.items(): job_ids.extend(_enqueue_automation_job(profile, rule, 'set_label', group_hashes, {'label': value}, torrents_by_hash, user_id, {'effect_type': 'remove_label', 'label': label})) if target_hashes: applied.append({'type': 'remove_label', 'label': label, 'count': len(target_hashes), 'target_hashes': target_hashes, 'job_ids': job_ids}) elif typ == 'set_labels': value = _label_value(_label_names(eff.get('labels'))) target_labels = _label_names(value) # Note: Set-labels queues a job only if the current labels differ from the requested exact list. target_hashes = [h for h in hashes if labels_by_hash.get(h, []) != target_labels] for h in target_hashes: labels_by_hash[h] = list(target_labels) if target_hashes: job_ids = _enqueue_automation_job(profile, rule, 'set_label', target_hashes, {'label': value}, torrents_by_hash, user_id, {'effect_type': 'set_labels', 'labels': value}) applied.append({'type': 'set_labels', 'labels': value, 'count': len(target_hashes), 'target_hashes': target_hashes, 'job_ids': job_ids}) elif typ in {'pause', 'stop', 'start', 'resume', 'recheck', 'reannounce'}: # Note: Runtime actions are queued as jobs too, so automation activity is visible in the Jobs panel. job_ids = _enqueue_automation_job(profile, rule, typ, hashes, {}, torrents_by_hash, user_id, {'effect_type': typ}) applied.append({'type': typ, 'count': len(hashes), 'target_hashes': hashes, 'job_ids': job_ids}) elif typ == 'remove': # Note: Remove is supported for automation payloads and still goes through ordered worker jobs. payload = {'remove_data': bool(eff.get('remove_data'))} job_ids = _enqueue_automation_job(profile, rule, 'remove', hashes, payload, torrents_by_hash, user_id, {'effect_type': 'remove'}) applied.append({'type': 'remove', 'count': len(hashes), 'target_hashes': hashes, 'remove_data': payload['remove_data'], 'job_ids': job_ids}) return applied def check(profile: dict | None = None, user_id: int | None = None, force: bool = False) -> dict[str, Any]: profile = profile or active_profile() if not profile: return {'ok': False, 'error': 'No active rTorrent profile'} user_id = user_id or default_user_id(); profile_id = int(profile['id']) rules = [r for r in list_rules(profile_id, user_id) if force or int(r.get('enabled') or 0)] if not rules: return {'ok': True, 'checked': 0, 'applied': [], 'batches': [], 'rules': 0} torrents = rtorrent.list_torrents(profile); c = rtorrent.client_for(profile); applied = []; batches = []; now = utcnow() with connect() as conn: for rule in rules: # Note: Automations now execute as one batch per rule, not as one independent action per torrent. if not force and not _cooldown_ok(conn, rule, profile_id): continue matched = [t for t in torrents if _conditions_match(conn, rule, profile_id, t)] if not matched: continue hashes = [str(t.get('hash') or '') for t in matched if str(t.get('hash') or '')] if not hashes: continue try: actions = _apply_effects_bulk(c, profile, matched, rule.get('effects') or [], rule, user_id) except Exception as exc: actions = [{'error': str(exc), 'count': len(hashes), 'target_hashes': hashes}] changed_hashes = sorted({h for a in actions for h in (a.get('target_hashes') or [])}) if not actions or not changed_hashes: # Note: Matching torrents with no real action are not logged and do not restart the cooldown. continue matched_by_hash = {str(t.get('hash') or ''): t for t in matched} for h in changed_hashes: t = matched_by_hash.get(h, {}) conn.execute('INSERT INTO automation_rule_state(rule_id,profile_id,torrent_hash,last_matched_at,last_applied_at,updated_at) VALUES(?,?,?,?,?,?) ON CONFLICT(rule_id,profile_id,torrent_hash) DO UPDATE SET last_matched_at=excluded.last_matched_at, last_applied_at=excluded.last_applied_at, updated_at=excluded.updated_at', (rule['id'], profile_id, h, now, now, now)) applied.append({'rule_id': rule['id'], 'rule_name': rule.get('name'), 'hash': h, 'name': t.get('name'), 'actions': [{'type': a.get('type', 'error'), 'count': a.get('count', len(changed_hashes))} for a in actions]}) _mark_rule_cooldown(conn, rule, profile_id, now) torrent_name = str(matched_by_hash.get(changed_hashes[0], {}).get('name') or '') if len(changed_hashes) == 1 else f'{len(changed_hashes)} torrents' torrent_hash = changed_hashes[0] if len(changed_hashes) == 1 else f'batch:{rule["id"]}:{now}' history_actions = [{k: v for k, v in a.items() if k != 'target_hashes'} for a in actions] conn.execute('INSERT INTO automation_history(user_id,profile_id,rule_id,torrent_hash,torrent_name,rule_name,actions_json,created_at) VALUES(?,?,?,?,?,?,?,?)', (user_id, profile_id, rule['id'], torrent_hash, torrent_name, str(rule.get('name') or ''), json.dumps(history_actions), now)) batches.append({'rule_id': rule['id'], 'rule_name': rule.get('name'), 'count': len(changed_hashes), 'actions': history_actions}) return {'ok': True, 'checked': len(torrents), 'rules': len(rules), 'applied': applied, 'batches': batches}