Files
voltage-monitor/app.py

328 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import hashlib
import logging
import os
import re
import time
import warnings
from datetime import datetime, timedelta, timezone

from flask import Flask, render_template, jsonify, request
from flask_socketio import SocketIO
from influxdb import InfluxDBClient

import config
# Root logging: INFO level, "<timestamp> - <level> - <message>" lines.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Module-level handle on the 'werkzeug' logger, also set to INFO.
logger = logging.getLogger('werkzeug')
logger.setLevel(logging.INFO)
app = Flask(__name__)
app.config['SECRET_KEY'] = config.FLASK_CONFIG['secret_key']
# Cache lifetime for files Flask serves itself; 60 unless config overrides it.
app.config['SEND_FILE_MAX_AGE_DEFAULT'] = config.FLASK_CONFIG.get('static_cache_timeout', 60)
# No Eventlet (deprecated) - plain threading mode.
# NOTE(review): cors_allowed_origins="*" accepts Socket.IO connections from
# any origin - confirm this is intended for the deployment.
socketio = SocketIO(app, cors_allowed_origins="*", async_mode='threading', logger=False)
def get_file_hash(filename):
    """Return a short content fingerprint for a file in the static folder.

    Args:
        filename: Path of the file relative to ``app.static_folder``.

    Returns:
        The first 8 hex characters of the file's MD5 digest, or ``"1"`` when
        the file cannot be opened or read.
    """
    full_path = os.path.join(app.static_folder, filename)
    try:
        with open(full_path, "rb") as f:
            return hashlib.md5(f.read()).hexdigest()[:8]
    except OSError:
        # OSError covers a missing file as well as a directory or a
        # permission problem, so any unreadable asset gets the same
        # fallback the ``static_v`` template helper uses.
        return "1"
# InfluxDB v1 client
def get_influx_client():
    """Build an InfluxDB v1 client from ``config.INFLUXDB_CONFIG``.

    Host, port and database come straight from the config mapping. When a
    non-empty username is configured the client is switched to those
    credentials before being returned.

    Returns:
        A new ``InfluxDBClient``; callers in this module close it themselves.
    """
    settings = config.INFLUXDB_CONFIG
    influx = InfluxDBClient(
        host=settings['host'],
        port=settings['port'],
        database=settings['database'],
    )
    username = settings['username']
    if username:
        influx.switch_user(username, settings['password'])
    return influx
# --- LOGIC ---
def get_current_voltage(phase_id):
    """Fetch the most recent voltage reading for one phase.

    Args:
        phase_id: Key into ``config.PHASES``.

    Returns:
        ``{'voltage': <value rounded to 2 places>, 'timestamp': <point time>}``
        for the newest point, or ``{'voltage': 0, 'timestamp': None}`` when
        there is no usable point or the query fails.

    Raises:
        KeyError: If ``phase_id`` is not present in ``config.PHASES``.
    """
    # Resolve the entity before opening a connection so that an unknown
    # phase cannot leave an unclosed client behind.
    entity_id = config.PHASES[phase_id]['entity_id']
    query = f'SELECT "value" FROM "{config.MEASUREMENT}" WHERE "entity_id" = \'{entity_id}\' ORDER BY time DESC LIMIT 1'
    client = get_influx_client()
    try:
        result = client.query(query)
        points = list(result.get_points())
        if points and points[0].get('value') is not None:
            return {'voltage': round(float(points[0]['value']), 2), 'timestamp': points[0]['time']}
    except Exception as e:
        # Best effort: a failed read is logged and reported as "no reading".
        app.logger.error(f"Current Error: {e}")
    finally:
        client.close()
    return {'voltage': 0, 'timestamp': None}
# --- ENDPOINTS ---
@app.context_processor
def inject_static_version():
    """Expose ``static_v`` to templates for cache-busted static URLs."""

    def static_v(filename):
        """Return the static URL of *filename* tagged with a ``?v=`` value.

        The tag is the first 8 hex characters of the file's MD5 digest, or
        ``"1"`` if reading the file fails for any reason.
        """
        asset_path = os.path.join(app.static_folder, filename)
        try:
            with open(asset_path, "rb") as handle:
                digest = hashlib.md5(handle.read()).hexdigest()
        except Exception:
            version = "1"
        else:
            version = digest[:8]
        return f"{request.script_root}/static/{filename}?v={version}"

    return {"static_v": static_v}
@app.after_request
def add_header(response):
    """Set caching headers on every response.

    Paths under ``/static/`` are marked public with a max-age of 31536000
    seconds (365 days) and lose any ``Content-Disposition`` header; all
    other responses are marked ``no-cache`` and ``no-store``.
    """
    cache = response.cache_control
    if not request.path.startswith('/static/'):
        cache.no_cache = True
        cache.no_store = True
        return response

    cache.max_age = 31536000
    cache.public = True
    response.headers.pop('Content-Disposition', None)
    return response
@app.route('/favicon.ico')
def favicon():
    """Answer favicon requests with an empty body and status 204."""
    empty_body, no_content = '', 204
    return empty_body, no_content
@app.route('/')
def index():
    """Render ``index.html`` with the phase, time-range and footer settings from config."""
    template_context = {
        'phases': config.PHASES,
        'time_ranges': config.TIME_RANGES,
        'default_range': config.DEFAULT_TIME_RANGE,
        'footer': config.FOOTER,
    }
    return render_template('index.html', **template_context)
@app.route('/api/timeseries/<int:phase_id>')
def get_timeseries(phase_id):
    """Return mean voltage per time bucket for one phase as JSON.

    Query parameters:
        range: Relative window such as ``24h``; defaults to
            ``config.DEFAULT_TIME_RANGE``. Used unless both ``start`` and
            ``end`` are supplied.
        start, end: Absolute window bounds, placed in the query as quoted
            time literals. When both are given, ``config.DEFAULT_INTERVAL``
            is used for bucketing.

    Returns:
        A JSON list of ``{"time": ..., "voltage": ...}`` objects (voltage
        rounded to 2 places, empty buckets skipped). An unknown phase or a
        malformed ``range``/``start``/``end`` yields a 400 with an ``error``
        field. A failing query is logged and yields an empty list.
    """
    if phase_id not in config.PHASES:
        return jsonify({'error': 'Invalid phase'}), 400

    range_param = request.args.get('range', config.DEFAULT_TIME_RANGE)
    start_param = request.args.get('start')
    end_param = request.args.get('end')
    entity_id = config.PHASES[phase_id]['entity_id']

    # The request values are interpolated into the InfluxQL text, so only
    # shapes that cannot escape their literal are accepted.
    if start_param and end_param:
        # Digits and timestamp punctuation only: no quote or backslash can
        # get through. A space is allowed because an unencoded "+" in a URL
        # query string is decoded as a space.
        timestamp_ok = all(
            re.fullmatch(r"[0-9TZtz:.+\- ]{1,40}", value)
            for value in (start_param, end_param)
        )
        if not timestamp_ok:
            return jsonify({'error': 'Invalid time range'}), 400
        time_filter = f"time >= '{start_param}' AND time <= '{end_param}'"
        interval = config.DEFAULT_INTERVAL
    else:
        clean_range = range_param.replace(" ", "")
        # <integer><unit>, with the duration units InfluxQL understands.
        if not re.fullmatch(r"[0-9]+(ns|u|µ|ms|s|m|h|d|w)", clean_range):
            return jsonify({'error': 'Invalid range'}), 400
        time_filter = f"time > now() - {clean_range}"
        if range_param in config.TIME_RANGES:
            interval = config.TIME_RANGES[range_param]['interval']
        else:
            interval = config.DEFAULT_INTERVAL

    query = f'''
    SELECT mean("value") AS voltage
    FROM "{config.MEASUREMENT}"
    WHERE "entity_id" = '{entity_id}'
    AND {time_filter}
    GROUP BY time({interval}) fill(none)
    '''

    # Open the connection only once the request is known to be well-formed,
    # so the early 400 returns above never leave a client unclosed.
    client = get_influx_client()
    try:
        result = client.query(query)
        data = [
            {"time": p['time'], "voltage": round(p['voltage'], 2)}
            for p in result.get_points()
            if p.get('voltage') is not None
        ]
        return jsonify(data)
    except Exception as e:
        app.logger.error(f"Timeseries Error: {e} | Query: {query}")
        return jsonify([])
    finally:
        client.close()
def _parse_event_time(value):
    """Parse an ISO-8601 timestamp into an aware datetime; naive input is taken as UTC."""
    parsed = datetime.fromisoformat(value.replace('Z', '+00:00'))
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def _label_event_bucket(point):
    """Label one aggregated bucket: ``"gap"``, an event type, or ``None`` when within limits."""
    vmin_val = point.get('vmin')
    vmax_val = point.get('vmax')
    if vmin_val is None and vmax_val is None:
        # fill(null) bucket: no samples at all in this interval.
        return "gap"
    vmin = float(vmin_val) if vmin_val is not None else None
    vmax = float(vmax_val) if vmax_val is not None else None
    # Thresholds come from config.VOLTAGE_THRESHOLDS.
    limits = config.VOLTAGE_THRESHOLDS
    if vmin is not None and vmin < limits['outage']:
        return "zanik"
    if vmin is not None and limits['outage'] <= vmin < limits['min_safe']:
        return "niskie"
    if vmax is not None and vmax > limits['max_safe']:
        return "wysokie"
    return None


def _events_for_phase(points, phase_id, view_start, view_end):
    """Merge runs of equally-labelled buckets into events that overlap the view window."""
    labels = [_label_event_bucket(p) for p in points]
    total = len(points)
    events = []
    i = 0
    while i < total:
        label = labels[i]
        # Extend the run over every following bucket with the same label.
        j = i
        while j + 1 < total and labels[j + 1] == label:
            j += 1
        if label is not None:
            start_str = points[i]['time']
            end_str = points[j]['time']
            dt_s = _parse_event_time(start_str)
            dt_e = _parse_event_time(end_str)
            # NOTE(review): both ends are bucket timestamps; the "+ 1"
            # presumably accounts for the last bucket's own length assuming
            # 1-minute buckets - confirm against EVENT_DETECTION_INTERVAL.
            duration = (dt_e - dt_s).total_seconds() / 60 + 1
            long_enough = duration >= config.MIN_EVENT_DURATION_MINUTES
            in_view = dt_e >= view_start and dt_s <= view_end
            if long_enough and in_view:
                event = {
                    "start": start_str,
                    "end": end_str,
                    "phase": phase_id,
                    # A run of empty buckets is reported as an outage.
                    "type": "zanik" if label == "gap" else label,
                    "duration": int(round(duration, 0)),
                }
                if label == "gap":
                    event["source"] = "gap"
                events.append(event)
        i = j + 1
    return events


@app.route('/api/events')
def get_events():
    """Return detected voltage events for all phases as JSON.

    Query parameters:
        range: Relative window as ``<integer><unit>`` with unit ``s``, ``m``,
            ``h``, ``d`` or ``w`` (default ``24h``). Windows longer than
            ``config.MAX_EVENT_RANGE_DAYS`` days are refused.
        start, end: ISO-8601 bounds of an absolute window; when both are
            given they take precedence over ``range`` and no size limit is
            applied.

    Returns:
        A JSON list of events sorted by ``start`` descending. Each event has
        ``start``, ``end``, ``phase``, ``type`` (``zanik``, ``niskie`` or
        ``wysokie``) and ``duration`` in minutes; events derived from empty
        buckets also carry ``"source": "gap"``. A too-large range yields a
        ``range_too_large`` error object, malformed parameters a 400 with
        ``invalid_range``, and a failing query is logged and yields ``[]``.
    """
    range_p = request.args.get('range', '24h')
    start_p = request.args.get('start')
    end_p = request.args.get('end')
    invalid_range = {
        "error": "invalid_range",
        "message": "Nieprawidłowy zakres czasu."
    }

    if start_p and end_p:
        try:
            dt_view_start = _parse_event_time(start_p)
            dt_view_end = _parse_event_time(end_p)
            # Re-serialise the parsed values instead of echoing the raw
            # request text into the query: the result contains only digits
            # and timestamp punctuation, so it cannot escape the literal.
            stamp = '%Y-%m-%dT%H:%M:%S.%fZ'
            query_start = dt_view_start.astimezone(timezone.utc).strftime(stamp)
            query_end = dt_view_end.astimezone(timezone.utc).strftime(stamp)
        except (ValueError, OverflowError):
            return jsonify(invalid_range), 400
        # NOTE(review): the extra 24h look-back presumably lets events that
        # began before the window still be found - confirm.
        time_filter = f"time >= '{query_start}' - 24h AND time <= '{query_end}'"
    else:
        clean_range = range_p.replace(" ", "")
        parsed = re.fullmatch(r"([0-9]{1,6})([smhdw])", clean_range)
        if parsed is None:
            return jsonify(invalid_range), 400
        unit_names = {'s': 'seconds', 'm': 'minutes', 'h': 'hours', 'd': 'days', 'w': 'weeks'}
        delta = timedelta(**{unit_names[parsed.group(2)]: int(parsed.group(1))})
        # Refuse ranges above MAX_EVENT_RANGE_DAYS, whatever unit was used.
        if delta > timedelta(days=config.MAX_EVENT_RANGE_DAYS):
            return jsonify({
                "error": "range_too_large",
                "message": f"Zbyt duży zakres. Maksymalnie {config.MAX_EVENT_RANGE_DAYS} dni. Zaznacz obszar na wykresie lub wybierz własny zakres."
            })
        now_utc = datetime.now(timezone.utc)
        dt_view_start = now_utc - delta
        dt_view_end = now_utc
        time_filter = f"time > now() - {clean_range} - 24h"

    # Open the connection only after validation so that the early returns
    # above cannot leave a client unclosed.
    client = get_influx_client()
    all_events = []
    try:
        for p_id, p_cfg in config.PHASES.items():
            # min() + max() + fill(null):
            # - missing data (e.g. an Influx restart) does NOT pose as 0 V
            # - "high" is detected from vmax, "low/outage" from vmin
            query = f'''
            SELECT min("value") AS vmin, max("value") AS vmax
            FROM "{config.MEASUREMENT}"
            WHERE "entity_id" = '{p_cfg["entity_id"]}'
            AND {time_filter}
            GROUP BY time({config.EVENT_DETECTION_INTERVAL}) fill(null)
            '''
            result = client.query(query)
            points = list(result.get_points())
            all_events.extend(
                _events_for_phase(points, p_id, dt_view_start, dt_view_end)
            )
        return jsonify(sorted(all_events, key=lambda x: x['start'], reverse=True))
    except Exception as e:
        app.logger.error(f"Event Logic Error: {e}")
        return jsonify([])
    finally:
        client.close()
@app.route('/api/outages/<phase_id>')
def api_outages(phase_id):
    """Return raw points below the outage-detection threshold for one phase.

    Args:
        phase_id: Phase identifier from the URL. The route delivers it as a
            string, so it is tried as-is and then as an integer.

    Query parameters:
        range: Relative window as an InfluxQL duration (default ``24h``).

    Returns:
        A JSON list of the matching points, or a 400 with an ``error`` field
        for an unknown phase or a malformed ``range``. Query errors propagate.
    """
    # The other endpoints look phases up by integer id, so a string key from
    # the URL would otherwise never match when config.PHASES is int-keyed.
    phase = config.PHASES.get(phase_id)
    if phase is None and phase_id.isdecimal():
        phase = config.PHASES.get(int(phase_id))
    if phase is None:
        return jsonify({'error': 'Invalid phase'}), 400

    t_range = request.args.get('range', '24h')
    # The value is interpolated into the query text; accept only
    # <integer><unit> with the duration units InfluxQL understands.
    if not re.fullmatch(r"[0-9]+(ns|u|µ|ms|s|m|h|d|w)", t_range):
        return jsonify({'error': 'Invalid range'}), 400

    entity_id = phase['entity_id']
    query = f'SELECT "value" FROM "{config.MEASUREMENT}" WHERE "entity_id" = \'{entity_id}\' AND "value" < {config.VOLTAGE_THRESHOLDS["outage_detection"]} AND time > now() - {t_range}'

    # Open the connection only after validation so that the early returns
    # above cannot leave a client unclosed.
    client = get_influx_client()
    try:
        result = client.query(query)
        return jsonify(list(result.get_points()))
    finally:
        client.close()
def background_voltage_update():
    """Push live readings to Socket.IO clients in an endless loop.

    Each pass reads the current voltage of every phase, emits them as a
    ``voltage_update`` event, and, once at least ``refresh_interval_seconds``
    (default 15) have passed since the previous one, also emits
    ``refresh_timeseries``. Any exception in a pass is printed and the loop
    carries on after the usual pause.
    """
    refresh_period = config.CHART_CONFIG.get("refresh_interval_seconds", 15)
    previous_refresh = 0
    while True:
        try:
            snapshot = {'timestamp': None}
            for phase_key in config.PHASES:
                reading = get_current_voltage(phase_key)
                snapshot[f'phase{phase_key}'] = reading['voltage']
                if reading['timestamp']:
                    snapshot['timestamp'] = reading['timestamp']

            # Gauge update.
            socketio.emit('voltage_update', snapshot)

            # Periodic refresh of the chart and the event list.
            tick = time.time()
            if tick - previous_refresh >= refresh_period:
                socketio.emit('refresh_timeseries', {'ts': int(tick)})
                previous_refresh = tick
        except Exception as e:
            print(f"Worker Error: {e}")
        # update_interval is divided by 1000 - presumably configured in
        # milliseconds; verify against config.CHART_CONFIG.
        time.sleep(config.CHART_CONFIG['update_interval'] / 1000.0)
if __name__ == '__main__':
    # Startup banner framed by two 50-character rules.
    rule = "=" * 50
    print("\n" + rule)
    print(f"Voltage Monitor API / Port: {config.FLASK_CONFIG['port']}")
    print(rule + "\n")

    # Start the live-update worker, then serve on all interfaces.
    socketio.start_background_task(background_voltage_update)
    socketio.run(app, host='0.0.0.0', port=config.FLASK_CONFIG['port'])