228 lines
14 KiB
Python
228 lines
14 KiB
Python
from __future__ import annotations
|
|
from datetime import datetime, timezone
|
|
from typing import Any
|
|
import json
|
|
from ..db import connect, default_user_id, utcnow
|
|
from . import rtorrent
|
|
from .preferences import active_profile
|
|
|
|
|
|
def _loads(value: str | None, default: Any) -> Any:
|
|
try: return json.loads(value or '')
|
|
except Exception: return default
|
|
|
|
|
|
def _ts(value: str | None) -> float:
|
|
if not value: return 0.0
|
|
try: return datetime.fromisoformat(str(value).replace('Z', '+00:00')).timestamp()
|
|
except Exception: return 0.0
|
|
|
|
|
|
def _now_ts() -> float:
|
|
return datetime.now(timezone.utc).timestamp()
|
|
|
|
|
|
def _label_names(value: str | None) -> list[str]:
|
|
seen = []
|
|
for part in str(value or '').replace(';', ',').replace('|', ',').split(','):
|
|
item = part.strip()
|
|
if item and item not in seen: seen.append(item)
|
|
return seen
|
|
|
|
|
|
def _label_value(labels: list[str]) -> str:
|
|
out = []
|
|
for label in labels:
|
|
label = str(label or '').strip()
|
|
if label and label not in out: out.append(label)
|
|
return ', '.join(out)
|
|
|
|
|
|
def _rule_row(row: dict[str, Any]) -> dict[str, Any]:
    """Turn a raw ``automation_rules`` row into an API-facing rule dict.

    The row is copied; ``conditions_json`` and ``effects_json`` are replaced
    by decoded ``conditions`` and ``effects`` values (``[]`` when the stored
    JSON is missing or cannot be decoded).
    """
    record = dict(row)
    for column, field in (('conditions_json', 'conditions'), ('effects_json', 'effects')):
        record[field] = _loads(record.pop(column, '[]'), [])
    return record
|
|
|
|
|
|
def list_rules(profile_id: int | None = None, user_id: int | None = None) -> list[dict[str, Any]]:
    """List a user's automation rules for one profile.

    When ``profile_id`` is omitted the active profile is used (or ``None`` if
    there is no active profile). Rules stored with a NULL ``profile_id`` are
    always included. Results are ordered enabled-first, then by name
    case-insensitively.
    """
    owner = user_id or default_user_id()
    if profile_id is None:
        current = active_profile()
        profile_id = int(current['id']) if current else None
    query = 'SELECT * FROM automation_rules WHERE user_id=? AND (profile_id=? OR profile_id IS NULL) ORDER BY enabled DESC, name COLLATE NOCASE'
    with connect() as conn:
        found = conn.execute(query, (owner, profile_id)).fetchall()
    return [_rule_row(entry) for entry in found]
|
|
|
|
|
|
def get_rule(rule_id: int, profile_id: int, user_id: int | None = None) -> dict[str, Any]:
    """Fetch one rule owned by the user within the given profile.

    Raises:
        ValueError: if no row matches the id, user and profile.
    """
    owner = user_id or default_user_id()
    query = 'SELECT * FROM automation_rules WHERE id=? AND user_id=? AND profile_id=?'
    with connect() as conn:
        found = conn.execute(query, (rule_id, owner, profile_id)).fetchone()
    if found:
        return _rule_row(found)
    raise ValueError('Rule not found')
|
|
|
|
|
|
def save_rule(profile_id: int, data: dict[str, Any], user_id: int | None = None) -> dict[str, Any]:
    """Create or update an automation rule and return the stored version.

    ``data['id']`` selects update mode when truthy; otherwise a new row is
    inserted. ``name`` defaults to ``'Automation rule'``, ``enabled`` defaults
    to on, and ``cooldown_minutes`` is clamped to be non-negative.

    Raises:
        ValueError: if ``conditions`` or ``effects`` is not a non-empty list,
            or (via ``get_rule``) if the rule cannot be read back afterwards.
    """
    owner = user_id or default_user_id()
    title = str(data.get('name') or 'Automation rule').strip() or 'Automation rule'
    conditions = data.get('conditions') or []
    effects = data.get('effects') or []
    if not (isinstance(conditions, list) and conditions):
        raise ValueError('Rule needs at least one condition')
    if not (isinstance(effects, list) and effects):
        raise ValueError('Rule needs at least one effect')
    cooldown = max(0, int(data.get('cooldown_minutes') or 0))
    enabled = int(bool(data.get('enabled', True)))
    stamp = utcnow()
    rule_id = int(data.get('id') or 0)
    update_sql = 'UPDATE automation_rules SET name=?, enabled=?, conditions_json=?, effects_json=?, cooldown_minutes=?, updated_at=? WHERE id=? AND user_id=? AND profile_id=?'
    insert_sql = 'INSERT INTO automation_rules(user_id,profile_id,name,enabled,conditions_json,effects_json,cooldown_minutes,created_at,updated_at) VALUES(?,?,?,?,?,?,?,?,?)'
    with connect() as conn:
        conditions_json = json.dumps(conditions)
        effects_json = json.dumps(effects)
        if rule_id:
            conn.execute(update_sql, (title, enabled, conditions_json, effects_json, cooldown, stamp, rule_id, owner, profile_id))
        else:
            cursor = conn.execute(insert_sql, (owner, profile_id, title, enabled, conditions_json, effects_json, cooldown, stamp, stamp))
            rule_id = int(cursor.lastrowid)
    # Read back after the connection block so the caller gets the stored row.
    return get_rule(rule_id, profile_id, owner)
|
|
|
|
|
|
def delete_rule(rule_id: int, profile_id: int, user_id: int | None = None) -> None:
    """Delete a rule owned by the user, together with its per-torrent state.

    The state rows in ``automation_rule_state`` are removed only when the
    ownership-scoped rule delete is not known to have matched nothing.
    Otherwise a caller could wipe the state of a rule that belongs to another
    user simply by passing its id.
    """
    user_id = user_id or default_user_id()
    with connect() as conn:
        cur = conn.execute('DELETE FROM automation_rules WHERE id=? AND user_id=? AND profile_id=?', (rule_id, user_id, profile_id))
        # Only an exact 0 means "nothing deleted"; an unknown count (-1/None)
        # keeps the previous behaviour of cleaning up the state rows.
        if cur.rowcount == 0:
            return
        conn.execute('DELETE FROM automation_rule_state WHERE rule_id=? AND profile_id=?', (rule_id, profile_id))
|
|
|
|
|
|
def list_history(profile_id: int, user_id: int | None = None, limit: int = 50) -> list[dict[str, Any]]:
    """Return the newest automation history rows for a user and profile.

    ``limit`` falls back to 50 when falsy and is clamped to the range 1..200.
    Rows are returned exactly as fetched, newest first.
    """
    owner = user_id or default_user_id()
    query = 'SELECT * FROM automation_history WHERE user_id=? AND profile_id=? ORDER BY created_at DESC LIMIT ?'
    with connect() as conn:
        row_cap = min(int(limit or 50), 200)
        row_cap = max(1, row_cap)
        return conn.execute(query, (owner, profile_id, row_cap)).fetchall()
|
|
|
|
|
|
def delete_history_item(history_id: int, profile_id: int, user_id: int | None = None) -> int:
    """Delete a single automation history entry and return the rows removed.

    Only ``automation_history`` is touched; rules are left alone.
    """
    # Note: Allow removing a single automation history card from the UI without touching rules.
    owner = user_id or default_user_id()
    query = 'DELETE FROM automation_history WHERE id=? AND user_id=? AND profile_id=?'
    with connect() as conn:
        cursor = conn.execute(query, (int(history_id), owner, profile_id))
        removed = cursor.rowcount or 0
    return int(removed)
|
|
|
|
|
|
def clear_history(profile_id: int, user_id: int | None = None) -> int:
    """Delete every automation history entry of a user/profile pair.

    Returns the number of rows removed. Rules themselves are not affected.
    """
    # Note: History cleanup is separate from deleting automation rules.
    owner = user_id or default_user_id()
    query = 'DELETE FROM automation_history WHERE user_id=? AND profile_id=?'
    with connect() as conn:
        cursor = conn.execute(query, (owner, profile_id))
        removed = cursor.rowcount or 0
    return int(removed)
|
|
|
|
|
|
def _condition_true(t: dict[str, Any], cond: dict[str, Any]) -> bool:
|
|
typ = str(cond.get('type') or '')
|
|
if typ == 'completed': return bool(int(t.get('complete') or 0))
|
|
if typ == 'no_seeds': return int(t.get('seeds') or 0) <= int(cond.get('seeds') or 0)
|
|
if typ == 'ratio_gte': return float(t.get('ratio') or 0) >= float(cond.get('ratio') or 0)
|
|
if typ == 'label_missing': return str(cond.get('label') or '').strip() not in _label_names(t.get('label'))
|
|
if typ == 'label_has': return str(cond.get('label') or '').strip() in _label_names(t.get('label'))
|
|
if typ == 'status': return str(t.get('status') or '').lower() == str(cond.get('status') or '').lower()
|
|
if typ == 'path_contains': return str(cond.get('text') or '').lower() in str(t.get('path') or '').lower()
|
|
return False
|
|
|
|
|
|
def _conditions_match(conn, rule: dict[str, Any], profile_id: int, t: dict[str, Any]) -> bool:
    """Decide whether torrent ``t`` satisfies every condition of ``rule``.

    Ordinary conditions are evaluated on the spot (optionally inverted by the
    condition's ``negate`` flag). A non-negated ``no_seeds`` condition with
    ``minutes`` > 0 is timed: the moment it started to hold is kept in
    ``automation_rule_state`` and it only counts as satisfied once at least
    that many minutes have passed.

    Side effects: inserts or updates ``automation_rule_state`` rows through
    ``conn`` for timed conditions, whether or not the rule ends up matching.

    Returns ``False`` immediately for a torrent without a hash.
    """
    h = str(t.get('hash') or '')
    if not h: return False
    # immediate_ok tracks plain conditions, delayed_ok the timed no-seeds ones.
    immediate_ok = True; delayed_ok = True; now = utcnow(); now_ts = _now_ts()
    for cond in rule.get('conditions') or []:
        raw_ok = _condition_true(t, cond)
        negated = bool(cond.get('negate'))
        ok = (not raw_ok) if negated else raw_ok
        # Note: Conditions can now be negated in automation rules. Timed no-seeds keeps its old delayed behavior only for the positive condition, so old rules do not change.
        if cond.get('type') == 'no_seeds' and int(cond.get('minutes') or 0) > 0 and not negated:
            # NOTE(review): rows are assumed to be dict-like (``.get``) — confirm against ``connect()``.
            row = conn.execute('SELECT condition_since_at FROM automation_rule_state WHERE rule_id=? AND profile_id=? AND torrent_hash=?', (rule['id'], profile_id, h)).fetchone()
            if ok:
                # Keep the earliest recorded start; begin counting from now otherwise.
                since = row['condition_since_at'] if row and row.get('condition_since_at') else now
                # The upsert's COALESCE keeps an already stored start time intact.
                conn.execute('INSERT INTO automation_rule_state(rule_id,profile_id,torrent_hash,condition_since_at,last_matched_at,updated_at) VALUES(?,?,?,?,?,?) ON CONFLICT(rule_id,profile_id,torrent_hash) DO UPDATE SET condition_since_at=COALESCE(automation_rule_state.condition_since_at, excluded.condition_since_at), last_matched_at=excluded.last_matched_at, updated_at=excluded.updated_at', (rule['id'], profile_id, h, since, now, now))
                delayed_ok = delayed_ok and (now_ts - _ts(since) >= int(cond.get('minutes') or 0) * 60)
            else:
                # Condition no longer holds: clear the start time so the delay restarts next time.
                conn.execute('UPDATE automation_rule_state SET condition_since_at=NULL, updated_at=? WHERE rule_id=? AND profile_id=? AND torrent_hash=?', (now, rule['id'], profile_id, h)); delayed_ok = False
        else:
            immediate_ok = immediate_ok and ok
    return immediate_ok and delayed_ok
|
|
|
|
|
|
def _cooldown_ok(conn, rule: dict[str, Any], profile_id: int, torrent_hash: str = '*') -> bool:
|
|
# Note: Automation cooldown is rule-wide for batch execution; '*' stores the last run for the whole rule.
|
|
cooldown = int(rule.get('cooldown_minutes') or 0)
|
|
if cooldown <= 0: return True
|
|
row = conn.execute('SELECT last_applied_at FROM automation_rule_state WHERE rule_id=? AND profile_id=? AND torrent_hash=?', (rule['id'], profile_id, torrent_hash)).fetchone()
|
|
if not row or not row.get('last_applied_at'): return True
|
|
return _now_ts() - _ts(row['last_applied_at']) >= cooldown * 60
|
|
|
|
|
|
def _touch_rule_cooldown(conn, rule: dict[str, Any], profile_id: int, now: str) -> None:
    """Record ``now`` as the rule-wide last match/apply time.

    The value is upserted into the ``automation_rule_state`` row whose
    ``torrent_hash`` is ``'*'``.
    """
    upsert = 'INSERT INTO automation_rule_state(rule_id,profile_id,torrent_hash,last_matched_at,last_applied_at,updated_at) VALUES(?,?,?,?,?,?) ON CONFLICT(rule_id,profile_id,torrent_hash) DO UPDATE SET last_matched_at=excluded.last_matched_at, last_applied_at=excluded.last_applied_at, updated_at=excluded.updated_at'
    params = (rule['id'], profile_id, '*', now, now, now)
    conn.execute(upsert, params)
|
|
|
|
|
|
def _apply_effects_batch(c: Any, profile: dict[str, Any], torrents: list[dict[str, Any]], effects: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Apply every effect of a rule to all matching torrents in one pass.

    Args:
        c: client object exposing ``call(method, *args)``; used for label and
            state commands.
        profile: profile dict handed through to the ``rtorrent`` helpers.
        torrents: torrent dicts; entries without a ``hash`` are ignored.
        effects: effect dicts, dispatched on their ``type`` key. Unknown
            types are skipped silently.

    Returns:
        One summary dict per effect that was handled, in effect order. An
        empty list is returned when no torrent has a hash.
    """
    # Note: Rules now execute actions once for all matching torrents instead of calling move/check/start one item at a time.
    hashes = [str(t.get('hash') or '') for t in torrents if str(t.get('hash') or '')]
    # Working copy of each torrent's labels so consecutive label effects see earlier changes.
    labels_by_hash = {str(t.get('hash') or ''): _label_names(t.get('label')) for t in torrents}
    applied: list[dict[str, Any]] = []
    if not hashes: return applied
    for eff in effects:
        typ = str(eff.get('type') or '')
        if typ == 'move':
            # An empty path falls back to the profile's default download path.
            path = str(eff.get('path') or '').strip() or rtorrent.default_download_path(profile)
            # 'recheck' defaults to the value of 'move_data' when not given explicitly.
            move_payload = {'path': path, 'move_data': bool(eff.get('move_data')), 'recheck': bool(eff.get('recheck', eff.get('move_data'))), 'keep_seeding': bool(eff.get('keep_seeding'))}
            result = rtorrent.move_torrents(profile, hashes, move_payload) if path else None
            if path: applied.append({'type': 'move', 'path': path, 'count': len(hashes), 'move_data': bool(eff.get('move_data')), 'recheck': bool(move_payload['recheck']), 'keep_seeding': bool(eff.get('keep_seeding')), 'result': result})
        elif typ == 'add_label':
            label = str(eff.get('label') or '').strip()
            changed = 0
            if label:
                for h in hashes:
                    labels = labels_by_hash.get(h, [])
                    if label not in labels:
                        # Labels are written through the client's 'd.custom1.set' call.
                        labels.append(label); labels_by_hash[h] = labels; c.call('d.custom1.set', h, _label_value(labels)); changed += 1
            # 'count' is the number of torrents whose labels actually changed.
            applied.append({'type': 'add_label', 'label': label, 'count': changed})
        elif typ == 'remove_label':
            label = str(eff.get('label') or '').strip(); changed = 0
            if label:
                for h in hashes:
                    labels = labels_by_hash.get(h, [])
                    new_labels = [x for x in labels if x != label]
                    if new_labels != labels:
                        labels_by_hash[h] = new_labels; c.call('d.custom1.set', h, _label_value(new_labels)); changed += 1
            applied.append({'type': 'remove_label', 'label': label, 'count': changed})
        elif typ == 'set_labels':
            # Normalise the requested labels once, then overwrite every torrent's labels with them.
            value = _label_value(_label_names(eff.get('labels'))); new_labels = _label_names(value)
            for h in hashes:
                labels_by_hash[h] = list(new_labels); c.call('d.custom1.set', h, value)
            applied.append({'type': 'set_labels', 'labels': value, 'count': len(hashes)})
        elif typ in {'pause', 'stop', 'start', 'resume', 'recheck'}:
            # Map the effect name onto the client method that is called once per hash.
            method = {'pause':'d.pause','stop':'d.stop','start':'d.start','resume':'d.resume','recheck':'d.check_hash'}[typ]
            for h in hashes: c.call(method, h)
            applied.append({'type': typ, 'count': len(hashes)})
    return applied
|
|
|
|
|
|
def check(profile: dict | None = None, user_id: int | None = None, force: bool = False) -> dict[str, Any]:
    """Evaluate the automation rules of a profile and apply those that match.

    Args:
        profile: profile dict; the active profile is used when omitted.
        user_id: owner of the rules; the default user is used when omitted.
        force: when true, disabled rules are evaluated too and the rule
            cooldown is ignored.

    Returns:
        ``{'ok': False, 'error': ...}`` when there is no profile, otherwise a
        summary with the number of torrents checked, the number of rules
        considered and one entry per rule that was applied.
    """
    profile = profile or active_profile()
    if not profile: return {'ok': False, 'error': 'No active rTorrent profile'}
    user_id = user_id or default_user_id(); profile_id = int(profile['id'])
    rules = [r for r in list_rules(profile_id, user_id) if force or int(r.get('enabled') or 0)]
    if not rules: return {'ok': True, 'checked': 0, 'applied': [], 'rules': 0}
    torrents = rtorrent.list_torrents(profile); c = rtorrent.client_for(profile); applied = []; now = utcnow()
    with connect() as conn:
        for rule in rules:
            # Matching runs before the cooldown check, so timed-condition state is updated on every pass.
            matched = [t for t in torrents if _conditions_match(conn, rule, profile_id, t)]
            if not matched: continue
            if not force and not _cooldown_ok(conn, rule, profile_id, '*'): continue
            hashes = [str(t.get('hash') or '') for t in matched if str(t.get('hash') or '')]
            names = [str(t.get('name') or '') for t in matched]
            try: actions = _apply_effects_batch(c, profile, matched, rule.get('effects') or [])
            # A failure is recorded as the action result instead of aborting the remaining rules.
            except Exception as exc: actions = [{'error': str(exc), 'count': len(hashes)}]
            # The cooldown and per-torrent state are stamped even when the effects raised.
            _touch_rule_cooldown(conn, rule, profile_id, now)
            for h in hashes:
                conn.execute('INSERT INTO automation_rule_state(rule_id,profile_id,torrent_hash,last_matched_at,last_applied_at,updated_at) VALUES(?,?,?,?,?,?) ON CONFLICT(rule_id,profile_id,torrent_hash) DO UPDATE SET last_matched_at=excluded.last_matched_at, last_applied_at=excluded.last_applied_at, updated_at=excluded.updated_at', (rule['id'], profile_id, h, now, now, now))
            # One history row per rule run; stored names are capped at 50.
            history_payload = {'mode': 'batch', 'count': len(hashes), 'hashes': hashes, 'names': names[:50], 'actions': actions}
            torrent_name = names[0] if len(names) == 1 else f'{len(hashes)} torrents'
            # For multi-torrent runs the hash column holds at most the first 20 hashes, comma-joined.
            torrent_hash = hashes[0] if len(hashes) == 1 else ','.join(hashes[:20])
            conn.execute('INSERT INTO automation_history(user_id,profile_id,rule_id,torrent_hash,torrent_name,rule_name,actions_json,created_at) VALUES(?,?,?,?,?,?,?,?)', (user_id, profile_id, rule['id'], torrent_hash, torrent_name, str(rule.get('name') or ''), json.dumps(history_payload), now))
            applied.append({'rule_id': rule['id'], 'rule_name': rule.get('name'), 'count': len(hashes), 'hashes': hashes, 'names': names[:20], 'actions': actions})
    return {'ok': True, 'checked': len(torrents), 'rules': len(rules), 'applied': applied}
|