278 lines
11 KiB
Python
278 lines
11 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import platform
|
|
from pathlib import Path
|
|
|
|
import psutil
|
|
from flask import current_app
|
|
from sqlalchemy import inspect
|
|
|
|
from app.extensions import db
|
|
from app.models.audit_log import AuditLog
|
|
from app.models.catalog import Customer, InvoiceLine, Product
|
|
from app.models.company import Company, UserCompanyAccess
|
|
from app.models.invoice import Invoice, MailDelivery, NotificationLog, SyncEvent, Tag
|
|
from app.models.setting import AppSetting
|
|
from app.models.sync_log import SyncLog
|
|
from app.models.user import User
|
|
from app.services.ceidg_service import CeidgService
|
|
from app.services.company_service import CompanyService
|
|
from app.services.health_service import HealthService
|
|
from app.services.ksef_service import KSeFService
|
|
from app.services.settings_service import SettingsService
|
|
|
|
|
|
class SystemDataService:
|
|
APP_MODELS = [
|
|
('Użytkownicy', User),
|
|
('Firmy', Company),
|
|
('Dostępy do firm', UserCompanyAccess),
|
|
('Faktury', Invoice),
|
|
('Pozycje faktur', InvoiceLine),
|
|
('Klienci', Customer),
|
|
('Produkty', Product),
|
|
('Tagi', Tag),
|
|
('Logi synchronizacji', SyncLog),
|
|
('Zdarzenia sync', SyncEvent),
|
|
('Wysyłki maili', MailDelivery),
|
|
('Notyfikacje', NotificationLog),
|
|
('Logi audytu', AuditLog),
|
|
('Ustawienia', AppSetting),
|
|
]
|
|
|
|
def collect(self) -> dict:
|
|
company = CompanyService.get_current_company()
|
|
company_id = company.id if company else None
|
|
process = self._process_stats()
|
|
storage = self._storage_stats()
|
|
app = self._app_stats(company)
|
|
database = self._database_stats()
|
|
health = HealthService().get_status()
|
|
ksef = KSeFService(company_id=company_id).diagnostics()
|
|
ceidg = CeidgService().diagnostics()
|
|
return {
|
|
'overview': self._overview_cards(process, storage, app, database, health, ksef, ceidg),
|
|
'process': process,
|
|
'storage': storage,
|
|
'app': app,
|
|
'database': database,
|
|
'health': health,
|
|
'integrations': {
|
|
'ksef': ksef,
|
|
'ceidg': ceidg,
|
|
},
|
|
}
|
|
|
|
def _overview_cards(self, process: dict, storage: list[dict], app: dict, database: dict, health: dict, ksef: dict, ceidg: dict) -> list[dict]:
|
|
storage_total = sum(item['size_bytes'] for item in storage)
|
|
total_records = sum(item['rows'] for item in database['table_rows'])
|
|
return [
|
|
{
|
|
'label': 'CPU procesu',
|
|
'value': f"{process['cpu_percent']:.2f}%",
|
|
'subvalue': f"PID {process['pid']} · {process['threads']} wątków",
|
|
'icon': 'fa-microchip',
|
|
'tone': 'primary',
|
|
},
|
|
{
|
|
'label': 'RAM procesu',
|
|
'value': process['rss_human'],
|
|
'subvalue': f"System zajęty: {process['system_memory_percent']:.2f}% z {process['system_memory_total']}",
|
|
'icon': 'fa-memory',
|
|
'tone': 'info',
|
|
},
|
|
{
|
|
'label': 'Katalogi robocze',
|
|
'value': self._human_size(storage_total),
|
|
'subvalue': f'{len(storage)} lokalizacji monitorowanych',
|
|
'icon': 'fa-hard-drive',
|
|
'tone': 'warning',
|
|
},
|
|
{
|
|
'label': 'Użytkownicy / firmy',
|
|
'value': f"{app['users_count']} / {app['companies_count']}",
|
|
'subvalue': f"R/O: {'ON' if app['read_only_global'] else 'OFF'}",
|
|
'icon': 'fa-users',
|
|
'tone': 'secondary',
|
|
},
|
|
{
|
|
'label': 'Rekordy bazy',
|
|
'value': str(total_records),
|
|
'subvalue': f"{database['tables_count']} tabel · {database['engine']}",
|
|
'icon': 'fa-database',
|
|
'tone': 'secondary',
|
|
},
|
|
{
|
|
'label': 'Health',
|
|
'value': self._health_summary(health),
|
|
'subvalue': f"DB {health.get('db')} · SMTP {health.get('smtp')} · Redis {health.get('redis')}",
|
|
'icon': 'fa-heart-pulse',
|
|
'tone': 'success' if health.get('db') == 'ok' and health.get('ksef') in ['ok', 'mock'] else 'warning',
|
|
},
|
|
{
|
|
'label': 'KSeF',
|
|
'value': ksef.get('status', 'unknown').upper(),
|
|
'subvalue': ksef.get('message', 'Brak danych'),
|
|
'icon': 'fa-file-invoice',
|
|
'tone': 'success' if ksef.get('status') in ['ok', 'mock'] else 'danger',
|
|
},
|
|
{
|
|
'label': 'CEIDG',
|
|
'value': ceidg.get('status', 'unknown').upper(),
|
|
'subvalue': ceidg.get('message', 'Brak danych'),
|
|
'icon': 'fa-building-circle-check',
|
|
'tone': 'success' if ceidg.get('status') == 'ok' else 'danger',
|
|
},
|
|
]
|
|
|
|
def _process_stats(self) -> dict:
|
|
process = psutil.Process(os.getpid())
|
|
cpu_percent = process.cpu_percent(interval=0.1)
|
|
mem = process.memory_info()
|
|
system_mem = psutil.virtual_memory()
|
|
try:
|
|
open_files = len(process.open_files())
|
|
except Exception:
|
|
open_files = 0
|
|
return {
|
|
'pid': process.pid,
|
|
'cpu_percent': round(cpu_percent, 2),
|
|
'rss_bytes': int(mem.rss),
|
|
'rss_human': self._human_size(mem.rss),
|
|
'system_memory_total': self._human_size(system_mem.total),
|
|
'system_memory_percent': round(system_mem.percent, 2),
|
|
'threads': process.num_threads(),
|
|
'open_files': open_files,
|
|
'platform': platform.platform(),
|
|
'python': platform.python_version(),
|
|
}
|
|
|
|
def _storage_stats(self) -> list[dict]:
|
|
locations = [
|
|
('Instancja', Path(current_app.instance_path)),
|
|
('Archiwum XML', SettingsService.storage_path('app.archive_path', current_app.config['ARCHIVE_PATH'])),
|
|
('PDF', SettingsService.storage_path('app.pdf_path', current_app.config['PDF_PATH'])),
|
|
('Backupy', SettingsService.storage_path('app.backup_path', current_app.config['BACKUP_PATH'])),
|
|
('Certyfikaty', SettingsService.storage_path('app.certs_path', current_app.config['CERTS_PATH'])),
|
|
]
|
|
rows = []
|
|
for label, path in locations:
|
|
size_bytes = self._dir_size(path)
|
|
usage = psutil.disk_usage(str(path if path.exists() else path.parent))
|
|
rows.append({
|
|
'label': label,
|
|
'path': str(path),
|
|
'size_bytes': size_bytes,
|
|
'size_human': self._human_size(size_bytes),
|
|
'disk_total': self._human_size(usage.total),
|
|
'disk_free': self._human_size(usage.free),
|
|
'disk_percent': round(usage.percent, 2),
|
|
})
|
|
return rows
|
|
|
|
def _app_stats(self, company) -> dict:
|
|
users_count = User.query.count()
|
|
companies_count = Company.query.count()
|
|
counts = [{'label': label, 'count': model.query.count()} for label, model in self.APP_MODELS]
|
|
counts_sorted = sorted(counts, key=lambda item: item['count'], reverse=True)
|
|
return {
|
|
'current_company': company.name if company else 'Brak wybranej firmy',
|
|
'current_company_id': company.id if company else None,
|
|
'read_only_global': AppSetting.get('app.read_only_mode', 'false') == 'true',
|
|
'app_timezone': current_app.config.get('APP_TIMEZONE'),
|
|
'counts': counts_sorted,
|
|
'counts_top': counts_sorted[:6],
|
|
'users_count': int(users_count),
|
|
'companies_count': int(companies_count),
|
|
}
|
|
|
|
def _database_stats(self) -> dict:
|
|
engine = db.engine
|
|
inspector = inspect(engine)
|
|
table_names = inspector.get_table_names()
|
|
table_rows = []
|
|
for table_name in table_names:
|
|
table = db.metadata.tables.get(table_name)
|
|
if table is None:
|
|
continue
|
|
count = db.session.execute(db.select(db.func.count()).select_from(table)).scalar() or 0
|
|
table_rows.append({'table': table_name, 'rows': int(count)})
|
|
uri = current_app.config.get('SQLALCHEMY_DATABASE_URI', '')
|
|
sqlite_path = None
|
|
sqlite_size = None
|
|
if uri.startswith('sqlite:///') and not uri.endswith(':memory:'):
|
|
sqlite_path = uri.replace('sqlite:///', '', 1)
|
|
try:
|
|
sqlite_size = self._human_size(Path(sqlite_path).stat().st_size)
|
|
except FileNotFoundError:
|
|
sqlite_size = 'brak pliku'
|
|
table_rows_sorted = sorted(table_rows, key=lambda item: (-item['rows'], item['table']))
|
|
return {
|
|
'engine': engine.name,
|
|
'uri': self._mask_uri(uri),
|
|
'tables_count': len(table_rows),
|
|
'sqlite_path': sqlite_path,
|
|
'sqlite_size': sqlite_size,
|
|
'table_rows': table_rows_sorted,
|
|
'largest_tables': table_rows_sorted[:6],
|
|
}
|
|
|
|
@staticmethod
|
|
def json_preview(payload, max_len: int = 1200) -> str:
|
|
if payload is None:
|
|
return 'Brak danych.'
|
|
if isinstance(payload, str):
|
|
text = payload
|
|
else:
|
|
text = json.dumps(payload, ensure_ascii=False, indent=2, default=str)
|
|
return text if len(text) <= max_len else text[:max_len] + '\n...'
|
|
|
|
@staticmethod
|
|
def _dir_size(path: Path) -> int:
|
|
total = 0
|
|
if not path.exists():
|
|
return total
|
|
if path.is_file():
|
|
return path.stat().st_size
|
|
for root, _, files in os.walk(path):
|
|
for filename in files:
|
|
try:
|
|
total += (Path(root) / filename).stat().st_size
|
|
except OSError:
|
|
continue
|
|
return total
|
|
|
|
@staticmethod
|
|
def _human_size(size: int | float) -> str:
|
|
value = float(size or 0)
|
|
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
|
|
if value < 1024 or unit == 'TB':
|
|
return f'{value:.2f} {unit}'
|
|
value /= 1024
|
|
return f'{value:.2f} TB'
|
|
|
|
@staticmethod
|
|
def _mask_uri(uri: str) -> str:
|
|
if '@' in uri and '://' in uri:
|
|
prefix, suffix = uri.split('://', 1)
|
|
credentials, rest = suffix.split('@', 1)
|
|
if ':' in credentials:
|
|
user, _ = credentials.split(':', 1)
|
|
return f'{prefix}://{user}:***@{rest}'
|
|
return uri
|
|
|
|
@staticmethod
|
|
def _health_summary(health: dict) -> str:
|
|
tracked = {
|
|
'Baza': health.get('db'),
|
|
'SMTP': health.get('smtp'),
|
|
'Redis': health.get('redis'),
|
|
'KSeF': health.get('ksef'),
|
|
'CEIDG': health.get('ceidg'),
|
|
}
|
|
ok = sum(1 for value in tracked.values() if value in ['ok', 'mock', 'configured', 'fallback'])
|
|
total = len(tracked)
|
|
return f'{ok}/{total} OK'
|