Files
geoip_block_generator/generate_ban.py
Mateusz Gruszczyński c0afc1554d first commit
2026-02-17 09:04:09 +01:00

649 lines
21 KiB
Python

#!/usr/bin/env python3
import argparse
import sys
import os
import json
import ipaddress
import urllib.request
from pathlib import Path
from typing import List, Dict, Set
from datetime import datetime, timedelta
import geoip2.database
from geoip2.errors import AddressNotFoundError
class Config:
    """JSON-file-backed configuration manager.

    Values loaded from ``config_path`` are merged over ``DEFAULT_CONFIG``,
    so keys added in later versions always have a usable default.
    """

    DEFAULT_CONFIG = {
        "database_url": "https://github.com/P3TERX/GeoLite.mmdb/releases/download/2026.02.07/GeoLite2-Country.mmdb",
        "database_file": "GeoLite2-Country.mmdb",
        "last_update": None,
        "update_interval_days": 7,
        "geoip_db_dir": "geoip_db",
        "cache_enabled": True,
        "auto_update": True,
        "ip_range_sources": {
            "github": "https://raw.githubusercontent.com/herrbischoff/country-ip-blocks/master/ipv4/{country_lower}.cidr",
            "alternative": "https://www.ipdeny.com/ipblocks/data/aggregated/{country_lower}-aggregated.zone"
        }
    }

    def __init__(self, config_path: str = "geoip_db/config.json"):
        self.config_path = Path(config_path)
        self.config = self.load()

    def load(self) -> Dict:
        """Load configuration from file, falling back to defaults on any error."""
        if self.config_path.exists():
            try:
                with open(self.config_path, 'r') as f:
                    config = json.load(f)
                # File values win; defaults fill in any missing keys.
                return {**self.DEFAULT_CONFIG, **config}
            except (OSError, json.JSONDecodeError) as e:
                print(f"Warning: Could not load config: {e}", file=sys.stderr)
        return self.DEFAULT_CONFIG.copy()

    def save(self):
        """Persist the current configuration to disk (creating parent dirs)."""
        self.config_path.parent.mkdir(parents=True, exist_ok=True)
        with open(self.config_path, 'w') as f:
            json.dump(self.config, f, indent=2, default=str)

    def get(self, key: str, default=None):
        """Get a configuration value."""
        return self.config.get(key, default)

    def set(self, key: str, value):
        """Set a configuration value and save immediately."""
        self.config[key] = value
        self.save()

    def needs_update(self) -> bool:
        """Return True when the GeoIP database is due for a refresh.

        A refresh is due when auto-update is enabled and either no update has
        ever been recorded or the last one is older than
        ``update_interval_days``.
        """
        if not self.config.get('auto_update', True):
            return False
        last_update = self.config.get('last_update')
        if not last_update:
            return True
        try:
            last_date = datetime.fromisoformat(last_update)
            interval = timedelta(days=self.config.get('update_interval_days', 7))
            return datetime.now() - last_date > interval
        except (TypeError, ValueError):
            # Unparseable timestamp or interval -> treat the database as stale.
            return True
class GeoIPDatabase:
    """GeoIP database handler using the MaxMind MMDB format.

    Manages download/refresh of the MMDB file and fetches per-country IPv4
    CIDR lists from the external text sources configured in
    ``ip_range_sources`` (the MMDB file is only opened for lookups, never
    scanned for ranges).
    """

    def __init__(self, config: Config):
        self.config = config
        self.db_dir = Path(config.get('geoip_db_dir', 'geoip_db'))
        self.db_dir.mkdir(parents=True, exist_ok=True)
        self.mmdb_file = self.db_dir / config.get('database_file', 'GeoLite2-Country.mmdb')
        # In-process cache: sorted comma-joined country codes -> result dict.
        self.cache = {}
        self.reader = None

    def download_database(self, url: str = None) -> bool:
        """Download the MMDB database to ``self.mmdb_file``.

        Args:
            url: Override download URL; defaults to config 'database_url'.

        Returns:
            True on success, False on any download error (best-effort).
        """
        url = url or self.config.get('database_url')
        print(f"Downloading database from: {url}", file=sys.stderr)
        print(f"Saving to: {self.mmdb_file}", file=sys.stderr)
        try:
            urllib.request.urlretrieve(url, self.mmdb_file)
            # Record the refresh time so Config.needs_update() is accurate.
            self.config.set('last_update', datetime.now().isoformat())
            print("Database downloaded successfully", file=sys.stderr)
            print(f"File size: {self.mmdb_file.stat().st_size / 1024 / 1024:.2f} MB", file=sys.stderr)
            return True
        except Exception as e:
            # Deliberately broad: any failure is reported and signalled to
            # the caller rather than aborting the whole run.
            print(f"Error downloading database: {e}", file=sys.stderr)
            return False

    def check_and_update(self) -> bool:
        """Download the database if it is missing or stale; return success."""
        if not self.mmdb_file.exists():
            print("Database not found, downloading...", file=sys.stderr)
            return self.download_database()
        if self.config.needs_update():
            print("Database is outdated, updating...", file=sys.stderr)
            return self.download_database()
        return True

    def open_reader(self):
        """Open the MMDB reader (idempotent); exits the process on failure."""
        if self.reader is None:
            try:
                self.reader = geoip2.database.Reader(str(self.mmdb_file))
                print(f"Opened database: {self.mmdb_file}", file=sys.stderr)
            except Exception as e:
                print(f"Error opening database: {e}", file=sys.stderr)
                print("Install geoip2: pip install geoip2", file=sys.stderr)
                sys.exit(1)

    def close_reader(self):
        """Close the MMDB reader if it is open."""
        if self.reader:
            self.reader.close()
            self.reader = None

    def get_country_networks_from_source(self, country_code: str) -> List[ipaddress.IPv4Network]:
        """Download IPv4 CIDR ranges for one country from external sources.

        Tries each configured source in order and returns the first
        non-empty result; returns an empty list when every source fails.
        """
        sources = self.config.get('ip_range_sources', {})
        networks = []
        country_lower = country_code.lower()
        for source_name, url_template in sources.items():
            try:
                url = url_template.format(country_lower=country_lower, country_upper=country_code.upper())
                print(f"Fetching from {source_name}: {url}", file=sys.stderr)
                # Context manager guarantees the HTTP response is closed
                # (the previous revision leaked it).
                with urllib.request.urlopen(url, timeout=30) as response:
                    data = response.read().decode('utf-8')
                for line in data.strip().split('\n'):
                    line = line.strip()
                    if line and not line.startswith('#'):
                        try:
                            networks.append(ipaddress.IPv4Network(line))
                        except ValueError:
                            continue  # skip malformed CIDR lines
                if networks:
                    print(f"Loaded {len(networks)} networks from {source_name}", file=sys.stderr)
                    break
            except Exception as e:
                # Any network/format error: report and try the next source.
                print(f"Could not fetch from {source_name}: {e}", file=sys.stderr)
                continue
        return networks

    def get_country_networks(self, country_codes: List[str]) -> Dict[str, List[ipaddress.IPv4Network]]:
        """Get IPv4 networks for each requested country code.

        Results are memoized in-process when 'cache_enabled' is set in the
        configuration.
        """
        cache_key = ','.join(sorted(country_codes))
        if self.config.get('cache_enabled') and cache_key in self.cache:
            print(f"Using cached data for {cache_key}", file=sys.stderr)
            return self.cache[cache_key]
        country_networks = {code: [] for code in country_codes}
        print(f"Loading networks for: {', '.join(country_codes)}", file=sys.stderr)
        # External CIDR lists are used instead of scanning the MMDB file.
        for country_code in country_codes:
            networks = self.get_country_networks_from_source(country_code)
            country_networks[country_code] = networks
            print(f" {country_code}: {len(networks)} networks", file=sys.stderr)
        if self.config.get('cache_enabled'):
            self.cache[cache_key] = country_networks
        return country_networks
class ConfigGenerator:
    """Render per-country blocklists into nginx/Apache/HAProxy config text.

    All generators accept a mapping of country code -> list of networks
    (strings or ipaddress objects) and return the config file contents as a
    single string.
    """

    @staticmethod
    def _aggregate_networks(networks: list) -> list:
        """Aggregate IP networks to minimize list size.

        Invalid entries are skipped; if aggregation fails entirely the list
        is at least deduplicated and sorted.
        """
        if not networks:
            return []
        try:
            ip_objects = []
            for network in networks:
                try:
                    ip_objects.append(ipaddress.IPv4Network(network, strict=False))
                except ValueError:
                    continue  # skip malformed CIDR entries
            if ip_objects:
                # Deduplicate and merge adjacent/overlapping ranges.
                collapsed = list(ipaddress.collapse_addresses(ip_objects))
                return sorted([str(net) for net in collapsed])
            return sorted(set(networks))  # at least remove duplicates
        except (TypeError, ValueError):
            return sorted(set(networks))

    @staticmethod
    def _prepare(networks: list, aggregate: bool) -> list:
        """Deduplicate one network list, optionally aggregating it."""
        if aggregate:
            return ConfigGenerator._aggregate_networks(networks)
        return sorted(set(networks))

    @staticmethod
    def _flatten(country_networks: dict, aggregate: bool) -> list:
        """Merge all countries' networks into one processed list."""
        all_networks = []
        for networks in country_networks.values():
            all_networks.extend(networks)
        return ConfigGenerator._prepare(all_networks, aggregate)

    @staticmethod
    def generate_nginx_geo(country_networks: dict, aggregate: bool = True) -> str:
        """Nginx `geo` block setting $blocked_country=1 for blocked clients."""
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        countries_list = ', '.join(sorted(country_networks.keys()))
        all_networks = ConfigGenerator._flatten(country_networks, aggregate)
        config = f"""# Nginx Geo Module Configuration
# Generated: {timestamp}
# Countries: {countries_list}
# Total networks: {len(all_networks)}
geo $blocked_country {{
default 0;
"""
        for network in all_networks:
            config += f" {network} 1;\n"
        config += "}\n"
        return config

    @staticmethod
    def generate_nginx_map(country_networks: dict, aggregate: bool = True) -> str:
        """Nginx `map` block, grouped per country with comments."""
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        countries_list = ', '.join(sorted(country_networks.keys()))
        # Process each country separately so the output stays grouped.
        processed_networks = {
            country_code: ConfigGenerator._prepare(networks, aggregate)
            for country_code, networks in country_networks.items()
        }
        total_networks = sum(len(nets) for nets in processed_networks.values())
        config = f"""# Nginx Map Module Configuration
# Generated: {timestamp}
# Countries: {countries_list}
# Total networks: {total_networks}
map $remote_addr $blocked_country {{
default 0;
"""
        for country_code in sorted(processed_networks.keys()):
            networks = processed_networks[country_code]
            config += f" # {country_code} - {len(networks)} networks\n"
            for network in networks:
                config += f" {network} 1;\n"
            config += "\n"
        config += "}\n"
        return config

    @staticmethod
    def generate_nginx_deny(country_networks: dict, aggregate: bool = True) -> str:
        """Flat list of nginx `deny` directives followed by `allow all`."""
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        countries_list = ', '.join(sorted(country_networks.keys()))
        all_networks = ConfigGenerator._flatten(country_networks, aggregate)
        config = f"""# Nginx Deny Directives Configuration
# Generated: {timestamp}
# Countries: {countries_list}
# Total networks: {len(all_networks)}
"""
        for network in all_networks:
            config += f"deny {network};\n"
        config += "allow all;\n"
        return config

    @staticmethod
    def generate_apache_24(country_networks: dict, aggregate: bool = True) -> str:
        """Apache 2.4 `<RequireAll>` block using `Require not ip`."""
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        countries_list = ', '.join(sorted(country_networks.keys()))
        all_networks = ConfigGenerator._flatten(country_networks, aggregate)
        config = f"""# Apache 2.4 Configuration
# Generated: {timestamp}
# Countries: {countries_list}
# Total networks: {len(all_networks)}
<RequireAll>
Require all granted
"""
        for network in all_networks:
            config += f" Require not ip {network}\n"
        config += "</RequireAll>\n"
        return config

    @staticmethod
    def generate_apache_22(country_networks: dict, aggregate: bool = True) -> str:
        """Apache 2.2 `Order Allow,Deny` configuration."""
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        countries_list = ', '.join(sorted(country_networks.keys()))
        all_networks = ConfigGenerator._flatten(country_networks, aggregate)
        config = f"""# Apache 2.2 Configuration
# Generated: {timestamp}
# Countries: {countries_list}
# Total networks: {len(all_networks)}
Order Allow,Deny
Allow from all
"""
        for network in all_networks:
            config += f"Deny from {network}\n"
        return config

    @staticmethod
    def generate_haproxy_acl(country_networks: dict, aggregate: bool = True) -> str:
        """HAProxy frontend with one `acl blocked_ip src` line per network."""
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        countries_list = ', '.join(sorted(country_networks.keys()))
        all_networks = ConfigGenerator._flatten(country_networks, aggregate)
        config = f"""# HAProxy ACL Configuration
# Generated: {timestamp}
# Countries: {countries_list}
# Total networks: {len(all_networks)}
frontend http-in
bind *:80
"""
        for network in all_networks:
            config += f" acl blocked_ip src {network}\n"
        config += """
http-request deny if blocked_ip
default_backend servers
"""
        return config

    @staticmethod
    def generate_haproxy_lua(country_networks: dict, aggregate: bool = True) -> str:
        """HAProxy Lua script embedding the blocked networks as a table."""
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        countries_list = ', '.join(sorted(country_networks.keys()))
        all_networks = ConfigGenerator._flatten(country_networks, aggregate)
        config = f"""-- HAProxy Lua Script
-- Generated: {timestamp}
-- Countries: {countries_list}
-- Total networks: {len(all_networks)}
local blocked_networks = {{
"""
        for network in all_networks:
            config += f' "{network}",\n'
        # NOTE: plain (non-f) string below, so a single '}' closes the Lua
        # table. The previous revision wrote '}}' here, which is only an
        # escape inside f-strings and produced invalid Lua output.
        config += """}
function check_blocked(txn)
local src_ip = txn.f:src()
for _, network in ipairs(blocked_networks) do
if string.match(src_ip, network) then
return true
end
end
return false
end
core.register_fetches("is_blocked", check_blocked)
"""
        return config

    # --- CLI dispatch targets -------------------------------------------
    # main() selects a generator by application name; these defaults pick a
    # concrete variant for each supported --app value.

    @staticmethod
    def generate_nginx(country_networks: dict, aggregate: bool = True) -> str:
        """Default nginx output (geo-module variant)."""
        return ConfigGenerator.generate_nginx_geo(country_networks, aggregate)

    @staticmethod
    def generate_haproxy(country_networks: dict, aggregate: bool = True) -> str:
        """Default HAProxy output (ACL variant)."""
        return ConfigGenerator.generate_haproxy_acl(country_networks, aggregate)

    @staticmethod
    def generate_apache(country_networks: dict, aggregate: bool = True) -> str:
        """Default Apache output (2.4 Require variant)."""
        return ConfigGenerator.generate_apache_24(country_networks, aggregate)
def main():
    """CLI entry point.

    Parses arguments, handles maintenance modes (config set/show, country
    list, database update), then generates a blocklist configuration for the
    selected application and writes it to stdout or the requested file.
    """
    parser = argparse.ArgumentParser(
        description='Advanced GeoIP ban configuration generator using MaxMind MMDB',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
# Generate nginx config for China
%(prog)s --country CN --app nginx --output china.conf
# Multiple countries (comma-separated)
%(prog)s --country CN,RU,KP --app haproxy --output blocked.conf
# Update database manually
%(prog)s --update-db
# Use custom database URL
%(prog)s --db-url https://example.com/GeoLite2-Country.mmdb --country US --app nginx
# Disable aggregation for all original networks
%(prog)s --country CN --app nginx --no-aggregate
# Set custom configuration options
%(prog)s --set-config update_interval_days=14
%(prog)s --set-config auto_update=false
# Output to console
%(prog)s --country RU,BY --app nginx
"""
    )
    parser.add_argument(
        '--country',
        help='Country code(s) - comma-separated (e.g., CN,RU,KP)'
    )
    parser.add_argument(
        '--app',
        choices=['nginx', 'haproxy', 'apache'],
        help='Target application type'
    )
    parser.add_argument(
        '--output',
        help='Output file path (default: stdout)'
    )
    parser.add_argument(
        '--config',
        default='geoip_db/config.json',
        help='Config file path (default: geoip_db/config.json)'
    )
    parser.add_argument(
        '--db-url',
        help='Custom database URL (MMDB format)'
    )
    parser.add_argument(
        '--update-db',
        action='store_true',
        help='Force database update'
    )
    parser.add_argument(
        '--no-aggregate',
        action='store_true',
        help='Disable network aggregation'
    )
    parser.add_argument(
        '--no-auto-update',
        action='store_true',
        help='Disable automatic database updates'
    )
    parser.add_argument(
        '--set-config',
        metavar='KEY=VALUE',
        help='Set configuration option (e.g., update_interval_days=14)'
    )
    parser.add_argument(
        '--show-config',
        action='store_true',
        help='Show current configuration'
    )
    parser.add_argument(
        '--list-countries',
        action='store_true',
        help='List available country codes'
    )
    args = parser.parse_args()

    # Load configuration
    config = Config(args.config)

    # Informational mode: print common ISO codes and exit.
    if args.list_countries:
        common_countries = [
            "CN - China", "RU - Russia", "US - United States", "KP - North Korea",
            "IR - Iran", "BY - Belarus", "SY - Syria", "VE - Venezuela",
            "CU - Cuba", "SD - Sudan", "IQ - Iraq", "LY - Libya",
            "IN - India", "BR - Brazil", "DE - Germany", "FR - France",
            "GB - United Kingdom", "JP - Japan", "KR - South Korea"
        ]
        print("Common country codes:")
        for country in common_countries:
            print(f" {country}")
        print("\nUse ISO 3166-1 alpha-2 codes (2 letters)")
        return

    # Configuration mode: persist one KEY=VALUE pair and exit.
    if args.set_config:
        try:
            key, value = args.set_config.split('=', 1)
            try:
                # Interpret JSON literals (numbers, booleans, ...);
                # otherwise keep the raw string.
                value = json.loads(value)
            except ValueError:
                pass
            config.set(key, value)
            print(f"Configuration updated: {key} = {value}", file=sys.stderr)
            return
        except ValueError:
            # split() produced no '=' separator.
            print("Error: --set-config format should be KEY=VALUE", file=sys.stderr)
            sys.exit(1)

    if args.show_config:
        print(json.dumps(config.config, indent=2, default=str))
        return

    # Command-line overrides are persisted into the config file.
    if args.db_url:
        config.set('database_url', args.db_url)
    if args.no_auto_update:
        config.set('auto_update', False)

    db = GeoIPDatabase(config)

    # Maintenance mode: force a database refresh and exit.
    if args.update_db:
        db.download_database()
        print("Database updated successfully", file=sys.stderr)
        return

    # Generation requires both --country and --app.
    if not args.country or not args.app:
        if not args.update_db and not args.set_config and not args.show_config and not args.list_countries:
            parser.print_help()
            sys.exit(1)
        return

    # Auto-update database if needed
    if not args.no_auto_update:
        db.check_and_update()

    countries = [c.strip().upper() for c in args.country.split(',')]
    print(f"Processing countries: {', '.join(countries)}", file=sys.stderr)

    country_networks = db.get_country_networks(countries)

    if not any(country_networks.values()):
        print("Error: No networks found for specified countries", file=sys.stderr)
        sys.exit(1)

    # Dispatch to concrete ConfigGenerator methods. The previous revision
    # referenced generate_nginx/generate_haproxy/generate_apache, which did
    # not exist on the class and raised AttributeError for every run.
    generators = {
        'nginx': ConfigGenerator.generate_nginx_geo,
        'haproxy': ConfigGenerator.generate_haproxy_acl,
        'apache': ConfigGenerator.generate_apache_24
    }
    aggregate = not args.no_aggregate
    config_output = generators[args.app](country_networks, aggregate)

    # Emit the result to a file (creating parent dirs) or to stdout.
    if args.output:
        output_path = Path(args.output)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        with open(output_path, 'w') as f:
            f.write(config_output)
        print(f"Configuration written to: {output_path}", file=sys.stderr)
    else:
        print(config_output)

    db.close_reader()


if __name__ == '__main__':
    main()