execute.capture fix

This commit is contained in:
Mateusz Gruszczyński
2026-05-04 21:37:02 +02:00
parent d5b7d97528
commit 879c60d563
2 changed files with 1145 additions and 340 deletions

View File

@@ -104,7 +104,7 @@ def _is_transient_scgi_error(exc: Exception) -> bool:
if err_no in {errno.ECONNREFUSED, errno.ECONNRESET, errno.ETIMEDOUT, errno.EHOSTUNREACH, errno.ENETUNREACH}:
return True
msg = str(exc).lower()
return any(text in msg for text in ("connection refused", "connection reset", "timed out", "empty response"))
return any(text in msg for text in ("connection refused", "connection reset", "timed out", "timeout", "empty response", "pipe creation failed", "resource temporarily unavailable", "try again", "temporarily unavailable"))
def client_for(profile: dict) -> ScgiRtorrentClient:
@@ -112,32 +112,78 @@ def client_for(profile: dict) -> ScgiRtorrentClient:
_UNSUPPORTED_EXEC_METHODS: set[str] = set()
_EXEC_TARGET_STYLE: dict[str, int] = {}
def _rt_execute_preview(method_name: str, call_args: tuple) -> str:
# Note: Skrocony opis RPC usuwa dlugie skrypty z komunikatu bledu, ale zostawia metode i pierwsze argumenty do diagnostyki.
preview = ", ".join(repr(x) for x in call_args[:3])
if len(call_args) > 3:
preview += ", ..."
return f"{method_name}({preview})"
def _rt_execute_target_variants(method: str, args: tuple) -> list[tuple]:
# Note: rTorrent XML-RPC w zaleznosci od wersji wymaga pustego targetu albo go odrzuca; zapamietujemy dzialajacy wariant per metoda.
variants = [("", *args), args]
preferred = _EXEC_TARGET_STYLE.get(method)
if preferred is not None and 0 <= preferred < len(variants):
return [variants[preferred]] + [v for i, v in enumerate(variants) if i != preferred]
return variants
def _is_rt_method_missing(exc: Exception) -> bool:
msg = str(exc).lower()
return "not defined" in msg or "no such method" in msg or "unknown method" in msg
def _rt_execute_methods(method: str) -> list[str]:
# Note: execute2.* jest probowane dopiero gdy podstawowe execute.* nie istnieje, zeby nie generowac falszywych bledow retry.
methods = [method]
if method.startswith("execute."):
fallback = method.replace("execute.", "execute2.", 1)
if fallback not in _UNSUPPORTED_EXEC_METHODS:
methods.append(fallback)
return methods
def _rt_execute(c: ScgiRtorrentClient, method: str, *args):
"""Run rTorrent execute.* as the rTorrent user across XML-RPC variants."""
method_names = [method]
if method.startswith("execute."):
execute2 = method.replace("execute.", "execute2.", 1)
if execute2 not in _UNSUPPORTED_EXEC_METHODS:
method_names.append(execute2)
errors = []
for method_name in method_names:
for call_args in (("", *args), args):
try:
return c.call(method_name, *call_args)
except Exception as exc:
message = str(exc)
if "not defined" in message.lower():
_UNSUPPORTED_EXEC_METHODS.add(method_name)
preview = ", ".join(repr(x) for x in call_args[:3])
if len(call_args) > 3:
preview += ", ..."
errors.append(f"{method_name}({preview}): {exc}")
errors: list[str] = []
attempts = _scgi_retry_attempts()
for attempt in range(1, attempts + 1):
errors.clear()
transient_seen = False
primary_missing = False
for method_index, method_name in enumerate(_rt_execute_methods(method)):
if method_name in _UNSUPPORTED_EXEC_METHODS:
continue
if method_index > 0 and not primary_missing:
continue
for call_args in _rt_execute_target_variants(method_name, args):
try:
result = c.call(method_name, *call_args)
if method_name == method:
_EXEC_TARGET_STYLE[method_name] = 0 if call_args and call_args[0] == "" else 1
return result
except Exception as exc:
if _is_rt_method_missing(exc):
_UNSUPPORTED_EXEC_METHODS.add(method_name)
if method_name == method:
primary_missing = True
errors.append(f"{method_name}: method not defined")
break
transient_seen = transient_seen or _is_transient_scgi_error(exc)
errors.append(f"{_rt_execute_preview(method_name, call_args)}: {exc}")
if transient_seen and attempt < attempts:
time.sleep(_scgi_retry_delay(attempt))
continue
break
raise RuntimeError("rTorrent execute failed: " + "; ".join(errors))
def _is_rt_timeout_error(exc: Exception) -> bool:
return isinstance(exc, (TimeoutError, socket.timeout)) or "timed out" in str(exc).lower()
msg = str(exc).lower()
return isinstance(exc, (TimeoutError, socket.timeout)) or "timed out" in msg or "timeout" in msg
def _rt_execute_allow_timeout(c: ScgiRtorrentClient, method: str, *args):
@@ -193,6 +239,7 @@ def _run_remote_move(c: ScgiRtorrentClient, src: str, dst: str, poll_interval: f
try:
output = str(_rt_execute(c, "execute.capture", "sh", "-c", poll_script, "pytorrent-move-poll", status_path) or "").strip()
except Exception as exc:
# Note: Podczas masowego move rTorrent potrafi chwilowo nie utworzyc pipe dla execute.capture; polling czeka i probuje dalej.
if _is_rt_timeout_error(exc) or _is_transient_scgi_error(exc):
continue
raise
@@ -263,6 +310,7 @@ def _run_remote_rm(c: ScgiRtorrentClient, path: str, poll_interval: float = 2.0)
try:
output = str(_rt_execute(c, "execute.capture", "sh", "-c", poll_script, "pytorrent-rm-poll", status_path) or "").strip()
except Exception as exc:
# Note: Remove uzywa tego samego bezpiecznego pollingu co move, wiec chwilowy brak pipe nie wywala calej kolejki.
if _is_rt_timeout_error(exc) or _is_transient_scgi_error(exc):
continue
raise

File diff suppressed because it is too large Load Diff