#!/usr/bin/env python3
"""
TLS/SSL Scanner
@linuxiarz.pl, Mateusz Gruszczyński

Connects to one or more ``host[:port]`` targets and reports:

* which of TLSv1.3 / TLSv1.2 the server accepts,
* the cipher negotiated with a default client context,
* basic facts about the leaf certificate (expiry, key, signature),
* the certificate chain exactly as the server sent it, with a check that
  each certificate's issuer matches the subject of the next one,
* a simple deterministic score and letter grade.

Certificate verification is deliberately disabled on every probe: the tool
inspects whatever the server presents, it does not decide whether to trust it.
"""
import argparse
import socket
import ssl
from datetime import datetime, timezone
from typing import List, Dict, Tuple, Optional
from concurrent.futures import ThreadPoolExecutor
import multiprocessing as mp

from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.x509.oid import NameOID, ExtensionOID


# -----------------------------
# Helpers: TLS connectivity
# -----------------------------
def _connect(ctx: ssl.SSLContext, host: str, port: int, timeout: float = 8.0) -> ssl.SSLSocket:
    """Open a TCP connection to ``host:port`` and complete a TLS handshake.

    The raw socket is closed if the handshake fails, so no descriptor leaks;
    the original exception is re-raised to the caller.
    """
    sock = socket.create_connection((host, port), timeout=timeout)
    try:
        return ctx.wrap_socket(sock, server_hostname=host)
    except Exception:
        sock.close()
        raise


def test_protocol_versions(host: str, port: int) -> List[str]:
    """Return supported versions in descending preference order, e.g. ['TLSv1.3','TLSv1.2']."""

    def _try(ver: ssl.TLSVersion) -> Optional[str]:
        # Pin both bounds so the handshake can only succeed with exactly `ver`.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        ctx.minimum_version = ver
        ctx.maximum_version = ver
        try:
            with _connect(ctx, host, port, timeout=8.0) as s:
                return s.version()
        except Exception:
            # Best-effort probe: any failure means "not supported/reachable".
            return None

    supported: List[str] = []
    for version in (ssl.TLSVersion.TLSv1_3, ssl.TLSVersion.TLSv1_2):
        negotiated = _try(version)
        if negotiated:
            supported.append(negotiated)
    return supported


def get_negotiated_cipher(host: str, port: int) -> List[str]:
    """Return negotiated cipher name as 1-element list, or empty."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE
    try:
        with _connect(ctx, host, port, timeout=8.0) as s:
            cipher = s.cipher()  # (name, protocol, secret_bits) or None
            return [cipher[0]] if cipher and cipher[0] else []
    except Exception:
        # Best-effort probe: an unreachable host simply has no cipher to report.
        return []


# -----------------------------
# Certificate parsing
# -----------------------------
def _get_cn(name: x509.Name) -> str:
    """Return the first Common Name in ``name``, or "" if there is none."""
    try:
        attrs = name.get_attributes_for_oid(NameOID.COMMON_NAME)
        return attrs[0].value if attrs else ""
    except Exception:
        return ""


def is_self_signed(cert: x509.Certificate) -> bool:
    """Return True when issuer and subject names are equal.

    This is a name comparison only; the signature itself is not verified.
    """
    return cert.issuer == cert.subject


def analyze_leaf_cert(leaf_der: bytes) -> Dict:
    """Parse a DER-encoded leaf certificate and summarise its health.

    Returns a dict with ``cn``, ``expired``, ``days_left``, ``not_after``,
    ``key_type``, ``key_size``, ``key_strong``, ``weak_sig`` and ``san_count``.
    If ``leaf_der`` is empty or cannot be parsed, a minimal pessimistic dict
    (``expired=True, days_left=0, key_strong=False``) is returned instead.
    """
    failed = {"expired": True, "days_left": 0, "key_strong": False}
    if not leaf_der:
        return failed
    try:
        cert = x509.load_der_x509_certificate(leaf_der, default_backend())
        now_utc = datetime.now(timezone.utc)

        pub = cert.public_key()
        key_size = getattr(pub, "key_size", None)
        key_type = pub.__class__.__name__.replace("PublicKey", "")

        sans = []
        try:
            san_ext = cert.extensions.get_extension_for_oid(ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
            sans = san_ext.value.get_values_for_type(x509.DNSName)
        except Exception:
            # No SAN extension (or an unreadable one): report zero names.
            pass

        sig_name = getattr(cert.signature_algorithm_oid, "_name", "") or ""
        sig_lower = sig_name.lower()
        # MD5-based signatures are at least as broken as SHA-1 ones.
        weak_sig = "sha1" in sig_lower or "md5" in sig_lower

        expired = now_utc > cert.not_valid_after_utc
        days_left = max(0, int((cert.not_valid_after_utc - now_utc).total_seconds() / 86400))

        # Keys without a key_size attribute (e.g. Ed25519) are treated as strong.
        key_strong = True
        if key_size is not None:
            key_strong = key_size >= 2048 if "RSA" in key_type.upper() else key_size >= 256

        return {
            "cn": _get_cn(cert.subject) or "N/A",
            "expired": expired,
            "days_left": days_left,
            "not_after": cert.not_valid_after_utc.isoformat(),
            "key_type": key_type or "N/A",
            "key_size": key_size or "N/A",
            "key_strong": key_strong,
            "weak_sig": weak_sig,
            "san_count": len(sans),
        }
    except Exception:
        return failed


def get_server_chain(host: str, port: int) -> Tuple[List[x509.Certificate], Optional[str]]:
    """
    Server-sent chain in exact order [leaf, inter1, inter2, ...].

    Returns ``(certificates, negotiated_tls_version)``. When the server chain
    is empty the peer certificate alone is returned (or an empty list if even
    that is unavailable). Connection and parsing errors propagate to the
    caller.
    """
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE

    with _connect(ctx, host, port, timeout=10.0) as ssock:
        negotiated_version = ssock.version()
        chain_objs = ssock.get_unverified_chain()

        if not chain_objs:
            leaf_der = ssock.getpeercert(binary_form=True)
            if not leaf_der:
                return ([], negotiated_version)
            cert = x509.load_der_x509_certificate(leaf_der, default_backend())
            return ([cert], negotiated_version)

        # If single bytes element was returned
        if isinstance(chain_objs, (bytes, bytearray)):
            chain_iter = [chain_objs]
        else:
            chain_iter = chain_objs

        certs: List[x509.Certificate] = []
        for item in chain_iter:
            if isinstance(item, (bytes, bytearray)):
                der = bytes(item)
            elif hasattr(item, "public_bytes"):
                der = item.public_bytes(Encoding.DER)
            else:
                # Unknown element type: skip rather than abort the whole chain.
                continue
            certs.append(x509.load_der_x509_certificate(der, default_backend()))

        return (certs, negotiated_version)


def validate_chain_sequence(certs: List[x509.Certificate]) -> Dict:
    """
    Validates ONLY the sequence (issuer/subject links) and flags roots in the middle.

    No signatures are checked. The result dict has ``sequence_ok``, ``errors``,
    ``length``, ``last_self_signed``, ``root_in_middle`` and ``root_indices``
    (indices of self-signed certificates that are not the last element).
    """
    if not certs:
        return {
            "sequence_ok": False,
            "errors": ["empty chain"],
            "length": 0,
            "last_self_signed": False,
            "root_in_middle": False,
            "root_indices": [],
        }

    errors: List[str] = []
    for i, (current, following) in enumerate(zip(certs, certs[1:])):
        if current.issuer != following.subject:
            exp = current.issuer.rfc4514_string()
            got = following.subject.rfc4514_string()
            errors.append(f"link {i}->{i+1}: expected next subject = {exp}, got = {got}")

    # A self-signed certificate anywhere but the final position is misplaced.
    root_indices = [i for i, cert in enumerate(certs[:-1]) if is_self_signed(cert)]

    return {
        "sequence_ok": len(errors) == 0,
        "errors": errors,
        "length": len(certs),
        "last_self_signed": is_self_signed(certs[-1]),
        "root_in_middle": len(root_indices) > 0,
        "root_indices": root_indices,
    }


def summarize_cert(cert: x509.Certificate) -> Dict:
    """Return display-ready fields for one certificate of the chain table."""
    now_utc = datetime.now(timezone.utc)
    days_left = max(0, int((cert.not_valid_after_utc - now_utc).total_seconds() / 86400))

    pub = cert.public_key()
    key_size = getattr(pub, "key_size", None)
    key_type = pub.__class__.__name__.replace("PublicKey", "")
    sig_name = getattr(cert.signature_algorithm_oid, "_name", "") or ""

    return {
        "subject_cn": _get_cn(cert.subject) or "-",
        "issuer_cn": _get_cn(cert.issuer) or "-",
        "not_after": cert.not_valid_after_utc.strftime("%Y-%m-%d"),
        "days_left": days_left,
        "key": f"{key_type}-{key_size}" if key_size else f"{key_type}",
        "sig": sig_name or "-",
    }


# -----------------------------
# Reporting: tables
# -----------------------------
def _table(rows: List[List[str]], headers: List[str]) -> str:
    """Render ``rows`` under ``headers`` as a pipe-delimited text table.

    Every row is expected to have exactly ``len(headers)`` string cells.
    """
    widths = [len(h) for h in headers]
    for row in rows:
        for i, cell in enumerate(row):
            widths[i] = max(widths[i], len(cell))

    def fmt_row(row):
        cells = (row[i].ljust(widths[i]) for i in range(len(headers)))
        return "| " + " | ".join(cells) + " |"

    sep = "|-" + "-|-".join("-" * w for w in widths) + "-|"
    lines = [fmt_row(headers), sep]
    lines += [fmt_row(row) for row in rows]
    return "\n".join(lines)


# -----------------------------
# Scoring (simple, deterministic)
# -----------------------------
def _is_weak_cipher(name: str) -> bool:
    """Return True for cipher suites using RC4, DES/3DES or no encryption.

    Suite names are delimited by '-' (OpenSSL, e.g. ``DES-CBC3-SHA``) or '_'
    (IANA, e.g. ``TLS_RSA_WITH_3DES_EDE_CBC_SHA``), so DES and NULL are matched
    as whole tokens; a plain substring test would also hit unrelated names.
    """
    upper = name.upper()
    if "RC4" in upper or "3DES" in upper:
        return True
    tokens = upper.replace("_", "-").replace(" ", "-").split("-")
    return "DES" in tokens or "NULL" in tokens


def compute_score(protocols: List[str], ciphers: List[str], leaf_info: Dict, chain_seq: Dict) -> Tuple[float, str, Dict[str, int]]:
    """Compute ``(score, grade, breakdown)`` from the individual probe results.

    Starts at 100 and applies the fixed penalties listed in the breakdown
    below. Grades: A+ >= 95, A >= 85, B >= 70, C >= 55, otherwise F.
    """
    score_breakdown = {
        "base": 100,
        "proto": 0,   # -20 if no TLSv1.2+
        "cipher": 0,  # -25 if negotiated cipher is obviously weak
        "key": 0,     # -15 if weak key
        "chain": 0,   # -30 if missing or order broken
        "cert": 0,    # -25 if expired; -10 if <30 days
        "sig": 0,     # -10 if LEAF is SHA1/MD5
    }

    if not any(p in ("TLSv1.2", "TLSv1.3") for p in protocols):
        score_breakdown["proto"] = -20

    if ciphers and _is_weak_cipher(ciphers[0]):
        score_breakdown["cipher"] = -25

    if not leaf_info.get("key_strong", False):
        score_breakdown["key"] = -15

    # A lone leaf (length < 2) counts as a missing chain.
    if not (chain_seq.get("length", 0) >= 2 and chain_seq.get("sequence_ok", False)):
        score_breakdown["chain"] = -30

    if leaf_info.get("expired", True):
        score_breakdown["cert"] -= 25
    elif leaf_info.get("days_left", 0) < 30:
        score_breakdown["cert"] -= 10

    if leaf_info.get("weak_sig", False):
        score_breakdown["sig"] = -10

    score = float(sum(score_breakdown.values()))
    grade = "A+" if score >= 95 else "A" if score >= 85 else "B" if score >= 70 else "C" if score >= 55 else "F"
    return round(score, 1), grade, score_breakdown


# -----------------------------
# Scan one host
# -----------------------------
def _fetch_leaf_der(host: str, port: int) -> bytes:
    """Fetch only the peer certificate (DER); return b"" on any failure."""
    try:
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        with _connect(ctx, host, port, timeout=8.0) as conn:
            return conn.getpeercert(binary_form=True) or b""
    except Exception:
        return b""


def scan_host(host: str, port: int) -> Dict:
    """Run every probe against ``host:port`` and return one result dict.

    The dict carries the keys consumed by ``print_results``: ``host``,
    ``port``, ``protocols``, ``negotiated_version``, ``ciphers``,
    ``leaf_cert``, ``chain_seq``, ``chain_rows``, ``chain_error``, ``score``,
    ``grade`` and ``score_breakdown``.
    """
    protocols = test_protocol_versions(host, port)
    ciphers = get_negotiated_cipher(host, port)

    chain_certs: List[x509.Certificate] = []
    negotiated = None
    chain_err = None
    try:
        chain_certs, negotiated = get_server_chain(host, port)
    except Exception as e:
        chain_err = str(e)

    # The first chain element is the leaf, so reuse it instead of opening
    # another connection; fall back to a dedicated fetch if the chain failed.
    if chain_certs:
        leaf_der = chain_certs[0].public_bytes(Encoding.DER)
    else:
        leaf_der = _fetch_leaf_der(host, port)
    leaf_info = analyze_leaf_cert(leaf_der)

    chain_seq = validate_chain_sequence(chain_certs)
    score, grade, score_breakdown = compute_score(protocols, ciphers, leaf_info, chain_seq)

    summaries = [summarize_cert(cert) for cert in chain_certs]
    last_index = len(chain_certs) - 1
    chain_rows: List[List[str]] = []
    for i, (cert, info) in enumerate(zip(chain_certs, summaries)):
        role = "LEAF" if i == 0 else f"INT{i}"
        if is_self_signed(cert):
            # "ROOT!" marks a self-signed certificate that is not last.
            role = "ROOT" if i == last_index else "ROOT!"

        expected_next_cn = info["issuer_cn"]
        next_is_cn = summaries[i + 1]["subject_cn"] if i < last_index else "-"

        chain_rows.append([
            str(i),
            role,
            info["subject_cn"][:32],
            info["issuer_cn"][:32],
            expected_next_cn[:26],
            next_is_cn[:26],
            info["not_after"],
            f"{info['days_left']}d",
            info["key"],
            info["sig"][:18],
        ])

    return {
        "host": host,
        "port": port,
        "protocols": protocols,
        "negotiated_version": negotiated,
        "ciphers": ciphers,
        "leaf_cert": leaf_info,
        "chain_seq": chain_seq,
        "chain_rows": chain_rows,
        "chain_error": chain_err,
        "score": score,
        "grade": grade,
        "score_breakdown": score_breakdown,
    }


# -----------------------------
# Print results
# -----------------------------
def print_results(results: List[Dict]) -> None:
    """Print a summary table followed by a per-host chain section.

    For a host whose chain could not be fetched, or whose chain is empty,
    only the error / "No chain data." line is printed for that section.
    """
    print("\n" + "=" * 90)
    print("TLS / CERT / CHAIN REPORT")
    print("=" * 90)

    # Summary
    sum_rows: List[List[str]] = []
    for r in results:
        proto = ",".join(r["protocols"]) if r["protocols"] else "NONE"
        cipher = r["ciphers"][0] if r["ciphers"] else "-"
        chain_len = r["chain_seq"]["length"]
        chain_ok = "YES" if (chain_len >= 2 and r["chain_seq"]["sequence_ok"]) else "NO"
        cert_days = "EXPIRED" if r["leaf_cert"].get("expired") else f"{r['leaf_cert'].get('days_left', 0)}d"

        sum_rows.append([
            f"{r['host']}:{r['port']}",
            r["grade"],
            f"{r['score']}",
            proto,
            cipher[:22],
            chain_ok,
            str(chain_len),
            cert_days,
        ])

    print(_table(
        sum_rows,
        ["Host", "Grade", "Score", "Proto", "Cipher", "ChainOK", "Len", "Cert"]
    ))

    for r in results:
        print("\n" + "-" * 90)
        print(f"{r['host']}:{r['port']} Grade={r['grade']} Score={r['score']}")

        if r["chain_error"]:
            print(f"CHAIN FETCH ERROR: {r['chain_error']}")
            continue

        seq = r["chain_seq"]
        if seq["errors"] or seq["root_in_middle"]:
            print("CHAIN DIAGNOSTICS:")
            for e in seq["errors"]:
                print(f" - {e}")
            if seq["root_in_middle"]:
                idxs = ", ".join(str(i) for i in seq["root_indices"])
                print(f" - root in the middle at index(es): {idxs}")

        if not r["chain_rows"]:
            print("No chain data.")
            continue

        print(_table(
            r["chain_rows"],
            ["#", "Role", "Subject(CN)", "Issuer(CN)", "ExpectedNext", "NextIs", "NotAfter", "Left", "Key", "Sig"]
        ))

        br = r["score_breakdown"]
        print(f"Breakdown: base={br['base']} proto={br['proto']} cipher={br['cipher']} key={br['key']} chain={br['chain']} cert={br['cert']} sig={br['sig']}")


# -----------------------------
# CLI
# -----------------------------
def _split_host_port(target: str, default_port: int) -> Tuple[str, int]:
    """Split one non-empty target string into ``(host, port)``.

    Accepted forms: ``host``, ``host:port``, a bare IPv6 literal
    (``2001:db8::1``, default port) and a bracketed IPv6 literal with an
    optional port (``[::1]`` / ``[::1]:8443``; brackets are stripped).

    Raises:
        ValueError: malformed brackets, non-numeric port, or port outside
            1-65535.
    """
    if target.startswith("["):
        host, closing, rest = target[1:].partition("]")
        if not closing or not host:
            raise ValueError(f"invalid target {target!r}: malformed IPv6 literal")
        if not rest:
            return host, default_port
        if not rest.startswith(":"):
            raise ValueError(f"invalid target {target!r}: expected ':port' after ']'")
        port_text = rest[1:]
    elif target.count(":") > 1:
        # Several colons without brackets can only be a bare IPv6 address;
        # splitting on the last colon would cut the address itself.
        return target, default_port
    elif ":" in target:
        host, port_text = target.rsplit(":", 1)
        host = host.strip()
    else:
        return target, default_port

    try:
        port = int(port_text.strip())
    except ValueError:
        raise ValueError(f"invalid target {target!r}: port must be an integer") from None
    if not 0 < port < 65536:
        raise ValueError(f"invalid target {target!r}: port must be in 1-65535")
    return host, port


def parse_targets(args_targets: List[str], default_port: int) -> List[Tuple[str, int]]:
    """Convert CLI target strings into ``(host, port)`` tuples.

    Blank entries are skipped; targets without an explicit port get
    ``default_port``. Raises ``ValueError`` for an unparsable target.
    """
    out: List[Tuple[str, int]] = []
    for raw in args_targets:
        target = raw.strip()
        if not target:
            continue
        out.append(_split_host_port(target, default_port))
    return out


def main() -> None:
    """Parse the command line, scan all targets concurrently, print the report."""
    parser = argparse.ArgumentParser(description="TLS Scanner (Python 3.14) with server chain + order validation")
    parser.add_argument("targets", nargs="+", help="host or host:port")
    parser.add_argument("-p", "--port", type=int, default=443, help="default port")
    parser.add_argument("-w", "--workers", type=int, default=min(8, mp.cpu_count()), help="threads")
    args = parser.parse_args()

    try:
        targets = parse_targets(args.targets, args.port)
    except ValueError as exc:
        # Report a bad target as a usage error instead of a traceback.
        parser.error(str(exc))

    # Scans are network-bound, so threads overlap the waiting time.
    with ThreadPoolExecutor(max_workers=max(1, args.workers)) as ex:
        futures = [ex.submit(scan_host, h, p) for (h, p) in targets]
        results = [f.result() for f in futures]  # keeps the input order

    print_results(results)


if __name__ == "__main__":
    main()