move to anther profile
This commit is contained in:
@@ -5,7 +5,7 @@ import json
|
||||
import threading
|
||||
from ..db import connect, default_user_id, utcnow
|
||||
from . import rtorrent, auth
|
||||
from .preferences import active_profile
|
||||
from .preferences import active_profile, get_profile, get_disk_monitor_preferences
|
||||
from .workers import enqueue
|
||||
|
||||
AUTOMATION_JOB_CHUNK_SIZE = 100
|
||||
@@ -369,6 +369,8 @@ def _enqueue_automation_job(profile: dict[str, Any], rule: dict[str, Any], actio
|
||||
extra.update({'bulk_label': f'automation-{index}', 'bulk_part': index, 'bulk_parts': len(chunks), 'parent_hash_count': len(hashes)})
|
||||
if action_name == 'move':
|
||||
extra.update({'target_path': str(part_payload.get('path') or ''), 'move_data': bool(part_payload.get('move_data'))})
|
||||
if action_name == 'profile_transfer':
|
||||
extra.update({'target_profile_id': int(part_payload.get('target_profile_id') or 0), 'target_path': str(part_payload.get('target_path') or ''), 'move_data': bool(part_payload.get('move_data')), 'post_action': str(part_payload.get('post_action') or 'none')})
|
||||
if action_name == 'remove':
|
||||
extra.update({'remove_data': bool(part_payload.get('remove_data'))})
|
||||
effect_type = str(context_extra.get('effect_type') if context_extra else action_name)
|
||||
@@ -377,6 +379,78 @@ def _enqueue_automation_job(profile: dict[str, Any], rule: dict[str, Any], actio
|
||||
return job_ids
|
||||
|
||||
|
||||
|
||||
|
||||
def _safe_remote_path(value: str) -> str:
|
||||
path = str(value or '').strip().replace('\\', '/')
|
||||
while '//' in path:
|
||||
path = path.replace('//', '/')
|
||||
if path.endswith('/') and path != '/':
|
||||
path = path.rstrip('/')
|
||||
return path
|
||||
|
||||
def _path_inside_root(path: str, root: str) -> bool:
|
||||
path = _safe_remote_path(path)
|
||||
root = _safe_remote_path(root)
|
||||
return bool(path and root and (path == root or path.startswith(root.rstrip('/') + '/')))
|
||||
|
||||
def _automation_profile_transfer_payload(profile: dict[str, Any], eff: dict[str, Any], user_id: int) -> dict[str, Any]:
|
||||
# Note: Automation profile transfers reuse server-side permission checks; UI values are not trusted.
|
||||
source_id = int(profile.get('id') or 0)
|
||||
if not auth.can_write_profile(source_id, user_id):
|
||||
raise ValueError('Rule owner has no write access to source profile')
|
||||
target_id = int(eff.get('target_profile_id') or 0)
|
||||
if not target_id or target_id == source_id:
|
||||
raise ValueError('Automation target profile is invalid')
|
||||
if not auth.can_write_profile(target_id, user_id):
|
||||
raise ValueError('Rule owner has no write access to target profile')
|
||||
target_profile = get_profile(target_id, user_id)
|
||||
if not target_profile:
|
||||
raise ValueError('Automation target profile does not exist')
|
||||
default_path = _safe_remote_path(rtorrent.default_download_path(target_profile))
|
||||
target_path = _safe_remote_path(str(eff.get('target_path') or eff.get('path') or default_path))
|
||||
roots = [default_path]
|
||||
try:
|
||||
prefs = get_disk_monitor_preferences(target_id, user_id=user_id)
|
||||
for item in json.loads((prefs or {}).get('disk_monitor_paths_json') or '[]'):
|
||||
clean = _safe_remote_path(str(item or ''))
|
||||
if clean and clean not in roots:
|
||||
roots.append(clean)
|
||||
selected = _safe_remote_path(str((prefs or {}).get('disk_monitor_selected_path') or ''))
|
||||
if selected and selected not in roots:
|
||||
roots.append(selected)
|
||||
except Exception:
|
||||
pass
|
||||
target_roots = [r for r in roots if r]
|
||||
if not any(_path_inside_root(target_path, root) for root in target_roots):
|
||||
target_path = default_path
|
||||
requested_move_data = bool(eff.get('move_data'))
|
||||
move_data = False
|
||||
downgrade_reason = ''
|
||||
if requested_move_data:
|
||||
check = rtorrent.remote_can_write_directory(profile, target_path)
|
||||
move_data = bool(check.get('ok'))
|
||||
if not move_data:
|
||||
downgrade_reason = str(check.get('message') or check.get('error') or 'target path is not writable by source rTorrent user')
|
||||
post_action = str(eff.get('post_action') or 'none').strip().lower()
|
||||
if post_action not in {'none', 'start', 'stop', 'pause', 'check', 'recheck'}:
|
||||
post_action = 'none'
|
||||
label_mode = str(eff.get('label_mode') or 'none').strip().lower()
|
||||
if label_mode not in {'none', 'custom', 'moved_from', 'moved_to'}:
|
||||
label_mode = 'none'
|
||||
return {
|
||||
'target_profile_id': target_id,
|
||||
'target_path': target_path,
|
||||
'path': target_path,
|
||||
'move_data': move_data,
|
||||
'move_data_requested': requested_move_data,
|
||||
'move_data_downgraded': bool(requested_move_data and not move_data),
|
||||
'move_data_downgrade_reason': downgrade_reason,
|
||||
'post_action': post_action,
|
||||
'label_mode': label_mode,
|
||||
'label_value': str(eff.get('label_value') or '').strip(),
|
||||
}
|
||||
|
||||
def _apply_effects_bulk(c: Any, profile: dict[str, Any], torrents: list[dict[str, Any]], effects: list[dict[str, Any]], rule: dict[str, Any], user_id: int | None = None) -> list[dict[str, Any]]:
|
||||
hashes = [str(t.get('hash') or '') for t in torrents if str(t.get('hash') or '')]
|
||||
torrents_by_hash = {str(t.get('hash') or ''): t for t in torrents if str(t.get('hash') or '')}
|
||||
@@ -395,6 +469,11 @@ def _apply_effects_bulk(c: Any, profile: dict[str, Any], torrents: list[dict[str
|
||||
}
|
||||
job_ids = _enqueue_automation_job(profile, rule, 'move', hashes, payload, torrents_by_hash, user_id, {'effect_type': 'move'})
|
||||
applied.append({'type': 'move', 'path': path, 'count': len(hashes), 'target_hashes': hashes, 'move_data': payload['move_data'], 'recheck': payload['recheck'], 'keep_seeding': payload['keep_seeding'], 'job_ids': job_ids})
|
||||
elif typ == 'profile_transfer':
|
||||
owner_id = int(user_id or rule.get('user_id') or rule.get('owner_user_id') or default_user_id())
|
||||
payload = _automation_profile_transfer_payload(profile, eff, owner_id)
|
||||
job_ids = _enqueue_automation_job(profile, rule, 'profile_transfer', hashes, payload, torrents_by_hash, owner_id, {'effect_type': 'profile_transfer'})
|
||||
applied.append({'type': 'profile_transfer', 'target_profile_id': payload['target_profile_id'], 'target_path': payload['target_path'], 'count': len(hashes), 'target_hashes': hashes, 'move_data': payload['move_data'], 'move_data_requested': payload['move_data_requested'], 'move_data_downgraded': payload['move_data_downgraded'], 'post_action': payload['post_action'], 'label_mode': payload['label_mode'], 'label': payload['label_value'], 'job_ids': job_ids})
|
||||
elif typ == 'add_label':
|
||||
label = str(eff.get('label') or '').strip()
|
||||
if label:
|
||||
|
||||
@@ -80,7 +80,7 @@ def _details_summary(details: dict) -> str:
|
||||
priority = [
|
||||
"status", "job_id", "attempt", "attempts", "count", "hash_count", "action",
|
||||
"source", "source_label", "directory", "label", "target_path", "remove_data",
|
||||
"move_data", "keep_seeding", "error", "error_count", "result_count",
|
||||
"move_data", "target_profile_id", "move_data_downgraded", "keep_seeding", "error", "error_count", "result_count",
|
||||
]
|
||||
parts: list[str] = []
|
||||
for key in priority:
|
||||
@@ -315,6 +315,7 @@ def _job_action_label(action: str) -> str:
|
||||
"set_ratio_group": "Set ratio group",
|
||||
"set_limits": "Set speed limits",
|
||||
"smart_queue_check": "Smart Queue check",
|
||||
"profile_transfer": "Move to another profile",
|
||||
}
|
||||
return labels.get(str(action or ""), str(action or "job"))
|
||||
|
||||
@@ -354,6 +355,8 @@ def record_job_event(profile_id: int, action: str, status: str, payload: dict |
|
||||
"target_path": ctx.get("target_path") or payload.get("path"),
|
||||
"remove_data": ctx.get("remove_data") or payload.get("remove_data"),
|
||||
"move_data": ctx.get("move_data") or payload.get("move_data"),
|
||||
"target_profile_id": ctx.get("target_profile_id") or payload.get("target_profile_id"),
|
||||
"move_data_downgraded": ctx.get("move_data_downgraded") or payload.get("move_data_downgraded"),
|
||||
"keep_seeding": payload.get("keep_seeding"),
|
||||
"hash_count": len(hashes),
|
||||
"error": error,
|
||||
|
||||
@@ -345,6 +345,30 @@ def _run_remote_rm(c: ScgiRtorrentClient, path: str, poll_interval: float = 2.0)
|
||||
raise RuntimeError(output)
|
||||
|
||||
|
||||
|
||||
def remote_can_write_directory(profile: dict, path: str) -> dict:
|
||||
"""Return whether the source rTorrent OS user can write to a remote directory safely."""
|
||||
clean = _remote_clean_path(path)
|
||||
# Note: Profile transfers may touch filesystem paths, so only absolute non-root directories are probed.
|
||||
if not clean.startswith("/") or clean in {"/", "."}:
|
||||
return {"ok": False, "path": clean, "error": "unsafe destination path"}
|
||||
script = (
|
||||
'p=$1; '
|
||||
'case "$p" in /*) ;; *) echo "NO\tunsafe path"; exit 0;; esac; '
|
||||
'if [ -d "$p" ]; then '
|
||||
' if [ -w "$p" ]; then echo "OK\tdirectory writable"; else echo "NO\tdirectory not writable"; fi; '
|
||||
' exit 0; '
|
||||
'fi; '
|
||||
'parent=${p%/*}; [ -n "$parent" ] || parent=/; '
|
||||
'if [ -d "$parent" ] && [ -w "$parent" ]; then echo "OK\tparent writable"; else echo "NO\tparent not writable"; fi'
|
||||
)
|
||||
try:
|
||||
output = str(_rt_execute(client_for(profile), "execute.capture", "sh", "-c", script, "pytorrent-transfer-write-check", clean) or "").strip()
|
||||
except Exception as exc:
|
||||
return {"ok": False, "path": clean, "error": str(exc)}
|
||||
ok = output.startswith("OK")
|
||||
return {"ok": ok, "path": clean, "message": output.split("\t", 1)[1] if "\t" in output else output}
|
||||
|
||||
def _remove_torrent_data(c: ScgiRtorrentClient, torrent_hash: str) -> dict:
|
||||
data_path = _safe_rm_rf_path(_torrent_data_path(c, torrent_hash))
|
||||
try:
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
from __future__ import annotations
|
||||
import time
|
||||
from .client import *
|
||||
from .files import set_file_priorities
|
||||
from .files import export_torrent_file, iter_remote_file_chunks, set_file_priorities
|
||||
from .system import disk_usage_for_default_path
|
||||
|
||||
XMLRPC_DEFAULT_SIZE_LIMIT_BYTES = 512 * 1024
|
||||
@@ -804,6 +804,140 @@ def start_or_resume_hash(c: ScgiRtorrentClient, torrent_hash: str, prefer_start:
|
||||
result['ok'] = result.get('ok', True)
|
||||
return result
|
||||
|
||||
|
||||
def _read_exported_torrent_bytes(profile: dict, torrent_hash: str) -> tuple[bytes, dict]:
|
||||
item = export_torrent_file(profile, torrent_hash)
|
||||
if item.get("local"):
|
||||
return LocalPath(str(item.get("path") or "")).read_bytes(), item
|
||||
data = b"".join(bytes(chunk) for chunk in iter_remote_file_chunks(profile, str(item.get("path") or "")) if chunk)
|
||||
if not data:
|
||||
raise RuntimeError(f"Cannot read exported torrent file for {torrent_hash}")
|
||||
return data, item
|
||||
|
||||
|
||||
def _move_profile_transfer_data(source_client: ScgiRtorrentClient, torrent_hash: str, target_path: str) -> dict:
|
||||
"""Move one torrent data path for a profile transfer after backend permission checks."""
|
||||
src = _remote_clean_path(_torrent_data_path(source_client, torrent_hash))
|
||||
if not src:
|
||||
raise ValueError(f"Cannot determine source path for {torrent_hash}")
|
||||
dst = _remote_join(target_path, posixpath.basename(src.rstrip("/")))
|
||||
try:
|
||||
source_client.call("d.stop", torrent_hash)
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
source_client.call("d.close", torrent_hash)
|
||||
except Exception:
|
||||
pass
|
||||
if src == dst:
|
||||
return {"skipped_data_move": "source and destination are the same"}
|
||||
_run_remote_move(source_client, src, dst)
|
||||
return {"moved_from": src, "moved_to": dst}
|
||||
|
||||
|
||||
def transfer_profile(source_profile: dict, target_profile: dict, torrent_hashes: list[str], payload: dict | None = None, checkpoint=None, resume_state: dict | None = None) -> dict:
|
||||
"""Move torrent entries between rTorrent profiles; data moving is delegated to a separate helper."""
|
||||
payload = payload or {}
|
||||
resume_state = resume_state or {}
|
||||
target_path = _remote_clean_path(payload.get("target_path") or payload.get("path") or "")
|
||||
move_data = bool(payload.get("move_data"))
|
||||
post_action = str(payload.get("post_action") or "none").strip().lower()
|
||||
if post_action not in {"none", "start", "stop", "pause", "check", "recheck"}:
|
||||
raise ValueError("Unsupported post-transfer action")
|
||||
label_mode = str(payload.get("label_mode") or "none").strip().lower()
|
||||
label_value = str(payload.get("label_value") or "").strip()
|
||||
if label_mode not in {"none", "custom", "moved_from", "moved_to"}:
|
||||
label_mode = "none"
|
||||
if label_mode == "moved_from":
|
||||
label_value = f"Moved from {source_profile.get('name') or source_profile.get('id') or 'profile'}"
|
||||
elif label_mode == "moved_to":
|
||||
label_value = f"Moved to {target_profile.get('name') or target_profile.get('id') or 'profile'}"
|
||||
elif label_mode != "custom":
|
||||
label_value = ""
|
||||
if len(label_value) > 120:
|
||||
label_value = label_value[:120]
|
||||
if not target_path or not target_path.startswith("/") or target_path == "/":
|
||||
raise ValueError("Missing or unsafe target path")
|
||||
completed_hashes = set(str(x) for x in (resume_state.get("completed_hashes") or []))
|
||||
previous_results = list(resume_state.get("results") or [])
|
||||
source_client = client_for(source_profile)
|
||||
target_client = client_for(target_profile)
|
||||
|
||||
def mark_done(torrent_hash: str, results: list) -> None:
|
||||
completed_hashes.add(str(torrent_hash))
|
||||
if checkpoint:
|
||||
checkpoint({"completed_hashes": sorted(completed_hashes), "results": results}, len(completed_hashes), len(torrent_hashes))
|
||||
|
||||
results = previous_results
|
||||
for h in [x for x in torrent_hashes if str(x) not in completed_hashes]:
|
||||
item = {
|
||||
"hash": h,
|
||||
"source_profile_id": int(source_profile.get("id") or 0),
|
||||
"target_profile_id": int(target_profile.get("id") or 0),
|
||||
"target_path": target_path,
|
||||
"move_data": move_data,
|
||||
"move_data_requested": bool(payload.get("move_data_requested")),
|
||||
"move_data_downgraded": bool(payload.get("move_data_downgraded")),
|
||||
}
|
||||
data, exported = _read_exported_torrent_bytes(source_profile, h)
|
||||
item["exported_from"] = exported.get("path")
|
||||
limit = validate_torrent_upload_size(target_profile, data, False, target_path, "")
|
||||
if not limit.get("ok"):
|
||||
raise RuntimeError(f"Target profile XML-RPC limit is too small for {h}: {limit.get('request_h')} > {limit.get('limit_h')}")
|
||||
try:
|
||||
label = str(source_client.call("d.custom1", h) or "")
|
||||
except Exception:
|
||||
label = ""
|
||||
target_label = label_value if label_value else label
|
||||
try:
|
||||
was_state = int(source_client.call("d.state", h) or 0)
|
||||
except Exception:
|
||||
was_state = 0
|
||||
try:
|
||||
was_active = int(source_client.call("d.is_active", h) or 0)
|
||||
except Exception:
|
||||
was_active = was_state
|
||||
moved_to = ""
|
||||
if move_data:
|
||||
move_result = _move_profile_transfer_data(source_client, h, target_path)
|
||||
item.update(move_result)
|
||||
moved_to = str(move_result.get("moved_to") or "")
|
||||
# Note: Explicit post-transfer actions override state restoration and keep command effects predictable.
|
||||
start_on_target = bool(move_data and (was_state or was_active)) if post_action == "none" else post_action == "start"
|
||||
try:
|
||||
added = add_torrent_raw(target_profile, data, start_on_target, target_path, target_label)
|
||||
if not added.get("ok"):
|
||||
raise RuntimeError(added.get("error") or "target add failed")
|
||||
except Exception:
|
||||
if move_data and moved_to:
|
||||
try:
|
||||
source_client.call("d.directory.set", h, target_path)
|
||||
if was_state or was_active:
|
||||
source_client.call("d.start", h)
|
||||
item["rollback"] = "source torrent kept and pointed at moved data"
|
||||
except Exception as rollback_exc:
|
||||
item["rollback_error"] = str(rollback_exc)
|
||||
raise
|
||||
if post_action in {"stop", "pause", "check", "recheck"}:
|
||||
try:
|
||||
if post_action == "stop":
|
||||
target_client.call("d.stop", h)
|
||||
elif post_action == "pause":
|
||||
pause_hash(target_client, h)
|
||||
else:
|
||||
target_client.call("d.check_hash", h)
|
||||
item["post_action_applied"] = post_action
|
||||
except Exception as post_exc:
|
||||
item["post_action_error"] = str(post_exc)
|
||||
source_client.call("d.erase", h)
|
||||
item["target_started"] = start_on_target
|
||||
item["label"] = target_label
|
||||
item["previous_label"] = label
|
||||
item["post_action"] = post_action
|
||||
results.append(item)
|
||||
mark_done(h, results)
|
||||
return {"ok": True, "count": len(torrent_hashes), "move_data": move_data, "target_profile_id": int(target_profile.get("id") or 0), "target_path": target_path, "label": label_value, "post_action": post_action, "results": results}
|
||||
|
||||
def action(profile: dict, torrent_hashes: list[str], name: str, payload: dict | None = None, checkpoint=None, resume_state: dict | None = None) -> dict:
|
||||
payload = payload or {}
|
||||
resume_state = resume_state or {}
|
||||
|
||||
@@ -100,7 +100,7 @@ def _job_payload(row) -> dict:
|
||||
def _is_ordered_job(row) -> bool:
|
||||
payload = _job_payload(row)
|
||||
action = str((row or {}).get("action") or "")
|
||||
return action in {"move", "remove", "add_magnet", "add_torrent_raw"} or bool(payload.get("requires_order"))
|
||||
return action in {"move", "remove", "profile_transfer", "add_magnet", "add_torrent_raw"} or bool(payload.get("requires_order"))
|
||||
|
||||
|
||||
def _is_priority_job(row) -> bool:
|
||||
@@ -302,6 +302,12 @@ def _execute(profile: dict, action_name: str, payload: dict, user_id: int | None
|
||||
if bool(payload.get("start", True)):
|
||||
disk_guard.assert_can_start_download(profile)
|
||||
return rtorrent.add_torrent_raw(profile, raw, bool(payload.get("start", True)), str(payload.get("directory") or ""), str(payload.get("label") or ""), payload.get("file_priorities") or None)
|
||||
if action_name == "profile_transfer":
|
||||
# Note: Target profile is resolved inside the worker with the original user's permissions, not trusted from the request payload.
|
||||
target_profile = get_profile(int(payload.get("target_profile_id") or 0), user_id or default_user_id())
|
||||
if not target_profile:
|
||||
raise ValueError("Target profile does not exist or is not accessible")
|
||||
return rtorrent.transfer_profile(profile, target_profile, payload.get("hashes") or [], payload, checkpoint=checkpoint, resume_state=payload.get("__resume_state") or {})
|
||||
if action_name == "set_limits":
|
||||
return rtorrent.set_limits(profile, payload.get("down"), payload.get("up"))
|
||||
hashes = payload.get("hashes") or []
|
||||
@@ -341,7 +347,7 @@ def _mark_running(job_id: str, attempts: int) -> bool:
|
||||
|
||||
|
||||
def _emit_torrent_refresh(profile: dict, action_name: str) -> None:
|
||||
if action_name not in {"add_magnet", "add_torrent_raw", "remove", "move", "start", "stop", "pause", "resume", "unpause", "set_label", "set_ratio_group", "recheck"}:
|
||||
if action_name not in {"add_magnet", "add_torrent_raw", "remove", "move", "profile_transfer", "start", "stop", "pause", "resume", "unpause", "set_label", "set_ratio_group", "recheck"}:
|
||||
return
|
||||
try:
|
||||
diff = torrent_cache.refresh(profile)
|
||||
@@ -416,6 +422,14 @@ def _run(job_id: str):
|
||||
action_name = str(job["action"] or "")
|
||||
_emit_disk_refresh_requested(int(profile["id"]), action_name, payload, result or {})
|
||||
_emit_torrent_refresh(profile, action_name)
|
||||
if action_name == "profile_transfer":
|
||||
# Note: Refresh the destination profile cache as well so users see transferred torrents immediately after switching.
|
||||
try:
|
||||
target_profile = get_profile(int(payload.get("target_profile_id") or 0), int(job.get("user_id") or 0))
|
||||
if target_profile:
|
||||
_emit_torrent_refresh(target_profile, action_name)
|
||||
except Exception:
|
||||
pass
|
||||
_schedule_delayed_torrent_refresh(profile, action_name)
|
||||
_emit("job_update", {"id": job_id, "profile_id": profile["id"], "status": "done", "result": result})
|
||||
except Exception as exc:
|
||||
|
||||
Reference in New Issue
Block a user