mirror of
https://gitlab.rd.nic.fr/labs/frcrawler/siren.git
synced 2025-04-11 23:05:12 +02:00
227 lines
8 KiB
Python
#
# SPDX-FileCopyrightText: 2023 Afnic
#
# SPDX-License-Identifier: GPL-3.0-or-later
#

import re
import logging

from html.parser import HTMLParser
from urllib.parse import urlparse, urljoin

import luhn
import httpx
from bs4 import BeautifulSoup

from frcrawler.tasks import BaseTask

logger = logging.getLogger(__name__)


class LinkExtracter(HTMLParser):
    """
    Collect the ``<a href>`` links of an HTML page, resolving them against the
    page URL (honouring any ``<base>`` tag) and keeping only links that use an
    allowed scheme and point to the same host.
    """

    def __init__(self, base_url, allowed_schemes=('https', 'http')):
        super().__init__()

        self.links = []
        self.base_url = base_url
        self.base_url_parsed = urlparse(base_url)
        self.allowed_schemes = allowed_schemes

    def handle_starttag(self, tag, attrs):
        if tag == 'a':
            for attr in attrs:
                if attr[0] == 'href':
                    link = attr[1]
                    self.links.append(link)

        if tag == 'base':
            for attr in attrs:
                if attr[0] == 'href':
                    self.base_url = urljoin(self.base_url, attr[1])

    def extract_links(self, body):
        try:
            self.feed(body)
        except Exception as e:
            logger.warning('Failed to parse HTML content while reading %s: %s: %s', self.base_url, type(e).__name__, e)

        final_links_set = set()
        final_links = []

        for link in self.links:
            try:
                url = urlparse(link)
            except ValueError as e:
                logger.info('Parsing of link %s failed while reading %s: %s', link, self.base_url, e)
                continue

            # Drop the fragment before normalising the link
            url = url._replace(fragment='')
            link = url.geturl()

            if not url.scheme:
                link = self.remove_fragment(urljoin(self.base_url, link))
            elif url.scheme in self.allowed_schemes and url.netloc == self.base_url_parsed.netloc:
                link = self.remove_fragment(link)
            else:
                link = None

            if link is not None and link not in final_links_set:
                final_links.append(link)
                final_links_set.add(link)

        return final_links

    @staticmethod
    def remove_fragment(link):
        url = urlparse(link)
        return url._replace(fragment='').geturl()
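

# Illustrative sketch (not part of the crawl pipeline): how LinkExtracter keeps
# only same-origin links. The URL and HTML snippet below are made up.
#
#   >>> extracter = LinkExtracter('https://example.fr/')
#   >>> extracter.extract_links(
#   ...     '<a href="/mentions-legales">Mentions légales</a>'
#   ...     '<a href="https://other.example/">off-site</a>')
#   ['https://example.fr/mentions-legales']

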
class ExtractSiren(BaseTask):

    def __init__(self, config):
        self.useragent = config.get('useragent', None)
        # Upper bound on the amount of text read from each page (see the
        # iter_text() call in extract_sirens_from_pages)
        self.max_content_length = config.get('max_content_length', 5120000)

    def do(self, data):
        """
        :param data: dict, a dictionary containing the result of a crawl;
            the following fields are required:

            * web: dictionary with the following fields

              * details: array of dictionaries with the following fields

                * url: URL of the page
                * is_redirect: dictionary whose ``redirected`` boolean
                  indicates whether the response is a redirect
                * response: dictionary with the following fields

                  * content: text content of the downloaded page

        Returns a dictionary with the following fields

        * siren: dictionary with the following fields

          * siren: the first SIREN found, or `None` if no candidate was found
          * input: the raw candidate SIREN string, or `None` if no candidate
            was found
          * url: the URL where the SIREN was found, or `None` if no candidate
            was found
        """
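        # Illustrative shape of ``data`` (field names from the docstring above,
        # values made up):
        #
        #   {'web': {'details': [{
        #       'url': 'https://example.fr/',
        #       'is_redirect': {'redirected': False},
        #       'response': {'content': '<html>…</html>'},
        #   }]}}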
        best_siren = {
            'siren': None,
            'input': None,
            'url': None
        }

        if 'web' in data:
            # Keep only the responses that were actually downloaded (not
            # redirects and with a body)
            terminal = [
                details
                for details in data['web']['details']
                if not details['is_redirect']['redirected'] and 'content' in details['response']
            ]
            if terminal:
                try:
                    sirens = self.extract_sirens_from_pages(terminal[0])
                    if logger.isEnabledFor(logging.DEBUG):
                        sirens_list = list(sirens)
                        logger.debug('All candidates: %s', sirens_list)
                        best_siren = sirens_list[0]
                    else:
                        best_siren = next(sirens)
                except (StopIteration, IndexError):
                    pass

        return {'siren': best_siren}

    def extract_sirens_from_pages(self, data):
        """
        :param data: dict, a dictionary containing the result of a crawl;
            the following fields are required:

            * url: URL of the page
            * is_redirect: dictionary whose ``redirected`` boolean indicates
              whether the response is a redirect
            * response: dictionary with the following fields

              * content: text content of the downloaded page

        Returns a generator of dictionaries corresponding to the SIRENs found.
        If several values are possible, the first SIREN of the page is
        returned.
        """

        link_extracter = LinkExtracter(data['url'])
        # \u00a0 -> non-breaking space
        re_digits = re.compile(r'[€$£]?[ \u00a0]*\d[\d\-\. \u00a0]+\d[ \u00a0]*[€$£]?')
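        # The pattern above grabs runs of digits separated by spaces, dots,
        # hyphens or non-breaking spaces, optionally surrounded by a currency
        # sign, e.g. '123 456 789', '123 456 789 00012' or '1 234 €';
        # currency-tagged candidates are discarded later by validate_siren().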
        headers = {
            # Prefer French content, then any English
            'Accept-Language': 'fr-FR,en;q=0.5'
        }

        if self.useragent is not None:
            headers['User-Agent'] = self.useragent

        with httpx.Client(timeout=5, follow_redirects=True, http2=True) as client:
            links = link_extracter.extract_links(data['response']['content'])
            links.append(data['url'])

            # Reverse the link list to search for the SIREN from the bottom up
            links.reverse()
            logger.debug('Links = %s', links)

            for link in links:
                logger.debug('Trying URL %s', link)
                try:
                    with client.stream('GET', link, headers=headers) as res:
                        content_type = res.headers.get('content-type', default='application/octet-stream')

                        if 'text' not in content_type:
                            logger.debug('Binary resource, skipping')
                            continue

                        content = ''
                        # Truncate the response if it is too long: only the first
                        # chunk of max_content_length characters is kept
                        for chunk in res.iter_text(self.max_content_length):
                            content = chunk
                            break

                        soup = BeautifulSoup(content, 'html.parser')
                        content = soup.get_text(' ')

                        candidates = re_digits.findall(content)
                        if '005 376' in candidates:
                            logger.debug('Text = %s', content)
                        logger.debug('Found %d candidates on page %s: %s', len(candidates), link, candidates)
                        sirens = list(filter(None, map(self.validate_siren, candidates)))
                        if sirens:
                            logger.debug('Filtered sirens: %s', sirens)
                            yield {
                                'url': link,
                                **sirens[0]
                            }
                except httpx.HTTPError as e:
                    logger.debug('Received exception: %s', e)
                    continue
                except Exception as e:
                    logger.exception('Received exception: %s', e)
                    continue

    @classmethod
    def validate_siren(cls, candidate):
        # Candidates carrying a currency sign are prices, not SIREN numbers
        if any(currency in candidate for currency in ('€', '$', '£')):
            return None

        candidate_num = re.sub(r'\D', '', candidate)

        # A SIREN has 9 digits, a SIRET (SIREN + NIC) has 14
        if len(candidate_num) != 9 and len(candidate_num) != 14:
            return None

        nb_groups = len(list(filter(lambda x: x, re.split(r'[\-\. \u00a0]', re.sub('[€$£]', '', candidate)))))

        # Probably a phone number
        if len(candidate_num) == 9 and nb_groups >= 4:
            return None

        if not luhn.verify(candidate_num):
            return None

        # Keep only the SIREN part of a SIRET
        candidate_num = candidate_num[:9]

        if candidate_num != '000000000':
            return {
                'input': candidate,
                'siren': candidate_num,
            }
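

# Illustrative sketch of how candidates are filtered (values made up; the first
# one is a Luhn-valid 9-digit string):
#
#   >>> ExtractSiren.validate_siren('732 829 320')
#   {'input': '732 829 320', 'siren': '732829320'}
#   >>> ExtractSiren.validate_siren('1 23 45 67 89') is None   # looks like a phone number
#   True
#   >>> ExtractSiren.validate_siren('1 234 €') is None          # price, not a SIREN
#   True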