"""Email (SMTP) and Pushover notification helpers."""

import logging
import smtplib
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from pathlib import Path

import requests

from app.core.config import settings as app_settings
from app.models.settings import GlobalSettings

logger = logging.getLogger(__name__)


class NotificationService:
    """Send notifications by email (SMTP) and through the Pushover HTTP API."""

    def send_pushover(
        self,
        token: str,
        user_key: str,
        message: str,
        title: str = "RouterOS Backup",
    ) -> bool:
        """POST a message to the Pushover messages endpoint.

        Args:
            token: Value sent as the ``token`` form field.
            user_key: Value sent as the ``user`` form field.
            message: Value sent as the ``message`` form field.
            title: Value sent as the ``title`` form field.

        Returns:
            ``response.ok`` of the HTTP response.

        Raises:
            requests.RequestException: Propagated from ``requests.post``
                (the request uses a 15 second timeout).
        """
        response = requests.post(
            "https://api.pushover.net/1/messages.json",
            data={
                "token": token,
                "user": user_key,
                "message": message,
                "title": title,
            },
            timeout=15,
        )
        return response.ok

    @staticmethod
    def _build_attachment(attachment_path: str) -> MIMEBase:
        """Read the file at *attachment_path* into a base64-encoded MIME part."""
        attachment = Path(attachment_path)
        part = MIMEBase("application", "octet-stream")
        part.set_payload(attachment.read_bytes())
        encoders.encode_base64(part)
        # Passing the name as a header parameter lets the email package quote
        # and escape it (and RFC 2231-encode non-ASCII names); interpolating
        # it into the header by hand breaks on names containing '"' or '\'.
        part.add_header("Content-Disposition", "attachment", filename=attachment.name)
        return part

    def send_email(
        self,
        settings: GlobalSettings,
        subject: str,
        body: str,
        attachment_path: str | None = None,
    ) -> None:
        """Send a plain-text email, optionally with one file attached.

        The recipient is ``settings.recipient_email`` when set, otherwise
        ``settings.smtp_login``. The sender is always ``settings.smtp_login``.

        Args:
            settings: Source of the SMTP host, port, login, password and
                recipient address.
            subject: Subject header of the message.
            body: Message text, sent as UTF-8 ``text/plain``.
            attachment_path: Path of a file to attach; nothing is attached
                when this is ``None`` or empty.

        Raises:
            ValueError: If the SMTP host, login or password is missing, or
                if no recipient address can be determined.
            OSError: If the attachment cannot be read or the connection fails.
            smtplib.SMTPException: Propagated from the SMTP session.
        """
        if not (settings.smtp_host and settings.smtp_login and settings.smtp_password):
            raise ValueError("SMTP is not configured")

        recipient = (settings.recipient_email or settings.smtp_login or "").strip()
        if not recipient:
            raise ValueError("Recipient email is empty")

        msg = MIMEMultipart()
        msg["From"] = settings.smtp_login
        msg["To"] = recipient
        msg["Subject"] = subject
        msg.attach(MIMEText(body, "plain", "utf-8"))

        if attachment_path:
            msg.attach(self._build_attachment(attachment_path))

        with smtplib.SMTP(
            settings.smtp_host,
            settings.smtp_port,
            timeout=app_settings.smtp_timeout_seconds,
        ) as server:
            if app_settings.smtp_starttls:
                server.starttls()
            server.login(settings.smtp_login, settings.smtp_password)
            server.sendmail(settings.smtp_login, [recipient], msg.as_string())

    def notify(self, settings: GlobalSettings, message: str, success: bool) -> None:
        """Deliver *message* over every enabled channel, best effort.

        Nothing is sent when ``settings.notify_failures_only`` is set and
        *success* is true. A failure on one channel is logged and does not
        prevent the other channel from being tried; no exception derived
        from ``Exception`` escapes this method.

        Args:
            settings: Source of the channel switches and credentials.
            message: Text to send on each channel.
            success: Whether the event being reported succeeded.
        """
        if settings.notify_failures_only and success:
            return

        if settings.smtp_notifications_enabled:
            try:
                self.send_email(settings, "RouterOS Backup notification", message)
            except Exception:
                # Deliberately broad: notifications must never break the
                # caller, but the failure should still be visible in logs.
                logger.exception("Failed to send email notification")

        if settings.pushover_token and settings.pushover_userkey:
            try:
                delivered = self.send_pushover(
                    settings.pushover_token,
                    settings.pushover_userkey,
                    message,
                )
            except Exception:
                logger.exception("Failed to send Pushover notification")
            else:
                if not delivered:
                    logger.warning("Pushover notification was rejected by the API")

    def send_test_email(self, settings: GlobalSettings) -> None:
        """Send a fixed test message through :meth:`send_email`.

        Errors raised by :meth:`send_email` propagate to the caller.
        """
        self.send_email(
            settings,
            "RouterOS Backup test",
            "This is a test email from RouterOS Backup Manager Next",
        )

    def send_test_pushover(self, settings: GlobalSettings) -> None:
        """Send a fixed test message through :meth:`send_pushover`.

        Raises:
            ValueError: If the Pushover token or user key is missing.
        """
        if not (settings.pushover_token and settings.pushover_userkey):
            raise ValueError("Pushover is not configured")
        self.send_pushover(
            settings.pushover_token,
            settings.pushover_userkey,
            "Test pushover from RouterOS Backup Manager Next",
        )


notification_service = NotificationService()