Add ssl_cert_check.py
This commit is contained in:
451
ssl_cert_check.py
Normal file
451
ssl_cert_check.py
Normal file
@@ -0,0 +1,451 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
TLS/SSL Scanner
|
||||
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import socket
|
||||
import ssl
|
||||
from datetime import datetime, timezone
|
||||
from typing import List, Dict, Tuple, Optional
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
import multiprocessing as mp
|
||||
|
||||
from cryptography import x509
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives.serialization import Encoding
|
||||
from cryptography.x509.oid import NameOID, ExtensionOID
|
||||
|
||||
|
||||
# -----------------------------
|
||||
# Helpers: TLS connectivity
|
||||
# -----------------------------
|
||||
def _connect(ctx: ssl.SSLContext, host: str, port: int, timeout: float = 8.0) -> ssl.SSLSocket:
|
||||
sock = socket.create_connection((host, port), timeout=timeout)
|
||||
try:
|
||||
ssock = ctx.wrap_socket(sock, server_hostname=host)
|
||||
return ssock
|
||||
except Exception:
|
||||
sock.close()
|
||||
raise
|
||||
|
||||
|
||||
def test_protocol_versions(host: str, port: int) -> List[str]:
|
||||
"""Return supported versions in descending preference order, e.g. ['TLSv1.3','TLSv1.2']."""
|
||||
supported: List[str] = []
|
||||
|
||||
def _try(ver: ssl.TLSVersion) -> Optional[str]:
|
||||
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
||||
ctx.check_hostname = False
|
||||
ctx.verify_mode = ssl.CERT_NONE
|
||||
ctx.minimum_version = ver
|
||||
ctx.maximum_version = ver
|
||||
try:
|
||||
with _connect(ctx, host, port, timeout=8.0) as s:
|
||||
return s.version()
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
for v in (ssl.TLSVersion.TLSv1_3, ssl.TLSVersion.TLSv1_2):
|
||||
r = _try(v)
|
||||
if r:
|
||||
supported.append(r)
|
||||
|
||||
return supported
|
||||
|
||||
|
||||
def get_negotiated_cipher(host: str, port: int) -> List[str]:
|
||||
"""Return negotiated cipher name as 1-element list, or empty."""
|
||||
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
||||
ctx.check_hostname = False
|
||||
ctx.verify_mode = ssl.CERT_NONE
|
||||
try:
|
||||
with _connect(ctx, host, port, timeout=8.0) as s:
|
||||
c = s.cipher()
|
||||
return [c[0]] if c and c[0] else []
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
|
||||
# -----------------------------
|
||||
# Certificate parsing
|
||||
# -----------------------------
|
||||
def _get_cn(name: x509.Name) -> str:
|
||||
try:
|
||||
attrs = name.get_attributes_for_oid(NameOID.COMMON_NAME)
|
||||
return attrs[0].value if attrs else ""
|
||||
except Exception:
|
||||
return ""
|
||||
|
||||
|
||||
def is_self_signed(cert: x509.Certificate) -> bool:
|
||||
return cert.issuer == cert.subject
|
||||
|
||||
|
||||
def analyze_leaf_cert(leaf_der: bytes) -> Dict:
|
||||
if not leaf_der:
|
||||
return {"expired": True, "days_left": 0, "key_strong": False}
|
||||
|
||||
try:
|
||||
cert = x509.load_der_x509_certificate(leaf_der, default_backend())
|
||||
now_utc = datetime.now(timezone.utc)
|
||||
|
||||
pub = cert.public_key()
|
||||
key_size = getattr(pub, "key_size", None)
|
||||
key_type = pub.__class__.__name__.replace("PublicKey", "")
|
||||
|
||||
sans = []
|
||||
try:
|
||||
san_ext = cert.extensions.get_extension_for_oid(ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
|
||||
sans = san_ext.value.get_values_for_type(x509.DNSName)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
sig_name = getattr(cert.signature_algorithm_oid, "_name", "") or ""
|
||||
weak_sig = "sha1" in sig_name.lower()
|
||||
|
||||
expired = now_utc > cert.not_valid_after_utc
|
||||
days_left = max(0, int((cert.not_valid_after_utc - now_utc).total_seconds() / 86400))
|
||||
|
||||
key_strong = True
|
||||
if key_size is not None:
|
||||
key_strong = key_size >= 2048 if "RSA" in key_type.upper() else key_size >= 256
|
||||
|
||||
return {
|
||||
"cn": _get_cn(cert.subject) or "N/A",
|
||||
"expired": expired,
|
||||
"days_left": days_left,
|
||||
"not_after": cert.not_valid_after_utc.isoformat(),
|
||||
"key_type": key_type or "N/A",
|
||||
"key_size": key_size or "N/A",
|
||||
"key_strong": key_strong,
|
||||
"weak_sig": weak_sig,
|
||||
"san_count": len(sans),
|
||||
}
|
||||
except Exception:
|
||||
return {"expired": True, "days_left": 0, "key_strong": False}
|
||||
|
||||
|
||||
def get_server_chain(host: str, port: int) -> Tuple[List[x509.Certificate], Optional[str]]:
|
||||
"""
|
||||
Server-sent chain in exact order [leaf, inter1, inter2, ...].
|
||||
|
||||
"""
|
||||
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
||||
ctx.check_hostname = False
|
||||
ctx.verify_mode = ssl.CERT_NONE
|
||||
|
||||
with _connect(ctx, host, port, timeout=10.0) as ssock:
|
||||
negotiated_version = ssock.version()
|
||||
|
||||
chain_objs = ssock.get_unverified_chain()
|
||||
if not chain_objs:
|
||||
leaf_der = ssock.getpeercert(binary_form=True)
|
||||
if not leaf_der:
|
||||
return ([], negotiated_version)
|
||||
cert = x509.load_der_x509_certificate(leaf_der, default_backend())
|
||||
return ([cert], negotiated_version)
|
||||
|
||||
# If single bytes element was returned
|
||||
if isinstance(chain_objs, (bytes, bytearray)):
|
||||
chain_iter = [chain_objs]
|
||||
else:
|
||||
chain_iter = chain_objs
|
||||
|
||||
certs: List[x509.Certificate] = []
|
||||
for c in chain_iter:
|
||||
if isinstance(c, (bytes, bytearray)):
|
||||
der = bytes(c)
|
||||
elif hasattr(c, "public_bytes"):
|
||||
der = c.public_bytes(Encoding.DER)
|
||||
else:
|
||||
continue
|
||||
certs.append(x509.load_der_x509_certificate(der, default_backend()))
|
||||
|
||||
return (certs, negotiated_version)
|
||||
|
||||
|
||||
def validate_chain_sequence(certs: List[x509.Certificate]) -> Dict:
|
||||
"""
|
||||
Validates ONLY the sequence (issuer/subject links) and flags roots in the middle.
|
||||
"""
|
||||
if not certs:
|
||||
return {
|
||||
"sequence_ok": False,
|
||||
"errors": ["empty chain"],
|
||||
"length": 0,
|
||||
"last_self_signed": False,
|
||||
"root_in_middle": False,
|
||||
"root_indices": [],
|
||||
}
|
||||
|
||||
errors: List[str] = []
|
||||
for i in range(len(certs) - 1):
|
||||
if certs[i].issuer != certs[i + 1].subject:
|
||||
exp = certs[i].issuer.rfc4514_string()
|
||||
got = certs[i + 1].subject.rfc4514_string()
|
||||
errors.append(f"link {i}->{i+1}: expected next subject = {exp}, got = {got}")
|
||||
|
||||
root_indices = [i for i in range(len(certs) - 1) if is_self_signed(certs[i])]
|
||||
last = certs[-1]
|
||||
last_self_signed = is_self_signed(last)
|
||||
|
||||
return {
|
||||
"sequence_ok": len(errors) == 0,
|
||||
"errors": errors,
|
||||
"length": len(certs),
|
||||
"last_self_signed": last_self_signed,
|
||||
"root_in_middle": len(root_indices) > 0,
|
||||
"root_indices": root_indices,
|
||||
}
|
||||
|
||||
|
||||
def summarize_cert(cert: x509.Certificate) -> Dict:
|
||||
now_utc = datetime.now(timezone.utc)
|
||||
days_left = max(0, int((cert.not_valid_after_utc - now_utc).total_seconds() / 86400))
|
||||
|
||||
pub = cert.public_key()
|
||||
key_size = getattr(pub, "key_size", None)
|
||||
key_type = pub.__class__.__name__.replace("PublicKey", "")
|
||||
|
||||
sig_name = getattr(cert.signature_algorithm_oid, "_name", "") or ""
|
||||
|
||||
return {
|
||||
"subject_cn": _get_cn(cert.subject) or "-",
|
||||
"issuer_cn": _get_cn(cert.issuer) or "-",
|
||||
"not_after": cert.not_valid_after_utc.strftime("%Y-%m-%d"),
|
||||
"days_left": days_left,
|
||||
"key": f"{key_type}-{key_size}" if key_size else f"{key_type}",
|
||||
"sig": sig_name or "-",
|
||||
}
|
||||
|
||||
|
||||
# -----------------------------
|
||||
# Reporting: tables
|
||||
# -----------------------------
|
||||
def _table(rows: List[List[str]], headers: List[str]) -> str:
|
||||
widths = [len(h) for h in headers]
|
||||
for r in rows:
|
||||
for i, cell in enumerate(r):
|
||||
widths[i] = max(widths[i], len(cell))
|
||||
|
||||
def fmt_row(r):
|
||||
return "| " + " | ".join(r[i].ljust(widths[i]) for i in range(len(headers))) + " |"
|
||||
|
||||
sep = "|-" + "-|-".join("-" * w for w in widths) + "-|"
|
||||
out = [fmt_row(headers), sep]
|
||||
out += [fmt_row(r) for r in rows]
|
||||
return "\n".join(out)
|
||||
|
||||
|
||||
# -----------------------------
|
||||
# Scoring (simple, deterministic)
|
||||
# -----------------------------
|
||||
def compute_score(protocols: List[str], ciphers: List[str], leaf_info: Dict, chain_seq: Dict) -> Tuple[float, str, Dict[str, int]]:
|
||||
score_breakdown = {
|
||||
"base": 100,
|
||||
"proto": 0, # -20 if no TLSv1.2+
|
||||
"cipher": 0, # -25 if negotiated cipher is obviously weak
|
||||
"key": 0, # -15 if weak key
|
||||
"chain": 0, # -30 if missing or order broken
|
||||
"cert": 0, # -25 if expired; -10 if <30 days
|
||||
"sig": 0, # -10 if LEAF is SHA1
|
||||
}
|
||||
|
||||
if not any(p in ("TLSv1.2", "TLSv1.3") for p in protocols):
|
||||
score_breakdown["proto"] = -20
|
||||
|
||||
if ciphers:
|
||||
c = ciphers[0].upper()
|
||||
if "RC4" in c or "3DES" in c or " DES " in f" {c} ":
|
||||
score_breakdown["cipher"] = -25
|
||||
|
||||
if not leaf_info.get("key_strong", False):
|
||||
score_breakdown["key"] = -15
|
||||
|
||||
if not (chain_seq.get("length", 0) >= 2 and chain_seq.get("sequence_ok", False)):
|
||||
score_breakdown["chain"] = -30
|
||||
|
||||
if leaf_info.get("expired", True):
|
||||
score_breakdown["cert"] -= 25
|
||||
else:
|
||||
if leaf_info.get("days_left", 0) < 30:
|
||||
score_breakdown["cert"] -= 10
|
||||
|
||||
if leaf_info.get("weak_sig", False):
|
||||
score_breakdown["sig"] = -10
|
||||
|
||||
score = float(sum(score_breakdown.values()))
|
||||
grade = "A+" if score >= 95 else "A" if score >= 85 else "B" if score >= 70 else "C" if score >= 55 else "F"
|
||||
return round(score, 1), grade, score_breakdown
|
||||
|
||||
|
||||
# -----------------------------
|
||||
# Scan one host
|
||||
# -----------------------------
|
||||
def scan_host(host: str, port: int) -> Dict:
|
||||
protocols = test_protocol_versions(host, port)
|
||||
ciphers = get_negotiated_cipher(host, port)
|
||||
|
||||
leaf_der = b""
|
||||
try:
|
||||
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
||||
ctx.check_hostname = False
|
||||
ctx.verify_mode = ssl.CERT_NONE
|
||||
with _connect(ctx, host, port, timeout=8.0) as s:
|
||||
leaf_der = s.getpeercert(binary_form=True) or b""
|
||||
except Exception:
|
||||
pass
|
||||
leaf_info = analyze_leaf_cert(leaf_der)
|
||||
|
||||
chain_certs: List[x509.Certificate] = []
|
||||
negotiated = None
|
||||
chain_err = None
|
||||
try:
|
||||
chain_certs, negotiated = get_server_chain(host, port)
|
||||
except Exception as e:
|
||||
chain_err = str(e)
|
||||
|
||||
chain_seq = validate_chain_sequence(chain_certs)
|
||||
score, grade, score_breakdown = compute_score(protocols, ciphers, leaf_info, chain_seq)
|
||||
|
||||
chain_rows: List[List[str]] = []
|
||||
for i, cert in enumerate(chain_certs):
|
||||
s = summarize_cert(cert)
|
||||
|
||||
if i == 0:
|
||||
role = "LEAF"
|
||||
else:
|
||||
role = f"INT{i}"
|
||||
if is_self_signed(cert):
|
||||
role = "ROOT" if i == len(chain_certs) - 1 else "ROOT!"
|
||||
|
||||
expected_next_cn = s["issuer_cn"]
|
||||
next_is_cn = summarize_cert(chain_certs[i + 1])["subject_cn"] if i + 1 < len(chain_certs) else "-"
|
||||
|
||||
chain_rows.append([
|
||||
str(i),
|
||||
role,
|
||||
s["subject_cn"][:32],
|
||||
s["issuer_cn"][:32],
|
||||
expected_next_cn[:26],
|
||||
next_is_cn[:26],
|
||||
s["not_after"],
|
||||
f"{s['days_left']}d",
|
||||
s["key"],
|
||||
s["sig"][:18],
|
||||
])
|
||||
|
||||
return {
|
||||
"host": host,
|
||||
"port": port,
|
||||
"protocols": protocols,
|
||||
"negotiated_version": negotiated,
|
||||
"ciphers": ciphers,
|
||||
"leaf_cert": leaf_info,
|
||||
"chain_seq": chain_seq,
|
||||
"chain_rows": chain_rows,
|
||||
"chain_error": chain_err,
|
||||
"score": score,
|
||||
"grade": grade,
|
||||
"score_breakdown": score_breakdown,
|
||||
}
|
||||
|
||||
|
||||
# -----------------------------
|
||||
# Print results
|
||||
# -----------------------------
|
||||
def print_results(results: List[Dict]) -> None:
|
||||
print("\n" + "=" * 90)
|
||||
print("TLS / CERT / CHAIN REPORT")
|
||||
print("=" * 90)
|
||||
|
||||
# Summary
|
||||
sum_rows: List[List[str]] = []
|
||||
for r in results:
|
||||
proto = ",".join(r["protocols"]) if r["protocols"] else "NONE"
|
||||
cipher = r["ciphers"][0] if r["ciphers"] else "-"
|
||||
chain_len = r["chain_seq"]["length"]
|
||||
chain_ok = "YES" if (chain_len >= 2 and r["chain_seq"]["sequence_ok"]) else "NO"
|
||||
cert_days = "EXPIRED" if r["leaf_cert"].get("expired") else f"{r['leaf_cert'].get('days_left', 0)}d"
|
||||
sum_rows.append([
|
||||
f"{r['host']}:{r['port']}",
|
||||
r["grade"],
|
||||
f"{r['score']}",
|
||||
proto,
|
||||
cipher[:22],
|
||||
chain_ok,
|
||||
str(chain_len),
|
||||
cert_days,
|
||||
])
|
||||
|
||||
print(_table(
|
||||
sum_rows,
|
||||
["Host", "Grade", "Score", "Proto", "Cipher", "ChainOK", "Len", "Cert"]
|
||||
))
|
||||
|
||||
for r in results:
|
||||
print("\n" + "-" * 90)
|
||||
print(f"{r['host']}:{r['port']} Grade={r['grade']} Score={r['score']}")
|
||||
if r["chain_error"]:
|
||||
print(f"CHAIN FETCH ERROR: {r['chain_error']}")
|
||||
continue
|
||||
|
||||
seq = r["chain_seq"]
|
||||
if seq["errors"] or seq["root_in_middle"]:
|
||||
print("CHAIN DIAGNOSTICS:")
|
||||
for e in seq["errors"]:
|
||||
print(f" - {e}")
|
||||
if seq["root_in_middle"]:
|
||||
idxs = ", ".join(str(i) for i in seq["root_indices"])
|
||||
print(f" - root in the middle at index(es): {idxs}")
|
||||
|
||||
if not r["chain_rows"]:
|
||||
print("No chain data.")
|
||||
continue
|
||||
|
||||
print(_table(
|
||||
r["chain_rows"],
|
||||
["#", "Role", "Subject(CN)", "Issuer(CN)", "ExpectedNext", "NextIs", "NotAfter", "Left", "Key", "Sig"]
|
||||
))
|
||||
|
||||
br = r["score_breakdown"]
|
||||
print(f"Breakdown: base={br['base']} proto={br['proto']} cipher={br['cipher']} key={br['key']} chain={br['chain']} cert={br['cert']} sig={br['sig']}")
|
||||
|
||||
|
||||
# -----------------------------
|
||||
# CLI
|
||||
# -----------------------------
|
||||
def parse_targets(args_targets: List[str], default_port: int) -> List[Tuple[str, int]]:
|
||||
out: List[Tuple[str, int]] = []
|
||||
for t in args_targets:
|
||||
t = t.strip()
|
||||
if not t:
|
||||
continue
|
||||
if ":" in t:
|
||||
h, p = t.rsplit(":", 1)
|
||||
out.append((h.strip(), int(p.strip())))
|
||||
else:
|
||||
out.append((t, default_port))
|
||||
return out
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(description="TLS Scanner (Python 3.14) with server chain + order validation")
|
||||
parser.add_argument("targets", nargs="+", help="host or host:port")
|
||||
parser.add_argument("-p", "--port", type=int, default=443, help="default port")
|
||||
parser.add_argument("-w", "--workers", type=int, default=min(8, mp.cpu_count()), help="threads")
|
||||
args = parser.parse_args()
|
||||
|
||||
targets = parse_targets(args.targets, args.port)
|
||||
|
||||
with ThreadPoolExecutor(max_workers=max(1, args.workers)) as ex:
|
||||
futures = [ex.submit(scan_host, h, p) for (h, p) in targets]
|
||||
results = [f.result() for f in futures]
|
||||
|
||||
print_results(results)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user