"""Expense blueprint: listing, create/edit/delete, budgets and CSV/PDF export."""
from __future__ import annotations

from datetime import date
from decimal import Decimal
from io import BytesIO
from pathlib import Path

from flask import (
    Blueprint,
    Response,
    abort,
    current_app,
    flash,
    redirect,
    render_template,
    request,
    send_file,
    url_for,
)
from flask_login import current_user, login_required
from sqlalchemy import asc, desc, or_

from ..extensions import db
from ..forms import BudgetForm, ExpenseForm
from ..models import Budget, Category, DocumentAttachment, Expense
from ..services.audit import log_action
from ..services.categorization import suggest_category_id
from ..services.export import export_expenses_csv, export_expenses_pdf
from ..services.files import allowed_file, save_document
from ..services.i18n import get_locale, translate as _
from ..services.ocr import OCRService

expenses_bp = Blueprint('expenses', __name__)


def _category_label(category: Category) -> str:
    """Return the category name localized for the current locale."""
    return category.localized_name(get_locale())


def populate_category_choices(form) -> None:
    """Fill ``form.category_id.choices`` with a placeholder plus all active categories."""
    active_categories = (
        Category.query.filter_by(is_active=True).order_by(Category.name_pl).all()
    )
    form.category_id.choices = [(0, '---')] + [
        (category.id, _category_label(category)) for category in active_categories
    ]


def _period_bounds(year: int, month: int | None = None) -> tuple[date, date] | None:
    """Return the half-open ``[start, end)`` date range of a month or a whole year.

    Args:
        year: Calendar year.
        month: Calendar month (1-12), or ``None`` for the whole year.

    Returns:
        ``(start, end)`` where ``end`` is the first day of the following
        period, or ``None`` when the values cannot be represented as
        ``date`` objects (e.g. month 13, year 0, or a year whose following
        period would exceed ``date.max``).
    """
    try:
        if month is None:
            return date(year, 1, 1), date(year + 1, 1, 1)
        start = date(year, month, 1)
        end = date(year + 1, 1, 1) if month == 12 else date(year, month + 1, 1)
    except (ValueError, OverflowError):
        return None
    return start, end


def _apply_expense_filters(query, *, category_id, payment_method, status, q):
    """Apply the optional category / payment method / status / text filters.

    Falsy values leave the query untouched. ``q`` is matched as a
    case-insensitive substring against title, vendor, description and tags.
    """
    if category_id:
        query = query.filter(Expense.category_id == category_id)
    if payment_method:
        query = query.filter(Expense.payment_method == payment_method)
    if status:
        query = query.filter(Expense.status == status)
    if q:
        like = f'%{q}%'
        query = query.filter(
            or_(
                Expense.title.ilike(like),
                Expense.vendor.ilike(like),
                Expense.description.ilike(like),
                Expense.tags.ilike(like),
            )
        )
    return query


@expenses_bp.route('/')
@login_required
def list_expenses():
    """Render the current user's non-deleted expenses for one month.

    The month, filters, sort order and grouping come from the query string.
    An unrepresentable year/month falls back to the current month instead of
    raising while building the date range.
    """
    today = date.today()
    year = request.args.get('year', today.year, type=int)
    month = request.args.get('month', today.month, type=int)
    category_id = request.args.get('category_id', type=int)
    payment_method = request.args.get('payment_method', '', type=str)
    q = (request.args.get('q', '') or '').strip()
    status = request.args.get('status', '', type=str)
    sort_by = request.args.get('sort_by', 'purchase_date', type=str)
    sort_dir = request.args.get('sort_dir', 'desc', type=str)
    group_by = request.args.get('group_by', 'category', type=str)

    period = _period_bounds(year, month)
    if period is None:
        # Query-string values such as month=13 cannot form a date.
        year, month = today.year, today.month
        period = _period_bounds(year, month)
    period_start, period_end = period

    expenses_query = Expense.query.filter_by(
        user_id=current_user.id, is_deleted=False
    ).filter(
        Expense.purchase_date >= period_start,
        Expense.purchase_date < period_end,
    )
    expenses_query = _apply_expense_filters(
        expenses_query,
        category_id=category_id,
        payment_method=payment_method,
        status=status,
        q=q,
    )

    filtered = _apply_expense_sort(expenses_query, sort_by, sort_dir).all()
    grouped_expenses = _group_expenses(filtered, group_by)
    # Start from Decimal so an empty month yields Decimal('0'), not int 0.
    month_total = sum(
        ((expense.amount or Decimal('0')) for expense in filtered), Decimal('0')
    )
    budgets = Budget.query.filter_by(user_id=current_user.id, year=year, month=month).all()
    categories = Category.query.filter_by(is_active=True).order_by(Category.name_pl).all()
    filters = {
        'category_id': category_id or 0,
        'payment_method': payment_method,
        'q': q,
        'status': status,
        'sort_by': sort_by,
        'sort_dir': sort_dir,
        'group_by': group_by,
    }
    sort_options = [
        ('purchase_date', _('expenses.date')),
        ('amount', _('expenses.amount')),
        ('title', _('expenses.title')),
        ('vendor', _('expenses.vendor')),
        ('category', _('expenses.category')),
        ('payment_method', _('expenses.payment_method')),
        ('status', _('expenses.status')),
        ('created_at', _('expenses.added')),
    ]
    return render_template(
        'expenses/list.html',
        expenses=filtered,
        grouped_expenses=grouped_expenses,
        budgets=budgets,
        selected_year=year,
        selected_month=month,
        filters=filters,
        categories=categories,
        month_total=month_total,
        sort_options=sort_options,
    )


@expenses_bp.route('/create', methods=['GET', 'POST'])
@login_required
def create_expense():
    """Show the expense form and, on a valid submit, create the expense."""
    form = ExpenseForm()
    populate_category_choices(form)
    if request.method == 'GET' and not form.purchase_date.data:
        # Pre-fill defaults for a fresh form.
        form.purchase_date.data = date.today()
        form.currency.data = current_user.default_currency
    if form.validate_on_submit():
        expense = Expense(user_id=current_user.id)
        _fill_expense_from_form(expense, form)
        if not expense.title:
            expense.title = expense.vendor or 'Expense'
        db.session.add(expense)
        # Flush so expense.id is available for attachments and the audit entry.
        db.session.flush()
        _handle_uploaded_documents(expense, form)
        log_action('expense_created', 'expense', expense.id, title=expense.title, amount=str(expense.amount))
        db.session.commit()
        flash(_('flash.expense_saved'), 'success')
        return redirect(url_for('expenses.list_expenses'))
    return render_template('expenses/create.html', form=form, expense=None)


@expenses_bp.route('/<int:expense_id>/edit', methods=['GET', 'POST'])
@login_required
def edit_expense(expense_id: int):
    """Show the form for one of the user's expenses and save valid changes.

    Responds with 404 when the expense does not exist, belongs to another
    user, or is soft-deleted.
    """
    expense = Expense.query.filter_by(
        id=expense_id, user_id=current_user.id, is_deleted=False
    ).first_or_404()
    form = ExpenseForm(obj=expense)
    populate_category_choices(form)
    if request.method == 'GET' and expense.category_id:
        form.category_id.data = expense.category_id
    if form.validate_on_submit():
        _fill_expense_from_form(expense, form)
        db.session.flush()
        _handle_uploaded_documents(expense, form)
        # Log before committing, as create_expense does, so the audit entry is
        # part of the same commit if log_action only stages it on the session.
        log_action('expense_updated', 'expense', expense.id, title=expense.title, amount=str(expense.amount))
        db.session.commit()
        flash(_('flash.expense_updated'), 'success')
        return redirect(url_for('expenses.list_expenses'))
    return render_template('expenses/create.html', form=form, expense=expense)


@expenses_bp.route('/<int:expense_id>/delete', methods=['POST'])
@login_required
def delete_expense(expense_id: int):
    """Soft-delete one of the current user's expenses (404 if not found)."""
    expense = Expense.query.filter_by(id=expense_id, user_id=current_user.id).first_or_404()
    expense.is_deleted = True
    # Log before committing, as create_expense does (see edit_expense).
    log_action('expense_deleted', 'expense', expense.id)
    db.session.commit()
    flash(_('flash.expense_deleted'), 'success')
    return redirect(url_for('expenses.list_expenses'))


@expenses_bp.route('/budgets', methods=['GET', 'POST'])
@login_required
def budgets():
    """List the user's budgets and create or update one on a valid submit.

    A budget is identified by (user, category, year, month); submitting an
    existing combination updates its amount and alert threshold.
    """
    form = BudgetForm()
    populate_category_choices(form)
    if request.method == 'GET':
        today = date.today()
        form.year.data = today.year
        form.month.data = today.month
    if form.validate_on_submit():
        budget_key = {
            'user_id': current_user.id,
            'category_id': form.category_id.data,
            'year': form.year.data,
            'month': form.month.data,
        }
        budget = Budget.query.filter_by(**budget_key).first()
        if not budget:
            budget = Budget(**budget_key)
            db.session.add(budget)
        budget.amount = Decimal(str(form.amount.data))
        budget.alert_percent = form.alert_percent.data
        # Flush so a new budget has its id before the audit entry is written,
        # then commit the budget and the entry together.
        db.session.flush()
        log_action('budget_saved', 'budget', budget.id, amount=str(budget.amount))
        db.session.commit()
        flash(_('flash.budget_saved'), 'success')
        return redirect(url_for('expenses.budgets'))
    items = (
        Budget.query.filter_by(user_id=current_user.id)
        .order_by(Budget.year.desc(), Budget.month.desc())
        .all()
    )
    return render_template('expenses/budgets.html', form=form, budgets=items)


@expenses_bp.route('/export.csv')
@login_required
def export_csv():
    """Download the filtered expenses as ``expenses.csv``, newest purchase first."""
    expenses = _filtered_export_query().order_by(Expense.purchase_date.desc()).all()
    content = export_expenses_csv(expenses)
    return Response(
        content,
        mimetype='text/csv',
        headers={'Content-Disposition': 'attachment; filename=expenses.csv'},
    )


@expenses_bp.route('/export.pdf')
@login_required
def export_pdf():
    """Download the filtered expenses as ``expenses.pdf``, newest purchase first."""
    expenses = _filtered_export_query().order_by(Expense.purchase_date.desc()).all()
    data = export_expenses_pdf(expenses, title='Expense export')
    return send_file(
        BytesIO(data),
        mimetype='application/pdf',
        as_attachment=True,
        download_name='expenses.pdf',
    )


def _apply_expense_sort(query, sort_by: str, sort_dir: str):
    """Order ``query`` by the requested column, with ``Expense.id`` descending as tie-breaker.

    Any ``sort_dir`` other than ``'asc'`` sorts descending; an unknown
    ``sort_by`` sorts by purchase date. Sorting by category outer-joins
    ``Category`` and orders by ``Category.name_pl``.
    """
    direction = asc if sort_dir == 'asc' else desc
    if sort_by == 'category':
        query = query.outerjoin(Category)
        column = Category.name_pl
    else:
        expense_columns = {
            'amount': Expense.amount,
            'title': Expense.title,
            'vendor': Expense.vendor,
            'payment_method': Expense.payment_method,
            'status': Expense.status,
            'created_at': Expense.created_at,
        }
        column = expense_columns.get(sort_by, Expense.purchase_date)
    return query.order_by(direction(column), desc(Expense.id))


def _group_expenses(expenses: list[Expense], group_by: str) -> list[dict]:
    """Bucket expenses for display.

    Args:
        expenses: Expenses in the order they should appear within a group.
        group_by: ``'none'`` (single group), ``'payment_method'``,
            ``'status'``; any other value groups by category.

    Returns:
        A list of dicts with ``key``, ``label``, ``items`` and ``total``,
        ordered by total descending and then by label.
    """
    zero = Decimal('0')
    if group_by == 'none':
        return [{
            'key': 'all',
            'label': _('expenses.all_expenses'),
            'items': expenses,
            'total': sum(((expense.amount or zero) for expense in expenses), zero),
        }]

    groups: dict[str, dict] = {}
    for expense in expenses:
        if group_by == 'payment_method':
            key = expense.payment_method or 'other'
            label = expense.payment_method.title() if expense.payment_method else _('common.other')
        elif group_by == 'status':
            key = expense.status or 'unknown'
            label = expense.status.replace('_', ' ').title() if expense.status else _('common.other')
        else:
            key = str(expense.category_id or 0)
            label = (
                expense.category.localized_name(get_locale())
                if expense.category
                else _('common.uncategorized')
            )
        bucket = groups.setdefault(key, {'key': key, 'label': label, 'items': [], 'total': Decimal('0')})
        bucket['items'].append(expense)
        bucket['total'] += expense.amount or zero
    return sorted(groups.values(), key=lambda item: (-item['total'], item['label']))


def _filtered_export_query():
    """Build the export query for the current user from the query string.

    Without ``year`` no date restriction is applied (``month`` alone is
    ignored). With ``year`` the range is that year, or the given month of it.
    Aborts with 400 when ``year``/``month`` cannot form a valid date.
    """
    query = Expense.query.filter_by(user_id=current_user.id, is_deleted=False)
    year = request.args.get('year', type=int)
    month = request.args.get('month', type=int)
    category_id = request.args.get('category_id', type=int)
    payment_method = request.args.get('payment_method', '', type=str)
    q = (request.args.get('q', '') or '').strip()
    status = request.args.get('status', '', type=str)

    if year:
        # A missing or zero month means "the whole year".
        period = _period_bounds(year, month or None)
        if period is None:
            abort(400)
        period_start, period_end = period
        query = query.filter(
            Expense.purchase_date >= period_start,
            Expense.purchase_date < period_end,
        )
    return _apply_expense_filters(
        query,
        category_id=category_id,
        payment_method=payment_method,
        status=status,
        q=q,
    )


def _fill_expense_from_form(expense: Expense, form: ExpenseForm) -> None:
    """Copy the form fields onto ``expense``.

    Empty text fields become ``''`` and a falsy category becomes ``None``;
    when no category was chosen, one is requested from
    ``suggest_category_id`` using the vendor and title.
    """
    expense.title = form.title.data or ''
    expense.vendor = form.vendor.data or ''
    expense.description = form.description.data or ''
    expense.amount = Decimal(str(form.amount.data))
    expense.currency = form.currency.data
    expense.purchase_date = form.purchase_date.data
    expense.payment_method = form.payment_method.data
    expense.category_id = form.category_id.data or None
    expense.is_refund = form.is_refund.data
    expense.is_business = form.is_business.data
    expense.tags = form.tags.data or ''
    expense.recurring_period = form.recurring_period.data
    expense.status = form.status.data
    if not expense.category_id:
        expense.category_id = suggest_category_id(current_user.id, expense.vendor, expense.title)


def _parse_crop_box(form: ExpenseForm) -> tuple[int, int, int, int] | None:
    """Return ``(x, y, w, h)`` from the form's crop fields, or ``None``.

    ``None`` is returned when any field is missing or empty, when a value is
    not numeric, or when the width or height is not positive. Zero offsets
    are valid, so presence is tested explicitly rather than by truthiness.
    """
    raw_values = [form.crop_x.data, form.crop_y.data, form.crop_w.data, form.crop_h.data]
    if any(value is None or value == '' for value in raw_values):
        return None
    try:
        x, y, width, height = (int(float(value)) for value in raw_values)
    except (TypeError, ValueError, OverflowError):
        return None
    if width <= 0 or height <= 0:
        return None
    return x, y, width, height


def _handle_uploaded_documents(expense: Expense, form: ExpenseForm) -> None:
    """Store uploaded documents for ``expense`` and apply OCR data from the first one.

    Every uploaded file with an allowed extension is saved and added to the
    session as a ``DocumentAttachment``. The first accepted file also becomes
    the expense's primary document and is passed to OCR, whose results may
    fill the title (only if the form title was blank), vendor and amount.
    Nothing is committed here; ``expense.id`` must already be assigned.
    """
    files = [item for item in request.files.getlist(form.document.name) if item and item.filename]
    if not files:
        return

    upload_dir = Path(current_app.root_path) / 'static' / 'uploads'
    preview_dir = Path(current_app.root_path) / 'static' / 'previews'
    crop_box = _parse_crop_box(form)
    allowed_extensions = current_app.config['UPLOAD_EXTENSIONS']
    rotate = form.rotate.data or 0
    scale_percent = form.scale_percent.data or 100
    ocr_service = OCRService()

    # Track the first *accepted* file rather than index 0, so a rejected
    # first upload does not leave the expense without a primary document.
    primary_pending = True
    for index, uploaded in enumerate(files):
        if not allowed_file(uploaded.filename, allowed_extensions):
            continue
        filename, preview = save_document(
            uploaded,
            upload_dir,
            preview_dir,
            rotate=rotate,
            crop_box=crop_box,
            scale_percent=scale_percent,
        )
        attachment = DocumentAttachment(
            expense_id=expense.id,
            original_filename=uploaded.filename,
            stored_filename=filename,
            preview_filename=preview,
            mime_type=uploaded.mimetype or '',
            sort_order=index,
        )
        db.session.add(attachment)
        if not primary_pending:
            continue
        primary_pending = False

        expense.document_filename = filename
        expense.preview_filename = preview
        ocr_data = ocr_service.extract(upload_dir / filename)
        expense.ocr_status = ocr_data.status
        if not (form.title.data or '').strip():
            expense.title = ocr_data.get('title') or expense.title or expense.vendor or 'Expense'
        expense.vendor = ocr_data.get('vendor') or expense.vendor
        if ocr_data.get('amount'):
            expense.amount = Decimal(ocr_data['amount'])
        if not expense.category_id:
            expense.category_id = suggest_category_id(current_user.id, expense.vendor, expense.title)