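"""FastAPI application for the mortgage simulator.

Exposes the simulation over HTTP and WebSocket, an NBP reference-rate lookup,
and CSV / PDF exports of the simulation result.
"""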
from __future__ import annotations

import csv
import io
import math
import os
import re
from datetime import datetime, timezone
from pathlib import Path

import httpx
from bs4 import BeautifulSoup
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Response
from fastapi.responses import HTMLResponse, StreamingResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from reportlab.lib import colors
from reportlab.lib.pagesizes import A4, landscape
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.units import cm
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, PageBreak
from reportlab.graphics.shapes import Drawing, Line, PolyLine, Rect, String, Wedge

from .models import SimulationRequest
from .simulator import simulate
|
|
|
|
# Directory that contains this module; static assets are resolved relative to it.
BASE_DIR = Path(__file__).resolve().parent

# Export directory, overridable through the MORTGAGE_EXPORT_DIR environment
# variable. The default is relative to the current working directory.
EXPORT_DIR = Path(os.environ.get("MORTGAGE_EXPORT_DIR", "./app/exports"))

# parents=True: without it, importing the module raises FileNotFoundError
# whenever an intermediate directory (e.g. ./app) does not exist yet.
EXPORT_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Optional bundled TrueType fonts used for PDF output.
FONT_DIR = BASE_DIR / "static" / "fonts"
FONT_PATH = FONT_DIR / "DejaVuSans.ttf"
FONT_BOLD_PATH = FONT_DIR / "DejaVuSans-Bold.ttf"
FONT_OBLIQUE_PATH = FONT_DIR / "DejaVuSans-Oblique.ttf"

# Start from the built-in Helvetica family and upgrade only when the regular
# DejaVu face is present on disk.
APP_FONT = "Helvetica"
APP_FONT_BOLD = "Helvetica-Bold"
APP_FONT_OBLIQUE = "Helvetica-Oblique"

if FONT_PATH.exists():
    pdfmetrics.registerFont(TTFont("AppFont", str(FONT_PATH)))
    # Bold / oblique fall back to the regular face when their files are missing.
    APP_FONT = APP_FONT_BOLD = APP_FONT_OBLIQUE = "AppFont"

    if FONT_BOLD_PATH.exists():
        pdfmetrics.registerFont(TTFont("AppFont-Bold", str(FONT_BOLD_PATH)))
        APP_FONT_BOLD = "AppFont-Bold"

    if FONT_OBLIQUE_PATH.exists():
        pdfmetrics.registerFont(TTFont("AppFont-Oblique", str(FONT_OBLIQUE_PATH)))
        APP_FONT_OBLIQUE = "AppFont-Oblique"
|
|
|
|
# ASGI application object; all routes below are registered on it.
app = FastAPI(
    title="Symulator kredytu hipotecznego",
    version="1.0.0",
)

# Serve the bundled front-end assets from <module dir>/static under /static.
app.mount(
    "/static",
    StaticFiles(directory=BASE_DIR / "static"),
    name="static",
)
|
|
|
|
|
|
@app.middleware("http")
async def no_cache_headers(request, call_next):
    """Stamp every HTTP response with a Cache-Control header that forbids caching."""
    directive = "no-store, no-cache, must-revalidate"
    outgoing = await call_next(request)
    outgoing.headers["Cache-Control"] = directive
    return outgoing
|
|
|
|
|
|
@app.get("/favicon.ico")
def favicon() -> Response:
    """Answer favicon requests with an empty 204 response."""
    no_content = Response(status_code=204)
    return no_content
|
|
|
|
|
|
@app.get("/", response_class=HTMLResponse)
def index() -> str:
    """Return the contents of static/index.html, read from disk as UTF-8 on each request."""
    page_path = BASE_DIR / "static" / "index.html"
    return page_path.read_text(encoding="utf-8")
|
|
|
|
|
|
@app.post("/api/simulate")
def api_simulate(req: SimulationRequest):
    """Run the simulation for the posted request and return its result."""
    outcome = simulate(req)
    return outcome
|
|
|
|
|
|
@app.websocket("/ws/simulate")
async def ws_simulate(websocket: WebSocket):
    """Serve simulations over a WebSocket, one JSON request per message.

    Each incoming JSON message is validated as a ``SimulationRequest`` and
    answered with the dumped simulation result. A message that cannot be
    decoded, validated or simulated is answered with ``{"error": "<message>"}``
    and the connection stays open, so one bad payload does not end the
    session. The handler returns when the client disconnects.
    """
    await websocket.accept()
    try:
        while True:
            try:
                data = await websocket.receive_json()
            except (ValueError, KeyError, TypeError) as exc:
                # Frame was not decodable as JSON text; report it and keep listening.
                await websocket.send_json({"error": str(exc)})
                continue

            try:
                req = SimulationRequest.model_validate(data)
                payload = simulate(req).model_dump()
            except Exception as exc:
                # Per-message boundary: validation or simulation failures are
                # returned to the client instead of terminating the socket.
                payload = {"error": str(exc)}

            await websocket.send_json(payload)
    except WebSocketDisconnect:
        return
|
|
|
|
|
|
@app.get("/api/rate/nbp")
async def nbp_rate():
    """Fetch the NBP reference rate by scraping the official NBP HTML page.

    Returns:
        On success, a dict with ``source``, ``url``, ``rate`` (float, percent)
        and ``fetched_at`` (UTC timestamp in ISO format with a ``Z`` suffix).
        If the request fails or no rate is found in the page text, a
        ``JSONResponse`` with status 502 carrying ``error`` and ``url``.
    """
    url = "https://nbp.pl/en/monetary-policy/mpc-decisions/interest-rates/"
    # Tried in order; the first pattern that matches wins.
    patterns = [
        r"Reference rate[^0-9]{0,80}(\d+[\.,]\d+|\d+)\s*%",
        r"minimum money market intervention rate[^0-9]{0,80}(\d+[\.,]\d+|\d+)\s*%",
    ]
    try:
        async with httpx.AsyncClient(timeout=10, follow_redirects=True) as client:
            resp = await client.get(url, headers={"User-Agent": "mortgage-simulator/1.0"})
            resp.raise_for_status()

        soup = BeautifulSoup(resp.text, "html.parser")
        # Collapse all whitespace so the patterns can match across line breaks.
        text = " ".join(soup.get_text(" ").split())

        for pattern in patterns:
            match = re.search(pattern, text, flags=re.IGNORECASE)
            if match:
                # The page may use a decimal comma.
                value = float(match.group(1).replace(",", "."))
                # datetime.utcnow() is deprecated; take an aware UTC time and
                # drop tzinfo so the serialized form stays "<iso>Z".
                now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
                return {"source": "NBP", "url": url, "rate": value, "fetched_at": now_utc.isoformat() + "Z"}

        return JSONResponse({"error": "Nie znaleziono stopy w treści strony NBP", "url": url}, status_code=502)
    except Exception as exc:
        # Endpoint boundary: any fetch or parse failure is reported as 502.
        return JSONResponse({"error": str(exc), "url": url}, status_code=502)
|
|
|
|
|
|
@app.post("/api/export/csv")
def export_csv(req: SimulationRequest):
    """Run the simulation and return it as a semicolon-delimited CSV attachment.

    The file holds the schedule rows, an empty separator row, and then the
    summary as key/value rows. It is encoded as UTF-8 with a BOM.
    """
    outcome = simulate(req)

    text_buf = io.StringIO()
    out = csv.writer(text_buf, delimiter=";")
    out.writerow(["miesiac", "oprocentowanie", "rata", "kapital", "odsetki", "nadplata", "saldo"])
    out.writerows(
        [
            entry.month,
            entry.rate,
            entry.payment,
            entry.principal_part,
            entry.interest_part,
            entry.overpayment,
            entry.remaining,
        ]
        for entry in outcome.schedule
    )

    # Empty row separates the schedule from the summary section.
    out.writerow([])
    out.writerow(["Podsumowanie"])
    out.writerows([field, amount] for field, amount in outcome.summary.model_dump().items())

    payload = text_buf.getvalue().encode("utf-8-sig")
    return StreamingResponse(
        io.BytesIO(payload),
        media_type="text/csv",
        headers={"Content-Disposition": "attachment; filename=symulacja-kredytu.csv"},
    )
|
|
|
|
|
|
def _money(value: float) -> str:
|
|
return f"{value:,.2f} PLN".replace(",", " ")
|
|
|
|
|
|
def _pct(value: float) -> str:
|
|
return f"{value:.2f}%"
|
|
|
|
|
|
def _repeat_label(value: str) -> str:
|
|
return {"once": "jednorazowo", "monthly": "co miesiac", "yearly": "co rok"}.get(value, value)
|
|
|
|
|
|
def _effect_label(value: str) -> str:
|
|
return {"shorten": "skrocenie okresu", "lower_payment": "zmniejszenie raty"}.get(value, value)
|
|
|
|
|
|
def _installment_label(value: str) -> str:
|
|
return {"equal": "rowne", "decreasing": "malejace"}.get(value, value)
|
|
|
|
|
|
def _add_page_number(canvas_obj, doc):
    """Draw the footer on the current page: a disclaimer on the left, the page number on the right.

    Used as the page callback passed to ``doc.build``. Canvas state is saved
    before drawing and restored afterwards.
    """
    side_margin = 1.4 * cm
    baseline = 0.9 * cm
    # The right edge is computed from the A4 width.
    right_edge = A4[0] - side_margin

    canvas_obj.saveState()
    canvas_obj.setFont(APP_FONT, 7)
    canvas_obj.setFillColor(colors.HexColor("#6b7280"))
    canvas_obj.drawString(side_margin, baseline, "Symulator edukacyjny - wyniki orientacyjne")
    canvas_obj.drawRightString(right_edge, baseline, f"Strona {doc.page}")
    canvas_obj.restoreState()
|
|
|
|
|
|
def _line_chart(rows, value_attr: str, title: str, width=480, height=165) -> Drawing:
    """Draw a single-series line chart of ``value_attr`` across ``rows``.

    Args:
        rows: Sequence of objects exposing the attribute named ``value_attr``.
        value_attr: Name of the numeric attribute to plot.
        title: Caption drawn in the top-left corner.
        width: Drawing width in points.
        height: Drawing height in points.

    Returns:
        A ``Drawing`` with axes, four horizontal grid lines labelled as
        fractions of the maximum value, and the polyline. Long series are
        down-sampled, but the final row is always plotted and every point is
        placed at its true position along the x-axis.
    """
    drawing = Drawing(width, height)
    margin_l, margin_r, margin_b, margin_t = 42, 12, 24, 24
    plot_w = width - margin_l - margin_r
    plot_h = height - margin_t - margin_b
    x0, y0 = margin_l, margin_b

    # An empty input still renders the axes, with a single point at zero.
    values = [float(getattr(r, value_attr)) for r in rows] or [0]
    # Avoid dividing by zero when the largest value is 0.
    max_v = max(values) or 1

    drawing.add(String(0, height - 12, title, fontName=APP_FONT_BOLD, fontSize=9, fillColor=colors.HexColor("#111827")))
    drawing.add(Line(x0, y0, x0, y0 + plot_h, strokeColor=colors.HexColor("#9ca3af"), strokeWidth=0.7))
    drawing.add(Line(x0, y0, x0 + plot_w, y0, strokeColor=colors.HexColor("#9ca3af"), strokeWidth=0.7))
    for i in range(1, 5):
        gy = y0 + plot_h * i / 4
        drawing.add(Line(x0, gy, x0 + plot_w, gy, strokeColor=colors.HexColor("#e5e7eb"), strokeWidth=0.4))
        label = _money(max_v * i / 4).replace(" PLN", "")
        drawing.add(String(2, gy - 3, label, fontName=APP_FONT, fontSize=6, fillColor=colors.HexColor("#6b7280")))

    # Down-sample to roughly 420 points. Plain slicing could skip the last
    # row, so it is appended explicitly to make the line end on the final value.
    last_index = len(values) - 1
    step = max(1, math.ceil(len(values) / 420))
    indices = list(range(0, len(values), step))
    if indices[-1] != last_index:
        indices.append(last_index)

    # Position points by their original index so sampling does not stretch the x-axis.
    x_span = max(last_index, 1)
    points = []
    for i in indices:
        x = x0 + plot_w * i / x_span
        y = y0 + plot_h * (values[i] / max_v)
        points.extend([x, y])

    drawing.add(PolyLine(points, strokeColor=colors.HexColor("#2563eb"), strokeWidth=1.4))
    drawing.add(String(x0, 6, "1", fontName=APP_FONT, fontSize=6, fillColor=colors.HexColor("#6b7280")))
    drawing.add(String(x0 + plot_w - 18, 6, str(len(values)), fontName=APP_FONT, fontSize=6, fillColor=colors.HexColor("#6b7280")))
    return drawing
|
|
|
|
|
|
def _bar_chart(yearly: dict[int, float], title: str, width=480, height=165) -> Drawing:
    """Draw a bar chart with one bar per entry of ``yearly``.

    Only the first 35 entries (in dict order) are drawn. Bars are scaled to
    the largest drawn value. With more than 25 bars only every second bar
    gets a label under the axis.
    """
    pad_left, pad_right, pad_bottom, pad_top = 42, 12, 24, 24
    area_w = width - pad_left - pad_right
    area_h = height - pad_top - pad_bottom
    origin_x, origin_y = pad_left, pad_bottom

    bars = list(yearly.items())[:35]
    # Fall back to 1 for an empty or all-zero series to avoid dividing by zero.
    peak = max((amount for _, amount in bars), default=1) or 1

    chart = Drawing(width, height)
    chart.add(String(0, height - 12, title, fontName=APP_FONT_BOLD, fontSize=9, fillColor=colors.HexColor("#111827")))
    chart.add(Line(origin_x, origin_y, origin_x, origin_y + area_h, strokeColor=colors.HexColor("#9ca3af"), strokeWidth=0.7))
    chart.add(Line(origin_x, origin_y, origin_x + area_w, origin_y, strokeColor=colors.HexColor("#9ca3af"), strokeWidth=0.7))

    for quarter in range(1, 5):
        grid_y = origin_y + area_h * quarter / 4
        chart.add(Line(origin_x, grid_y, origin_x + area_w, grid_y, strokeColor=colors.HexColor("#e5e7eb"), strokeWidth=0.4))

    if not bars:
        return chart

    gap = 2
    slot_w = area_w / len(bars)
    bar_w = max(3, slot_w - gap)
    label_all = len(bars) <= 25
    for position, (year, amount) in enumerate(bars):
        bar_h = area_h * amount / peak
        left = origin_x + position * slot_w + gap / 2
        chart.add(Rect(left, origin_y, bar_w, bar_h, fillColor=colors.HexColor("#64748b"), strokeColor=None))
        if label_all or position % 2 == 0:
            chart.add(String(left, 6, str(year), fontName=APP_FONT, fontSize=5.5, fillColor=colors.HexColor("#6b7280")))
    return chart
|
|
|
|
|
|
def _pie_chart(parts: list[tuple[str, float, str]], title: str, width=240, height=165) -> Drawing:
    """Draw a pie chart with a legend showing each part's percentage share.

    Args:
        parts: ``(name, value, hex_color)`` tuples, drawn in the given order
            starting at 90 degrees.
        title: Caption drawn in the top-left corner.
        width: Drawing width in points.
        height: Drawing height in points.
    """
    chart = Drawing(width, height)
    chart.add(String(0, height - 12, title, fontName=APP_FONT_BOLD, fontSize=9, fillColor=colors.HexColor("#111827")))

    # Fall back to 1 when all values sum to zero to avoid dividing by zero.
    grand_total = sum(amount for _, amount, _ in parts) or 1
    center_x, center_y, radius = 65, 78, 45

    # Wedges first, each starting where the previous one ended.
    angle = 90
    for _, amount, hex_color in parts:
        sweep = 360 * amount / grand_total
        chart.add(Wedge(center_x, center_y, radius, angle, angle + sweep, fillColor=colors.HexColor(hex_color), strokeColor=colors.white, strokeWidth=0.5))
        angle += sweep

    # Legend: colour swatch plus "name: share%" stacked downwards.
    legend_y = height - 38
    for label, amount, hex_color in parts:
        chart.add(Rect(130, legend_y - 2, 7, 7, fillColor=colors.HexColor(hex_color), strokeColor=None))
        share = amount / grand_total * 100
        chart.add(String(142, legend_y, f"{label}: {share:.1f}%", fontName=APP_FONT, fontSize=7, fillColor=colors.HexColor("#111827")))
        legend_y -= 14
    return chart
|
|
|
|
|
|
@app.post("/api/export/pdf")
def export_pdf(req: SimulationRequest):
    """Run the simulation and return a multi-page A4 PDF report as an attachment.

    The report contains the input parameters, the summary, four charts, the
    rate-change and overpayment tables, and the complete payment schedule.
    Every page gets the footer drawn by ``_add_page_number``.
    """
    result = simulate(req)
    totals = result.summary

    def key_value_style(background: str) -> TableStyle:
        # Shared look of the two 4-column label/value tables (parameters, summary).
        return TableStyle([
            ("GRID", (0, 0), (-1, -1), 0.25, colors.HexColor("#cbd5e1")),
            ("BACKGROUND", (0, 0), (-1, -1), colors.HexColor(background)),
            ("FONTNAME", (0, 0), (-1, -1), APP_FONT),
            ("FONTNAME", (0, 0), (0, -1), APP_FONT_BOLD),
            ("FONTNAME", (2, 0), (2, -1), APP_FONT_BOLD),
            ("FONTSIZE", (0, 0), (-1, -1), 7.4),
            ("VALIGN", (0, 0), (-1, -1), "TOP"),
        ])

    def listing_style() -> TableStyle:
        # Shared look of the header-row tables (rate changes, overpayments).
        return TableStyle([
            ("BACKGROUND", (0, 0), (-1, 0), colors.HexColor("#e5e7eb")),
            ("GRID", (0, 0), (-1, -1), 0.25, colors.HexColor("#cbd5e1")),
            ("FONTNAME", (0, 0), (-1, -1), APP_FONT),
            ("FONTNAME", (0, 0), (-1, 0), APP_FONT_BOLD),
            ("FONTSIZE", (0, 0), (-1, -1), 8),
        ])

    key_value_widths = (4.9 * cm, 3.4 * cm, 4.4 * cm, 3.9 * cm)

    # --- Document and paragraph styles -------------------------------------
    buffer = io.BytesIO()
    doc = SimpleDocTemplate(
        buffer,
        pagesize=A4,
        rightMargin=1.2 * cm,
        leftMargin=1.2 * cm,
        topMargin=1.3 * cm,
        bottomMargin=1.4 * cm,
        title="Symulacja kredytu hipotecznego",
    )
    styles = getSampleStyleSheet()
    # Switch every stock style to the application font, then embolden headings.
    for stock_style in styles.byName.values():
        stock_style.fontName = APP_FONT
    for heading_name in ("Title", "Heading2"):
        styles[heading_name].fontName = APP_FONT_BOLD
    styles.add(ParagraphStyle(name="Small", parent=styles["Normal"], fontSize=8, leading=10, fontName=APP_FONT))
    styles.add(ParagraphStyle(name="Tiny", parent=styles["Normal"], fontSize=6.7, leading=8, fontName=APP_FONT))

    # --- Page 1: title, parameters, summary, charts -------------------------
    story = [
        Paragraph("Symulacja kredytu hipotecznego - pelny raport", styles["Title"]),
        Paragraph(f"Wygenerowano: {datetime.now().strftime('%Y-%m-%d %H:%M')}", styles["Small"]),
        Spacer(1, 0.25 * cm),
    ]

    params = [
        ["Kwota kredytu", _money(req.principal), "Okres", f"{req.years} lat"],
        ["Stopa bazowa", _pct(req.base_rate), "Marza", _pct(req.margin)],
        ["Oprocentowanie startowe", _pct(req.base_rate + req.margin), "Typ rat", _installment_label(req.installment_type.value)],
        ["Efekt nadplat", _effect_label(req.overpayment_effect.value), "Liczba rat po symulacji", str(result.summary.months)],
    ]
    story += [
        Paragraph("Parametry wejściowe", styles["Heading2"]),
        Table(params, colWidths=list(key_value_widths), style=key_value_style("#f8fafc")),
        Spacer(1, 0.35 * cm),
    ]

    summary = [
        ["Liczba rat", totals.months, "Bazowo", totals.baseline_months],
        ["Suma zaplacona", _money(totals.total_paid), "Suma odsetek", _money(totals.total_interest)],
        ["Oszczednosc na odsetkach", _money(totals.interest_saved), "Skrocenie okresu", f"{totals.months_saved} mies."],
        ["Suma nadplat", _money(totals.total_overpayment), "Srednia / maks. rata", f"{_money(totals.average_payment)} / {_money(totals.max_payment)}"],
    ]
    story += [
        Paragraph("Podsumowanie", styles["Heading2"]),
        Table(summary, colWidths=list(key_value_widths), style=key_value_style("#ffffff")),
        Spacer(1, 0.35 * cm),
    ]

    # Interest summed per loan year (months 1-12 -> year 1, 13-24 -> year 2, ...).
    yearly = {}
    for entry in result.schedule:
        loan_year = math.ceil(entry.month / 12)
        yearly.setdefault(loan_year, 0.0)
        yearly[loan_year] += entry.interest_part

    # NOTE(review): "Nadplaty" is drawn as its own slice next to "Kapital";
    # presumably overpayments repay principal, so the slices may overlap - confirm.
    cost_parts = [
        ("Kapital", req.principal, "#2563eb"),
        ("Odsetki", totals.total_interest, "#64748b"),
        ("Nadplaty", totals.total_overpayment, "#16a34a"),
    ]
    story += [
        Paragraph("Wykresy", styles["Heading2"]),
        _line_chart(result.schedule, "remaining", "Saldo kredytu w czasie"),
        Spacer(1, 0.15 * cm),
        _line_chart(result.schedule, "payment", "Rata w czasie"),
        Spacer(1, 0.15 * cm),
        Table(
            [[
                _bar_chart(yearly, "Odsetki rocznie", width=285, height=150),
                _pie_chart(cost_parts, "Struktura kosztow", width=220, height=150),
            ]],
            colWidths=[9.6 * cm, 7.3 * cm],
        ),
        PageBreak(),
    ]

    # --- Page 2: rate changes and overpayments ------------------------------
    rate_rows = [
        ["Od miesiaca", "Oprocentowanie roczne"],
        ["1", _pct(req.base_rate + req.margin)],
    ]
    rate_rows += [
        [str(change.month), _pct(change.annual_rate)]
        for change in sorted(req.rate_changes, key=lambda x: x.month)
    ]
    story += [
        Paragraph("Zmienne oprocentowanie", styles["Heading2"]),
        Table(rate_rows, repeatRows=1, colWidths=[4 * cm, 6 * cm], style=listing_style()),
        Spacer(1, 0.35 * cm),
    ]

    over_rows = [["Miesiac", "Kwota", "Powtarzanie", "Do miesiaca"]]
    if not req.overpayments:
        # Placeholder row so the table is never header-only.
        over_rows.append(["-", "-", "-", "-"])
    else:
        over_rows.extend(
            [str(op.month), _money(op.amount), _repeat_label(op.repeat), str(op.until_month or "-")]
            for op in sorted(req.overpayments, key=lambda x: x.month)
        )
    story += [
        Paragraph("Nadplaty", styles["Heading2"]),
        Table(over_rows, repeatRows=1, colWidths=[3 * cm, 4 * cm, 4 * cm, 4 * cm], style=listing_style()),
        PageBreak(),
    ]

    # --- Remaining pages: full schedule -------------------------------------
    schedule_rows = [["Mies.", "Oproc.", "Rata", "Kapital", "Odsetki", "Nadplata", "Saldo"]]
    schedule_rows += [
        [
            entry.month,
            _pct(entry.rate),
            _money(entry.payment),
            _money(entry.principal_part),
            _money(entry.interest_part),
            _money(entry.overpayment),
            _money(entry.remaining),
        ]
        for entry in result.schedule
    ]
    schedule_table = Table(
        schedule_rows,
        repeatRows=1,  # header row repeats on every page the table spans
        colWidths=[1.35 * cm, 1.7 * cm, 2.55 * cm, 2.55 * cm, 2.55 * cm, 2.55 * cm, 3.0 * cm],
    )
    schedule_table.setStyle(TableStyle([
        ("BACKGROUND", (0, 0), (-1, 0), colors.HexColor("#e5e7eb")),
        ("TEXTCOLOR", (0, 0), (-1, 0), colors.HexColor("#111827")),
        ("GRID", (0, 0), (-1, -1), 0.2, colors.HexColor("#cbd5e1")),
        ("FONTNAME", (0, 0), (-1, -1), APP_FONT),
        ("FONTNAME", (0, 0), (-1, 0), APP_FONT_BOLD),
        ("FONTSIZE", (0, 0), (-1, -1), 6.5),
        ("LEADING", (0, 0), (-1, -1), 7.5),
        ("ALIGN", (0, 1), (-1, -1), "RIGHT"),
        ("ALIGN", (0, 0), (-1, 0), "CENTER"),
        ("VALIGN", (0, 0), (-1, -1), "MIDDLE"),
        ("ROWBACKGROUNDS", (0, 1), (-1, -1), [colors.white, colors.HexColor("#f8fafc")]),
    ]))
    story.append(schedule_table)

    doc.build(story, onFirstPage=_add_page_number, onLaterPages=_add_page_number)
    return Response(
        buffer.getvalue(),
        media_type="application/pdf",
        headers={"Content-Disposition": "attachment; filename=symulacja-kredytu.pdf"},
    )
|