Files
ksef_app/app/services/pdf_service.py
Mateusz Gruszczyński 35571df778 push
2026-03-13 11:03:13 +01:00

609 lines
26 KiB
Python

from __future__ import annotations
from decimal import Decimal
from io import BytesIO
from pathlib import Path
import xml.etree.ElementTree as ET
from flask import current_app
from reportlab.lib import colors
from reportlab.lib.pagesizes import A4
from reportlab.lib.styles import ParagraphStyle, getSampleStyleSheet
from reportlab.lib.units import mm
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont
from reportlab.platypus import Paragraph, SimpleDocTemplate, Spacer, Table, TableStyle
from app.models.invoice import InvoiceType
class PdfService:
def __init__(self):
    """Create the service and resolve the font name used by all generated PDFs."""
    # Registration is done once per instance; styles and tables reuse the result.
    resolved_font = self._register_font()
    self.font_name = resolved_font
def _register_font(self):
    """Register the first DejaVu Sans font file found on disk.

    Returns:
        ``"AppUnicode"`` when one of the known font files exists and has been
        registered with ReportLab, otherwise the name ``"Helvetica"``.
    """
    known_locations = (
        "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
        "/usr/share/fonts/dejavu/DejaVuSans.ttf",
        "/usr/share/fonts/TTF/DejaVuSans.ttf",
    )
    # Locations are probed in order; only the first existing file is registered.
    font_file = next((loc for loc in known_locations if Path(loc).exists()), None)
    if font_file is None:
        return "Helvetica"
    pdfmetrics.registerFont(TTFont("AppUnicode", font_file))
    return "AppUnicode"
def _styles(self):
    """Build the paragraph style sheet shared by the generated documents.

    Starts from ReportLab's sample style sheet, switches ``Normal`` to the
    service font and adds the ``DocTitle``, ``SectionTitle``, ``Small`` and
    ``Right`` styles.

    Returns:
        The style sheet object returned by ``getSampleStyleSheet()``.
    """
    sheet = getSampleStyleSheet()
    font = self.font_name

    normal = sheet["Normal"]
    normal.fontName = font
    normal.fontSize = 9.5
    normal.leading = 12

    custom_styles = (
        {"name": "DocTitle", "fontSize": 18, "leading": 22, "spaceAfter": 4},
        {"name": "SectionTitle", "fontSize": 10, "leading": 12, "spaceAfter": 4},
        {"name": "Small", "fontSize": 8, "leading": 10},
        # alignment=2 - presumably ReportLab's right alignment, given the style name.
        {"name": "Right", "fontSize": 9.5, "leading": 12, "alignment": 2},
    )
    for spec in custom_styles:
        sheet.add(ParagraphStyle(fontName=font, **spec))
    return sheet
@staticmethod
def _money(value, currency="PLN") -> str:
return f"{Decimal(value):,.2f} {currency or 'PLN'}".replace(",", " ").replace(".", ",")
def _safe(self, value) -> str:
return str(value or "-").replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
def _table_base_style(self, extra=None):
    """Build the table style shared by the tables in the generated PDFs.

    Args:
        extra: Optional iterable of additional style commands appended after
            the base commands.

    Returns:
        A ``TableStyle`` with the service font, a black box and inner grid,
        6-unit padding on every side and middle vertical alignment.
    """
    whole_table = ((0, 0), (-1, -1))
    commands = [
        ("FONTNAME", *whole_table, self.font_name),
        ("BOX", *whole_table, 0.7, colors.black),
        ("INNERGRID", *whole_table, 0.5, colors.black),
    ]
    commands.extend(
        (side, *whole_table, 6)
        for side in ("LEFTPADDING", "RIGHTPADDING", "TOPPADDING", "BOTTOMPADDING")
    )
    commands.append(("VALIGN", *whole_table, "MIDDLE"))
    if extra:
        commands.extend(extra)
    return TableStyle(commands)
def _extract_lines_from_xml(self, xml_path):
def to_decimal(value, default='0'):
raw = str(value or '').strip()
if not raw:
return Decimal(default)
raw = raw.replace(' ', '').replace(',', '.')
try:
return Decimal(raw)
except Exception:
return Decimal(default)
def to_vat_rate(value):
raw = str(value or '').strip().lower()
if not raw:
return Decimal('0')
raw = raw.replace('%', '').replace(' ', '').replace(',', '.')
try:
return Decimal(raw)
except Exception:
return Decimal('0')
def looks_numeric(value):
raw = str(value or '').strip().replace(' ', '').replace(',', '.')
if not raw:
return False
try:
Decimal(raw)
return True
except Exception:
return False
if not xml_path:
return []
try:
xml_content = Path(xml_path).read_text(encoding='utf-8')
root = ET.fromstring(xml_content)
namespace_uri = root.tag.split('}')[0].strip('{') if '}' in root.tag else ''
ns = {'fa': namespace_uri} if namespace_uri else {}
row_path = './/fa:FaWiersz' if ns else './/FaWiersz'
text_path = lambda name: f'fa:{name}' if ns else name
lines = []
for row in root.findall(row_path, ns):
p_8a = row.findtext(text_path('P_8A'), default='', namespaces=ns)
p_8b = row.findtext(text_path('P_8B'), default='', namespaces=ns)
if looks_numeric(p_8a):
qty_raw = p_8a
unit_raw = p_8b or 'szt.'
else:
unit_raw = p_8a or 'szt.'
qty_raw = p_8b or '1'
net = to_decimal(
row.findtext(text_path('P_11'), default='', namespaces=ns)
or row.findtext(text_path('P_11A'), default='', namespaces=ns)
or '0'
)
vat = to_decimal(
row.findtext(text_path('P_12Z'), default='', namespaces=ns)
or row.findtext(text_path('P_11Vat'), default='', namespaces=ns)
or '0'
)
lines.append({
'description': (row.findtext(text_path('P_7'), default='', namespaces=ns) or '').strip(),
'quantity': to_decimal(qty_raw, '1'),
'unit': (unit_raw or 'szt.').strip(),
'unit_net': to_decimal(
row.findtext(text_path('P_9A'), default='', namespaces=ns)
or row.findtext(text_path('P_9B'), default='', namespaces=ns)
or '0'
),
'vat_rate': to_vat_rate(row.findtext(text_path('P_12'), default='0', namespaces=ns)),
'net_amount': net,
'vat_amount': vat,
'gross_amount': net + vat,
})
return lines
except Exception as exc:
current_app.logger.warning(f'PDF XML line parse error: {exc}')
return []
@staticmethod
def _first_non_empty(*values, default=""):
for value in values:
if value is None:
continue
if isinstance(value, str):
if value.strip():
return value.strip()
continue
text = str(value).strip()
if text:
return text
return default
@staticmethod
def _normalize_metadata_container(value):
return value if isinstance(value, dict) else {}
def _get_external_metadata(self, invoice):
    """Return the invoice's ``external_metadata`` as a dict.

    A missing attribute, a falsy value or a non-dict value yields an empty
    dict.
    """
    raw_metadata = getattr(invoice, "external_metadata", {})
    if not raw_metadata:
        raw_metadata = {}
    return self._normalize_metadata_container(raw_metadata)
def _get_ksef_metadata(self, invoice):
    """Return the ``"ksef"`` section of the invoice's external metadata.

    A missing, falsy or non-dict section yields an empty dict.
    """
    ksef_section = self._get_external_metadata(invoice).get("ksef", {})
    return self._normalize_metadata_container(ksef_section or {})
def _resolve_purchase_seller_data(self, invoice, fallback_name, fallback_tax):
    """Collect the seller's name, tax id and address for a purchase invoice.

    Each value is the first non-empty candidate among the invoice attributes
    and the external / KSeF metadata. When no complete address is stored, one
    is assembled from street, postal code, city and country.

    Args:
        invoice: Invoice object; attributes are read with ``getattr``.
        fallback_name: Name used when neither invoice nor metadata has one.
        fallback_tax: Tax id used when neither invoice nor metadata has one.

    Returns:
        A dict with the keys ``"name"``, ``"tax"`` and ``"address"``.
    """
    ext_meta = self._get_external_metadata(invoice)
    ksef_section = self._get_ksef_metadata(invoice)

    def from_metadata(*keys):
        """For each key in order: the external value, then the KSeF value."""
        found = []
        for key in keys:
            found.append(ext_meta.get(key))
            found.append(ksef_section.get(key))
        return found

    def address_part(suffix):
        """Resolve one address component, preferring ``contractor_*`` keys."""
        return self._first_non_empty(
            *from_metadata(f"contractor_{suffix}", f"seller_{suffix}")
        )

    name = self._first_non_empty(
        getattr(invoice, "contractor_name", ""),
        *from_metadata("contractor_name"),
        fallback_name,
    )
    tax_id = self._first_non_empty(
        getattr(invoice, "contractor_nip", ""),
        *from_metadata("contractor_nip"),
        fallback_tax,
    )
    street = address_part("street")
    city = address_part("city")
    postal_code = address_part("postal_code")
    country = address_part("country")

    # Here the invoice attribute is the last resort, after both metadata dicts.
    address = self._first_non_empty(
        *from_metadata("contractor_address", "seller_address"),
        getattr(invoice, "contractor_address", ""),
    )
    if not address:
        address = ", ".join(
            piece for piece in (street, postal_code, city, country) if piece
        )

    return {"name": name, "tax": tax_id, "address": address}
@staticmethod
def _normalize_bank_account(value):
return ''.join(str(value or '').split())
def _extract_payment_details_from_xml(self, xml_content):
details = {'payment_form_code': '', 'payment_form_label': '', 'bank_account': '', 'bank_name': '', 'payment_due_date': ''}
if not xml_content:
return details
try:
root = ET.fromstring(xml_content)
namespace_uri = root.tag.split('}')[0].strip('{') if '}' in root.tag else ''
ns = {'fa': namespace_uri} if namespace_uri else {}
def find_text(path):
return (root.findtext(path, default='', namespaces=ns) or '').strip()
form_code = find_text('.//fa:Platnosc/fa:FormaPlatnosci' if ns else './/Platnosc/FormaPlatnosci')
bank_account = find_text('.//fa:Platnosc/fa:RachunekBankowy/fa:NrRB' if ns else './/Platnosc/RachunekBankowy/NrRB')
bank_name = find_text('.//fa:Platnosc/fa:RachunekBankowy/fa:NazwaBanku' if ns else './/Platnosc/RachunekBankowy/NazwaBanku')
payment_due_date = find_text('.//fa:Platnosc/fa:TerminPlatnosci/fa:Termin' if ns else './/Platnosc/TerminPlatnosci/Termin')
labels = {'1': 'gotówka', '2': 'karta', '3': 'bon', '4': 'czek', '5': 'weksel', '6': 'przelew', '7': 'kompensata', '8': 'pobranie', '9': 'akredytywa', '10': 'polecenie zapłaty', '11': 'inny'}
details.update({'payment_form_code': form_code, 'payment_form_label': labels.get(form_code, form_code), 'bank_account': self._normalize_bank_account(bank_account), 'bank_name': bank_name, 'payment_due_date': payment_due_date})
except Exception:
pass
return details
def _resolve_payment_details(self, invoice):
    """Resolve payment form, bank account, bank name and due date for *invoice*.

    Values stored on the invoice and in its external metadata win. The
    invoice XML (``invoice.xml_path``) is read only when the bank account or
    the payment form code is still missing, and then fills in every key that
    has no value yet.

    Returns:
        A dict with the keys ``payment_form_code``, ``payment_form_label``,
        ``bank_account``, ``bank_name`` and ``payment_due_date``.
    """
    meta = self._get_external_metadata(invoice)
    pick = self._first_non_empty

    stored_account = pick(
        getattr(invoice, 'seller_bank_account', ''),
        meta.get('seller_bank_account'),
    )
    details = {
        'payment_form_code': pick(meta.get('payment_form_code')),
        'payment_form_label': pick(meta.get('payment_form_label')),
        'bank_account': self._normalize_bank_account(stored_account),
        'bank_name': pick(meta.get('seller_bank_name')),
        'payment_due_date': pick(meta.get('payment_due_date')),
    }

    xml_path = getattr(invoice, 'xml_path', None)
    has_essentials = details['bank_account'] and details['payment_form_code']
    if not xml_path or has_essentials:
        return details

    try:
        xml_text = Path(xml_path).read_text(encoding='utf-8')
        for key, value in self._extract_payment_details_from_xml(xml_text).items():
            if value and not details.get(key):
                details[key] = value
    except Exception:
        # Best effort: an unreadable XML file leaves the stored values as they are.
        pass
    return details
def _resolve_seller_bank_account(self, invoice):
    """Return the seller's bank account number without whitespace.

    The account from the resolved payment details is used when present.
    Otherwise purchase invoices yield an empty string, and any other invoice
    falls back to the ``bank_account`` of ``invoice.company``.
    """
    resolved = self._resolve_payment_details(invoice).get('bank_account', '')
    account = self._normalize_bank_account(resolved)
    if account:
        return account

    # On a purchase invoice the company is not the seller, so its account is not used.
    if getattr(invoice, 'invoice_type', None) == InvoiceType.PURCHASE:
        return ''

    company = getattr(invoice, 'company', None)
    company_account = getattr(company, 'bank_account', '') if company else ''
    return self._normalize_bank_account(company_account)
def _build_pdf_filename_stem(self, invoice):
    """Build the PDF file name (without extension) for *invoice*.

    Uses the invoice number, then the KSeF number, then ``"Faktura"``. Every
    character that cannot safely appear in a file name is replaced with
    ``"_"``: both path separators, the characters ``: * ? " < > |`` and all
    control characters. Invoice numbers may come from external documents, so
    the value must never be able to address another directory.

    Returns:
        A file-name-safe string; ``"/"`` still maps to ``"_"`` as before.
    """
    raw = self._first_non_empty(
        getattr(invoice, "invoice_number", ""),
        getattr(invoice, "ksef_number", ""),
        "Faktura",
    )
    unsafe = {ord(char): "_" for char in '/\\:*?"<>|'}
    # Control characters (including NUL) are rejected or mangled by file systems.
    unsafe.update({code: "_" for code in range(32)})
    return raw.translate(unsafe)
def _build_pdf_title(self, invoice, invoice_kind, company_name):
    """Pick the PDF title: the first non-empty candidate in priority order.

    Order: invoice number, KSeF number, *company_name*, *invoice_kind*,
    and finally the literal ``"Faktura"``.
    """
    candidates = [
        getattr(invoice, attribute, "")
        for attribute in ("invoice_number", "ksef_number")
    ]
    candidates += [company_name, invoice_kind, "Faktura"]
    return self._first_non_empty(*candidates)
@staticmethod
def _set_pdf_metadata(canvas, doc, title, author, subject, creator="KSeF Manager"):
try:
canvas.setTitle(title)
except Exception:
pass
try:
canvas.setAuthor(author)
except Exception:
pass
try:
canvas.setSubject(subject)
except Exception:
pass
try:
canvas.setCreator(creator)
except Exception:
pass
def render_invoice_pdf(self, invoice):
    """Render *invoice* as an A4 PDF, save it under ``PDF_PATH`` and return it.

    The document contains a header, the seller / buyer block, an optional
    split-payment note, an optional NFZ data table, the line items and a
    totals summary. For purchase invoices the company is the buyer and the
    seller is resolved from the contractor data; otherwise the company is
    the seller. When the invoice has no stored lines they are read from the
    invoice XML.

    Side effects:
        Writes the PDF to ``current_app.config["PDF_PATH"]`` (the directory
        is created when missing) and sets ``invoice.pdf_path``.

    Args:
        invoice: Invoice object to render.

    Returns:
        tuple: ``(pdf_bytes, path)`` - the PDF content and the ``Path`` of
        the written file.
    """
    buffer = BytesIO()
    doc = SimpleDocTemplate(
        buffer,
        pagesize=A4,
        leftMargin=14 * mm,
        rightMargin=14 * mm,
        topMargin=14 * mm,
        bottomMargin=14 * mm,
    )
    styles = self._styles()
    normal = styles["Normal"]
    right = styles["Right"]
    story = []

    # --- parties -----------------------------------------------------------
    company = invoice.company
    # Kept unescaped for the PDF metadata; the escaped copy is for markup.
    raw_company_name = company.name if company else "Twoja firma"
    company_name = self._safe(raw_company_name)
    company_tax = self._safe(getattr(company, "tax_id", "") if company else "")
    company_address = self._safe(getattr(company, "address", "") if company else "")

    customer = getattr(invoice, "customer", None)
    customer_name = self._safe(getattr(customer, "name", invoice.contractor_name))
    customer_tax = self._safe(getattr(customer, "tax_id", invoice.contractor_nip))
    customer_address = self._safe(getattr(customer, "address", ""))
    customer_email = self._safe(getattr(customer, "email", ""))

    if invoice.invoice_type == InvoiceType.PURCHASE:
        purchase_seller = self._resolve_purchase_seller_data(
            invoice=invoice,
            fallback_name=invoice.contractor_name,
            fallback_tax=invoice.contractor_nip,
        )
        seller_name = self._safe(purchase_seller["name"])
        seller_tax = self._safe(purchase_seller["tax"])
        seller_address = self._safe(purchase_seller["address"])
        buyer_name = company_name
        buyer_tax = company_tax
        buyer_address = company_address
        buyer_email = ""
    else:
        seller_name = company_name
        seller_tax = company_tax
        seller_address = company_address
        buyer_name = customer_name
        buyer_tax = customer_tax
        buyer_address = customer_address
        buyer_email = customer_email

    # Use the same dict normalisation as the other metadata readers, so a
    # non-dict "nfz" value cannot crash the .get() calls below.
    nfz_meta = self._normalize_metadata_container(
        self._get_external_metadata(invoice).get("nfz")
    )
    invoice_kind = "FAKTURA NFZ" if invoice.source == "nfz" else "FAKTURA VAT"
    currency = invoice.currency or "PLN"

    # --- PDF metadata ------------------------------------------------------
    # Metadata is plain text, so it gets the unescaped company name.
    pdf_title = self._build_pdf_title(invoice, invoice_kind, raw_company_name)
    pdf_author = self._first_non_empty(
        company.name if company else "",
        "KSeF Manager",
    )
    pdf_subject = self._first_non_empty(
        invoice_kind,
        getattr(invoice, "source", ""),
        "Faktura",
    )

    # --- header ------------------------------------------------------------
    story.append(Paragraph(invoice_kind, styles["DocTitle"]))
    header = Table(
        [
            [
                Paragraph(
                    f"<b>Numer faktury:</b> {self._safe(invoice.invoice_number)}<br/>"
                    f"<b>Data wystawienia:</b> {self._safe(invoice.issue_date)}<br/>"
                    f"<b>Waluta:</b> {self._safe(currency)}",
                    normal,
                ),
                Paragraph(
                    f"<b>Numer KSeF:</b> {self._safe(invoice.ksef_number)}<br/>"
                    f"<b>Status:</b> {self._safe(invoice.issued_status)}<br/>"
                    f"<b>Typ źródła:</b> {self._safe(invoice.source)}",
                    normal,
                ),
            ]
        ],
        colWidths=[88 * mm, 88 * mm],
    )
    header.setStyle(self._table_base_style())
    story.extend([header, Spacer(1, 5 * mm)])

    # --- seller / buyer block ----------------------------------------------
    # _safe() turns missing values into "-", hence the {"", "-"} checks.
    buyer_email_html = f"<br/>E-mail: {buyer_email}" if buyer_email not in {"", "-"} else ""
    payment_details = self._resolve_payment_details(invoice)
    seller_bank_account = self._safe(self._resolve_seller_bank_account(invoice))
    payment_form_html = (
        f"<br/>Forma płatności: {self._safe(payment_details.get('payment_form_label'))}"
        if payment_details.get('payment_form_label') else ''
    )
    seller_bank_account_html = (
        f"<br/>Rachunek: {seller_bank_account}"
        if seller_bank_account not in {"", "-"}
        else ""
    )
    seller_bank_name_html = (
        f"<br/>Bank: {self._safe(payment_details.get('bank_name'))}"
        if payment_details.get('bank_name') else ''
    )
    parties = Table(
        [
            [
                Paragraph(
                    f"<b>Sprzedawca</b><br/>{seller_name}<br/>"
                    f"NIP: {seller_tax}<br/>"
                    f"Adres: {seller_address}"
                    f"{payment_form_html}"
                    f"{seller_bank_account_html}"
                    f"{seller_bank_name_html}",
                    normal,
                ),
                Paragraph(
                    f"<b>Nabywca</b><br/>{buyer_name}<br/>"
                    f"NIP: {buyer_tax}<br/>"
                    f"Adres: {buyer_address}"
                    f"{buyer_email_html}",
                    normal,
                ),
            ]
        ],
        colWidths=[88 * mm, 88 * mm],
    )
    parties.setStyle(self._table_base_style())
    story.extend([parties, Spacer(1, 5 * mm)])

    if getattr(invoice, 'split_payment', False):
        story.extend([Paragraph('Mechanizm podzielonej płatności', styles['SectionTitle']), Spacer(1, 2 * mm)])

    # --- NFZ data (only when NFZ metadata is present) -----------------------
    if nfz_meta:
        nfz_fields = [
            ("Oddział NFZ (IDWew)", nfz_meta.get("recipient_branch_id")),
            ("Nazwa oddziału", nfz_meta.get("recipient_branch_name")),
            ("Okres rozliczeniowy od", nfz_meta.get("settlement_from")),
            ("Okres rozliczeniowy do", nfz_meta.get("settlement_to")),
            ("Identyfikator świadczeniodawcy", nfz_meta.get("provider_identifier")),
            ("Kod zakresu / świadczenia", nfz_meta.get("service_code")),
            ("Numer umowy / aneksu", nfz_meta.get("contract_number")),
            ("Identyfikator szablonu", nfz_meta.get("template_identifier")),
            ("Schemat", nfz_meta.get("nfz_schema", "FA(3)")),
        ]
        nfz_rows = [[Paragraph("<b>Pole NFZ</b>", normal), Paragraph("<b>Wartość</b>", normal)]]
        nfz_rows.extend(
            [Paragraph(label, normal), Paragraph(self._safe(value), normal)]
            for label, value in nfz_fields
        )
        nfz_table = Table(nfz_rows, colWidths=[62 * mm, 114 * mm], repeatRows=1)
        nfz_table.setStyle(self._table_base_style([("ALIGN", (0, 0), (-1, 0), "CENTER")]))
        story.extend([Paragraph("Dane NFZ", styles["SectionTitle"]), nfz_table, Spacer(1, 5 * mm)])

    # --- line items --------------------------------------------------------
    # invoice.lines is either a query-like object (has order_by) or a plain iterable.
    invoice_lines = invoice.lines.order_by("id").all() if hasattr(invoice.lines, "order_by") else list(invoice.lines)
    if not invoice_lines:
        # Wrap each parsed dict in a throwaway class so its keys can be read
        # as attributes, like the stored line objects.
        invoice_lines = [
            type("TmpLine", (), line)
            for line in self._extract_lines_from_xml(getattr(invoice, "xml_path", None))
        ]

    lines = [[
        Paragraph("<b>Pozycja</b>", normal),
        Paragraph("<b>Ilość</b>", normal),
        Paragraph("<b>JM</b>", normal),
        Paragraph("<b>VAT</b>", normal),
        Paragraph("<b>Netto</b>", normal),
        Paragraph("<b>Brutto</b>", normal),
    ]]
    if invoice_lines:
        for line in invoice_lines:
            lines.append([
                Paragraph(self._safe(line.description), normal),
                Paragraph(self._safe(line.quantity), right),
                Paragraph(self._safe(line.unit), right),
                # A missing VAT rate is shown as 0% instead of crashing Decimal().
                Paragraph(f"{Decimal(line.vat_rate or 0):.0f}%", right),
                Paragraph(self._money(line.net_amount, currency), right),
                Paragraph(self._money(line.gross_amount, currency), right),
            ])
    else:
        lines.append(
            [Paragraph("Brak pozycji na fakturze.", normal)]
            + [Paragraph("-", right) for _ in range(5)]
        )

    items = Table(
        lines,
        colWidths=[82 * mm, 18 * mm, 16 * mm, 16 * mm, 28 * mm, 30 * mm],
        repeatRows=1,
    )
    items.setStyle(self._table_base_style([
        ("ALIGN", (1, 0), (-1, -1), "RIGHT"),
        ("ALIGN", (0, 0), (0, -1), "LEFT"),
    ]))
    story.extend([Paragraph("Pozycje faktury", styles["SectionTitle"]), items, Spacer(1, 5 * mm)])

    # --- totals ------------------------------------------------------------
    summary = Table(
        [
            [Paragraph("Netto", normal), Paragraph(self._money(invoice.net_amount, currency), right)],
            [Paragraph("VAT", normal), Paragraph(self._money(invoice.vat_amount, currency), right)],
            [Paragraph("<b>Razem brutto</b>", normal), Paragraph(f"<b>{self._money(invoice.gross_amount, currency)}</b>", right)],
        ],
        colWidths=[48 * mm, 42 * mm],
    )
    summary.setStyle(self._table_base_style([
        ("ALIGN", (0, 0), (-1, -1), "RIGHT"),
        ("ALIGN", (0, 0), (0, -1), "LEFT"),
    ]))
    # An empty left cell pushes the summary table to the right side of the page.
    summary_wrap = Table([["", summary]], colWidths=[86 * mm, 90 * mm])
    summary_wrap.setStyle(TableStyle([("VALIGN", (0, 0), (-1, -1), "TOP")]))
    story.extend([summary_wrap, Spacer(1, 5 * mm)])

    note = (
        "Dokument zawiera pola wymagane dla rozliczeń NFZ i został przygotowany do wysyłki w schemacie FA(3)."
        if nfz_meta
        else "Dokument wygenerowany przez KSeF Manager."
    )
    story.append(Paragraph(note, styles["Small"]))

    def _apply_metadata(canvas, pdf_doc):
        self._set_pdf_metadata(
            canvas=canvas,
            doc=pdf_doc,
            title=pdf_title,
            author=pdf_author,
            subject=pdf_subject,
        )

    doc.build(story, onFirstPage=_apply_metadata, onLaterPages=_apply_metadata)
    pdf_bytes = buffer.getvalue()

    # --- persist -----------------------------------------------------------
    output_dir = Path(current_app.config["PDF_PATH"])
    # A fresh deployment may not have the directory yet.
    output_dir.mkdir(parents=True, exist_ok=True)
    path = output_dir / f"{self._build_pdf_filename_stem(invoice)}.pdf"
    path.write_bytes(pdf_bytes)
    invoice.pdf_path = str(path)
    return pdf_bytes, path
def month_pdf(self, entries, title):
    """Render a summary PDF listing number, contractor and gross amount per invoice.

    Invoice number, contractor name and the heading are escaped with
    ``_safe`` before being placed in paragraphs, as in the single-invoice
    PDF. Markup characters therefore cannot break paragraph parsing, and a
    missing value prints as "-" rather than "None".

    Args:
        entries: Iterable of invoice objects providing ``invoice_number``,
            ``contractor_name``, ``gross_amount`` and optionally ``currency``.
        title: Heading and PDF title; a blank or missing value falls back to
            "Zestawienie faktur".

    Returns:
        The PDF content as bytes. Nothing is written to disk.
    """
    buffer = BytesIO()
    doc = SimpleDocTemplate(
        buffer,
        pagesize=A4,
        leftMargin=16 * mm,
        rightMargin=16 * mm,
        topMargin=16 * mm,
        bottomMargin=16 * mm,
    )
    styles = self._styles()
    normal = styles["Normal"]

    rows = [[Paragraph("<b>Numer</b>", normal), Paragraph("<b>Kontrahent</b>", normal), Paragraph("<b>Brutto</b>", normal)]]
    for invoice in entries:
        rows.append([
            Paragraph(self._safe(invoice.invoice_number), normal),
            Paragraph(self._safe(invoice.contractor_name), normal),
            Paragraph(self._money(invoice.gross_amount, getattr(invoice, "currency", "PLN")), styles["Right"]),
        ])
    table = Table(rows, colWidths=[45 * mm, 95 * mm, 35 * mm], repeatRows=1)
    table.setStyle(self._table_base_style([("ALIGN", (2, 0), (2, -1), "RIGHT")]))

    pdf_title = self._first_non_empty(title, "Zestawienie faktur")
    pdf_author = "KSeF Manager"
    pdf_subject = "Miesięczne zestawienie faktur"

    def _apply_metadata(canvas, pdf_doc):
        self._set_pdf_metadata(
            canvas=canvas,
            doc=pdf_doc,
            title=pdf_title,
            author=pdf_author,
            subject=pdf_subject,
        )

    # The heading uses the resolved title, so a missing title cannot crash Paragraph.
    heading = Paragraph(self._safe(pdf_title), styles["DocTitle"])
    doc.build([heading, Spacer(1, 4 * mm), table], onFirstPage=_apply_metadata, onLaterPages=_apply_metadata)
    return buffer.getvalue()