168 lines
5.6 KiB
Python
168 lines
5.6 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import discord
|
|
from discord.ext import tasks
|
|
import logging
|
|
import asyncio
|
|
from datetime import datetime
|
|
import sys
|
|
import config
|
|
from cve_handler import CVEHandler
|
|
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class TestCVEDiscordBot(discord.Client):
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
intents = discord.Intents.default()
|
|
intents.message_content = True
|
|
super().__init__(intents=intents, *args, **kwargs)
|
|
|
|
self.channel_id = int(config.DISCORD_CHANNEL_ID)
|
|
self.test_sent = False
|
|
|
|
async def on_ready(self):
|
|
logger.info(f'Bot logged in as {self.user}')
|
|
|
|
channel = self.get_channel(self.channel_id)
|
|
if not channel:
|
|
logger.error(f"Cannot find channel {self.channel_id}")
|
|
await self.close()
|
|
return
|
|
|
|
logger.info(f"Connected to channel: #{channel.name}")
|
|
|
|
test_cves = [
|
|
{
|
|
'cve_id': 'CVE-2024-TEST-001',
|
|
'vendor_code': 'microsoft',
|
|
'description': 'TEST: Critical vulnerability in Windows Update Service allows remote code execution. An attacker could exploit this vulnerability by sending specially crafted packets to the target system.',
|
|
'cvss_score': 9.8,
|
|
'severity': 'CRITICAL',
|
|
'published_date': datetime.utcnow().isoformat(),
|
|
},
|
|
{
|
|
'cve_id': 'CVE-2024-TEST-002',
|
|
'vendor_code': 'cisco',
|
|
'description': 'TEST: High severity vulnerability in Cisco IOS allows privilege escalation. Authenticated attackers can gain root access.',
|
|
'cvss_score': 8.1,
|
|
'severity': 'HIGH',
|
|
'published_date': datetime.utcnow().isoformat(),
|
|
},
|
|
{
|
|
'cve_id': 'CVE-2024-TEST-003',
|
|
'vendor_code': 'linux',
|
|
'description': 'TEST: Linux kernel vulnerability allows local privilege escalation through improper input validation.',
|
|
'cvss_score': 7.2,
|
|
'severity': 'HIGH',
|
|
'published_date': datetime.utcnow().isoformat(),
|
|
}
|
|
]
|
|
|
|
logger.info(f"Sending {len(test_cves)} test CVE notifications...")
|
|
|
|
for i, cve in enumerate(test_cves, 1):
|
|
try:
|
|
logger.info(f"[{i}/{len(test_cves)}] Sending: {cve['cve_id']} ({cve['severity']})")
|
|
embed = self.create_cve_embed(cve)
|
|
await channel.send(embed=embed)
|
|
logger.info(f"Sent {cve['cve_id']}")
|
|
await asyncio.sleep(1)
|
|
except Exception as e:
|
|
logger.error(f"Error sending {cve['cve_id']}: {e}")
|
|
|
|
logger.info(f"Test completed - sent {len(test_cves)} notifications")
|
|
|
|
self.test_sent = True
|
|
await asyncio.sleep(2)
|
|
await self.close()
|
|
|
|
def create_cve_embed(self, cve: dict) -> discord.Embed:
|
|
|
|
severity_colors = {
|
|
'CRITICAL': 0xDC3545,
|
|
'HIGH': 0xFD7E14,
|
|
'MEDIUM': 0xFFC107,
|
|
'LOW': 0x6C757D
|
|
}
|
|
|
|
severity = cve.get('severity', 'UNKNOWN')
|
|
color = severity_colors.get(severity, 0x6C757D)
|
|
|
|
vendor_code = cve.get('vendor_code', '')
|
|
vendor = next((v for v in config.VENDORS if v['code'] == vendor_code), None)
|
|
vendor_name = vendor['name'] if vendor else vendor_code.title()
|
|
|
|
embed = discord.Embed(
|
|
title=f"🧪 TEST: {cve['cve_id']}",
|
|
description=f"**[TEST NOTIFICATION]**\n\n{cve.get('description', 'No description')}",
|
|
color=color,
|
|
timestamp=datetime.utcnow()
|
|
)
|
|
|
|
embed.add_field(name="Vendor", value=vendor_name, inline=True)
|
|
embed.add_field(name="Severity", value=severity, inline=True)
|
|
|
|
cvss_score = cve.get('cvss_score')
|
|
embed.add_field(
|
|
name="CVSS Score",
|
|
value=f"**{cvss_score:.1f}**" if cvss_score else "N/A",
|
|
inline=True
|
|
)
|
|
|
|
published = cve.get('published_date', '')
|
|
if published:
|
|
try:
|
|
pub_date = datetime.fromisoformat(published.replace('Z', '+00:00'))
|
|
embed.add_field(
|
|
name="Published",
|
|
value=pub_date.strftime('%Y-%m-%d %H:%M UTC'),
|
|
inline=True
|
|
)
|
|
except:
|
|
pass
|
|
|
|
nvd_url = f"https://nvd.nist.gov/vuln/detail/{cve['cve_id']}"
|
|
embed.add_field(
|
|
name="Links",
|
|
value=f"[View on NVD]({nvd_url})",
|
|
inline=False
|
|
)
|
|
|
|
embed.set_footer(
|
|
text=f"CVE Monitor TEST • {vendor_name}",
|
|
icon_url="https://nvd.nist.gov/favicon.ico"
|
|
)
|
|
|
|
return embed
|
|
|
|
def run_test():
|
|
|
|
if not config.DISCORD_BOT_TOKEN:
|
|
logger.error("DISCORD_BOT_TOKEN not set in .env")
|
|
sys.exit(1)
|
|
|
|
if not config.DISCORD_CHANNEL_ID:
|
|
logger.error("DISCORD_CHANNEL_ID not set in .env")
|
|
sys.exit(1)
|
|
|
|
logger.info("CVE MONITOR - Discord Bot Test")
|
|
logger.info(f"Token: {config.DISCORD_BOT_TOKEN[:20]}...")
|
|
logger.info(f"Channel ID: {config.DISCORD_CHANNEL_ID}")
|
|
logger.info(f"Min CVSS: {config.DISCORD_MIN_CVSS}")
|
|
|
|
bot = TestCVEDiscordBot()
|
|
|
|
try:
|
|
bot.run(config.DISCORD_BOT_TOKEN)
|
|
except discord.LoginFailure:
|
|
logger.error("Invalid Discord bot token")
|
|
except KeyboardInterrupt:
|
|
logger.info("Test interrupted by user")
|
|
except Exception as e:
|
|
logger.error(f"Error: {e}", exc_info=True)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
run_test() |