This repository has been archived on 2026-04-14. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
routeros_backup_next/backend/app/services/router_service.py
Mateusz Gruszczyński ff7dbcb4e4 first commit
2026-04-12 21:26:12 +02:00

141 lines
5.3 KiB
Python

from datetime import datetime
import io
from pathlib import Path
import paramiko
from sqlalchemy.orm import Session
from app.models.router import Router
class RouterService:
def _load_pkey(self, ssh_key_str: str):
key_str = (ssh_key_str or "").strip()
key_buffer = io.StringIO(key_str)
loaders = [
paramiko.RSAKey.from_private_key,
paramiko.Ed25519Key.from_private_key,
paramiko.ECDSAKey.from_private_key,
]
last_error = None
for loader in loaders:
key_buffer.seek(0)
try:
return loader(key_buffer)
except Exception as exc:
last_error = exc
raise ValueError("Failed to load SSH private key") from last_error
def _connect(self, router: Router, global_ssh_key: str | None = None):
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
key_source = router.ssh_key.strip() if router.ssh_key and router.ssh_key.strip() else (global_ssh_key or "")
if key_source:
pkey = self._load_pkey(key_source)
client.connect(router.host, port=router.port, username=router.ssh_user, pkey=pkey, timeout=10)
else:
client.connect(
router.host,
port=router.port,
username=router.ssh_user,
password=router.ssh_password,
timeout=10,
allow_agent=False,
look_for_keys=False,
banner_timeout=10,
)
return client
def export(self, router: Router, global_ssh_key: str | None = None) -> str:
client = self._connect(router, global_ssh_key)
_, stdout, _ = client.exec_command("/export")
output = stdout.read().decode("utf-8", errors="ignore")
client.close()
return output
def binary_backup(self, router: Router, backup_name: str, local_path: str, global_ssh_key: str | None = None) -> str:
client = self._connect(router, global_ssh_key)
_, stdout, _ = client.exec_command(f"/system backup save name={backup_name}")
stdout.channel.recv_exit_status()
sftp = client.open_sftp()
remote_file = f"{backup_name}.backup"
sftp.get(remote_file, local_path)
try:
sftp.remove(remote_file)
except Exception:
pass
sftp.close()
client.close()
return local_path
def upload_backup(self, router: Router, local_backup_path: str, global_ssh_key: str | None = None):
client = self._connect(router, global_ssh_key)
sftp = client.open_sftp()
target_name = Path(local_backup_path).name
sftp.put(local_backup_path, target_name)
sftp.close()
client.close()
def probe_connection(self, router: Router, global_ssh_key: str | None = None):
tested_at = datetime.utcnow()
try:
client = self._connect(router, global_ssh_key)
_, stdout, _ = client.exec_command("/system resource print without-paging")
resource_output = stdout.read().decode("utf-8", errors="ignore")
_, stdout, _ = client.exec_command("/system identity print")
identity_output = stdout.read().decode("utf-8", errors="ignore")
client.close()
model = "Unknown"
uptime = "Unknown"
hostname = "Unknown"
version = "Unknown"
for line in resource_output.splitlines():
if "board-name" in line:
model = line.split(":", 1)[1].strip()
if "uptime" in line:
uptime = line.split(":", 1)[1].strip()
if "version" in line:
version = line.split(":", 1)[1].strip()
for line in identity_output.splitlines():
if "name" in line:
hostname = line.split(":", 1)[1].strip()
return {
"success": True,
"tested_at": tested_at,
"model": model,
"uptime": uptime,
"hostname": hostname,
"version": version,
"error": None,
}
except Exception as exc:
return {
"success": False,
"tested_at": tested_at,
"model": "Unknown",
"uptime": "Unknown",
"hostname": router.name,
"version": None,
"error": str(exc),
}
def _store_connection_result(self, db: Session, router: Router, result: dict):
router.last_connection_status = result["success"]
router.last_connection_tested_at = result["tested_at"]
router.last_connection_error = result.get("error")
router.last_connection_hostname = result.get("hostname")
router.last_connection_model = result.get("model")
router.last_connection_version = result.get("version")
router.last_connection_uptime = result.get("uptime")
db.add(router)
db.commit()
db.refresh(router)
return result
def test_connection(self, db: Session, router: Router, global_ssh_key: str | None = None):
result = self.probe_connection(router, global_ssh_key)
return self._store_connection_result(db, router, result)
router_service = RouterService()