949 lines
44 KiB
Python
949 lines
44 KiB
Python
from __future__ import annotations
|
|
|
|
from collections import defaultdict
|
|
from datetime import date, datetime
|
|
from decimal import Decimal
|
|
from pathlib import Path
|
|
from xml.sax.saxutils import escape
|
|
import xml.etree.ElementTree as ET
|
|
|
|
from flask import current_app
|
|
|
|
from sqlalchemy import or_
|
|
|
|
from app.extensions import db
|
|
from app.models.catalog import InvoiceLine
|
|
from app.models.invoice import Invoice, InvoiceStatus, InvoiceType, Tag, SyncEvent
|
|
from app.repositories.invoice_repository import InvoiceRepository
|
|
from app.services.company_service import CompanyService
|
|
from app.services.ksef_service import KSeFService
|
|
from app.services.settings_service import SettingsService
|
|
|
|
|
|
class InvoiceService:
|
|
PERIOD_LABELS = {
|
|
"month": "miesięczne",
|
|
"quarter": "kwartalne",
|
|
"year": "roczne",
|
|
}
|
|
|
|
def __init__(self):
|
|
self.repo = InvoiceRepository()
|
|
|
|
def upsert_from_ksef(self, document, company):
|
|
invoice = self.repo.get_by_ksef_number(document.ksef_number, company_id=company.id)
|
|
created = False
|
|
if not invoice:
|
|
invoice = Invoice(ksef_number=document.ksef_number, company_id=company.id)
|
|
db.session.add(invoice)
|
|
created = True
|
|
|
|
invoice.invoice_number = document.invoice_number
|
|
invoice.contractor_name = document.contractor_name
|
|
invoice.contractor_nip = document.contractor_nip
|
|
invoice.issue_date = document.issue_date
|
|
invoice.received_date = document.received_date
|
|
invoice.fetched_at = document.fetched_at
|
|
invoice.net_amount = document.net_amount
|
|
invoice.vat_amount = document.vat_amount
|
|
invoice.gross_amount = document.gross_amount
|
|
invoice.invoice_type = InvoiceType(document.invoice_type)
|
|
invoice.last_synced_at = datetime.utcnow()
|
|
invoice.external_metadata = document.metadata
|
|
invoice.source_hash = KSeFService.calc_hash(document.xml_content)
|
|
invoice.xml_path = self._save_xml(company.id, invoice.ksef_number, document.xml_content)
|
|
metadata = dict(document.metadata or {})
|
|
payment_details = self.extract_payment_details_from_xml(document.xml_content)
|
|
if payment_details.get('payment_form_code') and not metadata.get('payment_form_code'):
|
|
metadata['payment_form_code'] = payment_details['payment_form_code']
|
|
if payment_details.get('payment_form_label') and not metadata.get('payment_form_label'):
|
|
metadata['payment_form_label'] = payment_details['payment_form_label']
|
|
if payment_details.get('bank_account') and not metadata.get('seller_bank_account'):
|
|
metadata['seller_bank_account'] = payment_details['bank_account']
|
|
if payment_details.get('bank_name') and not metadata.get('seller_bank_name'):
|
|
metadata['seller_bank_name'] = payment_details['bank_name']
|
|
if payment_details.get('payment_due_date') and not metadata.get('payment_due_date'):
|
|
metadata['payment_due_date'] = payment_details['payment_due_date']
|
|
invoice.external_metadata = metadata
|
|
invoice.seller_bank_account = payment_details.get('bank_account') or metadata.get('seller_bank_account') or invoice.seller_bank_account
|
|
invoice.contractor_address = (
|
|
metadata.get("contractor_address")
|
|
or ", ".join([
|
|
part for part in [
|
|
metadata.get("contractor_street"),
|
|
metadata.get("contractor_postal_code"),
|
|
metadata.get("contractor_city"),
|
|
metadata.get("contractor_country"),
|
|
] if part
|
|
])
|
|
or ""
|
|
)
|
|
|
|
db.session.flush()
|
|
|
|
existing_lines = invoice.lines.count() if hasattr(invoice.lines, "count") else len(list(invoice.lines))
|
|
if existing_lines == 0:
|
|
for line in self.extract_lines_from_xml(document.xml_content):
|
|
db.session.add(
|
|
InvoiceLine(
|
|
invoice_id=invoice.id,
|
|
description=line["description"],
|
|
quantity=line["quantity"],
|
|
unit=line["unit"],
|
|
unit_net=line["unit_net"],
|
|
vat_rate=line["vat_rate"],
|
|
net_amount=line["net_amount"],
|
|
vat_amount=line["vat_amount"],
|
|
gross_amount=line["gross_amount"],
|
|
)
|
|
)
|
|
|
|
if not invoice.html_preview:
|
|
invoice.html_preview = self.render_invoice_html(invoice)
|
|
|
|
db.session.flush()
|
|
db.session.add(
|
|
SyncEvent(
|
|
invoice_id=invoice.id,
|
|
status="created" if created else "updated",
|
|
message="Pobrano z KSeF",
|
|
)
|
|
)
|
|
SettingsService.set_many({"ksef.last_sync_at": datetime.utcnow().isoformat()}, company_id=company.id)
|
|
return invoice, created
|
|
|
|
def _save_xml(self, company_id, ksef_number, xml_content):
|
|
base_path = SettingsService.storage_path("app.archive_path", current_app.config["ARCHIVE_PATH"]) / f"company_{company_id}"
|
|
base_path.mkdir(parents=True, exist_ok=True)
|
|
safe_name = ksef_number.replace("/", "_") + ".xml"
|
|
path = Path(base_path) / safe_name
|
|
path.write_text(xml_content, encoding="utf-8")
|
|
return str(path)
|
|
|
|
def extract_lines_from_xml(self, xml_content):
|
|
def to_decimal(value, default='0'):
|
|
raw = str(value or '').strip()
|
|
if not raw:
|
|
return Decimal(default)
|
|
raw = raw.replace(' ', '').replace(',', '.')
|
|
try:
|
|
return Decimal(raw)
|
|
except Exception:
|
|
return Decimal(default)
|
|
|
|
def to_vat_rate(value):
|
|
raw = str(value or '').strip().lower()
|
|
if not raw:
|
|
return Decimal('0')
|
|
raw = raw.replace('%', '').replace(' ', '').replace(',', '.')
|
|
try:
|
|
return Decimal(raw)
|
|
except Exception:
|
|
return Decimal('0')
|
|
|
|
def looks_numeric(value):
|
|
raw = str(value or '').strip().replace(' ', '').replace(',', '.')
|
|
if not raw:
|
|
return False
|
|
try:
|
|
Decimal(raw)
|
|
return True
|
|
except Exception:
|
|
return False
|
|
|
|
lines = []
|
|
try:
|
|
root = ET.fromstring(xml_content)
|
|
namespace_uri = root.tag.split('}')[0].strip('{') if '}' in root.tag else ''
|
|
ns = {'fa': namespace_uri} if namespace_uri else {}
|
|
|
|
row_path = './/fa:FaWiersz' if ns else './/FaWiersz'
|
|
text_path = lambda name: f'fa:{name}' if ns else name
|
|
|
|
for row in root.findall(row_path, ns):
|
|
description = (row.findtext(text_path('P_7'), default='', namespaces=ns) or '').strip()
|
|
|
|
p_8a = row.findtext(text_path('P_8A'), default='', namespaces=ns)
|
|
p_8b = row.findtext(text_path('P_8B'), default='', namespaces=ns)
|
|
|
|
if looks_numeric(p_8a):
|
|
qty_raw = p_8a
|
|
unit_raw = p_8b or 'szt.'
|
|
else:
|
|
unit_raw = p_8a or 'szt.'
|
|
qty_raw = p_8b or '1'
|
|
|
|
unit_net = (
|
|
row.findtext(text_path('P_9A'), default='', namespaces=ns)
|
|
or row.findtext(text_path('P_9B'), default='', namespaces=ns)
|
|
or '0'
|
|
)
|
|
|
|
net = (
|
|
row.findtext(text_path('P_11'), default='', namespaces=ns)
|
|
or row.findtext(text_path('P_11A'), default='', namespaces=ns)
|
|
or '0'
|
|
)
|
|
|
|
vat_rate = row.findtext(text_path('P_12'), default='0', namespaces=ns)
|
|
|
|
vat = (
|
|
row.findtext(text_path('P_12Z'), default='', namespaces=ns)
|
|
or row.findtext(text_path('P_11Vat'), default='', namespaces=ns)
|
|
or '0'
|
|
)
|
|
|
|
net_dec = to_decimal(net)
|
|
vat_dec = to_decimal(vat)
|
|
|
|
lines.append({
|
|
'description': description,
|
|
'quantity': to_decimal(qty_raw, '1'),
|
|
'unit': (unit_raw or 'szt.').strip(),
|
|
'unit_net': to_decimal(unit_net),
|
|
'vat_rate': to_vat_rate(vat_rate),
|
|
'net_amount': net_dec,
|
|
'vat_amount': vat_dec,
|
|
'gross_amount': net_dec + vat_dec,
|
|
})
|
|
|
|
except Exception as exc:
|
|
current_app.logger.warning(f'KSeF XML line parse error: {exc}')
|
|
|
|
return lines
|
|
|
|
def extract_payment_details_from_xml(self, xml_content):
|
|
details = {
|
|
"payment_form_code": "",
|
|
"payment_form_label": "",
|
|
"bank_account": "",
|
|
"bank_name": "",
|
|
"payment_due_date": "",
|
|
}
|
|
if not xml_content:
|
|
return details
|
|
try:
|
|
root = ET.fromstring(xml_content)
|
|
namespace_uri = root.tag.split('}')[0].strip('{') if '}' in root.tag else ''
|
|
ns = {'fa': namespace_uri} if namespace_uri else {}
|
|
|
|
def find_text(path):
|
|
return (root.findtext(path, default='', namespaces=ns) or '').strip()
|
|
|
|
form_code = find_text('.//fa:Platnosc/fa:FormaPlatnosci' if ns else './/Platnosc/FormaPlatnosci')
|
|
bank_account = find_text('.//fa:Platnosc/fa:RachunekBankowy/fa:NrRB' if ns else './/Platnosc/RachunekBankowy/NrRB')
|
|
bank_name = find_text('.//fa:Platnosc/fa:RachunekBankowy/fa:NazwaBanku' if ns else './/Platnosc/RachunekBankowy/NazwaBanku')
|
|
payment_due_date = find_text('.//fa:Platnosc/fa:TerminPlatnosci/fa:Termin' if ns else './/Platnosc/TerminPlatnosci/Termin')
|
|
|
|
form_labels = {
|
|
'1': 'gotówka',
|
|
'2': 'karta',
|
|
'3': 'bon',
|
|
'4': 'czek',
|
|
'5': 'weksel',
|
|
'6': 'przelew',
|
|
'7': 'kompensata',
|
|
'8': 'pobranie',
|
|
'9': 'akredytywa',
|
|
'10': 'polecenie zapłaty',
|
|
'11': 'inny',
|
|
}
|
|
details.update({
|
|
'payment_form_code': form_code,
|
|
'payment_form_label': form_labels.get(form_code, form_code),
|
|
'bank_account': self._normalize_bank_account(bank_account),
|
|
'bank_name': bank_name,
|
|
'payment_due_date': payment_due_date,
|
|
})
|
|
except Exception as exc:
|
|
current_app.logger.warning(f'KSeF XML payment parse error: {exc}')
|
|
return details
|
|
|
|
@staticmethod
|
|
def _first_non_empty(*values, default=""):
|
|
|
|
for value in values:
|
|
if value is None:
|
|
continue
|
|
if isinstance(value, str):
|
|
if value.strip():
|
|
return value.strip()
|
|
continue
|
|
text = str(value).strip()
|
|
if text:
|
|
return text
|
|
return default
|
|
|
|
@staticmethod
|
|
def _normalize_metadata_container(value):
|
|
return value if isinstance(value, dict) else {}
|
|
|
|
def _get_external_metadata(self, invoice):
|
|
return self._normalize_metadata_container(getattr(invoice, "external_metadata", {}) or {})
|
|
|
|
def _get_ksef_metadata(self, invoice):
|
|
external_metadata = self._get_external_metadata(invoice)
|
|
return self._normalize_metadata_container(external_metadata.get("ksef", {}) or {})
|
|
|
|
def _resolve_purchase_seller_data(self, invoice):
|
|
external_metadata = self._get_external_metadata(invoice)
|
|
ksef_meta = self._get_ksef_metadata(invoice)
|
|
|
|
seller_name = self._first_non_empty(
|
|
getattr(invoice, "contractor_name", ""),
|
|
external_metadata.get("contractor_name"),
|
|
ksef_meta.get("contractor_name"),
|
|
)
|
|
seller_tax_id = self._first_non_empty(
|
|
getattr(invoice, "contractor_nip", ""),
|
|
external_metadata.get("contractor_nip"),
|
|
ksef_meta.get("contractor_nip"),
|
|
)
|
|
|
|
seller_street = self._first_non_empty(
|
|
external_metadata.get("contractor_street"),
|
|
ksef_meta.get("contractor_street"),
|
|
external_metadata.get("seller_street"),
|
|
ksef_meta.get("seller_street"),
|
|
)
|
|
seller_city = self._first_non_empty(
|
|
external_metadata.get("contractor_city"),
|
|
ksef_meta.get("contractor_city"),
|
|
external_metadata.get("seller_city"),
|
|
ksef_meta.get("seller_city"),
|
|
)
|
|
seller_postal_code = self._first_non_empty(
|
|
external_metadata.get("contractor_postal_code"),
|
|
ksef_meta.get("contractor_postal_code"),
|
|
external_metadata.get("seller_postal_code"),
|
|
ksef_meta.get("seller_postal_code"),
|
|
)
|
|
seller_country = self._first_non_empty(
|
|
external_metadata.get("contractor_country"),
|
|
ksef_meta.get("contractor_country"),
|
|
external_metadata.get("seller_country"),
|
|
ksef_meta.get("seller_country"),
|
|
)
|
|
|
|
seller_address = self._first_non_empty(
|
|
external_metadata.get("contractor_address"),
|
|
ksef_meta.get("contractor_address"),
|
|
external_metadata.get("seller_address"),
|
|
ksef_meta.get("seller_address"),
|
|
getattr(invoice, "contractor_address", ""),
|
|
)
|
|
|
|
if not seller_address:
|
|
address_parts = [part for part in [seller_street, seller_postal_code, seller_city, seller_country] if part]
|
|
seller_address = ", ".join(address_parts)
|
|
|
|
return {
|
|
"name": seller_name,
|
|
"tax_id": seller_tax_id,
|
|
"address": seller_address,
|
|
}
|
|
|
|
def update_metadata(self, invoice, form):
|
|
invoice.status = InvoiceStatus(form.status.data)
|
|
invoice.internal_note = form.internal_note.data
|
|
invoice.pinned = bool(form.pinned.data)
|
|
invoice.queue_accounting = bool(form.queue_accounting.data)
|
|
invoice.is_unread = invoice.status == InvoiceStatus.NEW
|
|
requested_tags = [t.strip() for t in (form.tags.data or "").split(",") if t.strip()]
|
|
invoice.tags.clear()
|
|
for name in requested_tags:
|
|
tag = Tag.query.filter_by(name=name).first()
|
|
if not tag:
|
|
tag = Tag(name=name, color="primary")
|
|
db.session.add(tag)
|
|
invoice.tags.append(tag)
|
|
db.session.commit()
|
|
return invoice
|
|
|
|
def mark_read(self, invoice):
|
|
if invoice.is_unread:
|
|
invoice.is_unread = False
|
|
invoice.status = InvoiceStatus.READ if invoice.status == InvoiceStatus.NEW else invoice.status
|
|
invoice.read_at = datetime.utcnow()
|
|
db.session.commit()
|
|
|
|
def render_invoice_html(self, invoice):
|
|
def esc(value):
|
|
return str(value or "—").replace("&", "&").replace("<", "<").replace(">", ">")
|
|
|
|
def money(value):
|
|
return f"{Decimal(value):,.2f} {invoice.currency or 'PLN'}".replace(",", " ").replace(".", ",")
|
|
|
|
company_name_raw = invoice.company.name if invoice.company else "Twoja firma"
|
|
company_name = esc(company_name_raw)
|
|
company_tax_id = esc(getattr(invoice.company, "tax_id", "") if invoice.company else "")
|
|
company_address = esc(getattr(invoice.company, "address", "") if invoice.company else "")
|
|
|
|
customer = getattr(invoice, "customer", None)
|
|
customer_name = esc(getattr(customer, "name", invoice.contractor_name))
|
|
customer_tax_id = esc(getattr(customer, "tax_id", invoice.contractor_nip))
|
|
customer_address = esc(getattr(customer, "address", ""))
|
|
customer_email = esc(getattr(customer, "email", ""))
|
|
|
|
if invoice.invoice_type == InvoiceType.PURCHASE:
|
|
purchase_seller = self._resolve_purchase_seller_data(invoice)
|
|
|
|
seller_name = esc(purchase_seller["name"])
|
|
seller_tax_id = esc(purchase_seller["tax_id"])
|
|
seller_address = esc(purchase_seller["address"])
|
|
|
|
buyer_name = company_name
|
|
buyer_tax_id = company_tax_id
|
|
buyer_address = company_address
|
|
buyer_email = ""
|
|
else:
|
|
seller_name = company_name
|
|
seller_tax_id = company_tax_id
|
|
seller_address = company_address
|
|
|
|
buyer_name = customer_name
|
|
buyer_tax_id = customer_tax_id
|
|
buyer_address = customer_address
|
|
buyer_email = customer_email
|
|
|
|
nfz_meta = (invoice.external_metadata or {}).get("nfz", {})
|
|
invoice_kind = "FAKTURA NFZ" if invoice.source == "nfz" else "FAKTURA VAT"
|
|
|
|
page_title = self._first_non_empty(
|
|
getattr(invoice, "invoice_number", ""),
|
|
getattr(invoice, "ksef_number", ""),
|
|
company_name_raw,
|
|
"Faktura",
|
|
)
|
|
|
|
lines = invoice.lines.order_by("id").all() if hasattr(invoice.lines, "order_by") else list(invoice.lines)
|
|
if not lines and invoice.xml_path:
|
|
try:
|
|
xml_content = Path(invoice.xml_path).read_text(encoding="utf-8")
|
|
extracted = self.extract_lines_from_xml(xml_content)
|
|
lines = [type("TmpLine", (), line) for line in extracted]
|
|
except Exception:
|
|
lines = []
|
|
|
|
lines_html = "".join(
|
|
[
|
|
(
|
|
f"<tr>"
|
|
f"<td style='padding:8px;border:1px solid #000'>{esc(line.description)}</td>"
|
|
f"<td style='padding:8px;border:1px solid #000;text-align:right'>{esc(line.quantity)}</td>"
|
|
f"<td style='padding:8px;border:1px solid #000;text-align:right'>{esc(line.unit)}</td>"
|
|
f"<td style='padding:8px;border:1px solid #000;text-align:right'>{esc(line.vat_rate)}%</td>"
|
|
f"<td style='padding:8px;border:1px solid #000;text-align:right'>{money(line.net_amount)}</td>"
|
|
f"<td style='padding:8px;border:1px solid #000;text-align:right'>{money(line.gross_amount)}</td>"
|
|
f"</tr>"
|
|
)
|
|
for line in lines
|
|
]
|
|
) or (
|
|
"<tr>"
|
|
"<td colspan='6' style='padding:12px;border:1px solid #000;text-align:center'>Brak pozycji na fakturze.</td>"
|
|
"</tr>"
|
|
)
|
|
|
|
nfz_rows = ""
|
|
if nfz_meta:
|
|
nfz_pairs = [
|
|
("Oddział NFZ (IDWew)", nfz_meta.get("recipient_branch_id")),
|
|
("Nazwa oddziału", nfz_meta.get("recipient_branch_name")),
|
|
("Okres rozliczeniowy od", nfz_meta.get("settlement_from")),
|
|
("Okres rozliczeniowy do", nfz_meta.get("settlement_to")),
|
|
("Identyfikator świadczeniodawcy", nfz_meta.get("provider_identifier")),
|
|
("Kod zakresu / świadczenia", nfz_meta.get("service_code")),
|
|
("Numer umowy / aneksu", nfz_meta.get("contract_number")),
|
|
("Identyfikator szablonu", nfz_meta.get("template_identifier")),
|
|
("Schemat", nfz_meta.get("nfz_schema", "FA(3)")),
|
|
]
|
|
nfz_rows = "".join(
|
|
[
|
|
f"<tr><td style='padding:8px;border:1px solid #000;width:38%'>{esc(label)}</td><td style='padding:8px;border:1px solid #000'>{esc(value)}</td></tr>"
|
|
for label, value in nfz_pairs
|
|
]
|
|
)
|
|
nfz_rows = (
|
|
"<div style='margin:18px 0 10px;font-weight:700'>Dane NFZ</div>"
|
|
"<table style='width:100%;border-collapse:collapse;margin-bottom:18px'>"
|
|
"<thead><tr><th style='padding:8px;border:1px solid #000;text-align:left'>Pole NFZ</th><th style='padding:8px;border:1px solid #000;text-align:left'>Wartość</th></tr></thead>"
|
|
f"<tbody>{nfz_rows}</tbody></table>"
|
|
)
|
|
|
|
buyer_email_html = f"<br>E-mail: {buyer_email}" if buyer_email not in {"", "—"} else ""
|
|
split_payment_html = "<div style='margin:12px 0;padding:10px;border:1px solid #000;font-weight:700'>Mechanizm podzielonej płatności</div>" if getattr(invoice, 'split_payment', False) else ""
|
|
payment_details = self.resolve_payment_details(invoice)
|
|
seller_bank_account = esc(self._resolve_seller_bank_account(invoice))
|
|
payment_form_html = f"<br>Forma płatności: {esc(payment_details.get('payment_form_label'))}" if payment_details.get('payment_form_label') else ''
|
|
seller_bank_account_html = f"<br>Rachunek bankowy: {seller_bank_account}" if seller_bank_account not in {'', '—'} else ''
|
|
seller_bank_name_html = f"<br>Bank: {esc(payment_details.get('bank_name'))}" if payment_details.get('bank_name') else ''
|
|
|
|
return (
|
|
f"<div style='font-family:Helvetica,Arial,sans-serif;color:#000;background:#fff;padding:8px'>"
|
|
f"<div style='font-size:24px;font-weight:700;margin-bottom:10px'>{invoice_kind}</div>"
|
|
f"<table style='width:100%;border-collapse:collapse;margin-bottom:16px'><tr>"
|
|
f"<td style='width:50%;padding:8px;border:1px solid #000;vertical-align:top'><strong>Numer faktury:</strong> {esc(invoice.invoice_number)}<br><strong>Data wystawienia:</strong> {esc(invoice.issue_date)}<br><strong>Waluta:</strong> {esc(invoice.currency or 'PLN')}</td>"
|
|
f"<td style='width:50%;padding:8px;border:1px solid #000;vertical-align:top'><strong>Numer KSeF:</strong> {esc(invoice.ksef_number)}<br><strong>Status:</strong> {esc(invoice.issued_status)}<br><strong>Typ źródła:</strong> {esc(invoice.source)}</td>"
|
|
f"</tr></table>"
|
|
f"<table style='width:100%;border-collapse:collapse;margin-bottom:16px'><tr>"
|
|
f"<td style='width:50%;padding:8px;border:1px solid #000;vertical-align:top'><strong>Sprzedawca</strong><br>{seller_name}<br>NIP: {seller_tax_id}<br>Adres: {seller_address}{payment_form_html}{seller_bank_account_html}{seller_bank_name_html}</td>"
|
|
f"<td style='width:50%;padding:8px;border:1px solid #000;vertical-align:top'><strong>Nabywca</strong><br>{buyer_name}<br>NIP: {buyer_tax_id}<br>Adres: {buyer_address}{buyer_email_html}</td>"
|
|
f"</tr></table>"
|
|
f"{nfz_rows}"
|
|
f"{split_payment_html}"
|
|
"<div style='margin:18px 0 10px;font-weight:700'>Pozycje faktury</div>"
|
|
"<table style='width:100%;border-collapse:collapse'>"
|
|
"<thead>"
|
|
"<tr>"
|
|
"<th style='padding:8px;border:1px solid #000;text-align:left'>Pozycja</th>"
|
|
"<th style='padding:8px;border:1px solid #000;text-align:right'>Ilość</th>"
|
|
"<th style='padding:8px;border:1px solid #000;text-align:right'>JM</th>"
|
|
"<th style='padding:8px;border:1px solid #000;text-align:right'>VAT</th>"
|
|
"<th style='padding:8px;border:1px solid #000;text-align:right'>Netto</th>"
|
|
"<th style='padding:8px;border:1px solid #000;text-align:right'>Brutto</th>"
|
|
"</tr>"
|
|
"</thead>"
|
|
f"<tbody>{lines_html}</tbody></table>"
|
|
"<table style='width:52%;border-collapse:collapse;margin-left:auto;margin-top:16px'>"
|
|
f"<tr><td style='padding:8px;border:1px solid #000'>Netto</td><td style='padding:8px;border:1px solid #000;text-align:right'>{money(invoice.net_amount)}</td></tr>"
|
|
f"<tr><td style='padding:8px;border:1px solid #000'>VAT</td><td style='padding:8px;border:1px solid #000;text-align:right'>{money(invoice.vat_amount)}</td></tr>"
|
|
f"<tr><td style='padding:8px;border:1px solid #000'><strong>Razem brutto</strong></td><td style='padding:8px;border:1px solid #000;text-align:right'><strong>{money(invoice.gross_amount)}</strong></td></tr>"
|
|
"</table>"
|
|
f"<div style='margin-top:16px;font-size:11px'>{'Dokument zawiera pola wymagane dla rozliczeń NFZ i został przygotowany do wysyłki w schemacie FA(3).' if nfz_meta else 'Dokument wygenerowany przez KSeF Manager.'}</div>"
|
|
"</div>"
|
|
)
|
|
|
|
def monthly_groups(self, company_id=None):
|
|
groups = []
|
|
for row in self.repo.monthly_summary(company_id):
|
|
year = int(row.year)
|
|
month = int(row.month)
|
|
entries = self.repo.base_query(company_id).filter(Invoice.issue_date >= datetime(year, month, 1).date())
|
|
if month == 12:
|
|
entries = entries.filter(Invoice.issue_date < datetime(year + 1, 1, 1).date())
|
|
else:
|
|
entries = entries.filter(Invoice.issue_date < datetime(year, month + 1, 1).date())
|
|
groups.append(
|
|
{
|
|
"key": f"{year}-{month:02d}",
|
|
"year": year,
|
|
"month": month,
|
|
"count": int(row.count or 0),
|
|
"net": row.net or Decimal("0"),
|
|
"vat": row.vat or Decimal("0"),
|
|
"gross": row.gross or Decimal("0"),
|
|
"entries": entries.order_by(Invoice.issue_date.desc(), Invoice.id.desc()).all(),
|
|
}
|
|
)
|
|
return groups
|
|
|
|
def grouped_summary(self, company_id=None, *, period="month", search=None):
|
|
rows = self.repo.summary_query(company_id, period=period, search=search)
|
|
grouped_entries = defaultdict(list)
|
|
entry_query = self.repo.base_query(company_id)
|
|
if search:
|
|
like = f"%{search}%"
|
|
entry_query = entry_query.filter(
|
|
or_(
|
|
Invoice.invoice_number.ilike(like),
|
|
Invoice.ksef_number.ilike(like),
|
|
Invoice.contractor_name.ilike(like),
|
|
Invoice.contractor_nip.ilike(like),
|
|
)
|
|
)
|
|
for invoice in entry_query.order_by(Invoice.issue_date.desc(), Invoice.id.desc()).all():
|
|
if period == "year":
|
|
key = f"{invoice.issue_date.year}"
|
|
elif period == "quarter":
|
|
quarter = ((invoice.issue_date.month - 1) // 3) + 1
|
|
key = f"{invoice.issue_date.year}-Q{quarter}"
|
|
else:
|
|
key = f"{invoice.issue_date.year}-{invoice.issue_date.month:02d}"
|
|
grouped_entries[key].append(invoice)
|
|
groups = []
|
|
for row in rows:
|
|
year = int(row.year)
|
|
month = int(getattr(row, "month", 0) or 0)
|
|
quarter = int(getattr(row, "quarter", 0) or 0)
|
|
if period == "year":
|
|
key = f"{year}"
|
|
label = f"Rok {year}"
|
|
elif period == "quarter":
|
|
key = f"{year}-Q{quarter}"
|
|
label = f"Q{quarter} {year}"
|
|
else:
|
|
key = f"{year}-{month:02d}"
|
|
label = f"{year}-{month:02d}"
|
|
groups.append(
|
|
{
|
|
"key": key,
|
|
"label": label,
|
|
"year": year,
|
|
"month": month,
|
|
"quarter": quarter,
|
|
"count": int(row.count or 0),
|
|
"net": row.net or Decimal("0"),
|
|
"vat": row.vat or Decimal("0"),
|
|
"gross": row.gross or Decimal("0"),
|
|
"entries": grouped_entries.get(key, []),
|
|
}
|
|
)
|
|
return groups
|
|
|
|
def comparative_stats(self, company_id=None, *, search=None):
|
|
rows = self.repo.summary_query(company_id, period="year", search=search)
|
|
stats = []
|
|
previous = None
|
|
for row in rows:
|
|
year = int(row.year)
|
|
gross = row.gross or Decimal("0")
|
|
net = row.net or Decimal("0")
|
|
count = int(row.count or 0)
|
|
delta = None
|
|
if previous is not None:
|
|
delta = gross - previous["gross"]
|
|
stats.append({"year": year, "gross": gross, "net": net, "count": count, "delta": delta})
|
|
previous = {"gross": gross}
|
|
return stats
|
|
|
|
def next_sale_number(self, company_id, template="monthly"):
|
|
today = self.today_date()
|
|
query = Invoice.query.filter(Invoice.company_id == company_id, Invoice.invoice_type == InvoiceType.SALE)
|
|
if template == "yearly":
|
|
query = query.filter(Invoice.issue_date >= date(today.year, 1, 1))
|
|
prefix = f"FV/{today.year}/"
|
|
elif template == "custom":
|
|
prefix = f"FV/{today.year}/{today.month:02d}/"
|
|
query = query.filter(Invoice.issue_date >= date(today.year, today.month, 1))
|
|
if today.month == 12:
|
|
query = query.filter(Invoice.issue_date < date(today.year + 1, 1, 1))
|
|
else:
|
|
query = query.filter(Invoice.issue_date < date(today.year, today.month + 1, 1))
|
|
else:
|
|
prefix = f"FV/{today.year}/{today.month:02d}/"
|
|
query = query.filter(Invoice.issue_date >= date(today.year, today.month, 1))
|
|
if today.month == 12:
|
|
query = query.filter(Invoice.issue_date < date(today.year + 1, 1, 1))
|
|
else:
|
|
query = query.filter(Invoice.issue_date < date(today.year, today.month + 1, 1))
|
|
next_no = query.count() + 1
|
|
return f"{prefix}{next_no:04d}"
|
|
|
|
def build_ksef_payload(self, invoice):
|
|
lines = invoice.lines.order_by("id").all() if hasattr(invoice.lines, "order_by") else list(invoice.lines)
|
|
nfz_meta = (invoice.external_metadata or {}).get("nfz", {})
|
|
xml_content = self.render_structured_xml(invoice, lines=lines, nfz_meta=nfz_meta)
|
|
payload = {
|
|
"invoiceNumber": invoice.invoice_number,
|
|
"invoiceType": invoice.invoice_type.value if hasattr(invoice.invoice_type, "value") else str(invoice.invoice_type),
|
|
"schemaVersion": nfz_meta.get("nfz_schema", "FA(3)"),
|
|
"customer": {
|
|
"name": invoice.contractor_name,
|
|
"taxId": invoice.contractor_nip,
|
|
},
|
|
"lines": [
|
|
{
|
|
"name": line.description,
|
|
"qty": float(line.quantity),
|
|
"unit": line.unit,
|
|
"unitNet": float(line.unit_net),
|
|
"vatRate": float(line.vat_rate),
|
|
"netAmount": float(line.net_amount),
|
|
"grossAmount": float(line.gross_amount),
|
|
}
|
|
for line in lines
|
|
],
|
|
"metadata": {
|
|
"source": invoice.source,
|
|
"companyId": invoice.company_id,
|
|
"issueDate": invoice.issue_date.isoformat() if invoice.issue_date else "",
|
|
"currency": invoice.currency,
|
|
"nfz": nfz_meta,
|
|
"split_payment": bool(getattr(invoice, 'split_payment', False)),
|
|
},
|
|
"xml_content": xml_content,
|
|
}
|
|
if nfz_meta:
|
|
payload["customer"]["internalBranchId"] = nfz_meta.get("recipient_branch_id", "")
|
|
return payload
|
|
|
|
@staticmethod
|
|
def _xml_decimal(value, places=2):
|
|
quant = Decimal('1').scaleb(-places)
|
|
return format(Decimal(value or 0).quantize(quant), 'f')
|
|
|
|
@staticmethod
|
|
def _split_address_lines(address):
|
|
raw = (address or '').strip()
|
|
if not raw:
|
|
return '', ''
|
|
parts = [part.strip() for part in raw.replace('\n', ',').split(',') if part.strip()]
|
|
if len(parts) <= 1:
|
|
return raw[:512], ''
|
|
line1 = ', '.join(parts[:2])[:512]
|
|
line2 = ', '.join(parts[2:])[:512]
|
|
return line1, line2
|
|
|
|
|
|
@staticmethod
|
|
def _normalize_bank_account(value):
|
|
raw = ''.join(str(value or '').split())
|
|
return raw
|
|
|
|
def resolve_payment_details(self, invoice):
|
|
external_metadata = self._normalize_metadata_container(getattr(invoice, 'external_metadata', {}) or {})
|
|
details = {
|
|
'payment_form_code': self._first_non_empty(external_metadata.get('payment_form_code')),
|
|
'payment_form_label': self._first_non_empty(external_metadata.get('payment_form_label')),
|
|
'bank_account': self._normalize_bank_account(
|
|
self._first_non_empty(
|
|
getattr(invoice, 'seller_bank_account', ''),
|
|
external_metadata.get('seller_bank_account'),
|
|
)
|
|
),
|
|
'bank_name': self._first_non_empty(external_metadata.get('seller_bank_name')),
|
|
'payment_due_date': self._first_non_empty(external_metadata.get('payment_due_date')),
|
|
}
|
|
if getattr(invoice, 'xml_path', None) and (not details['bank_account'] or not details['payment_form_code'] or not details['bank_name'] or not details['payment_due_date']):
|
|
try:
|
|
xml_content = Path(invoice.xml_path).read_text(encoding='utf-8')
|
|
parsed = self.extract_payment_details_from_xml(xml_content)
|
|
for key, value in parsed.items():
|
|
if value and not details.get(key):
|
|
details[key] = value
|
|
except Exception:
|
|
pass
|
|
return details
|
|
|
|
def _resolve_seller_bank_account(self, invoice):
|
|
details = self.resolve_payment_details(invoice)
|
|
account = self._normalize_bank_account(details.get('bank_account', ''))
|
|
if account:
|
|
return account
|
|
if getattr(invoice, 'invoice_type', None) == InvoiceType.PURCHASE:
|
|
return ''
|
|
company = getattr(invoice, 'company', None)
|
|
return self._normalize_bank_account(getattr(company, 'bank_account', '') if company else '')
|
|
|
|
@staticmethod
|
|
def _safe_tax_id(value):
|
|
digits = ''.join(ch for ch in str(value or '') if ch.isdigit())
|
|
return digits
|
|
|
|
def _append_xml_text(self, parent, tag, value):
|
|
if value is None:
|
|
return None
|
|
text = str(value).strip()
|
|
if not text:
|
|
return None
|
|
node = ET.SubElement(parent, tag)
|
|
node.text = text
|
|
return node
|
|
|
|
def _append_address_node(self, parent, address, *, country_code='PL'):
|
|
line1, line2 = self._split_address_lines(address)
|
|
if not line1 and not line2:
|
|
return None
|
|
node = ET.SubElement(parent, 'Adres')
|
|
ET.SubElement(node, 'KodKraju').text = country_code or 'PL'
|
|
if line1:
|
|
ET.SubElement(node, 'AdresL1').text = line1
|
|
if line2:
|
|
ET.SubElement(node, 'AdresL2').text = line2
|
|
return node
|
|
|
|
|
|
def _append_party_section(self, root, tag_name, *, name, tax_id, address, country_code='PL'):
|
|
party = ET.SubElement(root, tag_name)
|
|
ident = ET.SubElement(party, 'DaneIdentyfikacyjne')
|
|
tax_digits = self._safe_tax_id(tax_id)
|
|
if tax_digits:
|
|
ET.SubElement(ident, 'NIP').text = tax_digits
|
|
else:
|
|
ET.SubElement(ident, 'BrakID').text = '1'
|
|
ET.SubElement(ident, 'Nazwa').text = (name or 'Brak nazwy').strip()
|
|
self._append_address_node(party, address, country_code=country_code)
|
|
return party
|
|
|
|
def _append_tax_summary(self, fa_node, lines):
|
|
vat_groups = {
|
|
'23': {'net': Decimal('0'), 'vat': Decimal('0'), 'p13': 'P_13_1', 'p14': 'P_14_1'},
|
|
'22': {'net': Decimal('0'), 'vat': Decimal('0'), 'p13': 'P_13_1', 'p14': 'P_14_1'},
|
|
'8': {'net': Decimal('0'), 'vat': Decimal('0'), 'p13': 'P_13_2', 'p14': 'P_14_2'},
|
|
'7': {'net': Decimal('0'), 'vat': Decimal('0'), 'p13': 'P_13_2', 'p14': 'P_14_2'},
|
|
'5': {'net': Decimal('0'), 'vat': Decimal('0'), 'p13': 'P_13_3', 'p14': 'P_14_3'},
|
|
}
|
|
for line in lines:
|
|
rate = Decimal(line.vat_rate or 0).normalize()
|
|
rate_key = format(rate, 'f').rstrip('0').rstrip('.') if '.' in format(rate, 'f') else format(rate, 'f')
|
|
group = vat_groups.get(rate_key)
|
|
if not group:
|
|
group = vat_groups.get('23') if Decimal(line.vat_amount or 0) > 0 else None
|
|
if not group:
|
|
continue
|
|
group['net'] += Decimal(line.net_amount or 0)
|
|
group['vat'] += Decimal(line.vat_amount or 0)
|
|
|
|
for cfg in vat_groups.values():
|
|
if cfg['net']:
|
|
ET.SubElement(fa_node, cfg['p13']).text = self._xml_decimal(cfg['net'])
|
|
ET.SubElement(fa_node, cfg['p14']).text = self._xml_decimal(cfg['vat'])
|
|
|
|
def _append_adnotations(self, fa_node, *, split_payment=False):
|
|
adnotacje = ET.SubElement(fa_node, 'Adnotacje')
|
|
ET.SubElement(adnotacje, 'P_16').text = '2'
|
|
ET.SubElement(adnotacje, 'P_17').text = '2'
|
|
ET.SubElement(adnotacje, 'P_18').text = '2'
|
|
ET.SubElement(adnotacje, 'P_18A').text = '1' if split_payment else '2'
|
|
zw = ET.SubElement(adnotacje, 'Zwolnienie')
|
|
ET.SubElement(zw, 'P_19N').text = '1'
|
|
ET.SubElement(adnotacje, 'P_23').text = '2'
|
|
return adnotacje
|
|
|
|
|
|
def _append_payment_details(self, fa_node, invoice):
|
|
bank_account = self._resolve_seller_bank_account(invoice)
|
|
if not bank_account:
|
|
return None
|
|
external_metadata = self._normalize_metadata_container(getattr(invoice, 'external_metadata', {}) or {})
|
|
bank_name = self._first_non_empty(external_metadata.get('seller_bank_name'))
|
|
platnosc = ET.SubElement(fa_node, 'Platnosc')
|
|
ET.SubElement(platnosc, 'FormaPlatnosci').text = '6'
|
|
rachunek = ET.SubElement(platnosc, 'RachunekBankowy')
|
|
ET.SubElement(rachunek, 'NrRB').text = bank_account
|
|
if bank_name:
|
|
ET.SubElement(rachunek, 'NazwaBanku').text = bank_name
|
|
return platnosc
|
|
|
|
def _append_nfz_extra_description(self, fa_node, nfz_meta):
|
|
mapping = [
|
|
('IDWew', nfz_meta.get('recipient_branch_id', '')),
|
|
('P_6_Od', nfz_meta.get('settlement_from', '')),
|
|
('P_6_Do', nfz_meta.get('settlement_to', '')),
|
|
('identyfikator-swiadczeniodawcy', nfz_meta.get('provider_identifier', '')),
|
|
('Indeks', nfz_meta.get('service_code', '')),
|
|
('NrUmowy', nfz_meta.get('contract_number', '')),
|
|
('identyfikator-szablonu', nfz_meta.get('template_identifier', '')),
|
|
]
|
|
for key, value in mapping:
|
|
if not str(value or '').strip():
|
|
continue
|
|
node = ET.SubElement(fa_node, 'DodatkowyOpis')
|
|
ET.SubElement(node, 'Klucz').text = str(key)
|
|
ET.SubElement(node, 'Wartosc').text = str(value)
|
|
|
|
def render_structured_xml(self, invoice, *, lines=None, nfz_meta=None):
|
|
lines = lines if lines is not None else (invoice.lines.order_by('id').all() if hasattr(invoice.lines, 'order_by') else list(invoice.lines))
|
|
nfz_meta = nfz_meta or (invoice.external_metadata or {}).get('nfz', {})
|
|
seller = invoice.company or CompanyService.get_current_company()
|
|
issue_date = invoice.issue_date.isoformat() if invoice.issue_date else self.today_date().isoformat()
|
|
created_at = self.utcnow().replace(microsecond=0).isoformat() + 'Z'
|
|
currency = (invoice.currency or 'PLN').strip() or 'PLN'
|
|
schema_code = str(nfz_meta.get('nfz_schema', 'FA(3)') or 'FA(3)')
|
|
schema_ns = 'http://crd.gov.pl/wzor/2025/06/25/13775/' if schema_code == 'FA(3)' else f'https://ksef.mf.gov.pl/schemat/faktura/{schema_code}'
|
|
|
|
ET.register_namespace('', schema_ns)
|
|
root = ET.Element('Faktura', {'xmlns': schema_ns})
|
|
|
|
naglowek = ET.SubElement(root, 'Naglowek')
|
|
kod_formularza = ET.SubElement(naglowek, 'KodFormularza', {'kodSystemowy': 'FA (3)', 'wersjaSchemy': '1-0E'})
|
|
kod_formularza.text = 'FA'
|
|
ET.SubElement(naglowek, 'WariantFormularza').text = '3'
|
|
ET.SubElement(naglowek, 'DataWytworzeniaFa').text = created_at
|
|
ET.SubElement(naglowek, 'SystemInfo').text = 'KSeF Manager'
|
|
|
|
self._append_party_section(
|
|
root,
|
|
'Podmiot1',
|
|
name=getattr(seller, 'name', '') or '',
|
|
tax_id=getattr(seller, 'tax_id', '') or '',
|
|
address=getattr(seller, 'address', '') or '',
|
|
)
|
|
self._append_party_section(
|
|
root,
|
|
'Podmiot2',
|
|
name=invoice.contractor_name or '',
|
|
tax_id=invoice.contractor_nip or '',
|
|
address=getattr(invoice, 'contractor_address', '') or '',
|
|
)
|
|
|
|
if nfz_meta.get('recipient_branch_id'):
|
|
podmiot3 = ET.SubElement(root, 'Podmiot3')
|
|
ident = ET.SubElement(podmiot3, 'DaneIdentyfikacyjne')
|
|
ET.SubElement(ident, 'IDWew').text = str(nfz_meta.get('recipient_branch_id'))
|
|
ET.SubElement(podmiot3, 'Rola').text = '7'
|
|
|
|
fa = ET.SubElement(root, 'Fa')
|
|
ET.SubElement(fa, 'KodWaluty').text = currency
|
|
ET.SubElement(fa, 'P_1').text = issue_date
|
|
self._append_xml_text(fa, 'P_1M', nfz_meta.get('issue_place'))
|
|
ET.SubElement(fa, 'P_2').text = (invoice.invoice_number or '').strip()
|
|
ET.SubElement(fa, 'P_6').text = issue_date
|
|
self._append_xml_text(fa, 'P_6_Od', nfz_meta.get('settlement_from'))
|
|
self._append_xml_text(fa, 'P_6_Do', nfz_meta.get('settlement_to'))
|
|
|
|
self._append_tax_summary(fa, lines)
|
|
ET.SubElement(fa, 'P_15').text = self._xml_decimal(invoice.gross_amount)
|
|
self._append_adnotations(fa, split_payment=bool(getattr(invoice, 'split_payment', False)))
|
|
self._append_payment_details(fa, invoice)
|
|
self._append_nfz_extra_description(fa, nfz_meta)
|
|
|
|
for idx, line in enumerate(lines, start=1):
|
|
row = ET.SubElement(fa, 'FaWiersz')
|
|
ET.SubElement(row, 'NrWierszaFa').text = str(idx)
|
|
if nfz_meta.get('service_date'):
|
|
ET.SubElement(row, 'P_6A').text = str(nfz_meta.get('service_date'))
|
|
ET.SubElement(row, 'P_7').text = str(line.description or '')
|
|
ET.SubElement(row, 'P_8A').text = str(line.unit or 'szt.')
|
|
ET.SubElement(row, 'P_8B').text = self._xml_decimal(line.quantity, places=6).rstrip('0').rstrip('.') or '0'
|
|
ET.SubElement(row, 'P_9A').text = self._xml_decimal(line.unit_net, places=8).rstrip('0').rstrip('.') or '0'
|
|
ET.SubElement(row, 'P_11').text = self._xml_decimal(line.net_amount)
|
|
ET.SubElement(row, 'P_12').text = self._xml_decimal(line.vat_rate, places=2).rstrip('0').rstrip('.') or '0'
|
|
if Decimal(line.vat_amount or 0):
|
|
ET.SubElement(row, 'P_12Z').text = self._xml_decimal(line.vat_amount)
|
|
|
|
stopka_parts = []
|
|
if getattr(invoice, 'split_payment', False):
|
|
stopka_parts.append('Mechanizm podzielonej płatności.')
|
|
seller_bank_account = self._resolve_seller_bank_account(invoice)
|
|
if seller_bank_account:
|
|
stopka_parts.append(f'Rachunek bankowy: {seller_bank_account}')
|
|
if nfz_meta.get('contract_number'):
|
|
stopka_parts.append(f"NFZ umowa: {nfz_meta.get('contract_number')}")
|
|
if stopka_parts:
|
|
stopka = ET.SubElement(root, 'Stopka')
|
|
info = ET.SubElement(stopka, 'Informacje')
|
|
ET.SubElement(info, 'StopkaFaktury').text = ' '.join(stopka_parts)
|
|
|
|
return ET.tostring(root, encoding='utf-8', xml_declaration=True).decode('utf-8')
|
|
|
|
def persist_issued_assets(self, invoice, xml_content=None):
|
|
db.session.flush()
|
|
invoice.html_preview = self.render_invoice_html(invoice)
|
|
xml_content = xml_content or self.build_ksef_payload(invoice).get("xml_content")
|
|
if xml_content:
|
|
invoice.source_hash = KSeFService.calc_hash(xml_content)
|
|
invoice.xml_path = self._save_xml(invoice.company_id, invoice.ksef_number or invoice.invoice_number, xml_content)
|
|
PdfService = __import__("app.services.pdf_service", fromlist=["PdfService"]).PdfService
|
|
PdfService().render_invoice_pdf(invoice)
|
|
return xml_content
|
|
|
|
@staticmethod
|
|
def invoice_locked(invoice):
|
|
return bool(
|
|
getattr(invoice, "issued_to_ksef_at", None)
|
|
or getattr(invoice, "issued_status", "") in {"issued", "issued_mock"}
|
|
)
|
|
|
|
@staticmethod
|
|
def today_date():
|
|
return date.today()
|
|
|
|
@staticmethod
|
|
def utcnow():
|
|
return datetime.utcnow()
|
|
|
|
@staticmethod
|
|
def period_title(period):
|
|
return InvoiceService.PERIOD_LABELS.get(period, InvoiceService.PERIOD_LABELS["month"]) |