discord tester
This commit is contained in:
175
test_discord_bot.py
Normal file
175
test_discord_bot.py
Normal file
@@ -0,0 +1,175 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import discord
|
||||
from discord.ext import tasks
|
||||
import logging
|
||||
import asyncio
|
||||
from datetime import datetime
|
||||
import sys
|
||||
import config
|
||||
from cve_handler import CVEHandler
|
||||
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class TestCVEDiscordBot(discord.Client):
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
intents = discord.Intents.default()
|
||||
intents.message_content = True
|
||||
super().__init__(intents=intents, *args, **kwargs)
|
||||
|
||||
self.channel_id = int(config.DISCORD_CHANNEL_ID)
|
||||
self.test_sent = False
|
||||
|
||||
async def on_ready(self):
|
||||
logger.info(f'✓ Bot zalogowany jako {self.user}')
|
||||
|
||||
channel = self.get_channel(self.channel_id)
|
||||
if not channel:
|
||||
logger.error(f"✗ Nie można znaleźć kanału {self.channel_id}")
|
||||
await self.close()
|
||||
return
|
||||
|
||||
logger.info(f"✓ Połączono z kanałem: #{channel.name}")
|
||||
|
||||
test_cves = [
|
||||
{
|
||||
'cve_id': 'CVE-2024-TEST-001',
|
||||
'vendor_code': 'microsoft',
|
||||
'description': 'TEST: Critical vulnerability in Windows Update Service allows remote code execution. An attacker could exploit this vulnerability by sending specially crafted packets to the target system.',
|
||||
'cvss_score': 9.8,
|
||||
'severity': 'CRITICAL',
|
||||
'published_date': datetime.utcnow().isoformat(),
|
||||
},
|
||||
{
|
||||
'cve_id': 'CVE-2024-TEST-002',
|
||||
'vendor_code': 'cisco',
|
||||
'description': 'TEST: High severity vulnerability in Cisco IOS allows privilege escalation. Authenticated attackers can gain root access.',
|
||||
'cvss_score': 8.1,
|
||||
'severity': 'HIGH',
|
||||
'published_date': datetime.utcnow().isoformat(),
|
||||
},
|
||||
{
|
||||
'cve_id': 'CVE-2024-TEST-003',
|
||||
'vendor_code': 'linux',
|
||||
'description': 'TEST: Linux kernel vulnerability allows local privilege escalation through improper input validation.',
|
||||
'cvss_score': 7.2,
|
||||
'severity': 'HIGH',
|
||||
'published_date': datetime.utcnow().isoformat(),
|
||||
}
|
||||
]
|
||||
|
||||
logger.info(f"\n{'='*60}")
|
||||
logger.info(f"Wysyłanie {len(test_cves)} testowych powiadomień CVE...")
|
||||
logger.info(f"{'='*60}\n")
|
||||
|
||||
for i, cve in enumerate(test_cves, 1):
|
||||
try:
|
||||
logger.info(f"[{i}/{len(test_cves)}] Wysyłanie: {cve['cve_id']} ({cve['severity']})")
|
||||
embed = self.create_cve_embed(cve)
|
||||
await channel.send(embed=embed)
|
||||
logger.info(f"✓ Wysłano {cve['cve_id']}")
|
||||
await asyncio.sleep(1)
|
||||
except Exception as e:
|
||||
logger.error(f"✗ Błąd przy wysyłaniu {cve['cve_id']}: {e}")
|
||||
|
||||
logger.info(f"\n{'='*60}")
|
||||
logger.info(f"✓ Test zakończony - wysłano {len(test_cves)} powiadomień")
|
||||
logger.info(f"{'='*60}")
|
||||
|
||||
self.test_sent = True
|
||||
await asyncio.sleep(2)
|
||||
await self.close()
|
||||
|
||||
def create_cve_embed(self, cve: dict) -> discord.Embed:
|
||||
|
||||
severity_colors = {
|
||||
'CRITICAL': 0xDC3545,
|
||||
'HIGH': 0xFD7E14,
|
||||
'MEDIUM': 0xFFC107,
|
||||
'LOW': 0x6C757D
|
||||
}
|
||||
|
||||
severity = cve.get('severity', 'UNKNOWN')
|
||||
color = severity_colors.get(severity, 0x6C757D)
|
||||
|
||||
vendor_code = cve.get('vendor_code', '')
|
||||
vendor = next((v for v in config.VENDORS if v['code'] == vendor_code), None)
|
||||
vendor_name = vendor['name'] if vendor else vendor_code.title()
|
||||
|
||||
embed = discord.Embed(
|
||||
title=f"🧪 TEST: {cve['cve_id']}",
|
||||
description=f"**[TESTOWE POWIADOMIENIE]**\n\n{cve.get('description', 'No description')}",
|
||||
color=color,
|
||||
timestamp=datetime.utcnow()
|
||||
)
|
||||
|
||||
embed.add_field(name="🏢 Vendor", value=vendor_name, inline=True)
|
||||
embed.add_field(name="⚠️ Severity", value=severity, inline=True)
|
||||
|
||||
cvss_score = cve.get('cvss_score')
|
||||
embed.add_field(
|
||||
name="📊 CVSS Score",
|
||||
value=f"**{cvss_score:.1f}**" if cvss_score else "N/A",
|
||||
inline=True
|
||||
)
|
||||
|
||||
published = cve.get('published_date', '')
|
||||
if published:
|
||||
try:
|
||||
pub_date = datetime.fromisoformat(published.replace('Z', '+00:00'))
|
||||
embed.add_field(
|
||||
name="📅 Published",
|
||||
value=pub_date.strftime('%Y-%m-%d %H:%M UTC'),
|
||||
inline=True
|
||||
)
|
||||
except:
|
||||
pass
|
||||
|
||||
nvd_url = f"https://nvd.nist.gov/vuln/detail/{cve['cve_id']}"
|
||||
embed.add_field(
|
||||
name="🔗 Links",
|
||||
value=f"[View on NVD]({nvd_url})",
|
||||
inline=False
|
||||
)
|
||||
|
||||
embed.set_footer(
|
||||
text=f"🧪 CVE Monitor TEST • {vendor_name}",
|
||||
icon_url="https://nvd.nist.gov/favicon.ico"
|
||||
)
|
||||
|
||||
return embed
|
||||
|
||||
def run_test():
|
||||
|
||||
if not config.DISCORD_BOT_TOKEN:
|
||||
logger.error("✗ DISCORD_BOT_TOKEN nie jest ustawiony w .env")
|
||||
sys.exit(1)
|
||||
|
||||
if not config.DISCORD_CHANNEL_ID:
|
||||
logger.error("✗ DISCORD_CHANNEL_ID nie jest ustawiony w .env")
|
||||
sys.exit(1)
|
||||
|
||||
logger.info("\n" + "="*60)
|
||||
logger.info("🧪 CVE MONITOR - TEST DISCORD NOTIFICATIONS")
|
||||
logger.info("="*60)
|
||||
logger.info(f"Token: {config.DISCORD_BOT_TOKEN[:20]}...")
|
||||
logger.info(f"Channel ID: {config.DISCORD_CHANNEL_ID}")
|
||||
logger.info(f"Min CVSS: {config.DISCORD_MIN_CVSS}")
|
||||
logger.info("="*60 + "\n")
|
||||
|
||||
bot = TestCVEDiscordBot()
|
||||
|
||||
try:
|
||||
bot.run(config.DISCORD_BOT_TOKEN)
|
||||
except discord.LoginFailure:
|
||||
logger.error("✗ Nieprawidłowy token Discord bot")
|
||||
except KeyboardInterrupt:
|
||||
logger.info("\n✗ Test przerwany przez użytkownika")
|
||||
except Exception as e:
|
||||
logger.error(f"✗ Błąd: {e}", exc_info=True)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
run_test()
|
||||
Reference in New Issue
Block a user