252 lines
8.5 KiB
Python
252 lines
8.5 KiB
Python
from flask import Blueprint, jsonify, request, make_response
|
|
import logging
|
|
import csv
|
|
from io import StringIO
|
|
from datetime import datetime
|
|
import config
|
|
from cve_handler import CVEHandler
|
|
|
|
logger = logging.getLogger(__name__)
|
|
api_bp = Blueprint('api', __name__)
|
|
cve_handler = CVEHandler()
|
|
|
|
def api_response(data: object, status: int = 200) -> tuple:
    """Wrap a payload in a JSON response paired with an HTTP status code.

    Args:
        data: Payload handed to ``jsonify``.
        status: HTTP status code returned alongside the body (default 200).

    Returns:
        A ``(response, status)`` tuple.
    """
    # The annotation used to be ``any`` -- the builtin function, not a type.
    # ``object`` expresses "any value" without requiring a new import.
    return jsonify(data), status
|
|
|
|
def api_error(message: str, status: int = 400) -> tuple:
    """Build a timestamped JSON error body paired with an HTTP status code.

    Args:
        message: Text placed under the ``error`` key.
        status: HTTP status code returned alongside the body (default 400).

    Returns:
        A ``(response, status)`` tuple whose body carries ``error`` and
        ``timestamp`` (naive UTC, ISO 8601).
    """
    # NOTE(review): datetime.utcnow() is deprecated as of Python 3.12; kept
    # here because switching to an aware datetime changes the emitted string.
    error_body = {
        'error': message,
        'timestamp': datetime.utcnow().isoformat(),
    }
    return jsonify(error_body), status
|
|
|
|
@api_bp.route('/vendors', methods=['GET'])
def get_vendors():
    """Return the summary of all vendors known to the CVE handler.

    Returns:
        200 with ``vendors``, ``total_vendors`` and ``timestamp``;
        500 with the exception text if anything raises.
    """
    try:
        vendor_summaries = cve_handler.get_all_vendors_summary()
        body = {
            'vendors': vendor_summaries,
            'total_vendors': len(vendor_summaries),
            'timestamp': datetime.utcnow().isoformat(),
        }
        return api_response(body)
    except Exception as e:
        # Top-level boundary for this endpoint: log with traceback, reply 500.
        logger.error(f"Error in get_vendors: {e}", exc_info=True)
        return api_error(str(e), 500)
|
|
|
|
@api_bp.route('/cve/<vendor_code>', methods=['GET'])
def get_vendor_cves(vendor_code: str):
    """Return one page of CVEs for a configured vendor.

    Query parameters:
        limit: Page size. Defaults to ``config.ITEMS_PER_PAGE`` and is clamped
            to the range ``1 .. config.MAX_ITEMS_PER_PAGE``.
        offset: Number of records to skip. Negative values are treated as 0.

    Args:
        vendor_code: Vendor code from the URL; must match the ``code`` of an
            entry in ``config.VENDORS``.

    Returns:
        200 with ``vendor``, ``cves``, ``count``, ``limit``, ``offset`` and
        ``timestamp``; 404 for an unknown vendor; 400 when ``limit`` or
        ``offset`` is not an integer; 500 with the exception text otherwise.
    """
    try:
        vendor = next((v for v in config.VENDORS if v['code'] == vendor_code), None)
        if not vendor:
            return api_error(f"Unknown vendor: {vendor_code}", 404)

        try:
            requested_limit = int(request.args.get('limit', config.ITEMS_PER_PAGE))
            requested_offset = int(request.args.get('offset', 0))
        except ValueError:
            return api_error("Invalid pagination parameters", 400)

        # Previously only the upper bound was enforced, so ``?limit=-1`` or
        # ``?limit=0`` was forwarded to the handler as-is. Clamp the lower
        # bound too, mirroring how a negative offset is clamped to 0.
        limit = max(min(requested_limit, config.MAX_ITEMS_PER_PAGE), 1)
        offset = max(requested_offset, 0)

        cves = cve_handler.get_vendor_cves(
            vendor_code,
            limit=limit,
            offset=offset,
        )

        return api_response({
            'vendor': {
                'code': vendor_code,
                'name': vendor['name'],
            },
            'cves': cves,
            'count': len(cves),
            'limit': limit,
            'offset': offset,
            'timestamp': datetime.utcnow().isoformat(),
        })

    except Exception as e:
        # NOTE(review): str(e) is returned to the client and may expose
        # internal details; kept for consistency with the other endpoints.
        logger.error(f"Error in get_vendor_cves: {e}", exc_info=True)
        return api_error(str(e), 500)
|
|
|
|
@api_bp.route('/stats/<vendor_code>', methods=['GET'])
def get_vendor_stats(vendor_code: str):
    """Return the CVE handler's statistics for a configured vendor.

    Args:
        vendor_code: Vendor code from the URL; must match the ``code`` of an
            entry in ``config.VENDORS``.

    Returns:
        200 with ``vendor``, ``stats`` and ``timestamp``; 404 for an unknown
        vendor; 500 with the exception text if anything raises.
    """
    try:
        # The first entry whose code matches wins.
        vendor = None
        for candidate in config.VENDORS:
            if candidate['code'] == vendor_code:
                vendor = candidate
                break

        if not vendor:
            return api_error(f"Unknown vendor: {vendor_code}", 404)

        stats = cve_handler.get_vendor_stats(vendor_code)

        body = {
            'vendor': {
                'code': vendor_code,
                'name': vendor['name'],
            },
            'stats': stats,
            'timestamp': datetime.utcnow().isoformat(),
        }
        return api_response(body)

    except Exception as e:
        logger.error(f"Error in get_vendor_stats: {e}", exc_info=True)
        return api_error(str(e), 500)
|
|
|
|
@api_bp.route('/cve/<vendor_code>/filter', methods=['GET'])
def filter_vendor_cves(vendor_code: str):
    """Return one page of a vendor's CVEs, optionally filtered.

    Query parameters:
        severity: Optional; case-insensitive, one of CRITICAL, HIGH, MEDIUM,
            LOW. An empty value means "no severity filter".
        year: Optional integer. An empty value means "no year filter".
        limit: Page size. Defaults to ``config.ITEMS_PER_PAGE`` and is clamped
            to the range ``1 .. config.MAX_ITEMS_PER_PAGE``.
        offset: Number of records to skip. Negative values are treated as 0.

    Args:
        vendor_code: Vendor code from the URL; must match the ``code`` of an
            entry in ``config.VENDORS``.

    Returns:
        200 with ``vendor``, ``filters``, ``cves``, ``count``, ``limit``,
        ``offset`` and ``timestamp``; 404 for an unknown vendor; 400 for a
        non-integer ``limit``/``offset``/``year`` or an unknown severity;
        500 with the exception text otherwise.
    """
    try:
        vendor = next((v for v in config.VENDORS if v['code'] == vendor_code), None)
        if not vendor:
            return api_error(f"Unknown vendor: {vendor_code}", 404)

        severity = request.args.get('severity', '').upper()
        raw_year = request.args.get('year', '').strip()

        try:
            requested_limit = int(request.args.get('limit', config.ITEMS_PER_PAGE))
            requested_offset = int(request.args.get('offset', 0))
        except ValueError:
            return api_error("Invalid pagination parameters", 400)

        # Enforce a lower bound as well as the upper one, so a zero or
        # negative limit is never forwarded to the handler.
        limit = max(min(requested_limit, config.MAX_ITEMS_PER_PAGE), 1)
        offset = max(requested_offset, 0)

        if severity and severity not in ('CRITICAL', 'HIGH', 'MEDIUM', 'LOW'):
            return api_error(f"Invalid severity: {severity}", 400)

        # ``request.args.get('year', type=int)`` silently turned a malformed
        # year into None, returning unfiltered data while looking successful.
        # Reject it explicitly, like the other validated parameters.
        try:
            year = int(raw_year) if raw_year else None
        except ValueError:
            return api_error(f"Invalid year: {raw_year}", 400)

        cves = cve_handler.get_vendor_cves(
            vendor_code,
            limit=limit,
            offset=offset,
            severity=severity if severity else None,
            year=year,
        )

        return api_response({
            'vendor': {
                'code': vendor_code,
                'name': vendor['name'],
            },
            'filters': {
                'severity': severity,
                'year': year,
            },
            'cves': cves,
            'count': len(cves),
            # Echo the effective pagination, as the unfiltered endpoint does.
            'limit': limit,
            'offset': offset,
            'timestamp': datetime.utcnow().isoformat(),
        })

    except Exception as e:
        # NOTE(review): str(e) is returned to the client and may expose
        # internal details; kept for consistency with the other endpoints.
        logger.error(f"Error in filter_vendor_cves: {e}", exc_info=True)
        return api_error(str(e), 500)
|
|
|
|
@api_bp.route('/search', methods=['GET'])
def search_cves():
    """Search CVEs through the CVE handler.

    Query parameters:
        q: Search text; required, at least 3 characters after stripping
            surrounding whitespace.
        limit: Maximum number of results. Defaults to
            ``config.ITEMS_PER_PAGE`` and is clamped to the range
            ``1 .. config.MAX_ITEMS_PER_PAGE``.

    Returns:
        200 with ``query``, ``results``, ``count`` and ``timestamp``;
        403 when ``config.ENABLE_SEARCH`` is falsy; 400 for a missing or too
        short query or a non-integer ``limit``; 500 with the exception text
        otherwise.
    """
    if not config.ENABLE_SEARCH:
        return api_error("Search is disabled", 403)

    try:
        query = request.args.get('q', '').strip()
        if not query:
            return api_error("Missing search query", 400)

        if len(query) < 3:
            return api_error("Query too short (min 3 characters)", 400)

        # A non-integer ``limit`` used to escape to the generic handler below
        # and surface as a 500; it is a client error, so answer 400 as the
        # paginated endpoints do.
        try:
            requested_limit = int(request.args.get('limit', config.ITEMS_PER_PAGE))
        except ValueError:
            return api_error("Invalid pagination parameters", 400)

        # Bound the limit on both sides so a zero or negative value is never
        # forwarded to the handler.
        limit = max(min(requested_limit, config.MAX_ITEMS_PER_PAGE), 1)

        results = cve_handler.search_cves(query, limit=limit)

        return api_response({
            'query': query,
            'results': results,
            'count': len(results),
            'timestamp': datetime.utcnow().isoformat(),
        })

    except Exception as e:
        # NOTE(review): str(e) is returned to the client and may expose
        # internal details; kept for consistency with the other endpoints.
        logger.error(f"Error in search_cves: {e}", exc_info=True)
        return api_error(str(e), 500)
|
|
|
|
@api_bp.route('/update/<vendor_code>', methods=['POST'])
def trigger_update(vendor_code: str):
    """Force a cache refresh for one vendor.

    Args:
        vendor_code: Vendor code from the URL; must match the ``code`` of an
            entry in ``config.VENDORS``.

    Returns:
        200 with ``message``, ``vendor``, ``status`` and ``timestamp`` when
        the handler reports success; 404 for an unknown vendor; 500 when the
        handler reports failure or anything raises.
    """
    try:
        vendor = None
        for candidate in config.VENDORS:
            if candidate['code'] == vendor_code:
                vendor = candidate
                break

        if not vendor:
            return api_error(f"Unknown vendor: {vendor_code}", 404)

        logger.info(f"Manual update triggered for {vendor_code}")
        refreshed = cve_handler.update_vendor_cache(vendor_code, force=True)

        # Guard clause: report a falsy handler result as failure first.
        if not refreshed:
            return api_error('Update failed', 500)

        body = {
            'message': f'Update triggered for {vendor_code}',
            'vendor': vendor['name'],
            'status': 'success',
            'timestamp': datetime.utcnow().isoformat(),
        }
        return api_response(body)

    except Exception as e:
        logger.error(f"Error in trigger_update: {e}", exc_info=True)
        return api_error(str(e), 500)
|
|
|
|
def _cves_to_csv(cves) -> str:
    """Render CVE records as CSV text with a header row.

    Args:
        cves: Iterable of mappings supporting ``.get``; missing fields are
            written as empty strings.

    Returns:
        The CSV document as a string. Descriptions are cut to 500 characters.
    """
    # NOTE(review): values are written verbatim; if these files are opened in
    # a spreadsheet, consider neutralising cells that start with =, +, - or @.
    buffer = StringIO()
    writer = csv.DictWriter(buffer, fieldnames=[
        'cve_id', 'severity', 'cvss_score', 'published_date',
        'last_modified', 'description',
    ])
    writer.writeheader()
    for cve in cves:
        writer.writerow({
            'cve_id': cve.get('cve_id', ''),
            'severity': cve.get('severity', ''),
            'cvss_score': cve.get('cvss_score', ''),
            'published_date': cve.get('published_date', ''),
            'last_modified': cve.get('last_modified', ''),
            # ``or ''`` covers an explicit None description before slicing.
            'description': (cve.get('description', '') or '')[:500],
        })
    return buffer.getvalue()


@api_bp.route('/export/<vendor_code>/<format>', methods=['GET'])
def export_cves(vendor_code: str, format: str):
    """Export a vendor's CVEs as JSON or as a CSV attachment.

    Args:
        vendor_code: Vendor code from the URL; must match the ``code`` of an
            entry in ``config.VENDORS``.
        format: Export format from the URL; must be listed in
            ``config.EXPORT_FORMATS``. The name shadows the builtin but is
            bound to the ``<format>`` route variable, so it is left as is.

    Returns:
        For ``json``: 200 with ``vendor``, ``export_date``, ``cve_count`` and
        ``cves``. For ``csv``: a ``text/csv`` attachment named
        ``<vendor_code>_cves_<YYYYMMDD>.csv``. 403 when
        ``config.ENABLE_EXPORT`` is falsy; 400 for a format not in
        ``config.EXPORT_FORMATS``; 404 for an unknown vendor; 501 for a
        configured format this view does not implement; 500 with the
        exception text otherwise.
    """
    if not config.ENABLE_EXPORT:
        return api_error("Export is disabled", 403)

    try:
        if format not in config.EXPORT_FORMATS:
            return api_error(f"Invalid format: {format}. Supported: {', '.join(config.EXPORT_FORMATS)}", 400)

        vendor = next((v for v in config.VENDORS if v['code'] == vendor_code), None)
        if not vendor:
            return api_error(f"Unknown vendor: {vendor_code}", 404)

        cves = cve_handler.get_vendor_cves(
            vendor_code,
            limit=config.EXPORT_MAX_ITEMS,
        )

        if format == 'json':
            return api_response({
                'vendor': {
                    'code': vendor_code,
                    'name': vendor['name'],
                },
                'export_date': datetime.utcnow().isoformat(),
                'cve_count': len(cves),
                'cves': cves,
            })

        if format == 'csv':
            response = make_response(_cves_to_csv(cves))
            response.headers['Content-Type'] = 'text/csv; charset=utf-8'
            response.headers['Content-Disposition'] = f'attachment; filename={vendor_code}_cves_{datetime.utcnow().strftime("%Y%m%d")}.csv'
            return response

        # The format is enabled in config but has no branch above. Without
        # this fallback the view returned None, which Flask rejects as an
        # invalid response.
        return api_error(f"Export format not implemented: {format}", 501)

    except Exception as e:
        # NOTE(review): str(e) is returned to the client and may expose
        # internal details; kept for consistency with the other endpoints.
        logger.error(f"Error in export_cves: {e}", exc_info=True)
        return api_error(str(e), 500)