from __future__ import annotations from datetime import date from decimal import Decimal from flask import Blueprint, abort, flash, redirect, render_template, request, url_for from flask_login import login_required from app.extensions import db from app.forms.nfz import NFZ_BRANCH_CHOICES, NfzInvoiceForm from app.models.catalog import Customer, InvoiceLine, Product from app.models.invoice import Invoice, InvoiceStatus, InvoiceType from app.services.audit_service import AuditService from app.services.company_service import CompanyService from app.services.invoice_service import InvoiceService from app.services.ksef_service import KSeFService from app.services.settings_service import SettingsService bp = Blueprint('nfz', __name__, url_prefix='/nfz') NFZ_NIP = '1070001057' NFZ_BRANCH_MAP = dict(NFZ_BRANCH_CHOICES) def _company(): return CompanyService.get_current_company() def _module_enabled(company_id: int | None) -> bool: return bool(company_id) and SettingsService.get_effective('modules.nfz_enabled', 'false', company_id=company_id) == 'true' def _invoice_or_404(invoice_id: int): company = _company() invoice = db.session.get(Invoice, invoice_id) if not invoice or not company or invoice.company_id != company.id or invoice.source != 'nfz': abort(404) return invoice def _prepare_form(form: NfzInvoiceForm, company): customers = Customer.query.filter_by(company_id=company.id, is_active=True).order_by(Customer.name).all() products = Product.query.filter_by(company_id=company.id, is_active=True).order_by(Product.name).all() form.customer_id.choices = [(c.id, f'{c.name} ({c.tax_id})' if c.tax_id else c.name) for c in customers] form.product_id.choices = [(p.id, f'{p.name} - {p.net_price} PLN') for p in products] return customers, products def _hydrate_form_from_invoice(form: NfzInvoiceForm, invoice: Invoice, *, duplicate: bool = False): meta = (invoice.external_metadata or {}).get('nfz', {}) line = invoice.lines.first() form.customer_id.data = invoice.customer_id form.product_id.data = line.product_id if line and line.product_id else None form.quantity.data = line.quantity if line else Decimal('1') form.unit_net.data = line.unit_net if line else invoice.net_amount form.invoice_number.data = f'{invoice.invoice_number}/COPY' if duplicate else invoice.invoice_number form.nfz_branch_id.data = meta.get('recipient_branch_id') if meta.get('settlement_from'): form.settlement_from.data = date.fromisoformat(meta['settlement_from']) if meta.get('settlement_to'): form.settlement_to.data = date.fromisoformat(meta['settlement_to']) form.template_identifier.data = meta.get('template_identifier', '') form.provider_identifier.data = meta.get('provider_identifier', '') form.service_code.data = meta.get('service_code', '') form.contract_number.data = meta.get('contract_number', '') def _build_nfz_metadata(form: NfzInvoiceForm): return { 'nfz': { 'recipient_nip': NFZ_NIP, 'recipient_branch_id': form.nfz_branch_id.data, 'recipient_branch_name': NFZ_BRANCH_MAP.get(form.nfz_branch_id.data, form.nfz_branch_id.data), 'settlement_from': form.settlement_from.data.isoformat(), 'settlement_to': form.settlement_to.data.isoformat(), 'template_identifier': (form.template_identifier.data or '').strip(), 'provider_identifier': form.provider_identifier.data.strip(), 'service_code': form.service_code.data.strip(), 'contract_number': form.contract_number.data.strip(), 'nfz_schema': 'FA(3)', 'required_fields': ['IDWew', 'P_6_Od', 'P_6_Do', 'identyfikator-swiadczeniodawcy', 'Indeks', 'NrUmowy'], } } def _save_invoice_from_form(invoice: Invoice | None, form: NfzInvoiceForm, company, *, send_to_ksef: bool): customer = db.session.get(Customer, form.customer_id.data) product = db.session.get(Product, form.product_id.data) qty = Decimal(str(form.quantity.data or 1)) unit_net = Decimal(str(form.unit_net.data or product.net_price)) net = (qty * unit_net).quantize(Decimal('0.01')) vat = (net * (Decimal(str(product.vat_rate)) / Decimal('100'))).quantize(Decimal('0.01')) gross = net + vat number = form.invoice_number.data or (invoice.invoice_number if invoice else InvoiceService().next_sale_number(company.id, 'monthly')) metadata = _build_nfz_metadata(form) if invoice is None: invoice = Invoice( company_id=company.id, customer_id=customer.id, ksef_number=f'NFZ-PENDING/{number}', invoice_number=number, contractor_name=f'Narodowy Fundusz Zdrowia - {NFZ_BRANCH_MAP.get(form.nfz_branch_id.data, form.nfz_branch_id.data)}', contractor_nip=NFZ_NIP, issue_date=InvoiceService().today_date(), received_date=InvoiceService().today_date(), net_amount=net, vat_amount=vat, gross_amount=gross, seller_bank_account=(company.bank_account or '').strip(), invoice_type=InvoiceType.SALE, status=InvoiceStatus.NEW, source='nfz', issued_status='draft' if not send_to_ksef else 'pending', external_metadata=dict(metadata, seller_bank_account=(company.bank_account or '').strip()), html_preview='', ) db.session.add(invoice) db.session.flush() else: invoice.customer_id = customer.id invoice.invoice_number = number invoice.ksef_number = f'NFZ-PENDING/{number}' if not invoice.issued_to_ksef_at else invoice.ksef_number invoice.contractor_name = f'Narodowy Fundusz Zdrowia - {NFZ_BRANCH_MAP.get(form.nfz_branch_id.data, form.nfz_branch_id.data)}' invoice.contractor_nip = NFZ_NIP invoice.net_amount = net invoice.vat_amount = vat invoice.gross_amount = gross invoice.seller_bank_account = (company.bank_account or '').strip() invoice.external_metadata = dict(metadata, seller_bank_account=invoice.seller_bank_account) invoice.issued_status = 'draft' if not send_to_ksef else 'pending' line = invoice.lines.first() if not line: line = InvoiceLine(invoice_id=invoice.id) db.session.add(line) line.product_id = product.id line.description = product.name line.quantity = qty line.unit = product.unit line.unit_net = unit_net line.vat_rate = product.vat_rate line.net_amount = net line.vat_amount = vat line.gross_amount = gross payload = InvoiceService().build_ksef_payload(invoice) InvoiceService().persist_issued_assets(invoice, xml_content=payload['xml_content']) if send_to_ksef: result = KSeFService(company_id=company.id).issue_invoice(payload) invoice.ksef_number = result.get('ksef_number', invoice.ksef_number) invoice.issued_status = result.get('status', 'issued') invoice.issued_to_ksef_at = InvoiceService().utcnow() InvoiceService().persist_issued_assets(invoice, xml_content=result.get('xml_content') or payload['xml_content']) return invoice, result return invoice, None @bp.before_request @login_required def ensure_enabled(): company = _company() if not _module_enabled(company.id if company else None): abort(404) @bp.route('/', methods=['GET', 'POST']) @login_required def index(): company = _company() form = NfzInvoiceForm() customers, products = _prepare_form(form, company) drafts = ( Invoice.query.filter_by(company_id=company.id, source='nfz') .order_by(Invoice.created_at.desc()) .limit(10) .all() ) if request.method == 'GET': duplicate_id = request.args.get('duplicate_id', type=int) if duplicate_id: _hydrate_form_from_invoice(form, _invoice_or_404(duplicate_id), duplicate=True) else: created_customer_id = request.args.get('created_customer_id', type=int) created_product_id = request.args.get('created_product_id', type=int) if created_customer_id and any(c.id == created_customer_id for c in customers): form.customer_id.data = created_customer_id elif customers and not form.customer_id.data: form.customer_id.data = customers[0].id if created_product_id and any(p.id == created_product_id for p in products): selected = next((p for p in products if p.id == created_product_id), None) form.product_id.data = created_product_id form.unit_net.data = selected.net_price if selected else form.unit_net.data elif products and not form.product_id.data: form.product_id.data = products[0].id form.unit_net.data = products[0].net_price if not form.invoice_number.data: form.invoice_number.data = InvoiceService().next_sale_number(company.id, 'monthly') if form.validate_on_submit(): if SettingsService.read_only_enabled(company_id=company.id): flash('Tryb tylko do odczytu jest aktywny dla tej firmy.', 'warning') return redirect(url_for('nfz.index')) if form.settlement_to.data < form.settlement_from.data: flash('Data końcowa okresu rozliczeniowego nie może być wcześniejsza niż data początkowa.', 'warning') return render_template('nfz/index.html', form=form, drafts=drafts, company=company, spec_fields=spec_fields()) invoice, result = _save_invoice_from_form(None, form, company, send_to_ksef=bool(form.submit.data)) if result: flash(result.get('message', 'Zapisano i wysłano fakturę NFZ do KSeF.'), 'success') AuditService().log('send_nfz_invoice_to_ksef', 'invoice', invoice.id, invoice.ksef_number) else: flash('Zapisano roboczą fakturę NFZ.', 'success') AuditService().log('draft_nfz_invoice', 'invoice', invoice.id, invoice.invoice_number) db.session.commit() return redirect(url_for('invoices.detail', invoice_id=invoice.id)) return render_template('nfz/index.html', form=form, drafts=drafts, company=company, spec_fields=spec_fields()) @bp.route('//edit', methods=['GET', 'POST']) @login_required def edit(invoice_id): invoice = _invoice_or_404(invoice_id) if InvoiceService.invoice_locked(invoice): flash('Ta faktura została już wysłana do KSeF i nie można jej edytować.', 'warning') return redirect(url_for('invoices.detail', invoice_id=invoice.id)) company = _company() form = NfzInvoiceForm() _prepare_form(form, company) if request.method == 'GET': _hydrate_form_from_invoice(form, invoice) created_customer_id = request.args.get('created_customer_id', type=int) created_product_id = request.args.get('created_product_id', type=int) customers = Customer.query.filter_by(company_id=company.id, is_active=True).order_by(Customer.name).all() products = Product.query.filter_by(company_id=company.id, is_active=True).order_by(Product.name).all() if created_customer_id and any(c.id == created_customer_id for c in customers): form.customer_id.data = created_customer_id if created_product_id and any(p.id == created_product_id for p in products): selected = next((p for p in products if p.id == created_product_id), None) form.product_id.data = created_product_id if selected: form.unit_net.data = selected.net_price if form.validate_on_submit(): if form.settlement_to.data < form.settlement_from.data: flash('Data końcowa okresu rozliczeniowego nie może być wcześniejsza niż data początkowa.', 'warning') return render_template('nfz/index.html', form=form, drafts=[], company=company, spec_fields=spec_fields(), editing_invoice=invoice) invoice, result = _save_invoice_from_form(invoice, form, company, send_to_ksef=bool(form.submit.data)) db.session.commit() if result: flash(result.get('message', 'Zapisano i wysłano fakturę NFZ do KSeF.'), 'success') else: flash('Zapisano zmiany w fakturze NFZ.', 'success') return redirect(url_for('invoices.detail', invoice_id=invoice.id)) return render_template('nfz/index.html', form=form, drafts=[], company=company, spec_fields=spec_fields(), editing_invoice=invoice) @bp.post('//send-to-ksef') @login_required def send_to_ksef(invoice_id): invoice = _invoice_or_404(invoice_id) if InvoiceService.invoice_locked(invoice): flash('Ta faktura została już wysłana do KSeF i jest zablokowana do edycji.', 'warning') return redirect(url_for('invoices.detail', invoice_id=invoice.id)) payload = InvoiceService().build_ksef_payload(invoice) result = KSeFService(company_id=invoice.company_id).issue_invoice(payload) invoice.ksef_number = result.get('ksef_number', invoice.ksef_number) invoice.issued_status = result.get('status', 'issued') invoice.issued_to_ksef_at = InvoiceService().utcnow() invoice.external_metadata = dict(invoice.external_metadata or {}, ksef_send=result) InvoiceService().persist_issued_assets(invoice, xml_content=result.get('xml_content') or payload['xml_content']) db.session.commit() flash(result.get('message', 'Wysłano fakturę NFZ do KSeF.'), 'success') return redirect(url_for('invoices.detail', invoice_id=invoice.id)) def spec_fields(): return [ ('IDWew', 'Identyfikator OW NFZ w Podmiot3/DaneIdentyfikacyjne.'), ('P_6_Od / P_6_Do', 'Zakres dat okresu rozliczeniowego od i do.'), ('identyfikator-szablonu', 'Id szablonu z komunikatu R_UMX, gdy jest wymagany.'), ('identyfikator-swiadczeniodawcy', 'Kod świadczeniodawcy nadany w OW NFZ.'), ('Indeks', 'Kod zakresu świadczeń / wyróżnik / kod świadczenia.'), ('NrUmowy', 'Numer umowy NFZ, a dla aneksu także numer aneksu ugodowego.'), ]