725 lines
33 KiB
Python
725 lines
33 KiB
Python
from __future__ import annotations

import csv
import zipfile
from decimal import Decimal, InvalidOperation
from io import BytesIO
from urllib.parse import urlsplit

from flask import Blueprint, Response, abort, flash, redirect, render_template, request, send_file, url_for
from flask_login import login_required
from sqlalchemy import or_

from app.extensions import db
from app.forms.invoices import InvoiceFilterForm, InvoiceMetaForm
from app.forms.issued import IssuedInvoiceForm
from app.models.catalog import Customer, InvoiceLine, Product
from app.models.invoice import Invoice, InvoiceStatus, InvoiceType
from app.repositories.invoice_repository import InvoiceRepository
from app.services.audit_service import AuditService
from app.services.ceidg_service import CeidgService
from app.services.company_service import CompanyService
from app.services.invoice_service import InvoiceService
from app.services.ksef_service import KSeFService
from app.services.mail_service import MailService
from app.services.pdf_service import PdfService
from app.services.settings_service import SettingsService
|
|
|
|
# Blueprint that registers every view in this module under the '/invoices' URL prefix.
bp = Blueprint('invoices', __name__, url_prefix='/invoices')
|
|
|
|
|
|
def _company():
    """Return the value of ``CompanyService.get_current_company()``."""
    current = CompanyService.get_current_company()
    return current
|
|
|
|
|
|
def _require_company(redirect_endpoint='dashboard.index'):
    """Return the current company, or a redirect response when there is none.

    When no company is available a warning is flashed and the caller receives
    a redirect to ``redirect_endpoint`` instead of a company object.
    """
    active = _company()
    if not active:
        flash('Najpierw wybierz lub utwórz firmę, aby korzystać z tej sekcji.', 'warning')
        return redirect(url_for(redirect_endpoint))
    return active
|
|
|
|
|
|
def _invoice_or_404(invoice_id):
    """Load an invoice by primary key or abort the request with HTTP 404.

    A 404 is raised when the invoice does not exist, or when a current company
    is set and the invoice belongs to a different company.
    """
    active = _company()
    found = db.session.get(Invoice, invoice_id)
    if not found:
        abort(404)
    if active and found.company_id != active.id:
        abort(404)
    return found
|
|
|
|
|
|
def _ensure_full_access(company_id):
    """Return ``True`` when the company is writable, otherwise flash a warning.

    Returns ``False`` (after flashing) when
    ``SettingsService.read_only_enabled`` reports read-only mode for
    ``company_id``.
    """
    read_only = SettingsService.read_only_enabled(company_id=company_id)
    if not read_only:
        return True
    flash('Tryb tylko do odczytu jest aktywny dla tej firmy.', 'warning')
    return False
|
|
|
|
|
|
def _redirect_with_prefill(endpoint, *, customer_id=None, product_id=None, **kwargs):
    """Redirect to ``endpoint``, passing newly created ids as URL arguments.

    Truthy ``customer_id`` / ``product_id`` values are forwarded as
    ``created_customer_id`` / ``created_product_id``; any extra keyword
    arguments are handed to ``url_for`` unchanged.
    """
    prefill = (
        ('created_customer_id', customer_id),
        ('created_product_id', product_id),
    )
    for key, value in prefill:
        if value:
            kwargs[key] = value
    return redirect(url_for(endpoint, **kwargs))
|
|
|
|
|
|
def _customer_from_invoice(invoice, company_id):
    """Find or create the ``Customer`` matching the invoice's contractor.

    Lookup order: the customer already linked to the invoice (only if it
    belongs to ``company_id``), then a customer of that company with the same
    tax id, then one with the same name. Empty fields on the resulting
    customer are filled from the invoice and, when a tax id is known but the
    address or REGON is missing, from a ``CeidgService`` lookup.

    The customer is added to the session and flushed (not committed), and the
    invoice is linked to it.

    Returns:
        ``(customer, created)`` where ``created`` is ``True`` when a new
        ``Customer`` object had to be built.

    NOTE(review): nesting was reconstructed from a source whose indentation
    was lost; the name lookup is assumed to run whenever the tax-id lookup
    found nothing — confirm against the original file.
    """
    existing = None
    if invoice.customer_id:
        existing = db.session.get(Customer, invoice.customer_id)
        if existing and existing.company_id == company_id:
            return existing, False
        # The linked record is missing or owned by another company: forget it,
        # otherwise it would be reused (and modified) below whenever the
        # invoice carries no tax id.
        existing = None

    query = Customer.query.filter_by(company_id=company_id)
    if invoice.contractor_nip:
        existing = query.filter(Customer.tax_id == invoice.contractor_nip).first()
    if not existing:
        existing = query.filter(Customer.name == invoice.contractor_name).first()

    created = existing is None
    customer = existing or Customer(company_id=company_id)
    # Only fill blanks; values already stored on the customer win.
    customer.name = customer.name or (invoice.contractor_name or '').strip()
    customer.tax_id = customer.tax_id or (invoice.contractor_nip or '').strip()
    customer.address = customer.address or (invoice.contractor_address or '').strip()

    if customer.tax_id and (not customer.address or not customer.regon):
        lookup = CeidgService().fetch_company(customer.tax_id)
        if lookup.get('ok'):
            customer.name = customer.name or (lookup.get('name') or '').strip()
            customer.tax_id = customer.tax_id or (lookup.get('tax_id') or '').strip()
            customer.address = customer.address or (lookup.get('address') or '').strip()
            customer.regon = customer.regon or (lookup.get('regon') or '').strip()

    db.session.add(customer)
    # Flush so a newly created customer has an id to link the invoice to.
    db.session.flush()
    if invoice.customer_id != customer.id:
        invoice.customer_id = customer.id
    return customer, created
|
|
|
|
|
|
@bp.route('/')
@login_required
def index():
    """Render the paginated, filterable invoice list (15 rows per page)."""
    form = InvoiceFilterForm(request.args)
    company = _company()
    company_id = company.id if company else None
    query = InvoiceRepository().query_filtered(request.args, company_id=company_id)
    page = request.args.get('page', 1, type=int)
    pagination = query.paginate(page=page, per_page=15, error_out=False)

    # Payment details are resolved per invoice shown on the current page.
    payment_details_map = {}
    for invoice in pagination.items:
        payment_details_map[invoice.id] = InvoiceService().resolve_payment_details(invoice)

    return render_template(
        'invoices/index.html',
        form=form,
        pagination=pagination,
        company=company,
        payment_details_map=payment_details_map,
    )
|
|
|
|
|
|
@bp.route('/monthly')
@login_required
def monthly():
    """Render grouped invoice summaries for a year, quarter or month period.

    The ``period`` query argument falls back to ``'month'`` when it is not
    one of the three supported values; ``q`` is an optional search string.
    """
    company = _company()
    requested_period = request.args.get('period', 'month')
    period = requested_period if requested_period in {'year', 'quarter', 'month'} else 'month'
    search = (request.args.get('q') or '').strip()
    service = InvoiceService()
    groups = service.grouped_summary(company_id=company.id if company else None, period=period, search=search)
    comparisons = service.comparative_stats(company_id=company.id if company else None, search=search)
    context = {
        'groups': groups,
        'comparisons': comparisons,
        'company': company,
        'period': period,
        'period_title': service.period_title(period),
        'search': search,
    }
    return render_template('invoices/monthly.html', **context)
|
|
|
|
|
|
@bp.route('/issued')
@login_required
def issued_list():
    """Render the paginated list of invoices whose source is 'issued' or 'nfz'.

    The optional ``q`` argument filters by invoice number, KSeF number,
    contractor name or contractor NIP (case-insensitive substring match).
    """
    company = _company()
    search = (request.args.get('q') or '').strip()
    page = request.args.get('page', 1, type=int)
    query = Invoice.query.filter(
        Invoice.company_id == (company.id if company else None),
        Invoice.source.in_(['issued', 'nfz']),
    )
    if search:
        like = f'%{search}%'
        searchable = (
            Invoice.invoice_number,
            Invoice.ksef_number,
            Invoice.contractor_name,
            Invoice.contractor_nip,
        )
        query = query.filter(or_(*[column.ilike(like) for column in searchable]))
    ordered = query.order_by(Invoice.created_at.desc())
    pagination = ordered.paginate(page=page, per_page=15, error_out=False)

    payment_details_map = {}
    for invoice in pagination.items:
        payment_details_map[invoice.id] = InvoiceService().resolve_payment_details(invoice)

    return render_template(
        'invoices/issued_list.html',
        pagination=pagination,
        invoices=pagination.items,
        search=search,
        payment_details_map=payment_details_map,
    )
|
|
|
|
|
|
@bp.route('/issued/new', methods=['GET', 'POST'])
@login_required
def issued_new():
    """Render and process the form that creates a new issued (sales) invoice.

    On GET the form is pre-filled either from an invoice being duplicated
    (``duplicate_id``) or from ``created_customer_id`` / ``created_product_id``
    passed in the query string. On a valid POST a single-line invoice is
    stored; when the form's ``submit`` field is set it is sent to KSeF right
    away, otherwise it is kept as a draft.

    NOTE(review): block nesting was reconstructed from a source whose
    indentation had been lost. The unit-price and invoice-number defaults are
    assumed to run for every GET (after the duplicate/else branches) —
    confirm against the original file.
    """
    company = _company()
    if not company:
        return _require_company()

    form = IssuedInvoiceForm(numbering_template='monthly')
    customers = Customer.query.filter_by(company_id=company.id, is_active=True).order_by(Customer.name).all()
    products = Product.query.filter_by(company_id=company.id, is_active=True).order_by(Product.name).all()
    form.customer_id.choices = [(c.id, f'{c.name} ({c.tax_id})' if c.tax_id else c.name) for c in customers]
    form.product_id.choices = [(p.id, f'{p.name} - {p.net_price} PLN') for p in products]

    if request.method == 'GET':
        duplicate_id = request.args.get('duplicate_id', type=int)
        if duplicate_id:
            # Copy the header and the first line of the duplicated invoice.
            source_invoice = _invoice_or_404(duplicate_id)
            form.invoice_number.data = f'{source_invoice.invoice_number}/COPY'
            form.numbering_template.data = 'custom'
            if source_invoice.customer_id:
                form.customer_id.data = source_invoice.customer_id
            source_line = source_invoice.lines.first()
            if source_line and source_line.product_id:
                form.product_id.data = source_line.product_id
                form.quantity.data = source_line.quantity
                form.unit_net.data = source_line.unit_net
            form.split_payment.data = bool(getattr(source_invoice, 'split_payment', False))
        else:
            # Prefer records just created through the quick-create forms,
            # otherwise default to the first available choice.
            created_customer_id = request.args.get('created_customer_id', type=int)
            created_product_id = request.args.get('created_product_id', type=int)
            if created_customer_id and any(c.id == created_customer_id for c in customers):
                form.customer_id.data = created_customer_id
            elif customers:
                form.customer_id.data = customers[0].id
            if created_product_id and any(p.id == created_product_id for p in products):
                form.product_id.data = created_product_id
            elif products:
                form.product_id.data = products[0].id
        if products and form.product_id.data and not form.unit_net.data:
            selected = next((p for p in products if p.id == form.product_id.data), None)
            if selected:
                form.unit_net.data = selected.net_price
                form.split_payment.data = bool(selected.split_payment_default)
        if customers and products and not form.invoice_number.data:
            form.invoice_number.data = InvoiceService().next_sale_number(company.id, form.numbering_template.data or 'monthly')

    if form.validate_on_submit():
        if not _ensure_full_access(company.id):
            return redirect(url_for('invoices.issued_list'))

        customer = db.session.get(Customer, form.customer_id.data)
        product = db.session.get(Product, form.product_id.data)

        # Amounts are rounded to two decimal places; VAT is computed from the rounded net.
        qty = Decimal(str(form.quantity.data or 1))
        unit_net = Decimal(str(form.unit_net.data or product.net_price))
        net = (qty * unit_net).quantize(Decimal('0.01'))
        vat = (net * (Decimal(str(product.vat_rate)) / Decimal('100'))).quantize(Decimal('0.01'))
        gross = net + vat
        # Split payment is forced once the gross amount exceeds 15000.
        split_payment = bool(form.split_payment.data or product.split_payment_default or gross > Decimal('15000'))
        number = form.invoice_number.data or InvoiceService().next_sale_number(company.id, form.numbering_template.data)
        send_to_ksef_now = bool(form.submit.data)
        seller_account = (company.bank_account or '').strip()

        invoice = Invoice(
            company_id=company.id,
            customer_id=customer.id,
            ksef_number=f'PENDING/{number}',
            invoice_number=number,
            contractor_name=customer.name,
            contractor_nip=customer.tax_id,
            issue_date=InvoiceService().today_date(),
            received_date=InvoiceService().today_date(),
            net_amount=net,
            vat_amount=vat,
            gross_amount=gross,
            seller_bank_account=seller_account,
            split_payment=split_payment,
            invoice_type=InvoiceType.SALE,
            status=InvoiceStatus.NEW,
            source='issued',
            issued_status='draft' if not send_to_ksef_now else 'pending',
            html_preview='',
            external_metadata={'split_payment': split_payment, 'seller_bank_account': seller_account},
        )
        db.session.add(invoice)
        # Flush so the invoice id is available for the line below.
        db.session.flush()

        invoice_line = InvoiceLine(
            invoice_id=invoice.id,
            product_id=product.id,
            description=product.name,
            quantity=qty,
            unit=product.unit,
            unit_net=unit_net,
            vat_rate=product.vat_rate,
            net_amount=net,
            vat_amount=vat,
            gross_amount=gross,
        )
        db.session.add(invoice_line)
        InvoiceService().persist_issued_assets(invoice)

        if send_to_ksef_now:
            payload = {
                'invoiceNumber': number,
                'customer': {'name': customer.name, 'taxId': customer.tax_id},
                'lines': [{'name': product.name, 'qty': float(qty), 'unitNet': float(unit_net), 'vatRate': float(product.vat_rate)}],
                'metadata': {'split_payment': bool(invoice.split_payment)},
            }
            result = KSeFService(company_id=company.id).issue_invoice(payload)
            invoice.ksef_number = result.get('ksef_number', invoice.ksef_number)
            invoice.issued_status = result.get('status', 'issued')
            invoice.issued_to_ksef_at = InvoiceService().utcnow()
            invoice.external_metadata = dict(invoice.external_metadata or {}, ksef_send=result)
            flash(result.get('message', 'Wysłano fakturę do KSeF.'), 'success')
            AuditService().log('send_invoice_to_ksef', 'invoice', invoice.id, invoice.ksef_number)
        else:
            flash('Wygenerowano fakturę roboczą. Możesz ją jeszcze poprawić przed wysyłką do KSeF.', 'success')
            AuditService().log('draft_invoice', 'invoice', invoice.id, invoice.invoice_number)

        db.session.commit()
        return redirect(url_for('invoices.detail', invoice_id=invoice.id))

    # The preview is empty whenever there are no customers or no products,
    # even if the form already carries an invoice number.
    if customers and products:
        preview_number = form.invoice_number.data or InvoiceService().next_sale_number(company.id, form.numbering_template.data or 'monthly')
    else:
        preview_number = ''
    return render_template('invoices/issued_form.html', form=form, customers=customers, products=products, preview_number=preview_number)
|
|
|
|
|
|
@bp.post('/issued/<int:invoice_id>/send-to-ksef')
@login_required
def send_to_ksef(invoice_id):
    """Send an existing issued invoice to KSeF and store the result.

    Aborts with HTTP 400 when the invoice source is not ``'issued'``.
    Redirects back to the detail page without sending when the company is in
    read-only mode or the invoice is already locked. Only the first invoice
    line is included in the payload; when the invoice has no lines a
    substitute line is built from the invoice totals with a VAT rate of 23.
    """
    invoice = _invoice_or_404(invoice_id)
    if invoice.source != 'issued':
        abort(400)
    if not _ensure_full_access(invoice.company_id):
        return redirect(url_for('invoices.detail', invoice_id=invoice.id))
    if InvoiceService.invoice_locked(invoice):
        flash('Ta faktura została już wysłana do KSeF i jest zablokowana do edycji.', 'warning')
        return redirect(url_for('invoices.detail', invoice_id=invoice.id))

    # Split payment is forced once the gross amount exceeds 15000.
    if invoice.gross_amount > Decimal('15000'):
        invoice.split_payment = True
        invoice.external_metadata = dict(invoice.external_metadata or {}, split_payment=True)

    first_line = invoice.lines.first()
    payload = {
        'invoiceNumber': invoice.invoice_number,
        'customer': {'name': invoice.contractor_name, 'taxId': invoice.contractor_nip},
        'metadata': {'split_payment': bool(invoice.split_payment)},
        'lines': [{
            'name': first_line.description if first_line else invoice.invoice_number,
            'qty': float(first_line.quantity if first_line else 1),
            'unitNet': float(first_line.unit_net if first_line else invoice.net_amount),
            'vatRate': float(first_line.vat_rate if first_line else 23),
        }],
    }
    result = KSeFService(company_id=invoice.company_id).issue_invoice(payload)
    invoice.ksef_number = result.get('ksef_number', invoice.ksef_number)
    invoice.issued_status = result.get('status', 'issued')
    invoice.issued_to_ksef_at = InvoiceService().utcnow()
    invoice.external_metadata = dict(invoice.external_metadata or {}, ksef_send=result)
    InvoiceService().persist_issued_assets(invoice)
    # Record the submission in the audit log, matching the immediate-send
    # path of the new-invoice form.
    AuditService().log('send_invoice_to_ksef', 'invoice', invoice.id, invoice.ksef_number)
    db.session.commit()
    flash(result.get('message', 'Wysłano fakturę do KSeF.'), 'success')
    return redirect(url_for('invoices.detail', invoice_id=invoice.id))
|
|
|
|
|
|
@bp.route('/customers', methods=['GET', 'POST'])
@login_required
def customers():
    """List, create and edit customers of the current company.

    ``customer_id`` in the query string selects a customer to edit. A POST
    with ``fetch_ceidg`` set looks the company up via ``CeidgService`` and
    re-renders the page with the fetched values (nothing is committed); any
    other POST saves the submitted fields and redirects back to the list.
    The list supports ``q`` (search), ``sort`` and ``page`` arguments.
    """
    company = _company()
    if not company:
        return _require_company()

    customer_id = request.args.get('customer_id', type=int)
    editing = db.session.get(Customer, customer_id) if customer_id else None
    if editing and editing.company_id != company.id:
        # Ignore ids that point at another company's customer.
        editing = None

    # Check write access exactly once per POST so the read-only warning is
    # flashed a single time, whichever button was pressed.
    if request.method == 'POST' and _ensure_full_access(company.id):
        if request.form.get('fetch_ceidg'):
            lookup = CeidgService().fetch_company(request.form.get('tax_id'))
            if lookup.get('ok'):
                editing = editing or Customer(company_id=company.id)
                editing.name = lookup.get('name') or editing.name
                editing.tax_id = lookup.get('tax_id') or editing.tax_id
                editing.address = lookup.get('address') or editing.address
                editing.regon = lookup.get('regon') or editing.regon
                flash('Pobrano dane klienta z rejestru przedsiębiorców.', 'success')
            else:
                flash(lookup.get('message', 'Nie udało się pobrać danych z CEIDG.'), 'warning')
        else:
            target = editing or Customer(company_id=company.id)
            target.name = request.form.get('name', '').strip()
            target.tax_id = request.form.get('tax_id', '').strip()
            target.email = request.form.get('email', '').strip()
            target.address = request.form.get('address', '').strip()
            target.regon = request.form.get('regon', '').strip()
            db.session.add(target)
            db.session.commit()
            flash('Zapisano klienta.', 'success')
            return redirect(url_for('invoices.customers'))

    search = (request.args.get('q') or '').strip()
    page = request.args.get('page', 1, type=int)
    sort = (request.args.get('sort') or 'name_asc').strip()
    query = Customer.query.filter_by(company_id=company.id)
    if search:
        like = f'%{search}%'
        query = query.filter(or_(
            Customer.name.ilike(like),
            Customer.tax_id.ilike(like),
            Customer.regon.ilike(like),
            Customer.email.ilike(like),
            Customer.address.ilike(like),
        ))
    sort_map = {
        'name_asc': Customer.name.asc(),
        'name_desc': Customer.name.desc(),
        'tax_id_asc': Customer.tax_id.asc(),
        'tax_id_desc': Customer.tax_id.desc(),
    }
    # Unknown sort keys fall back to name ascending; the id is a tie-breaker.
    pagination = query.order_by(sort_map.get(sort, Customer.name.asc()), Customer.id.asc()).paginate(page=page, per_page=15, error_out=False)
    return render_template('invoices/customers.html', items=pagination.items, pagination=pagination, editing=editing, search=search, sort=sort)
|
|
|
|
|
|
@bp.route('/products', methods=['GET', 'POST'])
@login_required
def products():
    """List, create and edit products/services of the current company.

    ``product_id`` in the query string selects a product to edit. A POST
    saves the submitted fields and redirects back to the list. Net price and
    VAT rate accept either a dot or a comma as the decimal separator; values
    that are not finite decimal numbers are rejected with a warning instead
    of raising. The list supports ``q`` (search), ``sort`` and ``page``.
    """
    company = _company()
    if not company:
        return _require_company()

    product_id = request.args.get('product_id', type=int)
    editing = db.session.get(Product, product_id) if product_id else None
    if editing and editing.company_id != company.id:
        # Ignore ids that point at another company's product.
        editing = None

    if request.method == 'POST' and _ensure_full_access(company.id):
        raw_net_price = (request.form.get('net_price') or '').strip() or '0'
        raw_vat_rate = (request.form.get('vat_rate') or '').strip() or '23'
        try:
            net_price = Decimal(raw_net_price.replace(',', '.'))
            vat_rate = Decimal(raw_vat_rate.replace(',', '.'))
        except InvalidOperation:
            net_price = vat_rate = None
        # Decimal() also parses 'NaN' and 'Infinity'; neither is a usable amount.
        if net_price is None or not net_price.is_finite() or not vat_rate.is_finite():
            flash('Podaj poprawną cenę netto i stawkę VAT.', 'warning')
            back_args = {'product_id': editing.id} if editing else {}
            return redirect(url_for('invoices.products', **back_args))

        target = editing or Product(company_id=company.id)
        target.name = request.form.get('name', '').strip()
        target.sku = request.form.get('sku', '').strip()
        target.unit = request.form.get('unit', 'szt.').strip()
        target.net_price = net_price
        target.vat_rate = vat_rate
        target.split_payment_default = bool(request.form.get('split_payment_default'))
        db.session.add(target)
        db.session.commit()
        flash('Zapisano towar/usługę.', 'success')
        return redirect(url_for('invoices.products'))

    search = (request.args.get('q') or '').strip()
    page = request.args.get('page', 1, type=int)
    sort = (request.args.get('sort') or 'name_asc').strip()
    query = Product.query.filter_by(company_id=company.id)
    if search:
        like = f'%{search}%'
        query = query.filter(or_(Product.name.ilike(like), Product.sku.ilike(like), Product.unit.ilike(like)))
    sort_map = {
        'name_asc': Product.name.asc(),
        'name_desc': Product.name.desc(),
        'price_asc': Product.net_price.asc(),
        'price_desc': Product.net_price.desc(),
        'sku_asc': Product.sku.asc(),
        'sku_desc': Product.sku.desc(),
    }
    # Unknown sort keys fall back to name ascending; the id is a tie-breaker.
    pagination = query.order_by(sort_map.get(sort, Product.name.asc()), Product.id.asc()).paginate(page=page, per_page=15, error_out=False)
    return render_template('invoices/products.html', items=pagination.items, pagination=pagination, editing=editing, search=search, sort=sort)
|
|
|
|
|
|
@bp.post('/customers/quick-create')
@login_required
def quick_create_customer():
    """Create a customer from an inline form and return to the calling form.

    On success the user is redirected to ``return_endpoint`` (default
    ``invoices.issued_new``) with ``created_customer_id`` set; ``invoice_id``
    is forwarded only for the edit endpoints. When the company is read-only
    or the name is empty the user is sent to ``return_to`` instead, which is
    honoured only if it points at this site.
    """
    company = _company()
    if not company:
        return _require_company()

    # ``return_to`` comes from the submitted form, so accept it only when it
    # is a local path or an absolute URL on the current host; anything else
    # falls back to the new-invoice form.
    fallback_url = url_for('invoices.issued_new')
    return_to = (request.form.get('return_to') or '').strip()
    try:
        parts = urlsplit(return_to)
    except ValueError:
        parts = None
    if parts is None:
        return_to = fallback_url
    elif parts.scheme or parts.netloc:
        if parts.scheme not in {'http', 'https'} or parts.netloc != request.host:
            return_to = fallback_url
    elif not return_to.startswith('/') or return_to.startswith(('//', '/\\')):
        # '//host' and '/\host' are treated by browsers as another site.
        return_to = fallback_url

    if not _ensure_full_access(company.id):
        return redirect(return_to)

    name = (request.form.get('name') or '').strip()
    if not name:
        flash('Podaj nazwę klienta.', 'warning')
        return redirect(return_to)

    customer = Customer(
        company_id=company.id,
        name=name,
        tax_id=(request.form.get('tax_id') or '').strip(),
        email=(request.form.get('email') or '').strip(),
        address=(request.form.get('address') or '').strip(),
        regon=(request.form.get('regon') or '').strip(),
    )
    db.session.add(customer)
    db.session.commit()
    flash('Dodano klienta i ustawiono go w formularzu.', 'success')

    target = request.form.get('return_endpoint') or 'invoices.issued_new'
    target_kwargs = {}
    invoice_id = request.form.get('invoice_id', type=int)
    if invoice_id and target in {'invoices.issued_edit', 'nfz.edit'}:
        target_kwargs['invoice_id'] = invoice_id
    return _redirect_with_prefill(target, customer_id=customer.id, **target_kwargs)
|
|
|
|
|
|
@bp.post('/products/quick-create')
@login_required
def quick_create_product():
    """Create a product/service from an inline form and return to the calling form.

    On success the user is redirected to ``return_endpoint`` (default
    ``invoices.issued_new``) with ``created_product_id`` set; ``invoice_id``
    is forwarded only for the edit endpoints. When the company is read-only,
    the name is empty, or the price / VAT rate is not a finite decimal
    number, the user is sent to ``return_to`` instead, which is honoured only
    if it points at this site. Net price and VAT rate accept a dot or a comma
    as the decimal separator.
    """
    company = _company()
    if not company:
        return _require_company()

    # ``return_to`` comes from the submitted form, so accept it only when it
    # is a local path or an absolute URL on the current host; anything else
    # falls back to the new-invoice form.
    fallback_url = url_for('invoices.issued_new')
    return_to = (request.form.get('return_to') or '').strip()
    try:
        parts = urlsplit(return_to)
    except ValueError:
        parts = None
    if parts is None:
        return_to = fallback_url
    elif parts.scheme or parts.netloc:
        if parts.scheme not in {'http', 'https'} or parts.netloc != request.host:
            return_to = fallback_url
    elif not return_to.startswith('/') or return_to.startswith(('//', '/\\')):
        # '//host' and '/\host' are treated by browsers as another site.
        return_to = fallback_url

    if not _ensure_full_access(company.id):
        return redirect(return_to)

    name = (request.form.get('name') or '').strip()
    if not name:
        flash('Podaj nazwę towaru lub usługi.', 'warning')
        return redirect(return_to)

    raw_net_price = (request.form.get('net_price') or '').strip() or '0'
    raw_vat_rate = (request.form.get('vat_rate') or '').strip() or '23'
    try:
        net_price = Decimal(raw_net_price.replace(',', '.'))
        vat_rate = Decimal(raw_vat_rate.replace(',', '.'))
    except InvalidOperation:
        net_price = vat_rate = None
    # Decimal() also parses 'NaN' and 'Infinity'; neither is a usable amount.
    if net_price is None or not net_price.is_finite() or not vat_rate.is_finite():
        flash('Podaj poprawną cenę netto i stawkę VAT.', 'warning')
        return redirect(return_to)

    product = Product(
        company_id=company.id,
        name=name,
        sku=(request.form.get('sku') or '').strip(),
        unit=(request.form.get('unit') or 'szt.').strip(),
        net_price=net_price,
        vat_rate=vat_rate,
        split_payment_default=bool(request.form.get('split_payment_default')),
    )
    db.session.add(product)
    db.session.commit()
    flash('Dodano towar/usługę i ustawiono w formularzu.', 'success')

    target = request.form.get('return_endpoint') or 'invoices.issued_new'
    target_kwargs = {}
    invoice_id = request.form.get('invoice_id', type=int)
    if invoice_id and target in {'invoices.issued_edit', 'nfz.edit'}:
        target_kwargs['invoice_id'] = invoice_id
    return _redirect_with_prefill(target, product_id=product.id, **target_kwargs)
|
|
|
|
|
|
@bp.route('/issued/<int:invoice_id>/edit', methods=['GET', 'POST'])
@login_required
def issued_edit(invoice_id):
    """Render and process the edit form of a draft issued invoice.

    Aborts with HTTP 400 when the invoice source is not ``'issued'``.
    Redirects to the detail page when the invoice is locked, has no company,
    or the company is read-only. A valid POST recalculates the amounts from
    the single edited line, resets the KSeF number to ``PENDING/<number>``
    and puts the invoice back into ``'draft'`` status.
    """
    invoice = _invoice_or_404(invoice_id)
    if invoice.source != 'issued':
        abort(400)
    if InvoiceService.invoice_locked(invoice):
        flash('Ta faktura została już wysłana do KSeF i nie można jej edytować.', 'warning')
        return redirect(url_for('invoices.detail', invoice_id=invoice.id))
    company = invoice.company
    if not company or not _ensure_full_access(company.id):
        return redirect(url_for('invoices.detail', invoice_id=invoice.id))

    form = IssuedInvoiceForm(numbering_template='custom')
    customers = Customer.query.filter_by(company_id=company.id, is_active=True).order_by(Customer.name).all()
    products = Product.query.filter_by(company_id=company.id, is_active=True).order_by(Product.name).all()
    form.customer_id.choices = [(c.id, f'{c.name} ({c.tax_id})' if c.tax_id else c.name) for c in customers]
    form.product_id.choices = [(p.id, f'{p.name} - {p.net_price} PLN') for p in products]
    line = invoice.lines.first()

    if request.method == 'GET':
        created_customer_id = request.args.get('created_customer_id', type=int)
        created_product_id = request.args.get('created_product_id', type=int)

        # A just-created customer wins, then the invoice's own, then the first choice.
        if created_customer_id and any(c.id == created_customer_id for c in customers):
            form.customer_id.data = created_customer_id
        else:
            form.customer_id.data = invoice.customer_id or (customers[0].id if customers else None)
        form.invoice_number.data = invoice.invoice_number

        # Same precedence for the product: just-created, existing line, first choice.
        if created_product_id and any(p.id == created_product_id for p in products):
            form.product_id.data = created_product_id
        elif line and line.product_id:
            form.product_id.data = line.product_id
        else:
            form.product_id.data = products[0].id if products else None

        form.quantity.data = line.quantity if line else 1
        form.unit_net.data = line.unit_net if line else invoice.net_amount
        form.split_payment.data = bool(
            invoice.split_payment
            or (form.product_id.data and next((p.split_payment_default for p in products if p.id == form.product_id.data), False))
        )
        if created_product_id:
            selected = next((p for p in products if p.id == created_product_id), None)
            if selected:
                form.unit_net.data = selected.net_price

    if form.validate_on_submit():
        customer = db.session.get(Customer, form.customer_id.data)
        product = db.session.get(Product, form.product_id.data)

        # Amounts are rounded to two decimal places; VAT is computed from the rounded net.
        qty = Decimal(str(form.quantity.data or 1))
        unit_net = Decimal(str(form.unit_net.data or product.net_price))
        net = (qty * unit_net).quantize(Decimal('0.01'))
        vat = (net * (Decimal(str(product.vat_rate)) / Decimal('100'))).quantize(Decimal('0.01'))
        gross = net + vat

        invoice.split_payment = bool(form.split_payment.data or product.split_payment_default or gross > Decimal('15000'))
        invoice.customer_id = customer.id
        invoice.invoice_number = form.invoice_number.data or invoice.invoice_number
        invoice.contractor_name = customer.name
        invoice.contractor_nip = customer.tax_id
        invoice.net_amount = net
        invoice.vat_amount = vat
        invoice.gross_amount = gross
        invoice.seller_bank_account = (company.bank_account or '').strip()
        invoice.ksef_number = f'PENDING/{invoice.invoice_number}'

        if not line:
            line = InvoiceLine(invoice_id=invoice.id)
            db.session.add(line)
        line.product_id = product.id
        line.description = product.name
        line.quantity = qty
        line.unit = product.unit
        line.unit_net = unit_net
        line.vat_rate = product.vat_rate
        line.net_amount = net
        line.vat_amount = vat
        line.gross_amount = gross

        InvoiceService().persist_issued_assets(invoice)
        invoice.external_metadata = dict(
            invoice.external_metadata or {},
            split_payment=invoice.split_payment,
            seller_bank_account=invoice.seller_bank_account,
        )
        invoice.issued_status = 'draft'
        db.session.commit()
        flash('Zapisano zmiany w fakturze roboczej.', 'success')
        return redirect(url_for('invoices.detail', invoice_id=invoice.id))

    preview_number = form.invoice_number.data or invoice.invoice_number
    return render_template(
        'invoices/issued_form.html',
        form=form,
        customers=customers,
        products=products,
        preview_number=preview_number,
        editing_invoice=invoice,
    )
|
|
|
|
|
|
@bp.route('/<int:invoice_id>', methods=['GET', 'POST'])
@login_required
def detail(invoice_id):
    """Show one invoice and accept metadata edits (status, tags, note, flags).

    The invoice is marked as read on every request. Metadata from a valid
    POST is saved only when the invoice is not locked, i.e. the company is
    not read-only and ``InvoiceService.invoice_locked`` is false. Rendering
    the page refreshes the stored HTML preview, regenerates the issued
    assets or the PDF as needed, and commits the session.
    """
    invoice = _invoice_or_404(invoice_id)
    service = InvoiceService()
    service.mark_read(invoice)
    locked = SettingsService.read_only_enabled(company_id=invoice.company_id) or InvoiceService.invoice_locked(invoice)
    form = InvoiceMetaForm(
        status=invoice.status.value,
        tags=', '.join([t.name for t in invoice.tags]),
        internal_note=invoice.internal_note,
        queue_accounting=invoice.queue_accounting,
        pinned=invoice.pinned,
    )
    if form.validate_on_submit() and not locked:
        service.update_metadata(invoice, form)
        AuditService().log('update', 'invoice', invoice.id, 'Updated metadata')
        flash('Zapisano zmiany.', 'success')
        return redirect(url_for('invoices.detail', invoice_id=invoice.id))

    # Load the stored XML, if the file is still present on disk.
    xml_content = ''
    if invoice.xml_path:
        from pathlib import Path as _Path
        path = _Path(invoice.xml_path)
        if path.exists():
            xml_content = path.read_text(encoding='utf-8')

    invoice.html_preview = service.render_invoice_html(invoice)
    linked_customer = db.session.get(Customer, invoice.customer_id) if invoice.customer_id else None
    can_add_seller_customer = bool(invoice.contractor_name) and invoice.source not in ['issued', 'nfz']

    if invoice.source in ['issued', 'nfz']:
        if not xml_content or (invoice.source == 'issued' and not invoice.xml_path):
            # No usable XML yet: regenerate the issued assets and show their XML.
            xml_content = service.persist_issued_assets(invoice)
        else:
            PdfService().render_invoice_pdf(invoice)
    else:
        if invoice.xml_path:
            PdfService().render_invoice_pdf(invoice)

    db.session.commit()
    payment_details = service.resolve_payment_details(invoice)
    return render_template(
        'invoices/detail.html',
        invoice=invoice,
        form=form,
        xml_content=xml_content,
        edit_locked=locked,
        linked_customer=linked_customer,
        can_add_seller_customer=can_add_seller_customer,
        payment_details=payment_details,
    )
|
|
|
|
|
|
@bp.route('/<int:invoice_id>/duplicate')
@login_required
def duplicate(invoice_id):
    """Redirect to the form that creates a copy of the given invoice.

    Invoices with source ``'nfz'`` go to ``nfz.index``; all others go to
    ``invoices.issued_new``. Both receive ``duplicate_id``.
    """
    invoice = _invoice_or_404(invoice_id)
    endpoint = 'nfz.index' if invoice.source == 'nfz' else 'invoices.issued_new'
    return redirect(url_for(endpoint, duplicate_id=invoice_id))
|
|
|
|
|
|
@bp.route('/<int:invoice_id>/pdf')
@login_required
def pdf(invoice_id):
    """Render the invoice as a PDF and send it, named after the invoice number."""
    invoice = _invoice_or_404(invoice_id)
    rendered = PdfService().render_invoice_pdf(invoice)
    # The renderer returns a pair; only the first element (the bytes) is used here.
    pdf_bytes, _ = rendered
    filename = f'{invoice.invoice_number}.pdf'
    return send_file(BytesIO(pdf_bytes), download_name=filename, mimetype='application/pdf')
|
|
|
|
|
|
@bp.route('/<int:invoice_id>/send', methods=['POST'])
@login_required
def send(invoice_id):
    """E-mail the invoice to the ``recipient`` given in the form.

    Always redirects back to the invoice detail page; the flashed message
    reports read-only mode, a missing recipient, success, or the delivery
    error.
    """
    invoice = _invoice_or_404(invoice_id)
    detail_url = url_for('invoices.detail', invoice_id=invoice.id)

    if not _ensure_full_access(invoice.company_id):
        return redirect(detail_url)

    recipient = (request.form.get('recipient') or '').strip()
    if not recipient:
        flash('Brak adresu e-mail odbiorcy.', 'warning')
        return redirect(detail_url)

    delivery = MailService(company_id=invoice.company_id).send_invoice(invoice, recipient)
    if delivery.status != 'sent':
        error_message = (delivery.error_message or 'Nieznany błąd SMTP').strip()
        flash(
            f'Nie udało się wysłać faktury na adres: {delivery.recipient}. Błąd: {error_message}',
            'danger'
        )
    else:
        flash(f'Fakturę wysłano na adres: {delivery.recipient}.', 'success')

    return redirect(detail_url)
|
|
|
|
|
|
@bp.route('/export/csv')
@login_required
def export_csv():
    """Export the invoices matching the current filters as ``invoices.csv``."""
    company = _company()
    rows = InvoiceRepository().query_filtered(request.args, company_id=company.id if company else None).all()
    import io
    buffer = io.StringIO()
    writer = csv.writer(buffer)
    header = ['Firma', 'Numer', 'KSeF', 'Kontrahent', 'NIP', 'Data', 'Netto', 'VAT', 'Brutto', 'Typ', 'Status']
    writer.writerow(header)
    # One CSV row per invoice; amounts are written via str().
    writer.writerows(
        [
            invoice.company.name if invoice.company else '',
            invoice.invoice_number,
            invoice.ksef_number,
            invoice.contractor_name,
            invoice.contractor_nip,
            invoice.issue_date.isoformat(),
            str(invoice.net_amount),
            str(invoice.vat_amount),
            str(invoice.gross_amount),
            invoice.invoice_type.value,
            invoice.status.value,
        ]
        for invoice in rows
    )
    return Response(
        buffer.getvalue(),
        mimetype='text/csv',
        headers={'Content-Disposition': 'attachment; filename=invoices.csv'},
    )
|
|
|
|
|
|
@bp.route('/export/zip')
@login_required
def export_zip():
    """Bundle the XML files of the filtered invoices into ``invoices.zip``.

    Invoices without an ``xml_path``, or whose file no longer exists, are
    skipped. Files are stored under their base name.
    """
    from pathlib import Path

    company = _company()
    rows = InvoiceRepository().query_filtered(request.args, company_id=company.id if company else None).all()
    archive = BytesIO()
    with zipfile.ZipFile(archive, mode='w', compression=zipfile.ZIP_DEFLATED) as zf:
        for invoice in rows:
            if not invoice.xml_path:
                continue
            xml_file = Path(invoice.xml_path)
            if xml_file.exists():
                zf.write(xml_file, arcname=xml_file.name)
    # Rewind so send_file streams the archive from its first byte.
    archive.seek(0)
    return send_file(archive, download_name='invoices.zip', mimetype='application/zip')
|
|
|
|
|
|
@bp.route('/monthly/<period>/pdf')
@login_required
def month_pdf(period):
    """Send a PDF summary for the monthly group whose key equals ``period``.

    Redirects to the monthly overview when no group has that key.
    """
    company = _company()
    summary = InvoiceService().grouped_summary(company_id=company.id if company else None, period='month')
    matching = [group for group in summary if group['key'] == period]
    if not matching:
        return redirect(url_for('invoices.monthly'))
    entries = matching[0]['entries']
    pdf_bytes = PdfService().month_pdf(entries, f'Miesiąc {period}')
    return send_file(BytesIO(pdf_bytes), download_name=f'{period}.pdf', mimetype='application/pdf')
|
|
|
|
|
|
@bp.route('/bulk', methods=['POST'])
@login_required
def bulk_action():
    """Apply a bulk action to the invoices selected on the list page.

    Supported ``action`` values are ``'mark_accounted'`` and
    ``'queue_accounting'``; any other value leaves the invoices unchanged.
    Only invoices of the current company are affected. Nothing is done when
    read-only mode is active.
    """
    company = _company()
    if SettingsService.read_only_enabled(company_id=company.id if company else None):
        flash('Tryb read only jest aktywny.', 'warning')
        return redirect(url_for('invoices.index'))

    # type=int silently drops values that are not integers, so malformed ids
    # never reach the database query.
    ids = request.form.getlist('invoice_ids', type=int)
    action = request.form.get('action')
    invoices = Invoice.query.filter(Invoice.company_id == (company.id if company else None), Invoice.id.in_(ids)).all()
    for invoice in invoices:
        if action == 'mark_accounted':
            # Reference the enum directly so an invoice without a status
            # does not break the lookup.
            invoice.status = InvoiceStatus.ACCOUNTED
        elif action == 'queue_accounting':
            invoice.queue_accounting = True
    db.session.commit()
    flash('Wykonano akcję masową.', 'success')
    return redirect(url_for('invoices.index'))
|
|
|
|
@bp.post('/<int:invoice_id>/add-seller-customer')
@login_required
def add_seller_customer(invoice_id):
    """Add the invoice's contractor to the customer register, or link an existing one.

    Invoices with source ``'issued'`` or ``'nfz'`` are skipped with an info
    message. On success the user is redirected to the customers page with
    the affected customer selected.
    """
    invoice = _invoice_or_404(invoice_id)
    detail_url = url_for('invoices.detail', invoice_id=invoice.id)

    if invoice.source in ['issued', 'nfz']:
        flash('Dla faktur sprzedażowych kontrahent jest już obsługiwany w kartotece odbiorców.', 'info')
        return redirect(detail_url)
    if not _ensure_full_access(invoice.company_id):
        return redirect(detail_url)

    customer, created = _customer_from_invoice(invoice, invoice.company_id)
    db.session.commit()
    if created:
        message = 'Dodano kontrahenta do kartoteki klientów.'
    else:
        message = 'Powiązano kontrahenta z istniejącą kartoteką klientów.'
    flash(message, 'success')
    return redirect(url_for('invoices.customers', customer_id=customer.id))