Files
Mateusz Gruszczyński 84fe898a74 fix ux
2026-03-24 15:49:06 +01:00

223 lines
9.3 KiB
Python

from __future__ import annotations
from collections import defaultdict
from dataclasses import dataclass
from datetime import date, datetime, time, timedelta
from zoneinfo import ZoneInfo
from app.core_settings import AppSettings, get_settings
from app.models.definitions import BucketPoint, DailyEnergyRecord, MetricDefinition, SeriesPoint
from app.services.catalog import MetricCatalog, get_catalog
from app.services.influx_http import InfluxHTTPService
from app.services.metrics import to_float
from app.storage import SQLiteEnergyRepository
from app.utils.time import (
choose_counter_interval,
choose_power_interval,
duration_to_seconds,
now_local,
)
@dataclass
class EnergySample:
    """A single energy increment attributed to a point in time.

    Instances are built by ``EnergyService`` from either counter deltas or
    power readings converted to energy.
    """

    # Timestamp of the source point the increment was derived from.
    timestamp: datetime
    # Energy added at that point, in kWh (the producers round it to 6 decimals).
    delta_kwh: float
class EnergyService:
    """Compute energy totals, raw samples and daily/bucketed aggregates.

    Energy is derived from a counter metric when the catalog knows the
    configured ``production_metric_id``; otherwise it is estimated from the
    configured ``fallback_power_metric_id``. Finished full days can be cached
    in the repository so they are not recomputed.
    """

    def __init__(
        self,
        settings: AppSettings | None = None,
        catalog: MetricCatalog | None = None,
        influx: InfluxHTTPService | None = None,
        repository: SQLiteEnergyRepository | None = None,
    ) -> None:
        """Wire up collaborators, building defaults for any that are omitted.

        Args:
            settings: Application settings; defaults to ``get_settings()``.
            catalog: Metric catalog; defaults to ``get_catalog()``.
            influx: Influx client; defaults to one built from the settings.
            repository: Daily-energy store; defaults to a SQLite repository at
                ``settings.storage["sqlite_path"]``.
        """
        # Explicit ``is None`` checks: an injected dependency that happens to
        # be falsy (e.g. an empty container-like catalog) must not be silently
        # swapped for the global default.
        self.settings = get_settings() if settings is None else settings
        self.catalog = get_catalog() if catalog is None else catalog
        self.influx = InfluxHTTPService(self.settings) if influx is None else influx
        if repository is None:
            repository = SQLiteEnergyRepository(self.settings.storage["sqlite_path"])
        self.repository = repository
        self.tz = ZoneInfo(self.settings.timezone)

    def total_for_window(self, start: datetime, end: datetime) -> float:
        """Return only the energy total (kWh, 2 decimals) for ``[start, end]``."""
        total, _, _ = self.window_total_with_meta(start, end)
        return total

    def window_total_with_meta(self, start: datetime, end: datetime) -> tuple[float, str, int]:
        """Return ``(total_kwh, source, observations_count)`` for the window.

        ``total_kwh`` is the sum of all sample deltas rounded to 2 decimals;
        ``source`` and ``observations_count`` are passed through unchanged
        from ``_samples_for_window``.
        """
        samples, source, observations_count = self._samples_for_window(start, end)
        total = round(sum(sample.delta_kwh for sample in samples), 2)
        return total, source, observations_count

    def total_for_full_day(self, day: date) -> tuple[float, str, int]:
        """Return ``window_total_with_meta`` for the local calendar day ``day``.

        The window runs from local midnight of ``day`` (in ``self.tz``) to
        that moment plus one day.
        """
        start = datetime.combine(day, time.min, tzinfo=self.tz)
        end = start + timedelta(days=1)
        return self.window_total_with_meta(start, end)

    def samples(self, start: datetime, end: datetime) -> list[EnergySample]:
        """Return the raw energy samples for the window, without metadata."""
        samples, _, _ = self._samples_for_window(start, end)
        return samples

    def daily_records_for_window(
        self,
        start: datetime,
        end: datetime,
        *,
        persist_missing: bool = True,
    ) -> list[DailyEnergyRecord]:
        """Return one record per local calendar day overlapping the window.

        Args:
            start: Window start; converted to ``self.tz``.
            end: Window end; converted to ``self.tz``.
            persist_missing: When true, freshly computed full days that lie
                before today and have at least one observation are written
                to the repository.

        Returns:
            Records in chronological order. Full days found in the repository
            are returned as stored; every other day (including partial first
            and last days) is computed from the metric source. An empty list
            is returned when ``end`` is not after ``start``.
        """
        start_local = start.astimezone(self.tz)
        end_local = end.astimezone(self.tz)
        if end_local <= start_local:
            return []

        start_day = start_local.date()
        end_day = end_local.date()
        cached = self.repository.fetch_daily_energy(start_day, end_day)
        today_local = now_local().date()

        rows: list[DailyEnergyRecord] = []
        for offset in range((end_day - start_day).days + 1):
            current = start_day + timedelta(days=offset)
            day_start = datetime.combine(current, time.min, tzinfo=self.tz)
            day_end = day_start + timedelta(days=1)

            # Clip the calendar day to the requested window.
            segment_start = max(start_local, day_start)
            segment_end = min(end_local, day_end)
            if segment_end <= segment_start:
                # Happens e.g. when the window ends exactly at midnight.
                continue

            is_full_day = segment_start == day_start and segment_end == day_end
            cached_row = cached.get(current)
            if is_full_day and cached_row is not None:
                rows.append(cached_row)
                continue

            total, source, observations_count = self.window_total_with_meta(segment_start, segment_end)
            record = DailyEnergyRecord(
                day=current,
                energy_kwh=total,
                source=source,
                samples_count=observations_count,
            )
            rows.append(record)

            # Only finished, fully covered days with real data are cached;
            # today may still change and partial days are not representative.
            if is_full_day and persist_missing and current < today_local and observations_count > 0:
                self.repository.upsert_daily_energy(record)
        return rows

    @staticmethod
    def _bucket_bounds(day_start: datetime, bucket: str) -> tuple[datetime, datetime, str, str]:
        """Return ``(bucket_start, bucket_end, sort_key, label)`` for a day.

        Raises:
            ValueError: If ``bucket`` is not ``day``, ``week``, ``month`` or ``year``.
        """
        if bucket == "day":
            bucket_start = day_start
            bucket_end = bucket_start + timedelta(days=1)
            return bucket_start, bucket_end, bucket_start.strftime("%Y-%m-%d"), bucket_start.strftime("%d.%m")
        if bucket == "week":
            # Weeks start on Monday; the key uses the ISO year/week of that Monday.
            bucket_start = day_start - timedelta(days=day_start.weekday())
            bucket_end = bucket_start + timedelta(days=7)
            iso = bucket_start.isocalendar()
            key = f"{iso.year}-W{iso.week:02d}"
            return bucket_start, bucket_end, key, key
        if bucket == "month":
            bucket_start = day_start.replace(day=1)
            if bucket_start.month == 12:
                bucket_end = bucket_start.replace(year=bucket_start.year + 1, month=1)
            else:
                bucket_end = bucket_start.replace(month=bucket_start.month + 1)
            key = bucket_start.strftime("%Y-%m")
            return bucket_start, bucket_end, key, key
        if bucket == "year":
            bucket_start = day_start.replace(month=1, day=1)
            bucket_end = bucket_start.replace(year=bucket_start.year + 1)
            key = bucket_start.strftime("%Y")
            return bucket_start, bucket_end, key, key
        raise ValueError(f"Unsupported bucket: {bucket}")

    def bucketize_daily(self, records: list[DailyEnergyRecord], bucket: str) -> list[BucketPoint]:
        """Aggregate daily records into ``day``/``week``/``month``/``year`` buckets.

        Args:
            records: Daily records to aggregate.
            bucket: Bucket size name.

        Returns:
            Bucket points sorted by bucket key. A bucket's value is the sum of
            its days that have ``samples_count > 0``, rounded to 2 decimals,
            or ``None`` when none of its days had data.

        Raises:
            ValueError: If ``bucket`` is unsupported (only raised when there
                is at least one record to classify).
        """
        grouped: dict[str, dict] = defaultdict(
            lambda: {"value": 0.0, "start": None, "end": None, "label": "", "has_data": False}
        )
        for record in records:
            day_start = datetime.combine(record.day, time.min, tzinfo=self.tz)
            bucket_start, bucket_end, key, label = self._bucket_bounds(day_start, bucket)

            entry = grouped[key]
            entry["label"] = label
            if record.samples_count > 0:
                entry["value"] += record.energy_kwh
                entry["has_data"] = True
            entry["start"] = bucket_start if entry["start"] is None else min(entry["start"], bucket_start)
            entry["end"] = bucket_end if entry["end"] is None else max(entry["end"], bucket_end)

        rows = []
        for key in sorted(grouped):
            item = grouped[key]
            rows.append(
                BucketPoint(
                    label=item["label"],
                    start=item["start"],
                    end=item["end"],
                    value=round(item["value"], 2) if item["has_data"] else None,
                )
            )
        return rows

    def _samples_for_window(self, start: datetime, end: datetime) -> tuple[list[EnergySample], str, int]:
        """Pick the data source for the window and return its samples.

        Returns:
            ``(samples, source, observations_count)`` where ``source`` is
            ``"counter"`` when the production counter metric is in the
            catalog, ``"power_estimated"`` when only the fallback power metric
            is, and ``"unavailable"`` (with no samples) when neither is.
        """
        counter_metric = self.catalog.safe_get(self.settings.analytics["production_metric_id"])
        if counter_metric is not None:
            samples, observations_count = self._samples_from_counter(counter_metric, start, end)
            return samples, "counter", observations_count

        power_metric = self.catalog.safe_get(self.settings.analytics["fallback_power_metric_id"])
        if power_metric is not None:
            samples, observations_count = self._samples_from_power(power_metric, start, end)
            return samples, "power_estimated", observations_count

        return [], "unavailable", 0

    def _samples_from_counter(
        self, metric: MetricDefinition, start: datetime, end: datetime
    ) -> tuple[list[EnergySample], int]:
        """Turn a counter series into positive per-point energy deltas.

        Returns:
            ``(samples, observations_count)``; the count covers only points of
            the fetched series whose value converts to a float, not the
            synthetic baseline point.
        """
        interval = choose_counter_interval(start, end)
        # NOTE(review): presumably the last reading before ``start`` and the
        # last reading per interval inside the window — confirm in InfluxHTTPService.
        baseline = self.influx.last_before(metric, start)
        series = self.influx.grouped_last_series(metric, start, end, interval)

        # Seed the delta chain with a reference value pinned at ``start``:
        # the baseline when available, otherwise the first usable series value.
        points: list[SeriesPoint] = []
        if baseline and baseline.value is not None:
            points.append(SeriesPoint(timestamp=start, value=baseline.value))
        else:
            first_value = next((point.value for point in series if point.value is not None), None)
            if first_value is not None:
                points.append(SeriesPoint(timestamp=start, value=first_value))
        points.extend(series)

        samples: list[EnergySample] = []
        previous_value = None
        for point in points:
            current_value = to_float(point.value)
            if current_value is None:
                continue
            if previous_value is None:
                previous_value = current_value
                continue
            delta = current_value - previous_value
            # Always advance the reference, so a drop in the counter re-bases
            # the chain instead of producing a negative sample.
            previous_value = current_value
            if delta <= 0:
                continue
            if point.timestamp < start or point.timestamp > end:
                continue
            samples.append(EnergySample(timestamp=point.timestamp, delta_kwh=round(delta, 6)))

        observations_count = sum(1 for point in series if to_float(point.value) is not None)
        return samples, observations_count

    def _samples_from_power(
        self, metric: MetricDefinition, start: datetime, end: datetime
    ) -> tuple[list[EnergySample], int]:
        """Estimate energy samples from mean power readings.

        Each numeric reading is treated as watts held for one full interval
        and converted to kWh. Non-positive readings count as observations but
        produce no sample.

        Returns:
            ``(samples, observations_count)`` where the count is the number of
            points whose value converts to a float.
        """
        interval = choose_power_interval(start, end)
        interval_seconds = duration_to_seconds(interval)
        points = self.influx.gauge_history(metric, start, end, interval, aggregate="mean")

        samples: list[EnergySample] = []
        observations_count = 0
        for point in points:
            watts = to_float(point.value)
            if watts is None:
                continue
            observations_count += 1
            if watts <= 0:
                continue
            # W * hours -> Wh, then / 1000 -> kWh.
            delta_kwh = watts * (interval_seconds / 3600.0) / 1000.0
            samples.append(EnergySample(timestamp=point.timestamp, delta_kwh=round(delta_kwh, 6)))
        return samples, observations_count