#!/usr/bin/env python3
"""
Performance & Stress Testing Suite for GeoIP Ban API
Simulates real production load with concurrent users
"""

import argparse
import random
import statistics
import sys
import threading
import time
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

import requests


class PerformanceTest:
    """Issue timed HTTP requests against the API and summarize the results.

    Attributes:
        base_url: URL prefix prepended to every endpoint.
        results: Maps a scenario name to the list of per-request result dicts.
        errors: Records of requests that raised an exception (guarded by
            ``lock`` because worker threads append to it concurrently).
        lock: Protects ``errors``.
        available_countries: Country codes used to build request payloads.
    """

    def __init__(self, base_url):
        """Store the target URL and load the country list (performs one GET)."""
        self.base_url = base_url
        self.results = defaultdict(list)
        self.errors = []
        self.lock = threading.Lock()
        self.available_countries = self._fetch_available_countries()

    def log(self, msg):
        """Print ``msg`` prefixed with the current HH:MM:SS time."""
        print(f"[{datetime.now().strftime('%H:%M:%S')}] {msg}", flush=True)

    def _fetch_available_countries(self):
        """Fetch available countries from API

        Returns the ``country_code`` of every entry under ``countries`` in the
        status response. Falls back to a built-in list of ten codes when the
        request fails, the status is not 200, or the list is empty.
        """
        self.log("Fetching available countries from API...")
        try:
            resp = requests.get(f"{self.base_url}/api/database/sqlite/status", timeout=10)
            if resp.status_code == 200:
                data = resp.json()
                countries = [item['country_code'] for item in data.get('countries', [])]
                if countries:
                    self.log(f"Loaded {len(countries)} countries from API")
                    return countries
        except Exception as e:
            # Deliberately best-effort: any failure just means we use defaults.
            self.log(f"Failed to fetch countries: {e}")

        self.log("Using default country list")
        return ['CN', 'US', 'RU', 'DE', 'FR', 'GB', 'JP', 'KR', 'IN', 'BR']

    @staticmethod
    def _extract_cache_type(resp):
        """Return the ``cache_type`` field of a JSON-object body, else ``None``.

        A body that is not valid JSON, or is JSON but not an object, yields
        ``None`` instead of raising, so a successful non-JSON response is not
        misreported as a failed request.
        """
        try:
            payload = resp.json()
        except ValueError:
            # requests raises a ValueError subclass for undecodable bodies.
            return None
        if isinstance(payload, dict):
            return payload.get('cache_type')
        return None

    def make_request(self, method, endpoint, data=None, timeout=30):
        """Single API request with timing

        Args:
            method: ``'GET'`` or ``'POST'``; anything else is reported as a
                failed request.
            endpoint: Path appended to ``base_url``.
            data: JSON payload for POST requests.
            timeout: Per-request timeout in seconds.

        Returns:
            A dict with ``status``, ``duration`` (seconds), ``size`` (bytes),
            ``success`` (status == 200) and ``endpoint``. Responses also carry
            ``cache_type``; non-200 responses and raised exceptions carry
            ``error``. Exceptions are never propagated; they are appended to
            ``self.errors`` and returned with ``status`` 0.
        """
        url = f"{self.base_url}{endpoint}"
        # perf_counter is monotonic, so clock adjustments cannot skew timings.
        start = time.perf_counter()

        try:
            if method == 'GET':
                resp = requests.get(url, timeout=timeout)
            elif method == 'POST':
                resp = requests.post(url, json=data, timeout=timeout)
            else:
                raise ValueError(f"Unsupported method: {method}")

            duration = time.perf_counter() - start
            success = resp.status_code == 200
            result = {
                'status': resp.status_code,
                'duration': duration,
                'size': len(resp.content),
                'success': success,
                'endpoint': endpoint,
                'cache_type': self._extract_cache_type(resp) if success else None,
            }
            if not success:
                # Give the report something better than "Unknown error".
                result['error'] = f"HTTP {resp.status_code}"
            return result
        except Exception as e:
            duration = time.perf_counter() - start
            with self.lock:
                self.errors.append({
                    'endpoint': endpoint,
                    'error': str(e),
                    'duration': duration,
                })
            return {
                'status': 0,
                'duration': duration,
                'size': 0,
                'success': False,
                'endpoint': endpoint,
                'error': str(e),
            }

    def simulate_user(self, user_id, duration_seconds, think_time_range=(1, 5)):
        """Simulate single user behavior - ONLY single countries

        Repeatedly picks a random action (weighted 3:1:1 towards
        ``generate`` over ``raw`` and ``status``), performs it, then sleeps
        for a random think time, until ``duration_seconds`` have elapsed.

        Returns:
            The list of result dicts for every request this user made.
        """
        formats = [
            ('nginx', 'geo'),
            ('nginx', 'map'),
            ('nginx', 'deny'),
            ('apache', '24'),
            ('haproxy', 'acl'),
        ]
        raw_formats = ['raw-cidr_txt', 'raw-newline_txt', 'raw-json', 'raw-csv']

        user_requests = []
        start_time = time.perf_counter()

        while (time.perf_counter() - start_time) < duration_seconds:
            action = random.choice(['generate', 'generate', 'generate', 'raw', 'status'])

            if action == 'generate':
                country = random.choice(self.available_countries)
                app_type, variant = random.choice(formats)
                result = self.make_request(
                    'POST', '/api/generate/preview',
                    data={
                        'countries': [country],
                        'app_type': app_type,
                        'app_variant': variant,
                        'aggregate': random.choice([True, False])
                    }
                )
            elif action == 'raw':
                country = random.choice(self.available_countries)
                raw_format = random.choice(raw_formats)
                result = self.make_request(
                    'POST', '/api/generate/raw',
                    data={
                        'countries': [country],
                        'app_type': raw_format,
                        'aggregate': random.choice([True, False])
                    }
                )
            else:
                result = self.make_request('GET', '/api/database/status')

            user_requests.append(result)

            think_time = random.uniform(*think_time_range)
            time.sleep(think_time)

        return user_requests

    def test_concurrent_users(self, num_users, duration_seconds, think_time_range=(1, 5)):
        """Simulate multiple concurrent users

        Runs ``num_users`` copies of ``simulate_user`` in a thread pool,
        logs an analysis of the combined results, stores them in
        ``self.results`` and returns them.
        """
        self.log(f"\n{'='*70}")
        self.log("USER SIMULATION TEST")
        self.log(f" Users: {num_users}")
        self.log(f" Duration: {duration_seconds}s")
        self.log(f" Think time: {think_time_range[0]}-{think_time_range[1]}s")
        self.log(f" Countries: {len(self.available_countries)}")
        self.log(f"{'='*70}")

        all_results = []
        start_time = time.perf_counter()

        with ThreadPoolExecutor(max_workers=num_users) as executor:
            futures = [
                executor.submit(self.simulate_user, i, duration_seconds, think_time_range)
                for i in range(num_users)
            ]

            completed = 0
            for future in as_completed(futures):
                user_results = future.result()
                all_results.extend(user_results)
                completed += 1
                self.log(f" User {completed}/{num_users} completed ({len(user_results)} requests)")

        total_time = time.perf_counter() - start_time

        self.analyze_results(f"Concurrent Users ({num_users} users)", all_results, total_time)
        self.results[f"user_simulation_{num_users}"] = all_results
        return all_results

    def test_scenario(self, name, method, endpoint, data=None, count=10, concurrent=1):
        """Run test scenario with multiple requests

        Sends ``count`` identical requests, sequentially when ``concurrent``
        is 1 and through a thread pool of that size otherwise. Logs an
        analysis, stores the results under ``name`` and returns them.
        """
        self.log(f"\n{'='*70}")
        self.log(f"TEST: {name}")
        self.log(f" Method: {method} {endpoint}")
        self.log(f" Requests: {count} (concurrent: {concurrent})")
        self.log(f"{'='*70}")

        results = []
        start_time = time.perf_counter()

        if concurrent == 1:
            for i in range(count):
                result = self.make_request(method, endpoint, data)
                results.append(result)
                if (i + 1) % 10 == 0:
                    self.log(f" Progress: {i+1}/{count}")
        else:
            with ThreadPoolExecutor(max_workers=concurrent) as executor:
                futures = [
                    executor.submit(self.make_request, method, endpoint, data)
                    for _ in range(count)
                ]
                for i, future in enumerate(as_completed(futures), 1):
                    results.append(future.result())
                    if i % 10 == 0:
                        self.log(f" Progress: {i}/{count}")

        total_time = time.perf_counter() - start_time

        self.analyze_results(name, results, total_time)
        self.results[name] = results
        return results

    def _log_failures(self, failed):
        """Log the failure count and the error text of the first five failures."""
        if failed:
            self.log(f"\nERRORS: {len(failed)}")
            for err in failed[:5]:
                self.log(f" - {err.get('error', 'Unknown error')}")

    def analyze_results(self, name, results, total_time):
        """Analyze and display results

        Logs request counts, throughput, latency statistics (P95 needs at
        least 20 successful samples, P99 at least 100), cache hit/miss counts
        and response sizes. Only successful requests feed the statistics.
        """
        successful = [r for r in results if r['success']]
        failed = [r for r in results if not r['success']]

        if not successful:
            self.log("\nERROR: ALL REQUESTS FAILED!")
            # Show why: this is the case where the error text matters most.
            self._log_failures(failed)
            return

        durations = [r['duration'] for r in successful]
        sizes = [r['size'] for r in successful]
        cache_hits = sum(1 for r in successful if r.get('cache_type') in ['redis-full', 'hybrid'])
        cache_misses = sum(1 for r in successful if r.get('cache_type') == 'fresh')
        # Guard against a zero elapsed time reported by the caller.
        throughput = len(successful) / total_time if total_time > 0 else 0.0

        self.log(f"\nRESULTS: {name}")
        self.log(f" Total time: {total_time:.2f}s")
        self.log(f" Requests: {len(results)} ({len(successful)} success, {len(failed)} failed)")
        self.log(f" Success rate: {len(successful)/len(results)*100:.1f}%")
        self.log(f" Throughput: {throughput:.2f} req/s")

        self.log("\nTIMING:")
        self.log(f" Min: {min(durations)*1000:.0f}ms")
        self.log(f" Max: {max(durations)*1000:.0f}ms")
        self.log(f" Mean: {statistics.mean(durations)*1000:.0f}ms")
        self.log(f" Median: {statistics.median(durations)*1000:.0f}ms")

        if len(durations) >= 20:
            self.log(f" P95: {statistics.quantiles(durations, n=20)[18]*1000:.0f}ms")
        if len(durations) >= 100:
            self.log(f" P99: {statistics.quantiles(durations, n=100)[98]*1000:.0f}ms")

        self.log("\nCACHE:")
        self.log(f" Hits: {cache_hits}")
        self.log(f" Misses: {cache_misses}")
        if cache_hits + cache_misses > 0:
            self.log(f" Hit rate: {cache_hits/(cache_hits+cache_misses)*100:.1f}%")

        self.log("\nRESPONSE SIZE:")
        self.log(f" Min: {min(sizes)/1024:.1f}KB")
        self.log(f" Max: {max(sizes)/1024:.1f}KB")
        self.log(f" Mean: {statistics.mean(sizes)/1024:.1f}KB")

        self._log_failures(failed)

    def final_report(self):
        """Generate final summary report

        Logs one line per stored scenario (mean latency and success count,
        or an explicit note when every request failed), followed by the ten
        most frequent exception messages recorded in ``self.errors``.
        """
        self.log(f"\n\n{'='*70}")
        self.log("FINAL PERFORMANCE REPORT")
        self.log(f"{'='*70}\n")

        for name, results in self.results.items():
            successful = [r for r in results if r['success']]
            if successful:
                durations = [r['duration'] for r in successful]
                self.log(f"{name:40s} {statistics.mean(durations)*1000:6.0f}ms avg, "
                         f"{len(successful):3d}/{len(results):3d} success")
            else:
                # Do not silently drop scenarios that failed completely.
                self.log(f"{name:40s} ALL FAILED ({len(results)} requests)")

        if self.errors:
            self.log(f"\nTOTAL ERRORS: {len(self.errors)}")
            error_types = defaultdict(int)
            for err in self.errors:
                # Truncate so near-identical messages group together.
                error_types[err.get('error', 'Unknown')[:50]] += 1
            for error, count in sorted(error_types.items(), key=lambda x: -x[1])[:10]:
                self.log(f" {count:3d}x {error}")


def run_user_simulation(base_url, num_users, duration):
    """Run only user simulation"""
    tester = PerformanceTest(base_url)

    print("\n" + "="*70)
    print("GEOIP BAN API - USER SIMULATION")
    print(f"Target: {base_url}")
    print(f"Simulating {num_users} concurrent users for {duration}s")
    print("="*70 + "\n")

    tester.test_concurrent_users(num_users, duration, think_time_range=(1, 5))
    tester.final_report()


def run_quick_test(base_url):
    """Quick performance test - single countries only"""
    tester = PerformanceTest(base_url)

    print("\n" + "="*70)
    print("GEOIP BAN API - QUICK PERFORMANCE TEST")
    print(f"Target: {base_url}")
    print("="*70 + "\n")

    if not tester.available_countries:
        print("ERROR: No countries available from API")
        return

    test_country = tester.available_countries[0]

    tester.test_scenario(
        "Warm-up", "POST", "/api/generate/preview",
        data={'countries': [test_country], 'app_type': 'nginx', 'app_variant': 'geo', 'aggregate': True},
        count=10, concurrent=2
    )

    tester.test_scenario(
        "Single Country (Cached)", "POST", "/api/generate/preview",
        data={'countries': [test_country], 'app_type': 'nginx', 'app_variant': 'geo', 'aggregate': True},
        count=50, concurrent=10
    )

    tester.test_concurrent_users(num_users=10, duration_seconds=30, think_time_range=(1, 3))
    tester.final_report()


def run_full_test_suite(base_url):
    """Execute complete test suite - single countries only"""
    tester = PerformanceTest(base_url)

    print("\n" + "="*70)
    print("GEOIP BAN API - FULL PERFORMANCE TEST SUITE")
    print(f"Target: {base_url}")
    print("="*70 + "\n")

    if not tester.available_countries:
        print("ERROR: No countries available from API")
        return

    tester.test_scenario("Health Check", "GET", "/api/database/status", count=50, concurrent=10)
    time.sleep(1)

    test_country = tester.available_countries[0]

    tester.test_scenario(
        "Single Country (Cached)", "POST", "/api/generate/preview",
        data={'countries': [test_country], 'app_type': 'nginx', 'app_variant': 'geo', 'aggregate': True},
        count=100, concurrent=20
    )
    time.sleep(1)

    tester.test_scenario(
        "Heavy Load (50 concurrent)", "POST", "/api/generate/preview",
        data={'countries': [test_country], 'app_type': 'nginx', 'app_variant': 'map', 'aggregate': True},
        count=200, concurrent=50
    )
    time.sleep(2)

    tester.log("\nSPIKE TEST - Sudden burst of 100 requests")
    start = time.perf_counter()
    tester.test_scenario(
        "Spike Test", "POST", "/api/generate/preview",
        data={'countries': [test_country], 'app_type': 'apache', 'app_variant': '24', 'aggregate': True},
        count=100, concurrent=100
    )
    spike_duration = time.perf_counter() - start
    tester.log(f" Spike handled in: {spike_duration:.2f}s")
    time.sleep(2)

    tester.test_concurrent_users(num_users=10, duration_seconds=60, think_time_range=(1, 5))
    time.sleep(2)

    tester.test_concurrent_users(num_users=50, duration_seconds=30, think_time_range=(0.5, 2))

    tester.final_report()


def main():
    """Parse command-line arguments and run the selected test mode."""
    parser = argparse.ArgumentParser(description='Performance testing for GeoIP Ban API')
    parser.add_argument('--url', default='http://127.0.0.1:5000', help='API base URL')
    parser.add_argument('--mode', choices=['quick', 'full', 'users'], default='quick',
                        help='Test mode: quick (5min), full (15min), users (simulation only)')
    parser.add_argument('--users', type=int, default=10,
                        help='Number of concurrent users (for users mode)')
    parser.add_argument('--duration', type=int, default=60,
                        help='Test duration in seconds (for users mode)')

    args = parser.parse_args()

    try:
        if args.mode == 'quick':
            run_quick_test(args.url)
        elif args.mode == 'full':
            run_full_test_suite(args.url)
        elif args.mode == 'users':
            run_user_simulation(args.url, args.users, args.duration)
    except KeyboardInterrupt:
        print("\n\nTest interrupted by user")
        sys.exit(1)


if __name__ == '__main__':
    main()