Files
pyTorrent/pytorrent/services/operation_logs.py
Mateusz Gruszczyński 4cff530b0e config for logs and filters
2026-05-20 10:31:07 +02:00

201 lines
12 KiB
Python

from __future__ import annotations
import json
from datetime import datetime, timedelta, timezone
from typing import Any
from ..db import connect, utcnow, default_user_id
from . import auth, rtorrent
DEFAULT_SETTINGS = {"retention_mode": "days", "retention_days": 30, "retention_lines": 5000}
VALID_RETENTION_MODES = {"days", "lines", "both", "manual"}
def _user_id(user_id: int | None = None) -> int:
return int(user_id or auth.current_user_id() or default_user_id())
def _details(value: dict | None = None) -> str:
try:
return json.dumps(value or {}, ensure_ascii=False, sort_keys=True)
except Exception:
return "{}"
def _row_to_public(row: dict) -> dict:
item = dict(row)
try:
item["details"] = json.loads(item.get("details_json") or "{}")
except Exception:
item["details"] = {}
item["details_h"] = ", ".join(f"{k}: {v}" for k, v in item["details"].items() if v not in (None, ""))
return item
def get_settings(profile_id: int = 0, user_id: int | None = None) -> dict:
user_id = _user_id(user_id)
profile_id = int(profile_id or 0)
with connect() as conn:
row = conn.execute(
"SELECT * FROM operation_log_settings WHERE user_id=? AND profile_id=?",
(user_id, profile_id),
).fetchone()
if not row:
return {"user_id": user_id, "profile_id": profile_id, **DEFAULT_SETTINGS}
data = {**DEFAULT_SETTINGS, **dict(row)}
data["retention_mode"] = data.get("retention_mode") if data.get("retention_mode") in VALID_RETENTION_MODES else "days"
data["retention_days"] = max(1, int(data.get("retention_days") or DEFAULT_SETTINGS["retention_days"]))
data["retention_lines"] = max(100, int(data.get("retention_lines") or DEFAULT_SETTINGS["retention_lines"]))
return data
def save_settings(profile_id: int, data: dict, user_id: int | None = None) -> dict:
user_id = _user_id(user_id)
profile_id = int(profile_id or 0)
mode = str(data.get("retention_mode") or "days").lower()
if mode not in VALID_RETENTION_MODES:
mode = "days"
days = max(1, min(3650, int(data.get("retention_days") or DEFAULT_SETTINGS["retention_days"])))
lines = max(100, min(1_000_000, int(data.get("retention_lines") or DEFAULT_SETTINGS["retention_lines"])))
now = utcnow()
with connect() as conn:
conn.execute(
"""
INSERT INTO operation_log_settings(user_id, profile_id, retention_mode, retention_days, retention_lines, created_at, updated_at)
VALUES(?,?,?,?,?,?,?)
ON CONFLICT(user_id, profile_id) DO UPDATE SET
retention_mode=excluded.retention_mode,
retention_days=excluded.retention_days,
retention_lines=excluded.retention_lines,
updated_at=excluded.updated_at
""",
(user_id, profile_id, mode, days, lines, now, now),
)
return get_settings(profile_id, user_id)
def record(profile_id: int | None, event_type: str, message: str, *, severity: str = "info", source: str = "system", torrent_hash: str | None = None, torrent_name: str | None = None, action: str | None = None, details: dict | None = None, user_id: int | None = None) -> int:
now = utcnow()
user_id = _user_id(user_id)
with connect() as conn:
cur = conn.execute(
"""
INSERT INTO operation_logs(user_id, profile_id, event_type, severity, source, torrent_hash, torrent_name, action, message, details_json, created_at)
VALUES(?,?,?,?,?,?,?,?,?,?,?)
""",
(user_id, int(profile_id or 0) or None, str(event_type), str(severity or "info"), str(source or "system"), torrent_hash, torrent_name, action, str(message), _details(details), now),
)
return int(cur.lastrowid)
def record_job_event(profile_id: int, action: str, status: str, payload: dict | None, result: dict | None = None, error: str = "", job_id: str | None = None, user_id: int | None = None) -> None:
payload = payload or {}
result = result or {}
hashes = payload.get("hashes") or []
ctx = payload.get("job_context") or {}
items = ctx.get("items") or []
by_hash = {str(item.get("hash")): item for item in items if item}
event_type = "job_done" if status == "done" else "job_failed" if status == "failed" else "job_started"
severity = "danger" if status == "failed" else "info"
if action in {"add_magnet", "add_torrent_raw"}:
name = str(payload.get("name") or payload.get("filename") or payload.get("uri") or "torrent")[:300]
msg = f"{action} {status}: {name}"
record(profile_id, "torrent_added" if status == "done" else event_type, msg, severity=severity, source="job", action=action, details={"job_id": job_id, "status": status, "directory": payload.get("directory"), "label": payload.get("label"), "error": error, "result": result}, user_id=user_id)
return
if not hashes:
record(profile_id, event_type, f"{action} {status}", severity=severity, source="job", action=action, details={"job_id": job_id, "status": status, "error": error, "result": result}, user_id=user_id)
return
for h in hashes:
item = by_hash.get(str(h)) or {}
name = str(item.get("name") or h)
record(profile_id, "torrent_removed" if action == "remove" and status == "done" else event_type, f"{action} {status}: {name}", severity=severity, source="job", torrent_hash=str(h), torrent_name=name, action=action, details={"job_id": job_id, "status": status, "error": error, "result": result, "target_path": ctx.get("target_path"), "remove_data": ctx.get("remove_data")}, user_id=user_id)
def record_cache_diff(profile_id: int, added: list[dict], removed: list[str], updated: list[dict], old_rows: dict[str, dict]) -> None:
for row in added or []:
record(profile_id, "torrent_added", f"Torrent added: {row.get('name') or row.get('hash')}", source="poller", torrent_hash=row.get("hash"), torrent_name=row.get("name"), details={"size": row.get("size"), "path": row.get("path"), "label": row.get("label")})
for h in removed or []:
old = old_rows.get(str(h)) or {}
record(profile_id, "torrent_removed", f"Torrent removed: {old.get('name') or h}", source="poller", torrent_hash=str(h), torrent_name=old.get("name"), details={"path": old.get("path"), "label": old.get("label")})
for patch in updated or []:
h = str(patch.get("hash") or "")
old = old_rows.get(h) or {}
was_complete = bool(old.get("complete")) or float(old.get("progress") or 0) >= 100
is_complete = bool(patch.get("complete", old.get("complete"))) or float(patch.get("progress", old.get("progress") or 0) or 0) >= 100
if h and not was_complete and is_complete:
record(profile_id, "torrent_completed", f"Torrent completed: {old.get('name') or h}", source="poller", torrent_hash=h, torrent_name=old.get("name"), details={"ratio": patch.get("ratio", old.get("ratio")), "size": old.get("size"), "path": old.get("path")})
def list_logs(profile_id: int, *, limit: int = 200, offset: int = 0, event_type: str = "", q: str = "", hide_jobs: bool = False) -> dict:
limit = max(1, min(int(limit or 200), 1000))
offset = max(0, int(offset or 0))
where = ["(profile_id=? OR profile_id IS NULL)"]
params: list[Any] = [int(profile_id or 0)]
if event_type:
where.append("event_type=?")
params.append(event_type)
if hide_jobs:
# Note: Job-originated rows include torrent_added/torrent_removed events, so source is the reliable filter.
where.append("COALESCE(source, '') <> 'job'")
if q:
where.append("(message LIKE ? OR torrent_name LIKE ? OR torrent_hash LIKE ? OR action LIKE ?)")
like = f"%{q}%"
params.extend([like, like, like, like])
sql_where = " WHERE " + " AND ".join(where)
with connect() as conn:
rows = conn.execute(f"SELECT * FROM operation_logs{sql_where} ORDER BY id DESC LIMIT ? OFFSET ?", (*params, limit, offset)).fetchall()
total = conn.execute(f"SELECT COUNT(*) AS n FROM operation_logs{sql_where}", tuple(params)).fetchone()["n"]
return {"logs": [_row_to_public(r) for r in rows], "total": int(total or 0), "limit": limit, "offset": offset}
def stats(profile_id: int) -> dict:
profile_id = int(profile_id or 0)
with connect() as conn:
total = conn.execute("SELECT COUNT(*) AS n FROM operation_logs WHERE profile_id=? OR profile_id IS NULL", (profile_id,)).fetchone()["n"]
by_type = conn.execute("SELECT event_type, COUNT(*) AS n FROM operation_logs WHERE profile_id=? OR profile_id IS NULL GROUP BY event_type ORDER BY n DESC LIMIT 12", (profile_id,)).fetchall()
by_day = conn.execute("SELECT substr(created_at,1,10) AS bucket, COUNT(*) AS n FROM operation_logs WHERE profile_id=? OR profile_id IS NULL GROUP BY bucket ORDER BY bucket DESC LIMIT 14", (profile_id,)).fetchall()
by_month = conn.execute("SELECT substr(created_at,1,7) AS bucket, COUNT(*) AS n FROM operation_logs WHERE profile_id=? OR profile_id IS NULL GROUP BY bucket ORDER BY bucket DESC LIMIT 12", (profile_id,)).fetchall()
top_actions = conn.execute("SELECT COALESCE(action, event_type) AS action, COUNT(*) AS n FROM operation_logs WHERE profile_id=? OR profile_id IS NULL GROUP BY COALESCE(action, event_type) ORDER BY n DESC LIMIT 12", (profile_id,)).fetchall()
return {"total": int(total or 0), "by_type": by_type, "by_day": by_day, "by_month": by_month, "top_actions": top_actions, "settings": get_settings(profile_id)}
def retention_label(settings: dict) -> str:
mode = settings.get("retention_mode") or "days"
if mode == "manual":
return "manual cleanup only"
if mode == "lines":
return f"retention {settings.get('retention_lines') or DEFAULT_SETTINGS['retention_lines']} lines"
if mode == "both":
return f"retention {settings.get('retention_days') or DEFAULT_SETTINGS['retention_days']} days and {settings.get('retention_lines') or DEFAULT_SETTINGS['retention_lines']} lines"
return f"retention {settings.get('retention_days') or DEFAULT_SETTINGS['retention_days']} days"
def clear(profile_id: int, *, event_type: str = "") -> int:
where = ["(profile_id=? OR profile_id IS NULL)"]
params: list[Any] = [int(profile_id or 0)]
if event_type:
where.append("event_type=?")
params.append(event_type)
with connect() as conn:
cur = conn.execute("DELETE FROM operation_logs WHERE " + " AND ".join(where), tuple(params))
return int(cur.rowcount or 0)
def apply_retention(profile_id: int, user_id: int | None = None) -> dict:
settings = get_settings(profile_id, user_id)
mode = settings.get("retention_mode") or "manual"
deleted_days = 0
deleted_lines = 0
with connect() as conn:
if mode in {"days", "both"}:
cutoff = (datetime.now(timezone.utc) - timedelta(days=int(settings["retention_days"]))).isoformat(timespec="seconds")
cur = conn.execute("DELETE FROM operation_logs WHERE (profile_id=? OR profile_id IS NULL) AND created_at<?", (int(profile_id or 0), cutoff))
deleted_days = int(cur.rowcount or 0)
if mode in {"lines", "both"}:
keep = int(settings["retention_lines"])
ids = conn.execute("SELECT id FROM operation_logs WHERE profile_id=? OR profile_id IS NULL ORDER BY id DESC LIMIT -1 OFFSET ?", (int(profile_id or 0), keep)).fetchall()
if ids:
placeholders = ",".join("?" for _ in ids)
cur = conn.execute(f"DELETE FROM operation_logs WHERE id IN ({placeholders})", tuple(r["id"] for r in ids))
deleted_lines = int(cur.rowcount or 0)
return {"deleted_days": deleted_days, "deleted_lines": deleted_lines, "deleted": deleted_days + deleted_lines, "settings": settings}