first commit
This commit is contained in:
252
api.py
Normal file
252
api.py
Normal file
@@ -0,0 +1,252 @@
|
||||
from flask import Blueprint, jsonify, request, make_response
|
||||
import logging
|
||||
import csv
|
||||
from io import StringIO
|
||||
from datetime import datetime
|
||||
import config
|
||||
from cve_handler import CVEHandler
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
api_bp = Blueprint('api', __name__)
|
||||
cve_handler = CVEHandler()
|
||||
|
||||
def api_response(data: any, status: int = 200) -> tuple:
|
||||
return jsonify(data), status
|
||||
|
||||
def api_error(message: str, status: int = 400) -> tuple:
|
||||
return jsonify({
|
||||
'error': message,
|
||||
'timestamp': datetime.utcnow().isoformat()
|
||||
}), status
|
||||
|
||||
@api_bp.route('/vendors', methods=['GET'])
|
||||
def get_vendors():
|
||||
try:
|
||||
summary = cve_handler.get_all_vendors_summary()
|
||||
return api_response({
|
||||
'vendors': summary,
|
||||
'total_vendors': len(summary),
|
||||
'timestamp': datetime.utcnow().isoformat()
|
||||
})
|
||||
except Exception as e:
|
||||
logger.error(f"Error in get_vendors: {e}", exc_info=True)
|
||||
return api_error(str(e), 500)
|
||||
|
||||
@api_bp.route('/cve/<vendor_code>', methods=['GET'])
|
||||
def get_vendor_cves(vendor_code: str):
|
||||
try:
|
||||
vendor = next((v for v in config.VENDORS if v['code'] == vendor_code), None)
|
||||
if not vendor:
|
||||
return api_error(f"Unknown vendor: {vendor_code}", 404)
|
||||
|
||||
try:
|
||||
limit = min(
|
||||
int(request.args.get('limit', config.ITEMS_PER_PAGE)),
|
||||
config.MAX_ITEMS_PER_PAGE
|
||||
)
|
||||
offset = max(int(request.args.get('offset', 0)), 0)
|
||||
except ValueError:
|
||||
return api_error("Invalid pagination parameters", 400)
|
||||
|
||||
cves = cve_handler.get_vendor_cves(
|
||||
vendor_code,
|
||||
limit=limit,
|
||||
offset=offset
|
||||
)
|
||||
|
||||
return api_response({
|
||||
'vendor': {
|
||||
'code': vendor_code,
|
||||
'name': vendor['name']
|
||||
},
|
||||
'cves': cves,
|
||||
'count': len(cves),
|
||||
'limit': limit,
|
||||
'offset': offset,
|
||||
'timestamp': datetime.utcnow().isoformat()
|
||||
})
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in get_vendor_cves: {e}", exc_info=True)
|
||||
return api_error(str(e), 500)
|
||||
|
||||
@api_bp.route('/stats/<vendor_code>', methods=['GET'])
|
||||
def get_vendor_stats(vendor_code: str):
|
||||
try:
|
||||
vendor = next((v for v in config.VENDORS if v['code'] == vendor_code), None)
|
||||
if not vendor:
|
||||
return api_error(f"Unknown vendor: {vendor_code}", 404)
|
||||
|
||||
stats = cve_handler.get_vendor_stats(vendor_code)
|
||||
|
||||
return api_response({
|
||||
'vendor': {
|
||||
'code': vendor_code,
|
||||
'name': vendor['name']
|
||||
},
|
||||
'stats': stats,
|
||||
'timestamp': datetime.utcnow().isoformat()
|
||||
})
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in get_vendor_stats: {e}", exc_info=True)
|
||||
return api_error(str(e), 500)
|
||||
|
||||
@api_bp.route('/cve/<vendor_code>/filter', methods=['GET'])
|
||||
def filter_vendor_cves(vendor_code: str):
|
||||
try:
|
||||
vendor = next((v for v in config.VENDORS if v['code'] == vendor_code), None)
|
||||
if not vendor:
|
||||
return api_error(f"Unknown vendor: {vendor_code}", 404)
|
||||
|
||||
severity = request.args.get('severity', '').upper()
|
||||
year = request.args.get('year', type=int)
|
||||
|
||||
try:
|
||||
limit = min(
|
||||
int(request.args.get('limit', config.ITEMS_PER_PAGE)),
|
||||
config.MAX_ITEMS_PER_PAGE
|
||||
)
|
||||
offset = max(int(request.args.get('offset', 0)), 0)
|
||||
except ValueError:
|
||||
return api_error("Invalid pagination parameters", 400)
|
||||
|
||||
if severity and severity not in ['CRITICAL', 'HIGH', 'MEDIUM', 'LOW']:
|
||||
return api_error(f"Invalid severity: {severity}", 400)
|
||||
|
||||
cves = cve_handler.get_vendor_cves(
|
||||
vendor_code,
|
||||
limit=limit,
|
||||
offset=offset,
|
||||
severity=severity if severity else None,
|
||||
year=year
|
||||
)
|
||||
|
||||
return api_response({
|
||||
'vendor': {
|
||||
'code': vendor_code,
|
||||
'name': vendor['name']
|
||||
},
|
||||
'filters': {
|
||||
'severity': severity,
|
||||
'year': year
|
||||
},
|
||||
'cves': cves,
|
||||
'count': len(cves),
|
||||
'timestamp': datetime.utcnow().isoformat()
|
||||
})
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in filter_vendor_cves: {e}", exc_info=True)
|
||||
return api_error(str(e), 500)
|
||||
|
||||
@api_bp.route('/search', methods=['GET'])
|
||||
def search_cves():
|
||||
if not config.ENABLE_SEARCH:
|
||||
return api_error("Search is disabled", 403)
|
||||
|
||||
try:
|
||||
query = request.args.get('q', '').strip()
|
||||
if not query:
|
||||
return api_error("Missing search query", 400)
|
||||
|
||||
if len(query) < 3:
|
||||
return api_error("Query too short (min 3 characters)", 400)
|
||||
|
||||
limit = min(
|
||||
int(request.args.get('limit', config.ITEMS_PER_PAGE)),
|
||||
config.MAX_ITEMS_PER_PAGE
|
||||
)
|
||||
|
||||
results = cve_handler.search_cves(query, limit=limit)
|
||||
|
||||
return api_response({
|
||||
'query': query,
|
||||
'results': results,
|
||||
'count': len(results),
|
||||
'timestamp': datetime.utcnow().isoformat()
|
||||
})
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in search_cves: {e}", exc_info=True)
|
||||
return api_error(str(e), 500)
|
||||
|
||||
@api_bp.route('/update/<vendor_code>', methods=['POST'])
|
||||
def trigger_update(vendor_code: str):
|
||||
try:
|
||||
vendor = next((v for v in config.VENDORS if v['code'] == vendor_code), None)
|
||||
if not vendor:
|
||||
return api_error(f"Unknown vendor: {vendor_code}", 404)
|
||||
|
||||
logger.info(f"Manual update triggered for {vendor_code}")
|
||||
success = cve_handler.update_vendor_cache(vendor_code, force=True)
|
||||
|
||||
if success:
|
||||
return api_response({
|
||||
'message': f'Update triggered for {vendor_code}',
|
||||
'vendor': vendor['name'],
|
||||
'status': 'success',
|
||||
'timestamp': datetime.utcnow().isoformat()
|
||||
})
|
||||
else:
|
||||
return api_error('Update failed', 500)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in trigger_update: {e}", exc_info=True)
|
||||
return api_error(str(e), 500)
|
||||
|
||||
@api_bp.route('/export/<vendor_code>/<format>', methods=['GET'])
|
||||
def export_cves(vendor_code: str, format: str):
|
||||
if not config.ENABLE_EXPORT:
|
||||
return api_error("Export is disabled", 403)
|
||||
|
||||
try:
|
||||
if format not in config.EXPORT_FORMATS:
|
||||
return api_error(f"Invalid format: {format}. Supported: {', '.join(config.EXPORT_FORMATS)}", 400)
|
||||
|
||||
vendor = next((v for v in config.VENDORS if v['code'] == vendor_code), None)
|
||||
if not vendor:
|
||||
return api_error(f"Unknown vendor: {vendor_code}", 404)
|
||||
|
||||
cves = cve_handler.get_vendor_cves(
|
||||
vendor_code,
|
||||
limit=config.EXPORT_MAX_ITEMS
|
||||
)
|
||||
|
||||
if format == 'json':
|
||||
return api_response({
|
||||
'vendor': {
|
||||
'code': vendor_code,
|
||||
'name': vendor['name']
|
||||
},
|
||||
'export_date': datetime.utcnow().isoformat(),
|
||||
'cve_count': len(cves),
|
||||
'cves': cves
|
||||
})
|
||||
|
||||
elif format == 'csv':
|
||||
output = StringIO()
|
||||
writer = csv.DictWriter(output, fieldnames=[
|
||||
'cve_id', 'severity', 'cvss_score', 'published_date',
|
||||
'last_modified', 'description'
|
||||
])
|
||||
writer.writeheader()
|
||||
|
||||
for cve in cves:
|
||||
writer.writerow({
|
||||
'cve_id': cve.get('cve_id', ''),
|
||||
'severity': cve.get('severity', ''),
|
||||
'cvss_score': cve.get('cvss_score', ''),
|
||||
'published_date': cve.get('published_date', ''),
|
||||
'last_modified': cve.get('last_modified', ''),
|
||||
'description': (cve.get('description', '') or '')[:500]
|
||||
})
|
||||
|
||||
response = make_response(output.getvalue())
|
||||
response.headers['Content-Type'] = 'text/csv; charset=utf-8'
|
||||
response.headers['Content-Disposition'] = f'attachment; filename={vendor_code}_cves_{datetime.utcnow().strftime("%Y%m%d")}.csv'
|
||||
return response
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in export_cves: {e}", exc_info=True)
|
||||
return api_error(str(e), 500)
|
||||
Reference in New Issue
Block a user