Files
cve_monitor/app.py
Mateusz Gruszczyński e999fc77c2 app
2026-02-14 09:46:26 +01:00

355 lines
11 KiB
Python

#!/usr/bin/env python3
from flask import Flask, render_template, make_response, request, jsonify
import gzip
import hashlib
import logging
from functools import wraps
from datetime import datetime
import threading
import time
import os
from logging.handlers import RotatingFileHandler
from dotenv import load_dotenv
load_dotenv()
import config
from cve_handler import CVEHandler, update_all_vendors
import api
def setup_logging():
    """Configure the root logger with console and optional file output.

    Reads ``LOG_LEVEL``, ``LOG_FORMAT``, ``LOG_FILE``, ``LOG_MAX_BYTES``,
    ``LOG_BACKUP_COUNT`` and ``DEBUG`` from ``config``. Any handlers already
    attached to the root logger are removed first. When ``LOG_FILE`` is set a
    ``RotatingFileHandler`` is added next to the console handler. The
    ``werkzeug``, ``urllib3`` and ``requests`` loggers have their levels
    adjusted at the end.
    """
    log_level = getattr(logging, config.LOG_LEVEL.upper(), logging.INFO)
    # getattr() can resolve to something that is not a level at all (for
    # example LOG_LEVEL="basic_format" yields a format string); fall back to
    # INFO rather than letting setLevel() fail on it.
    if not isinstance(log_level, int):
        log_level = logging.INFO
    formatter = logging.Formatter(config.LOG_FORMAT)

    root_logger = logging.getLogger()
    root_logger.handlers.clear()
    root_logger.setLevel(log_level)

    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    console_handler.setLevel(log_level)
    root_logger.addHandler(console_handler)

    if config.LOG_FILE:
        # A bare filename such as "app.log" has an empty dirname, and
        # os.makedirs('') raises FileNotFoundError, so only create the
        # directory when there actually is one.
        log_dir = os.path.dirname(config.LOG_FILE)
        if log_dir:
            os.makedirs(log_dir, exist_ok=True)
        file_handler = RotatingFileHandler(
            config.LOG_FILE,
            maxBytes=config.LOG_MAX_BYTES,
            backupCount=config.LOG_BACKUP_COUNT,
        )
        file_handler.setFormatter(formatter)
        file_handler.setLevel(log_level)
        root_logger.addHandler(file_handler)

    # werkzeug gets its own handlers removed and is limited to WARNING unless
    # DEBUG is on; the HTTP client libraries are always limited to WARNING.
    werkzeug_logger = logging.getLogger('werkzeug')
    werkzeug_logger.handlers.clear()
    werkzeug_logger.setLevel(logging.WARNING if not config.DEBUG else logging.INFO)
    logging.getLogger('urllib3').setLevel(logging.WARNING)
    logging.getLogger('requests').setLevel(logging.WARNING)
# Logging is configured first so that everything below logs through it.
setup_logging()
logger = logging.getLogger(__name__)

app = Flask(__name__)

# Keep JSON keys in insertion order. Flask 2.3 removed the JSON_SORT_KEYS
# config key in favour of the JSON provider attribute, so set both: the config
# key for older Flask releases, the provider attribute where it exists.
app.config['JSON_SORT_KEYS'] = False
if hasattr(app, 'json'):
    app.json.sort_keys = False

if not config.DEBUG:
    # Disable the 'werkzeug' logger entirely outside debug mode.
    logging.getLogger('werkzeug').disabled = True

app.register_blueprint(api.api_bp, url_prefix='/api')
cve_handler = CVEHandler()
@app.context_processor
def inject_config():
    """Make the ``config`` module available to templates under ``config``."""
    return dict(config=config)
def add_security_headers(response):
    """Copy ``config.SECURITY_HEADERS`` onto *response* and return it.

    The response is returned untouched when
    ``config.ENABLE_SECURITY_HEADERS`` is falsy.
    """
    if not config.ENABLE_SECURITY_HEADERS:
        return response

    for header_name, header_value in config.SECURITY_HEADERS.items():
        response.headers[header_name] = header_value
    return response
def gzip_response(f):
    """Decorate a view so its successful response body is gzip-compressed.

    Compression is skipped when ``config.ENABLE_COMPRESSION`` is falsy, the
    request's ``Accept-Encoding`` header does not mention gzip, the status is
    not 2xx, the response already carries a ``Content-Encoding``, or the
    response is in direct-passthrough mode.

    Whenever compression is enabled, ``Accept-Encoding`` is added to the
    response's ``Vary`` header, on compressed and uncompressed variants
    alike, so caches keep the two representations apart.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if not config.ENABLE_COMPRESSION:
            return f(*args, **kwargs)

        response = make_response(f(*args, **kwargs))

        # The representation depends on Accept-Encoding whether or not this
        # particular response ends up compressed. add() merges with any Vary
        # value the view already set instead of overwriting it.
        response.vary.add('Accept-Encoding')

        accept_encoding = request.headers.get('Accept-Encoding', '')
        if 'gzip' not in accept_encoding.lower():
            return response
        if not 200 <= response.status_code < 300:
            return response
        if response.headers.get('Content-Encoding'):
            return response
        # get_data() raises on direct-passthrough responses (streamed files),
        # so leave those alone.
        if response.direct_passthrough:
            return response

        compressed = gzip.compress(response.get_data())
        response.set_data(compressed)
        response.headers['Content-Encoding'] = 'gzip'
        response.headers['Content-Length'] = len(compressed)
        return response
    return decorated_function
def _etag_matches(if_none_match, etag):
    """Return True when an ``If-None-Match`` header value selects *etag*.

    Accepts a comma-separated list of tags, the ``*`` wildcard, and weak
    tags (``W/"..."``); *etag* is the bare, unquoted tag value.
    """
    if not if_none_match:
        return False
    for candidate in if_none_match.split(','):
        candidate = candidate.strip()
        if candidate == '*':
            return True
        if candidate.startswith('W/'):
            candidate = candidate[2:]
        if candidate.strip('"') == etag:
            return True
    return False


def etag_support(f):
    """Decorate a view with ETag-based conditional responses.

    When ``config.ENABLE_ETAG`` is truthy and the view returns a 2xx
    response, the MD5 hex digest of the body is sent as ``ETag`` together
    with a ``Cache-Control: public, max-age=...`` header derived from
    ``config.CACHE_HOURS``. If the request's ``If-None-Match`` header selects
    that tag, an empty 304 response carrying the same two headers is returned
    instead of the body.

    Non-2xx responses are passed through unchanged so that error pages are
    never given a public cache lifetime.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if not config.ENABLE_ETAG:
            return f(*args, **kwargs)

        response = make_response(f(*args, **kwargs))
        if not 200 <= response.status_code < 300:
            return response

        etag = hashlib.md5(response.get_data()).hexdigest()
        quoted_etag = f'"{etag}"'
        # int() keeps max-age a valid integer even if CACHE_HOURS is a float.
        cache_control = f'public, max-age={int(config.CACHE_HOURS * 3600)}'
        response.headers['ETag'] = quoted_etag
        response.headers['Cache-Control'] = cache_control

        if _etag_matches(request.headers.get('If-None-Match'), etag):
            # A 304 must repeat the validator and caching headers, otherwise
            # the client cannot refresh the lifetime of its stored copy.
            not_modified = make_response('', 304)
            not_modified.headers['ETag'] = quoted_etag
            not_modified.headers['Cache-Control'] = cache_control
            return not_modified
        return response
    return decorated_function
@app.route('/')
@gzip_response
@etag_support
def index():
    """Render ``index.html`` with the vendor list from ``config.VENDORS``."""
    vendors = config.VENDORS
    return render_template('index.html', vendors=vendors)
@app.route('/health')
def health():
    """Report service health as JSON.

    Counts the rows in ``cve_cache`` and ``cve_metadata`` through
    ``cve_handler``. On success returns HTTP 200 with status, timestamp,
    application version, database counts and feature flags. If anything
    raises, the error is logged and HTTP 503 is returned with the exception
    text.
    """
    # Function-scope import: the file only imports ``datetime`` at top level.
    from datetime import timezone

    def _utc_timestamp():
        # datetime.utcnow() is deprecated since Python 3.12. Take an aware
        # UTC time and drop tzinfo so the emitted string keeps its previous
        # shape (naive ISO-8601 without a "+00:00" suffix).
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    try:
        with cve_handler.get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT COUNT(*) as count FROM cve_cache")
            cve_count = cursor.fetchone()['count']
            cursor.execute("SELECT COUNT(*) as count FROM cve_metadata")
            vendors_count = cursor.fetchone()['count']

        health_data = {
            'status': 'healthy',
            'timestamp': _utc_timestamp(),
            'version': config.APP_VERSION,
            'database': {
                'status': 'connected',
                'cve_count': cve_count,
                'vendors_tracked': vendors_count
            },
            'features': {
                'auto_update': config.ENABLE_AUTO_UPDATE,
                'discord_bot': config.DISCORD_BOT_ENABLED,
                'charts': config.ENABLE_CHARTS,
                'search': config.ENABLE_SEARCH,
                'export': config.ENABLE_EXPORT
            }
        }
        return jsonify(health_data), 200
    except Exception as e:
        logger.error(f"Health check failed: {e}")
        # NOTE(review): the raw exception text is returned to the caller;
        # confirm this endpoint is not exposed to untrusted clients.
        return jsonify({
            'status': 'unhealthy',
            'timestamp': _utc_timestamp(),
            'error': str(e)
        }), 503
@app.route('/favicon.ico')
def favicon():
    """Answer favicon requests with an empty body and status 204."""
    empty_body, no_content = '', 204
    return empty_body, no_content
@app.route('/version')
def version():
    """Return application and runtime version information as JSON.

    ``python_version`` is the running interpreter's ``major.minor`` and
    ``flask_version`` is read from the installed package metadata, so both
    reflect the actual deployment rather than fixed strings. If the Flask
    distribution metadata cannot be found, ``flask_version`` is ``'unknown'``.
    """
    # Function-scope imports: neither module is imported at file level.
    import sys
    from importlib import metadata

    try:
        flask_version = metadata.version('flask')
    except metadata.PackageNotFoundError:
        flask_version = 'unknown'

    return jsonify({
        'app_name': config.APP_NAME,
        'version': config.APP_VERSION,
        'python_version': f'{sys.version_info.major}.{sys.version_info.minor}',
        'flask_version': flask_version
    })
@app.errorhandler(404)
def not_found(error):
    """Handle 404s: JSON for paths under ``/api/``, ``404.html`` otherwise."""
    is_api_request = request.path.startswith('/api/')
    if not is_api_request:
        return render_template('404.html'), 404
    return jsonify({'error': 'Endpoint not found'}), 404
@app.errorhandler(500)
def internal_error(error):
    """Handle 500s: log the error, then answer with JSON or ``500.html``.

    Paths under ``/api/`` get a JSON body; every other path gets the
    rendered ``500.html`` template.
    """
    logger.error(f"Internal error: {error}")
    is_api_request = request.path.startswith('/api/')
    if not is_api_request:
        return render_template('500.html'), 500
    return jsonify({'error': 'Internal server error'}), 500
@app.after_request
def after_request(response):
    """Post-process every response.

    Applies ``add_security_headers`` and, when ``config.DEBUG`` is set and
    the module logger is enabled for DEBUG, logs the method, path, status
    code and remote address of the request.
    """
    response = add_security_headers(response)

    should_log = config.DEBUG and logger.isEnabledFor(logging.DEBUG)
    if should_log:
        request_part = f"{request.method} {request.path} - "
        result_part = f"{response.status_code} - {request.remote_addr}"
        logger.debug(request_part + result_part)
    return response
def background_update_task():
    """Run ``update_all_vendors`` forever at a configured interval.

    Waits 60 seconds, then loops: when ``config.ENABLE_AUTO_UPDATE`` is
    truthy it calls ``update_all_vendors()``; any exception is logged with
    its traceback and the loop continues. After each pass it sleeps for
    ``config.UPDATE_INTERVAL_HOURS`` hours. The function never returns.
    """
    logger.info("Background update task started")
    time.sleep(60)

    while True:
        try:
            if not config.ENABLE_AUTO_UPDATE:
                logger.debug("Auto-update disabled, skipping")
            else:
                logger.info("Running scheduled CVE update...")
                update_all_vendors()
                logger.info("Scheduled update completed successfully")
        except Exception as exc:
            logger.error(f"Error in background update: {exc}", exc_info=True)

        # The interval is re-read on every pass.
        interval_hours = config.UPDATE_INTERVAL_HOURS
        logger.info(f"Next update in {interval_hours} hours")
        time.sleep(interval_hours * 3600)
def start_background_tasks():
    """Start the in-process background threads when running standalone.

    Three cases, checked in order:

    * Gunicorn is detected (``SERVER_SOFTWARE`` contains "gunicorn" or
      ``GUNICORN_CMD_ARGS`` is set): only a banner is logged.
    * The module is not running as ``__main__``: a single line is logged.
    * Otherwise: daemon threads are started for ``background_update_task``
      (if ``config.ENABLE_AUTO_UPDATE``) and ``discord_bot.start_discord_bot``
      (if ``config.DISCORD_BOT_ENABLED``). Failures are logged, not raised.
    """
    rule = "=" * 70

    server_software = os.environ.get('SERVER_SOFTWARE', '').lower()
    running_under_gunicorn = (
        'gunicorn' in server_software
        or 'GUNICORN_CMD_ARGS' in os.environ
    )

    if running_under_gunicorn:
        for line in (
            rule,
            "PRODUCTION MODE (Gunicorn)",
            rule,
            "Background tasks handled by separate containers:",
            " Scheduler: cve-monitor-scheduler",
            " Discord Bot: cve-monitor-discord",
            rule,
        ):
            logger.info(line)
        return

    if __name__ != '__main__':
        logger.info("Production mode: background tasks in separate containers")
        return

    for line in (
        rule,
        "DEVELOPMENT MODE (Standalone Flask)",
        rule,
        "Starting background tasks in threads...",
        "",
    ):
        logger.info(line)

    # Scheduler thread
    if not config.ENABLE_AUTO_UPDATE:
        logger.info(" Scheduler thread: DISABLED")
    else:
        try:
            threading.Thread(
                target=background_update_task,
                daemon=True,
                name="CVE-Update-Thread",
            ).start()
            logger.info(" Scheduler thread: STARTED")
        except Exception as exc:
            logger.error(f" Scheduler thread: FAILED - {exc}")

    # Discord bot thread
    if not config.DISCORD_BOT_ENABLED:
        logger.info(" Discord bot thread: DISABLED")
    else:
        try:
            # Imported here so the module is only needed when the bot is on.
            from discord_bot import start_discord_bot
            threading.Thread(
                target=start_discord_bot,
                daemon=True,
                name="Discord-Bot-Thread",
            ).start()
            logger.info(" Discord bot thread: STARTED")
        except ImportError:
            logger.warning(" Discord bot thread: FAILED - discord.py not installed")
        except Exception as exc:
            logger.error(f" Discord bot thread: FAILED - {exc}")

    logger.info("")
    logger.info(rule)
def initialize_app():
    """Log the start-up summary and hand off to ``start_background_tasks``.

    Logs the application name and version, the environment, the database
    path, the update interval and the feature flags. If the database file
    exists, the number of rows in ``cve_cache`` is logged; a failure to read
    it is logged as a warning and does not stop start-up.
    """
    rule = '=' * 60
    logger.info(rule)
    logger.info(f"{config.APP_NAME} v{config.APP_VERSION}")
    logger.info(rule)

    environment = 'DEBUG' if config.DEBUG else 'PRODUCTION'
    logger.info(f"Environment: {environment}")
    logger.info(f"Database: {config.DATABASE_PATH}")
    logger.info(f"Update interval: {config.UPDATE_INTERVAL_HOURS}h")

    logger.info("Features:")
    feature_flags = (
        ("Security headers", "ENABLE_SECURITY_HEADERS"),
        ("Compression", "ENABLE_COMPRESSION"),
        ("Auto-update", "ENABLE_AUTO_UPDATE"),
        ("Discord bot", "DISCORD_BOT_ENABLED"),
        ("Charts", "ENABLE_CHARTS"),
        ("Search", "ENABLE_SEARCH"),
        ("Export", "ENABLE_EXPORT"),
    )
    for label, attribute in feature_flags:
        # Looked up one at a time, in the order the lines are logged.
        logger.info(f" - {label}: {getattr(config, attribute)}")

    if not os.path.exists(config.DATABASE_PATH):
        logger.info("Database will be created on first update")
    else:
        try:
            with cve_handler.get_db_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("SELECT COUNT(*) as count FROM cve_cache")
                count = cursor.fetchone()['count']
                logger.info(f"Database contains {count} CVEs")
        except Exception as e:
            logger.warning(f"Could not read database: {e}")

    logger.info(rule)
    start_background_tasks()
_initialized = False

# Initialisation runs once when the module is loaded, both when it is executed
# as a script and when it is imported.
if not _initialized:
    initialize_app()
    _initialized = True

if __name__ == '__main__':
    # Executed directly: serve with Flask's built-in server.
    logger.info(f"Starting Flask development server on {config.HOST}:{config.PORT}")
    app.run(
        host=config.HOST,
        port=config.PORT,
        debug=config.DEBUG,
        threaded=True,
        use_reloader=False,
    )