Files
ip-whois/ip_analyzer_app/services/lookups.py
Mateusz Gruszczyński c0ac69fce7 ip user info
2026-04-15 08:47:45 +02:00

58 lines
1.8 KiB
Python

import socket
import subprocess
def whois_lookup(ip: str) -> str:
    """Return the raw output of the system ``whois`` command for *ip*.

    This is a best-effort lookup: it never raises. An empty string is
    returned when the target is unusable, the command times out (5 s),
    the ``whois`` binary is missing, or any other error occurs.

    Args:
        ip: Address to look up. Surrounding whitespace is ignored.

    Returns:
        The captured standard output of ``whois``, or ``''`` on failure.
    """
    target = ip.strip() if isinstance(ip, str) else ''

    # A value starting with '-' would be parsed by the whois client as a
    # command-line option (for example ``-h <host>``), not as the object
    # to look up, so refuse it instead of handing it to the subprocess.
    if not target or target.startswith('-'):
        print(f'WHOIS error for {ip}: invalid lookup target')
        return ''

    try:
        result = subprocess.run(
            ['whois', target],
            capture_output=True,
            text=True,
            # Registry data is not guaranteed to match the locale encoding;
            # replace undecodable bytes rather than losing the whole answer
            # to a UnicodeDecodeError.
            errors='replace',
            timeout=5,
        )
    except subprocess.TimeoutExpired:
        return ''
    except FileNotFoundError:
        print('WARNING: whois command not found. Install it in the container/image.')
        return ''
    except Exception as exc:
        print(f'WHOIS error for {ip}: {exc}')
        return ''
    return result.stdout
def _parse_cymru_response(text: str) -> dict[str, dict[str, str]]:
    """Parse pipe-delimited Team Cymru bulk output into a per-IP mapping."""
    parsed: dict[str, dict[str, str]] = {}
    for line in text.split('\n'):
        # Lines without '|' (banner, error messages) are not data rows, and
        # a row starting with 'AS' is treated as a column header.
        if '|' not in line or line.startswith('AS'):
            continue
        parts = [part.strip() for part in line.split('|')]
        if len(parts) < 5:
            continue
        asn, ip, prefix, cc = parts[:4]
        if asn.isdigit():
            asn = f'AS{asn}'
        parsed[ip] = {
            'asn': asn,
            'prefix': prefix,
            'country': cc,
            # The AS name is the last column. In verbose mode the row is
            # AS | IP | BGP Prefix | CC | Registry | Allocated | AS Name,
            # so the fifth column is the registry, not the owner.
            'owner': parts[-1],
        }
    return parsed


def cymru_lookup(
    ips: list[str],
    *,
    host: str = 'whois.cymru.com',
    port: int = 43,
    timeout: float = 10,
) -> dict[str, dict[str, str]]:
    """Resolve ASN details for *ips* with one Team Cymru bulk whois query.

    This is a best-effort lookup: network and parsing errors are reported
    with ``print`` and produce an empty result instead of an exception.

    Args:
        ips: Addresses to look up; an empty list short-circuits to ``{}``.
        host: Whois server to query.
        port: TCP port of the whois server.
        timeout: Socket timeout in seconds for connect, send and receive.

    Returns:
        A mapping from each IP found in the response to a dict with the
        keys ``'asn'``, ``'prefix'``, ``'country'`` and ``'owner'``.
        Numeric ASNs are prefixed with ``'AS'``.
    """
    if not ips:
        return {}
    try:
        query = 'begin\nverbose\n' + '\n'.join(ips) + '\nend\n'
        chunks: list[bytes] = []
        # The context manager closes the socket on every path, including
        # timeouts and connection errors.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            sock.connect((host, port))
            sock.sendall(query.encode())
            # The server signals the end of the answer by closing the
            # connection, which surfaces here as an empty read.
            while chunk := sock.recv(4096):
                chunks.append(chunk)
        text = b''.join(chunks).decode('utf-8', errors='ignore')
        return _parse_cymru_response(text)
    except socket.timeout:
        print('WARNING: Team Cymru timeout. Using fallback WHOIS.')
    except Exception as exc:
        print(f'Team Cymru lookup error: {exc}')
    return {}