from __future__ import annotations

import json
import re
from typing import Any

import requests

from app.models.setting import AppSetting


class CeidgService:
    """Look up company data by NIP in the CEIDG registry HTTP API.

    The API key and the target environment are read from ``AppSetting``
    (``ceidg.api_key`` and ``ceidg.environment``). Public entry points are
    :meth:`fetch_company` and :meth:`diagnostics`; both return plain dicts and
    report request failures in the dict instead of raising.
    """

    # Passed as ``timeout=`` to every ``requests.get`` call.
    DEFAULT_TIMEOUT = 12
    # Endpoint per supported environment; unknown environments fall back to
    # 'production'.
    API_URLS = {
        'production': 'https://dane.biznes.gov.pl/api/ceidg/v3/firmy',
        'test': 'https://test-dane.biznes.gov.pl/api/ceidg/v3/firmy',
    }
    # NIP sent by diagnostics() as the probe query.
    DIAGNOSTIC_NIP = '3563457932'

    # ------------------------------------------------------------------
    # Value normalisation helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _digits(value: Any) -> str:
        """Return only the decimal digits contained in *value*.

        Strings are stripped of every non-digit character. Integers are
        accepted too, because a JSON payload may carry an identifier as a
        number and ``re.sub`` raises ``TypeError`` on non-string input. Any
        other type (including ``None`` and ``bool``) yields ``''``.
        """
        if isinstance(value, int) and not isinstance(value, bool):
            value = str(value)
        if not isinstance(value, str):
            return ''
        return re.sub(r'\D+', '', value)

    @staticmethod
    def _clean_text(value: Any) -> str:
        """Collapse whitespace runs and trim surrounding ``' ,;:-'`` characters.

        ``None`` yields ``''``; any other value is converted with ``str()``.
        """
        if value is None:
            return ''
        value = re.sub(r'\s+', ' ', str(value))
        return value.strip(' ,;:-')

    @staticmethod
    def _normalize_empty(value: Any) -> Any:
        """Recursively replace ``None`` with ``''`` inside dicts and lists."""
        if value is None:
            return ''
        if isinstance(value, dict):
            return {key: CeidgService._normalize_empty(item) for key, item in value.items()}
        if isinstance(value, list):
            return [CeidgService._normalize_empty(item) for item in value]
        return value

    # ------------------------------------------------------------------
    # Configuration
    # ------------------------------------------------------------------

    @classmethod
    def get_environment(cls) -> str:
        """Return the configured environment, or 'production' if it is unknown."""
        environment = AppSetting.get('ceidg.environment', 'production')
        return environment if environment in cls.API_URLS else 'production'

    @classmethod
    def get_api_url(cls, environment: str | None = None) -> str:
        """Return the endpoint URL for *environment* (configured one if omitted)."""
        env = environment or cls.get_environment()
        return cls.API_URLS.get(env, cls.API_URLS['production'])

    @staticmethod
    def get_api_key() -> str:
        """Return the stored API key, stripped; ``''`` when none is set."""
        return (AppSetting.get('ceidg.api_key', '', decrypt=True) or '').strip()

    @classmethod
    def has_api_key(cls) -> bool:
        """Return True when a non-empty API key is configured."""
        return bool(cls.get_api_key())

    @classmethod
    def _headers(cls) -> dict[str, str]:
        """Build request headers, adding ``Authorization`` when a key exists.

        A key that already starts with ``bearer `` (any letter case) is sent
        unchanged; otherwise it is prefixed with ``Bearer ``.
        """
        headers = {'Accept': 'application/json', 'User-Agent': 'KSeF Manager/1.0'}
        api_key = cls.get_api_key()
        if api_key:
            token = api_key.strip()
            headers['Authorization'] = token if token.lower().startswith('bearer ') else f'Bearer {token}'
        return headers

    @staticmethod
    def _safe_payload(response: requests.Response):
        """Return the response JSON with ``None`` normalised, or a text excerpt.

        When the body cannot be decoded as JSON the first 1000 characters of
        the raw text are returned instead.
        """
        try:
            return CeidgService._normalize_empty(response.json())
        except Exception:
            # Deliberately best-effort: this only feeds diagnostic output and
            # must never mask the primary result with a decoding error.
            return response.text[:1000]

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def fetch_company(self, identifier: str | None = None, *, nip: str | None = None) -> dict:
        """Fetch company data for a NIP from the CEIDG API.

        Args:
            identifier: NIP given positionally; used when *nip* is empty.
            nip: NIP given by keyword; takes precedence over *identifier*.
                Non-digit characters are ignored.

        Returns:
            A dict that always contains ``ok`` and ``message``. On success it
            also carries ``tax_id``, ``name``, ``regon``, ``address``,
            ``source_url``, ``environment`` and, when non-empty, ``phone`` and
            ``email``. On failure it may carry ``fallback``, ``error``,
            ``debug`` or ``sample``. ``requests`` errors are reported in the
            dict rather than raised.
        """
        nip = self._digits(nip or identifier)
        if len(nip) != 10:
            return {'ok': False, 'message': 'Podaj poprawny 10-cyfrowy NIP.'}

        if not self.has_api_key():
            return {
                'ok': False,
                'message': 'Brak API KEY do CEIDG. Uzupełnij klucz w panelu admina.',
                'fallback': {'tax_id': nip},
            }

        environment = self.get_environment()
        api_url = self.get_api_url(environment)
        # Built once and reused for the debug block below, so the key is not
        # read and decrypted a second time just to report the auth prefix.
        headers = self._headers()

        try:
            response = requests.get(
                api_url,
                headers=headers,
                params={'nip': nip},
                timeout=self.DEFAULT_TIMEOUT,
            )
            if response.status_code in {401, 403}:
                return {
                    'ok': False,
                    'message': 'CEIDG odrzucił token. Sprawdź klucz w panelu admina.',
                    'error': response.text[:500],
                    'debug': {
                        'environment': environment,
                        'url': api_url,
                        'auth_prefix': 'Bearer' if headers.get('Authorization', '').startswith('Bearer ') else 'raw',
                        'token_len': len(self.get_api_key()),
                    },
                    'fallback': {'tax_id': nip},
                }
            if response.status_code == 404:
                return {
                    'ok': False,
                    'message': 'Nie znaleziono firmy o podanym NIP w CEIDG.',
                    'fallback': {'tax_id': nip},
                }
            # Any remaining 4xx/5xx becomes an HTTPError, handled below as a
            # RequestException.
            response.raise_for_status()

            parsed = self._parse_payload(response.text, nip)
            if parsed:
                parsed['ok'] = True
                parsed['source_url'] = api_url
                parsed['environment'] = environment
                parsed['message'] = 'Pobrano dane z API CEIDG.'
                return parsed
            return {
                'ok': False,
                'message': 'Nie znaleziono firmy w CEIDG. Podmiot może być zarejestrowany w KRS.',
                'sample': self._safe_payload(response),
                'fallback': {'tax_id': nip},
            }
        except requests.exceptions.Timeout as exc:
            error = f'Timeout: {exc}'
        except requests.exceptions.RequestException as exc:
            error = str(exc)

        return {
            'ok': False,
            'message': (
                'Nie udało się pobrać danych z API CEIDG. '
                'Sprawdź konfigurację połączenia i API KEY w panelu admina.'
            ),
            'error': error,
            'fallback': {'tax_id': nip},
        }

    def diagnostics(self) -> dict:
        """Probe the configured endpoint and describe the outcome.

        Sends one request for ``DIAGNOSTIC_NIP``. ``status`` is ``'ok'`` for
        any response that is neither 401/403 nor a 5xx, and ``'error'``
        otherwise (including a missing key or any exception).

        Returns:
            A dict with ``status``, ``message``, ``environment``, ``url``,
            ``sample``, ``technical_details`` and ``token_length``; a
            completed request additionally includes ``authorization_preview``
            (the first 20 characters of the Authorization header).
        """
        environment = self.get_environment()
        api_url = self.get_api_url(environment)

        if not self.has_api_key():
            return {
                'status': 'error',
                'message': 'Brak API KEY do CEIDG.',
                'environment': environment,
                'url': api_url,
                'sample': {'error': 'missing_api_key'},
                'technical_details': None,
                'token_length': 0,
            }

        try:
            headers = self._headers()
            response = requests.get(
                api_url,
                headers=headers,
                params={'nip': self.DIAGNOSTIC_NIP},
                timeout=self.DEFAULT_TIMEOUT,
            )
            payload = self._safe_payload(response)
            status = 'ok' if response.status_code not in {401, 403} and response.status_code < 500 else 'error'

            technical_details = None
            if response.status_code in {401, 403}:
                technical_details = 'Autoryzacja odrzucona przez CEIDG.'
            elif response.status_code in {400, 404, 422}:
                technical_details = 'Połączenie działa, ale zapytanie diagnostyczne nie zwróciło danych testowych.'

            return {
                'status': status,
                'message': f'HTTP {response.status_code}',
                'environment': environment,
                'url': api_url,
                'sample': payload,
                'technical_details': technical_details,
                'token_length': len(self.get_api_key()),
                'authorization_preview': headers.get('Authorization', '')[:20],
            }
        except Exception as exc:
            # Diagnostics is a reporting boundary: every failure is turned
            # into an 'error' result instead of propagating.
            message = f'Timeout: {exc}' if isinstance(exc, requests.exceptions.Timeout) else str(exc)
            return {
                'status': 'error',
                'message': message,
                'environment': environment,
                'url': api_url,
                'sample': {'error': str(exc)},
                'technical_details': None,
                'token_length': len(self.get_api_key()),
            }

    # ------------------------------------------------------------------
    # Payload parsing
    # ------------------------------------------------------------------

    def _parse_payload(self, payload: str, nip: str) -> dict | None:
        """Extract the record matching *nip* from a JSON response body.

        Returns:
            A dict with ``tax_id``, ``name``, ``regon``, ``address`` and, when
            non-empty, ``phone`` and ``email``; ``None`` when the body is not
            valid JSON, no record matches, or the matched record has no name.
        """
        try:
            obj = json.loads(payload)
        except (TypeError, ValueError, RecursionError):
            return None

        found = self._walk_candidate(obj, nip)
        if not found:
            return None

        found = self._normalize_empty(found)
        found['tax_id'] = nip
        found['name'] = self._clean_text(found.get('name'))
        found['regon'] = self._digits(found.get('regon'))
        found['address'] = self._clean_text(found.get('address'))
        found['phone'] = self._clean_text(found.get('phone'))
        found['email'] = self._clean_text(found.get('email'))

        # Optional contact fields are omitted entirely rather than sent empty.
        if not found['phone']:
            found.pop('phone', None)
        if not found['email']:
            found.pop('email', None)

        return found if found['name'] else None

    def _walk_candidate(self, candidate: Any, nip: str) -> dict | None:
        """Depth-first search of *candidate* for a dict whose NIP equals *nip*.

        Returns the extracted fields of the first matching dict, or ``None``.
        """
        if isinstance(candidate, dict):
            if self._extract_nip(candidate) == nip:
                return {
                    'name': self._extract_name(candidate),
                    'regon': self._extract_regon(candidate),
                    'address': self._compose_address(candidate),
                    'phone': self._extract_phone(candidate),
                    'email': self._extract_email(candidate),
                }
            children = candidate.values()
        elif isinstance(candidate, list):
            children = candidate
        else:
            return None

        for child in children:
            nested = self._walk_candidate(child, nip)
            if nested:
                return nested
        return None

    @staticmethod
    def _nested_dict(candidate: dict, *keys: str) -> dict:
        """Return the first truthy value found under *keys* if it is a dict.

        A truthy non-dict value stops the search and yields ``{}``, so callers
        can always use ``.get`` on the result.
        """
        for key in keys:
            value = candidate.get(key)
            if value:
                return value if isinstance(value, dict) else {}
        return {}

    def _first_digits(self, values: list) -> str:
        """Return the digits of the first value that contains any, else ``''``."""
        for value in values:
            digits = self._digits(value)
            if digits:
                return digits
        return ''

    def _first_text(self, values: list) -> str:
        """Return the first value that is non-empty after cleaning, else ``''``."""
        for value in values:
            cleaned = self._clean_text(value)
            if cleaned:
                return cleaned
        return ''

    def _extract_nip(self, candidate: dict) -> str:
        """Return the NIP digits found on *candidate* or its owner sub-dict."""
        owner = self._nested_dict(candidate, 'wlasciciel', 'owner')
        return self._first_digits([
            candidate.get('nip'),
            candidate.get('Nip'),
            candidate.get('taxId'),
            candidate.get('tax_id'),
            candidate.get('NIP'),
            owner.get('nip'),
        ])

    def _extract_name(self, candidate: dict) -> str:
        """Return the first non-empty company/owner name field."""
        owner = self._nested_dict(candidate, 'wlasciciel', 'owner')
        return self._first_text([
            candidate.get('firma'),
            candidate.get('nazwa'),
            candidate.get('name'),
            candidate.get('nazwaFirmy'),
            candidate.get('przedsiebiorca'),
            candidate.get('entrepreneurName'),
            owner.get('nazwa'),
            owner.get('nazwaSkrocona'),
            owner.get('imieNazwisko'),
        ])

    def _extract_regon(self, candidate: dict) -> str:
        """Return the REGON digits found on *candidate* or its owner sub-dict."""
        owner = self._nested_dict(candidate, 'wlasciciel', 'owner')
        return self._first_digits([
            candidate.get('regon'),
            candidate.get('REGON'),
            candidate.get('regon9'),
            owner.get('regon'),
        ])

    def _extract_phone(self, candidate: dict) -> str:
        """Return the first non-empty phone field (record, contact, then owner)."""
        owner = self._nested_dict(candidate, 'wlasciciel', 'owner')
        contact = self._nested_dict(candidate, 'kontakt', 'contact')
        return self._first_text([
            candidate.get('telefon'),
            candidate.get('telefonKontaktowy'),
            candidate.get('phone'),
            candidate.get('phoneNumber'),
            contact.get('telefon'),
            contact.get('phone'),
            owner.get('telefon'),
            owner.get('phone'),
        ])

    def _extract_email(self, candidate: dict) -> str:
        """Return the first non-empty e-mail field (record, contact, then owner)."""
        owner = self._nested_dict(candidate, 'wlasciciel', 'owner')
        contact = self._nested_dict(candidate, 'kontakt', 'contact')
        return self._first_text([
            candidate.get('email'),
            candidate.get('adresEmail'),
            candidate.get('emailAddress'),
            contact.get('email'),
            owner.get('email'),
        ])

    def _format_structured_address(self, address: dict) -> str:
        """Format an address dict as ``'<street> <building>/<apartment>, <code> <city>'``.

        Missing components are skipped; returns ``''`` when none are present.
        """
        street = self._clean_text(address.get('ulica') or address.get('street'))
        building = self._clean_text(
            address.get('budynek') or address.get('numerNieruchomosci') or address.get('buildingNumber')
        )
        apartment = self._clean_text(
            address.get('lokal') or address.get('numerLokalu') or address.get('apartmentNumber')
        )
        postal_code = self._clean_text(
            address.get('kod') or address.get('kodPocztowy') or address.get('postalCode')
        )
        city = self._clean_text(
            address.get('miasto') or address.get('miejscowosc') or address.get('city')
        )

        parts = []
        street_part = ' '.join(part for part in (street, building) if part).strip()
        if apartment:
            street_part = f'{street_part}/{apartment}' if street_part else apartment
        if street_part:
            parts.append(street_part)
        city_part = ' '.join(part for part in (postal_code, city) if part).strip()
        if city_part:
            parts.append(city_part)
        return ', '.join(parts)

    def _compose_address(self, candidate: dict) -> str:
        """Build a one-line address for *candidate*.

        Uses the first truthy nested address (``adresDzialalnosci``,
        ``adresGlownegoMiejscaWykonywaniaDzialalnosci``, ``adres``,
        ``address``), which may be a string or a dict. When that yields
        nothing, falls back to address fields stored directly on the record,
        joined with ``', '``.
        """
        address = (
            candidate.get('adresDzialalnosci')
            or candidate.get('adresGlownegoMiejscaWykonywaniaDzialalnosci')
            or candidate.get('adres')
            or candidate.get('address')
            or {}
        )

        if isinstance(address, str):
            composed = self._clean_text(address)
        elif isinstance(address, dict):
            composed = self._format_structured_address(address)
        else:
            composed = ''
        if composed:
            return composed

        # A missing nested address defaults to {} above, so returning the
        # (empty) structured result unconditionally would make this fallback
        # unreachable; it must run whenever the nested address gave nothing.
        flat_values = [
            candidate.get('ulica'),
            candidate.get('numerNieruchomosci'),
            candidate.get('numerLokalu'),
            candidate.get('kodPocztowy'),
            candidate.get('miejscowosc'),
        ]
        cleaned_values = (self._clean_text(value) for value in flat_values)
        return ', '.join(value for value in cleaned_values if value)