from __future__ import annotations import base64 import json import logging import ssl import urllib.error import urllib.parse import urllib.request from collections import defaultdict from datetime import datetime from typing import Iterable from app.core_settings import AppSettings, get_settings from app.models.definitions import MetricDefinition, SeriesPoint from app.services.metrics import to_float from app.utils.time import to_utc_iso logger = logging.getLogger(__name__) def _quote_identifier(value: str) -> str: return '"' + value.replace('"', '\\"') + '"' def _quote_literal(value: str) -> str: return "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'" class InfluxHTTPService: def __init__(self, settings: AppSettings | None = None) -> None: self.settings = settings or get_settings() @property def base_url(self) -> str: config = self.settings.influx return f"{config['scheme']}://{config['host']}:{config['port']}" def latest_values(self, metrics: Iterable[MetricDefinition]) -> dict[str, dict]: grouped: dict[str, list[MetricDefinition]] = defaultdict(list) for metric in metrics: grouped[metric.measurement].append(metric) payload: dict[str, dict] = {} for measurement, measurement_metrics in grouped.items(): conditions = " OR ".join( f'("entity_id" = {_quote_literal(metric.entity_id)})' for metric in measurement_metrics ) query = ( f'SELECT LAST("value") AS value ' f'FROM {_quote_identifier(measurement)} ' f'WHERE {conditions} ' f'GROUP BY "entity_id"' ) try: for series in self._execute(query): entity_id = (series.get("tags") or {}).get("entity_id") if not entity_id: continue metric = next((item for item in measurement_metrics if item.entity_id == entity_id), None) if metric is None: continue row = self._row_from_series(series) payload[metric.id] = { "value": row.get("value"), "timestamp": _parse_time(row.get("time")), } except Exception as exc: logger.warning("Influx latest_values error for %s: %s", measurement, exc) return payload def latest_value(self, metric: MetricDefinition) -> SeriesPoint | None: return self._single_value( f'SELECT LAST("value") AS value ' f'FROM {_quote_identifier(metric.measurement)} ' f'WHERE "entity_id" = {_quote_literal(metric.entity_id)}' ) def first_value(self, metric: MetricDefinition) -> SeriesPoint | None: return self._single_value( f'SELECT FIRST("value") AS value ' f'FROM {_quote_identifier(metric.measurement)} ' f'WHERE "entity_id" = {_quote_literal(metric.entity_id)}' ) def last_value(self, metric: MetricDefinition) -> SeriesPoint | None: return self.latest_value(metric) def gauge_history( self, metric: MetricDefinition, start: datetime, end: datetime, interval: str, aggregate: str = "mean", ) -> list[SeriesPoint]: query = ( f'SELECT {aggregate}("value") AS value ' f'FROM {_quote_identifier(metric.measurement)} ' f'WHERE "entity_id" = {_quote_literal(metric.entity_id)} ' f'AND time >= {_quote_literal(to_utc_iso(start))} ' f'AND time <= {_quote_literal(to_utc_iso(end))} ' f'GROUP BY time({interval}) fill(null)' ) points: list[SeriesPoint] = [] try: for series in self._execute(query): for row in self._rows_from_series(series): timestamp = _parse_time(row.get("time")) if timestamp is None: continue points.append(SeriesPoint(timestamp=timestamp, value=to_float(row.get("value")))) except Exception as exc: logger.warning("Influx gauge_history error for %s: %s", metric.id, exc) return points def grouped_last_series( self, metric: MetricDefinition, start: datetime, end: datetime, interval: str, ) -> list[SeriesPoint]: query = ( f'SELECT LAST("value") AS value ' f'FROM {_quote_identifier(metric.measurement)} ' f'WHERE "entity_id" = {_quote_literal(metric.entity_id)} ' f'AND time >= {_quote_literal(to_utc_iso(start))} ' f'AND time <= {_quote_literal(to_utc_iso(end))} ' f'GROUP BY time({interval}) fill(null)' ) points: list[SeriesPoint] = [] try: for series in self._execute(query): for row in self._rows_from_series(series): timestamp = _parse_time(row.get("time")) if timestamp is None: continue points.append(SeriesPoint(timestamp=timestamp, value=to_float(row.get("value")))) except Exception as exc: logger.warning("Influx grouped_last_series error for %s: %s", metric.id, exc) return points def last_before(self, metric: MetricDefinition, moment: datetime) -> SeriesPoint | None: query = ( f'SELECT LAST("value") AS value ' f'FROM {_quote_identifier(metric.measurement)} ' f'WHERE "entity_id" = {_quote_literal(metric.entity_id)} ' f'AND time < {_quote_literal(to_utc_iso(moment))}' ) try: series = self._execute(query) if not series: return None row = self._row_from_series(series[0]) timestamp = _parse_time(row.get("time")) value = to_float(row.get("value")) if timestamp is None or value is None: return None return SeriesPoint(timestamp=timestamp, value=value) except Exception as exc: logger.warning("Influx last_before error for %s: %s", metric.id, exc) return None def diagnose(self) -> dict: config = self.settings.influx payload = { "status": "connected", "reachable": True, "database_exists": False, "url": self.base_url, "database": config["database"], "username_masked": _mask_secret(config.get("username") or ""), "verify_ssl": bool(config.get("verify_ssl", False)), "timeout_seconds": int(config.get("timeout_seconds", 15)), "error": None, } try: series = self._execute("SHOW DATABASES") databases: set[str] = set() for item in series: for row in self._rows_from_series(item): value = row.get("name") if isinstance(value, str): databases.add(value) payload["database_exists"] = config["database"] in databases except Exception as exc: payload["status"] = "error" payload["reachable"] = False payload["error"] = str(exc) return payload def _single_value(self, query: str) -> SeriesPoint | None: try: series = self._execute(query) if not series: return None row = self._row_from_series(series[0]) timestamp = _parse_time(row.get("time")) value = to_float(row.get("value")) if timestamp is None or value is None: return None return SeriesPoint(timestamp=timestamp, value=value) except Exception as exc: logger.warning("Influx single value query error: %s", exc) return None def _execute(self, query: str) -> list[dict]: params = { "db": self.settings.influx["database"], "q": query, } url = f"{self.base_url}/query?{urllib.parse.urlencode(params)}" headers = {"Accept": "application/json"} username = self.settings.influx.get("username") or "" password = self.settings.influx.get("password") or "" if username: token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii") headers["Authorization"] = f"Basic {token}" request = urllib.request.Request(url, headers=headers, method="GET") verify_ssl = self.settings.influx.get("verify_ssl", False) timeout = self.settings.influx.get("timeout_seconds", 15) context = None if self.settings.influx.get("scheme") == "https" and not verify_ssl: context = ssl._create_unverified_context() try: with urllib.request.urlopen(request, timeout=timeout, context=context) as response: payload = json.loads(response.read().decode("utf-8")) except urllib.error.HTTPError as exc: body = exc.read().decode("utf-8", errors="ignore") raise RuntimeError(f"Influx HTTP {exc.code}: {body}") from exc except urllib.error.URLError as exc: raise RuntimeError(f"Influx connection error: {exc}") from exc results = payload.get("results") or [] if not results: return [] result = results[0] if "error" in result: raise RuntimeError(result["error"]) return result.get("series") or [] @staticmethod def _rows_from_series(series: dict) -> list[dict]: columns = series.get("columns") or [] rows = [] for values in series.get("values") or []: rows.append(dict(zip(columns, values))) return rows @classmethod def _row_from_series(cls, series: dict) -> dict: rows = cls._rows_from_series(series) return rows[0] if rows else {} def _parse_time(value: str | None) -> datetime | None: if not value: return None try: return datetime.fromisoformat(value.replace("Z", "+00:00")) except ValueError: return None def _mask_secret(value: str) -> str: if not value: return "" if len(value) <= 2: return "*" * len(value) return value[:1] + ("*" * max(len(value) - 2, 1)) + value[-1:]