Add pfx_extract_new.py

This commit is contained in:
gru
2026-01-28 14:57:59 +01:00
parent e3ed8d0ad2
commit 6de4b930c3

570
pfx_extract_new.py Normal file
View File

@@ -0,0 +1,570 @@
#!/usr/bin/env python3
import argparse
import os
import subprocess
import tarfile
import tempfile
import shutil
import sys
def run_cmd(cmd, password):
env = os.environ.copy()
# hasło przekazujemy przez zmienną środowiskową, żeby nie wisiało w ps
env["PFXPASSWORD"] = password
# używamy -passin env:PFXPASSWORD w openssl
result = subprocess.run(
cmd,
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
if result.returncode != 0:
print("Błąd wykonania:", " ".join(cmd), file=sys.stderr)
print(result.stderr, file=sys.stderr)
sys.exit(result.returncode)
return result.stdout
def extract_from_pfx(pfx_path, password, out_dir, base_name):
key_path = os.path.join(out_dir, f"{base_name}.key.pem")
cert_path = os.path.join(out_dir, f"{base_name}.cert.pem")
chain_path = os.path.join(out_dir, f"{base_name}.chain.pem")
# klucz prywatny
run_cmd([
"openssl", "pkcs12",
"-in", pfx_path,
"-passin", "env:PFXPASSWORD",
"-nodes",
"-nocerts",
"-out", key_path
], password)
# cert końcowy
run_cmd([
"openssl", "pkcs12",
"-in", pfx_path,
"-passin", "env:PFXPASSWORD",
"-clcerts",
"-nokeys",
"-out", cert_path
], password)
# certy pośrednie (CA chain) może być puste, wtedy plik zostanie, ale bez zawartości
run_cmd([
"openssl", "pkcs12",
"-in", pfx_path,
"-passin", "env:PFXPASSWORD",
"-nokeys",
"-cacerts",
"-out", chain_path
], password)
return key_path, cert_path, chain_path
def clean_pem_content(file_path):
"""
Usuwa Bag Attributes i inne metadata z pliku PEM,
zostawiając tylko czyste certyfikaty/klucze
"""
if not os.path.isfile(file_path) or os.path.getsize(file_path) == 0:
return ""
with open(file_path, 'r') as f:
lines = f.readlines()
clean_lines = []
inside_cert_or_key = False
for line in lines:
# Rozpoczęcie certyfikatu lub klucza
if line.startswith('-----BEGIN'):
inside_cert_or_key = True
clean_lines.append(line)
# Koniec certyfikatu lub klucza
elif line.startswith('-----END'):
clean_lines.append(line)
inside_cert_or_key = False
# Kopiuj tylko linie wewnątrz bloków BEGIN/END
elif inside_cert_or_key:
clean_lines.append(line)
return ''.join(clean_lines)
def create_haproxy_bundle(key_path, cert_path, chain_path, bundle_path):
"""
Tworzy bundle dla HAProxy w poprawnej kolejności:
1. Klucz prywatny
2. Certyfikat końcowy
3. Certyfikaty pośrednie (chain) - od najbliższego do root CA
Usuwa Bag Attributes i inne metadata z PKCS12
"""
with open(bundle_path, 'w') as bundle:
# 1. Klucz prywatny (wyczyszczony)
key_content = clean_pem_content(key_path)
if key_content:
bundle.write(key_content)
if not key_content.endswith('\n'):
bundle.write('\n')
# 2. Certyfikat końcowy (wyczyszczony)
cert_content = clean_pem_content(cert_path)
if cert_content:
bundle.write(cert_content)
if not cert_content.endswith('\n'):
bundle.write('\n')
# 3. Chain (certyfikaty pośrednie, wyczyszczone)
chain_content = clean_pem_content(chain_path)
if chain_content:
bundle.write(chain_content)
if not chain_content.endswith('\n'):
bundle.write('\n')
# Weryfikacja czy bundle zawiera co najmniej klucz i certyfikat
if os.path.getsize(bundle_path) == 0:
print("UWAGA: Bundle jest pusty!", file=sys.stderr)
return False
return True
def verify_bundle(bundle_path):
"""
Kompleksowa weryfikacja poprawności bundle'a HAProxy:
1. Sprawdzenie czy klucz prywatny pasuje do certyfikatu
2. Sprawdzenie łańcucha certyfikatów (chain validation)
3. Wyświetlenie informacji o certyfikacie
"""
print("\n=== Weryfikacja bundle'a ===")
# 1. Sprawdzenie czy klucz pasuje do certyfikatu (porównanie modulus)
print("1. Sprawdzanie zgodności klucza prywatnego z certyfikatem...")
try:
# Najpierw sprawdźmy typ klucza
key_check = subprocess.run([
"openssl", "pkey",
"-in", bundle_path,
"-noout",
"-text"
], capture_output=True, text=True)
if key_check.returncode != 0:
print(" [ ERROR ] Nie można odczytać klucza prywatnego", file=sys.stderr)
return False
# Sprawdź typ klucza (RSA, EC, itp.)
key_type = None
if "RSA" in key_check.stdout or "rsaEncryption" in key_check.stdout:
key_type = "rsa"
elif "EC" in key_check.stdout or "id-ecPublicKey" in key_check.stdout:
key_type = "ec"
# Modulus dla RSA lub pubkey dla EC/innych
key_modulus = None
if key_type == "rsa":
key_result = subprocess.run([
"openssl", "rsa",
"-in", bundle_path,
"-noout",
"-modulus"
], capture_output=True, text=True)
if key_result.returncode == 0:
key_modulus = key_result.stdout.strip()
else:
# Dla EC i innych typów używamy pubkey
key_result = subprocess.run([
"openssl", "pkey",
"-in", bundle_path,
"-pubout"
], capture_output=True, text=True)
if key_result.returncode == 0:
key_modulus = key_result.stdout.strip()
# Modulus/pubkey z certyfikatu
cert_modulus = None
if key_type == "rsa":
cert_result = subprocess.run([
"openssl", "x509",
"-in", bundle_path,
"-noout",
"-modulus"
], capture_output=True, text=True)
if cert_result.returncode == 0:
cert_modulus = cert_result.stdout.strip()
else:
# Dla EC i innych używamy pubkey z certyfikatu
cert_result = subprocess.run([
"openssl", "x509",
"-in", bundle_path,
"-noout",
"-pubkey"
], capture_output=True, text=True)
if cert_result.returncode == 0:
cert_modulus = cert_result.stdout.strip()
if key_modulus and cert_modulus and key_modulus == cert_modulus:
print(f" [ OK ] Klucz prywatny ({key_type.upper() if key_type else 'UNKNOWN'}) pasuje do certyfikatu")
else:
print(" [ ERROR ] Klucz prywatny NIE pasuje do certyfikatu!", file=sys.stderr)
return False
except Exception as e:
print(f" [ ERROR ] Błąd podczas weryfikacji klucza: {e}", file=sys.stderr)
return False
# 2. Weryfikacja łańcucha certyfikatów
print("2. Weryfikacja łańcucha certyfikatów...")
try:
# openssl verify sprawdzi czy łańcuch jest poprawny
verify_result = subprocess.run([
"openssl", "verify",
"-CAfile", bundle_path,
"-untrusted", bundle_path,
bundle_path
], capture_output=True, text=True)
if "OK" in verify_result.stdout:
print(" [ OK ] Łańcuch certyfikatów jest poprawny")
else:
# Czasem verify może nie działać z bundle'em, to nie jest krytyczne
print(" [ INFO ] Nie można w pełni zweryfikować łańcucha (może wymagać root CA)")
except Exception as e:
print(f" [ INFO ] {e}")
# 3. Wyświetlenie informacji o certyfikacie
print("3. Informacje o certyfikacie:")
try:
cert_info = subprocess.run([
"openssl", "x509",
"-in", bundle_path,
"-noout",
"-subject",
"-issuer",
"-dates"
], capture_output=True, text=True)
if cert_info.returncode == 0:
for line in cert_info.stdout.strip().split('\n'):
print(f" {line}")
# Sprawdzenie czy certyfikat nie wygasł
dates_result = subprocess.run([
"openssl", "x509",
"-in", bundle_path,
"-noout",
"-checkend", "0"
], capture_output=True, text=True)
if dates_result.returncode == 0:
print(" [ OK ] Certyfikat jest ważny (nie wygasł)")
else:
print(" [ ERROR ] Certyfikat wygasł!", file=sys.stderr)
return False
except Exception as e:
print(f" [ WARNING ] Nie można pobrać informacji o certyfikacie: {e}")
# 4. Sprawdzenie liczby certyfikatów w bundle
print("4. Struktura bundle'a:")
try:
with open(bundle_path, 'r') as f:
content = f.read()
cert_count = content.count('-----BEGIN CERTIFICATE-----')
private_key_count = content.count('-----BEGIN PRIVATE KEY-----') + \
content.count('-----BEGIN RSA PRIVATE KEY-----') + \
content.count('-----BEGIN EC PRIVATE KEY-----')
print(f" - Klucze prywatne: {private_key_count}")
print(f" - Certyfikaty: {cert_count}")
if private_key_count == 0:
print(" [ ERROR ] Brak klucza prywatnego!", file=sys.stderr)
return False
if cert_count == 0:
print(" [ ERROR ] Brak certyfikatu!", file=sys.stderr)
return False
if cert_count > 1:
print(f" [ OK ] Bundle zawiera certyfikat + {cert_count - 1} certyfikat(ów) pośrednich")
else:
print(" [ INFO ] Bundle zawiera tylko certyfikat końcowy (brak chain)")
except Exception as e:
print(f" [ WARNING ] Nie można przeanalizować struktury: {e}")
print("=== Weryfikacja zakończona ===\n")
return True
def check_certificate(cert_path):
"""
Sprawdza poprawność certyfikatu/bundle'a bez konwersji z PFX
Używane z flagą --check-cert
"""
if not os.path.isfile(cert_path):
print(f"[ ERROR ] Nie znaleziono pliku: {cert_path}", file=sys.stderr)
sys.exit(1)
print(f"=== Sprawdzanie certyfikatu: {cert_path} ===\n")
# Sprawdź czy plik zawiera klucz prywatny
with open(cert_path, 'r') as f:
content = f.read()
has_private_key = any([
'-----BEGIN PRIVATE KEY-----' in content,
'-----BEGIN RSA PRIVATE KEY-----' in content,
'-----BEGIN EC PRIVATE KEY-----' in content
])
has_certificate = '-----BEGIN CERTIFICATE-----' in content
if not has_certificate:
print("[ ERROR ] Plik nie zawiera certyfikatu!", file=sys.stderr)
sys.exit(1)
# Jeśli jest klucz prywatny, to jest to bundle - użyj pełnej weryfikacji
if has_private_key:
print("Wykryto bundle HAProxy (klucz + certyfikat)\n")
if not verify_bundle(cert_path):
sys.exit(1)
else:
# Tylko certyfikat(y) - weryfikacja bez klucza
print("Wykryto plik z certyfikatem(ami) bez klucza prywatnego\n")
verify_certificate_only(cert_path)
print("\n[ OK ] Wszystkie sprawdzenia zakończone pomyślnie")
sys.exit(0)
def verify_certificate_only(cert_path):
"""
Weryfikacja samego certyfikatu bez klucza prywatnego
"""
print("=== Weryfikacja certyfikatu ===")
# 1. Informacje o certyfikacie
print("1. Informacje o certyfikacie końcowym:")
try:
cert_info = subprocess.run([
"openssl", "x509",
"-in", cert_path,
"-noout",
"-subject",
"-issuer",
"-dates",
"-fingerprint",
"-sha256"
], capture_output=True, text=True)
if cert_info.returncode == 0:
for line in cert_info.stdout.strip().split('\n'):
print(f" {line}")
else:
print(" [ ERROR ] Nie można odczytać certyfikatu", file=sys.stderr)
return False
except Exception as e:
print(f" [ ERROR ] Błąd: {e}", file=sys.stderr)
return False
# 2. Sprawdzenie ważności
print("\n2. Sprawdzanie ważności:")
try:
dates_result = subprocess.run([
"openssl", "x509",
"-in", cert_path,
"-noout",
"-checkend", "0"
], capture_output=True, text=True)
if dates_result.returncode == 0:
print(" [ OK ] Certyfikat jest ważny (nie wygasł)")
else:
print(" [ ERROR ] Certyfikat wygasł!", file=sys.stderr)
return False
# Sprawdź czy certyfikat wygasa w ciągu 30 dni
dates_30days = subprocess.run([
"openssl", "x509",
"-in", cert_path,
"-noout",
"-checkend", str(30 * 24 * 3600)
], capture_output=True, text=True)
if dates_30days.returncode != 0:
print(" [ WARNING ] Certyfikat wygasa w ciągu 30 dni!")
except Exception as e:
print(f" [ WARNING ] Nie można sprawdzić ważności: {e}")
# 3. Struktura i liczba certyfikatów
print("\n3. Struktura pliku:")
try:
with open(cert_path, 'r') as f:
content = f.read()
cert_count = content.count('-----BEGIN CERTIFICATE-----')
print(f" - Liczba certyfikatów: {cert_count}")
if cert_count > 1:
print(f" [ OK ] Plik zawiera certyfikat końcowy + {cert_count - 1} certyfikat(ów) pośrednich")
elif cert_count == 1:
print(" [ INFO ] Plik zawiera tylko certyfikat końcowy (brak chain)")
else:
print(" [ ERROR ] Brak certyfikatów w pliku!", file=sys.stderr)
return False
except Exception as e:
print(f" [ ERROR ] Nie można przeanalizować struktury: {e}", file=sys.stderr)
return False
# 4. Weryfikacja łańcucha (jeśli jest)
if cert_count > 1:
print("\n4. Weryfikacja łańcucha certyfikatów:")
try:
verify_result = subprocess.run([
"openssl", "verify",
"-CAfile", cert_path,
"-untrusted", cert_path,
cert_path
], capture_output=True, text=True)
if "OK" in verify_result.stdout:
print(" [ OK ] Łańcuch certyfikatów jest poprawny")
else:
print(" [ INFO ] Nie można w pełni zweryfikować łańcucha (może wymagać root CA)")
print(f" Output: {verify_result.stdout.strip()}")
except Exception as e:
print(f" [ INFO ] {e}")
# 5. Szczegóły wszystkich certyfikatów
if cert_count > 1:
print("\n5. Lista wszystkich certyfikatów w łańcuchu:")
try:
# Rozdziel certyfikaty i wyświetl każdy
certs = content.split('-----BEGIN CERTIFICATE-----')
cert_num = 0
for cert_block in certs:
if '-----END CERTIFICATE-----' in cert_block:
cert_num += 1
cert_content = '-----BEGIN CERTIFICATE-----' + cert_block
# Zapisz tymczasowo
with tempfile.NamedTemporaryFile(mode='w', suffix='.pem', delete=False) as tmp:
tmp.write(cert_content)
tmp_path = tmp.name
try:
subject_result = subprocess.run([
"openssl", "x509",
"-in", tmp_path,
"-noout",
"-subject",
"-issuer"
], capture_output=True, text=True)
if subject_result.returncode == 0:
print(f"\n Certyfikat #{cert_num}:")
for line in subject_result.stdout.strip().split('\n'):
print(f" {line}")
finally:
os.unlink(tmp_path)
except Exception as e:
print(f" [ WARNING ] Nie można wyświetlić szczegółów certyfikatów: {e}")
print("\n=== Weryfikacja zakończona ===")
return True
def create_tar_gz(archive_name, files):
with tarfile.open(archive_name, "w:gz") as tar:
for f in files:
if os.path.isfile(f):
tar.add(f, arcname=os.path.basename(f))
def main():
parser = argparse.ArgumentParser(
description="Rozszyfruj PFX i spakuj key/cert/chain do tar.gz"
)
parser.add_argument("pfx", nargs='?', help="Ścieżka do pliku .pfx/.p12")
parser.add_argument("password", nargs='?', help="Hasło do pliku PFX")
parser.add_argument(
"-o", "--output-dir",
default=".",
help="Katalog wyjściowy (domyślnie bieżący)"
)
parser.add_argument(
"-b", "--bundle",
action="store_true",
help="Utwórz bundle dla HAProxy (key+cert+chain w jednym pliku)"
)
parser.add_argument(
"-c", "--check-cert",
metavar="CERT_FILE",
help="Sprawdź poprawność istniejącego certyfikatu/bundle'a"
)
args = parser.parse_args()
# Tryb weryfikacji certyfikatu
if args.check_cert:
check_certificate(args.check_cert)
return
# Tryb normalny - wymaga pfx i hasła
if not args.pfx or not args.password:
parser.error("Wymagane argumenty: pfx i password (chyba że używasz --check-cert)")
pfx_path = os.path.abspath(args.pfx)
if not os.path.isfile(pfx_path):
print(f"Nie znaleziono pliku: {pfx_path}", file=sys.stderr)
sys.exit(1)
os.makedirs(args.output_dir, exist_ok=True)
base_name = os.path.splitext(os.path.basename(pfx_path))[0]
archive_name = os.path.join(args.output_dir, f"{base_name}.tar.gz")
# pracujemy w katalogu tymczasowym, potem wyniki kopiujemy
tmpdir = tempfile.mkdtemp(prefix="pfx_extract_")
try:
key_path, cert_path, chain_path = extract_from_pfx(
pfx_path, args.password, tmpdir, base_name
)
# kopiujemy pliki PEM do katalogu wyjściowego
final_key = shutil.copy(key_path, os.path.join(args.output_dir, os.path.basename(key_path)))
final_cert = shutil.copy(cert_path, os.path.join(args.output_dir, os.path.basename(cert_path)))
final_chain = shutil.copy(chain_path, os.path.join(args.output_dir, os.path.basename(chain_path)))
files_to_archive = [final_key, final_cert, final_chain]
# Jeśli flaga --bundle, tworzymy bundle dla HAProxy
if args.bundle:
bundle_path = os.path.join(args.output_dir, f"{base_name}.haproxy.pem")
print("Tworzenie bundle'a dla HAProxy...")
if create_haproxy_bundle(final_key, final_cert, final_chain, bundle_path):
print(f" Bundle HAProxy: {bundle_path}")
# Kompleksowa weryfikacja bundle'a
if not verify_bundle(bundle_path):
print(" [ ERROR ] Bundle nie przeszedł weryfikacji!", file=sys.stderr)
sys.exit(1)
files_to_archive.append(bundle_path)
else:
print(" [ ERROR ] Nie udało się utworzyć bundle'a", file=sys.stderr)
sys.exit(1)
# tworzymy tar.gz
create_tar_gz(archive_name, files_to_archive)
print("\nUtworzono pliki:")
print(" Klucz: ", final_key)
print(" Certyfikat: ", final_cert)
print(" Chain: ", final_chain)
print("Archiwum tar.gz:", archive_name)
finally:
shutil.rmtree(tmpdir, ignore_errors=True)
if __name__ == "__main__":
main()