from __future__ import annotations import platform from pathlib import Path from secrets import token_urlsafe from sqlalchemy import text from flask import Blueprint, current_app, flash, redirect, render_template, request, url_for from flask_login import current_user, login_required from ..extensions import db from ..forms import CategoryForm, UserAdminForm from ..models import AppSetting, AuditLog, Category, User from ..services.reporting import send_due_reports from ..services.audit import log_action from ..services.i18n import translate as _ from ..services.mail import MailService from ..utils import admin_required admin_bp = Blueprint('admin', __name__) def _db_info(): db_info = {'engine': db.engine.name, 'url': str(db.engine.url).replace(db.engine.url.password or '', '***') if db.engine.url.password else str(db.engine.url)} try: with db.engine.connect() as conn: if db.engine.name == 'sqlite': version = conn.execute(text('select sqlite_version()')).scalar() else: version = conn.execute(text('select version()')).scalar() except Exception: version = 'unknown' return db_info, version @admin_bp.route('/') @login_required @admin_required def dashboard(): db_info, version = _db_info() stats = {'users': User.query.count(), 'categories': Category.query.count(), 'audit_logs': AuditLog.query.count(), 'admins': User.query.filter_by(role='admin').count()} upload_dir = Path(current_app.root_path) / 'static' / 'uploads' preview_dir = Path(current_app.root_path) / 'static' / 'previews' upload_count = len(list(upload_dir.glob('*'))) if upload_dir.exists() else 0 preview_count = len(list(preview_dir.glob('*'))) if preview_dir.exists() else 0 system = {'python': platform.python_version(), 'platform': platform.platform(), 'flask_env': current_app.config['ENV_NAME'], 'instance_path': current_app.instance_path, 'max_upload_mb': current_app.config['MAX_CONTENT_LENGTH'] // 1024 // 1024, 'upload_count': upload_count, 'preview_count': preview_count, 'webhook_enabled': bool(AppSetting.get('webhook_api_token', '')), 'scheduler_enabled': AppSetting.get('report_scheduler_enabled', 'false') == 'true'} return render_template('admin/dashboard.html', stats=stats, db_info=db_info, db_version=version, system=system, recent_logs=AuditLog.query.order_by(AuditLog.created_at.desc()).limit(20).all()) @admin_bp.route('/audit') @login_required @admin_required def audit(): logs = AuditLog.query.order_by(AuditLog.created_at.desc()).limit(200).all() return render_template('admin/audit.html', logs=logs) @admin_bp.route('/categories', methods=['GET', 'POST']) @login_required @admin_required def categories(): form = CategoryForm() if form.validate_on_submit(): existing = Category.query.filter(Category.user_id.is_(None), Category.key == form.key.data.strip().lower()).first() category = existing or Category(user_id=None, key=form.key.data.strip().lower(), name=form.name_en.data.strip()) if not existing: db.session.add(category) category.key = form.key.data.strip().lower() category.name = form.name_en.data.strip() category.name_pl = form.name_pl.data.strip() category.name_en = form.name_en.data.strip() category.color = form.color.data category.is_active = form.is_active.data db.session.commit() log_action('category_saved', 'category', category.id, key=category.key) flash(_('flash.category_saved'), 'success') return redirect(url_for('admin.categories')) return render_template('admin/categories.html', form=form, categories=Category.query.filter(Category.user_id.is_(None)).order_by(Category.name_pl).all()) @admin_bp.route('/users', methods=['GET', 'POST']) @login_required @admin_required def users(): form = UserAdminForm() form.role.choices = [('user', _('user.role_user')), ('admin', _('user.role_admin'))] form.language.choices = [('pl', _('language.polish')), ('en', _('language.english'))] form.report_frequency.choices = [('off', _('report.off')), ('daily', _('report.daily')), ('weekly', _('report.weekly')), ('monthly', _('report.monthly'))] form.theme.choices = [('light', _('theme.light')), ('dark', _('theme.dark'))] edit_user_id = request.args.get('edit', type=int) editing_user = db.session.get(User, edit_user_id) if edit_user_id else None if request.method == 'GET' and editing_user: form.full_name.data = editing_user.full_name form.email.data = editing_user.email form.role.data = editing_user.role form.language.data = editing_user.language form.report_frequency.data = editing_user.report_frequency form.theme.data = editing_user.theme form.is_active_user.data = editing_user.is_active_user form.must_change_password.data = editing_user.must_change_password if form.validate_on_submit(): if edit_user_id: user = db.session.get(User, edit_user_id) if not user: flash(_('error.404_title'), 'danger') return redirect(url_for('admin.users')) duplicate = User.query.filter(User.email == form.email.data.lower(), User.id != user.id).first() if duplicate: flash(_('flash.user_exists'), 'danger') else: user.full_name = form.full_name.data user.email = form.email.data.lower() user.role = form.role.data user.language = form.language.data user.report_frequency = form.report_frequency.data or 'off' user.theme = form.theme.data or 'light' user.is_active_user = form.is_active_user.data user.must_change_password = form.must_change_password.data db.session.commit() log_action('user_updated', 'user', user.id, email=user.email) flash(_('flash.user_updated'), 'success') return redirect(url_for('admin.users')) else: if User.query.filter_by(email=form.email.data.lower()).first(): flash(_('flash.user_exists'), 'danger') else: temp_password = token_urlsafe(8) user = User(full_name=form.full_name.data, email=form.email.data.lower(), role=form.role.data, language=form.language.data, report_frequency=form.report_frequency.data or 'off', theme=form.theme.data or 'light', is_active_user=form.is_active_user.data, must_change_password=form.must_change_password.data) user.set_password(temp_password) db.session.add(user) db.session.commit() MailService().send_template(user.email, 'Your new account', 'new_account', user=user, temp_password=temp_password) log_action('user_created', 'user', user.id, email=user.email) flash(_('flash.user_created'), 'success') return redirect(url_for('admin.users')) users_list = User.query.order_by(User.created_at.desc()).all() return render_template('admin/users.html', form=form, users=users_list, editing_user=editing_user) @admin_bp.route('/users//toggle-password-change', methods=['POST']) @login_required @admin_required def toggle_password_change(user_id: int): user = db.session.get(User, user_id) if user is None: return redirect(url_for('admin.users')) user.must_change_password = not user.must_change_password db.session.commit() log_action('user_toggle_password_change', 'user', user.id, must_change_password=user.must_change_password) flash(_('flash.user_flag_updated'), 'success') return redirect(url_for('admin.users')) @admin_bp.route('/settings', methods=['GET', 'POST']) @login_required @admin_required def settings(): if request.method == 'POST': pairs = { 'registration_enabled': 'true' if request.form.get('registration_enabled') else 'false', 'max_upload_mb': request.form.get('max_upload_mb', '10'), 'smtp_host': request.form.get('smtp_host', ''), 'smtp_port': request.form.get('smtp_port', '465'), 'smtp_username': request.form.get('smtp_username', ''), 'smtp_password': request.form.get('smtp_password', ''), 'smtp_sender': request.form.get('smtp_sender', 'no-reply@example.com'), 'smtp_security': request.form.get('smtp_security', 'ssl'), 'company_name': request.form.get('company_name', 'Expense Monitor'), 'webhook_api_token': request.form.get('webhook_api_token', ''), 'reports_enabled': 'true' if request.form.get('reports_enabled') else 'false', 'report_scheduler_enabled': 'true' if request.form.get('report_scheduler_enabled') else 'false', } for key, value in pairs.items(): AppSetting.set(key, value) db.session.commit() log_action('settings_saved', 'settings') flash(_('flash.settings_saved'), 'success') return redirect(url_for('admin.settings')) values = {setting.key: setting.value for setting in AppSetting.query.order_by(AppSetting.key).all()} if 'smtp_security' not in values: values['smtp_security'] = 'ssl' values.setdefault('reports_enabled', 'true') return render_template('admin/settings.html', values=values) @admin_bp.route('/run-reports', methods=['POST']) @login_required @admin_required def run_reports(): sent = send_due_reports() log_action('reports_sent_manual', 'reports', 'manual', sent=sent) flash(f'Queued/sent reports: {sent}', 'success') return redirect(url_for('admin.dashboard'))