From 1e931c01b0e32120d4f79e663d37b84f393a3859 Mon Sep 17 00:00:00 2001 From: Mikkel Denker Date: Mon, 25 Mar 2024 11:46:22 +0100 Subject: [PATCH] split webgraph and inverted_index into smaller files --- crates/core/src/inverted_index/indexing.rs | 260 +++++++ .../mod.rs} | 573 ++-------------- crates/core/src/inverted_index/search.rs | 283 ++++++++ crates/core/src/snippet.rs | 11 +- crates/core/src/webgraph/builder.rs | 51 ++ crates/core/src/webgraph/compression.rs | 38 ++ crates/core/src/webgraph/edge.rs | 90 +++ crates/core/src/webgraph/id_node_db.rs | 147 ++++ crates/core/src/webgraph/mod.rs | 642 +----------------- crates/core/src/webgraph/node.rs | 171 +++++ crates/core/src/webgraph/segment.rs | 14 +- crates/core/src/webgraph/shortest_path.rs | 122 ++++ crates/core/src/webgraph/store.rs | 30 +- crates/core/src/webgraph/writer.rs | 128 ++++ 14 files changed, 1386 insertions(+), 1174 deletions(-) create mode 100644 crates/core/src/inverted_index/indexing.rs rename crates/core/src/{inverted_index.rs => inverted_index/mod.rs} (70%) create mode 100644 crates/core/src/inverted_index/search.rs create mode 100644 crates/core/src/webgraph/builder.rs create mode 100644 crates/core/src/webgraph/compression.rs create mode 100644 crates/core/src/webgraph/edge.rs create mode 100644 crates/core/src/webgraph/id_node_db.rs create mode 100644 crates/core/src/webgraph/node.rs create mode 100644 crates/core/src/webgraph/shortest_path.rs create mode 100644 crates/core/src/webgraph/writer.rs diff --git a/crates/core/src/inverted_index/indexing.rs b/crates/core/src/inverted_index/indexing.rs new file mode 100644 index 00000000..d11499dc --- /dev/null +++ b/crates/core/src/inverted_index/indexing.rs @@ -0,0 +1,260 @@ +// Stract is an open source web search engine. +// Copyright (C) 2024 Stract ApS +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. 
If not, see + +use tantivy::merge_policy::NoMergePolicy; + +use tantivy::{IndexWriter, SegmentMeta}; + +use crate::fastfield_reader::FastFieldReader; + +use crate::schema::text_field; +use crate::schema::text_field::TextField; + +use crate::webpage::Webpage; +use crate::Result; +use std::collections::HashSet; +use std::fs; +use std::path::Path; + +use super::InvertedIndex; + +struct SegmentMergeCandidate { + num_docs: u32, + segments: Vec, +} + +pub fn merge_tantivy_segments>( + writer: &mut IndexWriter, + mut segments: Vec, + base_path: P, + max_num_segments: u64, +) -> Result<()> { + assert!(max_num_segments > 0); + + if segments.len() <= max_num_segments as usize { + return Ok(()); + } + + let num_segments = (max_num_segments + 1) / 2; // ceil(num_segments/2) + + let mut merge_segments = Vec::new(); + + for _ in 0..num_segments { + merge_segments.push(SegmentMergeCandidate { + num_docs: 0, + segments: Vec::new(), + }); + } + + segments.sort_by_key(|b| std::cmp::Reverse(b.num_docs())); + + for segment in segments { + let best_candidate = merge_segments + .iter_mut() + .min_by(|a, b| a.num_docs.cmp(&b.num_docs)) + .unwrap(); + + best_candidate.num_docs += segment.num_docs(); + best_candidate.segments.push(segment); + } + + for merge in merge_segments + .into_iter() + .filter(|merge| !merge.segments.is_empty()) + { + let segment_ids: Vec<_> = merge.segments.iter().map(|segment| segment.id()).collect(); + writer.merge(&segment_ids[..]).wait()?; + + for segment in merge.segments { + for file in segment.list_files() { + std::fs::remove_file(base_path.as_ref().join(file)).ok(); + } + } + } + + Ok(()) +} + +impl InvertedIndex { + pub fn prepare_writer(&mut self) -> Result<()> { + if self.writer.is_some() { + return Ok(()); + } + + let writer = self + .tantivy_index + .writer_with_num_threads(1, 1_000_000_000)?; + + let merge_policy = NoMergePolicy; + writer.set_merge_policy(Box::new(merge_policy)); + + self.writer = Some(writer); + + Ok(()) + } + + pub fn set_auto_merge_policy(&mut self) { + let merge_policy = tantivy::merge_policy::LogMergePolicy::default(); + self.writer + .as_mut() + .expect("writer has not been prepared") + .set_merge_policy(Box::new(merge_policy)); + } + + pub fn insert(&self, webpage: &Webpage) -> Result<()> { + self.writer + .as_ref() + .expect("writer has not been prepared") + .add_document(webpage.as_tantivy(&self.schema)?)?; + Ok(()) + } + + pub fn commit(&mut self) -> Result<()> { + self.prepare_writer()?; + self.writer + .as_mut() + .expect("writer has not been prepared") + .commit()?; + self.reader.reload()?; + self.fastfield_reader = FastFieldReader::new(&self.reader.searcher()); + + Ok(()) + } + + fn delete(&self, query: Box) -> Result<()> { + self.writer + .as_ref() + .expect("writer has not been prepared") + .delete_query(query)?; + + Ok(()) + } + + pub fn delete_all_before(&self, timestamp: tantivy::DateTime) -> Result<()> { + let query = tantivy::query::RangeQuery::new_date_bounds( + text_field::InsertionTimestamp.name().to_string(), + std::ops::Bound::Unbounded, + std::ops::Bound::Excluded(timestamp), + ); + + self.delete(Box::new(query)) + } + + #[allow(clippy::missing_panics_doc)] // cannot panic as writer is prepared + pub fn merge_into_max_segments(&mut self, max_num_segments: u64) -> Result<()> { + self.prepare_writer()?; + let base_path = Path::new(&self.path); + let segments: Vec<_> = self + .tantivy_index + .load_metas()? 
+ .segments + .into_iter() + .collect(); + + merge_tantivy_segments( + self.writer.as_mut().expect("writer has not been prepared"), + segments, + base_path, + max_num_segments, + )?; + + Ok(()) + } + + #[must_use] + pub fn merge(mut self, mut other: InvertedIndex) -> Self { + self.prepare_writer().expect("failed to prepare writer"); + other.prepare_writer().expect("failed to prepare writer"); + + let path = self.path.clone(); + + { + other.commit().expect("failed to commit index"); + self.commit().expect("failed to commit index"); + + let other_meta = other + .tantivy_index + .load_metas() + .expect("failed to load tantivy metadata for index"); + + let mut meta = self + .tantivy_index + .load_metas() + .expect("failed to load tantivy metadata for index"); + + let other_path = other.path.clone(); + let other_path = Path::new(other_path.as_str()); + other + .writer + .take() + .expect("writer has not been prepared") + .wait_merging_threads() + .unwrap(); + + let path = self.path.clone(); + let self_path = Path::new(path.as_str()); + self.writer + .take() + .expect("writer has not been prepared") + .wait_merging_threads() + .unwrap(); + + let ids: HashSet<_> = meta.segments.iter().map(|segment| segment.id()).collect(); + + for segment in other_meta.segments { + if ids.contains(&segment.id()) { + continue; + } + + // TODO: handle case where current index has segment with same name + for file in segment.list_files() { + let p = other_path.join(&file); + if p.exists() { + fs::rename(p, self_path.join(&file)).unwrap(); + } + } + meta.segments.push(segment); + } + + meta.segments + .sort_by_key(|a| std::cmp::Reverse(a.max_doc())); + + fs::remove_dir_all(other_path).ok(); + + let self_path = Path::new(&path); + + std::fs::write( + self_path.join("meta.json"), + serde_json::to_string_pretty(&meta).unwrap(), + ) + .unwrap(); + } + + let mut res = Self::open(path).expect("failed to open index"); + + res.prepare_writer().expect("failed to prepare writer"); + + res + } + + pub fn stop(mut self) { + self.writer + .take() + .expect("writer has not been prepared") + .wait_merging_threads() + .unwrap() + } +} diff --git a/crates/core/src/inverted_index.rs b/crates/core/src/inverted_index/mod.rs similarity index 70% rename from crates/core/src/inverted_index.rs rename to crates/core/src/inverted_index/mod.rs index 5f0b591a..94de2bb2 100644 --- a/crates/core/src/inverted_index.rs +++ b/crates/core/src/inverted_index/mod.rs @@ -1,5 +1,5 @@ // Stract is an open source web search engine. -// Copyright (C) 2023 Stract ApS +// Copyright (C) 2024 Stract ApS // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU Affero General Public License as @@ -25,41 +25,38 @@ //! tantivy is actually a FST (finite state transducer) and not a hash map. //! This allows us to perform more advanced queries than just term lookups, //! but the principle is the same. 
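//!
//! [Editor's illustrative sketch, not part of the patch; every name below is
//! hypothetical and comes neither from this crate nor from tantivy.] The doc
//! comment above describes the inverted index as, conceptually, a map from a
//! term to the list of documents containing it. A minimal toy version of that
//! idea in plain Rust could look like this:
//!
//! ```
//! use std::collections::HashMap;
//!
//! type DocId = u32;
//!
//! #[derive(Default)]
//! struct ToyInvertedIndex {
//!     /// term -> ids of the documents containing that term
//!     postings: HashMap<String, Vec<DocId>>,
//! }
//!
//! impl ToyInvertedIndex {
//!     /// Add every whitespace-separated term of `text` to the postings lists.
//!     fn index(&mut self, doc: DocId, text: &str) {
//!         for term in text.split_whitespace() {
//!             let list = self.postings.entry(term.to_lowercase()).or_default();
//!             if list.last() != Some(&doc) {
//!                 list.push(doc);
//!             }
//!         }
//!     }
//!
//!     /// Return the documents that contain `term` (empty slice if none).
//!     fn lookup(&self, term: &str) -> &[DocId] {
//!         self.postings
//!             .get(&term.to_lowercase())
//!             .map(|list| list.as_slice())
//!             .unwrap_or(&[])
//!     }
//! }
//! ```
//!
//! Tantivy stores its term dictionary as an FST rather than a hash map, which
//! additionally allows prefix- and range-style lookups, but the lookup
//! principle is the same as in the sketch above.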
+ +mod indexing; +mod search; + +pub use indexing::merge_tantivy_segments; + use chrono::NaiveDateTime; -use itertools::Itertools; use serde::{Deserialize, Serialize}; -use tantivy::collector::Count; + use tantivy::directory::MmapDirectory; -use tantivy::merge_policy::NoMergePolicy; + use tantivy::schema::{Schema, Value}; use tantivy::tokenizer::TokenizerManager; -use tantivy::{IndexReader, IndexWriter, SegmentMeta, TantivyDocument}; -use url::Url; +use tantivy::{IndexReader, IndexWriter, TantivyDocument}; -use crate::collector::{Hashes, MainCollector}; +use crate::collector::Hashes; use crate::config::SnippetConfig; use crate::fastfield_reader::FastFieldReader; -use crate::highlighted::HighlightedFragment; -use crate::query::shortcircuit::ShortCircuitQuery; -use crate::query::Query; + use crate::ranking::initial::Score; -use crate::ranking::pipeline::RecallRankingWebpage; -use crate::ranking::SignalComputer; + use crate::schema::text_field::TextField; use crate::schema::{fast_field, text_field, FastFieldEnum, Field, TextFieldEnum}; -use crate::search_ctx::Ctx; -use crate::snippet; use crate::snippet::TextSnippet; use crate::tokenizer::{ BigramTokenizer, Identity, JsonField, SiteOperatorUrlTokenizer, TrigramTokenizer, }; -use crate::webgraph::NodeID; use crate::webpage::region::Region; -use crate::webpage::url_ext::UrlExt; -use crate::webpage::{schema_org, Webpage}; + +use crate::webpage::schema_org; use crate::Result; use crate::{schema::create_schema, tokenizer::Tokenizer}; -use std::collections::HashSet; use std::fs; use std::path::Path; use std::sync::Arc; @@ -101,61 +98,27 @@ impl From for tantivy::DocAddress { } } -pub fn merge_tantivy_segments>( - writer: &mut IndexWriter, - mut segments: Vec, - base_path: P, - max_num_segments: u64, -) -> Result<()> { - assert!(max_num_segments > 0); - - if segments.len() <= max_num_segments as usize { - return Ok(()); - } - - let num_segments = (max_num_segments + 1) / 2; // ceil(num_segments/2) +fn register_tokenizers(manager: &TokenizerManager) { + let tokenizer = Tokenizer::default(); + manager.register(tokenizer.as_str(), tokenizer); - let mut merge_segments = Vec::new(); + let tokenizer = Tokenizer::new_stemmed(); + manager.register(tokenizer.as_str(), tokenizer); - for _ in 0..num_segments { - merge_segments.push(SegmentMergeCandidate { - num_docs: 0, - segments: Vec::new(), - }); - } + let tokenizer = Tokenizer::Identity(Identity::default()); + manager.register(tokenizer.as_str(), tokenizer); - segments.sort_by_key(|b| std::cmp::Reverse(b.num_docs())); - - for segment in segments { - let best_candidate = merge_segments - .iter_mut() - .min_by(|a, b| a.num_docs.cmp(&b.num_docs)) - .unwrap(); + let tokenizer = Tokenizer::Bigram(BigramTokenizer::default()); + manager.register(tokenizer.as_str(), tokenizer); - best_candidate.num_docs += segment.num_docs(); - best_candidate.segments.push(segment); - } - - for merge in merge_segments - .into_iter() - .filter(|merge| !merge.segments.is_empty()) - { - let segment_ids: Vec<_> = merge.segments.iter().map(|segment| segment.id()).collect(); - writer.merge(&segment_ids[..]).wait()?; - - for segment in merge.segments { - for file in segment.list_files() { - std::fs::remove_file(base_path.as_ref().join(file)).ok(); - } - } - } + let tokenizer = Tokenizer::Trigram(TrigramTokenizer::default()); + manager.register(tokenizer.as_str(), tokenizer); - Ok(()) -} + let tokenizer = Tokenizer::SiteOperator(SiteOperatorUrlTokenizer); + manager.register(tokenizer.as_str(), tokenizer); -struct SegmentMergeCandidate { - 
num_docs: u32, - segments: Vec, + let tokenizer = Tokenizer::Json(JsonField); + manager.register(tokenizer.as_str(), tokenizer); } pub struct InvertedIndex { @@ -191,40 +154,7 @@ impl InvertedIndex { tantivy::Index::create(mmap_directory, schema.clone(), index_settings)? }; - let tokenizer = Tokenizer::default(); - tantivy_index - .tokenizers() - .register(tokenizer.as_str(), tokenizer); - - let tokenizer = Tokenizer::new_stemmed(); - tantivy_index - .tokenizers() - .register(tokenizer.as_str(), tokenizer); - - let tokenizer = Tokenizer::Identity(Identity::default()); - tantivy_index - .tokenizers() - .register(tokenizer.as_str(), tokenizer); - - let tokenizer = Tokenizer::Bigram(BigramTokenizer::default()); - tantivy_index - .tokenizers() - .register(tokenizer.as_str(), tokenizer); - - let tokenizer = Tokenizer::Trigram(TrigramTokenizer::default()); - tantivy_index - .tokenizers() - .register(tokenizer.as_str(), tokenizer); - - let tokenizer = Tokenizer::SiteOperator(SiteOperatorUrlTokenizer); - tantivy_index - .tokenizers() - .register(tokenizer.as_str(), tokenizer); - - let tokenizer = Tokenizer::Json(JsonField); - tantivy_index - .tokenizers() - .register(tokenizer.as_str(), tokenizer); + register_tokenizers(tantivy_index.tokenizers()); let reader: IndexReader = tantivy_index.reader_builder().try_into()?; @@ -245,390 +175,14 @@ impl InvertedIndex { self.fastfield_reader.clone() } - pub fn prepare_writer(&mut self) -> Result<()> { - if self.writer.is_some() { - return Ok(()); - } - - let writer = self - .tantivy_index - .writer_with_num_threads(1, 1_000_000_000)?; - - let merge_policy = NoMergePolicy; - writer.set_merge_policy(Box::new(merge_policy)); - - self.writer = Some(writer); - - Ok(()) - } - pub fn set_snippet_config(&mut self, config: SnippetConfig) { self.snippet_config = config; } - pub fn set_auto_merge_policy(&mut self) { - let merge_policy = tantivy::merge_policy::LogMergePolicy::default(); - self.writer - .as_mut() - .expect("writer has not been prepared") - .set_merge_policy(Box::new(merge_policy)); - } - pub fn tokenizers(&self) -> &TokenizerManager { self.tantivy_index.tokenizers() } - #[cfg(test)] - pub fn temporary() -> Result { - let path = crate::gen_temp_path(); - let mut s = Self::open(path)?; - - s.prepare_writer()?; - - Ok(s) - } - - pub fn insert(&self, webpage: &Webpage) -> Result<()> { - self.writer - .as_ref() - .expect("writer has not been prepared") - .add_document(webpage.as_tantivy(&self.schema)?)?; - Ok(()) - } - - pub fn commit(&mut self) -> Result<()> { - self.prepare_writer()?; - self.writer - .as_mut() - .expect("writer has not been prepared") - .commit()?; - self.reader.reload()?; - self.fastfield_reader = FastFieldReader::new(&self.reader.searcher()); - - Ok(()) - } - - fn delete(&self, query: Box) -> Result<()> { - self.writer - .as_ref() - .expect("writer has not been prepared") - .delete_query(query)?; - - Ok(()) - } - - pub fn delete_all_before(&self, timestamp: tantivy::DateTime) -> Result<()> { - let query = tantivy::query::RangeQuery::new_date_bounds( - text_field::InsertionTimestamp.name().to_string(), - std::ops::Bound::Unbounded, - std::ops::Bound::Excluded(timestamp), - ); - - self.delete(Box::new(query)) - } - - pub fn search_initial( - &self, - query: &Query, - ctx: &Ctx, - collector: MainCollector, - ) -> Result { - if !query.count_results() { - let mut query: Box = Box::new(query.clone()); - - if let Some(limit) = collector.top_docs().max_docs() { - let docs_per_segment = limit.total_docs / limit.segments; - query = 
Box::new(ShortCircuitQuery::new(query, docs_per_segment as u64)); - } - - let pointers = ctx.tv_searcher.search(&query, &collector)?; - - return Ok(InitialSearchResult { - num_websites: None, - top_websites: pointers, - }); - } - - let collector = (Count, collector); - let (count, pointers) = ctx.tv_searcher.search(query, &collector)?; - - Ok(InitialSearchResult { - num_websites: Some(count), - top_websites: pointers, - }) - } - - pub fn local_search_ctx(&self) -> Ctx { - let tv_searcher = self.tv_searcher(); - Ctx { - fastfield_reader: self.fastfield_reader.clone(), - tv_searcher, - } - } - - pub fn tv_searcher(&self) -> tantivy::Searcher { - self.reader.searcher() - } - - pub fn retrieve_ranking_websites( - &self, - ctx: &Ctx, - pointers: Vec, - mut computer: SignalComputer, - fastfield_reader: &FastFieldReader, - ) -> Result> { - let mut top_websites = Vec::new(); - - let mut pointers: Vec<_> = pointers.into_iter().enumerate().collect(); - pointers.sort_by(|a, b| { - a.1.address - .segment - .cmp(&b.1.address.segment) - .then_with(|| a.1.address.doc_id.cmp(&b.1.address.doc_id)) - }); - - let mut prev_segment = None; - for (orig_index, pointer) in pointers { - let update_segment = match prev_segment { - Some(prev_segment) if prev_segment != pointer.address.segment => true, - None => true, - _ => false, - }; - - let segment_reader = ctx.tv_searcher.segment_reader(pointer.address.segment); - if update_segment { - computer.register_segment(&ctx.tv_searcher, segment_reader, fastfield_reader)?; - } - - prev_segment = Some(pointer.address.segment); - - top_websites.push(( - orig_index, - RecallRankingWebpage::new( - pointer, - fastfield_reader.borrow_segment(&segment_reader.segment_id()), - &mut computer, - ), - )); - } - - top_websites.sort_by(|a, b| a.0.cmp(&b.0)); - - Ok(top_websites - .into_iter() - .map(|(_, website)| website) - .collect()) - } - - pub fn website_host_node(&self, website: &WebpagePointer) -> Result> { - let searcher = self.reader.searcher(); - let doc: TantivyDocument = searcher.doc(website.address.into())?; - - let field = self - .schema() - .get_field(Field::Fast(FastFieldEnum::from(fast_field::HostNodeID)).name()) - .unwrap(); - - let id = doc.get_first(field).unwrap().as_u64().unwrap(); - - if id == u64::MAX { - Ok(None) - } else { - Ok(Some(id.into())) - } - } - - pub fn retrieve_websites( - &self, - websites: &[WebpagePointer], - query: &Query, - ) -> Result> { - let tv_searcher = self.reader.searcher(); - let mut webpages: Vec = websites - .iter() - .map(|website| self.retrieve_doc(website.address, &tv_searcher)) - .filter_map(|res| res.ok()) - .collect(); - - for (url, page) in webpages.iter_mut().filter_map(|page| { - let url = Url::parse(&page.url).ok()?; - Some((url, page)) - }) { - if query.simple_terms().is_empty() { - let snippet = if let Some(description) = page.description.as_deref() { - let snip = description - .split_whitespace() - .take(self.snippet_config.empty_query_snippet_words) - .join(" "); - - if snip.split_whitespace().count() < self.snippet_config.min_description_words { - page.body - .split_whitespace() - .take(self.snippet_config.empty_query_snippet_words) - .join(" ") - } else { - snip - } - } else { - page.body - .split_whitespace() - .take(self.snippet_config.empty_query_snippet_words) - .join(" ") - }; - - page.snippet = TextSnippet { - fragments: vec![HighlightedFragment::new_unhighlighted(snippet)], - }; - } else { - let min_body_len = if url.is_homepage() { - self.snippet_config.min_body_length_homepage - } else { - 
self.snippet_config.min_body_length - }; - - if page.body.split_whitespace().count() < min_body_len - && page - .description - .as_deref() - .unwrap_or_default() - .split_whitespace() - .count() - >= self.snippet_config.min_description_words - { - page.snippet = snippet::generate( - query, - page.description.as_deref().unwrap_or_default(), - &page.region, - self.snippet_config.clone(), - ); - } else { - page.snippet = snippet::generate( - query, - &page.body, - &page.region, - self.snippet_config.clone(), - ); - } - } - } - - Ok(webpages) - } - - #[allow(clippy::missing_panics_doc)] // cannot panic as writer is prepared - pub fn merge_into_max_segments(&mut self, max_num_segments: u64) -> Result<()> { - self.prepare_writer()?; - let base_path = Path::new(&self.path); - let segments: Vec<_> = self - .tantivy_index - .load_metas()? - .segments - .into_iter() - .collect(); - - merge_tantivy_segments( - self.writer.as_mut().expect("writer has not been prepared"), - segments, - base_path, - max_num_segments, - )?; - - Ok(()) - } - - fn retrieve_doc( - &self, - doc_address: DocAddress, - searcher: &tantivy::Searcher, - ) -> Result { - let doc: TantivyDocument = searcher.doc(doc_address.into())?; - Ok(RetrievedWebpage::from(doc)) - } - - #[must_use] - pub fn merge(mut self, mut other: InvertedIndex) -> Self { - self.prepare_writer().expect("failed to prepare writer"); - other.prepare_writer().expect("failed to prepare writer"); - - let path = self.path.clone(); - - { - other.commit().expect("failed to commit index"); - self.commit().expect("failed to commit index"); - - let other_meta = other - .tantivy_index - .load_metas() - .expect("failed to load tantivy metadata for index"); - - let mut meta = self - .tantivy_index - .load_metas() - .expect("failed to load tantivy metadata for index"); - - let other_path = other.path.clone(); - let other_path = Path::new(other_path.as_str()); - other - .writer - .take() - .expect("writer has not been prepared") - .wait_merging_threads() - .unwrap(); - - let path = self.path.clone(); - let self_path = Path::new(path.as_str()); - self.writer - .take() - .expect("writer has not been prepared") - .wait_merging_threads() - .unwrap(); - - let ids: HashSet<_> = meta.segments.iter().map(|segment| segment.id()).collect(); - - for segment in other_meta.segments { - if ids.contains(&segment.id()) { - continue; - } - - // TODO: handle case where current index has segment with same name - for file in segment.list_files() { - let p = other_path.join(&file); - if p.exists() { - fs::rename(p, self_path.join(&file)).unwrap(); - } - } - meta.segments.push(segment); - } - - meta.segments - .sort_by_key(|a| std::cmp::Reverse(a.max_doc())); - - fs::remove_dir_all(other_path).ok(); - - let self_path = Path::new(&path); - - std::fs::write( - self_path.join("meta.json"), - serde_json::to_string_pretty(&meta).unwrap(), - ) - .unwrap(); - } - - let mut res = Self::open(path).expect("failed to open index"); - - res.prepare_writer().expect("failed to prepare writer"); - - res - } - - pub fn stop(mut self) { - self.writer - .take() - .expect("writer has not been prepared") - .wait_merging_threads() - .unwrap() - } - pub fn schema(&self) -> Arc { Arc::clone(&self.schema) } @@ -637,47 +191,14 @@ impl InvertedIndex { self.tantivy_index.searchable_segments().unwrap().len() } - pub(crate) fn get_webpage(&self, url: &str) -> Option { - let url = Url::parse(url).ok()?; - let tv_searcher = self.reader.searcher(); - let field = tv_searcher - .schema() - 
.get_field(Field::Text(TextFieldEnum::from(text_field::UrlNoTokenizer)).name()) - .unwrap(); - - let term = tantivy::Term::from_field_text(field, url.as_str()); - - let query = tantivy::query::TermQuery::new(term, tantivy::schema::IndexRecordOption::Basic); - - let mut res = tv_searcher - .search(&query, &tantivy::collector::TopDocs::with_limit(1)) - .unwrap(); - - res.pop() - .map(|(_, doc)| self.retrieve_doc(doc.into(), &tv_searcher).unwrap()) - } - - pub(crate) fn get_homepage(&self, url: &Url) -> Option { - let tv_searcher = self.reader.searcher(); - let field = tv_searcher - .schema() - .get_field( - Field::Text(TextFieldEnum::from(text_field::SiteIfHomepageNoTokenizer)).name(), - ) - .unwrap(); - - let host = url.normalized_host().unwrap_or_default(); - - let term = tantivy::Term::from_field_text(field, host); - - let query = tantivy::query::TermQuery::new(term, tantivy::schema::IndexRecordOption::Basic); + #[cfg(test)] + pub fn temporary() -> Result { + let path = crate::gen_temp_path(); + let mut s = Self::open(path)?; - let mut res = tv_searcher - .search(&query, &tantivy::collector::TopDocs::with_limit(1)) - .unwrap(); + s.prepare_writer()?; - res.pop() - .map(|(_, doc)| self.retrieve_doc(doc.into(), &tv_searcher).unwrap()) + Ok(s) } } @@ -795,8 +316,17 @@ impl From for RetrievedWebpage { mod tests { use candle_core::Tensor; use maplit::hashmap; - - use crate::{config::CollectorConfig, ranking::Ranker, searcher::SearchQuery, webpage::Html}; + use url::Url; + + use crate::{ + collector::MainCollector, + config::CollectorConfig, + query::Query, + ranking::{Ranker, SignalComputer}, + search_ctx::Ctx, + searcher::SearchQuery, + webpage::{Html, Webpage}, + }; use super::*; @@ -1706,12 +1236,7 @@ mod tests { let fastfield_reader = index.fastfield_reader(); let ranking_websites = index - .retrieve_ranking_websites( - &ctx, - res.top_websites, - ranker.computer(), - &fastfield_reader, - ) + .retrieve_ranking_websites(&ctx, res.top_websites, ranker.computer(), &fastfield_reader) .unwrap(); assert_eq!(ranking_websites.len(), 2); diff --git a/crates/core/src/inverted_index/search.rs b/crates/core/src/inverted_index/search.rs new file mode 100644 index 00000000..d8c7fee7 --- /dev/null +++ b/crates/core/src/inverted_index/search.rs @@ -0,0 +1,283 @@ +// Stract is an open source web search engine. +// Copyright (C) 2024 Stract ApS +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. 
If not, see + +use super::{DocAddress, InitialSearchResult, InvertedIndex, RetrievedWebpage, WebpagePointer}; +use itertools::Itertools; +use tantivy::collector::Count; + +use tantivy::schema::Value; + +use tantivy::TantivyDocument; +use url::Url; + +use crate::collector::MainCollector; + +use crate::fastfield_reader::FastFieldReader; +use crate::highlighted::HighlightedFragment; +use crate::query::shortcircuit::ShortCircuitQuery; +use crate::query::Query; +use crate::ranking::pipeline::RecallRankingWebpage; +use crate::ranking::SignalComputer; +use crate::schema::{fast_field, text_field, FastFieldEnum, Field, TextFieldEnum}; +use crate::search_ctx::Ctx; +use crate::snippet; +use crate::snippet::TextSnippet; +use crate::webgraph::NodeID; + +use crate::webpage::url_ext::UrlExt; +use crate::Result; + +impl InvertedIndex { + pub fn search_initial( + &self, + query: &Query, + ctx: &Ctx, + collector: MainCollector, + ) -> Result { + if !query.count_results() { + let mut query: Box = Box::new(query.clone()); + + if let Some(limit) = collector.top_docs().max_docs() { + let docs_per_segment = limit.total_docs / limit.segments; + query = Box::new(ShortCircuitQuery::new(query, docs_per_segment as u64)); + } + + let pointers = ctx.tv_searcher.search(&query, &collector)?; + + return Ok(InitialSearchResult { + num_websites: None, + top_websites: pointers, + }); + } + + let collector = (Count, collector); + let (count, pointers) = ctx.tv_searcher.search(query, &collector)?; + + Ok(InitialSearchResult { + num_websites: Some(count), + top_websites: pointers, + }) + } + + pub fn local_search_ctx(&self) -> Ctx { + let tv_searcher = self.tv_searcher(); + Ctx { + fastfield_reader: self.fastfield_reader.clone(), + tv_searcher, + } + } + + pub fn tv_searcher(&self) -> tantivy::Searcher { + self.reader.searcher() + } + + pub fn retrieve_ranking_websites( + &self, + ctx: &Ctx, + pointers: Vec, + mut computer: SignalComputer, + fastfield_reader: &FastFieldReader, + ) -> Result> { + let mut top_websites = Vec::new(); + + let mut pointers: Vec<_> = pointers.into_iter().enumerate().collect(); + pointers.sort_by(|a, b| { + a.1.address + .segment + .cmp(&b.1.address.segment) + .then_with(|| a.1.address.doc_id.cmp(&b.1.address.doc_id)) + }); + + let mut prev_segment = None; + for (orig_index, pointer) in pointers { + let update_segment = match prev_segment { + Some(prev_segment) if prev_segment != pointer.address.segment => true, + None => true, + _ => false, + }; + + let segment_reader = ctx.tv_searcher.segment_reader(pointer.address.segment); + if update_segment { + computer.register_segment(&ctx.tv_searcher, segment_reader, fastfield_reader)?; + } + + prev_segment = Some(pointer.address.segment); + + top_websites.push(( + orig_index, + RecallRankingWebpage::new( + pointer, + fastfield_reader.borrow_segment(&segment_reader.segment_id()), + &mut computer, + ), + )); + } + + top_websites.sort_by(|a, b| a.0.cmp(&b.0)); + + Ok(top_websites + .into_iter() + .map(|(_, website)| website) + .collect()) + } + + pub fn website_host_node(&self, website: &WebpagePointer) -> Result> { + let searcher = self.reader.searcher(); + let doc: TantivyDocument = searcher.doc(website.address.into())?; + + let field = self + .schema() + .get_field(Field::Fast(FastFieldEnum::from(fast_field::HostNodeID)).name()) + .unwrap(); + + let id = doc.get_first(field).unwrap().as_u64().unwrap(); + + if id == u64::MAX { + Ok(None) + } else { + Ok(Some(id.into())) + } + } + + pub fn retrieve_websites( + &self, + websites: &[WebpagePointer], + query: 
&Query, + ) -> Result> { + let tv_searcher = self.reader.searcher(); + let mut webpages: Vec = websites + .iter() + .map(|website| self.retrieve_doc(website.address, &tv_searcher)) + .filter_map(|res| res.ok()) + .collect(); + + for (url, page) in webpages.iter_mut().filter_map(|page| { + let url = Url::parse(&page.url).ok()?; + Some((url, page)) + }) { + if query.simple_terms().is_empty() { + let snippet = if let Some(description) = page.description.as_deref() { + let snip = description + .split_whitespace() + .take(self.snippet_config.empty_query_snippet_words) + .join(" "); + + if snip.split_whitespace().count() < self.snippet_config.min_description_words { + page.body + .split_whitespace() + .take(self.snippet_config.empty_query_snippet_words) + .join(" ") + } else { + snip + } + } else { + page.body + .split_whitespace() + .take(self.snippet_config.empty_query_snippet_words) + .join(" ") + }; + + page.snippet = TextSnippet { + fragments: vec![HighlightedFragment::new_unhighlighted(snippet)], + }; + } else { + let min_body_len = if url.is_homepage() { + self.snippet_config.min_body_length_homepage + } else { + self.snippet_config.min_body_length + }; + + if page.body.split_whitespace().count() < min_body_len + && page + .description + .as_deref() + .unwrap_or_default() + .split_whitespace() + .count() + >= self.snippet_config.min_description_words + { + page.snippet = snippet::generate( + query, + page.description.as_deref().unwrap_or_default(), + &page.region, + self.snippet_config.clone(), + ); + } else { + page.snippet = snippet::generate( + query, + &page.body, + &page.region, + self.snippet_config.clone(), + ); + } + } + } + + Ok(webpages) + } + + fn retrieve_doc( + &self, + doc_address: DocAddress, + searcher: &tantivy::Searcher, + ) -> Result { + let doc: TantivyDocument = searcher.doc(doc_address.into())?; + Ok(RetrievedWebpage::from(doc)) + } + + pub(crate) fn get_webpage(&self, url: &str) -> Option { + let url = Url::parse(url).ok()?; + let tv_searcher = self.reader.searcher(); + let field = tv_searcher + .schema() + .get_field(Field::Text(TextFieldEnum::from(text_field::UrlNoTokenizer)).name()) + .unwrap(); + + let term = tantivy::Term::from_field_text(field, url.as_str()); + + let query = tantivy::query::TermQuery::new(term, tantivy::schema::IndexRecordOption::Basic); + + let mut res = tv_searcher + .search(&query, &tantivy::collector::TopDocs::with_limit(1)) + .unwrap(); + + res.pop() + .map(|(_, doc)| self.retrieve_doc(doc.into(), &tv_searcher).unwrap()) + } + + pub(crate) fn get_homepage(&self, url: &Url) -> Option { + let tv_searcher = self.reader.searcher(); + let field = tv_searcher + .schema() + .get_field( + Field::Text(TextFieldEnum::from(text_field::SiteIfHomepageNoTokenizer)).name(), + ) + .unwrap(); + + let host = url.normalized_host().unwrap_or_default(); + + let term = tantivy::Term::from_field_text(field, host); + + let query = tantivy::query::TermQuery::new(term, tantivy::schema::IndexRecordOption::Basic); + + let mut res = tv_searcher + .search(&query, &tantivy::collector::TopDocs::with_limit(1)) + .unwrap(); + + res.pop() + .map(|(_, doc)| self.retrieve_doc(doc.into(), &tv_searcher).unwrap()) + } +} diff --git a/crates/core/src/snippet.rs b/crates/core/src/snippet.rs index 78d0ba11..44f311c7 100644 --- a/crates/core/src/snippet.rs +++ b/crates/core/src/snippet.rs @@ -181,11 +181,8 @@ fn calculate_idf(terms: &HashSet, passages: &[PassageCandidate]) -> Hash .collect() } -fn score_passages_with_bm25( - passages: &mut [PassageCandidate], - terms: &HashSet, - 
idf: &HashMap, -) { +fn score_passages_with_bm25(passages: &mut [PassageCandidate], terms: &HashSet) { + let idf = calculate_idf(terms, passages); let mut total_d_size = 0; for passage in passages.iter() { @@ -238,9 +235,7 @@ fn snippet_string_builder( return snippet; } - let idf = calculate_idf(&terms, &passages); - - score_passages_with_bm25(&mut passages, &terms, &idf); + score_passages_with_bm25(&mut passages, &terms); let best_idx = passages .iter() diff --git a/crates/core/src/webgraph/builder.rs b/crates/core/src/webgraph/builder.rs new file mode 100644 index 00000000..c85239d6 --- /dev/null +++ b/crates/core/src/webgraph/builder.rs @@ -0,0 +1,51 @@ +// Stract is an open source web search engine. +// Copyright (C) 2024 Stract ApS +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see + +use std::path::Path; + +use crate::executor::Executor; + +use super::{Compression, Webgraph}; + +pub struct WebgraphBuilder { + path: Box, + executor: Executor, + compression: Compression, +} + +impl WebgraphBuilder { + pub fn new>(path: P) -> Self { + Self { + path: path.as_ref().into(), + executor: Executor::multi_thread("webgraph").unwrap(), + compression: Compression::default(), + } + } + + pub fn single_threaded(mut self) -> Self { + self.executor = Executor::single_thread(); + self + } + + pub fn compression(mut self, compression: Compression) -> Self { + self.compression = compression; + self + } + + pub fn open(self) -> Webgraph { + Webgraph::open(self.path, self.executor, self.compression) + } +} diff --git a/crates/core/src/webgraph/compression.rs b/crates/core/src/webgraph/compression.rs new file mode 100644 index 00000000..da89feb8 --- /dev/null +++ b/crates/core/src/webgraph/compression.rs @@ -0,0 +1,38 @@ +// Stract is an open source web search engine. +// Copyright (C) 2024 Stract ApS +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. 
If not, see + +#[derive(Default, Debug, Clone, Copy)] +pub enum Compression { + None, + #[default] + Lz4, +} + +impl Compression { + pub fn compress(&self, bytes: &[u8]) -> Vec { + match self { + Compression::None => bytes.to_vec(), + Compression::Lz4 => lz4_flex::compress_prepend_size(bytes), + } + } + + pub fn decompress(&self, bytes: &[u8]) -> Vec { + match self { + Compression::None => bytes.to_vec(), + Compression::Lz4 => lz4_flex::decompress_size_prepended(bytes).unwrap(), + } + } +} diff --git a/crates/core/src/webgraph/edge.rs b/crates/core/src/webgraph/edge.rs new file mode 100644 index 00000000..0d7e7ba9 --- /dev/null +++ b/crates/core/src/webgraph/edge.rs @@ -0,0 +1,90 @@ +// Stract is an open source web search engine. +// Copyright (C) 2024 Stract ApS +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see anyhow::Result>; + fn from_bytes(bytes: &[u8]) -> anyhow::Result; +} + +impl EdgeLabel for String { + fn to_bytes(&self) -> anyhow::Result> { + Ok(self.as_bytes().to_vec()) + } + + fn from_bytes(bytes: &[u8]) -> anyhow::Result { + Ok(String::from_utf8(bytes.to_vec())?) + } +} + +impl EdgeLabel for () { + fn to_bytes(&self) -> anyhow::Result> { + Ok(Vec::new()) + } + + fn from_bytes(_bytes: &[u8]) -> anyhow::Result { + Ok(()) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct Edge +where + L: EdgeLabel, +{ + pub from: NodeID, + pub to: NodeID, + pub label: L, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct InnerEdge +where + L: EdgeLabel, +{ + pub from: FullNodeID, + pub to: FullNodeID, + pub label: L, +} + +impl From> for Edge +where + L: EdgeLabel, +{ + fn from(edge: InnerEdge) -> Self { + Edge { + from: edge.from.id, + to: edge.to.id, + label: edge.label, + } + } +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, Hash, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct FullEdge { + pub from: Node, + pub to: Node, + pub label: String, +} diff --git a/crates/core/src/webgraph/id_node_db.rs b/crates/core/src/webgraph/id_node_db.rs new file mode 100644 index 00000000..89099ece --- /dev/null +++ b/crates/core/src/webgraph/id_node_db.rs @@ -0,0 +1,147 @@ +// Stract is an open source web search engine. +// Copyright (C) 2024 Stract ApS +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. 
If not, see >(path: P) -> Self { + let mut opts = rocksdb::Options::default(); + opts.create_if_missing(true); + opts.optimize_for_point_lookup(512); + + opts.set_allow_mmap_reads(true); + opts.set_allow_mmap_writes(true); + opts.set_write_buffer_size(128 * 1024 * 1024); // 128 MB + opts.set_target_file_size_base(512 * 1024 * 1024); // 512 MB + opts.set_target_file_size_multiplier(10); + + opts.set_compression_type(rocksdb::DBCompressionType::None); + + let mut block_opts = rocksdb::BlockBasedOptions::default(); + let cache = rocksdb::Cache::new_lru_cache(1024 * 1024 * 1024); // 1 gb + block_opts.set_ribbon_filter(10.0); + + // some recommended settings (https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning) + opts.set_level_compaction_dynamic_level_bytes(true); + opts.set_bytes_per_sync(1048576); + + block_opts.set_block_size(32 * 1024); // 32 kb + block_opts.set_format_version(5); + block_opts.set_cache_index_and_filter_blocks(true); + block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true); + block_opts.set_block_cache(&cache); + + opts.set_block_based_table_factory(&block_opts); + + let db = rocksdb::DB::open(&opts, path).unwrap(); + + Self { db, _cache: cache } + } + + pub fn put(&mut self, id: &NodeID, node: &Node) { + let mut opts = rocksdb::WriteOptions::default(); + opts.disable_wal(true); + + self.db + .put_opt( + id.as_u64().to_le_bytes(), + bincode::serialize(node).unwrap(), + &opts, + ) + .unwrap(); + } + + pub fn get(&self, id: &NodeID) -> Option { + let mut opts = rocksdb::ReadOptions::default(); + opts.set_verify_checksums(false); + + self.db + .get_opt(id.as_u64().to_le_bytes(), &opts) + .unwrap() + .map(|bytes| bincode::deserialize(&bytes).unwrap()) + } + + pub fn keys(&self) -> impl Iterator + '_ { + let mut opts = rocksdb::ReadOptions::default(); + opts.set_verify_checksums(false); + opts.set_async_io(true); + + self.db + .iterator_opt(rocksdb::IteratorMode::Start, opts) + .filter_map(|r| { + let (key, _) = r.ok()?; + Some(NodeID::from(u64::from_le_bytes((*key).try_into().unwrap()))) + }) + } + + pub fn estimate_num_keys(&self) -> usize { + self.db + .property_int_value("rocksdb.estimate-num-keys") + .ok() + .flatten() + .unwrap_or_default() as usize + } + + pub fn iter(&self) -> impl Iterator + '_ { + let mut opts = rocksdb::ReadOptions::default(); + opts.set_verify_checksums(false); + + self.db + .iterator_opt(rocksdb::IteratorMode::Start, opts) + .filter_map(|r| { + let (key, value) = r.ok()?; + + Some(( + NodeID::from(u64::from_le_bytes((*key).try_into().unwrap())), + bincode::deserialize(&value).unwrap(), + )) + }) + } + + pub fn batch_put(&mut self, iter: impl Iterator) { + let mut batch = rocksdb::WriteBatch::default(); + let mut count = 0; + + for (id, node) in iter { + batch.put( + id.as_u64().to_le_bytes(), + bincode::serialize(&node).unwrap(), + ); + count += 1; + + if count > 10_000 { + self.db.write(batch).unwrap(); + batch = rocksdb::WriteBatch::default(); + count = 0; + } + } + + if count > 0 { + self.db.write(batch).unwrap(); + } + } + + pub fn flush(&self) { + self.db.flush().unwrap(); + } +} diff --git a/crates/core/src/webgraph/mod.rs b/crates/core/src/webgraph/mod.rs index 25526f93..3b834956 100644 --- a/crates/core/src/webgraph/mod.rs +++ b/crates/core/src/webgraph/mod.rs @@ -1,5 +1,5 @@ // Stract is an open source web search engine. 
-// Copyright (C) 2023 Stract ApS +// Copyright (C) 2024 Stract ApS // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU Affero General Public License as @@ -13,403 +13,37 @@ // // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -mod segment; - -use std::collections::{BTreeMap, BinaryHeap}; +use std::fs; use std::fs::File; use std::io::{BufReader, BufWriter, Read, Write}; use std::path::Path; use std::sync::Arc; -use std::{cmp, fs}; use itertools::Itertools; use rand::seq::SliceRandom; use rayon::prelude::*; -use url::Url; -use utoipa::ToSchema; +use self::id_node_db::Id2NodeDb; +use self::segment::Segment; use crate::executor::Executor; -use crate::intmap; -use crate::webpage::url_ext::UrlExt; +pub use builder::WebgraphBuilder; +pub use compression::Compression; +pub use edge::*; +pub use node::*; +pub use shortest_path::ShortestPaths; +pub use writer::WebgraphWriter; + +mod builder; pub mod centrality; +mod compression; +mod edge; +mod id_node_db; +mod node; +mod segment; +mod shortest_path; mod store; -use self::segment::{Segment, SegmentWriter}; - -pub const MAX_LABEL_LENGTH: usize = 1024; - -#[derive( - Debug, Clone, Copy, serde::Serialize, serde::Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash, -)] -pub struct NodeID(u64); - -impl NodeID { - pub fn as_u64(self) -> u64 { - self.0 - } -} - -impl From for NodeID { - fn from(val: u128) -> Self { - NodeID(val as u64) - } -} - -impl From for NodeID { - fn from(val: u64) -> Self { - NodeID(val) - } -} - -#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)] -pub struct FullNodeID { - pub prefix: NodeID, - pub id: NodeID, -} - -impl From for FullNodeID { - fn from(value: Node) -> Self { - let id = value.id(); - let prefix = value.into_host().id(); - - FullNodeID { prefix, id } - } -} - -impl intmap::Key for NodeID { - const BIG_PRIME: Self = NodeID(11400714819323198549); - - fn wrapping_mul(self, rhs: Self) -> Self { - NodeID(self.0.wrapping_mul(rhs.0)) - } - - fn modulus_usize(self, rhs: usize) -> usize { - (self.0 % (rhs as u64)) as usize - } -} - -pub trait EdgeLabel -where - Self: Send + Sync + Sized, -{ - fn to_bytes(&self) -> anyhow::Result>; - fn from_bytes(bytes: &[u8]) -> anyhow::Result; -} - -impl EdgeLabel for String { - fn to_bytes(&self) -> anyhow::Result> { - Ok(self.as_bytes().to_vec()) - } - - fn from_bytes(bytes: &[u8]) -> anyhow::Result { - Ok(String::from_utf8(bytes.to_vec())?) 
- } -} - -impl EdgeLabel for () { - fn to_bytes(&self) -> anyhow::Result> { - Ok(Vec::new()) - } - - fn from_bytes(_bytes: &[u8]) -> anyhow::Result { - Ok(()) - } -} - -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct Edge -where - L: EdgeLabel, -{ - pub from: NodeID, - pub to: NodeID, - pub label: L, -} - -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct InnerEdge -where - L: EdgeLabel, -{ - pub from: FullNodeID, - pub to: FullNodeID, - pub label: L, -} - -impl From> for Edge -where - L: EdgeLabel, -{ - fn from(edge: InnerEdge) -> Self { - Edge { - from: edge.from.id, - to: edge.to.id, - label: edge.label, - } - } -} - -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, Hash, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct FullEdge { - pub from: Node, - pub to: Node, - pub label: String, -} - -#[derive( - Debug, - Clone, - PartialEq, - Eq, - PartialOrd, - Ord, - Hash, - serde::Serialize, - serde::Deserialize, - ToSchema, -)] -#[serde(rename_all = "camelCase")] -pub struct Node { - name: String, -} - -impl Node { - pub fn into_host(self) -> Node { - let url = if self.name.contains("://") { - Url::parse(&self.name) - } else { - Url::parse(&("http://".to_string() + self.name.as_str())) - }; - - match url { - Ok(url) => { - let host = url.normalized_host().unwrap_or_default().to_string(); - Node { name: host } - } - Err(_) => Node { - name: String::new(), - }, - } - } - - pub fn as_str(&self) -> &str { - self.name.as_str() - } - - pub fn id(&self) -> NodeID { - let digest = md5::compute(self.name.as_bytes()); - u128::from_le_bytes(*digest).into() - } -} - -impl From for Node { - fn from(name: String) -> Self { - let url = if name.contains("://") { - Url::parse(&name).unwrap() - } else { - Url::parse(&("http://".to_string() + name.as_str())).unwrap() - }; - - Node::from(&url) - } -} - -impl From<&Url> for Node { - fn from(url: &Url) -> Self { - let normalized = normalize_url(url); - Node { name: normalized } - } -} - -impl From<&str> for Node { - fn from(name: &str) -> Self { - name.to_string().into() - } -} - -impl From for Node { - fn from(url: Url) -> Self { - Self::from(&url) - } -} - -pub fn normalize_url(url: &Url) -> String { - let mut url = url.clone(); - url.normalize(); - - let scheme = url.scheme(); - let mut normalized = url - .as_str() - .strip_prefix(scheme) - .unwrap_or_default() - .strip_prefix("://") - .unwrap_or_default() - .to_string(); - - if let Some(stripped) = normalized.strip_prefix("www.") { - normalized = stripped.to_string(); - } - - if let Some(prefix) = normalized.strip_suffix('/') { - normalized = prefix.to_string(); - } - - normalized -} - -#[derive(Default, Debug, Clone, Copy)] -pub enum Compression { - None, - #[default] - Lz4, -} - -impl Compression { - pub fn compress(&self, bytes: &[u8]) -> Vec { - match self { - Compression::None => bytes.to_vec(), - Compression::Lz4 => lz4_flex::compress_prepend_size(bytes), - } - } - - pub fn decompress(&self, bytes: &[u8]) -> Vec { - match self { - Compression::None => bytes.to_vec(), - Compression::Lz4 => lz4_flex::decompress_size_prepended(bytes).unwrap(), - } - } -} - -pub struct WebgraphBuilder { - path: Box, - executor: Executor, - compression: Compression, -} - -impl WebgraphBuilder { - pub fn new>(path: P) -> Self { - Self { - path: path.as_ref().into(), - executor: Executor::multi_thread("webgraph").unwrap(), - compression: Compression::default(), - } - } - - pub fn single_threaded(mut self) -> Self { - self.executor = Executor::single_thread(); - self - } - - 
pub fn compression(mut self, compression: Compression) -> Self { - self.compression = compression; - self - } - - pub fn open(self) -> Webgraph { - Webgraph::open(self.path, self.executor, self.compression) - } -} - -pub trait ShortestPaths { - fn distances(&self, source: Node) -> BTreeMap; - fn raw_distances(&self, source: NodeID) -> BTreeMap; - fn raw_distances_with_max(&self, source: NodeID, max_dist: u8) -> BTreeMap; - fn raw_reversed_distances(&self, source: NodeID) -> BTreeMap; - fn reversed_distances(&self, source: Node) -> BTreeMap; -} - -fn dijkstra_multi( - sources: &[NodeID], - node_edges: F1, - edge_node: F2, - max_dist: Option, -) -> BTreeMap -where - L: EdgeLabel, - F1: Fn(NodeID) -> Vec>, - F2: Fn(&Edge) -> NodeID, -{ - let mut distances: BTreeMap = BTreeMap::default(); - - let mut queue = BinaryHeap::new(); - - for source_id in sources.iter().copied() { - queue.push(cmp::Reverse((0, source_id))); - distances.insert(source_id, 0); - } - - while let Some(state) = queue.pop() { - let (cost, v) = state.0; - - let current_dist = distances.get(&v).unwrap_or(&u8::MAX); - - if cost > *current_dist { - continue; - } - - if let Some(max_dist) = max_dist { - if cost > max_dist { - return distances; - } - } - - for edge in node_edges(v) { - if cost + 1 < *distances.get(&edge_node(&edge)).unwrap_or(&u8::MAX) { - let d = cost + 1; - - let next = cmp::Reverse((d, edge_node(&edge))); - queue.push(next); - distances.insert(edge_node(&edge), d); - } - } - } - - distances -} - -impl ShortestPaths for Webgraph { - fn distances(&self, source: Node) -> BTreeMap { - self.raw_distances(source.id()) - .into_iter() - .filter_map(|(id, dist)| self.id2node(&id).map(|node| (node, dist))) - .collect() - } - - fn raw_distances_with_max(&self, source: NodeID, max_dist: u8) -> BTreeMap { - dijkstra_multi( - &[source], - |node| self.raw_outgoing_edges(&node), - |edge| edge.to, - Some(max_dist), - ) - } - - fn raw_distances(&self, source: NodeID) -> BTreeMap { - dijkstra_multi( - &[source], - |node| self.raw_outgoing_edges(&node), - |edge| edge.to, - None, - ) - } - - fn raw_reversed_distances(&self, source: NodeID) -> BTreeMap { - dijkstra_multi( - &[source], - |node| self.raw_ingoing_edges(&node), - |edge| edge.from, - None, - ) - } - - fn reversed_distances(&self, source: Node) -> BTreeMap { - self.raw_reversed_distances(source.id()) - .into_iter() - .filter_map(|(id, dist)| self.id2node(&id).map(|node| (node, dist))) - .collect() - } -} +mod writer; type SegmentID = String; @@ -450,238 +84,6 @@ impl Meta { } } -struct Id2NodeDb { - db: rocksdb::DB, - _cache: rocksdb::Cache, // needs to be kept alive for as long as the db is alive -} - -impl Id2NodeDb { - fn open>(path: P) -> Self { - let mut opts = rocksdb::Options::default(); - opts.create_if_missing(true); - opts.optimize_for_point_lookup(512); - - opts.set_allow_mmap_reads(true); - opts.set_allow_mmap_writes(true); - opts.set_write_buffer_size(128 * 1024 * 1024); // 128 MB - opts.set_target_file_size_base(512 * 1024 * 1024); // 512 MB - opts.set_target_file_size_multiplier(10); - - opts.set_compression_type(rocksdb::DBCompressionType::None); - - let mut block_opts = rocksdb::BlockBasedOptions::default(); - let cache = rocksdb::Cache::new_lru_cache(1024 * 1024 * 1024); // 1 gb - block_opts.set_ribbon_filter(10.0); - - // some recommended settings (https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning) - opts.set_level_compaction_dynamic_level_bytes(true); - opts.set_bytes_per_sync(1048576); - - block_opts.set_block_size(32 * 1024); // 
32 kb - block_opts.set_format_version(5); - block_opts.set_cache_index_and_filter_blocks(true); - block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true); - block_opts.set_block_cache(&cache); - - opts.set_block_based_table_factory(&block_opts); - - let db = rocksdb::DB::open(&opts, path).unwrap(); - - Self { db, _cache: cache } - } - - fn put(&mut self, id: &NodeID, node: &Node) { - let mut opts = rocksdb::WriteOptions::default(); - opts.disable_wal(true); - - self.db - .put_opt( - id.as_u64().to_le_bytes(), - bincode::serialize(node).unwrap(), - &opts, - ) - .unwrap(); - } - - fn get(&self, id: &NodeID) -> Option { - let mut opts = rocksdb::ReadOptions::default(); - opts.set_verify_checksums(false); - - self.db - .get_opt(id.as_u64().to_le_bytes(), &opts) - .unwrap() - .map(|bytes| bincode::deserialize(&bytes).unwrap()) - } - - fn keys(&self) -> impl Iterator + '_ { - let mut opts = rocksdb::ReadOptions::default(); - opts.set_verify_checksums(false); - opts.set_async_io(true); - - self.db - .iterator_opt(rocksdb::IteratorMode::Start, opts) - .filter_map(|r| { - let (key, _) = r.ok()?; - Some(NodeID(u64::from_le_bytes((*key).try_into().unwrap()))) - }) - } - - fn estimate_num_keys(&self) -> usize { - self.db - .property_int_value("rocksdb.estimate-num-keys") - .ok() - .flatten() - .unwrap_or_default() as usize - } - - fn iter(&self) -> impl Iterator + '_ { - let mut opts = rocksdb::ReadOptions::default(); - opts.set_verify_checksums(false); - - self.db - .iterator_opt(rocksdb::IteratorMode::Start, opts) - .filter_map(|r| { - let (key, value) = r.ok()?; - - Some(( - NodeID(u64::from_le_bytes((*key).try_into().unwrap())), - bincode::deserialize(&value).unwrap(), - )) - }) - } - - fn batch_put(&mut self, iter: impl Iterator) { - let mut batch = rocksdb::WriteBatch::default(); - let mut count = 0; - - for (id, node) in iter { - batch.put( - id.as_u64().to_le_bytes(), - bincode::serialize(&node).unwrap(), - ); - count += 1; - - if count > 10_000 { - self.db.write(batch).unwrap(); - batch = rocksdb::WriteBatch::default(); - count = 0; - } - } - - if count > 0 { - self.db.write(batch).unwrap(); - } - } - - fn flush(&self) { - self.db.flush().unwrap(); - } -} - -pub struct WebgraphWriter { - pub path: String, - segment: SegmentWriter, - insert_batch: Vec>, - id2node: Id2NodeDb, - executor: Executor, - meta: Meta, - compression: Compression, -} - -impl WebgraphWriter { - fn meta>(path: P) -> Meta { - let meta_path = path.as_ref().join("metadata.json"); - Meta::open(meta_path) - } - - fn save_metadata(&mut self) { - let path = Path::new(&self.path).join("metadata.json"); - self.meta.save(path); - } - - pub fn new>(path: P, executor: Executor, compression: Compression) -> Self { - fs::create_dir_all(&path).unwrap(); - let mut meta = Self::meta(&path); - meta.comitted_segments.clear(); - - fs::create_dir_all(path.as_ref().join("segments")).unwrap(); - - let id = uuid::Uuid::new_v4().to_string(); - let segment = SegmentWriter::open(path.as_ref().join("segments"), id.clone(), compression); - - meta.comitted_segments.push(id); - - Self { - path: path.as_ref().as_os_str().to_str().unwrap().to_string(), - segment, - id2node: Id2NodeDb::open(path.as_ref().join("id2node")), - insert_batch: Vec::with_capacity(store::MAX_BATCH_SIZE), - executor, - meta, - compression, - } - } - - pub fn id2node(&self, id: &NodeID) -> Option { - self.id2node.get(id) - } - - fn id_or_assign(&mut self, node: Node) -> FullNodeID { - let id = FullNodeID::from(node.clone()); - - self.id2node.put(&id.id, &node); - - id - } - - 
pub fn insert(&mut self, from: Node, to: Node, label: String) { - if from == to { - return; - } - - let (from_id, to_id) = ( - self.id_or_assign(from.clone()), - self.id_or_assign(to.clone()), - ); - - let edge = InnerEdge { - from: from_id, - to: to_id, - label: label.chars().take(MAX_LABEL_LENGTH).collect(), - }; - - self.insert_batch.push(edge); - - if self.insert_batch.len() >= store::MAX_BATCH_SIZE { - self.commit(); - } - } - - pub fn commit(&mut self) { - if !self.insert_batch.is_empty() { - self.segment.insert(&self.insert_batch); - self.segment.flush(); - self.insert_batch.clear(); - } - - self.save_metadata(); - self.id2node.flush(); - } - - pub fn finalize(mut self) -> Webgraph { - self.commit(); - - Webgraph { - path: self.path, - segments: vec![self.segment.finalize()], - executor: self.executor.into(), - id2node: self.id2node, - meta: self.meta, - compression: self.compression, - } - } -} - pub struct Webgraph { pub path: String, segments: Vec, @@ -1043,20 +445,20 @@ mod test { #[test] fn node_lowercase_name() { let n = Node::from("TEST".to_string()); - assert_eq!(&n.name, "test"); + assert_eq!(n.as_str(), "test"); } #[test] fn host_node_cleanup() { let n = Node::from("https://www.example.com?test").into_host(); - assert_eq!(&n.name, "example.com"); + assert_eq!(n.as_str(), "example.com"); } #[test] fn remove_protocol() { let n = Node::from("https://www.example.com/?test"); - assert_eq!(&n.name, "example.com/?test="); + assert_eq!(n.as_str(), "example.com/?test="); } #[test] diff --git a/crates/core/src/webgraph/node.rs b/crates/core/src/webgraph/node.rs new file mode 100644 index 00000000..fb3f91da --- /dev/null +++ b/crates/core/src/webgraph/node.rs @@ -0,0 +1,171 @@ +// Stract is an open source web search engine. +// Copyright (C) 2024 Stract ApS +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. 
If not, see + +use url::Url; +use utoipa::ToSchema; + +use crate::{intmap, webpage::url_ext::UrlExt}; + +#[derive( + Debug, Clone, Copy, serde::Serialize, serde::Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash, +)] +pub struct NodeID(u64); + +impl NodeID { + pub fn as_u64(self) -> u64 { + self.0 + } +} + +impl From for NodeID { + fn from(val: u128) -> Self { + NodeID(val as u64) + } +} + +impl From for NodeID { + fn from(val: u64) -> Self { + NodeID(val) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)] +pub struct FullNodeID { + pub prefix: NodeID, + pub id: NodeID, +} + +impl From for FullNodeID { + fn from(value: Node) -> Self { + let id = value.id(); + let prefix = value.into_host().id(); + + FullNodeID { prefix, id } + } +} + +impl intmap::Key for NodeID { + const BIG_PRIME: Self = NodeID(11400714819323198549); + + fn wrapping_mul(self, rhs: Self) -> Self { + NodeID(self.0.wrapping_mul(rhs.0)) + } + + fn modulus_usize(self, rhs: usize) -> usize { + (self.0 % (rhs as u64)) as usize + } +} + +#[derive( + Debug, + Clone, + PartialEq, + Eq, + PartialOrd, + Ord, + Hash, + serde::Serialize, + serde::Deserialize, + ToSchema, +)] +#[serde(rename_all = "camelCase")] +pub struct Node { + name: String, +} + +impl Node { + pub fn into_host(self) -> Node { + let url = if self.name.contains("://") { + Url::parse(&self.name) + } else { + Url::parse(&("http://".to_string() + self.name.as_str())) + }; + + match url { + Ok(url) => { + let host = url.normalized_host().unwrap_or_default().to_string(); + Node { name: host } + } + Err(_) => Node { + name: String::new(), + }, + } + } + + pub fn as_str(&self) -> &str { + self.name.as_str() + } + + pub fn id(&self) -> NodeID { + let digest = md5::compute(self.name.as_bytes()); + u128::from_le_bytes(*digest).into() + } +} + +impl From for Node { + fn from(name: String) -> Self { + let url = if name.contains("://") { + Url::parse(&name).unwrap() + } else { + Url::parse(&("http://".to_string() + name.as_str())).unwrap() + }; + + Node::from(&url) + } +} + +impl From<&Url> for Node { + fn from(url: &Url) -> Self { + let normalized = normalize_url(url); + Node { name: normalized } + } +} + +impl From<&str> for Node { + fn from(name: &str) -> Self { + name.to_string().into() + } +} + +impl From for Node { + fn from(url: Url) -> Self { + Self::from(&url) + } +} + +pub fn normalize_url(url: &Url) -> String { + let mut url = url.clone(); + url.normalize(); + + let scheme = url.scheme(); + let mut normalized = url + .as_str() + .strip_prefix(scheme) + .unwrap_or_default() + .strip_prefix("://") + .unwrap_or_default() + .to_string(); + + if let Some(stripped) = normalized.strip_prefix("www.") { + normalized = stripped.to_string(); + } + + if let Some(prefix) = normalized.strip_suffix('/') { + normalized = prefix.to_string(); + } + + normalized +} diff --git a/crates/core/src/webgraph/segment.rs b/crates/core/src/webgraph/segment.rs index 125aefc5..83c32269 100644 --- a/crates/core/src/webgraph/segment.rs +++ b/crates/core/src/webgraph/segment.rs @@ -1,5 +1,5 @@ // Stract is an open source web search engine. 
-// Copyright (C) 2023 Stract ApS
+// Copyright (C) 2024 Stract ApS
 //
 // This program is free software: you can redistribute it and/or modify
 // it under the terms of the GNU Affero General Public License as
@@ -182,16 +182,16 @@ mod test {
         let mut edges = Vec::new();
 
         let a = FullNodeID {
-            id: NodeID(0),
-            prefix: NodeID(0),
+            id: NodeID::from(0 as u64),
+            prefix: NodeID::from(0 as u64),
         };
         let b = FullNodeID {
-            id: NodeID(1),
-            prefix: NodeID(0),
+            id: NodeID::from(1 as u64),
+            prefix: NodeID::from(0 as u64),
         };
         let c = FullNodeID {
-            id: NodeID(2),
-            prefix: NodeID(0),
+            id: NodeID::from(2 as u64),
+            prefix: NodeID::from(0 as u64),
         };
 
         edges.push(InnerEdge {
diff --git a/crates/core/src/webgraph/shortest_path.rs b/crates/core/src/webgraph/shortest_path.rs
new file mode 100644
index 00000000..a178d6c0
--- /dev/null
+++ b/crates/core/src/webgraph/shortest_path.rs
@@ -0,0 +1,122 @@
+// Stract is an open source web search engine.
+// Copyright (C) 2024 Stract ApS
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+use std::cmp;
+use std::collections::{BTreeMap, BinaryHeap};
+
+use super::{Edge, EdgeLabel, Node, NodeID, Webgraph};
+
+pub trait ShortestPaths {
+    fn distances(&self, source: Node) -> BTreeMap<Node, u8>;
+    fn raw_distances(&self, source: NodeID) -> BTreeMap<NodeID, u8>;
+    fn raw_distances_with_max(&self, source: NodeID, max_dist: u8) -> BTreeMap<NodeID, u8>;
+    fn raw_reversed_distances(&self, source: NodeID) -> BTreeMap<NodeID, u8>;
+    fn reversed_distances(&self, source: Node) -> BTreeMap<Node, u8>;
+}
+
+fn dijkstra_multi<L, F1, F2>(
+    sources: &[NodeID],
+    node_edges: F1,
+    edge_node: F2,
+    max_dist: Option<u8>,
+) -> BTreeMap<NodeID, u8>
+where
+    L: EdgeLabel,
+    F1: Fn(NodeID) -> Vec<Edge<L>>,
+    F2: Fn(&Edge<L>) -> NodeID,
+{
+    let mut distances: BTreeMap<NodeID, u8> = BTreeMap::default();
+
+    let mut queue = BinaryHeap::new();
+
+    for source_id in sources.iter().copied() {
+        queue.push(cmp::Reverse((0, source_id)));
+        distances.insert(source_id, 0);
+    }
+
+    while let Some(state) = queue.pop() {
+        let (cost, v) = state.0;
+
+        let current_dist = distances.get(&v).unwrap_or(&u8::MAX);
+
+        if cost > *current_dist {
+            continue;
+        }
+
+        if let Some(max_dist) = max_dist {
+            if cost > max_dist {
+                return distances;
+            }
+        }
+
+        for edge in node_edges(v) {
+            if cost + 1 < *distances.get(&edge_node(&edge)).unwrap_or(&u8::MAX) {
+                let d = cost + 1;
+
+                let next = cmp::Reverse((d, edge_node(&edge)));
+                queue.push(next);
+                distances.insert(edge_node(&edge), d);
+            }
+        }
+    }
+
+    distances
+}
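
Every edge is treated as having weight 1, so dijkstra_multi above behaves as a multi-source BFS with distances bounded by u8::MAX; the BinaryHeap of cmp::Reverse pairs guarantees nodes are settled in nondecreasing distance order, which is also what lets raw_distances_with_max (below) stop as soon as it pops a cost beyond max_dist. A self-contained sketch of the same pattern over a plain adjacency map; the ids and the helper name are illustrative, not part of the patch:

    use std::cmp;
    use std::collections::{BTreeMap, BinaryHeap};

    // Illustrative only: same multi-source, unit-weight pattern as
    // dijkstra_multi, but over a plain adjacency map keyed by u64 ids.
    fn distances_from(sources: &[u64], adj: &BTreeMap<u64, Vec<u64>>) -> BTreeMap<u64, u8> {
        let mut distances: BTreeMap<u64, u8> = BTreeMap::default();
        let mut queue = BinaryHeap::new();

        for &s in sources {
            distances.insert(s, 0);
            queue.push(cmp::Reverse((0u8, s)));
        }

        while let Some(cmp::Reverse((cost, v))) = queue.pop() {
            if cost > *distances.get(&v).unwrap_or(&u8::MAX) {
                continue; // stale entry; v was already settled at a lower cost
            }

            for &next in adj.get(&v).into_iter().flatten() {
                // saturating_add keeps the sketch safe near the u8 ceiling
                let d = cost.saturating_add(1);
                if d < *distances.get(&next).unwrap_or(&u8::MAX) {
                    distances.insert(next, d);
                    queue.push(cmp::Reverse((d, next)));
                }
            }
        }

        distances
    }

    fn main() {
        // 1 -> 2 -> 3, plus a shortcut 1 -> 3: the shortcut wins with distance 1.
        let mut adj = BTreeMap::new();
        adj.insert(1u64, vec![2, 3]);
        adj.insert(2u64, vec![3]);

        let dist = distances_from(&[1], &adj);
        assert_eq!(dist[&1], 0);
        assert_eq!(dist[&2], 1);
        assert_eq!(dist[&3], 1);
    }
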
+
+impl ShortestPaths for Webgraph {
+    fn distances(&self, source: Node) -> BTreeMap<Node, u8> {
+        self.raw_distances(source.id())
+            .into_iter()
+            .filter_map(|(id, dist)| self.id2node(&id).map(|node| (node, dist)))
+            .collect()
+    }
+
+    fn raw_distances_with_max(&self, source: NodeID, max_dist: u8) -> BTreeMap<NodeID, u8> {
+        dijkstra_multi(
+            &[source],
+            |node| self.raw_outgoing_edges(&node),
+            |edge| edge.to,
+            Some(max_dist),
+        )
+    }
+
+    fn raw_distances(&self, source: NodeID) -> BTreeMap<NodeID, u8> {
+        dijkstra_multi(
+            &[source],
+            |node| self.raw_outgoing_edges(&node),
+            |edge| edge.to,
+            None,
+        )
+    }
+
+    fn raw_reversed_distances(&self, source: NodeID) -> BTreeMap<NodeID, u8> {
+        dijkstra_multi(
+            &[source],
+            |node| self.raw_ingoing_edges(&node),
+            |edge| edge.from,
+            None,
+        )
+    }
+
+    fn reversed_distances(&self, source: Node) -> BTreeMap<Node, u8> {
+        self.raw_reversed_distances(source.id())
+            .into_iter()
+            .filter_map(|(id, dist)| self.id2node(&id).map(|node| (node, dist)))
+            .collect()
+    }
+}
diff --git a/crates/core/src/webgraph/store.rs b/crates/core/src/webgraph/store.rs
index d9600dc0..f9ad6ea8 100644
--- a/crates/core/src/webgraph/store.rs
+++ b/crates/core/src/webgraph/store.rs
@@ -146,11 +146,11 @@ impl EdgeStoreWriter {
         Some(InnerEdge {
             from: FullNodeID {
                 prefix: val.from_prefix,
-                id: NodeID(from),
+                id: NodeID::from(from),
             },
             to: FullNodeID {
                 prefix: val.to_prefix,
-                id: NodeID(to),
+                id: NodeID::from(to),
             },
             label: L::from_bytes(&val.label).unwrap(),
         })
@@ -619,7 +619,7 @@ impl EdgeStore {
            let (key, val) = res.unwrap();
            let node = u64::from_le_bytes((*key).try_into().unwrap());
-           let node = NodeID(node);
+           let node = NodeID::from(node);
            let node_range = bincode::deserialize::<Range<usize>>(&val).unwrap();
            let edge_nodes = &self.edge_nodes[node_range];
@@ -659,12 +659,12 @@ mod tests {
 
         let e = InnerEdge {
             from: FullNodeID {
-                id: NodeID(0),
-                prefix: NodeID(0),
+                id: NodeID::from(0 as u64),
+                prefix: NodeID::from(0 as u64),
             },
             to: FullNodeID {
-                id: NodeID(1),
-                prefix: NodeID(0),
+                id: NodeID::from(1 as u64),
+                prefix: NodeID::from(0 as u64),
             },
             label: "test".to_string(),
         };
@@ -673,12 +673,12 @@
 
         let store = kv.finalize();
 
-        let edges: Vec<_> = store.get_with_label(&NodeID(0));
+        let edges: Vec<_> = store.get_with_label(&NodeID::from(0 as u64));
 
         assert_eq!(edges.len(), 1);
         assert_eq!(&edges[0], &Edge::from(e.clone()));
 
-        let edges: Vec<_> = store.get_with_label(&NodeID(1));
+        let edges: Vec<_> = store.get_with_label(&NodeID::from(1 as u64));
         assert_eq!(edges.len(), 0);
     }
@@ -693,12 +693,12 @@
 
         let e = InnerEdge {
             from: FullNodeID {
-                id: NodeID(0),
-                prefix: NodeID(0),
+                id: NodeID::from(0 as u64),
+                prefix: NodeID::from(0 as u64),
             },
             to: FullNodeID {
-                id: NodeID(1),
-                prefix: NodeID(0),
+                id: NodeID::from(1 as u64),
+                prefix: NodeID::from(0 as u64),
             },
             label: "test".to_string(),
         };
@@ -707,10 +707,10 @@
 
         let store = kv.finalize();
 
-        let edges: Vec<_> = store.get_with_label(&NodeID(0));
+        let edges: Vec<_> = store.get_with_label(&NodeID::from(0 as u64));
         assert_eq!(edges.len(), 0);
 
-        let edges: Vec<_> = store.get_with_label(&NodeID(1));
+        let edges: Vec<_> = store.get_with_label(&NodeID::from(1 as u64));
         assert_eq!(edges.len(), 1);
         assert_eq!(&edges[0], &Edge::from(e.clone()));
     }
diff --git a/crates/core/src/webgraph/writer.rs b/crates/core/src/webgraph/writer.rs
new file mode 100644
index 00000000..f80a87e8
--- /dev/null
+++ b/crates/core/src/webgraph/writer.rs
@@ -0,0 +1,128 @@
+// Stract is an open source web search engine.
+// Copyright (C) 2024 Stract ApS
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+pub struct WebgraphWriter {
+    pub path: String,
+    segment: SegmentWriter,
+    insert_batch: Vec<InnerEdge<String>>,
+    id2node: Id2NodeDb,
+    executor: Executor,
+    meta: Meta,
+    compression: Compression,
+}
+
+impl WebgraphWriter {
+    fn meta<P: AsRef<Path>>(path: P) -> Meta {
+        let meta_path = path.as_ref().join("metadata.json");
+        Meta::open(meta_path)
+    }
+
+    fn save_metadata(&mut self) {
+        let path = Path::new(&self.path).join("metadata.json");
+        self.meta.save(path);
+    }
+
+    pub fn new<P: AsRef<Path>>(path: P, executor: Executor, compression: Compression) -> Self {
+        fs::create_dir_all(&path).unwrap();
+        let mut meta = Self::meta(&path);
+        meta.comitted_segments.clear();
+
+        fs::create_dir_all(path.as_ref().join("segments")).unwrap();
+
+        let id = uuid::Uuid::new_v4().to_string();
+        let segment = SegmentWriter::open(path.as_ref().join("segments"), id.clone(), compression);
+
+        meta.comitted_segments.push(id);
+
+        Self {
+            path: path.as_ref().as_os_str().to_str().unwrap().to_string(),
+            segment,
+            id2node: Id2NodeDb::open(path.as_ref().join("id2node")),
+            insert_batch: Vec::with_capacity(store::MAX_BATCH_SIZE),
+            executor,
+            meta,
+            compression,
+        }
+    }
+
+    pub fn id2node(&self, id: &NodeID) -> Option<Node> {
+        self.id2node.get(id)
+    }
+
+    fn id_or_assign(&mut self, node: Node) -> FullNodeID {
+        let id = FullNodeID::from(node.clone());
+
+        self.id2node.put(&id.id, &node);
+
+        id
+    }
+
+    pub fn insert(&mut self, from: Node, to: Node, label: String) {
+        if from == to {
+            return;
+        }
+
+        let (from_id, to_id) = (
+            self.id_or_assign(from.clone()),
+            self.id_or_assign(to.clone()),
+        );
+
+        let edge = InnerEdge {
+            from: from_id,
+            to: to_id,
+            label: label.chars().take(MAX_LABEL_LENGTH).collect(),
+        };
+
+        self.insert_batch.push(edge);
+
+        if self.insert_batch.len() >= store::MAX_BATCH_SIZE {
+            self.commit();
+        }
+    }
+
+    pub fn commit(&mut self) {
+        if !self.insert_batch.is_empty() {
+            self.segment.insert(&self.insert_batch);
+            self.segment.flush();
+            self.insert_batch.clear();
+        }
+
+        self.save_metadata();
+        self.id2node.flush();
+    }
+
+    pub fn finalize(mut self) -> Webgraph {
+        self.commit();
+
+        Webgraph {
+            path: self.path,
+            segments: vec![self.segment.finalize()],
+            executor: self.executor.into(),
+            id2node: self.id2node,
+            meta: self.meta,
+            compression: self.compression,
+        }
+    }
+}
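
Taken together, the writer and shortest-path modules split out above compose roughly as follows. This is a usage sketch only: the import paths and the Executor::single_thread() and Compression::default() constructors are assumptions about the rest of the crate and are not part of this patch; the insert, commit, finalize and distances calls mirror the signatures shown above.

    // Usage sketch only; constructors marked below are hypothetical.
    use crate::executor::Executor;
    use crate::webgraph::{Compression, Node, ShortestPaths, WebgraphWriter};

    fn build_and_query(path: &str) {
        let mut writer = WebgraphWriter::new(
            path,
            Executor::single_thread(), // hypothetical constructor
            Compression::default(),    // hypothetical default variant
        );

        // Inserts are buffered; the batch is flushed automatically once
        // store::MAX_BATCH_SIZE edges accumulate, or explicitly via commit().
        writer.insert(Node::from("a.com"), Node::from("b.com/page"), String::new());
        writer.commit();

        // finalize() commits any remaining batch and returns a read-only Webgraph.
        let graph = writer.finalize();

        // ShortestPaths::distances follows outgoing edges from the source node.
        for (node, dist) in graph.distances(Node::from("a.com")) {
            println!("{} is {} hop(s) away", node.as_str(), dist);
        }
    }

Because finalize() consumes the writer, any edges still sitting in insert_batch are committed exactly once before the read-only Webgraph is handed back.
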