Files
geoip_block_generator/redis_cache.py
Mateusz Gruszczyński c0afc1554d first commit
2026-02-17 09:04:09 +01:00

205 lines
7.2 KiB
Python

"""
Redis Cache Handler for pre-generated GeoIP configs
"""
import redis
import json
import hashlib
from datetime import datetime
from typing import Optional, Dict, List
import config
class RedisCache:
    """Redis-backed cache for pre-generated GeoIP configs.

    Entries are stored as JSON strings under ``geoip:config:<hash>`` keys,
    where the hash is derived from the normalized country list, application
    type and aggregation flag (see ``_generate_key``).
    """

    def __init__(self):
        """Create the Redis client from the settings in the ``config`` module."""
        self.redis_client = redis.Redis(
            host=config.REDIS_HOST,
            port=config.REDIS_PORT,
            db=config.REDIS_DB,
            # An empty/falsy password is passed as None.
            password=config.REDIS_PASSWORD or None,
            decode_responses=True,
            socket_connect_timeout=5,
            socket_timeout=5,
        )
        self.default_ttl = config.REDIS_CACHE_TTL

    def _generate_key(self, countries: List[str], app_type: str, aggregate: bool) -> str:
        """Build the cache key for a (countries, app_type, aggregate) request.

        Country codes are upper-cased, stripped and sorted; the app type is
        lower-cased and stripped; ``aggregate`` is coerced to ``bool``. Inputs
        that differ only in case, surrounding whitespace or order therefore
        map to the same key.

        Returns:
            ``"geoip:config:"`` followed by the first 16 hex characters of
            the MD5 digest of the normalized request.
        """
        normalized_countries = sorted(c.upper().strip() for c in countries)
        key_data = {
            'countries': normalized_countries,
            'app_type': app_type.lower().strip(),
            'aggregate': bool(aggregate),
        }
        key_str = json.dumps(key_data, sort_keys=True)
        # MD5 is used only to derive a short identifier, not for security.
        key_hash = hashlib.md5(key_str.encode()).hexdigest()[:16]
        return f"geoip:config:{key_hash}"

    def get_cached_config(self, countries: List[str], app_type: str, aggregate: bool) -> Optional[Dict]:
        """Return the cached entry for the request, or ``None``.

        ``None`` is returned on a cache miss, on a Redis error, and when the
        stored value is not valid JSON (a corrupt entry is treated as a miss
        instead of raising to the caller).
        """
        cache_key = self._generate_key(countries, app_type, aggregate)
        try:
            data = self.redis_client.get(cache_key)
        except redis.RedisError as e:
            print(f"[REDIS] Error getting cache: {e}", flush=True)
            return None

        if not data:
            print(f"[REDIS] Cache MISS: {cache_key}", flush=True)
            return None

        try:
            cached = json.loads(data)
        except ValueError as e:  # json.JSONDecodeError subclasses ValueError
            print(f"[REDIS] Corrupt cache entry {cache_key}: {e}", flush=True)
            return None

        print(f"[REDIS] Cache HIT: {cache_key}", flush=True)
        return cached

    def save_config(self, countries: List[str], app_type: str, aggregate: bool,
                    config_text: str, stats: Dict, ttl: Optional[int] = None) -> bool:
        """Store a generated config in Redis with an expiry.

        Args:
            countries: Country codes the config was generated for.
            app_type: Target application type.
            aggregate: Whether aggregation was requested.
            config_text: The generated config body.
            stats: Generation statistics; must be JSON-serializable.
            ttl: Expiry in seconds; a falsy value (``None`` or ``0``) selects
                ``self.default_ttl``.

        Returns:
            ``True`` when the entry was written, ``False`` when the payload
            could not be serialized or Redis reported an error.
        """
        cache_key = self._generate_key(countries, app_type, aggregate)
        cache_data = {
            'config': config_text,
            'stats': stats,
            # datetime.now() without a tz yields a naive local timestamp.
            'generated_at': datetime.now().isoformat(),
            'countries': sorted(countries),
            'app_type': app_type,
            'aggregate': aggregate,
        }
        ttl = ttl or self.default_ttl

        try:
            payload = json.dumps(cache_data, ensure_ascii=False)
        except (TypeError, ValueError) as e:
            # Non-serializable stats must not escape a method that reports
            # success/failure through its bool return value.
            print(f"[REDIS] Error saving cache: {e}", flush=True)
            return False

        try:
            self.redis_client.setex(cache_key, ttl, payload)
        except redis.RedisError as e:
            print(f"[REDIS] Error saving cache: {e}", flush=True)
            return False

        print(f"[REDIS] Saved config: {cache_key} (TTL: {ttl}s)", flush=True)
        return True

    def invalidate_country(self, country_code: str) -> int:
        """Delete every cached config whose country list contains a country.

        The comparison is done on upper-cased, stripped codes - the same
        normalization ``_generate_key`` applies - because the stored
        ``countries`` list keeps the caller's original spelling.

        Returns:
            The number of keys Redis reported as deleted, or ``0`` if a Redis
            error interrupted the scan.
        """
        target = country_code.upper().strip()
        deleted = 0
        try:
            for key in self.redis_client.scan_iter(match="geoip:config:*", count=100):
                data = self.redis_client.get(key)
                if not data:
                    continue
                try:
                    cached = json.loads(data)
                except ValueError:
                    # Unparseable entry: skip it, keep scanning.
                    continue
                if not isinstance(cached, dict):
                    continue
                countries = cached.get('countries') or []
                if not isinstance(countries, list):
                    continue
                if target in {str(c).upper().strip() for c in countries}:
                    # delete() returns how many keys were actually removed.
                    deleted += self.redis_client.delete(key)
        except redis.RedisError as e:
            print(f"[REDIS] Error invalidating cache: {e}", flush=True)
            return 0

        print(f"[REDIS] Invalidated {deleted} cache entries for {country_code}", flush=True)
        return deleted

    def get_cache_stats(self) -> Dict:
        """Summarize the ``geoip:config:*`` entries currently in Redis.

        Only the first 100 scanned keys are inspected, so
        ``total_size_bytes`` / ``total_size_mb`` cover that sample while
        ``total_entries`` counts every matching key. ``entries_sample`` holds
        at most 20 entry descriptions.

        Returns:
            The statistics dict, or ``{'error': <message>}`` on a Redis error.
        """
        try:
            keys = list(self.redis_client.scan_iter(match="geoip:config:*", count=1000))
            total_size = 0
            entries = []
            for key in keys[:100]:
                data = self.redis_client.get(key)
                if not data:
                    continue
                try:
                    cached = json.loads(data)
                except ValueError:
                    # Unparseable entry: leave it out of the sample.
                    continue
                if not isinstance(cached, dict):
                    continue
                # With decode_responses=True values arrive as str; measure the
                # UTF-8 encoding so the figure is bytes, not characters.
                size = len(data.encode('utf-8')) if isinstance(data, str) else len(data)
                total_size += size
                entries.append({
                    'countries': cached.get('countries', []),
                    'app_type': cached.get('app_type'),
                    'aggregate': cached.get('aggregate'),
                    'generated_at': cached.get('generated_at'),
                    'size_bytes': size,
                    'ttl_seconds': self.redis_client.ttl(key),
                })
            return {
                'total_entries': len(keys),
                'total_size_bytes': total_size,
                'total_size_mb': round(total_size / 1024 / 1024, 2),
                'entries_sample': entries[:20],
            }
        except redis.RedisError as e:
            print(f"[REDIS] Error getting stats: {e}", flush=True)
            return {'error': str(e)}

    def flush_all(self) -> bool:
        """Delete all keys matching the geoban/geoip key patterns.

        Returns:
            ``True`` when every pattern was scanned to completion, ``False``
            if a Redis error interrupted the flush.
        """
        patterns = [
            'geoban:country:*',
            'geoban:config:*',
            'geoip:config:*',
        ]
        deleted = 0
        try:
            for pattern in patterns:
                cursor = 0
                while True:
                    cursor, keys = self.redis_client.scan(cursor, match=pattern, count=1000)
                    if keys:
                        # One DELETE per scanned page.
                        deleted += self.redis_client.delete(*keys)
                    if cursor == 0:  # cursor 0 marks the end of the scan
                        break
        except redis.RedisError as e:
            print(f"[REDIS] Flush error: {e}", flush=True)
            return False

        print(f"[REDIS] Flushed {deleted} keys", flush=True)
        return True

    def health_check(self) -> Dict:
        """Ping Redis and report its memory usage.

        Returns:
            A dict with ``status``/``connected`` and, when healthy, the used
            and peak memory in MiB; when unhealthy, the error message.
        """
        try:
            self.redis_client.ping()
            info = self.redis_client.info('memory')
            return {
                'status': 'healthy',
                'connected': True,
                'memory_used_mb': round(info.get('used_memory', 0) / 1024 / 1024, 2),
                'memory_peak_mb': round(info.get('used_memory_peak', 0) / 1024 / 1024, 2),
            }
        except redis.RedisError as e:
            return {
                'status': 'unhealthy',
                'connected': False,
                'error': str(e),
            }