#
# SPDX-FileCopyrightText: 2023 Afnic
#
# SPDX-License-Identifier: GPL-3.0-or-later
#

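# Computes content clusters for a crawl batch from fuzzy hashes stored in
# ClickHouse. Illustrative invocations (the actual script / entry-point name is
# not shown in this snippet and may differ):
#
#   python clustering.py run --crawler-batch-id <batch-id>
#   python clustering.py run     # auto-select the last completed `full` batch
#   python clustering.py clean   # truncate the temporary similarity table
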
import os
import uuid
import logging
import datetime
import sys
from collections import namedtuple
from urllib.parse import urlparse, urlencode, parse_qsl

import click
from dotenv import load_dotenv
from clickhouse_driver import Client

from frcrawler_content_clustering import HashedContentEntry, HashedContentBag
from frcrawler_content_clustering.dbscan import DbScan


load_dotenv(os.getenv('FRCRAWLER_SCRIPT_ENV_FILE', 'crawler.env'))

FORMAT = '%(levelname)s %(name)s %(asctime)-15s %(filename)s:%(lineno)d %(message)s'

logging.basicConfig(format=FORMAT, level=logging.INFO)

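# Tuning knobs and ClickHouse endpoints, all overridable through environment
# variables (usually set in the crawler.env file loaded above). Semantics as
# they appear from the code below:
#   CLUSTERING_NB_THREAD  - threads used when computing hash similarities
#   CLUSTERING_MAX_MEMORY - memory budget for that step (defaults to 1 GiB per thread)
#   CLUSTERING_THRESHOLD  - similarity threshold passed to compute_similarities
#   CLUSTER_MIN_SIZE      - min_size parameter of the DBSCAN pass
#   CH_DB_URL / CH_HTTP_URL / CH_CLUSTER - native protocol URL, HTTP URL and
#   cluster name of the ClickHouse deployment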
CLUSTERING_NB_THREAD = int(os.getenv('CLUSTERING_NB_THREAD', 1))
CLUSTERING_MAX_MEMORY = int(os.getenv('CLUSTERING_MAX_MEMORY', CLUSTERING_NB_THREAD * 2**30))
CLUSTERING_THRESHOLD = int(os.getenv('CLUSTERING_THRESHOLD', 80))
CLUSTER_MIN_SIZE = int(os.getenv('CLUSTER_MIN_SIZE', 10))
CH_DB_URL = os.getenv('CH_DB_URL', 'clickhouse://test:test@localhost:9001/test')
CH_HTTP_URL = os.getenv('CH_HTTP_URL', 'http://test:test@localhost:8124/?database=test')
CH_CLUSTER = os.getenv('CH_CLUSTER', 'dev_cluster')
CH_DATABASE = urlparse(CH_DB_URL).path.replace('/', '', 1)


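# Append a SQL statement as the `query` parameter of the ClickHouse HTTP URL.
# Illustrative example with the default CH_HTTP_URL:
#   build_query_url('http://test:test@localhost:8124/?database=test', 'SELECT 1')
#   -> 'http://test:test@localhost:8124/?database=test&query=SELECT+1'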
def build_query_url(base_url, query):
    base_url = urlparse(base_url)
    query_string = dict(parse_qsl(base_url.query, keep_blank_values=True))
    query_string['query'] = query
    return base_url._replace(query=urlencode(query_string)).geturl()


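# Fill a HashedContentBag with one (lzjd, ssdeep) hash pair per crawl job of the
# batch, keeping only pages that were not redirected to another domain and
# answered with a 2xx status. Entries from clustering_label_hints are added as
# well; they presumably act as labelled anchors that name the clusters they
# fall into (see insert_cluster_results below).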
def load_content(client, crawler_batch_id):
    bag = HashedContentBag()

    logging.info('Reading hashes')
    rows_gen = client.execute_iter(
        f"""
        select
            distinct on (job_id)
            job_id, ssdeep, lzjd
        from clustering_hashes
        where
            batch_id = %(batch_id)s and
            lzjd is not NULL
            and ssdeep is not NULL
            and job_id in (
                select job_id
                from {CH_DATABASE}.web_info_data
                where
                    batch_id = %(batch_id)s and
                    domain_redirected = false
                    and http_status_code < 300 and http_status_code >= 200
                    and response_redirected = false
            )
        """,
        {
            'batch_id': crawler_batch_id
        }
    )
    for (job_id, ssdeep, lzjd) in rows_gen:
        bag.insert(f'job_id:{job_id}', lzjd, ssdeep)

    logging.info('Adding label hints')
    rows = client.execute('select hint_id, ssdeep, lzjd from clustering_label_hints')

    for (hint_id, ssdeep, lzjd) in rows:
        bag.insert(f'label_hint_id:{hint_id}', lzjd, ssdeep)

    return bag


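# Compare every pair of hashes in the bag and persist the matches. The heavy
# lifting happens inside HashedContentBag.compute_similarities(), which is given
# the ClickHouse HTTP endpoint and the clustering batch id; judging by
# compute_clusters() below, matching pairs end up in the clustering_similiarities
# table. The returned identifiers map numeric content ids back to their
# 'job_id:<uuid>' / 'label_hint_id:<uuid>' names.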
def compute_similarities(client, clustering_batch_id, started_at, crawler_batch_id):
    bag = load_content(client, crawler_batch_id)

    logging.info('Computing similarities')
    bag.compute_similarities(CLUSTERING_NB_THREAD, CLUSTERING_MAX_MEMORY, CH_HTTP_URL, clustering_batch_id, started_at, CLUSTERING_THRESHOLD)
    logging.info('Done')

    return bag.identifiers


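# Group contents with a DBSCAN pass: the similarity pairs of this clustering
# batch are streamed from ClickHouse over HTTP and used as edges, with
# CLUSTER_MIN_SIZE handed to DbScan as min_size.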
def compute_clusters(clustering_batch_id, item_counts):
    logging.info('Reading data from db')
    db_scan = DbScan(item_counts, min_size=CLUSTER_MIN_SIZE)
    db_scan.feed_from_http(
        build_query_url(
            CH_HTTP_URL,
            f'SELECT first_id, second_id FROM clustering_similiarities WHERE batch_id = {clustering_batch_id} order by first_id, second_id FORMAT RowBinary'
        )
    )
    logging.info('Computing clusters')
    db_scan.compute()
    logging.info('Done')

    # { cluster_id => [ ...content_ids ] }
    return db_scan.clusters()


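# Turn the raw clusters into domain-level results: numeric content ids are
# mapped back to job or label-hint UUIDs, each cluster is labelled (with the
# label of the single hint it contains, or a fresh UUID otherwise), and one row
# per clustered job is written to clustering_results.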
def insert_cluster_results(client, clustering_batch_id, started_at, crawler_batch_id, identifiers, raw_clusters):
    clusters = {}
    logging.info('Fetching content info from db')
    DomainInfo = namedtuple('DomainInfo', ['domain', 'metadata', 'tld'])
    job_to_domain_map = {
        job_id: DomainInfo(domain=domain, metadata=metadata, tld=tld)
        for job_id, domain, tld, metadata in
        client.execute_iter(
            'SELECT toString(job_id), domain, tld, metadata FROM clustering_hashes WHERE batch_id = %(batch_id)s',
            {'batch_id': crawler_batch_id}
        )
    }

    hint_to_label_map = dict(client.execute('select toString(hint_id), toString(label_id) from clustering_label_hints'))

    logging.info('Converting content ids to names')
    for (raw_cluster_id, content_ids) in raw_clusters.items():
        cluster = {'labels': [], 'entries': []}
        for content_id in content_ids:
            content_name = identifiers[content_id]
            (content_type, content_uuid) = content_name.split(':')
            if content_type == 'job_id':
                cluster['entries'].append(content_uuid)
            elif content_type == 'label_hint_id':
                cluster['labels'].append(content_uuid)

        clusters[raw_cluster_id] = cluster

    logging.info('Inserting results')

    for (raw_cluster_id, cluster) in clusters.items():
        cluster_proposals = set(hint_to_label_map[hint] for hint in cluster['labels'])
        cluster_label = None
        if len(cluster_proposals) == 1:
            cluster_label = cluster_proposals.pop()
        # TODO: what if proposals > 1??

        if cluster_label is None:
            cluster_label = str(uuid.uuid4())

        client.execute(
            'INSERT INTO clustering_results (batch_id, tld, metadata, job_id, clustering_batch_id, domain, cluster_label, clustering_started_at) VALUES',
            (
                {
                    'batch_id': crawler_batch_id,
                    'tld': job_to_domain_map[job_id].tld,
                    'metadata': job_to_domain_map[job_id].metadata,
                    'clustering_batch_id': clustering_batch_id,
                    'job_id': job_id,
                    'domain': job_to_domain_map[job_id].domain,
                    'cluster_label': cluster_label,
                    'clustering_started_at': started_at
                }
                for job_id in cluster['entries']
            )
        )

    logging.info('Done')


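# Delete the similarity pairs of one clustering run from the staging table; the
# run command calls this at the end unless --no-clean-up is passed.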
def clean_up(client, clustering_batch_id):
    logging.info('Cleaning up temporary data for clustering run %d', clustering_batch_id)
    client.execute(
        'alter table clustering_similiarities_data on cluster %(ch_cluster)s delete where batch_id = %(clustering_batch_id)d',
        { 'clustering_batch_id': clustering_batch_id, 'ch_cluster': CH_CLUSTER }
    )


@click.group()
def cli():
    """
    Compute clusters of HTML page hashes.

    Only hashes of HTML pages from websites that were not redirected to another
    domain and that answered with a successful (2xx) response are taken into
    account.
    """
    pass


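# `clean` wipes the whole similarity staging table on every node of the
# ClickHouse cluster, independently of any batch.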
@cli.command()
def clean():
    client = Client.from_url(CH_DB_URL)
    logging.info('Cleaning up all temporary data')
    client.execute('truncate table clustering_similiarities_data on cluster %(ch_cluster)s', { 'ch_cluster': CH_CLUSTER })


@cli.command()
@click.option(
    '--crawler-batch-id',
    default=None,
    help='Id of the batch used for the clustering (defaults to the last `full` batch)',
)
@click.option(
    '--no-clean-up',
    is_flag=True,
    default=False,
    help='Do not delete transient data after the clustering is done',
)
def run(crawler_batch_id, no_clean_up):
    client = Client.from_url(CH_DB_URL)
    started_at = int(datetime.datetime.utcnow().timestamp())

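    # Without an explicit batch id, pick a `full` crawl batch that has not been
    # clustered yet and whose jobs are (almost) all finished: more than 99% of
    # the scheduled jobs must have produced a row in crawler.web_info.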
    if crawler_batch_id is None:
        result = client.execute("""
            with batch_candidates as (
                select
                    bm.batch_id batch_id,
                    scheduled_job_count
                from
                    batches_metadata bm
                    global join batches b on b.batch_id = bm.batch_id
                    global left join clustering_batches_metadata cbm on cbm.batch_id = bm.batch_id
                where
                    bm.job_name = 'full' and
                    cbm.ended_at is null and
                    bm.batch_id global not in (
                        select distinct batch_id from clustering_results
                    )
                order by scheduled_at asc
            ),
            job_count as (
                select
                    batch_id,
                    count(distinct(job_id)) total_job_count
                from
                    crawler.web_info
                where
                    batch_id global in (select batch_id from batch_candidates)
                group by batch_id
            )
            select
                batch_id, scheduled_job_count, total_job_count, scheduled_job_count / total_job_count progression
            from batch_candidates join job_count using(batch_id)
            where progression > .99
        """)

        if len(result) == 0:
            logging.info('No clustering job to run, exiting')
            return

        crawler_batch_id, scheduled_count, job_count, progression = result[0]
        logging.info('Selecting crawler batch %s (Scheduled Jobs: %d; Finished Jobs: %d; Progression: %.2f)', crawler_batch_id, scheduled_count, job_count, progression)

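    # Clustering runs get monotonically increasing ids; derive the next one from
    # whatever has already been recorded in the similarity and result tables.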
    ((last_clustering_batch_id, ), ) = client.execute("""
        SELECT max(m)
        FROM (
            SELECT max(batch_id) as m FROM clustering_similiarities
            UNION ALL
            SELECT max(clustering_batch_id) as m FROM clustering_results
        )
    """)

    clustering_batch_id = last_clustering_batch_id + 1
    logging.info('Starting clustering run %d for batch %s', clustering_batch_id, crawler_batch_id)

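    # Main pipeline: pairwise hash similarities, then DBSCAN clusters, then
    # labelled per-job rows in clustering_results.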
    identifiers = compute_similarities(client, clustering_batch_id, started_at, crawler_batch_id)
    clusters = compute_clusters(clustering_batch_id, len(identifiers))
    insert_cluster_results(client, clustering_batch_id, started_at, crawler_batch_id, identifiers, clusters)

    client.execute(
        'insert into clustering_batches_metadata (batch_id, clustering_batch_id, started_at, ended_at) values',
        ((crawler_batch_id, clustering_batch_id, started_at, datetime.datetime.utcnow()),)
    )

    if not no_clean_up:
        clean_up(client, clustering_batch_id)


if __name__ == '__main__':
    cli()