diff --git a/test_discord_bot.py b/test_discord_bot.py new file mode 100644 index 0000000..5aceb76 --- /dev/null +++ b/test_discord_bot.py @@ -0,0 +1,175 @@ +#!/usr/bin/env python3 + +import discord +from discord.ext import tasks +import logging +import asyncio +from datetime import datetime +import sys +import config +from cve_handler import CVEHandler + +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +class TestCVEDiscordBot(discord.Client): + + def __init__(self, *args, **kwargs): + intents = discord.Intents.default() + intents.message_content = True + super().__init__(intents=intents, *args, **kwargs) + + self.channel_id = int(config.DISCORD_CHANNEL_ID) + self.test_sent = False + + async def on_ready(self): + logger.info(f'✓ Bot zalogowany jako {self.user}') + + channel = self.get_channel(self.channel_id) + if not channel: + logger.error(f"✗ Nie można znaleźć kanału {self.channel_id}") + await self.close() + return + + logger.info(f"✓ Połączono z kanałem: #{channel.name}") + + test_cves = [ + { + 'cve_id': 'CVE-2024-TEST-001', + 'vendor_code': 'microsoft', + 'description': 'TEST: Critical vulnerability in Windows Update Service allows remote code execution. An attacker could exploit this vulnerability by sending specially crafted packets to the target system.', + 'cvss_score': 9.8, + 'severity': 'CRITICAL', + 'published_date': datetime.utcnow().isoformat(), + }, + { + 'cve_id': 'CVE-2024-TEST-002', + 'vendor_code': 'cisco', + 'description': 'TEST: High severity vulnerability in Cisco IOS allows privilege escalation. Authenticated attackers can gain root access.', + 'cvss_score': 8.1, + 'severity': 'HIGH', + 'published_date': datetime.utcnow().isoformat(), + }, + { + 'cve_id': 'CVE-2024-TEST-003', + 'vendor_code': 'linux', + 'description': 'TEST: Linux kernel vulnerability allows local privilege escalation through improper input validation.', + 'cvss_score': 7.2, + 'severity': 'HIGH', + 'published_date': datetime.utcnow().isoformat(), + } + ] + + logger.info(f"\n{'='*60}") + logger.info(f"Wysyłanie {len(test_cves)} testowych powiadomień CVE...") + logger.info(f"{'='*60}\n") + + for i, cve in enumerate(test_cves, 1): + try: + logger.info(f"[{i}/{len(test_cves)}] Wysyłanie: {cve['cve_id']} ({cve['severity']})") + embed = self.create_cve_embed(cve) + await channel.send(embed=embed) + logger.info(f"✓ Wysłano {cve['cve_id']}") + await asyncio.sleep(1) + except Exception as e: + logger.error(f"✗ Błąd przy wysyłaniu {cve['cve_id']}: {e}") + + logger.info(f"\n{'='*60}") + logger.info(f"✓ Test zakończony - wysłano {len(test_cves)} powiadomień") + logger.info(f"{'='*60}") + + self.test_sent = True + await asyncio.sleep(2) + await self.close() + + def create_cve_embed(self, cve: dict) -> discord.Embed: + + severity_colors = { + 'CRITICAL': 0xDC3545, + 'HIGH': 0xFD7E14, + 'MEDIUM': 0xFFC107, + 'LOW': 0x6C757D + } + + severity = cve.get('severity', 'UNKNOWN') + color = severity_colors.get(severity, 0x6C757D) + + vendor_code = cve.get('vendor_code', '') + vendor = next((v for v in config.VENDORS if v['code'] == vendor_code), None) + vendor_name = vendor['name'] if vendor else vendor_code.title() + + embed = discord.Embed( + title=f"🧪 TEST: {cve['cve_id']}", + description=f"**[TESTOWE POWIADOMIENIE]**\n\n{cve.get('description', 'No description')}", + color=color, + timestamp=datetime.utcnow() + ) + + embed.add_field(name="🏢 Vendor", value=vendor_name, inline=True) + embed.add_field(name="⚠️ Severity", value=severity, inline=True) + + cvss_score = cve.get('cvss_score') + embed.add_field( + name="📊 CVSS Score", + value=f"**{cvss_score:.1f}**" if cvss_score else "N/A", + inline=True + ) + + published = cve.get('published_date', '') + if published: + try: + pub_date = datetime.fromisoformat(published.replace('Z', '+00:00')) + embed.add_field( + name="📅 Published", + value=pub_date.strftime('%Y-%m-%d %H:%M UTC'), + inline=True + ) + except: + pass + + nvd_url = f"https://nvd.nist.gov/vuln/detail/{cve['cve_id']}" + embed.add_field( + name="🔗 Links", + value=f"[View on NVD]({nvd_url})", + inline=False + ) + + embed.set_footer( + text=f"🧪 CVE Monitor TEST • {vendor_name}", + icon_url="https://nvd.nist.gov/favicon.ico" + ) + + return embed + +def run_test(): + + if not config.DISCORD_BOT_TOKEN: + logger.error("✗ DISCORD_BOT_TOKEN nie jest ustawiony w .env") + sys.exit(1) + + if not config.DISCORD_CHANNEL_ID: + logger.error("✗ DISCORD_CHANNEL_ID nie jest ustawiony w .env") + sys.exit(1) + + logger.info("\n" + "="*60) + logger.info("🧪 CVE MONITOR - TEST DISCORD NOTIFICATIONS") + logger.info("="*60) + logger.info(f"Token: {config.DISCORD_BOT_TOKEN[:20]}...") + logger.info(f"Channel ID: {config.DISCORD_CHANNEL_ID}") + logger.info(f"Min CVSS: {config.DISCORD_MIN_CVSS}") + logger.info("="*60 + "\n") + + bot = TestCVEDiscordBot() + + try: + bot.run(config.DISCORD_BOT_TOKEN) + except discord.LoginFailure: + logger.error("✗ Nieprawidłowy token Discord bot") + except KeyboardInterrupt: + logger.info("\n✗ Test przerwany przez użytkownika") + except Exception as e: + logger.error(f"✗ Błąd: {e}", exc_info=True) + + +if __name__ == '__main__': + run_test()