frcrawler-scripts/scripts/run-clustering.py

#
# SPDX-FileCopyrightText: 2023 Afnic
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
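"""
Run content clustering over a finished crawler batch.

The script loads the ssdeep and LZJD hashes of crawled HTML pages from
ClickHouse, computes pairwise similarities, groups similar pages with DBSCAN
and writes the resulting clusters to the `clustering_results` table.
Configuration is read from environment variables, optionally loaded from an
env file.
"""
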
import os
import uuid
import logging
import datetime
import sys
from collections import namedtuple
from urllib.parse import urlparse, urlencode, parse_qsl
import click
from dotenv import load_dotenv
from clickhouse_driver import Client
from frcrawler_content_clustering import HashedContentEntry, HashedContentBag
from frcrawler_content_clustering.dbscan import DbScan
load_dotenv(os.getenv('FRCRAWLER_SCRIPT_ENV_FILE', 'crawler.env'))
FORMAT = '%(levelname)s %(name)s %(asctime)-15s %(filename)s:%(lineno)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)
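
# Tuning knobs for the similarity and clustering steps, all overridable through
# environment variables: number of worker threads, memory budget in bytes
# (defaults to 1 GiB per thread), the similarity threshold handed to the hash
# comparison, and the minimum cluster size used by DBSCAN.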
CLUSTERING_NB_THREAD = int(os.getenv('CLUSTERING_NB_THREAD', 1))
CLUSTERING_MAX_MEMORY = int(os.getenv('CLUSTERING_MAX_MEMORY', CLUSTERING_NB_THREAD * 2**30))
CLUSTERING_THRESHOLD = int(os.getenv('CLUSTERING_THRESHOLD', 80))
CLUSTER_MIN_SIZE = int(os.getenv('CLUSTER_MIN_SIZE', 10))
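
# ClickHouse settings: the native protocol URL is used by clickhouse-driver,
# the HTTP URL is passed to the frcrawler_content_clustering helpers for bulk
# reads and writes, and CH_CLUSTER names the cluster used in ON CLUSTER
# statements. The working database is taken from the DB URL path.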
CH_DB_URL = os.getenv('CH_DB_URL', 'clickhouse://test:test@localhost:9001/test')
CH_HTTP_URL = os.getenv('CH_HTTP_URL', 'http://test:test@localhost:8124/?database=test')
CH_CLUSTER = os.getenv('CH_CLUSTER', 'dev_cluster')
CH_DATABASE = urlparse(CH_DB_URL).path.replace('/', '', 1)


def build_query_url(base_url, query):
    base_url = urlparse(base_url)
    query_string = dict(parse_qsl(base_url.query, keep_blank_values=True))
    query_string['query'] = query
    return base_url._replace(query=urlencode(query_string)).geturl()


def load_content(client, crawler_batch_id):
    """Build a HashedContentBag from the ssdeep/LZJD hashes of every eligible
    page in the crawler batch, plus the label hints for already known clusters."""
    bag = HashedContentBag()
    logging.info('Reading hashes')
    rows_gen = client.execute_iter(
        f"""
        select
            distinct on (job_id)
            job_id, ssdeep, lzjd
        from clustering_hashes
        where
            batch_id = %(batch_id)s and
            lzjd is not NULL
            and ssdeep is not NULL
            and job_id in (
                select job_id
                from {CH_DATABASE}.web_info_data
                where
                    batch_id = %(batch_id)s and
                    domain_redirected = false
                    and http_status_code < 300 and http_status_code >= 200
                    and response_redirected = false
            )
        """,
        {
            'batch_id': crawler_batch_id
        }
    )
    for (job_id, ssdeep, lzjd) in rows_gen:
        bag.insert(f'job_id:{job_id}', lzjd, ssdeep)
    logging.info('Adding label hints')
    rows = client.execute('select hint_id, ssdeep, lzjd from clustering_label_hints')
    for (hint_id, ssdeep, lzjd) in rows:
        bag.insert(f'label_hint_id:{hint_id}', lzjd, ssdeep)
    return bag


def compute_similarities(client, clustering_batch_id, started_at, crawler_batch_id):
    """Compare the loaded hashes pairwise and store the similarity scores in
    ClickHouse for this clustering run; returns the bag's content identifiers."""
    bag = load_content(client, crawler_batch_id)
    logging.info('Computing similarities')
    bag.compute_similarities(CLUSTERING_NB_THREAD, CLUSTERING_MAX_MEMORY, CH_HTTP_URL, clustering_batch_id, started_at, CLUSTERING_THRESHOLD)
    logging.info('Done')
    return bag.identifiers


def compute_clusters(clustering_batch_id, item_counts):
    logging.info('Reading data from db')
    db_scan = DbScan(item_counts, min_size=CLUSTER_MIN_SIZE)
    db_scan.feed_from_http(
        build_query_url(
            CH_HTTP_URL,
            f'SELECT first_id, second_id FROM clustering_similiarities WHERE batch_id = {clustering_batch_id} order by first_id, second_id FORMAT RowBinary'
        )
    )
    logging.info('Computing clusters')
    db_scan.compute()
    logging.info('Done')
    # { cluster_id => [ ...content_ids ] }
    return db_scan.clusters()


def insert_cluster_results(client, clustering_batch_id, started_at, crawler_batch_id, identifiers, raw_clusters):
    """Map content ids back to crawler jobs and label hints, assign a label to
    each cluster (the hinted label when exactly one is proposed, otherwise a
    random UUID) and write one row per job into clustering_results."""
    clusters = {}
    logging.info('Fetching content info from db')
    DomainInfo = namedtuple('DomainInfo', ['domain', 'metadata', 'tld'])
    job_to_domain_map = {
        job_id: DomainInfo(domain=domain, metadata=metadata, tld=tld)
        for job_id, domain, tld, metadata in
        client.execute_iter(
            'SELECT toString(job_id), domain, tld, metadata FROM clustering_hashes WHERE batch_id = %(batch_id)s',
            {'batch_id': crawler_batch_id}
        )
    }
    hint_to_label_map = dict(client.execute('select toString(hint_id), toString(label_id) from clustering_label_hints'))
    logging.info('Converting content ids to names')
    for (raw_cluster_id, content_ids) in raw_clusters.items():
        cluster = {'labels': [], 'entries': []}
        for content_id in content_ids:
            content_name = identifiers[content_id]
            (content_type, content_uuid) = content_name.split(':')
            if content_type == 'job_id':
                cluster['entries'].append(content_uuid)
            elif content_type == 'label_hint_id':
                cluster['labels'].append(content_uuid)
        clusters[raw_cluster_id] = cluster
    logging.info('Inserting results')
    for (raw_cluster_id, cluster) in clusters.items():
        cluster_proposals = set(hint_to_label_map[hint] for hint in cluster['labels'])
        cluster_label = None
        if len(cluster_proposals) == 1:
            cluster_label = cluster_proposals.pop()
        # TODO: what if proposals > 1??
        if cluster_label is None:
            cluster_label = str(uuid.uuid4())
        client.execute(
            'INSERT INTO clustering_results (batch_id, tld, metadata, job_id, clustering_batch_id, domain, cluster_label, clustering_started_at) VALUES',
            (
                {
                    'batch_id': crawler_batch_id,
                    'tld': job_to_domain_map[job_id].tld,
                    'metadata': job_to_domain_map[job_id].metadata,
                    'clustering_batch_id': clustering_batch_id,
                    'job_id': job_id,
                    'domain': job_to_domain_map[job_id].domain,
                    'cluster_label': cluster_label,
                    'clustering_started_at': started_at
                }
                for job_id in cluster['entries']
            )
        )
    logging.info('Done')


def clean_up(client, clustering_batch_id):
    logging.info('Cleaning up temporary data for clustering run %d', clustering_batch_id)
    client.execute(
        'alter table clustering_similiarities_data on cluster %(ch_cluster)s delete where batch_id = %(clustering_batch_id)d',
        { 'clustering_batch_id': clustering_batch_id, 'ch_cluster': CH_CLUSTER }
    )


@click.group()
def cli():
    """
    Compute clusters of HTML page hashes.

    Only hashes of HTML pages from web sites that were not redirected to
    another domain and that answered with a successful status code are used.
    """
    pass


@cli.command()
def clean():
    client = Client.from_url(CH_DB_URL)
    logging.info('Cleaning up all temporary data')
    client.execute('truncate table clustering_similiarities_data on cluster %(ch_cluster)s', { 'ch_cluster': CH_CLUSTER })


@cli.command()
@click.option(
    '--crawler-batch-id',
    default=None,
    help='Id of the crawler batch to cluster (defaults to the oldest unclustered `full` batch)',
)
@click.option(
    '--no-clean-up',
    is_flag=True,
    default=False,
    help='Do not delete transient similarity data after the clustering is done',
)
def run(crawler_batch_id, no_clean_up):
    """Select a crawler batch (if none is given), compute similarities and
    clusters, and record the clustering batch metadata."""
    client = Client.from_url(CH_DB_URL)
    started_at = int(datetime.datetime.utcnow().timestamp())
    if crawler_batch_id is None:
        result = client.execute("""
            with batch_candidates as (
                select
                    bm.batch_id batch_id,
                    scheduled_job_count
                from
                    batches_metadata bm
                    global join batches b on b.batch_id = bm.batch_id
                    global left join clustering_batches_metadata cbm on cbm.batch_id = bm.batch_id
                where
                    bm.job_name = 'full' and
                    cbm.ended_at is null and
                    bm.batch_id global not in (
                        select distinct batch_id from clustering_results
                    )
                order by scheduled_at asc
            ),
            job_count as (
                select
                    batch_id,
                    count(distinct(job_id)) total_job_count
                from
                    crawler.web_info
                where
                    batch_id global in (select batch_id from batch_candidates)
                group by batch_id
            )
            select
                batch_id, scheduled_job_count, total_job_count, scheduled_job_count / total_job_count progression
            from batch_candidates join job_count using(batch_id)
            where progression > .99
        """)
        if len(result) == 0:
            logging.info('No clustering job to run, exiting')
            return
        crawler_batch_id, scheduled_count, job_count, progression = result[0]
        logging.info('Selecting crawler batch %s (Scheduled Jobs : %d ; Finished Jobs : %d ; Progression : %.2f)', crawler_batch_id, scheduled_count, job_count, progression)
    ((last_clustering_batch_id, ), ) = client.execute("""
        SELECT max(m)
        FROM (
            SELECT max(batch_id) as m FROM clustering_similiarities
            UNION ALL
            SELECT max(clustering_batch_id) as m FROM clustering_results
        )
    """)
    clustering_batch_id = last_clustering_batch_id + 1
    logging.info('Starting clustering run %d for batch %s', clustering_batch_id, crawler_batch_id)
    identifiers = compute_similarities(client, clustering_batch_id, started_at, crawler_batch_id)
    clusters = compute_clusters(clustering_batch_id, len(identifiers))
    insert_cluster_results(client, clustering_batch_id, started_at, crawler_batch_id, identifiers, clusters)
    client.execute(
        'insert into clustering_batches_metadata (batch_id, clustering_batch_id, started_at, ended_at) values',
        ((crawler_batch_id, clustering_batch_id, started_at, datetime.datetime.utcnow()),)
    )
    if not no_clean_up:
        clean_up(client, clustering_batch_id)
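

# Typical invocations (assuming the ClickHouse settings above are configured,
# e.g. through crawler.env):
#   python run-clustering.py run [--crawler-batch-id ID] [--no-clean-up]
#   python run-clustering.py clean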
if __name__ == '__main__':
    cli()