452 lines
14 KiB
Python
452 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
TLS/SSL Scanner @linuxiarz.pl, Mateusz Gruszczyński
|
|
|
|
"""
|
|
|
|
import argparse
|
|
import socket
|
|
import ssl
|
|
from datetime import datetime, timezone
|
|
from typing import List, Dict, Tuple, Optional
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
import multiprocessing as mp
|
|
|
|
from cryptography import x509
|
|
from cryptography.hazmat.backends import default_backend
|
|
from cryptography.hazmat.primitives.serialization import Encoding
|
|
from cryptography.x509.oid import NameOID, ExtensionOID
|
|
|
|
|
|
# -----------------------------
|
|
# Helpers: TLS connectivity
|
|
# -----------------------------
|
|
def _connect(ctx: ssl.SSLContext, host: str, port: int, timeout: float = 8.0) -> ssl.SSLSocket:
|
|
sock = socket.create_connection((host, port), timeout=timeout)
|
|
try:
|
|
ssock = ctx.wrap_socket(sock, server_hostname=host)
|
|
return ssock
|
|
except Exception:
|
|
sock.close()
|
|
raise
|
|
|
|
|
|
def test_protocol_versions(host: str, port: int) -> List[str]:
|
|
"""Return supported versions in descending preference order, e.g. ['TLSv1.3','TLSv1.2']."""
|
|
supported: List[str] = []
|
|
|
|
def _try(ver: ssl.TLSVersion) -> Optional[str]:
|
|
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
|
ctx.check_hostname = False
|
|
ctx.verify_mode = ssl.CERT_NONE
|
|
ctx.minimum_version = ver
|
|
ctx.maximum_version = ver
|
|
try:
|
|
with _connect(ctx, host, port, timeout=8.0) as s:
|
|
return s.version()
|
|
except Exception:
|
|
return None
|
|
|
|
for v in (ssl.TLSVersion.TLSv1_3, ssl.TLSVersion.TLSv1_2):
|
|
r = _try(v)
|
|
if r:
|
|
supported.append(r)
|
|
|
|
return supported
|
|
|
|
|
|
def get_negotiated_cipher(host: str, port: int) -> List[str]:
|
|
"""Return negotiated cipher name as 1-element list, or empty."""
|
|
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
|
ctx.check_hostname = False
|
|
ctx.verify_mode = ssl.CERT_NONE
|
|
try:
|
|
with _connect(ctx, host, port, timeout=8.0) as s:
|
|
c = s.cipher()
|
|
return [c[0]] if c and c[0] else []
|
|
except Exception:
|
|
return []
|
|
|
|
|
|
# -----------------------------
|
|
# Certificate parsing
|
|
# -----------------------------
|
|
def _get_cn(name: x509.Name) -> str:
|
|
try:
|
|
attrs = name.get_attributes_for_oid(NameOID.COMMON_NAME)
|
|
return attrs[0].value if attrs else ""
|
|
except Exception:
|
|
return ""
|
|
|
|
|
|
def is_self_signed(cert: x509.Certificate) -> bool:
|
|
return cert.issuer == cert.subject
|
|
|
|
|
|
def analyze_leaf_cert(leaf_der: bytes) -> Dict:
|
|
if not leaf_der:
|
|
return {"expired": True, "days_left": 0, "key_strong": False}
|
|
|
|
try:
|
|
cert = x509.load_der_x509_certificate(leaf_der, default_backend())
|
|
now_utc = datetime.now(timezone.utc)
|
|
|
|
pub = cert.public_key()
|
|
key_size = getattr(pub, "key_size", None)
|
|
key_type = pub.__class__.__name__.replace("PublicKey", "")
|
|
|
|
sans = []
|
|
try:
|
|
san_ext = cert.extensions.get_extension_for_oid(ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
|
|
sans = san_ext.value.get_values_for_type(x509.DNSName)
|
|
except Exception:
|
|
pass
|
|
|
|
sig_name = getattr(cert.signature_algorithm_oid, "_name", "") or ""
|
|
weak_sig = "sha1" in sig_name.lower()
|
|
|
|
expired = now_utc > cert.not_valid_after_utc
|
|
days_left = max(0, int((cert.not_valid_after_utc - now_utc).total_seconds() / 86400))
|
|
|
|
key_strong = True
|
|
if key_size is not None:
|
|
key_strong = key_size >= 2048 if "RSA" in key_type.upper() else key_size >= 256
|
|
|
|
return {
|
|
"cn": _get_cn(cert.subject) or "N/A",
|
|
"expired": expired,
|
|
"days_left": days_left,
|
|
"not_after": cert.not_valid_after_utc.isoformat(),
|
|
"key_type": key_type or "N/A",
|
|
"key_size": key_size or "N/A",
|
|
"key_strong": key_strong,
|
|
"weak_sig": weak_sig,
|
|
"san_count": len(sans),
|
|
}
|
|
except Exception:
|
|
return {"expired": True, "days_left": 0, "key_strong": False}
|
|
|
|
|
|
def get_server_chain(host: str, port: int) -> Tuple[List[x509.Certificate], Optional[str]]:
|
|
"""
|
|
Server-sent chain in exact order [leaf, inter1, inter2, ...].
|
|
|
|
"""
|
|
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
|
ctx.check_hostname = False
|
|
ctx.verify_mode = ssl.CERT_NONE
|
|
|
|
with _connect(ctx, host, port, timeout=10.0) as ssock:
|
|
negotiated_version = ssock.version()
|
|
|
|
chain_objs = ssock.get_unverified_chain()
|
|
if not chain_objs:
|
|
leaf_der = ssock.getpeercert(binary_form=True)
|
|
if not leaf_der:
|
|
return ([], negotiated_version)
|
|
cert = x509.load_der_x509_certificate(leaf_der, default_backend())
|
|
return ([cert], negotiated_version)
|
|
|
|
# If single bytes element was returned
|
|
if isinstance(chain_objs, (bytes, bytearray)):
|
|
chain_iter = [chain_objs]
|
|
else:
|
|
chain_iter = chain_objs
|
|
|
|
certs: List[x509.Certificate] = []
|
|
for c in chain_iter:
|
|
if isinstance(c, (bytes, bytearray)):
|
|
der = bytes(c)
|
|
elif hasattr(c, "public_bytes"):
|
|
der = c.public_bytes(Encoding.DER)
|
|
else:
|
|
continue
|
|
certs.append(x509.load_der_x509_certificate(der, default_backend()))
|
|
|
|
return (certs, negotiated_version)
|
|
|
|
|
|
def validate_chain_sequence(certs: List[x509.Certificate]) -> Dict:
|
|
"""
|
|
Validates ONLY the sequence (issuer/subject links) and flags roots in the middle.
|
|
"""
|
|
if not certs:
|
|
return {
|
|
"sequence_ok": False,
|
|
"errors": ["empty chain"],
|
|
"length": 0,
|
|
"last_self_signed": False,
|
|
"root_in_middle": False,
|
|
"root_indices": [],
|
|
}
|
|
|
|
errors: List[str] = []
|
|
for i in range(len(certs) - 1):
|
|
if certs[i].issuer != certs[i + 1].subject:
|
|
exp = certs[i].issuer.rfc4514_string()
|
|
got = certs[i + 1].subject.rfc4514_string()
|
|
errors.append(f"link {i}->{i+1}: expected next subject = {exp}, got = {got}")
|
|
|
|
root_indices = [i for i in range(len(certs) - 1) if is_self_signed(certs[i])]
|
|
last = certs[-1]
|
|
last_self_signed = is_self_signed(last)
|
|
|
|
return {
|
|
"sequence_ok": len(errors) == 0,
|
|
"errors": errors,
|
|
"length": len(certs),
|
|
"last_self_signed": last_self_signed,
|
|
"root_in_middle": len(root_indices) > 0,
|
|
"root_indices": root_indices,
|
|
}
|
|
|
|
|
|
def summarize_cert(cert: x509.Certificate) -> Dict:
|
|
now_utc = datetime.now(timezone.utc)
|
|
days_left = max(0, int((cert.not_valid_after_utc - now_utc).total_seconds() / 86400))
|
|
|
|
pub = cert.public_key()
|
|
key_size = getattr(pub, "key_size", None)
|
|
key_type = pub.__class__.__name__.replace("PublicKey", "")
|
|
|
|
sig_name = getattr(cert.signature_algorithm_oid, "_name", "") or ""
|
|
|
|
return {
|
|
"subject_cn": _get_cn(cert.subject) or "-",
|
|
"issuer_cn": _get_cn(cert.issuer) or "-",
|
|
"not_after": cert.not_valid_after_utc.strftime("%Y-%m-%d"),
|
|
"days_left": days_left,
|
|
"key": f"{key_type}-{key_size}" if key_size else f"{key_type}",
|
|
"sig": sig_name or "-",
|
|
}
|
|
|
|
|
|
# -----------------------------
|
|
# Reporting: tables
|
|
# -----------------------------
|
|
def _table(rows: List[List[str]], headers: List[str]) -> str:
|
|
widths = [len(h) for h in headers]
|
|
for r in rows:
|
|
for i, cell in enumerate(r):
|
|
widths[i] = max(widths[i], len(cell))
|
|
|
|
def fmt_row(r):
|
|
return "| " + " | ".join(r[i].ljust(widths[i]) for i in range(len(headers))) + " |"
|
|
|
|
sep = "|-" + "-|-".join("-" * w for w in widths) + "-|"
|
|
out = [fmt_row(headers), sep]
|
|
out += [fmt_row(r) for r in rows]
|
|
return "\n".join(out)
|
|
|
|
|
|
# -----------------------------
|
|
# Scoring (simple, deterministic)
|
|
# -----------------------------
|
|
def compute_score(protocols: List[str], ciphers: List[str], leaf_info: Dict, chain_seq: Dict) -> Tuple[float, str, Dict[str, int]]:
|
|
score_breakdown = {
|
|
"base": 100,
|
|
"proto": 0, # -20 if no TLSv1.2+
|
|
"cipher": 0, # -25 if negotiated cipher is obviously weak
|
|
"key": 0, # -15 if weak key
|
|
"chain": 0, # -30 if missing or order broken
|
|
"cert": 0, # -25 if expired; -10 if <30 days
|
|
"sig": 0, # -10 if LEAF is SHA1
|
|
}
|
|
|
|
if not any(p in ("TLSv1.2", "TLSv1.3") for p in protocols):
|
|
score_breakdown["proto"] = -20
|
|
|
|
if ciphers:
|
|
c = ciphers[0].upper()
|
|
if "RC4" in c or "3DES" in c or " DES " in f" {c} ":
|
|
score_breakdown["cipher"] = -25
|
|
|
|
if not leaf_info.get("key_strong", False):
|
|
score_breakdown["key"] = -15
|
|
|
|
if not (chain_seq.get("length", 0) >= 2 and chain_seq.get("sequence_ok", False)):
|
|
score_breakdown["chain"] = -30
|
|
|
|
if leaf_info.get("expired", True):
|
|
score_breakdown["cert"] -= 25
|
|
else:
|
|
if leaf_info.get("days_left", 0) < 30:
|
|
score_breakdown["cert"] -= 10
|
|
|
|
if leaf_info.get("weak_sig", False):
|
|
score_breakdown["sig"] = -10
|
|
|
|
score = float(sum(score_breakdown.values()))
|
|
grade = "A+" if score >= 95 else "A" if score >= 85 else "B" if score >= 70 else "C" if score >= 55 else "F"
|
|
return round(score, 1), grade, score_breakdown
|
|
|
|
|
|
# -----------------------------
|
|
# Scan one host
|
|
# -----------------------------
|
|
def scan_host(host: str, port: int) -> Dict:
|
|
protocols = test_protocol_versions(host, port)
|
|
ciphers = get_negotiated_cipher(host, port)
|
|
|
|
leaf_der = b""
|
|
try:
|
|
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
|
ctx.check_hostname = False
|
|
ctx.verify_mode = ssl.CERT_NONE
|
|
with _connect(ctx, host, port, timeout=8.0) as s:
|
|
leaf_der = s.getpeercert(binary_form=True) or b""
|
|
except Exception:
|
|
pass
|
|
leaf_info = analyze_leaf_cert(leaf_der)
|
|
|
|
chain_certs: List[x509.Certificate] = []
|
|
negotiated = None
|
|
chain_err = None
|
|
try:
|
|
chain_certs, negotiated = get_server_chain(host, port)
|
|
except Exception as e:
|
|
chain_err = str(e)
|
|
|
|
chain_seq = validate_chain_sequence(chain_certs)
|
|
score, grade, score_breakdown = compute_score(protocols, ciphers, leaf_info, chain_seq)
|
|
|
|
chain_rows: List[List[str]] = []
|
|
for i, cert in enumerate(chain_certs):
|
|
s = summarize_cert(cert)
|
|
|
|
if i == 0:
|
|
role = "LEAF"
|
|
else:
|
|
role = f"INT{i}"
|
|
if is_self_signed(cert):
|
|
role = "ROOT" if i == len(chain_certs) - 1 else "ROOT!"
|
|
|
|
expected_next_cn = s["issuer_cn"]
|
|
next_is_cn = summarize_cert(chain_certs[i + 1])["subject_cn"] if i + 1 < len(chain_certs) else "-"
|
|
|
|
chain_rows.append([
|
|
str(i),
|
|
role,
|
|
s["subject_cn"][:32],
|
|
s["issuer_cn"][:32],
|
|
expected_next_cn[:26],
|
|
next_is_cn[:26],
|
|
s["not_after"],
|
|
f"{s['days_left']}d",
|
|
s["key"],
|
|
s["sig"][:18],
|
|
])
|
|
|
|
return {
|
|
"host": host,
|
|
"port": port,
|
|
"protocols": protocols,
|
|
"negotiated_version": negotiated,
|
|
"ciphers": ciphers,
|
|
"leaf_cert": leaf_info,
|
|
"chain_seq": chain_seq,
|
|
"chain_rows": chain_rows,
|
|
"chain_error": chain_err,
|
|
"score": score,
|
|
"grade": grade,
|
|
"score_breakdown": score_breakdown,
|
|
}
|
|
|
|
|
|
# -----------------------------
|
|
# Print results
|
|
# -----------------------------
|
|
def print_results(results: List[Dict]) -> None:
|
|
print("\n" + "=" * 90)
|
|
print("TLS / CERT / CHAIN REPORT")
|
|
print("=" * 90)
|
|
|
|
# Summary
|
|
sum_rows: List[List[str]] = []
|
|
for r in results:
|
|
proto = ",".join(r["protocols"]) if r["protocols"] else "NONE"
|
|
cipher = r["ciphers"][0] if r["ciphers"] else "-"
|
|
chain_len = r["chain_seq"]["length"]
|
|
chain_ok = "YES" if (chain_len >= 2 and r["chain_seq"]["sequence_ok"]) else "NO"
|
|
cert_days = "EXPIRED" if r["leaf_cert"].get("expired") else f"{r['leaf_cert'].get('days_left', 0)}d"
|
|
sum_rows.append([
|
|
f"{r['host']}:{r['port']}",
|
|
r["grade"],
|
|
f"{r['score']}",
|
|
proto,
|
|
cipher[:22],
|
|
chain_ok,
|
|
str(chain_len),
|
|
cert_days,
|
|
])
|
|
|
|
print(_table(
|
|
sum_rows,
|
|
["Host", "Grade", "Score", "Proto", "Cipher", "ChainOK", "Len", "Cert"]
|
|
))
|
|
|
|
for r in results:
|
|
print("\n" + "-" * 90)
|
|
print(f"{r['host']}:{r['port']} Grade={r['grade']} Score={r['score']}")
|
|
if r["chain_error"]:
|
|
print(f"CHAIN FETCH ERROR: {r['chain_error']}")
|
|
continue
|
|
|
|
seq = r["chain_seq"]
|
|
if seq["errors"] or seq["root_in_middle"]:
|
|
print("CHAIN DIAGNOSTICS:")
|
|
for e in seq["errors"]:
|
|
print(f" - {e}")
|
|
if seq["root_in_middle"]:
|
|
idxs = ", ".join(str(i) for i in seq["root_indices"])
|
|
print(f" - root in the middle at index(es): {idxs}")
|
|
|
|
if not r["chain_rows"]:
|
|
print("No chain data.")
|
|
continue
|
|
|
|
print(_table(
|
|
r["chain_rows"],
|
|
["#", "Role", "Subject(CN)", "Issuer(CN)", "ExpectedNext", "NextIs", "NotAfter", "Left", "Key", "Sig"]
|
|
))
|
|
|
|
br = r["score_breakdown"]
|
|
print(f"Breakdown: base={br['base']} proto={br['proto']} cipher={br['cipher']} key={br['key']} chain={br['chain']} cert={br['cert']} sig={br['sig']}")
|
|
|
|
|
|
# -----------------------------
|
|
# CLI
|
|
# -----------------------------
|
|
def parse_targets(args_targets: List[str], default_port: int) -> List[Tuple[str, int]]:
|
|
out: List[Tuple[str, int]] = []
|
|
for t in args_targets:
|
|
t = t.strip()
|
|
if not t:
|
|
continue
|
|
if ":" in t:
|
|
h, p = t.rsplit(":", 1)
|
|
out.append((h.strip(), int(p.strip())))
|
|
else:
|
|
out.append((t, default_port))
|
|
return out
|
|
|
|
|
|
def main() -> None:
|
|
parser = argparse.ArgumentParser(description="TLS Scanner (Python 3.14) with server chain + order validation")
|
|
parser.add_argument("targets", nargs="+", help="host or host:port")
|
|
parser.add_argument("-p", "--port", type=int, default=443, help="default port")
|
|
parser.add_argument("-w", "--workers", type=int, default=min(8, mp.cpu_count()), help="threads")
|
|
args = parser.parse_args()
|
|
|
|
targets = parse_targets(args.targets, args.port)
|
|
|
|
with ThreadPoolExecutor(max_workers=max(1, args.workers)) as ex:
|
|
futures = [ex.submit(scan_host, h, p) for (h, p) in targets]
|
|
results = [f.result() for f in futures]
|
|
|
|
print_results(results)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|