Files
solar-pv-dashboard/backend/app/storage/auth_users.py
Mateusz Gruszczyński 138059945e poprawki i zmiany ux
2026-03-26 09:30:39 +01:00

150 lines
5.4 KiB
Python

from __future__ import annotations
import sqlite3
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Iterator
@dataclass(frozen=True)
class AuthUser:
    """Immutable snapshot of one authentication user record.

    Instances are frozen: assigning to an attribute raises
    ``dataclasses.FrozenInstanceError``. Equality and hashing still take
    every field into account, including ``password_hash``.
    """

    # Login identifier of the account.
    username: str
    # Stored password hash. Deliberately left out of ``repr()`` output.
    password_hash: str
    # Role name assigned to the account.
    role: str
    # Human-readable name of the account.
    display_name: str
    # Whether the account is enabled; defaults to True.
    is_active: bool = True
    # Creation timestamp, or None when not supplied.
    created_at: datetime | None = None
    # Last-modification timestamp, or None when not supplied.
    updated_at: datetime | None = None

    def __repr__(self) -> str:
        """Return a debug representation that leaves out ``password_hash``.

        The dataclass-generated ``__repr__`` would print every field, which
        puts the credential hash into any log line or traceback that formats
        the object. ``@dataclass`` keeps an explicitly defined ``__repr__``.
        """
        return (
            f"{type(self).__name__}("
            f"username={self.username!r}, "
            f"role={self.role!r}, "
            f"display_name={self.display_name!r}, "
            f"is_active={self.is_active!r}, "
            f"created_at={self.created_at!r}, "
            f"updated_at={self.updated_at!r})"
        )
class SQLiteAuthUserRepository:
    """SQLite-backed store for ``AuthUser`` records.

    Each public method opens its own connection through :meth:`connect`,
    which commits when the ``with`` body finishes normally and always closes
    the connection. Timestamps are written as offset-less UTC ISO-8601 text.
    """

    def __init__(self, db_path: str) -> None:
        """Create the parent directory if needed and ensure the schema exists.

        Args:
            db_path: Filesystem path of the SQLite database file.
        """
        self.db_path = Path(db_path)
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self.ensure_schema()

    @staticmethod
    def _utc_now_iso() -> str:
        """Return the current UTC time as an offset-less ISO-8601 string."""
        # Imported locally because the module-level imports only provide
        # ``datetime`` itself.
        from datetime import timezone

        # ``datetime.utcnow()`` is deprecated since Python 3.12. Dropping the
        # tzinfo keeps the text in the same naive format ``utcnow()`` produced,
        # so previously stored rows and new rows stay comparable.
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    @staticmethod
    def _row_to_user(row: sqlite3.Row) -> AuthUser:
        """Convert one ``auth_users`` row into an ``AuthUser``."""
        return AuthUser(
            username=row["username"],
            password_hash=row["password_hash"],
            role=row["role"],
            display_name=row["display_name"],
            # Stored as INTEGER 0/1.
            is_active=bool(row["is_active"]),
            created_at=datetime.fromisoformat(row["created_at"]),
            updated_at=datetime.fromisoformat(row["updated_at"]),
        )

    @contextmanager
    def connect(self) -> Iterator[sqlite3.Connection]:
        """Yield a connection with ``sqlite3.Row`` rows; commit, then close.

        ``commit()`` runs only when the ``with`` body completes without
        raising; the connection is closed in every case.
        """
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        try:
            conn.execute("PRAGMA journal_mode=WAL")
            conn.execute("PRAGMA synchronous=NORMAL")
            yield conn
            conn.commit()
        finally:
            conn.close()

    def ensure_schema(self) -> None:
        """Create the ``auth_users`` table and its role index if missing."""
        with self.connect() as conn:
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS auth_users (
                username TEXT PRIMARY KEY,
                password_hash TEXT NOT NULL,
                role TEXT NOT NULL,
                display_name TEXT NOT NULL,
                is_active INTEGER NOT NULL DEFAULT 1,
                created_at TEXT NOT NULL,
                updated_at TEXT NOT NULL
                )
                """
            )
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_auth_users_role ON auth_users(role)"
            )

    def get_by_username(self, username: str) -> AuthUser | None:
        """Return the user stored under ``username``, or None if absent."""
        with self.connect() as conn:
            row = conn.execute(
                """
                SELECT username, password_hash, role, display_name, is_active, created_at, updated_at
                FROM auth_users
                WHERE username = ?
                LIMIT 1
                """,
                (username,),
            ).fetchone()
        if row is None:
            return None
        return self._row_to_user(row)

    def upsert_user(
        self,
        *,
        username: str,
        password_hash: str,
        role: str,
        display_name: str,
        is_active: bool = True,
    ) -> AuthUser:
        """Insert a user, or overwrite the existing row with that username.

        On conflict every column except ``username`` and ``created_at`` is
        replaced, so the original creation timestamp is kept.

        Returns:
            The row as stored after the write.
        """
        now = self._utc_now_iso()
        with self.connect() as conn:
            conn.execute(
                """
                INSERT INTO auth_users (username, password_hash, role, display_name, is_active, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?)
                ON CONFLICT(username) DO UPDATE SET
                password_hash = excluded.password_hash,
                role = excluded.role,
                display_name = excluded.display_name,
                is_active = excluded.is_active,
                updated_at = excluded.updated_at
                """,
                (username, password_hash, role, display_name, 1 if is_active else 0, now, now),
            )
        # Re-read on a fresh connection so the result reflects the stored row.
        return self.get_by_username(username)  # type: ignore[return-value]

    def update_password(self, username: str, password_hash: str) -> AuthUser | None:
        """Replace a user's password hash and bump ``updated_at``.

        Returns:
            The updated user, or None when no row has that username.
        """
        now = self._utc_now_iso()
        with self.connect() as conn:
            cursor = conn.execute(
                "UPDATE auth_users SET password_hash = ?, updated_at = ? WHERE username = ?",
                (password_hash, now, username),
            )
            if cursor.rowcount == 0:
                return None
        return self.get_by_username(username)

    def update_role(self, username: str, role: str) -> AuthUser | None:
        """Replace a user's role and bump ``updated_at``.

        Returns:
            The updated user, or None when no row has that username.
        """
        now = self._utc_now_iso()
        with self.connect() as conn:
            cursor = conn.execute(
                "UPDATE auth_users SET role = ?, updated_at = ? WHERE username = ?",
                (role, now, username),
            )
            if cursor.rowcount == 0:
                return None
        return self.get_by_username(username)

    def count_admin_users(self) -> int:
        """Return how many rows have role ``'admin'`` and ``is_active = 1``."""
        with self.connect() as conn:
            row = conn.execute(
                "SELECT COUNT(*) AS count FROM auth_users WHERE role = 'admin' AND is_active = 1"
            ).fetchone()
        return int(row['count']) if row is not None else 0

    def list_users(self) -> list[AuthUser]:
        """Return all users ordered by role descending, then username."""
        with self.connect() as conn:
            rows = conn.execute(
                """
                SELECT username, password_hash, role, display_name, is_active, created_at, updated_at
                FROM auth_users
                ORDER BY role DESC, username ASC
                """
            ).fetchall()
        return [self._row_to_user(row) for row in rows]