Skip to content

Commit

Permalink
refactor data that is re-used across fields for a particular page dur…
Browse files Browse the repository at this point in the history
…ing indexing into an 'FnCache'
  • Loading branch information
mikkeldenker committed Mar 19, 2024
1 parent 72ac622 commit 5bebe31
Show file tree
Hide file tree
Showing 11 changed files with 388 additions and 184 deletions.
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ crossbeam-channel = "0.5.6"
csv = "1.1.6"
dashmap = { version = "5.4.0", features = ["rayon"] }
encoding_rs = "0.8.31"
enum_dispatch = "0.3.12"
eventsource-stream = "0.2.3"
fend-core = "1.2.2"
flate2 = "1.0.28"
Expand Down
3 changes: 2 additions & 1 deletion assets/licenses.html
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ <h1>Third Party Licenses</h1>

<h2>Overview of licenses:</h2>
<ul class="licenses-overview">
<li><a href="#Apache-2.0">Apache License 2.0</a> (394)</li>
<li><a href="#Apache-2.0">Apache License 2.0</a> (395)</li>
<li><a href="#MIT">MIT License</a> (180)</li>
<li><a href="#MPL-2.0">Mozilla Public License 2.0</a> (9)</li>
<li><a href="#BSD-3-Clause">BSD 3-Clause &quot;New&quot; or &quot;Revised&quot; License</a> (8)</li>
Expand Down Expand Up @@ -9889,6 +9889,7 @@ <h4>Used by:</h4>
<li><a href=" https://github.com/zrzka/anes-rs ">anes 0.1.6</a></li>
<li><a href=" https://github.com/huggingface/candle ">candle-nn 0.3.3</a></li>
<li><a href=" https://github.com/huggingface/candle ">candle-transformers 0.3.3</a></li>
<li><a href=" https://gitlab.com/antonok/enum_dispatch ">enum_dispatch 0.3.12</a></li>
<li><a href=" https://github.com/jpopesculian/eventsource-stream ">eventsource-stream 0.2.3</a></li>
<li><a href=" https://github.com/cbreeden/fxhash ">fxhash 0.2.1</a></li>
<li><a href=" https://github.com/starkat99/half-rs ">half 2.4.0</a></li>
Expand Down
1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ crossbeam-channel = { workspace = true }
csv = { workspace = true }
dashmap = { workspace = true }
encoding_rs = { workspace = true }
enum_dispatch = { workspace = true }
eventsource-stream = { workspace = true }
fend-core = { workspace = true }
flate2 = { workspace = true }
Expand Down
8 changes: 4 additions & 4 deletions crates/core/src/mapreduce/dht/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,18 +73,18 @@ macro_rules! raft_sonic_request_response {
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
pub enum Response {
$(
$req(<$req as crate::distributed::sonic::service::Message<$service>>::Response),
$req(<$req as $crate::distributed::sonic::service::Message<$service>>::Response),
)*
Empty,
}

$(
impl TryFrom<Response> for <$req as crate::distributed::sonic::service::Message<$service>>::Response {
type Error = crate::distributed::sonic::Error;
impl TryFrom<Response> for <$req as $crate::distributed::sonic::service::Message<$service>>::Response {
type Error = $crate::distributed::sonic::Error;
fn try_from(res: Response) -> Result<Self, Self::Error> {
match res {
Response::$req(res) => Ok(res),
_ => Err(crate::distributed::sonic::Error::Application(anyhow::anyhow!("Invalid response for request from Raft"))),
_ => Err($crate::distributed::sonic::Error::Application(anyhow::anyhow!("Invalid response for request from Raft"))),
}
}
}
Expand Down
20 changes: 10 additions & 10 deletions crates/core/src/mapreduce/dht/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,34 +39,34 @@ impl RaftNetworkFactory<TypeConfig> for Network {
}
}

pub type AppendEntriesRequest = openraft::raft::AppendEntriesRequest<TypeConfig>;
pub type AppendEntries = openraft::raft::AppendEntriesRequest<TypeConfig>;
pub type AppendEntriesResponse = openraft::raft::AppendEntriesResponse<NodeId>;

pub type InstallSnapshotRequest = openraft::raft::InstallSnapshotRequest<TypeConfig>;
pub type InstallSnapshot = openraft::raft::InstallSnapshotRequest<TypeConfig>;
pub type InstallSnapshotResponse = openraft::raft::InstallSnapshotResponse<NodeId>;

pub type VoteRequest = openraft::raft::VoteRequest<NodeId>;
pub type Vote = openraft::raft::VoteRequest<NodeId>;
pub type VoteResponse = openraft::raft::VoteResponse<NodeId>;

#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
pub struct AddLearnerRequest {
pub struct AddLearner {
pub id: NodeId,
pub addr: SocketAddr,
}

#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
pub struct AddNodesRequest {
pub struct AddNodes {
members: BTreeMap<NodeId, BasicNode>,
}

sonic_service!(
Server,
[
AppendEntriesRequest,
InstallSnapshotRequest,
VoteRequest,
AddLearnerRequest,
AddNodesRequest,
AppendEntries,
InstallSnapshot,
Vote,
AddLearner,
AddNodes,
Get,
Set,
]
Expand Down
28 changes: 12 additions & 16 deletions crates/core/src/mapreduce/dht/network/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ use crate::{
};

use super::{
AddLearnerRequest, AddNodesRequest, AppendEntriesRequest, AppendEntriesResponse,
InstallSnapshotRequest, InstallSnapshotResponse, Server, VoteRequest, VoteResponse,
AddLearner, AddNodes, AppendEntries, AppendEntriesResponse, InstallSnapshot,
InstallSnapshotResponse, Server, Vote, VoteResponse,
};

impl sonic::service::Message<Server> for AppendEntriesRequest {
impl sonic::service::Message<Server> for AppendEntries {
type Response = Result<AppendEntriesResponse, RaftError<NodeId>>;

async fn handle(self, server: &Server) -> Self::Response {
Expand All @@ -50,7 +50,7 @@ impl sonic::service::Message<Server> for AppendEntriesRequest {
}
}

impl sonic::service::Message<Server> for InstallSnapshotRequest {
impl sonic::service::Message<Server> for InstallSnapshot {
type Response = Result<InstallSnapshotResponse, RaftError<NodeId, InstallSnapshotError>>;

async fn handle(self, server: &Server) -> Self::Response {
Expand All @@ -59,7 +59,7 @@ impl sonic::service::Message<Server> for InstallSnapshotRequest {
}
}

impl sonic::service::Message<Server> for VoteRequest {
impl sonic::service::Message<Server> for Vote {
type Response = Result<VoteResponse, RaftError<NodeId>>;

async fn handle(self, server: &Server) -> Self::Response {
Expand All @@ -68,7 +68,7 @@ impl sonic::service::Message<Server> for VoteRequest {
}
}

impl sonic::service::Message<Server> for AddLearnerRequest {
impl sonic::service::Message<Server> for AddLearner {
type Response = Result<(), RaftError<NodeId, ClientWriteError<NodeId, BasicNode>>>;

async fn handle(self, server: &Server) -> Self::Response {
Expand All @@ -93,7 +93,7 @@ impl sonic::service::Message<Server> for AddLearnerRequest {
}
}

impl sonic::service::Message<Server> for AddNodesRequest {
impl sonic::service::Message<Server> for AddNodes {
type Response = Result<(), RaftError<NodeId, ClientWriteError<NodeId, BasicNode>>>;

async fn handle(self, server: &Server) -> Self::Response {
Expand Down Expand Up @@ -160,7 +160,7 @@ impl RemoteClient {
}

async fn add_learner(&self, id: NodeId, addr: SocketAddr) -> Result<()> {
let rpc = AddLearnerRequest { id, addr };
let rpc = AddLearner { id, addr };
let retry = ExponentialBackoff::from_millis(500)
.with_limit(Duration::from_secs(60))
.take(5);
Expand Down Expand Up @@ -220,7 +220,7 @@ impl RemoteClient {
}

async fn add_nodes(&self, members: BTreeMap<NodeId, BasicNode>) -> Result<()> {
let rpc = AddNodesRequest { members };
let rpc = AddNodes { members };
let retry = ExponentialBackoff::from_millis(500).with_limit(Duration::from_secs(10));

for backoff in retry {
Expand Down Expand Up @@ -293,7 +293,7 @@ impl RemoteClient {
impl RaftNetwork<TypeConfig> for RemoteClient {
async fn append_entries(
&mut self,
rpc: AppendEntriesRequest,
rpc: AppendEntries,
option: RPCOption,
) -> Result<AppendEntriesResponse, RPCError> {
self.send_raft_rpc(rpc, option).await?.map_err(|e| {
Expand All @@ -308,7 +308,7 @@ impl RaftNetwork<TypeConfig> for RemoteClient {

async fn install_snapshot(
&mut self,
rpc: InstallSnapshotRequest,
rpc: InstallSnapshot,
option: RPCOption,
) -> Result<InstallSnapshotResponse, RPCError<InstallSnapshotError>> {
self.send_raft_rpc(rpc, option).await?.map_err(|e| {
Expand All @@ -321,11 +321,7 @@ impl RaftNetwork<TypeConfig> for RemoteClient {
})
}

async fn vote(
&mut self,
rpc: VoteRequest,
option: RPCOption,
) -> Result<VoteResponse, RPCError> {
async fn vote(&mut self, rpc: Vote, option: RPCOption) -> Result<VoteResponse, RPCError> {
self.send_raft_rpc(rpc, option).await?.map_err(|e| {
openraft::error::RemoteError {
target: self.target,
Expand Down
5 changes: 3 additions & 2 deletions crates/core/src/query/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,9 @@ fn single_bang(input: &str, pref: char) -> nom::IResult<&str, Term> {

fn bang(input: &str) -> nom::IResult<&str, Term> {
for pref in BANG_PREFIXES.iter() {
let (input, output) = single_bang(input, *pref)?;
return Ok((input, output));
if let Ok((input, output)) = single_bang(input, *pref) {
return Ok((input, output));
}
}

Err(nom::Err::Error(nom::error::Error::new(
Expand Down
136 changes: 136 additions & 0 deletions crates/core/src/webpage/html/fn_cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// Stract is an open source web search engine.
// Copyright (C) 2024 Stract ApS
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use crate::{webpage::schema_org, Result};
use tantivy::tokenizer::PreTokenizedString;

use super::{find_recipe_first_ingredient_tag_id, Html};

#[macro_export]
macro_rules! cache {
($($fn:ident -> $res:ty),*$(,)?) => {
/// Dynamically compute the different webpage functions
/// and cache the results for subsequent calls.
///
/// Used during indexing as some of the fields use
/// the same data from the webpage and we don't want to
/// recompute the same data multiple times.
pub struct FnCache<'a> {
html: &'a Html,
first_ingredient_tag_id: Option<String>,
schema_json: Option<String>,
pretokenized_schema_json: Option<PreTokenizedString>,
$($fn: Option<$res>,)*
}

impl<'a> FnCache<'a> {
/// Create a new instance of the IndexingCacher
pub fn new(html: &'a Html) -> Self {
Self {
html,
first_ingredient_tag_id: None,
schema_json: None,
pretokenized_schema_json: None,
$($fn: None,)*
}
}

$(
/// Compute $fn from webpage and cache the result
pub fn $fn(&mut self) -> &$res {
if self.$fn.is_none() {
self.$fn = Some(self.html.$fn());
}

self.$fn.as_ref().unwrap()
}
)*
}
};
}

cache! {
pretokenize_title -> Result<PreTokenizedString>,
pretokenize_all_text -> Result<PreTokenizedString>,
pretokenize_clean_text -> PreTokenizedString,
pretokenize_url -> PreTokenizedString,
pretokenize_url_for_site_operator -> PreTokenizedString,
pretokenize_domain -> PreTokenizedString,
pretokenize_site -> PreTokenizedString,
pretokenize_description -> PreTokenizedString,
pretokenize_microformats -> PreTokenizedString,
domain_name -> String,
schema_org -> Vec<schema_org::Item>,
site_hash -> [u64; 2],
url_without_query_hash -> [u64; 2],
url_hash -> [u64; 2],
url_without_tld_hash -> [u64; 2],
domain_hash -> [u64; 2],
title_hash -> [u64; 2],
}

/// Some manual implementations so we can use previously cached data
/// to compute the next field.
impl<'a> FnCache<'a> {
pub fn first_ingredient_tag_id(&mut self) -> Option<&String> {
if self.first_ingredient_tag_id.is_none() {
let root = self.html.root.clone(); // Node is just a NodeRef, so it's cheap to clone

self.first_ingredient_tag_id =
find_recipe_first_ingredient_tag_id(self.schema_org().as_slice(), &root);
}

self.first_ingredient_tag_id.as_ref()
}

pub fn schema_json(&mut self) -> &String {
if self.schema_json.is_none() {
self.schema_json = Some(serde_json::to_string(self.schema_org()).unwrap());
}

self.schema_json.as_ref().unwrap()
}

pub fn pretokenized_schema_json(&mut self) -> &PreTokenizedString {
if self.pretokenized_schema_json.is_none() {
self.pretokenized_schema_json =
match schema_org::flattened_json(self.schema_org().clone()) {
Ok(mut f) => {
let mut tokens = Vec::new();

{
let mut stream = f.token_stream();

while let Some(token) = stream.next() {
tokens.push(token.clone());
}
}

Some(PreTokenizedString {
text: f.text().to_string(),
tokens,
})
}
Err(_) => Some(PreTokenizedString {
text: String::new(),
tokens: Vec::new(),
}),
};
}

self.pretokenized_schema_json.as_ref().unwrap()
}
}
Loading

0 comments on commit 5bebe31

Please sign in to comment.