Files
solar-pv-dashboard/backend/app/services/historical_sync.py
Mateusz Gruszczyński 138059945e poprawki i zmiany ux
2026-03-26 09:30:39 +01:00

606 lines
25 KiB
Python

from __future__ import annotations
import copy
import logging
import threading
import uuid
from datetime import date, datetime, timedelta
from functools import lru_cache
from math import ceil
from typing import Iterable
from app.core_settings import AppSettings, get_settings
from app.models import (
DailyEnergyRecord,
HistoricalActivityEvent,
HistoricalChunkProgress,
HistoricalImportStatus,
)
from app.services.catalog import MetricCatalog, get_catalog
from app.services.energy import EnergyService
from app.services.influx_http import InfluxHTTPService
from app.storage import SQLiteEnergyRepository
from app.utils.time import now_local
logger = logging.getLogger(__name__)
class HistoricalSyncService:
    """Imports historical per-day energy totals from InfluxDB into the SQLite cache.

    Backfill jobs run chunk by chunk on a background worker thread; progress is
    tracked in a shared HistoricalImportStatus guarded by a lock, and an
    optional scheduler thread can trigger periodic automatic re-syncs.
    """

    # Caps for the rolling chunk-progress and activity-event lists kept in the
    # status snapshot (oldest entries are trimmed away).
    MAX_RECENT_CHUNKS = 18
    MAX_RECENT_EVENTS = 40
    def __init__(
        self,
        settings: AppSettings | None = None,
        catalog: MetricCatalog | None = None,
        influx: InfluxHTTPService | None = None,
        energy: EnergyService | None = None,
        repository: SQLiteEnergyRepository | None = None,
    ) -> None:
        """Wire dependencies (each argument falls back to the app-level default) and seed the status."""
        self.settings = settings or get_settings()
        self.catalog = catalog or get_catalog()
        self.influx = influx or InfluxHTTPService(self.settings)
        self.energy = energy or EnergyService(self.settings, self.catalog, self.influx)
        self.repository = repository or SQLiteEnergyRepository(self.settings.storage["sqlite_path"])
        # Guards every mutation of self._state; shared by worker, scheduler and API threads.
        self._state_lock = threading.Lock()
        self._worker: threading.Thread | None = None
        self._cancel_event = threading.Event()
        self._scheduler_stop = threading.Event()
        self._scheduler: threading.Thread | None = None
        # (cached_at_utc, available_start, available_end) — see _available_bounds.
        self._available_bounds_cache: tuple[datetime, date | None, date | None] | None = None
        self._state = HistoricalImportStatus(
            enabled=self.settings.history.get("enabled", True),
            state="idle",
            default_chunk_days=self.settings.history.get("default_chunk_days", 7),
        )
        # Populate coverage/bounds/metrics immediately so status() has data before any job runs.
        self._refresh_coverage()
        self._refresh_available_bounds()
        self._refresh_runtime_metrics()
    def status(self) -> HistoricalImportStatus:
        """Return a consistent, deep-copied snapshot of the current import status."""
        with self._state_lock:
            # Recompute derived fields under the lock so the snapshot is internally consistent.
            self._refresh_coverage(lock_held=True)
            self._refresh_available_bounds(lock_held=True)
            self._refresh_runtime_metrics(lock_held=True)
            # Deep copy so callers cannot mutate shared state after the lock is released.
            return copy.deepcopy(self._state)
    def start(
        self,
        *,
        start_date: date | None = None,
        end_date: date | None = None,
        chunk_days: int | None = None,
        force: bool = False,
        auto: bool = False,
    ) -> HistoricalImportStatus:
        """Start a background import job and return a status snapshot.

        Args:
            start_date: First day to import; resolved automatically when None.
            end_date: Last day to import; resolved automatically when None.
            chunk_days: Chunk size in days (clamped to >= 1).
            force: Re-import days already present in the cache.
            auto: Marks the run as scheduler-triggered (affects messages only).

        Raises:
            RuntimeError: If the history feature is disabled in settings.
        """
        if not self.settings.history.get("enabled", True):
            raise RuntimeError("Historical import is disabled")
        chunk_days = max(int(chunk_days or self.settings.history.get("default_chunk_days", 7)), 1)
        resolved = self._resolve_range(start_date=start_date, end_date=end_date)
        if resolved is None:
            # Nothing missing: record an idle outcome and return immediately.
            with self._state_lock:
                self._state.running = False
                self._state.state = "idle"
                self._state.message = "There are no missing days to import."
                self._state.finished_at = datetime.utcnow()
                self._refresh_coverage(lock_held=True)
                self._refresh_available_bounds(lock_held=True)
                self._refresh_runtime_metrics(lock_held=True)
                return copy.deepcopy(self._state)
        resolved_start, resolved_end = resolved
        total_days = (resolved_end - resolved_start).days + 1
        total_chunks = max(ceil(total_days / chunk_days), 1)
        start_message = "Historical import started" if not auto else "Automatic historical sync started"
        with self._state_lock:
            # Idempotence: never start a second worker while one is alive.
            if self._worker and self._worker.is_alive():
                return copy.deepcopy(self._state)
            # Fresh cancel event so a cancellation of a previous job cannot leak into this one.
            self._cancel_event = threading.Event()
            self._state = HistoricalImportStatus(
                enabled=True,
                running=True,
                state="running",
                job_id=uuid.uuid4().hex[:12],
                started_at=datetime.utcnow(),
                requested_start_date=resolved_start,
                requested_end_date=resolved_end,
                total_days=total_days,
                chunk_days=chunk_days,
                total_chunks=total_chunks,
                active_chunk_index=1,
                current_chunk_start=resolved_start,
                current_chunk_end=min(resolved_start + timedelta(days=chunk_days - 1), resolved_end),
                message=start_message,
                default_chunk_days=self.settings.history.get("default_chunk_days", 7),
                recent_chunks=[],
                recent_events=[],
            )
            self._refresh_coverage(lock_held=True)
            self._refresh_available_bounds(lock_held=True)
            self._refresh_runtime_metrics(lock_held=True)
            self._worker = threading.Thread(
                target=self._run_worker,
                kwargs={
                    "start_date": resolved_start,
                    "end_date": resolved_end,
                    "chunk_days": chunk_days,
                    "force": force,
                    "auto": auto,
                },
                name="pv-historical-backfill",
                daemon=True,
            )
            self._worker.start()
        # Outside the lock: _record_event acquires _state_lock itself (Lock is non-reentrant).
        self._record_event(
            level="info",
            title="Job started",
            message=f"Range {resolved_start.isoformat()} -> {resolved_end.isoformat()}, chunk size {chunk_days} days",
        )
        return self.status()
    def cancel(self) -> HistoricalImportStatus:
        """Request cooperative cancellation of the running job and return a snapshot."""
        # The worker polls this event between chunks and between days.
        self._cancel_event.set()
        with self._state_lock:
            self._state.message = "Cancelling job..."
            self._refresh_runtime_metrics(lock_held=True)
            snapshot = copy.deepcopy(self._state)
        # Outside the lock: _record_event acquires _state_lock itself.
        self._record_event(level="warn", title="Cancellation requested", message="The user requested the job to stop.")
        return snapshot
def run_blocking(
self,
*,
start_date: date | None = None,
end_date: date | None = None,
chunk_days: int | None = None,
force: bool = False,
) -> HistoricalImportStatus:
resolved = self._resolve_range(start_date=start_date, end_date=end_date)
if resolved is None:
return self.status()
resolved_start, resolved_end = resolved
chunk_days = max(int(chunk_days or self.settings.history.get("default_chunk_days", 7)), 1)
total_days = (resolved_end - resolved_start).days + 1
total_chunks = max(ceil(total_days / chunk_days), 1)
with self._state_lock:
self._state = HistoricalImportStatus(
enabled=True,
running=True,
state="running",
job_id=uuid.uuid4().hex[:12],
started_at=datetime.utcnow(),
requested_start_date=resolved_start,
requested_end_date=resolved_end,
total_days=total_days,
chunk_days=chunk_days,
total_chunks=total_chunks,
default_chunk_days=self.settings.history.get("default_chunk_days", 7),
recent_chunks=[],
recent_events=[],
)
self._record_event(
level="info",
title="Job started",
message=f"Range {resolved_start.isoformat()} -> {resolved_end.isoformat()}, chunk size {chunk_days} days",
)
self._run_worker(
start_date=resolved_start,
end_date=resolved_end,
chunk_days=chunk_days,
force=force,
auto=False,
)
return self.status()
def start_scheduler_if_enabled(self) -> None:
if not self.settings.history.get("enabled", True):
return
if not self.settings.history.get("auto_sync_enabled", False):
return
if self._scheduler and self._scheduler.is_alive():
return
self._scheduler_stop.clear()
self._scheduler = threading.Thread(target=self._scheduler_loop, name="pv-history-scheduler", daemon=True)
self._scheduler.start()
    def _scheduler_loop(self) -> None:
        """Background loop that periodically triggers an automatic sync (thread body)."""
        interval_seconds = max(int(self.settings.history.get("auto_sync_interval_minutes", 30)), 1) * 60
        if self.settings.history.get("auto_sync_on_start", False):
            try:
                self.start(auto=True)
            except Exception as exc:
                logger.warning("Unable to auto-start historical sync: %s", exc)
        # wait() doubles as an interruptible sleep; it returns True (ending the
        # loop) once _scheduler_stop is set.
        while not self._scheduler_stop.wait(interval_seconds):
            try:
                # Never kick off a new job while a worker is still running.
                if self._worker and self._worker.is_alive():
                    continue
                self.start(auto=True)
            except Exception as exc:
                logger.warning("Historical scheduler cycle failed: %s", exc)
    def _run_worker(
        self,
        *,
        start_date: date,
        end_date: date,
        chunk_days: int,
        force: bool,
        auto: bool,
    ) -> None:
        """Process the requested date range chunk by chunk (worker-thread body).

        Args:
            start_date: First day, inclusive.
            end_date: Last day, inclusive.
            chunk_days: Number of days per chunk.
            force: Re-import days already cached.
            auto: Scheduler-triggered run; only affects the final message text.
        """
        total_chunks = max(ceil(((end_date - start_date).days + 1) / chunk_days), 1)
        try:
            chunk_index = 0
            chunk_start = start_date
            while chunk_start <= end_date:
                # Cooperative cancellation between chunks.
                if self._cancel_event.is_set():
                    self._record_event(level="warn", title="Cancelled", message="Historical import was cancelled by the user.")
                    self._finish("cancelled", running=False, message="Historical import was cancelled by the user.")
                    return
                chunk_index += 1
                chunk_end = min(chunk_start + timedelta(days=chunk_days - 1), end_date)
                self._update_chunk(chunk_index, total_chunks, chunk_start, chunk_end)
                imported, skipped, energy_kwh, cancelled = self._process_chunk(
                    chunk_index=chunk_index,
                    start_day=chunk_start,
                    end_day=chunk_end,
                    force=force,
                )
                if cancelled:
                    # The chunk was interrupted mid-way; close it with partial totals.
                    self._close_chunk(
                        chunk_index,
                        imported_days=imported,
                        skipped_days=skipped,
                        energy_kwh=energy_kwh,
                        state="cancelled",
                        note="Chunk stopped during processing",
                    )
                    self._record_event(level="warn", title="Cancelled", message="Historical import was cancelled by the user.")
                    self._finish("cancelled", running=False, message="Historical import was cancelled by the user.")
                    return
                self._close_chunk(
                    chunk_index,
                    imported_days=imported,
                    skipped_days=skipped,
                    energy_kwh=energy_kwh,
                    state="completed",
                    note=f"Chunk completed: imported {imported}, skipped {skipped}",
                )
                self._record_event(
                    level="success",
                    title=f"Chunk {chunk_index}/{total_chunks} completed",
                    message=f"Range {chunk_start.isoformat()} -> {chunk_end.isoformat()}, imported {imported}, skipped {skipped}, energy {energy_kwh:.2f} kWh",
                    chunk_index=chunk_index,
                )
                chunk_start = chunk_end + timedelta(days=1)
            final_message = "Historical synchronization completed" if auto else "Historical import completed"
            self._record_event(level="success", title="Completed", message=final_message)
            self._finish("completed", running=False, message=final_message)
        except Exception as exc:
            # Any unexpected failure marks the job as failed; nothing propagates
            # out of the worker thread.
            logger.exception("Historical import failed")
            self._record_event(level="error", title="Import error", message=str(exc))
            self._finish("failed", running=False, message="Historical import finished with an error.", last_error=str(exc))
    def _process_chunk(self, *, chunk_index: int, start_day: date, end_day: date, force: bool) -> tuple[int, int, float, bool]:
        """Import each day of [start_day, end_day] into the SQLite cache.

        Returns:
            (imported_days, skipped_days, energy_kwh, cancelled), where
            energy_kwh is the imported total rounded to 3 decimals and
            cancelled is True when the cancel event fired mid-chunk.
        """
        imported_days = 0
        skipped_days = 0
        energy_kwh = 0.0
        for day in self._date_range(start_day, end_day):
            # Cooperative cancellation between days; report partial totals unrounded.
            if self._cancel_event.is_set():
                return imported_days, skipped_days, energy_kwh, True
            # Unless forced, a day already cached in SQLite is skipped.
            if not force and self.repository.has_day(day):
                skipped_days += 1
                self._advance_day(
                    day,
                    imported=False,
                    message=f"Skipped {day.isoformat()} - day already exists in cache",
                    level="warn",
                    title="Day skipped",
                    chunk_index=chunk_index,
                )
                continue
            total, source, samples_count = self.energy.total_for_full_day(day)
            if samples_count <= 0:
                # No data points in InfluxDB for that day — nothing to store.
                skipped_days += 1
                self._advance_day(
                    day,
                    imported=False,
                    message=f"Skipped {day.isoformat()} - no samples in InfluxDB",
                    level="warn",
                    title="No samples",
                    chunk_index=chunk_index,
                )
                continue
            self.repository.upsert_daily_energy(
                DailyEnergyRecord(
                    day=day,
                    energy_kwh=total,
                    source=source,
                    samples_count=samples_count,
                )
            )
            imported_days += 1
            energy_kwh += total
            self._advance_day(
                day,
                imported=True,
                message=f"Imported {day.isoformat()} ({total:.2f} kWh)",
                level="success",
                title="Day imported",
                chunk_index=chunk_index,
                energy_kwh=total,
            )
        return imported_days, skipped_days, round(energy_kwh, 3), False
def _advance_day(
self,
day: date,
*,
imported: bool,
message: str,
level: str,
title: str,
chunk_index: int,
energy_kwh: float | None = None,
) -> None:
with self._state_lock:
self._state.processed_days += 1
if imported:
self._state.imported_days += 1
else:
self._state.skipped_days += 1
self._state.current_date = day
self._state.message = message
self._refresh_coverage(lock_held=True)
self._refresh_runtime_metrics(lock_held=True)
suffix = f" Energy: {energy_kwh:.2f} kWh." if imported and energy_kwh is not None else ""
self._record_event(
level=level,
title=title,
message=f"{message}.{suffix}" if not message.endswith(".") else f"{message}{suffix}",
day=day,
chunk_index=chunk_index,
)
def _update_chunk(self, chunk_index: int, total_chunks: int, chunk_start: date, chunk_end: date) -> None:
chunk = HistoricalChunkProgress(
chunk_index=chunk_index,
total_chunks=total_chunks,
start_date=chunk_start,
end_date=chunk_end,
state="running",
started_at=datetime.utcnow(),
note=f"Active chunk {chunk_start.isoformat()} -> {chunk_end.isoformat()}",
)
with self._state_lock:
self._state.current_chunk_start = chunk_start
self._state.current_chunk_end = chunk_end
self._state.active_chunk_index = chunk_index
self._state.message = f"Processing range {chunk_start.isoformat()} -> {chunk_end.isoformat()}"
self._upsert_chunk_locked(chunk)
self._refresh_runtime_metrics(lock_held=True)
self._record_event(
level="info",
title=f"Chunk {chunk_index}/{total_chunks}",
message=f"Starting range {chunk_start.isoformat()} -> {chunk_end.isoformat()}",
chunk_index=chunk_index,
)
    def _close_chunk(
        self,
        chunk_index: int,
        *,
        imported_days: int,
        skipped_days: int,
        energy_kwh: float,
        state: str,
        note: str,
    ) -> None:
        """Finalize a chunk's progress entry with totals, timing and a note."""
        with self._state_lock:
            existing = self._find_chunk_locked(chunk_index)
            # Fall back to "now" if the chunk was never registered as running.
            started_at = existing.started_at if existing and existing.started_at else datetime.utcnow()
            finished_at = datetime.utcnow()
            processed_days = imported_days + skipped_days
            duration_seconds = max((finished_at - started_at).total_seconds(), 0.0)
            chunk = HistoricalChunkProgress(
                chunk_index=chunk_index,
                total_chunks=self._state.total_chunks,
                # Prefer the dates recorded when the chunk started; otherwise fall
                # back to the currently tracked range, then the requested range,
                # then today.
                start_date=existing.start_date if existing else self._state.current_chunk_start or self._state.requested_start_date or date.today(),
                end_date=existing.end_date if existing else self._state.current_chunk_end or self._state.requested_end_date or date.today(),
                processed_days=processed_days,
                imported_days=imported_days,
                skipped_days=skipped_days,
                energy_kwh=round(energy_kwh, 3),
                state=state,
                started_at=started_at,
                finished_at=finished_at,
                duration_seconds=round(duration_seconds, 2),
                note=note,
            )
            self._upsert_chunk_locked(chunk)
            if state != "running":
                self._state.message = note
            self._refresh_runtime_metrics(lock_held=True)
def _finish(
self,
state: str,
*,
running: bool,
message: str,
last_error: str | None = None,
) -> None:
with self._state_lock:
self._state.running = running
self._state.state = state
self._state.finished_at = datetime.utcnow()
self._state.last_error = last_error
self._state.message = message
self._state.active_chunk_index = 0
self._refresh_coverage(lock_held=True)
self._refresh_available_bounds(lock_held=True)
self._refresh_runtime_metrics(lock_held=True)
    def _resolve_range(self, *, start_date: date | None, end_date: date | None) -> tuple[date, date] | None:
        """Resolve the effective import range, or None when there is nothing to import.

        The end defaults to today or yesterday depending on the
        include_today_in_sync setting. The start defaults to the day after the
        newest cached day, else the configured bootstrap date, else the first
        day with data in InfluxDB.
        """
        today = now_local().date()
        include_today = self.settings.history.get("include_today_in_sync", False)
        default_end = today if include_today else today - timedelta(days=1)
        resolved_end = end_date or default_end
        if start_date is None:
            coverage = self.repository.coverage()
            if coverage.last_day:
                # Continue right after the newest cached day.
                resolved_start = coverage.last_day + timedelta(days=1)
            else:
                bootstrap_start = self.settings.history.get("bootstrap_start_date")
                if bootstrap_start:
                    resolved_start = date.fromisoformat(bootstrap_start)
                else:
                    available_start, _ = self._available_bounds()
                    resolved_start = available_start or resolved_end
        else:
            resolved_start = start_date
        if resolved_start > resolved_end:
            # Cache is already up to date (or the caller supplied an inverted range).
            return None
        return resolved_start, resolved_end
    def _available_bounds(self) -> tuple[date | None, date | None]:
        """Return the first/last local-timezone dates with data in InfluxDB, cached for 5 minutes."""
        # NOTE(review): naive UTC timestamps (datetime.utcnow()) are used
        # consistently across this module; utcnow() is deprecated since
        # Python 3.12 — consider migrating the whole module at once.
        now_utc = datetime.utcnow()
        cached = self._available_bounds_cache
        if cached and (now_utc - cached[0]).total_seconds() < 300:
            return cached[1], cached[2]
        available_start: date | None = None
        available_end: date | None = None
        # Prefer the configured production (energy) metric; fall back to the power metric.
        metric = self.catalog.safe_get(self.settings.analytics.get("production_metric_id", "energy_total"))
        fallback = self.catalog.safe_get(self.settings.analytics.get("fallback_power_metric_id", "ac_power"))
        source_metric = metric or fallback
        if source_metric is not None:
            first_point = self.influx.first_value(source_metric)
            last_point = self.influx.last_value(source_metric)
            # Convert to the service's local timezone before taking the calendar date.
            available_start = first_point.timestamp.astimezone(self.energy.tz).date() if first_point else None
            available_end = last_point.timestamp.astimezone(self.energy.tz).date() if last_point else None
        self._available_bounds_cache = (now_utc, available_start, available_end)
        return available_start, available_end
    def _refresh_coverage(self, *, lock_held: bool = False) -> None:
        """Recompute cache-coverage statistics and store them on the status.

        Args:
            lock_held: Set True when the caller already holds _state_lock
                (the lock is not reentrant).
        """
        coverage = self.repository.coverage()
        available_start, available_end = self._available_bounds()
        if available_start and available_end and available_start <= available_end:
            available_days = (available_end - available_start).days + 1
            missing_days = self.repository.count_missing_days(available_start, available_end)
            coverage.available_days = available_days
            coverage.missing_days = missing_days
            imported_in_range = max(available_days - missing_days, 0)
            coverage.coverage_pct = round((imported_in_range / available_days) * 100, 1) if available_days > 0 else None
        else:
            # No known data window in InfluxDB — coverage is undefined.
            coverage.available_days = 0
            coverage.missing_days = 0
            coverage.coverage_pct = None
        if lock_held:
            self._state.coverage = coverage
        else:
            with self._state_lock:
                self._state.coverage = coverage
def _refresh_available_bounds(self, *, lock_held: bool = False) -> None:
available_start, available_end = self._available_bounds()
if lock_held:
self._state.available_start_date = available_start
self._state.available_end_date = available_end
else:
with self._state_lock:
self._state.available_start_date = available_start
self._state.available_end_date = available_end
    def _refresh_runtime_metrics(self, *, lock_held: bool = False) -> None:
        """Recompute elapsed time, throughput and ETA from the current progress.

        Args:
            lock_held: Set True when the caller already holds _state_lock.
        """
        def apply() -> None:
            if self._state.started_at is None:
                # No job has ever started — all runtime metrics are undefined.
                self._state.elapsed_seconds = None
                self._state.estimated_remaining_seconds = None
                self._state.avg_days_per_minute = None
                return
            # Freeze the clock at finished_at once the job has ended.
            end_reference = datetime.utcnow() if self._state.running or self._state.finished_at is None else self._state.finished_at
            elapsed_seconds = max((end_reference - self._state.started_at).total_seconds(), 0.0)
            self._state.elapsed_seconds = round(elapsed_seconds, 1)
            if self._state.processed_days > 0 and elapsed_seconds > 0:
                avg_days_per_minute = (self._state.processed_days / elapsed_seconds) * 60
                remaining_days = max(self._state.total_days - self._state.processed_days, 0)
                # ETA = remaining days at the observed average pace.
                estimated_remaining = (remaining_days / self._state.processed_days) * elapsed_seconds
                self._state.avg_days_per_minute = round(avg_days_per_minute, 2)
                self._state.estimated_remaining_seconds = round(estimated_remaining, 1) if self._state.running else 0.0
            else:
                self._state.avg_days_per_minute = None
                self._state.estimated_remaining_seconds = None if self._state.running else 0.0
        if lock_held:
            apply()
        else:
            with self._state_lock:
                apply()
def _record_event(
self,
*,
level: str,
title: str,
message: str,
day: date | None = None,
chunk_index: int | None = None,
) -> None:
event = HistoricalActivityEvent(
timestamp=datetime.utcnow(),
level=level,
title=title,
message=message,
day=day,
chunk_index=chunk_index,
)
with self._state_lock:
self._state.recent_events.append(event)
self._state.recent_events = self._state.recent_events[-self.MAX_RECENT_EVENTS :]
def _find_chunk_locked(self, chunk_index: int) -> HistoricalChunkProgress | None:
for chunk in self._state.recent_chunks:
if chunk.chunk_index == chunk_index:
return chunk
return None
def _upsert_chunk_locked(self, chunk: HistoricalChunkProgress) -> None:
for index, existing in enumerate(self._state.recent_chunks):
if existing.chunk_index == chunk.chunk_index:
self._state.recent_chunks[index] = chunk
break
else:
self._state.recent_chunks.append(chunk)
self._state.recent_chunks = self._state.recent_chunks[-self.MAX_RECENT_CHUNKS :]
@staticmethod
def _date_range(start_day: date, end_day: date) -> Iterable[date]:
current = start_day
while current <= end_day:
yield current
current = current + timedelta(days=1)
@lru_cache(maxsize=1)
def get_historical_sync_service() -> HistoricalSyncService:
    """Return the process-wide HistoricalSyncService instance (memoized singleton)."""
    return HistoricalSyncService()