Improve code docs (#246)
* document all entrypoints

* document ampc framework

* document ranking pipeline

* document the different searchers

* document generic search query flow

* document main crawler elements
mikkeldenker authored Dec 5, 2024
1 parent daff4d0 commit 7633b61
Show file tree
Hide file tree
Showing 52 changed files with 315 additions and 190 deletions.
2 changes: 2 additions & 0 deletions crates/core/src/ampc/coordinator.rs
@@ -24,6 +24,8 @@ use super::{DhtConn, Finisher, Job, JobScheduled, RemoteWorker, Setup, Worker, W
 use crate::{distributed::retry_strategy::ExponentialBackoff, Result};
 use anyhow::anyhow;
 
+/// A coordinator is responsible for scheduling jobs on workers and coordinating
+/// between rounds of computation.
 pub struct Coordinator<J>
 where
     J: Job,
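
To make the coordinator's role concrete, here is a minimal sketch of a scheduling loop with capped exponential backoff. All names are simplified stand-ins, not the crate's actual `Coordinator`/`Worker` API, and in the real system the workers are remote:

```rust
// Hypothetical sketch of a coordinator's scheduling loop; the real system
// issues RPCs to remote workers and retries via ExponentialBackoff.
use std::{thread::sleep, time::Duration};

struct Job(u64);

struct WorkerHandle {
    busy: bool,
}

impl WorkerHandle {
    /// Try to start the job; returns false if the worker is occupied.
    fn try_run(&mut self, job: &Job) -> bool {
        if self.busy {
            return false;
        }
        self.busy = true; // a real coordinator would issue an RPC here
        println!("scheduled job {}", job.0);
        true
    }
}

fn schedule_all(jobs: &[Job], workers: &mut [WorkerHandle]) {
    for job in jobs {
        let mut backoff = Duration::from_millis(10);
        // retry until some worker accepts the job
        while !workers.iter_mut().any(|w| w.try_run(job)) {
            sleep(backoff);
            backoff = (backoff * 2).min(Duration::from_secs(1)); // capped exponential backoff
        }
    }
}

fn main() {
    let mut workers = vec![WorkerHandle { busy: false }, WorkerHandle { busy: false }];
    schedule_all(&[Job(1), Job(2)], &mut workers);
}
```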
2 changes: 1 addition & 1 deletion crates/core/src/ampc/dht/mod.rs
@@ -21,7 +21,7 @@
 //! with multiple shards. Each shard cluster
 //! is a Raft cluster, and each key is then routed to the correct
 //! cluster based on hash(key) % number_of_shards. The keys
-//! are currently *not* rebalanced if the number of shards change, so
+//! are currently *not* re-balanced if the number of shards changes, so
 //! if an entire shard becomes unavailable or a new shard is added, all
 //! keys in the entire DHT are essentially lost as the
 //! keys might hash incorrectly.
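
The routing rule is plain modular hashing. A small self-contained sketch (hypothetical function names, not the crate's API) that also shows why changing the shard count effectively loses keys:

```rust
// Route each key to a shard with hash(key) % number_of_shards.
// Hypothetical sketch; the real DHT routes to a Raft cluster per shard.
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

fn shard_for<K: Hash>(key: &K, number_of_shards: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish() % number_of_shards
}

fn main() {
    // With 3 shards, the key deterministically lands on one shard...
    println!("{}", shard_for(&"my-key", 3));
    // ...but may land elsewhere with 4 shards, which is why keys are
    // effectively lost when the shard count changes without re-balancing.
    println!("{}", shard_for(&"my-key", 4));
}
```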
1 change: 1 addition & 0 deletions crates/core/src/ampc/dht/store.rs
@@ -13,6 +13,7 @@
 //
 // You should have received a copy of the GNU Affero General Public License
 // along with this program. If not, see <https://www.gnu.org/licenses/>
+
 use std::collections::BTreeMap;
 use std::fmt::Debug;
 use std::io::Cursor;
2 changes: 2 additions & 0 deletions crates/core/src/ampc/finisher.rs
@@ -16,6 +16,8 @@
 
 use super::prelude::Job;
 
+/// A finisher is responsible for determining if the computation is finished
+/// or if another round of computation is needed.
 pub trait Finisher {
     type Job: Job;
 
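
As a toy illustration of the trait's contract — hypothetical simplified signature; the real trait is parameterised over a `Job` and inspects actual state — a finisher that just caps the number of rounds:

```rust
// Hypothetical sketch: a finisher that stops after a fixed number of rounds.
trait Finisher {
    /// Return true when no further rounds are needed.
    fn is_finished(&self, round: u64) -> bool;
}

struct MaxRounds(u64);

impl Finisher for MaxRounds {
    fn is_finished(&self, round: u64) -> bool {
        round >= self.0
    }
}

fn main() {
    let finisher = MaxRounds(3);
    let mut round = 0;
    while !finisher.is_finished(round) {
        println!("running round {round}"); // each iteration would run all mappers once
        round += 1;
    }
}
```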
1 change: 1 addition & 0 deletions crates/core/src/ampc/mapper.rs
@@ -16,6 +16,7 @@
 
 use super::{prelude::Job, DhtConn};
 
+/// A mapper is the specific computation to be run on the graph.
 pub trait Mapper: bincode::Encode + bincode::Decode + Send + Sync + Clone {
     type Job: Job<Mapper = Self>;
 
27 changes: 27 additions & 0 deletions crates/core/src/ampc/mod.rs
@@ -14,6 +14,33 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program. If not, see <https://www.gnu.org/licenses/>.
 
+//! # Framework for Adaptive Massively Parallel Computation (AMPC).
+//!
+//! AMPC is a system for implementing large-scale distributed graph algorithms efficiently.
+//! It provides a framework for parallel computation across clusters of machines.
+//!
+//! While similar in concept to MapReduce, AMPC uses a distributed hash table (DHT) as its
+//! underlying data structure rather than the traditional map and reduce phases. This key
+//! architectural difference enables more flexible and efficient computation patterns.
+//!
+//! The main advantage over MapReduce is that workers can dynamically access any keys in
+//! the DHT during computation. This is in contrast to MapReduce where the keyspace must
+//! be statically partitioned between reducers before computation begins. The dynamic
+//! access pattern allows for more natural expression of graph algorithms in a distributed
+//! setting.
+//!
+//! This is roughly inspired by
+//! [Massively Parallel Graph Computation: From Theory to Practice](https://research.google/blog/massively-parallel-graph-computation-from-theory-to-practice/)
+//!
+//! ## Key concepts
+//!
+//! * **DHT**: A distributed hash table is used to store the result of the computation for
+//!   each round.
+//! * **Worker**: A worker owns a subset of the overall graph and is responsible for
+//!   executing mappers on its portion of the graph and sending results to the DHT.
+//! * **Mapper**: A mapper is the specific computation to be run on the graph.
+//! * **Coordinator**: The coordinator is responsible for scheduling the jobs on the workers.
+
 use self::{job::Job, worker::WorkerRef};
 use crate::distributed::sonic;
 
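
To see how these four pieces fit together, here is a hedged, single-process sketch of the round structure — threads as workers, a shared map standing in for the DHT, simplified types throughout:

```rust
// Hypothetical sketch of the AMPC round structure: workers run a mapper over
// their slice of the graph and publish results to a shared table (the DHT
// stand-in); the coordinator repeats rounds until a finisher says stop.
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

type Dht = Arc<Mutex<HashMap<u64, u64>>>; // key -> value for the current round

fn mapper(node: u64, dht: &Dht) {
    // The "computation": count nodes by parity. Real mappers read and write
    // arbitrary keys, which is what MapReduce's static partitioning disallows.
    *dht.lock().unwrap().entry(node % 2).or_insert(0) += 1;
}

fn run_round(graph_parts: &[Vec<u64>], dht: &Dht) {
    let handles: Vec<_> = graph_parts
        .iter()
        .cloned()
        .map(|part| {
            let dht = Arc::clone(dht);
            // one thread per worker; each worker owns a subset of the graph
            thread::spawn(move || part.into_iter().for_each(|n| mapper(n, &dht)))
        })
        .collect();
    handles.into_iter().for_each(|h| h.join().unwrap());
}

fn main() {
    let dht: Dht = Arc::new(Mutex::new(HashMap::new()));
    let graph_parts = vec![vec![1, 2, 3], vec![4, 5, 6]];
    for round in 0..2 {
        // finisher stand-in: stop after a fixed number of rounds
        run_round(&graph_parts, &dht);
        println!("after round {round}: {:?}", dht.lock().unwrap());
    }
}
```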
12 changes: 12 additions & 0 deletions crates/core/src/ampc/setup.rs
@@ -16,12 +16,24 @@
 
 use super::DhtConn;
 
+/// A setup is responsible for initializing the DHT before each round of computation.
 pub trait Setup {
     type DhtTables;
 
+    /// Setup initial state of the DHT.
     fn init_dht(&self) -> DhtConn<Self::DhtTables>;
 
+    /// Setup state for a new round.
+    ///
+    /// This is called once for each round of computation.
+    /// The first round will run `setup_first_round` first
+    /// but will still call `setup_round` after that.
+    #[allow(unused_variables)] // reason = "dht might be used by implementors"
     fn setup_round(&self, dht: &Self::DhtTables) {}
 
+    /// Setup state for the first round.
+    ///
+    /// This is called once before the first round of computation.
     fn setup_first_round(&self, dht: &Self::DhtTables) {
         self.setup_round(dht);
     }
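
A sketch of how an implementor might use these hooks, with a simplified trait mirroring the default-method wiring above (`Vec<u64>` standing in for the real `DhtConn<Self::DhtTables>`):

```rust
// Hypothetical sketch of the Setup call order: the first round runs the
// first-round hook (whose default forwards to the per-round hook), and
// every round runs the per-round hook.
trait Setup {
    type DhtTables;

    fn setup_round(&self, _dht: &Self::DhtTables) {}

    fn setup_first_round(&self, dht: &Self::DhtTables) {
        self.setup_round(dht);
    }
}

struct MySetup;

impl Setup for MySetup {
    type DhtTables = Vec<u64>;

    fn setup_round(&self, _dht: &Self::DhtTables) {
        println!("reset per-round state");
    }

    fn setup_first_round(&self, dht: &Self::DhtTables) {
        println!("seed initial values");
        self.setup_round(dht); // keep the documented "still calls setup_round" behaviour
    }
}

fn main() {
    let setup = MySetup;
    let tables = vec![];
    for round in 0..3 {
        if round == 0 {
            setup.setup_first_round(&tables);
        } else {
            setup.setup_round(&tables);
        }
    }
}
```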
2 changes: 2 additions & 0 deletions crates/core/src/ampc/worker.rs
@@ -23,6 +23,8 @@ use crate::Result;
 use anyhow::anyhow;
 use tokio::net::ToSocketAddrs;
 
+/// A worker is responsible for executing a mapper on its portion of the graph and
+/// sending results to the DHT.
 pub trait Worker: Send + Sync {
     type Remote: RemoteWorker<Job = Self::Job>;
 
13 changes: 11 additions & 2 deletions crates/core/src/crawler/mod.rs
@@ -14,6 +14,15 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program. If not, see <https://www.gnu.org/licenses/>.
 
+//! # Crawler
+//!
+//! The crawler is responsible for fetching webpages and storing them in WARC files
+//! for later processing.
+//!
+//! Before starting a crawl, a plan needs to be created. This plan is then used by
+//! the crawler coordinator to assign each site to a worker.
+//! A site is only assigned to one worker at a time for politeness.
+
 use std::{collections::VecDeque, future::Future, net::SocketAddr, sync::Arc};
 
 type HashMap<K, V> = std::collections::HashMap<K, V, ahash::RandomState>;
@@ -35,7 +44,7 @@ pub use router::Router;
 mod file_queue;
 pub mod planner;
 pub mod robot_client;
-mod wander_prirotiser;
+mod wander_prioritiser;
 mod warc_writer;
 mod worker;
 
@@ -304,7 +313,7 @@ impl Crawler {
     }
 }
 
-pub trait DatumStream: Send + Sync {
+pub trait DatumSink: Send + Sync {
     fn write(&self, crawl_datum: CrawlDatum) -> impl Future<Output = Result<()>> + Send;
     fn finish(&self) -> impl Future<Output = Result<()>> + Send;
 }
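
The renamed `DatumSink` trait is the seam between fetching and storage. A hedged sketch of a trivial in-memory implementor — simplified datum and error types, not the crate's real ones; requires the tokio runtime:

```rust
// Hypothetical sketch: an in-memory DatumSink standing in for the
// WARC-writing implementation.
use std::future::Future;
use std::sync::Mutex;

struct CrawlDatum {
    url: String,
}

trait DatumSink: Send + Sync {
    fn write(&self, crawl_datum: CrawlDatum) -> impl Future<Output = Result<(), String>> + Send;
    fn finish(&self) -> impl Future<Output = Result<(), String>> + Send;
}

struct MemorySink {
    datums: Mutex<Vec<CrawlDatum>>,
}

impl DatumSink for MemorySink {
    // `async fn` satisfies the `-> impl Future + Send` signature (Rust 1.75+)
    async fn write(&self, crawl_datum: CrawlDatum) -> Result<(), String> {
        println!("buffering {}", crawl_datum.url);
        self.datums.lock().unwrap().push(crawl_datum);
        Ok(())
    }

    async fn finish(&self) -> Result<(), String> {
        println!("stored {} datums", self.datums.lock().unwrap().len());
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let sink = MemorySink { datums: Mutex::new(Vec::new()) };
    sink.write(CrawlDatum { url: "https://example.com/".into() }).await.unwrap();
    sink.finish().await.unwrap();
}
```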
3 changes: 3 additions & 0 deletions crates/core/src/crawler/planner.rs
@@ -13,6 +13,7 @@
 //
 // You should have received a copy of the GNU Affero General Public License
 // along with this program. If not, see <https://www.gnu.org/licenses/>.
+
 use anyhow::{anyhow, Result};
 use futures::stream::FuturesOrdered;
 use futures::StreamExt;
@@ -71,6 +72,7 @@ impl From<StoredUrl> for Url {
     }
 }
 
+/// Store urls in groups on disk based on their harmonic rank.
 struct UrlGrouper {
     groups: Vec<speedy_kv::Db<StoredUrl, ()>>,
     folder: std::path::PathBuf,
@@ -169,6 +171,7 @@ struct Budget {
     remaining_schedulable: u64,
 }
 
+/// Create a crawl plan based on the harmonic rank of the hosts.
 pub struct CrawlPlanner {
     host_centrality: Arc<speedy_kv::Db<NodeID, f64>>,
     host_centrality_rank: Arc<speedy_kv::Db<NodeID, u64>>,
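
The grouping idea sketches easily: bucket hosts by their harmonic-centrality rank so the plan can spend its crawl budget on the best-ranked buckets first. In-memory buckets here, purely illustrative; the real planner persists groups via `speedy_kv`:

```rust
// Hypothetical sketch: group urls by the harmonic-centrality rank of their
// host, so budget is allocated to highly ranked groups first.
fn group_index(rank: u64, num_groups: u64, max_rank: u64) -> usize {
    // evenly sized rank buckets; group 0 holds the best-ranked hosts
    ((rank * num_groups) / (max_rank + 1)) as usize
}

fn main() {
    let num_groups = 4;
    let max_rank = 99;
    let hosts = [("a.com", 0), ("b.com", 10), ("c.com", 55), ("d.com", 99)];

    let mut groups: Vec<Vec<&str>> = vec![Vec::new(); num_groups as usize];
    for (host, rank) in hosts {
        groups[group_index(rank, num_groups, max_rank)].push(host);
    }

    for (i, group) in groups.iter().enumerate() {
        println!("group {i}: {group:?}");
    }
}
```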
1 change: 1 addition & 0 deletions crates/core/src/crawler/robot_client.rs
@@ -45,6 +45,7 @@ pub(super) fn reqwest_client(config: &CrawlerConfig) -> Result<reqwest::Client>
         .map_err(|e| Error::from(anyhow!(e)))
 }
 
+/// Reqwest client that respects robots.txt for each request.
 #[derive(Clone)]
 pub struct RobotClient {
     robots_txt_manager: RobotsTxtManager,
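
A hedged sketch of what such a wrapper does: consult robots.txt before fetching. The naive prefix parser below ignores user-agent sections and caching, both of which the real `RobotsTxtManager` handles; requires the reqwest and tokio crates:

```rust
// Hypothetical sketch: fetch robots.txt and skip disallowed urls before
// issuing the real request.
use reqwest::Url;

async fn disallowed_prefixes(client: &reqwest::Client, url: &Url) -> Vec<String> {
    let robots_url = format!("{}://{}/robots.txt", url.scheme(), url.host_str().unwrap_or_default());
    let Ok(resp) = client.get(robots_url.as_str()).send().await else {
        return Vec::new(); // no robots.txt reachable -> allow everything
    };
    let body = resp.text().await.unwrap_or_default();
    body.lines()
        .filter_map(|l| l.trim().strip_prefix("Disallow:"))
        .map(|p| p.trim().to_string())
        .filter(|p| !p.is_empty())
        .collect()
}

async fn get_if_allowed(client: &reqwest::Client, url: Url) -> Option<reqwest::Response> {
    let disallowed = disallowed_prefixes(client, &url).await;
    if disallowed.iter().any(|p| url.path().starts_with(p)) {
        return None; // politely skip urls the site has opted out of
    }
    client.get(url).send().await.ok()
}

#[tokio::main]
async fn main() {
    let client = reqwest::Client::new();
    let url = Url::parse("https://example.com/").unwrap();
    match get_if_allowed(&client, url).await {
        Some(resp) => println!("fetched with status {}", resp.status()),
        None => println!("disallowed by robots.txt"),
    }
}
```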
4 changes: 2 additions & 2 deletions crates/core/src/crawler/warc_writer.rs
@@ -21,7 +21,7 @@ use crate::{
     warc,
 };
 
-use super::{CrawlDatum, DatumStream, Error, Result};
+use super::{CrawlDatum, DatumSink, Error, Result};
 use anyhow::anyhow;
 
 /// The WarcWriter is responsible for storing the crawl datums
@@ -30,7 +30,7 @@ pub struct WarcWriter {
     tx: tokio::sync::mpsc::Sender<WarcWriterMessage>,
 }
 
-impl DatumStream for WarcWriter {
+impl DatumSink for WarcWriter {
     async fn write(&self, crawl_datum: CrawlDatum) -> Result<()> {
         self.tx
             .send(WarcWriterMessage::Crawl(crawl_datum))
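
The design behind that `tx` field: sender handles are cheap to clone, and all writes funnel through one background task that owns the output, serialising access without locks. A minimal sketch with a hypothetical message type (requires tokio):

```rust
// Hypothetical sketch of the channel-backed writer: many senders, one task
// that owns the actual (here: stdout) "file".
use tokio::sync::mpsc;

enum WriterMessage {
    Crawl(String),
    Done,
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<WriterMessage>(128);

    // single background task owns the output, serialising all writes
    let writer = tokio::spawn(async move {
        while let Some(msg) = rx.recv().await {
            match msg {
                WriterMessage::Crawl(datum) => println!("writing {datum}"),
                WriterMessage::Done => break,
            }
        }
    });

    for i in 0..3 {
        tx.send(WriterMessage::Crawl(format!("datum-{i}"))).await.unwrap();
    }
    tx.send(WriterMessage::Done).await.unwrap();
    writer.await.unwrap();
}
```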
9 changes: 5 additions & 4 deletions crates/core/src/crawler/worker.rs
@@ -39,8 +39,8 @@ use crate::{
 };
 
 use super::{
-    encoded_body, robot_client::RobotClient, wander_prirotiser::WanderPrioritiser, CrawlDatum,
-    DatumStream, Domain, Error, Result, RetrieableUrl, Site, WarcWriter, WeightedUrl, WorkerJob,
+    encoded_body, robot_client::RobotClient, wander_prioritiser::WanderPrioritiser, CrawlDatum,
+    DatumSink, Domain, Error, Result, RetrieableUrl, Site, WarcWriter, WeightedUrl, WorkerJob,
     MAX_CONTENT_LENGTH, MAX_OUTGOING_URLS_PER_PAGE,
 };
 
@@ -126,7 +126,8 @@ impl WorkerThread {
     }
 }
 
-pub struct JobExecutor<S: DatumStream> {
+/// JobExecutor receives a job from the coordinator and crawls the urls in the job.
+pub struct JobExecutor<S: DatumSink> {
     writer: Arc<S>,
     client: RobotClient,
     has_gotten_429_response: bool,
@@ -144,7 +145,7 @@ pub struct JobExecutor<S: DatumStream> {
     job: WorkerJob,
 }
 
-impl<S: DatumStream> JobExecutor<S> {
+impl<S: DatumSink> JobExecutor<S> {
     pub fn new(
         job: WorkerJob,
         config: Arc<CrawlerConfig>,
11 changes: 6 additions & 5 deletions crates/core/src/entrypoint/configure.rs
@@ -17,7 +17,7 @@
 use tokio::fs::File;
 use tokio::io;
 use tokio_stream::StreamExt;
-use tracing::{debug, info};
+use tracing::info;
 
 use crate::config::{
     defaults, IndexerConfig, IndexerDualEncoderConfig, IndexerGraphConfig, LocalConfig,
@@ -73,7 +73,7 @@ fn download_files() {
 }
 
 fn build_spellchecker() -> Result<()> {
-    debug!("Building spellchecker");
+    info!("Building spellchecker");
     let spellchecker_path = Path::new(DATA_PATH).join("web_spell");
 
     if !spellchecker_path.exists() {
@@ -97,7 +97,7 @@ fn build_spellchecker() -> Result<()> {
 }
 
 fn create_webgraph() -> Result<()> {
-    debug!("Creating webgraph");
+    info!("Creating webgraph");
     let out_path = Path::new(DATA_PATH).join("webgraph");
 
     if out_path.exists() {
@@ -128,7 +128,7 @@ fn create_webgraph() -> Result<()> {
 }
 
 fn calculate_centrality() {
-    debug!("Calculating centrality");
+    info!("Calculating centrality");
     let webgraph_path = Path::new(DATA_PATH).join("webgraph");
     let out_path = Path::new(DATA_PATH).join("centrality");
 
@@ -144,7 +144,7 @@ fn calculate_centrality() {
 }
 
 fn create_inverted_index() -> Result<()> {
-    debug!("Creating inverted index");
+    info!("Creating inverted index");
     let out_path = Path::new(DATA_PATH).join("index");
 
     if out_path.exists() {
@@ -209,6 +209,7 @@ fn create_inverted_index() -> Result<()> {
 }
 
 fn create_entity_index() -> Result<()> {
+    info!("Creating entity index");
     let out_path = Path::new(DATA_PATH).join("entity");
     if out_path.exists() {
         std::fs::remove_dir_all(&out_path)?;
2 changes: 1 addition & 1 deletion crates/core/src/entrypoint/search_server.rs
@@ -137,7 +137,7 @@ impl_search!([
 ]);
 
 pub struct SearchService {
-    local_searcher: LocalSearcher<Arc<RwLock<Index>>>,
+    local_searcher: LocalSearcher,
     // dropping the handle leaves the cluster
     #[allow(unused)]
     cluster_handle: Arc<Cluster>,
2 changes: 1 addition & 1 deletion crates/core/src/generic_query/get_homepage.rs
@@ -72,7 +72,7 @@ impl GenericQuery for GetHomepageQuery {
         FirstDocCollector::with_shard_id(ctx.shard_id)
     }
 
-    fn remote_collector(&self) -> Self::Collector {
+    fn coordinator_collector(&self) -> Self::Collector {
         FirstDocCollector::without_shard_id()
     }
 
2 changes: 1 addition & 1 deletion crates/core/src/generic_query/get_site_urls.rs
@@ -65,7 +65,7 @@ impl GenericQuery for GetSiteUrlsQuery {
             .disable_offset()
     }
 
-    fn remote_collector(&self) -> Self::Collector {
+    fn coordinator_collector(&self) -> Self::Collector {
         Self::Collector::new()
             .with_limit(self.limit as usize)
             .with_offset(self.offset.unwrap_or(0) as usize)
2 changes: 1 addition & 1 deletion crates/core/src/generic_query/get_webpage.rs
@@ -68,7 +68,7 @@ impl GenericQuery for GetWebpageQuery {
         FirstDocCollector::with_shard_id(ctx.shard_id)
     }
 
-    fn remote_collector(&self) -> Self::Collector {
+    fn coordinator_collector(&self) -> Self::Collector {
         FirstDocCollector::without_shard_id()
     }
 
24 changes: 23 additions & 1 deletion crates/core/src/generic_query/mod.rs
@@ -14,6 +14,26 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program. If not, see <https://www.gnu.org/licenses/>.
 
+//! # Main flow
+//! ```md
+//! `coordinator` <------> `searcher`
+//! ---------------------------------------------------
+//! send query to searcher
+//!                        search index
+//!                        collect fruits
+//!                        send fruits to coordinator
+//! merge fruits
+//! filter fruits
+//! for each shard
+//!     send fruits to searchers
+//!                        construct intermediate output
+//!                        from fruits
+//!                        send intermediate output to coordinator
+//! merge intermediate outputs
+//! return final output
+//! ---------------------------------------------------
+//! ```
+
 use crate::{inverted_index::ShardId, search_ctx, Result};
 
 pub mod top_key_phrases;
@@ -34,6 +54,8 @@ pub use get_site_urls::GetSiteUrlsQuery;
 pub mod collector;
 pub use collector::Collector;
 
+/// A generic query that can be executed on a searcher
+/// against an index.
 pub trait GenericQuery: Send + Sync + bincode::Encode + bincode::Decode + Clone {
     type Collector: Collector;
     type TantivyQuery: tantivy::query::Query;
@@ -42,7 +64,7 @@ pub trait GenericQuery: Send + Sync + bincode::Encode + bincode::Decode + Clone
 
     fn tantivy_query(&self, ctx: &search_ctx::Ctx) -> Self::TantivyQuery;
     fn collector(&self, ctx: &search_ctx::Ctx) -> Self::Collector;
-    fn remote_collector(&self) -> Self::Collector;
+    fn coordinator_collector(&self) -> Self::Collector;
 
     fn filter_fruit_shards(
         &self,
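
The two-phase flow above can be sketched end to end in one process. A hedged stand-in with hypothetical types and plain function calls in place of RPC: fruits are collected per shard, merged and filtered on the coordinator, then each shard hydrates its own surviving fruits into intermediate outputs that the coordinator merges:

```rust
// Hypothetical sketch of the generic-query flow: phase 1 collects "fruits"
// (partial matches) per shard; the coordinator merges and filters them;
// phase 2 asks each shard to expand its own fruits into intermediate
// outputs, which the coordinator merges into the final output.
type ShardId = usize;
type Fruit = (ShardId, u64); // (owning shard, doc score)

fn search_shard(shard: ShardId) -> Vec<Fruit> {
    // phase 1: each searcher searches its index and collects fruits
    vec![(shard, 10 * shard as u64), (shard, 1)]
}

fn construct_intermediate(shard: ShardId, fruits: &[Fruit]) -> Vec<String> {
    // phase 2: only the owning shard can hydrate its fruits into documents
    fruits
        .iter()
        .filter(|(s, _)| *s == shard)
        .map(|(_, score)| format!("doc(score={score}) from shard {shard}"))
        .collect()
}

fn main() {
    let shards: Vec<ShardId> = vec![0, 1];

    // coordinator: fan out, merge, then filter down to the top fruits
    let mut fruits: Vec<Fruit> = shards.iter().flat_map(|&s| search_shard(s)).collect();
    fruits.sort_by_key(|&(_, score)| std::cmp::Reverse(score));
    fruits.truncate(2);

    // coordinator: send the surviving fruits back to each shard
    let output: Vec<String> = shards
        .iter()
        .flat_map(|&s| construct_intermediate(s, &fruits))
        .collect();

    println!("{output:?}");
}
```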
2 changes: 1 addition & 1 deletion crates/core/src/generic_query/size.rs
@@ -70,7 +70,7 @@ impl GenericQuery for SizeQuery {
         SizeCollector::new().with_shard_id(ctx.shard_id)
     }
 
-    fn remote_collector(&self) -> Self::Collector {
+    fn coordinator_collector(&self) -> Self::Collector {
         SizeCollector::new()
     }
 
2 changes: 1 addition & 1 deletion crates/core/src/generic_query/top_key_phrases.rs
@@ -47,7 +47,7 @@ impl GenericQuery for TopKeyPhrasesQuery {
         TopKeyPhrasesCollector::new(self.top_n).with_shard_id(ctx.shard_id)
     }
 
-    fn remote_collector(&self) -> Self::Collector {
+    fn coordinator_collector(&self) -> Self::Collector {
         TopKeyPhrasesCollector::new(self.top_n)
     }
 
8 changes: 8 additions & 0 deletions crates/core/src/index.rs
@@ -51,6 +51,14 @@ impl Index {
         })
     }
 
+    pub fn inverted_index(&self) -> &InvertedIndex {
+        &self.inverted_index
+    }
+
+    pub fn region_count(&self) -> &Mutex<RegionCount> {
+        &self.region_count
+    }
+
     pub fn path(&self) -> PathBuf {
         PathBuf::from(&self.path)
     }