733 lines
18 KiB
Python
733 lines
18 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
IP WHOIS Analyzer
|
|
Complete Flask application with RESTful API
|
|
"""
|
|
|
|
from flask import Flask, render_template, request, jsonify, Response
|
|
import re
|
|
import ipaddress
|
|
import socket
|
|
import subprocess
|
|
from collections import defaultdict, Counter
|
|
from datetime import datetime
|
|
import json
|
|
import os
|
|
import hashlib
|
|
|
|
app = Flask(__name__)
|
|
|
|
# ============================================================================
|
|
# UTILITY FUNCTIONS
|
|
# ============================================================================
|
|
|
|
def parse_ip_list(text):
|
|
"""
|
|
Parse IPs from text with various separators.
|
|
Supports: comma, semicolon, space, newline, tab
|
|
"""
|
|
# Replace common separators with newlines
|
|
text = re.sub(r'[,;|\t]+', '\n', text)
|
|
lines = text.strip().split('\n')
|
|
|
|
ips = []
|
|
for line in lines:
|
|
# Extract IPs using regex
|
|
found_ips = re.findall(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b', line)
|
|
ips.extend(found_ips)
|
|
|
|
# Validate IPs
|
|
valid_ips = []
|
|
for ip in ips:
|
|
try:
|
|
ipaddress.IPv4Address(ip)
|
|
valid_ips.append(ip)
|
|
except:
|
|
pass
|
|
|
|
return list(set(valid_ips)) # Remove duplicates
|
|
|
|
|
|
def whois_lookup(ip):
|
|
"""Perform WHOIS lookup for single IP address"""
|
|
try:
|
|
result = subprocess.run(
|
|
['whois', ip],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=5
|
|
)
|
|
return result.stdout
|
|
except subprocess.TimeoutExpired:
|
|
return ""
|
|
except FileNotFoundError:
|
|
print("WARNING: whois command not found. Install it: apt install whois")
|
|
return ""
|
|
except Exception as e:
|
|
print(f"WHOIS error for {ip}: {e}")
|
|
return ""
|
|
|
|
|
|
def parse_whois(whois_output):
|
|
"""Extract relevant information from WHOIS output"""
|
|
info = {
|
|
'org': 'Unknown',
|
|
'country': 'Unknown',
|
|
'netname': 'Unknown',
|
|
'asn': 'Unknown',
|
|
'cidr': 'Unknown'
|
|
}
|
|
|
|
for line in whois_output.split('\n'):
|
|
line = line.strip()
|
|
|
|
# Organization
|
|
if line.startswith('Organization:') or line.startswith('org-name:'):
|
|
info['org'] = line.split(':', 1)[1].strip()
|
|
|
|
# Country
|
|
elif line.startswith('Country:') or line.startswith('country:'):
|
|
info['country'] = line.split(':', 1)[1].strip()
|
|
|
|
# Network name
|
|
elif line.startswith('NetName:') or line.startswith('netname:'):
|
|
info['netname'] = line.split(':', 1)[1].strip()
|
|
|
|
# ASN
|
|
elif line.startswith('OriginAS:') or 'origin:' in line.lower():
|
|
asn = re.search(r'AS\d+', line)
|
|
if asn:
|
|
info['asn'] = asn.group()
|
|
|
|
# CIDR
|
|
elif line.startswith('CIDR:') or line.startswith('inetnum:'):
|
|
info['cidr'] = line.split(':', 1)[1].strip()
|
|
|
|
return info
|
|
|
|
|
|
def cymru_lookup(ips):
|
|
"""
|
|
Bulk ASN lookup using Team Cymru WHOIS service.
|
|
Much faster than individual WHOIS lookups.
|
|
"""
|
|
results = {}
|
|
|
|
if not ips:
|
|
return results
|
|
|
|
try:
|
|
# Build query
|
|
query = "begin\nverbose\n" + "\n".join(ips) + "\nend\n"
|
|
|
|
# Connect to Team Cymru
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
sock.settimeout(10)
|
|
sock.connect(('whois.cymru.com', 43))
|
|
sock.sendall(query.encode())
|
|
|
|
# Read response
|
|
response = b""
|
|
while True:
|
|
data = sock.recv(4096)
|
|
if not data:
|
|
break
|
|
response += data
|
|
sock.close()
|
|
|
|
# Parse response
|
|
for line in response.decode('utf-8', errors='ignore').split('\n'):
|
|
if '|' in line and not line.startswith('AS'):
|
|
parts = [p.strip() for p in line.split('|')]
|
|
if len(parts) >= 5:
|
|
asn, ip, prefix, cc, owner = parts[0], parts[1], parts[2], parts[3], parts[4]
|
|
|
|
# Format ASN
|
|
if asn.isdigit():
|
|
asn = f"AS{asn}"
|
|
|
|
results[ip] = {
|
|
'asn': asn,
|
|
'prefix': prefix,
|
|
'country': cc,
|
|
'owner': owner
|
|
}
|
|
|
|
except socket.timeout:
|
|
print("WARNING: Team Cymru timeout. Using fallback WHOIS.")
|
|
except Exception as e:
|
|
print(f"Team Cymru lookup error: {e}")
|
|
|
|
return results
|
|
|
|
|
|
def analyze_ip(ip, cymru_data=None):
|
|
"""
|
|
Analyze single IP address.
|
|
Uses Team Cymru data if available, falls back to WHOIS.
|
|
"""
|
|
info = {
|
|
'ip': ip,
|
|
'asn': 'Unknown',
|
|
'owner': 'Unknown',
|
|
'country': 'Unknown',
|
|
'network': 'Unknown'
|
|
}
|
|
|
|
# Try Team Cymru data first
|
|
if cymru_data and ip in cymru_data:
|
|
data = cymru_data[ip]
|
|
info['asn'] = data.get('asn', 'Unknown')
|
|
info['owner'] = data.get('owner', 'Unknown')
|
|
info['country'] = data.get('country', 'Unknown')
|
|
info['network'] = data.get('prefix', 'Unknown')
|
|
|
|
# Fallback to WHOIS
|
|
else:
|
|
whois_output = whois_lookup(ip)
|
|
if whois_output:
|
|
parsed = parse_whois(whois_output)
|
|
info['asn'] = parsed['asn']
|
|
info['owner'] = parsed['org'] if parsed['org'] != 'Unknown' else parsed['netname']
|
|
info['country'] = parsed['country']
|
|
info['network'] = parsed['cidr']
|
|
|
|
return info
|
|
|
|
|
|
def apply_filters(results, filters):
|
|
"""
|
|
Apply filters to results.
|
|
Filters: countries, asns, owners
|
|
"""
|
|
countries = set(filters.get('countries', []))
|
|
asns = set(filters.get('asns', []))
|
|
owners = set(filters.get('owners', []))
|
|
|
|
# No filters = return all
|
|
if not (countries or asns or owners):
|
|
return results
|
|
|
|
filtered = []
|
|
for item in results:
|
|
# AND logic: all specified filters must match
|
|
if (not countries or item['country'] in countries) and \
|
|
(not asns or item['asn'] in asns) and \
|
|
(not owners or item['owner'] in owners):
|
|
filtered.append(item)
|
|
|
|
return filtered
|
|
|
|
|
|
# ============================================================================
|
|
# EXPORT GENERATORS
|
|
# ============================================================================
|
|
|
|
def generate_ipset(ips, timeout=86400):
|
|
"""Generate IPSet rules with timeout"""
|
|
timestamp = datetime.now().strftime('%Y-%m-%d')
|
|
|
|
rules = f"""#!/bin/bash
|
|
# IPSet Rules - Generated {timestamp}
|
|
# Total IPs: {len(ips)}
|
|
# Timeout: {timeout} seconds ({timeout//3600} hours)
|
|
|
|
# Create ipset
|
|
ipset create blocked_ips hash:ip timeout {timeout} maxelem 1000000
|
|
|
|
# Add IPs to set
|
|
"""
|
|
|
|
for ip in ips:
|
|
rules += f"ipset add blocked_ips {ip}\n"
|
|
|
|
rules += """
|
|
# Apply iptables rules
|
|
iptables -I INPUT -m set --match-set blocked_ips src -j DROP
|
|
iptables -I FORWARD -m set --match-set blocked_ips src -j DROP
|
|
|
|
echo "IPSet created and iptables rules applied"
|
|
echo "To remove: ipset destroy blocked_ips"
|
|
"""
|
|
|
|
return rules
|
|
|
|
|
|
def generate_iptables(ips):
|
|
"""Generate iptables DROP rules"""
|
|
timestamp = datetime.now().strftime('%Y-%m-%d')
|
|
|
|
rules = f"""#!/bin/bash
|
|
# iptables Rules - Generated {timestamp}
|
|
# Total IPs: {len(ips)}
|
|
|
|
# INPUT chain (incoming connections)
|
|
"""
|
|
|
|
for ip in ips:
|
|
rules += f"iptables -A INPUT -s {ip} -j DROP\n"
|
|
|
|
rules += "\n# FORWARD chain (routed traffic)\n"
|
|
|
|
for ip in ips:
|
|
rules += f"iptables -A FORWARD -s {ip} -j DROP\n"
|
|
|
|
rules += """
|
|
# Save rules
|
|
iptables-save > /etc/iptables/rules.v4
|
|
|
|
echo "iptables rules applied and saved"
|
|
"""
|
|
|
|
return rules
|
|
|
|
|
|
def generate_nginx(ips):
|
|
"""Generate Nginx deny directives"""
|
|
timestamp = datetime.now().strftime('%Y-%m-%d')
|
|
|
|
rules = f"""# Nginx Deny Rules - Generated {timestamp}
|
|
# Total IPs: {len(ips)}
|
|
#
|
|
# Usage: Include in http or server block
|
|
# include /etc/nginx/conf.d/blocked_ips.conf;
|
|
|
|
"""
|
|
|
|
for ip in ips:
|
|
rules += f"deny {ip};\n"
|
|
|
|
rules += "\n# After adding rules, reload nginx:\n"
|
|
rules += "# nginx -t && nginx -s reload\n"
|
|
|
|
return rules
|
|
|
|
|
|
def generate_apache(ips):
|
|
"""Generate Apache deny directives"""
|
|
timestamp = datetime.now().strftime('%Y-%m-%d')
|
|
|
|
rules = f"""# Apache Deny Rules - Generated {timestamp}
|
|
# Total IPs: {len(ips)}
|
|
#
|
|
# Usage: Add to .htaccess or VirtualHost configuration
|
|
|
|
<RequireAll>
|
|
Require all granted
|
|
"""
|
|
|
|
for ip in ips:
|
|
rules += f" Require not ip {ip}\n"
|
|
|
|
rules += """</RequireAll>
|
|
|
|
# After adding rules, restart apache:
|
|
# systemctl restart apache2
|
|
"""
|
|
|
|
return rules
|
|
|
|
|
|
def generate_firewalld(ips):
|
|
"""Generate Firewalld rich rules"""
|
|
timestamp = datetime.now().strftime('%Y-%m-%d')
|
|
|
|
rules = f"""#!/bin/bash
|
|
# Firewalld Rules - Generated {timestamp}
|
|
# Total IPs: {len(ips)}
|
|
|
|
"""
|
|
|
|
for ip in ips:
|
|
rules += f"firewall-cmd --permanent --add-rich-rule=\"rule family='ipv4' source address='{ip}' reject\"\n"
|
|
|
|
rules += """
|
|
# Reload firewall
|
|
firewall-cmd --reload
|
|
|
|
echo "Firewalld rules applied"
|
|
"""
|
|
|
|
return rules
|
|
|
|
|
|
def generate_mikrotik(ips):
|
|
"""Generate MikroTik RouterOS commands"""
|
|
timestamp = datetime.now().strftime('%Y-%m-%d')
|
|
|
|
rules = f"""# MikroTik RouterOS Configuration - Generated {timestamp}
|
|
# Total IPs: {len(ips)}
|
|
#
|
|
# Usage: Copy and paste into RouterOS Terminal
|
|
|
|
/ip firewall address-list
|
|
"""
|
|
|
|
for ip in ips:
|
|
rules += f"add list=blocked_ips address={ip} comment=\"Auto-blocked {timestamp}\"\n"
|
|
|
|
rules += """
|
|
# Create firewall filter rules (if not exists)
|
|
/ip firewall filter
|
|
add chain=input src-address-list=blocked_ips action=drop comment="Drop blocked IPs - input"
|
|
add chain=forward src-address-list=blocked_ips action=drop comment="Drop blocked IPs - forward"
|
|
|
|
# Verify
|
|
/ip firewall address-list print where list=blocked_ips
|
|
"""
|
|
|
|
return rules
|
|
|
|
|
|
def generate_cidr(results):
|
|
"""Generate list of unique CIDR networks"""
|
|
networks = list(set([r['network'] for r in results if r['network'] != 'Unknown']))
|
|
timestamp = datetime.now().strftime('%Y-%m-%d')
|
|
|
|
output = f"""# CIDR Networks - Generated {timestamp}
|
|
# Total unique networks: {len(networks)}
|
|
#
|
|
# One network per line
|
|
|
|
"""
|
|
output += "\n".join(sorted(networks))
|
|
|
|
return output
|
|
|
|
|
|
def generate_csv(results):
|
|
"""Generate CSV export"""
|
|
csv = "IP,ASN,Owner,Country,Network\n"
|
|
|
|
for item in results:
|
|
# Escape CSV fields
|
|
ip = item['ip']
|
|
asn = item['asn'].replace('"', '""')
|
|
owner = item['owner'].replace('"', '""')
|
|
country = item['country']
|
|
network = item['network']
|
|
|
|
csv += f'"{ip}","{asn}","{owner}","{country}","{network}"\n'
|
|
|
|
return csv
|
|
|
|
def get_file_hash(filepath):
|
|
"""Generate MD5 hash for cache busting"""
|
|
with open(filepath, 'rb') as f:
|
|
return hashlib.md5(f.read()).hexdigest()[:8]
|
|
|
|
@app.context_processor
|
|
def inject_static_hash():
|
|
"""Inject static file hash into templates"""
|
|
def static_hash(filename):
|
|
filepath = os.path.join(app.static_folder, filename)
|
|
file_hash = get_file_hash(filepath)
|
|
return f"/static/{filename}?v={file_hash}"
|
|
return dict(static_hash=static_hash)
|
|
|
|
@app.after_request
|
|
def add_header(response):
|
|
if request.path.startswith('/static/'):
|
|
# Clear default cache control
|
|
response.cache_control.no_cache = None
|
|
response.cache_control.no_store = None
|
|
|
|
# Set static file cache
|
|
response.cache_control.max_age = 31536000
|
|
response.cache_control.public = True
|
|
|
|
# Remove Content-Disposition header
|
|
response.headers.pop('Content-Disposition', None)
|
|
else:
|
|
# Dynamic content: no cache
|
|
response.cache_control.no_cache = True
|
|
response.cache_control.no_store = True
|
|
|
|
return response
|
|
|
|
# ============================================================================
|
|
# WEB ROUTES
|
|
# ============================================================================
|
|
|
|
@app.route('/')
|
|
def index():
|
|
"""Main application interface"""
|
|
return render_template('index.html')
|
|
|
|
@app.route('/favicon.ico')
|
|
def favicon():
|
|
"""Handle favicon requests - return 204 No Content instead of 404"""
|
|
return '', 204
|
|
|
|
@app.route('/api')
|
|
def api_docs():
|
|
"""API documentation page"""
|
|
return render_template('api.html')
|
|
|
|
|
|
# ============================================================================
|
|
# API ENDPOINTS
|
|
# ============================================================================
|
|
|
|
@app.route('/api/analyze', methods=['POST'])
|
|
def api_analyze():
|
|
"""
|
|
Analyze IP addresses.
|
|
|
|
POST /api/analyze
|
|
{
|
|
"ips": "1.1.1.1, 8.8.8.8, 9.9.9.9"
|
|
}
|
|
|
|
Returns:
|
|
{
|
|
"results": [...],
|
|
"stats": {...}
|
|
}
|
|
"""
|
|
data = request.get_json()
|
|
|
|
if not data:
|
|
return jsonify({'error': 'Invalid JSON'}), 400
|
|
|
|
ip_text = data.get('ips', '')
|
|
|
|
# Parse IPs
|
|
ips = parse_ip_list(ip_text)
|
|
|
|
if not ips:
|
|
return jsonify({'error': 'No valid IPs found'}), 400
|
|
|
|
# Bulk lookup via Team Cymru
|
|
print(f"Analyzing {len(ips)} IPs via Team Cymru...")
|
|
cymru_data = cymru_lookup(ips)
|
|
|
|
# Analyze each IP
|
|
results = []
|
|
for ip in ips:
|
|
info = analyze_ip(ip, cymru_data)
|
|
results.append(info)
|
|
|
|
# Generate statistics
|
|
stats = {
|
|
'total': len(results),
|
|
'countries': dict(Counter([r['country'] for r in results])),
|
|
'asns': dict(Counter([r['asn'] for r in results])),
|
|
'owners': dict(Counter([r['owner'] for r in results]))
|
|
}
|
|
|
|
print(f"Analysis complete: {len(results)} IPs, {len(stats['countries'])} countries")
|
|
|
|
return jsonify({
|
|
'results': results,
|
|
'stats': stats
|
|
})
|
|
|
|
|
|
@app.route('/api/filter', methods=['POST'])
|
|
def api_filter():
|
|
"""
|
|
Filter results.
|
|
|
|
POST /api/filter
|
|
{
|
|
"results": [...],
|
|
"filters": {
|
|
"countries": ["CN", "RU"],
|
|
"asns": ["AS4134"],
|
|
"owners": ["CHINANET"]
|
|
}
|
|
}
|
|
|
|
Returns:
|
|
{
|
|
"filtered": [...],
|
|
"count": 15
|
|
}
|
|
"""
|
|
data = request.get_json()
|
|
|
|
if not data:
|
|
return jsonify({'error': 'Invalid JSON'}), 400
|
|
|
|
results = data.get('results', [])
|
|
filters = data.get('filters', {})
|
|
|
|
filtered = apply_filters(results, filters)
|
|
|
|
return jsonify({
|
|
'filtered': filtered,
|
|
'count': len(filtered)
|
|
})
|
|
|
|
|
|
@app.route('/api/export/ipset', methods=['POST'])
|
|
def api_export_ipset():
|
|
"""
|
|
Export IPSet rules.
|
|
|
|
POST /api/export/ipset
|
|
{
|
|
"ips": ["1.1.1.1", "8.8.8.8"],
|
|
"timeout": 86400
|
|
}
|
|
"""
|
|
data = request.get_json()
|
|
|
|
if not data:
|
|
return Response('Invalid JSON', status=400)
|
|
|
|
ips = data.get('ips', [])
|
|
timeout = data.get('timeout', 86400)
|
|
|
|
rules = generate_ipset(ips, timeout)
|
|
|
|
return Response(rules, mimetype='text/plain')
|
|
|
|
|
|
@app.route('/api/export/iptables', methods=['POST'])
|
|
def api_export_iptables():
|
|
"""Export iptables rules"""
|
|
data = request.get_json()
|
|
|
|
if not data:
|
|
return Response('Invalid JSON', status=400)
|
|
|
|
ips = data.get('ips', [])
|
|
rules = generate_iptables(ips)
|
|
|
|
return Response(rules, mimetype='text/plain')
|
|
|
|
|
|
@app.route('/api/export/nginx', methods=['POST'])
|
|
def api_export_nginx():
|
|
"""Export Nginx deny rules"""
|
|
data = request.get_json()
|
|
|
|
if not data:
|
|
return Response('Invalid JSON', status=400)
|
|
|
|
ips = data.get('ips', [])
|
|
rules = generate_nginx(ips)
|
|
|
|
return Response(rules, mimetype='text/plain')
|
|
|
|
|
|
@app.route('/api/export/apache', methods=['POST'])
|
|
def api_export_apache():
|
|
"""Export Apache deny rules"""
|
|
data = request.get_json()
|
|
|
|
if not data:
|
|
return Response('Invalid JSON', status=400)
|
|
|
|
ips = data.get('ips', [])
|
|
rules = generate_apache(ips)
|
|
|
|
return Response(rules, mimetype='text/plain')
|
|
|
|
|
|
@app.route('/api/export/firewalld', methods=['POST'])
|
|
def api_export_firewalld():
|
|
"""Export Firewalld rules"""
|
|
data = request.get_json()
|
|
|
|
if not data:
|
|
return Response('Invalid JSON', status=400)
|
|
|
|
ips = data.get('ips', [])
|
|
rules = generate_firewalld(ips)
|
|
|
|
return Response(rules, mimetype='text/plain')
|
|
|
|
|
|
@app.route('/api/export/mikrotik', methods=['POST'])
|
|
def api_export_mikrotik():
|
|
"""Export MikroTik rules"""
|
|
data = request.get_json()
|
|
|
|
if not data:
|
|
return Response('Invalid JSON', status=400)
|
|
|
|
ips = data.get('ips', [])
|
|
rules = generate_mikrotik(ips)
|
|
|
|
return Response(rules, mimetype='text/plain')
|
|
|
|
|
|
@app.route('/api/export/cidr', methods=['POST'])
|
|
def api_export_cidr():
|
|
"""Export CIDR list"""
|
|
data = request.get_json()
|
|
|
|
if not data:
|
|
return Response('Invalid JSON', status=400)
|
|
|
|
results = data.get('results', [])
|
|
output = generate_cidr(results)
|
|
|
|
return Response(output, mimetype='text/plain')
|
|
|
|
|
|
@app.route('/api/export/csv', methods=['POST'])
|
|
def api_export_csv():
|
|
"""Export CSV"""
|
|
data = request.get_json()
|
|
|
|
if not data:
|
|
return Response('Invalid JSON', status=400)
|
|
|
|
results = data.get('results', [])
|
|
csv = generate_csv(results)
|
|
|
|
timestamp = datetime.now().strftime('%Y-%m-%d')
|
|
|
|
return Response(
|
|
csv,
|
|
mimetype='text/csv',
|
|
headers={'Content-Disposition': f'attachment; filename=ip-analysis-{timestamp}.csv'}
|
|
)
|
|
|
|
|
|
# Legacy endpoint for backward compatibility
|
|
@app.route('/analyze', methods=['POST'])
|
|
def analyze():
|
|
"""Legacy analyze endpoint - redirects to /api/analyze"""
|
|
return api_analyze()
|
|
|
|
|
|
# ============================================================================
|
|
# ERROR HANDLERS
|
|
# ============================================================================
|
|
|
|
@app.errorhandler(404)
|
|
def not_found(e):
|
|
return jsonify({'error': 'Endpoint not found'}), 404
|
|
|
|
|
|
@app.errorhandler(500)
|
|
def server_error(e):
|
|
return jsonify({'error': 'Internal server error'}), 500
|
|
|
|
|
|
# ============================================================================
|
|
# MAIN
|
|
# ============================================================================
|
|
|
|
if __name__ == '__main__':
|
|
print("=" * 70)
|
|
print("IP WHOIS Analyzer - Starting")
|
|
print("=" * 70)
|
|
print()
|
|
print("Interface: http://localhost:8799")
|
|
print("API Docs: http://localhost:8799/api")
|
|
print()
|
|
print("Press Ctrl+C to stop")
|
|
print()
|
|
|
|
app.run(
|
|
debug=True,
|
|
host='0.0.0.0',
|
|
port=8799
|
|
)
|