from flask import Blueprint, jsonify, request, make_response import logging import csv from io import StringIO from datetime import datetime import config from cve_handler import CVEHandler logger = logging.getLogger(__name__) api_bp = Blueprint('api', __name__) cve_handler = CVEHandler() def api_response(data: any, status: int = 200) -> tuple: return jsonify(data), status def api_error(message: str, status: int = 400) -> tuple: return jsonify({ 'error': message, 'timestamp': datetime.utcnow().isoformat() }), status @api_bp.route('/vendors', methods=['GET']) def get_vendors(): try: summary = cve_handler.get_all_vendors_summary() return api_response({ 'vendors': summary, 'total_vendors': len(summary), 'timestamp': datetime.utcnow().isoformat() }) except Exception as e: logger.error(f"Error in get_vendors: {e}", exc_info=True) return api_error(str(e), 500) @api_bp.route('/cve/', methods=['GET']) def get_vendor_cves(vendor_code: str): try: vendor = next((v for v in config.VENDORS if v['code'] == vendor_code), None) if not vendor: return api_error(f"Unknown vendor: {vendor_code}", 404) try: limit = min( int(request.args.get('limit', config.ITEMS_PER_PAGE)), config.MAX_ITEMS_PER_PAGE ) offset = max(int(request.args.get('offset', 0)), 0) except ValueError: return api_error("Invalid pagination parameters", 400) cves = cve_handler.get_vendor_cves( vendor_code, limit=limit, offset=offset ) return api_response({ 'vendor': { 'code': vendor_code, 'name': vendor['name'] }, 'cves': cves, 'count': len(cves), 'limit': limit, 'offset': offset, 'timestamp': datetime.utcnow().isoformat() }) except Exception as e: logger.error(f"Error in get_vendor_cves: {e}", exc_info=True) return api_error(str(e), 500) @api_bp.route('/stats/', methods=['GET']) def get_vendor_stats(vendor_code: str): try: vendor = next((v for v in config.VENDORS if v['code'] == vendor_code), None) if not vendor: return api_error(f"Unknown vendor: {vendor_code}", 404) stats = cve_handler.get_vendor_stats(vendor_code) return api_response({ 'vendor': { 'code': vendor_code, 'name': vendor['name'] }, 'stats': stats, 'timestamp': datetime.utcnow().isoformat() }) except Exception as e: logger.error(f"Error in get_vendor_stats: {e}", exc_info=True) return api_error(str(e), 500) @api_bp.route('/cve//filter', methods=['GET']) def filter_vendor_cves(vendor_code: str): try: vendor = next((v for v in config.VENDORS if v['code'] == vendor_code), None) if not vendor: return api_error(f"Unknown vendor: {vendor_code}", 404) severity = request.args.get('severity', '').upper() year = request.args.get('year', type=int) try: limit = min( int(request.args.get('limit', config.ITEMS_PER_PAGE)), config.MAX_ITEMS_PER_PAGE ) offset = max(int(request.args.get('offset', 0)), 0) except ValueError: return api_error("Invalid pagination parameters", 400) if severity and severity not in ['CRITICAL', 'HIGH', 'MEDIUM', 'LOW']: return api_error(f"Invalid severity: {severity}", 400) cves = cve_handler.get_vendor_cves( vendor_code, limit=limit, offset=offset, severity=severity if severity else None, year=year ) return api_response({ 'vendor': { 'code': vendor_code, 'name': vendor['name'] }, 'filters': { 'severity': severity, 'year': year }, 'cves': cves, 'count': len(cves), 'timestamp': datetime.utcnow().isoformat() }) except Exception as e: logger.error(f"Error in filter_vendor_cves: {e}", exc_info=True) return api_error(str(e), 500) @api_bp.route('/search', methods=['GET']) def search_cves(): if not config.ENABLE_SEARCH: return api_error("Search is disabled", 403) try: query = request.args.get('q', '').strip() if not query: return api_error("Missing search query", 400) if len(query) < 3: return api_error("Query too short (min 3 characters)", 400) limit = min( int(request.args.get('limit', config.ITEMS_PER_PAGE)), config.MAX_ITEMS_PER_PAGE ) results = cve_handler.search_cves(query, limit=limit) return api_response({ 'query': query, 'results': results, 'count': len(results), 'timestamp': datetime.utcnow().isoformat() }) except Exception as e: logger.error(f"Error in search_cves: {e}", exc_info=True) return api_error(str(e), 500) @api_bp.route('/update/', methods=['POST']) def trigger_update(vendor_code: str): try: vendor = next((v for v in config.VENDORS if v['code'] == vendor_code), None) if not vendor: return api_error(f"Unknown vendor: {vendor_code}", 404) logger.info(f"Manual update triggered for {vendor_code}") success = cve_handler.update_vendor_cache(vendor_code, force=True) if success: return api_response({ 'message': f'Update triggered for {vendor_code}', 'vendor': vendor['name'], 'status': 'success', 'timestamp': datetime.utcnow().isoformat() }) else: return api_error('Update failed', 500) except Exception as e: logger.error(f"Error in trigger_update: {e}", exc_info=True) return api_error(str(e), 500) @api_bp.route('/export//', methods=['GET']) def export_cves(vendor_code: str, format: str): if not config.ENABLE_EXPORT: return api_error("Export is disabled", 403) try: if format not in config.EXPORT_FORMATS: return api_error(f"Invalid format: {format}. Supported: {', '.join(config.EXPORT_FORMATS)}", 400) vendor = next((v for v in config.VENDORS if v['code'] == vendor_code), None) if not vendor: return api_error(f"Unknown vendor: {vendor_code}", 404) cves = cve_handler.get_vendor_cves( vendor_code, limit=config.EXPORT_MAX_ITEMS ) if format == 'json': return api_response({ 'vendor': { 'code': vendor_code, 'name': vendor['name'] }, 'export_date': datetime.utcnow().isoformat(), 'cve_count': len(cves), 'cves': cves }) elif format == 'csv': output = StringIO() writer = csv.DictWriter(output, fieldnames=[ 'cve_id', 'severity', 'cvss_score', 'published_date', 'last_modified', 'description' ]) writer.writeheader() for cve in cves: writer.writerow({ 'cve_id': cve.get('cve_id', ''), 'severity': cve.get('severity', ''), 'cvss_score': cve.get('cvss_score', ''), 'published_date': cve.get('published_date', ''), 'last_modified': cve.get('last_modified', ''), 'description': (cve.get('description', '') or '')[:500] }) response = make_response(output.getvalue()) response.headers['Content-Type'] = 'text/csv; charset=utf-8' response.headers['Content-Disposition'] = f'attachment; filename={vendor_code}_cves_{datetime.utcnow().strftime("%Y%m%d")}.csv' return response except Exception as e: logger.error(f"Error in export_cves: {e}", exc_info=True) return api_error(str(e), 500)