Files
cve_monitor/cve_handler.py
Mateusz Gruszczyński e134ee2692 app
2026-02-14 10:04:53 +01:00

854 lines
37 KiB
Python

import sqlite3
import json
import logging
import time
import os
from datetime import datetime, timedelta
from typing import List, Dict, Optional
from contextlib import contextmanager
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import config
# Configure root logging once at import time, taking level and format from
# the project's config module.
logging.basicConfig(level=config.LOG_LEVEL, format=config.LOG_FORMAT)
# Module-level logger used by CVEHandler and update_all_vendors below.
logger = logging.getLogger(__name__)
class CVEHandler:
    """Collect CVE records for the configured vendors and cache them in SQLite.

    Records come from the NVD API and, when enabled in ``config.CVE_SOURCES``,
    from GitHub advisories. They are stored in the ``cve_cache`` table;
    per-vendor bookkeeping (last update, status) lives in ``cve_metadata``.
    """

    # Size (in days) of each published-date window requested from NVD.
    _NVD_MAX_DAYS_PER_CHUNK = 120
    # Page size requested from NVD; also used to compute ``startIndex``.
    _NVD_PAGE_SIZE = 2000
    # Page size requested from the GitHub advisories endpoint.
    _GITHUB_PAGE_SIZE = 100
    # CVEs in these NVD states are skipped.
    _SKIPPED_VULN_STATUSES = frozenset({'Rejected', 'Awaiting Analysis', 'Undergoing Analysis'})
    # CPE version fields that do not name a concrete version.
    _WILDCARD_VERSIONS = frozenset({'-', '*', 'any'})
    # Product names treated as "platform" entries when their version is a wildcard.
    _PLATFORM_PRODUCTS = frozenset({
        'windows', 'windows_server', 'windows_10', 'windows_11',
        'macos', 'mac_os_x', 'ios', 'ipados', 'tvos', 'watchos',
        'linux', 'linux_kernel',
        'android', 'chrome_os',
        'ubuntu', 'debian', 'centos', 'rhel', 'fedora',
        'freebsd', 'netbsd', 'openbsd',
    })
    # Metric keys in order of preference (newest CVSS version first).
    _CVSS_METRIC_KEYS = ('cvssMetricV40', 'cvssMetricV31', 'cvssMetricV30', 'cvssMetricV2')

    def __init__(self, db_path: Optional[str] = None):
        """Open (creating if necessary) the cache database and an HTTP session.

        Args:
            db_path: Path of the SQLite file; defaults to ``config.DATABASE_PATH``.
        """
        self.db_path = db_path or config.DATABASE_PATH
        db_dir = os.path.dirname(self.db_path)
        if db_dir and not os.path.exists(db_dir):
            os.makedirs(db_dir, exist_ok=True)
            logger.info(f"Created database directory: {db_dir}")
        self._init_database()
        self.session = self._create_session()

    def _create_session(self) -> requests.Session:
        """Return a ``requests`` session that retries GETs on 429/5xx responses."""
        session = requests.Session()
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["GET"],
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        return session

    @contextmanager
    def get_db_connection(self):
        """Yield a SQLite connection; commit on success, roll back on error.

        The connection is always closed on exit. Any exception is logged and
        re-raised after the rollback.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            # PRAGMA setup lives inside the try block so the connection is
            # closed even when one of these statements fails.
            conn.row_factory = sqlite3.Row
            if config.DATABASE_WAL_MODE:
                conn.execute("PRAGMA journal_mode=WAL")
                conn.execute("PRAGMA synchronous=NORMAL")
            conn.execute(f"PRAGMA cache_size=-{config.DATABASE_CACHE_SIZE}")
            yield conn
            conn.commit()
        except Exception as e:
            conn.rollback()
            logger.error(f"Database error: {e}")
            raise
        finally:
            conn.close()

    def _init_database(self):
        """Create tables and indexes, upgrading databases that predate ``discord_notified``."""
        with self.get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS cve_cache (
                    cve_id TEXT PRIMARY KEY,
                    vendor_code TEXT NOT NULL,
                    description TEXT,
                    published_date TEXT,
                    last_modified TEXT,
                    cvss_score REAL,
                    cvss_vector TEXT,
                    severity TEXT,
                    refs TEXT,
                    cwe_ids TEXT,
                    affected_products TEXT,
                    raw_data TEXT,
                    discord_notified INTEGER DEFAULT 0,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS cve_metadata (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    vendor_code TEXT UNIQUE NOT NULL,
                    last_update TIMESTAMP,
                    total_cve_count INTEGER DEFAULT 0,
                    last_cve_id TEXT,
                    update_status TEXT,
                    error_message TEXT
                )
            """)

            # Migrate BEFORE creating indexes: on a database created without
            # discord_notified, indexing that column first would fail with
            # "no such column" and the migration would never run.
            cursor.execute("PRAGMA table_info(cve_cache)")
            columns = [col[1] for col in cursor.fetchall()]
            if 'discord_notified' not in columns:
                logger.info("Adding discord_notified column to existing table...")
                cursor.execute("""
                    ALTER TABLE cve_cache
                    ADD COLUMN discord_notified INTEGER DEFAULT 0
                """)
                logger.info("✓ Column added successfully")

            index_statements = (
                "CREATE INDEX IF NOT EXISTS idx_vendor_code ON cve_cache(vendor_code)",
                "CREATE INDEX IF NOT EXISTS idx_severity ON cve_cache(severity)",
                "CREATE INDEX IF NOT EXISTS idx_published_date ON cve_cache(published_date DESC)",
                "CREATE INDEX IF NOT EXISTS idx_cvss_score ON cve_cache(cvss_score DESC)",
                "CREATE INDEX IF NOT EXISTS idx_updated_at ON cve_cache(updated_at DESC)",
                "CREATE INDEX IF NOT EXISTS idx_discord_notified ON cve_cache(discord_notified)",
            )
            for statement in index_statements:
                cursor.execute(statement)
        logger.info(f"Database initialized at {self.db_path}")

    def fetch_cve_from_nvd(self, keywords: List[str], vendor_config: Optional[Dict] = None,
                           days_back: Optional[int] = None) -> List[Dict]:
        """Fetch CVEs published in the last ``days_back`` days and filter them by CPE.

        The lookback period is split into windows of at most
        ``_NVD_MAX_DAYS_PER_CHUNK`` days; each window is paged through and every
        CVE is filtered client-side (status, CPE vendor/product, CVSS).

        Args:
            keywords: Not used by this method (matching is CPE-based); kept so
                existing callers keep working.
            vendor_config: Vendor entry; must contain ``cpe_vendor``. Optional
                keys read here: ``cpe_product``, ``require_cvss`` (default
                True), ``min_cvss``.
            days_back: Lookback in days; defaults to ``config.INITIAL_LOOKBACK_DAYS``.

        Returns:
            One dict per matched CVE (keys: cve_id, description, published_date,
            last_modified, cvss_score, cvss_vector, severity, references,
            cwe_ids, raw_data). Empty when the configuration is unusable.
            Request errors are logged and end the current window; they are not
            raised.
        """
        if days_back is None:
            days_back = config.INITIAL_LOOKBACK_DAYS
        if not vendor_config:
            logger.error("vendor_config is required!")
            return []
        cpe_vendor = vendor_config.get('cpe_vendor')
        cpe_product = vendor_config.get('cpe_product')
        if not cpe_vendor:
            logger.error("cpe_vendor is required in vendor_config!")
            return []
        require_cvss = vendor_config.get('require_cvss', True)
        min_cvss = vendor_config.get('min_cvss')

        max_days = self._NVD_MAX_DAYS_PER_CHUNK
        page_size = self._NVD_PAGE_SIZE
        num_chunks = (days_back // max_days) + (1 if days_back % max_days else 0)

        logger.info("Fetching CVEs from NVD")
        logger.info(f" CPE filter: vendor='{cpe_vendor}'")
        if cpe_product:
            logger.info(f" product='{cpe_product}'")
        if require_cvss:
            logger.info(" CVSS required: yes")
        if min_cvss:
            logger.info(f" Minimum CVSS: {min_cvss}")
        logger.info(" Status filter: Analyzed/Modified only")
        logger.info(" Platform filter: Ignore OS-only CPE entries")
        logger.info(f" Date range: {days_back} days in {num_chunks} chunks")

        headers = {}
        if config.NVD_API_KEY:
            headers['apiKey'] = config.NVD_API_KEY
        # Requests are spaced further apart when no API key is configured.
        page_delay = 0.6 if config.NVD_API_KEY else 6
        chunk_delay = 1 if config.NVD_API_KEY else 6

        all_results = []
        unique_cve_ids = set()
        # Take "now" once so consecutive windows line up exactly instead of
        # drifting by the time spent fetching earlier windows.
        now = datetime.utcnow()

        for chunk_idx in range(num_chunks):
            chunk_end = now - timedelta(days=chunk_idx * max_days)
            days_in_chunk = min(max_days, days_back - (chunk_idx * max_days))
            chunk_start = chunk_end - timedelta(days=days_in_chunk)
            start_str = chunk_start.strftime('%Y-%m-%dT%H:%M:%S.000')
            end_str = chunk_end.strftime('%Y-%m-%dT%H:%M:%S.000')
            logger.info(f"Chunk {chunk_idx+1}/{num_chunks}: {start_str[:10]} to {end_str[:10]}")
            params = {
                'pubStartDate': start_str,
                'pubEndDate': end_str,
                'resultsPerPage': page_size,
                'startIndex': 0,
            }
            page = 0
            while True:
                try:
                    start_index = page * page_size
                    params['startIndex'] = start_index
                    response = self.session.get(
                        config.CVE_SOURCES['nvd']['url'],
                        params=params,
                        headers=headers,
                        timeout=config.CVE_SOURCES['nvd']['timeout'],
                    )
                    if response.status_code != 200:
                        # Log on every page so a truncated window is visible.
                        logger.warning(
                            f"Chunk {chunk_idx+1} page {page+1} returned {response.status_code}"
                        )
                        break
                    data = response.json()
                    vulnerabilities = data.get('vulnerabilities', [])
                    total_results = data.get('totalResults', 0)
                    if page == 0:
                        logger.info(f" Total CVEs in chunk: {total_results}")
                    if not vulnerabilities:
                        break

                    matched_count = 0
                    for vuln in vulnerabilities:
                        cve_data = vuln.get('cve', {})
                        cve_id = cve_data.get('id', '')
                        if cve_id in unique_cve_ids:
                            continue
                        record = self._parse_nvd_cve(
                            cve_data, cpe_vendor, cpe_product, require_cvss, min_cvss
                        )
                        if record is None:
                            continue
                        unique_cve_ids.add(cve_id)
                        all_results.append(record)
                        matched_count += 1
                    if matched_count > 0:
                        logger.info(f" Page {page+1}: Matched {matched_count} CVEs")

                    # Stop on a short page or once the requested offset plus
                    # the rows received covers the reported total.
                    received = len(vulnerabilities)
                    if received < page_size or start_index + received >= total_results:
                        break
                    page += 1
                    time.sleep(page_delay)
                except Exception as e:
                    logger.error(f"Error in chunk {chunk_idx+1} page {page+1}: {e}", exc_info=True)
                    break
            if chunk_idx < num_chunks - 1:
                time.sleep(chunk_delay)

        logger.info(
            f"✓ Total unique CVEs matched: {len(all_results)} (CPE + CVSS + Status + Platform filters)"
        )
        return all_results

    def _parse_nvd_cve(self, cve_data: Dict, cpe_vendor: str, cpe_product: Optional[str],
                       require_cvss: bool, min_cvss) -> Optional[Dict]:
        """Return the cache record for one NVD ``cve`` object, or None if it is filtered out."""
        if cve_data.get('vulnStatus', '') in self._SKIPPED_VULN_STATUSES:
            return None
        configurations = cve_data.get('configurations', [])
        if not configurations or not self._cpe_matches(configurations, cpe_vendor, cpe_product):
            return None

        cvss_score, cvss_vector, severity = self._extract_cvss(cve_data.get('metrics', {}))
        if require_cvss and not cvss_score:
            return None
        if min_cvss and cvss_score and cvss_score < min_cvss:
            return None

        description_text = ' '.join(
            desc.get('value', '') for desc in cve_data.get('descriptions', [])
        )
        refs = [ref.get('url', '') for ref in cve_data.get('references', [])]
        cwe_ids = [
            desc.get('value', '')
            for weakness in cve_data.get('weaknesses', [])
            for desc in weakness.get('description', [])
            if desc.get('value', '').startswith('CWE-')
        ]
        return {
            'cve_id': cve_data.get('id', ''),
            'description': description_text[:2000],
            'published_date': cve_data.get('published', ''),
            'last_modified': cve_data.get('lastModified', ''),
            'cvss_score': cvss_score,
            'cvss_vector': cvss_vector,
            'severity': severity,
            'references': json.dumps(refs),
            'cwe_ids': json.dumps(cwe_ids),
            'raw_data': json.dumps(cve_data),
        }

    def _cpe_matches(self, configurations: List[Dict], cpe_vendor: str,
                     cpe_product: Optional[str]) -> bool:
        """Return True if any CPE criteria names the vendor (and product, when given).

        Entries with a wildcard version are ignored when they are operating
        system CPEs (part ``o``) or name a product in ``_PLATFORM_PRODUCTS``.
        """
        wanted_vendor = cpe_vendor.lower()
        wanted_product = cpe_product.lower() if cpe_product else None
        for config_item in configurations:
            for node in config_item.get('nodes', []):
                for cpe_match in node.get('cpeMatch', []):
                    # Format: cpe:2.3:part:vendor:product:version:update:edition:...
                    parts = cpe_match.get('criteria', '').lower().split(':')
                    if len(parts) < 6:
                        continue
                    part_type, vendor_part, product_part, version_part = parts[2:6]
                    if vendor_part != wanted_vendor:
                        continue
                    is_platform_only = version_part in self._WILDCARD_VERSIONS and (
                        part_type == 'o' or product_part in self._PLATFORM_PRODUCTS
                    )
                    if is_platform_only:
                        continue
                    if wanted_product is None or product_part == wanted_product:
                        return True
        return False

    def _extract_cvss(self, metrics: Dict):
        """Return ``(score, vector, severity)`` from the newest CVSS version present.

        Falls back to ``(None, None, 'UNKNOWN')`` when no metric is available.
        """
        for key in self._CVSS_METRIC_KEYS:
            entries = metrics.get(key)
            if not entries:
                continue
            metric = entries[0]
            cvss_data = metric.get('cvssData', {})
            score = cvss_data.get('baseScore')
            vector = cvss_data.get('vectorString')
            # NVD's CVSS v2 entries carry baseSeverity on the metric object
            # rather than inside cvssData, so check both places before
            # deriving a label from the score.
            severity = cvss_data.get('baseSeverity') or metric.get('baseSeverity') or 'UNKNOWN'
            if severity == 'UNKNOWN' and score:
                severity = self._severity_from_score(score)
            return score, vector, severity
        return None, None, 'UNKNOWN'

    @staticmethod
    def _severity_from_score(score: float) -> str:
        """Map a CVSS base score to a severity label ('UNKNOWN' for scores <= 0)."""
        if score >= 9.0:
            return 'CRITICAL'
        if score >= 7.0:
            return 'HIGH'
        if score >= 4.0:
            return 'MEDIUM'
        if score > 0:
            return 'LOW'
        return 'UNKNOWN'

    def fetch_cve_from_github(self, keywords: List[str]) -> List[Dict]:
        """Fetch GitHub advisories whose summary or description mentions a keyword.

        Up to 10 pages of 100 advisories are requested. Matching is a
        case-insensitive substring test against ``summary + description``.

        Args:
            keywords: Terms to look for in the advisory text.

        Returns:
            One dict per matched advisory, with the same keys as the NVD
            records. Request errors are logged and stop the paging; they are
            not raised.
        """
        results = []
        unique_cve_ids = set()
        headers = {}
        if config.GITHUB_TOKEN:
            headers['Authorization'] = f'token {config.GITHUB_TOKEN}'
        logger.info(f"Fetching advisories from GitHub for keywords: {keywords[:5]}")
        lowered_keywords = [keyword.lower() for keyword in keywords]
        per_page = self._GITHUB_PAGE_SIZE
        max_pages = 10

        # NOTE(review): GitHub's global advisories endpoint documents cursor
        # pagination (before/after), not `page` — confirm the configured URL
        # honours `page`, otherwise every request returns the same first page.
        for page in range(1, max_pages + 1):
            params = {
                'per_page': per_page,
                'page': page,
                'sort': 'published',
                'direction': 'desc',
            }
            try:
                response = self.session.get(
                    config.CVE_SOURCES['github']['url'],
                    params=params,
                    headers=headers,
                    timeout=config.CVE_SOURCES['github']['timeout'],
                )
                response.raise_for_status()
                advisories = response.json()
                if not advisories:
                    logger.info(f"GitHub page {page}: No more results")
                    break
                logger.info(f"GitHub page {page}: Got {len(advisories)} advisories")

                matched_count = 0
                for advisory in advisories:
                    # Fields may be JSON null, so normalise with `or` rather
                    # than relying on dict.get defaults.
                    summary = advisory.get('summary') or ''
                    description = advisory.get('description') or ''
                    combined_text = summary + ' ' + description
                    haystack = combined_text.lower()
                    if not any(keyword in haystack for keyword in lowered_keywords):
                        continue

                    cve_id = None
                    for identifier in advisory.get('identifiers') or []:
                        if identifier.get('type') == 'CVE':
                            cve_id = identifier.get('value')
                            break
                    if not cve_id:
                        cve_id = advisory.get('ghsa_id', f"GHSA-{advisory.get('id', 'unknown')}")
                    if cve_id in unique_cve_ids:
                        continue
                    unique_cve_ids.add(cve_id)
                    matched_count += 1

                    cvss_data = advisory.get('cvss') or {}
                    severity = (advisory.get('severity') or 'UNKNOWN').upper()
                    cwe_list = [cwe.get('cwe_id', '') for cwe in advisory.get('cwes') or []]
                    results.append({
                        'cve_id': cve_id,
                        'description': combined_text[:1000],
                        'published_date': advisory.get('published_at', ''),
                        'last_modified': advisory.get('updated_at', ''),
                        'cvss_score': cvss_data.get('score'),
                        'cvss_vector': cvss_data.get('vector_string'),
                        'severity': severity,
                        'references': json.dumps([advisory.get('html_url', '')]),
                        'cwe_ids': json.dumps(cwe_list),
                        'raw_data': json.dumps(advisory),
                    })
                logger.info(f"GitHub page {page}: Matched {matched_count} advisories")

                # A short page is the last one; skip the extra request.
                if len(advisories) < per_page:
                    break
                time.sleep(1)
            except Exception as e:
                logger.error(f"Error fetching GitHub page {page}: {e}", exc_info=True)
                break

        logger.info(f"✓ Total unique CVEs from GitHub: {len(results)}")
        return results

    def update_vendor_cache(self, vendor_code: str, force: bool = False) -> bool:
        """Refresh the cached CVEs of one vendor.

        Args:
            vendor_code: ``code`` of an entry in ``config.VENDORS``.
            force: When True, skip the freshness check and look back
                ``config.INITIAL_LOOKBACK_DAYS``; otherwise look back
                ``config.UPDATE_LOOKBACK_DAYS``.

        Returns:
            True when the cache is fresh or was updated; False for an unknown
            vendor, a vendor without ``cpe_vendor``, or a failed update (the
            failure is logged and recorded in ``cve_metadata``).
        """
        vendor = next((v for v in config.VENDORS if v['code'] == vendor_code), None)
        if not vendor:
            logger.error(f"Unknown vendor code: {vendor_code}")
            logger.error(f"Available vendors: {[v['code'] for v in config.VENDORS]}")
            return False

        if not force and self._is_cache_fresh(vendor_code):
            return True

        logger.info(f"Updating cache for vendor: {vendor['name']} ({vendor_code})")
        if not vendor.get('cpe_vendor'):
            logger.error(f"No CPE vendor configured for {vendor_code}!")
            return False
        logger.info(f"CPE vendor: {vendor['cpe_vendor']}")
        if vendor.get('cpe_product'):
            logger.info(f"CPE product: {vendor['cpe_product']}")

        try:
            days_back = config.INITIAL_LOOKBACK_DAYS if force else config.UPDATE_LOOKBACK_DAYS
            nvd_cves = self.fetch_cve_from_nvd(
                vendor['keywords'],
                vendor_config=vendor,
                days_back=days_back,
            )
            all_cves = list(nvd_cves)
            logger.info(f"Collected {len(nvd_cves)} CVEs from NVD")
            if config.CVE_SOURCES.get('github', {}).get('enabled', False):
                github_cves = self.fetch_cve_from_github(vendor['keywords'])
                all_cves.extend(github_cves)
                logger.info(f"Collected {len(github_cves)} CVEs from GitHub")

            # Later sources win when the same ID appears more than once.
            unique_cves = {cve['cve_id']: cve for cve in all_cves}
            logger.info(f"Total unique CVEs after deduplication: {len(unique_cves)}")

            new_count, updated_count = self._store_cves(vendor_code, unique_cves)
            if new_count > 0:
                logger.info(f"Added {new_count} new CVEs for {vendor['name']}")
            if updated_count > 0:
                logger.info(f"Updated {updated_count} existing CVEs for {vendor['name']}")
            logger.info(f"Successfully processed {len(unique_cves)} CVEs for {vendor['name']}")
            return True
        except Exception as e:
            logger.error(f"Error updating vendor cache for {vendor_code}: {e}", exc_info=True)
            # Best effort: record the failure, but never let bookkeeping mask
            # the original error or raise out of this method.
            try:
                with self.get_db_connection() as conn:
                    conn.execute("""
                        INSERT OR REPLACE INTO cve_metadata
                        (vendor_code, last_update, update_status, error_message)
                        VALUES (?, CURRENT_TIMESTAMP, 'failed', ?)
                    """, (vendor_code, str(e)[:500]))
            except Exception as meta_error:
                logger.warning(f"Could not record failed update for {vendor_code}: {meta_error}")
            return False

    def _is_cache_fresh(self, vendor_code: str) -> bool:
        """Return True if the vendor's last successful update is within ``config.UPDATE_INTERVAL_HOURS``."""
        with self.get_db_connection() as conn:
            row = conn.execute(
                "SELECT last_update, update_status FROM cve_metadata WHERE vendor_code = ?",
                (vendor_code,),
            ).fetchone()
        if not row or not row['last_update']:
            return False
        # A failed run also stamps last_update; it must not suppress the retry.
        if row['update_status'] == 'failed':
            return False
        time_since_update = datetime.utcnow() - datetime.fromisoformat(row['last_update'])
        if time_since_update >= timedelta(hours=config.UPDATE_INTERVAL_HOURS):
            return False
        # total_seconds() rather than .seconds, which drops whole days.
        hours_ago = int(time_since_update.total_seconds() // 3600)
        logger.info(f"Cache for {vendor_code} is fresh (updated {hours_ago}h ago)")
        return True

    def _store_cves(self, vendor_code: str, unique_cves: Dict[str, Dict]):
        """Insert or update the given CVEs and mark the vendor's update as successful.

        Existing rows keep their ``discord_notified`` and ``created_at`` values.

        Returns:
            ``(new_count, updated_count)``.
        """
        new_count = 0
        updated_count = 0
        with self.get_db_connection() as conn:
            cursor = conn.cursor()
            for cve_id, cve in unique_cves.items():
                # affected_products is written as NULL on both paths.
                values = (
                    vendor_code,
                    cve.get('description'),
                    cve.get('published_date'),
                    cve.get('last_modified'),
                    cve.get('cvss_score'),
                    cve.get('cvss_vector'),
                    cve.get('severity'),
                    cve.get('references'),
                    cve.get('cwe_ids'),
                    None,
                    cve.get('raw_data'),
                )
                cursor.execute("SELECT 1 FROM cve_cache WHERE cve_id = ?", (cve_id,))
                if cursor.fetchone() is None:
                    cursor.execute("""
                        INSERT INTO cve_cache
                        (cve_id, vendor_code, description, published_date, last_modified,
                         cvss_score, cvss_vector, severity, refs, cwe_ids,
                         affected_products, raw_data, discord_notified,
                         created_at, updated_at)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 0, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
                    """, (cve_id,) + values)
                    new_count += 1
                else:
                    # discord_notified is left out of the SET list on purpose
                    # so the stored notification flag is preserved.
                    cursor.execute("""
                        UPDATE cve_cache SET
                            vendor_code = ?,
                            description = ?,
                            published_date = ?,
                            last_modified = ?,
                            cvss_score = ?,
                            cvss_vector = ?,
                            severity = ?,
                            refs = ?,
                            cwe_ids = ?,
                            affected_products = ?,
                            raw_data = ?,
                            updated_at = CURRENT_TIMESTAMP
                        WHERE cve_id = ?
                    """, values + (cve_id,))
                    updated_count += 1
            cursor.execute("""
                INSERT OR REPLACE INTO cve_metadata
                (vendor_code, last_update, total_cve_count, last_cve_id, update_status, error_message)
                VALUES (?, CURRENT_TIMESTAMP, ?, ?, 'success', NULL)
            """, (
                vendor_code,
                len(unique_cves),
                next(iter(unique_cves), None),
            ))
        return new_count, updated_count

    def get_vendor_cves(self, vendor_code: str, limit: int = None, offset: int = 0,
                        severity: str = None, year: int = None) -> List[Dict]:
        """Return cached CVEs of a vendor, newest first.

        Args:
            vendor_code: Vendor whose rows are returned.
            limit: Maximum number of rows; a falsy value returns all rows.
            offset: Rows to skip; only applied together with ``limit``.
            severity: Optional severity filter (upper-cased before comparing).
            year: Optional publication-year filter.

        Returns:
            Row dicts in which the ``refs`` column is exposed as ``references``.
        """
        query = """
            SELECT cve_id, vendor_code, description, published_date, last_modified,
                   cvss_score, cvss_vector, severity, refs, cwe_ids, updated_at
            FROM cve_cache
            WHERE vendor_code = ?
        """
        params = [vendor_code]
        if severity:
            query += " AND severity = ?"
            params.append(severity.upper())
        if year:
            query += " AND strftime('%Y', published_date) = ?"
            params.append(str(year))
        query += " ORDER BY published_date DESC"
        if limit:
            # Bound parameters instead of string interpolation: these values
            # may originate from request input.
            query += " LIMIT ? OFFSET ?"
            params.extend([int(limit), int(offset)])

        with self.get_db_connection() as conn:
            rows = conn.execute(query, params).fetchall()

        results = []
        for row in rows:
            row_dict = dict(row)
            row_dict['references'] = row_dict.pop('refs')
            results.append(row_dict)
        return results

    def get_vendor_stats(self, vendor_code: str) -> Dict:
        """Return aggregate counts for one vendor.

        Returns:
            Dict with ``total``, ``severity`` (count per severity), ``monthly``
            (count per ``YYYY-MM`` for the last 12 months), ``this_month``,
            ``this_year`` and ``recent`` (published in the last 7 days).
        """
        with self.get_db_connection() as conn:
            cursor = conn.cursor()

            # Total count
            cursor.execute(
                "SELECT COUNT(*) as total FROM cve_cache WHERE vendor_code = ?",
                (vendor_code,),
            )
            total = cursor.fetchone()['total']

            # Severity distribution
            cursor.execute("""
                SELECT severity, COUNT(*) as count
                FROM cve_cache
                WHERE vendor_code = ?
                GROUP BY severity
            """, (vendor_code,))
            severity_counts = {row['severity']: row['count'] for row in cursor.fetchall()}

            # Monthly trend (last 12 months)
            cursor.execute("""
                SELECT strftime('%Y-%m', published_date) as month, COUNT(*) as count
                FROM cve_cache
                WHERE vendor_code = ? AND published_date >= date('now', '-12 months')
                GROUP BY month
                ORDER BY month
            """, (vendor_code,))
            monthly_counts = {row['month']: row['count'] for row in cursor.fetchall()}

            # This month
            cursor.execute("""
                SELECT COUNT(*) as count
                FROM cve_cache
                WHERE vendor_code = ? AND strftime('%Y-%m', published_date) = strftime('%Y-%m', 'now')
            """, (vendor_code,))
            this_month = cursor.fetchone()['count']

            # This year
            cursor.execute("""
                SELECT COUNT(*) as count
                FROM cve_cache
                WHERE vendor_code = ? AND strftime('%Y', published_date) = strftime('%Y', 'now')
            """, (vendor_code,))
            this_year = cursor.fetchone()['count']

            # Recent (last 7 days)
            cursor.execute("""
                SELECT COUNT(*) as count
                FROM cve_cache
                WHERE vendor_code = ? AND published_date >= date('now', '-7 days')
            """, (vendor_code,))
            recent = cursor.fetchone()['count']

        return {
            'total': total,
            'severity': severity_counts,
            'monthly': monthly_counts,
            'this_month': this_month,
            'this_year': this_year,
            'recent': recent,
        }

    def search_cves(self, query: str, limit: int = 50) -> List[Dict]:
        """Return up to ``limit`` cached CVEs whose ID or description contains ``query``.

        The term is wrapped in ``%`` and matched with SQL ``LIKE``; results are
        ordered newest first.
        """
        search_query = f"%{query}%"
        with self.get_db_connection() as conn:
            rows = conn.execute("""
                SELECT cve_id, vendor_code, description, published_date, cvss_score, severity
                FROM cve_cache
                WHERE cve_id LIKE ? OR description LIKE ?
                ORDER BY published_date DESC
                LIMIT ?
            """, (search_query, search_query, limit)).fetchall()
        return [dict(row) for row in rows]

    def get_all_vendors_summary(self) -> List[Dict]:
        """Return one summary dict per entry in ``config.VENDORS``.

        Each dict has ``code``, ``name``, ``icon``, ``total``, ``critical``,
        ``high``, ``recent`` (published in the last 7 days) and ``last_update``
        (None when the vendor has no metadata row).
        """
        summary = []
        with self.get_db_connection() as conn:
            cursor = conn.cursor()
            for vendor in config.VENDORS:
                code = vendor['code']
                cursor.execute("""
                    SELECT
                        COUNT(*) as total,
                        SUM(CASE WHEN severity = 'CRITICAL' THEN 1 ELSE 0 END) as critical,
                        SUM(CASE WHEN severity = 'HIGH' THEN 1 ELSE 0 END) as high,
                        SUM(CASE WHEN published_date >= date('now', '-7 days') THEN 1 ELSE 0 END) as recent
                    FROM cve_cache
                    WHERE vendor_code = ?
                """, (code,))
                stats = cursor.fetchone()
                cursor.execute(
                    "SELECT last_update FROM cve_metadata WHERE vendor_code = ?",
                    (code,),
                )
                metadata = cursor.fetchone()
                # SUM() over zero rows yields NULL, hence the `or 0` fallbacks.
                summary.append({
                    'code': code,
                    'name': vendor['name'],
                    'icon': vendor.get('icon', 'fa-shield-alt'),
                    'total': stats['total'] or 0,
                    'critical': stats['critical'] or 0,
                    'high': stats['high'] or 0,
                    'recent': stats['recent'] or 0,
                    'last_update': metadata['last_update'] if metadata else None,
                })
        return summary

    def get_recent_cves_for_discord(self, hours: int = 1) -> List[Dict]:
        """Return up to 10 CRITICAL/HIGH CVEs whose row changed in the last ``hours`` hours.

        Only rows with ``cvss_score >= config.DISCORD_MIN_CVSS`` are returned,
        ordered by publication date (newest first).
        """
        with self.get_db_connection() as conn:
            rows = conn.execute("""
                SELECT cve_id, vendor_code, description, published_date, cvss_score, severity
                FROM cve_cache
                WHERE updated_at >= datetime('now', ? || ' hours')
                AND severity IN ('CRITICAL', 'HIGH')
                AND cvss_score >= ?
                ORDER BY published_date DESC
                LIMIT 10
            """, (f'-{hours}', config.DISCORD_MIN_CVSS)).fetchall()
        return [dict(row) for row in rows]
def update_all_vendors(force: bool = True):
    """Update the CVE cache of every vendor in ``config.VENDORS``.

    Vendors are processed one after another with a pause between them (longer
    when no NVD API key is configured). A failure of one vendor is logged and
    counted; it does not stop the run. Ctrl-C stops the loop early, after
    which the final summary is still logged.

    Args:
        force: Passed through to ``CVEHandler.update_vendor_cache``; True
            requests a full update, False an incremental one.

    Returns:
        ``(updated, failed)`` vendor counts.
    """
    handler = CVEHandler()
    updated = 0
    failed = 0
    total_vendors = len(config.VENDORS)
    banner = '=' * 60
    # Report the lookback that update_vendor_cache will actually use for this
    # mode instead of always printing the initial (full) lookback.
    lookback_days = config.INITIAL_LOOKBACK_DAYS if force else config.UPDATE_LOOKBACK_DAYS

    logger.info(banner)
    logger.info(f"Starting {'FULL' if force else 'incremental'} update for {total_vendors} vendors")
    logger.info(f"Lookback period: {lookback_days} days")
    logger.info(banner)

    start_time = time.time()
    for idx, vendor in enumerate(config.VENDORS, 1):
        vendor_start = time.time()
        logger.info(f"\n[{idx}/{total_vendors}] {'='*50}")
        logger.info(f"Updating {vendor['name']} ({vendor['code']})...")
        logger.info(f"Keywords: {vendor['keywords'][:5]}...")
        try:
            if handler.update_vendor_cache(vendor['code'], force=force):
                updated += 1
                stats = handler.get_vendor_stats(vendor['code'])
                by_severity = stats['severity']
                logger.info(f"{vendor['name']} updated successfully")
                logger.info(f" Total CVEs: {stats['total']}")
                logger.info(f" Critical: {by_severity.get('CRITICAL', 0)}")
                logger.info(f" High: {by_severity.get('HIGH', 0)}")
                logger.info(f" Medium: {by_severity.get('MEDIUM', 0)}")
                logger.info(f" Low: {by_severity.get('LOW', 0)}")
            else:
                failed += 1
                logger.error(f"{vendor['name']} update failed")

            vendor_time = time.time() - vendor_start
            logger.info(f" Time taken: {vendor_time:.1f}s")

            # Pause between vendors; no pause after the last one.
            if idx < total_vendors:
                if not config.NVD_API_KEY:
                    delay = 10
                    logger.info(f"Waiting {delay}s before next vendor (no API key)...")
                else:
                    delay = 2
                    logger.debug(f"Waiting {delay}s before next vendor...")
                time.sleep(delay)
        except KeyboardInterrupt:
            logger.warning("Update interrupted by user")
            break
        except Exception as e:
            failed += 1
            logger.error(f"✗ Exception updating {vendor['name']}: {e}", exc_info=True)

    total_time = time.time() - start_time
    logger.info(f"\n{banner}")
    logger.info(f"Update completed in {total_time/60:.1f} minutes")
    logger.info(f"Results: {updated} successful, {failed} failed")
    logger.info(banner)

    logger.info("\nFinal summary:")
    for v in handler.get_all_vendors_summary():
        logger.info(
            f" {v['name']:25s} - Total: {v['total']:5d} | Critical: {v['critical']:4d} | High: {v['high']:4d}"
        )
    return updated, failed
if __name__ == '__main__':
    # Script entry point: constructing CVEHandler initialises the database,
    # then the configured database path and vendor codes are printed.
    logging.basicConfig(level=logging.INFO)
    handler = CVEHandler()
    print(f"Database: {config.DATABASE_PATH}")
    print(f"Available vendors: {[v['code'] for v in config.VENDORS]}")