from __future__ import annotations

from datetime import datetime, timedelta

from app.core_settings import AppSettings, get_settings
from app.models.definitions import HeroCard, SnapshotGroupRow, SnapshotPayload
from app.services.catalog import MetricCatalog, get_catalog
from app.services.energy import EnergyService
from app.services.influx_http import InfluxHTTPService
from app.services.metrics import compare_delta_pct, custom_metric_value, metric_value, to_float
from app.utils.time import choose_power_interval, now_local, resolve_window, start_of_local_day


class RealtimeService:
    """Build the realtime snapshot and history payloads from Influx data.

    Collaborators can be injected; any that is omitted (or falsy) is
    replaced by a default built from the application settings.
    """

    def __init__(
        self,
        settings: AppSettings | None = None,
        catalog: MetricCatalog | None = None,
        influx: InfluxHTTPService | None = None,
        energy: EnergyService | None = None,
    ) -> None:
        """Store the collaborators, creating defaults for the missing ones."""
        self.settings = settings or get_settings()
        self.catalog = catalog or get_catalog()
        self.influx = influx or InfluxHTTPService(self.settings)
        self.energy = energy or EnergyService(self.settings, self.catalog, self.influx)

    def snapshot(self) -> SnapshotPayload:
        """Return the current snapshot: hero cards, KPIs, string rows and status.

        Latest values are fetched for ``ac_power``, ``energy_total``,
        ``inverter_temp`` and every metric referenced by the configured
        string groups, restricted to ids the catalog knows. Energy for today
        and yesterday comes from ``EnergyService.total_for_window``.
        """
        now = now_local()
        today_start = start_of_local_day(now)
        yesterday_start = today_start - timedelta(days=1)

        metric_ids = {"ac_power", "energy_total", "inverter_temp"}
        for group in self.settings.strings:
            metric_ids.update(group.get("metrics", {}).values())

        # Resolve each id once; ids unknown to the catalog are dropped.
        metrics = []
        for metric_id in metric_ids:
            metric = self.catalog.safe_get(metric_id)
            if metric is not None:
                metrics.append(metric)

        # NOTE(review): `latest` is read below as a mapping of
        # metric id -> {"value": ..., "timestamp": ...} - confirm against
        # InfluxHTTPService.latest_values.
        latest = self.influx.latest_values(metrics)

        ac_power = to_float(_value(latest, "ac_power"))
        # A string without a usable power reading contributes 0.0 to the sum.
        total_dc_power = round(
            sum(
                to_float(_value(latest, group.get("metrics", {}).get("power", ""))) or 0.0
                for group in self.settings.strings
            ),
            0,
        )
        energy_today = self.energy.total_for_window(today_start, now)
        energy_yesterday = self.energy.total_for_window(yesterday_start, today_start)
        total_energy = to_float(_value(latest, "energy_total"))
        inverter_temp = to_float(_value(latest, "inverter_temp"))
        energy_timestamp = _timestamp(latest, "energy_total")

        hero_cards = [
            self._hero_card("ac_power", ac_power, subtitle="Current AC power"),
            self._hero_card(
                "dc_power_total",
                total_dc_power,
                label="Total DC power",
                unit="W",
                subtitle="Sum of DC strings",
            ),
            self._hero_card(
                "energy_today",
                energy_today,
                label="Energy today",
                unit="kWh",
                subtitle="Calculated from Influx data",
            ),
            self._hero_card(
                "energy_total",
                total_energy,
                label="Total energy",
                unit="kWh",
                subtitle="Lifetime counter",
            ),
        ]
        # The temperature card is shown only when a reading is available.
        if inverter_temp is not None:
            hero_cards.append(
                self._hero_card(
                    "inverter_temp",
                    inverter_temp,
                    label="Inverter temperature",
                    unit="°C",
                    subtitle="Optional sensor",
                )
            )

        kpis = {
            "energy_today": custom_metric_value(
                "energy_today", "Energy today", energy_today,
                unit="kWh", precision=2, status="ok",
            ),
            "energy_yesterday": custom_metric_value(
                "energy_yesterday", "Energy yesterday", energy_yesterday,
                unit="kWh", precision=2, status="ok",
            ),
            "energy_total": custom_metric_value(
                "energy_total",
                "Total energy",
                total_energy,
                unit="kWh",
                precision=2,
                timestamp=energy_timestamp,
                status="ok",
            ),
            "dc_power_total": custom_metric_value(
                "dc_power_total", "Total DC power", total_dc_power,
                unit="W", precision=0, status="ok",
            ),
        }

        # The comparison KPI is added only when a delta could be computed.
        comparison = compare_delta_pct(energy_today, energy_yesterday)
        if comparison is not None:
            kpis["today_vs_yesterday"] = custom_metric_value(
                "today_vs_yesterday",
                "Today vs yesterday",
                comparison,
                unit="%",
                precision=2,
                status="ok" if comparison >= 0 else "warn",
            )

        strings = self._build_string_rows(latest)

        status = []
        temp_metric = self.catalog.safe_get("inverter_temp")
        if temp_metric is not None:
            status.append(
                metric_value(
                    temp_metric,
                    inverter_temp,
                    timestamp=_timestamp(latest, "inverter_temp"),
                )
            )
        status.append(
            custom_metric_value(
                "data_refresh",
                "Last energy reading",
                energy_timestamp.isoformat() if energy_timestamp else None,
                status="ok" if energy_timestamp else "neutral",
                kind="text",
            )
        )

        return SnapshotPayload(
            updated_at=_max_timestamp(latest.values()),
            hero_cards=hero_cards,
            kpis=kpis,
            strings=strings,
            phases=[],
            status=status,
            faults=[],
        )

    def history(
        self,
        range_key: str | None = None,
        start: str | None = None,
        end: str | None = None,
        metric_ids: list[str] | None = None,
    ) -> dict:
        """Return mean-aggregated history series for the resolved time window.

        Args:
            range_key: Window key; when falsy, the configured
                ``history_default_range`` is used.
            start: Optional window start, passed to ``resolve_window``.
            end: Optional window end, passed to ``resolve_window``.
            metric_ids: Optional whitelist; when empty or ``None`` every
                available series is included.

        Returns:
            A dict with ``range_key``, ``start``, ``end`` and ``series``.
            Each series has ``metric_id``, ``label``, ``unit`` and ``points``.
        """
        window = resolve_window(
            range_key=range_key or self.settings.realtime["history_default_range"],
            start=start,
            end=end,
        )
        interval = choose_power_interval(window.start, window.end)
        selected = set(metric_ids or [])

        def include(metric_id: str) -> bool:
            # An empty selection means "no filter".
            return not selected or metric_id in selected

        series = []

        ac_metric = self.catalog.safe_get("ac_power")
        if ac_metric is not None and include("ac_power"):
            series.append(self._series(ac_metric, window, interval))

        for group in self.settings.strings:
            for slot, metric_id in group.get("metrics", {}).items():
                if not metric_id or not include(metric_id):
                    continue
                metric = self.catalog.safe_get(metric_id)
                if metric is None:
                    continue
                # Power and voltage slots are labelled after their group;
                # any other slot keeps the catalog label.
                if slot == "power":
                    label = f"{group['label']} power"
                elif slot == "voltage":
                    label = f"{group['label']} voltage"
                else:
                    label = metric.label
                series.append(self._series(metric, window, interval, label=label))

        temp_metric = self.catalog.safe_get("inverter_temp")
        if temp_metric is not None and include("inverter_temp"):
            temp_points = self.influx.gauge_history(
                temp_metric, window.start, window.end, interval=interval, aggregate="mean"
            )
            # Forward-fill gaps with the last known reading. Leading gaps
            # stay None because there is nothing to carry forward yet.
            last_value = None
            filled = []
            for point in temp_points:
                if point.value is not None:
                    last_value = point.value
                filled.append({"timestamp": point.timestamp, "value": last_value})
            series.append(
                {
                    "metric_id": temp_metric.id,
                    "label": temp_metric.label,
                    "unit": temp_metric.unit,
                    "points": filled,
                }
            )

        return {
            "range_key": window.key,
            "start": window.start,
            "end": window.end,
            "series": series,
        }

    def _series(self, metric, window, interval, *, label: str | None = None) -> dict:
        """Build one history series entry with mean-aggregated points.

        ``label`` overrides the metric's own label when it is not ``None``.
        """
        return {
            "metric_id": metric.id,
            "label": metric.label if label is None else label,
            "unit": metric.unit,
            "points": self.influx.gauge_history(
                metric, window.start, window.end, interval=interval, aggregate="mean"
            ),
        }

    def _hero_card(
        self,
        metric_id: str,
        value,
        *,
        label: str | None = None,
        unit: str | None = None,
        subtitle: str = "",
    ) -> HeroCard:
        """Build a hero card, choosing its accent colour from the value.

        For ``inverter_temp`` the accent is ``emerald`` below 55, ``amber``
        below 70 and ``rose`` otherwise; without a numeric value it stays
        ``slate``. Any other metric is ``emerald`` when its numeric value is
        neither ``None`` nor 0, else ``slate``.

        A falsy ``label`` or ``unit`` falls back to the catalog entry for
        ``metric_id``. If the catalog has no entry, the label falls back to
        ``metric_id`` and the unit to ``""``.
        """
        accent = "slate"
        numeric = to_float(value)
        if metric_id == "inverter_temp":
            if numeric is not None:
                if numeric < 55:
                    accent = "emerald"
                elif numeric < 70:
                    accent = "amber"
                else:
                    accent = "rose"
        elif numeric not in (None, 0):
            accent = "emerald"

        # Consult the catalog only when a fallback is actually needed.
        metric = None if (label and unit) else self.catalog.safe_get(metric_id)
        resolved_label = label or (metric.label if metric is not None else metric_id)
        resolved_unit = unit or (metric.unit if metric is not None else "")

        return HeroCard(
            metric_id=metric_id,
            label=resolved_label,
            value=value,
            unit=resolved_unit,
            accent=accent,
            subtitle=subtitle,
        )

    def _build_string_rows(self, latest: dict) -> list[SnapshotGroupRow]:
        """Build one snapshot row per configured string group.

        Slots whose metric id is unknown to the catalog are left out of the
        row's ``values``.
        """
        rows = []
        for group in self.settings.strings:
            values = {}
            for slot, metric_id in group.get("metrics", {}).items():
                metric = self.catalog.safe_get(metric_id)
                if metric is None:
                    continue
                values[slot] = metric_value(
                    metric,
                    _value(latest, metric_id),
                    timestamp=_timestamp(latest, metric_id),
                )
            rows.append(
                SnapshotGroupRow(id=group["id"], label=group["label"], values=values, meta={})
            )
        return rows


def _value(latest: dict, metric_id: str):
    """Return the ``value`` field for ``metric_id``, or None when absent."""
    payload = latest.get(metric_id) or {}
    return payload.get("value")


def _timestamp(latest: dict, metric_id: str):
    """Return the ``timestamp`` field for ``metric_id``, or None when absent."""
    payload = latest.get(metric_id) or {}
    return payload.get("timestamp")


def _max_timestamp(items) -> datetime | None:
    """Return the most recent timestamp among the payloads, or None.

    Empty or ``None`` payloads are skipped, matching the tolerance of
    ``_value`` and ``_timestamp``, as are payloads without a timestamp.
    """
    timestamps = []
    for item in items:
        if not item:
            continue
        stamp = item.get("timestamp")
        if stamp is not None:
            timestamps.append(stamp)
    return max(timestamps) if timestamps else None