157 lines
5.0 KiB
Python
157 lines
5.0 KiB
Python
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timedelta
|
|
from zoneinfo import ZoneInfo
|
|
|
|
from config import TIME_RANGES
|
|
from app.core_settings import get_settings
|
|
|
|
|
|
@dataclass
class TimeWindow:
    """A time range bundled with a display label and an identifying key."""

    # Beginning of the range.
    start: datetime
    # End of the range.
    end: datetime
    # Human-readable name of the range (e.g. "Custom", "Previous period").
    label: str
    # Identifier of the range (e.g. "custom", or "<key>:<mode>" for shifted windows).
    key: str
|
|
|
|
|
|
def now_local() -> datetime:
    """Return the current time as an aware datetime in the configured timezone.

    The zone name is read from ``get_settings().timezone`` on every call.
    """
    zone = ZoneInfo(get_settings().timezone)
    return datetime.now(zone)
|
|
|
|
|
|
def start_of_local_day(moment: datetime | None = None) -> datetime:
|
|
current = moment or now_local()
|
|
return current.replace(hour=0, minute=0, second=0, microsecond=0)
|
|
|
|
|
|
def resolve_window(
    range_key: str | None = None,
    start: str | None = None,
    end: str | None = None,
) -> TimeWindow:
    """Build the ``TimeWindow`` for a custom range or a configured preset.

    Args:
        range_key: Key of a preset in ``settings.time_ranges``. When falsy,
            ``settings.analytics["default_range"]`` is used.
        start: ISO-format start of a custom range; requires ``end``.
        end: ISO-format end of a custom range; requires ``start``.

    Returns:
        The resolved window, with both bounds in the configured timezone.
        Custom ranges get label ``"Custom"`` and key ``"custom"``; presets
        get the label from their definition and the preset key.

    Raises:
        ValueError: If exactly one of ``start``/``end`` is given, or if the
            preset key has no (non-empty) definition.
    """
    settings = get_settings()
    tz = ZoneInfo(settings.timezone)

    # A custom range is all-or-nothing: exactly one bound is an error.
    if bool(start) != bool(end):
        raise ValueError("Provide both start and end for custom range")

    if start and end:
        return TimeWindow(
            start=_parse_iso(start, tz),
            end=_parse_iso(end, tz),
            label="Custom",
            key="custom",
        )

    key = range_key or settings.analytics["default_range"]
    definition = settings.time_ranges.get(key)
    if not definition:
        raise ValueError(f"Unsupported range: {key}")

    now_dt = datetime.now(tz)
    special = definition.get("special")

    if special == "yesterday":
        # The only preset that does not end "now": it covers the full
        # previous calendar day, ending at today's midnight.
        window_end = now_dt.replace(hour=0, minute=0, second=0, microsecond=0)
        window_start = window_end - timedelta(days=1)
    else:
        window_end = now_dt
        if special == "ytd":
            # From January 1st, 00:00 of the current year.
            window_start = now_dt.replace(month=1, day=1, hour=0, minute=0, second=0, microsecond=0)
        elif special == "today":
            # From today's midnight.
            window_start = now_dt.replace(hour=0, minute=0, second=0, microsecond=0)
        else:
            # Rolling window: a fixed number of seconds back from now.
            window_start = now_dt - timedelta(seconds=int(definition["seconds"]))

    return TimeWindow(start=window_start, end=window_end, label=definition["label"], key=key)
|
|
|
|
|
|
def shift_window(window: TimeWindow, mode: str) -> TimeWindow:
    """Return the comparison window obtained by moving ``window`` back in time.

    Supported modes:
        * ``"previous_period"`` - the span of equal length that ends where
          ``window`` starts.
        * ``"previous_year"``, ``"previous_year_2"``, ``"previous_year_3"`` -
          both bounds moved back 1, 2 or 3 calendar years.
        * ``"previous_month_12"``, ``"previous_month_24"`` - both bounds moved
          back 12 or 24 calendar months.

    Args:
        window: The window to shift; it is not modified.
        mode: One of the compare modes listed above.

    Returns:
        A new ``TimeWindow`` whose key is ``"<window.key>:<mode>"``.

    Raises:
        ValueError: If ``mode`` is not one of the supported modes.
    """
    year_offsets = {"previous_year": 1, "previous_year_2": 2, "previous_year_3": 3}
    month_offsets = {"previous_month_12": 12, "previous_month_24": 24}

    if mode == "previous_period":
        length = window.end - window.start
        return TimeWindow(
            start=window.start - length,
            end=window.start,
            label="Previous period",
            key=f"{window.key}:previous_period",
        )

    if mode in year_offsets:
        years = year_offsets[mode]
        return TimeWindow(
            start=_safe_replace_year(window.start, window.start.year - years),
            end=_safe_replace_year(window.end, window.end.year - years),
            label=f"Previous {years} year",
            key=f"{window.key}:{mode}",
        )

    if mode in month_offsets:
        months = month_offsets[mode]
        return TimeWindow(
            start=_shift_months(window.start, -months),
            end=_shift_months(window.end, -months),
            label=f"Previous {months} months",
            key=f"{window.key}:{mode}",
        )

    raise ValueError(f"Unsupported compare mode: {mode}")
|
|
|
|
|
|
def choose_counter_interval(start: datetime, end: datetime) -> str:
    """Pick an interval string for counter data based on the span length.

    Spans of up to 3, 14, 93 and 366 days (inclusive) map to ``"5m"``,
    ``"15m"``, ``"30m"`` and ``"1h"``; anything longer maps to ``"3h"``.
    A negative span (``end`` before ``start``) is treated as zero.

    Args:
        start: Beginning of the span.
        end: End of the span.

    Returns:
        The interval string for the span.
    """
    seconds_per_day = 86400
    elapsed = max((end - start).total_seconds(), 0)

    # (upper bound in days, interval) pairs, checked from shortest to longest.
    thresholds = ((3, "5m"), (14, "15m"), (93, "30m"), (366, "1h"))
    for limit_days, interval in thresholds:
        if elapsed <= limit_days * seconds_per_day:
            return interval
    return "3h"
|
|
|
|
|
|
def choose_power_interval(start: datetime, end: datetime) -> str:
    """Pick an interval string for power data based on the span length.

    Spans of up to 1, 7, 31 and 366 days (inclusive) map to ``"5m"``,
    ``"15m"``, ``"30m"`` and ``"1h"``; anything longer maps to ``"3h"``.
    A negative span (``end`` before ``start``) is treated as zero.

    Args:
        start: Beginning of the span.
        end: End of the span.

    Returns:
        The interval string for the span.
    """
    seconds_per_day = 86400
    elapsed = max((end - start).total_seconds(), 0)

    # (upper bound in days, interval) pairs, checked from shortest to longest.
    thresholds = ((1, "5m"), (7, "15m"), (31, "30m"), (366, "1h"))
    for limit_days, interval in thresholds:
        if elapsed <= limit_days * seconds_per_day:
            return interval
    return "3h"
|
|
|
|
|
|
def duration_to_seconds(interval: str) -> int:
    """Convert a duration string such as ``"5m"`` or ``"1h"`` into seconds.

    The last character is the unit (``s`` seconds, ``m`` minutes, ``h``
    hours, ``d`` days); everything before it is parsed with ``int``.

    Args:
        interval: Duration in ``<integer><unit>`` form.

    Returns:
        The duration expressed in seconds.

    Raises:
        ValueError: If ``interval`` is empty, has an unknown unit, or its
            amount is not an integer.
    """
    seconds_per_unit = {"s": 1, "m": 60, "h": 3600, "d": 86400}

    # Slicing instead of indexing: an empty string yields "" here rather
    # than raising IndexError, so it falls through to the ValueError below.
    multiplier = seconds_per_unit.get(interval[-1:])
    if multiplier is None:
        raise ValueError(f"Unsupported duration format: {interval}")

    try:
        amount = int(interval[:-1])
    except ValueError as err:
        # Report a missing or non-numeric amount with the same message as
        # an unknown unit instead of int()'s generic one.
        raise ValueError(f"Unsupported duration format: {interval}") from err

    return amount * multiplier
|
|
|
|
|
|
def to_utc_iso(dt: datetime) -> str:
    """Return ``dt`` converted to UTC and formatted with ``isoformat()``."""
    utc_moment = dt.astimezone(ZoneInfo("UTC"))
    return utc_moment.isoformat()
|
|
|
|
|
|
def _parse_iso(value: str, tz: ZoneInfo) -> datetime:
|
|
parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
|
|
if parsed.tzinfo is None:
|
|
return parsed.replace(tzinfo=tz)
|
|
return parsed.astimezone(tz)
|
|
|
|
|
|
def _safe_replace_year(value: datetime, year: int) -> datetime:
|
|
try:
|
|
return value.replace(year=year)
|
|
except ValueError:
|
|
return value.replace(year=year, day=28)
|
|
|
|
|
|
def _shift_months(value: datetime, months: int) -> datetime:
|
|
year = value.year + ((value.month - 1 + months) // 12)
|
|
month = ((value.month - 1 + months) % 12) + 1
|
|
day = min(value.day, [31, 29 if year % 4 == 0 and (year % 100 != 0 or year % 400 == 0) else 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31][month - 1])
|
|
return value.replace(year=year, month=month, day=day)
|