from __future__ import annotations from decimal import Decimal from io import BytesIO from pathlib import Path import xml.etree.ElementTree as ET from flask import current_app from reportlab.lib import colors from reportlab.lib.pagesizes import A4 from reportlab.lib.styles import ParagraphStyle, getSampleStyleSheet from reportlab.lib.units import mm from reportlab.pdfbase import pdfmetrics from reportlab.pdfbase.ttfonts import TTFont from reportlab.platypus import Paragraph, SimpleDocTemplate, Spacer, Table, TableStyle from app.models.invoice import InvoiceType class PdfService: def __init__(self): self.font_name = self._register_font() def _register_font(self): candidates = [ "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", "/usr/share/fonts/dejavu/DejaVuSans.ttf", "/usr/share/fonts/TTF/DejaVuSans.ttf", ] for path in candidates: if Path(path).exists(): pdfmetrics.registerFont(TTFont("AppUnicode", path)) return "AppUnicode" return "Helvetica" def _styles(self): styles = getSampleStyleSheet() base = self.font_name styles["Normal"].fontName = base styles["Normal"].fontSize = 9.5 styles["Normal"].leading = 12 styles.add(ParagraphStyle(name="DocTitle", fontName=base, fontSize=18, leading=22, spaceAfter=4)) styles.add(ParagraphStyle(name="SectionTitle", fontName=base, fontSize=10, leading=12, spaceAfter=4)) styles.add(ParagraphStyle(name="Small", fontName=base, fontSize=8, leading=10)) styles.add(ParagraphStyle(name="Right", fontName=base, fontSize=9.5, leading=12, alignment=2)) return styles @staticmethod def _money(value, currency="PLN") -> str: return f"{Decimal(value):,.2f} {currency or 'PLN'}".replace(",", " ").replace(".", ",") def _safe(self, value) -> str: return str(value or "-").replace("&", "&").replace("<", "<").replace(">", ">") def _table_base_style(self, extra=None): styles = [ ("FONTNAME", (0, 0), (-1, -1), self.font_name), ("BOX", (0, 0), (-1, -1), 0.7, colors.black), ("INNERGRID", (0, 0), (-1, -1), 0.5, colors.black), ("LEFTPADDING", (0, 0), (-1, -1), 6), ("RIGHTPADDING", (0, 0), (-1, -1), 6), ("TOPPADDING", (0, 0), (-1, -1), 6), ("BOTTOMPADDING", (0, 0), (-1, -1), 6), ("VALIGN", (0, 0), (-1, -1), "MIDDLE"), ] if extra: styles.extend(extra) return TableStyle(styles) def _extract_lines_from_xml(self, xml_path): def to_decimal(value, default='0'): raw = str(value or '').strip() if not raw: return Decimal(default) raw = raw.replace(' ', '').replace(',', '.') try: return Decimal(raw) except Exception: return Decimal(default) def to_vat_rate(value): raw = str(value or '').strip().lower() if not raw: return Decimal('0') raw = raw.replace('%', '').replace(' ', '').replace(',', '.') try: return Decimal(raw) except Exception: return Decimal('0') def looks_numeric(value): raw = str(value or '').strip().replace(' ', '').replace(',', '.') if not raw: return False try: Decimal(raw) return True except Exception: return False if not xml_path: return [] try: xml_content = Path(xml_path).read_text(encoding='utf-8') root = ET.fromstring(xml_content) namespace_uri = root.tag.split('}')[0].strip('{') if '}' in root.tag else '' ns = {'fa': namespace_uri} if namespace_uri else {} row_path = './/fa:FaWiersz' if ns else './/FaWiersz' text_path = lambda name: f'fa:{name}' if ns else name lines = [] for row in root.findall(row_path, ns): p_8a = row.findtext(text_path('P_8A'), default='', namespaces=ns) p_8b = row.findtext(text_path('P_8B'), default='', namespaces=ns) if looks_numeric(p_8a): qty_raw = p_8a unit_raw = p_8b or 'szt.' else: unit_raw = p_8a or 'szt.' qty_raw = p_8b or '1' net = to_decimal( row.findtext(text_path('P_11'), default='', namespaces=ns) or row.findtext(text_path('P_11A'), default='', namespaces=ns) or '0' ) vat = to_decimal( row.findtext(text_path('P_12Z'), default='', namespaces=ns) or row.findtext(text_path('P_11Vat'), default='', namespaces=ns) or '0' ) lines.append({ 'description': (row.findtext(text_path('P_7'), default='', namespaces=ns) or '').strip(), 'quantity': to_decimal(qty_raw, '1'), 'unit': (unit_raw or 'szt.').strip(), 'unit_net': to_decimal( row.findtext(text_path('P_9A'), default='', namespaces=ns) or row.findtext(text_path('P_9B'), default='', namespaces=ns) or '0' ), 'vat_rate': to_vat_rate(row.findtext(text_path('P_12'), default='0', namespaces=ns)), 'net_amount': net, 'vat_amount': vat, 'gross_amount': net + vat, }) return lines except Exception as exc: current_app.logger.warning(f'PDF XML line parse error: {exc}') return [] @staticmethod def _first_non_empty(*values, default=""): for value in values: if value is None: continue if isinstance(value, str): if value.strip(): return value.strip() continue text = str(value).strip() if text: return text return default @staticmethod def _normalize_metadata_container(value): return value if isinstance(value, dict) else {} def _get_external_metadata(self, invoice): return self._normalize_metadata_container(getattr(invoice, "external_metadata", {}) or {}) def _get_ksef_metadata(self, invoice): external_metadata = self._get_external_metadata(invoice) ksef_meta = self._normalize_metadata_container(external_metadata.get("ksef", {}) or {}) return ksef_meta def _resolve_purchase_seller_data(self, invoice, fallback_name, fallback_tax): external_metadata = self._get_external_metadata(invoice) ksef_meta = self._get_ksef_metadata(invoice) seller_name = self._first_non_empty( getattr(invoice, "contractor_name", ""), external_metadata.get("contractor_name"), ksef_meta.get("contractor_name"), fallback_name, ) seller_tax = self._first_non_empty( getattr(invoice, "contractor_nip", ""), external_metadata.get("contractor_nip"), ksef_meta.get("contractor_nip"), fallback_tax, ) seller_street = self._first_non_empty( external_metadata.get("contractor_street"), ksef_meta.get("contractor_street"), external_metadata.get("seller_street"), ksef_meta.get("seller_street"), ) seller_city = self._first_non_empty( external_metadata.get("contractor_city"), ksef_meta.get("contractor_city"), external_metadata.get("seller_city"), ksef_meta.get("seller_city"), ) seller_postal_code = self._first_non_empty( external_metadata.get("contractor_postal_code"), ksef_meta.get("contractor_postal_code"), external_metadata.get("seller_postal_code"), ksef_meta.get("seller_postal_code"), ) seller_country = self._first_non_empty( external_metadata.get("contractor_country"), ksef_meta.get("contractor_country"), external_metadata.get("seller_country"), ksef_meta.get("seller_country"), ) seller_address = self._first_non_empty( external_metadata.get("contractor_address"), ksef_meta.get("contractor_address"), external_metadata.get("seller_address"), ksef_meta.get("seller_address"), getattr(invoice, "contractor_address", ""), ) if not seller_address: address_parts = [part for part in [seller_street, seller_postal_code, seller_city, seller_country] if part] seller_address = ", ".join(address_parts) return { "name": seller_name, "tax": seller_tax, "address": seller_address, } @staticmethod def _normalize_bank_account(value): return ''.join(str(value or '').split()) def _extract_payment_details_from_xml(self, xml_content): details = {'payment_form_code': '', 'payment_form_label': '', 'bank_account': '', 'bank_name': '', 'payment_due_date': ''} if not xml_content: return details try: root = ET.fromstring(xml_content) namespace_uri = root.tag.split('}')[0].strip('{') if '}' in root.tag else '' ns = {'fa': namespace_uri} if namespace_uri else {} def find_text(path): return (root.findtext(path, default='', namespaces=ns) or '').strip() form_code = find_text('.//fa:Platnosc/fa:FormaPlatnosci' if ns else './/Platnosc/FormaPlatnosci') bank_account = find_text('.//fa:Platnosc/fa:RachunekBankowy/fa:NrRB' if ns else './/Platnosc/RachunekBankowy/NrRB') bank_name = find_text('.//fa:Platnosc/fa:RachunekBankowy/fa:NazwaBanku' if ns else './/Platnosc/RachunekBankowy/NazwaBanku') payment_due_date = find_text('.//fa:Platnosc/fa:TerminPlatnosci/fa:Termin' if ns else './/Platnosc/TerminPlatnosci/Termin') labels = {'1': 'gotówka', '2': 'karta', '3': 'bon', '4': 'czek', '5': 'weksel', '6': 'przelew', '7': 'kompensata', '8': 'pobranie', '9': 'akredytywa', '10': 'polecenie zapłaty', '11': 'inny'} details.update({'payment_form_code': form_code, 'payment_form_label': labels.get(form_code, form_code), 'bank_account': self._normalize_bank_account(bank_account), 'bank_name': bank_name, 'payment_due_date': payment_due_date}) except Exception: pass return details def _resolve_payment_details(self, invoice): external_metadata = self._get_external_metadata(invoice) details = { 'payment_form_code': self._first_non_empty(external_metadata.get('payment_form_code')), 'payment_form_label': self._first_non_empty(external_metadata.get('payment_form_label')), 'bank_account': self._normalize_bank_account(self._first_non_empty(getattr(invoice, 'seller_bank_account', ''), external_metadata.get('seller_bank_account'))), 'bank_name': self._first_non_empty(external_metadata.get('seller_bank_name')), 'payment_due_date': self._first_non_empty(external_metadata.get('payment_due_date')), } if getattr(invoice, 'xml_path', None) and (not details['bank_account'] or not details['payment_form_code']): try: xml_content = Path(invoice.xml_path).read_text(encoding='utf-8') parsed = self._extract_payment_details_from_xml(xml_content) for key, value in parsed.items(): if value and not details.get(key): details[key] = value except Exception: pass return details def _resolve_seller_bank_account(self, invoice): details = self._resolve_payment_details(invoice) account = self._normalize_bank_account(details.get('bank_account', '')) if account: return account if getattr(invoice, 'invoice_type', None) == InvoiceType.PURCHASE: return '' company = getattr(invoice, 'company', None) return self._normalize_bank_account(getattr(company, 'bank_account', '') if company else '') def _build_pdf_filename_stem(self, invoice): raw = self._first_non_empty( getattr(invoice, "invoice_number", ""), getattr(invoice, "ksef_number", ""), "Faktura", ) return raw.replace("/", "_") def _build_pdf_title(self, invoice, invoice_kind, company_name): return self._first_non_empty( getattr(invoice, "invoice_number", ""), getattr(invoice, "ksef_number", ""), company_name, invoice_kind, "Faktura", ) @staticmethod def _set_pdf_metadata(canvas, doc, title, author, subject, creator="KSeF Manager"): try: canvas.setTitle(title) except Exception: pass try: canvas.setAuthor(author) except Exception: pass try: canvas.setSubject(subject) except Exception: pass try: canvas.setCreator(creator) except Exception: pass def render_invoice_pdf(self, invoice): buffer = BytesIO() doc = SimpleDocTemplate( buffer, pagesize=A4, leftMargin=14 * mm, rightMargin=14 * mm, topMargin=14 * mm, bottomMargin=14 * mm, ) styles = self._styles() story = [] company_name = self._safe(invoice.company.name if invoice.company else "Twoja firma") company_tax = self._safe(getattr(invoice.company, "tax_id", "") if invoice.company else "") company_address = self._safe(getattr(invoice.company, "address", "") if invoice.company else "") customer = getattr(invoice, "customer", None) customer_name = self._safe(getattr(customer, "name", invoice.contractor_name)) customer_tax = self._safe(getattr(customer, "tax_id", invoice.contractor_nip)) customer_address = self._safe(getattr(customer, "address", "")) customer_email = self._safe(getattr(customer, "email", "")) if invoice.invoice_type == InvoiceType.PURCHASE: purchase_seller = self._resolve_purchase_seller_data( invoice=invoice, fallback_name=invoice.contractor_name, fallback_tax=invoice.contractor_nip, ) seller_name = self._safe(purchase_seller["name"]) seller_tax = self._safe(purchase_seller["tax"]) seller_address = self._safe(purchase_seller["address"]) buyer_name = company_name buyer_tax = company_tax buyer_address = company_address buyer_email = "" else: seller_name = company_name seller_tax = company_tax seller_address = company_address buyer_name = customer_name buyer_tax = customer_tax buyer_address = customer_address buyer_email = customer_email nfz_meta = (invoice.external_metadata or {}).get("nfz", {}) invoice_kind = "FAKTURA NFZ" if invoice.source == "nfz" else "FAKTURA VAT" currency = invoice.currency or "PLN" pdf_title = self._build_pdf_title(invoice, invoice_kind, company_name) pdf_author = self._first_non_empty( invoice.company.name if invoice.company else "", "KSeF Manager", ) pdf_subject = self._first_non_empty( invoice_kind, getattr(invoice, "source", ""), "Faktura", ) story.append(Paragraph(invoice_kind, styles["DocTitle"])) header = Table( [ [ Paragraph( f"Numer faktury: {self._safe(invoice.invoice_number)}
" f"Data wystawienia: {self._safe(invoice.issue_date)}
" f"Waluta: {self._safe(currency)}", styles["Normal"], ), Paragraph( f"Numer KSeF: {self._safe(invoice.ksef_number)}
" f"Status: {self._safe(invoice.issued_status)}
" f"Typ źródła: {self._safe(invoice.source)}", styles["Normal"], ), ] ], colWidths=[88 * mm, 88 * mm], ) header.setStyle(self._table_base_style()) story.extend([header, Spacer(1, 5 * mm)]) buyer_email_html = f"
E-mail: {buyer_email}" if buyer_email not in {"", "-"} else "" payment_details = self._resolve_payment_details(invoice) seller_bank_account = self._safe(self._resolve_seller_bank_account(invoice)) payment_form_html = ( f"
Forma płatności: {self._safe(payment_details.get('payment_form_label'))}" if payment_details.get('payment_form_label') else '' ) seller_bank_account_html = ( f"
Rachunek: {seller_bank_account}" if seller_bank_account not in {"", "-"} else "" ) seller_bank_name_html = ( f"
Bank: {self._safe(payment_details.get('bank_name'))}" if payment_details.get('bank_name') else '' ) parties = Table( [ [ Paragraph( f"Sprzedawca
{seller_name}
" f"NIP: {seller_tax}
" f"Adres: {seller_address}" f"{payment_form_html}" f"{seller_bank_account_html}" f"{seller_bank_name_html}", styles["Normal"], ), Paragraph( f"Nabywca
{buyer_name}
" f"NIP: {buyer_tax}
" f"Adres: {buyer_address}" f"{buyer_email_html}", styles["Normal"], ), ] ], colWidths=[88 * mm, 88 * mm], ) parties.setStyle(self._table_base_style()) story.extend([parties, Spacer(1, 5 * mm)]) if getattr(invoice, 'split_payment', False): story.extend([Paragraph('Mechanizm podzielonej płatności', styles['SectionTitle']), Spacer(1, 2 * mm)]) if nfz_meta: nfz_rows = [ [Paragraph("Pole NFZ", styles["Normal"]), Paragraph("Wartość", styles["Normal"])], [Paragraph("Oddział NFZ (IDWew)", styles["Normal"]), Paragraph(self._safe(nfz_meta.get("recipient_branch_id")), styles["Normal"])], [Paragraph("Nazwa oddziału", styles["Normal"]), Paragraph(self._safe(nfz_meta.get("recipient_branch_name")), styles["Normal"])], [Paragraph("Okres rozliczeniowy od", styles["Normal"]), Paragraph(self._safe(nfz_meta.get("settlement_from")), styles["Normal"])], [Paragraph("Okres rozliczeniowy do", styles["Normal"]), Paragraph(self._safe(nfz_meta.get("settlement_to")), styles["Normal"])], [Paragraph("Identyfikator świadczeniodawcy", styles["Normal"]), Paragraph(self._safe(nfz_meta.get("provider_identifier")), styles["Normal"])], [Paragraph("Kod zakresu / świadczenia", styles["Normal"]), Paragraph(self._safe(nfz_meta.get("service_code")), styles["Normal"])], [Paragraph("Numer umowy / aneksu", styles["Normal"]), Paragraph(self._safe(nfz_meta.get("contract_number")), styles["Normal"])], [Paragraph("Identyfikator szablonu", styles["Normal"]), Paragraph(self._safe(nfz_meta.get("template_identifier")), styles["Normal"])], [Paragraph("Schemat", styles["Normal"]), Paragraph(self._safe(nfz_meta.get("nfz_schema", "FA(3)")), styles["Normal"])], ] nfz_table = Table(nfz_rows, colWidths=[62 * mm, 114 * mm], repeatRows=1) nfz_table.setStyle(self._table_base_style([("ALIGN", (0, 0), (-1, 0), "CENTER")])) story.extend([Paragraph("Dane NFZ", styles["SectionTitle"]), nfz_table, Spacer(1, 5 * mm)]) invoice_lines = invoice.lines.order_by("id").all() if hasattr(invoice.lines, "order_by") else list(invoice.lines) if not invoice_lines: invoice_lines = [ type("TmpLine", (), line) for line in self._extract_lines_from_xml(getattr(invoice, "xml_path", None)) ] lines = [[ Paragraph("Pozycja", styles["Normal"]), Paragraph("Ilość", styles["Normal"]), Paragraph("JM", styles["Normal"]), Paragraph("VAT", styles["Normal"]), Paragraph("Netto", styles["Normal"]), Paragraph("Brutto", styles["Normal"]), ]] if invoice_lines: for line in invoice_lines: lines.append([ Paragraph(self._safe(line.description), styles["Normal"]), Paragraph(self._safe(line.quantity), styles["Right"]), Paragraph(self._safe(line.unit), styles["Right"]), Paragraph(f"{Decimal(line.vat_rate):.0f}%", styles["Right"]), Paragraph(self._money(line.net_amount, currency), styles["Right"]), Paragraph(self._money(line.gross_amount, currency), styles["Right"]), ]) else: lines.append([ Paragraph("Brak pozycji na fakturze.", styles["Normal"]), Paragraph("-", styles["Right"]), Paragraph("-", styles["Right"]), Paragraph("-", styles["Right"]), Paragraph("-", styles["Right"]), Paragraph("-", styles["Right"]), ]) items = Table( lines, colWidths=[82 * mm, 18 * mm, 16 * mm, 16 * mm, 28 * mm, 30 * mm], repeatRows=1, ) items.setStyle(self._table_base_style([ ("ALIGN", (1, 0), (-1, -1), "RIGHT"), ("ALIGN", (0, 0), (0, -1), "LEFT"), ])) story.extend([Paragraph("Pozycje faktury", styles["SectionTitle"]), items, Spacer(1, 5 * mm)]) summary = Table( [ [Paragraph("Netto", styles["Normal"]), Paragraph(self._money(invoice.net_amount, currency), styles["Right"])], [Paragraph("VAT", styles["Normal"]), Paragraph(self._money(invoice.vat_amount, currency), styles["Right"])], [Paragraph("Razem brutto", styles["Normal"]), Paragraph(f"{self._money(invoice.gross_amount, currency)}", styles["Right"])], ], colWidths=[48 * mm, 42 * mm], ) summary.setStyle(self._table_base_style([ ("ALIGN", (0, 0), (-1, -1), "RIGHT"), ("ALIGN", (0, 0), (0, -1), "LEFT"), ])) summary_wrap = Table([["", summary]], colWidths=[86 * mm, 90 * mm]) summary_wrap.setStyle(TableStyle([("VALIGN", (0, 0), (-1, -1), "TOP")])) story.extend([summary_wrap, Spacer(1, 5 * mm)]) note = ( "Dokument zawiera pola wymagane dla rozliczeń NFZ i został przygotowany do wysyłki w schemacie FA(3)." if nfz_meta else "Dokument wygenerowany przez KSeF Manager." ) story.append(Paragraph(note, styles["Small"])) def _apply_metadata(canvas, pdf_doc): self._set_pdf_metadata( canvas=canvas, doc=pdf_doc, title=pdf_title, author=pdf_author, subject=pdf_subject, ) doc.build(story, onFirstPage=_apply_metadata, onLaterPages=_apply_metadata) pdf_bytes = buffer.getvalue() path = Path(current_app.config["PDF_PATH"]) / f"{self._build_pdf_filename_stem(invoice)}.pdf" path.write_bytes(pdf_bytes) invoice.pdf_path = str(path) return pdf_bytes, path def month_pdf(self, entries, title): buffer = BytesIO() doc = SimpleDocTemplate( buffer, pagesize=A4, leftMargin=16 * mm, rightMargin=16 * mm, topMargin=16 * mm, bottomMargin=16 * mm, ) styles = self._styles() rows = [[Paragraph("Numer", styles["Normal"]), Paragraph("Kontrahent", styles["Normal"]), Paragraph("Brutto", styles["Normal"])]] for invoice in entries: rows.append([ Paragraph(str(invoice.invoice_number), styles["Normal"]), Paragraph(str(invoice.contractor_name), styles["Normal"]), Paragraph(self._money(invoice.gross_amount, getattr(invoice, "currency", "PLN")), styles["Right"]), ]) table = Table(rows, colWidths=[45 * mm, 95 * mm, 35 * mm], repeatRows=1) table.setStyle(self._table_base_style([("ALIGN", (2, 0), (2, -1), "RIGHT")])) pdf_title = self._first_non_empty(title, "Zestawienie faktur") pdf_author = "KSeF Manager" pdf_subject = "Miesięczne zestawienie faktur" def _apply_metadata(canvas, pdf_doc): self._set_pdf_metadata( canvas=canvas, doc=pdf_doc, title=pdf_title, author=pdf_author, subject=pdf_subject, ) doc.build([Paragraph(title, styles["DocTitle"]), Spacer(1, 4 * mm), table], onFirstPage=_apply_metadata, onLaterPages=_apply_metadata) return buffer.getvalue()