first commit
This commit is contained in:
78
backend/app/services/notification_service.py
Normal file
78
backend/app/services/notification_service.py
Normal file
@@ -0,0 +1,78 @@
|
||||
import smtplib
|
||||
from email import encoders
|
||||
from email.mime.base import MIMEBase
|
||||
from email.mime.multipart import MIMEMultipart
|
||||
from email.mime.text import MIMEText
|
||||
from pathlib import Path
|
||||
|
||||
import requests
|
||||
|
||||
from app.core.config import settings as app_settings
|
||||
from app.models.settings import GlobalSettings
|
||||
|
||||
|
||||
class NotificationService:
|
||||
def send_pushover(self, token: str, user_key: str, message: str, title: str = "RouterOS Backup") -> bool:
|
||||
response = requests.post(
|
||||
"https://api.pushover.net/1/messages.json",
|
||||
data={"token": token, "user": user_key, "message": message, "title": title},
|
||||
timeout=15,
|
||||
)
|
||||
return response.ok
|
||||
|
||||
def send_email(self, settings: GlobalSettings, subject: str, body: str, attachment_path: str | None = None):
|
||||
if not (settings.smtp_host and settings.smtp_login and settings.smtp_password):
|
||||
raise ValueError("SMTP is not configured")
|
||||
recipient = (settings.recipient_email or settings.smtp_login or "").strip()
|
||||
if not recipient:
|
||||
raise ValueError("Recipient email is empty")
|
||||
|
||||
msg = MIMEMultipart()
|
||||
msg["From"] = settings.smtp_login
|
||||
msg["To"] = recipient
|
||||
msg["Subject"] = subject
|
||||
msg.attach(MIMEText(body, "plain", "utf-8"))
|
||||
|
||||
if attachment_path:
|
||||
attachment = Path(attachment_path)
|
||||
with attachment.open("rb") as handle:
|
||||
part = MIMEBase("application", "octet-stream")
|
||||
part.set_payload(handle.read())
|
||||
encoders.encode_base64(part)
|
||||
part.add_header("Content-Disposition", f'attachment; filename="{attachment.name}"')
|
||||
msg.attach(part)
|
||||
|
||||
with smtplib.SMTP(settings.smtp_host, settings.smtp_port, timeout=app_settings.smtp_timeout_seconds) as server:
|
||||
if app_settings.smtp_starttls:
|
||||
server.starttls()
|
||||
server.login(settings.smtp_login, settings.smtp_password)
|
||||
server.sendmail(settings.smtp_login, [recipient], msg.as_string())
|
||||
|
||||
def notify(self, settings: GlobalSettings, message: str, success: bool):
|
||||
if settings.notify_failures_only and success:
|
||||
return
|
||||
if settings.smtp_notifications_enabled:
|
||||
try:
|
||||
self.send_email(settings, "RouterOS Backup notification", message)
|
||||
except Exception:
|
||||
pass
|
||||
if settings.pushover_token and settings.pushover_userkey:
|
||||
try:
|
||||
self.send_pushover(settings.pushover_token, settings.pushover_userkey, message)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def send_test_email(self, settings: GlobalSettings):
|
||||
self.send_email(settings, "RouterOS Backup test", "This is a test email from RouterOS Backup Manager Next")
|
||||
|
||||
def send_test_pushover(self, settings: GlobalSettings):
|
||||
if not (settings.pushover_token and settings.pushover_userkey):
|
||||
raise ValueError("Pushover is not configured")
|
||||
self.send_pushover(
|
||||
settings.pushover_token,
|
||||
settings.pushover_userkey,
|
||||
"Test pushover from RouterOS Backup Manager Next",
|
||||
)
|
||||
|
||||
|
||||
notification_service = NotificationService()
|
||||
Reference in New Issue
Block a user