bulk-part-jobs
This commit is contained in:
@@ -24,6 +24,8 @@ from ..services.geoip import lookup_ip
|
||||
|
||||
bp = Blueprint("api", __name__, url_prefix="/api")
|
||||
|
||||
MOVE_BULK_MAX_HASHES = 100
|
||||
|
||||
|
||||
def ok(payload=None):
|
||||
data = {"ok": True}
|
||||
@@ -303,6 +305,42 @@ def enrich_bulk_payload(profile: dict, action_name: str, data: dict) -> dict:
|
||||
return payload
|
||||
|
||||
|
||||
def _chunk_hashes(hashes: list[str], size: int = MOVE_BULK_MAX_HASHES) -> list[list[str]]:
    """Split *hashes* into consecutive chunks of at most *size* entries.

    Splits very large torrent selections into predictable chunks so each
    queued job stays small and recoverable.  A falsy *size* falls back to
    the module default, and anything below one is clamped so the slice
    step can never be zero or negative.
    """
    step = int(size or MOVE_BULK_MAX_HASHES)
    if step < 1:
        step = 1
    chunks = []
    for start in range(0, len(hashes), step):
        chunks.append(hashes[start:start + step])
    return chunks
|
||||
|
||||
|
||||
def enqueue_move_bulk_parts(profile: dict, data: dict) -> list[dict]:
    """Queue one or more "move" jobs for *data*, splitting large selections.

    Keeps the existing move action intact for normal batches: when the
    selection fits in a single chunk, exactly one job is queued with the
    legacy payload shape.  Larger selections are queued as ordered parts
    labelled bulk-1, bulk-2, etc., each carrying its own job context.

    Returns a list of job descriptors (job_id, label, part, parts,
    hashes, hash_count), one per queued job.
    """
    payload_template = enrich_bulk_payload(profile, "move", data)
    all_hashes = payload_template.get("hashes") or []
    parts = _chunk_hashes(all_hashes)

    # Single part (including an empty selection): enqueue the enriched
    # payload untouched so the old single-job behavior is preserved.
    if len(parts) <= 1:
        job_id = enqueue("move", profile["id"], payload_template)
        return [{
            "job_id": job_id,
            "label": "bulk-1",
            "part": 1,
            "parts": 1,
            "hashes": all_hashes,
            "hash_count": len(all_hashes),
        }]

    base_context = payload_template.get("job_context") or {}
    # Index the enriched per-torrent items once so each part can pick out
    # just the entries that belong to its own hash chunk.
    item_lookup = {}
    for item in base_context.get("items") or []:
        item_lookup[str(item.get("hash"))] = item

    queued = []
    total_parts = len(parts)
    for part_number, part_hashes in enumerate(parts, start=1):
        part_payload = dict(payload_template)
        part_payload["hashes"] = part_hashes
        part_context = dict(base_context)
        part_context["bulk"] = True
        part_context["bulk_label"] = f"bulk-{part_number}"
        part_context["bulk_part"] = part_number
        part_context["bulk_parts"] = total_parts
        part_context["hash_count"] = len(part_hashes)
        part_context["parent_hash_count"] = len(all_hashes)
        part_context["items"] = [item_lookup[h] for h in part_hashes if h in item_lookup]
        part_payload["job_context"] = part_context
        job_id = enqueue("move", profile["id"], part_payload)
        queued.append({
            "job_id": job_id,
            "label": part_context["bulk_label"],
            "part": part_number,
            "parts": total_parts,
            "hashes": part_hashes,
            "hash_count": len(part_hashes),
        })
    return queued
|
||||
|
||||
|
||||
@bp.get("/profiles")
def profiles_list():
    # Return every configured profile plus the id of the active one.
    profiles = preferences.list_profiles()
    active = preferences.active_profile()
    return ok({"profiles": profiles, "active": active})
|
||||
@@ -437,6 +475,20 @@ def torrent_action(action_name: str):
|
||||
allowed = {"start", "pause", "stop", "resume", "recheck", "reannounce", "remove", "move", "set_label", "set_ratio_group"}
|
||||
if action_name not in allowed:
|
||||
return jsonify({"ok": False, "error": "Unknown action"}), 400
|
||||
if action_name == "move":
|
||||
# Note: Large move requests are split into ordered bulk parts; smaller requests keep the old single-job response shape.
|
||||
jobs = enqueue_move_bulk_parts(profile, data)
|
||||
first_job_id = jobs[0]["job_id"] if jobs else None
|
||||
total_hashes = sum(int(job.get("hash_count") or 0) for job in jobs)
|
||||
return ok({
|
||||
"job_id": first_job_id,
|
||||
"job_ids": [job["job_id"] for job in jobs],
|
||||
"jobs": jobs,
|
||||
"hash_count": total_hashes,
|
||||
"bulk": total_hashes > 1,
|
||||
"bulk_parts": len(jobs),
|
||||
"chunk_size": MOVE_BULK_MAX_HASHES,
|
||||
})
|
||||
payload = enrich_bulk_payload(profile, action_name, data)
|
||||
job_id = enqueue(action_name, profile["id"], payload)
|
||||
return ok({"job_id": job_id, "hash_count": len(payload.get("hashes") or []), "bulk": len(payload.get("hashes") or []) > 1})
|
||||
|
||||
@@ -55,7 +55,7 @@ def openapi():
|
||||
},
|
||||
},
|
||||
"/api/torrents": {"get": {"summary": "Get cached torrent snapshot", "responses": {"200": {"description": "Torrent list"}}}},
|
||||
"/api/torrents/{action_name}": {"post": {"summary": "Queue torrent action", "description": "For move, path is the target directory; move_data=true physically moves data on the rTorrent host using a detached shell move with status polling, force-overwrites an existing destination, tolerates rTorrent execute timeouts around mkdir/start/polling, handles retries after a partially completed move, avoids SCGI timeout on long mv operations, and recheck defaults to move_data. Move and remove jobs are ordered per profile, so a later remove waits for earlier move/remove jobs to finish.", "parameters": [{"name": "action_name", "in": "path", "required": True, "schema": {"type": "string", "enum": ["start", "pause", "stop", "resume", "recheck", "remove", "move", "set_label", "set_ratio_group"]}}], "requestBody": {"content": {"application/json": {"schema": {"type": "object", "properties": {"hashes": {"type": "array", "items": {"type": "string"}}, "path": {"type": "string", "description": "Target directory for move"}, "move_data": {"type": "boolean", "description": "Physically move data before setting torrent directory"}, "recheck": {"type": "boolean", "description": "Run hash check after physical move; defaults to move_data"}, "label": {"type": "string"}, "ratio_group": {"type": "string"}, "remove_data": {"type": "boolean"}}}}}}, "responses": {"200": {"description": "Job queued"}}}},
|
||||
"/api/torrents/{action_name}": {"post": {"summary": "Queue torrent action", "description": "For move, path is the target directory; move_data=true physically moves data on the rTorrent host using a detached shell move with status polling, force-overwrites an existing destination, tolerates rTorrent execute timeouts around mkdir/start/polling, handles retries after a partially completed move, avoids SCGI timeout on long mv operations, and recheck defaults to move_data. Large move selections are split into ordered bulk parts of up to 100 hashes. Move and remove jobs are ordered per profile, so a later remove waits for earlier move/remove jobs to finish.", "parameters": [{"name": "action_name", "in": "path", "required": True, "schema": {"type": "string", "enum": ["start", "pause", "stop", "resume", "recheck", "remove", "move", "set_label", "set_ratio_group"]}}], "requestBody": {"content": {"application/json": {"schema": {"type": "object", "properties": {"hashes": {"type": "array", "items": {"type": "string"}}, "path": {"type": "string", "description": "Target directory for move"}, "move_data": {"type": "boolean", "description": "Physically move data before setting torrent directory"}, "recheck": {"type": "boolean", "description": "Run hash check after physical move; defaults to move_data"}, "label": {"type": "string"}, "ratio_group": {"type": "string"}, "remove_data": {"type": "boolean"}}}}}}, "responses": {"200": {"description": "Job queued"}}}},
|
||||
"/api/torrents/add": {"post": {"summary": "Add magnet links or torrent files", "requestBody": {"content": {"multipart/form-data": {"schema": {"type": "object", "properties": {"uris": {"type": "string"}, "directory": {"type": "string"}, "label": {"type": "string"}, "start": {"type": "boolean"}, "files": {"type": "array", "items": {"type": "string", "format": "binary"}}}}}, "application/json": {"schema": {"type": "object"}}}}, "responses": {"200": {"description": "Jobs queued"}}}},
|
||||
"/api/torrents/{torrent_hash}/files": {"get": {"summary": "Torrent files", "parameters": [{"name": "torrent_hash", "in": "path", "required": True, "schema": {"type": "string"}}], "responses": {"200": {"description": "Files"}}}},
|
||||
"/api/torrents/{torrent_hash}/peers": {"get": {"summary": "Torrent peers with GeoIP", "parameters": [{"name": "torrent_hash", "in": "path", "required": True, "schema": {"type": "string"}}], "responses": {"200": {"description": "Peers"}}}},
|
||||
|
||||
Reference in New Issue
Block a user