document main crawler elements
mikkeldenker committed Dec 5, 2024
1 parent 93ddf47 commit 8f7d712
Showing 7 changed files with 23 additions and 9 deletions.
13 changes: 11 additions & 2 deletions crates/core/src/crawler/mod.rs
@@ -14,6 +14,15 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program. If not, see <https://www.gnu.org/licenses/>.
 
+//! # Crawler
+//!
+//! The crawler is responsible for fetching webpages and storing them in WARC files
+//! for later processing.
+//!
+//! Before starting a crawl, a plan needs to be created. This plan is then used by
+//! the crawler coordinator to assign sites to crawl to different workers.
+//! A site is only assigned to one worker at a time for politeness.
+
 use std::{collections::VecDeque, future::Future, net::SocketAddr, sync::Arc};
 
 type HashMap<K, V> = std::collections::HashMap<K, V, ahash::RandomState>;
@@ -35,7 +44,7 @@ pub use router::Router;
 mod file_queue;
 pub mod planner;
 pub mod robot_client;
-mod wander_prirotiser;
+mod wander_prioritiser;
 mod warc_writer;
 mod worker;
 
@@ -304,7 +313,7 @@ impl Crawler {
     }
 }
 
-pub trait DatumStream: Send + Sync {
+pub trait DatumSink: Send + Sync {
    fn write(&self, crawl_datum: CrawlDatum) -> impl Future<Output = Result<()>> + Send;
    fn finish(&self) -> impl Future<Output = Result<()>> + Send;
 }
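
The renamed `DatumSink` trait can be implemented with plain `async fn` methods, as the `WarcWriter` impl later in this commit does. Below is a minimal sketch (not from the repository) of a custom sink that only counts writes; it assumes the crate's `DatumSink`, `CrawlDatum`, and `Result` types are in scope, and the `CountingSink` type is hypothetical.

```rust
// Hypothetical example; assumes e.g. `use crate::crawler::{CrawlDatum, DatumSink, Result};`.
use std::sync::atomic::{AtomicUsize, Ordering};

struct CountingSink {
    written: AtomicUsize,
}

impl DatumSink for CountingSink {
    async fn write(&self, _crawl_datum: CrawlDatum) -> Result<()> {
        // Count the datum instead of persisting it anywhere.
        self.written.fetch_add(1, Ordering::Relaxed);
        Ok(())
    }

    async fn finish(&self) -> Result<()> {
        Ok(())
    }
}
```
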
3 changes: 3 additions & 0 deletions crates/core/src/crawler/planner.rs
@@ -13,6 +13,7 @@
 //
 // You should have received a copy of the GNU Affero General Public License
 // along with this program. If not, see <https://www.gnu.org/licenses/>.
+
 use anyhow::{anyhow, Result};
 use futures::stream::FuturesOrdered;
 use futures::StreamExt;
@@ -71,6 +72,7 @@ impl From<StoredUrl> for Url {
     }
 }
 
+/// Store urls in groups on disk based on their harmonic rank.
 struct UrlGrouper {
     groups: Vec<speedy_kv::Db<StoredUrl, ()>>,
     folder: std::path::PathBuf,
@@ -169,6 +171,7 @@ struct Budget {
     remaining_schedulable: u64,
 }
 
+/// Create a crawl plan based on the harmonic rank of the hosts.
 pub struct CrawlPlanner {
     host_centrality: Arc<speedy_kv::Db<NodeID, f64>>,
     host_centrality_rank: Arc<speedy_kv::Db<NodeID, u64>>,
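
The new doc comments describe `UrlGrouper` and `CrawlPlanner` as working from the harmonic rank of hosts: URLs are grouped on disk by rank, and the plan is built from those groups. The exact grouping scheme is not shown in this diff; the sketch below is only a hypothetical illustration of rank-based bucketing, where `group_for_rank` and its log-scale boundaries are assumptions rather than the repository's logic.

```rust
/// Hypothetical rank-to-group mapping for illustration only; the real
/// UrlGrouper persists groups with speedy_kv and may use different boundaries.
fn group_for_rank(host_rank: u64, num_groups: u64) -> u64 {
    // Log-scale buckets: ranks 0..=9 -> group 0, 10..=99 -> group 1, and so on,
    // so the best-ranked hosts land in the smallest, earliest groups.
    let group = (host_rank.max(1) as f64).log10().floor() as u64;
    group.min(num_groups - 1)
}

fn main() {
    for rank in [0u64, 5, 42, 1_000, 250_000] {
        println!("host rank {rank} -> group {}", group_for_rank(rank, 8));
    }
}
```
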
1 change: 1 addition & 0 deletions crates/core/src/crawler/robot_client.rs
@@ -45,6 +45,7 @@ pub(super) fn reqwest_client(config: &CrawlerConfig) -> Result<reqwest::Client>
         .map_err(|e| Error::from(anyhow!(e)))
 }
 
+/// Reqwest client that respects robots.txt for each request.
 #[derive(Clone)]
 pub struct RobotClient {
     robots_txt_manager: RobotsTxtManager,
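
The new doc comment states that `RobotClient` is a reqwest client that checks robots.txt before each request. A rough sketch of that check-then-fetch pattern follows; it is not the crate's actual implementation: `robots_allows` and `polite_get` are hypothetical names, and a real version would fetch and cache robots.txt per host, as the `RobotsTxtManager` field above suggests.

```rust
use anyhow::{bail, Result};

/// Hypothetical stand-in for a robots.txt lookup; a real implementation would
/// download and cache robots.txt per host and evaluate its rules.
fn robots_allows(user_agent: &str, url: &url::Url) -> bool {
    let _ = (user_agent, url);
    true // placeholder
}

/// Fetch a url only if robots.txt permits it for our user agent.
async fn polite_get(
    client: &reqwest::Client,
    user_agent: &str,
    url: url::Url,
) -> Result<reqwest::Response> {
    if !robots_allows(user_agent, &url) {
        bail!("robots.txt disallows fetching {url}");
    }
    Ok(client.get(url).send().await?)
}
```
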
4 changes: 2 additions & 2 deletions crates/core/src/crawler/warc_writer.rs
@@ -21,7 +21,7 @@ use crate::{
     warc,
 };
 
-use super::{CrawlDatum, DatumStream, Error, Result};
+use super::{CrawlDatum, DatumSink, Error, Result};
 use anyhow::anyhow;
 
 /// The WarcWriter is responsible for storing the crawl datums
@@ -30,7 +30,7 @@ pub struct WarcWriter {
     tx: tokio::sync::mpsc::Sender<WarcWriterMessage>,
 }
 
-impl DatumStream for WarcWriter {
+impl DatumSink for WarcWriter {
     async fn write(&self, crawl_datum: CrawlDatum) -> Result<()> {
         self.tx
             .send(WarcWriterMessage::Crawl(crawl_datum))
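
As the diff shows, `WarcWriter` implements `DatumSink` by forwarding each datum over a `tokio::sync::mpsc` channel to a background writer task rather than writing synchronously inside `write`. Here is a self-contained sketch of that forwarding pattern, with stand-in message and record types (not the crate's `WarcWriterMessage` or `CrawlDatum`).

```rust
use tokio::sync::mpsc;

enum Message {
    Record(String),
    Done,
}

struct ChannelSink {
    tx: mpsc::Sender<Message>,
}

impl ChannelSink {
    /// Spawn a background task that owns the actual writer and return a cheap
    /// handle that only pushes messages onto the channel.
    fn spawn() -> Self {
        let (tx, mut rx) = mpsc::channel(128);
        tokio::spawn(async move {
            while let Some(msg) = rx.recv().await {
                match msg {
                    Message::Record(r) => println!("would write record: {r}"),
                    Message::Done => break,
                }
            }
        });
        Self { tx }
    }

    async fn write(&self, record: String) -> anyhow::Result<()> {
        self.tx
            .send(Message::Record(record))
            .await
            .map_err(|_| anyhow::anyhow!("writer task has shut down"))?;
        Ok(())
    }

    async fn finish(&self) -> anyhow::Result<()> {
        self.tx
            .send(Message::Done)
            .await
            .map_err(|_| anyhow::anyhow!("writer task has shut down"))?;
        Ok(())
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let sink = ChannelSink::spawn();
    sink.write("example".to_string()).await?;
    sink.finish().await?;
    Ok(())
}
```
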
9 changes: 5 additions & 4 deletions crates/core/src/crawler/worker.rs
@@ -39,8 +39,8 @@ use crate::{
 };
 
 use super::{
-    encoded_body, robot_client::RobotClient, wander_prirotiser::WanderPrioritiser, CrawlDatum,
-    DatumStream, Domain, Error, Result, RetrieableUrl, Site, WarcWriter, WeightedUrl, WorkerJob,
+    encoded_body, robot_client::RobotClient, wander_prioritiser::WanderPrioritiser, CrawlDatum,
+    DatumSink, Domain, Error, Result, RetrieableUrl, Site, WarcWriter, WeightedUrl, WorkerJob,
     MAX_CONTENT_LENGTH, MAX_OUTGOING_URLS_PER_PAGE,
 };
 
@@ -126,7 +126,8 @@ impl WorkerThread {
     }
 }
 
-pub struct JobExecutor<S: DatumStream> {
+/// JobExecutor receives a job from the coordinator and crawls the urls in the job.
+pub struct JobExecutor<S: DatumSink> {
     writer: Arc<S>,
     client: RobotClient,
     has_gotten_429_response: bool,
@@ -144,7 +145,7 @@ pub struct JobExecutor<S: DatumStream> {
     job: WorkerJob,
 }
 
-impl<S: DatumStream> JobExecutor<S> {
+impl<S: DatumSink> JobExecutor<S> {
     pub fn new(
         job: WorkerJob,
         config: Arc<CrawlerConfig>,
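
The new doc comment summarizes `JobExecutor`: it receives a job from the coordinator and crawls the job's urls. Since each site is assigned to a single worker at a time, the executor can pace its own requests. The following is a rough, standalone sketch of such a per-job loop; the `Job` type, `run_job`, and the fixed one-second delay are illustrative assumptions, not the crate's behaviour.

```rust
use std::collections::VecDeque;
use std::time::Duration;

struct Job {
    site: String,
    urls: VecDeque<String>,
}

/// Drain the job's url queue one request at a time, pausing between requests
/// so the single worker assigned to this site stays polite.
async fn run_job(mut job: Job) {
    while let Some(url) = job.urls.pop_front() {
        // A real executor would fetch the url, hand the result to a DatumSink,
        // and push newly discovered urls back onto the queue.
        println!("crawling {url} for site {}", job.site);
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}

#[tokio::main]
async fn main() {
    let job = Job {
        site: "example.com".to_string(),
        urls: VecDeque::from(["https://example.com/".to_string()]),
    };
    run_job(job).await;
}
```
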
2 changes: 1 addition & 1 deletion crates/core/src/live_index/crawler/crawlable_site.rs
@@ -136,7 +136,7 @@ impl CrawlableSite {
     }
 }
 
-impl crawler::DatumStream for tokio::sync::Mutex<Vec<crawler::CrawlDatum>> {
+impl crawler::DatumSink for tokio::sync::Mutex<Vec<crawler::CrawlDatum>> {
     async fn write(&self, crawl_datum: crawler::CrawlDatum) -> Result<(), crawler::Error> {
         self.lock().await.push(crawl_datum);
         Ok(())
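
The live-index crawler reuses the same `DatumSink` trait but collects crawl datums in memory behind a `tokio::sync::Mutex<Vec<_>>` instead of streaming them to WARC files. A tiny standalone illustration of that in-memory collection pattern, using a `String` in place of `CrawlDatum`:

```rust
#[tokio::main]
async fn main() {
    // An async-aware Mutex around a Vec acts as a simple in-memory sink.
    let sink = tokio::sync::Mutex::new(Vec::new());
    sink.lock().await.push("datum".to_string());
    assert_eq!(sink.lock().await.len(), 1);
}
```
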
