lazy_retention
This commit is contained in:
@@ -6,8 +6,19 @@ from typing import Any
|
||||
from ..db import connect, utcnow, default_user_id
|
||||
from . import auth
|
||||
|
||||
DEFAULT_SETTINGS = {"retention_mode": "days", "retention_days": 30, "retention_lines": 5000}
|
||||
VALID_RETENTION_MODES = {"days", "lines", "both", "manual"}
|
||||
|
||||
DEFAULT_SETTINGS = {
|
||||
"retention_mode": "days",
|
||||
"retention_days": 30,
|
||||
"retention_lines": 5000,
|
||||
"retention_interval_hours": 24,
|
||||
}
|
||||
DEFAULT_CATEGORY_SETTINGS = {
|
||||
"job": {"retention_mode": "days", "retention_days": 7, "retention_lines": 2000, "retention_interval_hours": 24},
|
||||
"operation": {"retention_mode": "days", "retention_days": 30, "retention_lines": 5000, "retention_interval_hours": 24},
|
||||
}
|
||||
VALID_LOG_CATEGORIES = {"job", "operation"}
|
||||
MAX_DETAIL_TEXT = 4000
|
||||
MAX_DETAIL_ITEMS = 200
|
||||
|
||||
@@ -99,6 +110,51 @@ def _row_to_public(row: dict) -> dict:
|
||||
return item
|
||||
|
||||
|
||||
def _sanitize_mode(value: Any, default: str = "days") -> str:
|
||||
mode = str(value or default).lower()
|
||||
return mode if mode in VALID_RETENTION_MODES else default
|
||||
|
||||
|
||||
def _sanitize_days(value: Any, default: int) -> int:
|
||||
return max(1, min(3650, int(value or default)))
|
||||
|
||||
|
||||
def _sanitize_lines(value: Any, default: int) -> int:
|
||||
return max(100, min(1_000_000, int(value or default)))
|
||||
|
||||
|
||||
def _sanitize_interval(value: Any, default: int = 24) -> int:
|
||||
return max(1, min(8760, int(value or default)))
|
||||
|
||||
|
||||
def _log_category(event_type: str = "", source: str = "") -> str:
|
||||
return "job" if str(source or "") in {"job", "worker"} or str(event_type or "").startswith("job_") else "operation"
|
||||
|
||||
|
||||
def _category_where(category: str) -> str:
|
||||
if category == "job":
|
||||
return "(COALESCE(source, '') IN ('job', 'worker') OR event_type LIKE 'job_%')"
|
||||
return "NOT (COALESCE(source, '') IN ('job', 'worker') OR event_type LIKE 'job_%')"
|
||||
|
||||
|
||||
def _parse_dt(value: Any) -> datetime | None:
|
||||
if not value:
|
||||
return None
|
||||
try:
|
||||
text = str(value).replace("Z", "+00:00")
|
||||
dt = datetime.fromisoformat(text)
|
||||
return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def _next_retention_run(settings: dict, category: str) -> str | None:
|
||||
last = _parse_dt(settings.get(f"{category}_last_retention_run_at"))
|
||||
if not last:
|
||||
return None
|
||||
return (last + timedelta(hours=int(settings.get(f"{category}_retention_interval_hours") or 24))).isoformat(timespec="seconds")
|
||||
|
||||
|
||||
def get_settings(profile_id: int = 0, user_id: int | None = None) -> dict:
|
||||
user_id = _user_id(user_id)
|
||||
profile_id = int(profile_id or 0)
|
||||
@@ -108,51 +164,109 @@ def get_settings(profile_id: int = 0, user_id: int | None = None) -> dict:
|
||||
(profile_id,),
|
||||
).fetchone()
|
||||
if not row:
|
||||
return {"owner_user_id": user_id, "profile_id": profile_id, **DEFAULT_SETTINGS}
|
||||
data = {**DEFAULT_SETTINGS, **dict(row)}
|
||||
data["owner_user_id"] = int(data.pop("user_id", user_id) or user_id)
|
||||
data["retention_mode"] = data.get("retention_mode") if data.get("retention_mode") in VALID_RETENTION_MODES else "days"
|
||||
data["retention_days"] = max(1, int(data.get("retention_days") or DEFAULT_SETTINGS["retention_days"]))
|
||||
data["retention_lines"] = max(100, int(data.get("retention_lines") or DEFAULT_SETTINGS["retention_lines"]))
|
||||
data = {"owner_user_id": user_id, "profile_id": profile_id, **DEFAULT_SETTINGS}
|
||||
else:
|
||||
data = {**DEFAULT_SETTINGS, **dict(row)}
|
||||
data["owner_user_id"] = int(data.pop("user_id", user_id) or user_id)
|
||||
data["profile_id"] = profile_id
|
||||
data["retention_mode"] = _sanitize_mode(data.get("retention_mode"), DEFAULT_SETTINGS["retention_mode"])
|
||||
data["retention_days"] = _sanitize_days(data.get("retention_days"), DEFAULT_SETTINGS["retention_days"])
|
||||
data["retention_lines"] = _sanitize_lines(data.get("retention_lines"), DEFAULT_SETTINGS["retention_lines"])
|
||||
data["retention_interval_hours"] = _sanitize_interval(data.get("retention_interval_hours"), DEFAULT_SETTINGS["retention_interval_hours"])
|
||||
for category, defaults in DEFAULT_CATEGORY_SETTINGS.items():
|
||||
data[f"{category}_retention_mode"] = _sanitize_mode(data.get(f"{category}_retention_mode") or data.get("retention_mode"), defaults["retention_mode"])
|
||||
data[f"{category}_retention_days"] = _sanitize_days(data.get(f"{category}_retention_days") or data.get("retention_days"), defaults["retention_days"])
|
||||
data[f"{category}_retention_lines"] = _sanitize_lines(data.get(f"{category}_retention_lines") or data.get("retention_lines"), defaults["retention_lines"])
|
||||
data[f"{category}_retention_interval_hours"] = _sanitize_interval(data.get(f"{category}_retention_interval_hours") or data.get("retention_interval_hours"), defaults["retention_interval_hours"])
|
||||
data[f"{category}_last_retention_deleted"] = max(0, int(data.get(f"{category}_last_retention_deleted") or 0))
|
||||
data[f"{category}_next_retention_run_at"] = _next_retention_run(data, category)
|
||||
return data
|
||||
|
||||
|
||||
def save_settings(profile_id: int, data: dict, user_id: int | None = None) -> dict:
|
||||
user_id = _user_id(user_id)
|
||||
profile_id = int(profile_id or 0)
|
||||
mode = str(data.get("retention_mode") or "days").lower()
|
||||
if mode not in VALID_RETENTION_MODES:
|
||||
mode = "days"
|
||||
days = max(1, min(3650, int(data.get("retention_days") or DEFAULT_SETTINGS["retention_days"])))
|
||||
lines = max(100, min(1_000_000, int(data.get("retention_lines") or DEFAULT_SETTINGS["retention_lines"])))
|
||||
now = utcnow()
|
||||
if not auth.can_write_profile(profile_id, user_id):
|
||||
raise PermissionError("No write access to profile")
|
||||
current = get_settings(profile_id, user_id)
|
||||
legacy_mode = _sanitize_mode(data.get("retention_mode") or current.get("retention_mode"), DEFAULT_SETTINGS["retention_mode"])
|
||||
legacy_days = _sanitize_days(data.get("retention_days") or current.get("retention_days"), DEFAULT_SETTINGS["retention_days"])
|
||||
legacy_lines = _sanitize_lines(data.get("retention_lines") or current.get("retention_lines"), DEFAULT_SETTINGS["retention_lines"])
|
||||
legacy_interval = _sanitize_interval(data.get("retention_interval_hours") or current.get("retention_interval_hours"), DEFAULT_SETTINGS["retention_interval_hours"])
|
||||
values: dict[str, Any] = {
|
||||
"retention_mode": legacy_mode,
|
||||
"retention_days": legacy_days,
|
||||
"retention_lines": legacy_lines,
|
||||
"retention_interval_hours": legacy_interval,
|
||||
}
|
||||
for category, defaults in DEFAULT_CATEGORY_SETTINGS.items():
|
||||
values[f"{category}_retention_mode"] = _sanitize_mode(data.get(f"{category}_retention_mode") or current.get(f"{category}_retention_mode"), defaults["retention_mode"])
|
||||
values[f"{category}_retention_days"] = _sanitize_days(data.get(f"{category}_retention_days") or current.get(f"{category}_retention_days"), defaults["retention_days"])
|
||||
values[f"{category}_retention_lines"] = _sanitize_lines(data.get(f"{category}_retention_lines") or current.get(f"{category}_retention_lines"), defaults["retention_lines"])
|
||||
values[f"{category}_retention_interval_hours"] = _sanitize_interval(data.get(f"{category}_retention_interval_hours") or current.get(f"{category}_retention_interval_hours"), defaults["retention_interval_hours"])
|
||||
values[f"{category}_last_retention_run_at"] = current.get(f"{category}_last_retention_run_at")
|
||||
values[f"{category}_last_retention_deleted"] = int(current.get(f"{category}_last_retention_deleted") or 0)
|
||||
with connect() as conn:
|
||||
conn.execute("DELETE FROM operation_log_settings WHERE profile_id=?", (profile_id,))
|
||||
conn.execute(
|
||||
"""
|
||||
INSERT INTO operation_log_settings(user_id, profile_id, retention_mode, retention_days, retention_lines, created_at, updated_at)
|
||||
VALUES(?,?,?,?,?,?,?)
|
||||
INSERT INTO operation_log_settings(
|
||||
user_id, profile_id, retention_mode, retention_days, retention_lines,
|
||||
retention_interval_hours,
|
||||
job_retention_mode, job_retention_days, job_retention_lines, job_retention_interval_hours, job_last_retention_run_at, job_last_retention_deleted,
|
||||
operation_retention_mode, operation_retention_days, operation_retention_lines, operation_retention_interval_hours, operation_last_retention_run_at, operation_last_retention_deleted,
|
||||
created_at, updated_at
|
||||
) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
|
||||
ON CONFLICT(user_id, profile_id) DO UPDATE SET
|
||||
retention_mode=excluded.retention_mode,
|
||||
retention_days=excluded.retention_days,
|
||||
retention_lines=excluded.retention_lines,
|
||||
retention_interval_hours=excluded.retention_interval_hours,
|
||||
job_retention_mode=excluded.job_retention_mode,
|
||||
job_retention_days=excluded.job_retention_days,
|
||||
job_retention_lines=excluded.job_retention_lines,
|
||||
job_retention_interval_hours=excluded.job_retention_interval_hours,
|
||||
job_last_retention_run_at=excluded.job_last_retention_run_at,
|
||||
job_last_retention_deleted=excluded.job_last_retention_deleted,
|
||||
operation_retention_mode=excluded.operation_retention_mode,
|
||||
operation_retention_days=excluded.operation_retention_days,
|
||||
operation_retention_lines=excluded.operation_retention_lines,
|
||||
operation_retention_interval_hours=excluded.operation_retention_interval_hours,
|
||||
operation_last_retention_run_at=excluded.operation_last_retention_run_at,
|
||||
operation_last_retention_deleted=excluded.operation_last_retention_deleted,
|
||||
updated_at=excluded.updated_at
|
||||
""",
|
||||
(user_id, profile_id, mode, days, lines, now, now),
|
||||
(
|
||||
user_id, profile_id, values["retention_mode"], values["retention_days"], values["retention_lines"], values["retention_interval_hours"],
|
||||
values["job_retention_mode"], values["job_retention_days"], values["job_retention_lines"], values["job_retention_interval_hours"], values["job_last_retention_run_at"], values["job_last_retention_deleted"],
|
||||
values["operation_retention_mode"], values["operation_retention_days"], values["operation_retention_lines"], values["operation_retention_interval_hours"], values["operation_last_retention_run_at"], values["operation_last_retention_deleted"],
|
||||
now, now,
|
||||
),
|
||||
)
|
||||
return get_settings(profile_id, user_id)
|
||||
|
||||
|
||||
def record(profile_id: int | None, event_type: str, message: str, *, severity: str = "info", source: str = "system", torrent_hash: str | None = None, torrent_name: str | None = None, action: str | None = None, details: dict | None = None, user_id: int | None = None) -> int:
|
||||
"""Insert one operation log row and keep all details in JSON-safe form."""
|
||||
"""Insert one operation log row and lazily run retention for its category when due."""
|
||||
now = utcnow()
|
||||
user_id = _user_id(user_id)
|
||||
event_type_s = str(event_type)
|
||||
source_s = str(source or "system")
|
||||
with connect() as conn:
|
||||
cur = conn.execute(
|
||||
"""
|
||||
INSERT INTO operation_logs(user_id, profile_id, event_type, severity, source, torrent_hash, torrent_name, action, message, details_json, created_at)
|
||||
VALUES(?,?,?,?,?,?,?,?,?,?,?)
|
||||
""",
|
||||
(user_id, int(profile_id or 0) or None, str(event_type), str(severity or "info"), str(source or "system"), torrent_hash, torrent_name, action, str(message), _details(details), now),
|
||||
(user_id, int(profile_id or 0) or None, event_type_s, str(severity or "info"), source_s, torrent_hash, torrent_name, action, str(message), _details(details), now),
|
||||
)
|
||||
return int(cur.lastrowid)
|
||||
row_id = int(cur.lastrowid)
|
||||
try:
|
||||
maybe_apply_retention(int(profile_id or 0), _log_category(event_type_s, source_s), user_id=user_id)
|
||||
except Exception:
|
||||
# Logging must never fail because cleanup metadata could not be updated.
|
||||
pass
|
||||
return row_id
|
||||
|
||||
|
||||
def _job_event_type(status: str) -> str:
|
||||
@@ -282,7 +396,7 @@ def list_logs(profile_id: int, *, limit: int = 200, offset: int = 0, event_type:
|
||||
where.append("event_type=?")
|
||||
params.append(event_type)
|
||||
if hide_jobs:
|
||||
where.append("COALESCE(source, '') <> 'job' AND event_type NOT LIKE 'job_%'")
|
||||
where.append("COALESCE(source, '') NOT IN ('job', 'worker') AND event_type NOT LIKE 'job_%'")
|
||||
if q:
|
||||
where.append("(message LIKE ? OR torrent_name LIKE ? OR torrent_hash LIKE ? OR action LIKE ? OR details_json LIKE ?)")
|
||||
like = f"%{q}%"
|
||||
@@ -305,20 +419,29 @@ def stats(profile_id: int) -> dict:
|
||||
return {"total": int(total or 0), "by_type": by_type, "by_day": by_day, "by_month": by_month, "top_actions": top_actions, "settings": get_settings(profile_id)}
|
||||
|
||||
|
||||
def retention_label(settings: dict) -> str:
|
||||
mode = settings.get("retention_mode") or "days"
|
||||
def _retention_label_for(settings: dict, category: str) -> str:
|
||||
mode = settings.get(f"{category}_retention_mode") or "days"
|
||||
days = settings.get(f"{category}_retention_days") or DEFAULT_CATEGORY_SETTINGS[category]["retention_days"]
|
||||
lines = settings.get(f"{category}_retention_lines") or DEFAULT_CATEGORY_SETTINGS[category]["retention_lines"]
|
||||
interval = settings.get(f"{category}_retention_interval_hours") or DEFAULT_CATEGORY_SETTINGS[category]["retention_interval_hours"]
|
||||
if mode == "manual":
|
||||
return "manual cleanup only"
|
||||
return f"manual cleanup only, checked every {interval}h"
|
||||
if mode == "lines":
|
||||
return f"retention {settings.get('retention_lines') or DEFAULT_SETTINGS['retention_lines']} lines"
|
||||
return f"retention {lines} lines, checked every {interval}h"
|
||||
if mode == "both":
|
||||
return f"retention {settings.get('retention_days') or DEFAULT_SETTINGS['retention_days']} days and {settings.get('retention_lines') or DEFAULT_SETTINGS['retention_lines']} lines"
|
||||
return f"retention {settings.get('retention_days') or DEFAULT_SETTINGS['retention_days']} days"
|
||||
return f"retention {days} days and {lines} lines, checked every {interval}h"
|
||||
return f"retention {days} days, checked every {interval}h"
|
||||
|
||||
|
||||
def clear(profile_id: int, *, event_type: str = "") -> int:
|
||||
def retention_label(settings: dict) -> str:
|
||||
return f"Jobs: {_retention_label_for(settings, 'job')} / Operations: {_retention_label_for(settings, 'operation')}"
|
||||
|
||||
|
||||
def clear(profile_id: int, *, event_type: str = "", category: str = "") -> int:
|
||||
where = ["(profile_id=? OR profile_id IS NULL)"]
|
||||
params: list[Any] = [int(profile_id or 0)]
|
||||
if category in VALID_LOG_CATEGORIES:
|
||||
where.append(_category_where(category))
|
||||
if event_type:
|
||||
where.append("event_type=?")
|
||||
params.append(event_type)
|
||||
@@ -327,22 +450,88 @@ def clear(profile_id: int, *, event_type: str = "") -> int:
|
||||
return int(cur.rowcount or 0)
|
||||
|
||||
|
||||
def apply_retention(profile_id: int, user_id: int | None = None) -> dict:
|
||||
"""Apply operation-log retention without touching torrent data or other history tables."""
|
||||
settings = get_settings(profile_id, user_id)
|
||||
mode = settings.get("retention_mode") or "manual"
|
||||
def _apply_retention_category(conn, profile_id: int, settings: dict, category: str) -> dict:
|
||||
mode = settings.get(f"{category}_retention_mode") or "manual"
|
||||
deleted_days = 0
|
||||
deleted_lines = 0
|
||||
base_where = f"(profile_id=? OR profile_id IS NULL) AND {_category_where(category)}"
|
||||
if mode in {"days", "both"}:
|
||||
cutoff = (datetime.now(timezone.utc) - timedelta(days=int(settings[f"{category}_retention_days"]))).isoformat(timespec="seconds")
|
||||
cur = conn.execute(f"DELETE FROM operation_logs WHERE {base_where} AND created_at<?", (int(profile_id or 0), cutoff))
|
||||
deleted_days = int(cur.rowcount or 0)
|
||||
if mode in {"lines", "both"}:
|
||||
keep = int(settings[f"{category}_retention_lines"])
|
||||
cur = conn.execute(
|
||||
f"""
|
||||
DELETE FROM operation_logs
|
||||
WHERE id IN (
|
||||
SELECT id FROM operation_logs
|
||||
WHERE {base_where}
|
||||
ORDER BY id DESC
|
||||
LIMIT -1 OFFSET ?
|
||||
)
|
||||
""",
|
||||
(int(profile_id or 0), keep),
|
||||
)
|
||||
deleted_lines = int(cur.rowcount or 0)
|
||||
return {"deleted_days": deleted_days, "deleted_lines": deleted_lines, "deleted": deleted_days + deleted_lines}
|
||||
|
||||
|
||||
def _update_retention_metadata(conn, profile_id: int, category: str, deleted: int, settings: dict, user_id: int | None = None) -> None:
|
||||
now = utcnow()
|
||||
owner_id = int(settings.get("owner_user_id") or _user_id(user_id))
|
||||
profile_id = int(profile_id or 0)
|
||||
cur = conn.execute(
|
||||
f"""
|
||||
UPDATE operation_log_settings
|
||||
SET {category}_last_retention_run_at=?, {category}_last_retention_deleted=?, updated_at=?
|
||||
WHERE profile_id=?
|
||||
""",
|
||||
(now, int(deleted or 0), now, profile_id),
|
||||
)
|
||||
if int(cur.rowcount or 0) == 0:
|
||||
conn.execute(
|
||||
"""
|
||||
INSERT INTO operation_log_settings(user_id, profile_id, created_at, updated_at)
|
||||
VALUES(?,?,?,?)
|
||||
ON CONFLICT(user_id, profile_id) DO UPDATE SET updated_at=excluded.updated_at
|
||||
""",
|
||||
(owner_id, profile_id, now, now),
|
||||
)
|
||||
conn.execute(
|
||||
f"UPDATE operation_log_settings SET {category}_last_retention_run_at=?, {category}_last_retention_deleted=?, updated_at=? WHERE profile_id=?",
|
||||
(now, int(deleted or 0), now, profile_id),
|
||||
)
|
||||
|
||||
|
||||
def apply_retention(profile_id: int, user_id: int | None = None, category: str = "all") -> dict:
|
||||
"""Apply due operation-log retention without touching torrent data or other history tables."""
|
||||
profile_id = int(profile_id or 0)
|
||||
settings = get_settings(profile_id, user_id)
|
||||
categories = [category] if category in VALID_LOG_CATEGORIES else ["job", "operation"]
|
||||
results: dict[str, Any] = {}
|
||||
total = 0
|
||||
with connect() as conn:
|
||||
if mode in {"days", "both"}:
|
||||
cutoff = (datetime.now(timezone.utc) - timedelta(days=int(settings["retention_days"]))).isoformat(timespec="seconds")
|
||||
cur = conn.execute("DELETE FROM operation_logs WHERE (profile_id=? OR profile_id IS NULL) AND created_at<?", (int(profile_id or 0), cutoff))
|
||||
deleted_days = int(cur.rowcount or 0)
|
||||
if mode in {"lines", "both"}:
|
||||
keep = int(settings["retention_lines"])
|
||||
ids = conn.execute("SELECT id FROM operation_logs WHERE profile_id=? OR profile_id IS NULL ORDER BY id DESC LIMIT -1 OFFSET ?", (int(profile_id or 0), keep)).fetchall()
|
||||
if ids:
|
||||
placeholders = ",".join("?" for _ in ids)
|
||||
cur = conn.execute(f"DELETE FROM operation_logs WHERE id IN ({placeholders})", tuple(r["id"] for r in ids))
|
||||
deleted_lines = int(cur.rowcount or 0)
|
||||
return {"deleted_days": deleted_days, "deleted_lines": deleted_lines, "deleted": deleted_days + deleted_lines, "settings": settings}
|
||||
for cat in categories:
|
||||
item = _apply_retention_category(conn, profile_id, settings, cat)
|
||||
_update_retention_metadata(conn, profile_id, cat, int(item["deleted"]), settings, user_id=user_id)
|
||||
results[cat] = item
|
||||
total += int(item["deleted"])
|
||||
fresh = get_settings(profile_id, user_id)
|
||||
return {"deleted": total, "categories": results, "settings": fresh}
|
||||
|
||||
|
||||
def maybe_apply_retention(profile_id: int, category: str, user_id: int | None = None) -> dict:
|
||||
"""Run retention for a category only when interval since last cleanup elapsed."""
|
||||
if category not in VALID_LOG_CATEGORIES:
|
||||
category = "operation"
|
||||
settings = get_settings(profile_id, user_id)
|
||||
interval = int(settings.get(f"{category}_retention_interval_hours") or 24)
|
||||
last = _parse_dt(settings.get(f"{category}_last_retention_run_at"))
|
||||
now = datetime.now(timezone.utc)
|
||||
if last and now < last + timedelta(hours=interval):
|
||||
return {"skipped": True, "category": category, "next_run_at": (last + timedelta(hours=interval)).isoformat(timespec="seconds"), "settings": settings}
|
||||
result = apply_retention(profile_id, user_id=user_id, category=category)
|
||||
result["skipped"] = False
|
||||
result["category"] = category
|
||||
return result
|
||||
|
||||
Reference in New Issue
Block a user