Files
expense_monitor/app/expenses/routes.py
Mateusz Gruszczyński 986ffb200a first commit
2026-03-13 15:17:32 +01:00

331 lines
14 KiB
Python

from __future__ import annotations
from datetime import date
from decimal import Decimal
from io import BytesIO
from pathlib import Path
from flask import Blueprint, Response, current_app, flash, redirect, render_template, request, send_file, url_for
from flask_login import current_user, login_required
from sqlalchemy import asc, desc, or_
from ..extensions import db
from ..forms import BudgetForm, ExpenseForm
from ..models import Budget, Category, DocumentAttachment, Expense
from ..services.audit import log_action
from ..services.categorization import suggest_category_id
from ..services.export import export_expenses_csv, export_expenses_pdf
from ..services.files import allowed_file, save_document
from ..services.i18n import get_locale, translate as _
from ..services.ocr import OCRService
expenses_bp = Blueprint('expenses', __name__)
def _category_label(category: Category) -> str:
return category.localized_name(get_locale())
def populate_category_choices(form) -> None:
form.category_id.choices = [(0, '---')] + [
(category.id, _category_label(category))
for category in Category.query.filter_by(is_active=True).order_by(Category.name_pl).all()
]
@expenses_bp.route('/')
@login_required
def list_expenses():
year = request.args.get('year', date.today().year, type=int)
month = request.args.get('month', date.today().month, type=int)
category_id = request.args.get('category_id', type=int)
payment_method = request.args.get('payment_method', '', type=str)
q = (request.args.get('q', '') or '').strip()
status = request.args.get('status', '', type=str)
sort_by = request.args.get('sort_by', 'purchase_date', type=str)
sort_dir = request.args.get('sort_dir', 'desc', type=str)
group_by = request.args.get('group_by', 'category', type=str)
expenses_query = Expense.query.filter_by(user_id=current_user.id, is_deleted=False).filter(
Expense.purchase_date >= date(year, month, 1),
Expense.purchase_date < (date(year + (month == 12), 1 if month == 12 else month + 1, 1)),
)
if category_id:
expenses_query = expenses_query.filter(Expense.category_id == category_id)
if payment_method:
expenses_query = expenses_query.filter(Expense.payment_method == payment_method)
if status:
expenses_query = expenses_query.filter(Expense.status == status)
if q:
like = f'%{q}%'
expenses_query = expenses_query.filter(or_(Expense.title.ilike(like), Expense.vendor.ilike(like), Expense.description.ilike(like), Expense.tags.ilike(like)))
filtered = _apply_expense_sort(expenses_query, sort_by, sort_dir).all()
grouped_expenses = _group_expenses(filtered, group_by)
month_total = sum((expense.amount or Decimal('0')) for expense in filtered)
budgets = Budget.query.filter_by(user_id=current_user.id, year=year, month=month).all()
categories = Category.query.filter_by(is_active=True).order_by(Category.name_pl).all()
filters = {
'category_id': category_id or 0,
'payment_method': payment_method,
'q': q,
'status': status,
'sort_by': sort_by,
'sort_dir': sort_dir,
'group_by': group_by,
}
sort_options = [
('purchase_date', _('expenses.date')),
('amount', _('expenses.amount')),
('title', _('expenses.title')),
('vendor', _('expenses.vendor')),
('category', _('expenses.category')),
('payment_method', _('expenses.payment_method')),
('status', _('expenses.status')),
('created_at', _('expenses.added')),
]
return render_template(
'expenses/list.html',
expenses=filtered,
grouped_expenses=grouped_expenses,
budgets=budgets,
selected_year=year,
selected_month=month,
filters=filters,
categories=categories,
month_total=month_total,
sort_options=sort_options,
)
@expenses_bp.route('/create', methods=['GET', 'POST'])
@login_required
def create_expense():
form = ExpenseForm()
populate_category_choices(form)
if request.method == 'GET' and not form.purchase_date.data:
form.purchase_date.data = date.today()
form.currency.data = current_user.default_currency
if form.validate_on_submit():
expense = Expense(user_id=current_user.id)
_fill_expense_from_form(expense, form)
if not expense.title:
expense.title = expense.vendor or 'Expense'
db.session.add(expense)
db.session.flush()
_handle_uploaded_documents(expense, form)
log_action('expense_created', 'expense', expense.id, title=expense.title, amount=str(expense.amount))
db.session.commit()
flash(_('flash.expense_saved'), 'success')
return redirect(url_for('expenses.list_expenses'))
return render_template('expenses/create.html', form=form, expense=None)
@expenses_bp.route('/<int:expense_id>/edit', methods=['GET', 'POST'])
@login_required
def edit_expense(expense_id: int):
expense = Expense.query.filter_by(id=expense_id, user_id=current_user.id, is_deleted=False).first_or_404()
form = ExpenseForm(obj=expense)
populate_category_choices(form)
if request.method == 'GET' and expense.category_id:
form.category_id.data = expense.category_id
if form.validate_on_submit():
_fill_expense_from_form(expense, form)
db.session.flush()
_handle_uploaded_documents(expense, form)
db.session.commit()
log_action('expense_updated', 'expense', expense.id, title=expense.title, amount=str(expense.amount))
flash(_('flash.expense_updated'), 'success')
return redirect(url_for('expenses.list_expenses'))
return render_template('expenses/create.html', form=form, expense=expense)
@expenses_bp.route('/<int:expense_id>/delete', methods=['POST'])
@login_required
def delete_expense(expense_id: int):
expense = Expense.query.filter_by(id=expense_id, user_id=current_user.id).first_or_404()
expense.is_deleted = True
db.session.commit()
log_action('expense_deleted', 'expense', expense.id)
flash(_('flash.expense_deleted'), 'success')
return redirect(url_for('expenses.list_expenses'))
@expenses_bp.route('/budgets', methods=['GET', 'POST'])
@login_required
def budgets():
form = BudgetForm()
populate_category_choices(form)
if request.method == 'GET':
today = date.today()
form.year.data = today.year
form.month.data = today.month
if form.validate_on_submit():
budget = Budget.query.filter_by(
user_id=current_user.id,
category_id=form.category_id.data,
year=form.year.data,
month=form.month.data,
).first()
if not budget:
budget = Budget(user_id=current_user.id, category_id=form.category_id.data, year=form.year.data, month=form.month.data)
db.session.add(budget)
budget.amount = Decimal(str(form.amount.data))
budget.alert_percent = form.alert_percent.data
db.session.commit()
log_action('budget_saved', 'budget', budget.id, amount=str(budget.amount))
flash(_('flash.budget_saved'), 'success')
return redirect(url_for('expenses.budgets'))
items = Budget.query.filter_by(user_id=current_user.id).order_by(Budget.year.desc(), Budget.month.desc()).all()
return render_template('expenses/budgets.html', form=form, budgets=items)
@expenses_bp.route('/export.csv')
@login_required
def export_csv():
expenses = _filtered_export_query().order_by(Expense.purchase_date.desc()).all()
content = export_expenses_csv(expenses)
return Response(content, mimetype='text/csv', headers={'Content-Disposition': 'attachment; filename=expenses.csv'})
@expenses_bp.route('/export.pdf')
@login_required
def export_pdf():
expenses = _filtered_export_query().order_by(Expense.purchase_date.desc()).all()
data = export_expenses_pdf(expenses, title='Expense export')
return send_file(BytesIO(data), mimetype='application/pdf', as_attachment=True, download_name='expenses.pdf')
def _apply_expense_sort(query, sort_by: str, sort_dir: str):
descending = sort_dir != 'asc'
direction = desc if descending else asc
if sort_by == 'amount':
order = direction(Expense.amount)
elif sort_by == 'title':
order = direction(Expense.title)
elif sort_by == 'vendor':
order = direction(Expense.vendor)
elif sort_by == 'payment_method':
order = direction(Expense.payment_method)
elif sort_by == 'status':
order = direction(Expense.status)
elif sort_by == 'created_at':
order = direction(Expense.created_at)
elif sort_by == 'category':
query = query.outerjoin(Category)
order = direction(Category.name_pl)
else:
order = direction(Expense.purchase_date)
return query.order_by(order, desc(Expense.id))
def _group_expenses(expenses: list[Expense], group_by: str) -> list[dict]:
if group_by == 'none':
return [{'key': 'all', 'label': _('expenses.all_expenses'), 'items': expenses, 'total': sum((expense.amount or Decimal('0')) for expense in expenses)}]
groups: dict[str, dict] = {}
for expense in expenses:
if group_by == 'payment_method':
key = expense.payment_method or 'other'
label = expense.payment_method.title() if expense.payment_method else _('common.other')
elif group_by == 'status':
key = expense.status or 'unknown'
label = expense.status.replace('_', ' ').title() if expense.status else _('common.other')
else:
key = str(expense.category_id or 0)
label = expense.category.localized_name(get_locale()) if expense.category else _('common.uncategorized')
bucket = groups.setdefault(key, {'key': key, 'label': label, 'items': [], 'total': Decimal('0')})
bucket['items'].append(expense)
bucket['total'] += expense.amount or Decimal('0')
return sorted(groups.values(), key=lambda item: (-item['total'], item['label']))
def _filtered_export_query():
query = Expense.query.filter_by(user_id=current_user.id, is_deleted=False)
year = request.args.get('year', type=int)
month = request.args.get('month', type=int)
category_id = request.args.get('category_id', type=int)
payment_method = request.args.get('payment_method', '', type=str)
q = (request.args.get('q', '') or '').strip()
status = request.args.get('status', '', type=str)
if year:
query = query.filter(Expense.purchase_date >= date(year, month or 1, 1))
if month:
query = query.filter(Expense.purchase_date < date(year + (month == 12), 1 if month == 12 else month + 1, 1))
else:
query = query.filter(Expense.purchase_date < date(year + 1, 1, 1))
if category_id:
query = query.filter(Expense.category_id == category_id)
if payment_method:
query = query.filter(Expense.payment_method == payment_method)
if status:
query = query.filter(Expense.status == status)
if q:
like = f'%{q}%'
query = query.filter(or_(Expense.title.ilike(like), Expense.vendor.ilike(like), Expense.description.ilike(like), Expense.tags.ilike(like)))
return query
def _fill_expense_from_form(expense: Expense, form: ExpenseForm) -> None:
expense.title = form.title.data or ''
expense.vendor = form.vendor.data or ''
expense.description = form.description.data or ''
expense.amount = Decimal(str(form.amount.data))
expense.currency = form.currency.data
expense.purchase_date = form.purchase_date.data
expense.payment_method = form.payment_method.data
expense.category_id = form.category_id.data or None
expense.is_refund = form.is_refund.data
expense.is_business = form.is_business.data
expense.tags = form.tags.data or ''
expense.recurring_period = form.recurring_period.data
expense.status = form.status.data
if not expense.category_id:
expense.category_id = suggest_category_id(current_user.id, expense.vendor, expense.title)
def _handle_uploaded_documents(expense: Expense, form: ExpenseForm) -> None:
files = [item for item in request.files.getlist(form.document.name) if item and item.filename]
if not files:
return
upload_dir = Path(current_app.root_path) / 'static' / 'uploads'
preview_dir = Path(current_app.root_path) / 'static' / 'previews'
crop_box = None
if all([form.crop_x.data, form.crop_y.data, form.crop_w.data, form.crop_h.data]):
crop_box = tuple(int(float(v)) for v in [form.crop_x.data, form.crop_y.data, form.crop_w.data, form.crop_h.data])
ocr_service = OCRService()
for index, uploaded in enumerate(files):
if not allowed_file(uploaded.filename, current_app.config['UPLOAD_EXTENSIONS']):
continue
filename, preview = save_document(
uploaded,
upload_dir,
preview_dir,
rotate=form.rotate.data or 0,
crop_box=crop_box,
scale_percent=form.scale_percent.data or 100,
)
attachment = DocumentAttachment(
expense_id=expense.id,
original_filename=uploaded.filename,
stored_filename=filename,
preview_filename=preview,
mime_type=uploaded.mimetype or '',
sort_order=index,
)
db.session.add(attachment)
if index == 0:
expense.document_filename = filename
expense.preview_filename = preview
ocr_data = ocr_service.extract(upload_dir / filename)
expense.ocr_status = ocr_data.status
if not (form.title.data or '').strip():
expense.title = ocr_data.get('title') or expense.title or expense.vendor or 'Expense'
expense.vendor = ocr_data.get('vendor') or expense.vendor
if ocr_data.get('amount'):
expense.amount = Decimal(ocr_data['amount'])
if not expense.category_id:
expense.category_id = suggest_category_id(current_user.id, expense.vendor, expense.title)