Files
mikrotik_backup_system/backend/app/services/notification_service.py
Mateusz Gruszczyński 3da6c2832c first commit
2026-04-14 11:39:46 +02:00

79 lines
3.1 KiB
Python

import logging
import smtplib
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from pathlib import Path

import requests

from app.core.config import settings as app_settings
from app.models.settings import GlobalSettings
_logger = logging.getLogger(__name__)


class NotificationService:
    """Send notifications by e-mail (SMTP) and through the Pushover HTTP API."""

    def send_pushover(self, token: str, user_key: str, message: str, title: str = "RouterOS Backup") -> bool:
        """POST a message to the Pushover messages endpoint.

        Args:
            token: Pushover application token.
            user_key: Pushover user key.
            message: Text of the notification.
            title: Title sent with the notification.

        Returns:
            The ``ok`` flag of the HTTP response.

        Exceptions raised by ``requests.post`` (for example on timeout) are
        not caught here and propagate to the caller.
        """
        response = requests.post(
            "https://api.pushover.net/1/messages.json",
            data={"token": token, "user": user_key, "message": message, "title": title},
            timeout=15,
        )
        return response.ok

    @staticmethod
    def _resolve_recipient(settings: GlobalSettings) -> str:
        """Return the stripped recipient address, falling back to the SMTP login.

        Each candidate is stripped before it is tested, so a whitespace-only
        ``recipient_email`` no longer blocks the fallback to ``smtp_login``.
        Returns an empty string when neither value holds an address.
        """
        for candidate in (settings.recipient_email, settings.smtp_login):
            address = (candidate or "").strip()
            if address:
                return address
        return ""

    @staticmethod
    def _build_message(
        sender: str,
        recipient: str,
        subject: str,
        body: str,
        attachment_path: str | None = None,
    ) -> MIMEMultipart:
        """Build the multipart message: a UTF-8 text body plus an optional attachment."""
        msg = MIMEMultipart()
        msg["From"] = sender
        msg["To"] = recipient
        msg["Subject"] = subject
        msg.attach(MIMEText(body, "plain", "utf-8"))
        if attachment_path:
            attachment = Path(attachment_path)
            part = MIMEBase("application", "octet-stream")
            part.set_payload(attachment.read_bytes())
            encoders.encode_base64(part)
            # Passing the name as a header parameter lets the email package
            # quote/escape it (and RFC 2231-encode non-ASCII names) instead of
            # interpolating it raw into the header value.
            part.add_header("Content-Disposition", "attachment", filename=attachment.name)
            msg.attach(part)
        return msg

    def send_email(self, settings: GlobalSettings, subject: str, body: str, attachment_path: str | None = None):
        """Send a plain-text e-mail, optionally with one file attached.

        Args:
            settings: Object providing ``smtp_host``, ``smtp_port``,
                ``smtp_login``, ``smtp_password`` and ``recipient_email``.
            subject: Subject line.
            body: Plain-text body, sent as UTF-8.
            attachment_path: Optional path of a file to attach.

        Raises:
            ValueError: If host, login or password is missing, or if no
                recipient address can be determined.

        Errors from reading the attachment or from ``smtplib`` are not caught
        here and propagate to the caller.
        """
        if not (settings.smtp_host and settings.smtp_login and settings.smtp_password):
            raise ValueError("SMTP is not configured")
        recipient = self._resolve_recipient(settings)
        if not recipient:
            raise ValueError("Recipient email is empty")
        msg = self._build_message(settings.smtp_login, recipient, subject, body, attachment_path)
        with smtplib.SMTP(
            settings.smtp_host,
            settings.smtp_port,
            timeout=app_settings.smtp_timeout_seconds,
        ) as server:
            if app_settings.smtp_starttls:
                server.starttls()
            server.login(settings.smtp_login, settings.smtp_password)
            server.sendmail(settings.smtp_login, [recipient], msg.as_string())

    def notify(self, settings: GlobalSettings, message: str, success: bool):
        """Send ``message`` over every configured channel, best-effort.

        Nothing is sent when ``settings.notify_failures_only`` is set and
        ``success`` is true. A failure on one channel is logged and does not
        stop the other channel or raise to the caller.
        """
        if settings.notify_failures_only and success:
            return
        if settings.smtp_notifications_enabled:
            try:
                self.send_email(settings, "Mikrotik Backup System notification", message)
            except Exception:
                # Deliberately broad: notification problems must never break
                # the caller, but they should not vanish silently either.
                _logger.exception("Failed to send e-mail notification")
        if settings.pushover_token and settings.pushover_userkey:
            try:
                delivered = self.send_pushover(settings.pushover_token, settings.pushover_userkey, message)
            except Exception:
                _logger.exception("Failed to send Pushover notification")
            else:
                if not delivered:
                    _logger.warning("Pushover API rejected the notification")

    def send_test_email(self, settings: GlobalSettings):
        """Send a fixed test e-mail; errors from ``send_email`` propagate."""
        self.send_email(settings, "Mikrotik Backup System test", "This is a test email from Mikrotik Backup System")

    def send_test_pushover(self, settings: GlobalSettings):
        """Send a fixed test message through Pushover.

        Raises:
            ValueError: If the token or user key is missing, or if the
                Pushover API answers with a non-OK HTTP status.
        """
        if not (settings.pushover_token and settings.pushover_userkey):
            raise ValueError("Pushover is not configured")
        delivered = self.send_pushover(
            settings.pushover_token,
            settings.pushover_userkey,
            "Test pushover from Mikrotik Backup System",
        )
        if not delivered:
            # A rejected request used to look like a successful test.
            raise ValueError("Pushover rejected the test notification")
# Module-level NotificationService instance, created when this module is imported.
notification_service = NotificationService()