Files
geoip_block_generator/app.py
Mateusz Gruszczyński 45fc1c6d55 add NI
2026-03-05 08:19:37 +01:00

316 lines
9.2 KiB
Python

"""
GeoIP Ban Generator - Web Application
"""
from flask import Flask, render_template, request, Response, jsonify
from geoip_handler import GeoIPHandler
from flask import jsonify, render_template, request
from pathlib import Path
from functools import wraps
from datetime import datetime
from api import api
import hashlib
import os
import sqlite3
import config
app = Flask(__name__,
static_folder=str(config.STATIC_DIR),
template_folder=str(config.TEMPLATE_DIR))
app.config['SECRET_KEY'] = config.SECRET_KEY
app.register_blueprint(api)
handler = GeoIPHandler()
CACHEABLE_PAGES = {
"/api-docs",
"/generator",
}
NO_CACHE_PREFIXES = (
"/api/",
)
STATIC_PREFIX = "/static/"
redis_cache = None
if config.REDIS_ENABLED:
try:
from redis_cache import RedisCache
redis_cache = RedisCache()
redis_health = redis_cache.health_check()
if redis_health['connected']:
print(f"[REDIS] Connected successfully - {redis_health['memory_used_mb']}MB used", flush=True)
else:
print(f"[REDIS] Connection failed: {redis_health.get('error')}", flush=True)
except Exception as e:
print(f"[REDIS] Failed to initialize: {e}", flush=True)
redis_cache = None
def get_file_hash(filepath: Path) -> str:
"""Generate MD5 hash for cache busting"""
try:
if filepath.exists():
with open(filepath, 'rb') as f:
return hashlib.md5(f.read()).hexdigest()[:8]
except:
pass
return 'v1'
def get_sqlite_status():
"""Get SQLite cache database status"""
db_path = config.GEOIP_DB_DIR / 'networks_cache.db'
if not db_path.exists():
return {
'exists': False,
'status': 'missing'
}
try:
file_size = db_path.stat().st_size
modified_time = datetime.fromtimestamp(db_path.stat().st_mtime)
conn = sqlite3.connect(str(db_path), timeout=5.0)
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM cache_metadata")
total_countries = cursor.fetchone()[0]
cursor.execute("SELECT SUM(network_count) FROM cache_metadata")
total_networks = cursor.fetchone()[0] or 0
conn.close()
return {
'exists': True,
'status': 'ok',
'file_size_mb': round(file_size / 1024 / 1024, 2),
'total_countries': total_countries,
'total_networks': total_networks,
'modified': modified_time.isoformat()
}
except Exception as e:
return {
'exists': True,
'status': 'error',
'error': str(e)
}
@app.context_processor
def inject_globals():
"""Inject global variables into templates"""
return {
'app_name': config.APP_NAME,
'app_version': config.APP_VERSION,
'logo_url': config.LOGO_URL,
'logo_link': config.LOGO_LINK,
'footer_text': config.FOOTER_TEXT,
'footer_link': config.FOOTER_LINK,
'footer_link_text': config.FOOTER_LINK_TEXT,
'css_hash': get_file_hash(config.STATIC_DIR / 'css' / 'style.css'),
'js_hash': get_file_hash(config.STATIC_DIR / 'js' / 'app.js'),
'get_country_flag': config.get_country_flag,
'redis_enabled': config.REDIS_ENABLED,
'redis_connected': redis_cache.health_check()['connected'] if redis_cache else False,
}
def _wants_json():
if request.path.startswith("/api/"):
return True
accept = (request.headers.get("Accept") or "").lower()
return "application/json" in accept
def _render_4xx(code, title, message):
payload = {
"success": False,
"error": title,
"message": message,
"path": request.path,
"status": code,
}
if _wants_json():
return jsonify(payload), code
return render_template(
"error.html",
status=code,
title=title,
message=message,
path=request.path
), code
@app.errorhandler(400)
def bad_request(e):
return _render_4xx(400, "Bad Request", "Request is invalid or missing required fields.")
@app.errorhandler(401)
def unauthorized(e):
return _render_4xx(401, "Unauthorized", "Authentication is required for this resource.")
@app.errorhandler(403)
def forbidden(e):
return _render_4xx(403, "Forbidden", "You don't have permission to access this resource.")
@app.errorhandler(404)
def not_found(e):
return _render_4xx(404, "Not Found", "The requested endpoint/page does not exist.")
@app.errorhandler(405)
def method_not_allowed(e):
return _render_4xx(405, "Method Not Allowed", "The HTTP method is not allowed for this endpoint.")
@app.route('/')
def index():
"""Main page"""
if config.MAXMIND_AUTO_UPDATE:
handler.check_and_update()
return render_template('index.html', countries=config.COMMON_COUNTRIES)
@app.route('/api-docs')
def api_docs():
"""API documentation page"""
return render_template('api.html')
@app.route("/generator")
def generator():
"""Script gwnerator"""
return render_template("generator.html")
@app.route('/favicon.ico')
def favicon():
return '', 204
@app.route('/health')
def health():
"""Health check endpoint for HAProxy/monitoring"""
health_status = {
'status': 'healthy',
'timestamp': datetime.now().isoformat(),
'components': {}
}
health_status['components']['flask'] = {
'status': 'ok',
'version': config.APP_VERSION
}
maxmind_exists = handler.mmdb_file.exists()
health_status['components']['maxmind_db'] = {
'status': 'ok' if maxmind_exists else 'missing',
'exists': maxmind_exists
}
if not maxmind_exists:
health_status['status'] = 'degraded'
sqlite_status = get_sqlite_status()
health_status['components']['sqlite_cache'] = sqlite_status
if not sqlite_status['exists'] or sqlite_status.get('status') != 'ok':
health_status['status'] = 'degraded'
if config.REDIS_ENABLED and redis_cache:
redis_health = redis_cache.health_check()
if redis_health.get('connected'):
health_status['components']['redis'] = {
'status': 'ok',
'memory_mb': redis_health.get('memory_used_mb', 0),
'keys': redis_health.get('keys', 0)
}
else:
health_status['components']['redis'] = {
'status': 'error',
'error': redis_health.get('error', 'Connection failed')
}
health_status['status'] = 'degraded'
else:
health_status['components']['redis'] = {
'status': 'disabled',
'enabled': False
}
status_code = 200 if health_status['status'] in ['healthy', 'degraded'] else 503
return jsonify(health_status), status_code
@app.route('/api/stats/summary')
def stats_summary():
"""Combined stats endpoint for dashboard"""
stats = {
'maxmind': {
'exists': handler.mmdb_file.exists(),
'needs_update': handler.needs_update()
},
'sqlite': get_sqlite_status(),
'redis': {'enabled': False}
}
if config.REDIS_ENABLED and redis_cache:
redis_health = redis_cache.health_check()
stats['redis'] = {
'enabled': True,
'connected': redis_health.get('connected', False),
'memory_mb': redis_health.get('memory_used_mb', 0),
'keys': redis_health.get('keys', 0)
}
return jsonify(stats)
def cache_control(max_age: int = None):
"""Decorator for static file cache control"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
response = f(*args, **kwargs)
if isinstance(response, Response):
if max_age:
response.headers['Cache-Control'] = f'public, max-age={max_age}'
else:
response.headers['Cache-Control'] = 'no-cache, no-store'
response.headers['Pragma'] = 'no-cache'
response.headers['Expires'] = '0'
return response
return decorated_function
return decorator
@app.after_request
def add_headers(response):
"""Add cache control headers based on request path"""
path = request.path
if path.startswith(STATIC_PREFIX):
if "Content-Disposition" in response.headers:
del response.headers["Content-Disposition"]
if config.ENABLE_CACHE_BUSTING:
response.headers["Cache-Control"] = (
f"public, max-age={config.CACHE_TTL_SECONDS}, immutable"
)
else:
response.headers["Cache-Control"] = f"public, max-age={config.CACHE_TTL_SECONDS}"
return response
if path in CACHEABLE_PAGES:
response.headers["Cache-Control"] = "public, max-age=300"
return response
if path == "/" or any(path.startswith(p) for p in NO_CACHE_PREFIXES):
response.headers["Cache-Control"] = "no-cache, no-store"
return response
return response
if __name__ == '__main__':
app.run(
host=config.FLASK_HOST,
port=config.FLASK_PORT,
debug=config.FLASK_DEBUG
)