diff --git a/.gitignore b/.gitignore index 9d4188f..daccbf2 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,5 @@ env .env .vscode/ .idea/ -.DS_Store \ No newline at end of file +.DS_Store +*.zip diff --git a/Dockerfile b/Dockerfile index 89971d8..fb03150 100644 --- a/Dockerfile +++ b/Dockerfile @@ -3,7 +3,10 @@ WORKDIR /app ENV PYTHONDONTWRITEBYTECODE=1 ENV PYTHONUNBUFFERED=1 COPY requirements.txt . -RUN pip install --no-cache-dir -r requirements.txt +RUN apt-get update \ + && apt-get install -y --no-install-recommends whois \ + && rm -rf /var/lib/apt/lists/* \ + && pip install --no-cache-dir -r requirements.txt COPY . . EXPOSE 8799 -CMD ["python", "ip_analyzer.py"] \ No newline at end of file +CMD ["python", "run.py"] diff --git a/README.md b/README.md index e7aae9a..ea0cece 100644 --- a/README.md +++ b/README.md @@ -140,3 +140,14 @@ ip-whois-analyzer/ This project is provided as-is, without any warranty. Use at your own risk. + + +## Refactored structure + +- `run.py` - Docker/start entrypoint +- `ip_analyzer_app/__init__.py` - Flask app factory +- `ip_analyzer_app/routes/` - web and API routes +- `ip_analyzer_app/services/` - parsing, lookups, analysis, exports +- `ip_analyzer_app/utils/` - static asset helpers + +The analyzer now also extracts an additional `user` field from WHOIS data, e.g. `User: OVH`. diff --git a/ip_analyzer.py b/ip_analyzer.py deleted file mode 100644 index 56c40bb..0000000 --- a/ip_analyzer.py +++ /dev/null @@ -1,748 +0,0 @@ -#!/usr/bin/env python3 -""" -IP WHOIS Analyzer -Complete Flask application with RESTful API -""" - -from flask import Flask, render_template, request, jsonify, Response -import re -import ipaddress -import socket -import subprocess -from collections import defaultdict, Counter -from datetime import datetime -import json -import os -import hashlib - -app = Flask(__name__) - -# ============================================================================ -# UTILITY FUNCTIONS -# ============================================================================ - -def parse_ip_list(text): - """ - Parse IPs from text with various separators. - Supports: comma, semicolon, space, newline, tab - """ - # Replace common separators with newlines - text = re.sub(r'[,;|\t]+', '\n', text) - lines = text.strip().split('\n') - - ips = [] - for line in lines: - # Extract IPs using regex - found_ips = re.findall(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b', line) - ips.extend(found_ips) - - # Validate IPs - valid_ips = [] - for ip in ips: - try: - ipaddress.IPv4Address(ip) - valid_ips.append(ip) - except: - pass - - return list(set(valid_ips)) # Remove duplicates - - -def whois_lookup(ip): - """Perform WHOIS lookup for single IP address""" - try: - result = subprocess.run( - ['whois', ip], - capture_output=True, - text=True, - timeout=5 - ) - return result.stdout - except subprocess.TimeoutExpired: - return "" - except FileNotFoundError: - print("WARNING: whois command not found. Install it: apt install whois") - return "" - except Exception as e: - print(f"WHOIS error for {ip}: {e}") - return "" - - -def parse_whois(whois_output): - """Extract relevant information from WHOIS output""" - info = { - 'org': 'Unknown', - 'country': 'Unknown', - 'netname': 'Unknown', - 'asn': 'Unknown', - 'cidr': 'Unknown' - } - - for line in whois_output.split('\n'): - line = line.strip() - - # Organization - if line.startswith('Organization:') or line.startswith('org-name:'): - info['org'] = line.split(':', 1)[1].strip() - - # Country - elif line.startswith('Country:') or line.startswith('country:'): - info['country'] = line.split(':', 1)[1].strip() - - # Network name - elif line.startswith('NetName:') or line.startswith('netname:'): - info['netname'] = line.split(':', 1)[1].strip() - - # ASN - elif line.startswith('OriginAS:') or 'origin:' in line.lower(): - asn = re.search(r'AS\d+', line) - if asn: - info['asn'] = asn.group() - - # CIDR - elif line.startswith('CIDR:') or line.startswith('inetnum:'): - info['cidr'] = line.split(':', 1)[1].strip() - - return info - - -def cymru_lookup(ips): - """ - Bulk ASN lookup using Team Cymru WHOIS service. - Much faster than individual WHOIS lookups. - """ - results = {} - - if not ips: - return results - - try: - # Build query - query = "begin\nverbose\n" + "\n".join(ips) + "\nend\n" - - # Connect to Team Cymru - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.settimeout(10) - sock.connect(('whois.cymru.com', 43)) - sock.sendall(query.encode()) - - # Read response - response = b"" - while True: - data = sock.recv(4096) - if not data: - break - response += data - sock.close() - - # Parse response - for line in response.decode('utf-8', errors='ignore').split('\n'): - if '|' in line and not line.startswith('AS'): - parts = [p.strip() for p in line.split('|')] - if len(parts) >= 5: - asn, ip, prefix, cc, owner = parts[0], parts[1], parts[2], parts[3], parts[4] - - # Format ASN - if asn.isdigit(): - asn = f"AS{asn}" - - results[ip] = { - 'asn': asn, - 'prefix': prefix, - 'country': cc, - 'owner': owner - } - - except socket.timeout: - print("WARNING: Team Cymru timeout. Using fallback WHOIS.") - except Exception as e: - print(f"Team Cymru lookup error: {e}") - - return results - - -def analyze_ip(ip, cymru_data=None): - """ - Analyze single IP address. - Uses Team Cymru data if available, falls back to WHOIS. - """ - info = { - 'ip': ip, - 'asn': 'Unknown', - 'owner': 'Unknown', - 'country': 'Unknown', - 'network': 'Unknown' - } - - # Try Team Cymru data first - if cymru_data and ip in cymru_data: - data = cymru_data[ip] - info['asn'] = data.get('asn', 'Unknown') - info['owner'] = data.get('owner', 'Unknown') - info['country'] = data.get('country', 'Unknown') - info['network'] = data.get('prefix', 'Unknown') - - # Fallback to WHOIS - else: - whois_output = whois_lookup(ip) - if whois_output: - parsed = parse_whois(whois_output) - info['asn'] = parsed['asn'] - info['owner'] = parsed['org'] if parsed['org'] != 'Unknown' else parsed['netname'] - info['country'] = parsed['country'] - info['network'] = parsed['cidr'] - - return info - - -def apply_filters(results, filters): - """ - Apply filters to results. - Filters: countries, asns, owners - """ - countries = set(filters.get('countries', [])) - asns = set(filters.get('asns', [])) - owners = set(filters.get('owners', [])) - - # No filters = return all - if not (countries or asns or owners): - return results - - filtered = [] - for item in results: - # AND logic: all specified filters must match - if (not countries or item['country'] in countries) and \ - (not asns or item['asn'] in asns) and \ - (not owners or item['owner'] in owners): - filtered.append(item) - - return filtered - - -# ============================================================================ -# EXPORT GENERATORS -# ============================================================================ - -def generate_ipset(ips, timeout=86400): - """Generate IPSet rules with timeout""" - timestamp = datetime.now().strftime('%Y-%m-%d') - - rules = f"""#!/bin/bash -# IPSet Rules - Generated {timestamp} -# Total IPs: {len(ips)} -# Timeout: {timeout} seconds ({timeout//3600} hours) - -# Create ipset -ipset create blocked_ips hash:ip timeout {timeout} maxelem 1000000 - -# Add IPs to set -""" - - for ip in ips: - rules += f"ipset add blocked_ips {ip}\n" - - rules += """ -# Apply iptables rules -iptables -I INPUT -m set --match-set blocked_ips src -j DROP -iptables -I FORWARD -m set --match-set blocked_ips src -j DROP - -echo "IPSet created and iptables rules applied" -echo "To remove: ipset destroy blocked_ips" -""" - - return rules - - -def generate_iptables(ips): - """Generate iptables DROP rules""" - timestamp = datetime.now().strftime('%Y-%m-%d') - - rules = f"""#!/bin/bash -# iptables Rules - Generated {timestamp} -# Total IPs: {len(ips)} - -# INPUT chain (incoming connections) -""" - - for ip in ips: - rules += f"iptables -A INPUT -s {ip} -j DROP\n" - - rules += "\n# FORWARD chain (routed traffic)\n" - - for ip in ips: - rules += f"iptables -A FORWARD -s {ip} -j DROP\n" - - rules += """ -# Save rules -iptables-save > /etc/iptables/rules.v4 - -echo "iptables rules applied and saved" -""" - - return rules - - -def generate_nginx(ips): - """Generate Nginx deny directives""" - timestamp = datetime.now().strftime('%Y-%m-%d') - - rules = f"""# Nginx Deny Rules - Generated {timestamp} -# Total IPs: {len(ips)} -# -# Usage: Include in http or server block -# include /etc/nginx/conf.d/blocked_ips.conf; - -""" - - for ip in ips: - rules += f"deny {ip};\n" - - rules += "\n# After adding rules, reload nginx:\n" - rules += "# nginx -t && nginx -s reload\n" - - return rules - - -def generate_apache(ips): - """Generate Apache deny directives""" - timestamp = datetime.now().strftime('%Y-%m-%d') - - rules = f"""# Apache Deny Rules - Generated {timestamp} -# Total IPs: {len(ips)} -# -# Usage: Add to .htaccess or VirtualHost configuration - - - Require all granted -""" - - for ip in ips: - rules += f" Require not ip {ip}\n" - - rules += """ - -# After adding rules, restart apache: -# systemctl restart apache2 -""" - - return rules - - -def generate_firewalld(ips): - """Generate Firewalld rich rules""" - timestamp = datetime.now().strftime('%Y-%m-%d') - - rules = f"""#!/bin/bash -# Firewalld Rules - Generated {timestamp} -# Total IPs: {len(ips)} - -""" - - for ip in ips: - rules += f"firewall-cmd --permanent --add-rich-rule=\"rule family='ipv4' source address='{ip}' reject\"\n" - - rules += """ -# Reload firewall -firewall-cmd --reload - -echo "Firewalld rules applied" -""" - - return rules - - -def generate_mikrotik(ips): - """Generate MikroTik RouterOS commands""" - timestamp = datetime.now().strftime('%Y-%m-%d') - - rules = f"""# MikroTik RouterOS Configuration - Generated {timestamp} -# Total IPs: {len(ips)} -# -# Usage: Copy and paste into RouterOS Terminal - -/ip firewall address-list -""" - - for ip in ips: - rules += f"add list=blocked_ips address={ip} comment=\"Auto-blocked {timestamp}\"\n" - - rules += """ -# Create firewall filter rules (if not exists) -/ip firewall filter -add chain=input src-address-list=blocked_ips action=drop comment="Drop blocked IPs - input" -add chain=forward src-address-list=blocked_ips action=drop comment="Drop blocked IPs - forward" - -# Verify -/ip firewall address-list print where list=blocked_ips -""" - - return rules - - -def generate_cidr(results): - """Generate list of unique CIDR networks""" - networks = list(set([r['network'] for r in results if r['network'] != 'Unknown'])) - timestamp = datetime.now().strftime('%Y-%m-%d') - - output = f"""# CIDR Networks - Generated {timestamp} -# Total unique networks: {len(networks)} -# -# One network per line - -""" - output += "\n".join(sorted(networks)) - - return output - - -def generate_csv(results): - """Generate CSV export""" - csv = "IP,ASN,Owner,Country,Network\n" - - for item in results: - # Escape CSV fields - ip = item['ip'] - asn = item['asn'].replace('"', '""') - owner = item['owner'].replace('"', '""') - country = item['country'] - network = item['network'] - - csv += f'"{ip}","{asn}","{owner}","{country}","{network}"\n' - - return csv - -def get_file_hash(filepath): - """Generate MD5 hash for cache busting""" - with open(filepath, 'rb') as f: - return hashlib.md5(f.read()).hexdigest()[:8] - -@app.context_processor -def inject_static_hash(): - """Inject static file hash into templates""" - def static_hash(filename): - filepath = os.path.join(app.static_folder, filename) - file_hash = get_file_hash(filepath) - return f"/static/{filename}?v={file_hash}" - return dict(static_hash=static_hash) - -@app.after_request -def add_header(response): - if request.path.startswith('/static/'): - # Clear default cache control - response.cache_control.no_cache = None - response.cache_control.no_store = None - - # Set static file cache - response.cache_control.max_age = 31536000 - response.cache_control.public = True - - # Remove Content-Disposition header - response.headers.pop('Content-Disposition', None) - else: - # Dynamic content: no cache - response.cache_control.no_cache = True - response.cache_control.no_store = True - - return response - -@app.context_processor -def inject_config(): - """Inject configuration variables into templates""" - from flask import request - - def get_base_url(): - # Construct base URL from request - scheme = request.headers.get('X-Forwarded-Proto', request.scheme) - host = request.headers.get('X-Forwarded-Host', request.host) - return f"{scheme}://{host}" - - return dict( - base_url=get_base_url, - request=request - ) - -# ============================================================================ -# WEB ROUTES -# ============================================================================ - -@app.route('/') -def index(): - """Main application interface""" - return render_template('index.html') - -@app.route('/favicon.ico') -def favicon(): - """Handle favicon requests - return 204 No Content instead of 404""" - return '', 204 - -@app.route('/api') -def api_docs(): - """API documentation page""" - return render_template('api.html') - - -# ============================================================================ -# API ENDPOINTS -# ============================================================================ - -@app.route('/api/analyze', methods=['POST']) -def api_analyze(): - """ - Analyze IP addresses. - - POST /api/analyze - { - "ips": "1.1.1.1, 8.8.8.8, 9.9.9.9" - } - - Returns: - { - "results": [...], - "stats": {...} - } - """ - data = request.get_json() - - if not data: - return jsonify({'error': 'Invalid JSON'}), 400 - - ip_text = data.get('ips', '') - - # Parse IPs - ips = parse_ip_list(ip_text) - - if not ips: - return jsonify({'error': 'No valid IPs found'}), 400 - - # Bulk lookup via Team Cymru - print(f"Analyzing {len(ips)} IPs via Team Cymru...") - cymru_data = cymru_lookup(ips) - - # Analyze each IP - results = [] - for ip in ips: - info = analyze_ip(ip, cymru_data) - results.append(info) - - # Generate statistics - stats = { - 'total': len(results), - 'countries': dict(Counter([r['country'] for r in results])), - 'asns': dict(Counter([r['asn'] for r in results])), - 'owners': dict(Counter([r['owner'] for r in results])) - } - - print(f"Analysis complete: {len(results)} IPs, {len(stats['countries'])} countries") - - return jsonify({ - 'results': results, - 'stats': stats - }) - - -@app.route('/api/filter', methods=['POST']) -def api_filter(): - """ - Filter results. - - POST /api/filter - { - "results": [...], - "filters": { - "countries": ["CN", "RU"], - "asns": ["AS4134"], - "owners": ["CHINANET"] - } - } - - Returns: - { - "filtered": [...], - "count": 15 - } - """ - data = request.get_json() - - if not data: - return jsonify({'error': 'Invalid JSON'}), 400 - - results = data.get('results', []) - filters = data.get('filters', {}) - - filtered = apply_filters(results, filters) - - return jsonify({ - 'filtered': filtered, - 'count': len(filtered) - }) - - -@app.route('/api/export/ipset', methods=['POST']) -def api_export_ipset(): - """ - Export IPSet rules. - - POST /api/export/ipset - { - "ips": ["1.1.1.1", "8.8.8.8"], - "timeout": 86400 - } - """ - data = request.get_json() - - if not data: - return Response('Invalid JSON', status=400) - - ips = data.get('ips', []) - timeout = data.get('timeout', 86400) - - rules = generate_ipset(ips, timeout) - - return Response(rules, mimetype='text/plain') - - -@app.route('/api/export/iptables', methods=['POST']) -def api_export_iptables(): - """Export iptables rules""" - data = request.get_json() - - if not data: - return Response('Invalid JSON', status=400) - - ips = data.get('ips', []) - rules = generate_iptables(ips) - - return Response(rules, mimetype='text/plain') - - -@app.route('/api/export/nginx', methods=['POST']) -def api_export_nginx(): - """Export Nginx deny rules""" - data = request.get_json() - - if not data: - return Response('Invalid JSON', status=400) - - ips = data.get('ips', []) - rules = generate_nginx(ips) - - return Response(rules, mimetype='text/plain') - - -@app.route('/api/export/apache', methods=['POST']) -def api_export_apache(): - """Export Apache deny rules""" - data = request.get_json() - - if not data: - return Response('Invalid JSON', status=400) - - ips = data.get('ips', []) - rules = generate_apache(ips) - - return Response(rules, mimetype='text/plain') - - -@app.route('/api/export/firewalld', methods=['POST']) -def api_export_firewalld(): - """Export Firewalld rules""" - data = request.get_json() - - if not data: - return Response('Invalid JSON', status=400) - - ips = data.get('ips', []) - rules = generate_firewalld(ips) - - return Response(rules, mimetype='text/plain') - - -@app.route('/api/export/mikrotik', methods=['POST']) -def api_export_mikrotik(): - """Export MikroTik rules""" - data = request.get_json() - - if not data: - return Response('Invalid JSON', status=400) - - ips = data.get('ips', []) - rules = generate_mikrotik(ips) - - return Response(rules, mimetype='text/plain') - - -@app.route('/api/export/cidr', methods=['POST']) -def api_export_cidr(): - """Export CIDR list""" - data = request.get_json() - - if not data: - return Response('Invalid JSON', status=400) - - results = data.get('results', []) - output = generate_cidr(results) - - return Response(output, mimetype='text/plain') - - -@app.route('/api/export/csv', methods=['POST']) -def api_export_csv(): - """Export CSV""" - data = request.get_json() - - if not data: - return Response('Invalid JSON', status=400) - - results = data.get('results', []) - csv = generate_csv(results) - - timestamp = datetime.now().strftime('%Y-%m-%d') - - return Response( - csv, - mimetype='text/csv', - headers={'Content-Disposition': f'attachment; filename=ip-analysis-{timestamp}.csv'} - ) - - -# Legacy endpoint for backward compatibility -@app.route('/analyze', methods=['POST']) -def analyze(): - """Legacy analyze endpoint - redirects to /api/analyze""" - return api_analyze() - - -# ============================================================================ -# ERROR HANDLERS -# ============================================================================ - -@app.errorhandler(404) -def not_found(e): - return jsonify({'error': 'Endpoint not found'}), 404 - - -@app.errorhandler(500) -def server_error(e): - return jsonify({'error': 'Internal server error'}), 500 - - -# ============================================================================ -# MAIN -# ============================================================================ - -if __name__ == '__main__': - print("=" * 70) - print("IP WHOIS Analyzer - Starting") - print("=" * 70) - print() - print("Interface: http://localhost:8799") - print("API Docs: http://localhost:8799/api") - print() - print("Press Ctrl+C to stop") - print() - - app.run( - debug=True, - host='0.0.0.0', - port=8799 - ) diff --git a/ip_analyzer_app/__init__.py b/ip_analyzer_app/__init__.py new file mode 100644 index 0000000..067651a --- /dev/null +++ b/ip_analyzer_app/__init__.py @@ -0,0 +1,24 @@ +from flask import Flask, jsonify + +from .routes.api import api_bp +from .routes.web import web_bp +from .utils.assets import register_asset_helpers + + +def create_app(): + app = Flask(__name__, template_folder='../templates', static_folder='../static') + + register_asset_helpers(app) + + app.register_blueprint(web_bp) + app.register_blueprint(api_bp) + + @app.errorhandler(404) + def not_found(_e): + return jsonify({'error': 'Endpoint not found'}), 404 + + @app.errorhandler(500) + def server_error(_e): + return jsonify({'error': 'Internal server error'}), 500 + + return app diff --git a/ip_analyzer_app/routes/api.py b/ip_analyzer_app/routes/api.py new file mode 100644 index 0000000..92f815d --- /dev/null +++ b/ip_analyzer_app/routes/api.py @@ -0,0 +1,120 @@ +from datetime import datetime + +from flask import Blueprint, Response, jsonify, request + +from ..services.analysis import analyze_ips, apply_filters +from ..services.exports import ( + generate_apache, + generate_cidr, + generate_csv, + generate_firewalld, + generate_ipset, + generate_iptables, + generate_mikrotik, + generate_nginx, +) +from ..services.parsing import parse_ip_list + +api_bp = Blueprint('api', __name__) + + +@api_bp.route('/api/analyze', methods=['POST']) +def api_analyze(): + data = request.get_json() + if not data: + return jsonify({'error': 'Invalid JSON'}), 400 + + ips = parse_ip_list(data.get('ips', '')) + if not ips: + return jsonify({'error': 'No valid IPs found'}), 400 + + results, stats = analyze_ips(ips) + return jsonify({'results': results, 'stats': stats}) + + +@api_bp.route('/api/filter', methods=['POST']) +def api_filter(): + data = request.get_json() + if not data: + return jsonify({'error': 'Invalid JSON'}), 400 + + results = data.get('results', []) + filters = data.get('filters', {}) + filtered = apply_filters(results, filters) + return jsonify({'filtered': filtered, 'count': len(filtered)}) + + +@api_bp.route('/api/export/ipset', methods=['POST']) +def api_export_ipset(): + data = request.get_json() + if not data: + return Response('Invalid JSON', status=400) + return Response(generate_ipset(data.get('ips', []), data.get('timeout', 86400)), mimetype='text/plain') + + +@api_bp.route('/api/export/iptables', methods=['POST']) +def api_export_iptables(): + data = request.get_json() + if not data: + return Response('Invalid JSON', status=400) + return Response(generate_iptables(data.get('ips', [])), mimetype='text/plain') + + +@api_bp.route('/api/export/nginx', methods=['POST']) +def api_export_nginx(): + data = request.get_json() + if not data: + return Response('Invalid JSON', status=400) + return Response(generate_nginx(data.get('ips', [])), mimetype='text/plain') + + +@api_bp.route('/api/export/apache', methods=['POST']) +def api_export_apache(): + data = request.get_json() + if not data: + return Response('Invalid JSON', status=400) + return Response(generate_apache(data.get('ips', [])), mimetype='text/plain') + + +@api_bp.route('/api/export/firewalld', methods=['POST']) +def api_export_firewalld(): + data = request.get_json() + if not data: + return Response('Invalid JSON', status=400) + return Response(generate_firewalld(data.get('ips', [])), mimetype='text/plain') + + +@api_bp.route('/api/export/mikrotik', methods=['POST']) +def api_export_mikrotik(): + data = request.get_json() + if not data: + return Response('Invalid JSON', status=400) + return Response(generate_mikrotik(data.get('ips', [])), mimetype='text/plain') + + +@api_bp.route('/api/export/cidr', methods=['POST']) +def api_export_cidr(): + data = request.get_json() + if not data: + return Response('Invalid JSON', status=400) + return Response(generate_cidr(data.get('results', [])), mimetype='text/plain') + + +@api_bp.route('/api/export/csv', methods=['POST']) +def api_export_csv(): + data = request.get_json() + if not data: + return Response('Invalid JSON', status=400) + + csv_content = generate_csv(data.get('results', [])) + timestamp = datetime.now().strftime('%Y-%m-%d') + return Response( + csv_content, + mimetype='text/csv', + headers={'Content-Disposition': f'attachment; filename=ip-analysis-{timestamp}.csv'}, + ) + + +@api_bp.route('/analyze', methods=['POST']) +def analyze_legacy(): + return api_analyze() diff --git a/ip_analyzer_app/routes/web.py b/ip_analyzer_app/routes/web.py new file mode 100644 index 0000000..48d7f55 --- /dev/null +++ b/ip_analyzer_app/routes/web.py @@ -0,0 +1,18 @@ +from flask import Blueprint, render_template + +web_bp = Blueprint('web', __name__) + + +@web_bp.route('/') +def index(): + return render_template('index.html') + + +@web_bp.route('/favicon.ico') +def favicon(): + return '', 204 + + +@web_bp.route('/api') +def api_docs(): + return render_template('api.html') diff --git a/ip_analyzer_app/services/analysis.py b/ip_analyzer_app/services/analysis.py new file mode 100644 index 0000000..85e6b0b --- /dev/null +++ b/ip_analyzer_app/services/analysis.py @@ -0,0 +1,75 @@ +from collections import Counter + +from .lookups import cymru_lookup, whois_lookup +from .parsing import parse_whois + + +def analyze_ip(ip: str, cymru_data: dict[str, dict[str, str]] | None = None) -> dict[str, str]: + info = { + 'ip': ip, + 'asn': 'Unknown', + 'owner': 'Unknown', + 'user': 'Unknown', + 'country': 'Unknown', + 'network': 'Unknown', + } + + if cymru_data and ip in cymru_data: + data = cymru_data[ip] + info['asn'] = data.get('asn', 'Unknown') + info['owner'] = data.get('owner', 'Unknown') + info['country'] = data.get('country', 'Unknown') + info['network'] = data.get('prefix', 'Unknown') + + whois_output = whois_lookup(ip) + if whois_output: + parsed = parse_whois(whois_output) + if info['asn'] == 'Unknown' and parsed['asn'] != 'Unknown': + info['asn'] = parsed['asn'] + if parsed['country'] != 'Unknown': + info['country'] = parsed['country'] + if parsed['cidr'] != 'Unknown': + info['network'] = parsed['cidr'] + info['user'] = parsed['user'] + if info['owner'] == 'Unknown': + info['owner'] = parsed['org'] if parsed['org'] != 'Unknown' else parsed['netname'] + + return info + + +def analyze_ips(ips: list[str]) -> tuple[list[dict[str, str]], dict[str, dict[str, int]]]: + print(f'Analyzing {len(ips)} IPs via Team Cymru...') + cymru_data = cymru_lookup(ips) + + results = [analyze_ip(ip, cymru_data) for ip in ips] + stats = { + 'total': len(results), + 'countries': dict(Counter(r['country'] for r in results)), + 'asns': dict(Counter(r['asn'] for r in results)), + 'owners': dict(Counter(r['owner'] for r in results)), + 'users': dict(Counter(r['user'] for r in results)), + } + + print(f"Analysis complete: {len(results)} IPs, {len(stats['countries'])} countries") + return results, stats + + +def apply_filters(results: list[dict[str, str]], filters: dict) -> list[dict[str, str]]: + countries = set(filters.get('countries', [])) + asns = set(filters.get('asns', [])) + owners = set(filters.get('owners', [])) + users = set(filters.get('users', [])) + + if not (countries or asns or owners or users): + return results + + filtered: list[dict[str, str]] = [] + for item in results: + if ( + (not countries or item['country'] in countries) + and (not asns or item['asn'] in asns) + and (not owners or item['owner'] in owners) + and (not users or item.get('user', 'Unknown') in users) + ): + filtered.append(item) + return filtered diff --git a/ip_analyzer_app/services/exports.py b/ip_analyzer_app/services/exports.py new file mode 100644 index 0000000..80e5639 --- /dev/null +++ b/ip_analyzer_app/services/exports.py @@ -0,0 +1,153 @@ +from datetime import datetime + + +def _ts() -> str: + return datetime.now().strftime('%Y-%m-%d') + + +def generate_ipset(ips: list[str], timeout: int = 86400) -> str: + timestamp = _ts() + rules = f'''#!/bin/bash +# IPSet Rules - Generated {timestamp} +# Total IPs: {len(ips)} +# Timeout: {timeout} seconds ({timeout//3600} hours) + +# Create ipset +ipset create blocked_ips hash:ip timeout {timeout} maxelem 1000000 + +# Add IPs to set +''' + for ip in ips: + rules += f'ipset add blocked_ips {ip}\n' + rules += ''' +# Apply iptables rules +iptables -I INPUT -m set --match-set blocked_ips src -j DROP +iptables -I FORWARD -m set --match-set blocked_ips src -j DROP + +echo "IPSet created and iptables rules applied" +echo "To remove: ipset destroy blocked_ips" +''' + return rules + + +def generate_iptables(ips: list[str]) -> str: + timestamp = _ts() + rules = f'''#!/bin/bash +# iptables Rules - Generated {timestamp} +# Total IPs: {len(ips)} + +# INPUT chain (incoming connections) +''' + for ip in ips: + rules += f'iptables -A INPUT -s {ip} -j DROP\n' + rules += '\n# FORWARD chain (routed traffic)\n' + for ip in ips: + rules += f'iptables -A FORWARD -s {ip} -j DROP\n' + rules += ''' +# Save rules +iptables-save > /etc/iptables/rules.v4 + +echo "iptables rules applied and saved" +''' + return rules + + +def generate_nginx(ips: list[str]) -> str: + timestamp = _ts() + rules = f'''# Nginx Deny Rules - Generated {timestamp} +# Total IPs: {len(ips)} +# +# Usage: Include in http or server block +# include /etc/nginx/conf.d/blocked_ips.conf; + +''' + for ip in ips: + rules += f'deny {ip};\n' + rules += '\n# After adding rules, reload nginx:\n# nginx -t && nginx -s reload\n' + return rules + + +def generate_apache(ips: list[str]) -> str: + timestamp = _ts() + rules = f'''# Apache Deny Rules - Generated {timestamp} +# Total IPs: {len(ips)} +# +# Usage: Add to .htaccess or VirtualHost configuration + + + Require all granted +''' + for ip in ips: + rules += f' Require not ip {ip}\n' + rules += ''' + +# After adding rules, restart apache: +# systemctl restart apache2 +''' + return rules + + +def generate_firewalld(ips: list[str]) -> str: + timestamp = _ts() + rules = f'''#!/bin/bash +# Firewalld Rules - Generated {timestamp} +# Total IPs: {len(ips)} + +''' + for ip in ips: + rules += f'firewall-cmd --permanent --add-rich-rule="rule family=\'ipv4\' source address=\'{ip}\' reject"\n' + rules += ''' +# Reload firewall +firewall-cmd --reload + +echo "Firewalld rules applied" +''' + return rules + + +def generate_mikrotik(ips: list[str]) -> str: + timestamp = _ts() + rules = f'''# MikroTik RouterOS Configuration - Generated {timestamp} +# Total IPs: {len(ips)} +# +# Usage: Copy and paste into RouterOS Terminal + +/ip firewall address-list +''' + for ip in ips: + rules += f'add list=blocked_ips address={ip} comment="Auto-blocked {timestamp}"\n' + rules += ''' +# Create firewall filter rules (if not exists) +/ip firewall filter +add chain=input src-address-list=blocked_ips action=drop comment="Drop blocked IPs - input" +add chain=forward src-address-list=blocked_ips action=drop comment="Drop blocked IPs - forward" + +# Verify +/ip firewall address-list print where list=blocked_ips +''' + return rules + + +def generate_cidr(results: list[dict[str, str]]) -> str: + networks = sorted(set(r['network'] for r in results if r['network'] != 'Unknown')) + timestamp = _ts() + output = f'''# CIDR Networks - Generated {timestamp} +# Total unique networks: {len(networks)} +# +# One network per line + +''' + return output + '\n'.join(networks) + + +def generate_csv(results: list[dict[str, str]]) -> str: + csv = 'IP,ASN,Owner,User,Country,Network\n' + for item in results: + ip = item['ip'] + asn = item['asn'].replace('"', '""') + owner = item['owner'].replace('"', '""') + user = item.get('user', 'Unknown').replace('"', '""') + country = item['country'] + network = item['network'] + csv += f'"{ip}","{asn}","{owner}","{user}","{country}","{network}"\n' + return csv diff --git a/ip_analyzer_app/services/lookups.py b/ip_analyzer_app/services/lookups.py new file mode 100644 index 0000000..c519663 --- /dev/null +++ b/ip_analyzer_app/services/lookups.py @@ -0,0 +1,57 @@ +import socket +import subprocess + + +def whois_lookup(ip: str) -> str: + try: + result = subprocess.run(['whois', ip], capture_output=True, text=True, timeout=5) + return result.stdout + except subprocess.TimeoutExpired: + return '' + except FileNotFoundError: + print('WARNING: whois command not found. Install it in the container/image.') + return '' + except Exception as exc: + print(f'WHOIS error for {ip}: {exc}') + return '' + + +def cymru_lookup(ips: list[str]) -> dict[str, dict[str, str]]: + results: dict[str, dict[str, str]] = {} + if not ips: + return results + + try: + query = 'begin\nverbose\n' + '\n'.join(ips) + '\nend\n' + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(10) + sock.connect(('whois.cymru.com', 43)) + sock.sendall(query.encode()) + + response = b'' + while True: + data = sock.recv(4096) + if not data: + break + response += data + sock.close() + + for line in response.decode('utf-8', errors='ignore').split('\n'): + if '|' in line and not line.startswith('AS'): + parts = [p.strip() for p in line.split('|')] + if len(parts) >= 5: + asn, ip, prefix, cc, owner = parts[0], parts[1], parts[2], parts[3], parts[4] + if asn.isdigit(): + asn = f'AS{asn}' + results[ip] = { + 'asn': asn, + 'prefix': prefix, + 'country': cc, + 'owner': owner, + } + except socket.timeout: + print('WARNING: Team Cymru timeout. Using fallback WHOIS.') + except Exception as exc: + print(f'Team Cymru lookup error: {exc}') + + return results diff --git a/ip_analyzer_app/services/parsing.py b/ip_analyzer_app/services/parsing.py new file mode 100644 index 0000000..ec9d046 --- /dev/null +++ b/ip_analyzer_app/services/parsing.py @@ -0,0 +1,69 @@ +import ipaddress +import re + + +def parse_ip_list(text: str) -> list[str]: + """Parse unique IPv4 addresses from free-form text.""" + normalized = re.sub(r'[,;|\t]+', '\n', text or '') + lines = normalized.strip().split('\n') + + ips: list[str] = [] + for line in lines: + ips.extend(re.findall(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b', line)) + + valid_ips: list[str] = [] + for ip in ips: + try: + ipaddress.IPv4Address(ip) + valid_ips.append(ip) + except Exception: + pass + + return sorted(set(valid_ips), key=valid_ips.index) + + +def extract_field(line: str) -> str: + return line.split(':', 1)[1].strip() if ':' in line else '' + + +def parse_whois(whois_output: str) -> dict[str, str]: + """Extract relevant information from WHOIS output, including user/customer labels.""" + info = { + 'org': 'Unknown', + 'user': 'Unknown', + 'country': 'Unknown', + 'netname': 'Unknown', + 'asn': 'Unknown', + 'cidr': 'Unknown', + } + + for raw_line in whois_output.split('\n'): + line = raw_line.strip() + lowered = line.lower() + + if line.startswith('Organization:') or line.startswith('org-name:') or line.startswith('OrgName:'): + value = extract_field(line) + if value: + info['org'] = value + elif line.startswith('Country:') or line.startswith('country:'): + value = extract_field(line) + if value: + info['country'] = value + elif line.startswith('NetName:') or line.startswith('netname:'): + value = extract_field(line) + if value: + info['netname'] = value + elif line.startswith('CIDR:') or line.startswith('inetnum:') or line.startswith('route:'): + value = extract_field(line) + if value: + info['cidr'] = value + elif line.startswith('OriginAS:') or line.startswith('origin:') or line.startswith('originas:'): + asn = re.search(r'AS\d+', line, re.IGNORECASE) + if asn: + info['asn'] = asn.group().upper() + elif re.match(r'^(user|customer|owner|descr):', lowered): + value = extract_field(line) + if value and info['user'] == 'Unknown': + info['user'] = value + + return info diff --git a/ip_analyzer_app/utils/assets.py b/ip_analyzer_app/utils/assets.py new file mode 100644 index 0000000..b7bcef0 --- /dev/null +++ b/ip_analyzer_app/utils/assets.py @@ -0,0 +1,42 @@ +import hashlib +import os + +from flask import request + + + +def get_file_hash(filepath: str) -> str: + with open(filepath, 'rb') as file_handle: + return hashlib.md5(file_handle.read()).hexdigest()[:8] + + + +def register_asset_helpers(app): + @app.context_processor + def inject_static_hash(): + def static_hash(filename: str) -> str: + filepath = os.path.join(app.static_folder, filename) + file_hash = get_file_hash(filepath) + return f'/static/{filename}?v={file_hash}' + return dict(static_hash=static_hash) + + @app.after_request + def add_header(response): + if request.path.startswith('/static/'): + response.cache_control.no_cache = None + response.cache_control.no_store = None + response.cache_control.max_age = 31536000 + response.cache_control.public = True + response.headers.pop('Content-Disposition', None) + else: + response.cache_control.no_cache = True + response.cache_control.no_store = True + return response + + @app.context_processor + def inject_config(): + def get_base_url() -> str: + scheme = request.headers.get('X-Forwarded-Proto', request.scheme) + host = request.headers.get('X-Forwarded-Host', request.host) + return f'{scheme}://{host}' + return dict(base_url=get_base_url, request=request) diff --git a/run.py b/run.py new file mode 100644 index 0000000..6edf78e --- /dev/null +++ b/run.py @@ -0,0 +1,16 @@ +#!/usr/bin/env python3 +from ip_analyzer_app import create_app + +app = create_app() + +if __name__ == '__main__': + print('=' * 70) + print('IP WHOIS Analyzer - Starting') + print('=' * 70) + print() + print('Interface: http://localhost:8799') + print('API Docs: http://localhost:8799/api') + print() + print('Press Ctrl+C to stop') + print() + app.run(debug=True, host='0.0.0.0', port=8799) diff --git a/static/js/main.js b/static/js/main.js index 18a9a82..ae8560f 100644 --- a/static/js/main.js +++ b/static/js/main.js @@ -3,7 +3,8 @@ let filteredResults = []; let selectedFilters = { countries: new Set(), asns: new Set(), - owners: new Set() + owners: new Set(), + users: new Set() }; // Theme Management @@ -148,6 +149,19 @@ function displayFilters(stats) { ${shortOwner} (${count}) `; }).join(''); + + // Users (top 15) + const users = Object.entries(stats.users || {}) + .sort((a, b) => b[1] - a[1]) + .slice(0, 15); + + document.getElementById('userFilters').innerHTML = users.map(([user, count]) => { + const shortUser = user.length > 30 ? user.substring(0, 30) + '...' : user; + return ` + ${shortUser} (${count}) + `; + }).join(''); } // Toggle Filter @@ -167,7 +181,7 @@ function toggleFilter(type, value) { // Clear All Filters function clearFilters() { - selectedFilters = { countries: new Set(), asns: new Set(), owners: new Set() }; + selectedFilters = { countries: new Set(), asns: new Set(), owners: new Set(), users: new Set() }; document.querySelectorAll('.filter-chip').forEach(chip => chip.classList.remove('active')); applyFilters(); } @@ -176,7 +190,8 @@ function clearFilters() { function applyFilters() { const hasFilters = selectedFilters.countries.size > 0 || selectedFilters.asns.size > 0 || - selectedFilters.owners.size > 0; + selectedFilters.owners.size > 0 || + selectedFilters.users.size > 0; if (!hasFilters) { filteredResults = [...allResults]; @@ -184,7 +199,8 @@ function applyFilters() { filteredResults = allResults.filter(item => { return (selectedFilters.countries.size === 0 || selectedFilters.countries.has(item.country)) && (selectedFilters.asns.size === 0 || selectedFilters.asns.has(item.asn)) && - (selectedFilters.owners.size === 0 || selectedFilters.owners.has(item.owner)); + (selectedFilters.owners.size === 0 || selectedFilters.owners.has(item.owner)) && + (selectedFilters.users.size === 0 || selectedFilters.users.has(item.user || 'Unknown')); }); } @@ -194,7 +210,7 @@ function applyFilters() { // Update Filter Count function updateFilterCount() { - const total = selectedFilters.countries.size + selectedFilters.asns.size + selectedFilters.owners.size; + const total = selectedFilters.countries.size + selectedFilters.asns.size + selectedFilters.owners.size + selectedFilters.users.size; const countEl = document.getElementById('filterCount'); if (countEl) { countEl.textContent = total > 0 @@ -214,6 +230,7 @@ function displayTable(results) { ${item.ip} ${item.asn} ${escapeHtml(item.owner)} + ${escapeHtml(item.user || 'Unknown')} ${item.country} ${item.network} diff --git a/templates/index.html b/templates/index.html index f60cf19..f197eda 100644 --- a/templates/index.html +++ b/templates/index.html @@ -61,24 +61,30 @@ Example:
-
+
-
+
-
+
+
+ +
+