88 lines
2.9 KiB
Python
88 lines
2.9 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import sqlite3
|
|
from contextlib import contextmanager
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Any, Iterator
|
|
|
|
|
|
@dataclass
|
|
class KioskSettingsRecord:
|
|
mode: str
|
|
widgets: list[str]
|
|
realtime_range: str
|
|
analytics_range: str
|
|
analytics_bucket: str
|
|
compare_mode: str
|
|
updated_at: datetime | None = None
|
|
updated_by: str | None = None
|
|
|
|
|
|
class SQLiteKioskSettingsRepository:
|
|
def __init__(self, db_path: str) -> None:
|
|
self.db_path = Path(db_path)
|
|
self.db_path.parent.mkdir(parents=True, exist_ok=True)
|
|
self.ensure_schema()
|
|
|
|
@contextmanager
|
|
def connect(self) -> Iterator[sqlite3.Connection]:
|
|
conn = sqlite3.connect(self.db_path)
|
|
conn.row_factory = sqlite3.Row
|
|
try:
|
|
conn.execute("PRAGMA journal_mode=WAL")
|
|
conn.execute("PRAGMA synchronous=NORMAL")
|
|
yield conn
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
|
|
def ensure_schema(self) -> None:
|
|
with self.connect() as conn:
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS kiosk_settings (
|
|
mode TEXT PRIMARY KEY,
|
|
payload_json TEXT NOT NULL,
|
|
updated_at TEXT NOT NULL,
|
|
updated_by TEXT
|
|
)
|
|
"""
|
|
)
|
|
|
|
def get(self, mode: str) -> dict[str, Any] | None:
|
|
with self.connect() as conn:
|
|
row = conn.execute(
|
|
"SELECT mode, payload_json, updated_at, updated_by FROM kiosk_settings WHERE mode = ? LIMIT 1",
|
|
(mode,),
|
|
).fetchone()
|
|
if row is None:
|
|
return None
|
|
payload = json.loads(row["payload_json"])
|
|
payload["mode"] = row["mode"]
|
|
payload["updated_at"] = row["updated_at"]
|
|
payload["updated_by"] = row["updated_by"]
|
|
return payload
|
|
|
|
def upsert(self, mode: str, payload: dict[str, Any], updated_by: str | None = None) -> dict[str, Any]:
|
|
now = datetime.utcnow().isoformat()
|
|
stored_payload = dict(payload)
|
|
stored_payload.pop("mode", None)
|
|
stored_payload.pop("updated_at", None)
|
|
stored_payload.pop("updated_by", None)
|
|
with self.connect() as conn:
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO kiosk_settings (mode, payload_json, updated_at, updated_by)
|
|
VALUES (?, ?, ?, ?)
|
|
ON CONFLICT(mode) DO UPDATE SET
|
|
payload_json = excluded.payload_json,
|
|
updated_at = excluded.updated_at,
|
|
updated_by = excluded.updated_by
|
|
""",
|
|
(mode, json.dumps(stored_payload, ensure_ascii=False), now, updated_by),
|
|
)
|
|
return self.get(mode) or {"mode": mode, **stored_payload, "updated_at": now, "updated_by": updated_by}
|