58 lines
1.8 KiB
Python
58 lines
1.8 KiB
Python
import socket
|
|
import subprocess
|
|
|
|
|
|
def whois_lookup(ip: str) -> str:
    """Return the raw output of the system ``whois`` command for *ip*.

    This is a best-effort helper: it never raises. Any failure is reported
    on stdout and an empty string is returned instead.

    Args:
        ip: The address to look up. It is passed as a single argument to
            the ``whois`` executable (no shell is involved).

    Returns:
        The captured standard output of ``whois``, or ``''`` when the
        target is unusable, the command is missing, the call takes longer
        than 5 seconds, or any other error occurs.
    """
    # Reject targets that cannot be a valid address before spawning a
    # process: empty/blank strings, and anything starting with '-', which
    # the whois binary would interpret as a command-line option.
    if not isinstance(ip, str) or not ip.strip() or ip.startswith('-'):
        print(f'WHOIS skipped: invalid lookup target {ip!r}')
        return ''

    try:
        result = subprocess.run(
            ['whois', ip],
            capture_output=True,
            text=True,
            timeout=5,
        )
    except subprocess.TimeoutExpired:
        # Report the timeout like the other failure paths do, so an empty
        # result is distinguishable from "whois returned nothing".
        print(f'WARNING: whois timed out for {ip}')
        return ''
    except FileNotFoundError:
        print('WARNING: whois command not found. Install it in the container/image.')
        return ''
    except Exception as exc:
        print(f'WHOIS error for {ip}: {exc}')
        return ''

    # stdout is returned regardless of the exit status.
    return result.stdout
|
|
|
|
|
|
def _parse_cymru_response(text: str) -> dict[str, dict[str, str]]:
    """Parse a pipe-delimited Team Cymru bulk whois response into a dict keyed by IP."""
    parsed: dict[str, dict[str, str]] = {}
    for line in text.split('\n'):
        # Skip banner/blank lines (no '|') and the column header ("AS | IP | ...").
        if '|' not in line or line.startswith('AS'):
            continue
        parts = [p.strip() for p in line.split('|')]
        if len(parts) < 5:
            continue

        asn, ip, prefix, cc = parts[:4]
        # Verbose output has the columns
        #   AS | IP | BGP Prefix | CC | Registry | Allocated | AS Name
        # so the owner (AS name) is the last column, not the fifth one,
        # which holds the registry. Shorter rows keep the fifth column.
        owner = parts[-1] if len(parts) >= 7 else parts[4]

        if asn.isdigit():
            asn = f'AS{asn}'
        parsed[ip] = {
            'asn': asn,
            'prefix': prefix,
            'country': cc,
            'owner': owner,
        }
    return parsed


def cymru_lookup(ips: list[str]) -> dict[str, dict[str, str]]:
    """Look up ASN information for *ips* via the Team Cymru bulk whois service.

    Sends one verbose bulk query to ``whois.cymru.com`` on TCP port 43 and
    parses the pipe-delimited reply. This is a best-effort helper: network
    failures are reported on stdout and yield an empty result rather than
    an exception.

    Args:
        ips: The addresses to query. An empty list short-circuits without
            opening a connection.

    Returns:
        A mapping from each IP found in the response to a dict with the
        keys ``'asn'`` (prefixed with ``AS`` when numeric), ``'prefix'``,
        ``'country'`` and ``'owner'``. Empty when *ips* is empty or the
        lookup fails.
    """
    results: dict[str, dict[str, str]] = {}
    if not ips:
        return results

    chunks: list[bytes] = []
    try:
        query = 'begin\nverbose\n' + '\n'.join(ips) + '\nend\n'
        # The context manager closes the socket on every path, including
        # when connect/sendall/recv raises.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(10)
            sock.connect(('whois.cymru.com', 43))
            sock.sendall(query.encode())
            # Read until recv returns an empty chunk.
            while chunk := sock.recv(4096):
                chunks.append(chunk)
    except socket.timeout:
        print('WARNING: Team Cymru timeout. Using fallback WHOIS.')
        return results
    except Exception as exc:
        print(f'Team Cymru lookup error: {exc}')
        return results

    response = b''.join(chunks)
    return _parse_cymru_response(response.decode('utf-8', errors='ignore'))
|