#!/usr/bin/env python3 """Import data from the original Flask SQLite database into the new schema. Usage: python backend/scripts/migrate_legacy_sqlite.py /path/to/backup_routeros.db """ from __future__ import annotations import sqlite3 import sys from datetime import datetime from pathlib import Path ROOT = Path(__file__).resolve().parents[1] sys.path.insert(0, str(ROOT)) from app.core.security import get_password_hash # noqa: E402 from app.db.session import SessionLocal, init_db # noqa: E402 from app.models.backup import Backup # noqa: E402 from app.models.log import OperationLog # noqa: E402 from app.models.router import Router # noqa: E402 from app.models.settings import GlobalSettings # noqa: E402 from app.models.user import User # noqa: E402 def parse_dt(value): if not value: return None for fmt in ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S"): try: return datetime.strptime(value, fmt) except ValueError: pass return None def main() -> int: if len(sys.argv) != 2: print("Usage: python backend/scripts/migrate_legacy_sqlite.py /path/to/legacy.db") return 1 source_path = Path(sys.argv[1]).resolve() if not source_path.exists(): print(f"Legacy DB not found: {source_path}") return 1 init_db() source = sqlite3.connect(str(source_path)) source.row_factory = sqlite3.Row dest = SessionLocal() try: user_map: dict[int, int] = {} for row in source.execute("SELECT id, username, password_hash FROM users ORDER BY id"): existing = dest.query(User).filter(User.username == row["username"]).first() if existing: user_map[row["id"]] = existing.id continue user = User(username=row["username"], password_hash=row["password_hash"] or get_password_hash("admin")) dest.add(user) dest.flush() user_map[row["id"]] = user.id router_map: dict[int, int] = {} for row in source.execute( "SELECT id, owner_id, name, host, port, ssh_user, ssh_key, ssh_password, created_at FROM routers ORDER BY id" ): router = Router( owner_id=user_map.get(row["owner_id"], next(iter(user_map.values()), 1)), name=row["name"], host=row["host"], port=row["port"] or 22, ssh_user=row["ssh_user"] or "admin", ssh_key=row["ssh_key"], ssh_password=row["ssh_password"], created_at=parse_dt(row["created_at"]), ) dest.add(router) dest.flush() router_map[row["id"]] = router.id for row in source.execute( "SELECT router_id, file_path, backup_type, created_at, checksum FROM backups ORDER BY id" ): file_name = Path(row["file_path"] or "backup").name backup = Backup( router_id=router_map[row["router_id"]], file_path=row["file_path"], file_name=file_name, backup_type=row["backup_type"] or "export", created_at=parse_dt(row["created_at"]), checksum=row["checksum"], ) dest.add(backup) for row in source.execute("SELECT message, timestamp FROM operation_logs ORDER BY id"): dest.add(OperationLog(message=row["message"], timestamp=parse_dt(row["timestamp"]))) legacy_settings = source.execute("SELECT * FROM global_settings ORDER BY id LIMIT 1").fetchone() if legacy_settings: settings = dest.query(GlobalSettings).first() or GlobalSettings() for key in legacy_settings.keys(): if hasattr(settings, key): setattr(settings, key, legacy_settings[key]) dest.add(settings) dest.commit() print("Migration completed") return 0 finally: source.close() dest.close() if __name__ == "__main__": raise SystemExit(main())