649 lines
21 KiB
Python
649 lines
21 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import argparse
|
|
import sys
|
|
import os
|
|
import json
|
|
import ipaddress
|
|
import urllib.request
|
|
from pathlib import Path
|
|
from typing import List, Dict, Set
|
|
from datetime import datetime, timedelta
|
|
import geoip2.database
|
|
from geoip2.errors import AddressNotFoundError
|
|
|
|
|
|
class Config:
|
|
"""Configuration manager"""
|
|
|
|
DEFAULT_CONFIG = {
|
|
"database_url": "https://github.com/P3TERX/GeoLite.mmdb/releases/download/2026.02.07/GeoLite2-Country.mmdb",
|
|
"database_file": "GeoLite2-Country.mmdb",
|
|
"last_update": None,
|
|
"update_interval_days": 7,
|
|
"geoip_db_dir": "geoip_db",
|
|
"cache_enabled": True,
|
|
"auto_update": True,
|
|
"ip_range_sources": {
|
|
"github": "https://raw.githubusercontent.com/herrbischoff/country-ip-blocks/master/ipv4/{country_lower}.cidr",
|
|
"alternative": "https://www.ipdeny.com/ipblocks/data/aggregated/{country_lower}-aggregated.zone"
|
|
}
|
|
}
|
|
|
|
def __init__(self, config_path: str = "geoip_db/config.json"):
|
|
self.config_path = Path(config_path)
|
|
self.config = self.load()
|
|
|
|
def load(self) -> Dict:
|
|
"""Load configuration from file"""
|
|
if self.config_path.exists():
|
|
try:
|
|
with open(self.config_path, 'r') as f:
|
|
config = json.load(f)
|
|
return {**self.DEFAULT_CONFIG, **config}
|
|
except Exception as e:
|
|
print(f"Warning: Could not load config: {e}", file=sys.stderr)
|
|
return self.DEFAULT_CONFIG.copy()
|
|
|
|
def save(self):
|
|
"""Save configuration to file"""
|
|
self.config_path.parent.mkdir(parents=True, exist_ok=True)
|
|
with open(self.config_path, 'w') as f:
|
|
json.dump(self.config, f, indent=2, default=str)
|
|
|
|
def get(self, key: str, default=None):
|
|
"""Get configuration value"""
|
|
return self.config.get(key, default)
|
|
|
|
def set(self, key: str, value):
|
|
"""Set configuration value"""
|
|
self.config[key] = value
|
|
self.save()
|
|
|
|
def needs_update(self) -> bool:
|
|
"""Check if database needs update"""
|
|
if not self.config.get('auto_update', True):
|
|
return False
|
|
|
|
last_update = self.config.get('last_update')
|
|
if not last_update:
|
|
return True
|
|
|
|
try:
|
|
last_date = datetime.fromisoformat(last_update)
|
|
interval = timedelta(days=self.config.get('update_interval_days', 7))
|
|
return datetime.now() - last_date > interval
|
|
except:
|
|
return True
|
|
|
|
|
|
class GeoIPDatabase:
|
|
"""GeoIP database handler using MMDB format"""
|
|
|
|
def __init__(self, config: Config):
|
|
self.config = config
|
|
self.db_dir = Path(config.get('geoip_db_dir', 'geoip_db'))
|
|
self.db_dir.mkdir(parents=True, exist_ok=True)
|
|
self.mmdb_file = self.db_dir / config.get('database_file', 'GeoLite2-Country.mmdb')
|
|
self.cache = {}
|
|
self.reader = None
|
|
|
|
def download_database(self, url: str = None):
|
|
"""Download MMDB database"""
|
|
url = url or self.config.get('database_url')
|
|
|
|
print(f"Downloading database from: {url}", file=sys.stderr)
|
|
print(f"Saving to: {self.mmdb_file}", file=sys.stderr)
|
|
|
|
try:
|
|
urllib.request.urlretrieve(url, self.mmdb_file)
|
|
|
|
# Update config
|
|
self.config.set('last_update', datetime.now().isoformat())
|
|
|
|
print("Database downloaded successfully", file=sys.stderr)
|
|
print(f"File size: {self.mmdb_file.stat().st_size / 1024 / 1024:.2f} MB", file=sys.stderr)
|
|
return True
|
|
|
|
except Exception as e:
|
|
print(f"Error downloading database: {e}", file=sys.stderr)
|
|
return False
|
|
|
|
def check_and_update(self):
|
|
"""Check if update is needed and download if necessary"""
|
|
if not self.mmdb_file.exists():
|
|
print("Database not found, downloading...", file=sys.stderr)
|
|
return self.download_database()
|
|
|
|
if self.config.needs_update():
|
|
print("Database is outdated, updating...", file=sys.stderr)
|
|
return self.download_database()
|
|
|
|
return True
|
|
|
|
def open_reader(self):
|
|
"""Open MMDB reader"""
|
|
if self.reader is None:
|
|
try:
|
|
self.reader = geoip2.database.Reader(str(self.mmdb_file))
|
|
print(f"Opened database: {self.mmdb_file}", file=sys.stderr)
|
|
except Exception as e:
|
|
print(f"Error opening database: {e}", file=sys.stderr)
|
|
print("Install geoip2: pip install geoip2", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
def close_reader(self):
|
|
"""Close MMDB reader"""
|
|
if self.reader:
|
|
self.reader.close()
|
|
self.reader = None
|
|
|
|
def get_country_networks_from_source(self, country_code: str) -> List[ipaddress.IPv4Network]:
|
|
"""Download IP ranges from external source (fallback method)"""
|
|
sources = self.config.get('ip_range_sources', {})
|
|
networks = []
|
|
|
|
country_lower = country_code.lower()
|
|
|
|
# Try multiple sources
|
|
for source_name, url_template in sources.items():
|
|
try:
|
|
url = url_template.format(country_lower=country_lower, country_upper=country_code.upper())
|
|
print(f"Fetching from {source_name}: {url}", file=sys.stderr)
|
|
|
|
response = urllib.request.urlopen(url, timeout=30)
|
|
data = response.read().decode('utf-8')
|
|
|
|
for line in data.strip().split('\n'):
|
|
line = line.strip()
|
|
if line and not line.startswith('#'):
|
|
try:
|
|
networks.append(ipaddress.IPv4Network(line))
|
|
except ValueError:
|
|
continue
|
|
|
|
if networks:
|
|
print(f"Loaded {len(networks)} networks from {source_name}", file=sys.stderr)
|
|
break
|
|
|
|
except Exception as e:
|
|
print(f"Could not fetch from {source_name}: {e}", file=sys.stderr)
|
|
continue
|
|
|
|
return networks
|
|
|
|
def get_country_networks(self, country_codes: List[str]) -> Dict[str, List[ipaddress.IPv4Network]]:
|
|
"""Get IP networks for specified countries"""
|
|
|
|
# Check cache
|
|
cache_key = ','.join(sorted(country_codes))
|
|
if self.config.get('cache_enabled') and cache_key in self.cache:
|
|
print(f"Using cached data for {cache_key}", file=sys.stderr)
|
|
return self.cache[cache_key]
|
|
|
|
country_networks = {code: [] for code in country_codes}
|
|
|
|
print(f"Loading networks for: {', '.join(country_codes)}", file=sys.stderr)
|
|
|
|
# Use external IP range sources (more efficient than scanning MMDB)
|
|
for country_code in country_codes:
|
|
networks = self.get_country_networks_from_source(country_code)
|
|
country_networks[country_code] = networks
|
|
print(f" {country_code}: {len(networks)} networks", file=sys.stderr)
|
|
|
|
# Cache results
|
|
if self.config.get('cache_enabled'):
|
|
self.cache[cache_key] = country_networks
|
|
|
|
return country_networks
|
|
|
|
|
|
class ConfigGenerator:
|
|
|
|
@staticmethod
|
|
def _aggregate_networks(networks: list) -> list:
|
|
"""Aggregate IP networks to minimize list size"""
|
|
if not networks:
|
|
return []
|
|
|
|
try:
|
|
ip_objects = []
|
|
for network in networks:
|
|
try:
|
|
ip_objects.append(ipaddress.IPv4Network(network, strict=False))
|
|
except:
|
|
continue
|
|
|
|
if ip_objects:
|
|
# Remove duplicates and aggregate
|
|
collapsed = list(ipaddress.collapse_addresses(ip_objects))
|
|
return sorted([str(net) for net in collapsed])
|
|
return sorted(list(set(networks))) # At least remove duplicates
|
|
except:
|
|
return sorted(list(set(networks)))
|
|
|
|
@staticmethod
|
|
def generate_nginx_geo(country_networks: dict, aggregate: bool = True) -> str:
|
|
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
|
countries_list = ', '.join(sorted(country_networks.keys()))
|
|
|
|
all_networks = []
|
|
for networks in country_networks.values():
|
|
all_networks.extend(networks)
|
|
|
|
if aggregate:
|
|
all_networks = ConfigGenerator._aggregate_networks(all_networks)
|
|
else:
|
|
all_networks = sorted(list(set(all_networks))) # Remove duplicates anyway
|
|
|
|
config = f"""# Nginx Geo Module Configuration
|
|
# Generated: {timestamp}
|
|
# Countries: {countries_list}
|
|
# Total networks: {len(all_networks)}
|
|
|
|
geo $blocked_country {{
|
|
default 0;
|
|
|
|
"""
|
|
|
|
for network in all_networks:
|
|
config += f" {network} 1;\n"
|
|
|
|
config += "}\n"
|
|
return config
|
|
|
|
@staticmethod
|
|
def generate_nginx_map(country_networks: dict, aggregate: bool = True) -> str:
|
|
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
|
countries_list = ', '.join(sorted(country_networks.keys()))
|
|
|
|
# Process each country separately
|
|
processed_networks = {}
|
|
for country_code, networks in country_networks.items():
|
|
if aggregate:
|
|
processed_networks[country_code] = ConfigGenerator._aggregate_networks(networks)
|
|
else:
|
|
processed_networks[country_code] = sorted(list(set(networks)))
|
|
|
|
# Calculate total
|
|
total_networks = sum(len(nets) for nets in processed_networks.values())
|
|
|
|
config = f"""# Nginx Map Module Configuration
|
|
# Generated: {timestamp}
|
|
# Countries: {countries_list}
|
|
# Total networks: {total_networks}
|
|
|
|
map $remote_addr $blocked_country {{
|
|
default 0;
|
|
|
|
"""
|
|
|
|
for country_code in sorted(processed_networks.keys()):
|
|
networks = processed_networks[country_code]
|
|
config += f" # {country_code} - {len(networks)} networks\n"
|
|
for network in networks:
|
|
config += f" {network} 1;\n"
|
|
config += "\n"
|
|
|
|
config += "}\n"
|
|
return config
|
|
|
|
@staticmethod
|
|
def generate_nginx_deny(country_networks: dict, aggregate: bool = True) -> str:
|
|
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
|
countries_list = ', '.join(sorted(country_networks.keys()))
|
|
|
|
all_networks = []
|
|
for networks in country_networks.values():
|
|
all_networks.extend(networks)
|
|
|
|
if aggregate:
|
|
all_networks = ConfigGenerator._aggregate_networks(all_networks)
|
|
else:
|
|
all_networks = sorted(list(set(all_networks)))
|
|
|
|
config = f"""# Nginx Deny Directives Configuration
|
|
# Generated: {timestamp}
|
|
# Countries: {countries_list}
|
|
# Total networks: {len(all_networks)}
|
|
|
|
"""
|
|
|
|
for network in all_networks:
|
|
config += f"deny {network};\n"
|
|
|
|
config += "allow all;\n"
|
|
return config
|
|
|
|
@staticmethod
|
|
def generate_apache_24(country_networks: dict, aggregate: bool = True) -> str:
|
|
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
|
countries_list = ', '.join(sorted(country_networks.keys()))
|
|
|
|
all_networks = []
|
|
for networks in country_networks.values():
|
|
all_networks.extend(networks)
|
|
|
|
if aggregate:
|
|
all_networks = ConfigGenerator._aggregate_networks(all_networks)
|
|
else:
|
|
all_networks = sorted(list(set(all_networks)))
|
|
|
|
config = f"""# Apache 2.4 Configuration
|
|
# Generated: {timestamp}
|
|
# Countries: {countries_list}
|
|
# Total networks: {len(all_networks)}
|
|
|
|
<RequireAll>
|
|
Require all granted
|
|
"""
|
|
|
|
for network in all_networks:
|
|
config += f" Require not ip {network}\n"
|
|
|
|
config += "</RequireAll>\n"
|
|
return config
|
|
|
|
@staticmethod
|
|
def generate_apache_22(country_networks: dict, aggregate: bool = True) -> str:
|
|
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
|
countries_list = ', '.join(sorted(country_networks.keys()))
|
|
|
|
all_networks = []
|
|
for networks in country_networks.values():
|
|
all_networks.extend(networks)
|
|
|
|
if aggregate:
|
|
all_networks = ConfigGenerator._aggregate_networks(all_networks)
|
|
else:
|
|
all_networks = sorted(list(set(all_networks)))
|
|
|
|
config = f"""# Apache 2.2 Configuration
|
|
# Generated: {timestamp}
|
|
# Countries: {countries_list}
|
|
# Total networks: {len(all_networks)}
|
|
|
|
Order Allow,Deny
|
|
Allow from all
|
|
"""
|
|
|
|
for network in all_networks:
|
|
config += f"Deny from {network}\n"
|
|
|
|
return config
|
|
|
|
@staticmethod
|
|
def generate_haproxy_acl(country_networks: dict, aggregate: bool = True) -> str:
|
|
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
|
countries_list = ', '.join(sorted(country_networks.keys()))
|
|
|
|
all_networks = []
|
|
for networks in country_networks.values():
|
|
all_networks.extend(networks)
|
|
|
|
if aggregate:
|
|
all_networks = ConfigGenerator._aggregate_networks(all_networks)
|
|
else:
|
|
all_networks = sorted(list(set(all_networks)))
|
|
|
|
config = f"""# HAProxy ACL Configuration
|
|
# Generated: {timestamp}
|
|
# Countries: {countries_list}
|
|
# Total networks: {len(all_networks)}
|
|
|
|
frontend http-in
|
|
bind *:80
|
|
|
|
"""
|
|
|
|
for network in all_networks:
|
|
config += f" acl blocked_ip src {network}\n"
|
|
|
|
config += """
|
|
http-request deny if blocked_ip
|
|
default_backend servers
|
|
"""
|
|
return config
|
|
|
|
@staticmethod
|
|
def generate_haproxy_lua(country_networks: dict, aggregate: bool = True) -> str:
|
|
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
|
countries_list = ', '.join(sorted(country_networks.keys()))
|
|
|
|
all_networks = []
|
|
for networks in country_networks.values():
|
|
all_networks.extend(networks)
|
|
|
|
if aggregate:
|
|
all_networks = ConfigGenerator._aggregate_networks(all_networks)
|
|
else:
|
|
all_networks = sorted(list(set(all_networks)))
|
|
|
|
config = f"""-- HAProxy Lua Script
|
|
-- Generated: {timestamp}
|
|
-- Countries: {countries_list}
|
|
-- Total networks: {len(all_networks)}
|
|
|
|
local blocked_networks = {{
|
|
"""
|
|
|
|
for network in all_networks:
|
|
config += f' "{network}",\n'
|
|
|
|
config += """}}
|
|
|
|
function check_blocked(txn)
|
|
local src_ip = txn.f:src()
|
|
for _, network in ipairs(blocked_networks) do
|
|
if string.match(src_ip, network) then
|
|
return true
|
|
end
|
|
end
|
|
return false
|
|
end
|
|
|
|
core.register_fetches("is_blocked", check_blocked)
|
|
"""
|
|
return config
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(
|
|
description='Advanced GeoIP ban configuration generator using MaxMind MMDB',
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
epilog="""
|
|
Examples:
|
|
# Generate nginx config for China
|
|
%(prog)s --country CN --app nginx --output china.conf
|
|
|
|
# Multiple countries (comma-separated)
|
|
%(prog)s --country CN,RU,KP --app haproxy --output blocked.conf
|
|
|
|
# Update database manually
|
|
%(prog)s --update-db
|
|
|
|
# Use custom database URL
|
|
%(prog)s --db-url https://example.com/GeoLite2-Country.mmdb --country US --app nginx
|
|
|
|
# Disable aggregation for all original networks
|
|
%(prog)s --country CN --app nginx --no-aggregate
|
|
|
|
# Set custom configuration options
|
|
%(prog)s --set-config update_interval_days=14
|
|
%(prog)s --set-config auto_update=false
|
|
|
|
# Output to console
|
|
%(prog)s --country RU,BY --app nginx
|
|
"""
|
|
)
|
|
|
|
parser.add_argument(
|
|
'--country',
|
|
help='Country code(s) - comma-separated (e.g., CN,RU,KP)'
|
|
)
|
|
|
|
parser.add_argument(
|
|
'--app',
|
|
choices=['nginx', 'haproxy', 'apache'],
|
|
help='Target application type'
|
|
)
|
|
|
|
parser.add_argument(
|
|
'--output',
|
|
help='Output file path (default: stdout)'
|
|
)
|
|
|
|
parser.add_argument(
|
|
'--config',
|
|
default='geoip_db/config.json',
|
|
help='Config file path (default: geoip_db/config.json)'
|
|
)
|
|
|
|
parser.add_argument(
|
|
'--db-url',
|
|
help='Custom database URL (MMDB format)'
|
|
)
|
|
|
|
parser.add_argument(
|
|
'--update-db',
|
|
action='store_true',
|
|
help='Force database update'
|
|
)
|
|
|
|
parser.add_argument(
|
|
'--no-aggregate',
|
|
action='store_true',
|
|
help='Disable network aggregation'
|
|
)
|
|
|
|
parser.add_argument(
|
|
'--no-auto-update',
|
|
action='store_true',
|
|
help='Disable automatic database updates'
|
|
)
|
|
|
|
parser.add_argument(
|
|
'--set-config',
|
|
metavar='KEY=VALUE',
|
|
help='Set configuration option (e.g., update_interval_days=14)'
|
|
)
|
|
|
|
parser.add_argument(
|
|
'--show-config',
|
|
action='store_true',
|
|
help='Show current configuration'
|
|
)
|
|
|
|
parser.add_argument(
|
|
'--list-countries',
|
|
action='store_true',
|
|
help='List available country codes'
|
|
)
|
|
|
|
args = parser.parse_args()
|
|
|
|
# Load configuration
|
|
config = Config(args.config)
|
|
|
|
# Handle list-countries
|
|
if args.list_countries:
|
|
common_countries = [
|
|
"CN - China", "RU - Russia", "US - United States", "KP - North Korea",
|
|
"IR - Iran", "BY - Belarus", "SY - Syria", "VE - Venezuela",
|
|
"CU - Cuba", "SD - Sudan", "IQ - Iraq", "LY - Libya",
|
|
"IN - India", "BR - Brazil", "DE - Germany", "FR - France",
|
|
"GB - United Kingdom", "JP - Japan", "KR - South Korea"
|
|
]
|
|
print("Common country codes:")
|
|
for country in common_countries:
|
|
print(f" {country}")
|
|
print("\nUse ISO 3166-1 alpha-2 codes (2 letters)")
|
|
return
|
|
|
|
# Handle set-config
|
|
if args.set_config:
|
|
try:
|
|
key, value = args.set_config.split('=', 1)
|
|
try:
|
|
value = json.loads(value)
|
|
except:
|
|
pass
|
|
config.set(key, value)
|
|
print(f"Configuration updated: {key} = {value}", file=sys.stderr)
|
|
return
|
|
except ValueError:
|
|
print("Error: --set-config format should be KEY=VALUE", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
# Handle show-config
|
|
if args.show_config:
|
|
print(json.dumps(config.config, indent=2, default=str))
|
|
return
|
|
|
|
# Override config with command line args
|
|
if args.db_url:
|
|
config.set('database_url', args.db_url)
|
|
|
|
if args.no_auto_update:
|
|
config.set('auto_update', False)
|
|
|
|
# Initialize database
|
|
db = GeoIPDatabase(config)
|
|
|
|
# Handle database update
|
|
if args.update_db:
|
|
db.download_database()
|
|
print("Database updated successfully", file=sys.stderr)
|
|
return
|
|
|
|
# Check if we need to generate config
|
|
if not args.country or not args.app:
|
|
if not args.update_db and not args.set_config and not args.show_config and not args.list_countries:
|
|
parser.print_help()
|
|
sys.exit(1)
|
|
return
|
|
|
|
# Auto-update database if needed
|
|
if not args.no_auto_update:
|
|
db.check_and_update()
|
|
|
|
# Parse countries
|
|
countries = [c.strip().upper() for c in args.country.split(',')]
|
|
|
|
print(f"Processing countries: {', '.join(countries)}", file=sys.stderr)
|
|
|
|
# Get networks
|
|
country_networks = db.get_country_networks(countries)
|
|
|
|
# Check if we got any data
|
|
if not any(country_networks.values()):
|
|
print("Error: No networks found for specified countries", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
# Generate configuration
|
|
generators = {
|
|
'nginx': ConfigGenerator.generate_nginx,
|
|
'haproxy': ConfigGenerator.generate_haproxy,
|
|
'apache': ConfigGenerator.generate_apache
|
|
}
|
|
|
|
aggregate = not args.no_aggregate
|
|
config_output = generators[args.app](country_networks, aggregate)
|
|
|
|
# Output
|
|
if args.output:
|
|
output_path = Path(args.output)
|
|
output_path.parent.mkdir(parents=True, exist_ok=True)
|
|
with open(output_path, 'w') as f:
|
|
f.write(config_output)
|
|
print(f"Configuration written to: {output_path}", file=sys.stderr)
|
|
else:
|
|
print(config_output)
|
|
|
|
# Close database
|
|
db.close_reader()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|