Files
geoip_block_generator/test_performance.py
Mateusz Gruszczyński 4485aa37ba small change in js
2026-02-17 11:20:24 +01:00

587 lines
23 KiB
Python

#!/usr/bin/env python3
"""
Performance & Stress Testing Suite for GeoIP Ban API
Simulates real production load with concurrent users
"""
import requests
import time
import random
import statistics
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from collections import defaultdict
import sys
import argparse
import threading
import json
class PerformanceTest:
    """Load-test driver for the GeoIP Ban API.

    Issues HTTP requests against ``base_url``, records one result dict per
    request (``status``, ``duration``, ``size``, ``success``, ``endpoint`` and
    either ``cache_type`` or ``error``) and prints timing, cache and error
    statistics through :meth:`log`.

    Attributes:
        base_url: API root URL without a trailing slash.
        results: Maps a scenario name to the list of its result dicts.
        errors: One summary dict per failed request, across all scenarios.
        error_samples: Up to ``MAX_ERROR_SAMPLES`` detailed failure records.
        lock: Guards ``errors`` / ``error_samples`` against worker threads.
        available_countries: Country codes used to build request payloads.
    """

    # Upper bound on detailed failure samples kept for the final report.
    MAX_ERROR_SAMPLES = 3
    # Country codes used when the API cannot provide its own list.
    DEFAULT_COUNTRIES = ('CN', 'US', 'RU', 'DE', 'FR', 'GB', 'JP', 'KR', 'IN', 'BR')

    def __init__(self, base_url):
        """Store the target URL and fetch the country list (performs one HTTP GET)."""
        # A trailing slash would otherwise yield "//api/..." once the
        # endpoint (which starts with "/") is appended.
        self.base_url = base_url.rstrip('/')
        self.results = defaultdict(list)
        self.errors = []
        self.error_samples = []  # Store sample responses
        self.lock = threading.Lock()
        self.available_countries = self._fetch_available_countries()

    def log(self, msg):
        """Print ``msg`` prefixed with the current wall-clock time (HH:MM:SS)."""
        print(f"[{datetime.now().strftime('%H:%M:%S')}] {msg}", flush=True)

    def _fetch_available_countries(self):
        """Return the country codes reported by the API, or the default list.

        Best effort: any failure (network error, non-200 status, unexpected
        payload, empty list) is logged where applicable and falls back to
        ``DEFAULT_COUNTRIES`` so the test run can still proceed.
        """
        self.log("Fetching available countries from API...")
        try:
            resp = requests.get(f"{self.base_url}/api/database/sqlite/status", timeout=10)
            if resp.status_code == 200:
                data = resp.json()
                countries = [item['country_code'] for item in data.get('countries', [])]
                if countries:
                    self.log(f"Loaded {len(countries)} countries from API")
                    return countries
        except Exception as e:  # deliberate catch-all: this lookup is optional
            self.log(f"Failed to fetch countries: {e}")
        self.log("Using default country list")
        return list(self.DEFAULT_COUNTRIES)

    def _record_error(self, error_info, sample):
        """Append a failure summary and, while under the cap, a detailed sample."""
        with self.lock:
            self.errors.append(error_info)
            if len(self.error_samples) < self.MAX_ERROR_SAMPLES:
                self.error_samples.append(sample)

    def make_request(self, method, endpoint, data=None, timeout=30):
        """Perform one timed API request and classify the outcome.

        Args:
            method: ``'GET'`` or ``'POST'``; anything else is recorded as a
                failed request (no exception escapes).
            endpoint: Path appended to ``base_url``.
            data: JSON payload for POST requests.
            timeout: Per-request timeout in seconds.

        Returns:
            A result dict with ``status``, ``duration`` (seconds), ``size``
            (bytes), ``success`` and ``endpoint``; successful results carry
            ``cache_type``, failed ones carry ``error``. ``status`` is 0 when
            no HTTP response was obtained.
        """
        url = f"{self.base_url}{endpoint}"
        # perf_counter is monotonic, so a wall-clock adjustment during the
        # run cannot produce negative or skewed durations.
        start = time.perf_counter()
        try:
            if method == 'GET':
                resp = requests.get(url, timeout=timeout)
            elif method == 'POST':
                resp = requests.post(url, json=data, timeout=timeout)
            else:
                raise ValueError(f"Unsupported method: {method}")
            duration = time.perf_counter() - start

            # Check status code
            if resp.status_code != 200:
                error = f"HTTP {resp.status_code}"
                self._record_error(
                    {
                        'endpoint': endpoint,
                        'error': error,
                        'duration': duration,
                        'response_preview': resp.text[:300],
                    },
                    {
                        'type': f'HTTP {resp.status_code}',
                        'url': url,
                        'status': resp.status_code,
                        'headers': dict(resp.headers),
                        'body': resp.text[:500],
                    },
                )
                return {
                    'status': resp.status_code,
                    'duration': duration,
                    'size': 0,
                    'success': False,
                    'endpoint': endpoint,
                    'error': error,
                }

            # RAW endpoints return plaintext, not JSON, so there is nothing
            # to parse and no cache_type to report.
            if '/api/generate/raw' in endpoint:
                return {
                    'status': resp.status_code,
                    'duration': duration,
                    'size': len(resp.content),
                    'success': True,
                    'endpoint': endpoint,
                    'cache_type': None,
                }

            # Try to parse JSON for non-RAW endpoints
            try:
                json_data = resp.json()
            except ValueError as e:
                # ValueError is the common base of json.JSONDecodeError and
                # of the simplejson-based error requests may raise instead.
                error = f"Invalid JSON: {str(e)}"
                self._record_error(
                    {
                        'endpoint': endpoint,
                        'error': error,
                        'duration': duration,
                        'response_preview': resp.text[:300],
                    },
                    {
                        'type': 'JSON Parse Error',
                        'url': url,
                        'status': resp.status_code,
                        'headers': dict(resp.headers),
                        'body': resp.text[:500],
                        'error': str(e),
                    },
                )
                return {
                    'status': resp.status_code,
                    'duration': duration,
                    'size': len(resp.content),
                    'success': False,
                    'endpoint': endpoint,
                    'error': error,
                }

            # A valid JSON body that is not an object simply has no cache_type.
            cache_type = json_data.get('cache_type') if isinstance(json_data, dict) else None
            return {
                'status': resp.status_code,
                'duration': duration,
                'size': len(resp.content),
                'success': True,
                'endpoint': endpoint,
                'cache_type': cache_type,
            }
        except Exception as e:
            # Boundary of a worker task: every failure (timeout, connection
            # error, unsupported method) becomes a recorded failed result.
            duration = time.perf_counter() - start
            self._record_error(
                {'endpoint': endpoint, 'error': str(e), 'duration': duration},
                {'type': 'Exception', 'url': url, 'error': str(e)},
            )
            return {
                'status': 0,
                'duration': duration,
                'size': 0,
                'success': False,
                'endpoint': endpoint,
                'error': str(e),
            }

    def simulate_user(self, user_id, duration_seconds, think_time_range=(1, 5)):
        """Simulate single user behavior - ONLY single countries.

        Repeats a random action (preview generation is three times as likely
        as a raw export or a status check), then sleeps for a random think
        time, until ``duration_seconds`` have elapsed.

        Args:
            user_id: Identifier of the simulated user (currently unused).
            duration_seconds: How long the user keeps issuing requests.
            think_time_range: ``(min, max)`` pause in seconds between requests.

        Returns:
            The list of result dicts produced by this user.
        """
        formats = [
            ('nginx', 'geo'),
            ('nginx', 'map'),
            ('nginx', 'deny'),
            ('apache', '24'),
            ('haproxy', 'acl'),
        ]
        raw_formats = ['raw-cidr_txt', 'raw-newline_txt', 'raw-json', 'raw-csv']
        user_requests = []
        start_time = time.monotonic()
        while (time.monotonic() - start_time) < duration_seconds:
            action = random.choice(['generate', 'generate', 'generate', 'raw', 'status'])
            if action == 'generate':
                country = random.choice(self.available_countries)
                app_type, variant = random.choice(formats)
                result = self.make_request(
                    'POST', '/api/generate/preview',
                    data={
                        'countries': [country],
                        'app_type': app_type,
                        'app_variant': variant,
                        'aggregate': random.choice([True, False])
                    }
                )
            elif action == 'raw':
                country = random.choice(self.available_countries)
                raw_format = random.choice(raw_formats)
                result = self.make_request(
                    'POST', '/api/generate/raw',
                    data={
                        'countries': [country],
                        'app_type': raw_format,
                        'aggregate': random.choice([True, False])
                    }
                )
            else:
                result = self.make_request('GET', '/api/database/status')
            user_requests.append(result)
            time.sleep(random.uniform(*think_time_range))
        return user_requests

    def test_concurrent_users(self, num_users, duration_seconds, think_time_range=(1, 5)):
        """Simulate multiple concurrent users, one worker thread per user.

        Args:
            num_users: Number of simulated users (must be at least 1).
            duration_seconds: How long each user runs.
            think_time_range: ``(min, max)`` pause in seconds between requests.

        Returns:
            All result dicts from all users; also stored in ``self.results``
            under ``user_simulation_<num_users>``.
        """
        rule = '=' * 70
        self.log(f"\n{rule}")
        self.log("USER SIMULATION TEST")
        self.log(f" Users: {num_users}")
        self.log(f" Duration: {duration_seconds}s")
        self.log(f" Think time: {think_time_range[0]}-{think_time_range[1]}s")
        self.log(f" Countries: {len(self.available_countries)}")
        self.log(rule)
        all_results = []
        start_time = time.perf_counter()
        with ThreadPoolExecutor(max_workers=num_users) as executor:
            futures = [
                executor.submit(self.simulate_user, i, duration_seconds, think_time_range)
                for i in range(num_users)
            ]
            for completed, future in enumerate(as_completed(futures), 1):
                user_results = future.result()
                all_results.extend(user_results)
                self.log(f" User {completed}/{num_users} completed ({len(user_results)} requests)")
        total_time = time.perf_counter() - start_time
        self.analyze_results(f"Concurrent Users ({num_users} users)", all_results, total_time)
        self.results[f"user_simulation_{num_users}"] = all_results
        return all_results

    def test_scenario(self, name, method, endpoint, data=None, count=10, concurrent=1):
        """Run test scenario with multiple identical requests.

        Args:
            name: Scenario label; also the key used in ``self.results``.
            method: HTTP method passed to :meth:`make_request`.
            endpoint: Path passed to :meth:`make_request`.
            data: JSON payload for POST requests.
            count: Total number of requests to issue.
            concurrent: Worker threads; values of 1 or less run sequentially.

        Returns:
            The list of result dicts for this scenario.
        """
        rule = '=' * 70
        self.log(f"\n{rule}")
        self.log(f"TEST: {name}")
        self.log(f" Method: {method} {endpoint}")
        self.log(f" Requests: {count} (concurrent: {concurrent})")
        self.log(rule)
        results = []
        start_time = time.perf_counter()
        if concurrent <= 1:
            # Also covers concurrent=0, which a thread pool would reject.
            for done in range(1, count + 1):
                results.append(self.make_request(method, endpoint, data))
                if done % 10 == 0:
                    self.log(f" Progress: {done}/{count}")
        else:
            with ThreadPoolExecutor(max_workers=concurrent) as executor:
                futures = [executor.submit(self.make_request, method, endpoint, data)
                           for _ in range(count)]
                for done, future in enumerate(as_completed(futures), 1):
                    results.append(future.result())
                    if done % 10 == 0:
                        self.log(f" Progress: {done}/{count}")
        total_time = time.perf_counter() - start_time
        self.analyze_results(name, results, total_time)
        self.results[name] = results
        return results

    def _log_failures(self, failed):
        """Log the failure count and the messages of the first five failures."""
        self.log(f"\nERRORS: {len(failed)}")
        for err in failed[:5]:
            self.log(f" - {err.get('error', 'Unknown error')}")

    def analyze_results(self, name, results, total_time):
        """Analyze and display results for one scenario.

        Args:
            name: Scenario label used in the report header.
            results: Result dicts as returned by :meth:`make_request`.
            total_time: Elapsed time of the whole scenario, in seconds.
        """
        if not results:
            # Nothing ran at all; calling that "all failed" would be misleading.
            self.log(f"\nWARNING: no requests were executed for {name}")
            return
        successful = [r for r in results if r['success']]
        failed = [r for r in results if not r['success']]
        if not successful:
            self.log("\nERROR: ALL REQUESTS FAILED!")
            # Show why, instead of leaving the reader with only the headline.
            self._log_failures(failed)
            return
        durations = [r['duration'] for r in successful]
        sizes = [r['size'] for r in successful]
        cache_hits = sum(1 for r in successful if r.get('cache_type') in ['redis-full', 'hybrid'])
        cache_misses = sum(1 for r in successful if r.get('cache_type') == 'fresh')
        self.log(f"\nRESULTS: {name}")
        self.log(f" Total time: {total_time:.2f}s")
        self.log(f" Requests: {len(results)} ({len(successful)} success, {len(failed)} failed)")
        self.log(f" Success rate: {len(successful)/len(results)*100:.1f}%")
        if total_time > 0:
            self.log(f" Throughput: {len(successful)/total_time:.2f} req/s")
        else:
            # Avoid ZeroDivisionError when the elapsed time rounds to zero.
            self.log(" Throughput: n/a (elapsed time too small to measure)")
        self.log("\nTIMING:")
        self.log(f" Min: {min(durations)*1000:.0f}ms")
        self.log(f" Max: {max(durations)*1000:.0f}ms")
        self.log(f" Mean: {statistics.mean(durations)*1000:.0f}ms")
        self.log(f" Median: {statistics.median(durations)*1000:.0f}ms")
        # Percentiles are only reported once there are enough samples for
        # the corresponding cut point to be meaningful.
        if len(durations) >= 20:
            self.log(f" P95: {statistics.quantiles(durations, n=20)[18]*1000:.0f}ms")
        if len(durations) >= 100:
            self.log(f" P99: {statistics.quantiles(durations, n=100)[98]*1000:.0f}ms")
        self.log("\nCACHE:")
        self.log(f" Hits: {cache_hits}")
        self.log(f" Misses: {cache_misses}")
        if cache_hits + cache_misses > 0:
            self.log(f" Hit rate: {cache_hits/(cache_hits+cache_misses)*100:.1f}%")
        self.log("\nRESPONSE SIZE:")
        self.log(f" Min: {min(sizes)/1024:.1f}KB")
        self.log(f" Max: {max(sizes)/1024:.1f}KB")
        self.log(f" Mean: {statistics.mean(sizes)/1024:.1f}KB")
        if failed:
            self._log_failures(failed)

    def final_report(self):
        """Generate final summary report across all recorded scenarios."""
        rule = '=' * 70
        self.log(f"\n\n{rule}")
        self.log("FINAL PERFORMANCE REPORT")
        self.log(f"{rule}\n")
        for name, results in self.results.items():
            successful = [r for r in results if r['success']]
            if successful:
                durations = [r['duration'] for r in successful]
                self.log(f"{name:40s} {statistics.mean(durations)*1000:6.0f}ms avg, "
                         f"{len(successful):3d}/{len(results):3d} success")
            else:
                # Keep fully failed scenarios visible in the summary.
                self.log(f"{name:40s} {'n/a':>8s} avg, "
                         f"{0:3d}/{len(results):3d} success")
        if self.errors:
            self.log(f"\nTOTAL ERRORS: {len(self.errors)}")
            error_types = defaultdict(int)
            for err in self.errors:
                # Truncate so near-identical long messages group together.
                error_types[err.get('error', 'Unknown')[:50]] += 1
            for error, count in sorted(error_types.items(), key=lambda x: -x[1])[:10]:
                self.log(f" {count:3d}x {error}")
        # Show detailed error samples
        if self.error_samples:
            self.log(f"\n{rule}")
            self.log("DETAILED ERROR SAMPLES")
            self.log(rule)
            for idx, sample in enumerate(self.error_samples, 1):
                self.log(f"\nSample #{idx}: {sample['type']}")
                self.log(f" URL: {sample['url']}")
                if 'status' in sample:
                    self.log(f" Status: {sample['status']}")
                if 'headers' in sample:
                    self.log(f" Content-Type: {sample['headers'].get('Content-Type', 'N/A')}")
                    self.log(f" Content-Length: {sample['headers'].get('Content-Length', 'N/A')}")
                if 'error' in sample:
                    self.log(f" Error: {sample['error']}")
                if 'body' in sample:
                    self.log(" Response body preview:")
                    self.log(" ---")
                    for line in sample['body'].split('\n')[:10]:
                        self.log(f" {line}")
                    self.log(" ---")
def run_user_simulation(base_url, num_users, duration, think_time):
    """Run only the concurrent-user simulation and print the final report.

    Args:
        base_url: API root URL handed to ``PerformanceTest``.
        num_users: Number of simulated concurrent users.
        duration: Simulation length in seconds.
        think_time: ``(min, max)`` pause in seconds between a user's requests.
    """
    tester = PerformanceTest(base_url)
    separator = "=" * 70
    low, high = think_time[0], think_time[1]
    # One print call; the emitted text matches a line-by-line banner exactly.
    banner_lines = [
        "\n" + separator,
        "GEOIP BAN API - USER SIMULATION",
        f"Target: {base_url}",
        f"Simulating {num_users} concurrent users for {duration}s",
        f"Think time: {low}-{high}s",
        separator + "\n",
    ]
    print("\n".join(banner_lines))
    tester.test_concurrent_users(num_users, duration, think_time_range=think_time)
    tester.final_report()
def run_quick_test(base_url, num_users, duration, think_time):
    """Quick performance test - single countries only.

    Runs a small warm-up, a cached single-country scenario and one
    concurrent-user simulation, then prints the final report.

    Args:
        base_url: API root URL handed to ``PerformanceTest``.
        num_users: Number of simulated concurrent users.
        duration: User-simulation length in seconds.
        think_time: ``(min, max)`` pause in seconds between a user's requests.
    """
    tester = PerformanceTest(base_url)
    separator = "=" * 70
    print("\n" + separator)
    print("GEOIP BAN API - QUICK PERFORMANCE TEST")
    print(f"Target: {base_url}")
    print(separator + "\n")

    countries = tester.available_countries
    if not countries:
        print("ERROR: No countries available from API")
        return

    # Both scenarios send the same payload; the first pass primes the cache
    # that the second pass is meant to exercise.
    payload = {
        'countries': [countries[0]],
        'app_type': 'nginx',
        'app_variant': 'geo',
        'aggregate': True,
    }
    scenarios = (
        ("Warm-up", 10, 2),
        ("Single Country (Cached)", 50, 10),
    )
    for label, request_count, workers in scenarios:
        tester.test_scenario(
            label,
            "POST", "/api/generate/preview",
            data=payload,
            count=request_count, concurrent=workers,
        )

    tester.test_concurrent_users(
        num_users=num_users,
        duration_seconds=duration,
        think_time_range=think_time,
    )
    tester.final_report()
def run_full_test_suite(base_url, num_users, duration, think_time):
    """Execute complete test suite - single countries only.

    Phases, in order: health check, cached single-country load, heavy load
    (50 workers), a 100-request spike, a user simulation with the requested
    parameters, and a stress simulation with five times the users, half the
    duration and half the think time. Ends with the final report.

    Args:
        base_url: API root URL handed to ``PerformanceTest``.
        num_users: Number of simulated users for the regular simulation.
        duration: Regular simulation length in seconds.
        think_time: ``(min, max)`` pause in seconds between a user's requests.
    """
    tester = PerformanceTest(base_url)
    print("\n" + "="*70)
    print("GEOIP BAN API - FULL PERFORMANCE TEST SUITE")
    print(f"Target: {base_url}")
    print("="*70 + "\n")
    if not tester.available_countries:
        print("ERROR: No countries available from API")
        return
    tester.test_scenario("Health Check", "GET", "/api/database/status", count=50, concurrent=10)
    time.sleep(1)
    test_country = tester.available_countries[0]
    tester.test_scenario(
        "Single Country (Cached)",
        "POST", "/api/generate/preview",
        data={'countries': [test_country], 'app_type': 'nginx', 'app_variant': 'geo', 'aggregate': True},
        count=100, concurrent=20
    )
    time.sleep(1)
    tester.test_scenario(
        "Heavy Load (50 concurrent)",
        "POST", "/api/generate/preview",
        data={'countries': [test_country], 'app_type': 'nginx', 'app_variant': 'map', 'aggregate': True},
        count=200, concurrent=50
    )
    time.sleep(2)
    tester.log("\nSPIKE TEST - Sudden burst of 100 requests")
    start = time.time()
    tester.test_scenario(
        "Spike Test",
        "POST", "/api/generate/preview",
        data={'countries': [test_country], 'app_type': 'apache', 'app_variant': '24', 'aggregate': True},
        count=100, concurrent=100
    )
    spike_duration = time.time() - start
    tester.log(f" Spike handled in: {spike_duration:.2f}s")
    time.sleep(2)
    tester.test_concurrent_users(num_users=num_users, duration_seconds=duration, think_time_range=think_time)
    time.sleep(2)
    # duration // 2 is 0 for duration == 1, which would make the stress
    # phase issue no requests at all; keep it at one second minimum.
    stress_duration = max(1, duration // 2)
    tester.test_concurrent_users(
        num_users=num_users*5,
        duration_seconds=stress_duration,
        think_time_range=(think_time[0]/2, think_time[1]/2)
    )
    tester.final_report()
def parse_cli_args(argv=None):
    """Parse and validate the command-line options.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    Returns:
        The parsed ``argparse.Namespace`` (``url``, ``mode``, ``users``,
        ``duration``, ``think_min``, ``think_max``).

    Raises:
        SystemExit: On unknown/invalid options, or on values that would
            crash the run later (exit status 2, message on stderr).
    """
    parser = argparse.ArgumentParser(description='Performance testing for GeoIP Ban API')
    parser.add_argument('--url', default='http://127.0.0.1:5000', help='API base URL')
    parser.add_argument('--mode', choices=['quick', 'full', 'users'], default='quick',
                        help='Test mode: quick (5min), full (15min), users (simulation only)')
    parser.add_argument('--users', type=int, default=10, help='Number of concurrent users')
    parser.add_argument('--duration', type=int, default=60, help='Test duration in seconds')
    parser.add_argument('--think-min', type=float, default=1.0, help='Minimum think time between requests (seconds)')
    parser.add_argument('--think-max', type=float, default=5.0, help='Maximum think time between requests (seconds)')
    args = parser.parse_args(argv)

    # A thread pool cannot be created with zero or negative workers.
    if args.users < 1:
        parser.error('--users must be at least 1')
    # A non-positive duration would run a simulation that sends nothing.
    if args.duration < 1:
        parser.error('--duration must be at least 1 second')
    # time.sleep() raises ValueError for negative values inside the workers.
    if args.think_min < 0 or args.think_max < 0:
        parser.error('--think-min and --think-max must be non-negative')
    return args


def main(argv=None):
    """Run the test mode selected on the command line.

    Exits with status 1 when interrupted with Ctrl-C.
    """
    args = parse_cli_args(argv)
    think_time = (args.think_min, args.think_max)
    try:
        if args.mode == 'quick':
            run_quick_test(args.url, args.users, args.duration, think_time)
        elif args.mode == 'full':
            run_full_test_suite(args.url, args.users, args.duration, think_time)
        elif args.mode == 'users':
            run_user_simulation(args.url, args.users, args.duration, think_time)
    except KeyboardInterrupt:
        print("\n\nTest interrupted by user")
        sys.exit(1)


if __name__ == '__main__':
    main()
def run_full_test_suite(base_url, num_users=10, duration=60, think_time=(1, 5)):
    """Execute complete test suite - single countries only.

    NOTE(review): this is a second definition of ``run_full_test_suite`` in
    the same file and it replaces the earlier one once executed. Its
    signature now accepts the four arguments the command-line dispatch
    passes; the previous one-argument form raised ``TypeError`` in ``full``
    mode. The defaults reproduce the formerly hard-coded 10 users / 60 s /
    (1, 5) s simulation. One of the two definitions should be removed.

    Phases, in order: health check, cached single-country load, heavy load
    (50 workers), a 100-request spike, a user simulation with the requested
    parameters, and a stress simulation with five times the users, half the
    duration and half the think time (with the defaults: 50 users, 30 s,
    0.5-2.5 s; the old hard-coded upper bound was 2 s). Ends with the final
    report.

    Args:
        base_url: API root URL handed to ``PerformanceTest``.
        num_users: Number of simulated users for the regular simulation.
        duration: Regular simulation length in seconds.
        think_time: ``(min, max)`` pause in seconds between a user's requests.
    """
    tester = PerformanceTest(base_url)
    print("\n" + "="*70)
    print("GEOIP BAN API - FULL PERFORMANCE TEST SUITE")
    print(f"Target: {base_url}")
    print("="*70 + "\n")
    if not tester.available_countries:
        print("ERROR: No countries available from API")
        return
    tester.test_scenario("Health Check", "GET", "/api/database/status", count=50, concurrent=10)
    time.sleep(1)
    test_country = tester.available_countries[0]
    tester.test_scenario(
        "Single Country (Cached)",
        "POST", "/api/generate/preview",
        data={'countries': [test_country], 'app_type': 'nginx', 'app_variant': 'geo', 'aggregate': True},
        count=100, concurrent=20
    )
    time.sleep(1)
    tester.test_scenario(
        "Heavy Load (50 concurrent)",
        "POST", "/api/generate/preview",
        data={'countries': [test_country], 'app_type': 'nginx', 'app_variant': 'map', 'aggregate': True},
        count=200, concurrent=50
    )
    time.sleep(2)
    tester.log("\nSPIKE TEST - Sudden burst of 100 requests")
    start = time.time()
    tester.test_scenario(
        "Spike Test",
        "POST", "/api/generate/preview",
        data={'countries': [test_country], 'app_type': 'apache', 'app_variant': '24', 'aggregate': True},
        count=100, concurrent=100
    )
    spike_duration = time.time() - start
    tester.log(f" Spike handled in: {spike_duration:.2f}s")
    time.sleep(2)
    tester.test_concurrent_users(num_users=num_users, duration_seconds=duration, think_time_range=think_time)
    time.sleep(2)
    # duration // 2 is 0 for duration == 1, which would make the stress
    # phase issue no requests at all; keep it at one second minimum.
    stress_duration = max(1, duration // 2)
    tester.test_concurrent_users(
        num_users=num_users*5,
        duration_seconds=stress_duration,
        think_time_range=(think_time[0]/2, think_time[1]/2)
    )
    tester.final_report()
# NOTE: a second, byte-for-byte copy of the ``if __name__ == '__main__':``
# entry point used to live here. Because the identical guard earlier in this
# file has already parsed the arguments and run the selected mode by the time
# execution reaches this point, the copy made every invocation run the whole
# test suite twice. It has been removed; the earlier guard is the single
# entry point of the script.