diff --git a/discord_bot.py b/discord_bot.py index 139c808..5d443d3 100644 --- a/discord_bot.py +++ b/discord_bot.py @@ -1,9 +1,4 @@ #!/usr/bin/env python3 -""" -CVE Monitor - Discord Bot -Monitors CVE database and sends notifications to Discord channel -Supports interactive commands for querying CVE data -""" import discord from discord.ext import tasks, commands @@ -28,14 +23,13 @@ class CVEDiscordBot(commands.Bot): super().__init__( command_prefix='!cve ', intents=intents, - help_command=None # Custom help + help_command=None ) self.cve_handler = CVEHandler() self.channel_id = None self.check_interval = config.DISCORD_CHECK_INTERVAL_MINUTES - # Validation if not config.DISCORD_BOT_TOKEN: logger.error("DISCORD_BOT_TOKEN not configured in .env") sys.exit(1) @@ -50,15 +44,12 @@ class CVEDiscordBot(commands.Bot): logger.error(f"Invalid DISCORD_CHANNEL_ID: {config.DISCORD_CHANNEL_ID}") sys.exit(1) - # Register commands self.setup_commands() def setup_commands(self): - """Register all bot commands""" - @self.command(name='help', aliases=['h', 'pomoc']) + @self.command(name='help', aliases=['h']) async def help_command(ctx): - """Show available commands""" embed = discord.Embed( title="🤖 CVE Monitor Bot - Commands", description="Monitor and query CVE vulnerabilities", @@ -128,11 +119,9 @@ class CVEDiscordBot(commands.Bot): embed.set_footer(text="Use !cve or !cve help") await ctx.send(embed=embed) - @self.command(name='stats', aliases=['statistics', 'statystyki']) + @self.command(name='stats', aliases=['statistics']) async def stats_command(ctx, vendor: str = None): - """Show CVE statistics""" if vendor: - # Specific vendor stats vendor_obj = next((v for v in config.VENDORS if v['code'].lower() == vendor.lower()), None) if not vendor_obj: await ctx.send(f"❌ Unknown vendor: `{vendor}`. Use `!cve vendors` to see available vendors.") @@ -154,7 +143,6 @@ class CVEDiscordBot(commands.Bot): await ctx.send(embed=embed) else: - # Overall stats summary = self.cve_handler.get_all_vendors_summary() total = sum(v['total'] for v in summary) critical = sum(v['critical'] for v in summary) @@ -173,14 +161,13 @@ class CVEDiscordBot(commands.Bot): embed.add_field(name="This Week", value=f"📅 {recent}", inline=True) embed.add_field(name="Active", value=f"✅ Monitoring", inline=True) - # Top vendors top_vendors = sorted(summary, key=lambda x: x['total'], reverse=True)[:5] top_text = "\n".join([f"{i+1}. **{v['name']}**: {v['total']} CVEs" for i, v in enumerate(top_vendors)]) embed.add_field(name="🏆 Top 5 Vendors", value=top_text, inline=False) await ctx.send(embed=embed) - @self.command(name='vendors', aliases=['vendorzy', 'list']) + @self.command(name='vendors', aliases=['list']) async def vendors_command(ctx): """List all monitored vendors""" summary = self.cve_handler.get_all_vendors_summary() @@ -192,7 +179,6 @@ class CVEDiscordBot(commands.Bot): color=0x3498DB ) - # Split into chunks for i in range(0, len(summary_sorted), 10): chunk = summary_sorted[i:i+10] text = "\n".join([ @@ -208,9 +194,8 @@ class CVEDiscordBot(commands.Bot): embed.set_footer(text="Use: !cve stats for details") await ctx.send(embed=embed) - @self.command(name='latest', aliases=['last', 'ostatnie']) + @self.command(name='latest', aliases=['last']) async def latest_command(ctx, vendor: str = None, limit: int = 10): - """Show latest CVEs""" if vendor: vendor_obj = next((v for v in config.VENDORS if v['code'].lower() == vendor.lower()), None) if not vendor_obj: @@ -220,7 +205,6 @@ class CVEDiscordBot(commands.Bot): cves = self.cve_handler.get_vendor_cves(vendor_obj['code'], limit=limit) title = f"📋 Latest {len(cves)} CVEs - {vendor_obj['name']}" else: - # Get latest from all vendors with self.cve_handler.get_db_connection() as conn: cursor = conn.cursor() cursor.execute(""" @@ -259,19 +243,16 @@ class CVEDiscordBot(commands.Bot): await ctx.send(embed=embed) - @self.command(name='critical', aliases=['krytyczne']) + @self.command(name='critical') async def critical_command(ctx, vendor: str = None): - """Show all CRITICAL severity CVEs""" await self._severity_command(ctx, 'CRITICAL', vendor) - @self.command(name='high', aliases=['wysokie']) + @self.command(name='high') async def high_command(ctx, vendor: str = None): - """Show all HIGH severity CVEs""" await self._severity_command(ctx, 'HIGH', vendor) - @self.command(name='top', aliases=['worst', 'najgorsze']) + @self.command(name='top', aliases=['worst']) async def top_command(ctx, vendor: str = None, limit: int = 10): - """Show top CVEs by CVSS score""" if vendor: vendor_obj = next((v for v in config.VENDORS if v['code'].lower() == vendor.lower()), None) if not vendor_obj: @@ -330,9 +311,8 @@ class CVEDiscordBot(commands.Bot): await ctx.send(embed=embed) - @self.command(name='search', aliases=['find', 'szukaj']) + @self.command(name='search', aliases=['find']) async def search_command(ctx, *, query: str): - """Search CVEs by keyword""" cves = self.cve_handler.search_cves(query, limit=10) if not cves: @@ -364,9 +344,8 @@ class CVEDiscordBot(commands.Bot): await ctx.send(embed=embed) - @self.command(name='info', aliases=['detail', 'szczegoly']) + @self.command(name='info', aliases=['detail']) async def info_command(ctx, cve_id: str): - """Show detailed CVE information""" if not cve_id.upper().startswith('CVE-'): cve_id = f"CVE-{cve_id}" @@ -385,17 +364,17 @@ class CVEDiscordBot(commands.Bot): embed = self.create_cve_embed(cve, detailed=True) await ctx.send(embed=embed) - @self.command(name='today', aliases=['dzisiaj']) + @self.command(name='today') async def today_command(ctx): """Show CVEs from today""" await self._timeframe_command(ctx, days=1, title="Today") - @self.command(name='week', aliases=['tydzien']) + @self.command(name='week') async def week_command(ctx): """Show CVEs from last 7 days""" await self._timeframe_command(ctx, days=7, title="Last 7 Days") - @self.command(name='month', aliases='miesiac') + @self.command(name='month') async def month_command(ctx): """Show CVEs from last 30 days""" await self._timeframe_command(ctx, days=30, title="Last 30 Days") @@ -424,7 +403,6 @@ class CVEDiscordBot(commands.Bot): @self.command(name='test') async def test_command(ctx): - """Send test notification""" embed = discord.Embed( title="✅ Test Notification", description="Bot is working correctly!", @@ -437,7 +415,6 @@ class CVEDiscordBot(commands.Bot): await ctx.send(embed=embed) async def _severity_command(self, ctx, severity: str, vendor: str = None): - """Helper for severity commands""" if vendor: vendor_obj = next((v for v in config.VENDORS if v['code'].lower() == vendor.lower()), None) if not vendor_obj: @@ -491,7 +468,6 @@ class CVEDiscordBot(commands.Bot): await ctx.send(embed=embed) async def _timeframe_command(self, ctx, days: int, title: str): - """Helper for timeframe commands""" cutoff = (datetime.utcnow() - timedelta(days=days)).strftime('%Y-%m-%d') with self.cve_handler.get_db_connection() as conn: @@ -575,7 +551,6 @@ class CVEDiscordBot(commands.Bot): logger.error(f"Channel {self.channel_id} not found") return - # Get unnotified CVEs with self.cve_handler.get_db_connection() as conn: cursor = conn.cursor() cursor.execute(""" @@ -611,7 +586,7 @@ class CVEDiscordBot(commands.Bot): """, (cve['cve_id'],)) logger.info(f"Sent notification for {cve['cve_id']}") - await asyncio.sleep(2) # Rate limit + await asyncio.sleep(2) except Exception as e: logger.error(f"Error sending notification for {cve['cve_id']}: {e}")