This commit is contained in:
Mateusz Gruszczyński
2026-03-13 11:03:13 +01:00
commit 35571df778
132 changed files with 11197 additions and 0 deletions

View File

@@ -0,0 +1,334 @@
from __future__ import annotations
import json
import re
from typing import Any
import requests
from app.models.setting import AppSetting
class CeidgService:
DEFAULT_TIMEOUT = 12
API_URLS = {
'production': 'https://dane.biznes.gov.pl/api/ceidg/v3/firmy',
'test': 'https://test-dane.biznes.gov.pl/api/ceidg/v3/firmy',
}
@staticmethod
def _digits(value: str | None) -> str:
return re.sub(r'\D+', '', value or '')
@staticmethod
def _clean_text(value: str | None) -> str:
if value is None:
return ''
value = re.sub(r'\s+', ' ', str(value))
return value.strip(' ,;:-')
@staticmethod
def _normalize_empty(value: Any) -> Any:
if value is None:
return ''
if isinstance(value, dict):
return {key: CeidgService._normalize_empty(item) for key, item in value.items()}
if isinstance(value, list):
return [CeidgService._normalize_empty(item) for item in value]
return value
@classmethod
def get_environment(cls) -> str:
environment = AppSetting.get('ceidg.environment', 'production')
return environment if environment in cls.API_URLS else 'production'
@classmethod
def get_api_url(cls, environment: str | None = None) -> str:
env = environment or cls.get_environment()
return cls.API_URLS.get(env, cls.API_URLS['production'])
@staticmethod
def get_api_key() -> str:
return (AppSetting.get('ceidg.api_key', '', decrypt=True) or '').strip()
@classmethod
def has_api_key(cls) -> bool:
return bool(cls.get_api_key())
@classmethod
def _headers(cls) -> dict[str, str]:
headers = {'Accept': 'application/json', 'User-Agent': 'KSeF Manager/1.0'}
api_key = cls.get_api_key()
if api_key:
token = api_key.strip()
headers['Authorization'] = token if token.lower().startswith('bearer ') else f'Bearer {token}'
return headers
@staticmethod
def _safe_payload(response: requests.Response):
try:
return CeidgService._normalize_empty(response.json())
except Exception:
return response.text[:1000]
def fetch_company(self, identifier: str | None = None, *, nip: str | None = None) -> dict:
nip = self._digits(nip or identifier)
if len(nip) != 10:
return {'ok': False, 'message': 'Podaj poprawny 10-cyfrowy NIP.'}
if not self.has_api_key():
return {
'ok': False,
'message': 'Brak API KEY do CEIDG. Uzupełnij klucz w panelu admina.',
'fallback': {'tax_id': nip},
}
environment = self.get_environment()
api_url = self.get_api_url(environment)
try:
response = requests.get(api_url, headers=self._headers(), params={'nip': nip}, timeout=self.DEFAULT_TIMEOUT)
if response.status_code in {401, 403}:
return {
'ok': False,
'message': 'CEIDG odrzucił token. Sprawdź klucz w panelu admina.',
'error': response.text[:500],
'debug': {
'environment': environment,
'url': api_url,
'auth_prefix': 'Bearer' if self._headers().get('Authorization', '').startswith('Bearer ') else 'raw',
'token_len': len(self.get_api_key()),
},
'fallback': {'tax_id': nip},
}
if response.status_code == 404:
return {'ok': False, 'message': 'Nie znaleziono firmy o podanym NIP w CEIDG.', 'fallback': {'tax_id': nip}}
response.raise_for_status()
parsed = self._parse_payload(response.text, nip)
if parsed:
parsed['ok'] = True
parsed['source_url'] = api_url
parsed['environment'] = environment
parsed['message'] = 'Pobrano dane z API CEIDG.'
return parsed
return {
'ok': False,
'message': 'Nie znaleziono firmy w CEIDG. Podmiot może być zarejestrowany w KRS.',
'sample': self._safe_payload(response),
'fallback': {'tax_id': nip},
}
except requests.exceptions.Timeout as exc:
error = f'Timeout: {exc}'
except requests.exceptions.RequestException as exc:
error = str(exc)
return {
'ok': False,
'message': 'Nie udało się pobrać danych z API CEIDG. Sprawdź konfigurację połączenia i API KEY w panelu admina.',
'error': error,
'fallback': {'tax_id': nip},
}
def diagnostics(self) -> dict:
environment = self.get_environment()
api_url = self.get_api_url(environment)
if not self.has_api_key():
return {
'status': 'error',
'message': 'Brak API KEY do CEIDG.',
'environment': environment,
'url': api_url,
'sample': {'error': 'missing_api_key'},
'technical_details': None,
'token_length': 0,
}
try:
headers = self._headers()
response = requests.get(api_url, headers=headers, params={'nip': '3563457932'}, timeout=self.DEFAULT_TIMEOUT)
payload = self._safe_payload(response)
status = 'ok' if response.status_code not in {401, 403} and response.status_code < 500 else 'error'
technical_details = None
if response.status_code in {401, 403}:
technical_details = 'Autoryzacja odrzucona przez CEIDG.'
elif response.status_code in {400, 404, 422}:
technical_details = 'Połączenie działa, ale zapytanie diagnostyczne nie zwróciło danych testowych.'
return {
'status': status,
'message': f'HTTP {response.status_code}',
'environment': environment,
'url': api_url,
'sample': payload,
'technical_details': technical_details,
'token_length': len(self.get_api_key()),
'authorization_preview': headers.get('Authorization', '')[:20],
}
except Exception as exc:
message = f'Timeout: {exc}' if isinstance(exc, requests.exceptions.Timeout) else str(exc)
return {
'status': 'error',
'message': message,
'environment': environment,
'url': api_url,
'sample': {'error': str(exc)},
'technical_details': None,
'token_length': len(self.get_api_key()),
}
def _parse_payload(self, payload: str, nip: str) -> dict | None:
try:
obj = json.loads(payload)
except Exception:
return None
found = self._walk_candidate(obj, nip)
if not found:
return None
found = self._normalize_empty(found)
found['tax_id'] = nip
found['name'] = self._clean_text(found.get('name'))
found['regon'] = self._digits(found.get('regon'))
found['address'] = self._clean_text(found.get('address'))
found['phone'] = self._clean_text(found.get('phone'))
found['email'] = self._clean_text(found.get('email'))
if not found['phone']:
found.pop('phone', None)
if not found['email']:
found.pop('email', None)
return found if found['name'] else None
def _walk_candidate(self, candidate: Any, nip: str) -> dict | None:
if isinstance(candidate, dict):
candidate_nip = self._extract_nip(candidate)
if candidate_nip == nip:
return {
'name': self._extract_name(candidate),
'regon': self._extract_regon(candidate),
'address': self._compose_address(candidate),
'phone': self._extract_phone(candidate),
'email': self._extract_email(candidate),
}
for value in candidate.values():
nested = self._walk_candidate(value, nip)
if nested:
return nested
elif isinstance(candidate, list):
for item in candidate:
nested = self._walk_candidate(item, nip)
if nested:
return nested
return None
def _extract_nip(self, candidate: dict) -> str:
owner = candidate.get('wlasciciel') or candidate.get('owner') or {}
values = [
candidate.get('nip'),
candidate.get('Nip'),
candidate.get('taxId'),
candidate.get('tax_id'),
candidate.get('NIP'),
owner.get('nip') if isinstance(owner, dict) else '',
]
for value in values:
digits = self._digits(value)
if digits:
return digits
return ''
def _extract_name(self, candidate: dict) -> str:
owner = candidate.get('wlasciciel') or candidate.get('owner') or {}
values = [
candidate.get('firma'),
candidate.get('nazwa'),
candidate.get('name'),
candidate.get('nazwaFirmy'),
candidate.get('przedsiebiorca'),
candidate.get('entrepreneurName'),
owner.get('nazwa') if isinstance(owner, dict) else '',
owner.get('nazwaSkrocona') if isinstance(owner, dict) else '',
owner.get('imieNazwisko') if isinstance(owner, dict) else '',
]
for value in values:
cleaned = self._clean_text(value)
if cleaned:
return cleaned
return ''
def _extract_regon(self, candidate: dict) -> str:
owner = candidate.get('wlasciciel') or candidate.get('owner') or {}
values = [
candidate.get('regon'),
candidate.get('REGON'),
candidate.get('regon9'),
owner.get('regon') if isinstance(owner, dict) else '',
]
for value in values:
digits = self._digits(value)
if digits:
return digits
return ''
def _extract_phone(self, candidate: dict) -> str:
owner = candidate.get('wlasciciel') or candidate.get('owner') or {}
contact = candidate.get('kontakt') or candidate.get('contact') or {}
values = [
candidate.get('telefon'),
candidate.get('telefonKontaktowy'),
candidate.get('phone'),
candidate.get('phoneNumber'),
contact.get('telefon') if isinstance(contact, dict) else '',
contact.get('phone') if isinstance(contact, dict) else '',
owner.get('telefon') if isinstance(owner, dict) else '',
owner.get('phone') if isinstance(owner, dict) else '',
]
for value in values:
cleaned = self._clean_text(value)
if cleaned:
return cleaned
return ''
def _extract_email(self, candidate: dict) -> str:
owner = candidate.get('wlasciciel') or candidate.get('owner') or {}
contact = candidate.get('kontakt') or candidate.get('contact') or {}
values = [
candidate.get('email'),
candidate.get('adresEmail'),
candidate.get('emailAddress'),
contact.get('email') if isinstance(contact, dict) else '',
owner.get('email') if isinstance(owner, dict) else '',
]
for value in values:
cleaned = self._clean_text(value)
if cleaned:
return cleaned
return ''
def _compose_address(self, candidate: dict) -> str:
address = (
candidate.get('adresDzialalnosci')
or candidate.get('adresGlownegoMiejscaWykonywaniaDzialalnosci')
or candidate.get('adres')
or candidate.get('address')
or {}
)
if isinstance(address, str):
return self._clean_text(address)
if isinstance(address, dict):
street = self._clean_text(address.get('ulica') or address.get('street'))
building = self._clean_text(address.get('budynek') or address.get('numerNieruchomosci') or address.get('buildingNumber'))
apartment = self._clean_text(address.get('lokal') or address.get('numerLokalu') or address.get('apartmentNumber'))
postal_code = self._clean_text(address.get('kod') or address.get('kodPocztowy') or address.get('postalCode'))
city = self._clean_text(address.get('miasto') or address.get('miejscowosc') or address.get('city'))
parts = []
street_part = ' '.join([part for part in [street, building] if part]).strip()
if apartment:
street_part = f'{street_part}/{apartment}' if street_part else apartment
if street_part:
parts.append(street_part)
city_part = ' '.join([part for part in [postal_code, city] if part]).strip()
if city_part:
parts.append(city_part)
return ', '.join(parts)
values = [
candidate.get('ulica'),
candidate.get('numerNieruchomosci'),
candidate.get('numerLokalu'),
candidate.get('kodPocztowy'),
candidate.get('miejscowosc'),
]
return ', '.join([self._clean_text(v) for v in values if self._clean_text(v)])