606 lines
25 KiB
Python
606 lines
25 KiB
Python
from __future__ import annotations
|
|
|
|
import copy
|
|
import logging
|
|
import threading
|
|
import uuid
|
|
from datetime import date, datetime, timedelta
|
|
from functools import lru_cache
|
|
from math import ceil
|
|
from typing import Iterable
|
|
|
|
from app.core_settings import AppSettings, get_settings
|
|
from app.models import (
|
|
DailyEnergyRecord,
|
|
HistoricalActivityEvent,
|
|
HistoricalChunkProgress,
|
|
HistoricalImportStatus,
|
|
)
|
|
from app.services.catalog import MetricCatalog, get_catalog
|
|
from app.services.energy import EnergyService
|
|
from app.services.influx_http import InfluxHTTPService
|
|
from app.storage import SQLiteEnergyRepository
|
|
from app.utils.time import now_local
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class HistoricalSyncService:
    """Import historical daily energy totals into the local repository.

    A job walks a date range in chunks of ``chunk_days`` days. For every day it
    asks ``EnergyService.total_for_full_day`` for the total and upserts a
    ``DailyEnergyRecord`` into the repository. Progress, the most recent chunks
    and the most recent events are kept in a ``HistoricalImportStatus`` that is
    guarded by ``_state_lock``.

    A job runs on a background thread (``start``), inline in the calling thread
    (``run_blocking``), or periodically from the optional scheduler thread.

    Locking convention: ``_state_lock`` is a plain ``threading.Lock`` and is
    therefore not re-entrant. Helpers whose name ends in ``_locked`` and
    helpers called with ``lock_held=True`` expect the caller to hold the lock
    already; every other helper acquires it itself.
    """

    # Upper bounds for the rolling history kept on the status object.
    MAX_RECENT_CHUNKS = 18
    MAX_RECENT_EVENTS = 40

    def __init__(
        self,
        settings: AppSettings | None = None,
        catalog: MetricCatalog | None = None,
        influx: InfluxHTTPService | None = None,
        energy: EnergyService | None = None,
        repository: SQLiteEnergyRepository | None = None,
    ) -> None:
        """Wire up collaborators and initialise an idle status.

        Args:
            settings: Application settings; defaults to ``get_settings()``.
            catalog: Metric catalog; defaults to ``get_catalog()``.
            influx: InfluxDB client; defaults to one built from ``settings``.
            energy: Energy service; defaults to one built from the above.
            repository: Daily-energy store; defaults to a SQLite repository at
                ``settings.storage["sqlite_path"]``.
        """
        self.settings = settings or get_settings()
        self.catalog = catalog or get_catalog()
        self.influx = influx or InfluxHTTPService(self.settings)
        self.energy = energy or EnergyService(self.settings, self.catalog, self.influx)
        self.repository = repository or SQLiteEnergyRepository(self.settings.storage["sqlite_path"])
        self._state_lock = threading.Lock()
        self._worker: threading.Thread | None = None
        self._cancel_event = threading.Event()
        self._scheduler_stop = threading.Event()
        self._scheduler: threading.Thread | None = None
        # (fetched_at_utc, first_available_day, last_available_day)
        self._available_bounds_cache: tuple[datetime, date | None, date | None] | None = None
        self._state = HistoricalImportStatus(
            enabled=self.settings.history.get("enabled", True),
            state="idle",
            default_chunk_days=self.settings.history.get("default_chunk_days", 7),
        )
        self._refresh_coverage()
        self._refresh_available_bounds()
        self._refresh_runtime_metrics()

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def status(self) -> HistoricalImportStatus:
        """Return a deep copy of the current status with refreshed metrics."""
        # NOTE(review): the refresh helpers may query the repository and
        # InfluxDB while the lock is held, so a slow backend delays every
        # other status update for the duration of the call.
        with self._state_lock:
            self._refresh_coverage(lock_held=True)
            self._refresh_available_bounds(lock_held=True)
            self._refresh_runtime_metrics(lock_held=True)
            return copy.deepcopy(self._state)

    def start(
        self,
        *,
        start_date: date | None = None,
        end_date: date | None = None,
        chunk_days: int | None = None,
        force: bool = False,
        auto: bool = False,
    ) -> HistoricalImportStatus:
        """Start a background import job unless one is already running.

        Args:
            start_date: First day to import; resolved by ``_resolve_range``
                when omitted.
            end_date: Last day to import; resolved by ``_resolve_range`` when
                omitted.
            chunk_days: Days per chunk; falls back to the configured default
                and is clamped to at least 1.
            force: Re-import days that already exist in the repository.
            auto: Marks the job as scheduler-initiated (only affects messages).

        Returns:
            A snapshot of the status. If a worker is already alive the
            snapshot describes that running job and nothing new is started.

        Raises:
            RuntimeError: If historical import is disabled in the settings.
        """
        if not self.settings.history.get("enabled", True):
            raise RuntimeError("Historical import is disabled")

        chunk_days = self._normalize_chunk_days(chunk_days)
        resolved = self._resolve_range(start_date=start_date, end_date=end_date)
        if resolved is None:
            with self._state_lock:
                if self._worker_is_alive():
                    # A job is in flight: do not overwrite its live state with
                    # "idle" just because this request had nothing to import.
                    return copy.deepcopy(self._state)
                self._state.running = False
                self._state.state = "idle"
                self._state.message = "There are no missing days to import."
                self._state.finished_at = datetime.utcnow()
                self._refresh_coverage(lock_held=True)
                self._refresh_available_bounds(lock_held=True)
                self._refresh_runtime_metrics(lock_held=True)
                return copy.deepcopy(self._state)

        resolved_start, resolved_end = resolved
        start_message = "Historical import started" if not auto else "Automatic historical sync started"

        with self._state_lock:
            if self._worker_is_alive():
                return copy.deepcopy(self._state)

            # Fresh event so a cancel() aimed at an earlier job cannot leak in.
            self._cancel_event = threading.Event()
            self._state = self._new_job_state(
                resolved_start,
                resolved_end,
                chunk_days,
                active_chunk_index=1,
                current_chunk_start=resolved_start,
                current_chunk_end=min(resolved_start + timedelta(days=chunk_days - 1), resolved_end),
                message=start_message,
            )
            self._refresh_coverage(lock_held=True)
            self._refresh_available_bounds(lock_held=True)
            self._refresh_runtime_metrics(lock_held=True)

            # Log the start while still holding the lock so it is guaranteed
            # to precede any event emitted by the worker thread.
            self._append_event_locked(
                self._build_event(
                    level="info",
                    title="Job started",
                    message=f"Range {resolved_start.isoformat()} -> {resolved_end.isoformat()}, chunk size {chunk_days} days",
                )
            )

            self._worker = threading.Thread(
                target=self._run_worker,
                kwargs={
                    "start_date": resolved_start,
                    "end_date": resolved_end,
                    "chunk_days": chunk_days,
                    "force": force,
                    "auto": auto,
                },
                name="pv-historical-backfill",
                daemon=True,
            )
            self._worker.start()

        return self.status()

    def cancel(self) -> HistoricalImportStatus:
        """Ask the running job to stop and return a status snapshot.

        The worker checks the cancel flag between days and between chunks, so
        cancellation takes effect after the day currently being processed.
        The returned snapshot is taken before the "Cancellation requested"
        event is appended.
        """
        self._cancel_event.set()
        with self._state_lock:
            self._state.message = "Cancelling job..."
            self._refresh_runtime_metrics(lock_held=True)
            snapshot = copy.deepcopy(self._state)
        self._record_event(level="warn", title="Cancellation requested", message="The user requested the job to stop.")
        return snapshot

    def run_blocking(
        self,
        *,
        start_date: date | None = None,
        end_date: date | None = None,
        chunk_days: int | None = None,
        force: bool = False,
    ) -> HistoricalImportStatus:
        """Run an import job in the calling thread and return the final status.

        Args:
            start_date: First day to import; resolved by ``_resolve_range``
                when omitted.
            end_date: Last day to import; resolved by ``_resolve_range`` when
                omitted.
            chunk_days: Days per chunk; falls back to the configured default
                and is clamped to at least 1.
            force: Re-import days that already exist in the repository.

        Returns:
            The status after the job finished. If the range is empty, the
            current status is returned unchanged. If a background worker is
            already alive, a snapshot of that job is returned and nothing is
            run, mirroring ``start``.
        """
        resolved = self._resolve_range(start_date=start_date, end_date=end_date)
        if resolved is None:
            return self.status()

        resolved_start, resolved_end = resolved
        chunk_days = self._normalize_chunk_days(chunk_days)

        with self._state_lock:
            if self._worker_is_alive():
                # Two jobs sharing one status object would corrupt each
                # other's counters.
                return copy.deepcopy(self._state)
            # Without a fresh event, a cancel() issued for an earlier job (or
            # while idle) would make this run stop before its first day.
            self._cancel_event = threading.Event()
            self._state = self._new_job_state(resolved_start, resolved_end, chunk_days)

        self._record_event(
            level="info",
            title="Job started",
            message=f"Range {resolved_start.isoformat()} -> {resolved_end.isoformat()}, chunk size {chunk_days} days",
        )
        self._run_worker(
            start_date=resolved_start,
            end_date=resolved_end,
            chunk_days=chunk_days,
            force=force,
            auto=False,
        )
        return self.status()

    def start_scheduler_if_enabled(self) -> None:
        """Start the auto-sync scheduler thread if the settings allow it.

        Does nothing when history or auto-sync is disabled, or when a
        scheduler thread is already alive.
        """
        if not self.settings.history.get("enabled", True):
            return
        if not self.settings.history.get("auto_sync_enabled", False):
            return
        if self._scheduler and self._scheduler.is_alive():
            return
        self._scheduler_stop.clear()
        self._scheduler = threading.Thread(target=self._scheduler_loop, name="pv-history-scheduler", daemon=True)
        self._scheduler.start()

    # ------------------------------------------------------------------
    # Scheduler and worker
    # ------------------------------------------------------------------

    def _scheduler_loop(self) -> None:
        """Trigger automatic syncs until ``_scheduler_stop`` is set.

        Failures are logged and never terminate the loop.
        """
        interval_seconds = max(int(self.settings.history.get("auto_sync_interval_minutes", 30)), 1) * 60
        if self.settings.history.get("auto_sync_on_start", False):
            try:
                self.start(auto=True)
            except Exception as exc:
                logger.warning("Unable to auto-start historical sync: %s", exc)

        # Event.wait returns False on timeout, i.e. once per interval.
        while not self._scheduler_stop.wait(interval_seconds):
            try:
                if self._worker_is_alive():
                    continue
                self.start(auto=True)
            except Exception as exc:
                logger.warning("Historical scheduler cycle failed: %s", exc)

    def _run_worker(
        self,
        *,
        start_date: date,
        end_date: date,
        chunk_days: int,
        force: bool,
        auto: bool,
    ) -> None:
        """Process the whole range chunk by chunk and finalise the job state.

        Always ends by calling ``_finish`` with ``completed``, ``cancelled``
        or ``failed``; exceptions are logged and recorded, not re-raised.
        """
        total_chunks = max(ceil(((end_date - start_date).days + 1) / chunk_days), 1)
        try:
            chunk_index = 0
            chunk_start = start_date
            while chunk_start <= end_date:
                if self._cancel_event.is_set():
                    self._record_event(level="warn", title="Cancelled", message="Historical import was cancelled by the user.")
                    self._finish("cancelled", running=False, message="Historical import was cancelled by the user.")
                    return

                chunk_index += 1
                chunk_end = min(chunk_start + timedelta(days=chunk_days - 1), end_date)
                self._update_chunk(chunk_index, total_chunks, chunk_start, chunk_end)
                imported, skipped, energy_kwh, cancelled = self._process_chunk(
                    chunk_index=chunk_index,
                    start_day=chunk_start,
                    end_day=chunk_end,
                    force=force,
                )
                if cancelled:
                    self._close_chunk(
                        chunk_index,
                        imported_days=imported,
                        skipped_days=skipped,
                        energy_kwh=energy_kwh,
                        state="cancelled",
                        note="Chunk stopped during processing",
                    )
                    self._record_event(level="warn", title="Cancelled", message="Historical import was cancelled by the user.")
                    self._finish("cancelled", running=False, message="Historical import was cancelled by the user.")
                    return

                self._close_chunk(
                    chunk_index,
                    imported_days=imported,
                    skipped_days=skipped,
                    energy_kwh=energy_kwh,
                    state="completed",
                    note=f"Chunk completed: imported {imported}, skipped {skipped}",
                )
                self._record_event(
                    level="success",
                    title=f"Chunk {chunk_index}/{total_chunks} completed",
                    message=f"Range {chunk_start.isoformat()} -> {chunk_end.isoformat()}, imported {imported}, skipped {skipped}, energy {energy_kwh:.2f} kWh",
                    chunk_index=chunk_index,
                )
                chunk_start = chunk_end + timedelta(days=1)

            final_message = "Historical synchronization completed" if auto else "Historical import completed"
            self._record_event(level="success", title="Completed", message=final_message)
            self._finish("completed", running=False, message=final_message)
        except Exception as exc:
            logger.exception("Historical import failed")
            self._record_event(level="error", title="Import error", message=str(exc))
            self._finish("failed", running=False, message="Historical import finished with an error.", last_error=str(exc))

    def _process_chunk(self, *, chunk_index: int, start_day: date, end_day: date, force: bool) -> tuple[int, int, float, bool]:
        """Import every day of one chunk.

        Returns:
            ``(imported_days, skipped_days, energy_kwh, cancelled)``. When
            ``cancelled`` is True the counters cover only the days handled
            before the cancel flag was observed.
        """
        imported_days = 0
        skipped_days = 0
        energy_kwh = 0.0

        for day in self._date_range(start_day, end_day):
            if self._cancel_event.is_set():
                return imported_days, skipped_days, energy_kwh, True

            if not force and self.repository.has_day(day):
                skipped_days += 1
                self._advance_day(
                    day,
                    imported=False,
                    message=f"Skipped {day.isoformat()} - day already exists in cache",
                    level="warn",
                    title="Day skipped",
                    chunk_index=chunk_index,
                )
                continue

            total, source, samples_count = self.energy.total_for_full_day(day)
            if samples_count <= 0:
                skipped_days += 1
                self._advance_day(
                    day,
                    imported=False,
                    message=f"Skipped {day.isoformat()} - no samples in InfluxDB",
                    level="warn",
                    title="No samples",
                    chunk_index=chunk_index,
                )
                continue

            self.repository.upsert_daily_energy(
                DailyEnergyRecord(
                    day=day,
                    energy_kwh=total,
                    source=source,
                    samples_count=samples_count,
                )
            )
            imported_days += 1
            energy_kwh += total
            self._advance_day(
                day,
                imported=True,
                message=f"Imported {day.isoformat()} ({total:.2f} kWh)",
                level="success",
                title="Day imported",
                chunk_index=chunk_index,
                energy_kwh=total,
            )

        return imported_days, skipped_days, round(energy_kwh, 3), False

    # ------------------------------------------------------------------
    # State bookkeeping
    # ------------------------------------------------------------------

    def _advance_day(
        self,
        day: date,
        *,
        imported: bool,
        message: str,
        level: str,
        title: str,
        chunk_index: int,
        energy_kwh: float | None = None,
    ) -> None:
        """Bump the per-day counters and record an event for ``day``."""
        with self._state_lock:
            self._state.processed_days += 1
            if imported:
                self._state.imported_days += 1
            else:
                self._state.skipped_days += 1
            self._state.current_date = day
            self._state.message = message
            self._refresh_coverage(lock_held=True)
            self._refresh_runtime_metrics(lock_held=True)

        suffix = f" Energy: {energy_kwh:.2f} kWh." if imported and energy_kwh is not None else ""
        # Make sure the event text ends with exactly one period before the suffix.
        self._record_event(
            level=level,
            title=title,
            message=f"{message}.{suffix}" if not message.endswith(".") else f"{message}{suffix}",
            day=day,
            chunk_index=chunk_index,
        )

    def _update_chunk(self, chunk_index: int, total_chunks: int, chunk_start: date, chunk_end: date) -> None:
        """Mark a chunk as the active one and record that it started."""
        chunk = HistoricalChunkProgress(
            chunk_index=chunk_index,
            total_chunks=total_chunks,
            start_date=chunk_start,
            end_date=chunk_end,
            state="running",
            started_at=datetime.utcnow(),
            note=f"Active chunk {chunk_start.isoformat()} -> {chunk_end.isoformat()}",
        )
        with self._state_lock:
            self._state.current_chunk_start = chunk_start
            self._state.current_chunk_end = chunk_end
            self._state.active_chunk_index = chunk_index
            self._state.message = f"Processing range {chunk_start.isoformat()} -> {chunk_end.isoformat()}"
            self._upsert_chunk_locked(chunk)
            self._refresh_runtime_metrics(lock_held=True)
        self._record_event(
            level="info",
            title=f"Chunk {chunk_index}/{total_chunks}",
            message=f"Starting range {chunk_start.isoformat()} -> {chunk_end.isoformat()}",
            chunk_index=chunk_index,
        )

    def _close_chunk(
        self,
        chunk_index: int,
        *,
        imported_days: int,
        skipped_days: int,
        energy_kwh: float,
        state: str,
        note: str,
    ) -> None:
        """Replace the chunk's progress entry with its final figures.

        Start time and date range are taken from the existing entry when it is
        still in ``recent_chunks``; otherwise they fall back to the current
        state (and finally to today's date).
        """
        with self._state_lock:
            existing = self._find_chunk_locked(chunk_index)
            started_at = existing.started_at if existing and existing.started_at else datetime.utcnow()
            finished_at = datetime.utcnow()
            processed_days = imported_days + skipped_days
            duration_seconds = max((finished_at - started_at).total_seconds(), 0.0)
            chunk = HistoricalChunkProgress(
                chunk_index=chunk_index,
                total_chunks=self._state.total_chunks,
                start_date=(
                    existing.start_date
                    if existing
                    else (self._state.current_chunk_start or self._state.requested_start_date or date.today())
                ),
                end_date=(
                    existing.end_date
                    if existing
                    else (self._state.current_chunk_end or self._state.requested_end_date or date.today())
                ),
                processed_days=processed_days,
                imported_days=imported_days,
                skipped_days=skipped_days,
                energy_kwh=round(energy_kwh, 3),
                state=state,
                started_at=started_at,
                finished_at=finished_at,
                duration_seconds=round(duration_seconds, 2),
                note=note,
            )
            self._upsert_chunk_locked(chunk)
            if state != "running":
                self._state.message = note
            self._refresh_runtime_metrics(lock_held=True)

    def _finish(
        self,
        state: str,
        *,
        running: bool,
        message: str,
        last_error: str | None = None,
    ) -> None:
        """Write the terminal job fields and refresh the derived metrics."""
        with self._state_lock:
            # Terminal fields are written first so they stick even if one of
            # the refresh calls below raises.
            self._state.running = running
            self._state.state = state
            self._state.finished_at = datetime.utcnow()
            self._state.last_error = last_error
            self._state.message = message
            self._state.active_chunk_index = 0
            self._refresh_coverage(lock_held=True)
            self._refresh_available_bounds(lock_held=True)
            self._refresh_runtime_metrics(lock_held=True)

    def _worker_is_alive(self) -> bool:
        """Return True while a background worker thread is still running."""
        worker = self._worker
        return worker is not None and worker.is_alive()

    def _normalize_chunk_days(self, chunk_days: int | None) -> int:
        """Return ``chunk_days`` or the configured default, clamped to >= 1."""
        return max(int(chunk_days or self.settings.history.get("default_chunk_days", 7)), 1)

    def _new_job_state(
        self,
        resolved_start: date,
        resolved_end: date,
        chunk_days: int,
        **extra: object,
    ) -> HistoricalImportStatus:
        """Build the status object for a freshly started job.

        ``extra`` is forwarded verbatim to ``HistoricalImportStatus`` so the
        caller can pre-fill fields such as the first chunk or the message.
        """
        total_days = (resolved_end - resolved_start).days + 1
        return HistoricalImportStatus(
            enabled=True,
            running=True,
            state="running",
            job_id=uuid.uuid4().hex[:12],
            started_at=datetime.utcnow(),
            requested_start_date=resolved_start,
            requested_end_date=resolved_end,
            total_days=total_days,
            chunk_days=chunk_days,
            total_chunks=max(ceil(total_days / chunk_days), 1),
            default_chunk_days=self.settings.history.get("default_chunk_days", 7),
            recent_chunks=[],
            recent_events=[],
            **extra,
        )

    # ------------------------------------------------------------------
    # Range resolution and availability
    # ------------------------------------------------------------------

    def _resolve_range(self, *, start_date: date | None, end_date: date | None) -> tuple[date, date] | None:
        """Work out the inclusive date range a job should cover.

        End: ``end_date`` if given, otherwise today or yesterday depending on
        the ``include_today_in_sync`` setting.

        Start: ``start_date`` if given; otherwise the day after the last
        stored day; otherwise the configured ``bootstrap_start_date``;
        otherwise the first day available in InfluxDB (or the end day when
        that is unknown).

        Returns:
            ``(start, end)``, or ``None`` when the start lies after the end.
        """
        if end_date is not None:
            resolved_end = end_date
        else:
            # "Today" is only needed when the caller did not pin the end day.
            today = now_local().date()
            include_today = self.settings.history.get("include_today_in_sync", False)
            resolved_end = today if include_today else today - timedelta(days=1)

        if start_date is not None:
            resolved_start = start_date
        else:
            coverage = self.repository.coverage()
            if coverage.last_day:
                resolved_start = coverage.last_day + timedelta(days=1)
            else:
                bootstrap_start = self.settings.history.get("bootstrap_start_date")
                if bootstrap_start:
                    resolved_start = self._coerce_date(bootstrap_start)
                else:
                    available_start, _ = self._available_bounds()
                    resolved_start = available_start or resolved_end

        if resolved_start > resolved_end:
            return None
        return resolved_start, resolved_end

    @staticmethod
    def _coerce_date(value: object) -> date:
        """Turn a settings value into a ``date``.

        Accepts ``date``/``datetime`` instances as well as ISO strings, so a
        settings loader that already parsed the value does not trigger a
        ``TypeError`` in ``date.fromisoformat``.
        """
        # datetime is a subclass of date, so it has to be checked first.
        if isinstance(value, datetime):
            return value.date()
        if isinstance(value, date):
            return value
        return date.fromisoformat(str(value))

    def _available_bounds(self) -> tuple[date | None, date | None]:
        """Return the first and last day with data in InfluxDB.

        The result is cached for 300 seconds. The production metric is
        preferred; the fallback power metric is used when the catalog does not
        know the production metric.
        """
        now_utc = datetime.utcnow()
        cached = self._available_bounds_cache
        if cached and (now_utc - cached[0]).total_seconds() < 300:
            return cached[1], cached[2]

        available_start: date | None = None
        available_end: date | None = None
        metric = self.catalog.safe_get(self.settings.analytics.get("production_metric_id", "energy_total"))
        fallback = self.catalog.safe_get(self.settings.analytics.get("fallback_power_metric_id", "ac_power"))
        source_metric = metric or fallback
        if source_metric is not None:
            first_point = self.influx.first_value(source_metric)
            last_point = self.influx.last_value(source_metric)
            # Convert to the energy service's timezone before taking the date.
            available_start = first_point.timestamp.astimezone(self.energy.tz).date() if first_point else None
            available_end = last_point.timestamp.astimezone(self.energy.tz).date() if last_point else None
        self._available_bounds_cache = (now_utc, available_start, available_end)
        return available_start, available_end

    def _refresh_coverage(self, *, lock_held: bool = False) -> None:
        """Recompute repository coverage and store it on the status.

        Args:
            lock_held: True when the caller already holds ``_state_lock``.
        """
        coverage = self.repository.coverage()
        available_start, available_end = self._available_bounds()
        if available_start and available_end and available_start <= available_end:
            available_days = (available_end - available_start).days + 1
            missing_days = self.repository.count_missing_days(available_start, available_end)
            coverage.available_days = available_days
            coverage.missing_days = missing_days
            imported_in_range = max(available_days - missing_days, 0)
            coverage.coverage_pct = round((imported_in_range / available_days) * 100, 1) if available_days > 0 else None
        else:
            coverage.available_days = 0
            coverage.missing_days = 0
            coverage.coverage_pct = None

        if lock_held:
            self._state.coverage = coverage
        else:
            with self._state_lock:
                self._state.coverage = coverage

    def _refresh_available_bounds(self, *, lock_held: bool = False) -> None:
        """Copy the InfluxDB availability bounds onto the status.

        Args:
            lock_held: True when the caller already holds ``_state_lock``.
        """
        available_start, available_end = self._available_bounds()
        if lock_held:
            self._state.available_start_date = available_start
            self._state.available_end_date = available_end
        else:
            with self._state_lock:
                self._state.available_start_date = available_start
                self._state.available_end_date = available_end

    def _refresh_runtime_metrics(self, *, lock_held: bool = False) -> None:
        """Recompute elapsed time, throughput and ETA on the status.

        Args:
            lock_held: True when the caller already holds ``_state_lock``.
        """

        def apply() -> None:
            if self._state.started_at is None:
                # No job has ever started: nothing to measure.
                self._state.elapsed_seconds = None
                self._state.estimated_remaining_seconds = None
                self._state.avg_days_per_minute = None
                return

            # A running job is measured against "now", a finished one
            # against its recorded finish time.
            end_reference = datetime.utcnow() if self._state.running or self._state.finished_at is None else self._state.finished_at
            elapsed_seconds = max((end_reference - self._state.started_at).total_seconds(), 0.0)
            self._state.elapsed_seconds = round(elapsed_seconds, 1)

            if self._state.processed_days > 0 and elapsed_seconds > 0:
                avg_days_per_minute = (self._state.processed_days / elapsed_seconds) * 60
                remaining_days = max(self._state.total_days - self._state.processed_days, 0)
                # Linear extrapolation from the average time per processed day.
                estimated_remaining = (remaining_days / self._state.processed_days) * elapsed_seconds
                self._state.avg_days_per_minute = round(avg_days_per_minute, 2)
                self._state.estimated_remaining_seconds = round(estimated_remaining, 1) if self._state.running else 0.0
            else:
                self._state.avg_days_per_minute = None
                self._state.estimated_remaining_seconds = None if self._state.running else 0.0

        if lock_held:
            apply()
        else:
            with self._state_lock:
                apply()

    # ------------------------------------------------------------------
    # Events and chunk history
    # ------------------------------------------------------------------

    @staticmethod
    def _build_event(
        *,
        level: str,
        title: str,
        message: str,
        day: date | None = None,
        chunk_index: int | None = None,
    ) -> HistoricalActivityEvent:
        """Create an activity event stamped with the current UTC time."""
        return HistoricalActivityEvent(
            timestamp=datetime.utcnow(),
            level=level,
            title=title,
            message=message,
            day=day,
            chunk_index=chunk_index,
        )

    def _append_event_locked(self, event: HistoricalActivityEvent) -> None:
        """Append ``event`` and trim the list; caller must hold the lock."""
        self._state.recent_events.append(event)
        self._state.recent_events = self._state.recent_events[-self.MAX_RECENT_EVENTS :]

    def _record_event(
        self,
        *,
        level: str,
        title: str,
        message: str,
        day: date | None = None,
        chunk_index: int | None = None,
    ) -> None:
        """Append an activity event, keeping only the newest entries.

        Acquires ``_state_lock`` itself, so it must not be called while the
        lock is held; use ``_append_event_locked`` in that case.
        """
        event = self._build_event(level=level, title=title, message=message, day=day, chunk_index=chunk_index)
        with self._state_lock:
            self._append_event_locked(event)

    def _find_chunk_locked(self, chunk_index: int) -> HistoricalChunkProgress | None:
        """Return the tracked chunk with ``chunk_index``, or None."""
        for chunk in self._state.recent_chunks:
            if chunk.chunk_index == chunk_index:
                return chunk
        return None

    def _upsert_chunk_locked(self, chunk: HistoricalChunkProgress) -> None:
        """Replace the entry with the same index or append a new one.

        The list is trimmed to the newest ``MAX_RECENT_CHUNKS`` entries.
        """
        for index, existing in enumerate(self._state.recent_chunks):
            if existing.chunk_index == chunk.chunk_index:
                self._state.recent_chunks[index] = chunk
                break
        else:
            self._state.recent_chunks.append(chunk)
        self._state.recent_chunks = self._state.recent_chunks[-self.MAX_RECENT_CHUNKS :]

    @staticmethod
    def _date_range(start_day: date, end_day: date) -> Iterable[date]:
        """Yield every day from ``start_day`` to ``end_day`` inclusive."""
        current = start_day
        while current <= end_day:
            yield current
            current = current + timedelta(days=1)
|
|
|
|
|
|
@lru_cache(maxsize=1)
def get_historical_sync_service() -> HistoricalSyncService:
    """Return the shared ``HistoricalSyncService`` instance.

    The single-slot ``lru_cache`` means the service is built with its default
    collaborators on the first call and the same object is handed back on
    every later call.
    """
    service = HistoricalSyncService()
    return service
|