76 lines
2.7 KiB
Python
76 lines
2.7 KiB
Python
import json
|
|
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Depends
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy import select
|
|
|
|
from app.api.deps import db_session
|
|
from app.core.security import read_session_token, SESSION_COOKIE
|
|
from app.models.user import User
|
|
from app.models.dashboard import DashboardPanel, Permission, Dashboard
|
|
from app.services.streaming.hub import hub
|
|
from app.services.streaming.poller import panel_poller
|
|
|
|
router = APIRouter()
|
|
|
|
async def ws_current_user(ws: WebSocket, session: AsyncSession) -> User | None:
|
|
token = ws.cookies.get(SESSION_COOKIE)
|
|
uid = read_session_token(token) if token else None
|
|
if not uid:
|
|
return None
|
|
res = await session.execute(select(User).where(User.id == uid))
|
|
return res.scalar_one_or_none()
|
|
|
|
@router.websocket("/ws/stream")
|
|
async def ws_stream(websocket: WebSocket, session: AsyncSession = Depends(db_session)):
|
|
await websocket.accept()
|
|
user = await ws_current_user(websocket, session)
|
|
if not user or not user.is_active:
|
|
await websocket.close(code=4401)
|
|
return
|
|
|
|
subscribed_panel_id = None
|
|
try:
|
|
# pierwszy msg: {"panelId": 123}
|
|
first = await websocket.receive_text()
|
|
msg = json.loads(first)
|
|
panel_id = int(msg.get("panelId"))
|
|
subscribed_panel_id = panel_id
|
|
|
|
pres = await session.execute(select(DashboardPanel).where(DashboardPanel.id == panel_id))
|
|
panel = pres.scalar_one_or_none()
|
|
if not panel:
|
|
await websocket.close(code=4404)
|
|
return
|
|
|
|
# dashboard ownership lub (w przyszłości) shared
|
|
dres = await session.execute(select(Dashboard).where(Dashboard.id == panel.dashboard_id))
|
|
dash = dres.scalar_one_or_none()
|
|
if not dash or dash.owner_user_id != user.id:
|
|
await websocket.close(code=4403)
|
|
return
|
|
|
|
# permission do routera
|
|
if user.role != "admin":
|
|
pr = await session.execute(select(Permission).where(Permission.user_id == user.id, Permission.router_id == panel.router_id, Permission.can_view == True)) # noqa
|
|
if not pr.scalar_one_or_none():
|
|
await websocket.close(code=4403)
|
|
return
|
|
|
|
await hub.subscribe(panel_id, websocket)
|
|
await panel_poller.ensure_running(panel_id, session)
|
|
|
|
# keepalive loop: klient może wysyłać ping
|
|
while True:
|
|
_ = await websocket.receive_text()
|
|
|
|
except WebSocketDisconnect:
|
|
pass
|
|
except Exception:
|
|
pass
|
|
finally:
|
|
if subscribed_panel_id is not None:
|
|
try:
|
|
await hub.unsubscribe(subscribed_panel_id, websocket)
|
|
except Exception:
|
|
pass
|