Files
tools_scripts/pfx_extract_new.py
2026-01-28 14:57:59 +01:00

571 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
import argparse
import os
import subprocess
import tarfile
import tempfile
import shutil
import sys
def run_cmd(cmd, password):
env = os.environ.copy()
# hasło przekazujemy przez zmienną środowiskową, żeby nie wisiało w ps
env["PFXPASSWORD"] = password
# używamy -passin env:PFXPASSWORD w openssl
result = subprocess.run(
cmd,
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
if result.returncode != 0:
print("Błąd wykonania:", " ".join(cmd), file=sys.stderr)
print(result.stderr, file=sys.stderr)
sys.exit(result.returncode)
return result.stdout
def extract_from_pfx(pfx_path, password, out_dir, base_name):
key_path = os.path.join(out_dir, f"{base_name}.key.pem")
cert_path = os.path.join(out_dir, f"{base_name}.cert.pem")
chain_path = os.path.join(out_dir, f"{base_name}.chain.pem")
# klucz prywatny
run_cmd([
"openssl", "pkcs12",
"-in", pfx_path,
"-passin", "env:PFXPASSWORD",
"-nodes",
"-nocerts",
"-out", key_path
], password)
# cert końcowy
run_cmd([
"openssl", "pkcs12",
"-in", pfx_path,
"-passin", "env:PFXPASSWORD",
"-clcerts",
"-nokeys",
"-out", cert_path
], password)
# certy pośrednie (CA chain) może być puste, wtedy plik zostanie, ale bez zawartości
run_cmd([
"openssl", "pkcs12",
"-in", pfx_path,
"-passin", "env:PFXPASSWORD",
"-nokeys",
"-cacerts",
"-out", chain_path
], password)
return key_path, cert_path, chain_path
def clean_pem_content(file_path):
"""
Usuwa Bag Attributes i inne metadata z pliku PEM,
zostawiając tylko czyste certyfikaty/klucze
"""
if not os.path.isfile(file_path) or os.path.getsize(file_path) == 0:
return ""
with open(file_path, 'r') as f:
lines = f.readlines()
clean_lines = []
inside_cert_or_key = False
for line in lines:
# Rozpoczęcie certyfikatu lub klucza
if line.startswith('-----BEGIN'):
inside_cert_or_key = True
clean_lines.append(line)
# Koniec certyfikatu lub klucza
elif line.startswith('-----END'):
clean_lines.append(line)
inside_cert_or_key = False
# Kopiuj tylko linie wewnątrz bloków BEGIN/END
elif inside_cert_or_key:
clean_lines.append(line)
return ''.join(clean_lines)
def create_haproxy_bundle(key_path, cert_path, chain_path, bundle_path):
"""
Tworzy bundle dla HAProxy w poprawnej kolejności:
1. Klucz prywatny
2. Certyfikat końcowy
3. Certyfikaty pośrednie (chain) - od najbliższego do root CA
Usuwa Bag Attributes i inne metadata z PKCS12
"""
with open(bundle_path, 'w') as bundle:
# 1. Klucz prywatny (wyczyszczony)
key_content = clean_pem_content(key_path)
if key_content:
bundle.write(key_content)
if not key_content.endswith('\n'):
bundle.write('\n')
# 2. Certyfikat końcowy (wyczyszczony)
cert_content = clean_pem_content(cert_path)
if cert_content:
bundle.write(cert_content)
if not cert_content.endswith('\n'):
bundle.write('\n')
# 3. Chain (certyfikaty pośrednie, wyczyszczone)
chain_content = clean_pem_content(chain_path)
if chain_content:
bundle.write(chain_content)
if not chain_content.endswith('\n'):
bundle.write('\n')
# Weryfikacja czy bundle zawiera co najmniej klucz i certyfikat
if os.path.getsize(bundle_path) == 0:
print("UWAGA: Bundle jest pusty!", file=sys.stderr)
return False
return True
def verify_bundle(bundle_path):
"""
Kompleksowa weryfikacja poprawności bundle'a HAProxy:
1. Sprawdzenie czy klucz prywatny pasuje do certyfikatu
2. Sprawdzenie łańcucha certyfikatów (chain validation)
3. Wyświetlenie informacji o certyfikacie
"""
print("\n=== Weryfikacja bundle'a ===")
# 1. Sprawdzenie czy klucz pasuje do certyfikatu (porównanie modulus)
print("1. Sprawdzanie zgodności klucza prywatnego z certyfikatem...")
try:
# Najpierw sprawdźmy typ klucza
key_check = subprocess.run([
"openssl", "pkey",
"-in", bundle_path,
"-noout",
"-text"
], capture_output=True, text=True)
if key_check.returncode != 0:
print(" [ ERROR ] Nie można odczytać klucza prywatnego", file=sys.stderr)
return False
# Sprawdź typ klucza (RSA, EC, itp.)
key_type = None
if "RSA" in key_check.stdout or "rsaEncryption" in key_check.stdout:
key_type = "rsa"
elif "EC" in key_check.stdout or "id-ecPublicKey" in key_check.stdout:
key_type = "ec"
# Modulus dla RSA lub pubkey dla EC/innych
key_modulus = None
if key_type == "rsa":
key_result = subprocess.run([
"openssl", "rsa",
"-in", bundle_path,
"-noout",
"-modulus"
], capture_output=True, text=True)
if key_result.returncode == 0:
key_modulus = key_result.stdout.strip()
else:
# Dla EC i innych typów używamy pubkey
key_result = subprocess.run([
"openssl", "pkey",
"-in", bundle_path,
"-pubout"
], capture_output=True, text=True)
if key_result.returncode == 0:
key_modulus = key_result.stdout.strip()
# Modulus/pubkey z certyfikatu
cert_modulus = None
if key_type == "rsa":
cert_result = subprocess.run([
"openssl", "x509",
"-in", bundle_path,
"-noout",
"-modulus"
], capture_output=True, text=True)
if cert_result.returncode == 0:
cert_modulus = cert_result.stdout.strip()
else:
# Dla EC i innych używamy pubkey z certyfikatu
cert_result = subprocess.run([
"openssl", "x509",
"-in", bundle_path,
"-noout",
"-pubkey"
], capture_output=True, text=True)
if cert_result.returncode == 0:
cert_modulus = cert_result.stdout.strip()
if key_modulus and cert_modulus and key_modulus == cert_modulus:
print(f" [ OK ] Klucz prywatny ({key_type.upper() if key_type else 'UNKNOWN'}) pasuje do certyfikatu")
else:
print(" [ ERROR ] Klucz prywatny NIE pasuje do certyfikatu!", file=sys.stderr)
return False
except Exception as e:
print(f" [ ERROR ] Błąd podczas weryfikacji klucza: {e}", file=sys.stderr)
return False
# 2. Weryfikacja łańcucha certyfikatów
print("2. Weryfikacja łańcucha certyfikatów...")
try:
# openssl verify sprawdzi czy łańcuch jest poprawny
verify_result = subprocess.run([
"openssl", "verify",
"-CAfile", bundle_path,
"-untrusted", bundle_path,
bundle_path
], capture_output=True, text=True)
if "OK" in verify_result.stdout:
print(" [ OK ] Łańcuch certyfikatów jest poprawny")
else:
# Czasem verify może nie działać z bundle'em, to nie jest krytyczne
print(" [ INFO ] Nie można w pełni zweryfikować łańcucha (może wymagać root CA)")
except Exception as e:
print(f" [ INFO ] {e}")
# 3. Wyświetlenie informacji o certyfikacie
print("3. Informacje o certyfikacie:")
try:
cert_info = subprocess.run([
"openssl", "x509",
"-in", bundle_path,
"-noout",
"-subject",
"-issuer",
"-dates"
], capture_output=True, text=True)
if cert_info.returncode == 0:
for line in cert_info.stdout.strip().split('\n'):
print(f" {line}")
# Sprawdzenie czy certyfikat nie wygasł
dates_result = subprocess.run([
"openssl", "x509",
"-in", bundle_path,
"-noout",
"-checkend", "0"
], capture_output=True, text=True)
if dates_result.returncode == 0:
print(" [ OK ] Certyfikat jest ważny (nie wygasł)")
else:
print(" [ ERROR ] Certyfikat wygasł!", file=sys.stderr)
return False
except Exception as e:
print(f" [ WARNING ] Nie można pobrać informacji o certyfikacie: {e}")
# 4. Sprawdzenie liczby certyfikatów w bundle
print("4. Struktura bundle'a:")
try:
with open(bundle_path, 'r') as f:
content = f.read()
cert_count = content.count('-----BEGIN CERTIFICATE-----')
private_key_count = content.count('-----BEGIN PRIVATE KEY-----') + \
content.count('-----BEGIN RSA PRIVATE KEY-----') + \
content.count('-----BEGIN EC PRIVATE KEY-----')
print(f" - Klucze prywatne: {private_key_count}")
print(f" - Certyfikaty: {cert_count}")
if private_key_count == 0:
print(" [ ERROR ] Brak klucza prywatnego!", file=sys.stderr)
return False
if cert_count == 0:
print(" [ ERROR ] Brak certyfikatu!", file=sys.stderr)
return False
if cert_count > 1:
print(f" [ OK ] Bundle zawiera certyfikat + {cert_count - 1} certyfikat(ów) pośrednich")
else:
print(" [ INFO ] Bundle zawiera tylko certyfikat końcowy (brak chain)")
except Exception as e:
print(f" [ WARNING ] Nie można przeanalizować struktury: {e}")
print("=== Weryfikacja zakończona ===\n")
return True
def check_certificate(cert_path):
"""
Sprawdza poprawność certyfikatu/bundle'a bez konwersji z PFX
Używane z flagą --check-cert
"""
if not os.path.isfile(cert_path):
print(f"[ ERROR ] Nie znaleziono pliku: {cert_path}", file=sys.stderr)
sys.exit(1)
print(f"=== Sprawdzanie certyfikatu: {cert_path} ===\n")
# Sprawdź czy plik zawiera klucz prywatny
with open(cert_path, 'r') as f:
content = f.read()
has_private_key = any([
'-----BEGIN PRIVATE KEY-----' in content,
'-----BEGIN RSA PRIVATE KEY-----' in content,
'-----BEGIN EC PRIVATE KEY-----' in content
])
has_certificate = '-----BEGIN CERTIFICATE-----' in content
if not has_certificate:
print("[ ERROR ] Plik nie zawiera certyfikatu!", file=sys.stderr)
sys.exit(1)
# Jeśli jest klucz prywatny, to jest to bundle - użyj pełnej weryfikacji
if has_private_key:
print("Wykryto bundle HAProxy (klucz + certyfikat)\n")
if not verify_bundle(cert_path):
sys.exit(1)
else:
# Tylko certyfikat(y) - weryfikacja bez klucza
print("Wykryto plik z certyfikatem(ami) bez klucza prywatnego\n")
verify_certificate_only(cert_path)
print("\n[ OK ] Wszystkie sprawdzenia zakończone pomyślnie")
sys.exit(0)
def verify_certificate_only(cert_path):
"""
Weryfikacja samego certyfikatu bez klucza prywatnego
"""
print("=== Weryfikacja certyfikatu ===")
# 1. Informacje o certyfikacie
print("1. Informacje o certyfikacie końcowym:")
try:
cert_info = subprocess.run([
"openssl", "x509",
"-in", cert_path,
"-noout",
"-subject",
"-issuer",
"-dates",
"-fingerprint",
"-sha256"
], capture_output=True, text=True)
if cert_info.returncode == 0:
for line in cert_info.stdout.strip().split('\n'):
print(f" {line}")
else:
print(" [ ERROR ] Nie można odczytać certyfikatu", file=sys.stderr)
return False
except Exception as e:
print(f" [ ERROR ] Błąd: {e}", file=sys.stderr)
return False
# 2. Sprawdzenie ważności
print("\n2. Sprawdzanie ważności:")
try:
dates_result = subprocess.run([
"openssl", "x509",
"-in", cert_path,
"-noout",
"-checkend", "0"
], capture_output=True, text=True)
if dates_result.returncode == 0:
print(" [ OK ] Certyfikat jest ważny (nie wygasł)")
else:
print(" [ ERROR ] Certyfikat wygasł!", file=sys.stderr)
return False
# Sprawdź czy certyfikat wygasa w ciągu 30 dni
dates_30days = subprocess.run([
"openssl", "x509",
"-in", cert_path,
"-noout",
"-checkend", str(30 * 24 * 3600)
], capture_output=True, text=True)
if dates_30days.returncode != 0:
print(" [ WARNING ] Certyfikat wygasa w ciągu 30 dni!")
except Exception as e:
print(f" [ WARNING ] Nie można sprawdzić ważności: {e}")
# 3. Struktura i liczba certyfikatów
print("\n3. Struktura pliku:")
try:
with open(cert_path, 'r') as f:
content = f.read()
cert_count = content.count('-----BEGIN CERTIFICATE-----')
print(f" - Liczba certyfikatów: {cert_count}")
if cert_count > 1:
print(f" [ OK ] Plik zawiera certyfikat końcowy + {cert_count - 1} certyfikat(ów) pośrednich")
elif cert_count == 1:
print(" [ INFO ] Plik zawiera tylko certyfikat końcowy (brak chain)")
else:
print(" [ ERROR ] Brak certyfikatów w pliku!", file=sys.stderr)
return False
except Exception as e:
print(f" [ ERROR ] Nie można przeanalizować struktury: {e}", file=sys.stderr)
return False
# 4. Weryfikacja łańcucha (jeśli jest)
if cert_count > 1:
print("\n4. Weryfikacja łańcucha certyfikatów:")
try:
verify_result = subprocess.run([
"openssl", "verify",
"-CAfile", cert_path,
"-untrusted", cert_path,
cert_path
], capture_output=True, text=True)
if "OK" in verify_result.stdout:
print(" [ OK ] Łańcuch certyfikatów jest poprawny")
else:
print(" [ INFO ] Nie można w pełni zweryfikować łańcucha (może wymagać root CA)")
print(f" Output: {verify_result.stdout.strip()}")
except Exception as e:
print(f" [ INFO ] {e}")
# 5. Szczegóły wszystkich certyfikatów
if cert_count > 1:
print("\n5. Lista wszystkich certyfikatów w łańcuchu:")
try:
# Rozdziel certyfikaty i wyświetl każdy
certs = content.split('-----BEGIN CERTIFICATE-----')
cert_num = 0
for cert_block in certs:
if '-----END CERTIFICATE-----' in cert_block:
cert_num += 1
cert_content = '-----BEGIN CERTIFICATE-----' + cert_block
# Zapisz tymczasowo
with tempfile.NamedTemporaryFile(mode='w', suffix='.pem', delete=False) as tmp:
tmp.write(cert_content)
tmp_path = tmp.name
try:
subject_result = subprocess.run([
"openssl", "x509",
"-in", tmp_path,
"-noout",
"-subject",
"-issuer"
], capture_output=True, text=True)
if subject_result.returncode == 0:
print(f"\n Certyfikat #{cert_num}:")
for line in subject_result.stdout.strip().split('\n'):
print(f" {line}")
finally:
os.unlink(tmp_path)
except Exception as e:
print(f" [ WARNING ] Nie można wyświetlić szczegółów certyfikatów: {e}")
print("\n=== Weryfikacja zakończona ===")
return True
def create_tar_gz(archive_name, files):
with tarfile.open(archive_name, "w:gz") as tar:
for f in files:
if os.path.isfile(f):
tar.add(f, arcname=os.path.basename(f))
def main():
parser = argparse.ArgumentParser(
description="Rozszyfruj PFX i spakuj key/cert/chain do tar.gz"
)
parser.add_argument("pfx", nargs='?', help="Ścieżka do pliku .pfx/.p12")
parser.add_argument("password", nargs='?', help="Hasło do pliku PFX")
parser.add_argument(
"-o", "--output-dir",
default=".",
help="Katalog wyjściowy (domyślnie bieżący)"
)
parser.add_argument(
"-b", "--bundle",
action="store_true",
help="Utwórz bundle dla HAProxy (key+cert+chain w jednym pliku)"
)
parser.add_argument(
"-c", "--check-cert",
metavar="CERT_FILE",
help="Sprawdź poprawność istniejącego certyfikatu/bundle'a"
)
args = parser.parse_args()
# Tryb weryfikacji certyfikatu
if args.check_cert:
check_certificate(args.check_cert)
return
# Tryb normalny - wymaga pfx i hasła
if not args.pfx or not args.password:
parser.error("Wymagane argumenty: pfx i password (chyba że używasz --check-cert)")
pfx_path = os.path.abspath(args.pfx)
if not os.path.isfile(pfx_path):
print(f"Nie znaleziono pliku: {pfx_path}", file=sys.stderr)
sys.exit(1)
os.makedirs(args.output_dir, exist_ok=True)
base_name = os.path.splitext(os.path.basename(pfx_path))[0]
archive_name = os.path.join(args.output_dir, f"{base_name}.tar.gz")
# pracujemy w katalogu tymczasowym, potem wyniki kopiujemy
tmpdir = tempfile.mkdtemp(prefix="pfx_extract_")
try:
key_path, cert_path, chain_path = extract_from_pfx(
pfx_path, args.password, tmpdir, base_name
)
# kopiujemy pliki PEM do katalogu wyjściowego
final_key = shutil.copy(key_path, os.path.join(args.output_dir, os.path.basename(key_path)))
final_cert = shutil.copy(cert_path, os.path.join(args.output_dir, os.path.basename(cert_path)))
final_chain = shutil.copy(chain_path, os.path.join(args.output_dir, os.path.basename(chain_path)))
files_to_archive = [final_key, final_cert, final_chain]
# Jeśli flaga --bundle, tworzymy bundle dla HAProxy
if args.bundle:
bundle_path = os.path.join(args.output_dir, f"{base_name}.haproxy.pem")
print("Tworzenie bundle'a dla HAProxy...")
if create_haproxy_bundle(final_key, final_cert, final_chain, bundle_path):
print(f" Bundle HAProxy: {bundle_path}")
# Kompleksowa weryfikacja bundle'a
if not verify_bundle(bundle_path):
print(" [ ERROR ] Bundle nie przeszedł weryfikacji!", file=sys.stderr)
sys.exit(1)
files_to_archive.append(bundle_path)
else:
print(" [ ERROR ] Nie udało się utworzyć bundle'a", file=sys.stderr)
sys.exit(1)
# tworzymy tar.gz
create_tar_gz(archive_name, files_to_archive)
print("\nUtworzono pliki:")
print(" Klucz: ", final_key)
print(" Certyfikat: ", final_cert)
print(" Chain: ", final_chain)
print("Archiwum tar.gz:", archive_name)
finally:
shutil.rmtree(tmpdir, ignore_errors=True)
if __name__ == "__main__":
main()