bulk-part-jobs, and scgi retries
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import errno
|
||||
import os
|
||||
import posixpath
|
||||
import socket
|
||||
@@ -53,24 +54,57 @@ class ScgiRtorrentClient:
|
||||
}
|
||||
header_blob = b"".join(k.encode() + b"\0" + v.encode() + b"\0" for k, v in headers.items())
|
||||
payload = str(len(header_blob)).encode("ascii") + b":" + header_blob + b"," + body
|
||||
with socket.create_connection((self.host, self.port), timeout=self.timeout) as sock:
|
||||
sock.settimeout(self.timeout)
|
||||
sock.sendall(payload)
|
||||
chunks: list[bytes] = []
|
||||
while True:
|
||||
chunk = sock.recv(65536)
|
||||
if not chunk:
|
||||
break
|
||||
chunks.append(chunk)
|
||||
response = b"".join(chunks)
|
||||
if not response:
|
||||
raise ConnectionError("Empty response from rTorrent SCGI")
|
||||
if b"\r\n\r\n" in response:
|
||||
response = response.split(b"\r\n\r\n", 1)[1]
|
||||
elif b"\n\n" in response:
|
||||
response = response.split(b"\n\n", 1)[1]
|
||||
result, _ = loads(response)
|
||||
return result[0] if len(result) == 1 else result
|
||||
attempts = _scgi_retry_attempts()
|
||||
last_exc = None
|
||||
for attempt in range(1, attempts + 1):
|
||||
try:
|
||||
with socket.create_connection((self.host, self.port), timeout=self.timeout) as sock:
|
||||
sock.settimeout(self.timeout)
|
||||
sock.sendall(payload)
|
||||
chunks: list[bytes] = []
|
||||
while True:
|
||||
chunk = sock.recv(65536)
|
||||
if not chunk:
|
||||
break
|
||||
chunks.append(chunk)
|
||||
response = b"".join(chunks)
|
||||
if not response:
|
||||
raise ConnectionError("Empty response from rTorrent SCGI")
|
||||
if b"\r\n\r\n" in response:
|
||||
response = response.split(b"\r\n\r\n", 1)[1]
|
||||
elif b"\n\n" in response:
|
||||
response = response.split(b"\n\n", 1)[1]
|
||||
result, _ = loads(response)
|
||||
return result[0] if len(result) == 1 else result
|
||||
except Exception as exc:
|
||||
last_exc = exc
|
||||
if attempt >= attempts or not _is_transient_scgi_error(exc):
|
||||
raise
|
||||
time.sleep(_scgi_retry_delay(attempt))
|
||||
raise last_exc or ConnectionError("rTorrent SCGI call failed")
|
||||
|
||||
|
||||
def _scgi_retry_attempts() -> int:
|
||||
# Note: Krotki retry/backoff chroni masowe operacje przed chwilowym Errno 111 przy wysokim loadzie rTorrent.
|
||||
try:
|
||||
return max(1, min(10, int(os.environ.get("PYTORRENT_SCGI_RETRIES", "5"))))
|
||||
except Exception:
|
||||
return 5
|
||||
|
||||
|
||||
def _scgi_retry_delay(attempt: int) -> float:
|
||||
return min(5.0, 0.35 * (2 ** max(0, attempt - 1)))
|
||||
|
||||
|
||||
def _is_transient_scgi_error(exc: Exception) -> bool:
|
||||
# Note: Retry obejmuje typowe chwilowe bledy SCGI/socket, ale nie ukrywa bledow merytorycznych XML-RPC.
|
||||
if isinstance(exc, (ConnectionRefusedError, ConnectionResetError, TimeoutError, socket.timeout)):
|
||||
return True
|
||||
err_no = getattr(exc, "errno", None)
|
||||
if err_no in {errno.ECONNREFUSED, errno.ECONNRESET, errno.ETIMEDOUT, errno.EHOSTUNREACH, errno.ENETUNREACH}:
|
||||
return True
|
||||
msg = str(exc).lower()
|
||||
return any(text in msg for text in ("connection refused", "connection reset", "timed out", "empty response"))
|
||||
|
||||
|
||||
def client_for(profile: dict) -> ScgiRtorrentClient:
|
||||
@@ -159,7 +193,7 @@ def _run_remote_move(c: ScgiRtorrentClient, src: str, dst: str, poll_interval: f
|
||||
try:
|
||||
output = str(_rt_execute(c, "execute.capture", "sh", "-c", poll_script, "pytorrent-move-poll", status_path) or "").strip()
|
||||
except Exception as exc:
|
||||
if _is_rt_timeout_error(exc):
|
||||
if _is_rt_timeout_error(exc) or _is_transient_scgi_error(exc):
|
||||
continue
|
||||
raise
|
||||
if not output:
|
||||
@@ -207,6 +241,46 @@ def _safe_rm_rf_path(path: str) -> str:
|
||||
return path
|
||||
|
||||
|
||||
def _run_remote_rm(c: ScgiRtorrentClient, path: str, poll_interval: float = 2.0) -> None:
|
||||
# Note: rm -rf dziala w tle po stronie rTorrent, wiec dlugie kasowanie nie trzyma jednego polaczenia SCGI.
|
||||
token = uuid.uuid4().hex
|
||||
status_path = f"/tmp/pytorrent-rm-{token}.status"
|
||||
script = (
|
||||
'target=$1; status=$2; tmp=${status}.tmp; '
|
||||
'rm -f "$status" "$tmp"; '
|
||||
'( rc=0; '
|
||||
'if [ -z "$target" ] || [ "$target" = "/" ] || [ "$target" = "." ]; then echo "unsafe remove target: $target" >&2; rc=5; '
|
||||
'else rm -rf -- "$target" || rc=$?; fi; '
|
||||
'if [ $rc -eq 0 ]; then printf "OK\n" > "$status"; else printf "ERR %s\n" "$rc" > "$status"; fi; '
|
||||
'if [ -s "$tmp" ]; then cat "$tmp" >> "$status"; fi; '
|
||||
'rm -f "$tmp" ) > "$tmp" 2>&1 &'
|
||||
)
|
||||
poll_script = 'status=$1; [ -f "$status" ] && cat "$status" || true'
|
||||
cleanup_script = 'rm -f "$1"'
|
||||
_rt_execute_allow_timeout(c, "execute.throw", "sh", "-c", script, "pytorrent-rm-start", path, status_path)
|
||||
while True:
|
||||
time.sleep(max(0.25, poll_interval))
|
||||
try:
|
||||
output = str(_rt_execute(c, "execute.capture", "sh", "-c", poll_script, "pytorrent-rm-poll", status_path) or "").strip()
|
||||
except Exception as exc:
|
||||
if _is_rt_timeout_error(exc) or _is_transient_scgi_error(exc):
|
||||
continue
|
||||
raise
|
||||
if not output:
|
||||
continue
|
||||
try:
|
||||
_rt_execute(c, "execute.throw", "sh", "-c", cleanup_script, "pytorrent-rm-clean", status_path)
|
||||
except Exception:
|
||||
pass
|
||||
first_line = output.splitlines()[0].strip()
|
||||
if first_line == "OK":
|
||||
return
|
||||
if first_line.startswith("ERR"):
|
||||
details = "\n".join(output.splitlines()[1:]).strip()
|
||||
raise RuntimeError(details or first_line)
|
||||
raise RuntimeError(output)
|
||||
|
||||
|
||||
def _remove_torrent_data(c: ScgiRtorrentClient, torrent_hash: str) -> dict:
|
||||
data_path = _safe_rm_rf_path(_torrent_data_path(c, torrent_hash))
|
||||
try:
|
||||
@@ -217,7 +291,7 @@ def _remove_torrent_data(c: ScgiRtorrentClient, torrent_hash: str) -> dict:
|
||||
c.call("d.close", torrent_hash)
|
||||
except Exception:
|
||||
pass
|
||||
_rt_execute(c, "execute.throw", "rm", "-rf", data_path)
|
||||
_run_remote_rm(c, data_path)
|
||||
return {"hash": torrent_hash, "removed_path": data_path}
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user