first commit
This commit is contained in:
648
generate_ban.py
Normal file
648
generate_ban.py
Normal file
@@ -0,0 +1,648 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import argparse
|
||||
import sys
|
||||
import os
|
||||
import json
|
||||
import ipaddress
|
||||
import urllib.request
|
||||
from pathlib import Path
|
||||
from typing import List, Dict, Set
|
||||
from datetime import datetime, timedelta
|
||||
import geoip2.database
|
||||
from geoip2.errors import AddressNotFoundError
|
||||
|
||||
|
||||
class Config:
|
||||
"""Configuration manager"""
|
||||
|
||||
DEFAULT_CONFIG = {
|
||||
"database_url": "https://github.com/P3TERX/GeoLite.mmdb/releases/download/2026.02.07/GeoLite2-Country.mmdb",
|
||||
"database_file": "GeoLite2-Country.mmdb",
|
||||
"last_update": None,
|
||||
"update_interval_days": 7,
|
||||
"geoip_db_dir": "geoip_db",
|
||||
"cache_enabled": True,
|
||||
"auto_update": True,
|
||||
"ip_range_sources": {
|
||||
"github": "https://raw.githubusercontent.com/herrbischoff/country-ip-blocks/master/ipv4/{country_lower}.cidr",
|
||||
"alternative": "https://www.ipdeny.com/ipblocks/data/aggregated/{country_lower}-aggregated.zone"
|
||||
}
|
||||
}
|
||||
|
||||
def __init__(self, config_path: str = "geoip_db/config.json"):
|
||||
self.config_path = Path(config_path)
|
||||
self.config = self.load()
|
||||
|
||||
def load(self) -> Dict:
|
||||
"""Load configuration from file"""
|
||||
if self.config_path.exists():
|
||||
try:
|
||||
with open(self.config_path, 'r') as f:
|
||||
config = json.load(f)
|
||||
return {**self.DEFAULT_CONFIG, **config}
|
||||
except Exception as e:
|
||||
print(f"Warning: Could not load config: {e}", file=sys.stderr)
|
||||
return self.DEFAULT_CONFIG.copy()
|
||||
|
||||
def save(self):
|
||||
"""Save configuration to file"""
|
||||
self.config_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with open(self.config_path, 'w') as f:
|
||||
json.dump(self.config, f, indent=2, default=str)
|
||||
|
||||
def get(self, key: str, default=None):
|
||||
"""Get configuration value"""
|
||||
return self.config.get(key, default)
|
||||
|
||||
def set(self, key: str, value):
|
||||
"""Set configuration value"""
|
||||
self.config[key] = value
|
||||
self.save()
|
||||
|
||||
def needs_update(self) -> bool:
|
||||
"""Check if database needs update"""
|
||||
if not self.config.get('auto_update', True):
|
||||
return False
|
||||
|
||||
last_update = self.config.get('last_update')
|
||||
if not last_update:
|
||||
return True
|
||||
|
||||
try:
|
||||
last_date = datetime.fromisoformat(last_update)
|
||||
interval = timedelta(days=self.config.get('update_interval_days', 7))
|
||||
return datetime.now() - last_date > interval
|
||||
except:
|
||||
return True
|
||||
|
||||
|
||||
class GeoIPDatabase:
|
||||
"""GeoIP database handler using MMDB format"""
|
||||
|
||||
def __init__(self, config: Config):
|
||||
self.config = config
|
||||
self.db_dir = Path(config.get('geoip_db_dir', 'geoip_db'))
|
||||
self.db_dir.mkdir(parents=True, exist_ok=True)
|
||||
self.mmdb_file = self.db_dir / config.get('database_file', 'GeoLite2-Country.mmdb')
|
||||
self.cache = {}
|
||||
self.reader = None
|
||||
|
||||
def download_database(self, url: str = None):
|
||||
"""Download MMDB database"""
|
||||
url = url or self.config.get('database_url')
|
||||
|
||||
print(f"Downloading database from: {url}", file=sys.stderr)
|
||||
print(f"Saving to: {self.mmdb_file}", file=sys.stderr)
|
||||
|
||||
try:
|
||||
urllib.request.urlretrieve(url, self.mmdb_file)
|
||||
|
||||
# Update config
|
||||
self.config.set('last_update', datetime.now().isoformat())
|
||||
|
||||
print("Database downloaded successfully", file=sys.stderr)
|
||||
print(f"File size: {self.mmdb_file.stat().st_size / 1024 / 1024:.2f} MB", file=sys.stderr)
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error downloading database: {e}", file=sys.stderr)
|
||||
return False
|
||||
|
||||
def check_and_update(self):
|
||||
"""Check if update is needed and download if necessary"""
|
||||
if not self.mmdb_file.exists():
|
||||
print("Database not found, downloading...", file=sys.stderr)
|
||||
return self.download_database()
|
||||
|
||||
if self.config.needs_update():
|
||||
print("Database is outdated, updating...", file=sys.stderr)
|
||||
return self.download_database()
|
||||
|
||||
return True
|
||||
|
||||
def open_reader(self):
|
||||
"""Open MMDB reader"""
|
||||
if self.reader is None:
|
||||
try:
|
||||
self.reader = geoip2.database.Reader(str(self.mmdb_file))
|
||||
print(f"Opened database: {self.mmdb_file}", file=sys.stderr)
|
||||
except Exception as e:
|
||||
print(f"Error opening database: {e}", file=sys.stderr)
|
||||
print("Install geoip2: pip install geoip2", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
def close_reader(self):
|
||||
"""Close MMDB reader"""
|
||||
if self.reader:
|
||||
self.reader.close()
|
||||
self.reader = None
|
||||
|
||||
def get_country_networks_from_source(self, country_code: str) -> List[ipaddress.IPv4Network]:
|
||||
"""Download IP ranges from external source (fallback method)"""
|
||||
sources = self.config.get('ip_range_sources', {})
|
||||
networks = []
|
||||
|
||||
country_lower = country_code.lower()
|
||||
|
||||
# Try multiple sources
|
||||
for source_name, url_template in sources.items():
|
||||
try:
|
||||
url = url_template.format(country_lower=country_lower, country_upper=country_code.upper())
|
||||
print(f"Fetching from {source_name}: {url}", file=sys.stderr)
|
||||
|
||||
response = urllib.request.urlopen(url, timeout=30)
|
||||
data = response.read().decode('utf-8')
|
||||
|
||||
for line in data.strip().split('\n'):
|
||||
line = line.strip()
|
||||
if line and not line.startswith('#'):
|
||||
try:
|
||||
networks.append(ipaddress.IPv4Network(line))
|
||||
except ValueError:
|
||||
continue
|
||||
|
||||
if networks:
|
||||
print(f"Loaded {len(networks)} networks from {source_name}", file=sys.stderr)
|
||||
break
|
||||
|
||||
except Exception as e:
|
||||
print(f"Could not fetch from {source_name}: {e}", file=sys.stderr)
|
||||
continue
|
||||
|
||||
return networks
|
||||
|
||||
def get_country_networks(self, country_codes: List[str]) -> Dict[str, List[ipaddress.IPv4Network]]:
|
||||
"""Get IP networks for specified countries"""
|
||||
|
||||
# Check cache
|
||||
cache_key = ','.join(sorted(country_codes))
|
||||
if self.config.get('cache_enabled') and cache_key in self.cache:
|
||||
print(f"Using cached data for {cache_key}", file=sys.stderr)
|
||||
return self.cache[cache_key]
|
||||
|
||||
country_networks = {code: [] for code in country_codes}
|
||||
|
||||
print(f"Loading networks for: {', '.join(country_codes)}", file=sys.stderr)
|
||||
|
||||
# Use external IP range sources (more efficient than scanning MMDB)
|
||||
for country_code in country_codes:
|
||||
networks = self.get_country_networks_from_source(country_code)
|
||||
country_networks[country_code] = networks
|
||||
print(f" {country_code}: {len(networks)} networks", file=sys.stderr)
|
||||
|
||||
# Cache results
|
||||
if self.config.get('cache_enabled'):
|
||||
self.cache[cache_key] = country_networks
|
||||
|
||||
return country_networks
|
||||
|
||||
|
||||
class ConfigGenerator:
|
||||
|
||||
@staticmethod
|
||||
def _aggregate_networks(networks: list) -> list:
|
||||
"""Aggregate IP networks to minimize list size"""
|
||||
if not networks:
|
||||
return []
|
||||
|
||||
try:
|
||||
ip_objects = []
|
||||
for network in networks:
|
||||
try:
|
||||
ip_objects.append(ipaddress.IPv4Network(network, strict=False))
|
||||
except:
|
||||
continue
|
||||
|
||||
if ip_objects:
|
||||
# Remove duplicates and aggregate
|
||||
collapsed = list(ipaddress.collapse_addresses(ip_objects))
|
||||
return sorted([str(net) for net in collapsed])
|
||||
return sorted(list(set(networks))) # At least remove duplicates
|
||||
except:
|
||||
return sorted(list(set(networks)))
|
||||
|
||||
@staticmethod
|
||||
def generate_nginx_geo(country_networks: dict, aggregate: bool = True) -> str:
|
||||
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||||
countries_list = ', '.join(sorted(country_networks.keys()))
|
||||
|
||||
all_networks = []
|
||||
for networks in country_networks.values():
|
||||
all_networks.extend(networks)
|
||||
|
||||
if aggregate:
|
||||
all_networks = ConfigGenerator._aggregate_networks(all_networks)
|
||||
else:
|
||||
all_networks = sorted(list(set(all_networks))) # Remove duplicates anyway
|
||||
|
||||
config = f"""# Nginx Geo Module Configuration
|
||||
# Generated: {timestamp}
|
||||
# Countries: {countries_list}
|
||||
# Total networks: {len(all_networks)}
|
||||
|
||||
geo $blocked_country {{
|
||||
default 0;
|
||||
|
||||
"""
|
||||
|
||||
for network in all_networks:
|
||||
config += f" {network} 1;\n"
|
||||
|
||||
config += "}\n"
|
||||
return config
|
||||
|
||||
@staticmethod
|
||||
def generate_nginx_map(country_networks: dict, aggregate: bool = True) -> str:
|
||||
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||||
countries_list = ', '.join(sorted(country_networks.keys()))
|
||||
|
||||
# Process each country separately
|
||||
processed_networks = {}
|
||||
for country_code, networks in country_networks.items():
|
||||
if aggregate:
|
||||
processed_networks[country_code] = ConfigGenerator._aggregate_networks(networks)
|
||||
else:
|
||||
processed_networks[country_code] = sorted(list(set(networks)))
|
||||
|
||||
# Calculate total
|
||||
total_networks = sum(len(nets) for nets in processed_networks.values())
|
||||
|
||||
config = f"""# Nginx Map Module Configuration
|
||||
# Generated: {timestamp}
|
||||
# Countries: {countries_list}
|
||||
# Total networks: {total_networks}
|
||||
|
||||
map $remote_addr $blocked_country {{
|
||||
default 0;
|
||||
|
||||
"""
|
||||
|
||||
for country_code in sorted(processed_networks.keys()):
|
||||
networks = processed_networks[country_code]
|
||||
config += f" # {country_code} - {len(networks)} networks\n"
|
||||
for network in networks:
|
||||
config += f" {network} 1;\n"
|
||||
config += "\n"
|
||||
|
||||
config += "}\n"
|
||||
return config
|
||||
|
||||
@staticmethod
|
||||
def generate_nginx_deny(country_networks: dict, aggregate: bool = True) -> str:
|
||||
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||||
countries_list = ', '.join(sorted(country_networks.keys()))
|
||||
|
||||
all_networks = []
|
||||
for networks in country_networks.values():
|
||||
all_networks.extend(networks)
|
||||
|
||||
if aggregate:
|
||||
all_networks = ConfigGenerator._aggregate_networks(all_networks)
|
||||
else:
|
||||
all_networks = sorted(list(set(all_networks)))
|
||||
|
||||
config = f"""# Nginx Deny Directives Configuration
|
||||
# Generated: {timestamp}
|
||||
# Countries: {countries_list}
|
||||
# Total networks: {len(all_networks)}
|
||||
|
||||
"""
|
||||
|
||||
for network in all_networks:
|
||||
config += f"deny {network};\n"
|
||||
|
||||
config += "allow all;\n"
|
||||
return config
|
||||
|
||||
@staticmethod
|
||||
def generate_apache_24(country_networks: dict, aggregate: bool = True) -> str:
|
||||
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||||
countries_list = ', '.join(sorted(country_networks.keys()))
|
||||
|
||||
all_networks = []
|
||||
for networks in country_networks.values():
|
||||
all_networks.extend(networks)
|
||||
|
||||
if aggregate:
|
||||
all_networks = ConfigGenerator._aggregate_networks(all_networks)
|
||||
else:
|
||||
all_networks = sorted(list(set(all_networks)))
|
||||
|
||||
config = f"""# Apache 2.4 Configuration
|
||||
# Generated: {timestamp}
|
||||
# Countries: {countries_list}
|
||||
# Total networks: {len(all_networks)}
|
||||
|
||||
<RequireAll>
|
||||
Require all granted
|
||||
"""
|
||||
|
||||
for network in all_networks:
|
||||
config += f" Require not ip {network}\n"
|
||||
|
||||
config += "</RequireAll>\n"
|
||||
return config
|
||||
|
||||
@staticmethod
|
||||
def generate_apache_22(country_networks: dict, aggregate: bool = True) -> str:
|
||||
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||||
countries_list = ', '.join(sorted(country_networks.keys()))
|
||||
|
||||
all_networks = []
|
||||
for networks in country_networks.values():
|
||||
all_networks.extend(networks)
|
||||
|
||||
if aggregate:
|
||||
all_networks = ConfigGenerator._aggregate_networks(all_networks)
|
||||
else:
|
||||
all_networks = sorted(list(set(all_networks)))
|
||||
|
||||
config = f"""# Apache 2.2 Configuration
|
||||
# Generated: {timestamp}
|
||||
# Countries: {countries_list}
|
||||
# Total networks: {len(all_networks)}
|
||||
|
||||
Order Allow,Deny
|
||||
Allow from all
|
||||
"""
|
||||
|
||||
for network in all_networks:
|
||||
config += f"Deny from {network}\n"
|
||||
|
||||
return config
|
||||
|
||||
@staticmethod
|
||||
def generate_haproxy_acl(country_networks: dict, aggregate: bool = True) -> str:
|
||||
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||||
countries_list = ', '.join(sorted(country_networks.keys()))
|
||||
|
||||
all_networks = []
|
||||
for networks in country_networks.values():
|
||||
all_networks.extend(networks)
|
||||
|
||||
if aggregate:
|
||||
all_networks = ConfigGenerator._aggregate_networks(all_networks)
|
||||
else:
|
||||
all_networks = sorted(list(set(all_networks)))
|
||||
|
||||
config = f"""# HAProxy ACL Configuration
|
||||
# Generated: {timestamp}
|
||||
# Countries: {countries_list}
|
||||
# Total networks: {len(all_networks)}
|
||||
|
||||
frontend http-in
|
||||
bind *:80
|
||||
|
||||
"""
|
||||
|
||||
for network in all_networks:
|
||||
config += f" acl blocked_ip src {network}\n"
|
||||
|
||||
config += """
|
||||
http-request deny if blocked_ip
|
||||
default_backend servers
|
||||
"""
|
||||
return config
|
||||
|
||||
@staticmethod
|
||||
def generate_haproxy_lua(country_networks: dict, aggregate: bool = True) -> str:
|
||||
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||||
countries_list = ', '.join(sorted(country_networks.keys()))
|
||||
|
||||
all_networks = []
|
||||
for networks in country_networks.values():
|
||||
all_networks.extend(networks)
|
||||
|
||||
if aggregate:
|
||||
all_networks = ConfigGenerator._aggregate_networks(all_networks)
|
||||
else:
|
||||
all_networks = sorted(list(set(all_networks)))
|
||||
|
||||
config = f"""-- HAProxy Lua Script
|
||||
-- Generated: {timestamp}
|
||||
-- Countries: {countries_list}
|
||||
-- Total networks: {len(all_networks)}
|
||||
|
||||
local blocked_networks = {{
|
||||
"""
|
||||
|
||||
for network in all_networks:
|
||||
config += f' "{network}",\n'
|
||||
|
||||
config += """}}
|
||||
|
||||
function check_blocked(txn)
|
||||
local src_ip = txn.f:src()
|
||||
for _, network in ipairs(blocked_networks) do
|
||||
if string.match(src_ip, network) then
|
||||
return true
|
||||
end
|
||||
end
|
||||
return false
|
||||
end
|
||||
|
||||
core.register_fetches("is_blocked", check_blocked)
|
||||
"""
|
||||
return config
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description='Advanced GeoIP ban configuration generator using MaxMind MMDB',
|
||||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
epilog="""
|
||||
Examples:
|
||||
# Generate nginx config for China
|
||||
%(prog)s --country CN --app nginx --output china.conf
|
||||
|
||||
# Multiple countries (comma-separated)
|
||||
%(prog)s --country CN,RU,KP --app haproxy --output blocked.conf
|
||||
|
||||
# Update database manually
|
||||
%(prog)s --update-db
|
||||
|
||||
# Use custom database URL
|
||||
%(prog)s --db-url https://example.com/GeoLite2-Country.mmdb --country US --app nginx
|
||||
|
||||
# Disable aggregation for all original networks
|
||||
%(prog)s --country CN --app nginx --no-aggregate
|
||||
|
||||
# Set custom configuration options
|
||||
%(prog)s --set-config update_interval_days=14
|
||||
%(prog)s --set-config auto_update=false
|
||||
|
||||
# Output to console
|
||||
%(prog)s --country RU,BY --app nginx
|
||||
"""
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'--country',
|
||||
help='Country code(s) - comma-separated (e.g., CN,RU,KP)'
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'--app',
|
||||
choices=['nginx', 'haproxy', 'apache'],
|
||||
help='Target application type'
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'--output',
|
||||
help='Output file path (default: stdout)'
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'--config',
|
||||
default='geoip_db/config.json',
|
||||
help='Config file path (default: geoip_db/config.json)'
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'--db-url',
|
||||
help='Custom database URL (MMDB format)'
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'--update-db',
|
||||
action='store_true',
|
||||
help='Force database update'
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'--no-aggregate',
|
||||
action='store_true',
|
||||
help='Disable network aggregation'
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'--no-auto-update',
|
||||
action='store_true',
|
||||
help='Disable automatic database updates'
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'--set-config',
|
||||
metavar='KEY=VALUE',
|
||||
help='Set configuration option (e.g., update_interval_days=14)'
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'--show-config',
|
||||
action='store_true',
|
||||
help='Show current configuration'
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'--list-countries',
|
||||
action='store_true',
|
||||
help='List available country codes'
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# Load configuration
|
||||
config = Config(args.config)
|
||||
|
||||
# Handle list-countries
|
||||
if args.list_countries:
|
||||
common_countries = [
|
||||
"CN - China", "RU - Russia", "US - United States", "KP - North Korea",
|
||||
"IR - Iran", "BY - Belarus", "SY - Syria", "VE - Venezuela",
|
||||
"CU - Cuba", "SD - Sudan", "IQ - Iraq", "LY - Libya",
|
||||
"IN - India", "BR - Brazil", "DE - Germany", "FR - France",
|
||||
"GB - United Kingdom", "JP - Japan", "KR - South Korea"
|
||||
]
|
||||
print("Common country codes:")
|
||||
for country in common_countries:
|
||||
print(f" {country}")
|
||||
print("\nUse ISO 3166-1 alpha-2 codes (2 letters)")
|
||||
return
|
||||
|
||||
# Handle set-config
|
||||
if args.set_config:
|
||||
try:
|
||||
key, value = args.set_config.split('=', 1)
|
||||
try:
|
||||
value = json.loads(value)
|
||||
except:
|
||||
pass
|
||||
config.set(key, value)
|
||||
print(f"Configuration updated: {key} = {value}", file=sys.stderr)
|
||||
return
|
||||
except ValueError:
|
||||
print("Error: --set-config format should be KEY=VALUE", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
# Handle show-config
|
||||
if args.show_config:
|
||||
print(json.dumps(config.config, indent=2, default=str))
|
||||
return
|
||||
|
||||
# Override config with command line args
|
||||
if args.db_url:
|
||||
config.set('database_url', args.db_url)
|
||||
|
||||
if args.no_auto_update:
|
||||
config.set('auto_update', False)
|
||||
|
||||
# Initialize database
|
||||
db = GeoIPDatabase(config)
|
||||
|
||||
# Handle database update
|
||||
if args.update_db:
|
||||
db.download_database()
|
||||
print("Database updated successfully", file=sys.stderr)
|
||||
return
|
||||
|
||||
# Check if we need to generate config
|
||||
if not args.country or not args.app:
|
||||
if not args.update_db and not args.set_config and not args.show_config and not args.list_countries:
|
||||
parser.print_help()
|
||||
sys.exit(1)
|
||||
return
|
||||
|
||||
# Auto-update database if needed
|
||||
if not args.no_auto_update:
|
||||
db.check_and_update()
|
||||
|
||||
# Parse countries
|
||||
countries = [c.strip().upper() for c in args.country.split(',')]
|
||||
|
||||
print(f"Processing countries: {', '.join(countries)}", file=sys.stderr)
|
||||
|
||||
# Get networks
|
||||
country_networks = db.get_country_networks(countries)
|
||||
|
||||
# Check if we got any data
|
||||
if not any(country_networks.values()):
|
||||
print("Error: No networks found for specified countries", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
# Generate configuration
|
||||
generators = {
|
||||
'nginx': ConfigGenerator.generate_nginx,
|
||||
'haproxy': ConfigGenerator.generate_haproxy,
|
||||
'apache': ConfigGenerator.generate_apache
|
||||
}
|
||||
|
||||
aggregate = not args.no_aggregate
|
||||
config_output = generators[args.app](country_networks, aggregate)
|
||||
|
||||
# Output
|
||||
if args.output:
|
||||
output_path = Path(args.output)
|
||||
output_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with open(output_path, 'w') as f:
|
||||
f.write(config_output)
|
||||
print(f"Configuration written to: {output_path}", file=sys.stderr)
|
||||
else:
|
||||
print(config_output)
|
||||
|
||||
# Close database
|
||||
db.close_reader()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
Reference in New Issue
Block a user