Files
tools_scripts/ssl_cert_check.py
2026-03-02 12:00:01 +01:00

452 lines
14 KiB
Python

#!/usr/bin/env python3
"""
TLS/SSL Scanner @linuxiarz.pl, Mateusz Gruszczyński
"""
import argparse
import socket
import ssl
from datetime import datetime, timezone
from typing import List, Dict, Tuple, Optional
from concurrent.futures import ThreadPoolExecutor
import multiprocessing as mp
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.x509.oid import NameOID, ExtensionOID
# -----------------------------
# Helpers: TLS connectivity
# -----------------------------
def _connect(ctx: ssl.SSLContext, host: str, port: int, timeout: float = 8.0) -> ssl.SSLSocket:
    """Open a TCP connection to ``host:port`` and wrap it in TLS using ``ctx``.

    The host name is also sent as the SNI ``server_hostname``. If the TLS
    wrap fails, the underlying TCP socket is closed before the exception
    is re-raised, so no descriptor is leaked.
    """
    raw_sock = socket.create_connection((host, port), timeout=timeout)
    try:
        tls_sock = ctx.wrap_socket(raw_sock, server_hostname=host)
    except Exception:
        # Handshake failed: release the plain socket, then propagate.
        raw_sock.close()
        raise
    else:
        return tls_sock
def test_protocol_versions(host: str, port: int) -> List[str]:
    """Probe which TLS versions the server accepts.

    Each candidate version is tried on its own connection with the context
    pinned to exactly that version (minimum == maximum). Certificate and
    host name verification are disabled because only the handshake outcome
    matters here.

    Returns the version strings reported by the socket, in descending
    preference order, e.g. ``['TLSv1.3', 'TLSv1.2']``.
    """

    def _probe(version: ssl.TLSVersion) -> Optional[str]:
        # Pin the context to a single protocol version and attempt a handshake.
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        context.minimum_version = version
        context.maximum_version = version
        try:
            with _connect(context, host, port, timeout=8.0) as tls_sock:
                return tls_sock.version()
        except Exception:
            # Any failure is treated as "version not supported".
            return None

    candidates = (ssl.TLSVersion.TLSv1_3, ssl.TLSVersion.TLSv1_2)
    outcomes = [_probe(candidate) for candidate in candidates]
    return [name for name in outcomes if name]
def get_negotiated_cipher(host: str, port: int) -> List[str]:
    """Return the negotiated cipher name as a one-element list.

    An empty list is returned when the connection fails or the socket
    reports no cipher. Certificate verification is disabled for the probe.
    """
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE
    try:
        with _connect(context, host, port, timeout=8.0) as tls_sock:
            cipher_info = tls_sock.cipher()
            if cipher_info and cipher_info[0]:
                return [cipher_info[0]]
            return []
    except Exception:
        # Connection or handshake problems simply mean "no cipher known".
        return []
# -----------------------------
# Certificate parsing
# -----------------------------
def _get_cn(name: x509.Name) -> str:
    """Return the first Common Name attribute of ``name``.

    Returns an empty string when there is no CN or the lookup fails.
    """
    try:
        matches = name.get_attributes_for_oid(NameOID.COMMON_NAME)
        if not matches:
            return ""
        return matches[0].value
    except Exception:
        return ""
def is_self_signed(cert: x509.Certificate) -> bool:
    """Report whether the certificate's issuer name equals its subject name.

    Only the two names are compared; the signature itself is not verified.
    """
    issuer_name = cert.issuer
    subject_name = cert.subject
    return issuer_name == subject_name
def analyze_leaf_cert(leaf_der: bytes) -> Dict:
    """Parse a DER-encoded leaf certificate and summarise its health.

    Args:
        leaf_der: Raw DER bytes of the leaf certificate (may be empty).

    Returns:
        On success, a dict with the keys ``cn``, ``expired``, ``days_left``,
        ``not_after``, ``key_type``, ``key_size``, ``key_strong``,
        ``weak_sig`` and ``san_count``.
        When the input is empty or cannot be parsed, a pessimistic fallback
        ``{"expired": True, "days_left": 0, "key_strong": False}``.
    """
    if not leaf_der:
        return {"expired": True, "days_left": 0, "key_strong": False}
    try:
        cert = x509.load_der_x509_certificate(leaf_der, default_backend())
        now_utc = datetime.now(timezone.utc)

        pub = cert.public_key()
        # Some key types (e.g. Ed25519) expose no key_size attribute.
        key_size = getattr(pub, "key_size", None)
        key_type = pub.__class__.__name__.replace("PublicKey", "")

        sans = []
        try:
            san_ext = cert.extensions.get_extension_for_oid(ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
            sans = san_ext.value.get_values_for_type(x509.DNSName)
        except Exception:
            # Best effort: a missing or unreadable SAN extension only
            # affects the informational san_count, so count it as zero.
            sans = []

        sig_name = getattr(cert.signature_algorithm_oid, "_name", "") or ""
        sig_lower = sig_name.lower()
        # MD5 is at least as broken as SHA-1, so flag both.
        weak_sig = any(tag in sig_lower for tag in ("sha1", "md5"))

        expired = now_utc > cert.not_valid_after_utc
        days_left = max(0, int((cert.not_valid_after_utc - now_utc).total_seconds() / 86400))

        key_strong = True
        if key_size is not None:
            # RSA, DSA and DH key sizes are modulus sizes and need >= 2048
            # bits; everything else that reports a size (elliptic curves)
            # keeps the 256-bit threshold.
            upper_type = key_type.upper()
            needs_big_modulus = any(tag in upper_type for tag in ("RSA", "DSA", "DH"))
            key_strong = key_size >= (2048 if needs_big_modulus else 256)

        return {
            "cn": _get_cn(cert.subject) or "N/A",
            "expired": expired,
            "days_left": days_left,
            "not_after": cert.not_valid_after_utc.isoformat(),
            "key_type": key_type or "N/A",
            "key_size": key_size or "N/A",
            "key_strong": key_strong,
            "weak_sig": weak_sig,
            "san_count": len(sans),
        }
    except Exception:
        # Unparseable certificate: report the same pessimistic fallback.
        return {"expired": True, "days_left": 0, "key_strong": False}
def get_server_chain(host: str, port: int) -> Tuple[List[x509.Certificate], Optional[str]]:
    """Fetch the certificate chain exactly as the server sent it.

    Returns a tuple ``(certs, negotiated_version)`` where ``certs`` is in
    server order ``[leaf, inter1, inter2, ...]``. When the socket reports
    no chain, the peer certificate alone is used; if that is missing too,
    the list is empty. Verification is disabled so that broken chains can
    still be inspected. Connection errors propagate to the caller.
    """
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE

    with _connect(context, host, port, timeout=10.0) as tls_sock:
        version = tls_sock.version()
        raw_chain = tls_sock.get_unverified_chain()

        if not raw_chain:
            # No chain reported: fall back to the leaf certificate only.
            peer_der = tls_sock.getpeercert(binary_form=True)
            if peer_der:
                only_cert = x509.load_der_x509_certificate(peer_der, default_backend())
                return ([only_cert], version)
            return ([], version)

        # A single bytes-like value is treated as a one-element chain.
        single_blob = isinstance(raw_chain, (bytes, bytearray))
        entries = [raw_chain] if single_blob else raw_chain

        parsed: List[x509.Certificate] = []
        for entry in entries:
            if isinstance(entry, (bytes, bytearray)):
                der_bytes = bytes(entry)
            elif hasattr(entry, "public_bytes"):
                der_bytes = entry.public_bytes(Encoding.DER)
            else:
                # Unknown element type: skip it.
                continue
            parsed.append(x509.load_der_x509_certificate(der_bytes, default_backend()))
        return (parsed, version)
def validate_chain_sequence(certs: List[x509.Certificate]) -> Dict:
    """Check that the chain is ordered, i.e. each issuer is the next subject.

    Only the issuer/subject links are validated (no signatures, no trust
    store). Self-issued certificates found anywhere except the last
    position are reported as "root in the middle".

    Returns a dict with ``sequence_ok``, ``errors``, ``length``,
    ``last_self_signed``, ``root_in_middle`` and ``root_indices``.
    """
    if not certs:
        return {
            "sequence_ok": False,
            "errors": ["empty chain"],
            "length": 0,
            "last_self_signed": False,
            "root_in_middle": False,
            "root_indices": [],
        }

    link_errors: List[str] = []
    for idx, (current, following) in enumerate(zip(certs, certs[1:])):
        if current.issuer == following.subject:
            continue
        exp = current.issuer.rfc4514_string()
        got = following.subject.rfc4514_string()
        link_errors.append(f"link {idx}->{idx+1}: expected next subject = {exp}, got = {got}")

    # Every position except the last one is checked for a misplaced root.
    misplaced_roots = [idx for idx, cert in enumerate(certs[:-1]) if is_self_signed(cert)]
    tail_is_root = is_self_signed(certs[-1])

    return {
        "sequence_ok": not link_errors,
        "errors": link_errors,
        "length": len(certs),
        "last_self_signed": tail_is_root,
        "root_in_middle": bool(misplaced_roots),
        "root_indices": misplaced_roots,
    }
def summarize_cert(cert: x509.Certificate) -> Dict:
    """Build the display fields for one certificate in the chain table.

    Returns a dict with ``subject_cn``, ``issuer_cn``, ``not_after``
    (``YYYY-MM-DD``), ``days_left`` (never negative), ``key`` and ``sig``.
    Missing names or signature algorithm names are shown as ``"-"``.
    """
    expires_at = cert.not_valid_after_utc
    remaining = expires_at - datetime.now(timezone.utc)
    days_left = max(0, int(remaining.total_seconds() / 86400))

    public_key = cert.public_key()
    bits = getattr(public_key, "key_size", None)
    family = public_key.__class__.__name__.replace("PublicKey", "")
    if bits:
        key_label = f"{family}-{bits}"
    else:
        # Key types without a size are shown by type name alone.
        key_label = f"{family}"

    algorithm = getattr(cert.signature_algorithm_oid, "_name", "") or ""

    return {
        "subject_cn": _get_cn(cert.subject) or "-",
        "issuer_cn": _get_cn(cert.issuer) or "-",
        "not_after": expires_at.strftime("%Y-%m-%d"),
        "days_left": days_left,
        "key": key_label,
        "sig": algorithm or "-",
    }
# -----------------------------
# Reporting: tables
# -----------------------------
def _table(rows: List[List[str]], headers: List[str]) -> str:
widths = [len(h) for h in headers]
for r in rows:
for i, cell in enumerate(r):
widths[i] = max(widths[i], len(cell))
def fmt_row(r):
return "| " + " | ".join(r[i].ljust(widths[i]) for i in range(len(headers))) + " |"
sep = "|-" + "-|-".join("-" * w for w in widths) + "-|"
out = [fmt_row(headers), sep]
out += [fmt_row(r) for r in rows]
return "\n".join(out)
# -----------------------------
# Scoring (simple, deterministic)
# -----------------------------
def compute_score(protocols: List[str], ciphers: List[str], leaf_info: Dict, chain_seq: Dict) -> Tuple[float, str, Dict[str, int]]:
    """Turn the scan findings into a numeric score, a letter grade and a breakdown.

    The score starts at 100 and penalties are added per category.

    Args:
        protocols: Supported protocol names, e.g. ``["TLSv1.3", "TLSv1.2"]``.
        ciphers: Negotiated cipher name as a list; only the first entry is used.
        leaf_info: Leaf certificate facts (``key_strong``, ``expired``,
            ``days_left``, ``weak_sig``). Missing keys are read
            pessimistically (weak key, expired).
        chain_seq: Chain validation result (``length``, ``sequence_ok``).

    Returns:
        ``(score, grade, breakdown)`` where ``breakdown`` maps each category
        to its contribution and ``grade`` is one of A+, A, B, C, F.
    """
    score_breakdown = {
        "base": 100,
        "proto": 0,   # -20 if no TLSv1.2+
        "cipher": 0,  # -25 if negotiated cipher is obviously weak
        "key": 0,     # -15 if weak key
        "chain": 0,   # -30 if missing or order broken
        "cert": 0,    # -25 if expired; -10 if <30 days
        "sig": 0,     # -10 if the leaf signature algorithm is weak
    }

    if not any(p in ("TLSv1.2", "TLSv1.3") for p in protocols):
        score_breakdown["proto"] = -20

    if ciphers:
        name = ciphers[0].upper()
        # Split on the separators used by OpenSSL ("-") and IANA ("_")
        # names so whole components can be matched. OpenSSL calls its 3DES
        # suites "DES-CBC3-SHA", which a plain "3DES" substring test misses.
        tokens = set(name.replace("_", "-").replace(" ", "-").split("-"))
        weak_tokens = {"DES", "NULL", "EXP", "EXPORT"}
        if "RC4" in name or "3DES" in name or tokens & weak_tokens:
            score_breakdown["cipher"] = -25

    if not leaf_info.get("key_strong", False):
        score_breakdown["key"] = -15

    # A usable chain needs at least leaf + one intermediate, in order.
    chain_ok = chain_seq.get("length", 0) >= 2 and chain_seq.get("sequence_ok", False)
    if not chain_ok:
        score_breakdown["chain"] = -30

    if leaf_info.get("expired", True):
        score_breakdown["cert"] -= 25
    elif leaf_info.get("days_left", 0) < 30:
        score_breakdown["cert"] -= 10

    if leaf_info.get("weak_sig", False):
        score_breakdown["sig"] = -10

    score = float(sum(score_breakdown.values()))
    if score >= 95:
        grade = "A+"
    elif score >= 85:
        grade = "A"
    elif score >= 70:
        grade = "B"
    elif score >= 55:
        grade = "C"
    else:
        grade = "F"
    return round(score, 1), grade, score_breakdown
# -----------------------------
# Scan one host
# -----------------------------
def scan_host(host: str, port: int) -> Dict:
    """Run every check against one ``host:port`` and collect the report data.

    Probes protocol versions and the negotiated cipher, fetches and
    analyses the leaf certificate, fetches the server chain and validates
    its order, computes the score, and prepares the rows of the per-host
    chain table. Failures while fetching the leaf or the chain do not
    raise: the leaf falls back to empty data and the chain error text is
    stored under ``chain_error``.
    """
    protocols = test_protocol_versions(host, port)
    ciphers = get_negotiated_cipher(host, port)

    # Leaf certificate: best effort, an unreachable host yields empty DER.
    leaf_der = b""
    try:
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        with _connect(context, host, port, timeout=8.0) as tls_sock:
            leaf_der = tls_sock.getpeercert(binary_form=True) or b""
    except Exception:
        pass
    leaf_info = analyze_leaf_cert(leaf_der)

    # Server chain: keep the error text so the report can show it.
    chain_certs: List[x509.Certificate] = []
    negotiated = None
    chain_err = None
    try:
        chain_certs, negotiated = get_server_chain(host, port)
    except Exception as exc:
        chain_err = str(exc)

    chain_seq = validate_chain_sequence(chain_certs)
    score, grade, score_breakdown = compute_score(protocols, ciphers, leaf_info, chain_seq)

    # Summarise each certificate once; a row also needs its successor's CN.
    summaries = [summarize_cert(cert) for cert in chain_certs]
    last_index = len(chain_certs) - 1
    chain_rows: List[List[str]] = []
    for idx, (cert, info) in enumerate(zip(chain_certs, summaries)):
        role = "LEAF" if idx == 0 else f"INT{idx}"
        if is_self_signed(cert):
            # "ROOT!" marks a self-issued certificate that is not last.
            role = "ROOT" if idx == last_index else "ROOT!"
        following_cn = summaries[idx + 1]["subject_cn"] if idx < last_index else "-"
        chain_rows.append([
            str(idx),
            role,
            info["subject_cn"][:32],
            info["issuer_cn"][:32],
            info["issuer_cn"][:26],
            following_cn[:26],
            info["not_after"],
            f"{info['days_left']}d",
            info["key"],
            info["sig"][:18],
        ])

    return {
        "host": host,
        "port": port,
        "protocols": protocols,
        "negotiated_version": negotiated,
        "ciphers": ciphers,
        "leaf_cert": leaf_info,
        "chain_seq": chain_seq,
        "chain_rows": chain_rows,
        "chain_error": chain_err,
        "score": score,
        "grade": grade,
        "score_breakdown": score_breakdown,
    }
# -----------------------------
# Print results
# -----------------------------
def print_results(results: List[Dict]) -> None:
    """Print the report: one summary table, then a section per host.

    Each host section shows the chain fetch error (if any), chain ordering
    diagnostics, the chain table and the score breakdown. A host with a
    chain error or without chain rows ends its section early, so no table
    or breakdown is printed for it.
    """
    banner = "=" * 90
    print("\n" + banner)
    print("TLS / CERT / CHAIN REPORT")
    print(banner)

    def summary_row(res: Dict) -> List[str]:
        # One line of the overview table for a single scan result.
        seq = res["chain_seq"]
        leaf = res["leaf_cert"]
        chain_len = seq["length"]
        if leaf.get("expired"):
            cert_days = "EXPIRED"
        else:
            cert_days = f"{leaf.get('days_left', 0)}d"
        return [
            f"{res['host']}:{res['port']}",
            res["grade"],
            f"{res['score']}",
            ",".join(res["protocols"]) if res["protocols"] else "NONE",
            (res["ciphers"][0] if res["ciphers"] else "-")[:22],
            "YES" if (chain_len >= 2 and seq["sequence_ok"]) else "NO",
            str(chain_len),
            cert_days,
        ]

    print(_table(
        [summary_row(res) for res in results],
        ["Host", "Grade", "Score", "Proto", "Cipher", "ChainOK", "Len", "Cert"]
    ))

    divider = "-" * 90
    for res in results:
        print("\n" + divider)
        print(f"{res['host']}:{res['port']} Grade={res['grade']} Score={res['score']}")

        if res["chain_error"]:
            print(f"CHAIN FETCH ERROR: {res['chain_error']}")
            continue

        seq = res["chain_seq"]
        has_misplaced_root = seq["root_in_middle"]
        if seq["errors"] or has_misplaced_root:
            print("CHAIN DIAGNOSTICS:")
            for message in seq["errors"]:
                print(f" - {message}")
            if has_misplaced_root:
                idxs = ", ".join(str(i) for i in seq["root_indices"])
                print(f" - root in the middle at index(es): {idxs}")

        if not res["chain_rows"]:
            print("No chain data.")
            continue

        print(_table(
            res["chain_rows"],
            ["#", "Role", "Subject(CN)", "Issuer(CN)", "ExpectedNext", "NextIs", "NotAfter", "Left", "Key", "Sig"]
        ))
        br = res["score_breakdown"]
        print(f"Breakdown: base={br['base']} proto={br['proto']} cipher={br['cipher']} key={br['key']} chain={br['chain']} cert={br['cert']} sig={br['sig']}")
# -----------------------------
# CLI
# -----------------------------
def parse_targets(args_targets: List[str], default_port: int) -> List[Tuple[str, int]]:
    """Convert command-line targets into ``(host, port)`` pairs.

    Accepted forms (surrounding whitespace is ignored, blank entries are
    skipped):

    * ``host``             -> ``(host, default_port)``
    * ``host:port``        -> ``(host, port)``
    * ``[ipv6]``           -> ``(ipv6, default_port)``
    * ``[ipv6]:port``      -> ``(ipv6, port)``
    * bare IPv6 literal    -> ``(ipv6, default_port)``

    Args:
        args_targets: Raw target strings.
        default_port: Port used when a target does not name one.

    Returns:
        The parsed pairs, in input order.

    Raises:
        ValueError: If a port is not an integer in 1..65535 or a bracketed
            target is malformed.
    """
    out: List[Tuple[str, int]] = []
    for raw in args_targets:
        target = raw.strip()
        if not target:
            continue

        port_text: Optional[str] = None
        if target.startswith("["):
            # Bracketed IPv6 literal, optionally followed by ":port".
            host, closing, rest = target[1:].partition("]")
            if not closing or (rest and not rest.startswith(":")):
                raise ValueError(f"invalid bracketed target: {raw!r}")
            if rest:
                port_text = rest[1:]
        elif target.count(":") == 1:
            host, port_text = target.split(":")
        else:
            # No colon at all, or several colons: a bare IPv6 address whose
            # last group must not be mistaken for a port.
            host = target

        if port_text is None:
            port = default_port
        else:
            try:
                port = int(port_text.strip())
            except ValueError:
                raise ValueError(f"invalid port in target {raw!r}") from None
            if not 0 < port < 65536:
                raise ValueError(f"port out of range in target {raw!r}")

        out.append((host.strip(), port))
    return out
def main() -> None:
    """Command-line entry point: parse arguments, scan all targets, print the report.

    Targets are scanned concurrently on a thread pool; results are
    reported in the order the targets were given.
    """
    default_workers = min(8, mp.cpu_count())

    parser = argparse.ArgumentParser(description="TLS Scanner (Python 3.14) with server chain + order validation")
    parser.add_argument("targets", nargs="+", help="host or host:port")
    parser.add_argument("-p", "--port", type=int, default=443, help="default port")
    parser.add_argument("-w", "--workers", type=int, default=default_workers, help="threads")
    args = parser.parse_args()

    targets = parse_targets(args.targets, args.port)

    # Never start the pool with fewer than one worker.
    pool_size = max(1, args.workers)
    with ThreadPoolExecutor(max_workers=pool_size) as pool:
        pending = [pool.submit(scan_host, host, port) for host, port in targets]
        results = [job.result() for job in pending]

    print_results(results)


if __name__ == "__main__":
    main()