#!/usr/bin/env python3
"""GeoIP ban configuration generator.

Fetches per-country IPv4 ranges from public CIDR-list sources (keeping a
MaxMind MMDB database downloaded/refreshed for optional lookups) and renders
block lists for nginx, Apache (2.2/2.4), and HAProxy.
"""
import argparse
import sys
import os
import json
import ipaddress
import urllib.request
from pathlib import Path
from typing import List, Dict, Set
from datetime import datetime, timedelta

# NOTE: geoip2 is imported lazily inside GeoIPDatabase.open_reader() so that
# config generation (which uses only the HTTP CIDR-list sources) still works
# when the optional geoip2 package is not installed.


class Config:
    """JSON-backed configuration manager with built-in defaults."""

    DEFAULT_CONFIG = {
        "database_url": "https://github.com/P3TERX/GeoLite.mmdb/releases/download/2026.02.07/GeoLite2-Country.mmdb",
        "database_file": "GeoLite2-Country.mmdb",
        "last_update": None,
        "update_interval_days": 7,
        "geoip_db_dir": "geoip_db",
        "cache_enabled": True,
        "auto_update": True,
        "ip_range_sources": {
            "github": "https://raw.githubusercontent.com/herrbischoff/country-ip-blocks/master/ipv4/{country_lower}.cidr",
            "alternative": "https://www.ipdeny.com/ipblocks/data/aggregated/{country_lower}-aggregated.zone"
        }
    }

    def __init__(self, config_path: str = "geoip_db/config.json"):
        self.config_path = Path(config_path)
        self.config = self.load()

    def load(self) -> Dict:
        """Load configuration from file, merging file values over defaults.

        Returns a copy of DEFAULT_CONFIG when the file is missing or
        unreadable (a corrupt config file must not stop the tool).
        """
        if self.config_path.exists():
            try:
                with open(self.config_path, 'r') as f:
                    config = json.load(f)
                # File values override defaults; unknown keys are preserved.
                return {**self.DEFAULT_CONFIG, **config}
            except Exception as e:
                print(f"Warning: Could not load config: {e}", file=sys.stderr)
        return self.DEFAULT_CONFIG.copy()

    def save(self):
        """Persist the current configuration to disk."""
        self.config_path.parent.mkdir(parents=True, exist_ok=True)
        with open(self.config_path, 'w') as f:
            json.dump(self.config, f, indent=2, default=str)

    def get(self, key: str, default=None):
        """Get a configuration value, with an optional fallback."""
        return self.config.get(key, default)

    def set(self, key: str, value):
        """Set a configuration value and save immediately."""
        self.config[key] = value
        self.save()

    def needs_update(self) -> bool:
        """Return True if the MMDB database is due for a refresh.

        Always False when auto_update is disabled; True when no update has
        ever been recorded or the stored timestamp cannot be parsed.
        """
        if not self.config.get('auto_update', True):
            return False
        last_update = self.config.get('last_update')
        if not last_update:
            return True
        try:
            last_date = datetime.fromisoformat(last_update)
            interval = timedelta(days=self.config.get('update_interval_days', 7))
            return datetime.now() - last_date > interval
        except (TypeError, ValueError):
            # Unparseable timestamp: the safe choice is to update.
            return True


class GeoIPDatabase:
    """GeoIP database handler using the MaxMind MMDB format."""

    def __init__(self, config: Config):
        self.config = config
        self.db_dir = Path(config.get('geoip_db_dir', 'geoip_db'))
        self.db_dir.mkdir(parents=True, exist_ok=True)
        self.mmdb_file = self.db_dir / config.get('database_file', 'GeoLite2-Country.mmdb')
        self.cache = {}    # country-code key -> {code: [IPv4Network, ...]}
        self.reader = None  # lazily opened geoip2 Reader

    def download_database(self, url: str = None) -> bool:
        """Download the MMDB database; return True on success.

        Args:
            url: override the configured database_url when given.
        """
        url = url or self.config.get('database_url')
        print(f"Downloading database from: {url}", file=sys.stderr)
        print(f"Saving to: {self.mmdb_file}", file=sys.stderr)
        try:
            urllib.request.urlretrieve(url, self.mmdb_file)
            # Record the update time so needs_update() has a reference point.
            self.config.set('last_update', datetime.now().isoformat())
            print("Database downloaded successfully", file=sys.stderr)
            print(f"File size: {self.mmdb_file.stat().st_size / 1024 / 1024:.2f} MB", file=sys.stderr)
            return True
        except Exception as e:
            print(f"Error downloading database: {e}", file=sys.stderr)
            return False

    def check_and_update(self) -> bool:
        """Download the database if missing or stale; True when usable."""
        if not self.mmdb_file.exists():
            print("Database not found, downloading...", file=sys.stderr)
            return self.download_database()
        if self.config.needs_update():
            print("Database is outdated, updating...", file=sys.stderr)
            return self.download_database()
        return True

    def open_reader(self):
        """Open the MMDB reader (requires the optional geoip2 package)."""
        if self.reader is None:
            try:
                # Imported lazily so the CIDR-list generation path works
                # even when geoip2 is not installed.
                import geoip2.database
                self.reader = geoip2.database.Reader(str(self.mmdb_file))
                print(f"Opened database: {self.mmdb_file}", file=sys.stderr)
            except Exception as e:
                print(f"Error opening database: {e}", file=sys.stderr)
                print("Install geoip2: pip install geoip2", file=sys.stderr)
                sys.exit(1)

    def close_reader(self):
        """Close the MMDB reader if it was opened."""
        if self.reader:
            self.reader.close()
            self.reader = None

    def get_country_networks_from_source(self, country_code: str) -> List[ipaddress.IPv4Network]:
        """Download IP ranges for one country from external CIDR sources.

        Tries each configured source in order and stops at the first one
        that yields any networks. Comment lines ('#') and invalid CIDRs
        are skipped.
        """
        sources = self.config.get('ip_range_sources', {})
        networks = []
        country_lower = country_code.lower()
        for source_name, url_template in sources.items():
            try:
                url = url_template.format(country_lower=country_lower,
                                          country_upper=country_code.upper())
                print(f"Fetching from {source_name}: {url}", file=sys.stderr)
                # Context manager ensures the HTTP connection is closed.
                with urllib.request.urlopen(url, timeout=30) as response:
                    data = response.read().decode('utf-8')
                for line in data.strip().split('\n'):
                    line = line.strip()
                    if line and not line.startswith('#'):
                        try:
                            networks.append(ipaddress.IPv4Network(line))
                        except ValueError:
                            continue
                if networks:
                    print(f"Loaded {len(networks)} networks from {source_name}", file=sys.stderr)
                    break
            except Exception as e:
                # Best-effort: fall through to the next configured source.
                print(f"Could not fetch from {source_name}: {e}", file=sys.stderr)
                continue
        return networks

    def get_country_networks(self, country_codes: List[str]) -> Dict[str, List[ipaddress.IPv4Network]]:
        """Get IP networks for the given countries, with in-process caching."""
        cache_key = ','.join(sorted(country_codes))
        if self.config.get('cache_enabled') and cache_key in self.cache:
            print(f"Using cached data for {cache_key}", file=sys.stderr)
            return self.cache[cache_key]
        country_networks = {code: [] for code in country_codes}
        print(f"Loading networks for: {', '.join(country_codes)}", file=sys.stderr)
        # External CIDR lists are far cheaper than scanning the whole MMDB.
        for country_code in country_codes:
            networks = self.get_country_networks_from_source(country_code)
            country_networks[country_code] = networks
            print(f"  {country_code}: {len(networks)} networks", file=sys.stderr)
        if self.config.get('cache_enabled'):
            self.cache[cache_key] = country_networks
        return country_networks


class ConfigGenerator:
    """Renders blocked-network lists into server-specific config formats."""

    @staticmethod
    def _aggregate_networks(networks: list) -> list:
        """Collapse adjacent/overlapping networks to minimize the list.

        Accepts CIDR strings or IPv4Network objects; returns sorted CIDR
        strings. Falls back to simple de-duplication if aggregation fails.
        """
        if not networks:
            return []
        try:
            ip_objects = []
            for network in networks:
                try:
                    ip_objects.append(ipaddress.IPv4Network(network, strict=False))
                except ValueError:
                    continue  # skip entries that are not valid CIDRs
            if ip_objects:
                collapsed = list(ipaddress.collapse_addresses(ip_objects))
                return sorted([str(net) for net in collapsed])
            return sorted(list(set(networks)))
        except Exception:
            return sorted(list(set(networks)))

    @staticmethod
    def _flatten(country_networks: dict, aggregate: bool) -> list:
        """Merge all countries' networks into one de-duplicated list."""
        all_networks = []
        for networks in country_networks.values():
            all_networks.extend(networks)
        if aggregate:
            return ConfigGenerator._aggregate_networks(all_networks)
        return sorted(list(set(all_networks)))

    @staticmethod
    def generate_nginx_geo(country_networks: dict, aggregate: bool = True) -> str:
        """Nginx 'geo' module config: $blocked_country = 1 for blocked IPs."""
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        countries_list = ', '.join(sorted(country_networks.keys()))
        all_networks = ConfigGenerator._flatten(country_networks, aggregate)
        config = f"""# Nginx Geo Module Configuration
# Generated: {timestamp}
# Countries: {countries_list}
# Total networks: {len(all_networks)}

geo $blocked_country {{
    default 0;
"""
        for network in all_networks:
            config += f"    {network} 1;\n"
        config += "}\n"
        return config

    @staticmethod
    def generate_nginx_map(country_networks: dict, aggregate: bool = True) -> str:
        """Nginx 'map' config, grouped per country.

        NOTE(review): nginx 'map' matches $remote_addr as a string, not by
        CIDR — the 'geo' output is usually what is wanted; confirm intent.
        """
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        countries_list = ', '.join(sorted(country_networks.keys()))
        # Keep the per-country grouping so the output is annotated per code.
        processed_networks = {}
        for country_code, networks in country_networks.items():
            if aggregate:
                processed_networks[country_code] = ConfigGenerator._aggregate_networks(networks)
            else:
                processed_networks[country_code] = sorted(list(set(networks)))
        total_networks = sum(len(nets) for nets in processed_networks.values())
        config = f"""# Nginx Map Module Configuration
# Generated: {timestamp}
# Countries: {countries_list}
# Total networks: {total_networks}

map $remote_addr $blocked_country {{
    default 0;
"""
        for country_code in sorted(processed_networks.keys()):
            networks = processed_networks[country_code]
            config += f"    # {country_code} - {len(networks)} networks\n"
            for network in networks:
                config += f"    {network} 1;\n"
            config += "\n"
        config += "}\n"
        return config

    @staticmethod
    def generate_nginx_deny(country_networks: dict, aggregate: bool = True) -> str:
        """Nginx 'deny' directive list, terminated with 'allow all'."""
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        countries_list = ', '.join(sorted(country_networks.keys()))
        all_networks = ConfigGenerator._flatten(country_networks, aggregate)
        config = f"""# Nginx Deny Directives Configuration
# Generated: {timestamp}
# Countries: {countries_list}
# Total networks: {len(all_networks)}

"""
        for network in all_networks:
            config += f"deny {network};\n"
        config += "allow all;\n"
        return config

    @staticmethod
    def generate_apache_24(country_networks: dict, aggregate: bool = True) -> str:
        """Apache 2.4 config using mod_authz_core Require directives.

        BUGFIX: 'Require all granted' mixed with 'Require not ip' lines is
        only valid inside a <RequireAll> container; the container is now
        emitted around them.
        """
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        countries_list = ', '.join(sorted(country_networks.keys()))
        all_networks = ConfigGenerator._flatten(country_networks, aggregate)
        config = f"""# Apache 2.4 Configuration
# Generated: {timestamp}
# Countries: {countries_list}
# Total networks: {len(all_networks)}

<RequireAll>
    Require all granted
"""
        for network in all_networks:
            config += f"    Require not ip {network}\n"
        config += "</RequireAll>\n"
        return config

    @staticmethod
    def generate_apache_22(country_networks: dict, aggregate: bool = True) -> str:
        """Apache 2.2 config using Order/Allow/Deny directives."""
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        countries_list = ', '.join(sorted(country_networks.keys()))
        all_networks = ConfigGenerator._flatten(country_networks, aggregate)
        config = f"""# Apache 2.2 Configuration
# Generated: {timestamp}
# Countries: {countries_list}
# Total networks: {len(all_networks)}

Order Allow,Deny
Allow from all
"""
        for network in all_networks:
            config += f"Deny from {network}\n"
        return config

    @staticmethod
    def generate_haproxy_acl(country_networks: dict, aggregate: bool = True) -> str:
        """HAProxy frontend snippet using 'acl ... src' matches."""
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        countries_list = ', '.join(sorted(country_networks.keys()))
        all_networks = ConfigGenerator._flatten(country_networks, aggregate)
        config = f"""# HAProxy ACL Configuration
# Generated: {timestamp}
# Countries: {countries_list}
# Total networks: {len(all_networks)}

frontend http-in
    bind *:80
"""
        for network in all_networks:
            config += f"    acl blocked_ip src {network}\n"
        config += """
    http-request deny if blocked_ip

    default_backend servers
"""
        return config

    @staticmethod
    def generate_haproxy_lua(country_networks: dict, aggregate: bool = True) -> str:
        """HAProxy Lua script embedding the blocked-network table.

        BUGFIX: the Lua table was closed with a literal '}}' (the closing
        text is a plain string, not an f-string, so braces are not doubled),
        producing unbalanced braces; it now closes with a single '}'.
        NOTE(review): string.match against a CIDR string is a pattern match,
        not a subnet test — verify this script does what callers expect.
        """
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        countries_list = ', '.join(sorted(country_networks.keys()))
        all_networks = ConfigGenerator._flatten(country_networks, aggregate)
        config = f"""-- HAProxy Lua Script
-- Generated: {timestamp}
-- Countries: {countries_list}
-- Total networks: {len(all_networks)}

local blocked_networks = {{
"""
        for network in all_networks:
            config += f'    "{network}",\n'
        config += """}

function check_blocked(txn)
    local src_ip = txn.f:src()
    for _, network in ipairs(blocked_networks) do
        if string.match(src_ip, network) then
            return true
        end
    end
    return false
end

core.register_fetches("is_blocked", check_blocked)
"""
        return config


def main():
    """Command-line entry point."""
    parser = argparse.ArgumentParser(
        description='Advanced GeoIP ban configuration generator using MaxMind MMDB',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Generate nginx config for China
  %(prog)s --country CN --app nginx --output china.conf

  # Multiple countries (comma-separated)
  %(prog)s --country CN,RU,KP --app haproxy --output blocked.conf

  # Update database manually
  %(prog)s --update-db

  # Use custom database URL
  %(prog)s --db-url https://example.com/GeoLite2-Country.mmdb --country US --app nginx

  # Disable aggregation for all original networks
  %(prog)s --country CN --app nginx --no-aggregate

  # Set custom configuration options
  %(prog)s --set-config update_interval_days=14
  %(prog)s --set-config auto_update=false

  # Output to console
  %(prog)s --country RU,BY --app nginx
"""
    )
    parser.add_argument('--country',
                        help='Country code(s) - comma-separated (e.g., CN,RU,KP)')
    parser.add_argument('--app', choices=['nginx', 'haproxy', 'apache'],
                        help='Target application type')
    parser.add_argument('--output',
                        help='Output file path (default: stdout)')
    parser.add_argument('--config', default='geoip_db/config.json',
                        help='Config file path (default: geoip_db/config.json)')
    parser.add_argument('--db-url',
                        help='Custom database URL (MMDB format)')
    parser.add_argument('--update-db', action='store_true',
                        help='Force database update')
    parser.add_argument('--no-aggregate', action='store_true',
                        help='Disable network aggregation')
    parser.add_argument('--no-auto-update', action='store_true',
                        help='Disable automatic database updates')
    parser.add_argument('--set-config', metavar='KEY=VALUE',
                        help='Set configuration option (e.g., update_interval_days=14)')
    parser.add_argument('--show-config', action='store_true',
                        help='Show current configuration')
    parser.add_argument('--list-countries', action='store_true',
                        help='List available country codes')
    args = parser.parse_args()

    config = Config(args.config)

    if args.list_countries:
        common_countries = [
            "CN - China", "RU - Russia", "US - United States", "KP - North Korea",
            "IR - Iran", "BY - Belarus", "SY - Syria", "VE - Venezuela",
            "CU - Cuba", "SD - Sudan", "IQ - Iraq", "LY - Libya",
            "IN - India", "BR - Brazil", "DE - Germany", "FR - France",
            "GB - United Kingdom", "JP - Japan", "KR - South Korea"
        ]
        print("Common country codes:")
        for country in common_countries:
            print(f"  {country}")
        print("\nUse ISO 3166-1 alpha-2 codes (2 letters)")
        return

    if args.set_config:
        try:
            key, value = args.set_config.split('=', 1)
        except ValueError:
            print("Error: --set-config format should be KEY=VALUE", file=sys.stderr)
            sys.exit(1)
        try:
            # Interpret numbers/booleans/null; keep plain strings as-is.
            value = json.loads(value)
        except ValueError:
            pass
        config.set(key, value)
        print(f"Configuration updated: {key} = {value}", file=sys.stderr)
        return

    if args.show_config:
        print(json.dumps(config.config, indent=2, default=str))
        return

    # Command-line overrides are persisted to the config file (as before).
    if args.db_url:
        config.set('database_url', args.db_url)
    if args.no_auto_update:
        config.set('auto_update', False)

    db = GeoIPDatabase(config)

    if args.update_db:
        # BUGFIX: previously claimed success even when the download failed.
        if not db.download_database():
            sys.exit(1)
        print("Database updated successfully", file=sys.stderr)
        return

    if not args.country or not args.app:
        # All informational flags returned above, so nothing actionable
        # remains: show help and exit non-zero.
        parser.print_help()
        sys.exit(1)

    if not args.no_auto_update:
        db.check_and_update()

    countries = [c.strip().upper() for c in args.country.split(',')]
    print(f"Processing countries: {', '.join(countries)}", file=sys.stderr)

    country_networks = db.get_country_networks(countries)
    if not any(country_networks.values()):
        print("Error: No networks found for specified countries", file=sys.stderr)
        sys.exit(1)

    # BUGFIX: the dispatch table referenced ConfigGenerator.generate_nginx /
    # generate_haproxy / generate_apache, none of which exist (AttributeError
    # on every generation run). Map each app to its primary generator.
    generators = {
        'nginx': ConfigGenerator.generate_nginx_geo,
        'haproxy': ConfigGenerator.generate_haproxy_acl,
        'apache': ConfigGenerator.generate_apache_24,
    }
    aggregate = not args.no_aggregate
    config_output = generators[args.app](country_networks, aggregate)

    if args.output:
        output_path = Path(args.output)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        with open(output_path, 'w') as f:
            f.write(config_output)
        print(f"Configuration written to: {output_path}", file=sys.stderr)
    else:
        print(config_output)

    db.close_reader()


if __name__ == '__main__':
    main()