queue_stopped #3
@@ -3,6 +3,7 @@ from __future__ import annotations
|
||||
import argparse
|
||||
import getpass
|
||||
import sys
|
||||
import json
|
||||
|
||||
from .db import connect, init_db, utcnow
|
||||
from .services.auth import password_hash
|
||||
@@ -32,17 +33,21 @@ def reset_password(username: str, password: str) -> bool:
|
||||
|
||||
|
||||
|
||||
def fetch_tracker_favicon(domain: str, refresh: bool = True) -> str:
|
||||
def fetch_tracker_favicon(domain: str, refresh: bool = True, debug: bool = False) -> str:
|
||||
"""Note: Download or refresh one tracker favicon from CLI without starting the web server."""
|
||||
clean = tracker_cache.tracker_domain(domain)
|
||||
if not clean:
|
||||
raise ValueError("Tracker domain is required")
|
||||
init_db()
|
||||
path, mime = tracker_cache.favicon_path(clean, enabled=True, force=refresh)
|
||||
row = tracker_cache.favicon_cache_row(clean)
|
||||
if not path:
|
||||
row = tracker_cache.favicon_cache_row(clean)
|
||||
detail = (row or {}).get("error") if row else "favicon not found"
|
||||
if debug and row:
|
||||
raise RuntimeError(f"{detail or 'favicon not found'}; cache={json.dumps(dict(row), default=str)}")
|
||||
raise RuntimeError(str(detail or "favicon not found"))
|
||||
if debug and row:
|
||||
return f"{path} ({mime or 'unknown'}) cache={json.dumps(dict(row), default=str)}"
|
||||
return f"{path} ({mime or 'unknown'})"
|
||||
|
||||
def _password_from_args(args: argparse.Namespace) -> str:
|
||||
@@ -69,6 +74,7 @@ def build_parser() -> argparse.ArgumentParser:
|
||||
icon = sub.add_parser("tracker-favicon", help="Download or refresh a tracker favicon cache file")
|
||||
icon.add_argument("domain", help="Tracker domain, e.g. t.pte.nu")
|
||||
icon.add_argument("--no-refresh", action="store_true", help="Use fresh cache when available")
|
||||
icon.add_argument("--debug", action="store_true", help="Print cache diagnostics on success or failure")
|
||||
icon.set_defaults(func=_cmd_tracker_favicon)
|
||||
|
||||
return parser
|
||||
@@ -86,7 +92,7 @@ def _cmd_reset_password(args: argparse.Namespace) -> int:
|
||||
|
||||
def _cmd_tracker_favicon(args: argparse.Namespace) -> int:
|
||||
"""Note: Run favicon discovery from CLI and print the saved file path."""
|
||||
print(fetch_tracker_favicon(args.domain, refresh=not args.no_refresh))
|
||||
print(fetch_tracker_favicon(args.domain, refresh=not args.no_refresh, debug=bool(args.debug)))
|
||||
return 0
|
||||
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ import mimetypes
|
||||
import re
|
||||
import time
|
||||
import threading
|
||||
import ssl
|
||||
import urllib.error
|
||||
import urllib.parse
|
||||
import urllib.request
|
||||
@@ -32,9 +33,9 @@ class _IconParser(HTMLParser):
|
||||
if tag.lower() != "link":
|
||||
return
|
||||
data = {str(k).lower(): str(v or "") for k, v in attrs}
|
||||
rel = data.get("rel", "").lower()
|
||||
rel = re.sub(r"\s+", " ", data.get("rel", "").lower()).strip()
|
||||
href = data.get("href", "").strip()
|
||||
if href and any(part in rel.split() for part in ("icon", "shortcut", "apple-touch-icon")):
|
||||
if href and "icon" in rel:
|
||||
self.icons.append(href)
|
||||
|
||||
|
||||
@@ -218,14 +219,32 @@ def favicon_public_url(domain: str, enabled: bool = True, create: bool = False,
|
||||
return f"{PUBLIC_FAVICON_BASE}/{urllib.parse.quote(str(rel).replace(chr(92), '/'))}"
|
||||
|
||||
def _fetch(url: str, limit: int = 262144) -> tuple[bytes, str, str]:
|
||||
req = urllib.request.Request(url, headers={"User-Agent": "pyTorrent/1.0 favicon-cache"})
|
||||
with urllib.request.urlopen(req, timeout=5) as resp:
|
||||
data = resp.read(limit + 1)
|
||||
if len(data) > limit:
|
||||
data = data[:limit]
|
||||
content_type = str(resp.headers.get("Content-Type") or "").split(";", 1)[0].strip().lower()
|
||||
final_url = str(resp.geturl() or url)
|
||||
return data, content_type, final_url
|
||||
# Note: Favicon discovery uses browser-like headers and a certificate fallback, because tracker login pages/CDNs often reject minimal Python requests.
|
||||
req = urllib.request.Request(
|
||||
url,
|
||||
headers={
|
||||
"User-Agent": "Mozilla/5.0 (compatible; pyTorrent favicon fetcher)",
|
||||
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/png,image/svg+xml,image/*,*/*;q=0.8",
|
||||
"Connection": "close",
|
||||
},
|
||||
)
|
||||
|
||||
def _read(context=None):
|
||||
with urllib.request.urlopen(req, timeout=8, context=context) as resp:
|
||||
data = resp.read(limit + 1)
|
||||
if len(data) > limit:
|
||||
data = data[:limit]
|
||||
content_type = str(resp.headers.get("Content-Type") or "").split(";", 1)[0].strip().lower()
|
||||
final_url = str(resp.geturl() or url)
|
||||
return data, content_type, final_url
|
||||
|
||||
try:
|
||||
return _read()
|
||||
except urllib.error.URLError as exc:
|
||||
reason = getattr(exc, "reason", None)
|
||||
if isinstance(reason, ssl.SSLError) or "CERTIFICATE_VERIFY_FAILED" in str(exc):
|
||||
return _read(ssl._create_unverified_context())
|
||||
raise
|
||||
|
||||
|
||||
def _is_icon(data: bytes, content_type: str, url: str) -> bool:
|
||||
@@ -257,11 +276,36 @@ def _is_icon(data: bytes, content_type: str, url: str) -> bool:
|
||||
return False
|
||||
|
||||
|
||||
|
||||
def _extract_icon_hrefs(html: str) -> list[str]:
|
||||
# Note: Regex fallback catches real-world tags like <link rel='shortcut icon' href='...'> even when HTMLParser skips malformed markup.
|
||||
hrefs: list[str] = []
|
||||
parser = _IconParser()
|
||||
try:
|
||||
parser.feed(html)
|
||||
hrefs.extend(parser.icons)
|
||||
except Exception:
|
||||
pass
|
||||
for match in re.finditer(r"<link\b[^>]*>", html, re.I):
|
||||
tag = match.group(0)
|
||||
rel = re.search(r"\brel\s*=\s*(['\"])(.*?)\1", tag, re.I | re.S)
|
||||
href = re.search(r"\bhref\s*=\s*(['\"])(.*?)\1", tag, re.I | re.S)
|
||||
if rel and href and "icon" in rel.group(2).lower():
|
||||
hrefs.append(href.group(2).strip())
|
||||
clean = []
|
||||
seen = set()
|
||||
for href in hrefs:
|
||||
if href and href not in seen:
|
||||
seen.add(href)
|
||||
clean.append(href)
|
||||
return clean
|
||||
|
||||
def _favicon_candidates(domain: str) -> list[str]:
|
||||
host = tracker_domain(domain)
|
||||
root = _root_domain(host)
|
||||
candidates = []
|
||||
for h in [host, root]:
|
||||
# Note: Besides host/root favicon.ico, try common CDN/static hostnames after HTML lookup for trackers that publish icons there.
|
||||
for h in [host, root, f"cdn.{root}" if root else "", f"static.{root}" if root else "", f"www.{root}" if root else ""]:
|
||||
if h:
|
||||
candidates.extend([f"https://{h}/favicon.ico", f"http://{h}/favicon.ico"])
|
||||
return list(dict.fromkeys(candidates))
|
||||
@@ -282,12 +326,8 @@ def _html_icon_candidates(domain: str) -> list[str]:
|
||||
continue
|
||||
if "html" not in ctype and b"<html" not in data[:2048].lower() and b"<link" not in data.lower():
|
||||
continue
|
||||
parser = _IconParser()
|
||||
try:
|
||||
parser.feed(data.decode("utf-8", errors="ignore"))
|
||||
except Exception:
|
||||
continue
|
||||
for href in parser.icons:
|
||||
html = data.decode("utf-8", errors="ignore")
|
||||
for href in _extract_icon_hrefs(html):
|
||||
urls.append(urllib.parse.urljoin(final_url, href))
|
||||
return list(dict.fromkeys(urls))
|
||||
|
||||
|
||||
Reference in New Issue
Block a user