This commit is contained in:
Mateusz Gruszczyński
2026-04-15 10:33:26 +02:00
parent 14f83cd549
commit 9ddb203ec0
16 changed files with 348 additions and 257 deletions

View File

@@ -28,8 +28,6 @@ def serialize_router(router: Router, global_settings) -> RouterResponse:
effective_username = router_user
uses_global_switchos_credentials = False
has_effective_password = bool(router_password)
uses_global_ssh_key = False
has_effective_ssh_key = False
if router.device_type == 'switchos':
effective_username = router_user or default_swos_user
@@ -37,15 +35,11 @@ def serialize_router(router: Router, global_settings) -> RouterResponse:
(not router_user and default_swos_user) or (not router_password and default_swos_password)
)
has_effective_password = bool(router_password or default_swos_password)
else:
uses_password_auth = bool(router_password)
uses_global_ssh_key = bool(has_global_key and not has_router_key and not uses_password_auth)
has_effective_ssh_key = bool(has_router_key or uses_global_ssh_key)
payload = RouterResponse.model_validate(router, from_attributes=True).model_dump()
payload['effective_username'] = effective_username
payload['uses_global_ssh_key'] = uses_global_ssh_key
payload['has_effective_ssh_key'] = has_effective_ssh_key
payload['uses_global_ssh_key'] = router.device_type == 'routeros' and has_global_key and not has_router_key
payload['has_effective_ssh_key'] = router.device_type == 'routeros' and (has_router_key or has_global_key)
payload['uses_global_switchos_credentials'] = uses_global_switchos_credentials
payload['has_effective_password'] = has_effective_password
payload['supports_export'] = router.device_type == 'routeros'

View File

@@ -15,12 +15,6 @@ from app.services.swos_beta_service import swos_beta_service
class RouterService:
connect_timeout_seconds = 10
auth_timeout_seconds = 10
banner_timeout_seconds = 10
command_timeout_seconds = 20
sftp_timeout_seconds = 20
def ping(self, router: Router):
if getattr(router, 'disable_ping', False):
return {'router_id': router.id, 'reachable': False, 'latency_ms': None, 'disabled': True}
@@ -65,41 +59,28 @@ class RouterService:
def _connect(self, router: Router, global_ssh_key: str | None = None):
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
router_key = (router.ssh_key or '').strip()
router_password = (router.ssh_password or '').strip()
global_key = (global_ssh_key or '').strip()
use_password_auth = bool(router_password and not router_key)
key_source = router_key or ('' if use_password_auth else global_key)
connect_kwargs = {
'hostname': router.host,
'port': router.port,
'username': router.ssh_user,
'timeout': self.connect_timeout_seconds,
'auth_timeout': self.auth_timeout_seconds,
'banner_timeout': self.banner_timeout_seconds,
'allow_agent': False,
'look_for_keys': False,
}
key_source = router.ssh_key.strip() if router.ssh_key and router.ssh_key.strip() else (global_ssh_key or "")
if key_source:
pkey = self._load_pkey(key_source)
client.connect(pkey=pkey, **connect_kwargs)
client.connect(router.host, port=router.port, username=router.ssh_user, pkey=pkey, timeout=10)
else:
client.connect(password=router_password or None, **connect_kwargs)
transport = client.get_transport()
if transport is not None:
transport.set_keepalive(15)
client.connect(
router.host,
port=router.port,
username=router.ssh_user,
password=router.ssh_password,
timeout=10,
allow_agent=False,
look_for_keys=False,
banner_timeout=10,
)
return client
def export(self, router: Router, global_ssh_key: str | None = None) -> str:
if router.device_type != 'routeros':
raise ValueError('Export tekstowy jest dostępny tylko dla RouterOS.')
client = self._connect(router, global_ssh_key)
_, stdout, _ = client.exec_command('/export', timeout=self.command_timeout_seconds)
_, stdout, _ = client.exec_command('/export')
output = stdout.read().decode('utf-8', errors='ignore')
client.close()
return output
@@ -111,10 +92,9 @@ class RouterService:
return local_path
client = self._connect(router, global_ssh_key)
_, stdout, _ = client.exec_command(f'/system backup save name={backup_name}', timeout=self.command_timeout_seconds)
_, stdout, _ = client.exec_command(f'/system backup save name={backup_name}')
stdout.channel.recv_exit_status()
sftp = client.open_sftp()
sftp.get_channel().settimeout(self.sftp_timeout_seconds)
remote_file = f'{backup_name}.backup'
sftp.get(remote_file, local_path)
try:
@@ -130,7 +110,6 @@ class RouterService:
raise ValueError('Przywracanie plików jest dostępne tylko dla RouterOS.')
client = self._connect(router, global_ssh_key)
sftp = client.open_sftp()
sftp.get_channel().settimeout(self.sftp_timeout_seconds)
target_name = Path(local_backup_path).name
sftp.put(local_backup_path, target_name)
sftp.close()
@@ -140,9 +119,9 @@ class RouterService:
tested_at = datetime.utcnow()
try:
client = self._connect(router, global_ssh_key)
_, stdout, _ = client.exec_command('/system resource print without-paging', timeout=self.command_timeout_seconds)
_, stdout, _ = client.exec_command('/system resource print without-paging')
resource_output = stdout.read().decode('utf-8', errors='ignore')
_, stdout, _ = client.exec_command('/system identity print', timeout=self.command_timeout_seconds)
_, stdout, _ = client.exec_command('/system identity print')
identity_output = stdout.read().decode('utf-8', errors='ignore')
client.close()
model = 'Unknown'

View File

@@ -60,60 +60,3 @@ def test_router_list_marks_global_ssh_key_usage(monkeypatch, tmp_path):
payload = list_response.json()
assert payload[0]["uses_global_ssh_key"] is True
assert payload[0]["has_effective_ssh_key"] is True
def test_router_password_auth_overrides_global_ssh_key(monkeypatch, tmp_path):
monkeypatch.setenv("DATABASE_URL", f"sqlite:///{tmp_path / 'routers-password.db'}")
monkeypatch.setenv("DATA_DIR", str(tmp_path / 'data-password'))
monkeypatch.setenv("SECRET_KEY", "test-secret")
monkeypatch.setenv("DEFAULT_ADMIN_USERNAME", "admin")
monkeypatch.setenv("DEFAULT_ADMIN_PASSWORD", "admin")
with TestClient(app) as client:
login_response = client.post("/api/auth/login", data={"username": "admin", "password": "admin"})
token = login_response.json()["access_token"]
headers = {"Authorization": f"Bearer {token}"}
settings_response = client.put(
"/api/settings",
json={
"backup_retention_days": 7,
"log_retention_days": 7,
"export_cron": "",
"binary_cron": "",
"retention_cron": "",
"enable_auto_export": False,
"connection_test_interval_minutes": 0,
"global_ssh_key": "-----BEGIN OPENSSH PRIVATE KEY-----\nabc\n-----END OPENSSH PRIVATE KEY-----",
"pushover_token": None,
"pushover_userkey": None,
"notify_failures_only": True,
"smtp_host": None,
"smtp_port": 587,
"smtp_login": None,
"smtp_password": None,
"smtp_notifications_enabled": False,
"recipient_email": None,
"clear_global_ssh_key": False
},
headers=headers,
)
assert settings_response.status_code == 200
create_response = client.post(
"/api/routers",
json={
"name": "edge02",
"host": "10.0.0.2",
"port": 22,
"ssh_user": "admin",
"ssh_password": "secret-pass",
"ssh_key": None
},
headers=headers,
)
assert create_response.status_code == 200
payload = create_response.json()
assert payload["uses_global_ssh_key"] is False
assert payload["has_effective_ssh_key"] is False
assert payload["has_effective_password"] is True