diff --git a/crates/core/src/config/defaults.rs b/crates/core/src/config/defaults.rs index 58a305a4..3815ab11 100644 --- a/crates/core/src/config/defaults.rs +++ b/crates/core/src/config/defaults.rs @@ -169,3 +169,12 @@ impl Widgets { true } } + +pub struct Indexing; + +impl Indexing { + /// Default number of WARC records processed per indexing batch. + pub fn batch_size() -> usize { + 512 + } +} diff --git a/crates/core/src/config/mod.rs b/crates/core/src/config/mod.rs index ffd4a7a1..4c04e830 100644 --- a/crates/core/src/config/mod.rs +++ b/crates/core/src/config/mod.rs @@ -30,7 +30,6 @@ pub struct IndexingLocalConfig { pub limit_warc_files: Option<usize>, pub skip_warc_files: Option<usize>, pub warc_source: WarcSource, - pub batch_size: Option<usize>, pub page_webgraph_path: Option<String>, pub host_centrality_threshold: Option<f64>, pub topics_path: Option<String>, @@ -38,6 +37,9 @@ pub struct IndexingLocalConfig { pub page_centrality_store_path: Option<String>, pub safety_classifier_path: Option<String>, pub minimum_clean_words: Option<usize>, + + #[serde(default = "defaults::Indexing::batch_size")] + pub batch_size: usize, } #[derive(Debug, Deserialize, Clone)] diff --git a/crates/core/src/entrypoint/configure.rs b/crates/core/src/entrypoint/configure.rs index 1dac8629..d0a189f2 100644 --- a/crates/core/src/entrypoint/configure.rs +++ b/crates/core/src/entrypoint/configure.rs @@ -19,7 +19,7 @@ use tokio::io; use tokio_stream::StreamExt; use tracing::{debug, info}; -use crate::config::{LocalConfig, WebSpellConfig}; +use crate::config::{defaults, LocalConfig, WebSpellConfig}; use crate::entrypoint::indexer::JobSettings; use crate::entrypoint::{dmoz_parser, indexer}; use crate::Result; @@ -169,11 +169,12 @@ fn create_inverted_index() -> Result<()> { folder: ".".to_string(), names: vec![warc_path.to_str().unwrap().to_string()], }), - warc_paths: vec![warc_path.to_str().unwrap().to_string()], + warc_path: warc_path.to_str().unwrap().to_string(), base_path: out_path_tmp.to_str().unwrap().to_string(), settings: JobSettings { host_centrality_threshold: None, minimum_clean_words: None, + batch_size: defaults::Indexing::batch_size(), }, }; @@ -195,7 +196,7 @@ fn create_inverted_index() -> Result<()> { None, ); - let index = indexer::process_job(&job, &worker); + let index = job.process(&worker); std::fs::rename(index.path, out_path)?; std::fs::remove_dir_all(&out_path_tmp)?; diff --git a/crates/core/src/entrypoint/indexer.rs b/crates/core/src/entrypoint/indexer.rs deleted file mode 100644 index d63b5f9a..00000000 --- a/crates/core/src/entrypoint/indexer.rs +++ /dev/null @@ -1,460 +0,0 @@ -// Stract is an open source web search engine. -// Copyright (C) 2023 Stract ApS -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see <https://www.gnu.org/licenses/>.
-use anyhow::anyhow; -use chrono::Utc; -use rayon::prelude::*; -use std::path::Path; -use std::thread; - -use itertools::Itertools; -use serde::{Deserialize, Serialize}; -use tokio::pin; -use tracing::{debug, info, trace, warn}; - -use crate::config::{self, WarcSource}; -use crate::entrypoint::download_all_warc_files; -use crate::index::Index; -use crate::kv::rocksdb_store::RocksDbStore; -use crate::kv::Kv; -use crate::mapreduce::{Map, Reduce, Worker}; -use crate::ranking::SignalAggregator; -use crate::warc::PayloadType; -use crate::webgraph::{Node, NodeID, Webgraph, WebgraphBuilder}; -use crate::webpage::{safety_classifier, Html, Webpage}; -use crate::{human_website_annotations, Result}; - -#[derive(Debug, Serialize, Deserialize)] -pub struct Job { - pub source_config: config::WarcSource, - pub warc_paths: Vec<String>, - pub base_path: String, - pub settings: JobSettings, -} - -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] -pub struct JobSettings { - pub host_centrality_threshold: Option<f64>, - pub minimum_clean_words: Option<usize>, -} - -pub struct IndexingWorker { - host_centrality_store: RocksDbStore<NodeID, f64>, - host_centrality_rank_store: RocksDbStore<NodeID, f64>, - page_centrality_store: Option<RocksDbStore<NodeID, f64>>, - page_centrality_rank_store: Option<RocksDbStore<NodeID, f64>>, - page_webgraph: Option<Webgraph>, - topics: Option<human_website_annotations::Mapper>, - safety_classifier: Option<safety_classifier::Model>, - job_settings: Option<JobSettings>, -} - -impl IndexingWorker { - pub fn new( - host_centrality_store_path: String, - page_centrality_store_path: Option<String>, - page_webgraph_path: Option<String>, - topics_path: Option<String>, - safety_classifier_path: Option<String>, - ) -> Self { - Self { - host_centrality_store: RocksDbStore::open( - Path::new(&host_centrality_store_path).join("harmonic"), - ), - host_centrality_rank_store: RocksDbStore::open( - Path::new(&host_centrality_store_path).join("harmonic_rank"), - ), - page_centrality_store: page_centrality_store_path - .as_ref() - .map(|p| RocksDbStore::open(Path::new(&p).join("approx_harmonic"))), - page_centrality_rank_store: page_centrality_store_path - .as_ref() - .map(|p| RocksDbStore::open(Path::new(&p).join("approx_harmonic_rank"))), - page_webgraph: page_webgraph_path - .map(|path| WebgraphBuilder::new(path).single_threaded().open()), - topics: topics_path.map(|path| human_website_annotations::Mapper::open(path).unwrap()), - safety_classifier: safety_classifier_path - .map(|path| safety_classifier::Model::open(path).unwrap()), - job_settings: None, - } - } - - pub fn set_job_settings(&mut self, job_settings: JobSettings) { - self.job_settings = Some(job_settings); - } - - pub fn prepare_webpage(&self, body: &str, url: &str, fetch_time_ms: u64) -> Result<Webpage> { - let mut html = match Html::parse_without_text(body, url) { - Ok(html) => html, - Err(err) => { - debug!("error parsing html: {:?}", err); - return Err(anyhow!("error parsing html: {:?}", err)); - } - }; - - if html.is_no_index() { - return Err(anyhow!("noindex")); - } - - let title = html.title().unwrap_or_default(); - if title.is_empty() || title.chars().all(|c| c.is_whitespace()) { - return Err(anyhow!("empty title")); - } - - let node = Node::from(html.url()); - let host_node_id = node.clone().into_host().id(); - - let mut host_centrality = self - .host_centrality_store - .get(&host_node_id) - .unwrap_or_default(); - - let mut host_centrality_rank = self - .host_centrality_rank_store - .get(&host_node_id) - .unwrap_or(u64::MAX as f64); - - if let Some(host_centrality_threshold) = - self.job_settings.and_then(|s| s.host_centrality_threshold) - { - if host_centrality < host_centrality_threshold { - debug!("skipping due to low host_centrality 
value"); - return Err(anyhow!("low host_centrality value")); - } - } - - html.parse_text(); - - if html.empty_all_text() { - return Err(anyhow!("empty all text")); - } - - if let Some(minimum_clean_words) = self.job_settings.and_then(|s| s.minimum_clean_words) { - match html.clean_text() { - Some(clean_text) => { - if clean_text.split_whitespace().count() < minimum_clean_words { - return Err(anyhow!("too few clean words")); - } - } - None => return Err(anyhow!("no clean text")), - } - } - - let backlink_labels: Vec = self - .page_webgraph - .as_ref() - .map(|webgraph| { - webgraph - .raw_ingoing_edges_with_labels(&Node::from(html.url()).id()) - .into_iter() - .map(|edge| edge.label) - .filter(|label| !label.is_empty()) - .filter(|label| { - let label = label.to_lowercase(); - let stopwords = [ - "click", - "click here", - "here", - "link", - "website", - "webpage", - "page", - "site", - "url", - "web", - "visit", - "more", - "info", - "information", - "read", - "read more", - ]; - - !stopwords.contains(&label.as_str()) - }) - .collect() - }) - .unwrap_or_default(); - - let mut page_centrality = 0.0; - - if let Some(store) = self.page_centrality_store.as_ref() { - let node_id = node.id(); - - page_centrality = store.get(&node_id).unwrap_or_default(); - } - - let mut page_centrality_rank = u64::MAX as f64; - - if let Some(store) = self.page_centrality_rank_store.as_ref() { - let node_id = node.id(); - - page_centrality_rank = store.get(&node_id).unwrap_or(u64::MAX as f64); - } - - if !page_centrality.is_finite() { - page_centrality = 0.0; - } - - if !host_centrality.is_finite() { - host_centrality = 0.0; - } - - if !page_centrality_rank.is_finite() { - page_centrality_rank = u64::MAX as f64; - } - - if !host_centrality_rank.is_finite() { - host_centrality_rank = u64::MAX as f64; - } - - let mut dmoz_description = None; - - if let Some(mapper) = self.topics.as_ref() { - if let Some(info) = mapper.get(&html.url().host_str().unwrap_or_default().to_string()) { - dmoz_description = Some(info.description.clone()) - } - } - - let mut webpage = Webpage { - html, - backlink_labels, - page_centrality, - page_centrality_rank, - host_centrality, - host_centrality_rank, - fetch_time_ms, - pre_computed_score: 0.0, - node_id: Some(host_node_id), - dmoz_description, - safety_classification: None, - inserted_at: Utc::now(), - }; - - if let Some(model) = self.safety_classifier.as_ref() { - webpage.safety_classification = Some(model.predict(&webpage).label); - } - - let mut signal_aggregator = SignalAggregator::new(None); - signal_aggregator.set_current_timestamp(Utc::now().timestamp().max(0) as usize); - - webpage.pre_computed_score = signal_aggregator.precompute_score(&webpage); - - Ok(webpage) - } -} - -pub fn process_job(job: &Job, worker: &IndexingWorker) -> Index { - let name = job.warc_paths.first().unwrap().split('/').last().unwrap(); - - let mut has_host_centrality = false; - let mut has_page_centrality = false; - let mut has_backlinks = false; - - info!("processing {}", name); - - let mut index = Index::open(Path::new(&job.base_path).join(name)).unwrap(); - index.prepare_writer().unwrap(); - - let warc_files = download_all_warc_files(&job.warc_paths, &job.source_config); - pin!(warc_files); - - for file in warc_files.by_ref() { - for record in - file.records() - .flatten() - .filter(|record| match &record.response.payload_type { - Some(payload_type) => matches!(payload_type, PayloadType::Html), - None => true, - }) - { - if let Ok(webpage) = worker.prepare_webpage( - &record.response.body, - 
&record.request.url, - record.metadata.fetch_time_ms, - ) { - if webpage.host_centrality > 0.0 { - has_host_centrality = true; - } - - if webpage.page_centrality > 0.0 { - has_page_centrality = true; - } - - if !webpage.backlink_labels.is_empty() { - has_backlinks = true; - } - trace!("inserting webpage: {:?}", webpage.html.url()); - trace!("title = {:?}", webpage.html.title()); - trace!("text = {:?}", webpage.html.clean_text()); - - if let Err(err) = index.insert(webpage) { - warn!("{:?}", err); - panic!(); - } - } - } - - index.commit().unwrap(); - } - - if !has_host_centrality { - warn!("no host centrality values found in {}", name); - } - - if !has_page_centrality && worker.page_centrality_store.is_some() { - warn!("no page centrality values found in {}", name); - } - - if !has_backlinks && worker.page_webgraph.is_some() { - warn!("no backlinks found in {}", name); - } - - index.inverted_index.merge_into_max_segments(1).unwrap(); - - info!("{} done", name); - - index -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct IndexPointer(String); - -impl From<String> for IndexPointer { - fn from(path: String) -> Self { - IndexPointer(path) - } -} - -impl Worker for IndexingWorker {} - -impl Map<IndexingWorker, IndexPointer> for Job { - fn map(&self, worker: &IndexingWorker) -> IndexPointer { - let index = process_job(self, worker); - IndexPointer(index.path) - } -} - -impl Reduce<Index> for Index { - fn reduce(self, element: Index) -> Self { - let other = element; - let other_path = other.path.clone(); - - let res = self.merge(other); - - std::fs::remove_dir_all(other_path).unwrap(); - - res - } -} - -pub struct Indexer {} -impl Indexer { - pub fn run(config: &config::IndexingLocalConfig) -> Result<()> { - let warc_paths = config.warc_source.paths()?; - - let job_config: WarcSource = config.warc_source.clone(); - - let worker = IndexingWorker::new( - config.host_centrality_store_path.clone(), - config.page_centrality_store_path.clone(), - config.page_webgraph_path.clone(), - config.topics_path.clone(), - config.safety_classifier_path.clone(), - ); - - let indexes = warc_paths - .into_iter() - .skip(config.skip_warc_files.unwrap_or(0)) - .take(config.limit_warc_files.unwrap_or(usize::MAX)) - .chunks(config.batch_size.unwrap_or(1)) - .into_iter() - .map(|paths| paths.collect_vec()) - .collect_vec() - .into_par_iter() - .map(|warc_paths| Job { - source_config: job_config.clone(), - warc_paths, - base_path: config.output_path.clone(), - settings: JobSettings { - host_centrality_threshold: config.host_centrality_threshold, - minimum_clean_words: config.minimum_clean_words, - }, - }) - .map(|job| { - let pointer: IndexPointer = job.map(&worker); - pointer - }) - .collect(); - - Self::merge(indexes)?; - Ok(()) - } - - pub fn merge(indexes: Vec<IndexPointer>) -> Result<()> { - let num_indexes = indexes.len(); - let mut it = indexes.into_iter(); - let num_cores = num_cpus::get(); - - let mut threads = Vec::new(); - - for _ in 0..(num_cores + 1) { - let indexes = it - .by_ref() - .take(((num_indexes as f64) / (num_cores as f64)).ceil() as usize) - .collect_vec(); - - if indexes.is_empty() { - break; - } - - threads.push(thread::spawn(move || { - let mut it = indexes.into_iter(); - let mut index = Index::open(it.next().unwrap().0).unwrap(); - - for other in it { - let other_path = other.0; - let other = Index::open(&other_path).unwrap(); - - index = index.merge(other); - - std::fs::remove_dir_all(other_path).unwrap(); - } - - index.inverted_index.merge_into_max_segments(1).unwrap(); - - index - })); - } - - let mut indexes = Vec::new(); - for thread in 
threads { - indexes.push(thread.join().unwrap()); - } - - let mut it = indexes.into_iter(); - let mut index = it.next().unwrap(); - - for other in it { - let other_path = other.path.clone(); - index = index.merge(other); - std::fs::remove_dir_all(other_path).unwrap(); - } - - index.inverted_index.merge_into_max_segments(1).unwrap(); - - Ok(()) - } -} diff --git a/crates/core/src/entrypoint/indexer/indexable_webpage.rs b/crates/core/src/entrypoint/indexer/indexable_webpage.rs new file mode 100644 index 00000000..290d8ff9 --- /dev/null +++ b/crates/core/src/entrypoint/indexer/indexable_webpage.rs @@ -0,0 +1,44 @@ +// Stract is an open source web search engine. +// Copyright (C) 2024 Stract ApS +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +use crate::crawler::CrawlDatum; +use crate::warc::WarcRecord; + +pub struct IndexableWebpage { + pub url: String, + pub body: String, + pub fetch_time_ms: u64, +} + +impl From<CrawlDatum> for IndexableWebpage { + fn from(datum: CrawlDatum) -> Self { + Self { + url: datum.url.to_string(), + body: datum.body, + fetch_time_ms: datum.fetch_time_ms, + } + } +} + +impl From<WarcRecord> for IndexableWebpage { + fn from(record: WarcRecord) -> Self { + Self { + url: record.request.url, + body: record.response.body, + fetch_time_ms: record.metadata.fetch_time_ms, + } + } +} diff --git a/crates/core/src/entrypoint/indexer/job.rs b/crates/core/src/entrypoint/indexer/job.rs new file mode 100644 index 00000000..b496e13f --- /dev/null +++ b/crates/core/src/entrypoint/indexer/job.rs @@ -0,0 +1,130 @@ +// Stract is an open source web search engine. +// Copyright (C) 2024 Stract ApS +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. 
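+ +//! A single indexing job: one WARC file whose HTML records are read in batches of `batch_size`, prepared by the `IndexingWorker`, and inserted into a fresh local index.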
+ +use std::path::Path; + +use itertools::Itertools; +use serde::{Deserialize, Serialize}; +use tokio::pin; +use tracing::{info, trace, warn}; + +use crate::config; +use crate::entrypoint::download_all_warc_files; +use crate::index::Index; +use crate::warc::PayloadType; + +use super::{IndexableWebpage, IndexingWorker}; + +#[derive(Debug, Serialize, Deserialize)] +pub struct Job { + pub source_config: config::WarcSource, + pub warc_path: String, + pub base_path: String, + pub settings: JobSettings, +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub struct JobSettings { + pub host_centrality_threshold: Option<f64>, + pub minimum_clean_words: Option<usize>, + pub batch_size: usize, +} + +impl Job { + pub fn process(&self, worker: &IndexingWorker) -> Index { + let name = self.warc_path.split('/').last().unwrap(); + + let mut has_host_centrality = false; + let mut has_page_centrality = false; + let mut has_backlinks = false; + + info!("processing {}", name); + + let mut index = Index::open(Path::new(&self.base_path).join(name)).unwrap(); + index.prepare_writer().unwrap(); + + let paths = vec![self.warc_path.clone()]; + let warc_files = download_all_warc_files(&paths, &self.source_config); + pin!(warc_files); + + for file in warc_files.by_ref() { + let mut batch = Vec::with_capacity(self.settings.batch_size); + let mut batch_webpages = Vec::with_capacity(self.settings.batch_size); + + for chunk in &file + .records() + .flatten() + .filter(|record| match &record.response.payload_type { + Some(payload_type) => matches!(payload_type, PayloadType::Html), + None => true, + }) + .chunks(self.settings.batch_size) + { + batch.clear(); + + for record in chunk { + batch.push(IndexableWebpage::from(record)); + } + + worker.prepare_webpage(&batch, &mut batch_webpages); + + for webpage in &batch_webpages { + if webpage.host_centrality > 0.0 { + has_host_centrality = true; + } + + if webpage.page_centrality > 0.0 { + has_page_centrality = true; + } + + if !webpage.backlink_labels.is_empty() { + has_backlinks = true; + } + trace!("inserting webpage: {:?}", webpage.html.url()); + trace!("title = {:?}", webpage.html.title()); + trace!("text = {:?}", webpage.html.clean_text()); + + if let Err(err) = index.insert(webpage) { + warn!("{:?}", err); + panic!(); + } + } + } + + index.commit().unwrap(); + } + + if !has_host_centrality { + warn!("no host centrality values found in {}", name); + } + + if !has_page_centrality && worker.page_centrality_store().is_some() { + warn!("no page centrality values found in {}", name); + } + + if !has_backlinks && worker.page_webgraph().is_some() { + warn!("no backlinks found in {}", name); + } + + index.inverted_index.merge_into_max_segments(1).unwrap(); + + info!("{} done", name); + + index + } +} diff --git a/crates/core/src/entrypoint/indexer/mod.rs b/crates/core/src/entrypoint/indexer/mod.rs new file mode 100644 index 00000000..f2af5325 --- /dev/null +++ b/crates/core/src/entrypoint/indexer/mod.rs @@ -0,0 +1,157 @@ +// Stract is an open source web search engine. +// Copyright (C) 2024 Stract ApS +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +pub mod indexable_webpage; +pub mod job; +pub mod worker; + +use rayon::prelude::*; +use std::thread; + +use itertools::Itertools; +use serde::{Deserialize, Serialize}; + +pub use crate::entrypoint::indexer::indexable_webpage::IndexableWebpage; +pub use crate::entrypoint::indexer::job::{Job, JobSettings}; +pub use crate::entrypoint::indexer::worker::IndexingWorker; + +use crate::config::{self, WarcSource}; +use crate::index::Index; +use crate::mapreduce::{Map, Reduce, Worker}; +use crate::Result; + +#[derive(Debug, Serialize, Deserialize)] +pub struct IndexPointer(String); + +impl From<String> for IndexPointer { + fn from(path: String) -> Self { + IndexPointer(path) + } +} + +impl Worker for IndexingWorker {} + +impl Map<IndexingWorker, IndexPointer> for Job { + fn map(&self, worker: &IndexingWorker) -> IndexPointer { + let index = self.process(worker); + IndexPointer(index.path) + } +} + +impl Reduce<Index> for Index { + fn reduce(self, element: Index) -> Self { + let other = element; + let other_path = other.path.clone(); + + let res = self.merge(other); + + std::fs::remove_dir_all(other_path).unwrap(); + + res + } +} + +pub fn run(config: &config::IndexingLocalConfig) -> Result<()> { + let warc_paths = config.warc_source.paths()?; + + let job_config: WarcSource = config.warc_source.clone(); + + let worker = IndexingWorker::new( + config.host_centrality_store_path.clone(), + config.page_centrality_store_path.clone(), + config.page_webgraph_path.clone(), + config.topics_path.clone(), + config.safety_classifier_path.clone(), + ); + + let indexes = warc_paths + .into_par_iter() + .skip(config.skip_warc_files.unwrap_or(0)) + .take(config.limit_warc_files.unwrap_or(usize::MAX)) + .map(|warc_path| Job { + source_config: job_config.clone(), + warc_path, + base_path: config.output_path.clone(), + settings: JobSettings { + host_centrality_threshold: config.host_centrality_threshold, + minimum_clean_words: config.minimum_clean_words, + batch_size: config.batch_size, + }, + }) + .map(|job| { + let pointer: IndexPointer = job.map(&worker); + pointer + }) + .collect(); + + merge(indexes)?; + Ok(()) +} + +pub fn merge(indexes: Vec<IndexPointer>) -> Result<()> { + let num_indexes = indexes.len(); + let mut it = indexes.into_iter(); + let num_cores = num_cpus::get(); + + let mut threads = Vec::new(); + + for _ in 0..(num_cores + 1) { + let indexes = it + .by_ref() + .take(((num_indexes as f64) / (num_cores as f64)).ceil() as usize) + .collect_vec(); + + if indexes.is_empty() { + break; + } + + threads.push(thread::spawn(move || { + let mut it = indexes.into_iter(); + let mut index = Index::open(it.next().unwrap().0).unwrap(); + + for other in it { + let other_path = other.0; + let other = Index::open(&other_path).unwrap(); + + index = index.merge(other); + + std::fs::remove_dir_all(other_path).unwrap(); + } + + index.inverted_index.merge_into_max_segments(1).unwrap(); + + index + })); + } + + let mut indexes = Vec::new(); + for thread in threads { + indexes.push(thread.join().unwrap()); + } + + let mut it = indexes.into_iter(); + let mut index = it.next().unwrap(); + + for other in it { + let other_path = other.path.clone(); + index = index.merge(other); + std::fs::remove_dir_all(other_path).unwrap(); + } + + index.inverted_index.merge_into_max_segments(1).unwrap(); + + Ok(()) +} diff --git a/crates/core/src/entrypoint/indexer/worker.rs 
b/crates/core/src/entrypoint/indexer/worker.rs new file mode 100644 index 00000000..450fdd73 --- /dev/null +++ b/crates/core/src/entrypoint/indexer/worker.rs @@ -0,0 +1,275 @@ +// Stract is an open source web search engine. +// Copyright (C) 2024 Stract ApS +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +use chrono::Utc; +use std::path::Path; + +use tracing::debug; + +pub use super::indexable_webpage::IndexableWebpage; +pub use super::job::{Job, JobSettings}; + +use crate::human_website_annotations; +use crate::index::Index; +use crate::kv::rocksdb_store::RocksDbStore; +use crate::kv::Kv; +use crate::rake::RakeModel; +use crate::ranking::SignalAggregator; +use crate::webgraph::{Node, NodeID, Webgraph, WebgraphBuilder}; +use crate::webpage::{safety_classifier, Html, Webpage}; + +pub struct IndexingWorker { + host_centrality_store: RocksDbStore<NodeID, f64>, + host_centrality_rank_store: RocksDbStore<NodeID, f64>, + page_centrality_store: Option<RocksDbStore<NodeID, f64>>, + page_centrality_rank_store: Option<RocksDbStore<NodeID, f64>>, + page_webgraph: Option<Webgraph>, + topics: Option<human_website_annotations::Mapper>, + safety_classifier: Option<safety_classifier::Model>, + job_settings: Option<JobSettings>, + rake: RakeModel, +} + +impl IndexingWorker { + pub fn new( + host_centrality_store_path: String, + page_centrality_store_path: Option<String>, + page_webgraph_path: Option<String>, + topics_path: Option<String>, + safety_classifier_path: Option<String>, + ) -> Self { + Self { + host_centrality_store: RocksDbStore::open( + Path::new(&host_centrality_store_path).join("harmonic"), + ), + host_centrality_rank_store: RocksDbStore::open( + Path::new(&host_centrality_store_path).join("harmonic_rank"), + ), + page_centrality_store: page_centrality_store_path + .as_ref() + .map(|p| RocksDbStore::open(Path::new(&p).join("approx_harmonic"))), + page_centrality_rank_store: page_centrality_store_path + .as_ref() + .map(|p| RocksDbStore::open(Path::new(&p).join("approx_harmonic_rank"))), + page_webgraph: page_webgraph_path + .map(|path| WebgraphBuilder::new(path).single_threaded().open()), + topics: topics_path.map(|path| human_website_annotations::Mapper::open(path).unwrap()), + safety_classifier: safety_classifier_path + .map(|path| safety_classifier::Model::open(path).unwrap()), + job_settings: None, + rake: RakeModel::default(), + } + } + + pub(super) fn page_centrality_store(&self) -> Option<&RocksDbStore<NodeID, f64>> { + self.page_centrality_store.as_ref() + } + + pub(super) fn page_webgraph(&self) -> Option<&Webgraph> { + self.page_webgraph.as_ref() + } + + pub fn process(&self, job: &Job) -> Index { + job.process(self) + } + + pub fn set_job_settings(&mut self, job_settings: JobSettings) { + self.job_settings = Some(job_settings); + } + + pub fn prepare_webpage(&self, batch: &[IndexableWebpage], out: &mut Vec<Webpage>) { + out.clear(); + let mut signal_aggregator = SignalAggregator::new(None); + + for page in batch { + let IndexableWebpage { + url, + body, + fetch_time_ms, + } = page; + + let mut html = match Html::parse_without_text(body, url) { + Ok(html) => html, + Err(err) => { + debug!("error 
parsing html: {:?}", err); + continue; + } + }; + + if html.is_no_index() { + debug!("noindex"); + continue; + } + + let title = html.title().unwrap_or_default(); + if title.is_empty() || title.chars().all(|c| c.is_whitespace()) { + debug!("empty title"); + continue; + } + + let node = Node::from(html.url()); + let host_node_id = node.clone().into_host().id(); + + let mut host_centrality = self + .host_centrality_store + .get(&host_node_id) + .unwrap_or_default(); + + let mut host_centrality_rank = self + .host_centrality_rank_store + .get(&host_node_id) + .unwrap_or(u64::MAX as f64); + + if let Some(host_centrality_threshold) = + self.job_settings.and_then(|s| s.host_centrality_threshold) + { + if host_centrality < host_centrality_threshold { + debug!("skipping due to low host_centrality value"); + continue; + } + } + + html.parse_text(); + + if html.empty_all_text() { + debug!("empty all text"); + continue; + } + + if let Some(minimum_clean_words) = self.job_settings.and_then(|s| s.minimum_clean_words) + { + match html.clean_text() { + Some(clean_text) => { + if clean_text.split_whitespace().count() < minimum_clean_words { + debug!("too few clean words"); + continue; + } + } + None => { + debug!("no clean text"); + continue; + } + } + } + + let backlink_labels: Vec<String> = self + .page_webgraph + .as_ref() + .map(|webgraph| { + webgraph + .raw_ingoing_edges_with_labels(&Node::from(html.url()).id()) + .into_iter() + .map(|edge| edge.label) + .filter(|label| !label.is_empty()) + .filter(|label| { + let label = label.to_lowercase(); + let stopwords = [ + "click", + "click here", + "here", + "link", + "website", + "webpage", + "page", + "site", + "url", + "web", + "visit", + "more", + "info", + "information", + "read", + "read more", + ]; + + !stopwords.contains(&label.as_str()) + }) + .collect() + }) + .unwrap_or_default(); + + let mut page_centrality = 0.0; + + if let Some(store) = self.page_centrality_store.as_ref() { + let node_id = node.id(); + + page_centrality = store.get(&node_id).unwrap_or_default(); + } + + let mut page_centrality_rank = u64::MAX as f64; + + if let Some(store) = self.page_centrality_rank_store.as_ref() { + let node_id = node.id(); + + page_centrality_rank = store.get(&node_id).unwrap_or(u64::MAX as f64); + } + + if !page_centrality.is_finite() { + page_centrality = 0.0; + } + + if !host_centrality.is_finite() { + host_centrality = 0.0; + } + + if !page_centrality_rank.is_finite() { + page_centrality_rank = u64::MAX as f64; + } + + if !host_centrality_rank.is_finite() { + host_centrality_rank = u64::MAX as f64; + } + + let mut dmoz_description = None; + + if let Some(mapper) = self.topics.as_ref() { + if let Some(info) = + mapper.get(&html.url().host_str().unwrap_or_default().to_string()) + { + dmoz_description = Some(info.description.clone()) + } + } + + let keywords = html.keywords(&self.rake); + + let mut webpage = Webpage { + html, + backlink_labels, + page_centrality, + page_centrality_rank, + host_centrality, + host_centrality_rank, + fetch_time_ms: *fetch_time_ms, + pre_computed_score: 0.0, + node_id: Some(host_node_id), + dmoz_description, + safety_classification: None, + inserted_at: Utc::now(), + keywords, + }; + + if let Some(model) = self.safety_classifier.as_ref() { + webpage.safety_classification = Some(model.predict(&webpage).label); + } + + signal_aggregator.set_current_timestamp(Utc::now().timestamp().max(0) as usize); + + webpage.pre_computed_score = signal_aggregator.precompute_score(&webpage); + + out.push(webpage); + } + } +} diff --git 
a/crates/core/src/entrypoint/mod.rs b/crates/core/src/entrypoint/mod.rs index 3622c20b..8f63e4b6 100644 --- a/crates/core/src/entrypoint/mod.rs +++ b/crates/core/src/entrypoint/mod.rs @@ -34,7 +34,6 @@ pub mod webgraph_server; pub use centrality::Centrality; pub use entity::EntityIndexer; -pub use indexer::Indexer; use tracing::{debug, log::error}; pub use webgraph::Webgraph; pub mod live_index; diff --git a/crates/core/src/index.rs b/crates/core/src/index.rs index 73cf0906..7152a6c6 100644 --- a/crates/core/src/index.rs +++ b/crates/core/src/index.rs @@ -76,8 +76,8 @@ impl Index { Ok(s) } - pub fn insert(&self, webpage: Webpage) -> Result<()> { - if let Ok(region) = Region::guess_from(&webpage) { + pub fn insert(&self, webpage: &Webpage) -> Result<()> { + if let Ok(region) = Region::guess_from(webpage) { let mut reg = self.region_count.lock().unwrap_or_else(|e| e.into_inner()); reg.increment(&region); } @@ -165,7 +165,7 @@ mod tests { index .insert( - Webpage::new( + &Webpage::new( &format!( r#" @@ -186,7 +186,7 @@ mod tests { .expect("failed to insert webpage"); index .insert( - Webpage::new( + &Webpage::new( &format!( r#" @@ -207,7 +207,7 @@ mod tests { .expect("failed to insert webpage"); index .insert( - Webpage::new( + &Webpage::new( &format!( r#" diff --git a/crates/core/src/inverted_index.rs b/crates/core/src/inverted_index.rs index 3a13552f..80938af9 100644 --- a/crates/core/src/inverted_index.rs +++ b/crates/core/src/inverted_index.rs @@ -41,7 +41,6 @@ use crate::config::SnippetConfig; use crate::fastfield_reader::FastFieldReader; use crate::query::shortcircuit::ShortCircuitQuery; use crate::query::Query; -use crate::rake::RakeModel; use crate::ranking::initial::Score; use crate::ranking::pipeline::RankingWebsite; use crate::ranking::SignalAggregator; @@ -165,7 +164,6 @@ pub struct InvertedIndex { schema: Arc<Schema>, snippet_config: SnippetConfig, fastfield_reader: FastFieldReader, - rake: RakeModel, } impl InvertedIndex { @@ -236,7 +234,6 @@ impl InvertedIndex { tantivy_index, snippet_config: SnippetConfig::default(), fastfield_reader, - rake: RakeModel::default(), }) } @@ -287,11 +284,11 @@ impl InvertedIndex { Ok(s) } - pub fn insert(&self, webpage: Webpage) -> Result<()> { + pub fn insert(&self, webpage: &Webpage) -> Result<()> { self.writer .as_ref() .expect("writer has not been prepared") - .add_document(webpage.into_tantivy(&self.schema, &self.rake)?)?; + .add_document(webpage.as_tantivy(&self.schema)?)?; Ok(()) } @@ -880,7 +877,7 @@ mod tests { index .insert( - Webpage::new( + &Webpage::new( &format!( r#" @@ -919,7 +916,7 @@ mod tests { index .insert( - Webpage::new( + &Webpage::new( &format!( r#" @@ -967,7 +964,7 @@ mod tests { index .insert( - Webpage::new( + &Webpage::new( &format!( r#" @@ -1015,7 +1012,7 @@ mod tests { index .insert( - Webpage::new( + &Webpage::new( &format!( r#" @@ -1063,7 +1060,7 @@ mod tests { index .insert( - Webpage::new( + &Webpage::new( &format!( r#" @@ -1081,7 +1078,7 @@ mod tests { ) .expect("failed to insert webpage"); index - .insert(Webpage { + .insert(&Webpage { html: Html::parse( &format!( r#" @@ -1143,7 +1140,7 @@ mod tests { index .insert( - Webpage::new( + &Webpage::new( &format!( r#" @@ -1192,7 +1189,7 @@ mod tests { index .insert( - Webpage::new( + &Webpage::new( &format!( r#" @@ -1240,7 +1237,7 @@ mod tests { index1 .insert( - Webpage::new( + &Webpage::new( &format!( r#" @@ -1264,7 +1261,7 @@ mod tests { index2 .insert( - Webpage::new( + &Webpage::new( &format!( r#" @@ -1337,7 +1334,7 @@ mod tests { index .insert( - Webpage::new( + 
&Webpage::new( &format!( r#" @@ -1375,7 +1372,7 @@ mod tests { index .insert( - Webpage::new( + &Webpage::new( &format!( r#" @@ -1422,7 +1419,7 @@ mod tests { let mut index = InvertedIndex::temporary().expect("Unable to open index"); index - .insert(Webpage::new( + .insert(&Webpage::new( &format!( r#" @@ -1501,7 +1498,7 @@ mod tests { let mut index = InvertedIndex::temporary().expect("Unable to open index"); index - .insert(Webpage::new( + .insert(&Webpage::new( &format!( r#" @@ -1535,7 +1532,7 @@ mod tests { let mut index = InvertedIndex::temporary().expect("Unable to open index"); index - .insert(Webpage::new( + .insert(&Webpage::new( &format!( r#" @@ -1617,8 +1614,8 @@ mod tests { ..Default::default() }; - index.insert(a).unwrap(); - index.insert(b).unwrap(); + index.insert(&a).unwrap(); + index.insert(&b).unwrap(); index.commit().expect("failed to commit index"); diff --git a/crates/core/src/kv/rocksdb_store.rs b/crates/core/src/kv/rocksdb_store.rs index 4c306199..e0d7cc07 100644 --- a/crates/core/src/kv/rocksdb_store.rs +++ b/crates/core/src/kv/rocksdb_store.rs @@ -51,7 +51,7 @@ where options.set_level_compaction_dynamic_level_bytes(true); options.set_bytes_per_sync(1048576); let mut block_options = BlockBasedOptions::default(); - block_options.set_block_size(1024 * 1024 * 1024); // 1 GB + block_options.set_block_size(32 * 1024); // 32 kb block_options.set_format_version(5); block_options.set_cache_index_and_filter_blocks(true); block_options.set_pin_l0_filter_and_index_blocks_in_cache(true); diff --git a/crates/core/src/live_index.rs b/crates/core/src/live_index.rs index e659dc8e..482aec54 100644 --- a/crates/core/src/live_index.rs +++ b/crates/core/src/live_index.rs @@ -17,7 +17,7 @@ use anyhow::Result; use std::{ path::Path, - sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard}, + sync::{Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard}, time::{Duration, SystemTime}, }; @@ -29,7 +29,7 @@ use crate::{ crawler::{ reqwest_client, CrawlDatum, DatumStream, JobExecutor, RetrieableUrl, WeightedUrl, WorkerJob, }, - entrypoint::indexer::IndexingWorker, + entrypoint::indexer::{self, IndexingWorker}, feed::{ self, scheduler::{Domain, DomainFeeds, Split}, }, @@ -41,6 +41,7 @@ const PRUNE_INTERVAL: Duration = Duration::from_secs(60 * 60); // 1 hour const FEED_CHECK_INTERVAL: Duration = Duration::from_secs(60 * 10); // 10 minutes const AUTO_COMMIT_INTERVAL: Duration = Duration::from_secs(60 * 5); // 5 minutes const EVENT_LOOP_INTERVAL: Duration = Duration::from_secs(5); +const BATCH_SIZE: usize = 512; #[derive(Debug, Clone)] struct Feeds { @@ -108,6 +109,7 @@ impl DownloadedDb { struct Indexer { search_index: Arc<RwLock<crate::index::Index>>, worker: IndexingWorker, + write_batch: Arc<Mutex<Vec<indexer::IndexableWebpage>>>, } enum CrawlResults { @@ -221,6 +223,47 @@ impl Crawler { } } +impl Indexer { + fn maybe_write_batch_to_index(&self) { + let mut batch = self.write_batch.lock().unwrap_or_else(|e| e.into_inner()); + + if batch.len() < BATCH_SIZE { + return; + } + + let mut out = Vec::new(); + self.worker.prepare_webpage(&batch, &mut out); + + let search_index = self.search_index.write().unwrap_or_else(|e| e.into_inner()); + for webpage in &out { + search_index.insert(webpage).ok(); + } + + // Drain the flushed pages so they are not prepared and indexed again on the next call. + batch.clear(); + } + + fn write_batch_to_index(&self) { + let mut batch = self.write_batch.lock().unwrap_or_else(|e| e.into_inner()); + + if batch.is_empty() { + return; + } + + let mut out = Vec::new(); + self.worker.prepare_webpage(&batch, &mut out); + + let search_index = self.search_index.write().unwrap_or_else(|e| e.into_inner()); + for webpage in &out { 
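+ // Per-page insert errors are deliberately ignored (`.ok()`) so one bad page cannot abort the whole flush.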
search_index.insert(webpage).ok(); + } + + // Drain the batch so already-flushed pages are not indexed twice. + batch.clear(); + } +} + pub struct Index { search_index: Arc<RwLock<crate::index::Index>>, } @@ -267,21 +303,19 @@ impl Index { impl DatumStream for Indexer { async fn write(&self, crawl_datum: CrawlDatum) -> Result<()> { - let webpage = self.worker.prepare_webpage( - &crawl_datum.body, - crawl_datum.url.as_str(), - crawl_datum.fetch_time_ms, - )?; - - self.search_index - .read() + self.write_batch + .lock() .unwrap_or_else(|e| e.into_inner()) - .insert(webpage)?; + .push(crawl_datum.into()); + + self.maybe_write_batch_to_index(); Ok(()) } async fn finish(&self) -> Result<()> { + self.write_batch_to_index(); + self.search_index .write() .unwrap_or_else(|e| e.into_inner()) @@ -335,6 +369,7 @@ impl IndexManager { None, config.safety_classifier_path.clone(), ), + write_batch: Arc::new(Mutex::new(Vec::new())), }); let crawler_config = CrawlerConfig::from(&config); diff --git a/crates/core/src/main.rs b/crates/core/src/main.rs index 30d739db..3465611c 100644 --- a/crates/core/src/main.rs +++ b/crates/core/src/main.rs @@ -243,7 +243,7 @@ fn main() -> Result<()> { Commands::Indexer { options } => match options { IndexingOptions::Search { config_path } => { let config = load_toml_config(config_path); - entrypoint::Indexer::run(&config)?; + entrypoint::indexer::run(&config)?; } IndexingOptions::Entity { wikipedia_dump_path, @@ -258,7 +258,7 @@ fn main() -> Result<()> { .into_iter() .map(entrypoint::indexer::IndexPointer::from) .collect::<Vec<_>>(); - entrypoint::indexer::Indexer::merge(pointers)?; + entrypoint::indexer::merge(pointers)?; } }, Commands::Centrality { mode } => { diff --git a/crates/core/src/query/mod.rs b/crates/core/src/query/mod.rs index 589605bf..49251c82 100644 --- a/crates/core/src/query/mod.rs +++ b/crates/core/src/query/mod.rs @@ -396,7 +396,7 @@ mod tests { index .insert( - Webpage::new( + &Webpage::new( r#" @@ -414,7 +414,7 @@ mod tests { .expect("failed to insert webpage"); index .insert( - Webpage::new( + &Webpage::new( r#" @@ -444,7 +444,7 @@ mod tests { index .insert( - Webpage::new( + &Webpage::new( r#" @@ -462,7 +462,7 @@ mod tests { .expect("failed to insert webpage"); index .insert( - Webpage::new( + &Webpage::new( r#" @@ -512,7 +512,7 @@ mod tests { index .insert( - Webpage::new( + &Webpage::new( r#" @@ -530,7 +530,7 @@ mod tests { .expect("failed to insert webpage"); index .insert( - Webpage::new( + &Webpage::new( r#" @@ -564,7 +564,7 @@ mod tests { index .insert( - Webpage::new( + &Webpage::new( r#" @@ -582,7 +582,7 @@ mod tests { .expect("failed to insert webpage"); index .insert( - Webpage::new( + &Webpage::new( r#" @@ -652,7 +652,7 @@ mod tests { index .insert( - Webpage::new( + &Webpage::new( &format!( r#" @@ -673,7 +673,7 @@ mod tests { .expect("failed to insert webpage"); index .insert( - Webpage::new( + &Webpage::new( &format!( r#" @@ -725,7 +725,7 @@ mod tests { index .insert( - Webpage::new( + &Webpage::new( &format!( r#" @@ -746,7 +746,7 @@ mod tests { .expect("failed to insert webpage"); index .insert( - Webpage::new( + &Webpage::new( &format!( r#" @@ -790,7 +790,7 @@ mod tests { index .insert( - Webpage::new( + &Webpage::new( &format!( r#" @@ -812,7 +812,7 @@ mod tests { index .insert( - Webpage::new( + &Webpage::new( &format!( r#" @@ -876,7 +876,7 @@ mod tests { webpage.safety_classification = Some(safety_classifier::Label::SFW); webpage.html.set_clean_text("sfw".to_string()); - index.insert(webpage).expect("failed to insert webpage"); + index.insert(&webpage).expect("failed to insert webpage"); let mut webpage = Webpage::new( &format!( @@ -899,7 +899,7 @@ mod 
tests { webpage.safety_classification = Some(safety_classifier::Label::NSFW); webpage.html.set_clean_text("nsfw".to_string()); - index.insert(webpage).expect("failed to insert webpage"); + index.insert(&webpage).expect("failed to insert webpage"); index.commit().expect("failed to commit index"); let searcher = LocalSearcher::from(index); @@ -931,7 +931,7 @@ mod tests { index .insert( - Webpage::new( + &Webpage::new( &format!( r#" @@ -952,7 +952,7 @@ mod tests { .expect("failed to insert webpage"); index .insert( - Webpage::new( + &Webpage::new( &format!( r#" @@ -973,7 +973,7 @@ mod tests { .expect("failed to insert webpage"); index .insert( - Webpage::new( + &Webpage::new( &format!( r#" @@ -1027,6 +1027,7 @@ mod tests { query: "test site:www.first.com".to_string(), ..Default::default() }; + let result = searcher.search(&query).expect("Search failed"); assert_eq!(result.webpages.len(), 1); } diff --git a/crates/core/src/query/optic.rs b/crates/core/src/query/optic.rs index e51e6907..040cf05f 100644 --- a/crates/core/src/query/optic.rs +++ b/crates/core/src/query/optic.rs @@ -283,7 +283,7 @@ mod tests { let mut index = Index::temporary().expect("Unable to open index"); index - .insert(Webpage { + .insert(&Webpage { html: Html::parse( &format!( r#" @@ -307,7 +307,7 @@ mod tests { }) .expect("failed to insert webpage"); index - .insert(Webpage { + .insert(&Webpage { html: Html::parse( &format!( r#" @@ -408,7 +408,7 @@ mod tests { let mut index = Index::temporary().expect("Unable to open index"); index - .insert(Webpage { + .insert(&Webpage { html: Html::parse( &format!( r#" @@ -431,7 +431,7 @@ mod tests { }) .expect("failed to insert webpage"); index - .insert(Webpage { + .insert(&Webpage { html: Html::parse( &format!( r#" @@ -505,7 +505,7 @@ mod tests { let mut index = Index::temporary().expect("Unable to open index"); index - .insert(Webpage { + .insert(&Webpage { html: Html::parse( &format!( r#" @@ -529,7 +529,7 @@ mod tests { }) .expect("failed to insert webpage"); index - .insert(Webpage { + .insert(&Webpage { html: Html::parse( &format!( r#" @@ -553,7 +553,7 @@ mod tests { }) .expect("failed to insert webpage"); index - .insert(Webpage { + .insert(&Webpage { html: Html::parse( &format!( r#" @@ -653,7 +653,7 @@ mod tests { let graph = writer.finalize(); index - .insert(Webpage { + .insert(&Webpage { html: Html::parse( &format!( r#" @@ -678,7 +678,7 @@ mod tests { }) .expect("failed to insert webpage"); index - .insert(Webpage { + .insert(&Webpage { html: Html::parse( &format!( r#" @@ -703,7 +703,7 @@ mod tests { }) .expect("failed to insert webpage"); index - .insert(Webpage { + .insert(&Webpage { html: Html::parse( &format!( r#" @@ -764,7 +764,7 @@ mod tests { let mut index = Index::temporary().expect("Unable to open index"); index - .insert(Webpage { + .insert(&Webpage { html: Html::parse( &format!( r#" @@ -800,7 +800,7 @@ mod tests { }) .expect("failed to insert webpage"); index - .insert(Webpage { + .insert(&Webpage { html: Html::parse( &format!( r##" @@ -943,7 +943,7 @@ mod tests { let mut index = Index::temporary().expect("Unable to open index"); index - .insert(Webpage { + .insert(&Webpage { html: Html::parse( &format!( r#" @@ -1009,7 +1009,7 @@ mod tests { let mut index = Index::temporary().expect("Unable to open index"); index - .insert(Webpage { + .insert(&Webpage { html: Html::parse( &format!( r#" @@ -1033,7 +1033,7 @@ mod tests { }) .expect("failed to insert webpage"); index - .insert(Webpage { + .insert(&Webpage { html: Html::parse( &format!( r#" @@ -1095,7 +1095,7 @@ mod 
tests { let mut index = Index::temporary().expect("Unable to open index"); index - .insert(Webpage { + .insert(&Webpage { html: Html::parse( &format!( r#" @@ -1251,7 +1251,7 @@ mod tests { let mut index = Index::temporary().expect("Unable to open index"); index - .insert(Webpage { + .insert(&Webpage { html: Html::parse( &format!( r#" @@ -1319,7 +1319,7 @@ mod tests { let mut index = Index::temporary().expect("Unable to open index"); index - .insert(Webpage { + .insert(&Webpage { html: Html::parse( &format!( r#" @@ -1374,7 +1374,7 @@ mod tests { let mut index = Index::temporary().expect("Unable to open index"); index - .insert(Webpage { + .insert(&Webpage { html: Html::parse( &format!( r#" @@ -1398,7 +1398,7 @@ mod tests { }) .expect("failed to insert webpage"); index - .insert(Webpage { + .insert(&Webpage { html: Html::parse( &format!( r#" @@ -1516,7 +1516,7 @@ mod tests { page.html.set_clean_text("".to_string()); - index.insert(page).expect("failed to insert webpage"); + index.insert(&page).expect("failed to insert webpage"); index.commit().expect("failed to commit index"); let searcher = LocalSearcher::from(index); @@ -1589,7 +1589,7 @@ mod tests { page.html.set_clean_text("".to_string()); - index.insert(page).expect("failed to insert webpage"); + index.insert(&page).expect("failed to insert webpage"); let mut page = Webpage { html: Html::parse( @@ -1612,7 +1612,7 @@ mod tests { page.html.set_clean_text("".to_string()); - index.insert(page).expect("failed to insert webpage"); + index.insert(&page).expect("failed to insert webpage"); index.commit().expect("failed to commit index"); let searcher = LocalSearcher::from(index); @@ -1668,7 +1668,7 @@ mod tests { page.html.set_clean_text("".to_string()); - index.insert(page).expect("failed to insert webpage"); + index.insert(&page).expect("failed to insert webpage"); let mut page = Webpage { html: Html::parse( @@ -1691,7 +1691,7 @@ mod tests { page.html.set_clean_text("".to_string()); - index.insert(page).expect("failed to insert webpage"); + index.insert(&page).expect("failed to insert webpage"); index.commit().expect("failed to commit index"); let searcher = LocalSearcher::from(index); @@ -1760,7 +1760,7 @@ mod tests { page.html.set_clean_text("".to_string()); - index.insert(page).expect("failed to insert webpage"); + index.insert(&page).expect("failed to insert webpage"); let mut page = Webpage { html: Html::parse( @@ -1782,7 +1782,7 @@ mod tests { page.html.set_clean_text("".to_string()); - index.insert(page).expect("failed to insert webpage"); + index.insert(&page).expect("failed to insert webpage"); let mut page = Webpage { html: Html::parse( @@ -1804,7 +1804,7 @@ mod tests { page.html.set_clean_text("".to_string()); - index.insert(page).expect("failed to insert webpage"); + index.insert(&page).expect("failed to insert webpage"); index.commit().expect("failed to commit index"); @@ -1849,7 +1849,7 @@ mod tests { page.html.set_clean_text("".to_string()); - index.insert(page).expect("failed to insert webpage"); + index.insert(&page).expect("failed to insert webpage"); let mut page = Webpage { html: Html::parse( @@ -1871,7 +1871,7 @@ mod tests { page.html.set_clean_text("".to_string()); - index.insert(page).expect("failed to insert webpage"); + index.insert(&page).expect("failed to insert webpage"); let mut page = Webpage { html: Html::parse( @@ -1893,7 +1893,7 @@ mod tests { page.html.set_clean_text("".to_string()); - index.insert(page).expect("failed to insert webpage"); + index.insert(&page).expect("failed to insert webpage"); 
index.commit().expect("failed to commit index"); diff --git a/crates/core/src/ranking/inbound_similarity.rs b/crates/core/src/ranking/inbound_similarity.rs index 3d0646d0..1f54f75d 100644 --- a/crates/core/src/ranking/inbound_similarity.rs +++ b/crates/core/src/ranking/inbound_similarity.rs @@ -400,7 +400,7 @@ mod tests { let mut index = Index::temporary().expect("Unable to open index"); index - .insert(Webpage { + .insert(&Webpage { html: Html::parse( &format!( r#" @@ -424,7 +424,7 @@ mod tests { }) .expect("failed to insert webpage"); index - .insert(Webpage { + .insert(&Webpage { html: Html::parse( &format!( r#" diff --git a/crates/core/src/ranking/mod.rs b/crates/core/src/ranking/mod.rs index 6082f789..5facffec 100644 --- a/crates/core/src/ranking/mod.rs +++ b/crates/core/src/ranking/mod.rs @@ -145,7 +145,7 @@ mod tests { let mut index = Index::temporary().expect("Unable to open index"); index - .insert(Webpage { + .insert(&Webpage { html: Html::parse( &format!( r#" @@ -169,7 +169,7 @@ mod tests { }) .expect("failed to insert webpage"); index - .insert(Webpage { + .insert(&Webpage { html: Html::parse( &format!( r#" @@ -211,7 +211,7 @@ mod tests { let mut index = Index::temporary().expect("Unable to open index"); index - .insert(Webpage { + .insert(&Webpage { html: Html::parse( &format!( r#" @@ -235,7 +235,7 @@ mod tests { }) .expect("failed to insert webpage"); index - .insert(Webpage { + .insert(&Webpage { html: Html::parse( &format!( r#" @@ -278,7 +278,7 @@ mod tests { let mut index = Index::temporary().expect("Unable to open index"); index - .insert(Webpage { + .insert(&Webpage { html: Html::parse( &format!( r#" @@ -303,7 +303,7 @@ mod tests { }) .expect("failed to insert webpage"); index - .insert(Webpage { + .insert(&Webpage { html: Html::parse( &format!( r#" @@ -353,7 +353,7 @@ mod tests { let mut index = Index::temporary().expect("Unable to open index"); index - .insert(Webpage { + .insert(&Webpage { html: Html::parse( r#" @@ -373,7 +373,7 @@ mod tests { }) .expect("failed to insert webpage"); index - .insert(Webpage { + .insert(&Webpage { html: Html::parse(r#" @@ -423,7 +423,7 @@ mod tests { let mut index = Index::temporary().expect("Unable to open index"); index - .insert(Webpage { + .insert(&Webpage { html: Html::parse( r#" @@ -444,7 +444,7 @@ mod tests { }) .expect("failed to insert webpage"); index - .insert(Webpage { + .insert(&Webpage { html: Html::parse( r#" @@ -483,7 +483,7 @@ mod tests { let mut index = Index::temporary().expect("Unable to open index"); index - .insert(Webpage { + .insert(&Webpage { html: Html::parse( r#" @@ -505,7 +505,7 @@ mod tests { .expect("failed to insert webpage"); index - .insert(Webpage { + .insert(&Webpage { html: Html::parse( r#" @@ -527,7 +527,7 @@ mod tests { .expect("failed to insert webpage"); index - .insert(Webpage { + .insert(&Webpage { html: Html::parse( r#" @@ -596,7 +596,7 @@ mod tests { let mut index = Index::temporary().expect("Unable to open index"); index - .insert(Webpage { + .insert(&Webpage { html: Html::parse( &format!( r#" @@ -619,7 +619,7 @@ mod tests { }) .expect("failed to insert webpage"); index - .insert(Webpage { + .insert(&Webpage { html: Html::parse( &format!( r#" @@ -669,7 +669,7 @@ mod tests { let mut index = Index::temporary().expect("Unable to open index"); index - .insert(Webpage { + .insert(&Webpage { html: Html::parse( &format!( r#" @@ -693,7 +693,7 @@ mod tests { }) .expect("failed to insert webpage"); index - .insert(Webpage { + .insert(&Webpage { html: Html::parse( &format!( r#" @@ -717,7 +717,7 @@ mod 
tests { }) .expect("failed to insert webpage"); index - .insert(Webpage { + .insert(&Webpage { html: Html::parse( &format!( r#" diff --git a/crates/core/src/ranking/optics.rs b/crates/core/src/ranking/optics.rs index f1169576..537828e8 100644 --- a/crates/core/src/ranking/optics.rs +++ b/crates/core/src/ranking/optics.rs @@ -85,7 +85,7 @@ mod tests { let graph = wrt.finalize(); index - .insert(Webpage { + .insert(&Webpage { html: Html::parse( &format!( r#" @@ -110,7 +110,7 @@ mod tests { }) .expect("failed to insert webpage"); index - .insert(Webpage { + .insert(&Webpage { html: Html::parse( &format!( r#" @@ -135,7 +135,7 @@ mod tests { }) .expect("failed to insert webpage"); index - .insert(Webpage { + .insert(&Webpage { html: Html::parse( &format!( r#" diff --git a/crates/core/src/searcher/local.rs b/crates/core/src/searcher/local.rs index 90f98e3a..f1cf1b5a 100644 --- a/crates/core/src/searcher/local.rs +++ b/crates/core/src/searcher/local.rs @@ -430,7 +430,7 @@ mod tests { for i in 0..NUM_WEBSITES { index - .insert(Webpage { + .insert(&Webpage { html: Html::parse( r#" diff --git a/crates/core/src/snippet.rs b/crates/core/src/snippet.rs index 1da0ef53..a6c3e38d 100644 --- a/crates/core/src/snippet.rs +++ b/crates/core/src/snippet.rs @@ -388,7 +388,7 @@ Survey in 2016, 2017, and 2018."#; index .insert( - Webpage::new( + &Webpage::new( &format!( r#" @@ -427,7 +427,7 @@ Survey in 2016, 2017, and 2018."#; index .insert( - Webpage::new( + &Webpage::new( &format!( r#" @@ -466,7 +466,7 @@ Survey in 2016, 2017, and 2018."#; index .insert( - Webpage::new( + &Webpage::new( &format!( r#" diff --git a/crates/core/src/warc.rs b/crates/core/src/warc.rs index cffeff1c..f8874bcb 100644 --- a/crates/core/src/warc.rs +++ b/crates/core/src/warc.rs @@ -51,8 +51,6 @@ fn decode_string(raw: &[u8]) -> String { let (enc, conf) = detector.guess_assess(None, true); if conf { - dbg!(enc.name()); - let (cow, _, had_errors) = enc.decode(raw); if !had_errors { return cow.to_string(); diff --git a/crates/core/src/webgraph/mod.rs b/crates/core/src/webgraph/mod.rs index 6fa51926..7cc19220 100644 --- a/crates/core/src/webgraph/mod.rs +++ b/crates/core/src/webgraph/mod.rs @@ -485,22 +485,24 @@ impl Id2NodeDb { opts.set_target_file_size_base(512 * 1024 * 1024); // 512 MB opts.set_target_file_size_multiplier(10); - opts.set_compression_type(rocksdb::DBCompressionType::Lz4); + opts.set_compression_type(rocksdb::DBCompressionType::None); let mut block_opts = rocksdb::BlockBasedOptions::default(); - let cache = rocksdb::Cache::new_lru_cache(8 * 1024 * 1024 * 1024); // 8 gb - opts.set_block_based_table_factory(&block_opts); + let cache = rocksdb::Cache::new_lru_cache(1024 * 1024 * 1024); // 1 gb + block_opts.set_ribbon_filter(10.0); // some recommended settings (https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning) opts.set_level_compaction_dynamic_level_bytes(true); opts.set_bytes_per_sync(1048576); - block_opts.set_block_size(16 * 1024); + block_opts.set_block_size(32 * 1024); // 32 kb block_opts.set_format_version(5); block_opts.set_cache_index_and_filter_blocks(true); block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true); block_opts.set_block_cache(&cache); + opts.set_block_based_table_factory(&block_opts); + let db = rocksdb::DB::open(&opts, path).unwrap(); Self { db, _cache: cache } diff --git a/crates/core/src/webgraph/store.rs b/crates/core/src/webgraph/store.rs index 55333416..7db3ad5d 100644 --- a/crates/core/src/webgraph/store.rs +++ b/crates/core/src/webgraph/store.rs @@ -314,7 +314,7 @@ 
impl EdgeStore { options.set_level_compaction_dynamic_level_bytes(true); options.set_bytes_per_sync(1048576); let mut block_options = BlockBasedOptions::default(); - block_options.set_block_size(16 * 1024); + block_options.set_block_size(32 * 1024); block_options.set_format_version(5); block_options.set_cache_index_and_filter_blocks(true); block_options.set_pin_l0_filter_and_index_blocks_in_cache(true); diff --git a/crates/core/src/webpage/html/into_tantivy.rs b/crates/core/src/webpage/html/into_tantivy.rs index 443d5ff7..5b616b30 100644 --- a/crates/core/src/webpage/html/into_tantivy.rs +++ b/crates/core/src/webpage/html/into_tantivy.rs @@ -129,11 +129,7 @@ impl Html { .unwrap_or_default() } - pub fn into_tantivy( - self, - schema: &tantivy::schema::Schema, - rake: &RakeModel, - ) -> Result<TantivyDocument> { + pub fn as_tantivy(&self, schema: &tantivy::schema::Schema) -> Result<TantivyDocument> { let mut doc = TantivyDocument::new(); let title = self.pretokenize_title()?; @@ -415,10 +411,6 @@ impl Html { doc.add_text(tantivy_field, ""); } } - Field::Text(TextField::Keywords) => { - let rake_keywords = self.keywords(rake); - doc.add_text(tantivy_field, rake_keywords.join("\n")); - } Field::Text(TextField::AllBody) => { doc.add_pre_tokenized_text(tantivy_field, all_text.clone()) } @@ -562,6 +554,7 @@ impl Html { | Field::Fast(FastField::PreComputedScore) | Field::Fast(FastField::Region) | Field::Fast(FastField::HostNodeID) + | Field::Text(TextField::Keywords) | Field::Text(TextField::DmozDescription) => {} } } diff --git a/crates/core/src/webpage/html/mod.rs b/crates/core/src/webpage/html/mod.rs index 68f89979..59175ad4 100644 --- a/crates/core/src/webpage/html/mod.rs +++ b/crates/core/src/webpage/html/mod.rs @@ -493,7 +493,6 @@ mod tests { use std::collections::HashMap; use crate::{ - rake::RakeModel, schema::create_schema, webpage::{url_ext::UrlExt, Link}, }; @@ -741,9 +740,7 @@ mod tests { assert!(!webpage.all_text().unwrap().is_empty()); let schema = create_schema(); - webpage - .into_tantivy(&schema, &RakeModel::default()) - .unwrap(); + webpage.as_tantivy(&schema).unwrap(); } #[test] diff --git a/crates/core/src/webpage/mod.rs b/crates/core/src/webpage/mod.rs index 4ebd2c46..9814c2eb 100644 --- a/crates/core/src/webpage/mod.rs +++ b/crates/core/src/webpage/mod.rs @@ -15,7 +15,6 @@ // along with this program. If not, see <https://www.gnu.org/licenses/>. 
use crate::{ - rake::RakeModel, schema::{FastField, TextField}, webgraph::NodeID, Result, @@ -53,6 +52,7 @@ pub struct Webpage { pub dmoz_description: Option<String>, pub safety_classification: Option<safety_classifier::Label>, pub inserted_at: DateTime<Utc>, + pub keywords: Vec<String>, } #[cfg(test)] @@ -71,6 +71,7 @@ impl Default for Webpage { dmoz_description: Default::default(), safety_classification: Default::default(), inserted_at: Utc::now(), + keywords: Default::default(), } } } @@ -102,16 +103,12 @@ impl Webpage { }) } - pub fn into_tantivy( - self, - schema: &tantivy::schema::Schema, - rake: &RakeModel, - ) -> Result<TantivyDocument> { - let region = Region::guess_from(&self); + pub fn as_tantivy(&self, schema: &tantivy::schema::Schema) -> Result<TantivyDocument> { + let region = Region::guess_from(self); let dmoz_description = self.dmoz_description(); - let mut doc = self.html.into_tantivy(schema, rake)?; + let mut doc = self.html.as_tantivy(schema)?; if let Ok(region) = region { doc.add_u64( @@ -130,7 +127,7 @@ } let backlink_text: String = - itertools::intersperse(self.backlink_labels, "\n".to_string()).collect(); + itertools::intersperse(self.backlink_labels.clone(), "\n".to_string()).collect(); doc.add_text( schema .get_field(Field::Text(TextField::BacklinkText).name()) , backlink_text, ); + doc.add_text( + schema + .get_field(Field::Text(TextField::Keywords).name()) + .expect("Failed to get keywords field"), + self.keywords.join("\n"), + ); + doc.add_date( schema .get_field(Field::Text(TextField::InsertionTimestamp).name())