Files
voltage-monitor/app.py
Mateusz Gruszczyński 8e14d38c38 wykres z gap
2026-03-02 17:39:54 +01:00

498 lines
16 KiB
Python

import os
import logging
import time
import hashlib
from datetime import datetime, timedelta, timezone
from flask import Flask, render_template, jsonify, request
from flask_socketio import SocketIO
from influxdb import InfluxDBClient
from werkzeug.exceptions import HTTPException
import config
# Root logging configuration: INFO level, "<time> - <level> - <message>" format.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
# Handle on the "werkzeug" logger, pinned to INFO as well.
logger = logging.getLogger("werkzeug")
logger.setLevel(logging.INFO)
# Flask application; the secret key is read from config.FLASK_CONFIG.
app = Flask(__name__)
app.config["SECRET_KEY"] = config.FLASK_CONFIG["secret_key"]
# Falls back to 60 when "static_cache_timeout" is not present in the config.
app.config["SEND_FILE_MAX_AGE_DEFAULT"] = config.FLASK_CONFIG.get("static_cache_timeout", 60)
# Socket.IO server bound to the app, running in "threading" async mode.
# NOTE(review): cors_allowed_origins="*" accepts every origin - confirm this is intended.
socketio = SocketIO(app, cors_allowed_origins="*", async_mode="threading", logger=False)
@app.errorhandler(500)
def server_error(error):
    """Log a 500 error and render the generic error page.

    Args:
        error: The error object passed in by Flask.

    Returns:
        A ``(body, status)`` tuple: the rendered ``error.html`` template and 500.
    """
    app.logger.error(f'Server error: {str(error)}')
    page = render_template(
        'error.html',
        status=500,
        title='Błąd serwera',
        message='Spróbuj ponownie za chwilę.',
        footer=config.FOOTER,
    )
    return page, 500
@app.errorhandler(400)
@app.errorhandler(404)
def client_error(error):
    """Handle 400/404 errors: JSON for API paths, HTML error page otherwise.

    Flask resolves handlers registered for a specific status code before
    handlers registered for an exception class, so this handler (and not the
    generic ``HTTPException`` one) receives 400/404 errors raised on ``/api/``
    routes. API clients therefore have to be served from here, using the same
    payload shape as the generic HTTP error handler.

    Args:
        error: The HTTP exception passed in by Flask (exposes ``code``,
            ``name`` and ``description``).

    Returns:
        A ``(body, status)`` tuple.
    """
    if is_api_request():
        return api_error(
            error.code,
            "http_error",
            error.description or error.name,
            name=error.name,
        )
    return render_template(
        'error.html',
        status=error.code,
        title=f'Błąd {error.code}',
        message='Nieprawidłowe żądanie.',
        footer=config.FOOTER,
    ), error.code
# --------------------
# Helpers
# --------------------
def is_api_request() -> bool:
    """Tell whether the current request path lives under ``/api/``."""
    current_path = request.path
    return current_path.startswith("/api/")
def api_error(code: int, error: str, message: str = "", **extra):
    """Build a JSON error response.

    Args:
        code: HTTP status code; also echoed in the payload under ``"code"``.
        error: Short machine-readable error identifier.
        message: Optional human-readable text; omitted from the payload when empty.
        **extra: Additional key/value pairs merged into the payload.

    Returns:
        A ``(response, status)`` tuple.
    """
    body = {"error": error, "code": code}
    if message:
        body["message"] = message
    # Updating with an empty mapping is a no-op, so no emptiness check is needed.
    body.update(extra)
    return jsonify(body), code
def parse_range(range_str: str):
    """Parse a relative range such as ``'24h'`` or ``'7d'`` into a timedelta.

    Spaces are ignored. After removing them the value must be a positive
    decimal integer (ASCII digits only) followed by the unit ``h`` (hours) or
    ``d`` (days). Anything else is rejected instead of being silently
    stripped, so a value that passes here contains nothing but digits and a
    unit letter.

    Args:
        range_str: The range expression to parse.

    Returns:
        The corresponding ``timedelta``.

    Raises:
        ValueError: If the value is empty, too short, has an unsupported
            unit, has a missing or malformed number, is not positive, or is
            too large to represent.
    """
    if not range_str:
        raise ValueError("empty")
    s = range_str.replace(" ", "")
    if len(s) < 2:
        raise ValueError("too_short")
    number, unit = s[:-1], s[-1]
    if unit not in ("h", "d"):
        raise ValueError("bad_unit")
    if not any(ch.isdigit() for ch in number):
        raise ValueError("no_number")
    # Reject mixed content ("1x2", "-5", "1.5") and non-ASCII digits rather
    # than dropping the offending characters.
    if not (number.isascii() and number.isdigit()):
        raise ValueError("bad_number")
    num = int(number)
    if num <= 0:
        raise ValueError("non_positive")
    try:
        return timedelta(hours=num) if unit == "h" else timedelta(days=num)
    except OverflowError:
        raise ValueError("too_large") from None
def get_influx_client():
    """Create an InfluxDB client from ``config.INFLUXDB_CONFIG``.

    Host, port and database are required keys. When a non-empty ``username``
    is configured, the client is switched to that user (the password defaults
    to an empty string).

    Returns:
        A new ``InfluxDBClient``; the caller is responsible for closing it.
    """
    settings = config.INFLUXDB_CONFIG
    client = InfluxDBClient(
        host=settings["host"],
        port=settings["port"],
        database=settings["database"],
    )
    username = settings.get("username")
    if username:
        client.switch_user(username, settings.get("password", ""))
    return client
def get_current_voltage(phase_id: int):
    """Fetch the most recent voltage reading for one phase.

    Args:
        phase_id: Key into ``config.PHASES``.

    Returns:
        ``{"voltage": <float rounded to 2 places>, "timestamp": <time>}`` for
        the newest point, or ``{"voltage": 0, "timestamp": None}`` when there
        is no usable point or the lookup fails (failures are logged as
        warnings).
    """
    fallback = {"voltage": 0, "timestamp": None}
    client = get_influx_client()
    try:
        entity_id = config.PHASES[phase_id]["entity_id"]
        query = (
            f'SELECT "value" FROM "{config.MEASUREMENT}" '
            f'WHERE "entity_id" = \'{entity_id}\' ORDER BY time DESC LIMIT 1'
        )
        rows = list(client.query(query).get_points())
        latest = rows[0] if rows else None
        if latest is not None and latest.get("value") is not None:
            return {
                "voltage": round(float(latest["value"]), 2),
                "timestamp": latest["time"],
            }
        return fallback
    except Exception as e:
        app.logger.warning(f"Influx current_voltage error: {e}")
        return fallback
    finally:
        # Closing is best-effort; a failure here must not mask the result.
        try:
            client.close()
        except Exception:
            pass
# --------------------
# Error handlers (40x/50x)
# --------------------
@app.errorhandler(HTTPException)
def handle_http_exception(e: HTTPException):
    """Render HTTP exceptions as JSON for API paths; pass them through otherwise.

    Args:
        e: The HTTP exception being handled.

    Returns:
        A JSON error tuple for ``/api/`` requests, else the exception itself.
    """
    if not is_api_request():
        return e
    status = e.code or 500
    detail = e.description or e.name
    return api_error(status, "http_error", detail, name=e.name)
@app.errorhandler(Exception)
def handle_unhandled_exception(e: Exception):
    """Log any unhandled exception; answer API paths with a JSON 500.

    For non-API paths the exception currently being handled is re-raised.

    Args:
        e: The unhandled exception.

    Returns:
        A JSON error tuple for ``/api/`` requests.
    """
    app.logger.exception("Unhandled error")
    if not is_api_request():
        # Bare raise: propagates the exception that is currently being handled.
        raise
    return api_error(500, "server_error", "Internal Server Error")
# --------------------
# Static cache busting
# --------------------
@app.context_processor
def inject_static_version():
    """Expose a ``static_v(filename)`` helper to templates.

    ``static_v`` returns the static URL of ``filename`` with a ``?v=`` query
    parameter holding the first 8 hex characters of the file's MD5 digest, or
    ``"1"`` when the file cannot be read.
    """

    def static_v(filename):
        asset_path = os.path.join(app.static_folder, filename)
        try:
            with open(asset_path, "rb") as fh:
                digest = hashlib.md5(fh.read()).hexdigest()
            version = digest[:8]
        except Exception:
            version = "1"
        return f"{request.script_root}/static/{filename}?v={version}"

    return {"static_v": static_v}
@app.after_request
def add_header(response):
    """Set cache headers on every response.

    Non-static responses are marked ``no-cache`` / ``no-store``. Responses for
    paths under ``/static/`` become publicly cacheable for 31536000 seconds
    and lose any ``Content-Disposition`` header.

    Args:
        response: The outgoing response.

    Returns:
        The same response object, modified in place.
    """
    if not request.path.startswith("/static/"):
        response.cache_control.no_cache = True
        response.cache_control.no_store = True
        return response

    response.cache_control.max_age = 31536000
    response.cache_control.public = True
    response.headers.pop("Content-Disposition", None)
    return response
# --------------------
# Routes
# --------------------
@app.route("/favicon.ico")
def favicon():
    """Answer favicon requests with an empty body and status 204."""
    empty_body, no_content = "", 204
    return empty_body, no_content
@app.route("/")
def index():
    """Render the main page.

    Only the ``label`` and ``color`` of each configured phase are handed to
    the template; the remaining phase settings are not passed along.
    """
    safe_phases = {}
    for phase_key, phase_cfg in config.PHASES.items():
        safe_phases[phase_key] = {
            'label': phase_cfg['label'],
            'color': phase_cfg['color'],
        }
    template_context = dict(
        phases=safe_phases,
        time_ranges=config.TIME_RANGES,
        default_range=config.DEFAULT_TIME_RANGE,
        footer=config.FOOTER,
    )
    return render_template("index.html", **template_context)
def _timeseries_time_literal(value: str) -> str:
    """Parse an ISO 8601 string and re-serialise it as a UTC timestamp literal.

    The value placed in the query is produced from the parsed datetime, never
    from the raw request text. Values without a UTC offset are treated as UTC.

    Raises:
        ValueError: If ``value`` is not a valid ISO 8601 date/time.
        OverflowError: If converting to UTC leaves the supported date range.
    """
    parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")


@app.route("/api/timeseries/<int:phase_id>")
def get_timeseries(phase_id):
    """Return mean voltage per time bucket for one phase as JSON.

    Query parameters:
        range: Relative window such as ``24h`` or ``7d`` (defaults to
            ``config.DEFAULT_TIME_RANGE``). Ignored when start/end are given.
        start, end: Absolute ISO 8601 bounds; both must be supplied together.

    Returns:
        A JSON list of ``{"time", "voltage"}`` objects (``voltage`` is
        ``None`` for empty buckets), a 400 JSON error for invalid input, or a
        503 JSON error when the query fails.
    """
    if phase_id not in config.PHASES:
        return api_error(400, "invalid_phase", "Invalid phase")
    range_param = request.args.get("range", config.DEFAULT_TIME_RANGE)
    start_param = request.args.get("start")
    end_param = request.args.get("end")
    entity_id = config.PHASES[phase_id]["entity_id"]
    if bool(start_param) != bool(end_param):
        return api_error(400, "bad_range", "Provide both start and end")
    if start_param and end_param:
        try:
            start_literal = _timeseries_time_literal(start_param)
            end_literal = _timeseries_time_literal(end_param)
        except (ValueError, OverflowError):
            return api_error(400, "bad_datetime", "start/end must be ISO 8601")
        time_filter = f"time >= '{start_literal}' AND time <= '{end_literal}'"
        interval = config.DEFAULT_INTERVAL
    else:
        try:
            delta = parse_range(range_param)
        except (ValueError, OverflowError):
            return api_error(400, "bad_range", "range must be like 24h or 7d")
        # Build the duration from the parsed value (whole seconds) so that no
        # request text ever reaches the query string.
        time_filter = f"time > now() - {int(delta.total_seconds())}s"
        interval = config.TIME_RANGES.get(range_param, {}).get("interval", config.DEFAULT_INTERVAL)
    query = f'''
        SELECT mean("value") AS voltage
        FROM "{config.MEASUREMENT}"
        WHERE "entity_id" = '{entity_id}'
        AND {time_filter}
        GROUP BY time({interval}) fill(null)
    '''
    client = get_influx_client()
    try:
        result = client.query(query)
        data = []
        for p in result.get_points():
            v = p.get("voltage")
            data.append(
                {
                    "time": p["time"],
                    "voltage": (round(float(v), 2) if v is not None else None),
                }
            )
        return jsonify(data)
    except Exception as e:
        app.logger.error(f"Timeseries Error: {e} | Query: {query}")
        return api_error(503, "backend_unavailable", "InfluxDB unavailable")
    finally:
        # Closing is best-effort; a failure here must not mask the response.
        try:
            client.close()
        except Exception:
            pass
def _event_parse_utc(value: str) -> datetime:
    """Parse an ISO 8601 string into a timezone-aware UTC datetime.

    Values without a UTC offset are treated as UTC, so the result can always
    be compared with other aware datetimes.
    """
    parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)


def _classify_bucket(point):
    """Classify one min/max bucket.

    Returns ``"gap"`` when both values are missing, ``"zanik"``, ``"niskie"``
    or ``"wysokie"`` according to ``config.VOLTAGE_THRESHOLDS``, or ``None``
    when the bucket is within limits.
    """
    vmin_raw = point.get("vmin")
    vmax_raw = point.get("vmax")
    if vmin_raw is None and vmax_raw is None:
        return "gap"
    thresholds = config.VOLTAGE_THRESHOLDS
    vmin = float(vmin_raw) if vmin_raw is not None else None
    vmax = float(vmax_raw) if vmax_raw is not None else None
    if vmin is not None and vmin < thresholds["outage"]:
        return "zanik"
    if vmin is not None and thresholds["outage"] <= vmin < thresholds["min_safe"]:
        return "niskie"
    if vmax is not None and vmax > thresholds["max_safe"]:
        return "wysokie"
    return None


def _phase_events(points, phase_id, view_start, view_end):
    """Turn consecutive buckets of the same class into event dicts.

    A run is reported when it lasts at least
    ``config.MIN_EVENT_DURATION_MINUTES`` and overlaps the
    ``[view_start, view_end]`` window. Gap runs are reported with type
    ``"zanik"`` and an extra ``"source": "gap"`` marker.
    """
    from itertools import groupby  # function-scope import keeps this block self-contained

    events = []
    for kind, run in groupby(points, key=_classify_bucket):
        if kind is None:
            continue
        run = list(run)
        start_str = run[0]["time"]
        end_str = run[-1]["time"]
        dt_s = _event_parse_utc(start_str)
        dt_e = _event_parse_utc(end_str)
        # Minutes between the first and last bucket start, plus one for the last bucket.
        duration = (dt_e - dt_s).total_seconds() / 60 + 1
        if duration < config.MIN_EVENT_DURATION_MINUTES:
            continue
        if dt_e < view_start or dt_s > view_end:
            continue
        event = {
            "start": start_str,
            "end": end_str,
            "phase": phase_id,
            "type": "zanik" if kind == "gap" else kind,
            "duration": int(round(duration, 0)),
        }
        if kind == "gap":
            event["source"] = "gap"
        events.append(event)
    return events


@app.route("/api/events")
def get_events():
    """Return detected voltage events for all phases as JSON, newest first.

    Query parameters:
        range: Relative window such as ``24h`` or ``7d`` (default ``24h``),
            limited to ``config.MAX_EVENT_RANGE_DAYS`` days when configured.
        start, end: Absolute ISO 8601 bounds; both must be supplied together.

    Data is queried from 24 hours before the window start so that events
    which began earlier but reach into the window are still found.

    Returns:
        A JSON list of event objects, a 400 JSON error for invalid input, or
        a 503 JSON error when querying or processing fails.
    """
    range_p = request.args.get("range", "24h")
    start_p = request.args.get("start")
    end_p = request.args.get("end")
    now_utc = datetime.now(timezone.utc)
    if bool(start_p) != bool(end_p):
        return api_error(400, "bad_range", "Provide both start and end")
    if start_p and end_p:
        try:
            dt_view_start = _event_parse_utc(start_p)
            dt_view_end = _event_parse_utc(end_p)
        except (ValueError, OverflowError):
            return api_error(400, "bad_datetime", "start/end must be ISO 8601")
        # Literals are rebuilt from the parsed datetimes, never from request text.
        start_literal = dt_view_start.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
        end_literal = dt_view_end.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
        time_filter = f"time >= '{start_literal}' - 24h AND time <= '{end_literal}'"
    else:
        try:
            delta = parse_range(range_p)
        except (ValueError, OverflowError):
            return api_error(400, "bad_range", "range must be like 24h or 7d")
        # Enforce the limit on the parsed duration so hour-based ranges
        # cannot bypass it.
        max_days = getattr(config, "MAX_EVENT_RANGE_DAYS", None)
        if max_days is not None and delta > timedelta(days=max_days):
            return api_error(
                400,
                "range_too_large",
                f"Zbyt duży zakres. Maksymalnie {max_days} dni.",
            )
        try:
            dt_view_start = now_utc - delta
        except OverflowError:
            return api_error(400, "bad_range", "range must be like 24h or 7d")
        dt_view_end = now_utc
        time_filter = f"time > now() - {int(delta.total_seconds())}s - 24h"
    all_events = []
    client = get_influx_client()
    try:
        for p_id, p_cfg in config.PHASES.items():
            query = f'''
                SELECT min("value") AS vmin, max("value") AS vmax
                FROM "{config.MEASUREMENT}"
                WHERE "entity_id" = '{p_cfg["entity_id"]}'
                AND {time_filter}
                GROUP BY time({config.EVENT_DETECTION_INTERVAL}) fill(null)
            '''
            points = list(client.query(query).get_points())
            all_events.extend(_phase_events(points, p_id, dt_view_start, dt_view_end))
        return jsonify(sorted(all_events, key=lambda x: x["start"], reverse=True))
    except Exception as e:
        app.logger.error(f"Event Logic Error: {e}")
        return api_error(503, "backend_unavailable", "InfluxDB unavailable")
    finally:
        # Closing is best-effort; a failure here must not mask the response.
        try:
            client.close()
        except Exception:
            pass
@app.route("/api/outages/<phase_id>")
def api_outages(phase_id):
    """Return raw points below the outage-detection threshold for one phase.

    Query parameters:
        range: Relative window such as ``24h`` or ``7d`` (default ``24h``).

    Returns:
        A JSON list of points, a 400 JSON error for an invalid phase or
        range, or a 503 JSON error when the query fails.
    """
    # phase_id arrives as a string here, so validate it like the timeseries endpoint does.
    try:
        pid = int(phase_id)
    except (TypeError, ValueError):
        return api_error(400, "invalid_phase", "Invalid phase")
    if pid not in config.PHASES:
        return api_error(400, "invalid_phase", "Invalid phase")
    t_range = request.args.get("range", "24h")
    try:
        delta = parse_range(t_range)
    except (ValueError, OverflowError):
        return api_error(400, "bad_range", "range must be like 24h or 7d")
    entity_id = config.PHASES[pid]["entity_id"]
    # The window is rebuilt from the parsed duration (whole seconds); the raw
    # request value is never placed in the query.
    window_s = int(delta.total_seconds())
    query = (
        f'SELECT "value" FROM "{config.MEASUREMENT}" '
        f'WHERE "entity_id" = \'{entity_id}\' '
        f'AND "value" < {config.VOLTAGE_THRESHOLDS["outage_detection"]} '
        f"AND time > now() - {window_s}s"
    )
    client = get_influx_client()
    try:
        result = client.query(query)
        return jsonify(list(result.get_points()))
    except Exception as e:
        app.logger.error(f"Outages Error: {e} | Query: {query}")
        return api_error(503, "backend_unavailable", "InfluxDB unavailable")
    finally:
        # Closing is best-effort; a failure here must not mask the response.
        try:
            client.close()
        except Exception:
            pass
# --------------------
# SocketIO worker
# --------------------
# Count of connected Socket.IO clients: incremented on "connect", decremented
# (never below 0) on "disconnect"; the background worker idles while it is 0.
clients = 0
# Background worker task; stays None until the first "connect" starts it.
task = None
@socketio.on("connect")
def handle_connect():
    """Count the new client and start the background worker on first use."""
    global clients, task
    clients = clients + 1
    if task is not None:
        return
    task = socketio.start_background_task(background_voltage_update)
@socketio.on("disconnect")
def handle_disconnect():
    """Decrease the connected-client counter, never letting it go below zero."""
    global clients
    remaining = clients - 1
    clients = remaining if remaining > 0 else 0
def background_voltage_update():
    """Push current voltages to connected clients in an endless loop.

    While no client is connected the loop polls once per second. Otherwise,
    every ``config.CHART_CONFIG["update_interval"]`` milliseconds it emits a
    ``voltage_update`` event with one ``phase<id>`` value per configured
    phase. It additionally emits ``refresh_timeseries`` once at least
    ``refresh_interval_seconds`` (default 15) have passed since the last one.
    Errors are logged and the loop carries on.
    """
    last_refresh = 0
    refresh_every_s = config.CHART_CONFIG.get("refresh_interval_seconds", 15)
    interval_s = config.CHART_CONFIG["update_interval"] / 1000.0
    while True:
        if clients == 0:
            socketio.sleep(1)
            continue
        try:
            payload = {"timestamp": None}
            for pid in config.PHASES:
                reading = get_current_voltage(pid)
                payload[f"phase{pid}"] = reading["voltage"]
                stamp = reading["timestamp"]
                if stamp:
                    # The timestamp of the last phase that has one wins.
                    payload["timestamp"] = stamp
            socketio.emit("voltage_update", payload)
            tick = time.time()
            if tick - last_refresh >= refresh_every_s:
                socketio.emit("refresh_timeseries", {"ts": int(tick)})
                last_refresh = tick
        except Exception as e:
            app.logger.error(f"Worker Error: {e}")
        socketio.sleep(interval_s)
if __name__ == "__main__":
    # Print a startup banner, then serve on all interfaces at the configured port.
    rule = "=" * 50
    print(f"\n{rule}")
    print(f"Voltage Monitor API / Port: {config.FLASK_CONFIG['port']}")
    print(f"{rule}\n")
    socketio.run(
        app,
        host="0.0.0.0",
        port=config.FLASK_CONFIG["port"],
        allow_unsafe_werkzeug=True,
    )