poller fixes per profil

This commit is contained in:
Mateusz Gruszczyński
2026-06-20 18:39:00 +02:00
parent e1b5822a59
commit d4c9150c42
7 changed files with 129 additions and 35 deletions
+47 -14
View File
@@ -112,24 +112,55 @@ def _is_light_job(row) -> bool:
return _is_light_action(str((row or {}).get("action") or ""))
def _has_prior_ordered_jobs(profile_id: int, rowid: int) -> bool:
def _ordered_profile_ids(row) -> set[int]:
"""Return every profile touched by an ordered job."""
# Note: Profile-transfer jobs touch both source and target profiles, so they must be ordered across both sides.
ids: set[int] = set()
try:
profile_id = int((row or {}).get("profile_id") or 0)
if profile_id:
ids.add(profile_id)
except Exception:
pass
try:
payload = _job_payload(row)
target_id = int(payload.get("target_profile_id") or 0)
if str((row or {}).get("action") or "") == "profile_transfer" and target_id:
ids.add(target_id)
except Exception:
pass
return ids
def _ordered_locks_for(row) -> list[threading.Lock]:
"""Acquire locks in stable order to avoid deadlocks between cross-profile jobs."""
return [_get_exclusive_lock(profile_id) for profile_id in sorted(_ordered_profile_ids(row))]
def _has_prior_ordered_jobs(profile_ids: set[int], rowid: int) -> bool:
if not profile_ids:
return False
with connect() as conn:
rows = conn.execute(
"""
SELECT rowid AS _rowid, action, payload_json
SELECT rowid AS _rowid, profile_id, action, payload_json
FROM jobs
WHERE profile_id=?
AND rowid<?
WHERE rowid<?
AND status IN ('pending', 'running')
ORDER BY rowid
""",
(profile_id, rowid),
(rowid,),
).fetchall()
return any(_is_ordered_job(row) and not _is_priority_job(row) for row in rows)
for row in rows:
if not _is_ordered_job(row) or _is_priority_job(row):
continue
if profile_ids.intersection(_ordered_profile_ids(row)):
return True
return False
def _wait_for_prior_ordered_jobs(job_id: str, profile_id: int, rowid: int) -> bool:
while _has_prior_ordered_jobs(profile_id, rowid):
def _wait_for_prior_ordered_jobs(job_id: str, profile_ids: set[int], rowid: int) -> bool:
while _has_prior_ordered_jobs(profile_ids, rowid):
fresh = _job_row(job_id)
if not fresh or fresh["status"] == "cancelled":
return False
@@ -379,7 +410,7 @@ def _run(job_id: str):
if not _claim_runner(job_id):
return
sem = None
ordered_lock = None
ordered_locks: list[threading.Lock] = []
job = {}
payload = {}
try:
@@ -394,10 +425,12 @@ def _run(job_id: str):
return
profile_id = int(profile["id"])
if _is_ordered_job(job) and not _is_priority_job(job):
if not _wait_for_prior_ordered_jobs(job_id, profile_id, int(job["_rowid"])):
involved_profile_ids = _ordered_profile_ids(job)
if not _wait_for_prior_ordered_jobs(job_id, involved_profile_ids, int(job["_rowid"])):
return
ordered_lock = _get_exclusive_lock(profile_id)
ordered_lock.acquire()
ordered_locks = _ordered_locks_for(job)
for lock in ordered_locks:
lock.acquire()
sem = _get_sem(profile, light=_is_light_job(job))
sem.acquire()
job = _job_row(job_id)
@@ -454,8 +487,8 @@ def _run(job_id: str):
finally:
if sem:
sem.release()
if ordered_lock:
ordered_lock.release()
for lock in reversed(ordered_locks):
lock.release()
_release_runner(job_id)