1279 lines
50 KiB
Python
1279 lines
50 KiB
Python
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from datetime import date, datetime, timedelta, timezone
|
|
from decimal import Decimal
|
|
import base64
|
|
import hashlib
|
|
import re
|
|
import time
|
|
import xml.etree.ElementTree as ET
|
|
from typing import Any
|
|
|
|
import requests
|
|
|
|
from app.services.settings_service import SettingsService
|
|
|
|
try:
|
|
from cryptography import x509
|
|
from cryptography.hazmat.primitives import hashes, serialization, padding as sym_padding
|
|
from cryptography.hazmat.primitives.asymmetric import padding
|
|
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
|
except Exception: # pragma: no cover
|
|
x509 = None
|
|
hashes = None
|
|
serialization = None
|
|
sym_padding = None
|
|
Cipher = None
|
|
algorithms = None
|
|
modes = None
|
|
padding = None
|
|
|
|
|
|
@dataclass
class KSeFDocument:
    """One invoice record as exchanged with KSeF, including its raw XML.

    Field order is part of the interface (the dataclass constructor is
    positional), so it must not be rearranged.
    """

    # --- identifiers ---------------------------------------------------
    ksef_number: str        # number assigned by KSeF
    invoice_number: str     # issuer's own invoice number

    # --- counterparty --------------------------------------------------
    contractor_name: str
    contractor_nip: str     # tax identification number (NIP) of the contractor

    # --- dates ---------------------------------------------------------
    issue_date: date
    received_date: date
    fetched_at: datetime    # when the document was pulled by this application

    # --- amounts -------------------------------------------------------
    net_amount: Decimal
    vat_amount: Decimal
    gross_amount: Decimal

    # --- classification and payload ------------------------------------
    invoice_type: str       # the mock adapter uses "purchase" / "sale"
    xml_content: str        # invoice XML as text
    metadata: dict          # free-form extra information about the document
|
|
|
|
|
|
class KSeFAdapter:
    """Base interface shared by all KSeF back-ends.

    Subclasses must override ``list_documents`` and ``issue_invoice``;
    ``ping`` and ``diagnostics`` have neutral defaults.
    """

    def list_documents(self, since: datetime | None = None):
        """Return documents, optionally limited to those newer than *since*.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError

    def issue_invoice(self, payload: dict):
        """Submit an invoice described by *payload*.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError

    def ping(self):
        """Report connectivity; the base implementation knows nothing."""
        return dict(status="unknown")

    def diagnostics(self):
        """Return diagnostic data; defaults to whatever ``ping`` reports."""
        report = self.ping()
        return report
|
|
|
|
|
|
class MockKSeFAdapter(KSeFAdapter):
    """Offline adapter that fabricates KSeF data instead of calling the API."""

    def __init__(self, company_id=None):
        # A missing company is represented as 0 so the generated identifiers
        # stay well-formed.
        self.company_id = company_id or 0

    @staticmethod
    def _utc_now() -> datetime:
        """Return the current UTC time as a naive datetime.

        Equivalent to the deprecated ``datetime.utcnow()``.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    def list_documents(self, since=None):
        """Return five synthetic documents, optionally filtered by *since*.

        Args:
            since: when given, only documents whose ``fetched_at`` is at or
                after this moment are returned. Aware datetimes are converted
                to naive UTC first, because the generated ``fetched_at``
                values are naive UTC and Python refuses to compare naive
                with aware datetimes.
        """
        base_date = self._utc_now()
        docs = []
        for i in range(1, 6):
            issue = (base_date - timedelta(days=i * 3)).date()
            net = Decimal(1000 + i * 250)
            vat = (net * Decimal("0.23")).quantize(Decimal("0.01"))
            gross = net + vat
            seq = 10000 + i
            ksef_number = f"KSEF/MOCK/C{self.company_id}/{issue.year}/{seq}"
            docs.append(
                KSeFDocument(
                    ksef_number=ksef_number,
                    invoice_number=f"FV/{self.company_id}/{i:03d}/{issue.year}",
                    contractor_name=f"Firma {self.company_id}-{i}",
                    # NOTE: the ``:02d`` format requires an integer company_id.
                    contractor_nip=f"525{self.company_id:02d}000{i:02d}",
                    issue_date=issue,
                    received_date=issue + timedelta(days=1),
                    fetched_at=base_date - timedelta(hours=i),
                    net_amount=net,
                    vat_amount=vat,
                    gross_amount=gross,
                    # Odd sequence numbers are purchases, even ones are sales.
                    invoice_type="purchase" if i % 2 else "sale",
                    xml_content=(
                        f"<Invoice><Company>{self.company_id}</Company>"
                        f"<Ksef>{ksef_number}</Ksef></Invoice>"
                    ),
                    metadata={"source": "mock", "sequence": i, "company_id": self.company_id},
                )
            )
        if since:
            if since.utcoffset() is not None:
                since = since.astimezone(timezone.utc).replace(tzinfo=None)
            docs = [d for d in docs if d.fetched_at >= since]
        return docs

    def issue_invoice(self, payload: dict):
        """Pretend to issue an invoice and echo back its XML.

        The sequence number is the current Unix timestamp, taken from an
        aware UTC datetime so it is not skewed by the local time zone.
        """
        now = datetime.now(timezone.utc)
        seq = int(now.timestamp())
        return {
            "ksef_number": f"KSEF/MOCK/ISSUED/C{self.company_id}/{now.year}/{seq}",
            "status": "issued_mock",
            "message": "Tryb mock: dokument nie został wysłany do produkcyjnego KSeF, tylko zasymulowany lokalnie.",
            "xml_content": payload.get("xml_content", ""),
        }

    def ping(self):
        """Report that the mock mode is active (no network call is made)."""
        return {
            "status": "mock",
            "message": "Tryb mock aktywny - połączenie z prawdziwym API nie jest wykonywane.",
        }

    def diagnostics(self):
        """Return the ``ping`` result extended with a fake base URL and sample."""
        ping = self.ping()
        year = self._utc_now().year
        ping["base_url"] = "mock://ksef"
        ping["sample"] = {
            "status": "mock",
            "documentExample": {
                "invoiceNumber": f"FV/{self.company_id}/001/{year}",
                "ksefNumber": f"KSEF/MOCK/C{self.company_id}/{year}/10001",
            },
        }
        return ping
|
|
|
|
|
|
class RequestsKSeFAdapter(KSeFAdapter):
|
|
ENVIRONMENT_URLS = {
|
|
'prod': 'https://api.ksef.mf.gov.pl/v2',
|
|
'test': 'https://api-test.ksef.mf.gov.pl/v2',
|
|
'demo': 'https://api-demo.ksef.mf.gov.pl/v2',
|
|
}
|
|
ACCESS_TOKEN_KEY = "ksef.auth.access_token"
|
|
ACCESS_TOKEN_VALID_UNTIL_KEY = "ksef.auth.access_token_valid_until"
|
|
REFRESH_TOKEN_KEY = "ksef.auth.refresh_token"
|
|
REFRESH_TOKEN_VALID_UNTIL_KEY = "ksef.auth.refresh_token_valid_until"
|
|
|
|
def __init__(self, company_id=None):
    """Load the KSeF connection settings for *company_id*.

    Reads environment, base URL, auth mode, token, client id and
    certificate data from ``SettingsService`` and resolves the company
    tax id. The per-instance runtime caches for tokens start empty.
    """

    def effective(key: str, default: str):
        # All user-scoped KSeF settings are read with the same scope arguments.
        return SettingsService.get_effective(
            key, default, company_id=company_id, scope_name='ksef', user_default='user'
        )

    def stripped(value) -> str:
        return (value or "").strip()

    self._runtime_secret_cache: dict[str, str] = {}
    self._runtime_value_cache: dict[str, str] = {}

    self.company_id = company_id
    self.environment = self._resolve_environment(company_id=company_id)

    default_url = self.ENVIRONMENT_URLS[self.environment]
    configured_url = SettingsService.get("ksef.base_url", default_url, company_id=company_id)
    self.base_url = self._normalize_base_url(configured_url or default_url)

    self.auth_mode = effective("ksef.auth_mode", "token")
    self.token = stripped(self._settings_get_secret("ksef.token", ""))
    self.client_id = stripped(effective("ksef.client_id", ""))
    self.certificate_name = stripped(effective("ksef.certificate_name", ""))
    self.certificate_data = self._settings_get_secret("ksef.certificate_data", "") or ""
    self.tax_id = self._resolve_tax_id()
|
|
|
|
def _settings_get_secret(self, key: str, default: str = "") -> str:
    """Read a secret setting for this company.

    Uses ``SettingsService.get_secret`` when the service exposes a callable
    of that name and falls back to the plain ``SettingsService.get``
    otherwise.
    """
    reader = getattr(SettingsService, "get_secret", None)
    if not callable(reader):
        reader = SettingsService.get
    return reader(key, default, company_id=self.company_id)
|
|
|
|
@classmethod
def _resolve_environment(cls, company_id=None) -> str:
    """Return the KSeF environment name (a key of ``ENVIRONMENT_URLS``).

    The explicit ``ksef.environment`` setting wins when it names a known
    environment. Otherwise the environment is inferred from the host in
    ``ksef.base_url``; both the test and the demo host are recognised.
    Anything else resolves to ``"prod"``.
    """

    def setting(key: str, default: str) -> str:
        value = SettingsService.get_effective(
            key, default, company_id=company_id, scope_name='ksef', user_default='user'
        )
        return (value or "").strip().lower()

    environment = setting("ksef.environment", "prod")
    if environment in cls.ENVIRONMENT_URLS:
        return environment

    base_url = setting("ksef.base_url", "")
    # Host names follow the pattern used in ENVIRONMENT_URLS.
    for name in ("test", "demo"):
        if f"api-{name}.ksef.mf.gov.pl" in base_url:
            return name
    return "prod"
|
|
|
|
@staticmethod
|
|
def _normalize_base_url(base_url: str) -> str:
|
|
base = (base_url or "https://api.ksef.mf.gov.pl/v2").strip().rstrip("/")
|
|
if base.endswith("/docs/v2"):
|
|
return base[:-8] + "/v2"
|
|
if not base.endswith("/v2"):
|
|
base = f"{base}/v2"
|
|
return base.rstrip("/")
|
|
|
|
def _resolve_tax_id(self) -> str:
    """Return the company's 10-digit NIP, or ``""`` when none is found.

    Candidates are collected from several settings keys and, when a
    company id is set, from the ``Company`` record (``tax_id``, ``nip``,
    ``vat_number``). The first candidate that contains exactly ten digits
    after stripping non-digits wins.
    """
    candidates = [
        SettingsService.get(key, "", company_id=self.company_id)
        for key in ("company.tax_id", "company.nip", "tax_id", "nip")
    ]

    try:
        from app.models.company import Company

        if self.company_id:
            company = Company.query.get(self.company_id)
            if company:
                candidates.extend(
                    getattr(company, attr, "") for attr in ("tax_id", "nip", "vat_number")
                )
    except Exception:
        # Deliberately best-effort: the model lookup is optional, and the
        # settings-based candidates are still evaluated when it fails.
        pass

    for value in candidates:
        # str() guards against numeric values coming from settings or the
        # model; re.sub would raise TypeError on a non-string.
        digits = re.sub(r"\D", "", str(value or ""))
        if len(digits) == 10:
            return digits

    return ""
|
|
|
|
@staticmethod
|
|
def _parse_datetime(value: str | None) -> datetime | None:
|
|
if not value:
|
|
return None
|
|
raw = value.strip()
|
|
if raw.endswith("Z"):
|
|
raw = raw[:-1] + "+00:00"
|
|
try:
|
|
return datetime.fromisoformat(raw)
|
|
except ValueError:
|
|
return None
|
|
|
|
@staticmethod
|
|
def _ensure_naive_utc(dt: datetime) -> datetime:
|
|
if dt.tzinfo is None:
|
|
return dt
|
|
return dt.astimezone(timezone.utc).replace(tzinfo=None)
|
|
|
|
def _get_cached_secret(self, key: str) -> str:
|
|
if key in self._runtime_secret_cache:
|
|
return (self._runtime_secret_cache.get(key) or "").strip()
|
|
return (self._settings_get_secret(key, "") or "").strip()
|
|
|
|
def _set_cached_secret(self, key: str, value: str):
|
|
self._runtime_secret_cache[key] = value or ""
|
|
|
|
def _get_cached_value(self, key: str) -> str:
|
|
if key in self._runtime_value_cache:
|
|
return (self._runtime_value_cache.get(key) or "").strip()
|
|
return (SettingsService.get(key, "", company_id=self.company_id) or "").strip()
|
|
|
|
def _set_cached_value(self, key: str, value: str):
|
|
self._runtime_value_cache[key] = value or ""
|
|
|
|
def _clear_access_token_cache(self):
|
|
self._set_cached_secret(self.ACCESS_TOKEN_KEY, "")
|
|
self._set_cached_value(self.ACCESS_TOKEN_VALID_UNTIL_KEY, "")
|
|
|
|
def _clear_refresh_token_cache(self):
|
|
self._set_cached_secret(self.REFRESH_TOKEN_KEY, "")
|
|
self._set_cached_value(self.REFRESH_TOKEN_VALID_UNTIL_KEY, "")
|
|
|
|
def _access_token_is_valid(self) -> bool:
|
|
token = self._get_cached_secret(self.ACCESS_TOKEN_KEY)
|
|
valid_until = self._parse_datetime(self._get_cached_value(self.ACCESS_TOKEN_VALID_UNTIL_KEY))
|
|
if not token or not valid_until:
|
|
return False
|
|
return valid_until > datetime.now(timezone.utc) + timedelta(seconds=30)
|
|
|
|
def _refresh_token_is_valid(self) -> bool:
|
|
token = self._get_cached_secret(self.REFRESH_TOKEN_KEY)
|
|
valid_until = self._parse_datetime(self._get_cached_value(self.REFRESH_TOKEN_VALID_UNTIL_KEY))
|
|
if not token or not valid_until:
|
|
return False
|
|
return valid_until > datetime.now(timezone.utc) + timedelta(seconds=30)
|
|
|
|
def _get_public_key_pem(self) -> str:
    """Download the KSeF public-key certificate and return it as PEM text.

    The first list entry that carries a non-empty ``certificate`` field is
    used. If it is already PEM it is returned as-is; otherwise the value is
    wrapped at 64 characters and framed with the PEM header and footer.

    Raises:
        requests.HTTPError: on a non-success HTTP status.
        RuntimeError: when the response is not a non-empty list or no
            entry contains a certificate.
    """
    response = requests.get(
        f"{self.base_url}/security/public-key-certificates",
        headers={"Accept": "application/json"},
        timeout=30,
    )
    response.raise_for_status()

    entries = response.json()
    if not (isinstance(entries, list) and entries):
        raise RuntimeError("KSeF nie zwrócił listy certyfikatów klucza publicznego.")

    for entry in entries:
        encoded = entry.get("certificate") if isinstance(entry, dict) else None
        if not encoded:
            continue

        encoded = encoded.strip()
        if "BEGIN CERTIFICATE" in encoded:
            return encoded

        chunks = [encoded[start:start + 64] for start in range(0, len(encoded), 64)]
        body = "\n".join(chunks)
        return f"-----BEGIN CERTIFICATE-----\n{body}\n-----END CERTIFICATE-----"

    raise RuntimeError("Nie udało się odczytać certyfikatu klucza publicznego KSeF.")
|
|
|
|
@staticmethod
|
|
def _sha256_base64(payload: bytes) -> str:
|
|
return base64.b64encode(hashlib.sha256(payload).digest()).decode("ascii")
|
|
|
|
@staticmethod
|
|
def _normalize_xml_bytes(xml_content: str) -> bytes:
|
|
if xml_content is None:
|
|
raise RuntimeError("Brak treści XML faktury do wysyłki.")
|
|
xml_bytes = xml_content.encode("utf-8")
|
|
if xml_bytes.startswith(b"\xef\xbb\xbf"):
|
|
xml_bytes = xml_bytes[3:]
|
|
if len(xml_bytes) > 1_000_000:
|
|
raise RuntimeError("Plik XML faktury przekracza limit 1 MB wymagany przez KSeF dla faktury bez załączników.")
|
|
return xml_bytes
|
|
|
|
def _build_online_encryption_data(self) -> dict[str, str | bytes]:
    """Generate the symmetric key material for an online session.

    A random 32-byte key and 16-byte IV are created; the key is encrypted
    with the public key of the downloaded KSeF certificate using
    RSA-OAEP (SHA-256 for both the hash and MGF1).

    Returns:
        A dict with the raw ``symmetric_key`` and ``iv`` (bytes) plus their
        transport forms ``encrypted_symmetric_key`` and
        ``initialization_vector`` (Base64 text).

    Raises:
        RuntimeError: when the optional ``cryptography`` package is missing.
    """
    # Imported here so the block does not depend on a module-level import.
    import secrets

    crypto_parts = (x509, serialization, hashes, padding, Cipher, algorithms, modes, sym_padding)
    if not all(crypto_parts):
        raise RuntimeError(
            "Brak biblioteki cryptography. Zainstaluj ją, aby obsłużyć wysyłkę faktur do KSeF 2.x."
        )

    key = self._get_public_key_pem().encode("utf-8")
    cert = x509.load_pem_x509_certificate(key)
    public_key = cert.public_key()

    symmetric_key = secrets.token_bytes(32)
    iv = secrets.token_bytes(16)
    encrypted_key = public_key.encrypt(
        symmetric_key,
        padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None,
        ),
    )
    return {
        "symmetric_key": symmetric_key,
        "iv": iv,
        "encrypted_symmetric_key": base64.b64encode(encrypted_key).decode("ascii"),
        "initialization_vector": base64.b64encode(iv).decode("ascii"),
    }
|
|
|
|
@staticmethod
def _encrypt_invoice_bytes(invoice_bytes: bytes, symmetric_key: bytes, iv: bytes) -> bytes:
    """Encrypt *invoice_bytes* with AES-CBC and PKCS7 padding.

    Returns:
        The IV followed by the ciphertext.
    """
    pkcs7 = sym_padding.PKCS7(128).padder()
    plaintext = pkcs7.update(invoice_bytes) + pkcs7.finalize()

    aes_cbc = Cipher(algorithms.AES(symmetric_key), modes.CBC(iv)).encryptor()
    ciphertext = aes_cbc.update(plaintext) + aes_cbc.finalize()

    return iv + ciphertext
|
|
|
|
@staticmethod
|
|
def _extract_status_code(payload: dict | None) -> int | None:
|
|
if not isinstance(payload, dict):
|
|
return None
|
|
status = payload.get("status")
|
|
if isinstance(status, dict):
|
|
status = status.get("code")
|
|
elif isinstance(status, list) and status:
|
|
head = status[0]
|
|
status = head.get("code") if isinstance(head, dict) else head
|
|
if status is None and isinstance(payload.get("code"), int):
|
|
status = payload.get("code")
|
|
try:
|
|
return int(status) if status is not None else None
|
|
except Exception:
|
|
return None
|
|
|
|
@staticmethod
|
|
def _extract_status_description(payload: dict | None) -> str:
|
|
if not isinstance(payload, dict):
|
|
return ""
|
|
status = payload.get("status")
|
|
if isinstance(status, dict):
|
|
parts = [status.get("description") or ""]
|
|
details = status.get("details") or []
|
|
if isinstance(details, list):
|
|
parts.extend(str(item) for item in details if item)
|
|
return " | ".join(part for part in parts if part)
|
|
if payload.get("description"):
|
|
return str(payload.get("description"))
|
|
return ""
|
|
|
|
@staticmethod
|
|
def _first_present(mapping: dict | None, *keys: str):
|
|
if not isinstance(mapping, dict):
|
|
return None
|
|
for key in keys:
|
|
value = mapping.get(key)
|
|
if value not in (None, "", []):
|
|
return value
|
|
return None
|
|
|
|
def _request_binary(self, method, path, params=None, json=None, data=None, accept="application/octet-stream", content_type=None, use_auth=True):
    """Call the KSeF API and return the raw response body as bytes.

    Up to three attempts are made with exponential back-off (1 s, 2 s)
    between them; no delay is added after the final attempt. On the first
    attempt a 401 response clears the cached access token and the request
    is repeated once with fresh headers.

    Args:
        method: HTTP method name.
        path: path relative to ``self.base_url``.
        params, json, data: forwarded to ``requests.request``.
        accept: value of the ``Accept`` header.
        content_type: optional ``Content-Type`` header value.
        use_auth: whether to attach the bearer access token.

    Raises:
        RuntimeError: when the last failure was an HTTP 429 response.
        Exception: otherwise the last error encountered is re-raised.
    """
    max_attempts = 3
    url = f"{self.base_url}/{path.lstrip('/')}"

    def send():
        # Headers are rebuilt for every send so a refreshed token is used.
        headers = self._headers(use_auth=use_auth, accept=accept)
        if content_type:
            headers["Content-Type"] = content_type
        return requests.request(
            method,
            url,
            headers=headers,
            params=params,
            json=json,
            data=data,
            timeout=30,
        )

    last_error = None
    for attempt in range(max_attempts):
        try:
            response = send()
            if response.status_code == 401 and use_auth and attempt == 0:
                self._clear_access_token_cache()
                response = send()
            response.raise_for_status()
            return response.content
        except requests.HTTPError as exc:
            failed = getattr(exc, "response", None)
            if failed is not None and failed.status_code == 429:
                retry_after = failed.headers.get("Retry-After")
                wait_hint = f" Spróbuj ponownie za około {retry_after} s." if retry_after else " Spróbuj ponownie za chwilę."
                last_error = RuntimeError("KSeF chwilowo ogranicza liczbę zapytań (HTTP 429)." + wait_hint)
            else:
                last_error = exc
        except Exception as exc:
            last_error = exc
        # Waiting only makes sense when another attempt follows.
        if attempt < max_attempts - 1:
            time.sleep(2 ** attempt)
    raise last_error
|
|
|
|
def _open_online_session(self, schema_version: str, encryption_data: dict[str, str | bytes]) -> dict:
    """Open an online session by POSTing to ``/sessions/online``.

    The form code comes from settings (defaults ``FA (3)`` / ``1-0E`` /
    ``FA``). When *schema_version* starts with ``FA(`` (case-insensitive),
    it overrides the system code after upper-casing and inserting a space
    before the parenthesis.

    Args:
        schema_version: schema label of the invoice, may be empty.
        encryption_data: must provide ``encrypted_symmetric_key`` and
            ``initialization_vector``.

    Returns:
        Whatever ``_request`` returns for the call.
    """

    def lookup(key: str, default: str):
        return SettingsService.get_effective(
            key, default, company_id=self.company_id, scope_name="ksef", user_default="user"
        )

    form_code = {
        "systemCode": lookup("ksef.form_code_system_code", "FA (3)"),
        "schemaVersion": lookup("ksef.form_code_schema_version", "1-0E"),
        "value": lookup("ksef.form_code_value", "FA"),
    }
    encryption = {
        "encryptedSymmetricKey": encryption_data["encrypted_symmetric_key"],
        "initializationVector": encryption_data["initialization_vector"],
    }

    normalized_schema = schema_version.upper() if schema_version else ""
    if normalized_schema.startswith("FA("):
        form_code["systemCode"] = normalized_schema.replace("(", " (")

    body = {"formCode": form_code, "encryption": encryption}
    return self._request("POST", "/sessions/online", json=body)
|
|
|
|
def _get_session_invoice_status(self, session_reference: str, invoice_reference: str) -> dict:
    """Fetch the processing status of one invoice within a session."""
    status_path = f"/sessions/{session_reference}/invoices/{invoice_reference}"
    return self._request("GET", status_path)
|
|
|
|
def _poll_invoice_processing(self, session_reference: str, invoice_reference: str, timeout_seconds: int = 60) -> dict:
|
|
deadline = time.time() + timeout_seconds
|
|
last_payload: dict[str, Any] | None = None
|
|
while time.time() < deadline:
|
|
payload = self._get_session_invoice_status(session_reference, invoice_reference)
|
|
last_payload = payload if isinstance(payload, dict) else {"payload": payload}
|
|
code = self._extract_status_code(last_payload)
|
|
if code == 200:
|
|
return last_payload
|
|
if code and code >= 300:
|
|
description = self._extract_status_description(last_payload) or f"KSeF odrzucił fakturę. Kod statusu: {code}."
|
|
raise RuntimeError(description)
|
|
time.sleep(2)
|
|
return last_payload or {}
|
|
|
|
def _fetch_invoice_upo(self, session_reference: str, invoice_reference: str, ksef_number: str | None = None) -> str | None:
    """Download the UPO document for an invoice as text, best-effort.

    The lookup goes by KSeF number when one is given, otherwise by the
    invoice reference. Any failure results in ``None`` rather than an
    exception.
    """
    try:
        if ksef_number:
            upo_path = f"/sessions/{session_reference}/invoices/ksef/{ksef_number}/upo"
        else:
            upo_path = f"/sessions/{session_reference}/invoices/{invoice_reference}/upo"
        content = self._request_binary("GET", upo_path, accept="application/xml")
        if not content:
            return None
        return content.decode("utf-8", errors="replace")
    except Exception:
        return None
|
|
|
|
def _encrypt_token_payload(self, plain_text: str) -> str:
    """Encrypt *plain_text* for token login and return it as Base64 text.

    Uses RSA-OAEP (SHA-256 for both the hash and MGF1) with the public key
    of the certificate downloaded from KSeF.

    Raises:
        RuntimeError: when the optional ``cryptography`` package is missing.
    """
    if not all((x509, serialization, hashes, padding)):
        raise RuntimeError(
            "Brak biblioteki cryptography. Zainstaluj ją, aby obsłużyć logowanie tokenem KSeF 2.x."
        )

    certificate_pem = self._get_public_key_pem().encode("utf-8")
    certificate = x509.load_pem_x509_certificate(certificate_pem)

    oaep = padding.OAEP(
        mgf=padding.MGF1(algorithm=hashes.SHA256()),
        algorithm=hashes.SHA256(),
        label=None,
    )
    ciphertext = certificate.public_key().encrypt(plain_text.encode("utf-8"), oaep)
    return base64.b64encode(ciphertext).decode("ascii")
|
|
|
|
def _build_context_identifier(self) -> dict[str, str]:
|
|
if not self.tax_id:
|
|
raise RuntimeError(
|
|
f"Brak NIP firmy w ustawieniach lub rekordzie firmy dla company_id={self.company_id}. "
|
|
"Token KSeF wymaga contextIdentifier typu Nip."
|
|
)
|
|
return {"type": "Nip", "value": self.tax_id}
|
|
|
|
def _authenticate_with_refresh_token(self) -> bool:
    """Exchange the cached refresh token for a new access token.

    Returns:
        ``True`` when a new access token was stored in the runtime cache;
        ``False`` when there is no refresh token, the server answered with
        a status of 400 or above, or the response carried no access token.
        In the last two cases the cached refresh token is blanked.
    """
    stored_refresh = self._get_cached_secret(self.REFRESH_TOKEN_KEY)
    if not stored_refresh:
        return False

    request_headers = {
        "Accept": "application/json",
        "Authorization": f"Bearer {stored_refresh}",
    }
    if self.client_id:
        request_headers["X-Client-Id"] = self.client_id

    reply = requests.post(
        f"{self.base_url}/auth/token/refresh",
        headers=request_headers,
        timeout=30,
    )
    if reply.status_code >= 400:
        self._clear_refresh_token_cache()
        return False

    body = reply.json()
    new_access = body.get("accessToken") or {}
    new_refresh = body.get("refreshToken") or {}

    access_value = new_access.get("token")
    if not access_value:
        self._clear_refresh_token_cache()
        return False

    self._set_cached_secret(self.ACCESS_TOKEN_KEY, access_value)
    self._set_cached_value(self.ACCESS_TOKEN_VALID_UNTIL_KEY, new_access.get("validUntil", ""))

    # The refresh token is only replaced when the server issued a new one.
    refresh_value = new_refresh.get("token")
    if refresh_value:
        self._set_cached_secret(self.REFRESH_TOKEN_KEY, refresh_value)
        self._set_cached_value(self.REFRESH_TOKEN_VALID_UNTIL_KEY, new_refresh.get("validUntil", ""))

    return True
|
|
|
|
def _authenticate_with_ksef_token(self):
    """Run the full KSeF-token login and cache the resulting tokens.

    Steps performed, in order:
      1. request a challenge (``/auth/challenge``);
      2. encrypt ``"<token>|<timestampMs>"`` and submit it together with
         the challenge and context identifier (``/auth/ksef-token``);
      3. poll ``/auth/<referenceNumber>`` up to 20 times, one second
         apart, until the status code is 200;
      4. redeem the access and refresh tokens (``/auth/token/redeem``).

    Raises:
        RuntimeError: when the KSeF token is not configured, a response
            lacks required fields, authentication is rejected (status
            code 300 or above) or polling runs out of attempts.
        requests.HTTPError: on a non-success HTTP status of any call.
    """
    if not self.token:
        raise RuntimeError("Brak tokenu KSeF w ustawieniach użytkownika.")

    def with_client_id(headers: dict) -> dict:
        # The client id header is optional and added to every call if set.
        if self.client_id:
            headers["X-Client-Id"] = self.client_id
        return headers

    # --- 1. challenge ----------------------------------------------------
    challenge_reply = requests.post(
        f"{self.base_url}/auth/challenge",
        headers={"Accept": "application/json"},
        timeout=30,
    )
    challenge_reply.raise_for_status()
    challenge_body = challenge_reply.json()
    challenge = challenge_body["challenge"]

    raw_timestamp = challenge_body.get("timestampMs")
    if raw_timestamp is None:
        raise RuntimeError("KSeF nie zwrócił timestampMs w odpowiedzi /auth/challenge.")
    try:
        timestamp_ms = int(raw_timestamp)
    except Exception as exc:
        raise RuntimeError("Nieprawidłowy timestampMs z /auth/challenge.") from exc

    # --- 2. submit the encrypted token -----------------------------------
    encrypted_token = self._encrypt_token_payload(f"{self.token}|{timestamp_ms}")
    init_body = {
        "challenge": challenge,
        "contextIdentifier": self._build_context_identifier(),
        "encryptedToken": encrypted_token,
    }
    init_reply = requests.post(
        f"{self.base_url}/auth/ksef-token",
        headers=with_client_id({"Accept": "application/json"}),
        json=init_body,
        timeout=30,
    )
    init_reply.raise_for_status()
    init_data = init_reply.json()

    auth_token = (init_data.get("authenticationToken") or {}).get("token")
    reference_number = init_data.get("referenceNumber")
    if not (auth_token and reference_number):
        raise RuntimeError("KSeF nie zwrócił authenticationToken lub referenceNumber.")

    # The same bearer headers serve both the status polling and the redeem call.
    bearer_headers = with_client_id({
        "Accept": "application/json",
        "Authorization": f"Bearer {auth_token}",
    })

    # --- 3. wait for the authentication to complete ------------------------
    authenticated = False
    for _attempt in range(20):
        status_reply = requests.get(
            f"{self.base_url}/auth/{reference_number}",
            headers=bearer_headers,
            timeout=30,
        )
        status_reply.raise_for_status()

        status = status_reply.json().get("status") or {}
        code = status.get("code")

        if code == 200:
            authenticated = True
            break

        if code and code >= 300:
            description = status.get("description") or "Błąd uwierzytelnienia KSeF."
            details_text = " | ".join(str(x) for x in (status.get("details") or []) if x)
            if details_text:
                raise RuntimeError(f"{description} Details: {details_text}")
            raise RuntimeError(description)

        time.sleep(1)

    if not authenticated:
        raise RuntimeError(
            f"Przekroczono czas oczekiwania na zakończenie uwierzytelnienia KSeF dla {reference_number}."
        )

    # --- 4. redeem the tokens ----------------------------------------------
    redeem_reply = requests.post(
        f"{self.base_url}/auth/token/redeem",
        headers=bearer_headers,
        timeout=30,
    )
    redeem_reply.raise_for_status()
    tokens = redeem_reply.json()

    access = tokens.get("accessToken") or {}
    refresh = tokens.get("refreshToken") or {}

    access_token = access.get("token")
    if not access_token:
        raise RuntimeError("KSeF nie zwrócił access tokena.")

    self._set_cached_secret(self.ACCESS_TOKEN_KEY, access_token)
    self._set_cached_value(self.ACCESS_TOKEN_VALID_UNTIL_KEY, access.get("validUntil", ""))

    refresh_token = refresh.get("token")
    if refresh_token:
        self._set_cached_secret(self.REFRESH_TOKEN_KEY, refresh_token)
        self._set_cached_value(self.REFRESH_TOKEN_VALID_UNTIL_KEY, refresh.get("validUntil", ""))
|
|
|
|
def _ensure_access_token(self) -> str:
|
|
if self._access_token_is_valid():
|
|
return self._get_cached_secret(self.ACCESS_TOKEN_KEY)
|
|
|
|
if self._refresh_token_is_valid() and self._authenticate_with_refresh_token():
|
|
return self._get_cached_secret(self.ACCESS_TOKEN_KEY)
|
|
|
|
if self.auth_mode != "token":
|
|
raise RuntimeError(f"Nieobsługiwany tryb autoryzacji KSeF: {self.auth_mode}")
|
|
|
|
self._authenticate_with_ksef_token()
|
|
token = self._get_cached_secret(self.ACCESS_TOKEN_KEY)
|
|
if not token:
|
|
raise RuntimeError("Nie udało się uzyskać access tokena KSeF.")
|
|
return token
|
|
|
|
def _headers(self, use_auth: bool = True, accept: str = "application/json") -> dict[str, str]:
|
|
headers = {"Accept": accept}
|
|
if self.client_id:
|
|
headers["X-Client-Id"] = self.client_id
|
|
if use_auth:
|
|
headers["Authorization"] = f"Bearer {self._ensure_access_token()}"
|
|
return headers
|
|
|
|
def _request(self, method, path, params=None, json=None, accept="application/json", use_auth=True):
    """Call the KSeF API and return the decoded response.

    Up to three attempts are made with exponential back-off (1 s, 2 s)
    between them; no delay is added after the final attempt. On the first
    attempt a 401 response clears the cached access token and the request
    is repeated once with fresh headers.

    Returns:
        ``{"status": "ok"}`` for an empty body; the parsed JSON when the
        response is JSON or JSON was requested; the response text
        otherwise.

    Raises:
        RuntimeError: when the last failure was an HTTP 429 response.
        Exception: otherwise the last error encountered is re-raised.
    """
    max_attempts = 3
    url = f"{self.base_url}/{path.lstrip('/')}"

    def send():
        # Headers are rebuilt for every send so a refreshed token is used.
        return requests.request(
            method,
            url,
            headers=self._headers(use_auth=use_auth, accept=accept),
            params=params,
            json=json,
            timeout=30,
        )

    last_error = None
    for attempt in range(max_attempts):
        try:
            response = send()
            if response.status_code == 401 and use_auth and attempt == 0:
                self._clear_access_token_cache()
                response = send()

            response.raise_for_status()

            if not response.content:
                return {"status": "ok"}

            content_type = response.headers.get("Content-Type", "")
            if "application/json" in content_type or accept == "application/json":
                return response.json()
            return response.text
        except requests.HTTPError as exc:
            failed = getattr(exc, 'response', None)
            if failed is not None and failed.status_code == 429:
                retry_after = failed.headers.get('Retry-After')
                wait_hint = f' Spróbuj ponownie za około {retry_after} s.' if retry_after else ' Spróbuj ponownie za chwilę.'
                last_error = RuntimeError('KSeF chwilowo ogranicza liczbę zapytań (HTTP 429).' + wait_hint)
            else:
                last_error = exc
        except Exception as exc:
            last_error = exc
        # Waiting only makes sense when another attempt follows.
        if attempt < max_attempts - 1:
            time.sleep(2 ** attempt)
    raise last_error
|
|
|
|
@staticmethod
|
|
def _decimal_from_item(item: dict, *keys: str) -> Decimal:
|
|
for key in keys:
|
|
if item.get(key) is not None:
|
|
return Decimal(str(item.get(key)))
|
|
return Decimal("0")
|
|
|
|
@staticmethod
|
|
def _clean_text(value: Any) -> str:
|
|
if value is None:
|
|
return ""
|
|
return str(value).strip()
|
|
|
|
@staticmethod
|
|
def _digits_only(value: Any) -> str:
|
|
return re.sub(r"\D", "", str(value or ""))
|
|
|
|
def _pick_contractor(self, item: dict) -> tuple[str, str]:
|
|
seller = item.get("seller") or {}
|
|
buyer = item.get("buyer") or {}
|
|
|
|
seller_nip = re.sub(r"\D", "", seller.get("nip") or ((seller.get("identifier") or {}).get("value", "")))
|
|
buyer_nip = re.sub(r"\D", "", buyer.get("nip") or ((buyer.get("identifier") or {}).get("value", "")))
|
|
|
|
if self.tax_id and seller_nip == self.tax_id:
|
|
contractor = buyer
|
|
elif self.tax_id and buyer_nip == self.tax_id:
|
|
contractor = seller
|
|
else:
|
|
contractor = buyer or seller
|
|
|
|
identifier = contractor.get("identifier") or {}
|
|
nip = contractor.get("nip") or identifier.get("value", "")
|
|
return contractor.get("name", "Brak"), nip
|
|
|
|
@staticmethod
|
|
def _strip_namespace(tag: str) -> str:
|
|
if "}" in tag:
|
|
return tag.split("}", 1)[1]
|
|
return tag
|
|
|
|
def _find_first_text(self, root: ET.Element, local_names: list[str]) -> str:
|
|
wanted = {name.lower() for name in local_names}
|
|
for element in root.iter():
|
|
if self._strip_namespace(element.tag).lower() in wanted:
|
|
text = self._clean_text(element.text)
|
|
if text:
|
|
return text
|
|
return ""
|
|
|
|
def _find_first_child_text(self, element: ET.Element, local_names: list[str]) -> str:
|
|
wanted = {name.lower() for name in local_names}
|
|
for node in element.iter():
|
|
if self._strip_namespace(node.tag).lower() in wanted:
|
|
text = self._clean_text(node.text)
|
|
if text:
|
|
return text
|
|
return ""
|
|
|
|
def _find_direct_child(self, element: ET.Element, local_names: list[str]) -> ET.Element | None:
|
|
wanted = {name.lower() for name in local_names}
|
|
for child in list(element):
|
|
if self._strip_namespace(child.tag).lower() in wanted:
|
|
return child
|
|
return None
|
|
|
|
def _find_party_nodes(self, root: ET.Element) -> list[ET.Element]:
|
|
result: list[ET.Element] = []
|
|
wanted = {
|
|
"podmiot1",
|
|
"podmiot2",
|
|
"sprzedawca",
|
|
"nabywca",
|
|
"sprzedawca1",
|
|
"nabywca1",
|
|
}
|
|
|
|
for element in root.iter():
|
|
local_name = self._strip_namespace(element.tag).lower()
|
|
if local_name in wanted:
|
|
result.append(element)
|
|
|
|
return result
|
|
|
|
def _parse_adres_node(self, address_node: ET.Element | None) -> dict[str, str]:
|
|
result = {
|
|
"street": "",
|
|
"city": "",
|
|
"postal_code": "",
|
|
"country": "",
|
|
"address": "",
|
|
}
|
|
if address_node is None:
|
|
return result
|
|
|
|
def get_text(names: list[str]) -> str:
|
|
return self._find_first_child_text(address_node, names)
|
|
|
|
def split_adres_l2(raw: str) -> tuple[str, str]:
|
|
cleaned = self._clean_text(raw)
|
|
if not cleaned:
|
|
return "", ""
|
|
match = re.match(r"^(\d{2}-\d{3})\s+(.+)$", cleaned)
|
|
if match:
|
|
return match.group(1), self._clean_text(match.group(2))
|
|
return "", cleaned
|
|
|
|
street = get_text(["Ulica", "AdresL1"])
|
|
house_no = get_text(["NrDomu"])
|
|
apartment_no = get_text(["NrLokalu"])
|
|
city = get_text(["Miejscowosc"])
|
|
postal_code = get_text(["KodPocztowy"])
|
|
country = get_text(["Kraj", "KodKraju"])
|
|
adres_l2 = get_text(["AdresL2"])
|
|
|
|
if adres_l2:
|
|
parsed_postal_code, parsed_city = split_adres_l2(adres_l2)
|
|
postal_code = postal_code or parsed_postal_code
|
|
city = city or parsed_city
|
|
|
|
street_line = self._clean_text(street)
|
|
if not street_line:
|
|
street_parts = [part for part in [street, house_no] if part]
|
|
street_line = " ".join(street_parts).strip()
|
|
if apartment_no:
|
|
street_line = f"{street_line}/{apartment_no}" if street_line else apartment_no
|
|
|
|
address_parts = [part for part in [street_line, postal_code, city, country] if part]
|
|
full_address = ", ".join(address_parts)
|
|
|
|
result["street"] = street_line
|
|
result["city"] = city
|
|
result["postal_code"] = postal_code
|
|
result["country"] = country
|
|
result["address"] = full_address
|
|
return result
|
|
|
|
def _parse_party_element(self, element: ET.Element) -> dict[str, str]:
|
|
name = ""
|
|
nip = ""
|
|
|
|
identity_node = self._find_direct_child(element, ["DaneIdentyfikacyjne"])
|
|
if identity_node is not None:
|
|
name = self._find_first_child_text(identity_node, ["PelnaNazwa", "Nazwa", "NazwaPodmiotu"])
|
|
nip = self._digits_only(self._find_first_child_text(identity_node, ["NIP", "NrIdentyfikacjiPodatkowej"]))
|
|
|
|
if not name:
|
|
name = self._find_first_child_text(element, ["PelnaNazwa", "Nazwa", "NazwaPodmiotu"])
|
|
if not nip:
|
|
nip = self._digits_only(self._find_first_child_text(element, ["NIP", "NrIdentyfikacjiPodatkowej"]))
|
|
|
|
main_address_node = self._find_direct_child(element, ["Adres"])
|
|
correspondence_address_node = self._find_direct_child(element, ["AdresKoresp"])
|
|
|
|
main_address = self._parse_adres_node(main_address_node)
|
|
correspondence_address = self._parse_adres_node(correspondence_address_node)
|
|
|
|
chosen_address = main_address
|
|
if not chosen_address.get("address") and correspondence_address.get("address"):
|
|
chosen_address = correspondence_address
|
|
|
|
return {
|
|
"name": name,
|
|
"nip": nip,
|
|
"street": chosen_address.get("street", ""),
|
|
"city": chosen_address.get("city", ""),
|
|
"postal_code": chosen_address.get("postal_code", ""),
|
|
"country": chosen_address.get("country", ""),
|
|
"address": chosen_address.get("address", ""),
|
|
}
|
|
|
|
def _match_party_role_from_root(
|
|
self,
|
|
root: ET.Element,
|
|
contractor_nip: str = "",
|
|
contractor_name: str = "",
|
|
) -> dict[str, str]:
|
|
empty_result = {
|
|
"name": "",
|
|
"nip": "",
|
|
"street": "",
|
|
"city": "",
|
|
"postal_code": "",
|
|
"country": "",
|
|
"address": "",
|
|
}
|
|
|
|
normalized_tax_id = self._digits_only(self.tax_id)
|
|
normalized_contractor_nip = self._digits_only(contractor_nip)
|
|
normalized_contractor_name = self._clean_text(contractor_name).lower()
|
|
|
|
podmiot1 = None
|
|
podmiot2 = None
|
|
for node in self._find_party_nodes(root):
|
|
local_name = self._strip_namespace(node.tag).lower()
|
|
if local_name == "podmiot1" and podmiot1 is None:
|
|
podmiot1 = self._parse_party_element(node)
|
|
elif local_name == "podmiot2" and podmiot2 is None:
|
|
podmiot2 = self._parse_party_element(node)
|
|
|
|
if podmiot1 or podmiot2:
|
|
if normalized_tax_id:
|
|
if podmiot1 and self._digits_only(podmiot1.get("nip")) == normalized_tax_id:
|
|
return podmiot2 or empty_result
|
|
if podmiot2 and self._digits_only(podmiot2.get("nip")) == normalized_tax_id:
|
|
return podmiot1 or empty_result
|
|
|
|
if normalized_contractor_nip:
|
|
if podmiot1 and self._digits_only(podmiot1.get("nip")) == normalized_contractor_nip:
|
|
return podmiot1
|
|
if podmiot2 and self._digits_only(podmiot2.get("nip")) == normalized_contractor_nip:
|
|
return podmiot2
|
|
|
|
if normalized_contractor_name:
|
|
if podmiot1 and self._clean_text(podmiot1.get("name")).lower() == normalized_contractor_name:
|
|
return podmiot1
|
|
if podmiot2 and self._clean_text(podmiot2.get("name")).lower() == normalized_contractor_name:
|
|
return podmiot2
|
|
|
|
return podmiot2 or podmiot1 or empty_result
|
|
|
|
parties = [self._parse_party_element(node) for node in self._find_party_nodes(root)]
|
|
parties = [party for party in parties if any(party.values())]
|
|
|
|
if normalized_contractor_nip:
|
|
for party in parties:
|
|
if self._digits_only(party.get("nip")) == normalized_contractor_nip:
|
|
return party
|
|
|
|
if normalized_contractor_name:
|
|
for party in parties:
|
|
party_name = self._clean_text(party.get("name")).lower()
|
|
if party_name and party_name == normalized_contractor_name:
|
|
return party
|
|
|
|
return parties[0] if parties else empty_result
|
|
|
|
def _extract_party_details_from_xml(
|
|
self,
|
|
xml_content: str,
|
|
contractor_nip: str = "",
|
|
contractor_name: str = "",
|
|
) -> dict[str, str]:
|
|
empty_result = {
|
|
"name": "",
|
|
"nip": "",
|
|
"street": "",
|
|
"city": "",
|
|
"postal_code": "",
|
|
"country": "",
|
|
"address": "",
|
|
}
|
|
|
|
if not xml_content:
|
|
return empty_result
|
|
|
|
try:
|
|
root = ET.fromstring(xml_content)
|
|
except Exception:
|
|
return empty_result
|
|
|
|
matched_party = self._match_party_role_from_root(
|
|
root=root,
|
|
contractor_nip=contractor_nip,
|
|
contractor_name=contractor_name,
|
|
)
|
|
if matched_party and any(matched_party.values()):
|
|
return matched_party
|
|
|
|
street = self._find_first_text(root, ["Ulica", "AdresL1"])
|
|
house_no = self._find_first_text(root, ["NrDomu"])
|
|
apartment_no = self._find_first_text(root, ["NrLokalu"])
|
|
city = self._find_first_text(root, ["Miejscowosc"])
|
|
postal_code = self._find_first_text(root, ["KodPocztowy"])
|
|
country = self._find_first_text(root, ["Kraj", "KodKraju"])
|
|
adres_l2 = self._find_first_text(root, ["AdresL2"])
|
|
|
|
if adres_l2 and (not postal_code or not city):
|
|
match = re.match(r"^(\d{2}-\d{3})\s+(.+)$", self._clean_text(adres_l2))
|
|
if match:
|
|
postal_code = postal_code or match.group(1)
|
|
city = city or self._clean_text(match.group(2))
|
|
elif not city:
|
|
city = self._clean_text(adres_l2)
|
|
|
|
street_parts = [part for part in [street, house_no] if part]
|
|
street_line = " ".join(street_parts).strip()
|
|
if apartment_no:
|
|
street_line = f"{street_line}/{apartment_no}" if street_line else apartment_no
|
|
|
|
address_parts = [part for part in [street_line, postal_code, city, country] if part]
|
|
address = ", ".join(address_parts)
|
|
|
|
return {
|
|
"name": "",
|
|
"nip": "",
|
|
"street": street_line,
|
|
"city": city,
|
|
"postal_code": postal_code,
|
|
"country": country,
|
|
"address": address,
|
|
}
|
|
|
|
def _enrich_metadata_with_xml_party_data(
|
|
self,
|
|
item: dict[str, Any],
|
|
xml_content: str,
|
|
contractor_name: str,
|
|
contractor_nip: str,
|
|
) -> dict[str, Any]:
|
|
metadata = dict(item or {})
|
|
party = self._extract_party_details_from_xml(
|
|
xml_content=xml_content,
|
|
contractor_nip=contractor_nip,
|
|
contractor_name=contractor_name,
|
|
)
|
|
|
|
metadata.pop("contractor_regon", None)
|
|
metadata["contractor_street"] = party.get("street", "")
|
|
metadata["contractor_city"] = party.get("city", "")
|
|
metadata["contractor_postal_code"] = party.get("postal_code", "")
|
|
metadata["contractor_country"] = party.get("country", "")
|
|
metadata["contractor_address"] = party.get("address", "")
|
|
|
|
if party.get("name") and contractor_name == "Brak":
|
|
metadata["contractor_name_from_xml"] = party.get("name", "")
|
|
|
|
if party.get("nip") and not contractor_nip:
|
|
metadata["contractor_nip_from_xml"] = party.get("nip", "")
|
|
|
|
return metadata
|
|
|
|
def list_documents(self, since=None):
    """Fetch invoice metadata from KSeF and return it as ``KSeFDocument`` objects.

    One ``POST /invoices/query/metadata`` request is made for
    ``subjectType`` ``"Subject2"`` with a ``PermanentStorage`` date range
    ending now. For every invoice returned, the invoice XML is downloaded
    and used to enrich the metadata with contractor address data.

    Args:
        since: Start of the date range. When falsy, the range starts
            30 days before now. Otherwise the value is passed through
            ``self._ensure_naive_utc``.

    Returns:
        A list of ``KSeFDocument`` instances, all with
        ``invoice_type="purchase"``; empty when the response carries no
        invoices.

    Raises:
        KeyError: If an invoice entry has no ``ksefNumber``.
        ValueError: If ``issueDate`` does not start with an ISO date.
        Whatever ``self._request`` raises for the metadata query itself.
    """
    # datetime.utcnow() is deprecated since Python 3.12; this produces the same
    # naive UTC value, so the request timestamps keep their offset-less format.
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    date_from = self._ensure_naive_utc(since) if since else (now - timedelta(days=30))

    # NOTE(review): only the first page (offset 0, up to 100 entries) is
    # requested; no further pages are fetched. Confirm whether that is intended.
    request_body = {
        "subjectType": "Subject2",
        "dateRange": {
            "dateType": "PermanentStorage",
            "from": date_from.isoformat(),
            "to": now.isoformat(),
        },
        "pageOffset": 0,
        "pageSize": 100,
    }

    data = self._request("POST", "/invoices/query/metadata", json=request_body)
    # ``or []`` also covers a response where "invoices" is present but null.
    invoices = data.get("invoices") or []
    documents = []

    for item in invoices:
        issue_date_raw = item.get("issueDate") or now.date().isoformat()
        # Keep only the date part so a full timestamp does not abort the listing.
        issue_date = date.fromisoformat(str(issue_date_raw)[:10])

        fetched_at = (
            self._parse_datetime(item.get("acquisitionDate"))
            or self._parse_datetime(item.get("permanentStorageDate"))
            or datetime.now(timezone.utc)
        )
        # Stored as naive UTC.
        fetched_at = fetched_at.astimezone(timezone.utc).replace(tzinfo=None)

        contractor_name, contractor_nip = self._pick_contractor(item)
        ksef_number = item["ksefNumber"]

        try:
            xml_content = self._request(
                "GET",
                f"/invoices/ksef/{ksef_number}",
                accept="application/xml",
            )
        except Exception:
            # Best effort: a failed download must not drop the invoice from the
            # listing, so an empty placeholder document is stored instead.
            xml_content = "<Invoice/>"

        enriched_metadata = self._enrich_metadata_with_xml_party_data(
            item=item,
            xml_content=xml_content,
            contractor_name=contractor_name,
            contractor_nip=contractor_nip,
        )

        # Fill in contractor identity from the XML when the metadata lacked it.
        if enriched_metadata.get("contractor_name_from_xml") and contractor_name == "Brak":
            contractor_name = enriched_metadata["contractor_name_from_xml"]

        if enriched_metadata.get("contractor_nip_from_xml") and not contractor_nip:
            contractor_nip = enriched_metadata["contractor_nip_from_xml"]

        documents.append(
            KSeFDocument(
                ksef_number=ksef_number,
                # Fall back to the KSeF number also when the key exists but is empty.
                invoice_number=item.get("invoiceNumber") or ksef_number,
                contractor_name=contractor_name,
                contractor_nip=contractor_nip,
                issue_date=issue_date,
                # The issue date is reused as the received date.
                received_date=issue_date,
                fetched_at=fetched_at,
                net_amount=self._decimal_from_item(item, "netAmount", "invoiceNetAmount"),
                vat_amount=self._decimal_from_item(item, "vatAmount", "invoiceVatAmount"),
                gross_amount=self._decimal_from_item(item, "grossAmount", "invoiceGrossAmount"),
                invoice_type="purchase",
                xml_content=xml_content,
                metadata=enriched_metadata,
            )
        )
    return documents
|
|
|
|
def issue_invoice(self, payload: dict):
    """Send one invoice XML to KSeF through an online session.

    Steps, in order: normalize and encrypt the XML, open an online session,
    post the encrypted invoice, close the session (a failure here is only
    recorded and appended to the result message), poll the invoice
    processing status, and fetch the UPO.

    Args:
        payload: Must contain ``xml_content``; may contain ``schemaVersion``
            (``"FA(3)"`` is used when it is missing or empty).

    Returns:
        A dict with ``ksef_number``, ``status`` (``"issued"`` when the status
        code is 200 and a KSeF number was returned, otherwise ``"queued"``),
        ``message``, ``xml_content``, ``session_reference_number``,
        ``invoice_reference_number``, ``status_payload`` and ``upo_xml``.
        Without a KSeF number, ``ksef_number`` is a
        ``PENDING/<session>/<invoice>`` placeholder.

    Raises:
        RuntimeError: If ``xml_content`` is missing, or KSeF returns no
            session reference or no invoice reference.
    """
    request = payload or {}
    xml_content = request.get("xml_content")
    if not xml_content:
        raise RuntimeError("Brak xml_content w payloadzie wysyłki do KSeF.")

    schema_version = str(request.get("schemaVersion") or "FA(3)")
    plain_bytes = self._normalize_xml_bytes(xml_content)
    encryption_data = self._build_online_encryption_data()
    cipher_bytes = self._encrypt_invoice_bytes(
        plain_bytes,
        encryption_data["symmetric_key"],
        encryption_data["iv"],
    )

    opened = self._open_online_session(
        schema_version=schema_version,
        encryption_data=encryption_data,
    )
    session_reference = self._first_present(opened, "referenceNumber", "sessionReferenceNumber")
    if not session_reference:
        raise RuntimeError("KSeF nie zwrócił numeru referencyjnego sesji interaktywnej.")

    sent = self._request(
        "POST",
        f"/sessions/online/{session_reference}/invoices",
        json={
            "invoiceHash": self._sha256_base64(plain_bytes),
            "invoiceSize": len(plain_bytes),
            "encryptedDocumentHash": self._sha256_base64(cipher_bytes),
            "encryptedDocumentSize": len(cipher_bytes),
            "encryptedDocumentContent": base64.b64encode(cipher_bytes).decode("ascii"),
        },
    )
    invoice_reference = self._first_present(sent, "referenceNumber", "invoiceReferenceNumber")
    if not invoice_reference:
        raise RuntimeError("KSeF nie zwrócił numeru referencyjnego przesłanej faktury.")

    # A failed close is reported in the message, not raised: the invoice was already sent.
    close_error = None
    try:
        self._request("POST", f"/sessions/online/{session_reference}/close")
    except Exception as exc:
        close_error = exc

    status_payload = self._poll_invoice_processing(session_reference, invoice_reference)
    code = self._extract_status_code(status_payload)

    ksef_number = self._first_present(
        status_payload,
        "ksefNumber",
        "ksefReferenceNumber",
        "invoiceKsefNumber",
    )
    if not ksef_number:
        # The number may also be nested under the "invoice" object.
        nested = status_payload.get("invoice")
        if isinstance(nested, dict):
            ksef_number = self._first_present(nested, "ksefNumber", "ksefReferenceNumber")

    upo_xml = self._fetch_invoice_upo(
        session_reference,
        invoice_reference,
        ksef_number=str(ksef_number) if ksef_number else None,
    )

    accepted = code == 200 and ksef_number
    if accepted:
        message = f"Faktura została przyjęta przez KSeF. Numer KSeF: {ksef_number}."
    elif code and code < 300:
        message = "Faktura została wysłana do KSeF i oczekuje na końcowe przetworzenie."
    else:
        message = "Wysłano fakturę do KSeF."
    if close_error:
        message += f" Sesja została zamknięta z ostrzeżeniem: {close_error}"

    return {
        "ksef_number": ksef_number or f"PENDING/{session_reference}/{invoice_reference}",
        "status": "issued" if accepted else "queued",
        "message": message,
        "xml_content": xml_content,
        "session_reference_number": session_reference,
        "invoice_reference_number": invoice_reference,
        "status_payload": status_payload,
        "upo_xml": upo_xml,
    }
|
|
|
|
def ping(self):
    """Check that the KSeF API answers an authenticated request.

    Issues ``GET /auth/sessions`` and reports the outcome as a dict with
    ``status`` (``"ok"`` or ``"error"``) and ``message``. Any exception raised
    by the request is converted into the error message instead of
    propagating.
    """
    try:
        self._request("GET", "/auth/sessions", params={"pageSize": 10})
    except Exception as exc:
        return {"status": "error", "message": str(exc)}
    return {
        "status": "ok",
        "message": "API KSeF odpowiada, a uwierzytelnienie działa poprawnie.",
    }
|
|
|
|
def diagnostics(self):
    """Report connection settings, token state and a sample API response.

    Reads the access/refresh token validity, then issues
    ``GET /auth/sessions``. On success the result has ``status`` ``"ok"``,
    both token states and the response under ``sample`` (wrapped as
    ``{"sessions": ...}`` when it is not a dict). On any exception the result
    has ``status`` ``"error"``, the exception text as ``message`` and under
    ``sample``, and no token-state keys. Both variants include ``base_url``,
    ``auth_mode``, ``tax_id``, ``token_configured`` and
    ``certificate_configured``.
    """

    def connection_details():
        # Fields shared by the success and the error report.
        return {
            "base_url": self.base_url,
            "auth_mode": self.auth_mode,
            "tax_id": self.tax_id,
            "token_configured": bool(self.token),
            "certificate_configured": bool(self.certificate_data),
        }

    try:
        access_state = "valid" if self._access_token_is_valid() else "missing_or_expired"
        refresh_state = "valid" if self._refresh_token_is_valid() else "missing_or_expired"

        sessions = self._request("GET", "/auth/sessions", params={"pageSize": 10})
        if isinstance(sessions, dict):
            sample = sessions
        else:
            sample = {"sessions": sessions}

        report = {"status": "ok", "message": "API KSeF odpowiada."}
        report.update(connection_details())
        report["access_token_state"] = access_state
        report["refresh_token_state"] = refresh_state
        report["sample"] = sample
        return report
    except Exception as exc:
        failure = {"status": "error", "message": str(exc)}
        failure.update(connection_details())
        failure["sample"] = {"error": str(exc)}
        return failure
|
|
|
|
|
|
class KSeFService:
    """Facade that forwards KSeF operations to a mock or a real adapter.

    The adapter is chosen once, at construction time, from the
    ``ksef.mock_mode`` setting.
    """

    def __init__(self, company_id=None):
        """Pick the adapter for ``company_id``.

        The mock adapter is used when the ``ksef.mock_mode`` setting
        (default ``"true"``) equals the string ``"true"``.
        """
        mock_mode = SettingsService.get("ksef.mock_mode", "true", company_id=company_id)
        adapter_cls = MockKSeFAdapter if mock_mode == "true" else RequestsKSeFAdapter
        self.adapter = adapter_cls(company_id=company_id)

    def list_documents(self, since=None):
        """Return the adapter's documents, optionally limited by ``since``."""
        return self.adapter.list_documents(since=since)

    def issue_invoice(self, payload: dict):
        """Pass ``payload`` to the adapter's ``issue_invoice`` and return its result."""
        return self.adapter.issue_invoice(payload)

    def ping(self):
        """Return the adapter's ``ping`` result."""
        return self.adapter.ping()

    def diagnostics(self):
        """Return the adapter's ``diagnostics`` result."""
        return self.adapter.diagnostics()

    @staticmethod
    def calc_hash(xml_content: str) -> str:
        """Return the hex SHA-256 digest of ``xml_content`` encoded as UTF-8."""
        encoded = xml_content.encode("utf-8")
        return hashlib.sha256(encoded).hexdigest()