from __future__ import annotations import csv import io import math import os import re from datetime import datetime from pathlib import Path import httpx from io import BytesIO from bs4 import BeautifulSoup from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Response from fastapi.responses import HTMLResponse, StreamingResponse, JSONResponse from fastapi.staticfiles import StaticFiles from reportlab.lib import colors from reportlab.lib.pagesizes import A4, landscape from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle from reportlab.lib.units import cm from reportlab.pdfbase import pdfmetrics from reportlab.pdfbase.ttfonts import TTFont from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, PageBreak from reportlab.graphics.shapes import Drawing, Line, PolyLine, Rect, String, Wedge from openpyxl import load_workbook from .models import SimulationRequest from .simulator import simulate BASE_DIR = Path(__file__).resolve().parent EXPORT_DIR = Path(os.environ.get("MORTGAGE_EXPORT_DIR", "./app/exports")) EXPORT_DIR.mkdir(exist_ok=True) FONT_DIR = BASE_DIR / "static" / "fonts" FONT_PATH = FONT_DIR / "DejaVuSans.ttf" FONT_BOLD_PATH = FONT_DIR / "DejaVuSans-Bold.ttf" FONT_OBLIQUE_PATH = FONT_DIR / "DejaVuSans-Oblique.ttf" if FONT_PATH.exists(): pdfmetrics.registerFont(TTFont("AppFont", str(FONT_PATH))) APP_FONT = "AppFont" if FONT_BOLD_PATH.exists(): pdfmetrics.registerFont(TTFont("AppFont-Bold", str(FONT_BOLD_PATH))) APP_FONT_BOLD = "AppFont-Bold" else: APP_FONT_BOLD = APP_FONT if FONT_OBLIQUE_PATH.exists(): pdfmetrics.registerFont(TTFont("AppFont-Oblique", str(FONT_OBLIQUE_PATH))) APP_FONT_OBLIQUE = "AppFont-Oblique" else: APP_FONT_OBLIQUE = APP_FONT else: APP_FONT = "Helvetica" APP_FONT_BOLD = "Helvetica-Bold" APP_FONT_OBLIQUE = "Helvetica-Oblique" app = FastAPI(title="Symulator kredytu hipotecznego", version="1.0.0") app.mount("/static", StaticFiles(directory=BASE_DIR / "static"), name="static") 
@app.middleware("http")
async def no_cache_headers(request, call_next):
    """Set a cache-disabling ``Cache-Control`` header on every HTTP response."""
    response = await call_next(request)
    response.headers["Cache-Control"] = "no-store, no-cache, must-revalidate"
    return response


@app.get("/favicon.ico")
def favicon() -> Response:
    """Answer favicon requests with an empty 204 response."""
    return Response(status_code=204)


@app.get("/", response_class=HTMLResponse)
def index() -> str:
    """Return the contents of ``static/index.html`` (read as UTF-8)."""
    return (BASE_DIR / "static" / "index.html").read_text(encoding="utf-8")


@app.post("/api/simulate")
def api_simulate(req: SimulationRequest):
    """Run ``simulate`` on the posted request and return its result."""
    return simulate(req)


@app.websocket("/ws/simulate")
async def ws_simulate(websocket: WebSocket):
    """Serve simulations over a WebSocket.

    Each received JSON message is validated as a ``SimulationRequest``,
    simulated, and the dumped result is sent back. The loop ends silently when
    the client disconnects. Any other exception is reported to the client as
    ``{"error": ...}`` and also ends the handler.
    """
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_json()
            req = SimulationRequest.model_validate(data)
            result = simulate(req)
            await websocket.send_json(result.model_dump())
    except WebSocketDisconnect:
        return
    except Exception as exc:
        await websocket.send_json({"error": str(exc)})


@app.get("/api/rate/nbp")
async def nbp_rate():
    """Fetch the average interest rate of new PLN housing loans from NBP's XLSX.

    Note: this is not the NBP reference rate, only the average interest rate
    of loans.

    Returns a dict with the rate (in percent), its period and source metadata.
    On any failure (download error, missing sheet, missing row, no numeric
    value) a ``JSONResponse`` with status 502 and an ``error`` field is
    returned instead.
    """
    # Imported here so the function is self-contained with respect to the
    # module-level ``from datetime import datetime`` import.
    from datetime import timezone

    url = "https://static.nbp.pl/dane/statystyka/inne/stopy_proc_pl_srdW.xlsx"
    sheet_name = "4 OPN2PLN"
    try:
        async with httpx.AsyncClient(timeout=20, follow_redirects=True) as client:
            resp = await client.get(
                url,
                headers={
                    "User-Agent": "Mozilla/5.0 mortgage-simulator/1.0",
                    "Accept": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet,*/*",
                },
            )
            resp.raise_for_status()

        wb = load_workbook(BytesIO(resp.content), data_only=True, read_only=True)
        try:
            ws = wb[sheet_name]

            # Find the first row whose text mentions housing real estate
            # (with or without Polish diacritics).
            target_row = None
            for row in ws.iter_rows():
                values = [cell.value for cell in row]
                joined = " ".join(str(v).lower() for v in values if v is not None)
                if "nieruchomości mieszkaniowe" in joined or "nieruchomosci mieszkaniowe" in joined:
                    target_row = values
                    break

            if not target_row:
                return JSONResponse(
                    {
                        "error": "Nie znaleziono wiersza: na nieruchomości mieszkaniowe",
                        "url": url,
                        "sheet": sheet_name,
                    },
                    status_code=502,
                )

            # Dates are in row 2, values in the row that was found.
            dates = [cell.value for cell in ws[2]]

            # Scan from the right: the last numeric cell is the newest value.
            latest_rate = None
            latest_date = None
            for idx in reversed(range(len(target_row))):
                value = target_row[idx]
                # bool is a subclass of int; a boolean cell is not a rate.
                if isinstance(value, (int, float)) and not isinstance(value, bool):
                    latest_rate = float(value)
                    latest_date_cell = dates[idx] if idx < len(dates) else None
                    if isinstance(latest_date_cell, datetime):
                        latest_date = latest_date_cell.date().isoformat()
                    elif latest_date_cell:
                        latest_date = str(latest_date_cell)
                    break

            if latest_rate is None:
                return JSONResponse(
                    {
                        "error": "Nie znaleziono wartości liczbowej oprocentowania kredytu mieszkaniowego",
                        "url": url,
                        "sheet": sheet_name,
                    },
                    status_code=502,
                )

            # In the XLSX the values are fractions, e.g. 0.0591 = 5.91%.
            rate_percent = round(latest_rate * 100, 4)

            # Aware "now" in UTC, rendered in the same naive-ISO + "Z" format
            # the endpoint has always produced.
            fetched_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"

            return {
                "source": "NBP",
                "kind": "average_new_housing_loan_pln_rate",
                "label": "Średnie oprocentowanie nowych kredytów mieszkaniowych PLN",
                "url": url,
                "sheet": sheet_name,
                "period": latest_date,
                "rate": rate_percent,
                "fetched_at": fetched_at,
            }
        finally:
            # Read-only workbooks load lazily and must be closed explicitly.
            wb.close()
    except Exception as exc:
        return JSONResponse(
            {"error": str(exc), "url": url},
            status_code=502,
        )


@app.post("/api/export/csv")
def export_csv(req: SimulationRequest):
    """Simulate the request and return the schedule plus summary as a CSV download.

    The CSV uses ``;`` as delimiter and is encoded as UTF-8 with a BOM
    (``utf-8-sig``). Schedule rows come first, then an empty row, a
    "Podsumowanie" marker row and one ``key;value`` row per summary field.
    """
    result = simulate(req)

    buf = io.StringIO()
    writer = csv.writer(buf, delimiter=";")
    writer.writerow([
        "miesiac", "data_splaty", "dni", "oprocentowanie", "rata", "kapital", "odsetki",
        "nadplata", "prowizja_nadplaty", "saldo", "karencja",
        "odsetki_narastajaco", "koszt_narastajaco", "nadplaty_narastajaco",
    ])
    for row in result.schedule:
        writer.writerow([
            row.month, row.due_date, row.days, row.rate, row.payment, row.principal_part,
            row.interest_part, row.overpayment, row.overpayment_fee, row.remaining,
            row.grace_type.value,
            row.cumulative_interest, row.cumulative_cost, row.cumulative_overpayment,
        ])

    writer.writerow([])
    writer.writerow(["Podsumowanie"])
    for key, value in result.summary.model_dump().items():
        writer.writerow([key, value])

    data = buf.getvalue().encode("utf-8-sig")
    return StreamingResponse(
        io.BytesIO(data),
        media_type="text/csv",
        headers={"Content-Disposition": "attachment; filename=symulacja-kredytu.csv"},
    )


def _money(value: float) -> str:
    """Format ``value`` with two decimals, space thousands separators and a `` PLN`` suffix."""
    return f"{value:,.2f} PLN".replace(",", " ")


def _pct(value: float) -> str:
    """Format ``value`` with two decimals followed by ``%``."""
    return f"{value:.2f}%"


def _repeat_label(value: str) -> str:
    """Map an overpayment repeat code to its report label; unknown codes pass through."""
    return {"once": "jednorazowo", "monthly": "co miesiac", "yearly": "co rok"}.get(value, value)


def _effect_label(value: str) -> str:
    """Map an overpayment effect code to its report label; unknown codes pass through."""
    return {"shorten": "skrocenie okresu", "lower_payment": "zmniejszenie raty"}.get(value, value)


def _installment_label(value: str) -> str:
    """Map an installment type code to its report label; unknown codes pass through."""
    return {"equal": "rowne", "decreasing": "malejace"}.get(value, value)


def _add_page_number(canvas_obj, doc):
    """Draw the footer: a disclaimer on the left and ``Strona <n>`` on the right.

    Passed to ``doc.build`` as the first-page and later-pages callback. Canvas
    state is saved and restored around the drawing calls.
    """
    canvas_obj.saveState()
    canvas_obj.setFont(APP_FONT, 7)
    canvas_obj.setFillColor(colors.HexColor("#6b7280"))
    canvas_obj.drawString(1.4 * cm, 0.9 * cm, "Symulator edukacyjny - wyniki orientacyjne")
    canvas_obj.drawRightString(A4[0] - 1.4 * cm, 0.9 * cm, f"Strona {doc.page}")
    canvas_obj.restoreState()


def _line_chart(rows, value_attr: str, title: str, width=480, height=165) -> Drawing:
    """Build a line chart of one numeric attribute across ``rows``.

    Args:
        rows: Iterable of objects exposing ``value_attr``.
        value_attr: Name of the attribute to plot (converted with ``float``).
        title: Text drawn at the top-left of the drawing.
        width: Drawing width.
        height: Drawing height.

    Returns:
        A ``Drawing`` with axes, four horizontal grid lines labelled with
        quarter steps of the maximum value, and a polyline of the values.
        The series is thinned by taking every ``step``-th value once it has
        more than 420 points. An empty ``rows`` is plotted as a single zero.
    """
    drawing = Drawing(width, height)
    margin_l, margin_r, margin_b, margin_t = 42, 12, 24, 24
    plot_w = width - margin_l - margin_r
    plot_h = height - margin_t - margin_b
    x0, y0 = margin_l, margin_b

    values = [float(getattr(r, value_attr)) for r in rows]
    if not values:
        values = [0]
    # Avoid dividing by zero when every value is 0.
    max_v = max(values) or 1

    drawing.add(String(0, height - 12, title, fontName=APP_FONT_BOLD, fontSize=9, fillColor=colors.HexColor("#111827")))
    drawing.add(Line(x0, y0, x0, y0 + plot_h, strokeColor=colors.HexColor("#9ca3af"), strokeWidth=0.7))
    drawing.add(Line(x0, y0, x0 + plot_w, y0, strokeColor=colors.HexColor("#9ca3af"), strokeWidth=0.7))

    for i in range(1, 5):
        gy = y0 + plot_h * i / 4
        drawing.add(Line(x0, gy, x0 + plot_w, gy, strokeColor=colors.HexColor("#e5e7eb"), strokeWidth=0.4))
        label = _money(max_v * i / 4).replace(" PLN", "")
        drawing.add(String(2, gy - 3, label, fontName=APP_FONT, fontSize=6, fillColor=colors.HexColor("#6b7280")))

    # Thin long series so the polyline has at most about 420 vertices.
    step = max(1, math.ceil(len(values) / 420))
    sampled = values[::step]
    sampled_count = max(len(sampled) - 1, 1)

    points = []
    for idx, value in enumerate(sampled):
        x = x0 + plot_w * idx / sampled_count
        y = y0 + plot_h * (value / max_v)
        points.extend([x, y])
    drawing.add(PolyLine(points, strokeColor=colors.HexColor("#2563eb"), strokeWidth=1.4))

    # X-axis end labels: first index and number of plotted values.
    drawing.add(String(x0, 6, "1", fontName=APP_FONT, fontSize=6, fillColor=colors.HexColor("#6b7280")))
    drawing.add(String(x0 + plot_w - 18, 6, str(len(values)), fontName=APP_FONT, fontSize=6, fillColor=colors.HexColor("#6b7280")))
    return drawing


def _bar_chart(yearly: dict[int, float], title: str, width=480, height=165) -> Drawing:
    """Build a bar chart from a ``{year: value}`` mapping.

    Only the first 35 items (in mapping order) are drawn. Bars are scaled to
    the largest value; year labels are drawn under every bar when there are at
    most 25 bars, otherwise under every second bar.
    """
    drawing = Drawing(width, height)
    margin_l, margin_r, margin_b, margin_t = 42, 12, 24, 24
    plot_w = width - margin_l - margin_r
    plot_h = height - margin_t - margin_b
    x0, y0 = margin_l, margin_b

    items = list(yearly.items())[:35]
    # ``default`` covers an empty mapping, ``or 1`` covers an all-zero one.
    max_v = max((v for _, v in items), default=1) or 1

    drawing.add(String(0, height - 12, title, fontName=APP_FONT_BOLD, fontSize=9, fillColor=colors.HexColor("#111827")))
    drawing.add(Line(x0, y0, x0, y0 + plot_h, strokeColor=colors.HexColor("#9ca3af"), strokeWidth=0.7))
    drawing.add(Line(x0, y0, x0 + plot_w, y0, strokeColor=colors.HexColor("#9ca3af"), strokeWidth=0.7))

    for i in range(1, 5):
        gy = y0 + plot_h * i / 4
        drawing.add(Line(x0, gy, x0 + plot_w, gy, strokeColor=colors.HexColor("#e5e7eb"), strokeWidth=0.4))

    if items:
        gap = 2
        slot_w = plot_w / len(items)
        bar_w = max(3, slot_w - gap)
        for idx, (year, value) in enumerate(items):
            bh = plot_h * value / max_v
            x = x0 + idx * slot_w + gap / 2
            drawing.add(Rect(x, y0, bar_w, bh, fillColor=colors.HexColor("#64748b"), strokeColor=None))
            if len(items) <= 25 or idx % 2 == 0:
                drawing.add(String(x, 6, str(year), fontName=APP_FONT, fontSize=5.5, fillColor=colors.HexColor("#6b7280")))
    return drawing


def _pie_chart(parts: list[tuple[str, float, str]], title: str, width=240, height=165) -> Drawing:
    """Build a pie chart with a legend.

    Args:
        parts: ``(name, value, hex_color)`` tuples, one per slice.
        title: Text drawn at the top-left of the drawing.
        width: Drawing width.
        height: Drawing height.

    Returns:
        A ``Drawing`` with one wedge per part (starting at 90 degrees) and a
        legend listing each part's share of the total in percent.
    """
    drawing = Drawing(width, height)
    drawing.add(String(0, height - 12, title, fontName=APP_FONT_BOLD, fontSize=9, fillColor=colors.HexColor("#111827")))

    # ``or 1`` avoids dividing by zero when every part is 0.
    total = sum(v for _, v, _ in parts) or 1
    cx, cy, r = 65, 78, 45

    start = 90
    for _name, value, color in parts:
        extent = 360 * value / total
        drawing.add(Wedge(cx, cy, r, start, start + extent, fillColor=colors.HexColor(color), strokeColor=colors.white, strokeWidth=0.5))
        start += extent

    ly = height - 38
    for name, value, color in parts:
        drawing.add(Rect(130, ly - 2, 7, 7, fillColor=colors.HexColor(color), strokeColor=None))
        pct = value / total * 100
        drawing.add(String(142, ly, f"{name}: {pct:.1f}%", fontName=APP_FONT, fontSize=7, fillColor=colors.HexColor("#111827")))
        ly -= 14
    return drawing


def _key_value_table_style(background: str, font_size: float) -> TableStyle:
    """Style for the four-column label/value tables (bold label columns 0 and 2)."""
    return TableStyle([
        ("GRID", (0, 0), (-1, -1), 0.25, colors.HexColor("#cbd5e1")),
        ("BACKGROUND", (0, 0), (-1, -1), colors.HexColor(background)),
        ("FONTNAME", (0, 0), (-1, -1), APP_FONT),
        ("FONTNAME", (0, 0), (0, -1), APP_FONT_BOLD),
        ("FONTNAME", (2, 0), (2, -1), APP_FONT_BOLD),
        ("FONTSIZE", (0, 0), (-1, -1), font_size),
        ("VALIGN", (0, 0), (-1, -1), "TOP"),
    ])


def _listing_table_style(font_size: float) -> TableStyle:
    """Style for listing tables with a shaded, bold header row and a grid."""
    return TableStyle([
        ("BACKGROUND", (0, 0), (-1, 0), colors.HexColor("#e5e7eb")),
        ("GRID", (0, 0), (-1, -1), 0.25, colors.HexColor("#cbd5e1")),
        ("FONTNAME", (0, 0), (-1, -1), APP_FONT),
        ("FONTNAME", (0, 0), (-1, 0), APP_FONT_BOLD),
        ("FONTSIZE", (0, 0), (-1, -1), font_size),
    ])


@app.post("/api/export/pdf")
def export_pdf(req: SimulationRequest):
    """Simulate the request and return a full PDF report as a download.

    The report contains, in order: input parameters, summary, charts
    (balance, payment, yearly interest, cost structure), the rate-change
    table, overpayments, historical months and the complete schedule.
    Every page gets the footer drawn by ``_add_page_number``.
    """
    result = simulate(req)
    s = result.summary

    buffer = io.BytesIO()
    doc = SimpleDocTemplate(
        buffer,
        pagesize=A4,
        rightMargin=1.2 * cm,
        leftMargin=1.2 * cm,
        topMargin=1.3 * cm,
        bottomMargin=1.4 * cm,
        title="Symulacja kredytu hipotecznego",
    )

    # Switch every sample style to the application font, then bold headings.
    styles = getSampleStyleSheet()
    for style in styles.byName.values():
        style.fontName = APP_FONT
    styles["Title"].fontName = APP_FONT_BOLD
    styles["Heading2"].fontName = APP_FONT_BOLD
    styles.add(ParagraphStyle(name="Small", parent=styles["Normal"], fontSize=8, leading=10, fontName=APP_FONT))
    styles.add(ParagraphStyle(name="Tiny", parent=styles["Normal"], fontSize=6.7, leading=8, fontName=APP_FONT))

    story = []
    story.append(Paragraph("Symulacja kredytu hipotecznego - pelny raport", styles["Title"]))
    story.append(Paragraph(f"Wygenerowano: {datetime.now().strftime('%Y-%m-%d %H:%M')}", styles["Small"]))
    story.append(Spacer(1, 0.25 * cm))

    # --- Input parameters -------------------------------------------------
    # An explicit term in months wins; otherwise it is derived from years.
    term_months = req.term_months or req.years * 12
    params = [
        ["Kwota kredytu", _money(req.principal), "Okres", f"{term_months} mies. ({round(term_months / 12, 1)} lat)"],
        ["Stopa bazowa", _pct(req.base_rate), "Marza", _pct(req.margin)],
        ["Oprocentowanie startowe", _pct(req.base_rate + req.margin), "Typ rat", _installment_label(req.installment_type.value)],
        ["Efekt nadplat", _effect_label(req.overpayment_effect.value), "Liczba rat po symulacji", str(s.months)],
        ["Data startu", req.loan_start_date.isoformat(), "Dzien splaty", str(req.due_day)],
        ["Przesuwaj dni wolne", "tak" if req.move_due_date_to_business_day else "nie", "Data konca", s.payoff_date or "-"],
        ["Okres ochronny nadplat", str(req.overpayment_protection_months or "brak"), "Domyslna prowizja", _pct(req.overpayment_protection_commission_percent)],
    ]
    story.append(Paragraph("Parametry wejściowe", styles["Heading2"]))
    story.append(Table(
        params,
        colWidths=[4.9 * cm, 3.4 * cm, 4.4 * cm, 3.9 * cm],
        style=_key_value_table_style("#f8fafc", 7.4),
    ))
    story.append(Spacer(1, 0.35 * cm))

    # --- Summary ----------------------------------------------------------
    summary = [
        ["Liczba rat", s.months, "Bazowo", s.baseline_months],
        ["Suma zaplacona", _money(s.total_paid), "Suma odsetek", _money(s.total_interest)],
        ["Oszczednosc na odsetkach", _money(s.interest_saved), "Skrocenie okresu", f"{s.months_saved} mies."],
        ["Suma nadplat", _money(s.total_overpayment), "Prowizje nadplat", _money(s.total_overpayment_fees)],
        ["Srednia / maks. rata", f"{_money(s.average_payment)} / {_money(s.max_payment)}", "Data splaty", s.payoff_date or "-"],
    ]
    story.append(Paragraph("Podsumowanie", styles["Heading2"]))
    story.append(Table(
        summary,
        colWidths=[4.3 * cm, 4.25 * cm, 3.7 * cm, 4.35 * cm],
        style=_key_value_table_style("#ffffff", 7.0),
    ))
    story.append(Spacer(1, 0.35 * cm))

    # --- Charts -----------------------------------------------------------
    story.append(Paragraph("Wykresy", styles["Heading2"]))

    # Sum interest per loan year (months 1-12 -> year 1, 13-24 -> year 2, ...).
    yearly = {}
    for row in result.schedule:
        year = math.ceil(row.month / 12)
        yearly[year] = yearly.get(year, 0.0) + row.interest_part

    story.append(_line_chart(result.schedule, "remaining", "Saldo kredytu w czasie"))
    story.append(Spacer(1, 0.15 * cm))
    story.append(_line_chart(result.schedule, "payment", "Rata w czasie"))
    story.append(Spacer(1, 0.15 * cm))
    story.append(Table(
        [[
            _bar_chart(yearly, "Odsetki rocznie", width=285, height=150),
            _pie_chart(
                [
                    ("Kapital", req.principal, "#2563eb"),
                    ("Odsetki", s.total_interest, "#64748b"),
                    ("Nadplaty", s.total_overpayment, "#16a34a"),
                ],
                "Struktura kosztow",
                width=220,
                height=150,
            ),
        ]],
        colWidths=[9.6 * cm, 7.3 * cm],
    ))
    story.append(PageBreak())

    # --- Rate changes -----------------------------------------------------
    story.append(Paragraph("Zmienne oprocentowanie", styles["Heading2"]))
    rate_rows = [["Od miesiaca", "Oprocentowanie roczne"]]
    rate_rows.append(["1", _pct(req.base_rate + req.margin)])
    for change in sorted(req.rate_changes, key=lambda x: x.month):
        rate_rows.append([str(change.month), _pct(change.annual_rate)])
    story.append(Table(
        rate_rows,
        repeatRows=1,
        colWidths=[4 * cm, 6 * cm],
        style=_listing_table_style(8),
    ))
    story.append(Spacer(1, 0.35 * cm))

    # --- Overpayments -----------------------------------------------------
    story.append(Paragraph("Nadplaty", styles["Heading2"]))
    over_rows = [["Miesiac", "Kwota", "Prowizja", "Prow. do mies.", "Powtarzanie", "Nadplaty do mies."]]
    if req.overpayments:
        for op in sorted(req.overpayments, key=lambda x: x.month):
            over_rows.append([
                str(op.month),
                _money(op.amount),
                _pct(op.commission_percent),
                str(op.commission_until_month or "bez limitu"),
                _repeat_label(op.repeat),
                str(op.until_month or "-"),
            ])
    else:
        # Placeholder row so the table is never header-only.
        over_rows.append(["-", "-", "-", "-", "-", "-"])
    story.append(Table(
        over_rows,
        repeatRows=1,
        colWidths=[1.8 * cm, 2.8 * cm, 2.1 * cm, 2.7 * cm, 2.8 * cm, 2.8 * cm],
        style=_listing_table_style(8),
    ))
    story.append(Spacer(1, 0.35 * cm))

    # --- Historical months ------------------------------------------------
    story.append(Paragraph("Kredyt historyczny", styles["Heading2"]))
    hist_rows = [["Miesiac", "Oprocentowanie", "Karencja", "Nadplata", "Prowizja", "Opis"]]
    if req.historical_months:
        for item in sorted(req.historical_months, key=lambda x: x.month):
            hist_rows.append([
                str(item.month),
                _pct(item.annual_rate) if item.annual_rate is not None else "-",
                item.grace_type.value,
                _money(item.overpayment),
                _pct(item.overpayment_commission_percent),
                item.note or "-",
            ])
    else:
        hist_rows.append(["-", "-", "-", "-", "-", "-"])
    story.append(Table(
        hist_rows,
        repeatRows=1,
        colWidths=[2 * cm, 2.6 * cm, 2.6 * cm, 2.8 * cm, 2.2 * cm, 4.5 * cm],
        style=_listing_table_style(7.5),
    ))
    story.append(PageBreak())

    # --- Full schedule ----------------------------------------------------
    story.append(Paragraph("Kompletny harmonogram wszystkich rat", styles["Heading2"]))
    schedule_rows = [["Mies.", "Data", "Dni", "Oproc.", "Rata", "Kapital", "Odsetki", "Nadplata", "Prow.", "Koszt nar.", "Saldo"]]
    for row in result.schedule:
        schedule_rows.append([
            row.month,
            row.due_date,
            row.days,
            _pct(row.rate),
            _money(row.payment),
            _money(row.principal_part),
            _money(row.interest_part),
            _money(row.overpayment),
            _money(row.overpayment_fee),
            _money(row.cumulative_cost),
            _money(row.remaining),
        ])

    table = Table(
        schedule_rows,
        repeatRows=1,
        colWidths=[0.9 * cm, 1.75 * cm, 0.75 * cm, 1.15 * cm, 1.85 * cm, 1.85 * cm, 1.85 * cm, 1.85 * cm, 1.5 * cm, 2.0 * cm, 2.05 * cm],
    )
    table.setStyle(TableStyle([
        ("BACKGROUND", (0, 0), (-1, 0), colors.HexColor("#e5e7eb")),
        ("TEXTCOLOR", (0, 0), (-1, 0), colors.HexColor("#111827")),
        ("GRID", (0, 0), (-1, -1), 0.2, colors.HexColor("#cbd5e1")),
        ("FONTNAME", (0, 0), (-1, -1), APP_FONT),
        ("FONTNAME", (0, 0), (-1, 0), APP_FONT_BOLD),
        ("FONTSIZE", (0, 0), (-1, -1), 6.5),
        ("LEADING", (0, 0), (-1, -1), 7.5),
        ("ALIGN", (0, 1), (-1, -1), "RIGHT"),
        ("ALIGN", (0, 0), (-1, 0), "CENTER"),
        ("VALIGN", (0, 0), (-1, -1), "MIDDLE"),
        ("ROWBACKGROUNDS", (0, 1), (-1, -1), [colors.white, colors.HexColor("#f8fafc")]),
    ]))
    story.append(table)

    doc.build(story, onFirstPage=_add_page_number, onLaterPages=_add_page_number)

    return Response(
        buffer.getvalue(),
        media_type="application/pdf",
        headers={"Content-Disposition": "attachment; filename=symulacja-kredytu.pdf"},
    )