192 lines
7.1 KiB
Python
192 lines
7.1 KiB
Python
from __future__ import annotations
|
|
import json
|
|
import re
|
|
import socket
|
|
import time
|
|
import urllib.parse
|
|
import urllib.request
|
|
from datetime import datetime, timezone
|
|
from typing import Any
|
|
from ..db import connect
|
|
from . import preferences, rtorrent
|
|
|
|
PORT_CHECK_CACHE_SECONDS = 6 * 60 * 60
|
|
MAX_PORT_CHECK_CANDIDATES = 256
|
|
|
|
|
|
def _app_setting_get(key: str) -> str | None:
|
|
with connect() as conn:
|
|
row = conn.execute("SELECT value FROM app_settings WHERE key=?", (key,)).fetchone()
|
|
return row.get("value") if row else None
|
|
|
|
|
|
def _app_setting_set(key: str, value: str) -> None:
|
|
with connect() as conn:
|
|
conn.execute("INSERT OR REPLACE INTO app_settings(key,value) VALUES(?,?)", (key, value))
|
|
|
|
|
|
def _iso_from_epoch(value: Any) -> str | None:
|
|
try:
|
|
return datetime.fromtimestamp(float(value), timezone.utc).isoformat(timespec="seconds")
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def _public_ip(profile: dict | None = None, force: bool = False) -> str:
|
|
if profile and bool(profile.get("is_remote")):
|
|
return rtorrent.remote_public_ip(profile, force=force)
|
|
req = urllib.request.Request("https://api.ipify.org", headers={"User-Agent": "pyTorrent/port-check"})
|
|
with urllib.request.urlopen(req, timeout=8) as res:
|
|
return res.read(64).decode("utf-8", "replace").strip()
|
|
|
|
|
|
def _parse_port_candidates(value: str, limit: int = MAX_PORT_CHECK_CANDIDATES) -> tuple[list[int], bool]:
|
|
"""Return valid incoming port candidates from rTorrent network.port_range."""
|
|
ports: list[int] = []
|
|
seen: set[int] = set()
|
|
truncated = False
|
|
|
|
def add(port: int) -> None:
|
|
nonlocal truncated
|
|
if not 1 <= port <= 65535 or port in seen:
|
|
return
|
|
if len(ports) >= limit:
|
|
truncated = True
|
|
return
|
|
seen.add(port)
|
|
ports.append(port)
|
|
|
|
for start, end in re.findall(r"(\d{1,5})\s*-\s*(\d{1,5})", value or ""):
|
|
a, b = int(start), int(end)
|
|
if a > b:
|
|
a, b = b, a
|
|
for port in range(a, b + 1):
|
|
add(port)
|
|
if truncated:
|
|
break
|
|
|
|
without_ranges = re.sub(r"\d{1,5}\s*-\s*\d{1,5}", " ", value or "")
|
|
for item in re.findall(r"\d{1,5}", without_ranges):
|
|
add(int(item))
|
|
|
|
return ports, truncated
|
|
|
|
|
|
def _incoming_ports(profile: dict) -> dict:
|
|
try:
|
|
raw_value = str(rtorrent.client_for(profile).call("network.port_range") or "")
|
|
except Exception:
|
|
raw_value = ""
|
|
ports, truncated = _parse_port_candidates(raw_value)
|
|
return {"ports": ports, "raw": raw_value, "truncated": truncated}
|
|
|
|
|
|
def _yougetsignal_check(public_ip: str, port: int) -> dict:
|
|
body = urllib.parse.urlencode({"remoteAddress": public_ip, "portNumber": str(port)}).encode("utf-8")
|
|
req = urllib.request.Request(
|
|
"https://ports.yougetsignal.com/check-port.php",
|
|
data=body,
|
|
headers={
|
|
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
|
|
"User-Agent": "pyTorrent/port-check",
|
|
"Accept": "text/html,application/json,*/*",
|
|
},
|
|
method="POST",
|
|
)
|
|
with urllib.request.urlopen(req, timeout=12) as res:
|
|
text = res.read(8192).decode("utf-8", "replace")
|
|
low = text.lower()
|
|
if "is open" in low:
|
|
return {"status": "open", "source": "yougetsignal", "raw": text[:500]}
|
|
if "is closed" in low:
|
|
return {"status": "closed", "source": "yougetsignal", "raw": text[:500]}
|
|
return {"status": "unknown", "source": "yougetsignal", "raw": text[:500]}
|
|
|
|
|
|
def _local_port_fallback(public_ip: str, port: int) -> dict:
|
|
try:
|
|
with socket.create_connection((public_ip, port), timeout=3):
|
|
return {"status": "open", "source": "local-fallback"}
|
|
except Exception as exc:
|
|
return {"status": "unknown", "source": "local-fallback", "error": f"Local fallback inconclusive: {exc}"}
|
|
|
|
|
|
def _check_ports(public_ip: str, ports: list[int], checker) -> dict:
|
|
checked: list[int] = []
|
|
first_closed: dict | None = None
|
|
last_result: dict = {"status": "unknown"}
|
|
|
|
for port in ports:
|
|
checked.append(port)
|
|
current = checker(public_ip, port)
|
|
last_result = current
|
|
if current.get("status") == "open":
|
|
current.update({"port": port, "open_port": port, "checked_ports": checked})
|
|
return current
|
|
if current.get("status") == "closed" and first_closed is None:
|
|
first_closed = current
|
|
|
|
result = first_closed or last_result
|
|
result.update({"port": ports[0] if ports else None, "open_port": None, "checked_ports": checked})
|
|
return result
|
|
|
|
|
|
def port_check_status(profile: dict | None = None, force: bool = False, user_id: int | None = None) -> dict:
|
|
"""Return cached or freshly checked incoming-port status for one rTorrent profile."""
|
|
profile = profile or preferences.active_profile(user_id)
|
|
prefs = preferences.get_preferences(user_id, int(profile.get("id"))) if profile else preferences.get_preferences(user_id)
|
|
enabled = bool((prefs or {}).get("port_check_enabled"))
|
|
if not profile:
|
|
return {"status": "unknown", "enabled": enabled, "error": "No profile"}
|
|
|
|
port_info = _incoming_ports(profile)
|
|
ports = port_info["ports"]
|
|
if not ports:
|
|
return {"status": "unknown", "enabled": enabled, "error": "Cannot read rTorrent network.port_range"}
|
|
|
|
ports_key = ",".join(str(port) for port in ports)
|
|
cache_key = f"port_check:{profile['id']}:{ports_key}:{int(bool(port_info['truncated']))}"
|
|
if not force:
|
|
cached = _app_setting_get(cache_key)
|
|
if cached:
|
|
try:
|
|
data = json.loads(cached)
|
|
if time.time() - float(data.get("checked_at_epoch") or 0) < PORT_CHECK_CACHE_SECONDS:
|
|
data["cached"] = True
|
|
data["enabled"] = enabled
|
|
if not data.get("checked_at"):
|
|
data["checked_at"] = _iso_from_epoch(data.get("checked_at_epoch"))
|
|
return data
|
|
except Exception:
|
|
pass
|
|
|
|
checked_at_epoch = time.time()
|
|
result = {
|
|
"status": "unknown",
|
|
"enabled": enabled,
|
|
"port": ports[0],
|
|
"ports": ports,
|
|
"port_range": port_info["raw"],
|
|
"ports_truncated": port_info["truncated"],
|
|
"checked_at_epoch": checked_at_epoch,
|
|
"checked_at": _iso_from_epoch(checked_at_epoch),
|
|
"cached": False,
|
|
}
|
|
try:
|
|
public_ip = _public_ip(profile, force=force)
|
|
result["public_ip"] = public_ip
|
|
result["remote"] = bool(profile.get("is_remote"))
|
|
result.update(_check_ports(public_ip, ports, _yougetsignal_check))
|
|
except Exception as exc:
|
|
result["error"] = f"YouGetSignal failed: {exc}"
|
|
try:
|
|
public_ip = result.get("public_ip") or _public_ip(profile, force=force)
|
|
result["public_ip"] = public_ip
|
|
result["remote"] = bool(profile.get("is_remote"))
|
|
result.update(_check_ports(public_ip, ports, _local_port_fallback))
|
|
except Exception as fallback_exc:
|
|
result["fallback_error"] = str(fallback_exc)
|
|
result["source"] = "none"
|
|
_app_setting_set(cache_key, json.dumps(result))
|
|
return result
|