#!/usr/bin/env python3
"""Discord bot that answers CVE queries and posts notifications for new CVEs.

Commands are registered under the ``!cve `` prefix. A background task polls
the ``cve_cache`` table for rows not yet announced and posts them to the
configured channel.
"""
import asyncio
import logging
import sqlite3
import sys
from datetime import datetime, timedelta, timezone

import discord
from discord.ext import commands, tasks

import config
from cve_handler import CVEHandler

logging.basicConfig(level=config.LOG_LEVEL, format=config.LOG_FORMAT)
logger = logging.getLogger(__name__)

# Marker shown in front of a CVE for each known severity level.
_SEVERITY_EMOJI = {
    'CRITICAL': '🔴',
    'HIGH': '🟠',
    'MEDIUM': '🟡',
    'LOW': '⚪',
}
_UNKNOWN_SEVERITY_EMOJI = '⚫'

# Number of CVEs rendered by the list-style commands.
_MAX_LISTED_CVES = 10
# Discord rejects embeds with more than 25 fields.
_MAX_EMBED_FIELDS = 25
# Discord rejects embed descriptions longer than 4096 characters.
_MAX_EMBED_DESCRIPTION = 4096


def _find_vendor(code):
    """Return the ``config.VENDORS`` entry matching *code* case-insensitively, or None."""
    if not code:
        return None
    wanted = code.lower()
    return next((v for v in config.VENDORS if v['code'].lower() == wanted), None)


def _vendor_display_name(vendor_code):
    """Return the configured vendor name for *vendor_code*, or the code itself if unknown."""
    return next(
        (v['name'] for v in config.VENDORS if v['code'] == vendor_code),
        vendor_code,
    )


def _severity_emoji(severity):
    """Return the emoji marker for *severity* (a neutral marker when unrecognised)."""
    return _SEVERITY_EMOJI.get(severity, _UNKNOWN_SEVERITY_EMOJI)


def _short_description(cve, length=100):
    """Return the first *length* characters of a CVE row's description.

    Rows carry either a full ``description`` column or a pre-truncated ``desc``
    alias depending on the query; a NULL column yields an empty string.
    """
    text = cve.get('description') or cve.get('desc') or ''
    return text[:length]


def _clamp_limit(limit, maximum):
    """Clamp a user-supplied *limit* into ``1..maximum``.

    A negative LIMIT means "no limit" in SQLite, so an unchecked value could
    pull the whole table.
    """
    return max(1, min(limit, maximum))


class CVEDiscordBot(commands.Bot):
    """Bot exposing ``!cve`` query commands and a periodic new-CVE notifier."""

    def __init__(self):
        """Configure intents, validate the Discord settings and register commands.

        Exits the process with status 1 when the bot token or channel id is
        missing, or when the channel id is not an integer.
        """
        intents = discord.Intents.default()
        intents.message_content = True
        super().__init__(
            command_prefix='!cve ',
            intents=intents,
            help_command=None
        )
        self.cve_handler = CVEHandler()
        self.channel_id = None
        self.check_interval = config.DISCORD_CHECK_INTERVAL_MINUTES

        if not config.DISCORD_BOT_TOKEN:
            logger.error("DISCORD_BOT_TOKEN not configured in .env")
            sys.exit(1)
        if not config.DISCORD_CHANNEL_ID:
            logger.error("DISCORD_CHANNEL_ID not configured in .env")
            sys.exit(1)
        try:
            self.channel_id = int(config.DISCORD_CHANNEL_ID)
        except ValueError:
            logger.error(f"Invalid DISCORD_CHANNEL_ID: {config.DISCORD_CHANNEL_ID}")
            sys.exit(1)

        self.setup_commands()

    def setup_commands(self):
        """Register every ``!cve`` command on this bot instance."""

        @self.command(name='help', aliases=['h'])
        async def help_command(ctx):
            """Send the command overview embed."""
            # NOTE(review): several usage strings below end in a blank where an
            # argument placeholder (e.g. a vendor or CVE id) appears to have been
            # lost, and "`!cve ` - Quick CVE lookup" has no matching command in
            # this file. The strings are left untouched; confirm the intended text.
            embed = discord.Embed(
                title="🤖 CVE Monitor Bot - Commands",
                description="Monitor and query CVE vulnerabilities",
                color=0x3498DB
            )
            embed.add_field(
                name="📊 Statistics",
                value=(
                    "`!cve stats` - Overall statistics\n"
                    "`!cve stats ` - Vendor statistics\n"
                    "`!cve vendors` - List all vendors"
                ),
                inline=False
            )
            embed.add_field(
                name="🔍 Search & List",
                value=(
                    "`!cve latest` - 10 latest CVEs\n"
                    "`!cve latest ` - Latest from vendor\n"
                    "`!cve critical` - All CRITICAL CVEs\n"
                    "`!cve high` - All HIGH severity CVEs\n"
                    "`!cve search ` - Search CVEs"
                ),
                inline=False
            )
            embed.add_field(
                name="🎯 Top Lists",
                value=(
                    "`!cve top` - Top 10 worst CVEs\n"
                    "`!cve top ` - Top CVEs from vendor\n"
                    "`!cve worst` - Worst CVE (highest CVSS)"
                ),
                inline=False
            )
            embed.add_field(
                name="📅 Time-based",
                value=(
                    "`!cve today` - CVEs from today\n"
                    "`!cve week` - CVEs from last 7 days\n"
                    "`!cve month` - CVEs from last 30 days"
                ),
                inline=False
            )
            embed.add_field(
                name="🔎 Details",
                value=(
                    "`!cve info ` - Detailed CVE info\n"
                    "`!cve ` - Quick CVE lookup"
                ),
                inline=False
            )
            embed.add_field(
                name="⚙️ Configuration",
                value=(
                    "`!cve status` - Bot status\n"
                    "`!cve test` - Test notification"
                ),
                inline=False
            )
            embed.set_footer(text="Use !cve or !cve help")
            await ctx.send(embed=embed)

        @self.command(name='stats', aliases=['statistics'])
        async def stats_command(ctx, vendor: str = None):
            """Show statistics for one vendor, or overall totals when no vendor is given."""
            if vendor:
                vendor_obj = _find_vendor(vendor)
                if not vendor_obj:
                    await ctx.send(
                        f"❌ Unknown vendor: `{vendor}`. Use `!cve vendors` to see available vendors."
                    )
                    return

                stats = self.cve_handler.get_vendor_stats(vendor_obj['code'])
                by_severity = stats['severity']
                embed = discord.Embed(
                    title=f"📊 {vendor_obj['name']} Statistics",
                    color=0x3498DB
                )
                embed.add_field(name="Total CVEs", value=f"**{stats['total']}**", inline=True)
                embed.add_field(name="Critical", value=f"🔴 {by_severity.get('CRITICAL', 0)}", inline=True)
                embed.add_field(name="High", value=f"🟠 {by_severity.get('HIGH', 0)}", inline=True)
                embed.add_field(name="Medium", value=f"🟡 {by_severity.get('MEDIUM', 0)}", inline=True)
                embed.add_field(name="Low", value=f"⚪ {by_severity.get('LOW', 0)}", inline=True)
                embed.add_field(name="This Week", value=f"📅 {stats['recent']}", inline=True)
                await ctx.send(embed=embed)
                return

            summary = self.cve_handler.get_all_vendors_summary()
            total = sum(v['total'] for v in summary)
            critical = sum(v['critical'] for v in summary)
            high = sum(v['high'] for v in summary)
            recent = sum(v['recent'] for v in summary)

            embed = discord.Embed(
                title="📊 Overall CVE Statistics",
                color=0x3498DB
            )
            embed.add_field(name="Total CVEs", value=f"**{total}**", inline=True)
            embed.add_field(name="Critical", value=f"🔴 {critical}", inline=True)
            embed.add_field(name="High", value=f"🟠 {high}", inline=True)
            embed.add_field(name="Vendors", value=f"**{len(config.VENDORS)}**", inline=True)
            embed.add_field(name="This Week", value=f"📅 {recent}", inline=True)
            embed.add_field(name="Active", value="✅ Monitoring", inline=True)

            top_vendors = sorted(summary, key=lambda x: x['total'], reverse=True)[:5]
            top_text = "\n".join(
                f"{rank}. **{v['name']}**: {v['total']} CVEs"
                for rank, v in enumerate(top_vendors, 1)
            )
            # Discord rejects empty field values, so fall back to a placeholder
            # when the summary has no vendors.
            embed.add_field(name="🏆 Top 5 Vendors", value=top_text or "No data", inline=False)
            await ctx.send(embed=embed)

        @self.command(name='vendors', aliases=['list'])
        async def vendors_command(ctx):
            """List all monitored vendors"""
            summary = self.cve_handler.get_all_vendors_summary()
            summary_sorted = sorted(summary, key=lambda x: x['total'], reverse=True)

            embed = discord.Embed(
                title="🏢 Monitored Vendors",
                description=f"Tracking {len(config.VENDORS)} vendors",
                color=0x3498DB
            )
            # One embed field per group of ten vendors.
            for start in range(0, len(summary_sorted), 10):
                chunk = summary_sorted[start:start + 10]
                text = "\n".join(
                    f"`{v['code']:12s}` {v['name']:20s} - {v['total']:4d} CVEs (🔴{v['critical']} 🟠{v['high']})"
                    for v in chunk
                )
                embed.add_field(
                    name=f"Vendors {start + 1}-{min(start + 10, len(summary_sorted))}",
                    value=text,
                    inline=False
                )
            embed.set_footer(text="Use: !cve stats for details")
            await ctx.send(embed=embed)

        @self.command(name='latest', aliases=['last'])
        async def latest_command(ctx, vendor: str = None, limit: int = 10):
            """Show the most recently published CVEs, optionally for one vendor.

            At most ten CVEs are rendered, so ``limit`` is clamped to 1..10 to
            keep the title's count in step with what is displayed.
            """
            limit = _clamp_limit(limit, _MAX_LISTED_CVES)
            if vendor:
                vendor_obj = _find_vendor(vendor)
                if not vendor_obj:
                    await ctx.send(f"❌ Unknown vendor: `{vendor}`")
                    return
                cves = self.cve_handler.get_vendor_cves(vendor_obj['code'], limit=limit)
                title = f"📋 Latest {len(cves)} CVEs - {vendor_obj['name']}"
            else:
                with self.cve_handler.get_db_connection() as conn:
                    cursor = conn.cursor()
                    cursor.execute("""
                        SELECT cve_id, vendor_code, severity, cvss_score, published_date,
                               substr(description, 1, 100) as desc
                        FROM cve_cache
                        ORDER BY published_date DESC
                        LIMIT ?
                    """, (limit,))
                    cves = [dict(row) for row in cursor.fetchall()]
                title = f"📋 Latest {len(cves)} CVEs (All Vendors)"

            if not cves:
                await ctx.send("No CVEs found.")
                return

            embed = discord.Embed(title=title, color=0x3498DB)
            for cve in cves[:_MAX_LISTED_CVES]:
                cvss = cve.get('cvss_score', 0)
                embed.add_field(
                    name=f"{_severity_emoji(cve.get('severity'))} {cve['cve_id']} - CVSS {cvss}",
                    value=f"{_short_description(cve)}...",
                    inline=False
                )
            await ctx.send(embed=embed)

        @self.command(name='critical')
        async def critical_command(ctx, vendor: str = None):
            """Show CRITICAL severity CVEs, optionally for one vendor."""
            await self._severity_command(ctx, 'CRITICAL', vendor)

        @self.command(name='high')
        async def high_command(ctx, vendor: str = None):
            """Show HIGH severity CVEs, optionally for one vendor."""
            await self._severity_command(ctx, 'HIGH', vendor)

        @self.command(name='top', aliases=['worst'])
        async def top_command(ctx, vendor: str = None, limit: int = 10):
            """Show the CVEs with the highest CVSS score, optionally for one vendor.

            ``limit`` is clamped to 1..25 because every CVE becomes one embed
            field and Discord caps an embed at 25 fields.
            """
            limit = _clamp_limit(limit, _MAX_EMBED_FIELDS)
            if vendor:
                vendor_obj = _find_vendor(vendor)
                if not vendor_obj:
                    await ctx.send(f"❌ Unknown vendor: `{vendor}`")
                    return
                with self.cve_handler.get_db_connection() as conn:
                    cursor = conn.cursor()
                    cursor.execute("""
                        SELECT cve_id, severity, cvss_score, published_date,
                               substr(description, 1, 100) as desc
                        FROM cve_cache
                        WHERE vendor_code = ?
                        ORDER BY cvss_score DESC
                        LIMIT ?
                    """, (vendor_obj['code'], limit))
                    cves = [dict(row) for row in cursor.fetchall()]
                title = f"🎯 Top {len(cves)} CVEs - {vendor_obj['name']}"
            else:
                with self.cve_handler.get_db_connection() as conn:
                    cursor = conn.cursor()
                    cursor.execute("""
                        SELECT cve_id, vendor_code, severity, cvss_score, published_date,
                               substr(description, 1, 100) as desc
                        FROM cve_cache
                        ORDER BY cvss_score DESC
                        LIMIT ?
                    """, (limit,))
                    cves = [dict(row) for row in cursor.fetchall()]
                title = f"🎯 Top {len(cves)} Worst CVEs"

            if not cves:
                await ctx.send("No CVEs found.")
                return

            embed = discord.Embed(title=title, color=0xDC3545)
            for rank, cve in enumerate(cves[:_MAX_EMBED_FIELDS], 1):
                vendor_name = _vendor_display_name(cve.get('vendor_code') or '')
                embed.add_field(
                    name=(
                        f"{rank}. {_severity_emoji(cve.get('severity'))} {cve['cve_id']}"
                        f" - **CVSS {cve.get('cvss_score', 0)}**"
                    ),
                    value=f"**{vendor_name}** | {_short_description(cve)}...",
                    inline=False
                )
            await ctx.send(embed=embed)

        @self.command(name='search', aliases=['find'])
        async def search_command(ctx, *, query: str):
            """Search CVEs through the handler and list up to ten matches."""
            cves = self.cve_handler.search_cves(query, limit=10)
            if not cves:
                await ctx.send(f"No CVEs found for: `{query}`")
                return

            embed = discord.Embed(
                title=f"🔍 Search Results: '{query}'",
                description=f"Found {len(cves)} CVEs",
                color=0x3498DB
            )
            for cve in cves[:_MAX_LISTED_CVES]:
                cvss = cve.get('cvss_score', 0)
                embed.add_field(
                    name=f"{_severity_emoji(cve.get('severity'))} {cve['cve_id']} - CVSS {cvss}",
                    value=f"{_short_description(cve)}...",
                    inline=False
                )
            await ctx.send(embed=embed)

        @self.command(name='info', aliases=['detail'])
        async def info_command(ctx, cve_id: str):
            """Show the detailed embed for one CVE; the ``CVE-`` prefix is optional."""
            if not cve_id.upper().startswith('CVE-'):
                cve_id = f"CVE-{cve_id}"

            with self.cve_handler.get_db_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    SELECT * FROM cve_cache
                    WHERE cve_id = ?
                """, (cve_id.upper(),))
                row = cursor.fetchone()

            if not row:
                await ctx.send(f"❌ CVE not found: `{cve_id}`")
                return

            embed = self.create_cve_embed(dict(row), detailed=True)
            await ctx.send(embed=embed)

        @self.command(name='today')
        async def today_command(ctx):
            """Show CVEs from today"""
            await self._timeframe_command(ctx, days=1, title="Today")

        @self.command(name='week')
        async def week_command(ctx):
            """Show CVEs from last 7 days"""
            await self._timeframe_command(ctx, days=7, title="Last 7 Days")

        @self.command(name='month')
        async def month_command(ctx):
            """Show CVEs from last 30 days"""
            await self._timeframe_command(ctx, days=30, title="Last 30 Days")

        @self.command(name='status')
        async def status_command(ctx):
            """Show bot status"""
            summary = self.cve_handler.get_all_vendors_summary()
            total = sum(v['total'] for v in summary)

            embed = discord.Embed(
                title="🤖 Bot Status",
                color=0x2ECC71
            )
            embed.add_field(name="Status", value="✅ Online", inline=True)
            embed.add_field(name="Monitoring", value=f"{len(config.VENDORS)} vendors", inline=True)
            embed.add_field(name="Total CVEs", value=f"{total}", inline=True)
            embed.add_field(name="Check Interval", value=f"{self.check_interval} min", inline=True)
            embed.add_field(name="Min CVSS", value=f"{config.DISCORD_MIN_CVSS}", inline=True)
            embed.add_field(name="Channel", value=f"<#{self.channel_id}>", inline=True)
            embed.set_footer(text=f"Bot: {self.user.name}")
            await ctx.send(embed=embed)

        @self.command(name='test')
        async def test_command(ctx):
            """Send a fixed embed to confirm the bot can post in the current channel."""
            embed = discord.Embed(
                title="✅ Test Notification",
                description="Bot is working correctly!",
                color=0x2ECC71
            )
            embed.add_field(name="Bot", value=self.user.name, inline=True)
            embed.add_field(name="Channel", value=ctx.channel.name, inline=True)
            embed.set_footer(text="This is a test message")
            await ctx.send(embed=embed)

    async def _severity_command(self, ctx, severity: str, vendor: str = None):
        """List CVEs of the given *severity*, optionally restricted to one vendor.

        Up to ten CVEs are rendered; a footer reports the total when more were
        found.
        """
        if vendor:
            vendor_obj = _find_vendor(vendor)
            if not vendor_obj:
                await ctx.send(f"❌ Unknown vendor: `{vendor}`")
                return
            cves = self.cve_handler.get_vendor_cves(vendor_obj['code'], severity=severity)
            title = f"{severity} CVEs - {vendor_obj['name']}"
        else:
            with self.cve_handler.get_db_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    SELECT cve_id, vendor_code, cvss_score, published_date,
                           substr(description, 1, 100) as desc
                    FROM cve_cache
                    WHERE severity = ?
                    ORDER BY published_date DESC
                    LIMIT 20
                """, (severity,))
                cves = [dict(row) for row in cursor.fetchall()]
            title = f"{severity} Severity CVEs"

        if not cves:
            await ctx.send(f"No {severity} CVEs found.")
            return

        is_critical = severity == 'CRITICAL'
        embed = discord.Embed(
            title=f"{'🔴' if is_critical else '🟠'} {title}",
            description=f"Found {len(cves)} CVEs",
            color=0xDC3545 if is_critical else 0xFD7E14
        )
        for cve in cves[:_MAX_LISTED_CVES]:
            cvss = cve.get('cvss_score', 0)
            vendor_name = _vendor_display_name(cve.get('vendor_code') or '')
            embed.add_field(
                name=f"{cve['cve_id']} - CVSS {cvss}",
                value=f"**{vendor_name}** | {_short_description(cve, 80)}...",
                inline=False
            )
        if len(cves) > _MAX_LISTED_CVES:
            embed.set_footer(text=f"Showing 10 of {len(cves)} CVEs")
        await ctx.send(embed=embed)

    async def _timeframe_command(self, ctx, days: int, title: str):
        """List the highest-scored CVEs published within the last *days* days.

        The cutoff is a ``YYYY-MM-DD`` string compared against
        ``published_date``, so the window starts at the beginning of the day
        *days* days ago (UTC).
        """
        cutoff = (datetime.now(timezone.utc) - timedelta(days=days)).strftime('%Y-%m-%d')

        with self.cve_handler.get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT cve_id, vendor_code, severity, cvss_score, published_date,
                       substr(description, 1, 100) as desc
                FROM cve_cache
                WHERE published_date >= ?
                ORDER BY cvss_score DESC
                LIMIT 15
            """, (cutoff,))
            cves = [dict(row) for row in cursor.fetchall()]

        if not cves:
            await ctx.send(f"No CVEs found from {title.lower()}.")
            return

        embed = discord.Embed(
            title=f"📅 CVEs from {title}",
            description=f"Found {len(cves)} CVEs",
            color=0x3498DB
        )
        for cve in cves[:_MAX_LISTED_CVES]:
            vendor_name = _vendor_display_name(cve.get('vendor_code') or '')
            embed.add_field(
                name=(
                    f"{_severity_emoji(cve.get('severity'))} {cve['cve_id']}"
                    f" - CVSS {cve.get('cvss_score', 0)}"
                ),
                value=f"**{vendor_name}** | {_short_description(cve)}...",
                inline=False
            )
        await ctx.send(embed=embed)

    async def on_ready(self):
        """Log the connection details and start the monitoring task.

        Closes the bot when the configured channel cannot be resolved.
        """
        logger.info(f'Discord bot logged in as {self.user}')
        logger.info(f'Bot ID: {self.user.id}')
        logger.info(f'Monitoring channel ID: {self.channel_id}')
        logger.info(f'Check interval: {self.check_interval} minutes')
        logger.info(f'Min CVSS score: {config.DISCORD_MIN_CVSS}')

        channel = self.get_channel(self.channel_id)
        if not channel:
            logger.error(f"Cannot access channel {self.channel_id}")
            logger.error("Make sure the bot has been invited to the server and has permissions")
            await self.close()
            return

        logger.info(f"Successfully connected to channel: #{channel.name}")
        # on_ready can fire again after a reconnect; only start the loop once.
        if not self.check_new_cves.is_running():
            self.check_new_cves.start()
            logger.info("CVE monitoring task started")

    async def on_command_error(self, ctx, error):
        """Reply with a short message for command failures and log unexpected ones."""
        if isinstance(error, commands.CommandNotFound):
            await ctx.send("❌ Unknown command. Use `!cve help` to see available commands.")
        elif isinstance(error, commands.MissingRequiredArgument):
            await ctx.send("❌ Missing argument. Use `!cve help` for usage.")
        else:
            # This handler does not run inside an ``except`` block, so the
            # exception must be passed explicitly for its traceback to be logged.
            logger.error(f"Command error: {error}", exc_info=error)
            await ctx.send(f"❌ Error: {str(error)}")

    @tasks.loop(minutes=config.DISCORD_CHECK_INTERVAL_MINUTES)
    async def check_new_cves(self):
        """Check for new CVEs and send notifications"""
        try:
            logger.info("Checking for new CVEs...")
            channel = self.get_channel(self.channel_id)
            if not channel:
                logger.error(f"Channel {self.channel_id} not found")
                return

            # NOTE(review): config.DISCORD_NOTIFY_CRITICAL / DISCORD_NOTIFY_HIGH
            # are logged at startup but this query always selects both
            # severities - confirm whether the flags should filter here.
            with self.cve_handler.get_db_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    SELECT cve_id, vendor_code, description, published_date,
                           cvss_score, severity
                    FROM cve_cache
                    WHERE discord_notified = 0
                      AND cvss_score >= ?
                      AND severity IN ('CRITICAL', 'HIGH')
                    ORDER BY published_date DESC
                    LIMIT 20
                """, (config.DISCORD_MIN_CVSS,))
                new_cves = [dict(row) for row in cursor.fetchall()]

            if not new_cves:
                logger.info("No new CVEs to notify")
                return

            logger.info(f"Found {len(new_cves)} new CVEs to notify")
            sent = 0
            for cve in new_cves:
                try:
                    embed = self.create_cve_embed(cve)
                    await channel.send(embed=embed)

                    # Mark as notified
                    with self.cve_handler.get_db_connection() as conn:
                        conn.execute("""
                            UPDATE cve_cache
                            SET discord_notified = 1
                            WHERE cve_id = ?
                        """, (cve['cve_id'],))

                    sent += 1
                    logger.info(f"Sent notification for {cve['cve_id']}")
                    # Pause between messages to avoid flooding the channel.
                    await asyncio.sleep(2)
                except Exception as e:
                    # One failed notification must not abort the rest of the batch.
                    logger.error(f"Error sending notification for {cve['cve_id']}: {e}")

            # Report what was actually delivered, not what was attempted.
            logger.info(f"Successfully sent {sent} of {len(new_cves)} CVE notifications")
        except Exception as e:
            logger.error(f"Error in check_new_cves: {e}", exc_info=True)

    @check_new_cves.before_loop
    async def before_check_new_cves(self):
        """Hold the monitoring loop until the bot is ready."""
        await self.wait_until_ready()
        logger.info("Bot is ready, starting CVE monitoring...")

    def create_cve_embed(self, cve: dict, detailed: bool = False) -> discord.Embed:
        """Create Discord embed for CVE

        Args:
            cve: Row from ``cve_cache`` as a dict; ``cve_id`` is required, all
                other keys may be missing or NULL.
            detailed: When False the description is cut to 500 characters;
                when True it is only cut to Discord's description limit.

        Returns:
            The populated embed.
        """
        severity_colors = {
            'CRITICAL': 0xDC3545,
            'HIGH': 0xFD7E14,
            'MEDIUM': 0xFFC107,
            'LOW': 0x6C757D
        }
        # ``or`` rather than a .get() default: a NULL column yields None even
        # though the key is present.
        severity = cve.get('severity') or 'UNKNOWN'
        color = severity_colors.get(severity, 0x6C757D)

        vendor_code = cve.get('vendor_code') or ''
        vendor = next((v for v in config.VENDORS if v['code'] == vendor_code), None)
        vendor_name = vendor['name'] if vendor else vendor_code.title()

        description = cve.get('description') or 'No description available'
        if not detailed:
            description = description[:500] + ('...' if len(description) > 500 else '')
        elif len(description) > _MAX_EMBED_DESCRIPTION:
            description = description[:_MAX_EMBED_DESCRIPTION - 3] + '...'

        embed = discord.Embed(
            title=f"🚨 {cve['cve_id']}",
            description=description,
            color=color,
            # Must be timezone-aware: discord.py treats a naive datetime as
            # local time, which skews the timestamp on non-UTC hosts.
            timestamp=datetime.now(timezone.utc)
        )
        embed.add_field(name="🏢 Vendor", value=vendor_name, inline=True)
        embed.add_field(name="⚠️ Severity", value=severity, inline=True)

        cvss_score = cve.get('cvss_score')
        embed.add_field(
            name="📊 CVSS Score",
            value=f"**{cvss_score:.1f}**" if cvss_score else "N/A",
            inline=True
        )

        published = str(cve.get('published_date') or '')
        if published:
            try:
                pub_date = datetime.fromisoformat(published.replace('Z', '+00:00'))
            except ValueError:
                # Unparseable timestamp: show the leading date portion as-is.
                published_text = published[:10]
            else:
                if pub_date.tzinfo is not None:
                    # The label says UTC, so normalise offset-aware values.
                    pub_date = pub_date.astimezone(timezone.utc)
                published_text = pub_date.strftime('%Y-%m-%d %H:%M UTC')
            embed.add_field(name="📅 Published", value=published_text, inline=True)

        nvd_url = f"https://nvd.nist.gov/vuln/detail/{cve['cve_id']}"
        embed.add_field(
            name="🔗 Links",
            value=f"[View on NVD]({nvd_url})",
            inline=False
        )
        embed.set_footer(
            text=f"CVE Monitor • {vendor_name}",
            icon_url="https://nvd.nist.gov/favicon.ico"
        )
        return embed


def start_discord_bot():
    """Start the Discord bot"""
    if not config.ENABLE_DISCORD_BOT:
        logger.info("Discord bot is disabled (ENABLE_DISCORD_BOT=False)")
        return

    if not config.DISCORD_BOT_TOKEN:
        logger.error("DISCORD_BOT_TOKEN not configured")
        logger.error("Please set DISCORD_BOT_TOKEN in .env file")
        logger.error("Get your token at: https://discord.com/developers/applications")
        return

    if not config.DISCORD_CHANNEL_ID:
        logger.error("DISCORD_CHANNEL_ID not configured")
        logger.error("Please set DISCORD_CHANNEL_ID in .env file")
        logger.error("Enable Developer Mode in Discord, right-click channel -> Copy ID")
        return

    logger.info("Starting Discord bot...")
    logger.info("Configuration:")
    logger.info(f" - Check interval: {config.DISCORD_CHECK_INTERVAL_MINUTES} minutes")
    logger.info(f" - Min CVSS score: {config.DISCORD_MIN_CVSS}")
    logger.info(f" - Notify on CRITICAL: {config.DISCORD_NOTIFY_CRITICAL}")
    logger.info(f" - Notify on HIGH: {config.DISCORD_NOTIFY_HIGH}")

    bot = CVEDiscordBot()
    try:
        bot.run(config.DISCORD_BOT_TOKEN)
    except discord.LoginFailure:
        logger.error("Invalid Discord bot token")
        logger.error("Please check DISCORD_BOT_TOKEN in .env file")
    except Exception as e:
        logger.error(f"Failed to start Discord bot: {e}", exc_info=True)


if __name__ == '__main__':
    start_discord_bot()