from __future__ import annotations from dataclasses import dataclass from datetime import date, datetime, timedelta, timezone from decimal import Decimal import base64 import hashlib import re import time import xml.etree.ElementTree as ET from typing import Any import requests from app.services.settings_service import SettingsService try: from cryptography import x509 from cryptography.hazmat.primitives import hashes, serialization, padding as sym_padding from cryptography.hazmat.primitives.asymmetric import padding from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes except Exception: # pragma: no cover x509 = None hashes = None serialization = None sym_padding = None Cipher = None algorithms = None modes = None padding = None @dataclass class KSeFDocument: ksef_number: str invoice_number: str contractor_name: str contractor_nip: str issue_date: date received_date: date fetched_at: datetime net_amount: Decimal vat_amount: Decimal gross_amount: Decimal invoice_type: str xml_content: str metadata: dict class KSeFAdapter: def list_documents(self, since: datetime | None = None): raise NotImplementedError def issue_invoice(self, payload: dict): raise NotImplementedError def ping(self): return {"status": "unknown"} def diagnostics(self): return self.ping() class MockKSeFAdapter(KSeFAdapter): def __init__(self, company_id=None): self.company_id = company_id or 0 def list_documents(self, since=None): base_date = datetime.utcnow() docs = [] for i in range(1, 6): issue = (base_date - timedelta(days=i * 3)).date() net = Decimal(1000 + i * 250) vat = (net * Decimal("0.23")).quantize(Decimal("0.01")) gross = net + vat seq = 10000 + i docs.append( KSeFDocument( ksef_number=f"KSEF/MOCK/C{self.company_id}/{issue.year}/{seq}", invoice_number=f"FV/{self.company_id}/{i:03d}/{issue.year}", contractor_name=f"Firma {self.company_id}-{i}", contractor_nip=f"525{self.company_id:02d}000{i:02d}", issue_date=issue, received_date=issue + timedelta(days=1), 
fetched_at=base_date - timedelta(hours=i), net_amount=net, vat_amount=vat, gross_amount=gross, invoice_type="purchase" if i % 2 else "sale", xml_content=( f"{self.company_id}" f"KSEF/MOCK/C{self.company_id}/{issue.year}/{seq}" ), metadata={"source": "mock", "sequence": i, "company_id": self.company_id}, ) ) if since: docs = [d for d in docs if d.fetched_at >= since] return docs def issue_invoice(self, payload: dict): now = datetime.utcnow() seq = int(now.timestamp()) return { "ksef_number": f"KSEF/MOCK/ISSUED/C{self.company_id}/{now.year}/{seq}", "status": "issued_mock", "message": "Tryb mock: dokument nie został wysłany do produkcyjnego KSeF, tylko zasymulowany lokalnie.", "xml_content": payload.get("xml_content", ""), } def ping(self): return { "status": "mock", "message": "Tryb mock aktywny - połączenie z prawdziwym API nie jest wykonywane.", } def diagnostics(self): ping = self.ping() ping["base_url"] = "mock://ksef" ping["sample"] = { "status": "mock", "documentExample": { "invoiceNumber": f"FV/{self.company_id}/001/{datetime.utcnow().year}", "ksefNumber": f"KSEF/MOCK/C{self.company_id}/{datetime.utcnow().year}/10001", }, } return ping class RequestsKSeFAdapter(KSeFAdapter): ENVIRONMENT_URLS = { 'prod': 'https://api.ksef.mf.gov.pl/v2', 'test': 'https://api-test.ksef.mf.gov.pl/v2', 'demo': 'https://api-demo.ksef.mf.gov.pl/v2', } ACCESS_TOKEN_KEY = "ksef.auth.access_token" ACCESS_TOKEN_VALID_UNTIL_KEY = "ksef.auth.access_token_valid_until" REFRESH_TOKEN_KEY = "ksef.auth.refresh_token" REFRESH_TOKEN_VALID_UNTIL_KEY = "ksef.auth.refresh_token_valid_until" def __init__(self, company_id=None): self.company_id = company_id self.environment = self._resolve_environment(company_id=company_id) configured_base_url = SettingsService.get( "ksef.base_url", self.ENVIRONMENT_URLS[self.environment], company_id=company_id, ) self.base_url = self._normalize_base_url(configured_base_url or self.ENVIRONMENT_URLS[self.environment]) self.auth_mode = 
SettingsService.get_effective("ksef.auth_mode", "token", company_id=company_id, scope_name='ksef', user_default='user') self.token = (self._settings_get_secret("ksef.token", "") or "").strip() self.client_id = (SettingsService.get_effective("ksef.client_id", "", company_id=company_id, scope_name='ksef', user_default='user') or "").strip() self.certificate_name = (SettingsService.get_effective("ksef.certificate_name", "", company_id=company_id, scope_name='ksef', user_default='user') or "").strip() self.certificate_data = self._settings_get_secret("ksef.certificate_data", "") or "" self.tax_id = self._resolve_tax_id() self._runtime_secret_cache: dict[str, str] = {} self._runtime_value_cache: dict[str, str] = {} def _settings_get_secret(self, key: str, default: str = "") -> str: getter = getattr(SettingsService, "get_secret", None) if callable(getter): return getter(key, default, company_id=self.company_id) return SettingsService.get(key, default, company_id=self.company_id) @classmethod def _resolve_environment(cls, company_id=None) -> str: environment = (SettingsService.get_effective("ksef.environment", "prod", company_id=company_id, scope_name='ksef', user_default='user') or "").strip().lower() if environment in cls.ENVIRONMENT_URLS: return environment base_url = (SettingsService.get_effective("ksef.base_url", "", company_id=company_id, scope_name='ksef', user_default='user') or "").strip().lower() if "api-test.ksef.mf.gov.pl" in base_url: return "test" return "prod" @staticmethod def _normalize_base_url(base_url: str) -> str: base = (base_url or "https://api.ksef.mf.gov.pl/v2").strip().rstrip("/") if base.endswith("/docs/v2"): return base[:-8] + "/v2" if not base.endswith("/v2"): base = f"{base}/v2" return base.rstrip("/") def _resolve_tax_id(self) -> str: candidates = [ SettingsService.get("company.tax_id", "", company_id=self.company_id), SettingsService.get("company.nip", "", company_id=self.company_id), SettingsService.get("tax_id", "", 
company_id=self.company_id), SettingsService.get("nip", "", company_id=self.company_id), ] try: from app.models.company import Company if self.company_id: company = Company.query.get(self.company_id) if company: candidates.extend([ getattr(company, "tax_id", ""), getattr(company, "nip", ""), getattr(company, "vat_number", ""), ]) except Exception: pass for value in candidates: digits = re.sub(r"\D", "", value or "") if len(digits) == 10: return digits return "" @staticmethod def _parse_datetime(value: str | None) -> datetime | None: if not value: return None raw = value.strip() if raw.endswith("Z"): raw = raw[:-1] + "+00:00" try: return datetime.fromisoformat(raw) except ValueError: return None @staticmethod def _ensure_naive_utc(dt: datetime) -> datetime: if dt.tzinfo is None: return dt return dt.astimezone(timezone.utc).replace(tzinfo=None) def _get_cached_secret(self, key: str) -> str: if key in self._runtime_secret_cache: return (self._runtime_secret_cache.get(key) or "").strip() return (self._settings_get_secret(key, "") or "").strip() def _set_cached_secret(self, key: str, value: str): self._runtime_secret_cache[key] = value or "" def _get_cached_value(self, key: str) -> str: if key in self._runtime_value_cache: return (self._runtime_value_cache.get(key) or "").strip() return (SettingsService.get(key, "", company_id=self.company_id) or "").strip() def _set_cached_value(self, key: str, value: str): self._runtime_value_cache[key] = value or "" def _clear_access_token_cache(self): self._set_cached_secret(self.ACCESS_TOKEN_KEY, "") self._set_cached_value(self.ACCESS_TOKEN_VALID_UNTIL_KEY, "") def _clear_refresh_token_cache(self): self._set_cached_secret(self.REFRESH_TOKEN_KEY, "") self._set_cached_value(self.REFRESH_TOKEN_VALID_UNTIL_KEY, "") def _access_token_is_valid(self) -> bool: token = self._get_cached_secret(self.ACCESS_TOKEN_KEY) valid_until = self._parse_datetime(self._get_cached_value(self.ACCESS_TOKEN_VALID_UNTIL_KEY)) if not token or not valid_until: 
return False return valid_until > datetime.now(timezone.utc) + timedelta(seconds=30) def _refresh_token_is_valid(self) -> bool: token = self._get_cached_secret(self.REFRESH_TOKEN_KEY) valid_until = self._parse_datetime(self._get_cached_value(self.REFRESH_TOKEN_VALID_UNTIL_KEY)) if not token or not valid_until: return False return valid_until > datetime.now(timezone.utc) + timedelta(seconds=30) def _get_public_key_pem(self) -> str: url = f"{self.base_url}/security/public-key-certificates" response = requests.get( url, headers={"Accept": "application/json"}, timeout=30, ) response.raise_for_status() payload = response.json() if not isinstance(payload, list) or not payload: raise RuntimeError("KSeF nie zwrócił listy certyfikatów klucza publicznego.") for item in payload: if not isinstance(item, dict): continue raw_cert = item.get("certificate") if not raw_cert: continue raw_cert = raw_cert.strip() if "BEGIN CERTIFICATE" in raw_cert: return raw_cert wrapped = "\n".join( raw_cert[i:i + 64] for i in range(0, len(raw_cert), 64) ) return f"-----BEGIN CERTIFICATE-----\n{wrapped}\n-----END CERTIFICATE-----" raise RuntimeError("Nie udało się odczytać certyfikatu klucza publicznego KSeF.") @staticmethod def _sha256_base64(payload: bytes) -> str: return base64.b64encode(hashlib.sha256(payload).digest()).decode("ascii") @staticmethod def _normalize_xml_bytes(xml_content: str) -> bytes: if xml_content is None: raise RuntimeError("Brak treści XML faktury do wysyłki.") xml_bytes = xml_content.encode("utf-8") if xml_bytes.startswith(b"\xef\xbb\xbf"): xml_bytes = xml_bytes[3:] if len(xml_bytes) > 1_000_000: raise RuntimeError("Plik XML faktury przekracza limit 1 MB wymagany przez KSeF dla faktury bez załączników.") return xml_bytes def _build_online_encryption_data(self) -> dict[str, str | bytes]: if not x509 or not serialization or not hashes or not padding or not Cipher or not algorithms or not modes or not sym_padding: raise RuntimeError( "Brak biblioteki cryptography. 
Zainstaluj ją, aby obsłużyć wysyłkę faktur do KSeF 2.x." ) key = self._get_public_key_pem().encode("utf-8") cert = x509.load_pem_x509_certificate(key) public_key = cert.public_key() symmetric_key = __import__("secrets").token_bytes(32) iv = __import__("secrets").token_bytes(16) encrypted_key = public_key.encrypt( symmetric_key, padding.OAEP( mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None, ), ) return { "symmetric_key": symmetric_key, "iv": iv, "encrypted_symmetric_key": base64.b64encode(encrypted_key).decode("ascii"), "initialization_vector": base64.b64encode(iv).decode("ascii"), } @staticmethod def _encrypt_invoice_bytes(invoice_bytes: bytes, symmetric_key: bytes, iv: bytes) -> bytes: padder = sym_padding.PKCS7(128).padder() padded = padder.update(invoice_bytes) + padder.finalize() cipher = Cipher(algorithms.AES(symmetric_key), modes.CBC(iv)) encryptor = cipher.encryptor() encrypted = encryptor.update(padded) + encryptor.finalize() return iv + encrypted @staticmethod def _extract_status_code(payload: dict | None) -> int | None: if not isinstance(payload, dict): return None status = payload.get("status") if isinstance(status, dict): status = status.get("code") elif isinstance(status, list) and status: head = status[0] status = head.get("code") if isinstance(head, dict) else head if status is None and isinstance(payload.get("code"), int): status = payload.get("code") try: return int(status) if status is not None else None except Exception: return None @staticmethod def _extract_status_description(payload: dict | None) -> str: if not isinstance(payload, dict): return "" status = payload.get("status") if isinstance(status, dict): parts = [status.get("description") or ""] details = status.get("details") or [] if isinstance(details, list): parts.extend(str(item) for item in details if item) return " | ".join(part for part in parts if part) if payload.get("description"): return str(payload.get("description")) return "" @staticmethod 
def _first_present(mapping: dict | None, *keys: str): if not isinstance(mapping, dict): return None for key in keys: value = mapping.get(key) if value not in (None, "", []): return value return None def _request_binary(self, method, path, params=None, json=None, data=None, accept="application/octet-stream", content_type=None, use_auth=True): last_error = None url = f"{self.base_url}/{path.lstrip('/')}" for attempt in range(3): try: headers = self._headers(use_auth=use_auth, accept=accept) if content_type: headers["Content-Type"] = content_type response = requests.request( method, url, headers=headers, params=params, json=json, data=data, timeout=30, ) if response.status_code == 401 and use_auth and attempt == 0: self._clear_access_token_cache() headers = self._headers(use_auth=use_auth, accept=accept) if content_type: headers["Content-Type"] = content_type response = requests.request( method, url, headers=headers, params=params, json=json, data=data, timeout=30, ) response.raise_for_status() return response.content except requests.HTTPError as exc: response = getattr(exc, "response", None) if response is not None and response.status_code == 429: retry_after = response.headers.get("Retry-After") wait_hint = f" Spróbuj ponownie za około {retry_after} s." if retry_after else " Spróbuj ponownie za chwilę." last_error = RuntimeError("KSeF chwilowo ogranicza liczbę zapytań (HTTP 429)." 
+ wait_hint) else: last_error = exc time.sleep(2 ** attempt) except Exception as exc: last_error = exc time.sleep(2 ** attempt) raise last_error def _open_online_session(self, schema_version: str, encryption_data: dict[str, str | bytes]) -> dict: open_payload = { "formCode": { "systemCode": SettingsService.get_effective("ksef.form_code_system_code", "FA (3)", company_id=self.company_id, scope_name="ksef", user_default="user"), "schemaVersion": SettingsService.get_effective("ksef.form_code_schema_version", "1-0E", company_id=self.company_id, scope_name="ksef", user_default="user"), "value": SettingsService.get_effective("ksef.form_code_value", "FA", company_id=self.company_id, scope_name="ksef", user_default="user"), }, "encryption": { "encryptedSymmetricKey": encryption_data["encrypted_symmetric_key"], "initializationVector": encryption_data["initialization_vector"], }, } if schema_version and schema_version.upper().startswith("FA("): open_payload["formCode"]["systemCode"] = schema_version.upper().replace("(", " (") return self._request("POST", "/sessions/online", json=open_payload) def _get_session_invoice_status(self, session_reference: str, invoice_reference: str) -> dict: return self._request("GET", f"/sessions/{session_reference}/invoices/{invoice_reference}") def _poll_invoice_processing(self, session_reference: str, invoice_reference: str, timeout_seconds: int = 60) -> dict: deadline = time.time() + timeout_seconds last_payload: dict[str, Any] | None = None while time.time() < deadline: payload = self._get_session_invoice_status(session_reference, invoice_reference) last_payload = payload if isinstance(payload, dict) else {"payload": payload} code = self._extract_status_code(last_payload) if code == 200: return last_payload if code and code >= 300: description = self._extract_status_description(last_payload) or f"KSeF odrzucił fakturę. Kod statusu: {code}." 
raise RuntimeError(description) time.sleep(2) return last_payload or {} def _fetch_invoice_upo(self, session_reference: str, invoice_reference: str, ksef_number: str | None = None) -> str | None: try: if ksef_number: raw = self._request_binary("GET", f"/sessions/{session_reference}/invoices/ksef/{ksef_number}/upo", accept="application/xml") else: raw = self._request_binary("GET", f"/sessions/{session_reference}/invoices/{invoice_reference}/upo", accept="application/xml") return raw.decode("utf-8", errors="replace") if raw else None except Exception: return None def _encrypt_token_payload(self, plain_text: str) -> str: if not x509 or not serialization or not hashes or not padding: raise RuntimeError( "Brak biblioteki cryptography. Zainstaluj ją, aby obsłużyć logowanie tokenem KSeF 2.x." ) pem = self._get_public_key_pem().encode("utf-8") cert = x509.load_pem_x509_certificate(pem) public_key = cert.public_key() encrypted = public_key.encrypt( plain_text.encode("utf-8"), padding.OAEP( mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None, ), ) return base64.b64encode(encrypted).decode("ascii") def _build_context_identifier(self) -> dict[str, str]: if not self.tax_id: raise RuntimeError( f"Brak NIP firmy w ustawieniach lub rekordzie firmy dla company_id={self.company_id}. " "Token KSeF wymaga contextIdentifier typu Nip." 
) return {"type": "Nip", "value": self.tax_id} def _authenticate_with_refresh_token(self) -> bool: refresh_token = self._get_cached_secret(self.REFRESH_TOKEN_KEY) if not refresh_token: return False headers = { "Accept": "application/json", "Authorization": f"Bearer {refresh_token}", } if self.client_id: headers["X-Client-Id"] = self.client_id response = requests.post( f"{self.base_url}/auth/token/refresh", headers=headers, timeout=30, ) if response.status_code >= 400: self._clear_refresh_token_cache() return False data = response.json() access = data.get("accessToken") or {} refresh = data.get("refreshToken") or {} if not access.get("token"): self._clear_refresh_token_cache() return False self._set_cached_secret(self.ACCESS_TOKEN_KEY, access["token"]) self._set_cached_value(self.ACCESS_TOKEN_VALID_UNTIL_KEY, access.get("validUntil", "")) if refresh.get("token"): self._set_cached_secret(self.REFRESH_TOKEN_KEY, refresh["token"]) self._set_cached_value(self.REFRESH_TOKEN_VALID_UNTIL_KEY, refresh.get("validUntil", "")) return True def _authenticate_with_ksef_token(self): if not self.token: raise RuntimeError("Brak tokenu KSeF w ustawieniach użytkownika.") challenge_response = requests.post( f"{self.base_url}/auth/challenge", headers={"Accept": "application/json"}, timeout=30, ) challenge_response.raise_for_status() challenge_data = challenge_response.json() challenge = challenge_data["challenge"] challenge_timestamp_ms = challenge_data.get("timestampMs") if challenge_timestamp_ms is None: raise RuntimeError("KSeF nie zwrócił timestampMs w odpowiedzi /auth/challenge.") try: challenge_timestamp_ms = int(challenge_timestamp_ms) except Exception as exc: raise RuntimeError("Nieprawidłowy timestampMs z /auth/challenge.") from exc encrypted_token = self._encrypt_token_payload( f"{self.token}|{challenge_timestamp_ms}" ) init_payload = { "challenge": challenge, "contextIdentifier": self._build_context_identifier(), "encryptedToken": encrypted_token, } init_headers = {"Accept": 
"application/json"} if self.client_id: init_headers["X-Client-Id"] = self.client_id init_response = requests.post( f"{self.base_url}/auth/ksef-token", headers=init_headers, json=init_payload, timeout=30, ) init_response.raise_for_status() init_data = init_response.json() auth_token = (init_data.get("authenticationToken") or {}).get("token") reference_number = init_data.get("referenceNumber") if not auth_token or not reference_number: raise RuntimeError("KSeF nie zwrócił authenticationToken lub referenceNumber.") status_headers = { "Accept": "application/json", "Authorization": f"Bearer {auth_token}", } if self.client_id: status_headers["X-Client-Id"] = self.client_id for _ in range(20): status_response = requests.get( f"{self.base_url}/auth/{reference_number}", headers=status_headers, timeout=30, ) status_response.raise_for_status() payload = status_response.json() status = payload.get("status") or {} code = status.get("code") if code == 200: break if code and code >= 300: description = status.get("description") or "Błąd uwierzytelnienia KSeF." details = status.get("details") or [] details_text = " | ".join(str(x) for x in details if x) if details_text: raise RuntimeError(f"{description} Details: {details_text}") raise RuntimeError(description) time.sleep(1) else: raise RuntimeError( f"Przekroczono czas oczekiwania na zakończenie uwierzytelnienia KSeF dla {reference_number}." 
) redeem_headers = { "Accept": "application/json", "Authorization": f"Bearer {auth_token}", } if self.client_id: redeem_headers["X-Client-Id"] = self.client_id redeem_response = requests.post( f"{self.base_url}/auth/token/redeem", headers=redeem_headers, timeout=30, ) redeem_response.raise_for_status() tokens = redeem_response.json() access = tokens.get("accessToken") or {} refresh = tokens.get("refreshToken") or {} access_token = access.get("token") refresh_token = refresh.get("token") if not access_token: raise RuntimeError("KSeF nie zwrócił access tokena.") self._set_cached_secret(self.ACCESS_TOKEN_KEY, access_token) self._set_cached_value(self.ACCESS_TOKEN_VALID_UNTIL_KEY, access.get("validUntil", "")) if refresh_token: self._set_cached_secret(self.REFRESH_TOKEN_KEY, refresh_token) self._set_cached_value(self.REFRESH_TOKEN_VALID_UNTIL_KEY, refresh.get("validUntil", "")) def _ensure_access_token(self) -> str: if self._access_token_is_valid(): return self._get_cached_secret(self.ACCESS_TOKEN_KEY) if self._refresh_token_is_valid() and self._authenticate_with_refresh_token(): return self._get_cached_secret(self.ACCESS_TOKEN_KEY) if self.auth_mode != "token": raise RuntimeError(f"Nieobsługiwany tryb autoryzacji KSeF: {self.auth_mode}") self._authenticate_with_ksef_token() token = self._get_cached_secret(self.ACCESS_TOKEN_KEY) if not token: raise RuntimeError("Nie udało się uzyskać access tokena KSeF.") return token def _headers(self, use_auth: bool = True, accept: str = "application/json") -> dict[str, str]: headers = {"Accept": accept} if self.client_id: headers["X-Client-Id"] = self.client_id if use_auth: headers["Authorization"] = f"Bearer {self._ensure_access_token()}" return headers def _request(self, method, path, params=None, json=None, accept="application/json", use_auth=True): last_error = None url = f"{self.base_url}/{path.lstrip('/')}" for attempt in range(3): try: headers = self._headers(use_auth=use_auth, accept=accept) response = requests.request( 
method, url, headers=headers, params=params, json=json, timeout=30, ) if response.status_code == 401 and use_auth and attempt == 0: self._clear_access_token_cache() headers = self._headers(use_auth=use_auth, accept=accept) response = requests.request( method, url, headers=headers, params=params, json=json, timeout=30, ) response.raise_for_status() if not response.content: return {"status": "ok"} content_type = response.headers.get("Content-Type", "") if "application/json" in content_type or accept == "application/json": return response.json() return response.text except requests.HTTPError as exc: response = getattr(exc, 'response', None) if response is not None and response.status_code == 429: retry_after = response.headers.get('Retry-After') wait_hint = f' Spróbuj ponownie za około {retry_after} s.' if retry_after else ' Spróbuj ponownie za chwilę.' last_error = RuntimeError('KSeF chwilowo ogranicza liczbę zapytań (HTTP 429).' + wait_hint) else: last_error = exc time.sleep(2 ** attempt) except Exception as exc: last_error = exc time.sleep(2 ** attempt) raise last_error @staticmethod def _decimal_from_item(item: dict, *keys: str) -> Decimal: for key in keys: if item.get(key) is not None: return Decimal(str(item.get(key))) return Decimal("0") @staticmethod def _clean_text(value: Any) -> str: if value is None: return "" return str(value).strip() @staticmethod def _digits_only(value: Any) -> str: return re.sub(r"\D", "", str(value or "")) def _pick_contractor(self, item: dict) -> tuple[str, str]: seller = item.get("seller") or {} buyer = item.get("buyer") or {} seller_nip = re.sub(r"\D", "", seller.get("nip") or ((seller.get("identifier") or {}).get("value", ""))) buyer_nip = re.sub(r"\D", "", buyer.get("nip") or ((buyer.get("identifier") or {}).get("value", ""))) if self.tax_id and seller_nip == self.tax_id: contractor = buyer elif self.tax_id and buyer_nip == self.tax_id: contractor = seller else: contractor = buyer or seller identifier = 
contractor.get("identifier") or {} nip = contractor.get("nip") or identifier.get("value", "") return contractor.get("name", "Brak"), nip @staticmethod def _strip_namespace(tag: str) -> str: if "}" in tag: return tag.split("}", 1)[1] return tag def _find_first_text(self, root: ET.Element, local_names: list[str]) -> str: wanted = {name.lower() for name in local_names} for element in root.iter(): if self._strip_namespace(element.tag).lower() in wanted: text = self._clean_text(element.text) if text: return text return "" def _find_first_child_text(self, element: ET.Element, local_names: list[str]) -> str: wanted = {name.lower() for name in local_names} for node in element.iter(): if self._strip_namespace(node.tag).lower() in wanted: text = self._clean_text(node.text) if text: return text return "" def _find_direct_child(self, element: ET.Element, local_names: list[str]) -> ET.Element | None: wanted = {name.lower() for name in local_names} for child in list(element): if self._strip_namespace(child.tag).lower() in wanted: return child return None def _find_party_nodes(self, root: ET.Element) -> list[ET.Element]: result: list[ET.Element] = [] wanted = { "podmiot1", "podmiot2", "sprzedawca", "nabywca", "sprzedawca1", "nabywca1", } for element in root.iter(): local_name = self._strip_namespace(element.tag).lower() if local_name in wanted: result.append(element) return result def _parse_adres_node(self, address_node: ET.Element | None) -> dict[str, str]: result = { "street": "", "city": "", "postal_code": "", "country": "", "address": "", } if address_node is None: return result def get_text(names: list[str]) -> str: return self._find_first_child_text(address_node, names) def split_adres_l2(raw: str) -> tuple[str, str]: cleaned = self._clean_text(raw) if not cleaned: return "", "" match = re.match(r"^(\d{2}-\d{3})\s+(.+)$", cleaned) if match: return match.group(1), self._clean_text(match.group(2)) return "", cleaned street = get_text(["Ulica", "AdresL1"]) house_no = 
get_text(["NrDomu"]) apartment_no = get_text(["NrLokalu"]) city = get_text(["Miejscowosc"]) postal_code = get_text(["KodPocztowy"]) country = get_text(["Kraj", "KodKraju"]) adres_l2 = get_text(["AdresL2"]) if adres_l2: parsed_postal_code, parsed_city = split_adres_l2(adres_l2) postal_code = postal_code or parsed_postal_code city = city or parsed_city street_line = self._clean_text(street) if not street_line: street_parts = [part for part in [street, house_no] if part] street_line = " ".join(street_parts).strip() if apartment_no: street_line = f"{street_line}/{apartment_no}" if street_line else apartment_no address_parts = [part for part in [street_line, postal_code, city, country] if part] full_address = ", ".join(address_parts) result["street"] = street_line result["city"] = city result["postal_code"] = postal_code result["country"] = country result["address"] = full_address return result def _parse_party_element(self, element: ET.Element) -> dict[str, str]: name = "" nip = "" identity_node = self._find_direct_child(element, ["DaneIdentyfikacyjne"]) if identity_node is not None: name = self._find_first_child_text(identity_node, ["PelnaNazwa", "Nazwa", "NazwaPodmiotu"]) nip = self._digits_only(self._find_first_child_text(identity_node, ["NIP", "NrIdentyfikacjiPodatkowej"])) if not name: name = self._find_first_child_text(element, ["PelnaNazwa", "Nazwa", "NazwaPodmiotu"]) if not nip: nip = self._digits_only(self._find_first_child_text(element, ["NIP", "NrIdentyfikacjiPodatkowej"])) main_address_node = self._find_direct_child(element, ["Adres"]) correspondence_address_node = self._find_direct_child(element, ["AdresKoresp"]) main_address = self._parse_adres_node(main_address_node) correspondence_address = self._parse_adres_node(correspondence_address_node) chosen_address = main_address if not chosen_address.get("address") and correspondence_address.get("address"): chosen_address = correspondence_address return { "name": name, "nip": nip, "street": 
chosen_address.get("street", ""), "city": chosen_address.get("city", ""), "postal_code": chosen_address.get("postal_code", ""), "country": chosen_address.get("country", ""), "address": chosen_address.get("address", ""), } def _match_party_role_from_root( self, root: ET.Element, contractor_nip: str = "", contractor_name: str = "", ) -> dict[str, str]: empty_result = { "name": "", "nip": "", "street": "", "city": "", "postal_code": "", "country": "", "address": "", } normalized_tax_id = self._digits_only(self.tax_id) normalized_contractor_nip = self._digits_only(contractor_nip) normalized_contractor_name = self._clean_text(contractor_name).lower() podmiot1 = None podmiot2 = None for node in self._find_party_nodes(root): local_name = self._strip_namespace(node.tag).lower() if local_name == "podmiot1" and podmiot1 is None: podmiot1 = self._parse_party_element(node) elif local_name == "podmiot2" and podmiot2 is None: podmiot2 = self._parse_party_element(node) if podmiot1 or podmiot2: if normalized_tax_id: if podmiot1 and self._digits_only(podmiot1.get("nip")) == normalized_tax_id: return podmiot2 or empty_result if podmiot2 and self._digits_only(podmiot2.get("nip")) == normalized_tax_id: return podmiot1 or empty_result if normalized_contractor_nip: if podmiot1 and self._digits_only(podmiot1.get("nip")) == normalized_contractor_nip: return podmiot1 if podmiot2 and self._digits_only(podmiot2.get("nip")) == normalized_contractor_nip: return podmiot2 if normalized_contractor_name: if podmiot1 and self._clean_text(podmiot1.get("name")).lower() == normalized_contractor_name: return podmiot1 if podmiot2 and self._clean_text(podmiot2.get("name")).lower() == normalized_contractor_name: return podmiot2 return podmiot2 or podmiot1 or empty_result parties = [self._parse_party_element(node) for node in self._find_party_nodes(root)] parties = [party for party in parties if any(party.values())] if normalized_contractor_nip: for party in parties: if self._digits_only(party.get("nip")) 
== normalized_contractor_nip: return party if normalized_contractor_name: for party in parties: party_name = self._clean_text(party.get("name")).lower() if party_name and party_name == normalized_contractor_name: return party return parties[0] if parties else empty_result def _extract_party_details_from_xml( self, xml_content: str, contractor_nip: str = "", contractor_name: str = "", ) -> dict[str, str]: empty_result = { "name": "", "nip": "", "street": "", "city": "", "postal_code": "", "country": "", "address": "", } if not xml_content: return empty_result try: root = ET.fromstring(xml_content) except Exception: return empty_result matched_party = self._match_party_role_from_root( root=root, contractor_nip=contractor_nip, contractor_name=contractor_name, ) if matched_party and any(matched_party.values()): return matched_party street = self._find_first_text(root, ["Ulica", "AdresL1"]) house_no = self._find_first_text(root, ["NrDomu"]) apartment_no = self._find_first_text(root, ["NrLokalu"]) city = self._find_first_text(root, ["Miejscowosc"]) postal_code = self._find_first_text(root, ["KodPocztowy"]) country = self._find_first_text(root, ["Kraj", "KodKraju"]) adres_l2 = self._find_first_text(root, ["AdresL2"]) if adres_l2 and (not postal_code or not city): match = re.match(r"^(\d{2}-\d{3})\s+(.+)$", self._clean_text(adres_l2)) if match: postal_code = postal_code or match.group(1) city = city or self._clean_text(match.group(2)) elif not city: city = self._clean_text(adres_l2) street_parts = [part for part in [street, house_no] if part] street_line = " ".join(street_parts).strip() if apartment_no: street_line = f"{street_line}/{apartment_no}" if street_line else apartment_no address_parts = [part for part in [street_line, postal_code, city, country] if part] address = ", ".join(address_parts) return { "name": "", "nip": "", "street": street_line, "city": city, "postal_code": postal_code, "country": country, "address": address, } def 
_enrich_metadata_with_xml_party_data( self, item: dict[str, Any], xml_content: str, contractor_name: str, contractor_nip: str, ) -> dict[str, Any]: metadata = dict(item or {}) party = self._extract_party_details_from_xml( xml_content=xml_content, contractor_nip=contractor_nip, contractor_name=contractor_name, ) metadata.pop("contractor_regon", None) metadata["contractor_street"] = party.get("street", "") metadata["contractor_city"] = party.get("city", "") metadata["contractor_postal_code"] = party.get("postal_code", "") metadata["contractor_country"] = party.get("country", "") metadata["contractor_address"] = party.get("address", "") if party.get("name") and contractor_name == "Brak": metadata["contractor_name_from_xml"] = party.get("name", "") if party.get("nip") and not contractor_nip: metadata["contractor_nip_from_xml"] = party.get("nip", "") return metadata def list_documents(self, since=None): now = datetime.utcnow() date_from = self._ensure_naive_utc(since) if since else (now - timedelta(days=30)) request_body = { "subjectType": "Subject2", "dateRange": { "dateType": "PermanentStorage", "from": date_from.isoformat(), "to": now.isoformat(), }, "pageOffset": 0, "pageSize": 100, } data = self._request("POST", "/invoices/query/metadata", json=request_body) invoices = data.get("invoices", []) documents = [] for item in invoices: issue_date_raw = item.get("issueDate") or now.date().isoformat() issue_date = date.fromisoformat(issue_date_raw) fetched_at = ( self._parse_datetime(item.get("acquisitionDate")) or self._parse_datetime(item.get("permanentStorageDate")) or datetime.utcnow().replace(tzinfo=timezone.utc) ) fetched_at = fetched_at.astimezone(timezone.utc).replace(tzinfo=None) contractor_name, contractor_nip = self._pick_contractor(item) xml_content = "" ksef_number = item["ksefNumber"] try: xml_content = self._request( "GET", f"/invoices/ksef/{ksef_number}", accept="application/xml", ) except Exception: xml_content = "" enriched_metadata = 
def list_documents(self, since=None):
    """Query invoice metadata from KSeF and download the XML of each invoice.

    One ``POST /invoices/query/metadata`` request is made for the window
    ``[since, now]`` (30 days back when ``since`` is not given), then one
    ``GET /invoices/ksef/{ksefNumber}`` per returned invoice. A failed XML
    download is tolerated: the document is still returned, with empty
    ``xml_content`` and therefore no XML-derived contractor data.

    NOTE(review): only the first page (``pageOffset`` 0, ``pageSize`` 100)
    is requested; confirm whether larger result sets must be paged.

    Args:
        since: Lower bound of the query window; normalized with
            ``_ensure_naive_utc`` before use.

    Returns:
        A list of ``KSeFDocument``. Every document is typed ``"purchase"``
        and uses the issue date as its received date.
    """
    # Naive UTC "now", equivalent to the deprecated datetime.utcnow().
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    date_from = self._ensure_naive_utc(since) if since else now - timedelta(days=30)
    request_body = {
        "subjectType": "Subject2",
        "dateRange": {
            "dateType": "PermanentStorage",
            "from": date_from.isoformat(),
            "to": now.isoformat(),
        },
        "pageOffset": 0,
        "pageSize": 100,
    }
    data = self._request("POST", "/invoices/query/metadata", json=request_body)

    documents = []
    # `or []` also covers a response that carries "invoices": null.
    for item in data.get("invoices") or []:
        issue_date_raw = item.get("issueDate") or now.date().isoformat()
        # Keep only the leading calendar date so a value that carries a time
        # part does not abort the whole listing.
        issue_date = date.fromisoformat(str(issue_date_raw)[:10])

        fetched_at = (
            self._parse_datetime(item.get("acquisitionDate"))
            or self._parse_datetime(item.get("permanentStorageDate"))
            or datetime.now(timezone.utc)
        )
        # Stored as naive UTC.
        fetched_at = fetched_at.astimezone(timezone.utc).replace(tzinfo=None)

        contractor_name, contractor_nip = self._pick_contractor(item)
        ksef_number = item["ksefNumber"]

        try:
            xml_content = self._request(
                "GET",
                f"/invoices/ksef/{ksef_number}",
                accept="application/xml",
            )
        except Exception:
            # Best effort: the metadata alone is still worth returning.
            xml_content = ""

        enriched_metadata = self._enrich_metadata_with_xml_party_data(
            item=item,
            xml_content=xml_content,
            contractor_name=contractor_name,
            contractor_nip=contractor_nip,
        )
        # Fill gaps in the metadata-derived contractor with values from the XML.
        if enriched_metadata.get("contractor_name_from_xml") and contractor_name == "Brak":
            contractor_name = enriched_metadata["contractor_name_from_xml"]
        if enriched_metadata.get("contractor_nip_from_xml") and not contractor_nip:
            contractor_nip = enriched_metadata["contractor_nip_from_xml"]

        documents.append(
            KSeFDocument(
                ksef_number=ksef_number,
                # `or` (not a .get default) so a null/empty invoiceNumber
                # also falls back to the KSeF number.
                invoice_number=item.get("invoiceNumber") or ksef_number,
                contractor_name=contractor_name,
                contractor_nip=contractor_nip,
                issue_date=issue_date,
                received_date=issue_date,
                fetched_at=fetched_at,
                net_amount=self._decimal_from_item(item, "netAmount", "invoiceNetAmount"),
                vat_amount=self._decimal_from_item(item, "vatAmount", "invoiceVatAmount"),
                gross_amount=self._decimal_from_item(item, "grossAmount", "invoiceGrossAmount"),
                invoice_type="purchase",
                xml_content=xml_content,
                metadata=enriched_metadata,
            )
        )
    return documents
def issue_invoice(self, payload: dict):
    """Submit one invoice XML to KSeF through an online session.

    Steps, in order: normalize the XML to bytes, build the encryption data,
    encrypt the document, open an online session, POST the encrypted
    invoice, close the session, poll the processing status and fetch the UPO.

    Args:
        payload: Mapping with ``xml_content`` (required) and an optional
            ``schemaVersion`` (defaults to ``"FA(3)"``).

    Returns:
        Dict with keys ``ksef_number``, ``status``, ``message``,
        ``xml_content``, ``session_reference_number``,
        ``invoice_reference_number``, ``status_payload`` and ``upo_xml``.
        ``status`` is ``"issued"`` when the status code is 200 and a KSeF
        number is present, otherwise ``"queued"``; without a KSeF number,
        ``ksef_number`` is a ``PENDING/<session>/<invoice>`` placeholder.

    Raises:
        RuntimeError: ``xml_content`` is missing, or KSeF returned no session
            or invoice reference number.
        Exception: Anything raised by the request / crypto helpers, except a
            failure of the close request, which is reported in ``message``.
    """
    payload = payload or {}
    xml_content = payload.get("xml_content")
    if not xml_content:
        raise RuntimeError("Brak xml_content w payloadzie wysyłki do KSeF.")

    schema_version = str(payload.get("schemaVersion") or "FA(3)")
    xml_bytes = self._normalize_xml_bytes(xml_content)
    encryption_data = self._build_online_encryption_data()
    encrypted_invoice = self._encrypt_invoice_bytes(
        xml_bytes,
        encryption_data["symmetric_key"],
        encryption_data["iv"],
    )

    session_response = self._open_online_session(
        schema_version=schema_version,
        encryption_data=encryption_data,
    )
    session_reference = self._first_present(
        session_response, "referenceNumber", "sessionReferenceNumber"
    )
    if not session_reference:
        raise RuntimeError("KSeF nie zwrócił numeru referencyjnego sesji interaktywnej.")

    close_path = f"/sessions/online/{session_reference}/close"
    send_payload = {
        "invoiceHash": self._sha256_base64(xml_bytes),
        "invoiceSize": len(xml_bytes),
        "encryptedDocumentHash": self._sha256_base64(encrypted_invoice),
        "encryptedDocumentSize": len(encrypted_invoice),
        "encryptedDocumentContent": base64.b64encode(encrypted_invoice).decode("ascii"),
    }

    try:
        send_response = self._request(
            "POST",
            f"/sessions/online/{session_reference}/invoices",
            json=send_payload,
        )
        invoice_reference = self._first_present(
            send_response, "referenceNumber", "invoiceReferenceNumber"
        )
        if not invoice_reference:
            raise RuntimeError("KSeF nie zwrócił numeru referencyjnego przesłanej faktury.")
    except Exception:
        # The session is already open at this point; do not leave it dangling
        # when the submission fails. The close is best effort because the
        # submission error is the one the caller needs to see.
        try:
            self._request("POST", close_path)
        except Exception:
            pass
        raise

    # A failed close must not hide a submitted invoice; it is surfaced as a
    # warning appended to the message instead.
    close_error = None
    try:
        self._request("POST", close_path)
    except Exception as exc:
        close_error = exc

    status_payload = self._poll_invoice_processing(session_reference, invoice_reference)
    code = self._extract_status_code(status_payload)
    ksef_number = self._first_present(
        status_payload,
        "ksefNumber",
        "ksefReferenceNumber",
        "invoiceKsefNumber",
    )
    if not ksef_number and isinstance(status_payload.get("invoice"), dict):
        # Fall back to a number nested under the "invoice" key.
        ksef_number = self._first_present(
            status_payload.get("invoice"), "ksefNumber", "ksefReferenceNumber"
        )

    upo_xml = self._fetch_invoice_upo(
        session_reference,
        invoice_reference,
        ksef_number=str(ksef_number) if ksef_number else None,
    )

    message = "Wysłano fakturę do KSeF."
    if code == 200 and ksef_number:
        message = f"Faktura została przyjęta przez KSeF. Numer KSeF: {ksef_number}."
    elif code and code < 300:
        message = "Faktura została wysłana do KSeF i oczekuje na końcowe przetworzenie."
    if close_error:
        message += f" Sesja została zamknięta z ostrzeżeniem: {close_error}"

    return {
        "ksef_number": ksef_number or f"PENDING/{session_reference}/{invoice_reference}",
        "status": "issued" if code == 200 and ksef_number else "queued",
        "message": message,
        "xml_content": xml_content,
        "session_reference_number": session_reference,
        "invoice_reference_number": invoice_reference,
        "status_payload": status_payload,
        "upo_xml": upo_xml,
    }
def ping(self):
    """Check that the API is reachable by listing auth sessions.

    Returns:
        ``{"status": "ok", ...}`` when the request succeeds, otherwise
        ``{"status": "error", "message": <exception text>}``. Never raises
        for request failures.
    """
    try:
        self._request("GET", "/auth/sessions", params={"pageSize": 10})
    except Exception as exc:
        return {"status": "error", "message": str(exc)}
    return {
        "status": "ok",
        "message": "API KSeF odpowiada, a uwierzytelnienie działa poprawnie.",
    }

def diagnostics(self):
    """Return a connectivity report together with the adapter configuration.

    Both outcomes include ``base_url``, ``auth_mode``, ``tax_id``,
    ``token_configured`` and ``certificate_configured``. The success report
    additionally carries the access / refresh token states and the session
    listing as ``sample``; the error report carries the exception text in
    ``message`` and ``sample["error"]``.
    """
    failure = None
    try:
        access_ok = self._access_token_is_valid()
        refresh_ok = self._refresh_token_is_valid()
        sessions = self._request("GET", "/auth/sessions", params={"pageSize": 10})
    except Exception as exc:
        failure = exc

    # Configuration fields reported regardless of the outcome.
    shared = {
        "base_url": self.base_url,
        "auth_mode": self.auth_mode,
        "tax_id": self.tax_id,
        "token_configured": bool(self.token),
        "certificate_configured": bool(self.certificate_data),
    }

    if failure is not None:
        return {
            "status": "error",
            "message": str(failure),
            **shared,
            "sample": {"error": str(failure)},
        }

    if isinstance(sessions, dict):
        sample = sessions
    else:
        # Wrap a non-dict response so "sample" is always a mapping.
        sample = {"sessions": sessions}

    return {
        "status": "ok",
        "message": "API KSeF odpowiada.",
        **shared,
        "access_token_state": "valid" if access_ok else "missing_or_expired",
        "refresh_token_state": "valid" if refresh_ok else "missing_or_expired",
        "sample": sample,
    }
class KSeFService:
    """Facade over the KSeF adapters.

    The adapter is chosen once, at construction time, from the
    ``ksef.mock_mode`` setting; every public method forwards to it.
    """

    def __init__(self, company_id=None):
        """Pick the mock adapter when ``ksef.mock_mode`` equals ``"true"`` (the default)."""
        mock_flag = SettingsService.get("ksef.mock_mode", "true", company_id=company_id)
        if mock_flag == "true":
            self.adapter = MockKSeFAdapter(company_id=company_id)
        else:
            self.adapter = RequestsKSeFAdapter(company_id=company_id)

    def list_documents(self, since=None):
        """Forward to the adapter's ``list_documents``."""
        return self.adapter.list_documents(since=since)

    def issue_invoice(self, payload: dict):
        """Forward to the adapter's ``issue_invoice``."""
        return self.adapter.issue_invoice(payload)

    def ping(self):
        """Forward to the adapter's ``ping``."""
        return self.adapter.ping()

    def diagnostics(self):
        """Forward to the adapter's ``diagnostics``."""
        return self.adapter.diagnostics()

    @staticmethod
    def calc_hash(xml_content: str) -> str:
        """Return the hex SHA-256 digest of ``xml_content`` encoded as UTF-8."""
        digest = hashlib.sha256()
        digest.update(xml_content.encode("utf-8"))
        return digest.hexdigest()