72 lines
2.6 KiB
Python
72 lines
2.6 KiB
Python
from datetime import datetime
|
|
from sqlalchemy import text
|
|
from app.extensions import db
|
|
from app.services.company_service import CompanyService
|
|
from app.services.ksef_service import KSeFService
|
|
from app.services.settings_service import SettingsService
|
|
from app.services.ceidg_service import CeidgService
|
|
from app.services.redis_service import RedisService
|
|
|
|
|
|
class HealthService:
|
|
CACHE_TTL = 300
|
|
|
|
def cache_key(self, company_id):
|
|
return f'health.status.company.{company_id or "global"}'
|
|
|
|
def get_cached_status(self, company_id=None):
|
|
return RedisService.get_json(self.cache_key(company_id))
|
|
|
|
def warm_cache(self, company_id=None):
|
|
status = self.get_status(force_refresh=True, company_id=company_id)
|
|
RedisService.set_json(self.cache_key(company_id), status, ttl=self.CACHE_TTL)
|
|
return status
|
|
|
|
def get_status(self, force_refresh: bool = False, company_id=None):
|
|
if not force_refresh:
|
|
cached = self.get_cached_status(company_id)
|
|
if cached:
|
|
return cached
|
|
|
|
company = CompanyService.get_current_company()
|
|
if company_id is None and company:
|
|
company_id = company.id
|
|
redis_status, redis_details = RedisService.ping()
|
|
|
|
status = {
|
|
'timestamp': datetime.utcnow().isoformat(),
|
|
'db': 'ok',
|
|
'smtp': 'configured' if SettingsService.get_effective('mail.server', company_id=company_id) else 'not_configured',
|
|
'redis': redis_status,
|
|
'redis_details': redis_details,
|
|
'ksef': 'unknown',
|
|
'ceidg': 'unknown',
|
|
'ksef_message': '',
|
|
'ceidg_message': '',
|
|
}
|
|
|
|
try:
|
|
db.session.execute(text('SELECT 1'))
|
|
except Exception:
|
|
status['db'] = 'error'
|
|
|
|
try:
|
|
ping = KSeFService(company_id=company_id).ping()
|
|
status['ksef'] = ping.get('status', 'unknown')
|
|
status['ksef_message'] = ping.get('message', '')
|
|
except Exception as exc:
|
|
status['ksef'] = 'error'
|
|
status['ksef_message'] = str(exc)
|
|
|
|
try:
|
|
ping = CeidgService().diagnostics()
|
|
status['ceidg'] = ping.get('status', 'unknown')
|
|
status['ceidg_message'] = ping.get('message', '')
|
|
except Exception as exc:
|
|
status['ceidg'] = 'error'
|
|
status['ceidg_message'] = str(exc)
|
|
|
|
status['critical'] = status['db'] != 'ok' or status['ksef'] not in ['ok', 'mock'] or status['ceidg'] != 'ok'
|
|
RedisService.set_json(self.cache_key(company_id), status, ttl=self.CACHE_TTL)
|
|
return status
|