Files
voltage-monitor/app.py
Mateusz Gruszczyński 04c14e9f37 refactor v2
2026-03-02 09:58:40 +01:00

470 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import logging
import time
import hashlib
from datetime import datetime, timedelta, timezone
from flask import Flask, render_template, jsonify, request
from flask_socketio import SocketIO
from influxdb import InfluxDBClient
from werkzeug.exceptions import HTTPException
import config
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("werkzeug")
logger.setLevel(logging.INFO)
app = Flask(__name__)
app.config["SECRET_KEY"] = config.FLASK_CONFIG["secret_key"]
app.config["SEND_FILE_MAX_AGE_DEFAULT"] = config.FLASK_CONFIG.get("static_cache_timeout", 60)
# Bez Eventlet tryb wątkowy
socketio = SocketIO(app, cors_allowed_origins="*", async_mode="threading", logger=False)
# --------------------
# Helpers
# --------------------
def is_api_request() -> bool:
return request.path.startswith("/api/")
def api_error(code: int, error: str, message: str = "", **extra):
payload = {"error": error, "code": code}
if message:
payload["message"] = message
if extra:
payload.update(extra)
return jsonify(payload), code
def parse_range(range_str: str):
"""
Akceptuje np. '24h', '7d'. Zwraca timedelta.
"""
if not range_str:
raise ValueError("empty")
s = range_str.replace(" ", "")
if len(s) < 2:
raise ValueError("too_short")
unit = s[-1]
if unit not in ("h", "d"):
raise ValueError("bad_unit")
digits = "".join(filter(str.isdigit, s[:-1]))
if not digits:
raise ValueError("no_number")
num = int(digits)
if num <= 0:
raise ValueError("non_positive")
return timedelta(hours=num) if unit == "h" else timedelta(days=num)
def get_influx_client():
client = InfluxDBClient(
host=config.INFLUXDB_CONFIG["host"],
port=config.INFLUXDB_CONFIG["port"],
database=config.INFLUXDB_CONFIG["database"],
)
if config.INFLUXDB_CONFIG.get("username"):
client.switch_user(config.INFLUXDB_CONFIG["username"], config.INFLUXDB_CONFIG.get("password", ""))
return client
def get_current_voltage(phase_id: int):
client = get_influx_client()
try:
entity_id = config.PHASES[phase_id]["entity_id"]
query = (
f'SELECT "value" FROM "{config.MEASUREMENT}" '
f'WHERE "entity_id" = \'{entity_id}\' ORDER BY time DESC LIMIT 1'
)
result = client.query(query)
points = list(result.get_points())
if points and points[0].get("value") is not None:
return {"voltage": round(float(points[0]["value"]), 2), "timestamp": points[0]["time"]}
return {"voltage": 0, "timestamp": None}
except Exception as e:
app.logger.warning(f"Influx current_voltage error: {e}")
return {"voltage": 0, "timestamp": None}
finally:
try:
client.close()
except Exception:
pass
# --------------------
# Error handlers (40x/50x)
# --------------------
@app.errorhandler(HTTPException)
def handle_http_exception(e: HTTPException):
if is_api_request():
return api_error(e.code or 500, "http_error", e.description or e.name, name=e.name)
return e
@app.errorhandler(Exception)
def handle_unhandled_exception(e: Exception):
app.logger.exception("Unhandled error")
if is_api_request():
return api_error(500, "server_error", "Internal Server Error")
raise
# --------------------
# Static cache busting
# --------------------
@app.context_processor
def inject_static_version():
def static_v(filename):
full_path = os.path.join(app.static_folder, filename)
try:
with open(full_path, "rb") as f:
v = hashlib.md5(f.read()).hexdigest()[:8]
except Exception:
v = "1"
return f"{request.script_root}/static/{filename}?v={v}"
return dict(static_v=static_v)
@app.after_request
def add_header(response):
if request.path.startswith("/static/"):
response.cache_control.max_age = 31536000
response.cache_control.public = True
response.headers.pop("Content-Disposition", None)
else:
response.cache_control.no_cache = True
response.cache_control.no_store = True
return response
# --------------------
# Routes
# --------------------
@app.route("/favicon.ico")
def favicon():
return "", 204
@app.route("/")
def index():
return render_template(
"index.html",
phases=config.PHASES,
time_ranges=config.TIME_RANGES,
default_range=config.DEFAULT_TIME_RANGE,
footer=config.FOOTER,
)
@app.route("/api/timeseries/<int:phase_id>")
def get_timeseries(phase_id):
if phase_id not in config.PHASES:
return api_error(400, "invalid_phase", "Invalid phase")
range_param = request.args.get("range", config.DEFAULT_TIME_RANGE)
start_param = request.args.get("start")
end_param = request.args.get("end")
entity_id = config.PHASES[phase_id]["entity_id"]
if (start_param and not end_param) or (end_param and not start_param):
return api_error(400, "bad_range", "Provide both start and end")
if start_param and end_param:
try:
datetime.fromisoformat(start_param.replace("Z", "+00:00"))
datetime.fromisoformat(end_param.replace("Z", "+00:00"))
except Exception:
return api_error(400, "bad_datetime", "start/end must be ISO 8601")
time_filter = f"time >= '{start_param}' AND time <= '{end_param}'"
interval = config.DEFAULT_INTERVAL
else:
clean_range = range_param.replace(" ", "")
try:
parse_range(clean_range)
except Exception:
return api_error(400, "bad_range", "range must be like 24h or 7d")
time_filter = f"time > now() - {clean_range}"
interval = config.TIME_RANGES.get(range_param, {}).get("interval", config.DEFAULT_INTERVAL)
query = f'''
SELECT mean("value") AS voltage
FROM "{config.MEASUREMENT}"
WHERE "entity_id" = '{entity_id}'
AND {time_filter}
GROUP BY time({interval}) fill(none)
'''
client = get_influx_client()
try:
result = client.query(query)
data = [
{"time": p["time"], "voltage": round(p["voltage"], 2)}
for p in result.get_points()
if p.get("voltage") is not None
]
return jsonify(data)
except Exception as e:
app.logger.error(f"Timeseries Error: {e} | Query: {query}")
return api_error(503, "backend_unavailable", "InfluxDB unavailable")
finally:
try:
client.close()
except Exception:
pass
@app.route("/api/events")
def get_events():
range_p = request.args.get("range", "24h")
start_p = request.args.get("start")
end_p = request.args.get("end")
now_utc = datetime.now(timezone.utc)
if (start_p and not end_p) or (end_p and not start_p):
return api_error(400, "bad_range", "Provide both start and end")
if start_p and end_p:
try:
dt_view_start = datetime.fromisoformat(start_p.replace("Z", "+00:00"))
dt_view_end = datetime.fromisoformat(end_p.replace("Z", "+00:00"))
except Exception:
return api_error(400, "bad_datetime", "start/end must be ISO 8601")
time_filter = f"time >= '{start_p}' - 24h AND time <= '{end_p}'"
else:
clean_range = range_p.replace(" ", "")
try:
delta = parse_range(clean_range)
except Exception:
return api_error(400, "bad_range", "range must be like 24h or 7d")
if clean_range.endswith("d"):
try:
days = int("".join(filter(str.isdigit, clean_range[:-1])))
if days > config.MAX_EVENT_RANGE_DAYS:
return api_error(
400,
"range_too_large",
f"Zbyt duży zakres. Maksymalnie {config.MAX_EVENT_RANGE_DAYS} dni.",
)
except Exception:
pass
dt_view_start = now_utc - delta
dt_view_end = now_utc
time_filter = f"time > now() - {clean_range} - 24h"
all_events = []
client = get_influx_client()
try:
for p_id, p_cfg in config.PHASES.items():
query = f'''
SELECT min("value") AS vmin, max("value") AS vmax
FROM "{config.MEASUREMENT}"
WHERE "entity_id" = '{p_cfg["entity_id"]}'
AND {time_filter}
GROUP BY time({config.EVENT_DETECTION_INTERVAL}) fill(null)
'''
result = client.query(query)
points = list(result.get_points())
i = 0
while i < len(points):
vmin_val = points[i].get("vmin")
vmax_val = points[i].get("vmax")
# GAP (brak danych)
if vmin_val is None and vmax_val is None:
start_str = points[i]["time"]
dt_s = datetime.fromisoformat(start_str.replace("Z", "+00:00"))
j = i
while j + 1 < len(points) and points[j + 1].get("vmin") is None and points[j + 1].get("vmax") is None:
j += 1
end_str = points[j]["time"]
dt_e = datetime.fromisoformat(end_str.replace("Z", "+00:00"))
duration = (dt_e - dt_s).total_seconds() / 60 + 1
if duration >= config.MIN_EVENT_DURATION_MINUTES:
if dt_e >= dt_view_start and dt_s <= dt_view_end:
all_events.append(
{
"start": start_str,
"end": end_str,
"phase": p_id,
"type": "zanik",
"duration": int(round(duration, 0)),
"source": "gap",
}
)
i = j + 1
continue
vmin = float(vmin_val) if vmin_val is not None else None
vmax = float(vmax_val) if vmax_val is not None else None
ev_type = None
if vmin is not None and vmin < config.VOLTAGE_THRESHOLDS["outage"]:
ev_type = "zanik"
elif vmin is not None and config.VOLTAGE_THRESHOLDS["outage"] <= vmin < config.VOLTAGE_THRESHOLDS["min_safe"]:
ev_type = "niskie"
elif vmax is not None and vmax > config.VOLTAGE_THRESHOLDS["max_safe"]:
ev_type = "wysokie"
if ev_type:
start_str = points[i]["time"]
dt_s = datetime.fromisoformat(start_str.replace("Z", "+00:00"))
j = i
while j + 1 < len(points):
v_next_min = points[j + 1].get("vmin")
v_next_max = points[j + 1].get("vmax")
nm = float(v_next_min) if v_next_min is not None else None
nx = float(v_next_max) if v_next_max is not None else None
next_type = None
if nm is not None and nm < config.VOLTAGE_THRESHOLDS["outage"]:
next_type = "zanik"
elif nm is not None and config.VOLTAGE_THRESHOLDS["outage"] <= nm < config.VOLTAGE_THRESHOLDS["min_safe"]:
next_type = "niskie"
elif nx is not None and nx > config.VOLTAGE_THRESHOLDS["max_safe"]:
next_type = "wysokie"
if next_type == ev_type:
j += 1
else:
break
end_str = points[j]["time"]
dt_e = datetime.fromisoformat(end_str.replace("Z", "+00:00"))
duration = (dt_e - dt_s).total_seconds() / 60 + 1
if duration >= config.MIN_EVENT_DURATION_MINUTES:
if dt_e >= dt_view_start and dt_s <= dt_view_end:
all_events.append(
{
"start": start_str,
"end": end_str,
"phase": p_id,
"type": ev_type,
"duration": int(round(duration, 0)),
}
)
i = j
i += 1
return jsonify(sorted(all_events, key=lambda x: x["start"], reverse=True))
except Exception as e:
app.logger.error(f"Event Logic Error: {e}")
return api_error(503, "backend_unavailable", "InfluxDB unavailable")
finally:
try:
client.close()
except Exception:
pass
@app.route("/api/outages/<phase_id>")
def api_outages(phase_id):
# phase_id tu jest string -> waliduj jak w timeseries
try:
pid = int(phase_id)
except Exception:
return api_error(400, "invalid_phase", "Invalid phase")
if pid not in config.PHASES:
return api_error(400, "invalid_phase", "Invalid phase")
t_range = request.args.get("range", "24h")
try:
parse_range(t_range)
except Exception:
return api_error(400, "bad_range", "range must be like 24h or 7d")
entity_id = config.PHASES[pid]["entity_id"]
query = (
f'SELECT "value" FROM "{config.MEASUREMENT}" '
f'WHERE "entity_id" = \'{entity_id}\' '
f'AND "value" < {config.VOLTAGE_THRESHOLDS["outage_detection"]} '
f"AND time > now() - {t_range}"
)
client = get_influx_client()
try:
result = client.query(query)
return jsonify(list(result.get_points()))
except Exception:
return api_error(503, "backend_unavailable", "InfluxDB unavailable")
finally:
try:
client.close()
except Exception:
pass
# --------------------
# SocketIO worker
# --------------------
clients = 0
task = None
@socketio.on("connect")
def handle_connect():
global clients, task
clients += 1
if task is None:
task = socketio.start_background_task(background_voltage_update)
@socketio.on("disconnect")
def handle_disconnect():
global clients
clients = max(0, clients - 1)
def background_voltage_update():
global clients
last_refresh = 0
refresh_every_s = config.CHART_CONFIG.get("refresh_interval_seconds", 15)
interval_s = config.CHART_CONFIG["update_interval"] / 1000.0
while True:
if clients == 0:
socketio.sleep(1)
continue
try:
voltages = {"timestamp": None}
for pid in config.PHASES.keys():
res = get_current_voltage(pid)
voltages[f"phase{pid}"] = res["voltage"]
if res["timestamp"]:
voltages["timestamp"] = res["timestamp"]
socketio.emit("voltage_update", voltages)
now = time.time()
if now - last_refresh >= refresh_every_s:
socketio.emit("refresh_timeseries", {"ts": int(now)})
last_refresh = now
except Exception as e:
app.logger.error(f"Worker Error: {e}")
socketio.sleep(interval_s)
if __name__ == "__main__":
print("\n" + "=" * 50)
print(f"Voltage Monitor API / Port: {config.FLASK_CONFIG['port']}")
print("=" * 50 + "\n")
socketio.run(app, host="0.0.0.0", port=config.FLASK_CONFIG["port"], allow_unsafe_werkzeug=True)