"""Historical backfill of daily energy totals from InfluxDB into the SQLite cache."""

from __future__ import annotations

import copy
import logging
import threading
import uuid
from datetime import date, datetime, timedelta
from functools import lru_cache
from math import ceil
from typing import Iterable

from app.core_settings import AppSettings, get_settings
from app.models import (
    DailyEnergyRecord,
    HistoricalActivityEvent,
    HistoricalChunkProgress,
    HistoricalImportStatus,
)
from app.services.catalog import MetricCatalog, get_catalog
from app.services.energy import EnergyService
from app.services.influx_http import InfluxHTTPService
from app.storage import SQLiteEnergyRepository
from app.utils.time import now_local

logger = logging.getLogger(__name__)


class HistoricalSyncService:
    """Imports missing days of energy data in configurable chunks on a worker thread.

    Progress, coverage and a recent-activity feed are exposed through status();
    an optional scheduler re-runs the sync periodically.
    """

    MAX_RECENT_CHUNKS = 18
    MAX_RECENT_EVENTS = 40

    def __init__(
        self,
        settings: AppSettings | None = None,
        catalog: MetricCatalog | None = None,
        influx: InfluxHTTPService | None = None,
        energy: EnergyService | None = None,
        repository: SQLiteEnergyRepository | None = None,
    ) -> None:
        self.settings = settings or get_settings()
        self.catalog = catalog or get_catalog()
        self.influx = influx or InfluxHTTPService(self.settings)
        self.energy = energy or EnergyService(self.settings, self.catalog, self.influx)
        self.repository = repository or SQLiteEnergyRepository(self.settings.storage["sqlite_path"])
        self._state_lock = threading.Lock()
        self._worker: threading.Thread | None = None
        self._cancel_event = threading.Event()
        self._scheduler_stop = threading.Event()
        self._scheduler: threading.Thread | None = None
        self._available_bounds_cache: tuple[datetime, date | None, date | None] | None = None
        self._state = HistoricalImportStatus(
            enabled=self.settings.history.get("enabled", True),
            state="idle",
            default_chunk_days=self.settings.history.get("default_chunk_days", 7),
        )
        self._refresh_coverage()
        self._refresh_available_bounds()
        self._refresh_runtime_metrics()

    def status(self) -> HistoricalImportStatus:
        with self._state_lock:
            self._refresh_coverage(lock_held=True)
            self._refresh_available_bounds(lock_held=True)
            self._refresh_runtime_metrics(lock_held=True)
            return copy.deepcopy(self._state)

    def start(
        self,
        *,
        start_date: date | None = None,
        end_date: date | None = None,
        chunk_days: int | None = None,
        force: bool = False,
        auto: bool = False,
    ) -> HistoricalImportStatus:
        if not self.settings.history.get("enabled", True):
            raise RuntimeError("Historical import is disabled")
        chunk_days = max(int(chunk_days or self.settings.history.get("default_chunk_days", 7)), 1)
        resolved = self._resolve_range(start_date=start_date, end_date=end_date)
        if resolved is None:
            with self._state_lock:
                self._state.running = False
                self._state.state = "idle"
                self._state.message = "Brak brakujacych dni do importu."
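                # No missing days: stamp a finish time so the returned snapshot reads as a completed no-op.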
                self._state.finished_at = datetime.utcnow()
                self._refresh_coverage(lock_held=True)
                self._refresh_available_bounds(lock_held=True)
                self._refresh_runtime_metrics(lock_held=True)
                return copy.deepcopy(self._state)
        resolved_start, resolved_end = resolved
        total_days = (resolved_end - resolved_start).days + 1
        total_chunks = max(ceil(total_days / chunk_days), 1)
        start_message = "Start importu archiwalnego" if not auto else "Start automatycznej synchronizacji archiwum"
        with self._state_lock:
            if self._worker and self._worker.is_alive():
                return copy.deepcopy(self._state)
            self._cancel_event = threading.Event()
            self._state = HistoricalImportStatus(
                enabled=True,
                running=True,
                state="running",
                job_id=uuid.uuid4().hex[:12],
                started_at=datetime.utcnow(),
                requested_start_date=resolved_start,
                requested_end_date=resolved_end,
                total_days=total_days,
                chunk_days=chunk_days,
                total_chunks=total_chunks,
                active_chunk_index=1,
                current_chunk_start=resolved_start,
                current_chunk_end=min(resolved_start + timedelta(days=chunk_days - 1), resolved_end),
                message=start_message,
                default_chunk_days=self.settings.history.get("default_chunk_days", 7),
                recent_chunks=[],
                recent_events=[],
            )
            self._refresh_coverage(lock_held=True)
            self._refresh_available_bounds(lock_held=True)
            self._refresh_runtime_metrics(lock_held=True)
            self._worker = threading.Thread(
                target=self._run_worker,
                kwargs={
                    "start_date": resolved_start,
                    "end_date": resolved_end,
                    "chunk_days": chunk_days,
                    "force": force,
                    "auto": auto,
                },
                name="pv-historical-backfill",
                daemon=True,
            )
            self._worker.start()
        self._record_event(
            level="info",
            title="Uruchomiono zadanie",
            message=f"Zakres {resolved_start.isoformat()} -> {resolved_end.isoformat()}, chunk {chunk_days} dni",
        )
        return self.status()

    def cancel(self) -> HistoricalImportStatus:
        self._cancel_event.set()
        with self._state_lock:
            self._state.message = "Anulowanie zadania..."
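            # Refresh elapsed/ETA while still holding the lock so the snapshot below is consistent.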
            self._refresh_runtime_metrics(lock_held=True)
            snapshot = copy.deepcopy(self._state)
        self._record_event(level="warn", title="Anulowanie", message="Uzytkownik poprosil o zatrzymanie zadania.")
        return snapshot

    def run_blocking(
        self,
        *,
        start_date: date | None = None,
        end_date: date | None = None,
        chunk_days: int | None = None,
        force: bool = False,
    ) -> HistoricalImportStatus:
        resolved = self._resolve_range(start_date=start_date, end_date=end_date)
        if resolved is None:
            return self.status()
        resolved_start, resolved_end = resolved
        chunk_days = max(int(chunk_days or self.settings.history.get("default_chunk_days", 7)), 1)
        total_days = (resolved_end - resolved_start).days + 1
        total_chunks = max(ceil(total_days / chunk_days), 1)
        with self._state_lock:
            self._state = HistoricalImportStatus(
                enabled=True,
                running=True,
                state="running",
                job_id=uuid.uuid4().hex[:12],
                started_at=datetime.utcnow(),
                requested_start_date=resolved_start,
                requested_end_date=resolved_end,
                total_days=total_days,
                chunk_days=chunk_days,
                total_chunks=total_chunks,
                default_chunk_days=self.settings.history.get("default_chunk_days", 7),
                recent_chunks=[],
                recent_events=[],
            )
        self._record_event(
            level="info",
            title="Uruchomiono zadanie",
            message=f"Zakres {resolved_start.isoformat()} -> {resolved_end.isoformat()}, chunk {chunk_days} dni",
        )
        self._run_worker(
            start_date=resolved_start,
            end_date=resolved_end,
            chunk_days=chunk_days,
            force=force,
            auto=False,
        )
        return self.status()

    def start_scheduler_if_enabled(self) -> None:
        if not self.settings.history.get("enabled", True):
            return
        if not self.settings.history.get("auto_sync_enabled", False):
            return
        if self._scheduler and self._scheduler.is_alive():
            return
        self._scheduler_stop.clear()
        self._scheduler = threading.Thread(target=self._scheduler_loop, name="pv-history-scheduler", daemon=True)
        self._scheduler.start()

    def _scheduler_loop(self) -> None:
        interval_seconds = max(int(self.settings.history.get("auto_sync_interval_minutes", 30)), 1) * 60
        if self.settings.history.get("auto_sync_on_start", False):
            try:
                self.start(auto=True)
            except Exception as exc:
                logger.warning("Unable to auto-start historical sync: %s", exc)
        while not self._scheduler_stop.wait(interval_seconds):
            try:
                if self._worker and self._worker.is_alive():
                    continue
                self.start(auto=True)
            except Exception as exc:
                logger.warning("Historical scheduler cycle failed: %s", exc)

    def _run_worker(
        self,
        *,
        start_date: date,
        end_date: date,
        chunk_days: int,
        force: bool,
        auto: bool,
    ) -> None:
        total_chunks = max(ceil(((end_date - start_date).days + 1) / chunk_days), 1)
        try:
            chunk_index = 0
            chunk_start = start_date
            while chunk_start <= end_date:
                if self._cancel_event.is_set():
                    self._record_event(level="warn", title="Anulowano", message="Import archiwalny anulowany przez uzytkownika.")
                    self._finish("cancelled", running=False, message="Import archiwalny anulowany przez uzytkownika.")
                    return
                chunk_index += 1
                chunk_end = min(chunk_start + timedelta(days=chunk_days - 1), end_date)
                self._update_chunk(chunk_index, total_chunks, chunk_start, chunk_end)
                imported, skipped, energy_kwh, cancelled = self._process_chunk(
                    chunk_index=chunk_index,
                    start_day=chunk_start,
                    end_day=chunk_end,
                    force=force,
                )
                if cancelled:
                    self._close_chunk(
                        chunk_index,
                        imported_days=imported,
                        skipped_days=skipped,
                        energy_kwh=energy_kwh,
                        state="cancelled",
                        note="Chunk zatrzymany podczas przetwarzania",
                    )
                    self._record_event(level="warn", title="Anulowano", message="Import archiwalny anulowany przez uzytkownika.")
                    self._finish("cancelled", running=False, message="Import archiwalny anulowany przez uzytkownika.")
                    return
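                # Normal (non-cancelled) path: finalize the chunk and emit a summary event.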
                self._close_chunk(
                    chunk_index,
                    imported_days=imported,
                    skipped_days=skipped,
                    energy_kwh=energy_kwh,
                    state="completed",
                    note=f"Chunk zakonczony: import {imported}, pominiete {skipped}",
                )
                self._record_event(
                    level="success",
                    title=f"Chunk {chunk_index}/{total_chunks} zakonczony",
                    message=f"Zakres {chunk_start.isoformat()} -> {chunk_end.isoformat()}, import {imported}, pominiete {skipped}, energia {energy_kwh:.2f} kWh",
                    chunk_index=chunk_index,
                )
                chunk_start = chunk_end + timedelta(days=1)
            final_message = "Synchronizacja archiwalna zakonczona" if auto else "Import archiwalny zakonczony"
            self._record_event(level="success", title="Zakonczono", message=final_message)
            self._finish("completed", running=False, message=final_message)
        except Exception as exc:
            logger.exception("Historical import failed")
            self._record_event(level="error", title="Blad importu", message=str(exc))
            self._finish("failed", running=False, message="Import archiwalny zakonczyl sie bledem.", last_error=str(exc))

    def _process_chunk(self, *, chunk_index: int, start_day: date, end_day: date, force: bool) -> tuple[int, int, float, bool]:
        imported_days = 0
        skipped_days = 0
        energy_kwh = 0.0
        for day in self._date_range(start_day, end_day):
            if self._cancel_event.is_set():
                return imported_days, skipped_days, energy_kwh, True
            if not force and self.repository.has_day(day):
                skipped_days += 1
                self._advance_day(
                    day,
                    imported=False,
                    message=f"Pominieto {day.isoformat()} - dzien juz istnieje w cache",
                    level="warn",
                    title="Pominieto dzien",
                    chunk_index=chunk_index,
                )
                continue
            total, source, samples_count = self.energy.total_for_full_day(day)
            if samples_count <= 0:
                skipped_days += 1
                self._advance_day(
                    day,
                    imported=False,
                    message=f"Pominieto {day.isoformat()} - brak probek w InfluxDB",
                    level="warn",
                    title="Brak probek",
                    chunk_index=chunk_index,
                )
                continue
            self.repository.upsert_daily_energy(
                DailyEnergyRecord(
                    day=day,
                    energy_kwh=total,
                    source=source,
                    samples_count=samples_count,
                )
            )
            imported_days += 1
            energy_kwh += total
            self._advance_day(
                day,
                imported=True,
                message=f"Zaimportowano {day.isoformat()} ({total:.2f} kWh)",
                level="success",
                title="Zaimportowano dzien",
                chunk_index=chunk_index,
                energy_kwh=total,
            )
        return imported_days, skipped_days, round(energy_kwh, 3), False

    def _advance_day(
        self,
        day: date,
        *,
        imported: bool,
        message: str,
        level: str,
        title: str,
        chunk_index: int,
        energy_kwh: float | None = None,
    ) -> None:
        with self._state_lock:
            self._state.processed_days += 1
            if imported:
                self._state.imported_days += 1
            else:
                self._state.skipped_days += 1
            self._state.current_date = day
            self._state.message = message
            self._refresh_coverage(lock_held=True)
            self._refresh_runtime_metrics(lock_held=True)
        suffix = f" Energia: {energy_kwh:.2f} kWh." if imported and energy_kwh is not None else ""
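        # Recorded outside the lock above: _record_event re-acquires _state_lock itself.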
        self._record_event(
            level=level,
            title=title,
            message=f"{message}.{suffix}" if not message.endswith(".") else f"{message}{suffix}",
            day=day,
            chunk_index=chunk_index,
        )

    def _update_chunk(self, chunk_index: int, total_chunks: int, chunk_start: date, chunk_end: date) -> None:
        chunk = HistoricalChunkProgress(
            chunk_index=chunk_index,
            total_chunks=total_chunks,
            start_date=chunk_start,
            end_date=chunk_end,
            state="running",
            started_at=datetime.utcnow(),
            note=f"Aktywny chunk {chunk_start.isoformat()} -> {chunk_end.isoformat()}",
        )
        with self._state_lock:
            self._state.current_chunk_start = chunk_start
            self._state.current_chunk_end = chunk_end
            self._state.active_chunk_index = chunk_index
            self._state.message = f"Przetwarzanie zakresu {chunk_start.isoformat()} -> {chunk_end.isoformat()}"
            self._upsert_chunk_locked(chunk)
            self._refresh_runtime_metrics(lock_held=True)
        self._record_event(
            level="info",
            title=f"Chunk {chunk_index}/{total_chunks}",
            message=f"Start zakresu {chunk_start.isoformat()} -> {chunk_end.isoformat()}",
            chunk_index=chunk_index,
        )

    def _close_chunk(
        self,
        chunk_index: int,
        *,
        imported_days: int,
        skipped_days: int,
        energy_kwh: float,
        state: str,
        note: str,
    ) -> None:
        with self._state_lock:
            existing = self._find_chunk_locked(chunk_index)
            started_at = existing.started_at if existing and existing.started_at else datetime.utcnow()
            finished_at = datetime.utcnow()
            processed_days = imported_days + skipped_days
            duration_seconds = max((finished_at - started_at).total_seconds(), 0.0)
            chunk = HistoricalChunkProgress(
                chunk_index=chunk_index,
                total_chunks=self._state.total_chunks,
                start_date=existing.start_date if existing else self._state.current_chunk_start or self._state.requested_start_date or date.today(),
                end_date=existing.end_date if existing else self._state.current_chunk_end or self._state.requested_end_date or date.today(),
                processed_days=processed_days,
                imported_days=imported_days,
                skipped_days=skipped_days,
                energy_kwh=round(energy_kwh, 3),
                state=state,
                started_at=started_at,
                finished_at=finished_at,
                duration_seconds=round(duration_seconds, 2),
                note=note,
            )
            self._upsert_chunk_locked(chunk)
            if state != "running":
                self._state.message = note
            self._refresh_runtime_metrics(lock_held=True)

    def _finish(
        self,
        state: str,
        *,
        running: bool,
        message: str,
        last_error: str | None = None,
    ) -> None:
        with self._state_lock:
            self._state.running = running
            self._state.state = state
            self._state.finished_at = datetime.utcnow()
            self._state.last_error = last_error
            self._state.message = message
            self._state.active_chunk_index = 0
            self._refresh_coverage(lock_held=True)
            self._refresh_available_bounds(lock_held=True)
            self._refresh_runtime_metrics(lock_held=True)

    def _resolve_range(self, *, start_date: date | None, end_date: date | None) -> tuple[date, date] | None:
        today = now_local().date()
        include_today = self.settings.history.get("include_today_in_sync", False)
        default_end = today if include_today else today - timedelta(days=1)
        resolved_end = end_date or default_end
        if start_date is None:
            coverage = self.repository.coverage()
            if coverage.last_day:
                resolved_start = coverage.last_day + timedelta(days=1)
            else:
                bootstrap_start = self.settings.history.get("bootstrap_start_date")
                if bootstrap_start:
                    resolved_start = date.fromisoformat(bootstrap_start)
                else:
                    available_start, _ = self._available_bounds()
                    resolved_start = available_start or resolved_end
        else:
            resolved_start = start_date
        if resolved_start > resolved_end:
            return None
        return resolved_start, resolved_end
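    # The helper below caches InfluxDB first/last timestamps for five minutes
    # (the 300 s window checked against _available_bounds_cache) so frequent
    # status() polling does not query InfluxDB every time.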
    def _available_bounds(self) -> tuple[date | None, date | None]:
        now_utc = datetime.utcnow()
        cached = self._available_bounds_cache
        if cached and (now_utc - cached[0]).total_seconds() < 300:
            return cached[1], cached[2]
        available_start: date | None = None
        available_end: date | None = None
        metric = self.catalog.safe_get(self.settings.analytics.get("production_metric_id", "energy_total"))
        fallback = self.catalog.safe_get(self.settings.analytics.get("fallback_power_metric_id", "ac_power"))
        source_metric = metric or fallback
        if source_metric is not None:
            first_point = self.influx.first_value(source_metric)
            last_point = self.influx.last_value(source_metric)
            available_start = first_point.timestamp.astimezone(self.energy.tz).date() if first_point else None
            available_end = last_point.timestamp.astimezone(self.energy.tz).date() if last_point else None
        self._available_bounds_cache = (now_utc, available_start, available_end)
        return available_start, available_end

    def _refresh_coverage(self, *, lock_held: bool = False) -> None:
        coverage = self.repository.coverage()
        available_start, available_end = self._available_bounds()
        if available_start and available_end and available_start <= available_end:
            available_days = (available_end - available_start).days + 1
            missing_days = self.repository.count_missing_days(available_start, available_end)
            coverage.available_days = available_days
            coverage.missing_days = missing_days
            imported_in_range = max(available_days - missing_days, 0)
            coverage.coverage_pct = round((imported_in_range / available_days) * 100, 1) if available_days > 0 else None
        else:
            coverage.available_days = 0
            coverage.missing_days = 0
            coverage.coverage_pct = None
        if lock_held:
            self._state.coverage = coverage
        else:
            with self._state_lock:
                self._state.coverage = coverage

    def _refresh_available_bounds(self, *, lock_held: bool = False) -> None:
        available_start, available_end = self._available_bounds()
        if lock_held:
            self._state.available_start_date = available_start
            self._state.available_end_date = available_end
        else:
            with self._state_lock:
                self._state.available_start_date = available_start
                self._state.available_end_date = available_end

    def _refresh_runtime_metrics(self, *, lock_held: bool = False) -> None:
        def apply() -> None:
            if self._state.started_at is None:
                self._state.elapsed_seconds = None
                self._state.estimated_remaining_seconds = None
                self._state.avg_days_per_minute = None
                return
            end_reference = datetime.utcnow() if self._state.running or self._state.finished_at is None else self._state.finished_at
            elapsed_seconds = max((end_reference - self._state.started_at).total_seconds(), 0.0)
            self._state.elapsed_seconds = round(elapsed_seconds, 1)
            if self._state.processed_days > 0 and elapsed_seconds > 0:
                avg_days_per_minute = (self._state.processed_days / elapsed_seconds) * 60
                remaining_days = max(self._state.total_days - self._state.processed_days, 0)
                estimated_remaining = (remaining_days / self._state.processed_days) * elapsed_seconds
                self._state.avg_days_per_minute = round(avg_days_per_minute, 2)
                self._state.estimated_remaining_seconds = round(estimated_remaining, 1) if self._state.running else 0.0
            else:
                self._state.avg_days_per_minute = None
                self._state.estimated_remaining_seconds = None if self._state.running else 0.0

        if lock_held:
            apply()
        else:
            with self._state_lock:
                apply()

    def _record_event(
        self,
        *,
        level: str,
        title: str,
        message: str,
        day: date | None = None,
        chunk_index: int | None = None,
    ) -> None:
        event = HistoricalActivityEvent(
            timestamp=datetime.utcnow(),
            level=level,
            title=title,
            message=message,
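            # Optional correlation fields; both stay None for job-level events.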
            day=day,
            chunk_index=chunk_index,
        )
        with self._state_lock:
            self._state.recent_events.append(event)
            self._state.recent_events = self._state.recent_events[-self.MAX_RECENT_EVENTS :]

    def _find_chunk_locked(self, chunk_index: int) -> HistoricalChunkProgress | None:
        for chunk in self._state.recent_chunks:
            if chunk.chunk_index == chunk_index:
                return chunk
        return None

    def _upsert_chunk_locked(self, chunk: HistoricalChunkProgress) -> None:
        for index, existing in enumerate(self._state.recent_chunks):
            if existing.chunk_index == chunk.chunk_index:
                self._state.recent_chunks[index] = chunk
                break
        else:
            self._state.recent_chunks.append(chunk)
        self._state.recent_chunks = self._state.recent_chunks[-self.MAX_RECENT_CHUNKS :]

    @staticmethod
    def _date_range(start_day: date, end_day: date) -> Iterable[date]:
        current = start_day
        while current <= end_day:
            yield current
            current = current + timedelta(days=1)


@lru_cache(maxsize=1)
def get_historical_sync_service() -> HistoricalSyncService:
    return HistoricalSyncService()
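

# Minimal smoke-test sketch for running this module directly. Illustrative
# only: it assumes app settings point at reachable InfluxDB and SQLite
# instances, and the two-second polling interval is an arbitrary choice.
if __name__ == "__main__":
    import time

    service = get_historical_sync_service()
    service.start()  # raises RuntimeError when history.enabled is False
    while True:
        snapshot = service.status()
        print(snapshot.state, snapshot.message)
        if not snapshot.running:
            break
        time.sleep(2.0)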