from __future__ import annotations from datetime import datetime, timedelta from decimal import Decimal from pathlib import Path from werkzeug.security import generate_password_hash from flask import Blueprint, abort, flash, redirect, render_template, request, send_file, url_for from flask_login import current_user, login_required from sqlalchemy import String, cast, or_ from app.extensions import db from app.forms.admin import AdminCompanyForm, AdminUserForm, AccessForm, PasswordResetForm, CeidgConfigForm, DatabaseBackupForm, GlobalKsefDefaultsForm, GlobalMailSettingsForm, GlobalNfzSettingsForm, GlobalNotificationSettingsForm, LogCleanupForm, SharedCompanyKsefForm from app.models.audit_log import AuditLog from app.models.catalog import Customer, Product, InvoiceLine from app.models.company import Company, UserCompanyAccess from app.models.invoice import Invoice, InvoiceStatus, InvoiceType, MailDelivery, NotificationLog, SyncEvent from app.models.setting import AppSetting from app.models.sync_log import SyncLog from app.models.user import User from app.services.audit_service import AuditService from app.services.backup_service import BackupService from app.services.ceidg_service import CeidgService from app.services.company_service import CompanyService from app.services.health_service import HealthService from app.services.ksef_service import RequestsKSeFAdapter from app.services.settings_service import SettingsService from app.services.system_data_service import SystemDataService from app.utils.decorators import roles_required bp = Blueprint('admin', __name__, url_prefix='/admin') def _mock_invoice_filter(): metadata_text = cast(Invoice.external_metadata, String) return or_( Invoice.source == 'mock', Invoice.issued_status == 'issued_mock', Invoice.ksef_number.ilike('%MOCK%'), Invoice.invoice_number.ilike('%MOCK%'), metadata_text.ilike('%"source": "mock"%'), metadata_text.ilike('%"mock"%'), ) def _cleanup_mock_catalog(company_ids: set[int]) -> dict: deleted_customers = 0 deleted_products = 0 if not company_ids: return {'customers': 0, 'products': 0} customer_candidates = Customer.query.filter(Customer.company_id.in_(company_ids)).all() for customer in customer_candidates: looks_like_demo = (customer.name or '').lower().startswith('klient demo') or (customer.email or '').lower() == 'demo@example.com' if looks_like_demo and customer.invoices.count() == 0: db.session.delete(customer) deleted_customers += 1 product_candidates = Product.query.filter(Product.company_id.in_(company_ids)).all() for product in product_candidates: looks_like_demo = (product.name or '').lower() == 'abonament miesięczny' or (product.sku or '').upper() == 'SUB-MONTH' linked_lines = InvoiceLine.query.filter_by(product_id=product.id).count() if looks_like_demo and linked_lines == 0: db.session.delete(product) deleted_products += 1 return {'customers': deleted_customers, 'products': deleted_products} def _admin_dashboard_context() -> dict: mock_enabled = AppSetting.query.filter( AppSetting.key.like('company.%.ksef.mock_mode'), AppSetting.value == 'true' ).count() global_ro = AppSetting.get('app.read_only_mode', 'false') == 'true' ceidg_environment = CeidgService.get_environment() ceidg_form = CeidgConfigForm(environment=ceidg_environment) ceidg_url = CeidgService.get_api_url(ceidg_environment) cleanup_form = LogCleanupForm(days=90) backup_form = DatabaseBackupForm() return { 'users': User.query.count(), 'companies': Company.query.count(), 'audits': AuditLog.query.count(), 'mock_enabled': mock_enabled, 'global_ro': global_ro, 'ceidg_form': ceidg_form, 'ceidg_url': ceidg_url, 'ceidg_api_key_configured': CeidgService.has_api_key(), 'cleanup_form': cleanup_form, 'backup_form': backup_form, 'ceidg_environment': ceidg_environment, 'backup_meta': BackupService().get_database_backup_meta(), } @bp.route("/") @login_required @roles_required('admin') def index(): return render_template('admin/index.html', **_admin_dashboard_context()) @bp.route('/users') @login_required @roles_required('admin') def users(): return render_template('admin/users.html', users=User.query.order_by(User.name).all()) @bp.route('/users/new', methods=['GET', 'POST']) @bp.route('/users//edit', methods=['GET', 'POST']) @login_required @roles_required('admin') def user_form(user_id=None): user = db.session.get(User, user_id) if user_id else None form = AdminUserForm(obj=user) form.company_id.choices = [(0, '— bez przypisania —')] + [(c.id, c.name) for c in Company.query.order_by(Company.name).all()] if request.method == 'GET' and user: form.force_password_change.data = user.force_password_change form.is_blocked.data = user.is_blocked if form.validate_on_submit(): if not user: user = User(email=form.email.data.lower(), name=form.name.data, role=form.role.data, password_hash=generate_password_hash(form.password.data or 'ChangeMe123!')) db.session.add(user) db.session.flush() else: user.email = form.email.data.lower() user.name = form.name.data user.role = form.role.data if form.password.data: user.password_hash = generate_password_hash(form.password.data) user.force_password_change = bool(form.force_password_change.data) user.is_blocked = bool(form.is_blocked.data) db.session.commit() if form.company_id.data: CompanyService.assign_user(user, db.session.get(Company, form.company_id.data), form.access_level.data) AuditService().log('save_user', 'user', user.id, f'role={user.role}, blocked={user.is_blocked}') flash('Zapisano użytkownika.', 'success') return redirect(url_for('admin.user_access', user_id=user.id)) accesses = UserCompanyAccess.query.filter_by(user_id=user.id).all() if user else [] return render_template('admin/user_form.html', form=form, user=user, accesses=accesses) @bp.route('/users//access', methods=['GET', 'POST']) @login_required @roles_required('admin') def user_access(user_id): user = db.session.get(User, user_id) form = AccessForm() form.company_id.choices = [(c.id, c.name) for c in Company.query.order_by(Company.name).all()] if form.validate_on_submit(): access = UserCompanyAccess.query.filter_by(user_id=user.id, company_id=form.company_id.data).first() if not access: access = UserCompanyAccess(user_id=user.id, company_id=form.company_id.data) db.session.add(access) access.access_level = form.access_level.data db.session.commit() AuditService().log('save_access', 'user', user.id, f'company={form.company_id.data}, level={form.access_level.data}') flash('Zapisano uprawnienia do firmy.', 'success') return redirect(url_for('admin.user_access', user_id=user.id)) accesses = UserCompanyAccess.query.filter_by(user_id=user.id).all() return render_template('admin/user_access.html', user=user, form=form, accesses=accesses) @bp.post('/users//access//delete') @login_required @roles_required('admin') def delete_access(user_id, access_id): access = db.session.get(UserCompanyAccess, access_id) if access and access.user_id == user_id: db.session.delete(access) db.session.commit() AuditService().log('delete_access', 'user', user_id, f'access={access_id}') flash('Usunięto dostęp.', 'info') return redirect(url_for('admin.user_access', user_id=user_id)) @bp.route('/users//reset-password', methods=['GET', 'POST']) @login_required @roles_required('admin') def reset_password(user_id): user = db.session.get(User, user_id) form = PasswordResetForm() if form.validate_on_submit(): user.password_hash = generate_password_hash(form.password.data) user.force_password_change = bool(form.force_password_change.data) db.session.commit() AuditService().log('reset_password', 'user', user.id, 'reset by admin') flash('Hasło zostało zresetowane.', 'success') return redirect(url_for('admin.users')) return render_template('admin/reset_password.html', form=form, user=user) @bp.post('/users//toggle-block') @login_required @roles_required('admin') def toggle_block(user_id): user = db.session.get(User, user_id) user.is_blocked = not user.is_blocked db.session.commit() AuditService().log('toggle_block', 'user', user.id, f'blocked={user.is_blocked}') flash('Zmieniono status blokady użytkownika.', 'warning') return redirect(url_for('admin.users')) @bp.route('/companies') @login_required @roles_required('admin') def companies(): return render_template('admin/companies.html', companies=Company.query.order_by(Company.name).all()) @bp.route('/companies/new', methods=['GET', 'POST']) @bp.route('/companies//edit', methods=['GET', 'POST']) @login_required @roles_required('admin') def company_form(company_id=None): company = db.session.get(Company, company_id) if company_id else None form = AdminCompanyForm(obj=company) if request.method == 'GET': if company: form.sync_interval_minutes.data = str(company.sync_interval_minutes) form.mock_mode.data = AppSetting.get(f'company.{company.id}.ksef.mock_mode', 'false') == 'true' else: form.mock_mode.data = False if form.fetch_submit.data and form.validate_on_submit(): lookup = CeidgService().fetch_company(form.tax_id.data) if lookup.get('ok'): form.name.data = lookup.get('name') or form.name.data form.regon.data = lookup.get('regon') or form.regon.data form.address.data = lookup.get('address') or form.address.data form.tax_id.data = lookup.get('tax_id') or form.tax_id.data flash('Pobrano dane firmy z CEIDG.', 'success') else: flash(lookup.get('message', 'Nie udało się pobrać danych z CEIDG.'), 'warning') elif form.submit.data and form.validate_on_submit(): created = company is None if not company: company = Company() db.session.add(company) company.name = form.name.data company.tax_id = form.tax_id.data or '' company.regon = form.regon.data or '' company.address = form.address.data or '' company.bank_account = (form.bank_account.data or '').strip() company.is_active = bool(form.is_active.data) company.sync_enabled = bool(form.sync_enabled.data) company.sync_interval_minutes = int(form.sync_interval_minutes.data or 60) company.note = form.note.data or '' db.session.commit() AppSetting.set(f'company.{company.id}.ksef.mock_mode', str(bool(form.mock_mode.data)).lower()) db.session.commit() if created: CompanyService.assign_user(user=current_user, company=company, access_level='full', switch_after=True) AuditService().log('save_company', 'company', company.id, company.name) flash('Zapisano firmę.', 'success') return redirect(url_for('admin.companies')) return render_template('admin/company_form.html', form=form, company=company) @bp.post('/mock-data/generate') @login_required @roles_required('admin') def generate_mock_data(): companies = Company.query.order_by(Company.id).all() for company in companies: AppSetting.set(f'company.{company.id}.ksef.mock_mode', 'true') if not Product.query.filter_by(company_id=company.id).first(): db.session.add(Product(company_id=company.id, name='Abonament miesięczny', sku='SUB-MONTH', unit='usł.', net_price=Decimal('199.00'), vat_rate=Decimal('23'))) if not Customer.query.filter_by(company_id=company.id).first(): db.session.add(Customer(company_id=company.id, name=f'Klient demo {company.id}', tax_id=f'5250000{company.id:03d}', email='demo@example.com', address='Warszawa, Polska')) db.session.flush() if Invoice.query.filter_by(company_id=company.id).count() == 0: customer = Customer.query.filter_by(company_id=company.id).first() for idx in range(1, 4): invoice = Invoice( company_id=company.id, customer_id=customer.id if customer else None, ksef_number=f'MOCK/{company.id}/{idx}', invoice_number=f'FV/{company.id}/{idx:03d}/2026', contractor_name=customer.name if customer else f'Klient demo {company.id}', contractor_nip=customer.tax_id if customer else f'5250000{company.id:03d}', issue_date=datetime.utcnow().date() - timedelta(days=idx), received_date=datetime.utcnow().date() - timedelta(days=idx), fetched_at=datetime.utcnow(), net_amount=Decimal('199.00'), vat_amount=Decimal('45.77'), gross_amount=Decimal('244.77'), invoice_type=InvoiceType.SALE, status=InvoiceStatus.SENT, source='mock', issued_status='sent', issued_to_ksef_at=datetime.utcnow(), ) db.session.add(invoice) db.session.flush() db.session.commit() AuditService().log('generate_mock_data', 'system', 0, f'companies={len(companies)}') flash('Wygenerowano dane mock.', 'success') return redirect(url_for('admin.index')) @bp.post('/mock-data/clear') @login_required @roles_required('admin') def clear_mock_data(): invoices = Invoice.query.filter(_mock_invoice_filter()).all() company_ids = {invoice.company_id for invoice in invoices if invoice.company_id} deleted_invoices = 0 for invoice in invoices: db.session.delete(invoice) deleted_invoices += 1 catalog_deleted = _cleanup_mock_catalog(company_ids) for company_id in company_ids: AppSetting.set(f'company.{company_id}.ksef.mock_mode', 'false') db.session.commit() AuditService().log( 'clear_mock_data', 'system', 0, f'invoices={deleted_invoices}, customers={catalog_deleted["customers"]}, products={catalog_deleted["products"]}, companies={len(company_ids)}', ) flash( f'Usunięto dane mock: faktury {deleted_invoices}, klienci {catalog_deleted["customers"]}, produkty {catalog_deleted["products"]}.', 'info', ) return redirect(url_for('admin.index')) @bp.post('/ceidg/save') @login_required @roles_required('admin') def save_ceidg_settings(): form = CeidgConfigForm() if form.validate_on_submit(): environment = (form.environment.data or 'production').strip().lower() if environment not in {'production', 'test'}: environment = 'production' api_key = (form.api_key.data or '').strip() AppSetting.set('ceidg.environment', environment) api_key_updated = False if api_key: AppSetting.set('ceidg.api_key', api_key, encrypt=True) api_key_updated = True db.session.commit() AuditService().log( 'save_ceidg_settings', 'system', 0, f'environment={environment}, api_key_updated={api_key_updated}', ) flash('Zapisano konfigurację CEIDG.', 'success') else: flash('Nie udało się zapisać konfiguracji CEIDG.', 'danger') return redirect(url_for('admin.index')) @bp.post('/read-only/toggle') @login_required @roles_required('admin') def toggle_global_read_only(): enabled = request.form.get('enabled') == '1' AppSetting.set('app.read_only_mode', 'true' if enabled else 'false') db.session.commit() AuditService().log('toggle_global_read_only', 'system', 0, f'enabled={enabled}') flash('Zmieniono globalny tryb tylko do odczytu.', 'warning' if enabled else 'success') return redirect(url_for('admin.index')) @bp.post('/logs/cleanup') @login_required @roles_required('admin') def cleanup_logs(): form = LogCleanupForm() if not form.validate_on_submit(): flash('Podaj poprawną liczbę dni.', 'danger') return redirect(url_for('admin.index')) cutoff = datetime.utcnow() - timedelta(days=form.days.data) deleted = {} targets = [ ('audit', AuditLog, AuditLog.created_at), ('sync', SyncLog, SyncLog.created_at), ('notifications', NotificationLog, NotificationLog.created_at), ('mail_delivery', MailDelivery, MailDelivery.created_at), ('sync_events', SyncEvent, SyncEvent.created_at), ] for label, model, column in targets: deleted[label] = model.query.filter(column < cutoff).delete(synchronize_session=False) removed_files = 0 log_dir = Path('instance') for pattern in ['app.log.*', '*.log.*']: for file_path in log_dir.glob(pattern): try: if datetime.utcfromtimestamp(file_path.stat().st_mtime) < cutoff: file_path.unlink() removed_files += 1 except OSError: continue db.session.commit() AuditService().log('cleanup_logs', 'system', 0, f'days={form.days.data}, deleted={deleted}, files={removed_files}') flash(f'Usunięto stare logi starsze niż {form.days.data} dni. DB: {sum(deleted.values())}, pliki: {removed_files}.', 'success') return redirect(url_for('admin.index')) @bp.post('/database/backup') @login_required @roles_required('admin') def database_backup(): form = DatabaseBackupForm() if not form.validate_on_submit(): flash('Nie udało się uruchomić backupu bazy.', 'danger') return redirect(url_for('admin.index')) backup_path = BackupService().create_database_backup() AuditService().log('database_backup', 'system', 0, backup_path) return send_file(backup_path, as_attachment=True, download_name=Path(backup_path).name) @bp.route('/global-settings', methods=['GET', 'POST']) @login_required @roles_required('admin') def global_settings(): current_company = CompanyService.get_current_company() company_id = current_company.id if current_company else None mail_form = GlobalMailSettingsForm(prefix='mail', server=SettingsService.get('mail.server', ''), port=SettingsService.get('mail.port', '587'), username=SettingsService.get('mail.username', ''), sender=SettingsService.get('mail.sender', ''), security_mode=(SettingsService.get('mail.security_mode', '') or ('tls' if SettingsService.get('mail.tls', 'true') == 'true' else 'none'))) notify_form = GlobalNotificationSettingsForm(prefix='notify', pushover_user_key=SettingsService.get('notify.pushover_user_key', ''), min_amount=SettingsService.get('notify.min_amount', '0'), quiet_hours=SettingsService.get('notify.quiet_hours', ''), enabled=SettingsService.get('notify.enabled', 'false') == 'true') nfz_form = GlobalNfzSettingsForm(prefix='nfz', enabled=SettingsService.get('modules.nfz_enabled', 'false') == 'true') ksef_defaults_form = GlobalKsefDefaultsForm(prefix='kdef', environment=SettingsService.get('ksef.default_environment', 'prod'), auth_mode=SettingsService.get('ksef.default_auth_mode', 'token'), client_id=SettingsService.get('ksef.default_client_id', '')) shared_ksef_form = SharedCompanyKsefForm(prefix='shared', environment=SettingsService.get('ksef.environment', 'prod', company_id=company_id), auth_mode=SettingsService.get('ksef.auth_mode', 'token', company_id=company_id), client_id=SettingsService.get('ksef.client_id', '', company_id=company_id), certificate_name=SettingsService.get('ksef.certificate_name', '', company_id=company_id)) if mail_form.submit.data and mail_form.validate_on_submit(): SettingsService.set_many({'mail.server': mail_form.server.data or '', 'mail.port': mail_form.port.data or '587', 'mail.username': mail_form.username.data or '', 'mail.password': (mail_form.password.data or SettingsService.get_secret('mail.password', ''), True), 'mail.sender': mail_form.sender.data or '', 'mail.security_mode': mail_form.security_mode.data or 'tls', 'mail.tls': str((mail_form.security_mode.data or 'tls') == 'tls').lower()}) flash('Zapisano globalne ustawienia SMTP.', 'success') return redirect(url_for('admin.global_settings')) if notify_form.submit.data and notify_form.validate_on_submit(): SettingsService.set_many({'notify.pushover_user_key': notify_form.pushover_user_key.data or '', 'notify.pushover_api_token': (notify_form.pushover_api_token.data or SettingsService.get_secret('notify.pushover_api_token', ''), True), 'notify.min_amount': notify_form.min_amount.data or '0', 'notify.quiet_hours': notify_form.quiet_hours.data or '', 'notify.enabled': str(bool(notify_form.enabled.data)).lower()}) flash('Zapisano globalne ustawienia Pushover.', 'success') return redirect(url_for('admin.global_settings')) if nfz_form.submit.data and nfz_form.validate_on_submit(): SettingsService.set_many({'modules.nfz_enabled': str(bool(nfz_form.enabled.data)).lower()}) flash('Zapisano globalne ustawienia modułu NFZ.', 'success') return redirect(url_for('admin.global_settings')) if ksef_defaults_form.submit.data and ksef_defaults_form.validate_on_submit(): SettingsService.set_many({'ksef.default_environment': ksef_defaults_form.environment.data or 'prod', 'ksef.default_auth_mode': ksef_defaults_form.auth_mode.data or 'token', 'ksef.default_client_id': ksef_defaults_form.client_id.data or ''}) flash('Zapisano domyślne parametry KSeF.', 'success') return redirect(url_for('admin.global_settings')) if shared_ksef_form.submit.data and shared_ksef_form.validate_on_submit() and company_id: SettingsService.set_many({'ksef.environment': shared_ksef_form.environment.data or 'prod', 'ksef.base_url': RequestsKSeFAdapter.ENVIRONMENT_URLS.get(shared_ksef_form.environment.data or 'prod', RequestsKSeFAdapter.ENVIRONMENT_URLS['prod']), 'ksef.auth_mode': shared_ksef_form.auth_mode.data or 'token', 'ksef.client_id': shared_ksef_form.client_id.data or '', 'ksef.certificate_name': shared_ksef_form.certificate_name.data or '', 'ksef.token': (shared_ksef_form.token.data or SettingsService.get_secret('ksef.token', '', company_id=company_id), True), 'ksef.certificate_data': (shared_ksef_form.certificate_data.data or SettingsService.get_secret('ksef.certificate_data', '', company_id=company_id), True)}, company_id=company_id) flash('Zapisano współdzielony profil KSeF dla aktywnej firmy.', 'success') return redirect(url_for('admin.global_settings')) return render_template('admin/global_settings.html', mail_form=mail_form, notify_form=notify_form, nfz_form=nfz_form, ksef_defaults_form=ksef_defaults_form, shared_ksef_form=shared_ksef_form, current_company=current_company, shared_token_configured=bool(SettingsService.get_secret('ksef.token', '', company_id=company_id)) if company_id else False, shared_cert_configured=bool(SettingsService.get_secret('ksef.certificate_data', '', company_id=company_id)) if company_id else False) @bp.route('/maintenance') @login_required @roles_required('admin') def maintenance(): return render_template('admin/maintenance.html', **_admin_dashboard_context()) @bp.route('/audit') @login_required @roles_required('admin') def audit(): logs = AuditLog.query.order_by(AuditLog.created_at.desc()).limit(200).all() return render_template('admin/audit.html', logs=logs) @bp.route('/health') @login_required @roles_required('admin') def health(): return redirect(url_for('admin.system_data')) @bp.route('/system-data') @login_required @roles_required('admin') def system_data(): data = SystemDataService().collect() return render_template('admin/system_data.html', data=data, json_preview=SystemDataService.json_preview)