diff --git a/cve_handler.py b/cve_handler.py index 1146482..92a3c8e 100644 --- a/cve_handler.py +++ b/cve_handler.py @@ -76,6 +76,7 @@ class CVEHandler: cwe_ids TEXT, affected_products TEXT, raw_data TEXT, + discord_notified INTEGER DEFAULT 0, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) @@ -98,6 +99,18 @@ class CVEHandler: cursor.execute("CREATE INDEX IF NOT EXISTS idx_published_date ON cve_cache(published_date DESC)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_cvss_score ON cve_cache(cvss_score DESC)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_updated_at ON cve_cache(updated_at DESC)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_discord_notified ON cve_cache(discord_notified)") + + cursor.execute("PRAGMA table_info(cve_cache)") + columns = [col[1] for col in cursor.fetchall()] + + if 'discord_notified' not in columns: + logger.info("Adding discord_notified column to existing table...") + cursor.execute(""" + ALTER TABLE cve_cache + ADD COLUMN discord_notified INTEGER DEFAULT 0 + """) + logger.info("✓ Column added successfully") logger.info(f"Database initialized at {self.db_path}") diff --git a/discord_bot.py b/discord_bot.py index f3c41ae..139c808 100644 --- a/discord_bot.py +++ b/discord_bot.py @@ -1,11 +1,17 @@ #!/usr/bin/env python3 +""" +CVE Monitor - Discord Bot +Monitors CVE database and sends notifications to Discord channel +Supports interactive commands for querying CVE data +""" import discord -from discord.ext import tasks +from discord.ext import tasks, commands import logging import asyncio -from datetime import datetime +from datetime import datetime, timedelta import sys +import sqlite3 import config from cve_handler import CVEHandler @@ -13,16 +19,23 @@ from cve_handler import CVEHandler logging.basicConfig(level=config.LOG_LEVEL, format=config.LOG_FORMAT) logger = logging.getLogger(__name__) -class CVEDiscordBot(discord.Client): - def __init__(self, *args, **kwargs): + +class CVEDiscordBot(commands.Bot): + def __init__(self): intents = discord.Intents.default() intents.message_content = True - super().__init__(intents=intents, *args, **kwargs) + + super().__init__( + command_prefix='!cve ', + intents=intents, + help_command=None # Custom help + ) self.cve_handler = CVEHandler() self.channel_id = None self.check_interval = config.DISCORD_CHECK_INTERVAL_MINUTES + # Validation if not config.DISCORD_BOT_TOKEN: logger.error("DISCORD_BOT_TOKEN not configured in .env") sys.exit(1) @@ -36,6 +49,491 @@ class CVEDiscordBot(discord.Client): except ValueError: logger.error(f"Invalid DISCORD_CHANNEL_ID: {config.DISCORD_CHANNEL_ID}") sys.exit(1) + + # Register commands + self.setup_commands() + + def setup_commands(self): + """Register all bot commands""" + + @self.command(name='help', aliases=['h', 'pomoc']) + async def help_command(ctx): + """Show available commands""" + embed = discord.Embed( + title="🤖 CVE Monitor Bot - Commands", + description="Monitor and query CVE vulnerabilities", + color=0x3498DB + ) + + embed.add_field( + name="📊 Statistics", + value=( + "`!cve stats` - Overall statistics\n" + "`!cve stats ` - Vendor statistics\n" + "`!cve vendors` - List all vendors" + ), + inline=False + ) + + embed.add_field( + name="🔍 Search & List", + value=( + "`!cve latest` - 10 latest CVEs\n" + "`!cve latest ` - Latest from vendor\n" + "`!cve critical` - All CRITICAL CVEs\n" + "`!cve high` - All HIGH severity CVEs\n" + "`!cve search ` - Search CVEs" + ), + inline=False + ) + + embed.add_field( + name="🎯 Top Lists", + value=( + "`!cve top` - Top 10 worst CVEs\n" + "`!cve top ` - Top CVEs from vendor\n" + "`!cve worst` - Worst CVE (highest CVSS)" + ), + inline=False + ) + + embed.add_field( + name="📅 Time-based", + value=( + "`!cve today` - CVEs from today\n" + "`!cve week` - CVEs from last 7 days\n" + "`!cve month` - CVEs from last 30 days" + ), + inline=False + ) + + embed.add_field( + name="🔎 Details", + value=( + "`!cve info ` - Detailed CVE info\n" + "`!cve ` - Quick CVE lookup" + ), + inline=False + ) + + embed.add_field( + name="⚙️ Configuration", + value=( + "`!cve status` - Bot status\n" + "`!cve test` - Test notification" + ), + inline=False + ) + + embed.set_footer(text="Use !cve or !cve help") + await ctx.send(embed=embed) + + @self.command(name='stats', aliases=['statistics', 'statystyki']) + async def stats_command(ctx, vendor: str = None): + """Show CVE statistics""" + if vendor: + # Specific vendor stats + vendor_obj = next((v for v in config.VENDORS if v['code'].lower() == vendor.lower()), None) + if not vendor_obj: + await ctx.send(f"❌ Unknown vendor: `{vendor}`. Use `!cve vendors` to see available vendors.") + return + + stats = self.cve_handler.get_vendor_stats(vendor_obj['code']) + + embed = discord.Embed( + title=f"📊 {vendor_obj['name']} Statistics", + color=0x3498DB + ) + + embed.add_field(name="Total CVEs", value=f"**{stats['total']}**", inline=True) + embed.add_field(name="Critical", value=f"🔴 {stats['severity'].get('CRITICAL', 0)}", inline=True) + embed.add_field(name="High", value=f"🟠 {stats['severity'].get('HIGH', 0)}", inline=True) + embed.add_field(name="Medium", value=f"🟡 {stats['severity'].get('MEDIUM', 0)}", inline=True) + embed.add_field(name="Low", value=f"⚪ {stats['severity'].get('LOW', 0)}", inline=True) + embed.add_field(name="This Week", value=f"📅 {stats['recent']}", inline=True) + + await ctx.send(embed=embed) + else: + # Overall stats + summary = self.cve_handler.get_all_vendors_summary() + total = sum(v['total'] for v in summary) + critical = sum(v['critical'] for v in summary) + high = sum(v['high'] for v in summary) + recent = sum(v['recent'] for v in summary) + + embed = discord.Embed( + title="📊 Overall CVE Statistics", + color=0x3498DB + ) + + embed.add_field(name="Total CVEs", value=f"**{total}**", inline=True) + embed.add_field(name="Critical", value=f"🔴 {critical}", inline=True) + embed.add_field(name="High", value=f"🟠 {high}", inline=True) + embed.add_field(name="Vendors", value=f"**{len(config.VENDORS)}**", inline=True) + embed.add_field(name="This Week", value=f"📅 {recent}", inline=True) + embed.add_field(name="Active", value=f"✅ Monitoring", inline=True) + + # Top vendors + top_vendors = sorted(summary, key=lambda x: x['total'], reverse=True)[:5] + top_text = "\n".join([f"{i+1}. **{v['name']}**: {v['total']} CVEs" for i, v in enumerate(top_vendors)]) + embed.add_field(name="🏆 Top 5 Vendors", value=top_text, inline=False) + + await ctx.send(embed=embed) + + @self.command(name='vendors', aliases=['vendorzy', 'list']) + async def vendors_command(ctx): + """List all monitored vendors""" + summary = self.cve_handler.get_all_vendors_summary() + summary_sorted = sorted(summary, key=lambda x: x['total'], reverse=True) + + embed = discord.Embed( + title="🏢 Monitored Vendors", + description=f"Tracking {len(config.VENDORS)} vendors", + color=0x3498DB + ) + + # Split into chunks + for i in range(0, len(summary_sorted), 10): + chunk = summary_sorted[i:i+10] + text = "\n".join([ + f"`{v['code']:12s}` {v['name']:20s} - {v['total']:4d} CVEs (🔴{v['critical']} 🟠{v['high']})" + for v in chunk + ]) + embed.add_field( + name=f"Vendors {i+1}-{min(i+10, len(summary_sorted))}", + value=text, + inline=False + ) + + embed.set_footer(text="Use: !cve stats for details") + await ctx.send(embed=embed) + + @self.command(name='latest', aliases=['last', 'ostatnie']) + async def latest_command(ctx, vendor: str = None, limit: int = 10): + """Show latest CVEs""" + if vendor: + vendor_obj = next((v for v in config.VENDORS if v['code'].lower() == vendor.lower()), None) + if not vendor_obj: + await ctx.send(f"❌ Unknown vendor: `{vendor}`") + return + + cves = self.cve_handler.get_vendor_cves(vendor_obj['code'], limit=limit) + title = f"📋 Latest {len(cves)} CVEs - {vendor_obj['name']}" + else: + # Get latest from all vendors + with self.cve_handler.get_db_connection() as conn: + cursor = conn.cursor() + cursor.execute(""" + SELECT cve_id, vendor_code, severity, cvss_score, published_date, + substr(description, 1, 100) as desc + FROM cve_cache + ORDER BY published_date DESC + LIMIT ? + """, (limit,)) + cves = [dict(row) for row in cursor.fetchall()] + + title = f"📋 Latest {len(cves)} CVEs (All Vendors)" + + if not cves: + await ctx.send("No CVEs found.") + return + + embed = discord.Embed(title=title, color=0x3498DB) + + for cve in cves[:10]: + severity_emoji = { + 'CRITICAL': '🔴', + 'HIGH': '🟠', + 'MEDIUM': '🟡', + 'LOW': '⚪' + }.get(cve.get('severity'), '⚫') + + cvss = cve.get('cvss_score', 0) + desc = cve.get('description', cve.get('desc', ''))[:100] + + embed.add_field( + name=f"{severity_emoji} {cve['cve_id']} - CVSS {cvss}", + value=f"{desc}...", + inline=False + ) + + await ctx.send(embed=embed) + + @self.command(name='critical', aliases=['krytyczne']) + async def critical_command(ctx, vendor: str = None): + """Show all CRITICAL severity CVEs""" + await self._severity_command(ctx, 'CRITICAL', vendor) + + @self.command(name='high', aliases=['wysokie']) + async def high_command(ctx, vendor: str = None): + """Show all HIGH severity CVEs""" + await self._severity_command(ctx, 'HIGH', vendor) + + @self.command(name='top', aliases=['worst', 'najgorsze']) + async def top_command(ctx, vendor: str = None, limit: int = 10): + """Show top CVEs by CVSS score""" + if vendor: + vendor_obj = next((v for v in config.VENDORS if v['code'].lower() == vendor.lower()), None) + if not vendor_obj: + await ctx.send(f"❌ Unknown vendor: `{vendor}`") + return + + with self.cve_handler.get_db_connection() as conn: + cursor = conn.cursor() + cursor.execute(""" + SELECT cve_id, severity, cvss_score, published_date, + substr(description, 1, 100) as desc + FROM cve_cache + WHERE vendor_code = ? + ORDER BY cvss_score DESC + LIMIT ? + """, (vendor_obj['code'], limit)) + cves = [dict(row) for row in cursor.fetchall()] + + title = f"🎯 Top {len(cves)} CVEs - {vendor_obj['name']}" + else: + with self.cve_handler.get_db_connection() as conn: + cursor = conn.cursor() + cursor.execute(""" + SELECT cve_id, vendor_code, severity, cvss_score, published_date, + substr(description, 1, 100) as desc + FROM cve_cache + ORDER BY cvss_score DESC + LIMIT ? + """, (limit,)) + cves = [dict(row) for row in cursor.fetchall()] + + title = f"🎯 Top {len(cves)} Worst CVEs" + + if not cves: + await ctx.send("No CVEs found.") + return + + embed = discord.Embed(title=title, color=0xDC3545) + + for i, cve in enumerate(cves, 1): + severity_emoji = { + 'CRITICAL': '🔴', + 'HIGH': '🟠', + 'MEDIUM': '🟡', + 'LOW': '⚪' + }.get(cve.get('severity'), '⚫') + + vendor_code = cve.get('vendor_code', '') + vendor_name = next((v['name'] for v in config.VENDORS if v['code'] == vendor_code), vendor_code) + + embed.add_field( + name=f"{i}. {severity_emoji} {cve['cve_id']} - **CVSS {cve.get('cvss_score', 0)}**", + value=f"**{vendor_name}** | {cve.get('desc', '')}...", + inline=False + ) + + await ctx.send(embed=embed) + + @self.command(name='search', aliases=['find', 'szukaj']) + async def search_command(ctx, *, query: str): + """Search CVEs by keyword""" + cves = self.cve_handler.search_cves(query, limit=10) + + if not cves: + await ctx.send(f"No CVEs found for: `{query}`") + return + + embed = discord.Embed( + title=f"🔍 Search Results: '{query}'", + description=f"Found {len(cves)} CVEs", + color=0x3498DB + ) + + for cve in cves[:10]: + severity_emoji = { + 'CRITICAL': '🔴', + 'HIGH': '🟠', + 'MEDIUM': '🟡', + 'LOW': '⚪' + }.get(cve.get('severity'), '⚫') + + cvss = cve.get('cvss_score', 0) + desc = cve.get('description', '')[:100] + + embed.add_field( + name=f"{severity_emoji} {cve['cve_id']} - CVSS {cvss}", + value=f"{desc}...", + inline=False + ) + + await ctx.send(embed=embed) + + @self.command(name='info', aliases=['detail', 'szczegoly']) + async def info_command(ctx, cve_id: str): + """Show detailed CVE information""" + if not cve_id.upper().startswith('CVE-'): + cve_id = f"CVE-{cve_id}" + + with self.cve_handler.get_db_connection() as conn: + cursor = conn.cursor() + cursor.execute(""" + SELECT * FROM cve_cache WHERE cve_id = ? + """, (cve_id.upper(),)) + row = cursor.fetchone() + + if not row: + await ctx.send(f"❌ CVE not found: `{cve_id}`") + return + + cve = dict(row) + embed = self.create_cve_embed(cve, detailed=True) + await ctx.send(embed=embed) + + @self.command(name='today', aliases=['dzisiaj']) + async def today_command(ctx): + """Show CVEs from today""" + await self._timeframe_command(ctx, days=1, title="Today") + + @self.command(name='week', aliases=['tydzien']) + async def week_command(ctx): + """Show CVEs from last 7 days""" + await self._timeframe_command(ctx, days=7, title="Last 7 Days") + + @self.command(name='month', aliases='miesiac') + async def month_command(ctx): + """Show CVEs from last 30 days""" + await self._timeframe_command(ctx, days=30, title="Last 30 Days") + + @self.command(name='status') + async def status_command(ctx): + """Show bot status""" + summary = self.cve_handler.get_all_vendors_summary() + total = sum(v['total'] for v in summary) + + embed = discord.Embed( + title="🤖 Bot Status", + color=0x2ECC71 + ) + + embed.add_field(name="Status", value="✅ Online", inline=True) + embed.add_field(name="Monitoring", value=f"{len(config.VENDORS)} vendors", inline=True) + embed.add_field(name="Total CVEs", value=f"{total}", inline=True) + embed.add_field(name="Check Interval", value=f"{self.check_interval} min", inline=True) + embed.add_field(name="Min CVSS", value=f"{config.DISCORD_MIN_CVSS}", inline=True) + embed.add_field(name="Channel", value=f"<#{self.channel_id}>", inline=True) + + embed.set_footer(text=f"Bot: {self.user.name}") + + await ctx.send(embed=embed) + + @self.command(name='test') + async def test_command(ctx): + """Send test notification""" + embed = discord.Embed( + title="✅ Test Notification", + description="Bot is working correctly!", + color=0x2ECC71 + ) + embed.add_field(name="Bot", value=self.user.name, inline=True) + embed.add_field(name="Channel", value=ctx.channel.name, inline=True) + embed.set_footer(text="This is a test message") + + await ctx.send(embed=embed) + + async def _severity_command(self, ctx, severity: str, vendor: str = None): + """Helper for severity commands""" + if vendor: + vendor_obj = next((v for v in config.VENDORS if v['code'].lower() == vendor.lower()), None) + if not vendor_obj: + await ctx.send(f"❌ Unknown vendor: `{vendor}`") + return + + cves = self.cve_handler.get_vendor_cves(vendor_obj['code'], severity=severity) + title = f"{severity} CVEs - {vendor_obj['name']}" + else: + with self.cve_handler.get_db_connection() as conn: + cursor = conn.cursor() + cursor.execute(""" + SELECT cve_id, vendor_code, cvss_score, published_date, + substr(description, 1, 100) as desc + FROM cve_cache + WHERE severity = ? + ORDER BY published_date DESC + LIMIT 20 + """, (severity,)) + cves = [dict(row) for row in cursor.fetchall()] + + title = f"{severity} Severity CVEs" + + if not cves: + await ctx.send(f"No {severity} CVEs found.") + return + + color = 0xDC3545 if severity == 'CRITICAL' else 0xFD7E14 + embed = discord.Embed( + title=f"{'🔴' if severity == 'CRITICAL' else '🟠'} {title}", + description=f"Found {len(cves)} CVEs", + color=color + ) + + for cve in cves[:10]: + cvss = cve.get('cvss_score', 0) + desc = cve.get('description', cve.get('desc', ''))[:80] + + vendor_code = cve.get('vendor_code', '') + vendor_name = next((v['name'] for v in config.VENDORS if v['code'] == vendor_code), vendor_code) + + embed.add_field( + name=f"{cve['cve_id']} - CVSS {cvss}", + value=f"**{vendor_name}** | {desc}...", + inline=False + ) + + if len(cves) > 10: + embed.set_footer(text=f"Showing 10 of {len(cves)} CVEs") + + await ctx.send(embed=embed) + + async def _timeframe_command(self, ctx, days: int, title: str): + """Helper for timeframe commands""" + cutoff = (datetime.utcnow() - timedelta(days=days)).strftime('%Y-%m-%d') + + with self.cve_handler.get_db_connection() as conn: + cursor = conn.cursor() + cursor.execute(""" + SELECT cve_id, vendor_code, severity, cvss_score, published_date, + substr(description, 1, 100) as desc + FROM cve_cache + WHERE published_date >= ? + ORDER BY cvss_score DESC + LIMIT 15 + """, (cutoff,)) + cves = [dict(row) for row in cursor.fetchall()] + + if not cves: + await ctx.send(f"No CVEs found from {title.lower()}.") + return + + embed = discord.Embed( + title=f"📅 CVEs from {title}", + description=f"Found {len(cves)} CVEs", + color=0x3498DB + ) + + for cve in cves[:10]: + severity_emoji = { + 'CRITICAL': '🔴', + 'HIGH': '🟠', + 'MEDIUM': '🟡', + 'LOW': '⚪' + }.get(cve.get('severity'), '⚫') + + vendor_code = cve.get('vendor_code', '') + vendor_name = next((v['name'] for v in config.VENDORS if v['code'] == vendor_code), vendor_code) + + embed.add_field( + name=f"{severity_emoji} {cve['cve_id']} - CVSS {cve.get('cvss_score', 0)}", + value=f"**{vendor_name}** | {cve.get('desc', '')}...", + inline=False + ) + + await ctx.send(embed=embed) async def on_ready(self): logger.info(f'Discord bot logged in as {self.user}') @@ -57,11 +555,18 @@ class CVEDiscordBot(discord.Client): self.check_new_cves.start() logger.info("CVE monitoring task started") - async def on_error(self, event, *args, **kwargs): - logger.error(f"Discord error in event {event}", exc_info=True) + async def on_command_error(self, ctx, error): + if isinstance(error, commands.CommandNotFound): + await ctx.send(f"❌ Unknown command. Use `!cve help` to see available commands.") + elif isinstance(error, commands.MissingRequiredArgument): + await ctx.send(f"❌ Missing argument. Use `!cve help` for usage.") + else: + logger.error(f"Command error: {error}", exc_info=True) + await ctx.send(f"❌ Error: {str(error)}") @tasks.loop(minutes=config.DISCORD_CHECK_INTERVAL_MINUTES) async def check_new_cves(self): + """Check for new CVEs and send notifications""" try: logger.info("Checking for new CVEs...") @@ -70,34 +575,48 @@ class CVEDiscordBot(discord.Client): logger.error(f"Channel {self.channel_id} not found") return - hours_back = self.check_interval / 60 - new_cves = self.cve_handler.get_recent_cves_for_discord(hours=hours_back) + # Get unnotified CVEs + with self.cve_handler.get_db_connection() as conn: + cursor = conn.cursor() + cursor.execute(""" + SELECT cve_id, vendor_code, description, published_date, + cvss_score, severity + FROM cve_cache + WHERE discord_notified = 0 + AND cvss_score >= ? + AND severity IN ('CRITICAL', 'HIGH') + ORDER BY published_date DESC + LIMIT 20 + """, (config.DISCORD_MIN_CVSS,)) + + new_cves = [dict(row) for row in cursor.fetchall()] if not new_cves: - logger.info("No new CVEs found") + logger.info("No new CVEs to notify") return - filtered_cves = [ - cve for cve in new_cves - if cve.get('cvss_score', 0) >= config.DISCORD_MIN_CVSS - ] + logger.info(f"Found {len(new_cves)} new CVEs to notify") - if not filtered_cves: - logger.info(f"Found {len(new_cves)} CVEs but none meet min CVSS threshold of {config.DISCORD_MIN_CVSS}") - return - - logger.info(f"Found {len(filtered_cves)} CVEs meeting criteria (out of {len(new_cves)} total)") - - for cve in filtered_cves: + for cve in new_cves: try: embed = self.create_cve_embed(cve) await channel.send(embed=embed) + + # Mark as notified + with self.cve_handler.get_db_connection() as conn: + conn.execute(""" + UPDATE cve_cache + SET discord_notified = 1 + WHERE cve_id = ? + """, (cve['cve_id'],)) + logger.info(f"Sent notification for {cve['cve_id']}") - await asyncio.sleep(1) + await asyncio.sleep(2) # Rate limit + except Exception as e: logger.error(f"Error sending notification for {cve['cve_id']}: {e}") - logger.info(f"Successfully sent {len(filtered_cves)} CVE notifications") + logger.info(f"Successfully sent {len(new_cves)} CVE notifications") except Exception as e: logger.error(f"Error in check_new_cves: {e}", exc_info=True) @@ -107,44 +626,35 @@ class CVEDiscordBot(discord.Client): await self.wait_until_ready() logger.info("Bot is ready, starting CVE monitoring...") - def create_cve_embed(self, cve: dict) -> discord.Embed: - - # Severity colors + def create_cve_embed(self, cve: dict, detailed: bool = False) -> discord.Embed: + """Create Discord embed for CVE""" severity_colors = { - 'CRITICAL': 0xDC3545, # Red - 'HIGH': 0xFD7E14, # Orange - 'MEDIUM': 0xFFC107, # Yellow - 'LOW': 0x6C757D # Gray + 'CRITICAL': 0xDC3545, + 'HIGH': 0xFD7E14, + 'MEDIUM': 0xFFC107, + 'LOW': 0x6C757D } severity = cve.get('severity', 'UNKNOWN') color = severity_colors.get(severity, 0x6C757D) - # Get vendor info vendor_code = cve.get('vendor_code', '') vendor = next((v for v in config.VENDORS if v['code'] == vendor_code), None) vendor_name = vendor['name'] if vendor else vendor_code.title() - # Create embed + description = cve.get('description', 'No description available') + if not detailed: + description = description[:500] + ('...' if len(description) > 500 else '') + embed = discord.Embed( title=f"🚨 {cve['cve_id']}", - description=cve.get('description', 'No description available')[:2000], + description=description, color=color, timestamp=datetime.utcnow() ) - # Add fields - embed.add_field( - name="🏢 Vendor", - value=vendor_name, - inline=True - ) - - embed.add_field( - name="⚠️ Severity", - value=severity, - inline=True - ) + embed.add_field(name="🏢 Vendor", value=vendor_name, inline=True) + embed.add_field(name="⚠️ Severity", value=severity, inline=True) cvss_score = cve.get('cvss_score') embed.add_field( @@ -176,10 +686,12 @@ class CVEDiscordBot(discord.Client): text=f"CVE Monitor • {vendor_name}", icon_url="https://nvd.nist.gov/favicon.ico" ) - + return embed + def start_discord_bot(): + """Start the Discord bot""" if not config.ENABLE_DISCORD_BOT: logger.info("Discord bot is disabled (ENABLE_DISCORD_BOT=False)") return