# Source snapshot metadata: 854 lines, 37 KiB, Python
import sqlite3
|
|
import json
|
|
import logging
|
|
import time
|
|
import os
|
|
from datetime import datetime, timedelta
|
|
from typing import List, Dict, Optional
|
|
from contextlib import contextmanager
|
|
import requests
|
|
from requests.adapters import HTTPAdapter
|
|
from urllib3.util.retry import Retry
|
|
import config
|
|
|
|
# Configure root logging once at import time from project-wide settings.
logging.basicConfig(level=config.LOG_LEVEL, format=config.LOG_FORMAT)
# Module-level logger shared by everything in this file.
logger = logging.getLogger(__name__)
class CVEHandler:
|
|
|
|
def __init__(self, db_path: str = None):
    """Set up the handler: resolve the database path, make sure its
    parent directory exists, create the schema, and build an HTTP
    session with retry support.
    """
    self.db_path = db_path if db_path else config.DATABASE_PATH

    parent_dir = os.path.dirname(self.db_path)
    if parent_dir and not os.path.exists(parent_dir):
        os.makedirs(parent_dir, exist_ok=True)
        logger.info(f"Created database directory: {parent_dir}")

    self._init_database()
    self.session = self._create_session()
def _create_session(self) -> requests.Session:
    """Build a requests session that automatically retries GET requests
    on transient server errors and rate-limit responses (HTTP 429/5xx),
    with exponential backoff.
    """
    retries = Retry(
        total=3,
        backoff_factor=1,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["GET"],
    )
    http_adapter = HTTPAdapter(max_retries=retries)

    session = requests.Session()
    for scheme in ("http://", "https://"):
        session.mount(scheme, http_adapter)
    return session
@contextmanager
def get_db_connection(self):
    """Context manager yielding a configured sqlite3 connection.

    Commits on clean exit, rolls back (and re-raises) on any exception,
    and always closes the connection.
    """
    conn = sqlite3.connect(self.db_path)
    conn.row_factory = sqlite3.Row  # rows support name-based access
    if config.DATABASE_WAL_MODE:
        # WAL lets readers proceed while a writer is active.
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA synchronous=NORMAL")
    # Negative cache_size = size in KiB (SQLite convention).
    # NOTE(review): applied regardless of WAL mode — confirm intended.
    conn.execute(f"PRAGMA cache_size=-{config.DATABASE_CACHE_SIZE}")
    try:
        yield conn
        conn.commit()
    except Exception as e:
        conn.rollback()
        logger.error(f"Database error: {e}")
        raise
    finally:
        conn.close()
def _init_database(self):
    """Create the cache schema and run lightweight migrations.

    Ordering fix: the discord_notified column migration now runs BEFORE
    the index on that column is created. Previously, initializing against
    a legacy database that predated the column raised
    "no such column: discord_notified" when creating idx_discord_notified,
    because the index came first and ALTER TABLE only ran afterwards.
    """
    with self.get_db_connection() as conn:
        cursor = conn.cursor()

        # Main CVE store; one row per CVE, keyed by its id.
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS cve_cache (
                cve_id TEXT PRIMARY KEY,
                vendor_code TEXT NOT NULL,
                description TEXT,
                published_date TEXT,
                last_modified TEXT,
                cvss_score REAL,
                cvss_vector TEXT,
                severity TEXT,
                refs TEXT,
                cwe_ids TEXT,
                affected_products TEXT,
                raw_data TEXT,
                discord_notified INTEGER DEFAULT 0,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)

        # Per-vendor bookkeeping about the last update run.
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS cve_metadata (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                vendor_code TEXT UNIQUE NOT NULL,
                last_update TIMESTAMP,
                total_cve_count INTEGER DEFAULT 0,
                last_cve_id TEXT,
                update_status TEXT,
                error_message TEXT
            )
        """)

        # Migration: add discord_notified to databases created before the
        # column existed (CREATE TABLE IF NOT EXISTS does not alter an
        # existing table). Must happen before the index on this column.
        cursor.execute("PRAGMA table_info(cve_cache)")
        columns = [col[1] for col in cursor.fetchall()]

        if 'discord_notified' not in columns:
            logger.info("Adding discord_notified column to existing table...")
            cursor.execute("""
                ALTER TABLE cve_cache
                ADD COLUMN discord_notified INTEGER DEFAULT 0
            """)
            logger.info("✓ Column added successfully")

        # Indexes, created after migration so every column is available.
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_vendor_code ON cve_cache(vendor_code)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_severity ON cve_cache(severity)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_published_date ON cve_cache(published_date DESC)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_cvss_score ON cve_cache(cvss_score DESC)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_updated_at ON cve_cache(updated_at DESC)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_discord_notified ON cve_cache(discord_notified)")

        logger.info(f"Database initialized at {self.db_path}")
def fetch_cve_from_nvd(self, keywords: List[str], vendor_config: Dict = None, days_back: int = None) -> List[Dict]:
    """Fetch CVEs from the NVD API, filtered by CPE vendor (and product).

    The lookback window is split into chunks of at most 120 days and each
    chunk is paginated 2000 results at a time. Client-side filters:
    vulnerability status, OS/platform-only CPE entries, CVSS presence
    (require_cvss) and minimum score (min_cvss).

    NOTE(review): `keywords` is accepted but unused here — matching is
    purely CPE-based; confirm whether keyword filtering was intended.

    Returns a list of normalized CVE dicts; references/cwe_ids/raw_data
    are JSON-encoded strings.
    """
    if days_back is None:
        days_back = config.INITIAL_LOOKBACK_DAYS

    if not vendor_config:
        logger.error("vendor_config is required!")
        return []

    cpe_vendor = vendor_config.get('cpe_vendor')
    cpe_product = vendor_config.get('cpe_product')

    if not cpe_vendor:
        logger.error(f"cpe_vendor is required in vendor_config!")
        return []

    require_cvss = vendor_config.get('require_cvss', True)
    min_cvss = vendor_config.get('min_cvss')
    max_days_per_chunk = 120  # NVD caps pubStartDate..pubEndDate ranges at 120 days
    all_results = []
    unique_cve_ids = set()  # dedupe across chunks/pages
    num_chunks = (days_back // max_days_per_chunk) + (1 if days_back % max_days_per_chunk else 0)

    logger.info(f"Fetching CVEs from NVD")
    logger.info(f" CPE filter: vendor='{cpe_vendor}'")
    if cpe_product:
        logger.info(f" product='{cpe_product}'")
    if require_cvss:
        logger.info(f" CVSS required: yes")
    if min_cvss:
        logger.info(f" Minimum CVSS: {min_cvss}")
    logger.info(f" Status filter: Analyzed/Modified only")
    logger.info(f" Platform filter: Ignore OS-only CPE entries")
    logger.info(f" Date range: {days_back} days in {num_chunks} chunks")

    for chunk_idx in range(num_chunks):
        # Walk backwards in time: chunk 0 is the most recent window.
        chunk_end = datetime.utcnow() - timedelta(days=chunk_idx * max_days_per_chunk)
        days_in_chunk = min(max_days_per_chunk, days_back - (chunk_idx * max_days_per_chunk))
        chunk_start = chunk_end - timedelta(days=days_in_chunk)

        start_str = chunk_start.strftime('%Y-%m-%dT%H:%M:%S.000')
        end_str = chunk_end.strftime('%Y-%m-%dT%H:%M:%S.000')

        logger.info(f"Chunk {chunk_idx+1}/{num_chunks}: {start_str[:10]} to {end_str[:10]}")

        params = {
            'pubStartDate': start_str,
            'pubEndDate': end_str,
            'resultsPerPage': 2000,
            'startIndex': 0
        }

        headers = {}
        if config.NVD_API_KEY:
            headers['apiKey'] = config.NVD_API_KEY

        page = 0
        while True:
            try:
                params['startIndex'] = page * 2000

                response = self.session.get(
                    config.CVE_SOURCES['nvd']['url'],
                    params=params,
                    headers=headers,
                    timeout=config.CVE_SOURCES['nvd']['timeout']
                )

                # Non-200: give up on this chunk (warn only once, on page 0).
                if response.status_code != 200:
                    if page == 0:
                        logger.warning(f"Chunk {chunk_idx+1} returned {response.status_code}")
                    break

                data = response.json()
                vulnerabilities = data.get('vulnerabilities', [])
                total_results = data.get('totalResults', 0)
                results_per_page = data.get('resultsPerPage', 2000)

                if page == 0:
                    logger.info(f" Total CVEs in chunk: {total_results}")

                if not vulnerabilities:
                    break

                matched_count = 0
                for vuln in vulnerabilities:
                    cve_data = vuln.get('cve', {})
                    cve_id = cve_data.get('id', '')

                    # Already collected in an earlier chunk/page.
                    if cve_id in unique_cve_ids:
                        continue

                    vuln_status = cve_data.get('vulnStatus', '')

                    # Keep only entries NVD has fully analyzed.
                    if vuln_status in ['Rejected', 'Awaiting Analysis', 'Undergoing Analysis']:
                        continue

                    cpe_match_found = False

                    configurations = cve_data.get('configurations', [])

                    # Without CPE data the CVE cannot be attributed to a vendor.
                    if not configurations:
                        continue

                    for config_item in configurations:
                        for node in config_item.get('nodes', []):
                            for cpe_match in node.get('cpeMatch', []):
                                cpe_uri = cpe_match.get('criteria', '').lower()

                                # Format: cpe:2.3:part:vendor:product:version:update:edition:...
                                cpe_parts = cpe_uri.split(':')
                                if len(cpe_parts) >= 6:
                                    cpe_part_type = cpe_parts[2]
                                    cpe_vendor_part = cpe_parts[3]
                                    cpe_product_part = cpe_parts[4]
                                    cpe_version_part = cpe_parts[5]

                                    vendor_match = (cpe_vendor_part == cpe_vendor.lower())

                                    if vendor_match:
                                        # Discard CPEs that only describe the
                                        # OS/platform rather than a product.
                                        is_platform_only = False

                                        if cpe_part_type == 'o':
                                            if cpe_version_part in ['-', '*', 'any']:
                                                is_platform_only = True

                                        platform_products = [
                                            'windows', 'windows_server', 'windows_10', 'windows_11',
                                            'macos', 'mac_os_x', 'ios', 'ipados', 'tvos', 'watchos',
                                            'linux', 'linux_kernel',
                                            'android', 'chrome_os',
                                            'ubuntu', 'debian', 'centos', 'rhel', 'fedora',
                                            'freebsd', 'netbsd', 'openbsd'
                                        ]

                                        if cpe_product_part in platform_products:
                                            if cpe_version_part in ['-', '*', 'any']:
                                                is_platform_only = True

                                        if is_platform_only:
                                            continue

                                        if cpe_product:
                                            product_match = (cpe_product_part == cpe_product.lower())
                                            if product_match:
                                                cpe_match_found = True
                                                break
                                        else:
                                            cpe_match_found = True
                                            break

                            if cpe_match_found:
                                break

                        if cpe_match_found:
                            break

                    if not cpe_match_found:
                        continue

                    cvss_score = None
                    cvss_vector = None
                    severity = 'UNKNOWN'

                    metrics = cve_data.get('metrics', {})

                    # Prefer the newest CVSS version available.
                    for version, key in [('4.0', 'cvssMetricV40'),
                                         ('3.1', 'cvssMetricV31'),
                                         ('3.0', 'cvssMetricV30'),
                                         ('2.0', 'cvssMetricV2')]:
                        if key in metrics and metrics[key]:
                            cvss_data = metrics[key][0].get('cvssData', {})
                            cvss_score = cvss_data.get('baseScore')
                            cvss_vector = cvss_data.get('vectorString')
                            severity = cvss_data.get('baseSeverity', 'UNKNOWN')

                            # Derive severity from score when absent
                            # (e.g. CVSS v2 data has no baseSeverity).
                            if (not severity or severity == 'UNKNOWN') and cvss_score:
                                if cvss_score >= 9.0:
                                    severity = 'CRITICAL'
                                elif cvss_score >= 7.0:
                                    severity = 'HIGH'
                                elif cvss_score >= 4.0:
                                    severity = 'MEDIUM'
                                elif cvss_score > 0:
                                    severity = 'LOW'

                            break

                    if require_cvss and not cvss_score:
                        continue

                    if min_cvss and cvss_score and cvss_score < min_cvss:
                        continue

                    matched_count += 1
                    unique_cve_ids.add(cve_id)
                    descriptions = cve_data.get('descriptions', [])
                    description_text = ' '.join([desc.get('value', '') for desc in descriptions])
                    refs = [ref.get('url', '') for ref in cve_data.get('references', [])]
                    cwe_ids = []
                    for weakness in cve_data.get('weaknesses', []):
                        for desc in weakness.get('description', []):
                            value = desc.get('value', '')
                            if value.startswith('CWE-'):
                                cwe_ids.append(value)

                    all_results.append({
                        'cve_id': cve_id,
                        'description': description_text[:2000],
                        'published_date': cve_data.get('published', ''),
                        'last_modified': cve_data.get('lastModified', ''),
                        'cvss_score': cvss_score,
                        'cvss_vector': cvss_vector,
                        'severity': severity,
                        'references': json.dumps(refs),
                        'cwe_ids': json.dumps(cwe_ids),
                        'raw_data': json.dumps(cve_data)
                    })

                if matched_count > 0:
                    logger.info(f" Page {page+1}: Matched {matched_count} CVEs")

                # Stop when the last page has been consumed.
                if len(vulnerabilities) < results_per_page or (page + 1) * results_per_page >= total_results:
                    break

                page += 1

                # Throttle between pages; NVD allows far fewer requests
                # without an API key.
                if not config.NVD_API_KEY:
                    time.sleep(6)
                else:
                    time.sleep(0.6)

            except Exception as e:
                logger.error(f"Error in chunk {chunk_idx+1} page {page+1}: {e}", exc_info=True)
                break

        # Throttle between chunks as well.
        if chunk_idx < num_chunks - 1:
            delay = 6 if not config.NVD_API_KEY else 1
            time.sleep(delay)

    logger.info(f"✓ Total unique CVEs matched: {len(all_results)} (CPE + CVSS + Status + Platform filters)")
    return all_results
def fetch_cve_from_github(self, keywords: List[str]) -> List[Dict]:
    """Fetch security advisories from the GitHub advisory endpoint.

    Paginates up to `max_pages` pages and keeps advisories whose combined
    summary+description contains any of `keywords` (case-insensitive).
    Advisories without a CVE identifier fall back to their GHSA id.

    Returns dicts shaped like fetch_cve_from_nvd's results, except the
    description is truncated to 1000 chars.
    """
    results = []
    unique_cve_ids = set()

    headers = {}
    if config.GITHUB_TOKEN:
        headers['Authorization'] = f'token {config.GITHUB_TOKEN}'

    logger.info(f"Fetching advisories from GitHub for keywords: {keywords[:5]}")

    page = 1
    max_pages = 10  # hard cap: 10 pages x 100 advisories per run

    while page <= max_pages:
        params = {
            'per_page': 100,
            'page': page,
            'sort': 'published',
            'direction': 'desc'
        }

        try:
            response = self.session.get(
                config.CVE_SOURCES['github']['url'],
                params=params,
                headers=headers,
                timeout=config.CVE_SOURCES['github']['timeout']
            )
            response.raise_for_status()

            advisories = response.json()

            if not advisories:
                logger.info(f"GitHub page {page}: No more results")
                break

            logger.info(f"GitHub page {page}: Got {len(advisories)} advisories")

            matched_count = 0
            for advisory in advisories:
                summary = advisory.get('summary', '')
                description = advisory.get('description', '')
                full_text = (summary + ' ' + description).lower()

                # Keyword filter against the combined advisory text.
                if not any(keyword.lower() in full_text for keyword in keywords):
                    continue

                # Prefer the CVE identifier when one is assigned.
                cve_id = None
                for identifier in advisory.get('identifiers', []):
                    if identifier.get('type') == 'CVE':
                        cve_id = identifier.get('value')
                        break

                if not cve_id:
                    cve_id = advisory.get('ghsa_id', f"GHSA-{advisory.get('id', 'unknown')}")

                if cve_id in unique_cve_ids:
                    continue

                unique_cve_ids.add(cve_id)
                matched_count += 1

                cvss_data = advisory.get('cvss', {})
                cvss_score = cvss_data.get('score')
                cvss_vector = cvss_data.get('vector_string')
                severity = advisory.get('severity', 'UNKNOWN').upper()
                cwe_list = [cwe.get('cwe_id', '') for cwe in advisory.get('cwes', [])]

                results.append({
                    'cve_id': cve_id,
                    'description': (summary + ' ' + description)[:1000],
                    'published_date': advisory.get('published_at', ''),
                    'last_modified': advisory.get('updated_at', ''),
                    'cvss_score': cvss_score,
                    'cvss_vector': cvss_vector,
                    'severity': severity,
                    'references': json.dumps([advisory.get('html_url', '')]),
                    'cwe_ids': json.dumps(cwe_list),
                    'raw_data': json.dumps(advisory)
                })

            logger.info(f"GitHub page {page}: Matched {matched_count} advisories")

            page += 1
            time.sleep(1)  # pace requests between pages

        except Exception as e:
            logger.error(f"Error fetching GitHub page {page}: {e}", exc_info=True)
            break

    logger.info(f"✓ Total unique CVEs from GitHub: {len(results)}")
    return results
def update_vendor_cache(self, vendor_code: str, force: bool = False) -> bool:
    """Refresh the CVE cache for one vendor.

    Skips the fetch when cached data is younger than
    config.UPDATE_INTERVAL_HOURS (unless `force`). Fetches from NVD (and
    GitHub when enabled), deduplicates by CVE id, then upserts into
    cve_cache, preserving the discord_notified flag on existing rows so
    alerts are not re-sent.

    Fixes vs. previous revision:
    - freshness log used timedelta.seconds (wraps at 24h); now uses
      total_seconds() so the reported age is correct
    - bare `except:` in the failure path narrowed to `except Exception`
      and logged instead of silently swallowed

    Returns True on success (or fresh cache), False otherwise.
    """
    vendor = next((v for v in config.VENDORS if v['code'] == vendor_code), None)
    if not vendor:
        logger.error(f"Unknown vendor code: {vendor_code}")
        logger.error(f"Available vendors: {[v['code'] for v in config.VENDORS]}")
        return False

    if not force:
        with self.get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                "SELECT last_update FROM cve_metadata WHERE vendor_code = ?",
                (vendor_code,)
            )
            row = cursor.fetchone()
            if row and row['last_update']:
                last_update = datetime.fromisoformat(row['last_update'])
                time_since_update = datetime.utcnow() - last_update
                cache_valid_hours = config.UPDATE_INTERVAL_HOURS

                if time_since_update < timedelta(hours=cache_valid_hours):
                    # total_seconds() is correct for ages >= 24h too.
                    hours_ago = int(time_since_update.total_seconds() // 3600)
                    logger.info(f"Cache for {vendor_code} is fresh (updated {hours_ago}h ago)")
                    return True

    logger.info(f"Updating cache for vendor: {vendor['name']} ({vendor_code})")

    if vendor.get('cpe_vendor'):
        logger.info(f"CPE vendor: {vendor['cpe_vendor']}")
        if vendor.get('cpe_product'):
            logger.info(f"CPE product: {vendor['cpe_product']}")
    else:
        logger.error(f"No CPE vendor configured for {vendor_code}!")
        return False

    try:
        all_cves = []
        # Full refresh looks further back than an incremental one.
        days_back = config.INITIAL_LOOKBACK_DAYS if force else config.UPDATE_LOOKBACK_DAYS

        nvd_cves = self.fetch_cve_from_nvd(
            vendor['keywords'],
            vendor_config=vendor,
            days_back=days_back
        )
        all_cves.extend(nvd_cves)
        logger.info(f"Collected {len(nvd_cves)} CVEs from NVD")

        if config.CVE_SOURCES.get('github', {}).get('enabled', False):
            github_cves = self.fetch_cve_from_github(vendor['keywords'])
            all_cves.extend(github_cves)
            logger.info(f"Collected {len(github_cves)} CVEs from GitHub")

        # Last occurrence wins when the same CVE comes from both sources.
        unique_cves = {cve['cve_id']: cve for cve in all_cves}

        logger.info(f"Total unique CVEs after deduplication: {len(unique_cves)}")

        new_count = 0
        updated_count = 0

        with self.get_db_connection() as conn:
            cursor = conn.cursor()

            for cve_id, cve in unique_cves.items():
                cursor.execute("""
                    SELECT discord_notified FROM cve_cache WHERE cve_id = ?
                """, (cve_id,))

                existing = cursor.fetchone()

                if not existing:
                    cursor.execute("""
                        INSERT INTO cve_cache
                        (cve_id, vendor_code, description, published_date, last_modified,
                         cvss_score, cvss_vector, severity, refs, cwe_ids,
                         affected_products, raw_data, discord_notified,
                         created_at, updated_at)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 0, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
                    """, (
                        cve_id,
                        vendor_code,
                        cve.get('description'),
                        cve.get('published_date'),
                        cve.get('last_modified'),
                        cve.get('cvss_score'),
                        cve.get('cvss_vector'),
                        cve.get('severity'),
                        cve.get('references'),
                        cve.get('cwe_ids'),
                        None,  # affected_products: not populated by the fetchers
                        cve.get('raw_data')
                    ))
                    new_count += 1
                else:
                    # Keep the existing notification flag so refreshed rows
                    # do not trigger duplicate Discord alerts.
                    old_notified = existing[0]

                    cursor.execute("""
                        UPDATE cve_cache SET
                            vendor_code = ?,
                            description = ?,
                            published_date = ?,
                            last_modified = ?,
                            cvss_score = ?,
                            cvss_vector = ?,
                            severity = ?,
                            refs = ?,
                            cwe_ids = ?,
                            affected_products = ?,
                            raw_data = ?,
                            discord_notified = ?,
                            updated_at = CURRENT_TIMESTAMP
                        WHERE cve_id = ?
                    """, (
                        vendor_code,
                        cve.get('description'),
                        cve.get('published_date'),
                        cve.get('last_modified'),
                        cve.get('cvss_score'),
                        cve.get('cvss_vector'),
                        cve.get('severity'),
                        cve.get('references'),
                        cve.get('cwe_ids'),
                        None,
                        cve.get('raw_data'),
                        old_notified,
                        cve_id
                    ))
                    updated_count += 1

            cursor.execute("""
                INSERT OR REPLACE INTO cve_metadata
                (vendor_code, last_update, total_cve_count, last_cve_id, update_status, error_message)
                VALUES (?, CURRENT_TIMESTAMP, ?, ?, 'success', NULL)
            """, (
                vendor_code,
                len(unique_cves),
                list(unique_cves.keys())[0] if unique_cves else None
            ))

        if new_count > 0:
            logger.info(f"Added {new_count} new CVEs for {vendor['name']}")
        if updated_count > 0:
            logger.info(f"Updated {updated_count} existing CVEs for {vendor['name']}")

        logger.info(f"Successfully processed {len(unique_cves)} CVEs for {vendor['name']}")
        return True

    except Exception as e:
        logger.error(f"Error updating vendor cache for {vendor_code}: {e}", exc_info=True)

        # Best-effort: record the failure in metadata without masking the
        # original error if the bookkeeping write also fails.
        try:
            with self.get_db_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    INSERT OR REPLACE INTO cve_metadata
                    (vendor_code, last_update, update_status, error_message)
                    VALUES (?, CURRENT_TIMESTAMP, 'failed', ?)
                """, (vendor_code, str(e)[:500]))
        except Exception as meta_err:
            logger.warning(f"Could not record failure metadata for {vendor_code}: {meta_err}")

        return False
def get_vendor_cves(self, vendor_code: str, limit: int = None, offset: int = 0,
                    severity: str = None, year: int = None) -> List[Dict]:
    """Return cached CVEs for a vendor, newest first.

    Optional filters: `severity` (compared case-insensitively via upper())
    and publication `year`. `limit`/`offset` page the result set; when
    `limit` is falsy, all rows are returned. The DB column `refs` is
    renamed to `references` in the returned dicts.

    Fix: LIMIT/OFFSET are now bound as SQL parameters rather than
    interpolated into the query string via an f-string.
    """
    with self.get_db_connection() as conn:
        cursor = conn.cursor()

        query = """
            SELECT cve_id, vendor_code, description, published_date, last_modified,
                   cvss_score, cvss_vector, severity, refs, cwe_ids, updated_at
            FROM cve_cache
            WHERE vendor_code = ?
        """
        params = [vendor_code]

        if severity:
            query += " AND severity = ?"
            params.append(severity.upper())

        if year:
            query += " AND strftime('%Y', published_date) = ?"
            params.append(str(year))

        query += " ORDER BY published_date DESC"

        if limit:
            query += " LIMIT ? OFFSET ?"
            params.extend([limit, offset])

        cursor.execute(query, params)

        results = []
        for row in cursor.fetchall():
            row_dict = dict(row)
            # Expose the API-facing key name instead of the column name.
            if 'refs' in row_dict:
                row_dict['references'] = row_dict.pop('refs')
            results.append(row_dict)

        return results
def get_vendor_stats(self, vendor_code: str) -> Dict:
    """Aggregate cache statistics for one vendor.

    Returns total count, per-severity counts, a 12-month monthly trend,
    and counts for this month, this year, and the last 7 days. The date
    comparisons rely on published_date being stored as ISO-like TEXT.
    """
    with self.get_db_connection() as conn:
        cursor = conn.cursor()

        # Total count
        cursor.execute(
            "SELECT COUNT(*) as total FROM cve_cache WHERE vendor_code = ?",
            (vendor_code,)
        )
        total = cursor.fetchone()['total']

        # Severity distribution
        cursor.execute("""
            SELECT severity, COUNT(*) as count
            FROM cve_cache
            WHERE vendor_code = ?
            GROUP BY severity
        """, (vendor_code,))
        severity_counts = {row['severity']: row['count'] for row in cursor.fetchall()}

        # Monthly trend (last 12 months)
        cursor.execute("""
            SELECT strftime('%Y-%m', published_date) as month, COUNT(*) as count
            FROM cve_cache
            WHERE vendor_code = ? AND published_date >= date('now', '-12 months')
            GROUP BY month
            ORDER BY month
        """, (vendor_code,))
        monthly_counts = {row['month']: row['count'] for row in cursor.fetchall()}

        # This month
        cursor.execute("""
            SELECT COUNT(*) as count
            FROM cve_cache
            WHERE vendor_code = ? AND strftime('%Y-%m', published_date) = strftime('%Y-%m', 'now')
        """, (vendor_code,))
        this_month = cursor.fetchone()['count']

        # This year
        cursor.execute("""
            SELECT COUNT(*) as count
            FROM cve_cache
            WHERE vendor_code = ? AND strftime('%Y', published_date) = strftime('%Y', 'now')
        """, (vendor_code,))
        this_year = cursor.fetchone()['count']

        # Recent (last 7 days)
        cursor.execute("""
            SELECT COUNT(*) as count
            FROM cve_cache
            WHERE vendor_code = ? AND published_date >= date('now', '-7 days')
        """, (vendor_code,))
        recent = cursor.fetchone()['count']

        return {
            'total': total,
            'severity': severity_counts,
            'monthly': monthly_counts,
            'this_month': this_month,
            'this_year': this_year,
            'recent': recent
        }
def search_cves(self, query: str, limit: int = 50) -> List[Dict]:
    """Substring search over CVE ids and descriptions (SQL LIKE),
    newest first, capped at `limit` rows."""
    pattern = f"%{query}%"

    with self.get_db_connection() as conn:
        cur = conn.cursor()
        cur.execute("""
            SELECT cve_id, vendor_code, description, published_date, cvss_score, severity
            FROM cve_cache
            WHERE cve_id LIKE ? OR description LIKE ?
            ORDER BY published_date DESC
            LIMIT ?
        """, (pattern, pattern, limit))
        rows = cur.fetchall()

        return [dict(r) for r in rows]
def get_all_vendors_summary(self) -> List[Dict]:
    """Build a per-vendor overview: totals, critical/high counts,
    CVEs from the last 7 days, and the last cache update timestamp."""
    overview = []

    with self.get_db_connection() as conn:
        cursor = conn.cursor()

        for vendor in config.VENDORS:
            code = vendor['code']

            # Severity/recency counters for this vendor.
            cursor.execute("""
                SELECT
                    COUNT(*) as total,
                    SUM(CASE WHEN severity = 'CRITICAL' THEN 1 ELSE 0 END) as critical,
                    SUM(CASE WHEN severity = 'HIGH' THEN 1 ELSE 0 END) as high,
                    SUM(CASE WHEN published_date >= date('now', '-7 days') THEN 1 ELSE 0 END) as recent
                FROM cve_cache
                WHERE vendor_code = ?
            """, (code,))
            counters = cursor.fetchone()

            # Last successful update, if any.
            cursor.execute(
                "SELECT last_update FROM cve_metadata WHERE vendor_code = ?",
                (code,)
            )
            meta_row = cursor.fetchone()

            overview.append({
                'code': code,
                'name': vendor['name'],
                'icon': vendor.get('icon', 'fa-shield-alt'),
                'total': counters['total'] or 0,
                'critical': counters['critical'] or 0,
                'high': counters['high'] or 0,
                'recent': counters['recent'] or 0,
                'last_update': meta_row['last_update'] if meta_row else None,
            })

    return overview
def get_recent_cves_for_discord(self, hours: int = 1) -> List[Dict]:
    """Return up to 10 CRITICAL/HIGH CVEs touched within the last `hours`
    hours whose CVSS meets the Discord notification threshold."""
    window = f'-{hours}'

    with self.get_db_connection() as conn:
        rows = conn.execute("""
            SELECT cve_id, vendor_code, description, published_date, cvss_score, severity
            FROM cve_cache
            WHERE updated_at >= datetime('now', ? || ' hours')
            AND severity IN ('CRITICAL', 'HIGH')
            AND cvss_score >= ?
            ORDER BY published_date DESC
            LIMIT 10
        """, (window, config.DISCORD_MIN_CVSS)).fetchall()

        return [dict(r) for r in rows]
def update_all_vendors(force: bool = True):
    """Update the CVE cache for every configured vendor, sequentially.

    Paces requests between vendors to respect NVD rate limits and logs
    per-vendor stats plus a final summary.

    Fix: the "Lookback period" header now reports the window actually
    used (UPDATE_LOOKBACK_DAYS on incremental runs), matching the choice
    made inside CVEHandler.update_vendor_cache; previously it always
    printed INITIAL_LOOKBACK_DAYS.

    Returns a (updated, failed) tuple of counts.
    """
    handler = CVEHandler()
    updated = 0
    failed = 0

    total_vendors = len(config.VENDORS)
    # Mirror the lookback selection in update_vendor_cache.
    lookback_days = config.INITIAL_LOOKBACK_DAYS if force else config.UPDATE_LOOKBACK_DAYS
    logger.info(f"{'='*60}")
    logger.info(f"Starting {'FULL' if force else 'incremental'} update for {total_vendors} vendors")
    logger.info(f"Lookback period: {lookback_days} days")
    logger.info(f"{'='*60}")

    start_time = time.time()

    for idx, vendor in enumerate(config.VENDORS, 1):
        vendor_start = time.time()

        logger.info(f"\n[{idx}/{total_vendors}] {'='*50}")
        logger.info(f"Updating {vendor['name']} ({vendor['code']})...")
        logger.info(f"Keywords: {vendor['keywords'][:5]}...")

        try:
            if handler.update_vendor_cache(vendor['code'], force=force):
                updated += 1
                stats = handler.get_vendor_stats(vendor['code'])
                logger.info(f"✓ {vendor['name']} updated successfully")
                logger.info(f" Total CVEs: {stats['total']}")
                logger.info(f" Critical: {stats['severity'].get('CRITICAL', 0)}")
                logger.info(f" High: {stats['severity'].get('HIGH', 0)}")
                logger.info(f" Medium: {stats['severity'].get('MEDIUM', 0)}")
                logger.info(f" Low: {stats['severity'].get('LOW', 0)}")
            else:
                failed += 1
                logger.error(f"✗ {vendor['name']} update failed")

            vendor_time = time.time() - vendor_start
            logger.info(f" Time taken: {vendor_time:.1f}s")

            # Inter-vendor pause; much longer without an NVD API key.
            if idx < total_vendors:
                if not config.NVD_API_KEY:
                    delay = 10
                    logger.info(f"Waiting {delay}s before next vendor (no API key)...")
                else:
                    delay = 2
                    logger.debug(f"Waiting {delay}s before next vendor...")
                time.sleep(delay)

        except KeyboardInterrupt:
            logger.warning("Update interrupted by user")
            break
        except Exception as e:
            failed += 1
            logger.error(f"✗ Exception updating {vendor['name']}: {e}", exc_info=True)

    total_time = time.time() - start_time
    logger.info(f"\n{'='*60}")
    logger.info(f"Update completed in {total_time/60:.1f} minutes")
    logger.info(f"Results: {updated} successful, {failed} failed")
    logger.info(f"{'='*60}")

    logger.info("\nFinal summary:")
    summary = handler.get_all_vendors_summary()
    for v in summary:
        logger.info(f" {v['name']:25s} - Total: {v['total']:5d} | Critical: {v['critical']:4d} | High: {v['high']:4d}")

    return updated, failed
if __name__ == '__main__':
    # Ad-hoc smoke run: initialize the handler and print the configuration.
    logging.basicConfig(level=logging.INFO)
    handler = CVEHandler()
    vendor_codes = [v['code'] for v in config.VENDORS]
    print(f"Database: {config.DATABASE_PATH}")
    print(f"Available vendors: {vendor_codes}")