#!/usr/bin/env python3
"""
Performance & Stress Testing Suite for GeoIP Ban API
Simulates real production load with concurrent users
"""
import requests
import time
import random
import statistics
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from collections import defaultdict
import sys
import argparse
import threading
import json


class PerformanceTest:
    """Drives load against the GeoIP Ban API and aggregates timing/error stats.

    Results are collected per-scenario in ``self.results``; failures are
    recorded in ``self.errors`` with up to three detailed samples kept in
    ``self.error_samples`` for the final report. Shared state is guarded by
    ``self.lock`` because requests run from worker threads.
    """

    def __init__(self, base_url):
        self.base_url = base_url
        self.results = defaultdict(list)  # scenario name -> list of per-request result dicts
        self.errors = []                  # every failed request (guarded by self.lock)
        self.error_samples = []           # up to 3 detailed samples for the final report
        self.lock = threading.Lock()      # protects errors/error_samples across threads
        self.available_countries = self._fetch_available_countries()

    def log(self, msg):
        """Print a timestamped message, flushed so threaded progress shows immediately."""
        print(f"[{datetime.now().strftime('%H:%M:%S')}] {msg}", flush=True)

    def _fetch_available_countries(self):
        """Fetch available countries from the API.

        Falls back to a static default list if the endpoint is unreachable,
        returns a non-200 status, or reports no countries.
        """
        self.log("Fetching available countries from API...")
        try:
            resp = requests.get(f"{self.base_url}/api/database/sqlite/status", timeout=10)
            if resp.status_code == 200:
                data = resp.json()
                countries = [item['country_code'] for item in data.get('countries', [])]
                if countries:
                    self.log(f"Loaded {len(countries)} countries from API")
                    return countries
        except Exception as e:
            self.log(f"Failed to fetch countries: {e}")
        self.log("Using default country list")
        return ['CN', 'US', 'RU', 'DE', 'FR', 'GB', 'JP', 'KR', 'IN', 'BR']

    def _record_error(self, error_info, sample=None):
        """Thread-safely append an error dict and, while under 3, a detailed sample."""
        with self.lock:
            self.errors.append(error_info)
            if sample is not None and len(self.error_samples) < 3:
                self.error_samples.append(sample)

    def make_request(self, method, endpoint, data=None, timeout=30):
        """Issue one timed API request and classify the outcome.

        Args:
            method: 'GET' or 'POST' (anything else raises ValueError, which is
                caught below and reported as a failed request).
            endpoint: path appended to ``self.base_url``.
            data: JSON payload for POST requests.
            timeout: per-request timeout in seconds.

        Returns:
            dict with keys: status, duration, size, success, endpoint, and
            either ``cache_type`` (success) or ``error`` (failure).
        """
        url = f"{self.base_url}{endpoint}"
        start = time.time()
        try:
            if method == 'GET':
                resp = requests.get(url, timeout=timeout)
            elif method == 'POST':
                resp = requests.post(url, json=data, timeout=timeout)
            else:
                raise ValueError(f"Unsupported method: {method}")
            duration = time.time() - start

            # Non-200 responses are failures; keep a body preview for diagnosis.
            if resp.status_code != 200:
                self._record_error(
                    {
                        'endpoint': endpoint,
                        'error': f"HTTP {resp.status_code}",
                        'duration': duration,
                        'response_preview': resp.text[:300],
                    },
                    sample={
                        'type': f'HTTP {resp.status_code}',
                        'url': url,
                        'status': resp.status_code,
                        'headers': dict(resp.headers),
                        'body': resp.text[:500],
                    },
                )
                return {
                    'status': resp.status_code,
                    'duration': duration,
                    'size': 0,
                    'success': False,
                    'endpoint': endpoint,
                    'error': f"HTTP {resp.status_code}",
                }

            # RAW endpoints return plaintext, not JSON -- skip JSON parsing.
            if '/api/generate/raw' in endpoint:
                return {
                    'status': resp.status_code,
                    'duration': duration,
                    'size': len(resp.content),
                    'success': True,
                    'endpoint': endpoint,
                    'cache_type': None,  # RAW doesn't have cache_type in response
                }

            # Parse JSON for non-RAW endpoints. Catch ValueError rather than
            # json.JSONDecodeError: it covers both the stdlib decoder and
            # requests builds backed by simplejson.
            try:
                json_data = resp.json()
            except ValueError as e:
                self._record_error(
                    {
                        'endpoint': endpoint,
                        'error': f"Invalid JSON: {str(e)}",
                        'duration': duration,
                        'response_preview': resp.text[:300],
                    },
                    sample={
                        'type': 'JSON Parse Error',
                        'url': url,
                        'status': resp.status_code,
                        'headers': dict(resp.headers),
                        'body': resp.text[:500],
                        'error': str(e),
                    },
                )
                return {
                    'status': resp.status_code,
                    'duration': duration,
                    'size': len(resp.content),
                    'success': False,
                    'endpoint': endpoint,
                    'error': f"Invalid JSON: {str(e)}",
                }

            return {
                'status': resp.status_code,
                'duration': duration,
                'size': len(resp.content),
                'success': True,
                'endpoint': endpoint,
                'cache_type': json_data.get('cache_type'),
            }
        except Exception as e:
            # Transport-level failure (timeout, connection refused, ...).
            duration = time.time() - start
            self._record_error(
                {'endpoint': endpoint, 'error': str(e), 'duration': duration},
                sample={'type': 'Exception', 'url': url, 'error': str(e)},
            )
            return {
                'status': 0,
                'duration': duration,
                'size': 0,
                'success': False,
                'endpoint': endpoint,
                'error': str(e),
            }

    def simulate_user(self, user_id, duration_seconds, think_time_range=(1, 5)):
        """Simulate single user behavior - ONLY single countries.

        Loops for ``duration_seconds`` picking a weighted random action
        (generate is 3x more likely than raw or status), then sleeps a
        random "think time" between requests.

        Returns the list of per-request result dicts for this user.
        """
        formats = [
            ('nginx', 'geo'),
            ('nginx', 'map'),
            ('nginx', 'deny'),
            ('apache', '24'),
            ('haproxy', 'acl'),
        ]
        raw_formats = ['raw-cidr_txt', 'raw-newline_txt', 'raw-json', 'raw-csv']
        user_requests = []
        start_time = time.time()
        while (time.time() - start_time) < duration_seconds:
            # 'generate' listed 3x to weight it at 60% of actions.
            action = random.choice(['generate', 'generate', 'generate', 'raw', 'status'])
            if action == 'generate':
                country = random.choice(self.available_countries)
                app_type, variant = random.choice(formats)
                result = self.make_request(
                    'POST', '/api/generate/preview',
                    data={
                        'countries': [country],
                        'app_type': app_type,
                        'app_variant': variant,
                        'aggregate': random.choice([True, False]),
                    },
                )
            elif action == 'raw':
                country = random.choice(self.available_countries)
                raw_format = random.choice(raw_formats)
                result = self.make_request(
                    'POST', '/api/generate/raw',
                    data={
                        'countries': [country],
                        'app_type': raw_format,
                        'aggregate': random.choice([True, False]),
                    },
                )
            else:
                result = self.make_request('GET', '/api/database/status')
            user_requests.append(result)
            think_time = random.uniform(*think_time_range)
            time.sleep(think_time)
        return user_requests

    def test_concurrent_users(self, num_users, duration_seconds, think_time_range=(1, 5)):
        """Simulate multiple concurrent users, one thread per user."""
        self.log(f"\n{'='*70}")
        self.log(f"USER SIMULATION TEST")
        self.log(f" Users: {num_users}")
        self.log(f" Duration: {duration_seconds}s")
        self.log(f" Think time: {think_time_range[0]}-{think_time_range[1]}s")
        self.log(f" Countries: {len(self.available_countries)}")
        self.log(f"{'='*70}")
        all_results = []
        start_time = time.time()
        with ThreadPoolExecutor(max_workers=num_users) as executor:
            futures = [
                executor.submit(self.simulate_user, i, duration_seconds, think_time_range)
                for i in range(num_users)
            ]
            completed = 0
            for future in as_completed(futures):
                user_results = future.result()
                all_results.extend(user_results)
                completed += 1
                self.log(f" User {completed}/{num_users} completed ({len(user_results)} requests)")
        total_time = time.time() - start_time
        self.analyze_results(f"Concurrent Users ({num_users} users)", all_results, total_time)
        self.results[f"user_simulation_{num_users}"] = all_results
        return all_results

    def test_scenario(self, name, method, endpoint, data=None, count=10, concurrent=1):
        """Run a test scenario: ``count`` identical requests, optionally concurrent."""
        self.log(f"\n{'='*70}")
        self.log(f"TEST: {name}")
        self.log(f" Method: {method} {endpoint}")
        self.log(f" Requests: {count} (concurrent: {concurrent})")
        self.log(f"{'='*70}")
        results = []
        start_time = time.time()
        if concurrent == 1:
            for i in range(count):
                result = self.make_request(method, endpoint, data)
                results.append(result)
                if (i + 1) % 10 == 0:
                    self.log(f" Progress: {i+1}/{count}")
        else:
            with ThreadPoolExecutor(max_workers=concurrent) as executor:
                futures = [executor.submit(self.make_request, method, endpoint, data)
                           for _ in range(count)]
                for i, future in enumerate(as_completed(futures), 1):
                    results.append(future.result())
                    if i % 10 == 0:
                        self.log(f" Progress: {i}/{count}")
        total_time = time.time() - start_time
        self.analyze_results(name, results, total_time)
        self.results[name] = results
        return results

    def analyze_results(self, name, results, total_time):
        """Analyze and display timing, cache, size, and error stats for one scenario."""
        successful = [r for r in results if r['success']]
        failed = [r for r in results if not r['success']]
        if not successful:
            self.log(f"\nERROR: ALL REQUESTS FAILED!")
            return
        durations = [r['duration'] for r in successful]
        sizes = [r['size'] for r in successful]
        # 'redis-full'/'hybrid' count as cache hits, 'fresh' as a miss.
        cache_hits = sum(1 for r in successful if r.get('cache_type') in ['redis-full', 'hybrid'])
        cache_misses = sum(1 for r in successful if r.get('cache_type') == 'fresh')
        self.log(f"\nRESULTS: {name}")
        self.log(f" Total time: {total_time:.2f}s")
        self.log(f" Requests: {len(results)} ({len(successful)} success, {len(failed)} failed)")
        self.log(f" Success rate: {len(successful)/len(results)*100:.1f}%")
        self.log(f" Throughput: {len(successful)/total_time:.2f} req/s")
        self.log(f"\nTIMING:")
        self.log(f" Min: {min(durations)*1000:.0f}ms")
        self.log(f" Max: {max(durations)*1000:.0f}ms")
        self.log(f" Mean: {statistics.mean(durations)*1000:.0f}ms")
        self.log(f" Median: {statistics.median(durations)*1000:.0f}ms")
        # Percentiles need enough samples: quantiles(n=20)[18] is P95,
        # quantiles(n=100)[98] is P99.
        if len(durations) >= 20:
            self.log(f" P95: {statistics.quantiles(durations, n=20)[18]*1000:.0f}ms")
        if len(durations) >= 100:
            self.log(f" P99: {statistics.quantiles(durations, n=100)[98]*1000:.0f}ms")
        self.log(f"\nCACHE:")
        self.log(f" Hits: {cache_hits}")
        self.log(f" Misses: {cache_misses}")
        if cache_hits + cache_misses > 0:
            self.log(f" Hit rate: {cache_hits/(cache_hits+cache_misses)*100:.1f}%")
        self.log(f"\nRESPONSE SIZE:")
        self.log(f" Min: {min(sizes)/1024:.1f}KB")
        self.log(f" Max: {max(sizes)/1024:.1f}KB")
        self.log(f" Mean: {statistics.mean(sizes)/1024:.1f}KB")
        if failed:
            self.log(f"\nERRORS: {len(failed)}")
            for err in failed[:5]:
                self.log(f" - {err.get('error', 'Unknown error')}")

    def final_report(self):
        """Generate the final summary report across all recorded scenarios."""
        self.log(f"\n\n{'='*70}")
        self.log(f"FINAL PERFORMANCE REPORT")
        self.log(f"{'='*70}\n")
        for name, results in self.results.items():
            successful = [r for r in results if r['success']]
            if successful:
                durations = [r['duration'] for r in successful]
                self.log(f"{name:40s} {statistics.mean(durations)*1000:6.0f}ms avg, "
                         f"{len(successful):3d}/{len(results):3d} success")
        if self.errors:
            self.log(f"\nTOTAL ERRORS: {len(self.errors)}")
            # Group errors by (truncated) message; show the 10 most frequent.
            error_types = defaultdict(int)
            for err in self.errors:
                error_types[err.get('error', 'Unknown')[:50]] += 1
            for error, count in sorted(error_types.items(), key=lambda x: -x[1])[:10]:
                self.log(f" {count:3d}x {error}")
        # Show detailed error samples (at most 3 were captured).
        if self.error_samples:
            self.log(f"\n{'='*70}")
            self.log("DETAILED ERROR SAMPLES")
            self.log(f"{'='*70}")
            for idx, sample in enumerate(self.error_samples, 1):
                self.log(f"\nSample #{idx}: {sample['type']}")
                self.log(f" URL: {sample['url']}")
                if 'status' in sample:
                    self.log(f" Status: {sample['status']}")
                if 'headers' in sample:
                    self.log(f" Content-Type: {sample['headers'].get('Content-Type', 'N/A')}")
                    self.log(f" Content-Length: {sample['headers'].get('Content-Length', 'N/A')}")
                if 'error' in sample:
                    self.log(f" Error: {sample['error']}")
                if 'body' in sample:
                    self.log(f" Response body preview:")
                    self.log(f" ---")
                    for line in sample['body'].split('\n')[:10]:
                        self.log(f" {line}")
                    self.log(f" ---")


def run_user_simulation(base_url, num_users, duration, think_time):
    """Run only the concurrent-user simulation."""
    tester = PerformanceTest(base_url)
    print("\n" + "="*70)
    print("GEOIP BAN API - USER SIMULATION")
    print(f"Target: {base_url}")
    print(f"Simulating {num_users} concurrent users for {duration}s")
    print(f"Think time: {think_time[0]}-{think_time[1]}s")
    print("="*70 + "\n")
    tester.test_concurrent_users(num_users, duration, think_time_range=think_time)
    tester.final_report()


def run_quick_test(base_url, num_users, duration, think_time):
    """Quick performance test - single countries only."""
    tester = PerformanceTest(base_url)
    print("\n" + "="*70)
    print("GEOIP BAN API - QUICK PERFORMANCE TEST")
    print(f"Target: {base_url}")
    print("="*70 + "\n")
    if not tester.available_countries:
        print("ERROR: No countries available from API")
        return
    test_country = tester.available_countries[0]
    tester.test_scenario(
        "Warm-up", "POST", "/api/generate/preview",
        data={'countries': [test_country], 'app_type': 'nginx',
              'app_variant': 'geo', 'aggregate': True},
        count=10, concurrent=2,
    )
    tester.test_scenario(
        "Single Country (Cached)", "POST", "/api/generate/preview",
        data={'countries': [test_country], 'app_type': 'nginx',
              'app_variant': 'geo', 'aggregate': True},
        count=50, concurrent=10,
    )
    tester.test_concurrent_users(num_users=num_users, duration_seconds=duration,
                                 think_time_range=think_time)
    tester.final_report()


def run_full_test_suite(base_url, num_users, duration, think_time):
    """Execute complete test suite - single countries only."""
    tester = PerformanceTest(base_url)
    print("\n" + "="*70)
    print("GEOIP BAN API - FULL PERFORMANCE TEST SUITE")
    print(f"Target: {base_url}")
    print("="*70 + "\n")
    if not tester.available_countries:
        print("ERROR: No countries available from API")
        return
    tester.test_scenario("Health Check", "GET", "/api/database/status",
                         count=50, concurrent=10)
    time.sleep(1)
    test_country = tester.available_countries[0]
    tester.test_scenario(
        "Single Country (Cached)", "POST", "/api/generate/preview",
        data={'countries': [test_country], 'app_type': 'nginx',
              'app_variant': 'geo', 'aggregate': True},
        count=100, concurrent=20,
    )
    time.sleep(1)
    tester.test_scenario(
        "Heavy Load (50 concurrent)", "POST", "/api/generate/preview",
        data={'countries': [test_country], 'app_type': 'nginx',
              'app_variant': 'map', 'aggregate': True},
        count=200, concurrent=50,
    )
    time.sleep(2)
    tester.log("\nSPIKE TEST - Sudden burst of 100 requests")
    start = time.time()
    tester.test_scenario(
        "Spike Test", "POST", "/api/generate/preview",
        data={'countries': [test_country], 'app_type': 'apache',
              'app_variant': '24', 'aggregate': True},
        count=100, concurrent=100,
    )
    spike_duration = time.time() - start
    tester.log(f" Spike handled in: {spike_duration:.2f}s")
    time.sleep(2)
    tester.test_concurrent_users(num_users=num_users, duration_seconds=duration,
                                 think_time_range=think_time)
    time.sleep(2)
    # Second, harsher wave: 5x the users, half the duration, half the think time.
    tester.test_concurrent_users(num_users=num_users*5, duration_seconds=duration//2,
                                 think_time_range=(think_time[0]/2, think_time[1]/2))
    tester.final_report()


# NOTE(review): the original file contained a second, stale copy of
# run_full_test_suite (old parameterless signature ignoring CLI args) and a
# second identical `__main__` block AFTER the first one. That caused the whole
# suite to execute twice per invocation, with the duplicate definition
# shadowing the parameterized one for the second run. Both duplicates removed.
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Performance testing for GeoIP Ban API')
    parser.add_argument('--url', default='http://127.0.0.1:5000', help='API base URL')
    parser.add_argument('--mode', choices=['quick', 'full', 'users'], default='quick',
                        help='Test mode: quick (5min), full (15min), users (simulation only)')
    parser.add_argument('--users', type=int, default=10, help='Number of concurrent users')
    parser.add_argument('--duration', type=int, default=60, help='Test duration in seconds')
    parser.add_argument('--think-min', type=float, default=1.0,
                        help='Minimum think time between requests (seconds)')
    parser.add_argument('--think-max', type=float, default=5.0,
                        help='Maximum think time between requests (seconds)')
    args = parser.parse_args()
    think_time = (args.think_min, args.think_max)
    try:
        if args.mode == 'quick':
            run_quick_test(args.url, args.users, args.duration, think_time)
        elif args.mode == 'full':
            run_full_test_suite(args.url, args.users, args.duration, think_time)
        elif args.mode == 'users':
            run_user_simulation(args.url, args.users, args.duration, think_time)
    except KeyboardInterrupt:
        print("\n\nTest interrupted by user")
        sys.exit(1)