diff --git a/pfx_extract_new.py b/pfx_extract_new.py new file mode 100644 index 0000000..49d2049 --- /dev/null +++ b/pfx_extract_new.py @@ -0,0 +1,570 @@ +#!/usr/bin/env python3 +import argparse +import os +import subprocess +import tarfile +import tempfile +import shutil +import sys + +def run_cmd(cmd, password): + env = os.environ.copy() + # hasło przekazujemy przez zmienną środowiskową, żeby nie wisiało w ps + env["PFXPASSWORD"] = password + # używamy -passin env:PFXPASSWORD w openssl + result = subprocess.run( + cmd, + env=env, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + if result.returncode != 0: + print("Błąd wykonania:", " ".join(cmd), file=sys.stderr) + print(result.stderr, file=sys.stderr) + sys.exit(result.returncode) + return result.stdout + +def extract_from_pfx(pfx_path, password, out_dir, base_name): + key_path = os.path.join(out_dir, f"{base_name}.key.pem") + cert_path = os.path.join(out_dir, f"{base_name}.cert.pem") + chain_path = os.path.join(out_dir, f"{base_name}.chain.pem") + + # klucz prywatny + run_cmd([ + "openssl", "pkcs12", + "-in", pfx_path, + "-passin", "env:PFXPASSWORD", + "-nodes", + "-nocerts", + "-out", key_path + ], password) + + # cert końcowy + run_cmd([ + "openssl", "pkcs12", + "-in", pfx_path, + "-passin", "env:PFXPASSWORD", + "-clcerts", + "-nokeys", + "-out", cert_path + ], password) + + # certy pośrednie (CA chain) – może być puste, wtedy plik zostanie, ale bez zawartości + run_cmd([ + "openssl", "pkcs12", + "-in", pfx_path, + "-passin", "env:PFXPASSWORD", + "-nokeys", + "-cacerts", + "-out", chain_path + ], password) + + return key_path, cert_path, chain_path + +def clean_pem_content(file_path): + """ + Usuwa Bag Attributes i inne metadata z pliku PEM, + zostawiając tylko czyste certyfikaty/klucze + """ + if not os.path.isfile(file_path) or os.path.getsize(file_path) == 0: + return "" + + with open(file_path, 'r') as f: + lines = f.readlines() + + clean_lines = [] + inside_cert_or_key = False + + for line in lines: + # Rozpoczęcie certyfikatu lub klucza + if line.startswith('-----BEGIN'): + inside_cert_or_key = True + clean_lines.append(line) + # Koniec certyfikatu lub klucza + elif line.startswith('-----END'): + clean_lines.append(line) + inside_cert_or_key = False + # Kopiuj tylko linie wewnątrz bloków BEGIN/END + elif inside_cert_or_key: + clean_lines.append(line) + + return ''.join(clean_lines) + +def create_haproxy_bundle(key_path, cert_path, chain_path, bundle_path): + """ + Tworzy bundle dla HAProxy w poprawnej kolejności: + 1. Klucz prywatny + 2. Certyfikat końcowy + 3. Certyfikaty pośrednie (chain) - od najbliższego do root CA + + Usuwa Bag Attributes i inne metadata z PKCS12 + """ + with open(bundle_path, 'w') as bundle: + # 1. Klucz prywatny (wyczyszczony) + key_content = clean_pem_content(key_path) + if key_content: + bundle.write(key_content) + if not key_content.endswith('\n'): + bundle.write('\n') + + # 2. Certyfikat końcowy (wyczyszczony) + cert_content = clean_pem_content(cert_path) + if cert_content: + bundle.write(cert_content) + if not cert_content.endswith('\n'): + bundle.write('\n') + + # 3. Chain (certyfikaty pośrednie, wyczyszczone) + chain_content = clean_pem_content(chain_path) + if chain_content: + bundle.write(chain_content) + if not chain_content.endswith('\n'): + bundle.write('\n') + + # Weryfikacja czy bundle zawiera co najmniej klucz i certyfikat + if os.path.getsize(bundle_path) == 0: + print("UWAGA: Bundle jest pusty!", file=sys.stderr) + return False + + return True + +def verify_bundle(bundle_path): + """ + Kompleksowa weryfikacja poprawności bundle'a HAProxy: + 1. Sprawdzenie czy klucz prywatny pasuje do certyfikatu + 2. Sprawdzenie łańcucha certyfikatów (chain validation) + 3. Wyświetlenie informacji o certyfikacie + """ + print("\n=== Weryfikacja bundle'a ===") + + # 1. Sprawdzenie czy klucz pasuje do certyfikatu (porównanie modulus) + print("1. Sprawdzanie zgodności klucza prywatnego z certyfikatem...") + try: + # Najpierw sprawdźmy typ klucza + key_check = subprocess.run([ + "openssl", "pkey", + "-in", bundle_path, + "-noout", + "-text" + ], capture_output=True, text=True) + + if key_check.returncode != 0: + print(" [ ERROR ] Nie można odczytać klucza prywatnego", file=sys.stderr) + return False + + # Sprawdź typ klucza (RSA, EC, itp.) + key_type = None + if "RSA" in key_check.stdout or "rsaEncryption" in key_check.stdout: + key_type = "rsa" + elif "EC" in key_check.stdout or "id-ecPublicKey" in key_check.stdout: + key_type = "ec" + + # Modulus dla RSA lub pubkey dla EC/innych + key_modulus = None + if key_type == "rsa": + key_result = subprocess.run([ + "openssl", "rsa", + "-in", bundle_path, + "-noout", + "-modulus" + ], capture_output=True, text=True) + if key_result.returncode == 0: + key_modulus = key_result.stdout.strip() + else: + # Dla EC i innych typów używamy pubkey + key_result = subprocess.run([ + "openssl", "pkey", + "-in", bundle_path, + "-pubout" + ], capture_output=True, text=True) + if key_result.returncode == 0: + key_modulus = key_result.stdout.strip() + + # Modulus/pubkey z certyfikatu + cert_modulus = None + if key_type == "rsa": + cert_result = subprocess.run([ + "openssl", "x509", + "-in", bundle_path, + "-noout", + "-modulus" + ], capture_output=True, text=True) + if cert_result.returncode == 0: + cert_modulus = cert_result.stdout.strip() + else: + # Dla EC i innych używamy pubkey z certyfikatu + cert_result = subprocess.run([ + "openssl", "x509", + "-in", bundle_path, + "-noout", + "-pubkey" + ], capture_output=True, text=True) + if cert_result.returncode == 0: + cert_modulus = cert_result.stdout.strip() + + if key_modulus and cert_modulus and key_modulus == cert_modulus: + print(f" [ OK ] Klucz prywatny ({key_type.upper() if key_type else 'UNKNOWN'}) pasuje do certyfikatu") + else: + print(" [ ERROR ] Klucz prywatny NIE pasuje do certyfikatu!", file=sys.stderr) + return False + + except Exception as e: + print(f" [ ERROR ] Błąd podczas weryfikacji klucza: {e}", file=sys.stderr) + return False + + # 2. Weryfikacja łańcucha certyfikatów + print("2. Weryfikacja łańcucha certyfikatów...") + try: + # openssl verify sprawdzi czy łańcuch jest poprawny + verify_result = subprocess.run([ + "openssl", "verify", + "-CAfile", bundle_path, + "-untrusted", bundle_path, + bundle_path + ], capture_output=True, text=True) + + if "OK" in verify_result.stdout: + print(" [ OK ] Łańcuch certyfikatów jest poprawny") + else: + # Czasem verify może nie działać z bundle'em, to nie jest krytyczne + print(" [ INFO ] Nie można w pełni zweryfikować łańcucha (może wymagać root CA)") + except Exception as e: + print(f" [ INFO ] {e}") + + # 3. Wyświetlenie informacji o certyfikacie + print("3. Informacje o certyfikacie:") + try: + cert_info = subprocess.run([ + "openssl", "x509", + "-in", bundle_path, + "-noout", + "-subject", + "-issuer", + "-dates" + ], capture_output=True, text=True) + + if cert_info.returncode == 0: + for line in cert_info.stdout.strip().split('\n'): + print(f" {line}") + + # Sprawdzenie czy certyfikat nie wygasł + dates_result = subprocess.run([ + "openssl", "x509", + "-in", bundle_path, + "-noout", + "-checkend", "0" + ], capture_output=True, text=True) + + if dates_result.returncode == 0: + print(" [ OK ] Certyfikat jest ważny (nie wygasł)") + else: + print(" [ ERROR ] Certyfikat wygasł!", file=sys.stderr) + return False + + except Exception as e: + print(f" [ WARNING ] Nie można pobrać informacji o certyfikacie: {e}") + + # 4. Sprawdzenie liczby certyfikatów w bundle + print("4. Struktura bundle'a:") + try: + with open(bundle_path, 'r') as f: + content = f.read() + + cert_count = content.count('-----BEGIN CERTIFICATE-----') + private_key_count = content.count('-----BEGIN PRIVATE KEY-----') + \ + content.count('-----BEGIN RSA PRIVATE KEY-----') + \ + content.count('-----BEGIN EC PRIVATE KEY-----') + + print(f" - Klucze prywatne: {private_key_count}") + print(f" - Certyfikaty: {cert_count}") + + if private_key_count == 0: + print(" [ ERROR ] Brak klucza prywatnego!", file=sys.stderr) + return False + if cert_count == 0: + print(" [ ERROR ] Brak certyfikatu!", file=sys.stderr) + return False + + if cert_count > 1: + print(f" [ OK ] Bundle zawiera certyfikat + {cert_count - 1} certyfikat(ów) pośrednich") + else: + print(" [ INFO ] Bundle zawiera tylko certyfikat końcowy (brak chain)") + + except Exception as e: + print(f" [ WARNING ] Nie można przeanalizować struktury: {e}") + + print("=== Weryfikacja zakończona ===\n") + return True + +def check_certificate(cert_path): + """ + Sprawdza poprawność certyfikatu/bundle'a bez konwersji z PFX + Używane z flagą --check-cert + """ + if not os.path.isfile(cert_path): + print(f"[ ERROR ] Nie znaleziono pliku: {cert_path}", file=sys.stderr) + sys.exit(1) + + print(f"=== Sprawdzanie certyfikatu: {cert_path} ===\n") + + # Sprawdź czy plik zawiera klucz prywatny + with open(cert_path, 'r') as f: + content = f.read() + + has_private_key = any([ + '-----BEGIN PRIVATE KEY-----' in content, + '-----BEGIN RSA PRIVATE KEY-----' in content, + '-----BEGIN EC PRIVATE KEY-----' in content + ]) + + has_certificate = '-----BEGIN CERTIFICATE-----' in content + + if not has_certificate: + print("[ ERROR ] Plik nie zawiera certyfikatu!", file=sys.stderr) + sys.exit(1) + + # Jeśli jest klucz prywatny, to jest to bundle - użyj pełnej weryfikacji + if has_private_key: + print("Wykryto bundle HAProxy (klucz + certyfikat)\n") + if not verify_bundle(cert_path): + sys.exit(1) + else: + # Tylko certyfikat(y) - weryfikacja bez klucza + print("Wykryto plik z certyfikatem(ami) bez klucza prywatnego\n") + verify_certificate_only(cert_path) + + print("\n[ OK ] Wszystkie sprawdzenia zakończone pomyślnie") + sys.exit(0) + +def verify_certificate_only(cert_path): + """ + Weryfikacja samego certyfikatu bez klucza prywatnego + """ + print("=== Weryfikacja certyfikatu ===") + + # 1. Informacje o certyfikacie + print("1. Informacje o certyfikacie końcowym:") + try: + cert_info = subprocess.run([ + "openssl", "x509", + "-in", cert_path, + "-noout", + "-subject", + "-issuer", + "-dates", + "-fingerprint", + "-sha256" + ], capture_output=True, text=True) + + if cert_info.returncode == 0: + for line in cert_info.stdout.strip().split('\n'): + print(f" {line}") + else: + print(" [ ERROR ] Nie można odczytać certyfikatu", file=sys.stderr) + return False + except Exception as e: + print(f" [ ERROR ] Błąd: {e}", file=sys.stderr) + return False + + # 2. Sprawdzenie ważności + print("\n2. Sprawdzanie ważności:") + try: + dates_result = subprocess.run([ + "openssl", "x509", + "-in", cert_path, + "-noout", + "-checkend", "0" + ], capture_output=True, text=True) + + if dates_result.returncode == 0: + print(" [ OK ] Certyfikat jest ważny (nie wygasł)") + else: + print(" [ ERROR ] Certyfikat wygasł!", file=sys.stderr) + return False + + # Sprawdź czy certyfikat wygasa w ciągu 30 dni + dates_30days = subprocess.run([ + "openssl", "x509", + "-in", cert_path, + "-noout", + "-checkend", str(30 * 24 * 3600) + ], capture_output=True, text=True) + + if dates_30days.returncode != 0: + print(" [ WARNING ] Certyfikat wygasa w ciągu 30 dni!") + + except Exception as e: + print(f" [ WARNING ] Nie można sprawdzić ważności: {e}") + + # 3. Struktura i liczba certyfikatów + print("\n3. Struktura pliku:") + try: + with open(cert_path, 'r') as f: + content = f.read() + + cert_count = content.count('-----BEGIN CERTIFICATE-----') + print(f" - Liczba certyfikatów: {cert_count}") + + if cert_count > 1: + print(f" [ OK ] Plik zawiera certyfikat końcowy + {cert_count - 1} certyfikat(ów) pośrednich") + elif cert_count == 1: + print(" [ INFO ] Plik zawiera tylko certyfikat końcowy (brak chain)") + else: + print(" [ ERROR ] Brak certyfikatów w pliku!", file=sys.stderr) + return False + except Exception as e: + print(f" [ ERROR ] Nie można przeanalizować struktury: {e}", file=sys.stderr) + return False + + # 4. Weryfikacja łańcucha (jeśli jest) + if cert_count > 1: + print("\n4. Weryfikacja łańcucha certyfikatów:") + try: + verify_result = subprocess.run([ + "openssl", "verify", + "-CAfile", cert_path, + "-untrusted", cert_path, + cert_path + ], capture_output=True, text=True) + + if "OK" in verify_result.stdout: + print(" [ OK ] Łańcuch certyfikatów jest poprawny") + else: + print(" [ INFO ] Nie można w pełni zweryfikować łańcucha (może wymagać root CA)") + print(f" Output: {verify_result.stdout.strip()}") + except Exception as e: + print(f" [ INFO ] {e}") + + # 5. Szczegóły wszystkich certyfikatów + if cert_count > 1: + print("\n5. Lista wszystkich certyfikatów w łańcuchu:") + try: + # Rozdziel certyfikaty i wyświetl każdy + certs = content.split('-----BEGIN CERTIFICATE-----') + cert_num = 0 + for cert_block in certs: + if '-----END CERTIFICATE-----' in cert_block: + cert_num += 1 + cert_content = '-----BEGIN CERTIFICATE-----' + cert_block + + # Zapisz tymczasowo + with tempfile.NamedTemporaryFile(mode='w', suffix='.pem', delete=False) as tmp: + tmp.write(cert_content) + tmp_path = tmp.name + + try: + subject_result = subprocess.run([ + "openssl", "x509", + "-in", tmp_path, + "-noout", + "-subject", + "-issuer" + ], capture_output=True, text=True) + + if subject_result.returncode == 0: + print(f"\n Certyfikat #{cert_num}:") + for line in subject_result.stdout.strip().split('\n'): + print(f" {line}") + finally: + os.unlink(tmp_path) + except Exception as e: + print(f" [ WARNING ] Nie można wyświetlić szczegółów certyfikatów: {e}") + + print("\n=== Weryfikacja zakończona ===") + return True + +def create_tar_gz(archive_name, files): + with tarfile.open(archive_name, "w:gz") as tar: + for f in files: + if os.path.isfile(f): + tar.add(f, arcname=os.path.basename(f)) + +def main(): + parser = argparse.ArgumentParser( + description="Rozszyfruj PFX i spakuj key/cert/chain do tar.gz" + ) + parser.add_argument("pfx", nargs='?', help="Ścieżka do pliku .pfx/.p12") + parser.add_argument("password", nargs='?', help="Hasło do pliku PFX") + parser.add_argument( + "-o", "--output-dir", + default=".", + help="Katalog wyjściowy (domyślnie bieżący)" + ) + parser.add_argument( + "-b", "--bundle", + action="store_true", + help="Utwórz bundle dla HAProxy (key+cert+chain w jednym pliku)" + ) + parser.add_argument( + "-c", "--check-cert", + metavar="CERT_FILE", + help="Sprawdź poprawność istniejącego certyfikatu/bundle'a" + ) + + args = parser.parse_args() + + # Tryb weryfikacji certyfikatu + if args.check_cert: + check_certificate(args.check_cert) + return + + # Tryb normalny - wymaga pfx i hasła + if not args.pfx or not args.password: + parser.error("Wymagane argumenty: pfx i password (chyba że używasz --check-cert)") + + pfx_path = os.path.abspath(args.pfx) + if not os.path.isfile(pfx_path): + print(f"Nie znaleziono pliku: {pfx_path}", file=sys.stderr) + sys.exit(1) + + os.makedirs(args.output_dir, exist_ok=True) + base_name = os.path.splitext(os.path.basename(pfx_path))[0] + archive_name = os.path.join(args.output_dir, f"{base_name}.tar.gz") + + # pracujemy w katalogu tymczasowym, potem wyniki kopiujemy + tmpdir = tempfile.mkdtemp(prefix="pfx_extract_") + try: + key_path, cert_path, chain_path = extract_from_pfx( + pfx_path, args.password, tmpdir, base_name + ) + + # kopiujemy pliki PEM do katalogu wyjściowego + final_key = shutil.copy(key_path, os.path.join(args.output_dir, os.path.basename(key_path))) + final_cert = shutil.copy(cert_path, os.path.join(args.output_dir, os.path.basename(cert_path))) + final_chain = shutil.copy(chain_path, os.path.join(args.output_dir, os.path.basename(chain_path))) + + files_to_archive = [final_key, final_cert, final_chain] + + # Jeśli flaga --bundle, tworzymy bundle dla HAProxy + if args.bundle: + bundle_path = os.path.join(args.output_dir, f"{base_name}.haproxy.pem") + print("Tworzenie bundle'a dla HAProxy...") + + if create_haproxy_bundle(final_key, final_cert, final_chain, bundle_path): + print(f" Bundle HAProxy: {bundle_path}") + + # Kompleksowa weryfikacja bundle'a + if not verify_bundle(bundle_path): + print(" [ ERROR ] Bundle nie przeszedł weryfikacji!", file=sys.stderr) + sys.exit(1) + + files_to_archive.append(bundle_path) + else: + print(" [ ERROR ] Nie udało się utworzyć bundle'a", file=sys.stderr) + sys.exit(1) + + # tworzymy tar.gz + create_tar_gz(archive_name, files_to_archive) + + print("\nUtworzono pliki:") + print(" Klucz: ", final_key) + print(" Certyfikat: ", final_cert) + print(" Chain: ", final_chain) + print("Archiwum tar.gz:", archive_name) + + finally: + shutil.rmtree(tmpdir, ignore_errors=True) + +if __name__ == "__main__": + main()