frcrawler-siren/frcrawler_siren/__init__.py
Gaël Berthaud-Müller 81bb10e475 add all files
2024-03-05 18:14:54 +01:00

227 lines
8 KiB
Python

#
# SPDX-FileCopyrightText: 2023 Afnic
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
import re
import logging
from html.parser import HTMLParser
from urllib.parse import urlparse, urljoin
import luhn
import httpx
from bs4 import BeautifulSoup
from frcrawler.tasks import BaseTask
logger = logging.getLogger(__name__)
class LinkExtracter(HTMLParser):
def __init__(self, base_url, allowed_schemes=('https', 'http')):
super().__init__()
self.links = []
self.base_url = base_url
self.base_url_parsed = urlparse(base_url)
self.allowed_schemes = allowed_schemes
def handle_starttag(self, tag, attrs):
if tag == 'a':
for attr in attrs:
if attr[0] == 'href':
link = attr[1]
self.links.append(link)
if tag == 'base':
for attr in attrs:
if attr[0] == 'href':
self.base_url = urljoin(self.base_url, attr[1])
def extract_links(self, body):
try:
self.feed(body)
except Exception as e:
logger.warning('Failed to parse HTML content while reading %s: %s: %s', self.base_url, type(e).__name__, e)
final_links_set = set()
final_links = []
for link in self.links:
try:
url = urlparse(link)
except ValueError as e:
logger.info('Parsing of link %s failed while reading %s: %s', link, self.base_url, e)
url._replace(fragment='')
link = url.geturl()
if not url.scheme:
link = self.remove_fragment(urljoin(self.base_url, link))
elif url.scheme in self.allowed_schemes and url.netloc == self.base_url_parsed.netloc:
link = self.remove_fragment(link)
else:
link = None
if link is not None and link not in final_links_set:
final_links.append(link)
final_links_set.add(link)
return final_links
@staticmethod
def remove_fragment(link):
url = urlparse(link)
return url._replace(fragment='').geturl()
class ExtractSiren(BaseTask):
def __init__(self, config):
self.useragent = config.get('useragent', None)
self.max_content_length = config.get('max_content_length', 5120000)
def do(self, data):
"""
:param data: dict, Un dictionnaire contenant le résultat d'un crawl,
les champs suivants sont requis:
* web: dictionnaire avec les champs suivants
* details: tableau de dictionnaire avec les champs suivants
* url: URL de la page
* is_redirect: Booléen indiquant si la réponse est une
redirection
* response: dictionnaire avec les champs suivants
* content: contenu texte de la page téléchargée
Retourne un dictionnaire avec les champs suivants
* siren: dictionnaire avec les champs suivants
* siren: le premier siren trouvé ou `None` si aucun candidat
n'a été trouvé
* input: la chaine candidate du siren non traitée ou `None` si
aucun candidat n'a été trouvé
* url: l'URL où le siren a été trouvé ou `None` si aucun
candidat n'a été trouvé
"""
best_siren = {
'siren': None,
'input': None,
'url': None
}
if 'web' in data:
terminal = [
details
for details in data['web']['details']
if not details['is_redirect']['redirected'] and 'content' in details['response']
]
if terminal:
try:
sirens = self.extract_sirens_from_pages(terminal[0])
if logger.isEnabledFor(logging.DEBUG):
sirens_list = list(sirens)
logger.debug('All candidates : %s', sirens_list)
best_siren = sirens_list[0]
else:
best_siren = next(sirens)
except (StopIteration, IndexError):
pass
return { 'siren': best_siren }
def extract_sirens_from_pages(self, data):
"""
:param data: dict, Un dictionnaire contenant le résultat d'un crawl,
les champs suivants sont requis:
* url: URL de la page
* is_redirect: Booléen indiquant si la réponse est une redirection
* response: dictionnaire avec les champs suivants
* content: contenu texte de la page téléchargée
Retourne un générateur de dictionnaire correspondant aux siren trouvés.
Le premier siren de la page est renvoyés si plusieurs valeurs sont
possibles.
"""
link_extracter = LinkExtracter(data['url'])
# \u00a0 -> Espace insécable
re_digits = re.compile('[€$£]?[ \u00a0]*\d[\d\-\. \u00a0]+\d[ \u00a0]*[€$£]?')
headers = {
'Accept-Language': 'fr_FR,en,q=0.5'
}
if self.useragent is not None:
headers['User-Agent'] = self.useragent
with httpx.Client(timeout=5, follow_redirects=True, http2=True) as client:
links = link_extracter.extract_links(data['response']['content'])
links.append(data['url'])
# Reverse link list to search for siren from the bottom up
links.reverse()
logger.debug('Links = %s', links)
for link in links:
logger.debug('Trying URL %s', link)
try:
with client.stream('GET', link, headers=headers) as res:
content_type = res.headers.get('content-type', default='application/octet-stream')
if 'text' not in content_type:
logger.debug('Binary resource, skipping')
continue
content = ''
# Truncate response if too long
for chunk in res.iter_text(self.max_content_length):
content = chunk
break
soup = BeautifulSoup(content, 'html.parser')
content = soup.get_text(' ')
candidates = re_digits.findall(content)
if '005 376' in candidates:
logger.debug('Text = %s', content)
logger.debug('Found %d candidates on page %s: %s', len(candidates), link, candidates)
sirens = list(filter(None, map(self.validate_siren, candidates)))
if sirens:
logger.debug('Filtered sirens: %s', sirens)
yield {
'url': link,
**sirens[0]
}
except httpx.HTTPError as e:
logger.debug('Received exception: %s', e)
continue
except Exception as e:
logger.exception('Received exception: %s', e)
continue
@classmethod
def validate_siren(cls, candidate):
if any(currency in candidate for currency in ('', '$', '£')):
return None
candidate_num = re.sub('\\D', '', candidate)
if len(candidate_num) != 9 and len(candidate_num) != 14:
return None
nb_groups = len(list(filter(lambda x: x, re.split('[\-\. \u00a0]', re.sub('[€$£]', '', candidate)))))
# Probably a phone number
if len(candidate_num) == 9 and nb_groups >= 4:
return None
if not luhn.verify(candidate_num):
return None
candidate_num = candidate_num[:9]
if candidate_num != '000000000':
return {
'input': candidate,
'siren': candidate_num,
}