diff --git a/.env.example b/.env.example index 69b3205..6b747eb 100644 --- a/.env.example +++ b/.env.example @@ -35,9 +35,6 @@ SCAN_TIME=02:00 SCAN_ON_STARTUP=true CACHE_MAX_AGE_HOURS=168 -# Parallel scanning -PARALLEL_WORKERS=8 # 0=auto - # Redis REDIS_HOST=redis REDIS_PORT=6379 @@ -49,4 +46,16 @@ REDIS_CACHE_TTL=86400 # Precache Daemon Settings PRECACHE_INTERVAL_HOURS=168 PRECACHE_CHECK_INTERVAL=3600 -PRECACHE_MIN_TTL_HOURS=7 \ No newline at end of file +PRECACHE_MIN_TTL_HOURS=7 + +# MaxMind chunking +MAXMIND_CHUNK_TASKS_PER_WORKER=16 +MAXMIND_CHUNK_MIN=200 +MAXMIND_CHUNK_MAX=4000 + +# cap MaxMind workers per-country +MAXMIND_WORKERS_MAX=48 +MAXMIND_WORKERS_MIN=6 + +# Parallel scanning +PARALLEL_WORKERS=8 # 0=auto \ No newline at end of file diff --git a/config.py b/config.py index 68fdf5a..62efbbb 100644 --- a/config.py +++ b/config.py @@ -234,10 +234,17 @@ for country in COMMON_COUNTRIES: PRECACHE_APP_TYPES = [ 'nginx_geo', + 'nginx_map', 'nginx_deny', 'apache_24', + 'apache_22', 'haproxy_acl', + 'haproxy_lua', + 'haproxy_map', 'raw-cidr_txt', + 'raw-newline_txt', + 'raw-json', + 'raw-csv', ] PRECACHE_AGGREGATE_VARIANTS = [True] @@ -257,3 +264,11 @@ CACHE_MAX_AGE_HOURS = 168 PRECACHE_INTERVAL_HOURS = int(os.getenv('PRECACHE_INTERVAL_HOURS', 168)) PRECACHE_CHECK_INTERVAL = int(os.getenv('PRECACHE_CHECK_INTERVAL', 3600)) PRECACHE_MIN_TTL_HOURS = int(os.getenv('PRECACHE_MIN_TTL_HOURS', 7)) + +# MaxMind scan chunking +MAXMIND_CHUNK_TASKS_PER_WORKER = int(os.getenv('MAXMIND_CHUNK_TASKS_PER_WORKER', '16')) +MAXMIND_CHUNK_MIN = int(os.getenv('MAXMIND_CHUNK_MIN', '200')) +MAXMIND_CHUNK_MAX = int(os.getenv('MAXMIND_CHUNK_MAX', '4000')) + +MAXMIND_WORKERS_MIN = int(os.getenv('MAXMIND_WORKERS_MIN', '6')) +MAXMIND_WORKERS_MAX = int(os.getenv('MAXMIND_WORKERS_MAX', '48')) \ No newline at end of file diff --git a/geoip_handler.py b/geoip_handler.py index 74fb074..41c0f02 100644 --- a/geoip_handler.py +++ b/geoip_handler.py @@ -13,7 +13,8 @@ from concurrent.futures import 
ThreadPoolExecutor, as_completed import threading import config import ipaddress - +import math +from multiprocessing import cpu_count def generate_metadata(countries: list, country_data: dict, redis_stats: dict = None, handler: 'GeoIPHandler' = None) -> dict: """ @@ -568,76 +569,106 @@ class GeoIPHandler: return scan_ranges - def _scan_maxmind_for_country(self, country_code: str, progress_callback=None) -> list: + def _scan_maxmind_for_country(self, country_code: str, progress_callback=None, workers=None) -> list: if not self.mmdb_file.exists(): return [] - + country_code = country_code.upper() + + scan_ranges = self._get_scan_ranges() + total_ranges = len(scan_ranges) + + # workers default + if workers is None or int(workers) <= 0: + workers = min(32, max(4, cpu_count() * 2)) + else: + workers = int(workers) + + tasks_per_worker = getattr(config, "MAXMIND_CHUNK_TASKS_PER_WORKER", 12) + chunk_min = getattr(config, "MAXMIND_CHUNK_MIN", 50) + chunk_max = getattr(config, "MAXMIND_CHUNK_MAX", 2000) + + target_tasks = max(workers * int(tasks_per_worker), workers) + chunk = int(math.ceil(total_ranges / float(target_tasks))) + CHUNK = max(int(chunk_min), min(int(chunk_max), chunk)) + + if progress_callback: + progress_callback(f"Starting parallel MaxMind scan with {workers} workers...") + progress_callback(f"Scanning {total_ranges} IP ranges...") + progress_callback(f"Chunking: {CHUNK} ranges/task (~{int(math.ceil(total_ranges/float(CHUNK)))} tasks)") + found_networks = set() found_networks_lock = threading.Lock() - - try: - if progress_callback: - progress_callback(f"Starting parallel MaxMind scan with 32 workers...") - - scan_ranges = self._get_scan_ranges() - total_ranges = len(scan_ranges) - - if progress_callback: - progress_callback(f"Scanning {total_ranges} IP ranges...") - - completed = 0 - completed_lock = threading.Lock() - - def scan_range(network_str): - nonlocal completed - - reader = geoip2.database.Reader(str(self.mmdb_file)) - local_networks = set() - - 
try: - network = ipaddress.IPv4Network(network_str, strict=False) - - for subnet in network.subnets(new_prefix=24): - sample_ip = str(subnet.network_address + 1) - - try: - response = reader.country(sample_ip) - if response.country.iso_code == country_code: - local_networks.add(str(subnet)) - except: - pass - - except Exception as e: - pass - finally: - reader.close() - + + completed = 0 + completed_lock = threading.Lock() + + tls = threading.local() + + def get_reader(): + r = getattr(tls, "reader", None) + if r is None: + tls.reader = geoip2.database.Reader(str(self.mmdb_file)) + return tls.reader + + def scan_one_range(reader, network_str: str): + local = set() + try: + network = ipaddress.IPv4Network(network_str, strict=False) + for subnet in network.subnets(new_prefix=24): + sample_ip = str(subnet.network_address + 1) + try: + resp = reader.country(sample_ip) + if resp.country.iso_code == country_code: + local.add(subnet) # fewer allocations than calling str() in the loop + except Exception: + pass + except Exception: + pass + return local + + def scan_chunk(ranges): + nonlocal completed + reader = get_reader() + local_chunk = set() + + for r in ranges: + local_chunk.update(scan_one_range(reader, r)) + + with completed_lock: completed += 1 - if completed % 2000 == 0 and progress_callback: - with found_networks_lock: - progress_pct = (completed / total_ranges) * 100 - progress_callback(f"Scanning: {completed}/{total_ranges} ranges ({progress_pct:.1f}%), found {len(found_networks)} networks") - - return local_networks - - with ThreadPoolExecutor(max_workers=32) as executor: - futures = {executor.submit(scan_range, r): r for r in scan_ranges} - + c = completed + + # report progress more often (diagnostics); does not affect the result + if progress_callback and (c % 500 == 0 or c == total_ranges): + with found_networks_lock: + found_cnt = len(found_networks) + pct = (c / float(total_ranges)) * 100.0 + progress_callback( + f"Scanning: {c}/{total_ranges} ranges ({pct:.1f}%), found {found_cnt} 
networks" + ) + + return local_chunk + + try: + chunks = [scan_ranges[i:i + CHUNK] for i in range(0, total_ranges, CHUNK)] + + with ThreadPoolExecutor(max_workers=workers) as executor: + futures = [executor.submit(scan_chunk, ch) for ch in chunks] for future in as_completed(futures): local_nets = future.result() - - with found_networks_lock: - found_networks.update(local_nets) - - result = list(found_networks) - + if local_nets: + with found_networks_lock: + found_networks.update(local_nets) + + # convert to strings at the end (same result as before) + result = [str(n) for n in found_networks] + if progress_callback: progress_callback(f"MaxMind scan complete: {len(result)} networks") - + return result - + except Exception as e: print(f"[ERROR] MaxMind scan failed for {country_code}: {e}", flush=True) import traceback diff --git a/scheduler.py b/scheduler.py index 134736d..116b7f7 100644 --- a/scheduler.py +++ b/scheduler.py @@ -14,7 +14,7 @@ from pathlib import Path from concurrent.futures import ThreadPoolExecutor, as_completed from multiprocessing import cpu_count import threading - +import traceback sys.path.insert(0, str(Path(__file__).parent)) @@ -29,6 +29,21 @@ write_lock = threading.Lock() active_scans = {} active_scans_lock = threading.Lock() +def heartbeat(): + log_safe(f"[{datetime.now()}] HEARTBEAT running=True next_run={schedule.next_run()} jobs={len(schedule.jobs)}") + +def compute_maxmind_workers(): + with active_scans_lock: + active = max(1, len(active_scans)) + + cpu = cpu_count() + total_budget = max(32, cpu * 6) # e.g. 16 CPUs * 6 = 96 + per_country = max(4, total_budget // active) + + min_w = int(os.getenv('MAXMIND_WORKERS_MIN', '6')) + max_w = int(os.getenv('MAXMIND_WORKERS_MAX', '48')) + + return max(min_w, min(max_w, per_country)) def signal_handler(signum, frame): global running @@ -96,7 +111,14 @@ def scan_single_country(country_code, is_update=False): print(f"[{country_code}] Scanning MaxMind + GitHub...", flush=True) - maxmind_networks = 
handler._scan_maxmind_for_country(country_code, progress_callback=progress_cb) + maxmind_workers = compute_maxmind_workers() + print(f"[{country_code}] MaxMind workers: {maxmind_workers} (active scans: {len(active_scans)})", flush=True) + + maxmind_networks = handler._scan_maxmind_for_country( + country_code, + progress_callback=progress_cb, + workers=maxmind_workers + ) if maxmind_networks: print(f"[{country_code}] MaxMind: {len(maxmind_networks):,} networks, checking GitHub...", flush=True) @@ -386,8 +408,16 @@ if __name__ == '__main__': print("\nScheduler running. Press Ctrl+C to stop.\n", flush=True) sys.stdout.flush() + # heartbeat + schedule.every(1).minutes.do(heartbeat) + while running: - schedule.run_pending() + try: + schedule.run_pending() + except Exception as e: + log_safe(f"[{datetime.now()}] ERROR in run_pending: {e}") + traceback.print_exc() + sys.stdout.flush() time.sleep(60) print("\n[SHUTDOWN] Stopped gracefully.", flush=True)