Files
cve_monitor/discord_bot.py
Mateusz Gruszczyński 804e74aaa9 bot
2026-02-14 09:14:52 +01:00

731 lines
29 KiB
Python

#!/usr/bin/env python3
"""
CVE Monitor - Discord Bot
Monitors CVE database and sends notifications to Discord channel
Supports interactive commands for querying CVE data
"""
import discord
from discord.ext import tasks, commands
import logging
import asyncio
from datetime import datetime, timedelta
import sys
import sqlite3
import config
from cve_handler import CVEHandler
logging.basicConfig(level=config.LOG_LEVEL, format=config.LOG_FORMAT)
logger = logging.getLogger(__name__)
class CVEDiscordBot(commands.Bot):
def __init__(self):
intents = discord.Intents.default()
intents.message_content = True
super().__init__(
command_prefix='!cve ',
intents=intents,
help_command=None # Custom help
)
self.cve_handler = CVEHandler()
self.channel_id = None
self.check_interval = config.DISCORD_CHECK_INTERVAL_MINUTES
# Validation
if not config.DISCORD_BOT_TOKEN:
logger.error("DISCORD_BOT_TOKEN not configured in .env")
sys.exit(1)
if not config.DISCORD_CHANNEL_ID:
logger.error("DISCORD_CHANNEL_ID not configured in .env")
sys.exit(1)
try:
self.channel_id = int(config.DISCORD_CHANNEL_ID)
except ValueError:
logger.error(f"Invalid DISCORD_CHANNEL_ID: {config.DISCORD_CHANNEL_ID}")
sys.exit(1)
# Register commands
self.setup_commands()
def setup_commands(self):
"""Register all bot commands"""
@self.command(name='help', aliases=['h', 'pomoc'])
async def help_command(ctx):
"""Show available commands"""
embed = discord.Embed(
title="🤖 CVE Monitor Bot - Commands",
description="Monitor and query CVE vulnerabilities",
color=0x3498DB
)
embed.add_field(
name="📊 Statistics",
value=(
"`!cve stats` - Overall statistics\n"
"`!cve stats <vendor>` - Vendor statistics\n"
"`!cve vendors` - List all vendors"
),
inline=False
)
embed.add_field(
name="🔍 Search & List",
value=(
"`!cve latest` - 10 latest CVEs\n"
"`!cve latest <vendor>` - Latest from vendor\n"
"`!cve critical` - All CRITICAL CVEs\n"
"`!cve high` - All HIGH severity CVEs\n"
"`!cve search <keyword>` - Search CVEs"
),
inline=False
)
embed.add_field(
name="🎯 Top Lists",
value=(
"`!cve top` - Top 10 worst CVEs\n"
"`!cve top <vendor>` - Top CVEs from vendor\n"
"`!cve worst` - Worst CVE (highest CVSS)"
),
inline=False
)
embed.add_field(
name="📅 Time-based",
value=(
"`!cve today` - CVEs from today\n"
"`!cve week` - CVEs from last 7 days\n"
"`!cve month` - CVEs from last 30 days"
),
inline=False
)
embed.add_field(
name="🔎 Details",
value=(
"`!cve info <CVE-ID>` - Detailed CVE info\n"
"`!cve <CVE-ID>` - Quick CVE lookup"
),
inline=False
)
embed.add_field(
name="⚙️ Configuration",
value=(
"`!cve status` - Bot status\n"
"`!cve test` - Test notification"
),
inline=False
)
embed.set_footer(text="Use !cve <command> or !cve help")
await ctx.send(embed=embed)
@self.command(name='stats', aliases=['statistics', 'statystyki'])
async def stats_command(ctx, vendor: str = None):
"""Show CVE statistics"""
if vendor:
# Specific vendor stats
vendor_obj = next((v for v in config.VENDORS if v['code'].lower() == vendor.lower()), None)
if not vendor_obj:
await ctx.send(f"❌ Unknown vendor: `{vendor}`. Use `!cve vendors` to see available vendors.")
return
stats = self.cve_handler.get_vendor_stats(vendor_obj['code'])
embed = discord.Embed(
title=f"📊 {vendor_obj['name']} Statistics",
color=0x3498DB
)
embed.add_field(name="Total CVEs", value=f"**{stats['total']}**", inline=True)
embed.add_field(name="Critical", value=f"🔴 {stats['severity'].get('CRITICAL', 0)}", inline=True)
embed.add_field(name="High", value=f"🟠 {stats['severity'].get('HIGH', 0)}", inline=True)
embed.add_field(name="Medium", value=f"🟡 {stats['severity'].get('MEDIUM', 0)}", inline=True)
embed.add_field(name="Low", value=f"{stats['severity'].get('LOW', 0)}", inline=True)
embed.add_field(name="This Week", value=f"📅 {stats['recent']}", inline=True)
await ctx.send(embed=embed)
else:
# Overall stats
summary = self.cve_handler.get_all_vendors_summary()
total = sum(v['total'] for v in summary)
critical = sum(v['critical'] for v in summary)
high = sum(v['high'] for v in summary)
recent = sum(v['recent'] for v in summary)
embed = discord.Embed(
title="📊 Overall CVE Statistics",
color=0x3498DB
)
embed.add_field(name="Total CVEs", value=f"**{total}**", inline=True)
embed.add_field(name="Critical", value=f"🔴 {critical}", inline=True)
embed.add_field(name="High", value=f"🟠 {high}", inline=True)
embed.add_field(name="Vendors", value=f"**{len(config.VENDORS)}**", inline=True)
embed.add_field(name="This Week", value=f"📅 {recent}", inline=True)
embed.add_field(name="Active", value=f"✅ Monitoring", inline=True)
# Top vendors
top_vendors = sorted(summary, key=lambda x: x['total'], reverse=True)[:5]
top_text = "\n".join([f"{i+1}. **{v['name']}**: {v['total']} CVEs" for i, v in enumerate(top_vendors)])
embed.add_field(name="🏆 Top 5 Vendors", value=top_text, inline=False)
await ctx.send(embed=embed)
@self.command(name='vendors', aliases=['vendorzy', 'list'])
async def vendors_command(ctx):
"""List all monitored vendors"""
summary = self.cve_handler.get_all_vendors_summary()
summary_sorted = sorted(summary, key=lambda x: x['total'], reverse=True)
embed = discord.Embed(
title="🏢 Monitored Vendors",
description=f"Tracking {len(config.VENDORS)} vendors",
color=0x3498DB
)
# Split into chunks
for i in range(0, len(summary_sorted), 10):
chunk = summary_sorted[i:i+10]
text = "\n".join([
f"`{v['code']:12s}` {v['name']:20s} - {v['total']:4d} CVEs (🔴{v['critical']} 🟠{v['high']})"
for v in chunk
])
embed.add_field(
name=f"Vendors {i+1}-{min(i+10, len(summary_sorted))}",
value=text,
inline=False
)
embed.set_footer(text="Use: !cve stats <vendor> for details")
await ctx.send(embed=embed)
@self.command(name='latest', aliases=['last', 'ostatnie'])
async def latest_command(ctx, vendor: str = None, limit: int = 10):
"""Show latest CVEs"""
if vendor:
vendor_obj = next((v for v in config.VENDORS if v['code'].lower() == vendor.lower()), None)
if not vendor_obj:
await ctx.send(f"❌ Unknown vendor: `{vendor}`")
return
cves = self.cve_handler.get_vendor_cves(vendor_obj['code'], limit=limit)
title = f"📋 Latest {len(cves)} CVEs - {vendor_obj['name']}"
else:
# Get latest from all vendors
with self.cve_handler.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT cve_id, vendor_code, severity, cvss_score, published_date,
substr(description, 1, 100) as desc
FROM cve_cache
ORDER BY published_date DESC
LIMIT ?
""", (limit,))
cves = [dict(row) for row in cursor.fetchall()]
title = f"📋 Latest {len(cves)} CVEs (All Vendors)"
if not cves:
await ctx.send("No CVEs found.")
return
embed = discord.Embed(title=title, color=0x3498DB)
for cve in cves[:10]:
severity_emoji = {
'CRITICAL': '🔴',
'HIGH': '🟠',
'MEDIUM': '🟡',
'LOW': ''
}.get(cve.get('severity'), '')
cvss = cve.get('cvss_score', 0)
desc = cve.get('description', cve.get('desc', ''))[:100]
embed.add_field(
name=f"{severity_emoji} {cve['cve_id']} - CVSS {cvss}",
value=f"{desc}...",
inline=False
)
await ctx.send(embed=embed)
@self.command(name='critical', aliases=['krytyczne'])
async def critical_command(ctx, vendor: str = None):
"""Show all CRITICAL severity CVEs"""
await self._severity_command(ctx, 'CRITICAL', vendor)
@self.command(name='high', aliases=['wysokie'])
async def high_command(ctx, vendor: str = None):
"""Show all HIGH severity CVEs"""
await self._severity_command(ctx, 'HIGH', vendor)
@self.command(name='top', aliases=['worst', 'najgorsze'])
async def top_command(ctx, vendor: str = None, limit: int = 10):
"""Show top CVEs by CVSS score"""
if vendor:
vendor_obj = next((v for v in config.VENDORS if v['code'].lower() == vendor.lower()), None)
if not vendor_obj:
await ctx.send(f"❌ Unknown vendor: `{vendor}`")
return
with self.cve_handler.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT cve_id, severity, cvss_score, published_date,
substr(description, 1, 100) as desc
FROM cve_cache
WHERE vendor_code = ?
ORDER BY cvss_score DESC
LIMIT ?
""", (vendor_obj['code'], limit))
cves = [dict(row) for row in cursor.fetchall()]
title = f"🎯 Top {len(cves)} CVEs - {vendor_obj['name']}"
else:
with self.cve_handler.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT cve_id, vendor_code, severity, cvss_score, published_date,
substr(description, 1, 100) as desc
FROM cve_cache
ORDER BY cvss_score DESC
LIMIT ?
""", (limit,))
cves = [dict(row) for row in cursor.fetchall()]
title = f"🎯 Top {len(cves)} Worst CVEs"
if not cves:
await ctx.send("No CVEs found.")
return
embed = discord.Embed(title=title, color=0xDC3545)
for i, cve in enumerate(cves, 1):
severity_emoji = {
'CRITICAL': '🔴',
'HIGH': '🟠',
'MEDIUM': '🟡',
'LOW': ''
}.get(cve.get('severity'), '')
vendor_code = cve.get('vendor_code', '')
vendor_name = next((v['name'] for v in config.VENDORS if v['code'] == vendor_code), vendor_code)
embed.add_field(
name=f"{i}. {severity_emoji} {cve['cve_id']} - **CVSS {cve.get('cvss_score', 0)}**",
value=f"**{vendor_name}** | {cve.get('desc', '')}...",
inline=False
)
await ctx.send(embed=embed)
@self.command(name='search', aliases=['find', 'szukaj'])
async def search_command(ctx, *, query: str):
"""Search CVEs by keyword"""
cves = self.cve_handler.search_cves(query, limit=10)
if not cves:
await ctx.send(f"No CVEs found for: `{query}`")
return
embed = discord.Embed(
title=f"🔍 Search Results: '{query}'",
description=f"Found {len(cves)} CVEs",
color=0x3498DB
)
for cve in cves[:10]:
severity_emoji = {
'CRITICAL': '🔴',
'HIGH': '🟠',
'MEDIUM': '🟡',
'LOW': ''
}.get(cve.get('severity'), '')
cvss = cve.get('cvss_score', 0)
desc = cve.get('description', '')[:100]
embed.add_field(
name=f"{severity_emoji} {cve['cve_id']} - CVSS {cvss}",
value=f"{desc}...",
inline=False
)
await ctx.send(embed=embed)
@self.command(name='info', aliases=['detail', 'szczegoly'])
async def info_command(ctx, cve_id: str):
"""Show detailed CVE information"""
if not cve_id.upper().startswith('CVE-'):
cve_id = f"CVE-{cve_id}"
with self.cve_handler.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT * FROM cve_cache WHERE cve_id = ?
""", (cve_id.upper(),))
row = cursor.fetchone()
if not row:
await ctx.send(f"❌ CVE not found: `{cve_id}`")
return
cve = dict(row)
embed = self.create_cve_embed(cve, detailed=True)
await ctx.send(embed=embed)
@self.command(name='today', aliases=['dzisiaj'])
async def today_command(ctx):
"""Show CVEs from today"""
await self._timeframe_command(ctx, days=1, title="Today")
@self.command(name='week', aliases=['tydzien'])
async def week_command(ctx):
"""Show CVEs from last 7 days"""
await self._timeframe_command(ctx, days=7, title="Last 7 Days")
@self.command(name='month', aliases='miesiac')
async def month_command(ctx):
"""Show CVEs from last 30 days"""
await self._timeframe_command(ctx, days=30, title="Last 30 Days")
@self.command(name='status')
async def status_command(ctx):
"""Show bot status"""
summary = self.cve_handler.get_all_vendors_summary()
total = sum(v['total'] for v in summary)
embed = discord.Embed(
title="🤖 Bot Status",
color=0x2ECC71
)
embed.add_field(name="Status", value="✅ Online", inline=True)
embed.add_field(name="Monitoring", value=f"{len(config.VENDORS)} vendors", inline=True)
embed.add_field(name="Total CVEs", value=f"{total}", inline=True)
embed.add_field(name="Check Interval", value=f"{self.check_interval} min", inline=True)
embed.add_field(name="Min CVSS", value=f"{config.DISCORD_MIN_CVSS}", inline=True)
embed.add_field(name="Channel", value=f"<#{self.channel_id}>", inline=True)
embed.set_footer(text=f"Bot: {self.user.name}")
await ctx.send(embed=embed)
@self.command(name='test')
async def test_command(ctx):
"""Send test notification"""
embed = discord.Embed(
title="✅ Test Notification",
description="Bot is working correctly!",
color=0x2ECC71
)
embed.add_field(name="Bot", value=self.user.name, inline=True)
embed.add_field(name="Channel", value=ctx.channel.name, inline=True)
embed.set_footer(text="This is a test message")
await ctx.send(embed=embed)
async def _severity_command(self, ctx, severity: str, vendor: str = None):
"""Helper for severity commands"""
if vendor:
vendor_obj = next((v for v in config.VENDORS if v['code'].lower() == vendor.lower()), None)
if not vendor_obj:
await ctx.send(f"❌ Unknown vendor: `{vendor}`")
return
cves = self.cve_handler.get_vendor_cves(vendor_obj['code'], severity=severity)
title = f"{severity} CVEs - {vendor_obj['name']}"
else:
with self.cve_handler.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT cve_id, vendor_code, cvss_score, published_date,
substr(description, 1, 100) as desc
FROM cve_cache
WHERE severity = ?
ORDER BY published_date DESC
LIMIT 20
""", (severity,))
cves = [dict(row) for row in cursor.fetchall()]
title = f"{severity} Severity CVEs"
if not cves:
await ctx.send(f"No {severity} CVEs found.")
return
color = 0xDC3545 if severity == 'CRITICAL' else 0xFD7E14
embed = discord.Embed(
title=f"{'🔴' if severity == 'CRITICAL' else '🟠'} {title}",
description=f"Found {len(cves)} CVEs",
color=color
)
for cve in cves[:10]:
cvss = cve.get('cvss_score', 0)
desc = cve.get('description', cve.get('desc', ''))[:80]
vendor_code = cve.get('vendor_code', '')
vendor_name = next((v['name'] for v in config.VENDORS if v['code'] == vendor_code), vendor_code)
embed.add_field(
name=f"{cve['cve_id']} - CVSS {cvss}",
value=f"**{vendor_name}** | {desc}...",
inline=False
)
if len(cves) > 10:
embed.set_footer(text=f"Showing 10 of {len(cves)} CVEs")
await ctx.send(embed=embed)
async def _timeframe_command(self, ctx, days: int, title: str):
"""Helper for timeframe commands"""
cutoff = (datetime.utcnow() - timedelta(days=days)).strftime('%Y-%m-%d')
with self.cve_handler.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT cve_id, vendor_code, severity, cvss_score, published_date,
substr(description, 1, 100) as desc
FROM cve_cache
WHERE published_date >= ?
ORDER BY cvss_score DESC
LIMIT 15
""", (cutoff,))
cves = [dict(row) for row in cursor.fetchall()]
if not cves:
await ctx.send(f"No CVEs found from {title.lower()}.")
return
embed = discord.Embed(
title=f"📅 CVEs from {title}",
description=f"Found {len(cves)} CVEs",
color=0x3498DB
)
for cve in cves[:10]:
severity_emoji = {
'CRITICAL': '🔴',
'HIGH': '🟠',
'MEDIUM': '🟡',
'LOW': ''
}.get(cve.get('severity'), '')
vendor_code = cve.get('vendor_code', '')
vendor_name = next((v['name'] for v in config.VENDORS if v['code'] == vendor_code), vendor_code)
embed.add_field(
name=f"{severity_emoji} {cve['cve_id']} - CVSS {cve.get('cvss_score', 0)}",
value=f"**{vendor_name}** | {cve.get('desc', '')}...",
inline=False
)
await ctx.send(embed=embed)
async def on_ready(self):
logger.info(f'Discord bot logged in as {self.user}')
logger.info(f'Bot ID: {self.user.id}')
logger.info(f'Monitoring channel ID: {self.channel_id}')
logger.info(f'Check interval: {self.check_interval} minutes')
logger.info(f'Min CVSS score: {config.DISCORD_MIN_CVSS}')
channel = self.get_channel(self.channel_id)
if not channel:
logger.error(f"Cannot access channel {self.channel_id}")
logger.error("Make sure the bot has been invited to the server and has permissions")
await self.close()
return
logger.info(f"Successfully connected to channel: #{channel.name}")
if not self.check_new_cves.is_running():
self.check_new_cves.start()
logger.info("CVE monitoring task started")
async def on_command_error(self, ctx, error):
if isinstance(error, commands.CommandNotFound):
await ctx.send(f"❌ Unknown command. Use `!cve help` to see available commands.")
elif isinstance(error, commands.MissingRequiredArgument):
await ctx.send(f"❌ Missing argument. Use `!cve help` for usage.")
else:
logger.error(f"Command error: {error}", exc_info=True)
await ctx.send(f"❌ Error: {str(error)}")
@tasks.loop(minutes=config.DISCORD_CHECK_INTERVAL_MINUTES)
async def check_new_cves(self):
"""Check for new CVEs and send notifications"""
try:
logger.info("Checking for new CVEs...")
channel = self.get_channel(self.channel_id)
if not channel:
logger.error(f"Channel {self.channel_id} not found")
return
# Get unnotified CVEs
with self.cve_handler.get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT cve_id, vendor_code, description, published_date,
cvss_score, severity
FROM cve_cache
WHERE discord_notified = 0
AND cvss_score >= ?
AND severity IN ('CRITICAL', 'HIGH')
ORDER BY published_date DESC
LIMIT 20
""", (config.DISCORD_MIN_CVSS,))
new_cves = [dict(row) for row in cursor.fetchall()]
if not new_cves:
logger.info("No new CVEs to notify")
return
logger.info(f"Found {len(new_cves)} new CVEs to notify")
for cve in new_cves:
try:
embed = self.create_cve_embed(cve)
await channel.send(embed=embed)
# Mark as notified
with self.cve_handler.get_db_connection() as conn:
conn.execute("""
UPDATE cve_cache
SET discord_notified = 1
WHERE cve_id = ?
""", (cve['cve_id'],))
logger.info(f"Sent notification for {cve['cve_id']}")
await asyncio.sleep(2) # Rate limit
except Exception as e:
logger.error(f"Error sending notification for {cve['cve_id']}: {e}")
logger.info(f"Successfully sent {len(new_cves)} CVE notifications")
except Exception as e:
logger.error(f"Error in check_new_cves: {e}", exc_info=True)
@check_new_cves.before_loop
async def before_check_new_cves(self):
await self.wait_until_ready()
logger.info("Bot is ready, starting CVE monitoring...")
def create_cve_embed(self, cve: dict, detailed: bool = False) -> discord.Embed:
"""Create Discord embed for CVE"""
severity_colors = {
'CRITICAL': 0xDC3545,
'HIGH': 0xFD7E14,
'MEDIUM': 0xFFC107,
'LOW': 0x6C757D
}
severity = cve.get('severity', 'UNKNOWN')
color = severity_colors.get(severity, 0x6C757D)
vendor_code = cve.get('vendor_code', '')
vendor = next((v for v in config.VENDORS if v['code'] == vendor_code), None)
vendor_name = vendor['name'] if vendor else vendor_code.title()
description = cve.get('description', 'No description available')
if not detailed:
description = description[:500] + ('...' if len(description) > 500 else '')
embed = discord.Embed(
title=f"🚨 {cve['cve_id']}",
description=description,
color=color,
timestamp=datetime.utcnow()
)
embed.add_field(name="🏢 Vendor", value=vendor_name, inline=True)
embed.add_field(name="⚠️ Severity", value=severity, inline=True)
cvss_score = cve.get('cvss_score')
embed.add_field(
name="📊 CVSS Score",
value=f"**{cvss_score:.1f}**" if cvss_score else "N/A",
inline=True
)
published = cve.get('published_date', '')
if published:
try:
pub_date = datetime.fromisoformat(published.replace('Z', '+00:00'))
embed.add_field(
name="📅 Published",
value=pub_date.strftime('%Y-%m-%d %H:%M UTC'),
inline=True
)
except:
embed.add_field(name="📅 Published", value=published[:10], inline=True)
nvd_url = f"https://nvd.nist.gov/vuln/detail/{cve['cve_id']}"
embed.add_field(
name="🔗 Links",
value=f"[View on NVD]({nvd_url})",
inline=False
)
embed.set_footer(
text=f"CVE Monitor • {vendor_name}",
icon_url="https://nvd.nist.gov/favicon.ico"
)
return embed
def start_discord_bot():
"""Start the Discord bot"""
if not config.ENABLE_DISCORD_BOT:
logger.info("Discord bot is disabled (ENABLE_DISCORD_BOT=False)")
return
if not config.DISCORD_BOT_TOKEN:
logger.error("DISCORD_BOT_TOKEN not configured")
logger.error("Please set DISCORD_BOT_TOKEN in .env file")
logger.error("Get your token at: https://discord.com/developers/applications")
return
if not config.DISCORD_CHANNEL_ID:
logger.error("DISCORD_CHANNEL_ID not configured")
logger.error("Please set DISCORD_CHANNEL_ID in .env file")
logger.error("Enable Developer Mode in Discord, right-click channel -> Copy ID")
return
logger.info("Starting Discord bot...")
logger.info(f"Configuration:")
logger.info(f" - Check interval: {config.DISCORD_CHECK_INTERVAL_MINUTES} minutes")
logger.info(f" - Min CVSS score: {config.DISCORD_MIN_CVSS}")
logger.info(f" - Notify on CRITICAL: {config.DISCORD_NOTIFY_CRITICAL}")
logger.info(f" - Notify on HIGH: {config.DISCORD_NOTIFY_HIGH}")
bot = CVEDiscordBot()
try:
bot.run(config.DISCORD_BOT_TOKEN)
except discord.LoginFailure:
logger.error("Invalid Discord bot token")
logger.error("Please check DISCORD_BOT_TOKEN in .env file")
except Exception as e:
logger.error(f"Failed to start Discord bot: {e}", exc_info=True)
if __name__ == '__main__':
start_discord_bot()