331 lines
14 KiB
Python
331 lines
14 KiB
Python
from __future__ import annotations
|
|
|
|
from datetime import date
|
|
from decimal import Decimal
|
|
from io import BytesIO
|
|
from pathlib import Path
|
|
|
|
from flask import Blueprint, Response, current_app, flash, redirect, render_template, request, send_file, url_for
|
|
from flask_login import current_user, login_required
|
|
from sqlalchemy import asc, desc, or_
|
|
|
|
from ..extensions import db
|
|
from ..forms import BudgetForm, ExpenseForm
|
|
from ..models import Budget, Category, DocumentAttachment, Expense
|
|
from ..services.audit import log_action
|
|
from ..services.categorization import suggest_category_id
|
|
from ..services.export import export_expenses_csv, export_expenses_pdf
|
|
from ..services.files import allowed_file, save_document
|
|
from ..services.i18n import get_locale, translate as _
|
|
from ..services.ocr import OCRService
|
|
|
|
expenses_bp = Blueprint('expenses', __name__)
|
|
|
|
|
|
def _category_label(category: Category) -> str:
|
|
return category.localized_name(get_locale())
|
|
|
|
|
|
def populate_category_choices(form) -> None:
|
|
form.category_id.choices = [(0, '---')] + [
|
|
(category.id, _category_label(category))
|
|
for category in Category.query.filter_by(is_active=True).order_by(Category.name_pl).all()
|
|
]
|
|
|
|
|
|
@expenses_bp.route('/')
|
|
@login_required
|
|
def list_expenses():
|
|
year = request.args.get('year', date.today().year, type=int)
|
|
month = request.args.get('month', date.today().month, type=int)
|
|
category_id = request.args.get('category_id', type=int)
|
|
payment_method = request.args.get('payment_method', '', type=str)
|
|
q = (request.args.get('q', '') or '').strip()
|
|
status = request.args.get('status', '', type=str)
|
|
sort_by = request.args.get('sort_by', 'purchase_date', type=str)
|
|
sort_dir = request.args.get('sort_dir', 'desc', type=str)
|
|
group_by = request.args.get('group_by', 'category', type=str)
|
|
|
|
expenses_query = Expense.query.filter_by(user_id=current_user.id, is_deleted=False).filter(
|
|
Expense.purchase_date >= date(year, month, 1),
|
|
Expense.purchase_date < (date(year + (month == 12), 1 if month == 12 else month + 1, 1)),
|
|
)
|
|
if category_id:
|
|
expenses_query = expenses_query.filter(Expense.category_id == category_id)
|
|
if payment_method:
|
|
expenses_query = expenses_query.filter(Expense.payment_method == payment_method)
|
|
if status:
|
|
expenses_query = expenses_query.filter(Expense.status == status)
|
|
if q:
|
|
like = f'%{q}%'
|
|
expenses_query = expenses_query.filter(or_(Expense.title.ilike(like), Expense.vendor.ilike(like), Expense.description.ilike(like), Expense.tags.ilike(like)))
|
|
|
|
filtered = _apply_expense_sort(expenses_query, sort_by, sort_dir).all()
|
|
grouped_expenses = _group_expenses(filtered, group_by)
|
|
month_total = sum((expense.amount or Decimal('0')) for expense in filtered)
|
|
budgets = Budget.query.filter_by(user_id=current_user.id, year=year, month=month).all()
|
|
categories = Category.query.filter_by(is_active=True).order_by(Category.name_pl).all()
|
|
filters = {
|
|
'category_id': category_id or 0,
|
|
'payment_method': payment_method,
|
|
'q': q,
|
|
'status': status,
|
|
'sort_by': sort_by,
|
|
'sort_dir': sort_dir,
|
|
'group_by': group_by,
|
|
}
|
|
sort_options = [
|
|
('purchase_date', _('expenses.date')),
|
|
('amount', _('expenses.amount')),
|
|
('title', _('expenses.title')),
|
|
('vendor', _('expenses.vendor')),
|
|
('category', _('expenses.category')),
|
|
('payment_method', _('expenses.payment_method')),
|
|
('status', _('expenses.status')),
|
|
('created_at', _('expenses.added')),
|
|
]
|
|
return render_template(
|
|
'expenses/list.html',
|
|
expenses=filtered,
|
|
grouped_expenses=grouped_expenses,
|
|
budgets=budgets,
|
|
selected_year=year,
|
|
selected_month=month,
|
|
filters=filters,
|
|
categories=categories,
|
|
month_total=month_total,
|
|
sort_options=sort_options,
|
|
)
|
|
|
|
|
|
@expenses_bp.route('/create', methods=['GET', 'POST'])
|
|
@login_required
|
|
def create_expense():
|
|
form = ExpenseForm()
|
|
populate_category_choices(form)
|
|
if request.method == 'GET' and not form.purchase_date.data:
|
|
form.purchase_date.data = date.today()
|
|
form.currency.data = current_user.default_currency
|
|
if form.validate_on_submit():
|
|
expense = Expense(user_id=current_user.id)
|
|
_fill_expense_from_form(expense, form)
|
|
if not expense.title:
|
|
expense.title = expense.vendor or 'Expense'
|
|
db.session.add(expense)
|
|
db.session.flush()
|
|
_handle_uploaded_documents(expense, form)
|
|
log_action('expense_created', 'expense', expense.id, title=expense.title, amount=str(expense.amount))
|
|
db.session.commit()
|
|
flash(_('flash.expense_saved'), 'success')
|
|
return redirect(url_for('expenses.list_expenses'))
|
|
return render_template('expenses/create.html', form=form, expense=None)
|
|
|
|
|
|
@expenses_bp.route('/<int:expense_id>/edit', methods=['GET', 'POST'])
|
|
@login_required
|
|
def edit_expense(expense_id: int):
|
|
expense = Expense.query.filter_by(id=expense_id, user_id=current_user.id, is_deleted=False).first_or_404()
|
|
form = ExpenseForm(obj=expense)
|
|
populate_category_choices(form)
|
|
if request.method == 'GET' and expense.category_id:
|
|
form.category_id.data = expense.category_id
|
|
if form.validate_on_submit():
|
|
_fill_expense_from_form(expense, form)
|
|
db.session.flush()
|
|
_handle_uploaded_documents(expense, form)
|
|
db.session.commit()
|
|
log_action('expense_updated', 'expense', expense.id, title=expense.title, amount=str(expense.amount))
|
|
flash(_('flash.expense_updated'), 'success')
|
|
return redirect(url_for('expenses.list_expenses'))
|
|
return render_template('expenses/create.html', form=form, expense=expense)
|
|
|
|
|
|
@expenses_bp.route('/<int:expense_id>/delete', methods=['POST'])
|
|
@login_required
|
|
def delete_expense(expense_id: int):
|
|
expense = Expense.query.filter_by(id=expense_id, user_id=current_user.id).first_or_404()
|
|
expense.is_deleted = True
|
|
db.session.commit()
|
|
log_action('expense_deleted', 'expense', expense.id)
|
|
flash(_('flash.expense_deleted'), 'success')
|
|
return redirect(url_for('expenses.list_expenses'))
|
|
|
|
|
|
@expenses_bp.route('/budgets', methods=['GET', 'POST'])
|
|
@login_required
|
|
def budgets():
|
|
form = BudgetForm()
|
|
populate_category_choices(form)
|
|
if request.method == 'GET':
|
|
today = date.today()
|
|
form.year.data = today.year
|
|
form.month.data = today.month
|
|
if form.validate_on_submit():
|
|
budget = Budget.query.filter_by(
|
|
user_id=current_user.id,
|
|
category_id=form.category_id.data,
|
|
year=form.year.data,
|
|
month=form.month.data,
|
|
).first()
|
|
if not budget:
|
|
budget = Budget(user_id=current_user.id, category_id=form.category_id.data, year=form.year.data, month=form.month.data)
|
|
db.session.add(budget)
|
|
budget.amount = Decimal(str(form.amount.data))
|
|
budget.alert_percent = form.alert_percent.data
|
|
db.session.commit()
|
|
log_action('budget_saved', 'budget', budget.id, amount=str(budget.amount))
|
|
flash(_('flash.budget_saved'), 'success')
|
|
return redirect(url_for('expenses.budgets'))
|
|
items = Budget.query.filter_by(user_id=current_user.id).order_by(Budget.year.desc(), Budget.month.desc()).all()
|
|
return render_template('expenses/budgets.html', form=form, budgets=items)
|
|
|
|
|
|
@expenses_bp.route('/export.csv')
|
|
@login_required
|
|
def export_csv():
|
|
expenses = _filtered_export_query().order_by(Expense.purchase_date.desc()).all()
|
|
content = export_expenses_csv(expenses)
|
|
return Response(content, mimetype='text/csv', headers={'Content-Disposition': 'attachment; filename=expenses.csv'})
|
|
|
|
|
|
@expenses_bp.route('/export.pdf')
|
|
@login_required
|
|
def export_pdf():
|
|
expenses = _filtered_export_query().order_by(Expense.purchase_date.desc()).all()
|
|
data = export_expenses_pdf(expenses, title='Expense export')
|
|
return send_file(BytesIO(data), mimetype='application/pdf', as_attachment=True, download_name='expenses.pdf')
|
|
|
|
|
|
|
|
|
|
def _apply_expense_sort(query, sort_by: str, sort_dir: str):
|
|
descending = sort_dir != 'asc'
|
|
direction = desc if descending else asc
|
|
if sort_by == 'amount':
|
|
order = direction(Expense.amount)
|
|
elif sort_by == 'title':
|
|
order = direction(Expense.title)
|
|
elif sort_by == 'vendor':
|
|
order = direction(Expense.vendor)
|
|
elif sort_by == 'payment_method':
|
|
order = direction(Expense.payment_method)
|
|
elif sort_by == 'status':
|
|
order = direction(Expense.status)
|
|
elif sort_by == 'created_at':
|
|
order = direction(Expense.created_at)
|
|
elif sort_by == 'category':
|
|
query = query.outerjoin(Category)
|
|
order = direction(Category.name_pl)
|
|
else:
|
|
order = direction(Expense.purchase_date)
|
|
return query.order_by(order, desc(Expense.id))
|
|
|
|
|
|
def _group_expenses(expenses: list[Expense], group_by: str) -> list[dict]:
|
|
if group_by == 'none':
|
|
return [{'key': 'all', 'label': _('expenses.all_expenses'), 'items': expenses, 'total': sum((expense.amount or Decimal('0')) for expense in expenses)}]
|
|
|
|
groups: dict[str, dict] = {}
|
|
for expense in expenses:
|
|
if group_by == 'payment_method':
|
|
key = expense.payment_method or 'other'
|
|
label = expense.payment_method.title() if expense.payment_method else _('common.other')
|
|
elif group_by == 'status':
|
|
key = expense.status or 'unknown'
|
|
label = expense.status.replace('_', ' ').title() if expense.status else _('common.other')
|
|
else:
|
|
key = str(expense.category_id or 0)
|
|
label = expense.category.localized_name(get_locale()) if expense.category else _('common.uncategorized')
|
|
bucket = groups.setdefault(key, {'key': key, 'label': label, 'items': [], 'total': Decimal('0')})
|
|
bucket['items'].append(expense)
|
|
bucket['total'] += expense.amount or Decimal('0')
|
|
return sorted(groups.values(), key=lambda item: (-item['total'], item['label']))
|
|
|
|
def _filtered_export_query():
|
|
query = Expense.query.filter_by(user_id=current_user.id, is_deleted=False)
|
|
year = request.args.get('year', type=int)
|
|
month = request.args.get('month', type=int)
|
|
category_id = request.args.get('category_id', type=int)
|
|
payment_method = request.args.get('payment_method', '', type=str)
|
|
q = (request.args.get('q', '') or '').strip()
|
|
status = request.args.get('status', '', type=str)
|
|
if year:
|
|
query = query.filter(Expense.purchase_date >= date(year, month or 1, 1))
|
|
if month:
|
|
query = query.filter(Expense.purchase_date < date(year + (month == 12), 1 if month == 12 else month + 1, 1))
|
|
else:
|
|
query = query.filter(Expense.purchase_date < date(year + 1, 1, 1))
|
|
if category_id:
|
|
query = query.filter(Expense.category_id == category_id)
|
|
if payment_method:
|
|
query = query.filter(Expense.payment_method == payment_method)
|
|
if status:
|
|
query = query.filter(Expense.status == status)
|
|
if q:
|
|
like = f'%{q}%'
|
|
query = query.filter(or_(Expense.title.ilike(like), Expense.vendor.ilike(like), Expense.description.ilike(like), Expense.tags.ilike(like)))
|
|
return query
|
|
|
|
|
|
|
|
def _fill_expense_from_form(expense: Expense, form: ExpenseForm) -> None:
|
|
expense.title = form.title.data or ''
|
|
expense.vendor = form.vendor.data or ''
|
|
expense.description = form.description.data or ''
|
|
expense.amount = Decimal(str(form.amount.data))
|
|
expense.currency = form.currency.data
|
|
expense.purchase_date = form.purchase_date.data
|
|
expense.payment_method = form.payment_method.data
|
|
expense.category_id = form.category_id.data or None
|
|
expense.is_refund = form.is_refund.data
|
|
expense.is_business = form.is_business.data
|
|
expense.tags = form.tags.data or ''
|
|
expense.recurring_period = form.recurring_period.data
|
|
expense.status = form.status.data
|
|
if not expense.category_id:
|
|
expense.category_id = suggest_category_id(current_user.id, expense.vendor, expense.title)
|
|
|
|
|
|
def _handle_uploaded_documents(expense: Expense, form: ExpenseForm) -> None:
|
|
files = [item for item in request.files.getlist(form.document.name) if item and item.filename]
|
|
if not files:
|
|
return
|
|
upload_dir = Path(current_app.root_path) / 'static' / 'uploads'
|
|
preview_dir = Path(current_app.root_path) / 'static' / 'previews'
|
|
crop_box = None
|
|
if all([form.crop_x.data, form.crop_y.data, form.crop_w.data, form.crop_h.data]):
|
|
crop_box = tuple(int(float(v)) for v in [form.crop_x.data, form.crop_y.data, form.crop_w.data, form.crop_h.data])
|
|
ocr_service = OCRService()
|
|
for index, uploaded in enumerate(files):
|
|
if not allowed_file(uploaded.filename, current_app.config['UPLOAD_EXTENSIONS']):
|
|
continue
|
|
filename, preview = save_document(
|
|
uploaded,
|
|
upload_dir,
|
|
preview_dir,
|
|
rotate=form.rotate.data or 0,
|
|
crop_box=crop_box,
|
|
scale_percent=form.scale_percent.data or 100,
|
|
)
|
|
attachment = DocumentAttachment(
|
|
expense_id=expense.id,
|
|
original_filename=uploaded.filename,
|
|
stored_filename=filename,
|
|
preview_filename=preview,
|
|
mime_type=uploaded.mimetype or '',
|
|
sort_order=index,
|
|
)
|
|
db.session.add(attachment)
|
|
if index == 0:
|
|
expense.document_filename = filename
|
|
expense.preview_filename = preview
|
|
ocr_data = ocr_service.extract(upload_dir / filename)
|
|
expense.ocr_status = ocr_data.status
|
|
if not (form.title.data or '').strip():
|
|
expense.title = ocr_data.get('title') or expense.title or expense.vendor or 'Expense'
|
|
expense.vendor = ocr_data.get('vendor') or expense.vendor
|
|
if ocr_data.get('amount'):
|
|
expense.amount = Decimal(ocr_data['amount'])
|
|
if not expense.category_id:
|
|
expense.category_id = suggest_category_id(current_user.id, expense.vendor, expense.title)
|