Files
cve_monitor/api.py
Mateusz Gruszczyński bc1b4279de first commit
2026-02-13 12:42:53 +01:00

252 lines
8.5 KiB
Python

from flask import Blueprint, jsonify, request, make_response
import logging
import csv
from io import StringIO
from datetime import datetime
import config
from cve_handler import CVEHandler
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Blueprint named 'api'; the route handlers in this module register on it.
api_bp = Blueprint('api', __name__)
# Single CVEHandler instance, created at import time and used by the handlers below.
cve_handler = CVEHandler()
def api_response(data: object, status: int = 200) -> tuple:
    """Serialize ``data`` as a JSON response paired with an HTTP status code.

    Args:
        data: Payload handed to ``jsonify``.
        status: HTTP status code returned alongside the body (default 200).

    Returns:
        A ``(response, status)`` tuple.
    """
    # ``object`` replaces the original ``any`` annotation: ``any`` is the
    # builtin function, not a type, so it is not a valid type hint.
    return jsonify(data), status
def api_error(message: str, status: int = 400) -> tuple:
    """Build a JSON error body stamped with the current UTC time.

    Args:
        message: Text placed under the ``error`` key.
        status: HTTP status code returned alongside the body (default 400).

    Returns:
        A ``(response, status)`` tuple whose JSON body has the keys ``error``
        and ``timestamp`` (naive UTC time in ISO 8601 form).
    """
    payload = {
        'error': message,
        'timestamp': datetime.utcnow().isoformat()
    }
    return jsonify(payload), status
@api_bp.route('/vendors', methods=['GET'])
def get_vendors():
    """Return the summary of all vendors as JSON.

    The body carries the value returned by
    ``cve_handler.get_all_vendors_summary()`` under ``vendors``, its length
    under ``total_vendors``, and a UTC ``timestamp``.

    Returns:
        200 with the summary, or 500 with the exception text if anything fails.
    """
    try:
        vendor_summaries = cve_handler.get_all_vendors_summary()
        payload = {
            'vendors': vendor_summaries,
            'total_vendors': len(vendor_summaries),
            'timestamp': datetime.utcnow().isoformat()
        }
        return api_response(payload)
    except Exception as e:
        logger.error(f"Error in get_vendors: {e}", exc_info=True)
        return api_error(str(e), 500)
@api_bp.route('/cve/<vendor_code>', methods=['GET'])
def get_vendor_cves(vendor_code: str):
    """Return one page of CVEs for a configured vendor.

    Query parameters:
        limit: Page size. Defaults to ``config.ITEMS_PER_PAGE`` and is clamped
            to the range ``1..config.MAX_ITEMS_PER_PAGE``.
        offset: Number of items to skip. Defaults to 0; negative values are
            treated as 0.

    Args:
        vendor_code: Vendor identifier taken from the URL; must match the
            ``code`` of an entry in ``config.VENDORS``.

    Returns:
        200 with the vendor, the CVE page, its size and the pagination values
        actually used; 404 for an unknown vendor; 400 when ``limit`` or
        ``offset`` is not an integer; 500 if anything else fails.
    """
    try:
        vendor = next((v for v in config.VENDORS if v['code'] == vendor_code), None)
        if not vendor:
            return api_error(f"Unknown vendor: {vendor_code}", 404)

        try:
            requested_limit = int(request.args.get('limit', config.ITEMS_PER_PAGE))
            offset = max(int(request.args.get('offset', 0)), 0)
        except ValueError:
            return api_error("Invalid pagination parameters", 400)

        # Lower-bound the page size as well as capping it. Previously a
        # negative or zero ``limit`` was forwarded to the handler unchanged;
        # some backends (e.g. SQLite) treat a negative LIMIT as "no limit",
        # which would bypass MAX_ITEMS_PER_PAGE.
        limit = max(1, min(requested_limit, config.MAX_ITEMS_PER_PAGE))

        cves = cve_handler.get_vendor_cves(
            vendor_code,
            limit=limit,
            offset=offset
        )
        return api_response({
            'vendor': {
                'code': vendor_code,
                'name': vendor['name']
            },
            'cves': cves,
            'count': len(cves),
            'limit': limit,
            'offset': offset,
            'timestamp': datetime.utcnow().isoformat()
        })
    except Exception as e:
        logger.error(f"Error in get_vendor_cves: {e}", exc_info=True)
        # NOTE(review): str(e) exposes internal error text to the client.
        return api_error(str(e), 500)
@api_bp.route('/stats/<vendor_code>', methods=['GET'])
def get_vendor_stats(vendor_code: str):
    """Return statistics for a configured vendor as JSON.

    Args:
        vendor_code: Vendor identifier taken from the URL; must match the
            ``code`` of an entry in ``config.VENDORS``.

    Returns:
        200 with the vendor's code and name, the value returned by
        ``cve_handler.get_vendor_stats`` under ``stats``, and a UTC
        ``timestamp``; 404 for an unknown vendor; 500 if anything fails.
    """
    try:
        # First configured vendor whose code matches, if any.
        vendor = None
        for candidate in config.VENDORS:
            if candidate['code'] == vendor_code:
                vendor = candidate
                break
        if not vendor:
            return api_error(f"Unknown vendor: {vendor_code}", 404)

        stats = cve_handler.get_vendor_stats(vendor_code)
        payload = {
            'vendor': {
                'code': vendor_code,
                'name': vendor['name']
            },
            'stats': stats,
            'timestamp': datetime.utcnow().isoformat()
        }
        return api_response(payload)
    except Exception as e:
        logger.error(f"Error in get_vendor_stats: {e}", exc_info=True)
        return api_error(str(e), 500)
@api_bp.route('/cve/<vendor_code>/filter', methods=['GET'])
def filter_vendor_cves(vendor_code: str):
    """Return one page of a vendor's CVEs, filtered by severity and/or year.

    Query parameters:
        severity: Optional; upper-cased, then must be one of CRITICAL, HIGH,
            MEDIUM or LOW.
        year: Optional integer.
        limit: Page size. Defaults to ``config.ITEMS_PER_PAGE`` and is clamped
            to the range ``1..config.MAX_ITEMS_PER_PAGE``.
        offset: Number of items to skip. Defaults to 0; negative values are
            treated as 0.

    Args:
        vendor_code: Vendor identifier taken from the URL; must match the
            ``code`` of an entry in ``config.VENDORS``.

    Returns:
        200 with the vendor, the applied filters, the CVE page, its size and
        the pagination values actually used; 404 for an unknown vendor; 400
        for a non-integer ``limit``/``offset`` or an unrecognised severity;
        500 if anything else fails.
    """
    try:
        vendor = next((v for v in config.VENDORS if v['code'] == vendor_code), None)
        if not vendor:
            return api_error(f"Unknown vendor: {vendor_code}", 404)

        severity = request.args.get('severity', '').upper()
        # NOTE(review): with ``type=int`` a non-numeric ``year`` yields None,
        # so the year filter is silently dropped rather than rejected.
        year = request.args.get('year', type=int)

        try:
            requested_limit = int(request.args.get('limit', config.ITEMS_PER_PAGE))
            offset = max(int(request.args.get('offset', 0)), 0)
        except ValueError:
            return api_error("Invalid pagination parameters", 400)

        # Lower-bound the page size as well as capping it. Previously a
        # negative or zero ``limit`` was forwarded to the handler unchanged;
        # some backends (e.g. SQLite) treat a negative LIMIT as "no limit",
        # which would bypass MAX_ITEMS_PER_PAGE.
        limit = max(1, min(requested_limit, config.MAX_ITEMS_PER_PAGE))

        if severity and severity not in ['CRITICAL', 'HIGH', 'MEDIUM', 'LOW']:
            return api_error(f"Invalid severity: {severity}", 400)

        cves = cve_handler.get_vendor_cves(
            vendor_code,
            limit=limit,
            offset=offset,
            severity=severity if severity else None,
            year=year
        )
        return api_response({
            'vendor': {
                'code': vendor_code,
                'name': vendor['name']
            },
            'filters': {
                'severity': severity,
                'year': year
            },
            'cves': cves,
            'count': len(cves),
            # Echo the pagination actually applied, as the unfiltered
            # /cve/<vendor_code> endpoint does.
            'limit': limit,
            'offset': offset,
            'timestamp': datetime.utcnow().isoformat()
        })
    except Exception as e:
        logger.error(f"Error in filter_vendor_cves: {e}", exc_info=True)
        # NOTE(review): str(e) exposes internal error text to the client.
        return api_error(str(e), 500)
@api_bp.route('/search', methods=['GET'])
def search_cves():
    """Search CVEs by a free-text query.

    Query parameters:
        q: Search text; surrounding whitespace is stripped and at least
            3 characters must remain.
        limit: Maximum number of results. Defaults to
            ``config.ITEMS_PER_PAGE`` and is clamped to the range
            ``1..config.MAX_ITEMS_PER_PAGE``.

    Returns:
        200 with the query, the results and their count; 403 when
        ``config.ENABLE_SEARCH`` is falsy; 400 for a missing or too-short
        query or a non-integer ``limit``; 500 if anything else fails.
    """
    if not config.ENABLE_SEARCH:
        return api_error("Search is disabled", 403)
    try:
        query = request.args.get('q', '').strip()
        if not query:
            return api_error("Missing search query", 400)
        if len(query) < 3:
            return api_error("Query too short (min 3 characters)", 400)

        # A non-numeric ``limit`` is a client error. Previously the ValueError
        # fell through to the generic handler and produced a 500, unlike the
        # other paginated endpoints in this module.
        try:
            requested_limit = int(request.args.get('limit', config.ITEMS_PER_PAGE))
        except ValueError:
            return api_error("Invalid pagination parameters", 400)

        # Lower-bound the limit as well as capping it, so a negative or zero
        # value is never forwarded to the handler.
        limit = max(1, min(requested_limit, config.MAX_ITEMS_PER_PAGE))

        results = cve_handler.search_cves(query, limit=limit)
        return api_response({
            'query': query,
            'results': results,
            'count': len(results),
            'timestamp': datetime.utcnow().isoformat()
        })
    except Exception as e:
        logger.error(f"Error in search_cves: {e}", exc_info=True)
        # NOTE(review): str(e) exposes internal error text to the client.
        return api_error(str(e), 500)
@api_bp.route('/update/<vendor_code>', methods=['POST'])
def trigger_update(vendor_code: str):
    """Force a cache update for a configured vendor.

    Args:
        vendor_code: Vendor identifier taken from the URL; must match the
            ``code`` of an entry in ``config.VENDORS``.

    Returns:
        200 with a success message when
        ``cve_handler.update_vendor_cache(vendor_code, force=True)`` returns a
        truthy value; 404 for an unknown vendor; 500 when the update returns a
        falsy value or anything raises.
    """
    try:
        # First configured vendor whose code matches, if any.
        vendor = None
        for candidate in config.VENDORS:
            if candidate['code'] == vendor_code:
                vendor = candidate
                break
        if not vendor:
            return api_error(f"Unknown vendor: {vendor_code}", 404)

        logger.info(f"Manual update triggered for {vendor_code}")
        updated = cve_handler.update_vendor_cache(vendor_code, force=True)
        if not updated:
            return api_error('Update failed', 500)

        payload = {
            'message': f'Update triggered for {vendor_code}',
            'vendor': vendor['name'],
            'status': 'success',
            'timestamp': datetime.utcnow().isoformat()
        }
        return api_response(payload)
    except Exception as e:
        logger.error(f"Error in trigger_update: {e}", exc_info=True)
        return api_error(str(e), 500)
@api_bp.route('/export/<vendor_code>/<format>', methods=['GET'])
def export_cves(vendor_code: str, format: str):
    """Export a vendor's CVEs as JSON or as a CSV attachment.

    Up to ``config.EXPORT_MAX_ITEMS`` CVEs are fetched. The CSV variant has
    the columns cve_id, severity, cvss_score, published_date, last_modified
    and description (truncated to 500 characters).

    Args:
        vendor_code: Vendor identifier taken from the URL; must match the
            ``code`` of an entry in ``config.VENDORS``.
        format: Export format taken from the URL; must be listed in
            ``config.EXPORT_FORMATS``. This function implements ``json`` and
            ``csv``. The name shadows the builtin but is fixed by the route's
            URL variable.

    Returns:
        200 with the JSON body or the CSV download; 403 when
        ``config.ENABLE_EXPORT`` is falsy; 400 for a format not in
        ``config.EXPORT_FORMATS``; 404 for an unknown vendor; 501 for a
        configured format that has no serializer here; 500 if anything else
        fails.
    """
    if not config.ENABLE_EXPORT:
        return api_error("Export is disabled", 403)
    try:
        if format not in config.EXPORT_FORMATS:
            return api_error(f"Invalid format: {format}. Supported: {', '.join(config.EXPORT_FORMATS)}", 400)

        vendor = next((v for v in config.VENDORS if v['code'] == vendor_code), None)
        if not vendor:
            return api_error(f"Unknown vendor: {vendor_code}", 404)

        # A format can be enabled in config without being implemented here.
        # Previously that case fell through both branches and the view
        # returned None, which Flask rejects as an invalid response. Answer
        # explicitly, and before fetching any data.
        if format not in ('json', 'csv'):
            logger.error(f"Export format '{format}' is enabled in config but not implemented")
            return api_error(f"Export format not implemented: {format}", 501)

        cves = cve_handler.get_vendor_cves(
            vendor_code,
            limit=config.EXPORT_MAX_ITEMS
        )

        if format == 'json':
            return api_response({
                'vendor': {
                    'code': vendor_code,
                    'name': vendor['name']
                },
                'export_date': datetime.utcnow().isoformat(),
                'cve_count': len(cves),
                'cves': cves
            })

        # Remaining case: format == 'csv'.
        # NOTE(review): cell values are written as-is; spreadsheet programs may
        # interpret values starting with '=', '+', '-' or '@' as formulas.
        output = StringIO()
        writer = csv.DictWriter(output, fieldnames=[
            'cve_id', 'severity', 'cvss_score', 'published_date',
            'last_modified', 'description'
        ])
        writer.writeheader()
        for cve in cves:
            writer.writerow({
                'cve_id': cve.get('cve_id', ''),
                'severity': cve.get('severity', ''),
                'cvss_score': cve.get('cvss_score', ''),
                'published_date': cve.get('published_date', ''),
                'last_modified': cve.get('last_modified', ''),
                # ``or ''`` guards against an explicit None description.
                'description': (cve.get('description', '') or '')[:500]
            })

        date_stamp = datetime.utcnow().strftime("%Y%m%d")
        response = make_response(output.getvalue())
        response.headers['Content-Type'] = 'text/csv; charset=utf-8'
        response.headers['Content-Disposition'] = f'attachment; filename={vendor_code}_cves_{date_stamp}.csv'
        return response
    except Exception as e:
        logger.error(f"Error in export_cves: {e}", exc_info=True)
        # NOTE(review): str(e) exposes internal error text to the client.
        return api_error(str(e), 500)