571 lines
20 KiB
Python
571 lines
20 KiB
Python
#!/usr/bin/env python3
|
||
import argparse
|
||
import os
|
||
import subprocess
|
||
import tarfile
|
||
import tempfile
|
||
import shutil
|
||
import sys
|
||
|
||
def run_cmd(cmd, password):
|
||
env = os.environ.copy()
|
||
# hasło przekazujemy przez zmienną środowiskową, żeby nie wisiało w ps
|
||
env["PFXPASSWORD"] = password
|
||
# używamy -passin env:PFXPASSWORD w openssl
|
||
result = subprocess.run(
|
||
cmd,
|
||
env=env,
|
||
stdout=subprocess.PIPE,
|
||
stderr=subprocess.PIPE,
|
||
text=True,
|
||
)
|
||
if result.returncode != 0:
|
||
print("Błąd wykonania:", " ".join(cmd), file=sys.stderr)
|
||
print(result.stderr, file=sys.stderr)
|
||
sys.exit(result.returncode)
|
||
return result.stdout
|
||
|
||
def extract_from_pfx(pfx_path, password, out_dir, base_name):
|
||
key_path = os.path.join(out_dir, f"{base_name}.key.pem")
|
||
cert_path = os.path.join(out_dir, f"{base_name}.cert.pem")
|
||
chain_path = os.path.join(out_dir, f"{base_name}.chain.pem")
|
||
|
||
# klucz prywatny
|
||
run_cmd([
|
||
"openssl", "pkcs12",
|
||
"-in", pfx_path,
|
||
"-passin", "env:PFXPASSWORD",
|
||
"-nodes",
|
||
"-nocerts",
|
||
"-out", key_path
|
||
], password)
|
||
|
||
# cert końcowy
|
||
run_cmd([
|
||
"openssl", "pkcs12",
|
||
"-in", pfx_path,
|
||
"-passin", "env:PFXPASSWORD",
|
||
"-clcerts",
|
||
"-nokeys",
|
||
"-out", cert_path
|
||
], password)
|
||
|
||
# certy pośrednie (CA chain) – może być puste, wtedy plik zostanie, ale bez zawartości
|
||
run_cmd([
|
||
"openssl", "pkcs12",
|
||
"-in", pfx_path,
|
||
"-passin", "env:PFXPASSWORD",
|
||
"-nokeys",
|
||
"-cacerts",
|
||
"-out", chain_path
|
||
], password)
|
||
|
||
return key_path, cert_path, chain_path
|
||
|
||
def clean_pem_content(file_path):
|
||
"""
|
||
Usuwa Bag Attributes i inne metadata z pliku PEM,
|
||
zostawiając tylko czyste certyfikaty/klucze
|
||
"""
|
||
if not os.path.isfile(file_path) or os.path.getsize(file_path) == 0:
|
||
return ""
|
||
|
||
with open(file_path, 'r') as f:
|
||
lines = f.readlines()
|
||
|
||
clean_lines = []
|
||
inside_cert_or_key = False
|
||
|
||
for line in lines:
|
||
# Rozpoczęcie certyfikatu lub klucza
|
||
if line.startswith('-----BEGIN'):
|
||
inside_cert_or_key = True
|
||
clean_lines.append(line)
|
||
# Koniec certyfikatu lub klucza
|
||
elif line.startswith('-----END'):
|
||
clean_lines.append(line)
|
||
inside_cert_or_key = False
|
||
# Kopiuj tylko linie wewnątrz bloków BEGIN/END
|
||
elif inside_cert_or_key:
|
||
clean_lines.append(line)
|
||
|
||
return ''.join(clean_lines)
|
||
|
||
def create_haproxy_bundle(key_path, cert_path, chain_path, bundle_path):
|
||
"""
|
||
Tworzy bundle dla HAProxy w poprawnej kolejności:
|
||
1. Klucz prywatny
|
||
2. Certyfikat końcowy
|
||
3. Certyfikaty pośrednie (chain) - od najbliższego do root CA
|
||
|
||
Usuwa Bag Attributes i inne metadata z PKCS12
|
||
"""
|
||
with open(bundle_path, 'w') as bundle:
|
||
# 1. Klucz prywatny (wyczyszczony)
|
||
key_content = clean_pem_content(key_path)
|
||
if key_content:
|
||
bundle.write(key_content)
|
||
if not key_content.endswith('\n'):
|
||
bundle.write('\n')
|
||
|
||
# 2. Certyfikat końcowy (wyczyszczony)
|
||
cert_content = clean_pem_content(cert_path)
|
||
if cert_content:
|
||
bundle.write(cert_content)
|
||
if not cert_content.endswith('\n'):
|
||
bundle.write('\n')
|
||
|
||
# 3. Chain (certyfikaty pośrednie, wyczyszczone)
|
||
chain_content = clean_pem_content(chain_path)
|
||
if chain_content:
|
||
bundle.write(chain_content)
|
||
if not chain_content.endswith('\n'):
|
||
bundle.write('\n')
|
||
|
||
# Weryfikacja czy bundle zawiera co najmniej klucz i certyfikat
|
||
if os.path.getsize(bundle_path) == 0:
|
||
print("UWAGA: Bundle jest pusty!", file=sys.stderr)
|
||
return False
|
||
|
||
return True
|
||
|
||
def verify_bundle(bundle_path):
|
||
"""
|
||
Kompleksowa weryfikacja poprawności bundle'a HAProxy:
|
||
1. Sprawdzenie czy klucz prywatny pasuje do certyfikatu
|
||
2. Sprawdzenie łańcucha certyfikatów (chain validation)
|
||
3. Wyświetlenie informacji o certyfikacie
|
||
"""
|
||
print("\n=== Weryfikacja bundle'a ===")
|
||
|
||
# 1. Sprawdzenie czy klucz pasuje do certyfikatu (porównanie modulus)
|
||
print("1. Sprawdzanie zgodności klucza prywatnego z certyfikatem...")
|
||
try:
|
||
# Najpierw sprawdźmy typ klucza
|
||
key_check = subprocess.run([
|
||
"openssl", "pkey",
|
||
"-in", bundle_path,
|
||
"-noout",
|
||
"-text"
|
||
], capture_output=True, text=True)
|
||
|
||
if key_check.returncode != 0:
|
||
print(" [ ERROR ] Nie można odczytać klucza prywatnego", file=sys.stderr)
|
||
return False
|
||
|
||
# Sprawdź typ klucza (RSA, EC, itp.)
|
||
key_type = None
|
||
if "RSA" in key_check.stdout or "rsaEncryption" in key_check.stdout:
|
||
key_type = "rsa"
|
||
elif "EC" in key_check.stdout or "id-ecPublicKey" in key_check.stdout:
|
||
key_type = "ec"
|
||
|
||
# Modulus dla RSA lub pubkey dla EC/innych
|
||
key_modulus = None
|
||
if key_type == "rsa":
|
||
key_result = subprocess.run([
|
||
"openssl", "rsa",
|
||
"-in", bundle_path,
|
||
"-noout",
|
||
"-modulus"
|
||
], capture_output=True, text=True)
|
||
if key_result.returncode == 0:
|
||
key_modulus = key_result.stdout.strip()
|
||
else:
|
||
# Dla EC i innych typów używamy pubkey
|
||
key_result = subprocess.run([
|
||
"openssl", "pkey",
|
||
"-in", bundle_path,
|
||
"-pubout"
|
||
], capture_output=True, text=True)
|
||
if key_result.returncode == 0:
|
||
key_modulus = key_result.stdout.strip()
|
||
|
||
# Modulus/pubkey z certyfikatu
|
||
cert_modulus = None
|
||
if key_type == "rsa":
|
||
cert_result = subprocess.run([
|
||
"openssl", "x509",
|
||
"-in", bundle_path,
|
||
"-noout",
|
||
"-modulus"
|
||
], capture_output=True, text=True)
|
||
if cert_result.returncode == 0:
|
||
cert_modulus = cert_result.stdout.strip()
|
||
else:
|
||
# Dla EC i innych używamy pubkey z certyfikatu
|
||
cert_result = subprocess.run([
|
||
"openssl", "x509",
|
||
"-in", bundle_path,
|
||
"-noout",
|
||
"-pubkey"
|
||
], capture_output=True, text=True)
|
||
if cert_result.returncode == 0:
|
||
cert_modulus = cert_result.stdout.strip()
|
||
|
||
if key_modulus and cert_modulus and key_modulus == cert_modulus:
|
||
print(f" [ OK ] Klucz prywatny ({key_type.upper() if key_type else 'UNKNOWN'}) pasuje do certyfikatu")
|
||
else:
|
||
print(" [ ERROR ] Klucz prywatny NIE pasuje do certyfikatu!", file=sys.stderr)
|
||
return False
|
||
|
||
except Exception as e:
|
||
print(f" [ ERROR ] Błąd podczas weryfikacji klucza: {e}", file=sys.stderr)
|
||
return False
|
||
|
||
# 2. Weryfikacja łańcucha certyfikatów
|
||
print("2. Weryfikacja łańcucha certyfikatów...")
|
||
try:
|
||
# openssl verify sprawdzi czy łańcuch jest poprawny
|
||
verify_result = subprocess.run([
|
||
"openssl", "verify",
|
||
"-CAfile", bundle_path,
|
||
"-untrusted", bundle_path,
|
||
bundle_path
|
||
], capture_output=True, text=True)
|
||
|
||
if "OK" in verify_result.stdout:
|
||
print(" [ OK ] Łańcuch certyfikatów jest poprawny")
|
||
else:
|
||
# Czasem verify może nie działać z bundle'em, to nie jest krytyczne
|
||
print(" [ INFO ] Nie można w pełni zweryfikować łańcucha (może wymagać root CA)")
|
||
except Exception as e:
|
||
print(f" [ INFO ] {e}")
|
||
|
||
# 3. Wyświetlenie informacji o certyfikacie
|
||
print("3. Informacje o certyfikacie:")
|
||
try:
|
||
cert_info = subprocess.run([
|
||
"openssl", "x509",
|
||
"-in", bundle_path,
|
||
"-noout",
|
||
"-subject",
|
||
"-issuer",
|
||
"-dates"
|
||
], capture_output=True, text=True)
|
||
|
||
if cert_info.returncode == 0:
|
||
for line in cert_info.stdout.strip().split('\n'):
|
||
print(f" {line}")
|
||
|
||
# Sprawdzenie czy certyfikat nie wygasł
|
||
dates_result = subprocess.run([
|
||
"openssl", "x509",
|
||
"-in", bundle_path,
|
||
"-noout",
|
||
"-checkend", "0"
|
||
], capture_output=True, text=True)
|
||
|
||
if dates_result.returncode == 0:
|
||
print(" [ OK ] Certyfikat jest ważny (nie wygasł)")
|
||
else:
|
||
print(" [ ERROR ] Certyfikat wygasł!", file=sys.stderr)
|
||
return False
|
||
|
||
except Exception as e:
|
||
print(f" [ WARNING ] Nie można pobrać informacji o certyfikacie: {e}")
|
||
|
||
# 4. Sprawdzenie liczby certyfikatów w bundle
|
||
print("4. Struktura bundle'a:")
|
||
try:
|
||
with open(bundle_path, 'r') as f:
|
||
content = f.read()
|
||
|
||
cert_count = content.count('-----BEGIN CERTIFICATE-----')
|
||
private_key_count = content.count('-----BEGIN PRIVATE KEY-----') + \
|
||
content.count('-----BEGIN RSA PRIVATE KEY-----') + \
|
||
content.count('-----BEGIN EC PRIVATE KEY-----')
|
||
|
||
print(f" - Klucze prywatne: {private_key_count}")
|
||
print(f" - Certyfikaty: {cert_count}")
|
||
|
||
if private_key_count == 0:
|
||
print(" [ ERROR ] Brak klucza prywatnego!", file=sys.stderr)
|
||
return False
|
||
if cert_count == 0:
|
||
print(" [ ERROR ] Brak certyfikatu!", file=sys.stderr)
|
||
return False
|
||
|
||
if cert_count > 1:
|
||
print(f" [ OK ] Bundle zawiera certyfikat + {cert_count - 1} certyfikat(ów) pośrednich")
|
||
else:
|
||
print(" [ INFO ] Bundle zawiera tylko certyfikat końcowy (brak chain)")
|
||
|
||
except Exception as e:
|
||
print(f" [ WARNING ] Nie można przeanalizować struktury: {e}")
|
||
|
||
print("=== Weryfikacja zakończona ===\n")
|
||
return True
|
||
|
||
def check_certificate(cert_path):
|
||
"""
|
||
Sprawdza poprawność certyfikatu/bundle'a bez konwersji z PFX
|
||
Używane z flagą --check-cert
|
||
"""
|
||
if not os.path.isfile(cert_path):
|
||
print(f"[ ERROR ] Nie znaleziono pliku: {cert_path}", file=sys.stderr)
|
||
sys.exit(1)
|
||
|
||
print(f"=== Sprawdzanie certyfikatu: {cert_path} ===\n")
|
||
|
||
# Sprawdź czy plik zawiera klucz prywatny
|
||
with open(cert_path, 'r') as f:
|
||
content = f.read()
|
||
|
||
has_private_key = any([
|
||
'-----BEGIN PRIVATE KEY-----' in content,
|
||
'-----BEGIN RSA PRIVATE KEY-----' in content,
|
||
'-----BEGIN EC PRIVATE KEY-----' in content
|
||
])
|
||
|
||
has_certificate = '-----BEGIN CERTIFICATE-----' in content
|
||
|
||
if not has_certificate:
|
||
print("[ ERROR ] Plik nie zawiera certyfikatu!", file=sys.stderr)
|
||
sys.exit(1)
|
||
|
||
# Jeśli jest klucz prywatny, to jest to bundle - użyj pełnej weryfikacji
|
||
if has_private_key:
|
||
print("Wykryto bundle HAProxy (klucz + certyfikat)\n")
|
||
if not verify_bundle(cert_path):
|
||
sys.exit(1)
|
||
else:
|
||
# Tylko certyfikat(y) - weryfikacja bez klucza
|
||
print("Wykryto plik z certyfikatem(ami) bez klucza prywatnego\n")
|
||
verify_certificate_only(cert_path)
|
||
|
||
print("\n[ OK ] Wszystkie sprawdzenia zakończone pomyślnie")
|
||
sys.exit(0)
|
||
|
||
def verify_certificate_only(cert_path):
|
||
"""
|
||
Weryfikacja samego certyfikatu bez klucza prywatnego
|
||
"""
|
||
print("=== Weryfikacja certyfikatu ===")
|
||
|
||
# 1. Informacje o certyfikacie
|
||
print("1. Informacje o certyfikacie końcowym:")
|
||
try:
|
||
cert_info = subprocess.run([
|
||
"openssl", "x509",
|
||
"-in", cert_path,
|
||
"-noout",
|
||
"-subject",
|
||
"-issuer",
|
||
"-dates",
|
||
"-fingerprint",
|
||
"-sha256"
|
||
], capture_output=True, text=True)
|
||
|
||
if cert_info.returncode == 0:
|
||
for line in cert_info.stdout.strip().split('\n'):
|
||
print(f" {line}")
|
||
else:
|
||
print(" [ ERROR ] Nie można odczytać certyfikatu", file=sys.stderr)
|
||
return False
|
||
except Exception as e:
|
||
print(f" [ ERROR ] Błąd: {e}", file=sys.stderr)
|
||
return False
|
||
|
||
# 2. Sprawdzenie ważności
|
||
print("\n2. Sprawdzanie ważności:")
|
||
try:
|
||
dates_result = subprocess.run([
|
||
"openssl", "x509",
|
||
"-in", cert_path,
|
||
"-noout",
|
||
"-checkend", "0"
|
||
], capture_output=True, text=True)
|
||
|
||
if dates_result.returncode == 0:
|
||
print(" [ OK ] Certyfikat jest ważny (nie wygasł)")
|
||
else:
|
||
print(" [ ERROR ] Certyfikat wygasł!", file=sys.stderr)
|
||
return False
|
||
|
||
# Sprawdź czy certyfikat wygasa w ciągu 30 dni
|
||
dates_30days = subprocess.run([
|
||
"openssl", "x509",
|
||
"-in", cert_path,
|
||
"-noout",
|
||
"-checkend", str(30 * 24 * 3600)
|
||
], capture_output=True, text=True)
|
||
|
||
if dates_30days.returncode != 0:
|
||
print(" [ WARNING ] Certyfikat wygasa w ciągu 30 dni!")
|
||
|
||
except Exception as e:
|
||
print(f" [ WARNING ] Nie można sprawdzić ważności: {e}")
|
||
|
||
# 3. Struktura i liczba certyfikatów
|
||
print("\n3. Struktura pliku:")
|
||
try:
|
||
with open(cert_path, 'r') as f:
|
||
content = f.read()
|
||
|
||
cert_count = content.count('-----BEGIN CERTIFICATE-----')
|
||
print(f" - Liczba certyfikatów: {cert_count}")
|
||
|
||
if cert_count > 1:
|
||
print(f" [ OK ] Plik zawiera certyfikat końcowy + {cert_count - 1} certyfikat(ów) pośrednich")
|
||
elif cert_count == 1:
|
||
print(" [ INFO ] Plik zawiera tylko certyfikat końcowy (brak chain)")
|
||
else:
|
||
print(" [ ERROR ] Brak certyfikatów w pliku!", file=sys.stderr)
|
||
return False
|
||
except Exception as e:
|
||
print(f" [ ERROR ] Nie można przeanalizować struktury: {e}", file=sys.stderr)
|
||
return False
|
||
|
||
# 4. Weryfikacja łańcucha (jeśli jest)
|
||
if cert_count > 1:
|
||
print("\n4. Weryfikacja łańcucha certyfikatów:")
|
||
try:
|
||
verify_result = subprocess.run([
|
||
"openssl", "verify",
|
||
"-CAfile", cert_path,
|
||
"-untrusted", cert_path,
|
||
cert_path
|
||
], capture_output=True, text=True)
|
||
|
||
if "OK" in verify_result.stdout:
|
||
print(" [ OK ] Łańcuch certyfikatów jest poprawny")
|
||
else:
|
||
print(" [ INFO ] Nie można w pełni zweryfikować łańcucha (może wymagać root CA)")
|
||
print(f" Output: {verify_result.stdout.strip()}")
|
||
except Exception as e:
|
||
print(f" [ INFO ] {e}")
|
||
|
||
# 5. Szczegóły wszystkich certyfikatów
|
||
if cert_count > 1:
|
||
print("\n5. Lista wszystkich certyfikatów w łańcuchu:")
|
||
try:
|
||
# Rozdziel certyfikaty i wyświetl każdy
|
||
certs = content.split('-----BEGIN CERTIFICATE-----')
|
||
cert_num = 0
|
||
for cert_block in certs:
|
||
if '-----END CERTIFICATE-----' in cert_block:
|
||
cert_num += 1
|
||
cert_content = '-----BEGIN CERTIFICATE-----' + cert_block
|
||
|
||
# Zapisz tymczasowo
|
||
with tempfile.NamedTemporaryFile(mode='w', suffix='.pem', delete=False) as tmp:
|
||
tmp.write(cert_content)
|
||
tmp_path = tmp.name
|
||
|
||
try:
|
||
subject_result = subprocess.run([
|
||
"openssl", "x509",
|
||
"-in", tmp_path,
|
||
"-noout",
|
||
"-subject",
|
||
"-issuer"
|
||
], capture_output=True, text=True)
|
||
|
||
if subject_result.returncode == 0:
|
||
print(f"\n Certyfikat #{cert_num}:")
|
||
for line in subject_result.stdout.strip().split('\n'):
|
||
print(f" {line}")
|
||
finally:
|
||
os.unlink(tmp_path)
|
||
except Exception as e:
|
||
print(f" [ WARNING ] Nie można wyświetlić szczegółów certyfikatów: {e}")
|
||
|
||
print("\n=== Weryfikacja zakończona ===")
|
||
return True
|
||
|
||
def create_tar_gz(archive_name, files):
|
||
with tarfile.open(archive_name, "w:gz") as tar:
|
||
for f in files:
|
||
if os.path.isfile(f):
|
||
tar.add(f, arcname=os.path.basename(f))
|
||
|
||
def main():
|
||
parser = argparse.ArgumentParser(
|
||
description="Rozszyfruj PFX i spakuj key/cert/chain do tar.gz"
|
||
)
|
||
parser.add_argument("pfx", nargs='?', help="Ścieżka do pliku .pfx/.p12")
|
||
parser.add_argument("password", nargs='?', help="Hasło do pliku PFX")
|
||
parser.add_argument(
|
||
"-o", "--output-dir",
|
||
default=".",
|
||
help="Katalog wyjściowy (domyślnie bieżący)"
|
||
)
|
||
parser.add_argument(
|
||
"-b", "--bundle",
|
||
action="store_true",
|
||
help="Utwórz bundle dla HAProxy (key+cert+chain w jednym pliku)"
|
||
)
|
||
parser.add_argument(
|
||
"-c", "--check-cert",
|
||
metavar="CERT_FILE",
|
||
help="Sprawdź poprawność istniejącego certyfikatu/bundle'a"
|
||
)
|
||
|
||
args = parser.parse_args()
|
||
|
||
# Tryb weryfikacji certyfikatu
|
||
if args.check_cert:
|
||
check_certificate(args.check_cert)
|
||
return
|
||
|
||
# Tryb normalny - wymaga pfx i hasła
|
||
if not args.pfx or not args.password:
|
||
parser.error("Wymagane argumenty: pfx i password (chyba że używasz --check-cert)")
|
||
|
||
pfx_path = os.path.abspath(args.pfx)
|
||
if not os.path.isfile(pfx_path):
|
||
print(f"Nie znaleziono pliku: {pfx_path}", file=sys.stderr)
|
||
sys.exit(1)
|
||
|
||
os.makedirs(args.output_dir, exist_ok=True)
|
||
base_name = os.path.splitext(os.path.basename(pfx_path))[0]
|
||
archive_name = os.path.join(args.output_dir, f"{base_name}.tar.gz")
|
||
|
||
# pracujemy w katalogu tymczasowym, potem wyniki kopiujemy
|
||
tmpdir = tempfile.mkdtemp(prefix="pfx_extract_")
|
||
try:
|
||
key_path, cert_path, chain_path = extract_from_pfx(
|
||
pfx_path, args.password, tmpdir, base_name
|
||
)
|
||
|
||
# kopiujemy pliki PEM do katalogu wyjściowego
|
||
final_key = shutil.copy(key_path, os.path.join(args.output_dir, os.path.basename(key_path)))
|
||
final_cert = shutil.copy(cert_path, os.path.join(args.output_dir, os.path.basename(cert_path)))
|
||
final_chain = shutil.copy(chain_path, os.path.join(args.output_dir, os.path.basename(chain_path)))
|
||
|
||
files_to_archive = [final_key, final_cert, final_chain]
|
||
|
||
# Jeśli flaga --bundle, tworzymy bundle dla HAProxy
|
||
if args.bundle:
|
||
bundle_path = os.path.join(args.output_dir, f"{base_name}.haproxy.pem")
|
||
print("Tworzenie bundle'a dla HAProxy...")
|
||
|
||
if create_haproxy_bundle(final_key, final_cert, final_chain, bundle_path):
|
||
print(f" Bundle HAProxy: {bundle_path}")
|
||
|
||
# Kompleksowa weryfikacja bundle'a
|
||
if not verify_bundle(bundle_path):
|
||
print(" [ ERROR ] Bundle nie przeszedł weryfikacji!", file=sys.stderr)
|
||
sys.exit(1)
|
||
|
||
files_to_archive.append(bundle_path)
|
||
else:
|
||
print(" [ ERROR ] Nie udało się utworzyć bundle'a", file=sys.stderr)
|
||
sys.exit(1)
|
||
|
||
# tworzymy tar.gz
|
||
create_tar_gz(archive_name, files_to_archive)
|
||
|
||
print("\nUtworzono pliki:")
|
||
print(" Klucz: ", final_key)
|
||
print(" Certyfikat: ", final_cert)
|
||
print(" Chain: ", final_chain)
|
||
print("Archiwum tar.gz:", archive_name)
|
||
|
||
finally:
|
||
shutil.rmtree(tmpdir, ignore_errors=True)
|
||
|
||
if __name__ == "__main__":
|
||
main()
|