105 lines
4.4 KiB
Python
105 lines
4.4 KiB
Python
from datetime import datetime
|
|
from decimal import Decimal
|
|
import requests
|
|
|
|
from app.extensions import db
|
|
from app.models.company import UserCompanyAccess
|
|
from app.models.invoice import NotificationLog
|
|
from app.models.user import User
|
|
from app.services.mail_service import MailService
|
|
from app.services.settings_service import SettingsService
|
|
|
|
|
|
class NotificationService:
    """Send new-invoice notifications for one company and record every attempt.

    Two channels are used: e-mail (via ``MailService``) and Pushover (HTTP
    API).  Each attempt -- sent, skipped or failed -- is stored as a
    ``NotificationLog`` row through :meth:`log_channel`.
    """

    # Pushover message endpoint and the request timeout in seconds.
    PUSHOVER_URL = 'https://api.pushover.net/1/messages.json'
    PUSHOVER_TIMEOUT = 10

    def __init__(self, company_id=None):
        """Bind the service to a company.

        Args:
            company_id: Identifier forwarded to the settings and mail
                services and used to select e-mail recipients.  ``None``
                means "no company"; e-mail recipients are then empty.
        """
        self.company_id = company_id

    def _setting(self, key, default):
        """Read one effective setting for this service's company."""
        return SettingsService.get_effective(key, default, company_id=self.company_id)

    def should_notify(self, invoice):
        """Decide whether ``invoice`` qualifies for a notification.

        Returns:
            ``False`` unless the ``notify.enabled`` setting is exactly the
            string ``'true'``.  Otherwise ``True`` when the invoice gross
            amount is at least ``notify.min_amount`` (an empty/missing
            threshold counts as ``0``).

        Raises:
            Whatever ``Decimal(...)`` raises when the threshold setting or
            ``invoice.gross_amount`` cannot be converted.
        """
        if self._setting('notify.enabled', 'false') != 'true':
            return False
        min_amount = Decimal(self._setting('notify.min_amount', '0') or '0')
        return Decimal(invoice.gross_amount) >= min_amount

    def _pushover_credentials(self):
        """Return the Pushover credentials as ``{'token': ..., 'user': ...}``.

        The API token is read through the secret-settings accessor, the
        user key through the regular one; both default to ``''``.
        """
        return {
            'token': SettingsService.get_effective_secret(
                'notify.pushover_api_token', '', company_id=self.company_id
            ),
            'user': self._setting('notify.pushover_user_key', ''),
        }

    def _email_recipients(self):
        """Return the e-mail addresses to notify for this company.

        Addresses come from non-blocked users linked to the company through
        ``UserCompanyAccess``.  They are stripped, lower-cased and
        de-duplicated; empty values are dropped.  The order follows the
        query (``User.email`` ascending), keeping the first occurrence of
        each normalized address.

        Returns:
            A list of address strings; empty when no company is set.
        """
        if not self.company_id:
            return []
        rows = (
            db.session.query(User.email)
            .join(UserCompanyAccess, UserCompanyAccess.user_id == User.id)
            .filter(UserCompanyAccess.company_id == self.company_id, User.is_blocked.is_(False))
            .order_by(User.email.asc())
            .all()
        )
        normalized = ((email or '').strip().lower() for (email,) in rows)
        # dict.fromkeys de-duplicates while preserving first-seen order.
        return list(dict.fromkeys(addr for addr in normalized if addr))

    def notify_new_invoice(self, invoice):
        """Notify all channels about a new invoice.

        Returns:
            ``[]`` when :meth:`should_notify` rejects the invoice, otherwise
            a two-element list with the e-mail log entry followed by the
            Pushover log entry.
        """
        if not self.should_notify(invoice):
            return []
        message = f'Nowa faktura {invoice.invoice_number} / {invoice.contractor_name} / {invoice.gross_amount} PLN'
        logs = [self._send_email_notification(invoice, message)]
        logs.append(self._send_pushover(invoice.id, message))
        return logs

    def log_channel(self, invoice_id, channel, status, message):
        """Persist one notification attempt and return the created log row.

        Args:
            invoice_id: Invoice the attempt belongs to (``None`` for tests).
            channel: Channel name, e.g. ``'email'`` or ``'pushover'``.
            status: Outcome, e.g. ``'sent'``, ``'skipped'`` or ``'error'``.
            message: Human-readable detail stored with the entry.

        Raises:
            Any exception raised by the commit; the session is rolled back
            first so that it stays usable for the caller.
        """
        # Function-scope import: the module header only imports ``datetime``.
        from datetime import timezone

        # Naive UTC timestamp -- the same value the deprecated
        # ``datetime.utcnow()`` produced.
        sent_at = datetime.now(timezone.utc).replace(tzinfo=None)
        log = NotificationLog(
            invoice_id=invoice_id,
            channel=channel,
            status=status,
            message=message,
            sent_at=sent_at,
        )
        db.session.add(log)
        # NOTE(review): this commits the whole shared session, not only the
        # log row -- confirm callers expect their pending changes to be
        # committed here as well.
        try:
            db.session.commit()
        except Exception:
            # A failed commit leaves the session in a broken transaction;
            # roll back before propagating the error.
            db.session.rollback()
            raise
        return log

    def _send_email_notification(self, invoice, message):
        """E-mail every recipient about ``invoice`` and log the outcome.

        Delivery is best-effort per recipient: a failure for one address is
        collected and does not stop the remaining sends.

        Returns:
            The log entry: ``'skipped'`` when there are no recipients,
            ``'sent'`` when every send succeeded, ``'error'`` on partial or
            total failure (with at most the first three error details).
        """
        recipients = self._email_recipients()
        if not recipients:
            return self.log_channel(
                invoice.id, 'email', 'skipped', 'Brak odbiorców e-mail przypisanych do aktywnej firmy'
            )

        mailer = MailService(company_id=self.company_id)
        subject = f'Nowa faktura: {invoice.invoice_number}'
        body = (
            'W systemie KSeF Manager pojawiła się nowa faktura.\n\n'
            f'Numer: {invoice.invoice_number}\n'
            f'Kontrahent: {invoice.contractor_name}\n'
            f'Kwota brutto: {invoice.gross_amount} PLN\n'
        )

        sent = 0
        errors = []
        for recipient in recipients:
            try:
                mailer.send_mail(recipient, subject, body)
            except Exception as exc:
                # Deliberately broad: one bad recipient must not block the rest.
                errors.append(f'{recipient}: {exc}')
            else:
                sent += 1

        # ``recipients`` is non-empty, so "no errors" implies at least one send.
        if not errors:
            return self.log_channel(invoice.id, 'email', 'sent', f'{message} · odbiorców: {sent}')
        details = '; '.join(errors[:3])
        if sent:
            return self.log_channel(
                invoice.id, 'email', 'error', f'Wysłano do {sent}/{len(recipients)} odbiorców. Błędy: {details}'
            )
        return self.log_channel(invoice.id, 'email', 'error', 'Nie udało się wysłać powiadomień e-mail: ' + details)

    def _post_pushover(self, creds, message):
        """POST ``message`` to Pushover; raise on transport or HTTP errors."""
        response = requests.post(
            self.PUSHOVER_URL,
            data={'token': creds['token'], 'user': creds['user'], 'message': message},
            timeout=self.PUSHOVER_TIMEOUT,
        )
        response.raise_for_status()

    def _send_pushover(self, invoice_id, message):
        """Send ``message`` through Pushover and log the outcome.

        Returns:
            The log entry: ``'skipped'`` when the token or user key is
            missing, ``'error'`` (with the exception text) when the request
            fails, ``'sent'`` otherwise.
        """
        creds = self._pushover_credentials()
        if not creds['token'] or not creds['user']:
            return self.log_channel(invoice_id, 'pushover', 'skipped', 'Brak konfiguracji Pushover')
        # Only the HTTP call is guarded.  Writing the log stays outside the
        # ``try`` so a failure to store the "sent" entry is not recorded as
        # a delivery error for a push that was actually delivered.
        try:
            self._post_pushover(creds, message)
        except Exception as exc:
            # Kept broad (as before): any delivery failure is logged, not raised.
            return self.log_channel(invoice_id, 'pushover', 'error', str(exc))
        return self.log_channel(invoice_id, 'pushover', 'sent', message)

    def send_test_pushover(self):
        """Send a fixed test message through Pushover, not tied to any invoice."""
        return self._send_pushover(None, 'KSeF Manager - test Pushover')