#!/usr/bin/env python3
"""
IP WHOIS Analyzer

Complete Flask application with RESTful API.

The service extracts IPv4 addresses from free-form text, resolves ASN /
owner / country / prefix for each of them (bulk lookup through Team Cymru,
per-address ``whois`` as a fallback) and renders the results as block rules
for several firewalls and web servers.
"""

import csv
import functools
import hashlib
import io
import ipaddress
import json
import os
import re
import socket
import subprocess
from collections import defaultdict, Counter
from datetime import datetime

from flask import Flask, render_template, request, jsonify, Response

app = Flask(__name__)

# Dotted-quad candidates; real validation is done with ``ipaddress``.
_IPV4_CANDIDATE_RE = re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b')

# Column order of the CSV export (keys of one analysis result).
_CSV_FIELDS = ('ip', 'asn', 'owner', 'country', 'network')

# Fallback ipset entry lifetime, in seconds.
_DEFAULT_IPSET_TIMEOUT = 86400


# ============================================================================
# UTILITY FUNCTIONS
# ============================================================================

def parse_ip_list(text):
    """
    Parse IPs from text with various separators.

    Supports: comma, semicolon, space, newline, tab (any non-address
    character works as a separator, since candidates are found by regex).

    Args:
        text: Free-form string. A list/tuple of strings is also accepted and
            treated as one entry per line; any other type yields ``[]``.

    Returns:
        List of unique, valid IPv4 address strings in first-seen order.
    """
    if isinstance(text, (list, tuple)):
        text = '\n'.join(str(part) for part in text)
    elif not isinstance(text, str):
        return []

    # A dict keeps insertion order, so duplicates are dropped while the
    # output order stays deterministic.
    unique_ips = {}
    for candidate in _IPV4_CANDIDATE_RE.findall(text):
        try:
            ipaddress.IPv4Address(candidate)
        except ValueError:
            # Matches the pattern but is not an address (e.g. 999.1.1.1).
            continue
        unique_ips[candidate] = None
    return list(unique_ips)


def whois_lookup(ip):
    """
    Perform WHOIS lookup for single IP address.

    Runs the system ``whois`` command with a 5 second timeout.

    Returns:
        The command's standard output, or an empty string when the command
        is missing, times out or fails for any other reason.
    """
    try:
        result = subprocess.run(
            ['whois', ip],
            capture_output=True,
            text=True,
            timeout=5
        )
        return result.stdout
    except subprocess.TimeoutExpired:
        return ""
    except FileNotFoundError:
        print("WARNING: whois command not found. Install it: apt install whois")
        return ""
    except Exception as e:
        # Best effort: a failed lookup must not abort the whole analysis.
        print(f"WHOIS error for {ip}: {e}")
        return ""


def parse_whois(whois_output):
    """
    Extract relevant information from WHOIS output.

    Recognizes the ``Organization``/``org-name``, ``Country``/``country``,
    ``NetName``/``netname``, ``OriginAS``/``origin`` and ``CIDR``/``inetnum``
    fields. When a field occurs several times the last non-empty value wins.

    Returns:
        Dict with keys ``org``, ``country``, ``netname``, ``asn`` and
        ``cidr``; fields that were not found stay ``'Unknown'``.
    """
    info = {
        'org': 'Unknown',
        'country': 'Unknown',
        'netname': 'Unknown',
        'asn': 'Unknown',
        'cidr': 'Unknown'
    }

    def store(key, line):
        # Keep the 'Unknown' placeholder when the field is present but blank.
        value = line.split(':', 1)[1].strip()
        if value:
            info[key] = value

    for raw_line in whois_output.split('\n'):
        line = raw_line.strip()

        # Organization
        if line.startswith(('Organization:', 'org-name:')):
            store('org', line)
        # Country
        elif line.startswith(('Country:', 'country:')):
            store('country', line)
        # Network name
        elif line.startswith(('NetName:', 'netname:')):
            store('netname', line)
        # ASN
        elif line.startswith('OriginAS:') or 'origin:' in line.lower():
            asn = re.search(r'AS\d+', line)
            if asn:
                info['asn'] = asn.group()
        # CIDR
        elif line.startswith(('CIDR:', 'inetnum:')):
            store('cidr', line)

    return info


def _parse_cymru_response(text):
    """Turn a Team Cymru bulk reply into ``{ip: {asn, prefix, country, owner}}``."""
    results = {}
    for line in text.split('\n'):
        # Skip the "Bulk mode" banner (no pipes) and the column header.
        if '|' not in line or line.startswith('AS'):
            continue

        parts = [p.strip() for p in line.split('|')]
        if len(parts) < 5:
            continue

        asn, ip, prefix, cc = parts[:4]
        # Verbose replies have seven columns:
        #   AS | IP | BGP Prefix | CC | Registry | Allocated | AS Name
        # so the owner is the AS name in the last column; column five is
        # only the registry. Shorter rows keep the historical column.
        owner = parts[6] if len(parts) >= 7 else parts[4]

        # Format ASN
        if asn.isdigit():
            asn = f"AS{asn}"

        results[ip] = {
            'asn': asn,
            'prefix': prefix,
            'country': cc,
            'owner': owner
        }
    return results


def cymru_lookup(ips):
    """
    Bulk ASN lookup using Team Cymru WHOIS service.

    Much faster than individual WHOIS lookups.

    Args:
        ips: Iterable of IPv4 address strings.

    Returns:
        Dict mapping each answered IP to a dict with ``asn``, ``prefix``,
        ``country`` and ``owner``. Empty when ``ips`` is empty or the
        service cannot be reached, so callers can fall back to WHOIS.
    """
    results = {}
    if not ips:
        return results

    # Build query
    query = "begin\nverbose\n" + "\n".join(ips) + "\nend\n"

    try:
        # The context manager closes the socket on every path, including
        # failures in the middle of the exchange.
        with socket.create_connection(('whois.cymru.com', 43), timeout=10) as sock:
            sock.sendall(query.encode())

            # Read response until the server closes the connection
            chunks = []
            while True:
                data = sock.recv(4096)
                if not data:
                    break
                chunks.append(data)
    except socket.timeout:
        print("WARNING: Team Cymru timeout. Using fallback WHOIS.")
        return results
    except OSError as e:
        print(f"Team Cymru lookup error: {e}")
        return results

    response = b"".join(chunks).decode('utf-8', errors='ignore')
    return _parse_cymru_response(response)


def analyze_ip(ip, cymru_data=None):
    """
    Analyze single IP address.

    Uses Team Cymru data if available, falls back to WHOIS.

    Returns:
        Dict with keys ``ip``, ``asn``, ``owner``, ``country`` and
        ``network``; anything that could not be resolved is ``'Unknown'``.
    """
    info = {
        'ip': ip,
        'asn': 'Unknown',
        'owner': 'Unknown',
        'country': 'Unknown',
        'network': 'Unknown'
    }

    # Try Team Cymru data first
    if cymru_data and ip in cymru_data:
        data = cymru_data[ip]
        info['asn'] = data.get('asn', 'Unknown')
        info['owner'] = data.get('owner', 'Unknown')
        info['country'] = data.get('country', 'Unknown')
        info['network'] = data.get('prefix', 'Unknown')
        return info

    # Fallback to WHOIS
    whois_output = whois_lookup(ip)
    if whois_output:
        parsed = parse_whois(whois_output)
        info['asn'] = parsed['asn']
        # The network name is the next best label when no organization is listed.
        info['owner'] = parsed['org'] if parsed['org'] != 'Unknown' else parsed['netname']
        info['country'] = parsed['country']
        info['network'] = parsed['cidr']

    return info


def _dict_rows(rows):
    """Return the dict items of ``rows``; ``[]`` when ``rows`` is not a list/tuple."""
    if not isinstance(rows, (list, tuple)):
        return []
    return [row for row in rows if isinstance(row, dict)]


def _filter_values(filters, key):
    """Return the string values listed under ``filters[key]`` as a set."""
    values = filters.get(key) or []
    if isinstance(values, str):
        values = [values]
    elif not isinstance(values, (list, tuple, set)):
        return set()
    return {value for value in values if isinstance(value, str)}


def _matches(allowed, value):
    """True when the filter is unset or ``value`` is one of the allowed strings."""
    return not allowed or (isinstance(value, str) and value in allowed)


def apply_filters(results, filters):
    """
    Apply filters to results.

    Filters: countries, asns, owners

    Args:
        results: List of analysis result dicts.
        filters: Dict that may contain the lists ``countries``, ``asns``
            and ``owners``. Missing, empty or malformed entries are ignored.

    Returns:
        ``results`` unchanged when no filter is set; otherwise a new list
        holding the rows that satisfy every specified filter. Rows that are
        not dicts, or that lack a filtered key, never match.
    """
    if not isinstance(filters, dict):
        filters = {}

    countries = _filter_values(filters, 'countries')
    asns = _filter_values(filters, 'asns')
    owners = _filter_values(filters, 'owners')

    # No filters = return all
    if not (countries or asns or owners):
        return results

    # AND logic: all specified filters must match
    return [
        item for item in _dict_rows(results)
        if _matches(countries, item.get('country'))
        and _matches(asns, item.get('asn'))
        and _matches(owners, item.get('owner'))
    ]


# ============================================================================
# EXPORT GENERATORS
# ============================================================================

def _timestamp():
    """Return today's local date as ``YYYY-MM-DD`` for the export headers."""
    return datetime.now().strftime('%Y-%m-%d')


def _sanitize_ips(ips):
    """
    Keep only entries that are valid IPv4 addresses or IPv4 networks.

    The export endpoints receive ``ips`` straight from the request body and
    interpolate every entry into shell scripts and server configuration, so
    anything that is not strictly an address/CIDR must be dropped to prevent
    command or directive injection in the generated output.
    """
    if isinstance(ips, str):
        ips = [ips]
    elif not isinstance(ips, (list, tuple, set)):
        return []

    safe = []
    for entry in ips:
        if not isinstance(entry, str):
            continue
        candidate = entry.strip()
        try:
            # strict=False also accepts CIDR notation with host bits set.
            ipaddress.IPv4Network(candidate, strict=False)
        except ValueError:
            continue
        safe.append(candidate)
    return safe


def generate_ipset(ips, timeout=86400):
    """
    Generate IPSet rules with timeout.

    Args:
        ips: IPv4 addresses/networks to block; invalid entries are skipped.
        timeout: Entry lifetime in seconds. Values that are not a
            non-negative integer fall back to 86400.

    Returns:
        Bash script text that creates the ``blocked_ips`` set, fills it and
        adds the matching iptables DROP rules.
    """
    ips = _sanitize_ips(ips)

    # ``timeout`` comes from the request body and ends up in a shell script.
    try:
        timeout = int(timeout)
    except (TypeError, ValueError, OverflowError):
        timeout = _DEFAULT_IPSET_TIMEOUT
    if timeout < 0:
        timeout = _DEFAULT_IPSET_TIMEOUT

    lines = [
        "#!/bin/bash",
        f"# IPSet Rules - Generated {_timestamp()}",
        f"# Total IPs: {len(ips)}",
        f"# Timeout: {timeout} seconds ({timeout//3600} hours)",
        "",
        "# Create ipset",
        f"ipset create blocked_ips hash:ip timeout {timeout} maxelem 1000000",
        "",
        "# Add IPs to set",
    ]
    lines.extend(f"ipset add blocked_ips {ip}" for ip in ips)
    lines.extend([
        "",
        "# Apply iptables rules",
        "iptables -I INPUT -m set --match-set blocked_ips src -j DROP",
        "iptables -I FORWARD -m set --match-set blocked_ips src -j DROP",
        "",
        'echo "IPSet created and iptables rules applied"',
        'echo "To remove: ipset destroy blocked_ips"',
    ])
    return "\n".join(lines) + "\n"


def generate_iptables(ips):
    """
    Generate iptables DROP rules.

    Emits one INPUT and one FORWARD rule per valid entry of ``ips`` followed
    by an ``iptables-save`` call; invalid entries are skipped.
    """
    ips = _sanitize_ips(ips)

    lines = [
        "#!/bin/bash",
        f"# iptables Rules - Generated {_timestamp()}",
        f"# Total IPs: {len(ips)}",
        "",
        "# INPUT chain (incoming connections)",
    ]
    lines.extend(f"iptables -A INPUT -s {ip} -j DROP" for ip in ips)
    lines.extend(["", "# FORWARD chain (routed traffic)"])
    lines.extend(f"iptables -A FORWARD -s {ip} -j DROP" for ip in ips)
    lines.extend([
        "",
        "# Save rules",
        "iptables-save > /etc/iptables/rules.v4",
        "",
        'echo "iptables rules applied and saved"',
    ])
    return "\n".join(lines) + "\n"


def generate_nginx(ips):
    """
    Generate Nginx deny directives.

    Emits one ``deny`` directive per valid entry of ``ips``; invalid entries
    are skipped.
    """
    ips = _sanitize_ips(ips)

    lines = [
        f"# Nginx Deny Rules - Generated {_timestamp()}",
        f"# Total IPs: {len(ips)}",
        "#",
        "# Usage: Include in http or server block",
        "# include /etc/nginx/conf.d/blocked_ips.conf;",
        "",
    ]
    lines.extend(f"deny {ip};" for ip in ips)
    lines.extend([
        "",
        "# After adding rules, reload nginx:",
        "# nginx -t && nginx -s reload",
    ])
    return "\n".join(lines) + "\n"


def generate_apache(ips):
    """
    Generate Apache deny directives.

    The directives are wrapped in ``<RequireAll>``: Apache 2.4 only accepts
    negated ``Require not ...`` directives inside such a container, next to
    at least one positive ``Require``. Invalid entries of ``ips`` are
    skipped.
    """
    ips = _sanitize_ips(ips)

    lines = [
        f"# Apache Deny Rules - Generated {_timestamp()}",
        f"# Total IPs: {len(ips)}",
        "#",
        "# Usage: Add to .htaccess or VirtualHost configuration",
        "",
        "<RequireAll>",
        "    Require all granted",
    ]
    lines.extend(f"    Require not ip {ip}" for ip in ips)
    lines.extend([
        "</RequireAll>",
        "",
        "# After adding rules, restart apache:",
        "# systemctl restart apache2",
    ])
    return "\n".join(lines) + "\n"


def generate_firewalld(ips):
    """
    Generate Firewalld rich rules.

    Emits one permanent IPv4 ``reject`` rich rule per valid entry of ``ips``
    followed by a reload; invalid entries are skipped.
    """
    ips = _sanitize_ips(ips)

    lines = [
        "#!/bin/bash",
        f"# Firewalld Rules - Generated {_timestamp()}",
        f"# Total IPs: {len(ips)}",
        "",
    ]
    lines.extend(
        f"firewall-cmd --permanent --add-rich-rule=\"rule family='ipv4' source address='{ip}' reject\""
        for ip in ips
    )
    lines.extend([
        "",
        "# Reload firewall",
        "firewall-cmd --reload",
        "",
        'echo "Firewalld rules applied"',
    ])
    return "\n".join(lines) + "\n"


def generate_mikrotik(ips):
    """
    Generate MikroTik RouterOS commands.

    Adds every valid entry of ``ips`` to the ``blocked_ips`` address list
    and emits input/forward drop rules for that list; invalid entries are
    skipped.
    """
    ips = _sanitize_ips(ips)
    timestamp = _timestamp()

    lines = [
        f"# MikroTik RouterOS Configuration - Generated {timestamp}",
        f"# Total IPs: {len(ips)}",
        "#",
        "# Usage: Copy and paste into RouterOS Terminal",
        "",
        "/ip firewall address-list",
    ]
    lines.extend(
        f"add list=blocked_ips address={ip} comment=\"Auto-blocked {timestamp}\""
        for ip in ips
    )
    lines.extend([
        "",
        "# Create firewall filter rules (if not exists)",
        "/ip firewall filter",
        'add chain=input src-address-list=blocked_ips action=drop comment="Drop blocked IPs - input"',
        'add chain=forward src-address-list=blocked_ips action=drop comment="Drop blocked IPs - forward"',
        "",
        "# Verify",
        "/ip firewall address-list print where list=blocked_ips",
    ])
    return "\n".join(lines) + "\n"


def generate_cidr(results):
    """
    Generate list of unique CIDR networks.

    Args:
        results: List of analysis result dicts; the ``network`` value of
            each row is used. Rows without a usable string value, or whose
            value is ``'Unknown'``, are ignored.

    Returns:
        Comment header followed by the sorted unique networks, one per line.
    """
    networks = sorted({
        row['network']
        for row in _dict_rows(results)
        if isinstance(row.get('network'), str)
        and row['network'] != 'Unknown'
        # An embedded line break would violate "one network per line".
        and '\n' not in row['network']
        and '\r' not in row['network']
    })

    header = [
        f"# CIDR Networks - Generated {_timestamp()}",
        f"# Total unique networks: {len(networks)}",
        "#",
        "# One network per line",
        "",
    ]
    return "\n".join(header) + "\n" + "\n".join(networks)


def generate_csv(results):
    """
    Generate CSV export.

    Args:
        results: List of analysis result dicts. Missing keys become empty
            cells; items that are not dicts are skipped.

    Returns:
        CSV text with the header ``IP,ASN,Owner,Country,Network`` and one
        fully quoted row per result, lines terminated by ``\\n``.
    """
    buffer = io.StringIO()
    buffer.write("IP,ASN,Owner,Country,Network\n")

    # The csv module quotes and escapes every field, not just a chosen few.
    writer = csv.writer(buffer, quoting=csv.QUOTE_ALL, lineterminator='\n')
    for item in _dict_rows(results):
        writer.writerow([
            '' if item.get(field) is None else str(item.get(field))
            for field in _CSV_FIELDS
        ])
    return buffer.getvalue()


def get_file_hash(filepath):
    """
    Generate MD5 hash for cache busting.

    Returns the first 8 hex digits of the file's MD5 digest. Used only as a
    content fingerprint, not for security.
    """
    with open(filepath, 'rb') as f:
        return hashlib.md5(f.read()).hexdigest()[:8]


@functools.lru_cache(maxsize=256)
def _cached_file_hash(filepath, mtime_ns):
    """Memoize ``get_file_hash``; ``mtime_ns`` invalidates the entry on change."""
    return get_file_hash(filepath)


@app.context_processor
def inject_static_hash():
    """
    Inject static file hash into templates.

    Makes ``static_hash(filename)`` available in templates; it returns the
    static URL of ``filename`` with a ``?v=<hash>`` cache-busting suffix.
    """
    def static_hash(filename):
        filepath = os.path.join(app.static_folder, filename)
        try:
            file_hash = _cached_file_hash(filepath, os.stat(filepath).st_mtime_ns)
        except OSError:
            # A missing/unreadable asset should yield a plain URL (and a 404
            # for that asset) rather than failing the whole page render.
            return f"/static/{filename}"
        return f"/static/{filename}?v={file_hash}"

    return dict(static_hash=static_hash)


@app.after_request
def add_header(response):
    """
    Set cache headers: long-lived for ``/static/``, no caching otherwise.
    """
    cache = response.cache_control

    if not request.path.startswith('/static/'):
        # Dynamic content: no cache
        cache.no_cache = True
        cache.no_store = True
        return response

    # Clear default cache control
    cache.no_cache = None
    cache.no_store = None
    # Set static file cache (one year; URLs carry a content hash)
    cache.max_age = 31536000
    cache.public = True
    # Remove Content-Disposition header
    response.headers.pop('Content-Disposition', None)
    return response


def _first_forwarded_value(header_name, default):
    """Return the first entry of a possibly comma-separated forwarding header."""
    raw = request.headers.get(header_name)
    if not raw:
        return default
    # Chained proxies append values ("https, http"); the first one is the
    # value seen by the outermost proxy.
    first = raw.split(',', 1)[0].strip()
    return first or default


@app.context_processor
def inject_config():
    """
    Inject configuration variables into templates.

    Provides ``base_url()`` (scheme and host of the current request) and the
    ``request`` object.
    """
    def get_base_url():
        # Construct base URL from request.
        # NOTE(review): X-Forwarded-* headers are client-controlled unless a
        # trusted reverse proxy overwrites them; consider Werkzeug's ProxyFix
        # if this URL is ever used for anything security-sensitive.
        scheme = _first_forwarded_value('X-Forwarded-Proto', request.scheme)
        host = _first_forwarded_value('X-Forwarded-Host', request.host)
        return f"{scheme}://{host}"

    return dict(
        base_url=get_base_url,
        request=request
    )


# ============================================================================
# WEB ROUTES
# ============================================================================

@app.route('/')
def index():
    """Main application interface"""
    return render_template('index.html')


@app.route('/favicon.ico')
def favicon():
    """Handle favicon requests - return 204 No Content instead of 404"""
    return '', 204


@app.route('/api')
def api_docs():
    """API documentation page"""
    return render_template('api.html')


# ============================================================================
# API ENDPOINTS
# ============================================================================

def _json_object():
    """
    Return the request body as a non-empty JSON object, else ``None``.

    ``silent=True`` makes malformed JSON or a wrong content type yield
    ``None`` so every endpoint can answer with its own 'Invalid JSON' error;
    the dict check rejects bodies such as lists that have no ``.get``.
    """
    data = request.get_json(silent=True)
    if isinstance(data, dict) and data:
        return data
    return None


def _export_ip_rules(generator):
    """Run ``generator`` on the ``ips`` of the JSON body; reply as plain text."""
    data = _json_object()
    if data is None:
        return Response('Invalid JSON', status=400)
    return Response(generator(data.get('ips', [])), mimetype='text/plain')


@app.route('/api/analyze', methods=['POST'])
def api_analyze():
    """
    Analyze IP addresses.

    POST /api/analyze
    {
        "ips": "1.1.1.1, 8.8.8.8, 9.9.9.9"
    }

    Returns:
    {
        "results": [...],
        "stats": {...}
    }
    """
    data = _json_object()
    if data is None:
        return jsonify({'error': 'Invalid JSON'}), 400

    # Parse IPs
    ips = parse_ip_list(data.get('ips', ''))
    if not ips:
        return jsonify({'error': 'No valid IPs found'}), 400

    # Bulk lookup via Team Cymru
    print(f"Analyzing {len(ips)} IPs via Team Cymru...")
    cymru_data = cymru_lookup(ips)

    # Analyze each IP
    results = [analyze_ip(ip, cymru_data) for ip in ips]

    # Generate statistics
    stats = {
        'total': len(results),
        'countries': dict(Counter(r['country'] for r in results)),
        'asns': dict(Counter(r['asn'] for r in results)),
        'owners': dict(Counter(r['owner'] for r in results))
    }

    print(f"Analysis complete: {len(results)} IPs, {len(stats['countries'])} countries")

    return jsonify({
        'results': results,
        'stats': stats
    })


@app.route('/api/filter', methods=['POST'])
def api_filter():
    """
    Filter results.

    POST /api/filter
    {
        "results": [...],
        "filters": {
            "countries": ["CN", "RU"],
            "asns": ["AS4134"],
            "owners": ["CHINANET"]
        }
    }

    Returns:
    {
        "filtered": [...],
        "count": 15
    }
    """
    data = _json_object()
    if data is None:
        return jsonify({'error': 'Invalid JSON'}), 400

    results = data.get('results', [])
    if not isinstance(results, list):
        results = []

    filtered = apply_filters(results, data.get('filters', {}))

    return jsonify({
        'filtered': filtered,
        'count': len(filtered)
    })


@app.route('/api/export/ipset', methods=['POST'])
def api_export_ipset():
    """
    Export IPSet rules.

    POST /api/export/ipset
    {
        "ips": ["1.1.1.1", "8.8.8.8"],
        "timeout": 86400
    }
    """
    data = _json_object()
    if data is None:
        return Response('Invalid JSON', status=400)

    rules = generate_ipset(
        data.get('ips', []),
        data.get('timeout', _DEFAULT_IPSET_TIMEOUT)
    )
    return Response(rules, mimetype='text/plain')


@app.route('/api/export/iptables', methods=['POST'])
def api_export_iptables():
    """Export iptables rules"""
    return _export_ip_rules(generate_iptables)


@app.route('/api/export/nginx', methods=['POST'])
def api_export_nginx():
    """Export Nginx deny rules"""
    return _export_ip_rules(generate_nginx)


@app.route('/api/export/apache', methods=['POST'])
def api_export_apache():
    """Export Apache deny rules"""
    return _export_ip_rules(generate_apache)


@app.route('/api/export/firewalld', methods=['POST'])
def api_export_firewalld():
    """Export Firewalld rules"""
    return _export_ip_rules(generate_firewalld)


@app.route('/api/export/mikrotik', methods=['POST'])
def api_export_mikrotik():
    """Export MikroTik rules"""
    return _export_ip_rules(generate_mikrotik)


@app.route('/api/export/cidr', methods=['POST'])
def api_export_cidr():
    """Export CIDR list"""
    data = _json_object()
    if data is None:
        return Response('Invalid JSON', status=400)

    output = generate_cidr(data.get('results', []))
    return Response(output, mimetype='text/plain')


@app.route('/api/export/csv', methods=['POST'])
def api_export_csv():
    """Export CSV"""
    data = _json_object()
    if data is None:
        return Response('Invalid JSON', status=400)

    csv_text = generate_csv(data.get('results', []))
    timestamp = _timestamp()

    return Response(
        csv_text,
        mimetype='text/csv',
        headers={'Content-Disposition': f'attachment; filename=ip-analysis-{timestamp}.csv'}
    )


# Legacy endpoint for backward compatibility
@app.route('/analyze', methods=['POST'])
def analyze():
    """Legacy analyze endpoint - delegates to /api/analyze"""
    return api_analyze()


# ============================================================================
# ERROR HANDLERS
# ============================================================================

@app.errorhandler(404)
def not_found(e):
    """Answer unknown routes with a JSON 404."""
    return jsonify({'error': 'Endpoint not found'}), 404


@app.errorhandler(500)
def server_error(e):
    """Answer unhandled server errors with a JSON 500."""
    return jsonify({'error': 'Internal server error'}), 500


# ============================================================================
# MAIN
# ============================================================================

if __name__ == '__main__':
    print("=" * 70)
    print("IP WHOIS Analyzer - Starting")
    print("=" * 70)
    print()
    print("Interface: http://localhost:8799")
    print("API Docs: http://localhost:8799/api")
    print()
    print("Press Ctrl+C to stop")
    print()

    # The Werkzeug debugger allows arbitrary code execution, and this server
    # listens on every interface, so debug mode is strictly opt-in.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in {
        '1', 'true', 'yes', 'on'
    }

    app.run(
        debug=debug_enabled,
        host='0.0.0.0',
        port=8799
    )