import socket
import subprocess


def whois_lookup(ip: str) -> str:
    """Return the raw output of the system ``whois`` command for *ip*.

    The lookup is best-effort and never raises: an empty string is returned
    when the command times out (5 seconds), when the ``whois`` binary is not
    installed, or when any other error occurs. Every failure except a timeout
    is reported on stdout.

    Args:
        ip: Address passed as the single argument to ``whois``.

    Returns:
        The captured stdout of the command, or ``''`` on failure.
    """
    try:
        result = subprocess.run(
            ['whois', ip],
            capture_output=True,
            text=True,
            timeout=5,
        )
    except subprocess.TimeoutExpired:
        return ''
    except FileNotFoundError:
        print('WARNING: whois command not found. Install it in the container/image.')
        return ''
    except Exception as exc:
        print(f'WHOIS error for {ip}: {exc}')
        return ''
    return result.stdout


def _parse_cymru_response(text: str) -> dict[str, dict[str, str]]:
    """Parse pipe-separated Team Cymru bulk output into a dict keyed by IP."""
    parsed: dict[str, dict[str, str]] = {}
    for line in text.split('\n'):
        # Skip the banner (no '|') and the column header row (starts with 'AS').
        if '|' not in line or line.startswith('AS'):
            continue
        parts = [p.strip() for p in line.split('|')]
        if len(parts) < 5:
            continue
        asn, ip, prefix, cc = parts[0], parts[1], parts[2], parts[3]
        # Verbose rows are
        #   AS | IP | BGP Prefix | CC | Registry | Allocated | AS Name
        # so the owner (AS name) is the LAST column; index 4 is the registry.
        # For a 5-column row the last column is still index 4.
        owner = parts[-1]
        if asn.isdigit():
            asn = f'AS{asn}'
        parsed[ip] = {
            'asn': asn,
            'prefix': prefix,
            'country': cc,
            'owner': owner,
        }
    return parsed


def cymru_lookup(
    ips: list[str],
    *,
    host: str = 'whois.cymru.com',
    port: int = 43,
    timeout: float = 10.0,
) -> dict[str, dict[str, str]]:
    """Bulk-resolve *ips* to ASN information via the Team Cymru whois service.

    Sends one ``begin`` / ``verbose`` / ... / ``end`` bulk query over TCP and
    parses the pipe-separated reply. The lookup is best-effort: on a timeout
    or any other error a message is printed and an empty dict is returned.

    Args:
        ips: Addresses to look up. An empty list returns ``{}`` without
            opening a connection.
        host: Whois server to query.
        port: TCP port of the whois server.
        timeout: Socket timeout in seconds for connect, send and each recv.

    Returns:
        A mapping from IP (as echoed by the server) to a dict with the keys
        ``'asn'`` (numeric ASNs are prefixed with ``AS``), ``'prefix'``,
        ``'country'`` and ``'owner'``.
    """
    results: dict[str, dict[str, str]] = {}
    if not ips:
        return results

    query = 'begin\nverbose\n' + '\n'.join(ips) + '\nend\n'
    try:
        # The context manager closes the socket even when connect/send/recv
        # raises, which the previous explicit close() did not.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            sock.connect((host, port))
            sock.sendall(query.encode())

            chunks: list[bytes] = []
            while True:
                data = sock.recv(4096)
                if not data:  # server closed the connection: reply complete
                    break
                chunks.append(data)

        text = b''.join(chunks).decode('utf-8', errors='ignore')
        results = _parse_cymru_response(text)
    except socket.timeout:
        print('WARNING: Team Cymru timeout. Using fallback WHOIS.')
    except Exception as exc:
        print(f'Team Cymru lookup error: {exc}')
    return results