Files
ksef_app/app/services/ksef_service.py
Mateusz Gruszczyński 35571df778 push
2026-03-13 11:03:13 +01:00

1279 lines
50 KiB
Python

from __future__ import annotations
from dataclasses import dataclass
from datetime import date, datetime, timedelta, timezone
from decimal import Decimal
import base64
import hashlib
import re
import time
import xml.etree.ElementTree as ET
from typing import Any
import requests
from app.services.settings_service import SettingsService
# Optional dependency: the cryptography package is only needed by the KSeF 2.x
# flows that encrypt data with the KSeF public-key certificate (token login and
# invoice sending). If the import fails, every name is bound to None; the methods
# that need them test for None and raise a descriptive RuntimeError instead of
# the whole module failing at import time.
try:
    from cryptography import x509
    # `padding as sym_padding` is the symmetric (PKCS#7) padding used for AES;
    # the asymmetric `padding` below provides RSA-OAEP.
    from cryptography.hazmat.primitives import hashes, serialization, padding as sym_padding
    from cryptography.hazmat.primitives.asymmetric import padding
    from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
except Exception: # pragma: no cover
    x509 = None
    hashes = None
    serialization = None
    sym_padding = None
    Cipher = None
    algorithms = None
    modes = None
    padding = None
@dataclass
class KSeFDocument:
    """A single invoice record as produced by a KSeF adapter's ``list_documents``.

    In the adapter code visible in this module, ``fetched_at`` is a naive
    datetime expressed in UTC.
    """

    ksef_number: str  # number assigned by KSeF (mock: "KSEF/MOCK/C<company>/<year>/<seq>")
    invoice_number: str  # the invoice's own number, e.g. "FV/..."
    contractor_name: str  # name of the counterparty
    contractor_nip: str  # NIP (Polish tax id) of the counterparty
    issue_date: date
    received_date: date
    fetched_at: datetime  # naive UTC
    net_amount: Decimal
    vat_amount: Decimal
    gross_amount: Decimal
    invoice_type: str  # the mock adapter uses "purchase" / "sale"
    xml_content: str  # raw invoice XML text
    metadata: dict  # adapter-specific extras, e.g. {"source": "mock", ...}
class KSeFAdapter:
def list_documents(self, since: datetime | None = None):
raise NotImplementedError
def issue_invoice(self, payload: dict):
raise NotImplementedError
def ping(self):
return {"status": "unknown"}
def diagnostics(self):
return self.ping()
class MockKSeFAdapter(KSeFAdapter):
    """Offline stand-in for the KSeF API that fabricates deterministic sample data.

    No network calls are made: every document and "issued" invoice is
    synthesised locally from ``company_id``.
    """

    def __init__(self, company_id=None):
        # None/0 collapse to 0 so the ":02d" formatting below always gets an int.
        self.company_id = company_id or 0

    @staticmethod
    def _naive_utc_now() -> datetime:
        """Current time in UTC as a naive datetime (replacement for deprecated utcnow())."""
        return datetime.now(timezone.utc).replace(tzinfo=None)

    def list_documents(self, since=None):
        """Return five synthetic documents for this company.

        Document *i* (1..5) is issued ``3*i`` days ago, fetched *i* hours ago,
        has net ``1000 + 250*i`` with 23% VAT, and is a "purchase" for odd *i*
        and a "sale" for even *i*.

        Args:
            since: optional cut-off; only documents with ``fetched_at >= since``
                are returned. A timezone-aware value is converted to naive UTC
                first, because ``fetched_at`` is naive UTC and comparing naive
                with aware datetimes raises TypeError.
        """
        base_date = self._naive_utc_now()
        docs = []
        for i in range(1, 6):
            issue = (base_date - timedelta(days=i * 3)).date()
            net = Decimal(1000 + i * 250)
            vat = (net * Decimal("0.23")).quantize(Decimal("0.01"))
            seq = 10000 + i
            ksef_number = f"KSEF/MOCK/C{self.company_id}/{issue.year}/{seq}"
            docs.append(
                KSeFDocument(
                    ksef_number=ksef_number,
                    invoice_number=f"FV/{self.company_id}/{i:03d}/{issue.year}",
                    contractor_name=f"Firma {self.company_id}-{i}",
                    contractor_nip=f"525{self.company_id:02d}000{i:02d}",
                    issue_date=issue,
                    received_date=issue + timedelta(days=1),
                    fetched_at=base_date - timedelta(hours=i),
                    net_amount=net,
                    vat_amount=vat,
                    gross_amount=net + vat,
                    invoice_type="purchase" if i % 2 else "sale",
                    xml_content=(
                        f"<Invoice><Company>{self.company_id}</Company>"
                        f"<Ksef>{ksef_number}</Ksef></Invoice>"
                    ),
                    metadata={"source": "mock", "sequence": i, "company_id": self.company_id},
                )
            )
        if since:
            if getattr(since, "tzinfo", None) is not None:
                since = since.astimezone(timezone.utc).replace(tzinfo=None)
            docs = [d for d in docs if d.fetched_at >= since]
        return docs

    def issue_invoice(self, payload: dict):
        """Simulate issuing an invoice; nothing is sent to KSeF.

        The returned number embeds the current Unix timestamp as a sequence,
        and the payload's "xml_content" (or "") is echoed back.
        """
        # Aware "now": .timestamp() on a naive UTC value would be read as local time.
        now = datetime.now(timezone.utc)
        seq = int(now.timestamp())
        return {
            "ksef_number": f"KSEF/MOCK/ISSUED/C{self.company_id}/{now.year}/{seq}",
            "status": "issued_mock",
            "message": "Tryb mock: dokument nie został wysłany do produkcyjnego KSeF, tylko zasymulowany lokalnie.",
            "xml_content": payload.get("xml_content", ""),
        }

    def ping(self):
        """Report that the mock mode is active (no real API connection)."""
        return {
            "status": "mock",
            "message": "Tryb mock aktywny - połączenie z prawdziwym API nie jest wykonywane.",
        }

    def diagnostics(self):
        """Return :meth:`ping` plus a fake base URL and a sample document reference."""
        ping = self.ping()
        year = self._naive_utc_now().year  # read once so both sample fields agree
        ping["base_url"] = "mock://ksef"
        ping["sample"] = {
            "status": "mock",
            "documentExample": {
                "invoiceNumber": f"FV/{self.company_id}/001/{year}",
                "ksefNumber": f"KSEF/MOCK/C{self.company_id}/{year}/10001",
            },
        }
        return ping
class RequestsKSeFAdapter(KSeFAdapter):
# Default KSeF 2.x API root for each supported environment key
# ('prod', 'test', 'demo'), used when no explicit "ksef.base_url" is configured.
ENVIRONMENT_URLS = {
    'prod': 'https://api.ksef.mf.gov.pl/v2',
    'test': 'https://api-test.ksef.mf.gov.pl/v2',
    'demo': 'https://api-demo.ksef.mf.gov.pl/v2',
}
# Keys for the access/refresh tokens and their "validUntil" timestamps. They are
# looked up first in the in-process caches and then in the settings store.
ACCESS_TOKEN_KEY = "ksef.auth.access_token"
ACCESS_TOKEN_VALID_UNTIL_KEY = "ksef.auth.access_token_valid_until"
REFRESH_TOKEN_KEY = "ksef.auth.refresh_token"
REFRESH_TOKEN_VALID_UNTIL_KEY = "ksef.auth.refresh_token_valid_until"
def __init__(self, company_id=None):
    """Load the KSeF connection configuration for *company_id*.

    Resolves the environment and API base URL, the auth mode, the KSeF token,
    the client id, certificate settings and the company's NIP. It also creates
    empty in-process caches for the short-lived access/refresh tokens.
    """
    self.company_id = company_id
    self.environment = self._resolve_environment(company_id=company_id)
    fallback_url = self.ENVIRONMENT_URLS[self.environment]
    stored_url = SettingsService.get(
        "ksef.base_url",
        fallback_url,
        company_id=company_id,
    )
    self.base_url = self._normalize_base_url(stored_url or fallback_url)

    def effective(key, fallback):
        # User-level override of a company "ksef" setting.
        return SettingsService.get_effective(key, fallback, company_id=company_id, scope_name='ksef', user_default='user')

    self.auth_mode = effective("ksef.auth_mode", "token")
    self.token = (self._settings_get_secret("ksef.token", "") or "").strip()
    self.client_id = (effective("ksef.client_id", "") or "").strip()
    self.certificate_name = (effective("ksef.certificate_name", "") or "").strip()
    self.certificate_data = self._settings_get_secret("ksef.certificate_data", "") or ""
    self.tax_id = self._resolve_tax_id()
    self._runtime_secret_cache: dict[str, str] = {}
    self._runtime_value_cache: dict[str, str] = {}
def _settings_get_secret(self, key: str, default: str = "") -> str:
    """Read a secret setting for this company.

    Uses ``SettingsService.get_secret`` when the service provides a callable
    one, otherwise falls back to the plain ``SettingsService.get``.
    """
    secret_reader = getattr(SettingsService, "get_secret", None)
    reader = secret_reader if callable(secret_reader) else SettingsService.get
    return reader(key, default, company_id=self.company_id)
@classmethod
def _resolve_environment(cls, company_id=None) -> str:
    """Return the KSeF environment key ('prod', 'test' or 'demo') for a company.

    An explicit, recognised "ksef.environment" setting wins. Otherwise the
    environment is inferred from the host in "ksef.base_url". Anything
    unrecognised resolves to 'prod'.
    """
    environment = (SettingsService.get_effective("ksef.environment", "prod", company_id=company_id, scope_name='ksef', user_default='user') or "").strip().lower()
    if environment in cls.ENVIRONMENT_URLS:
        return environment
    base_url = (SettingsService.get_effective("ksef.base_url", "", company_id=company_id, scope_name='ksef', user_default='user') or "").strip().lower()
    # Recognise every non-production host listed in ENVIRONMENT_URLS; detecting
    # only the test host made a demo base URL report itself as 'prod'.
    host_markers = (
        ("test", "api-test.ksef.mf.gov.pl"),
        ("demo", "api-demo.ksef.mf.gov.pl"),
    )
    for env_name, marker in host_markers:
        if marker in base_url:
            return env_name
    return "prod"
@staticmethod
def _normalize_base_url(base_url: str) -> str:
base = (base_url or "https://api.ksef.mf.gov.pl/v2").strip().rstrip("/")
if base.endswith("/docs/v2"):
return base[:-8] + "/v2"
if not base.endswith("/v2"):
base = f"{base}/v2"
return base.rstrip("/")
def _resolve_tax_id(self) -> str:
    """Return the company's 10-digit NIP, or "" when none can be found.

    Candidates are checked in this order:
      1. the settings keys company.tax_id, company.nip, tax_id, nip;
      2. the tax_id / nip / vat_number attributes of the Company record.
    Non-digit characters (dashes, spaces, a "PL" prefix) are ignored, and the
    first candidate with exactly 10 digits wins.
    """
    candidates = [
        SettingsService.get(key, "", company_id=self.company_id)
        for key in ("company.tax_id", "company.nip", "tax_id", "nip")
    ]
    try:
        from app.models.company import Company
        if self.company_id:
            company = Company.query.get(self.company_id)
            if company:
                candidates.extend(
                    getattr(company, attr, "") for attr in ("tax_id", "nip", "vat_number")
                )
    except Exception:
        # Deliberate best effort: the settings candidates are still usable if the
        # model/query is unavailable here.
        pass
    for value in candidates:
        # str(): a NIP stored as a number would otherwise make re.sub raise TypeError.
        digits = re.sub(r"\D", "", str(value or ""))
        if len(digits) == 10:
            return digits
    return ""
@staticmethod
def _parse_datetime(value: str | None) -> datetime | None:
if not value:
return None
raw = value.strip()
if raw.endswith("Z"):
raw = raw[:-1] + "+00:00"
try:
return datetime.fromisoformat(raw)
except ValueError:
return None
@staticmethod
def _ensure_naive_utc(dt: datetime) -> datetime:
if dt.tzinfo is None:
return dt
return dt.astimezone(timezone.utc).replace(tzinfo=None)
def _get_cached_secret(self, key: str) -> str:
if key in self._runtime_secret_cache:
return (self._runtime_secret_cache.get(key) or "").strip()
return (self._settings_get_secret(key, "") or "").strip()
def _set_cached_secret(self, key: str, value: str):
self._runtime_secret_cache[key] = value or ""
def _get_cached_value(self, key: str) -> str:
if key in self._runtime_value_cache:
return (self._runtime_value_cache.get(key) or "").strip()
return (SettingsService.get(key, "", company_id=self.company_id) or "").strip()
def _set_cached_value(self, key: str, value: str):
self._runtime_value_cache[key] = value or ""
def _clear_access_token_cache(self):
self._set_cached_secret(self.ACCESS_TOKEN_KEY, "")
self._set_cached_value(self.ACCESS_TOKEN_VALID_UNTIL_KEY, "")
def _clear_refresh_token_cache(self):
self._set_cached_secret(self.REFRESH_TOKEN_KEY, "")
self._set_cached_value(self.REFRESH_TOKEN_VALID_UNTIL_KEY, "")
def _access_token_is_valid(self) -> bool:
token = self._get_cached_secret(self.ACCESS_TOKEN_KEY)
valid_until = self._parse_datetime(self._get_cached_value(self.ACCESS_TOKEN_VALID_UNTIL_KEY))
if not token or not valid_until:
return False
return valid_until > datetime.now(timezone.utc) + timedelta(seconds=30)
def _refresh_token_is_valid(self) -> bool:
token = self._get_cached_secret(self.REFRESH_TOKEN_KEY)
valid_until = self._parse_datetime(self._get_cached_value(self.REFRESH_TOKEN_VALID_UNTIL_KEY))
if not token or not valid_until:
return False
return valid_until > datetime.now(timezone.utc) + timedelta(seconds=30)
def _get_public_key_pem(self, usage: str | None = None) -> str:
    """Download a KSeF public-key certificate and return it as PEM text.

    Args:
        usage: optional certificate usage to select, e.g. "KsefTokenEncryption"
            or "SymmetricKeyEncryption" (names as published by the KSeF 2.0
            API; verify against the current spec). When given, the first
            certificate whose ``usage`` list contains it is returned. When it
            is omitted, or no certificate declares it, the first certificate in
            the response is used (the original behaviour).

    Returns:
        The certificate in PEM form. Bare Base64 DER is wrapped at 64 columns
        with BEGIN/END CERTIFICATE markers.

    Raises:
        requests.HTTPError: the endpoint answered with an error status.
        RuntimeError: the response is not a non-empty list or has no certificate.
    """
    response = requests.get(
        f"{self.base_url}/security/public-key-certificates",
        headers={"Accept": "application/json"},
        timeout=30,
    )
    response.raise_for_status()
    payload = response.json()
    if not isinstance(payload, list) or not payload:
        raise RuntimeError("KSeF nie zwrócił listy certyfikatów klucza publicznego.")
    certificates = [item for item in payload if isinstance(item, dict) and item.get("certificate")]
    if usage:
        matching = [item for item in certificates if usage in (item.get("usage") or [])]
        certificates = matching or certificates
    if not certificates:
        raise RuntimeError("Nie udało się odczytać certyfikatu klucza publicznego KSeF.")
    raw_cert = certificates[0]["certificate"].strip()
    if "BEGIN CERTIFICATE" in raw_cert:
        return raw_cert
    wrapped = "\n".join(raw_cert[i:i + 64] for i in range(0, len(raw_cert), 64))
    return f"-----BEGIN CERTIFICATE-----\n{wrapped}\n-----END CERTIFICATE-----"
@staticmethod
def _sha256_base64(payload: bytes) -> str:
return base64.b64encode(hashlib.sha256(payload).digest()).decode("ascii")
@staticmethod
def _normalize_xml_bytes(xml_content: str) -> bytes:
if xml_content is None:
raise RuntimeError("Brak treści XML faktury do wysyłki.")
xml_bytes = xml_content.encode("utf-8")
if xml_bytes.startswith(b"\xef\xbb\xbf"):
xml_bytes = xml_bytes[3:]
if len(xml_bytes) > 1_000_000:
raise RuntimeError("Plik XML faktury przekracza limit 1 MB wymagany przez KSeF dla faktury bez załączników.")
return xml_bytes
def _build_online_encryption_data(self) -> dict[str, str | bytes]:
    """Generate a fresh AES key and IV for an online (interactive) KSeF session.

    The 32-byte symmetric key is encrypted with RSA-OAEP (SHA-256) using the
    KSeF public-key certificate, so it can be sent when the session is opened.

    Returns:
        ``symmetric_key`` (32 raw bytes) and ``iv`` (16 raw bytes) for local
        encryption, plus the Base64 strings ``encrypted_symmetric_key`` and
        ``initialization_vector`` for the API.

    Raises:
        RuntimeError: the cryptography package is not installed.
    """
    import secrets  # standard-library CSPRNG for key material

    if not all((x509, serialization, hashes, padding, Cipher, algorithms, modes, sym_padding)):
        raise RuntimeError(
            "Brak biblioteki cryptography. Zainstaluj ją, aby obsłużyć wysyłkę faktur do KSeF 2.x."
        )
    # NOTE(review): _get_public_key_pem() is called without selecting a usage, so
    # the first listed certificate is used; KSeF may publish separate
    # certificates per usage — confirm this is the symmetric-key one.
    pem = self._get_public_key_pem().encode("utf-8")
    cert = x509.load_pem_x509_certificate(pem)
    public_key = cert.public_key()
    symmetric_key = secrets.token_bytes(32)  # AES-256 key
    iv = secrets.token_bytes(16)  # one AES block
    encrypted_key = public_key.encrypt(
        symmetric_key,
        padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None,
        ),
    )
    return {
        "symmetric_key": symmetric_key,
        "iv": iv,
        "encrypted_symmetric_key": base64.b64encode(encrypted_key).decode("ascii"),
        "initialization_vector": base64.b64encode(iv).decode("ascii"),
    }
@staticmethod
def _encrypt_invoice_bytes(invoice_bytes: bytes, symmetric_key: bytes, iv: bytes) -> bytes:
    """AES-CBC encrypt *invoice_bytes* with PKCS#7 padding.

    Returns the IV followed by the ciphertext.

    NOTE(review): the same IV is also sent separately as "initializationVector"
    when the session is opened. Confirm against the KSeF specification that the
    uploaded content is meant to carry this IV prefix.
    """
    aes_block_bits = 128
    pkcs7 = sym_padding.PKCS7(aes_block_bits).padder()
    plaintext = pkcs7.update(invoice_bytes) + pkcs7.finalize()
    aes = Cipher(algorithms.AES(symmetric_key), modes.CBC(iv)).encryptor()
    ciphertext = aes.update(plaintext) + aes.finalize()
    return b"".join((iv, ciphertext))
@staticmethod
def _extract_status_code(payload: dict | None) -> int | None:
if not isinstance(payload, dict):
return None
status = payload.get("status")
if isinstance(status, dict):
status = status.get("code")
elif isinstance(status, list) and status:
head = status[0]
status = head.get("code") if isinstance(head, dict) else head
if status is None and isinstance(payload.get("code"), int):
status = payload.get("code")
try:
return int(status) if status is not None else None
except Exception:
return None
@staticmethod
def _extract_status_description(payload: dict | None) -> str:
if not isinstance(payload, dict):
return ""
status = payload.get("status")
if isinstance(status, dict):
parts = [status.get("description") or ""]
details = status.get("details") or []
if isinstance(details, list):
parts.extend(str(item) for item in details if item)
return " | ".join(part for part in parts if part)
if payload.get("description"):
return str(payload.get("description"))
return ""
@staticmethod
def _first_present(mapping: dict | None, *keys: str):
if not isinstance(mapping, dict):
return None
for key in keys:
value = mapping.get(key)
if value not in (None, "", []):
return value
return None
def _request_binary(self, method, path, params=None, json=None, data=None, accept="application/octet-stream", content_type=None, use_auth=True):
    """Call the KSeF API and return the raw response body as bytes.

    Makes up to 3 attempts, with exponential back-off of 1 s and then 2 s
    between them; there is no sleep after the final attempt. If the first
    attempt gets a 401, the cached access token is cleared and the call is
    repeated once with fresh credentials. Client errors (4xx other than
    408/429) are not retried, because the same request cannot succeed.

    Raises:
        RuntimeError: KSeF kept answering HTTP 429 (the message includes the
            Retry-After hint when present).
        requests.HTTPError or other exceptions: the last error encountered.
    """
    url = f"{self.base_url}/{path.lstrip('/')}"
    max_attempts = 3
    last_error = None

    def send():
        # Headers are rebuilt for every send so a cleared token is re-acquired.
        headers = self._headers(use_auth=use_auth, accept=accept)
        if content_type:
            headers["Content-Type"] = content_type
        return requests.request(
            method,
            url,
            headers=headers,
            params=params,
            json=json,
            data=data,
            timeout=30,
        )

    for attempt in range(max_attempts):
        try:
            response = send()
            if response.status_code == 401 and use_auth and attempt == 0:
                self._clear_access_token_cache()
                response = send()
            response.raise_for_status()
            return response.content
        except requests.HTTPError as exc:
            response = getattr(exc, "response", None)
            status = response.status_code if response is not None else None
            if status == 429:
                retry_after = response.headers.get("Retry-After")
                wait_hint = f" Spróbuj ponownie za około {retry_after} s." if retry_after else " Spróbuj ponownie za chwilę."
                last_error = RuntimeError("KSeF chwilowo ogranicza liczbę zapytań (HTTP 429)." + wait_hint)
            else:
                last_error = exc
                if status is not None and 400 <= status < 500 and status != 408:
                    break  # non-retryable client error
        except Exception as exc:
            last_error = exc
        if attempt < max_attempts - 1:
            time.sleep(2 ** attempt)
    raise last_error
def _open_online_session(self, schema_version: str, encryption_data: dict[str, str | bytes]) -> dict:
    """Open an online (interactive) KSeF session and return the API response.

    The form code comes from the "ksef.form_code_*" settings (defaults
    "FA (3)" / "1-0E" / "FA"). A *schema_version* such as "FA(3)" overrides the
    systemCode and is rewritten to the "FA (3)" spelling. The Base64 encrypted
    key and IV from *encryption_data* are included in the request.
    """
    def setting(key, fallback):
        return SettingsService.get_effective(key, fallback, company_id=self.company_id, scope_name="ksef", user_default="user")

    form_code = {
        "systemCode": setting("ksef.form_code_system_code", "FA (3)"),
        "schemaVersion": setting("ksef.form_code_schema_version", "1-0E"),
        "value": setting("ksef.form_code_value", "FA"),
    }
    encryption = {
        "encryptedSymmetricKey": encryption_data["encrypted_symmetric_key"],
        "initializationVector": encryption_data["initialization_vector"],
    }
    if schema_version and schema_version.upper().startswith("FA("):
        form_code["systemCode"] = schema_version.upper().replace("(", " (")
    request_body = {"formCode": form_code, "encryption": encryption}
    return self._request("POST", "/sessions/online", json=request_body)
def _get_session_invoice_status(self, session_reference: str, invoice_reference: str) -> dict:
return self._request("GET", f"/sessions/{session_reference}/invoices/{invoice_reference}")
def _poll_invoice_processing(self, session_reference: str, invoice_reference: str, timeout_seconds: int = 60) -> dict:
deadline = time.time() + timeout_seconds
last_payload: dict[str, Any] | None = None
while time.time() < deadline:
payload = self._get_session_invoice_status(session_reference, invoice_reference)
last_payload = payload if isinstance(payload, dict) else {"payload": payload}
code = self._extract_status_code(last_payload)
if code == 200:
return last_payload
if code and code >= 300:
description = self._extract_status_description(last_payload) or f"KSeF odrzucił fakturę. Kod statusu: {code}."
raise RuntimeError(description)
time.sleep(2)
return last_payload or {}
def _fetch_invoice_upo(self, session_reference: str, invoice_reference: str, ksef_number: str | None = None) -> str | None:
try:
if ksef_number:
raw = self._request_binary("GET", f"/sessions/{session_reference}/invoices/ksef/{ksef_number}/upo", accept="application/xml")
else:
raw = self._request_binary("GET", f"/sessions/{session_reference}/invoices/{invoice_reference}/upo", accept="application/xml")
return raw.decode("utf-8", errors="replace") if raw else None
except Exception:
return None
def _encrypt_token_payload(self, plain_text: str) -> str:
    """RSA-OAEP (SHA-256) encrypt *plain_text* with the KSeF public-key certificate.

    Returns the ciphertext Base64-encoded. In this module it encrypts the
    "<token>|<timestampMs>" login payload.

    NOTE(review): _get_public_key_pem() is called without selecting a usage, so
    the first listed certificate is used. If KSeF publishes per-usage
    certificates, confirm this is the token-encryption one.

    Raises:
        RuntimeError: the cryptography package is not installed.
    """
    if not (x509 and serialization and hashes and padding):
        raise RuntimeError(
            "Brak biblioteki cryptography. Zainstaluj ją, aby obsłużyć logowanie tokenem KSeF 2.x."
        )
    certificate = x509.load_pem_x509_certificate(self._get_public_key_pem().encode("utf-8"))
    oaep = padding.OAEP(
        mgf=padding.MGF1(algorithm=hashes.SHA256()),
        algorithm=hashes.SHA256(),
        label=None,
    )
    ciphertext = certificate.public_key().encrypt(plain_text.encode("utf-8"), oaep)
    return base64.b64encode(ciphertext).decode("ascii")
def _build_context_identifier(self) -> dict[str, str]:
if not self.tax_id:
raise RuntimeError(
f"Brak NIP firmy w ustawieniach lub rekordzie firmy dla company_id={self.company_id}. "
"Token KSeF wymaga contextIdentifier typu Nip."
)
return {"type": "Nip", "value": self.tax_id}
def _authenticate_with_refresh_token(self) -> bool:
    """Try to obtain a new access token using the cached refresh token.

    Returns:
        True when a new access token was cached (plus a new refresh token, if
        KSeF returned one). False when there is no refresh token or the refresh
        failed; the caller can then fall back to a full login.

    The refresh token is discarded only when KSeF rejects the request (4xx) or
    answers without an access token. On a server error (5xx) or an unparsable
    body it is kept, since the token itself may still be valid.
    """
    refresh_token = self._get_cached_secret(self.REFRESH_TOKEN_KEY)
    if not refresh_token:
        return False
    headers = {
        "Accept": "application/json",
        "Authorization": f"Bearer {refresh_token}",
    }
    if self.client_id:
        headers["X-Client-Id"] = self.client_id
    response = requests.post(
        f"{self.base_url}/auth/token/refresh",
        headers=headers,
        timeout=30,
    )
    if response.status_code >= 500:
        return False  # transient server-side problem; keep the refresh token
    if response.status_code >= 400:
        self._clear_refresh_token_cache()
        return False
    try:
        data = response.json()
    except ValueError:
        return False
    if not isinstance(data, dict):
        data = {}
    access = data.get("accessToken") or {}
    refresh = data.get("refreshToken") or {}
    if not access.get("token"):
        self._clear_refresh_token_cache()
        return False
    self._set_cached_secret(self.ACCESS_TOKEN_KEY, access["token"])
    self._set_cached_value(self.ACCESS_TOKEN_VALID_UNTIL_KEY, access.get("validUntil", ""))
    if refresh.get("token"):
        self._set_cached_secret(self.REFRESH_TOKEN_KEY, refresh["token"])
        self._set_cached_value(self.REFRESH_TOKEN_VALID_UNTIL_KEY, refresh.get("validUntil", ""))
    return True
def _authenticate_with_ksef_token(self):
    """Log in to KSeF 2.x with the configured KSeF token and cache the tokens.

    Steps, in order:
      1. POST /auth/challenge, which returns ``challenge`` and ``timestampMs``.
      2. Encrypt "<token>|<timestampMs>" with the KSeF public-key certificate
         and POST it, with the company NIP context, to /auth/ksef-token. This
         returns an ``authenticationToken`` and a ``referenceNumber``.
      3. Poll GET /auth/{referenceNumber} up to 20 times, 1 s apart, until the
         status code is 200.
      4. POST /auth/token/redeem to exchange the authentication token for an
         access token and, if returned, a refresh token.
    The tokens and their ``validUntil`` values are stored with
    _set_cached_secret / _set_cached_value.

    Raises:
        RuntimeError: no KSeF token or NIP configured, a required field missing
            from a response, a status code >= 300 while polling, or polling
            exhausted without success.
        requests.HTTPError: any of the HTTP calls returned an error status.
        KeyError: the challenge response has no "challenge" field.
    """
    if not self.token:
        raise RuntimeError("Brak tokenu KSeF w ustawieniach użytkownika.")
    # Step 1: request a challenge.
    challenge_response = requests.post(
        f"{self.base_url}/auth/challenge",
        headers={"Accept": "application/json"},
        timeout=30,
    )
    challenge_response.raise_for_status()
    challenge_data = challenge_response.json()
    challenge = challenge_data["challenge"]
    challenge_timestamp_ms = challenge_data.get("timestampMs")
    if challenge_timestamp_ms is None:
        raise RuntimeError("KSeF nie zwrócił timestampMs w odpowiedzi /auth/challenge.")
    try:
        challenge_timestamp_ms = int(challenge_timestamp_ms)
    except Exception as exc:
        raise RuntimeError("Nieprawidłowy timestampMs z /auth/challenge.") from exc
    # Step 2: the challenge timestamp is encrypted together with the token.
    encrypted_token = self._encrypt_token_payload(
        f"{self.token}|{challenge_timestamp_ms}"
    )
    init_payload = {
        "challenge": challenge,
        "contextIdentifier": self._build_context_identifier(),
        "encryptedToken": encrypted_token,
    }
    init_headers = {"Accept": "application/json"}
    if self.client_id:
        init_headers["X-Client-Id"] = self.client_id
    init_response = requests.post(
        f"{self.base_url}/auth/ksef-token",
        headers=init_headers,
        json=init_payload,
        timeout=30,
    )
    init_response.raise_for_status()
    init_data = init_response.json()
    auth_token = (init_data.get("authenticationToken") or {}).get("token")
    reference_number = init_data.get("referenceNumber")
    if not auth_token or not reference_number:
        raise RuntimeError("KSeF nie zwrócił authenticationToken lub referenceNumber.")
    # Step 3: the authentication completes asynchronously; poll its status with
    # the temporary authentication token.
    status_headers = {
        "Accept": "application/json",
        "Authorization": f"Bearer {auth_token}",
    }
    if self.client_id:
        status_headers["X-Client-Id"] = self.client_id
    for _ in range(20):
        status_response = requests.get(
            f"{self.base_url}/auth/{reference_number}",
            headers=status_headers,
            timeout=30,
        )
        status_response.raise_for_status()
        payload = status_response.json()
        status = payload.get("status") or {}
        code = status.get("code")
        if code == 200:
            break
        # Codes >= 300 are terminal failures; anything else (or no code) means
        # "still in progress".
        if code and code >= 300:
            description = status.get("description") or "Błąd uwierzytelnienia KSeF."
            details = status.get("details") or []
            details_text = " | ".join(str(x) for x in details if x)
            if details_text:
                raise RuntimeError(f"{description} Details: {details_text}")
            raise RuntimeError(description)
        time.sleep(1)
    else:
        # for/else: reached only when the loop never hit `break` (no code 200).
        raise RuntimeError(
            f"Przekroczono czas oczekiwania na zakończenie uwierzytelnienia KSeF dla {reference_number}."
        )
    # Step 4: exchange the authentication token for access/refresh tokens.
    redeem_headers = {
        "Accept": "application/json",
        "Authorization": f"Bearer {auth_token}",
    }
    if self.client_id:
        redeem_headers["X-Client-Id"] = self.client_id
    redeem_response = requests.post(
        f"{self.base_url}/auth/token/redeem",
        headers=redeem_headers,
        timeout=30,
    )
    redeem_response.raise_for_status()
    tokens = redeem_response.json()
    access = tokens.get("accessToken") or {}
    refresh = tokens.get("refreshToken") or {}
    access_token = access.get("token")
    refresh_token = refresh.get("token")
    if not access_token:
        raise RuntimeError("KSeF nie zwrócił access tokena.")
    self._set_cached_secret(self.ACCESS_TOKEN_KEY, access_token)
    self._set_cached_value(self.ACCESS_TOKEN_VALID_UNTIL_KEY, access.get("validUntil", ""))
    if refresh_token:
        self._set_cached_secret(self.REFRESH_TOKEN_KEY, refresh_token)
        self._set_cached_value(self.REFRESH_TOKEN_VALID_UNTIL_KEY, refresh.get("validUntil", ""))
def _ensure_access_token(self) -> str:
if self._access_token_is_valid():
return self._get_cached_secret(self.ACCESS_TOKEN_KEY)
if self._refresh_token_is_valid() and self._authenticate_with_refresh_token():
return self._get_cached_secret(self.ACCESS_TOKEN_KEY)
if self.auth_mode != "token":
raise RuntimeError(f"Nieobsługiwany tryb autoryzacji KSeF: {self.auth_mode}")
self._authenticate_with_ksef_token()
token = self._get_cached_secret(self.ACCESS_TOKEN_KEY)
if not token:
raise RuntimeError("Nie udało się uzyskać access tokena KSeF.")
return token
def _headers(self, use_auth: bool = True, accept: str = "application/json") -> dict[str, str]:
headers = {"Accept": accept}
if self.client_id:
headers["X-Client-Id"] = self.client_id
if use_auth:
headers["Authorization"] = f"Bearer {self._ensure_access_token()}"
return headers
def _request(self, method, path, params=None, json=None, accept="application/json", use_auth=True):
    """Call the KSeF API and return the decoded response.

    Makes up to 3 attempts, with exponential back-off of 1 s and then 2 s
    between them; there is no sleep after the final attempt. If the first
    attempt gets a 401, the cached access token is cleared and the call is
    repeated once with fresh credentials. Client errors (4xx other than
    408/429) are not retried, because the same request cannot succeed.

    Returns:
        {"status": "ok"} for an empty body. Parsed JSON when the response is
        JSON or JSON was requested. Otherwise the body text.

    Raises:
        RuntimeError: KSeF kept answering HTTP 429 (the message includes the
            Retry-After hint when present).
        requests.HTTPError or other exceptions: the last error encountered.
    """
    url = f"{self.base_url}/{path.lstrip('/')}"
    max_attempts = 3
    last_error = None

    def send():
        # Headers are rebuilt for every send so a cleared token is re-acquired.
        headers = self._headers(use_auth=use_auth, accept=accept)
        return requests.request(
            method,
            url,
            headers=headers,
            params=params,
            json=json,
            timeout=30,
        )

    for attempt in range(max_attempts):
        try:
            response = send()
            if response.status_code == 401 and use_auth and attempt == 0:
                self._clear_access_token_cache()
                response = send()
            response.raise_for_status()
            if not response.content:
                return {"status": "ok"}
            content_type = response.headers.get("Content-Type", "")
            if "application/json" in content_type or accept == "application/json":
                return response.json()
            return response.text
        except requests.HTTPError as exc:
            response = getattr(exc, "response", None)
            status = response.status_code if response is not None else None
            if status == 429:
                retry_after = response.headers.get("Retry-After")
                wait_hint = f" Spróbuj ponownie za około {retry_after} s." if retry_after else " Spróbuj ponownie za chwilę."
                last_error = RuntimeError("KSeF chwilowo ogranicza liczbę zapytań (HTTP 429)." + wait_hint)
            else:
                last_error = exc
                if status is not None and 400 <= status < 500 and status != 408:
                    break  # non-retryable client error
        except Exception as exc:
            last_error = exc
        if attempt < max_attempts - 1:
            time.sleep(2 ** attempt)
    raise last_error
@staticmethod
def _decimal_from_item(item: dict, *keys: str) -> Decimal:
for key in keys:
if item.get(key) is not None:
return Decimal(str(item.get(key)))
return Decimal("0")
@staticmethod
def _clean_text(value: Any) -> str:
if value is None:
return ""
return str(value).strip()
@staticmethod
def _digits_only(value: Any) -> str:
return re.sub(r"\D", "", str(value or ""))
def _pick_contractor(self, item: dict) -> tuple[str, str]:
seller = item.get("seller") or {}
buyer = item.get("buyer") or {}
seller_nip = re.sub(r"\D", "", seller.get("nip") or ((seller.get("identifier") or {}).get("value", "")))
buyer_nip = re.sub(r"\D", "", buyer.get("nip") or ((buyer.get("identifier") or {}).get("value", "")))
if self.tax_id and seller_nip == self.tax_id:
contractor = buyer
elif self.tax_id and buyer_nip == self.tax_id:
contractor = seller
else:
contractor = buyer or seller
identifier = contractor.get("identifier") or {}
nip = contractor.get("nip") or identifier.get("value", "")
return contractor.get("name", "Brak"), nip
@staticmethod
def _strip_namespace(tag: str) -> str:
if "}" in tag:
return tag.split("}", 1)[1]
return tag
def _find_first_text(self, root: ET.Element, local_names: list[str]) -> str:
wanted = {name.lower() for name in local_names}
for element in root.iter():
if self._strip_namespace(element.tag).lower() in wanted:
text = self._clean_text(element.text)
if text:
return text
return ""
def _find_first_child_text(self, element: ET.Element, local_names: list[str]) -> str:
wanted = {name.lower() for name in local_names}
for node in element.iter():
if self._strip_namespace(node.tag).lower() in wanted:
text = self._clean_text(node.text)
if text:
return text
return ""
def _find_direct_child(self, element: ET.Element, local_names: list[str]) -> ET.Element | None:
wanted = {name.lower() for name in local_names}
for child in list(element):
if self._strip_namespace(child.tag).lower() in wanted:
return child
return None
def _find_party_nodes(self, root: ET.Element) -> list[ET.Element]:
result: list[ET.Element] = []
wanted = {
"podmiot1",
"podmiot2",
"sprzedawca",
"nabywca",
"sprzedawca1",
"nabywca1",
}
for element in root.iter():
local_name = self._strip_namespace(element.tag).lower()
if local_name in wanted:
result.append(element)
return result
def _parse_adres_node(self, address_node: ET.Element | None) -> dict[str, str]:
result = {
"street": "",
"city": "",
"postal_code": "",
"country": "",
"address": "",
}
if address_node is None:
return result
def get_text(names: list[str]) -> str:
return self._find_first_child_text(address_node, names)
def split_adres_l2(raw: str) -> tuple[str, str]:
cleaned = self._clean_text(raw)
if not cleaned:
return "", ""
match = re.match(r"^(\d{2}-\d{3})\s+(.+)$", cleaned)
if match:
return match.group(1), self._clean_text(match.group(2))
return "", cleaned
street = get_text(["Ulica", "AdresL1"])
house_no = get_text(["NrDomu"])
apartment_no = get_text(["NrLokalu"])
city = get_text(["Miejscowosc"])
postal_code = get_text(["KodPocztowy"])
country = get_text(["Kraj", "KodKraju"])
adres_l2 = get_text(["AdresL2"])
if adres_l2:
parsed_postal_code, parsed_city = split_adres_l2(adres_l2)
postal_code = postal_code or parsed_postal_code
city = city or parsed_city
street_line = self._clean_text(street)
if not street_line:
street_parts = [part for part in [street, house_no] if part]
street_line = " ".join(street_parts).strip()
if apartment_no:
street_line = f"{street_line}/{apartment_no}" if street_line else apartment_no
address_parts = [part for part in [street_line, postal_code, city, country] if part]
full_address = ", ".join(address_parts)
result["street"] = street_line
result["city"] = city
result["postal_code"] = postal_code
result["country"] = country
result["address"] = full_address
return result
def _parse_party_element(self, element: ET.Element) -> dict[str, str]:
name = ""
nip = ""
identity_node = self._find_direct_child(element, ["DaneIdentyfikacyjne"])
if identity_node is not None:
name = self._find_first_child_text(identity_node, ["PelnaNazwa", "Nazwa", "NazwaPodmiotu"])
nip = self._digits_only(self._find_first_child_text(identity_node, ["NIP", "NrIdentyfikacjiPodatkowej"]))
if not name:
name = self._find_first_child_text(element, ["PelnaNazwa", "Nazwa", "NazwaPodmiotu"])
if not nip:
nip = self._digits_only(self._find_first_child_text(element, ["NIP", "NrIdentyfikacjiPodatkowej"]))
main_address_node = self._find_direct_child(element, ["Adres"])
correspondence_address_node = self._find_direct_child(element, ["AdresKoresp"])
main_address = self._parse_adres_node(main_address_node)
correspondence_address = self._parse_adres_node(correspondence_address_node)
chosen_address = main_address
if not chosen_address.get("address") and correspondence_address.get("address"):
chosen_address = correspondence_address
return {
"name": name,
"nip": nip,
"street": chosen_address.get("street", ""),
"city": chosen_address.get("city", ""),
"postal_code": chosen_address.get("postal_code", ""),
"country": chosen_address.get("country", ""),
"address": chosen_address.get("address", ""),
}
def _match_party_role_from_root(
    self,
    root: ET.Element,
    contractor_nip: str = "",
    contractor_name: str = "",
) -> dict[str, str]:
    """Pick the counterparty's details from a parsed invoice XML tree.

    Resolution order:
      1. If a Podmiot1 or Podmiot2 element exists (only the first of each is
         parsed):
         a. if self.tax_id matches one of them, return the *other* one (or an
            all-empty result if that one is missing);
         b. else the one whose NIP equals *contractor_nip*;
         c. else the one whose name equals *contractor_name* (case-insensitive);
         d. else Podmiot2, then Podmiot1.
      2. Otherwise, among all party elements found by _find_party_nodes that
         have at least one non-empty field: match on NIP, then on name, else
         take the first one.
    NOTE(review): unlike step 1, step 2 does not use self.tax_id, so the first
    party there may be this company itself.

    Returns a dict with name/nip/street/city/postal_code/country/address keys;
    every value is "" when nothing is found.
    """
    empty_result = {
        "name": "",
        "nip": "",
        "street": "",
        "city": "",
        "postal_code": "",
        "country": "",
        "address": "",
    }
    normalized_tax_id = self._digits_only(self.tax_id)
    normalized_contractor_nip = self._digits_only(contractor_nip)
    normalized_contractor_name = self._clean_text(contractor_name).lower()
    podmiot1 = None
    podmiot2 = None
    for node in self._find_party_nodes(root):
        local_name = self._strip_namespace(node.tag).lower()
        if local_name == "podmiot1" and podmiot1 is None:
            podmiot1 = self._parse_party_element(node)
        elif local_name == "podmiot2" and podmiot2 is None:
            podmiot2 = self._parse_party_element(node)
    # A parsed party is always a non-empty dict, so this is true whenever a
    # Podmiot1/Podmiot2 element exists at all.
    if podmiot1 or podmiot2:
        # (a) Our own NIP identifies our side; the counterparty is the other one.
        if normalized_tax_id:
            if podmiot1 and self._digits_only(podmiot1.get("nip")) == normalized_tax_id:
                return podmiot2 or empty_result
            if podmiot2 and self._digits_only(podmiot2.get("nip")) == normalized_tax_id:
                return podmiot1 or empty_result
        # (b) Match the NIP known from metadata.
        if normalized_contractor_nip:
            if podmiot1 and self._digits_only(podmiot1.get("nip")) == normalized_contractor_nip:
                return podmiot1
            if podmiot2 and self._digits_only(podmiot2.get("nip")) == normalized_contractor_nip:
                return podmiot2
        # (c) Match the name known from metadata.
        if normalized_contractor_name:
            if podmiot1 and self._clean_text(podmiot1.get("name")).lower() == normalized_contractor_name:
                return podmiot1
            if podmiot2 and self._clean_text(podmiot2.get("name")).lower() == normalized_contractor_name:
                return podmiot2
        # (d) Default preference: Podmiot2, then Podmiot1.
        return podmiot2 or podmiot1 or empty_result
    # Step 2: no Podmiot elements; consider every other party-like element.
    parties = [self._parse_party_element(node) for node in self._find_party_nodes(root)]
    parties = [party for party in parties if any(party.values())]
    if normalized_contractor_nip:
        for party in parties:
            if self._digits_only(party.get("nip")) == normalized_contractor_nip:
                return party
    if normalized_contractor_name:
        for party in parties:
            party_name = self._clean_text(party.get("name")).lower()
            if party_name and party_name == normalized_contractor_name:
                return party
    return parties[0] if parties else empty_result
def _extract_party_details_from_xml(
    self,
    xml_content: str,
    contractor_nip: str = "",
    contractor_name: str = "",
) -> dict[str, str]:
    """Return the counterparty's name, NIP and address parsed from invoice XML.

    Delegates to _match_party_role_from_root. If that yields nothing, it falls
    back to the first address-like elements anywhere in the document; those
    may belong to either party, and name/nip are then "". Empty or unparsable
    XML gives an all-empty result (keys: name, nip, street, city, postal_code,
    country, address).
    """
    empty_result = {
        "name": "",
        "nip": "",
        "street": "",
        "city": "",
        "postal_code": "",
        "country": "",
        "address": "",
    }
    if not xml_content:
        return empty_result
    try:
        # NOTE(review): xml.etree is not hardened against maliciously crafted XML
        # (see the Python "XML vulnerabilities" docs). Confirm the invoice XML
        # source is trusted, or consider defusedxml.
        root = ET.fromstring(xml_content)
    except Exception:
        return empty_result
    matched_party = self._match_party_role_from_root(
        root=root,
        contractor_nip=contractor_nip,
        contractor_name=contractor_name,
    )
    if matched_party and any(matched_party.values()):
        return matched_party
    # Fallback: the first occurrence of each address field in the whole document.
    street = self._find_first_text(root, ["Ulica", "AdresL1"])
    house_no = self._find_first_text(root, ["NrDomu"])
    apartment_no = self._find_first_text(root, ["NrLokalu"])
    city = self._find_first_text(root, ["Miejscowosc"])
    postal_code = self._find_first_text(root, ["KodPocztowy"])
    country = self._find_first_text(root, ["Kraj", "KodKraju"])
    adres_l2 = self._find_first_text(root, ["AdresL2"])
    if adres_l2 and (not postal_code or not city):
        # "00-950 Warszawa" -> postal code + city; any other text is taken as the city.
        match = re.match(r"^(\d{2}-\d{3})\s+(.+)$", self._clean_text(adres_l2))
        if match:
            postal_code = postal_code or match.group(1)
            city = city or self._clean_text(match.group(2))
        elif not city:
            city = self._clean_text(adres_l2)
    street_parts = [part for part in [street, house_no] if part]
    street_line = " ".join(street_parts).strip()
    if apartment_no:
        street_line = f"{street_line}/{apartment_no}" if street_line else apartment_no
    address_parts = [part for part in [street_line, postal_code, city, country] if part]
    address = ", ".join(address_parts)
    return {
        "name": "",
        "nip": "",
        "street": street_line,
        "city": city,
        "postal_code": postal_code,
        "country": country,
        "address": address,
    }
def _enrich_metadata_with_xml_party_data(
    self,
    item: dict[str, Any],
    xml_content: str,
    contractor_name: str,
    contractor_nip: str,
) -> dict[str, Any]:
    """Return a copy of ``item`` enriched with contractor data parsed from XML.

    The input dict is not mutated. The copy:

    * loses any ``contractor_regon`` key;
    * gains ``contractor_street``, ``contractor_city``,
      ``contractor_postal_code``, ``contractor_country`` and
      ``contractor_address`` (empty strings when the XML lacks them);
    * gains ``contractor_name_from_xml`` only when the XML yields a name and
      ``contractor_name`` is the placeholder ``"Brak"``;
    * gains ``contractor_nip_from_xml`` only when the XML yields a NIP and
      ``contractor_nip`` is empty.
    """
    enriched = dict(item or {})
    details = self._extract_party_details_from_xml(
        xml_content=xml_content,
        contractor_nip=contractor_nip,
        contractor_name=contractor_name,
    )
    enriched.pop("contractor_regon", None)

    field_map = (
        ("contractor_street", "street"),
        ("contractor_city", "city"),
        ("contractor_postal_code", "postal_code"),
        ("contractor_country", "country"),
        ("contractor_address", "address"),
    )
    for target_key, source_key in field_map:
        enriched[target_key] = details.get(source_key, "")

    xml_name = details.get("name")
    if xml_name and contractor_name == "Brak":
        enriched["contractor_name_from_xml"] = details.get("name", "")
    xml_nip = details.get("nip")
    if xml_nip and not contractor_nip:
        enriched["contractor_nip_from_xml"] = details.get("nip", "")
    return enriched
def list_documents(self, since=None):
    """Fetch purchase invoices from KSeF and map them to ``KSeFDocument`` objects.

    Queries invoice metadata (``subjectType`` ``Subject2``) whose
    ``PermanentStorage`` date lies between ``since`` and now. When ``since``
    is falsy, the range starts 30 days ago. For each invoice it downloads the
    XML and enriches the metadata with contractor data parsed from that XML.

    Args:
        since: Optional lower bound of the date range, normalised with
            ``_ensure_naive_utc``.

    Returns:
        list[KSeFDocument]: one entry per invoice in the response, all with
        ``invoice_type="purchase"``.
    """
    now = datetime.utcnow()
    date_from = self._ensure_naive_utc(since) if since else (now - timedelta(days=30))
    request_body = {
        "subjectType": "Subject2",
        "dateRange": {
            "dateType": "PermanentStorage",
            "from": date_from.isoformat(),
            "to": now.isoformat(),
        },
        "pageOffset": 0,
        "pageSize": 100,
    }
    # NOTE(review): only the first page (offset 0, up to 100 items) is requested
    # and no "more results" indicator in the response is checked, so large date
    # ranges may be silently truncated. Confirm the KSeF pagination contract
    # before adding a paging loop.
    data = self._request("POST", "/invoices/query/metadata", json=request_body)
    invoices = data.get("invoices", [])
    documents = []
    for item in invoices:
        issue_date_raw = str(item.get("issueDate") or now.date().isoformat())
        # Parse only the leading YYYY-MM-DD so a timestamp-style value
        # (e.g. "2025-01-15T00:00:00") does not abort the whole sync.
        issue_date = date.fromisoformat(issue_date_raw[:10])
        fetched_at = (
            self._parse_datetime(item.get("acquisitionDate"))
            or self._parse_datetime(item.get("permanentStorageDate"))
            or datetime.utcnow().replace(tzinfo=timezone.utc)
        )
        # Store as naive UTC. NOTE(review): astimezone() treats a naive input as
        # local time — assumes _parse_datetime returns aware datetimes.
        fetched_at = fetched_at.astimezone(timezone.utc).replace(tzinfo=None)
        contractor_name, contractor_nip = self._pick_contractor(item)
        xml_content = ""
        ksef_number = item["ksefNumber"]
        try:
            xml_content = self._request(
                "GET",
                f"/invoices/ksef/{ksef_number}",
                accept="application/xml",
            )
        except Exception:
            # Deliberate best-effort: keep the document with a placeholder XML
            # rather than failing the whole listing.
            xml_content = "<Invoice/>"
        enriched_metadata = self._enrich_metadata_with_xml_party_data(
            item=item,
            xml_content=xml_content,
            contractor_name=contractor_name,
            contractor_nip=contractor_nip,
        )
        # Prefer XML-derived identity only where the metadata had none.
        if enriched_metadata.get("contractor_name_from_xml") and contractor_name == "Brak":
            contractor_name = enriched_metadata["contractor_name_from_xml"]
        if enriched_metadata.get("contractor_nip_from_xml") and not contractor_nip:
            contractor_nip = enriched_metadata["contractor_nip_from_xml"]
        documents.append(
            KSeFDocument(
                ksef_number=ksef_number,
                # Fall back to the KSeF number also when the key is present but null.
                invoice_number=item.get("invoiceNumber") or ksef_number,
                contractor_name=contractor_name,
                contractor_nip=contractor_nip,
                issue_date=issue_date,
                received_date=issue_date,
                fetched_at=fetched_at,
                net_amount=self._decimal_from_item(item, "netAmount", "invoiceNetAmount"),
                vat_amount=self._decimal_from_item(item, "vatAmount", "invoiceVatAmount"),
                gross_amount=self._decimal_from_item(item, "grossAmount", "invoiceGrossAmount"),
                invoice_type="purchase",
                xml_content=xml_content,
                metadata=enriched_metadata,
            )
        )
    return documents
def issue_invoice(self, payload: dict):
    """Send one invoice XML to KSeF through an interactive (online) session.

    Steps:
        1. Encrypt the XML.
        2. Open an online session.
        3. Send the encrypted invoice.
        4. Close the session.
        5. Poll the invoice's processing status.
        6. Fetch the UPO.

    If sending fails, the session is closed on a best-effort basis before the
    error is re-raised, so it is not left open.

    Args:
        payload: Dict carrying ``xml_content`` (required) and optionally
            ``schemaVersion`` (defaults to ``"FA(3)"``).

    Returns:
        A dict with these keys:

        * ``ksef_number``: a ``PENDING/<session>/<invoice>`` placeholder
          when no KSeF number was returned.
        * ``status``: ``"issued"`` only for status code 200 with a KSeF
          number, otherwise ``"queued"``.
        * ``message``: a user-facing message.
        * ``xml_content``: the original XML.
        * ``session_reference_number`` and ``invoice_reference_number``.
        * ``status_payload``: the raw status payload.
        * ``upo_xml``: the fetched UPO.

    Raises:
        RuntimeError: when ``xml_content`` is missing, or KSeF omits the
            session or invoice reference number. Errors from the underlying
            helpers propagate unchanged.
    """
    payload = payload or {}
    xml_content = payload.get("xml_content")
    if not xml_content:
        raise RuntimeError("Brak xml_content w payloadzie wysyłki do KSeF.")
    schema_version = str(payload.get("schemaVersion") or "FA(3)")
    xml_bytes = self._normalize_xml_bytes(xml_content)
    encryption_data = self._build_online_encryption_data()
    encrypted_invoice = self._encrypt_invoice_bytes(
        xml_bytes,
        encryption_data["symmetric_key"],
        encryption_data["iv"],
    )
    session_response = self._open_online_session(schema_version=schema_version, encryption_data=encryption_data)
    session_reference = self._first_present(session_response, "referenceNumber", "sessionReferenceNumber")
    if not session_reference:
        raise RuntimeError("KSeF nie zwrócił numeru referencyjnego sesji interaktywnej.")
    # Hashes/sizes of both the plain and the encrypted document are sent so
    # KSeF can verify what it received.
    send_payload = {
        "invoiceHash": self._sha256_base64(xml_bytes),
        "invoiceSize": len(xml_bytes),
        "encryptedDocumentHash": self._sha256_base64(encrypted_invoice),
        "encryptedDocumentSize": len(encrypted_invoice),
        "encryptedDocumentContent": base64.b64encode(encrypted_invoice).decode("ascii"),
    }
    try:
        send_response = self._request("POST", f"/sessions/online/{session_reference}/invoices", json=send_payload)
        invoice_reference = self._first_present(send_response, "referenceNumber", "invoiceReferenceNumber")
        if not invoice_reference:
            raise RuntimeError("KSeF nie zwrócił numeru referencyjnego przesłanej faktury.")
    except Exception:
        # The session is already open: close it on a best-effort basis so a
        # failed send does not leave it hanging, then surface the original error.
        try:
            self._request("POST", f"/sessions/online/{session_reference}/close")
        except Exception:
            pass
        raise
    # A failed close is reported in the message instead of aborting, because
    # the invoice has already been accepted for processing.
    close_error = None
    try:
        self._request("POST", f"/sessions/online/{session_reference}/close")
    except Exception as exc:
        close_error = exc
    status_payload = self._poll_invoice_processing(session_reference, invoice_reference)
    code = self._extract_status_code(status_payload)
    ksef_number = self._first_present(
        status_payload,
        "ksefNumber",
        "ksefReferenceNumber",
        "invoiceKsefNumber",
    )
    if not ksef_number and isinstance(status_payload.get("invoice"), dict):
        ksef_number = self._first_present(status_payload.get("invoice"), "ksefNumber", "ksefReferenceNumber")
    upo_xml = self._fetch_invoice_upo(session_reference, invoice_reference, ksef_number=str(ksef_number) if ksef_number else None)
    # NOTE(review): codes >= 300 fall through to the generic message with
    # status "queued" — confirm _poll_invoice_processing raises on rejection.
    message = "Wysłano fakturę do KSeF."
    if code == 200 and ksef_number:
        message = f"Faktura została przyjęta przez KSeF. Numer KSeF: {ksef_number}."
    elif code and code < 300:
        message = "Faktura została wysłana do KSeF i oczekuje na końcowe przetworzenie."
    if close_error:
        message += f" Sesja została zamknięta z ostrzeżeniem: {close_error}"
    return {
        "ksef_number": ksef_number or f"PENDING/{session_reference}/{invoice_reference}",
        "status": "issued" if code == 200 and ksef_number else "queued",
        "message": message,
        "xml_content": xml_content,
        "session_reference_number": session_reference,
        "invoice_reference_number": invoice_reference,
        "status_payload": status_payload,
        "upo_xml": upo_xml,
    }
def ping(self):
    """Check that the KSeF API is reachable and authentication works.

    Performs an authenticated ``GET /auth/sessions`` (``pageSize=10``).
    Exceptions from the request are caught and reported rather than raised.

    Returns:
        ``{"status": "ok" | "error", "message": str}``.
    """
    try:
        self._request("GET", "/auth/sessions", params={"pageSize": 10})
    except Exception as exc:
        return {"status": "error", "message": str(exc)}
    return {"status": "ok", "message": "API KSeF odpowiada, a uwierzytelnienie działa poprawnie."}
def diagnostics(self):
    """Return a diagnostic snapshot of the KSeF connection and configuration.

    On success, the result includes the access/refresh token states and a
    ``sample`` built from the ``GET /auth/sessions`` response. If any step
    raises, the exception is caught and the result carries the error text in
    ``message`` and ``sample``. Both variants include ``base_url``,
    ``auth_mode``, ``tax_id`` and whether a token and a certificate are
    configured.
    """
    def config_summary():
        # Fields shared by the success and the error report.
        return {
            "base_url": self.base_url,
            "auth_mode": self.auth_mode,
            "tax_id": self.tax_id,
            "token_configured": bool(self.token),
            "certificate_configured": bool(self.certificate_data),
        }

    try:
        if self._access_token_is_valid():
            access_state = "valid"
        else:
            access_state = "missing_or_expired"
        if self._refresh_token_is_valid():
            refresh_state = "valid"
        else:
            refresh_state = "missing_or_expired"
        sessions = self._request("GET", "/auth/sessions", params={"pageSize": 10})
        # Normalise a non-dict (e.g. list) response into a dict for display.
        if isinstance(sessions, dict):
            sample = sessions
        else:
            sample = {"sessions": sessions}
        return {
            "status": "ok",
            "message": "API KSeF odpowiada.",
            **config_summary(),
            "access_token_state": access_state,
            "refresh_token_state": refresh_state,
            "sample": sample,
        }
    except Exception as exc:
        error_text = str(exc)
        return {
            "status": "error",
            "message": error_text,
            **config_summary(),
            "sample": {"error": error_text},
        }
class KSeFService:
def __init__(self, company_id=None):
use_mock = SettingsService.get("ksef.mock_mode", "true", company_id=company_id) == "true"
self.adapter = MockKSeFAdapter(company_id=company_id) if use_mock else RequestsKSeFAdapter(company_id=company_id)
def list_documents(self, since=None):
return self.adapter.list_documents(since=since)
def issue_invoice(self, payload: dict):
return self.adapter.issue_invoice(payload)
def ping(self):
return self.adapter.ping()
def diagnostics(self):
return self.adapter.diagnostics()
@staticmethod
def calc_hash(xml_content: str) -> str:
return hashlib.sha256(xml_content.encode("utf-8")).hexdigest()