from __future__ import annotations import sqlite3 from contextlib import contextmanager from dataclasses import dataclass from datetime import datetime from pathlib import Path from typing import Iterator @dataclass(frozen=True) class AuthUser: username: str password_hash: str role: str display_name: str is_active: bool = True created_at: datetime | None = None updated_at: datetime | None = None class SQLiteAuthUserRepository: def __init__(self, db_path: str) -> None: self.db_path = Path(db_path) self.db_path.parent.mkdir(parents=True, exist_ok=True) self.ensure_schema() @contextmanager def connect(self) -> Iterator[sqlite3.Connection]: conn = sqlite3.connect(self.db_path) conn.row_factory = sqlite3.Row try: conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA synchronous=NORMAL") yield conn conn.commit() finally: conn.close() def ensure_schema(self) -> None: with self.connect() as conn: conn.execute( """ CREATE TABLE IF NOT EXISTS auth_users ( username TEXT PRIMARY KEY, password_hash TEXT NOT NULL, role TEXT NOT NULL, display_name TEXT NOT NULL, is_active INTEGER NOT NULL DEFAULT 1, created_at TEXT NOT NULL, updated_at TEXT NOT NULL ) """ ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_auth_users_role ON auth_users(role)" ) def get_by_username(self, username: str) -> AuthUser | None: with self.connect() as conn: row = conn.execute( """ SELECT username, password_hash, role, display_name, is_active, created_at, updated_at FROM auth_users WHERE username = ? LIMIT 1 """, (username,), ).fetchone() if row is None: return None return AuthUser( username=row["username"], password_hash=row["password_hash"], role=row["role"], display_name=row["display_name"], is_active=bool(row["is_active"]), created_at=datetime.fromisoformat(row["created_at"]), updated_at=datetime.fromisoformat(row["updated_at"]), ) def upsert_user(self, *, username: str, password_hash: str, role: str, display_name: str, is_active: bool = True) -> AuthUser: now = datetime.utcnow().isoformat() with self.connect() as conn: conn.execute( """ INSERT INTO auth_users (username, password_hash, role, display_name, is_active, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?) ON CONFLICT(username) DO UPDATE SET password_hash = excluded.password_hash, role = excluded.role, display_name = excluded.display_name, is_active = excluded.is_active, updated_at = excluded.updated_at """, (username, password_hash, role, display_name, 1 if is_active else 0, now, now), ) return self.get_by_username(username) # type: ignore[return-value] def update_password(self, username: str, password_hash: str) -> AuthUser | None: now = datetime.utcnow().isoformat() with self.connect() as conn: cursor = conn.execute( "UPDATE auth_users SET password_hash = ?, updated_at = ? WHERE username = ?", (password_hash, now, username), ) if cursor.rowcount == 0: return None return self.get_by_username(username) def list_users(self) -> list[AuthUser]: with self.connect() as conn: rows = conn.execute( """ SELECT username, password_hash, role, display_name, is_active, created_at, updated_at FROM auth_users ORDER BY role DESC, username ASC """ ).fetchall() return [ AuthUser( username=row["username"], password_hash=row["password_hash"], role=row["role"], display_name=row["display_name"], is_active=bool(row["is_active"]), created_at=datetime.fromisoformat(row["created_at"]), updated_at=datetime.fromisoformat(row["updated_at"]), ) for row in rows ]