99 lines
3.8 KiB
Python
99 lines
3.8 KiB
Python
import asyncio
|
|
import json
|
|
import logging
|
|
from typing import Dict, Any, Optional, List
|
|
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.core.config import settings
|
|
from app.models.dashboard import DashboardPanel
|
|
from app.services.mikrotik.factory import build_client
|
|
from app.services.mikrotik.client_rest import parse_bps
|
|
from app.services.streaming.hub import hub
|
|
|
|
# Module-level logger used by the poller; registered under the fixed name "poller".
log = logging.getLogger("poller")
|
|
|
|
class PanelPoller:
    """Run at most one background polling task per dashboard panel.

    Each task repeatedly loads the panel row, reads its JSON config, samples
    traffic for the configured interfaces through the router client and sends
    the result as JSON text to every connection the hub reports for that
    panel. The task stops by itself once the hub reports no connections.
    """

    # Bounds (milliseconds) applied to the configured polling interval.
    MIN_INTERVAL_MS = 250
    MAX_INTERVAL_MS = 5000
    # Cap on auto-discovered interfaces when the panel config lists none.
    MAX_AUTO_INTERFACES = 10
    # Metrics reported to clients when the panel config lists none.
    DEFAULT_METRICS = ("rx_bps", "tx_bps")

    def __init__(self):
        # panel_id -> polling task currently registered for that panel.
        self._tasks: Dict[int, asyncio.Task] = {}
        # Serialises changes to the task registry.
        self._lock = asyncio.Lock()

    async def ensure_running(self, panel_id: int, session: AsyncSession):
        """Start the polling task for ``panel_id`` unless a live one exists.

        Args:
            panel_id: Identifier of the dashboard panel to poll.
            session: Database session handed to the polling loop.
        """
        async with self._lock:
            task = self._tasks.get(panel_id)
            if task and not task.done():
                return
            self._tasks[panel_id] = asyncio.create_task(self._run(panel_id, session))

    @staticmethod
    def _string_list(value: Any, fallback: Any) -> List[str]:
        """Coerce a config value into a list of strings, or use ``fallback``."""
        if not value:
            return [str(x) for x in fallback]
        if isinstance(value, str):
            # A bare string is one name, not a sequence of characters.
            return [value]
        if not isinstance(value, (list, tuple)):
            return [str(x) for x in fallback]
        return [str(x) for x in value]

    @classmethod
    def _parse_config(cls, raw: Optional[str], default_interval_ms: int) -> Dict[str, Any]:
        """Parse a panel's JSON config into validated polling settings.

        Malformed JSON, a non-object document, or badly typed fields fall back
        to defaults instead of raising, so one bad config cannot kill the
        polling task.

        Args:
            raw: JSON text of the panel config; ``None`` or empty means "{}".
            default_interval_ms: Interval used when the config gives no usable one.

        Returns:
            A dict with ``interfaces`` (list of str), ``metrics`` (list of str)
            and ``interval_ms`` (int clamped to the class bounds).
        """
        try:
            cfg = json.loads(raw or "{}")
        except Exception:
            # Any unreadable config is treated as empty.
            cfg = {}
        if not isinstance(cfg, dict):
            cfg = {}

        try:
            interval_ms = int(cfg.get("interval_ms") or default_interval_ms)
        except (TypeError, ValueError, OverflowError):
            interval_ms = int(default_interval_ms)
        interval_ms = max(cls.MIN_INTERVAL_MS, min(interval_ms, cls.MAX_INTERVAL_MS))

        return {
            "interfaces": cls._string_list(cfg.get("interfaces"), ()),
            "metrics": cls._string_list(cfg.get("metrics"), cls.DEFAULT_METRICS),
            "interval_ms": interval_ms,
        }

    @staticmethod
    async def _broadcast(conns: Any, payload: Dict[str, Any]) -> None:
        """Send ``payload`` as JSON text to every connection, best effort."""
        try:
            # Serialise once instead of once per connection.
            text = json.dumps(payload)
        except (TypeError, ValueError):
            log.exception("cannot serialise payload type=%s", payload.get("type"))
            return
        for ws in conns:
            try:
                await ws.send_text(text)
            except Exception as exc:
                # One failing connection must not block the others.
                log.debug("send failed err=%s", exc)

    async def _run(self, panel_id: int, session: AsyncSession):
        """Polling loop for one panel; returns when nobody is subscribed.

        Args:
            panel_id: Identifier of the dashboard panel to poll.
            session: Database session used for the panel lookup and passed to
                ``build_client``.
        """
        # NOTE: the session is supplied from outside; for simplicity a single
        # session per WebSocket is used. In production it is better to open a
        # separate session inside the loop / use SessionLocal.
        try:
            while True:
                conns = await hub.connections(panel_id)
                if not conns:
                    # Nobody is subscribed -> stop (registry cleanup is below).
                    return

                result = await session.execute(
                    select(DashboardPanel).where(DashboardPanel.id == panel_id)
                )
                panel = result.scalar_one_or_none()
                if not panel:
                    await asyncio.sleep(1)
                    continue

                cfg = self._parse_config(panel.config_json, settings.DEFAULT_POLL_INTERVAL_MS)
                interfaces: List[str] = cfg["interfaces"]
                metrics: List[str] = cfg["metrics"]
                delay = cfg["interval_ms"] / 1000

                client = await build_client(session, panel.router_id)
                if not client:
                    await self._broadcast(
                        conns,
                        {"type": "error", "message": "No client/credentials for router"},
                    )
                    await asyncio.sleep(delay)
                    continue

                # If no interfaces are configured, fetch them and cap the count
                # (so as not to overload the router).
                if not interfaces:
                    try:
                        listed = await client.list_interfaces()
                        interfaces = [i.get("name") for i in listed if i.get("name")]
                        interfaces = interfaces[: self.MAX_AUTO_INTERFACES]
                    except Exception as exc:
                        log.info("list interfaces failed panel=%s err=%s", panel_id, exc)
                        interfaces = []

                # NOTE(review): this is the event loop's clock, not wall-clock
                # epoch time — confirm that clients expect that.
                ts = int(asyncio.get_running_loop().time() * 1000)
                rows = []
                for iface in interfaces:
                    try:
                        raw = await client.monitor_traffic_once(iface)
                        # RouterOS REST typically exposes rx-bits-per-second /
                        # tx-bits-per-second.
                        rows.append(
                            {
                                "iface": iface,
                                "ts": ts,
                                "rx_bps": parse_bps(raw.get("rx-bits-per-second")),
                                "tx_bps": parse_bps(raw.get("tx-bits-per-second")),
                            }
                        )
                    except Exception as e:
                        log.info("poll error panel=%s iface=%s err=%s", panel_id, iface, e)

                await self._broadcast(
                    conns,
                    {"type": "traffic", "panelId": panel_id, "data": rows, "metrics": metrics},
                )
                await asyncio.sleep(delay)
        except Exception:
            # Surface the failure instead of leaving a silently dead task behind.
            log.exception("poller crashed panel=%s", panel_id)
        finally:
            # Drop our registry entry however the loop ended, but never an
            # entry that belongs to a different task.
            async with self._lock:
                if self._tasks.get(panel_id) is asyncio.current_task():
                    self._tasks.pop(panel_id, None)
|
|
|
|
# Module-level PanelPoller instance, created when this module is imported.
panel_poller = PanelPoller()
|