Files
pyTorrent/pytorrent/services/workers.py
Mateusz Gruszczyński 76ffe32319 states fix
2026-05-28 15:41:41 +02:00

613 lines
26 KiB
Python

from __future__ import annotations
import json
import threading
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
from . import rtorrent, auth, disk_guard, operation_logs
from .preferences import get_profile
from ..config import WORKERS
from ..db import connect, utcnow, default_user_id
from .torrent_cache import torrent_cache
from .torrent_summary import cached_summary
LIGHT_ACTIONS = {"start", "stop", "pause", "resume", "unpause", "set_label", "set_ratio_group", "reannounce", "set_limits"}
WATCHDOG_INTERVAL_SECONDS = 30
_heavy_executor = ThreadPoolExecutor(max_workers=WORKERS, thread_name_prefix="pytorrent-heavy-job")
_light_executor = ThreadPoolExecutor(max_workers=max(4, min(WORKERS, 16)), thread_name_prefix="pytorrent-light-job")
_socketio = None
_heavy_semaphores: dict[int, tuple[int, threading.Semaphore]] = {}
_light_semaphores: dict[int, tuple[int, threading.Semaphore]] = {}
_exclusive_locks: dict[int, threading.Lock] = {}
_active_runners: set[str] = set()
_sem_lock = threading.Lock()
_runner_lock = threading.Lock()
_watchdog_started = False
_watchdog_lock = threading.Lock()
def set_socketio(socketio):
global _socketio
_socketio = socketio
def _emit(name: str, payload: dict):
if not _socketio:
return
profile_id = payload.get("profile_id")
if auth.enabled() and profile_id:
# Note: Job/socket events are sent only to clients joined to the affected profile room.
_socketio.emit(name, payload, to=f"profile:{int(profile_id)}")
else:
_socketio.emit(name, payload)
def _bounded_int(value, default: int, minimum: int = 1) -> int:
try:
parsed = int(value if value is not None else default)
except (TypeError, ValueError):
parsed = default
return max(minimum, parsed)
def _is_light_action(action_name: str) -> bool:
return str(action_name or "") in LIGHT_ACTIONS
def _profile_heavy_limit(profile: dict) -> int:
return _bounded_int(profile.get("max_parallel_jobs"), 5)
def _profile_light_limit(profile: dict) -> int:
return _bounded_int(profile.get("light_parallel_jobs"), 4)
def _get_sem(profile: dict, light: bool = False) -> threading.Semaphore:
profile_id = int(profile["id"])
limit = _profile_light_limit(profile) if light else _profile_heavy_limit(profile)
registry = _light_semaphores if light else _heavy_semaphores
with _sem_lock:
current = registry.get(profile_id)
if not current or current[0] != limit:
registry[profile_id] = (limit, threading.Semaphore(limit))
return registry[profile_id][1]
def _get_exclusive_lock(profile_id: int) -> threading.Lock:
with _sem_lock:
if profile_id not in _exclusive_locks:
_exclusive_locks[profile_id] = threading.Lock()
return _exclusive_locks[profile_id]
def _job_row(job_id: str):
with connect() as conn:
return conn.execute("SELECT rowid AS _rowid, * FROM jobs WHERE id=?", (job_id,)).fetchone()
def _job_payload(row) -> dict:
try:
return json.loads((row or {}).get("payload_json") or "{}")
except Exception:
return {}
def _is_ordered_job(row) -> bool:
payload = _job_payload(row)
action = str((row or {}).get("action") or "")
# Note: Only long/destructive tasks are ordered; lightweight start/stop/label jobs may run beside other work.
return action in {"move", "remove", "add_magnet", "add_torrent_raw"} or bool(payload.get("requires_order"))
def _is_priority_job(row) -> bool:
payload = _job_payload(row)
return bool(payload.get('priority_job') or payload.get('force_job')) or str((row or {}).get('action') or '') == 'set_limits'
def _is_light_job(row) -> bool:
return _is_light_action(str((row or {}).get("action") or ""))
def _has_prior_ordered_jobs(profile_id: int, rowid: int) -> bool:
with connect() as conn:
rows = conn.execute(
"""
SELECT rowid AS _rowid, action, payload_json
FROM jobs
WHERE profile_id=?
AND rowid<?
AND status IN ('pending', 'running')
ORDER BY rowid
""",
(profile_id, rowid),
).fetchall()
return any(_is_ordered_job(row) and not _is_priority_job(row) for row in rows)
def _wait_for_prior_ordered_jobs(job_id: str, profile_id: int, rowid: int) -> bool:
while _has_prior_ordered_jobs(profile_id, rowid):
fresh = _job_row(job_id)
if not fresh or fresh["status"] == "cancelled":
return False
if _is_priority_job(fresh):
return True
time.sleep(0.5)
return True
def _set_job(job_id: str, status: str, error: str = "", result: dict | None = None, started: bool = False, finished: bool = False):
now = utcnow()
fields = ["status=?", "error=?", "updated_at=?"]
values: list = [status, error, now]
if result is not None:
fields.append("result_json=?")
values.append(json.dumps(result))
if started:
fields.append("started_at=?")
values.append(now)
if finished:
fields.append("finished_at=?")
values.append(now)
values.append(job_id)
with connect() as conn:
conn.execute(f"UPDATE jobs SET {', '.join(fields)} WHERE id=?", values)
def _job_state(row) -> dict:
try:
return json.loads((row or {}).get("state_json") or "{}")
except Exception:
return {}
def _checkpoint_job(job_id: str, state: dict, progress_current: int | None = None, progress_total: int | None = None) -> None:
now = utcnow()
fields = ["state_json=?", "heartbeat_at=?", "updated_at=?"]
values: list = [json.dumps(state), now, now]
if progress_current is not None:
fields.append("progress_current=?")
values.append(int(progress_current))
if progress_total is not None:
fields.append("progress_total=?")
values.append(int(progress_total))
values.append(job_id)
with connect() as conn:
conn.execute(f"UPDATE jobs SET {', '.join(fields)} WHERE id=? AND status='running'", values)
def _submit_job(job_id: str, action_name: str | None = None):
if action_name is None:
row = _job_row(job_id)
action_name = str((row or {}).get("action") or "")
executor = _light_executor if _is_light_action(str(action_name or "")) else _heavy_executor
executor.submit(_run, job_id)
def enqueue(action_name: str, profile_id: int, payload: dict, user_id: int | None = None, max_attempts: int = 2, force: bool = False) -> str:
user_id = user_id or auth.current_user_id() or default_user_id()
job_id = uuid.uuid4().hex
if force:
payload = dict(payload or {})
# Note: Forced pending jobs bypass ordered waits and run in a separate worker slot after explicit user confirmation.
payload['force_job'] = True
payload['priority_job'] = True
now = utcnow()
progress_total = len((payload or {}).get("hashes") or [])
with connect() as conn:
conn.execute(
"INSERT INTO jobs(id,user_id,profile_id,action,payload_json,status,attempts,max_attempts,progress_total,created_at,updated_at) VALUES(?,?,?,?,?,?,?,?,?,?,?)",
(job_id, user_id, profile_id, action_name, json.dumps(payload), "pending", 0, max_attempts, progress_total, now, now),
)
_emit("job_update", {"id": job_id, "action": action_name, "profile_id": profile_id, "status": "pending"})
_submit_job(job_id, action_name)
return job_id
def _job_event_meta(payload: dict) -> dict:
ctx = payload.get("job_context") or {}
source = str(ctx.get("source") or payload.get("source") or "user")
meta = {"source": source}
if source == "automation":
# Note: Socket operation toasts use this flag so automation notifications respect user preferences.
meta["automation"] = True
meta["source_label"] = str(ctx.get("rule_name") or "automation")
if ctx.get("rule_id") is not None:
meta["rule_id"] = ctx.get("rule_id")
return meta
def _execute(profile: dict, action_name: str, payload: dict, user_id: int | None = None):
if action_name == "smart_queue_check":
from . import smart_queue
# Note: Worker execution uses the job owner instead of Flask session state.
return smart_queue.check(profile, user_id=user_id or default_user_id(), force=True)
if action_name == "add_magnet":
if bool(payload.get("start", True)):
disk_guard.assert_can_start_download(profile)
return rtorrent.add_magnet(profile, payload["uri"], bool(payload.get("start", True)), str(payload.get("directory") or ""), str(payload.get("label") or ""))
if action_name == "add_torrent_raw":
import base64
raw = base64.b64decode(payload["data_b64"])
if bool(payload.get("start", True)):
disk_guard.assert_can_start_download(profile)
return rtorrent.add_torrent_raw(profile, raw, bool(payload.get("start", True)), str(payload.get("directory") or ""), str(payload.get("label") or ""), payload.get("file_priorities") or None)
if action_name == "set_limits":
return rtorrent.set_limits(profile, payload.get("down"), payload.get("up"))
hashes = payload.get("hashes") or []
if action_name in {"start", "resume", "unpause"}:
disk_guard.assert_can_start_download(profile)
state = payload.get("__resume_state") or {}
def checkpoint(next_state: dict, current: int, total: int):
job_id = payload.get("__job_id")
if job_id:
_checkpoint_job(str(job_id), next_state, current, total)
return rtorrent.action(profile, hashes, action_name, payload, checkpoint=checkpoint, resume_state=state)
def _claim_runner(job_id: str) -> bool:
with _runner_lock:
if job_id in _active_runners:
return False
_active_runners.add(job_id)
return True
def _release_runner(job_id: str) -> None:
with _runner_lock:
_active_runners.discard(job_id)
def _mark_running(job_id: str, attempts: int) -> bool:
now = utcnow()
with connect() as conn:
cur = conn.execute(
"UPDATE jobs SET status='running', attempts=?, started_at=COALESCE(started_at, ?), updated_at=? WHERE id=? AND status='pending'",
(attempts, now, now, job_id),
)
return int(cur.rowcount or 0) == 1
def _emit_torrent_refresh(profile: dict, action_name: str) -> None:
if action_name not in {"add_magnet", "add_torrent_raw", "remove", "move", "start", "stop", "pause", "resume", "unpause", "set_label", "set_ratio_group", "recheck"}:
return
try:
diff = torrent_cache.refresh(profile)
profile_id = int(profile["id"])
if diff.get("ok"):
rows = torrent_cache.snapshot(profile_id)
_emit("torrent_patch", {**diff, "summary": cached_summary(profile_id, rows, force=True)})
else:
_emit("rtorrent_error", diff)
except Exception as exc:
# Note: A failed live refresh must not change the already completed job result.
_emit("rtorrent_error", {"profile_id": int(profile.get("id") or 0), "error": str(exc)})
def _schedule_delayed_torrent_refresh(profile: dict, action_name: str) -> None:
if action_name not in {"start", "stop", "pause", "resume", "unpause"} or not _socketio:
return
def delayed_refresh():
# Note: rTorrent may expose state changes one poll later than the XML-RPC action result.
sleep_fn = getattr(_socketio, "sleep", time.sleep)
for delay in (0.75, 1.75):
sleep_fn(delay)
_emit_torrent_refresh(profile, action_name)
_socketio.start_background_task(delayed_refresh)
def _run(job_id: str):
if not _claim_runner(job_id):
return
sem = None
ordered_lock = None
job = {}
payload = {}
try:
job = _job_row(job_id)
if not job or job["status"] == "cancelled":
return
profile = get_profile(int(job["profile_id"]), int(job["user_id"]))
if not profile:
_set_job(job_id, "failed", "rTorrent profile does not exist", finished=True)
_emit("job_update", {"id": job_id, "profile_id": job.get("profile_id"), "status": "failed", "error": "profile not found"})
return
profile_id = int(profile["id"])
if _is_ordered_job(job) and not _is_priority_job(job):
if not _wait_for_prior_ordered_jobs(job_id, profile_id, int(job["_rowid"])):
return
ordered_lock = _get_exclusive_lock(profile_id)
ordered_lock.acquire()
sem = _get_sem(profile, light=_is_light_job(job))
sem.acquire()
job = _job_row(job_id)
if not job or job["status"] == "cancelled":
return
payload = json.loads(job.get("payload_json") or "{}")
payload["__job_id"] = job_id
payload["__resume_state"] = _job_state(job)
attempts = int(job.get("attempts") or 0) + 1
if not _mark_running(job_id, attempts):
return
event_meta = _job_event_meta(payload)
operation_logs.record_job_event(profile["id"], job["action"], "started", payload, job_id=job_id, user_id=int(job.get("user_id") or 0))
_emit("operation_started", {"job_id": job_id, "action": job["action"], "profile_id": profile["id"], "hashes": payload.get("hashes") or [], "hash_count": len(payload.get("hashes") or []), "bulk": len(payload.get("hashes") or []) > 1, **event_meta})
_emit("job_update", {"id": job_id, "profile_id": profile["id"], "status": "running", "attempts": attempts})
result = _execute(profile, job["action"], payload, user_id=int(job.get("user_id") or 0))
fresh = _job_row(job_id)
# Note: Emergency cancel and watchdog timeout keep late work from overwriting a terminal state.
if fresh and fresh["status"] != "running":
return
_set_job(job_id, "done", result=result, finished=True)
operation_logs.record_job_event(profile["id"], job["action"], "done", payload, result=result or {}, job_id=job_id, user_id=int(job.get("user_id") or 0))
_emit("operation_finished", {"job_id": job_id, "action": job["action"], "profile_id": profile["id"], "hashes": payload.get("hashes") or [], "hash_count": len(payload.get("hashes") or []), "bulk": len(payload.get("hashes") or []) > 1, "result": result, **event_meta})
# Note: Completed jobs must publish a fresh torrent snapshot/patch so removed or moved torrents disappear without a page reload.
action_name = str(job["action"] or "")
_emit_torrent_refresh(profile, action_name)
_schedule_delayed_torrent_refresh(profile, action_name)
_emit("job_update", {"id": job_id, "profile_id": profile["id"], "status": "done", "result": result})
except Exception as exc:
fresh = _job_row(job_id) or {}
attempts = int(fresh.get("attempts") or 1)
max_attempts = int(fresh.get("max_attempts") or 2)
# Note: Emergency cancel keeps an exception from a cancelled job from moving it back to retry or failed.
if fresh and fresh.get("status") != "running":
return
status = "pending" if attempts < max_attempts else "failed"
_set_job(job_id, status, str(exc), finished=(status == "failed"))
if status == "failed":
operation_logs.record_job_event(int(job.get("profile_id") or 0), job.get("action"), "failed", payload, error=str(exc), job_id=job_id, user_id=int(job.get("user_id") or 0))
_emit("operation_failed", {"job_id": job_id, "action": job.get("action"), "profile_id": job.get("profile_id"), "hashes": payload.get("hashes") or [], "error": str(exc), **_job_event_meta(payload)})
_emit("job_update", {"id": job_id, "profile_id": job.get("profile_id"), "status": status, "error": str(exc), "attempts": attempts})
if status == "pending":
_submit_job(job_id, job.get("action"))
finally:
if sem:
sem.release()
if ordered_lock:
ordered_lock.release()
_release_runner(job_id)
def _parse_ts(value: str | None) -> float | None:
if not value:
return None
try:
from datetime import datetime
return datetime.fromisoformat(str(value).replace("Z", "+00:00")).timestamp()
except Exception:
return None
def _job_timeout_seconds(profile: dict, row) -> int:
key = "light_job_timeout_seconds" if _is_light_job(row) else "heavy_job_timeout_seconds"
default = 300 if _is_light_job(row) else 7200
return _bounded_int(profile.get(key), default, 30)
def _pending_timeout_seconds(profile: dict) -> int:
return _bounded_int(profile.get("pending_job_timeout_seconds"), 900, 60)
def _timeout_running_jobs() -> None:
now_ts = time.time()
with connect() as conn:
rows = conn.execute("SELECT id,user_id,profile_id,action,started_at FROM jobs WHERE status='running'").fetchall()
for row in rows:
profile = get_profile(int(row["profile_id"]), int(row["user_id"]))
if not profile:
continue
started_ts = _parse_ts(row.get("started_at"))
if started_ts is None or now_ts - started_ts < _job_timeout_seconds(profile, row):
continue
message = f"Watchdog timeout after {_job_timeout_seconds(profile, row)} seconds"
_set_job(row["id"], "failed", message, finished=True)
_emit("operation_failed", {"job_id": row["id"], "action": row.get("action"), "profile_id": row.get("profile_id"), "hashes": [], "error": message, "source": "watchdog"})
_emit("job_update", {"id": row["id"], "profile_id": row.get("profile_id"), "status": "failed", "error": message})
def _resubmit_interrupted_running_jobs() -> None:
now_ts = time.time()
with connect() as conn:
rows = conn.execute("SELECT id,user_id,profile_id,action,heartbeat_at,updated_at FROM jobs WHERE status='running'").fetchall()
for row in rows:
with _runner_lock:
active = row["id"] in _active_runners
if active:
continue
profile = get_profile(int(row["profile_id"]), int(row["user_id"]))
if not profile:
continue
last_seen_ts = _parse_ts(row.get("heartbeat_at") or row.get("updated_at"))
# Note: After process restart there is no in-memory runner for this job.
# A short grace avoids stealing work from another still-alive Gunicorn worker.
if last_seen_ts is not None and now_ts - last_seen_ts < 90:
continue
with connect() as conn:
cur = conn.execute(
"UPDATE jobs SET status='pending', error=?, updated_at=? WHERE id=? AND status='running'",
("Resuming interrupted job from last checkpoint", utcnow(), row["id"]),
)
if int(cur.rowcount or 0):
_emit("job_update", {"id": row["id"], "profile_id": row.get("profile_id"), "status": "pending", "resumed": True})
_submit_job(row["id"], row.get("action"))
def _resubmit_stale_pending_jobs() -> None:
now_ts = time.time()
with connect() as conn:
rows = conn.execute("SELECT id,user_id,profile_id,action,updated_at FROM jobs WHERE status='pending'").fetchall()
for row in rows:
with _runner_lock:
active = row["id"] in _active_runners
if active:
continue
profile = get_profile(int(row["profile_id"]), int(row["user_id"]))
if not profile:
continue
updated_ts = _parse_ts(row.get("updated_at"))
if updated_ts is None or now_ts - updated_ts < _pending_timeout_seconds(profile):
continue
with connect() as conn:
conn.execute("UPDATE jobs SET error=?, updated_at=? WHERE id=? AND status='pending'", ("Watchdog resubmitted stale pending job", utcnow(), row["id"]))
_emit("job_update", {"id": row["id"], "profile_id": row.get("profile_id"), "status": "pending", "watchdog": True})
_submit_job(row["id"], row.get("action"))
def _watchdog_loop() -> None:
while True:
try:
_resubmit_interrupted_running_jobs()
_timeout_running_jobs()
_resubmit_stale_pending_jobs()
except Exception:
pass
time.sleep(WATCHDOG_INTERVAL_SECONDS)
def start_watchdog() -> None:
global _watchdog_started
with _watchdog_lock:
if _watchdog_started:
return
_watchdog_started = True
thread = threading.Thread(target=_watchdog_loop, name="pytorrent-job-watchdog", daemon=True)
thread.start()
def _safe_json(value, fallback):
try:
return json.loads(value or "")
except Exception:
return fallback
def _job_summary(row: dict, payload: dict, result: dict) -> str:
ctx = payload.get("job_context") or {}
count = int(ctx.get("hash_count") or len(payload.get("hashes") or []) or result.get("count") or 0)
parts = []
if ctx.get("bulk_label"):
# Note: Shows which generated bulk part is being displayed in the job queue.
parts.append(f"{ctx.get('bulk_label')} of {ctx.get('bulk_parts')}")
if count:
parts.append(("bulk " if count > 1 else "single ") + f"{count} torrent(s)")
if ctx.get("target_path"):
parts.append(f"target: {ctx.get('target_path')}")
if ctx.get("remove_data"):
parts.append("remove data")
if ctx.get("move_data"):
parts.append("move data")
if result.get("count") is not None:
parts.append(f"done: {result.get('count')}")
if result.get("errors"):
parts.append(f"errors: {len(result.get('errors') or [])}")
return "; ".join(parts)
def _public_job(row) -> dict:
d = dict(row)
payload = _safe_json(d.get("payload_json"), {})
result = _safe_json(d.get("result_json"), {})
ctx = payload.get("job_context") or {}
d["payload"] = payload
state = _safe_json(d.get("state_json"), {})
d["result"] = result
d["state"] = state
d["progress_current"] = int(d.get("progress_current") or len(state.get("completed_hashes") or []))
d["progress_total"] = int(d.get("progress_total") or len(payload.get("hashes") or []) or result.get("count") or 0)
d["hash_count"] = int(ctx.get("hash_count") or len(payload.get("hashes") or []) or result.get("count") or 0)
d["is_bulk"] = bool(ctx.get("bulk") or d["hash_count"] > 1)
d["summary"] = _job_summary(d, payload, result)
d["source"] = str(ctx.get("source") or "user")
d["source_label"] = str(ctx.get("rule_name") or ctx.get("source") or "user")
d["is_forced"] = bool(payload.get("force_job") or payload.get("priority_job"))
items = ctx.get("items") or []
if d["is_bulk"]:
d["items_preview"] = ""
else:
d["items_preview"] = ", ".join([str((x or {}).get("name") or (x or {}).get("hash") or "") for x in items[:1] if x])
return d
def _job_scope_sql(writable: bool = False) -> tuple[str, tuple]:
visible = auth.writable_profile_ids() if writable else auth.visible_profile_ids()
if visible is None:
return "", ()
if not visible:
return " WHERE 1=0", ()
placeholders = ",".join("?" for _ in visible)
return f" WHERE profile_id IN ({placeholders})", tuple(visible)
def list_jobs(limit: int = 200, offset: int = 0):
limit = max(1, min(int(limit or 50), 500))
offset = max(0, int(offset or 0))
where, params = _job_scope_sql()
with connect() as conn:
rows = conn.execute(f"SELECT * FROM jobs{where} ORDER BY created_at DESC LIMIT ? OFFSET ?", (*params, limit, offset)).fetchall()
total = conn.execute(f"SELECT COUNT(*) AS n FROM jobs{where}", params).fetchone()["n"]
return {"rows": [_public_job(r) for r in rows], "total": total, "limit": limit, "offset": offset}
def cancel_job(job_id: str) -> bool:
row = _job_row(job_id)
if not row or row["status"] not in {"pending", "running"}:
return False
# Note: Emergency cancel is useful only for unfinished jobs; failed/done entries stay available for retry or log cleanup.
_set_job(job_id, "cancelled", finished=True)
_emit("job_update", {"id": job_id, "profile_id": row.get("profile_id"), "status": "cancelled"})
return True
def clear_jobs() -> int:
where, params = _job_scope_sql(writable=True)
status_clause = "status NOT IN ('pending', 'running')"
sql = f"DELETE FROM jobs{where} AND {status_clause}" if where else f"DELETE FROM jobs WHERE {status_clause}"
with connect() as conn:
cur = conn.execute(sql, params)
return int(cur.rowcount or 0)
def emergency_clear_jobs() -> int:
# Note: Emergency cleanup first marks active jobs as cancelled, then clears the whole job log list.
now = utcnow()
where, params = _job_scope_sql(writable=True)
status_clause = "status IN ('pending', 'running')"
update_sql = f"UPDATE jobs SET status='cancelled', error='Emergency cancelled by user', finished_at=COALESCE(finished_at, ?), updated_at=?{where} AND {status_clause}" if where else "UPDATE jobs SET status='cancelled', error='Emergency cancelled by user', finished_at=COALESCE(finished_at, ?), updated_at=? WHERE status IN ('pending', 'running')"
with connect() as conn:
conn.execute(update_sql, (now, now, *params) if where else (now, now))
cur = conn.execute(f"DELETE FROM jobs{where}", params) if where else conn.execute("DELETE FROM jobs")
deleted = int(cur.rowcount or 0)
_emit("job_update", {"status": "cleared", "emergency": True})
return deleted
def force_job(job_id: str) -> bool:
row = _job_row(job_id)
if not row or row['status'] != 'pending':
return False
payload = _job_payload(row)
payload['force_job'] = True
payload['priority_job'] = True
with connect() as conn:
conn.execute("UPDATE jobs SET payload_json=?, updated_at=? WHERE id=?", (json.dumps(payload), utcnow(), job_id))
_emit('job_update', {'id': job_id, 'profile_id': row.get('profile_id'), 'status': 'pending', 'forced': True})
_submit_job(job_id, row.get('action'))
return True
def retry_job(job_id: str) -> bool:
row = _job_row(job_id)
if not row or row["status"] not in {"failed", "cancelled"}:
return False
with connect() as conn:
conn.execute("UPDATE jobs SET status='pending', error='', finished_at=NULL, state_json=NULL, progress_current=0, heartbeat_at=NULL, updated_at=? WHERE id=?", (utcnow(), job_id))
_emit("job_update", {"id": job_id, "profile_id": row.get("profile_id"), "status": "pending"})
_submit_job(job_id, row.get("action"))
return True