from __future__ import annotations import json import mimetypes import re import time import urllib.error import urllib.parse import urllib.request from html.parser import HTMLParser from pathlib import Path from ..config import BASE_DIR from ..db import connect, utcnow TRACKER_CACHE_TTL_SECONDS = 7 * 24 * 60 * 60 FAVICON_CACHE_TTL_SECONDS = 7 * 24 * 60 * 60 TRACKER_SCAN_LIMIT = 80 FAVICON_DIR = BASE_DIR / "data" / "tracker_favicons" PUBLIC_FAVICON_BASE = "/static/tracker_favicons" class _IconParser(HTMLParser): def __init__(self): super().__init__() self.icons: list[str] = [] def handle_starttag(self, tag: str, attrs): if tag.lower() != "link": return data = {str(k).lower(): str(v or "") for k, v in attrs} rel = data.get("rel", "").lower() href = data.get("href", "").strip() if href and any(part in rel.split() for part in ("icon", "shortcut", "apple-touch-icon")): self.icons.append(href) def _now_epoch() -> float: return time.time() def tracker_domain(url: str) -> str: raw = str(url or "").strip() if not raw: return "" parsed = urllib.parse.urlparse(raw if "://" in raw else f"http://{raw}") host = (parsed.hostname or "").lower().strip(".") if host.startswith("www."): host = host[4:] return host def _root_domain(domain: str) -> str: parts = [p for p in str(domain or "").lower().strip(".").split(".") if p] if len(parts) <= 2: return ".".join(parts) if len(parts[-1]) == 2 and len(parts[-2]) <= 3 and len(parts) >= 3: return ".".join(parts[-3:]) return ".".join(parts[-2:]) def _safe_filename(domain: str) -> str: return re.sub(r"[^a-z0-9_.-]+", "_", domain.lower()).strip("._") or "tracker" def _read_cached(profile_id: int, hashes: list[str], ttl: int) -> tuple[dict[str, list[dict]], set[str]]: if not hashes: return {}, set() now = _now_epoch() cached: dict[str, list[dict]] = {} fresh: set[str] = set() with connect() as conn: for start in range(0, len(hashes), 900): chunk = hashes[start:start + 900] placeholders = ",".join("?" for _ in chunk) rows = conn.execute( f"SELECT torrent_hash, trackers_json, updated_epoch FROM tracker_summary_cache WHERE profile_id=? AND torrent_hash IN ({placeholders})", (profile_id, *chunk), ).fetchall() for row in rows: h = str(row.get("torrent_hash") or "") try: items = json.loads(row.get("trackers_json") or "[]") except Exception: items = [] cached[h] = items if isinstance(items, list) else [] if now - float(row.get("updated_epoch") or 0) < ttl: fresh.add(h) return cached, fresh def _store(profile_id: int, torrent_hash: str, trackers: list[dict]) -> None: now = utcnow() epoch = _now_epoch() compact = [] seen = set() for item in trackers: domain = tracker_domain(str(item.get("url") or item.get("domain") or "")) or str(item.get("domain") or "") if not domain or domain in seen: continue seen.add(domain) compact.append({"domain": domain, "url": str(item.get("url") or "")}) with connect() as conn: conn.execute( """ INSERT INTO tracker_summary_cache(profile_id, torrent_hash, trackers_json, updated_at, updated_epoch) VALUES(?, ?, ?, ?, ?) ON CONFLICT(profile_id, torrent_hash) DO UPDATE SET trackers_json=excluded.trackers_json, updated_at=excluded.updated_at, updated_epoch=excluded.updated_epoch """, (profile_id, torrent_hash, json.dumps(compact), now, epoch), ) def summary(profile: dict, hashes: list[str], loader, scan_limit: int = TRACKER_SCAN_LIMIT, include_favicons: bool = False) -> dict: """Build tracker sidebar data from disk cache and refresh a small batch per request.""" # Note: Tracker data is cached per torrent hash, so huge rTorrent libraries are never scanned in one UI request. profile_id = int(profile.get("id") or 0) clean_hashes = [str(h or "").strip() for h in hashes if str(h or "").strip()] cached, fresh = _read_cached(profile_id, clean_hashes, TRACKER_CACHE_TTL_SECONDS) missing = [h for h in clean_hashes if h not in fresh] errors: list[dict] = [] scanned_now = 0 for h in missing[:max(0, int(scan_limit or 0))]: try: trackers = loader(h) _store(profile_id, h, trackers) cached[h] = [{"domain": tracker_domain(t.get("url") or t.get("domain") or ""), "url": str(t.get("url") or "")} for t in trackers] fresh.add(h) scanned_now += 1 except Exception as exc: errors.append({"hash": h, "error": str(exc)}) by_hash: dict[str, list[dict]] = {} counts: dict[str, dict] = {} for h in clean_hashes: items = [] seen = set() for item in cached.get(h, []): domain = tracker_domain(str(item.get("url") or item.get("domain") or "")) or str(item.get("domain") or "") if not domain or domain in seen: continue seen.add(domain) row = {"domain": domain, "url": str(item.get("url") or "")} items.append(row) bucket = counts.setdefault(domain, {"domain": domain, "url": row["url"], "count": 0}) bucket["count"] += 1 if not bucket.get("url") and row["url"]: bucket["url"] = row["url"] by_hash[h] = items trackers = sorted(counts.values(), key=lambda x: (-int(x.get("count") or 0), str(x.get("domain") or ""))) if include_favicons: # Note: Summary returns only already cached static favicon URLs; network favicon discovery stays outside the hot tracker count path. for item in trackers: item["favicon_url"] = favicon_public_url(str(item.get("domain") or ""), enabled=True, create=False) pending = max(0, len([h for h in clean_hashes if h not in fresh])) return {"hashes": by_hash, "trackers": trackers, "errors": errors[:25], "scanned": len(clean_hashes), "scanned_now": scanned_now, "pending": pending, "cached": len(clean_hashes) - pending} def favicon_public_url(domain: str, enabled: bool = True, create: bool = False) -> str: """Return the static URL for a cached tracker favicon, optionally creating it first.""" # Note: Favicon files stay in data/tracker_favicons, but the browser loads them via the static/tracker_favicons symlink. clean = tracker_domain(domain) if not enabled or not clean: return "" if create: favicon_path(clean, enabled=True) cached = _cached_favicon(clean) now = _now_epoch() if not cached or now - float(cached.get("updated_epoch") or 0) >= FAVICON_CACHE_TTL_SECONDS: return "" path = Path(str(cached.get("file_path") or "")) if not path.exists() or not path.is_file(): return "" try: rel = path.resolve().relative_to(FAVICON_DIR.resolve()) except Exception: rel = Path(path.name) return f"{PUBLIC_FAVICON_BASE}/{urllib.parse.quote(str(rel).replace(chr(92), '/'))}" def _fetch(url: str, limit: int = 262144) -> tuple[bytes, str, str]: req = urllib.request.Request(url, headers={"User-Agent": "pyTorrent/1.0 favicon-cache"}) with urllib.request.urlopen(req, timeout=5) as resp: data = resp.read(limit + 1) if len(data) > limit: data = data[:limit] content_type = str(resp.headers.get("Content-Type") or "").split(";", 1)[0].strip().lower() final_url = str(resp.geturl() or url) return data, content_type, final_url def _is_icon(data: bytes, content_type: str, url: str) -> bool: if not data: return False ctype = content_type.lower() if ctype.startswith("image/") or ctype in {"application/octet-stream", "binary/octet-stream"}: return True return urllib.parse.urlparse(url).path.lower().endswith((".ico", ".png", ".jpg", ".jpeg", ".svg", ".webp")) def _favicon_candidates(domain: str) -> list[str]: host = tracker_domain(domain) root = _root_domain(host) candidates = [] for h in [host, root]: if h: candidates.extend([f"https://{h}/favicon.ico", f"http://{h}/favicon.ico"]) return list(dict.fromkeys(candidates)) def _html_icon_candidates(domain: str) -> list[str]: host = tracker_domain(domain) root = _root_domain(host) urls = [] for h in [host, root]: if not h: continue for scheme in ("https", "http"): base = f"{scheme}://{h}/" try: data, ctype, final_url = _fetch(base, limit=524288) except Exception: continue if "html" not in ctype and b" tuple[Path | None, str | None]: clean = tracker_domain(domain) if not enabled or not clean: return None, None cached = _cached_favicon(clean) now = _now_epoch() if cached and now - float(cached.get("updated_epoch") or 0) < FAVICON_CACHE_TTL_SECONDS: path = Path(str(cached.get("file_path") or "")) if path.exists(): return path, str(cached.get("mime_type") or mimetypes.guess_type(path.name)[0] or "image/x-icon") if cached.get("error"): return None, None # Note: Favicon lookup tries tracker host, root domain, then HTML and stores the result for a week. FAVICON_DIR.mkdir(parents=True, exist_ok=True) errors = [] candidates = _favicon_candidates(clean) checked_html = False idx = 0 while idx < len(candidates): url = candidates[idx] idx += 1 try: data, ctype, final_url = _fetch(url, limit=524288) if not _is_icon(data, ctype, final_url): continue ext = Path(urllib.parse.urlparse(final_url).path).suffix.lower() or mimetypes.guess_extension(ctype) or ".ico" if ext not in {".ico", ".png", ".jpg", ".jpeg", ".svg", ".webp"}: ext = ".ico" path = FAVICON_DIR / f"{_safe_filename(clean)}{ext}" path.write_bytes(data) mime = ctype if ctype.startswith("image/") else (mimetypes.guess_type(path.name)[0] or "image/x-icon") with connect() as conn: conn.execute( """ INSERT INTO tracker_favicon_cache(domain, source_url, file_path, mime_type, updated_at, updated_epoch, error) VALUES(?, ?, ?, ?, ?, ?, NULL) ON CONFLICT(domain) DO UPDATE SET source_url=excluded.source_url, file_path=excluded.file_path, mime_type=excluded.mime_type, updated_at=excluded.updated_at, updated_epoch=excluded.updated_epoch, error=NULL """, (clean, final_url, str(path), mime, utcnow(), now), ) return path, mime except Exception as exc: errors.append(f"{url}: {exc}") if idx >= len(candidates) and not checked_html: checked_html = True candidates.extend([u for u in _html_icon_candidates(clean) if u not in candidates]) with connect() as conn: conn.execute( """ INSERT INTO tracker_favicon_cache(domain, source_url, file_path, mime_type, updated_at, updated_epoch, error) VALUES(?, '', '', '', ?, ?, ?) ON CONFLICT(domain) DO UPDATE SET updated_at=excluded.updated_at, updated_epoch=excluded.updated_epoch, error=excluded.error """, (clean, utcnow(), now, "; ".join(errors[-3:]) or "favicon not found"), ) return None, None