219 lines
7.7 KiB
Python
219 lines
7.7 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import discord
|
|
from discord.ext import tasks
|
|
import logging
|
|
import asyncio
|
|
from datetime import datetime
|
|
import sys
|
|
|
|
import config
|
|
from cve_handler import CVEHandler
|
|
|
|
# Configure the root logger once at import time from the project settings,
# then create this module's own named logger.
logging.basicConfig(
    level=config.LOG_LEVEL,
    format=config.LOG_FORMAT,
)
logger = logging.getLogger(__name__)
|
|
|
|
class CVEDiscordBot(discord.Client):
    """Discord client that periodically posts recent CVEs to a single channel.

    On a fixed interval (``config.DISCORD_CHECK_INTERVAL_MINUTES``) the bot asks
    its ``CVEHandler`` for recent CVEs, keeps those whose CVSS score is at least
    ``config.DISCORD_MIN_CVSS`` and sends one embed per CVE to the channel
    identified by ``config.DISCORD_CHANNEL_ID``.
    """

    def __init__(self, *args, **kwargs):
        """Create the client and validate the Discord settings in ``config``.

        Extra arguments are forwarded to ``discord.Client``. If the bot token or
        channel id is empty, or the channel id cannot be converted to ``int``,
        the error is logged and the process exits with status 1.
        """
        intents = discord.Intents.default()
        # NOTE(review): nothing in this class reads message content, and this is
        # a privileged intent that presumably has to be enabled for the
        # application in the developer portal - confirm it is really needed.
        intents.message_content = True
        # Let an explicit ``intents=`` from the caller win instead of failing
        # with "got multiple values for keyword argument 'intents'".
        kwargs.setdefault('intents', intents)
        super().__init__(*args, **kwargs)

        self.cve_handler = CVEHandler()
        self.channel_id = None
        self.check_interval = config.DISCORD_CHECK_INTERVAL_MINUTES

        if not config.DISCORD_BOT_TOKEN:
            logger.error("DISCORD_BOT_TOKEN not configured in .env")
            sys.exit(1)

        if not config.DISCORD_CHANNEL_ID:
            logger.error("DISCORD_CHANNEL_ID not configured in .env")
            sys.exit(1)

        try:
            self.channel_id = int(config.DISCORD_CHANNEL_ID)
        except (TypeError, ValueError):
            logger.error(f"Invalid DISCORD_CHANNEL_ID: {config.DISCORD_CHANNEL_ID}")
            sys.exit(1)

    async def on_ready(self):
        """Log the effective settings and start the periodic CVE check.

        Closes the client if the configured channel cannot be resolved.
        """
        logger.info(f'Discord bot logged in as {self.user}')
        logger.info(f'Bot ID: {self.user.id}')
        logger.info(f'Monitoring channel ID: {self.channel_id}')
        logger.info(f'Check interval: {self.check_interval} minutes')
        logger.info(f'Min CVSS score: {config.DISCORD_MIN_CVSS}')

        channel = self.get_channel(self.channel_id)
        if not channel:
            logger.error(f"Cannot access channel {self.channel_id}")
            logger.error("Make sure the bot has been invited to the server and has permissions")
            await self.close()
            return

        logger.info(f"Successfully connected to channel: #{channel.name}")

        # on_ready may be dispatched more than once; only start the loop once.
        if not self.check_new_cves.is_running():
            self.check_new_cves.start()
            logger.info("CVE monitoring task started")

    async def on_error(self, event, *args, **kwargs):
        """Log an exception raised inside an event handler, with traceback."""
        logger.error(f"Discord error in event {event}", exc_info=True)

    @tasks.loop(minutes=config.DISCORD_CHECK_INTERVAL_MINUTES)
    async def check_new_cves(self):
        """Fetch recent CVEs and post those meeting the CVSS threshold.

        The look-back window passed to the handler equals the check interval
        (converted to hours). A failure to send one CVE is logged and does not
        stop the remaining ones; any other exception is logged and ends only
        the current cycle.
        """
        try:
            logger.info("Checking for new CVEs...")

            channel = self.get_channel(self.channel_id)
            if not channel:
                logger.error(f"Channel {self.channel_id} not found")
                return

            hours_back = self.check_interval / 60
            # NOTE(review): this handler call is synchronous; if it performs
            # blocking I/O it will stall the event loop - confirm.
            new_cves = self.cve_handler.get_recent_cves_for_discord(hours=hours_back)

            if not new_cves:
                logger.info("No new CVEs found")
                return

            # ``or 0`` also covers entries whose 'cvss_score' key is present
            # but None, which would otherwise raise TypeError on comparison.
            filtered_cves = [
                cve for cve in new_cves
                if (cve.get('cvss_score') or 0) >= config.DISCORD_MIN_CVSS
            ]

            if not filtered_cves:
                logger.info(f"Found {len(new_cves)} CVEs but none meet min CVSS threshold of {config.DISCORD_MIN_CVSS}")
                return

            logger.info(f"Found {len(filtered_cves)} CVEs meeting criteria (out of {len(new_cves)} total)")

            sent_count = 0
            for cve in filtered_cves:
                try:
                    embed = self.create_cve_embed(cve)
                    await channel.send(embed=embed)
                    sent_count += 1
                    logger.info(f"Sent notification for {cve['cve_id']}")
                    # Small pause between messages.
                    await asyncio.sleep(1)
                except Exception as e:
                    # Use .get() so a missing 'cve_id' cannot raise again here
                    # and abort the remaining notifications.
                    logger.error(f"Error sending notification for {cve.get('cve_id', '<unknown>')}: {e}")

            # Report what was actually delivered, not what was attempted.
            logger.info(f"Successfully sent {sent_count} of {len(filtered_cves)} CVE notifications")

        except Exception as e:
            logger.error(f"Error in check_new_cves: {e}", exc_info=True)

    @check_new_cves.before_loop
    async def before_check_new_cves(self):
        """Delay the first loop iteration until the client is ready."""
        await self.wait_until_ready()
        logger.info("Bot is ready, starting CVE monitoring...")

    def create_cve_embed(self, cve: dict) -> discord.Embed:
        """Build the notification embed for a single CVE record.

        Args:
            cve: Mapping describing the CVE. ``'cve_id'`` is required; the keys
                ``'description'``, ``'severity'``, ``'vendor_code'``,
                ``'cvss_score'`` and ``'published_date'`` are optional and may
                be missing or ``None``.

        Returns:
            The populated ``discord.Embed``.

        Raises:
            KeyError: If ``cve`` has no ``'cve_id'`` entry.
        """
        # Local import: the module header only imports ``datetime`` itself.
        from datetime import timezone

        # Severity colors
        severity_colors = {
            'CRITICAL': 0xDC3545,  # Red
            'HIGH': 0xFD7E14,      # Orange
            'MEDIUM': 0xFFC107,    # Yellow
            'LOW': 0x6C757D,       # Gray
        }

        severity = cve.get('severity') or 'UNKNOWN'
        # Case-insensitive lookup; unknown severities fall back to gray.
        color = severity_colors.get(str(severity).upper(), 0x6C757D)

        # Get vendor info
        vendor_code = cve.get('vendor_code') or ''
        vendor = next((v for v in config.VENDORS if v['code'] == vendor_code), None)
        vendor_name = vendor['name'] if vendor else str(vendor_code).title()

        # Create embed
        embed = discord.Embed(
            title=f"🚨 {cve['cve_id']}",
            description=(cve.get('description') or 'No description available')[:2000],
            color=color,
            # Timezone-aware "now": a naive utcnow() value would be interpreted
            # as local time by discord.py.
            timestamp=datetime.now(timezone.utc),
        )

        # Add fields
        embed.add_field(name="🏢 Vendor", value=vendor_name, inline=True)
        embed.add_field(name="⚠️ Severity", value=severity, inline=True)

        cvss_score = cve.get('cvss_score')
        embed.add_field(
            name="📊 CVSS Score",
            value=f"**{cvss_score:.1f}**" if cvss_score else "N/A",
            inline=True,
        )

        published = cve.get('published_date', '')
        if published:
            try:
                # The 'Z' suffix is rewritten because fromisoformat() only
                # accepts it on newer Python versions.
                pub_date = datetime.fromisoformat(published.replace('Z', '+00:00'))
                if pub_date.tzinfo is not None:
                    # The label below says UTC, so convert aware values to UTC.
                    pub_date = pub_date.astimezone(timezone.utc)
                published_text = pub_date.strftime('%Y-%m-%d %H:%M UTC')
            except (AttributeError, TypeError, ValueError):
                # Unparseable value: show its first 10 characters as-is.
                published_text = str(published)[:10]
            embed.add_field(name="📅 Published", value=published_text, inline=True)

        nvd_url = f"https://nvd.nist.gov/vuln/detail/{cve['cve_id']}"
        embed.add_field(
            name="🔗 Links",
            value=f"[View on NVD]({nvd_url})",
            inline=False,
        )

        embed.set_footer(
            text=f"CVE Monitor • {vendor_name}",
            icon_url="https://nvd.nist.gov/favicon.ico",
        )

        return embed
|
|
|
|
def start_discord_bot():
    """Validate the Discord settings and run the CVE bot.

    Returns without starting anything when ``config.ENABLE_DISCORD_BOT`` is
    falsy or when the bot token / channel id settings are empty; each case is
    logged. A login failure or any other exception raised while running the
    bot is logged rather than propagated.
    """
    if not config.ENABLE_DISCORD_BOT:
        logger.info("Discord bot is disabled (ENABLE_DISCORD_BOT=False)")
        return

    # Required settings in the order they are checked, each paired with the
    # hint logged when it is empty. Values are read lazily, one at a time.
    required_settings = (
        ("DISCORD_BOT_TOKEN",
         "Get your token at: https://discord.com/developers/applications"),
        ("DISCORD_CHANNEL_ID",
         "Enable Developer Mode in Discord, right-click channel -> Copy ID"),
    )
    for setting_name, hint in required_settings:
        if getattr(config, setting_name):
            continue
        logger.error(f"{setting_name} not configured")
        logger.error(f"Please set {setting_name} in .env file")
        logger.error(hint)
        return

    logger.info("Starting Discord bot...")
    logger.info("Configuration:")
    logger.info(f" - Check interval: {config.DISCORD_CHECK_INTERVAL_MINUTES} minutes")
    logger.info(f" - Min CVSS score: {config.DISCORD_MIN_CVSS}")
    logger.info(f" - Notify on CRITICAL: {config.DISCORD_NOTIFY_CRITICAL}")
    logger.info(f" - Notify on HIGH: {config.DISCORD_NOTIFY_HIGH}")

    client = CVEDiscordBot()

    try:
        client.run(config.DISCORD_BOT_TOKEN)
    except discord.LoginFailure:
        logger.error("Invalid Discord bot token")
        logger.error("Please check DISCORD_BOT_TOKEN in .env file")
    except Exception as exc:
        logger.error(f"Failed to start Discord bot: {exc}", exc_info=True)
|
|
|
|
|
|
# Script entry point: start the bot only when this file is executed directly.
if __name__ == "__main__":
    start_discord_bot()
|