"""Helpers for building, shifting and measuring time windows.

The module resolves a named or custom range into a ``TimeWindow``, derives
comparison windows from an existing one, picks an interval string from the
length of a span, and converts between a few date/time representations.
"""

from __future__ import annotations

import calendar
from dataclasses import dataclass
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

# NOTE(review): TIME_RANGES is not referenced anywhere in this module; the
# import is kept in case other modules rely on it being re-exported here.
from config import TIME_RANGES
from app.core_settings import get_settings

# Seconds represented by each supported duration suffix.
_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400}

# Compare modes that shift a window back by whole years / whole months.
_YEAR_MODES = {"previous_year": 1, "previous_year_2": 2, "previous_year_3": 3}
_MONTH_MODES = {"previous_month_12": 12, "previous_month_24": 24}

# (inclusive upper bound of the span in seconds, interval) pairs, ascending.
_COUNTER_STEPS = (
    (3 * 86400, "5m"),
    (14 * 86400, "15m"),
    (93 * 86400, "30m"),
    (366 * 86400, "1h"),
)
_POWER_STEPS = (
    (24 * 3600, "5m"),
    (7 * 86400, "15m"),
    (31 * 86400, "30m"),
    (366 * 86400, "1h"),
)
# Interval used when the span exceeds every bound in a step table.
_LONGEST_INTERVAL = "3h"


@dataclass
class TimeWindow:
    """A start/end pair together with a label and a key.

    Attributes:
        start: Beginning of the window.
        end: End of the window.
        label: Descriptive text, e.g. ``"Custom"`` or the configured range
            label.
        key: Identifier of the window: the range key, ``"custom"``, or
            ``"<key>:<mode>"`` for windows produced by ``shift_window``.
    """

    start: datetime
    end: datetime
    label: str
    key: str


def now_local() -> datetime:
    """Return the current time as an aware datetime in the configured zone."""
    settings = get_settings()
    return datetime.now(ZoneInfo(settings.timezone))


def start_of_local_day(moment: datetime | None = None) -> datetime:
    """Return *moment* with its time-of-day fields set to zero.

    Args:
        moment: Datetime to truncate. When ``None``, ``now_local()`` is used.

    Returns:
        A datetime on the same date (and with the same ``tzinfo``) as the
        input, at 00:00:00.000000.
    """
    current = now_local() if moment is None else moment
    return current.replace(hour=0, minute=0, second=0, microsecond=0)


def resolve_window(
    range_key: str | None = None,
    start: str | None = None,
    end: str | None = None,
) -> TimeWindow:
    """Build a ``TimeWindow`` from a custom start/end pair or a range key.

    When both *start* and *end* are given they take precedence over
    *range_key* and the result is keyed ``"custom"``. Otherwise the range
    definition is looked up in ``settings.time_ranges`` using *range_key*, or
    ``settings.analytics["default_range"]`` when *range_key* is empty.

    A definition may carry a ``"special"`` value of ``"ytd"``, ``"today"`` or
    ``"yesterday"``; any other definition must provide ``"seconds"``, the
    length of the window ending now.

    Args:
        range_key: Key of a configured range.
        start: ISO-8601 start of a custom range.
        end: ISO-8601 end of a custom range.

    Returns:
        The resolved window, expressed in the configured time zone.

    Raises:
        ValueError: If only one of *start*/*end* is given, if either cannot
            be parsed, or if the range key is not configured.
    """
    settings = get_settings()
    tz = ZoneInfo(settings.timezone)

    # A custom range needs both ends; exactly one being set is an error.
    if bool(start) != bool(end):
        raise ValueError("Provide both start and end for custom range")

    if start and end:
        # NOTE(review): start is not checked to precede end, so a reversed
        # custom range yields a window with a negative span.
        return TimeWindow(
            start=_parse_iso(start, tz),
            end=_parse_iso(end, tz),
            label="Custom",
            key="custom",
        )

    key = range_key or settings.analytics["default_range"]
    definition = settings.time_ranges.get(key)
    if not definition:
        raise ValueError(f"Unsupported range: {key}")

    now_dt = datetime.now(tz)
    end_dt = now_dt
    special = definition.get("special")
    if special == "ytd":
        start_dt = now_dt.replace(
            month=1, day=1, hour=0, minute=0, second=0, microsecond=0
        )
    elif special == "today":
        start_dt = start_of_local_day(now_dt)
    elif special == "yesterday":
        # Yesterday ends at today's midnight and starts one day before it.
        end_dt = start_of_local_day(now_dt)
        start_dt = end_dt - timedelta(days=1)
    else:
        start_dt = now_dt - timedelta(seconds=int(definition["seconds"]))

    return TimeWindow(start=start_dt, end=end_dt, label=definition["label"], key=key)


def shift_window(window: TimeWindow, mode: str) -> TimeWindow:
    """Return a comparison window derived from *window*.

    Supported modes:

    * ``"previous_period"``: the span of equal length ending where *window*
      starts.
    * ``"previous_year"``, ``"previous_year_2"``, ``"previous_year_3"``: both
      ends moved back 1, 2 or 3 calendar years.
    * ``"previous_month_12"``, ``"previous_month_24"``: both ends moved back
      12 or 24 calendar months.

    Args:
        window: Window to shift; it is not modified.
        mode: One of the compare modes listed above.

    Returns:
        A new window keyed ``"<window.key>:<mode>"``.

    Raises:
        ValueError: If *mode* is not a supported compare mode.
    """
    shifted_key = f"{window.key}:{mode}"

    if mode == "previous_period":
        span = window.end - window.start
        return TimeWindow(
            start=window.start - span,
            end=window.start,
            label="Previous period",
            key=shifted_key,
        )

    if mode in _YEAR_MODES:
        years = _YEAR_MODES[mode]
        return TimeWindow(
            start=_safe_replace_year(window.start, window.start.year - years),
            end=_safe_replace_year(window.end, window.end.year - years),
            # NOTE(review): reads "Previous 2 year" for multi-year modes; the
            # text is left unchanged because callers may depend on it.
            label=f"Previous {years} year",
            key=shifted_key,
        )

    if mode in _MONTH_MODES:
        months = _MONTH_MODES[mode]
        return TimeWindow(
            start=_shift_months(window.start, -months),
            end=_shift_months(window.end, -months),
            label=f"Previous {months} months",
            key=shifted_key,
        )

    raise ValueError(f"Unsupported compare mode: {mode}")


def _interval_for_span(
    start: datetime,
    end: datetime,
    steps: tuple[tuple[int, str], ...],
) -> str:
    """Return the interval of the first step whose bound covers the span."""
    # A reversed range (end before start) is treated as a zero-length span.
    span_seconds = max((end - start).total_seconds(), 0)
    for upper_bound, interval in steps:
        if span_seconds <= upper_bound:
            return interval
    return _LONGEST_INTERVAL


def choose_counter_interval(start: datetime, end: datetime) -> str:
    """Pick an interval string from the length of ``start``..``end``.

    Returns ``"5m"`` up to 3 days, ``"15m"`` up to 14 days, ``"30m"`` up to
    93 days, ``"1h"`` up to 366 days and ``"3h"`` beyond that. All bounds are
    inclusive.
    """
    return _interval_for_span(start, end, _COUNTER_STEPS)


def choose_power_interval(start: datetime, end: datetime) -> str:
    """Pick an interval string from the length of ``start``..``end``.

    Returns ``"5m"`` up to 24 hours, ``"15m"`` up to 7 days, ``"30m"`` up to
    31 days, ``"1h"`` up to 366 days and ``"3h"`` beyond that. All bounds are
    inclusive.
    """
    return _interval_for_span(start, end, _POWER_STEPS)


def duration_to_seconds(interval: str) -> int:
    """Convert a duration such as ``"15m"`` into a number of seconds.

    Args:
        interval: An integer amount followed by one of the suffixes ``s``,
            ``m``, ``h`` or ``d``.

    Returns:
        The duration in seconds.

    Raises:
        ValueError: If the string is empty, the suffix is not supported, or
            the amount is not an integer.
    """
    # Slicing (rather than indexing) keeps an empty string from raising
    # IndexError; it simply fails the suffix lookup below.
    multiplier = _UNIT_SECONDS.get(interval[-1:])
    if multiplier is None:
        raise ValueError(f"Unsupported duration format: {interval}")
    try:
        amount = int(interval[:-1])
    except ValueError:
        raise ValueError(f"Unsupported duration format: {interval}") from None
    return amount * multiplier


def to_utc_iso(dt: datetime) -> str:
    """Return *dt* converted to UTC and formatted with ``isoformat()``.

    NOTE(review): ``astimezone`` interprets a naive datetime as system local
    time; callers presumably pass aware datetimes. Confirm against callers.
    """
    return dt.astimezone(ZoneInfo("UTC")).isoformat()


def _parse_iso(value: str, tz: ZoneInfo) -> datetime:
    """Parse an ISO-8601 string into an aware datetime in *tz*.

    A trailing ``Z`` is accepted as a UTC designator. A value without an
    offset is taken to be wall-clock time in *tz*; a value with an offset is
    converted to *tz*.

    Raises:
        ValueError: If ``datetime.fromisoformat`` rejects the value.
    """
    # Only a trailing "Z" is a UTC designator; rewrite just that suffix so the
    # string is accepted by fromisoformat on versions that do not parse "Z".
    text = f"{value[:-1]}+00:00" if value.endswith("Z") else value
    parsed = datetime.fromisoformat(text)
    if parsed.tzinfo is None:
        return parsed.replace(tzinfo=tz)
    return parsed.astimezone(tz)


def _safe_replace_year(value: datetime, year: int) -> datetime:
    """Return *value* moved to *year*, falling back to day 28 if invalid.

    The fallback covers 29 February being moved into a non-leap year.
    """
    try:
        return value.replace(year=year)
    except ValueError:
        return value.replace(year=year, day=28)


def _shift_months(value: datetime, months: int) -> datetime:
    """Return *value* moved by *months* calendar months (may be negative).

    The day of month is clamped to the last day of the target month, so
    31 March shifted by -1 becomes the last day of February.
    """
    # Work with a zero-based month index so floor division and modulo carry
    # into the year correctly for negative shifts as well.
    year_delta, month_index = divmod(value.month - 1 + months, 12)
    year = value.year + year_delta
    month = month_index + 1
    last_day = calendar.monthrange(year, month)[1]
    return value.replace(year=year, month=month, day=min(value.day, last_day))