from __future__ import annotations from collections import defaultdict from datetime import date, datetime from decimal import Decimal from pathlib import Path from xml.sax.saxutils import escape import xml.etree.ElementTree as ET from flask import current_app from sqlalchemy import or_ from app.extensions import db from app.models.catalog import InvoiceLine from app.models.invoice import Invoice, InvoiceStatus, InvoiceType, Tag, SyncEvent from app.repositories.invoice_repository import InvoiceRepository from app.services.company_service import CompanyService from app.services.ksef_service import KSeFService from app.services.settings_service import SettingsService class InvoiceService: PERIOD_LABELS = { "month": "miesięczne", "quarter": "kwartalne", "year": "roczne", } def __init__(self): self.repo = InvoiceRepository() def upsert_from_ksef(self, document, company): invoice = self.repo.get_by_ksef_number(document.ksef_number, company_id=company.id) created = False if not invoice: invoice = Invoice(ksef_number=document.ksef_number, company_id=company.id) db.session.add(invoice) created = True invoice.invoice_number = document.invoice_number invoice.contractor_name = document.contractor_name invoice.contractor_nip = document.contractor_nip invoice.issue_date = document.issue_date invoice.received_date = document.received_date invoice.fetched_at = document.fetched_at invoice.net_amount = document.net_amount invoice.vat_amount = document.vat_amount invoice.gross_amount = document.gross_amount invoice.invoice_type = InvoiceType(document.invoice_type) invoice.last_synced_at = datetime.utcnow() invoice.external_metadata = document.metadata invoice.source_hash = KSeFService.calc_hash(document.xml_content) invoice.xml_path = self._save_xml(company.id, invoice.ksef_number, document.xml_content) metadata = dict(document.metadata or {}) payment_details = self.extract_payment_details_from_xml(document.xml_content) if payment_details.get('payment_form_code') and not metadata.get('payment_form_code'): metadata['payment_form_code'] = payment_details['payment_form_code'] if payment_details.get('payment_form_label') and not metadata.get('payment_form_label'): metadata['payment_form_label'] = payment_details['payment_form_label'] if payment_details.get('bank_account') and not metadata.get('seller_bank_account'): metadata['seller_bank_account'] = payment_details['bank_account'] if payment_details.get('bank_name') and not metadata.get('seller_bank_name'): metadata['seller_bank_name'] = payment_details['bank_name'] if payment_details.get('payment_due_date') and not metadata.get('payment_due_date'): metadata['payment_due_date'] = payment_details['payment_due_date'] invoice.external_metadata = metadata invoice.seller_bank_account = payment_details.get('bank_account') or metadata.get('seller_bank_account') or invoice.seller_bank_account invoice.contractor_address = ( metadata.get("contractor_address") or ", ".join([ part for part in [ metadata.get("contractor_street"), metadata.get("contractor_postal_code"), metadata.get("contractor_city"), metadata.get("contractor_country"), ] if part ]) or "" ) db.session.flush() existing_lines = invoice.lines.count() if hasattr(invoice.lines, "count") else len(list(invoice.lines)) if existing_lines == 0: for line in self.extract_lines_from_xml(document.xml_content): db.session.add( InvoiceLine( invoice_id=invoice.id, description=line["description"], quantity=line["quantity"], unit=line["unit"], unit_net=line["unit_net"], vat_rate=line["vat_rate"], net_amount=line["net_amount"], vat_amount=line["vat_amount"], gross_amount=line["gross_amount"], ) ) if not invoice.html_preview: invoice.html_preview = self.render_invoice_html(invoice) db.session.flush() db.session.add( SyncEvent( invoice_id=invoice.id, status="created" if created else "updated", message="Pobrano z KSeF", ) ) SettingsService.set_many({"ksef.last_sync_at": datetime.utcnow().isoformat()}, company_id=company.id) return invoice, created def _save_xml(self, company_id, ksef_number, xml_content): base_path = SettingsService.storage_path("app.archive_path", current_app.config["ARCHIVE_PATH"]) / f"company_{company_id}" base_path.mkdir(parents=True, exist_ok=True) safe_name = ksef_number.replace("/", "_") + ".xml" path = Path(base_path) / safe_name path.write_text(xml_content, encoding="utf-8") return str(path) def extract_lines_from_xml(self, xml_content): def to_decimal(value, default='0'): raw = str(value or '').strip() if not raw: return Decimal(default) raw = raw.replace(' ', '').replace(',', '.') try: return Decimal(raw) except Exception: return Decimal(default) def to_vat_rate(value): raw = str(value or '').strip().lower() if not raw: return Decimal('0') raw = raw.replace('%', '').replace(' ', '').replace(',', '.') try: return Decimal(raw) except Exception: return Decimal('0') def looks_numeric(value): raw = str(value or '').strip().replace(' ', '').replace(',', '.') if not raw: return False try: Decimal(raw) return True except Exception: return False lines = [] try: root = ET.fromstring(xml_content) namespace_uri = root.tag.split('}')[0].strip('{') if '}' in root.tag else '' ns = {'fa': namespace_uri} if namespace_uri else {} row_path = './/fa:FaWiersz' if ns else './/FaWiersz' text_path = lambda name: f'fa:{name}' if ns else name for row in root.findall(row_path, ns): description = (row.findtext(text_path('P_7'), default='', namespaces=ns) or '').strip() p_8a = row.findtext(text_path('P_8A'), default='', namespaces=ns) p_8b = row.findtext(text_path('P_8B'), default='', namespaces=ns) if looks_numeric(p_8a): qty_raw = p_8a unit_raw = p_8b or 'szt.' else: unit_raw = p_8a or 'szt.' qty_raw = p_8b or '1' unit_net = ( row.findtext(text_path('P_9A'), default='', namespaces=ns) or row.findtext(text_path('P_9B'), default='', namespaces=ns) or '0' ) net = ( row.findtext(text_path('P_11'), default='', namespaces=ns) or row.findtext(text_path('P_11A'), default='', namespaces=ns) or '0' ) vat_rate = row.findtext(text_path('P_12'), default='0', namespaces=ns) vat = ( row.findtext(text_path('P_12Z'), default='', namespaces=ns) or row.findtext(text_path('P_11Vat'), default='', namespaces=ns) or '0' ) net_dec = to_decimal(net) vat_dec = to_decimal(vat) lines.append({ 'description': description, 'quantity': to_decimal(qty_raw, '1'), 'unit': (unit_raw or 'szt.').strip(), 'unit_net': to_decimal(unit_net), 'vat_rate': to_vat_rate(vat_rate), 'net_amount': net_dec, 'vat_amount': vat_dec, 'gross_amount': net_dec + vat_dec, }) except Exception as exc: current_app.logger.warning(f'KSeF XML line parse error: {exc}') return lines def extract_payment_details_from_xml(self, xml_content): details = { "payment_form_code": "", "payment_form_label": "", "bank_account": "", "bank_name": "", "payment_due_date": "", } if not xml_content: return details try: root = ET.fromstring(xml_content) namespace_uri = root.tag.split('}')[0].strip('{') if '}' in root.tag else '' ns = {'fa': namespace_uri} if namespace_uri else {} def find_text(path): return (root.findtext(path, default='', namespaces=ns) or '').strip() form_code = find_text('.//fa:Platnosc/fa:FormaPlatnosci' if ns else './/Platnosc/FormaPlatnosci') bank_account = find_text('.//fa:Platnosc/fa:RachunekBankowy/fa:NrRB' if ns else './/Platnosc/RachunekBankowy/NrRB') bank_name = find_text('.//fa:Platnosc/fa:RachunekBankowy/fa:NazwaBanku' if ns else './/Platnosc/RachunekBankowy/NazwaBanku') payment_due_date = find_text('.//fa:Platnosc/fa:TerminPlatnosci/fa:Termin' if ns else './/Platnosc/TerminPlatnosci/Termin') form_labels = { '1': 'gotówka', '2': 'karta', '3': 'bon', '4': 'czek', '5': 'weksel', '6': 'przelew', '7': 'kompensata', '8': 'pobranie', '9': 'akredytywa', '10': 'polecenie zapłaty', '11': 'inny', } details.update({ 'payment_form_code': form_code, 'payment_form_label': form_labels.get(form_code, form_code), 'bank_account': self._normalize_bank_account(bank_account), 'bank_name': bank_name, 'payment_due_date': payment_due_date, }) except Exception as exc: current_app.logger.warning(f'KSeF XML payment parse error: {exc}') return details @staticmethod def _first_non_empty(*values, default=""): for value in values: if value is None: continue if isinstance(value, str): if value.strip(): return value.strip() continue text = str(value).strip() if text: return text return default @staticmethod def _normalize_metadata_container(value): return value if isinstance(value, dict) else {} def _get_external_metadata(self, invoice): return self._normalize_metadata_container(getattr(invoice, "external_metadata", {}) or {}) def _get_ksef_metadata(self, invoice): external_metadata = self._get_external_metadata(invoice) return self._normalize_metadata_container(external_metadata.get("ksef", {}) or {}) def _resolve_purchase_seller_data(self, invoice): external_metadata = self._get_external_metadata(invoice) ksef_meta = self._get_ksef_metadata(invoice) seller_name = self._first_non_empty( getattr(invoice, "contractor_name", ""), external_metadata.get("contractor_name"), ksef_meta.get("contractor_name"), ) seller_tax_id = self._first_non_empty( getattr(invoice, "contractor_nip", ""), external_metadata.get("contractor_nip"), ksef_meta.get("contractor_nip"), ) seller_street = self._first_non_empty( external_metadata.get("contractor_street"), ksef_meta.get("contractor_street"), external_metadata.get("seller_street"), ksef_meta.get("seller_street"), ) seller_city = self._first_non_empty( external_metadata.get("contractor_city"), ksef_meta.get("contractor_city"), external_metadata.get("seller_city"), ksef_meta.get("seller_city"), ) seller_postal_code = self._first_non_empty( external_metadata.get("contractor_postal_code"), ksef_meta.get("contractor_postal_code"), external_metadata.get("seller_postal_code"), ksef_meta.get("seller_postal_code"), ) seller_country = self._first_non_empty( external_metadata.get("contractor_country"), ksef_meta.get("contractor_country"), external_metadata.get("seller_country"), ksef_meta.get("seller_country"), ) seller_address = self._first_non_empty( external_metadata.get("contractor_address"), ksef_meta.get("contractor_address"), external_metadata.get("seller_address"), ksef_meta.get("seller_address"), getattr(invoice, "contractor_address", ""), ) if not seller_address: address_parts = [part for part in [seller_street, seller_postal_code, seller_city, seller_country] if part] seller_address = ", ".join(address_parts) return { "name": seller_name, "tax_id": seller_tax_id, "address": seller_address, } def update_metadata(self, invoice, form): invoice.status = InvoiceStatus(form.status.data) invoice.internal_note = form.internal_note.data invoice.pinned = bool(form.pinned.data) invoice.queue_accounting = bool(form.queue_accounting.data) invoice.is_unread = invoice.status == InvoiceStatus.NEW requested_tags = [t.strip() for t in (form.tags.data or "").split(",") if t.strip()] invoice.tags.clear() for name in requested_tags: tag = Tag.query.filter_by(name=name).first() if not tag: tag = Tag(name=name, color="primary") db.session.add(tag) invoice.tags.append(tag) db.session.commit() return invoice def mark_read(self, invoice): if invoice.is_unread: invoice.is_unread = False invoice.status = InvoiceStatus.READ if invoice.status == InvoiceStatus.NEW else invoice.status invoice.read_at = datetime.utcnow() db.session.commit() def render_invoice_html(self, invoice): def esc(value): return str(value or "—").replace("&", "&").replace("<", "<").replace(">", ">") def money(value): return f"{Decimal(value):,.2f} {invoice.currency or 'PLN'}".replace(",", " ").replace(".", ",") company_name_raw = invoice.company.name if invoice.company else "Twoja firma" company_name = esc(company_name_raw) company_tax_id = esc(getattr(invoice.company, "tax_id", "") if invoice.company else "") company_address = esc(getattr(invoice.company, "address", "") if invoice.company else "") customer = getattr(invoice, "customer", None) customer_name = esc(getattr(customer, "name", invoice.contractor_name)) customer_tax_id = esc(getattr(customer, "tax_id", invoice.contractor_nip)) customer_address = esc(getattr(customer, "address", "")) customer_email = esc(getattr(customer, "email", "")) if invoice.invoice_type == InvoiceType.PURCHASE: purchase_seller = self._resolve_purchase_seller_data(invoice) seller_name = esc(purchase_seller["name"]) seller_tax_id = esc(purchase_seller["tax_id"]) seller_address = esc(purchase_seller["address"]) buyer_name = company_name buyer_tax_id = company_tax_id buyer_address = company_address buyer_email = "" else: seller_name = company_name seller_tax_id = company_tax_id seller_address = company_address buyer_name = customer_name buyer_tax_id = customer_tax_id buyer_address = customer_address buyer_email = customer_email nfz_meta = (invoice.external_metadata or {}).get("nfz", {}) invoice_kind = "FAKTURA NFZ" if invoice.source == "nfz" else "FAKTURA VAT" page_title = self._first_non_empty( getattr(invoice, "invoice_number", ""), getattr(invoice, "ksef_number", ""), company_name_raw, "Faktura", ) lines = invoice.lines.order_by("id").all() if hasattr(invoice.lines, "order_by") else list(invoice.lines) if not lines and invoice.xml_path: try: xml_content = Path(invoice.xml_path).read_text(encoding="utf-8") extracted = self.extract_lines_from_xml(xml_content) lines = [type("TmpLine", (), line) for line in extracted] except Exception: lines = [] lines_html = "".join( [ ( f"" f"{esc(line.description)}" f"{esc(line.quantity)}" f"{esc(line.unit)}" f"{esc(line.vat_rate)}%" f"{money(line.net_amount)}" f"{money(line.gross_amount)}" f"" ) for line in lines ] ) or ( "" "Brak pozycji na fakturze." "" ) nfz_rows = "" if nfz_meta: nfz_pairs = [ ("Oddział NFZ (IDWew)", nfz_meta.get("recipient_branch_id")), ("Nazwa oddziału", nfz_meta.get("recipient_branch_name")), ("Okres rozliczeniowy od", nfz_meta.get("settlement_from")), ("Okres rozliczeniowy do", nfz_meta.get("settlement_to")), ("Identyfikator świadczeniodawcy", nfz_meta.get("provider_identifier")), ("Kod zakresu / świadczenia", nfz_meta.get("service_code")), ("Numer umowy / aneksu", nfz_meta.get("contract_number")), ("Identyfikator szablonu", nfz_meta.get("template_identifier")), ("Schemat", nfz_meta.get("nfz_schema", "FA(3)")), ] nfz_rows = "".join( [ f"{esc(label)}{esc(value)}" for label, value in nfz_pairs ] ) nfz_rows = ( "
Dane NFZ
" "" "" f"{nfz_rows}
Pole NFZWartość
" ) buyer_email_html = f"
E-mail: {buyer_email}" if buyer_email not in {"", "—"} else "" split_payment_html = "
Mechanizm podzielonej płatności
" if getattr(invoice, 'split_payment', False) else "" payment_details = self.resolve_payment_details(invoice) seller_bank_account = esc(self._resolve_seller_bank_account(invoice)) payment_form_html = f"
Forma płatności: {esc(payment_details.get('payment_form_label'))}" if payment_details.get('payment_form_label') else '' seller_bank_account_html = f"
Rachunek bankowy: {seller_bank_account}" if seller_bank_account not in {'', '—'} else '' seller_bank_name_html = f"
Bank: {esc(payment_details.get('bank_name'))}" if payment_details.get('bank_name') else '' return ( f"
" f"
{invoice_kind}
" f"" f"" f"" f"
Numer faktury: {esc(invoice.invoice_number)}
Data wystawienia: {esc(invoice.issue_date)}
Waluta: {esc(invoice.currency or 'PLN')}
Numer KSeF: {esc(invoice.ksef_number)}
Status: {esc(invoice.issued_status)}
Typ źródła: {esc(invoice.source)}
" f"" f"" f"" f"
Sprzedawca
{seller_name}
NIP: {seller_tax_id}
Adres: {seller_address}{payment_form_html}{seller_bank_account_html}{seller_bank_name_html}
Nabywca
{buyer_name}
NIP: {buyer_tax_id}
Adres: {buyer_address}{buyer_email_html}
" f"{nfz_rows}" f"{split_payment_html}" "
Pozycje faktury
" "" "" "" "" "" "" "" "" "" "" "" f"{lines_html}
PozycjaIlośćJMVATNettoBrutto
" "" f"" f"" f"" "
Netto{money(invoice.net_amount)}
VAT{money(invoice.vat_amount)}
Razem brutto{money(invoice.gross_amount)}
" f"
{'Dokument zawiera pola wymagane dla rozliczeń NFZ i został przygotowany do wysyłki w schemacie FA(3).' if nfz_meta else 'Dokument wygenerowany przez KSeF Manager.'}
" "
" ) def monthly_groups(self, company_id=None): groups = [] for row in self.repo.monthly_summary(company_id): year = int(row.year) month = int(row.month) entries = self.repo.base_query(company_id).filter(Invoice.issue_date >= datetime(year, month, 1).date()) if month == 12: entries = entries.filter(Invoice.issue_date < datetime(year + 1, 1, 1).date()) else: entries = entries.filter(Invoice.issue_date < datetime(year, month + 1, 1).date()) groups.append( { "key": f"{year}-{month:02d}", "year": year, "month": month, "count": int(row.count or 0), "net": row.net or Decimal("0"), "vat": row.vat or Decimal("0"), "gross": row.gross or Decimal("0"), "entries": entries.order_by(Invoice.issue_date.desc(), Invoice.id.desc()).all(), } ) return groups def grouped_summary(self, company_id=None, *, period="month", search=None): rows = self.repo.summary_query(company_id, period=period, search=search) grouped_entries = defaultdict(list) entry_query = self.repo.base_query(company_id) if search: like = f"%{search}%" entry_query = entry_query.filter( or_( Invoice.invoice_number.ilike(like), Invoice.ksef_number.ilike(like), Invoice.contractor_name.ilike(like), Invoice.contractor_nip.ilike(like), ) ) for invoice in entry_query.order_by(Invoice.issue_date.desc(), Invoice.id.desc()).all(): if period == "year": key = f"{invoice.issue_date.year}" elif period == "quarter": quarter = ((invoice.issue_date.month - 1) // 3) + 1 key = f"{invoice.issue_date.year}-Q{quarter}" else: key = f"{invoice.issue_date.year}-{invoice.issue_date.month:02d}" grouped_entries[key].append(invoice) groups = [] for row in rows: year = int(row.year) month = int(getattr(row, "month", 0) or 0) quarter = int(getattr(row, "quarter", 0) or 0) if period == "year": key = f"{year}" label = f"Rok {year}" elif period == "quarter": key = f"{year}-Q{quarter}" label = f"Q{quarter} {year}" else: key = f"{year}-{month:02d}" label = f"{year}-{month:02d}" groups.append( { "key": key, "label": label, "year": year, "month": month, "quarter": quarter, "count": int(row.count or 0), "net": row.net or Decimal("0"), "vat": row.vat or Decimal("0"), "gross": row.gross or Decimal("0"), "entries": grouped_entries.get(key, []), } ) return groups def comparative_stats(self, company_id=None, *, search=None): rows = self.repo.summary_query(company_id, period="year", search=search) stats = [] previous = None for row in rows: year = int(row.year) gross = row.gross or Decimal("0") net = row.net or Decimal("0") count = int(row.count or 0) delta = None if previous is not None: delta = gross - previous["gross"] stats.append({"year": year, "gross": gross, "net": net, "count": count, "delta": delta}) previous = {"gross": gross} return stats def next_sale_number(self, company_id, template="monthly"): today = self.today_date() query = Invoice.query.filter(Invoice.company_id == company_id, Invoice.invoice_type == InvoiceType.SALE) if template == "yearly": query = query.filter(Invoice.issue_date >= date(today.year, 1, 1)) prefix = f"FV/{today.year}/" elif template == "custom": prefix = f"FV/{today.year}/{today.month:02d}/" query = query.filter(Invoice.issue_date >= date(today.year, today.month, 1)) if today.month == 12: query = query.filter(Invoice.issue_date < date(today.year + 1, 1, 1)) else: query = query.filter(Invoice.issue_date < date(today.year, today.month + 1, 1)) else: prefix = f"FV/{today.year}/{today.month:02d}/" query = query.filter(Invoice.issue_date >= date(today.year, today.month, 1)) if today.month == 12: query = query.filter(Invoice.issue_date < date(today.year + 1, 1, 1)) else: query = query.filter(Invoice.issue_date < date(today.year, today.month + 1, 1)) next_no = query.count() + 1 return f"{prefix}{next_no:04d}" def build_ksef_payload(self, invoice): lines = invoice.lines.order_by("id").all() if hasattr(invoice.lines, "order_by") else list(invoice.lines) nfz_meta = (invoice.external_metadata or {}).get("nfz", {}) xml_content = self.render_structured_xml(invoice, lines=lines, nfz_meta=nfz_meta) payload = { "invoiceNumber": invoice.invoice_number, "invoiceType": invoice.invoice_type.value if hasattr(invoice.invoice_type, "value") else str(invoice.invoice_type), "schemaVersion": nfz_meta.get("nfz_schema", "FA(3)"), "customer": { "name": invoice.contractor_name, "taxId": invoice.contractor_nip, }, "lines": [ { "name": line.description, "qty": float(line.quantity), "unit": line.unit, "unitNet": float(line.unit_net), "vatRate": float(line.vat_rate), "netAmount": float(line.net_amount), "grossAmount": float(line.gross_amount), } for line in lines ], "metadata": { "source": invoice.source, "companyId": invoice.company_id, "issueDate": invoice.issue_date.isoformat() if invoice.issue_date else "", "currency": invoice.currency, "nfz": nfz_meta, "split_payment": bool(getattr(invoice, 'split_payment', False)), }, "xml_content": xml_content, } if nfz_meta: payload["customer"]["internalBranchId"] = nfz_meta.get("recipient_branch_id", "") return payload @staticmethod def _xml_decimal(value, places=2): quant = Decimal('1').scaleb(-places) return format(Decimal(value or 0).quantize(quant), 'f') @staticmethod def _split_address_lines(address): raw = (address or '').strip() if not raw: return '', '' parts = [part.strip() for part in raw.replace('\n', ',').split(',') if part.strip()] if len(parts) <= 1: return raw[:512], '' line1 = ', '.join(parts[:2])[:512] line2 = ', '.join(parts[2:])[:512] return line1, line2 @staticmethod def _normalize_bank_account(value): raw = ''.join(str(value or '').split()) return raw def resolve_payment_details(self, invoice): external_metadata = self._normalize_metadata_container(getattr(invoice, 'external_metadata', {}) or {}) details = { 'payment_form_code': self._first_non_empty(external_metadata.get('payment_form_code')), 'payment_form_label': self._first_non_empty(external_metadata.get('payment_form_label')), 'bank_account': self._normalize_bank_account( self._first_non_empty( getattr(invoice, 'seller_bank_account', ''), external_metadata.get('seller_bank_account'), ) ), 'bank_name': self._first_non_empty(external_metadata.get('seller_bank_name')), 'payment_due_date': self._first_non_empty(external_metadata.get('payment_due_date')), } if getattr(invoice, 'xml_path', None) and (not details['bank_account'] or not details['payment_form_code'] or not details['bank_name'] or not details['payment_due_date']): try: xml_content = Path(invoice.xml_path).read_text(encoding='utf-8') parsed = self.extract_payment_details_from_xml(xml_content) for key, value in parsed.items(): if value and not details.get(key): details[key] = value except Exception: pass return details def _resolve_seller_bank_account(self, invoice): details = self.resolve_payment_details(invoice) account = self._normalize_bank_account(details.get('bank_account', '')) if account: return account if getattr(invoice, 'invoice_type', None) == InvoiceType.PURCHASE: return '' company = getattr(invoice, 'company', None) return self._normalize_bank_account(getattr(company, 'bank_account', '') if company else '') @staticmethod def _safe_tax_id(value): digits = ''.join(ch for ch in str(value or '') if ch.isdigit()) return digits def _append_xml_text(self, parent, tag, value): if value is None: return None text = str(value).strip() if not text: return None node = ET.SubElement(parent, tag) node.text = text return node def _append_address_node(self, parent, address, *, country_code='PL'): line1, line2 = self._split_address_lines(address) if not line1 and not line2: return None node = ET.SubElement(parent, 'Adres') ET.SubElement(node, 'KodKraju').text = country_code or 'PL' if line1: ET.SubElement(node, 'AdresL1').text = line1 if line2: ET.SubElement(node, 'AdresL2').text = line2 return node def _append_party_section(self, root, tag_name, *, name, tax_id, address, country_code='PL'): party = ET.SubElement(root, tag_name) ident = ET.SubElement(party, 'DaneIdentyfikacyjne') tax_digits = self._safe_tax_id(tax_id) if tax_digits: ET.SubElement(ident, 'NIP').text = tax_digits else: ET.SubElement(ident, 'BrakID').text = '1' ET.SubElement(ident, 'Nazwa').text = (name or 'Brak nazwy').strip() self._append_address_node(party, address, country_code=country_code) return party def _append_tax_summary(self, fa_node, lines): vat_groups = { '23': {'net': Decimal('0'), 'vat': Decimal('0'), 'p13': 'P_13_1', 'p14': 'P_14_1'}, '22': {'net': Decimal('0'), 'vat': Decimal('0'), 'p13': 'P_13_1', 'p14': 'P_14_1'}, '8': {'net': Decimal('0'), 'vat': Decimal('0'), 'p13': 'P_13_2', 'p14': 'P_14_2'}, '7': {'net': Decimal('0'), 'vat': Decimal('0'), 'p13': 'P_13_2', 'p14': 'P_14_2'}, '5': {'net': Decimal('0'), 'vat': Decimal('0'), 'p13': 'P_13_3', 'p14': 'P_14_3'}, } for line in lines: rate = Decimal(line.vat_rate or 0).normalize() rate_key = format(rate, 'f').rstrip('0').rstrip('.') if '.' in format(rate, 'f') else format(rate, 'f') group = vat_groups.get(rate_key) if not group: group = vat_groups.get('23') if Decimal(line.vat_amount or 0) > 0 else None if not group: continue group['net'] += Decimal(line.net_amount or 0) group['vat'] += Decimal(line.vat_amount or 0) for cfg in vat_groups.values(): if cfg['net']: ET.SubElement(fa_node, cfg['p13']).text = self._xml_decimal(cfg['net']) ET.SubElement(fa_node, cfg['p14']).text = self._xml_decimal(cfg['vat']) def _append_adnotations(self, fa_node, *, split_payment=False): adnotacje = ET.SubElement(fa_node, 'Adnotacje') ET.SubElement(adnotacje, 'P_16').text = '2' ET.SubElement(adnotacje, 'P_17').text = '2' ET.SubElement(adnotacje, 'P_18').text = '2' ET.SubElement(adnotacje, 'P_18A').text = '1' if split_payment else '2' zw = ET.SubElement(adnotacje, 'Zwolnienie') ET.SubElement(zw, 'P_19N').text = '1' ET.SubElement(adnotacje, 'P_23').text = '2' return adnotacje def _append_payment_details(self, fa_node, invoice): bank_account = self._resolve_seller_bank_account(invoice) if not bank_account: return None external_metadata = self._normalize_metadata_container(getattr(invoice, 'external_metadata', {}) or {}) bank_name = self._first_non_empty(external_metadata.get('seller_bank_name')) platnosc = ET.SubElement(fa_node, 'Platnosc') ET.SubElement(platnosc, 'FormaPlatnosci').text = '6' rachunek = ET.SubElement(platnosc, 'RachunekBankowy') ET.SubElement(rachunek, 'NrRB').text = bank_account if bank_name: ET.SubElement(rachunek, 'NazwaBanku').text = bank_name return platnosc def _append_nfz_extra_description(self, fa_node, nfz_meta): mapping = [ ('IDWew', nfz_meta.get('recipient_branch_id', '')), ('P_6_Od', nfz_meta.get('settlement_from', '')), ('P_6_Do', nfz_meta.get('settlement_to', '')), ('identyfikator-swiadczeniodawcy', nfz_meta.get('provider_identifier', '')), ('Indeks', nfz_meta.get('service_code', '')), ('NrUmowy', nfz_meta.get('contract_number', '')), ('identyfikator-szablonu', nfz_meta.get('template_identifier', '')), ] for key, value in mapping: if not str(value or '').strip(): continue node = ET.SubElement(fa_node, 'DodatkowyOpis') ET.SubElement(node, 'Klucz').text = str(key) ET.SubElement(node, 'Wartosc').text = str(value) def render_structured_xml(self, invoice, *, lines=None, nfz_meta=None): lines = lines if lines is not None else (invoice.lines.order_by('id').all() if hasattr(invoice.lines, 'order_by') else list(invoice.lines)) nfz_meta = nfz_meta or (invoice.external_metadata or {}).get('nfz', {}) seller = invoice.company or CompanyService.get_current_company() issue_date = invoice.issue_date.isoformat() if invoice.issue_date else self.today_date().isoformat() created_at = self.utcnow().replace(microsecond=0).isoformat() + 'Z' currency = (invoice.currency or 'PLN').strip() or 'PLN' schema_code = str(nfz_meta.get('nfz_schema', 'FA(3)') or 'FA(3)') schema_ns = 'http://crd.gov.pl/wzor/2025/06/25/13775/' if schema_code == 'FA(3)' else f'https://ksef.mf.gov.pl/schemat/faktura/{schema_code}' ET.register_namespace('', schema_ns) root = ET.Element('Faktura', {'xmlns': schema_ns}) naglowek = ET.SubElement(root, 'Naglowek') kod_formularza = ET.SubElement(naglowek, 'KodFormularza', {'kodSystemowy': 'FA (3)', 'wersjaSchemy': '1-0E'}) kod_formularza.text = 'FA' ET.SubElement(naglowek, 'WariantFormularza').text = '3' ET.SubElement(naglowek, 'DataWytworzeniaFa').text = created_at ET.SubElement(naglowek, 'SystemInfo').text = 'KSeF Manager' self._append_party_section( root, 'Podmiot1', name=getattr(seller, 'name', '') or '', tax_id=getattr(seller, 'tax_id', '') or '', address=getattr(seller, 'address', '') or '', ) self._append_party_section( root, 'Podmiot2', name=invoice.contractor_name or '', tax_id=invoice.contractor_nip or '', address=getattr(invoice, 'contractor_address', '') or '', ) if nfz_meta.get('recipient_branch_id'): podmiot3 = ET.SubElement(root, 'Podmiot3') ident = ET.SubElement(podmiot3, 'DaneIdentyfikacyjne') ET.SubElement(ident, 'IDWew').text = str(nfz_meta.get('recipient_branch_id')) ET.SubElement(podmiot3, 'Rola').text = '7' fa = ET.SubElement(root, 'Fa') ET.SubElement(fa, 'KodWaluty').text = currency ET.SubElement(fa, 'P_1').text = issue_date self._append_xml_text(fa, 'P_1M', nfz_meta.get('issue_place')) ET.SubElement(fa, 'P_2').text = (invoice.invoice_number or '').strip() ET.SubElement(fa, 'P_6').text = issue_date self._append_xml_text(fa, 'P_6_Od', nfz_meta.get('settlement_from')) self._append_xml_text(fa, 'P_6_Do', nfz_meta.get('settlement_to')) self._append_tax_summary(fa, lines) ET.SubElement(fa, 'P_15').text = self._xml_decimal(invoice.gross_amount) self._append_adnotations(fa, split_payment=bool(getattr(invoice, 'split_payment', False))) self._append_payment_details(fa, invoice) self._append_nfz_extra_description(fa, nfz_meta) for idx, line in enumerate(lines, start=1): row = ET.SubElement(fa, 'FaWiersz') ET.SubElement(row, 'NrWierszaFa').text = str(idx) if nfz_meta.get('service_date'): ET.SubElement(row, 'P_6A').text = str(nfz_meta.get('service_date')) ET.SubElement(row, 'P_7').text = str(line.description or '') ET.SubElement(row, 'P_8A').text = str(line.unit or 'szt.') ET.SubElement(row, 'P_8B').text = self._xml_decimal(line.quantity, places=6).rstrip('0').rstrip('.') or '0' ET.SubElement(row, 'P_9A').text = self._xml_decimal(line.unit_net, places=8).rstrip('0').rstrip('.') or '0' ET.SubElement(row, 'P_11').text = self._xml_decimal(line.net_amount) ET.SubElement(row, 'P_12').text = self._xml_decimal(line.vat_rate, places=2).rstrip('0').rstrip('.') or '0' if Decimal(line.vat_amount or 0): ET.SubElement(row, 'P_12Z').text = self._xml_decimal(line.vat_amount) stopka_parts = [] if getattr(invoice, 'split_payment', False): stopka_parts.append('Mechanizm podzielonej płatności.') seller_bank_account = self._resolve_seller_bank_account(invoice) if seller_bank_account: stopka_parts.append(f'Rachunek bankowy: {seller_bank_account}') if nfz_meta.get('contract_number'): stopka_parts.append(f"NFZ umowa: {nfz_meta.get('contract_number')}") if stopka_parts: stopka = ET.SubElement(root, 'Stopka') info = ET.SubElement(stopka, 'Informacje') ET.SubElement(info, 'StopkaFaktury').text = ' '.join(stopka_parts) return ET.tostring(root, encoding='utf-8', xml_declaration=True).decode('utf-8') def persist_issued_assets(self, invoice, xml_content=None): db.session.flush() invoice.html_preview = self.render_invoice_html(invoice) xml_content = xml_content or self.build_ksef_payload(invoice).get("xml_content") if xml_content: invoice.source_hash = KSeFService.calc_hash(xml_content) invoice.xml_path = self._save_xml(invoice.company_id, invoice.ksef_number or invoice.invoice_number, xml_content) PdfService = __import__("app.services.pdf_service", fromlist=["PdfService"]).PdfService PdfService().render_invoice_pdf(invoice) return xml_content @staticmethod def invoice_locked(invoice): return bool( getattr(invoice, "issued_to_ksef_at", None) or getattr(invoice, "issued_status", "") in {"issued", "issued_mock"} ) @staticmethod def today_date(): return date.today() @staticmethod def utcnow(): return datetime.utcnow() @staticmethod def period_title(period): return InvoiceService.PERIOD_LABELS.get(period, InvoiceService.PERIOD_LABELS["month"])