from concurrent.futures import ThreadPoolExecutor from datetime import datetime import io from pathlib import Path import platform import re import subprocess import paramiko from sqlalchemy.orm import Session from app.models.router import Router from app.services.log_service import log_service from app.services.swos_beta_service import swos_beta_service class RouterService: def ping(self, router: Router): if getattr(router, 'disable_ping', False): return {'router_id': router.id, 'reachable': False, 'latency_ms': None, 'disabled': True} count_flag = '-n' if platform.system().lower().startswith('win') else '-c' timeout_flag = '-w' if platform.system().lower().startswith('win') else '-W' command = ['ping', count_flag, '1', timeout_flag, '1', router.host] try: completed = subprocess.run(command, capture_output=True, text=True, timeout=3, check=False) output = completed.stdout + "\n" + completed.stderr if completed.returncode != 0: return {'router_id': router.id, 'reachable': False, 'latency_ms': None, 'disabled': False} match = re.search(r'time[=<]\s*([0-9]+(?:[.,][0-9]+)?)\s*ms', output, re.IGNORECASE) latency = float(match.group(1).replace(',', '.')) if match else None return {'router_id': router.id, 'reachable': True, 'latency_ms': latency, 'disabled': False} except Exception: return {'router_id': router.id, 'reachable': False, 'latency_ms': None, 'disabled': False} def ping_many(self, routers: list[Router]): if not routers: return [] max_workers = min(8, max(1, len(routers))) with ThreadPoolExecutor(max_workers=max_workers) as executor: return list(executor.map(self.ping, routers)) def _load_pkey(self, ssh_key_str: str): key_str = (ssh_key_str or "").strip() key_buffer = io.StringIO(key_str) loaders = [ paramiko.RSAKey.from_private_key, paramiko.Ed25519Key.from_private_key, paramiko.ECDSAKey.from_private_key, ] last_error = None for loader in loaders: key_buffer.seek(0) try: return loader(key_buffer) except Exception as exc: last_error = exc raise ValueError("Failed to load SSH private key") from last_error def _connect(self, router: Router, global_ssh_key: str | None = None): client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) key_source = router.ssh_key.strip() if router.ssh_key and router.ssh_key.strip() else (global_ssh_key or "") if key_source: pkey = self._load_pkey(key_source) client.connect(router.host, port=router.port, username=router.ssh_user, pkey=pkey, timeout=10) else: client.connect( router.host, port=router.port, username=router.ssh_user, password=router.ssh_password, timeout=10, allow_agent=False, look_for_keys=False, banner_timeout=10, ) return client def export(self, router: Router, global_ssh_key: str | None = None) -> str: if router.device_type != 'routeros': raise ValueError('Export tekstowy jest dostępny tylko dla RouterOS.') client = self._connect(router, global_ssh_key) _, stdout, _ = client.exec_command('/export') output = stdout.read().decode('utf-8', errors='ignore') client.close() return output def binary_backup(self, router: Router, backup_name: str, local_path: str, global_ssh_key: str | None = None, global_settings=None) -> str: if router.device_type == 'switchos': downloaded = swos_beta_service.download_backup_for_router(router, global_settings) Path(local_path).write_bytes(downloaded.content) return local_path client = self._connect(router, global_ssh_key) _, stdout, _ = client.exec_command(f'/system backup save name={backup_name}') stdout.channel.recv_exit_status() sftp = client.open_sftp() remote_file = f'{backup_name}.backup' sftp.get(remote_file, local_path) try: sftp.remove(remote_file) except Exception: pass sftp.close() client.close() return local_path def upload_backup(self, router: Router, local_backup_path: str, global_ssh_key: str | None = None): if router.device_type != 'routeros': raise ValueError('Przywracanie plików jest dostępne tylko dla RouterOS.') client = self._connect(router, global_ssh_key) sftp = client.open_sftp() target_name = Path(local_backup_path).name sftp.put(local_backup_path, target_name) sftp.close() client.close() def _probe_routeros_connection(self, router: Router, global_ssh_key: str | None = None): tested_at = datetime.utcnow() try: client = self._connect(router, global_ssh_key) _, stdout, _ = client.exec_command('/system resource print without-paging') resource_output = stdout.read().decode('utf-8', errors='ignore') _, stdout, _ = client.exec_command('/system identity print') identity_output = stdout.read().decode('utf-8', errors='ignore') client.close() model = 'Unknown' uptime = 'Unknown' hostname = 'Unknown' version = 'Unknown' for line in resource_output.splitlines(): if 'board-name' in line: model = line.split(':', 1)[1].strip() if 'uptime' in line: uptime = line.split(':', 1)[1].strip() if 'version' in line: version = line.split(':', 1)[1].strip() for line in identity_output.splitlines(): if 'name' in line: hostname = line.split(':', 1)[1].strip() return { 'success': True, 'tested_at': tested_at, 'model': model, 'uptime': uptime, 'hostname': hostname, 'version': version, 'error': None, 'transport': 'ssh', 'server': None, 'auth_mode': 'ssh', 'http_status': None, 'backup_available': None, } except Exception as exc: return { 'success': False, 'tested_at': tested_at, 'model': 'Unknown', 'uptime': 'Unknown', 'hostname': router.name, 'version': None, 'error': str(exc), 'transport': 'ssh', 'server': None, 'auth_mode': 'ssh', 'http_status': None, 'backup_available': None, } def probe_connection(self, router: Router, global_ssh_key: str | None = None, global_settings=None): if router.device_type == 'switchos': return swos_beta_service.probe_router(router, global_settings) return self._probe_routeros_connection(router, global_ssh_key) def _store_connection_result(self, db: Session, router: Router, result: dict): router.last_connection_status = result['success'] router.last_connection_tested_at = result['tested_at'] router.last_connection_error = result.get('error') router.last_connection_hostname = result.get('hostname') router.last_connection_model = result.get('model') router.last_connection_version = result.get('version') router.last_connection_uptime = result.get('uptime') router.last_connection_transport = result.get('transport') router.last_connection_server = result.get('server') router.last_connection_auth_mode = result.get('auth_mode') router.last_connection_http_status = result.get('http_status') router.last_connection_backup_available = result.get('backup_available') db.add(router) db.commit() db.refresh(router) return result def _device_label(self, router: Router) -> str: platform = 'SwitchOS' if router.device_type == 'switchos' else 'RouterOS' return f'{platform} device {router.name}' def _build_connection_log_message(self, router: Router, result: dict) -> str: device_label = self._device_label(router) transport = result.get('transport') or 'unknown transport' auth_mode = result.get('auth_mode') http_status = result.get('http_status') backup_available = result.get('backup_available') hostname = result.get('hostname') model = result.get('model') version = result.get('version') uptime = result.get('uptime') server = result.get('server') details = [f'via {transport}', f'target={router.host}:{router.port}'] if router.device_type == 'routeros': if router.ssh_user: details.append(f'user={router.ssh_user}') if hostname: details.append(f'hostname={hostname}') if model and model != 'Unknown': details.append(f'model={model}') if version and version != 'Unknown': details.append(f'version={version}') if uptime and uptime != 'Unknown': details.append(f'uptime={uptime}') else: if auth_mode: details.append(f'auth={auth_mode}') if http_status: details.append(f'http={http_status}') if server: details.append(f'server={server}') if backup_available is not None: details.append(f'backup_available={"yes" if backup_available else "no"}') if hostname: details.append(f'hostname={hostname}') detail_suffix = f' ({", ".join(details)})' if details else '' if result.get('success'): return f'Connection test OK for {device_label}{detail_suffix}' error = result.get('error') or 'Unknown error' return f'Connection test FAILED for {device_label}{detail_suffix}: {error}' def test_connection(self, db: Session, router: Router, global_settings): result = self.probe_connection(router, global_settings.global_ssh_key, global_settings) stored_result = self._store_connection_result(db, router, result) log_service.add(db, self._build_connection_log_message(router, stored_result)) return stored_result router_service = RouterService()