From 4ac8299e866747aeb659f5301ea3651a0ab1d1a3 Mon Sep 17 00:00:00 2001 From: gru Date: Mon, 2 Mar 2026 11:59:44 +0100 Subject: [PATCH] Add ssl_cert_check.py --- ssl_cert_check.py | 451 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 451 insertions(+) create mode 100644 ssl_cert_check.py diff --git a/ssl_cert_check.py b/ssl_cert_check.py new file mode 100644 index 0000000..7f026aa --- /dev/null +++ b/ssl_cert_check.py @@ -0,0 +1,451 @@ +#!/usr/bin/env python3 +""" +TLS/SSL Scanner + +""" + +import argparse +import socket +import ssl +from datetime import datetime, timezone +from typing import List, Dict, Tuple, Optional +from concurrent.futures import ThreadPoolExecutor +import multiprocessing as mp + +from cryptography import x509 +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.serialization import Encoding +from cryptography.x509.oid import NameOID, ExtensionOID + + +# ----------------------------- +# Helpers: TLS connectivity +# ----------------------------- +def _connect(ctx: ssl.SSLContext, host: str, port: int, timeout: float = 8.0) -> ssl.SSLSocket: + sock = socket.create_connection((host, port), timeout=timeout) + try: + ssock = ctx.wrap_socket(sock, server_hostname=host) + return ssock + except Exception: + sock.close() + raise + + +def test_protocol_versions(host: str, port: int) -> List[str]: + """Return supported versions in descending preference order, e.g. ['TLSv1.3','TLSv1.2'].""" + supported: List[str] = [] + + def _try(ver: ssl.TLSVersion) -> Optional[str]: + ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + ctx.minimum_version = ver + ctx.maximum_version = ver + try: + with _connect(ctx, host, port, timeout=8.0) as s: + return s.version() + except Exception: + return None + + for v in (ssl.TLSVersion.TLSv1_3, ssl.TLSVersion.TLSv1_2): + r = _try(v) + if r: + supported.append(r) + + return supported + + +def get_negotiated_cipher(host: str, port: int) -> List[str]: + """Return negotiated cipher name as 1-element list, or empty.""" + ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + try: + with _connect(ctx, host, port, timeout=8.0) as s: + c = s.cipher() + return [c[0]] if c and c[0] else [] + except Exception: + return [] + + +# ----------------------------- +# Certificate parsing +# ----------------------------- +def _get_cn(name: x509.Name) -> str: + try: + attrs = name.get_attributes_for_oid(NameOID.COMMON_NAME) + return attrs[0].value if attrs else "" + except Exception: + return "" + + +def is_self_signed(cert: x509.Certificate) -> bool: + return cert.issuer == cert.subject + + +def analyze_leaf_cert(leaf_der: bytes) -> Dict: + if not leaf_der: + return {"expired": True, "days_left": 0, "key_strong": False} + + try: + cert = x509.load_der_x509_certificate(leaf_der, default_backend()) + now_utc = datetime.now(timezone.utc) + + pub = cert.public_key() + key_size = getattr(pub, "key_size", None) + key_type = pub.__class__.__name__.replace("PublicKey", "") + + sans = [] + try: + san_ext = cert.extensions.get_extension_for_oid(ExtensionOID.SUBJECT_ALTERNATIVE_NAME) + sans = san_ext.value.get_values_for_type(x509.DNSName) + except Exception: + pass + + sig_name = getattr(cert.signature_algorithm_oid, "_name", "") or "" + weak_sig = "sha1" in sig_name.lower() + + expired = now_utc > cert.not_valid_after_utc + days_left = max(0, int((cert.not_valid_after_utc - now_utc).total_seconds() / 86400)) + + key_strong = True + if key_size is not None: + key_strong = key_size >= 2048 if "RSA" in key_type.upper() else key_size >= 256 + + return { + "cn": _get_cn(cert.subject) or "N/A", + "expired": expired, + "days_left": days_left, + "not_after": cert.not_valid_after_utc.isoformat(), + "key_type": key_type or "N/A", + "key_size": key_size or "N/A", + "key_strong": key_strong, + "weak_sig": weak_sig, + "san_count": len(sans), + } + except Exception: + return {"expired": True, "days_left": 0, "key_strong": False} + + +def get_server_chain(host: str, port: int) -> Tuple[List[x509.Certificate], Optional[str]]: + """ + Server-sent chain in exact order [leaf, inter1, inter2, ...]. + + """ + ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + + with _connect(ctx, host, port, timeout=10.0) as ssock: + negotiated_version = ssock.version() + + chain_objs = ssock.get_unverified_chain() + if not chain_objs: + leaf_der = ssock.getpeercert(binary_form=True) + if not leaf_der: + return ([], negotiated_version) + cert = x509.load_der_x509_certificate(leaf_der, default_backend()) + return ([cert], negotiated_version) + + # If single bytes element was returned + if isinstance(chain_objs, (bytes, bytearray)): + chain_iter = [chain_objs] + else: + chain_iter = chain_objs + + certs: List[x509.Certificate] = [] + for c in chain_iter: + if isinstance(c, (bytes, bytearray)): + der = bytes(c) + elif hasattr(c, "public_bytes"): + der = c.public_bytes(Encoding.DER) + else: + continue + certs.append(x509.load_der_x509_certificate(der, default_backend())) + + return (certs, negotiated_version) + + +def validate_chain_sequence(certs: List[x509.Certificate]) -> Dict: + """ + Validates ONLY the sequence (issuer/subject links) and flags roots in the middle. + """ + if not certs: + return { + "sequence_ok": False, + "errors": ["empty chain"], + "length": 0, + "last_self_signed": False, + "root_in_middle": False, + "root_indices": [], + } + + errors: List[str] = [] + for i in range(len(certs) - 1): + if certs[i].issuer != certs[i + 1].subject: + exp = certs[i].issuer.rfc4514_string() + got = certs[i + 1].subject.rfc4514_string() + errors.append(f"link {i}->{i+1}: expected next subject = {exp}, got = {got}") + + root_indices = [i for i in range(len(certs) - 1) if is_self_signed(certs[i])] + last = certs[-1] + last_self_signed = is_self_signed(last) + + return { + "sequence_ok": len(errors) == 0, + "errors": errors, + "length": len(certs), + "last_self_signed": last_self_signed, + "root_in_middle": len(root_indices) > 0, + "root_indices": root_indices, + } + + +def summarize_cert(cert: x509.Certificate) -> Dict: + now_utc = datetime.now(timezone.utc) + days_left = max(0, int((cert.not_valid_after_utc - now_utc).total_seconds() / 86400)) + + pub = cert.public_key() + key_size = getattr(pub, "key_size", None) + key_type = pub.__class__.__name__.replace("PublicKey", "") + + sig_name = getattr(cert.signature_algorithm_oid, "_name", "") or "" + + return { + "subject_cn": _get_cn(cert.subject) or "-", + "issuer_cn": _get_cn(cert.issuer) or "-", + "not_after": cert.not_valid_after_utc.strftime("%Y-%m-%d"), + "days_left": days_left, + "key": f"{key_type}-{key_size}" if key_size else f"{key_type}", + "sig": sig_name or "-", + } + + +# ----------------------------- +# Reporting: tables +# ----------------------------- +def _table(rows: List[List[str]], headers: List[str]) -> str: + widths = [len(h) for h in headers] + for r in rows: + for i, cell in enumerate(r): + widths[i] = max(widths[i], len(cell)) + + def fmt_row(r): + return "| " + " | ".join(r[i].ljust(widths[i]) for i in range(len(headers))) + " |" + + sep = "|-" + "-|-".join("-" * w for w in widths) + "-|" + out = [fmt_row(headers), sep] + out += [fmt_row(r) for r in rows] + return "\n".join(out) + + +# ----------------------------- +# Scoring (simple, deterministic) +# ----------------------------- +def compute_score(protocols: List[str], ciphers: List[str], leaf_info: Dict, chain_seq: Dict) -> Tuple[float, str, Dict[str, int]]: + score_breakdown = { + "base": 100, + "proto": 0, # -20 if no TLSv1.2+ + "cipher": 0, # -25 if negotiated cipher is obviously weak + "key": 0, # -15 if weak key + "chain": 0, # -30 if missing or order broken + "cert": 0, # -25 if expired; -10 if <30 days + "sig": 0, # -10 if LEAF is SHA1 + } + + if not any(p in ("TLSv1.2", "TLSv1.3") for p in protocols): + score_breakdown["proto"] = -20 + + if ciphers: + c = ciphers[0].upper() + if "RC4" in c or "3DES" in c or " DES " in f" {c} ": + score_breakdown["cipher"] = -25 + + if not leaf_info.get("key_strong", False): + score_breakdown["key"] = -15 + + if not (chain_seq.get("length", 0) >= 2 and chain_seq.get("sequence_ok", False)): + score_breakdown["chain"] = -30 + + if leaf_info.get("expired", True): + score_breakdown["cert"] -= 25 + else: + if leaf_info.get("days_left", 0) < 30: + score_breakdown["cert"] -= 10 + + if leaf_info.get("weak_sig", False): + score_breakdown["sig"] = -10 + + score = float(sum(score_breakdown.values())) + grade = "A+" if score >= 95 else "A" if score >= 85 else "B" if score >= 70 else "C" if score >= 55 else "F" + return round(score, 1), grade, score_breakdown + + +# ----------------------------- +# Scan one host +# ----------------------------- +def scan_host(host: str, port: int) -> Dict: + protocols = test_protocol_versions(host, port) + ciphers = get_negotiated_cipher(host, port) + + leaf_der = b"" + try: + ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + with _connect(ctx, host, port, timeout=8.0) as s: + leaf_der = s.getpeercert(binary_form=True) or b"" + except Exception: + pass + leaf_info = analyze_leaf_cert(leaf_der) + + chain_certs: List[x509.Certificate] = [] + negotiated = None + chain_err = None + try: + chain_certs, negotiated = get_server_chain(host, port) + except Exception as e: + chain_err = str(e) + + chain_seq = validate_chain_sequence(chain_certs) + score, grade, score_breakdown = compute_score(protocols, ciphers, leaf_info, chain_seq) + + chain_rows: List[List[str]] = [] + for i, cert in enumerate(chain_certs): + s = summarize_cert(cert) + + if i == 0: + role = "LEAF" + else: + role = f"INT{i}" + if is_self_signed(cert): + role = "ROOT" if i == len(chain_certs) - 1 else "ROOT!" + + expected_next_cn = s["issuer_cn"] + next_is_cn = summarize_cert(chain_certs[i + 1])["subject_cn"] if i + 1 < len(chain_certs) else "-" + + chain_rows.append([ + str(i), + role, + s["subject_cn"][:32], + s["issuer_cn"][:32], + expected_next_cn[:26], + next_is_cn[:26], + s["not_after"], + f"{s['days_left']}d", + s["key"], + s["sig"][:18], + ]) + + return { + "host": host, + "port": port, + "protocols": protocols, + "negotiated_version": negotiated, + "ciphers": ciphers, + "leaf_cert": leaf_info, + "chain_seq": chain_seq, + "chain_rows": chain_rows, + "chain_error": chain_err, + "score": score, + "grade": grade, + "score_breakdown": score_breakdown, + } + + +# ----------------------------- +# Print results +# ----------------------------- +def print_results(results: List[Dict]) -> None: + print("\n" + "=" * 90) + print("TLS / CERT / CHAIN REPORT") + print("=" * 90) + + # Summary + sum_rows: List[List[str]] = [] + for r in results: + proto = ",".join(r["protocols"]) if r["protocols"] else "NONE" + cipher = r["ciphers"][0] if r["ciphers"] else "-" + chain_len = r["chain_seq"]["length"] + chain_ok = "YES" if (chain_len >= 2 and r["chain_seq"]["sequence_ok"]) else "NO" + cert_days = "EXPIRED" if r["leaf_cert"].get("expired") else f"{r['leaf_cert'].get('days_left', 0)}d" + sum_rows.append([ + f"{r['host']}:{r['port']}", + r["grade"], + f"{r['score']}", + proto, + cipher[:22], + chain_ok, + str(chain_len), + cert_days, + ]) + + print(_table( + sum_rows, + ["Host", "Grade", "Score", "Proto", "Cipher", "ChainOK", "Len", "Cert"] + )) + + for r in results: + print("\n" + "-" * 90) + print(f"{r['host']}:{r['port']} Grade={r['grade']} Score={r['score']}") + if r["chain_error"]: + print(f"CHAIN FETCH ERROR: {r['chain_error']}") + continue + + seq = r["chain_seq"] + if seq["errors"] or seq["root_in_middle"]: + print("CHAIN DIAGNOSTICS:") + for e in seq["errors"]: + print(f" - {e}") + if seq["root_in_middle"]: + idxs = ", ".join(str(i) for i in seq["root_indices"]) + print(f" - root in the middle at index(es): {idxs}") + + if not r["chain_rows"]: + print("No chain data.") + continue + + print(_table( + r["chain_rows"], + ["#", "Role", "Subject(CN)", "Issuer(CN)", "ExpectedNext", "NextIs", "NotAfter", "Left", "Key", "Sig"] + )) + + br = r["score_breakdown"] + print(f"Breakdown: base={br['base']} proto={br['proto']} cipher={br['cipher']} key={br['key']} chain={br['chain']} cert={br['cert']} sig={br['sig']}") + + +# ----------------------------- +# CLI +# ----------------------------- +def parse_targets(args_targets: List[str], default_port: int) -> List[Tuple[str, int]]: + out: List[Tuple[str, int]] = [] + for t in args_targets: + t = t.strip() + if not t: + continue + if ":" in t: + h, p = t.rsplit(":", 1) + out.append((h.strip(), int(p.strip()))) + else: + out.append((t, default_port)) + return out + + +def main() -> None: + parser = argparse.ArgumentParser(description="TLS Scanner (Python 3.14) with server chain + order validation") + parser.add_argument("targets", nargs="+", help="host or host:port") + parser.add_argument("-p", "--port", type=int, default=443, help="default port") + parser.add_argument("-w", "--workers", type=int, default=min(8, mp.cpu_count()), help="threads") + args = parser.parse_args() + + targets = parse_targets(args.targets, args.port) + + with ThreadPoolExecutor(max_workers=max(1, args.workers)) as ex: + futures = [ex.submit(scan_host, h, p) for (h, p) in targets] + results = [f.result() for f in futures] + + print_results(results) + + +if __name__ == "__main__": + main()