Skip to content

Commit

Permalink
handle missing fastfields in index gracefully
Browse files Browse the repository at this point in the history
  • Loading branch information
mikkeldenker committed Mar 11, 2024
1 parent 163a341 commit 40c827d
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 44 deletions.
5 changes: 3 additions & 2 deletions crates/core/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ impl TopSegmentCollector {
let field_reader = self.fastfield_segment_reader.get_field_reader(doc);

let hash = [
field_reader.get(field1).as_u64().unwrap(),
field_reader.get(field2).as_u64().unwrap(),
field_reader.get(field1).unwrap().as_u64().unwrap(),
field_reader.get(field2).unwrap().as_u64().unwrap(),
];
combine_u64s(hash).into()
}
Expand All @@ -170,6 +170,7 @@ impl TopSegmentCollector {
.fastfield_segment_reader
.get_field_reader(doc)
.get(FastField::SimHash)
.unwrap()
.into();

self.bucket_collector.insert(SegmentDoc {
Expand Down
50 changes: 23 additions & 27 deletions crates/core/src/fastfield_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,14 @@ impl FastFieldReader {
for field in Field::all().filter_map(Field::as_fast) {
match field.data_type() {
DataType::U64 => {
let reader = fastfield_readers.u64(field.name()).unwrap();
u64s.insert(field, reader);
if let Ok(reader) = fastfield_readers.u64(field.name()) {
u64s.insert(field, reader);
}
}
DataType::Bytes => {
let reader = fastfield_readers.bytes(field.name()).unwrap().unwrap();
bytes.insert(field, reader);
if let Some(reader) = fastfield_readers.bytes(field.name()).ok().flatten() {
bytes.insert(field, reader);
}
}
};
}
Expand All @@ -83,7 +85,7 @@ struct AllReaders {

pub enum Value {
U64(u64),
Bytes(Option<Vec<u8>>),
Bytes(Vec<u8>),
}

impl Value {
Expand All @@ -96,7 +98,7 @@ impl Value {

pub fn as_bytes(&self) -> Option<&[u8]> {
match self {
Value::Bytes(Some(val)) => Some(val),
Value::Bytes(val) => Some(val),
_ => None,
}
}
Expand All @@ -110,12 +112,6 @@ impl From<u64> for Value {

impl From<Vec<u8>> for Value {
fn from(val: Vec<u8>) -> Self {
Value::Bytes(Some(val))
}
}

impl From<Option<Vec<u8>>> for Value {
fn from(val: Option<Vec<u8>>) -> Self {
Value::Bytes(val)
}
}
Expand All @@ -135,7 +131,7 @@ impl<'a> From<&'a Value> for Option<&'a [u8]> {
impl From<Value> for Option<Vec<u8>> {
fn from(val: Value) -> Self {
match val {
Value::Bytes(val) => val,
Value::Bytes(val) => Some(val),
_ => None,
}
}
Expand All @@ -147,32 +143,32 @@ pub struct FieldReader<'a> {
}

impl<'a> FieldReader<'a> {
pub fn get(&self, field: FastField) -> Value {
pub fn get(&self, field: FastField) -> Option<Value> {
match field.data_type() {
DataType::U64 => self
.readers
.u64s
.get(field)
.unwrap()
.values
.get_val(self.doc)
.into(),
DataType::U64 => Some(
self.readers
.u64s
.get(field)?
.values
.get_val(self.doc)
.into(),
),

DataType::Bytes => {
let reader = self.readers.bytes.get(field).unwrap();
let reader = self.readers.bytes.get(field)?;
let ord = reader.ords().values.get_val(self.doc);

if ord > reader.num_terms() as u64 {
return Value::Bytes(None);
if ord > reader.num_terms() as u64 || reader.num_terms() == 0 {
return None;
}

let mut bytes = Vec::new();
reader.ord_to_bytes(ord, &mut bytes).unwrap();

if bytes.is_empty() {
Value::Bytes(None)
None
} else {
bytes.into()
Some(bytes.into())
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/query/pattern_query/scorer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ impl EmptyFieldScorer {
.segment_reader
.get_field_reader(doc)
.get(self.num_tokens_fastfield)
.into();
.and_then(|v| v.as_u64());
s.unwrap_or_default()
}
}
Expand Down Expand Up @@ -279,7 +279,7 @@ impl NormalPatternScorer {
.segment_reader
.get_field_reader(self.doc())
.get(self.num_tokens_field)
.into();
.and_then(|v| v.as_u64());
let num_tokens_doc = num_tokens_doc.unwrap();

for (i, pattern_part) in self.pattern.iter().enumerate() {
Expand Down
21 changes: 21 additions & 0 deletions crates/core/src/ranking/pipeline/scorers/embedding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ impl ScoredWebpagePointer {

impl<W: RankableWebpage, E: EmbeddingSignal<W>> Scorer<W> for EmbeddingScorer<W, E> {
fn score(&self, webpages: &mut [W]) {
if !webpages.iter().any(E::has_embedding) {
return;
}

if let Some((query_emb, coefficient)) = self.query_emb_and_coefficient(E::signal()) {
let hidden_size = query_emb.size();
for webpage in webpages.iter_mut() {
Expand All @@ -164,6 +168,7 @@ pub struct KeywordEmbeddings;

/// Describes one kind of embedding (for example title or keyword embeddings) and how it is
/// read from, and scored onto, a webpage type `W`.
pub trait EmbeddingSignal<W>: Send + Sync {
/// The ranking [`Signal`] associated with this kind of embedding.
fn signal() -> Signal;
/// Whether `webpage` carries this kind of embedding.
///
/// `EmbeddingScorer::score` returns early when this is `false` for every page it is given.
fn has_embedding(webpage: &W) -> bool;
/// The embedding of `webpage` for the given `hidden_size`, if one can be produced.
// NOTE(review): the conditions under which implementations return `None` are not visible
// here — presumably when the page has no stored embedding; confirm against `title_emb` /
// `keyword_emb`.
fn embedding(webpage: &W, hidden_size: usize) -> Option<Embedding>;
/// Hands the computed `score` and its `coefficient` back to `webpage`.
// NOTE(review): implementations are outside this view — presumably this stores the value
// under `Self::signal()`; confirm.
fn insert_signal(webpage: &mut W, score: f64, coefficient: f64);
}
Expand All @@ -173,6 +178,10 @@ impl EmbeddingSignal<ScoredWebpagePointer> for TitleEmbeddings {
Signal::TitleEmbeddingSimilarity
}

/// Returns `true` when the ranking data behind `webpage` holds a title embedding.
fn has_embedding(webpage: &ScoredWebpagePointer) -> bool {
    let ranking = webpage.as_ranking();
    ranking.title_embedding.is_some()
}

/// Returns the title embedding of `webpage` by delegating to `title_emb` with `hidden_size`.
fn embedding(webpage: &ScoredWebpagePointer, hidden_size: usize) -> Option<Embedding> {
webpage.title_emb(hidden_size)
}
Expand All @@ -194,6 +203,10 @@ impl EmbeddingSignal<RecallRankingWebpage> for TitleEmbeddings {
Signal::TitleEmbeddingSimilarity
}

/// Returns `true` when `webpage.title_embedding` is `Some`.
fn has_embedding(webpage: &RecallRankingWebpage) -> bool {
webpage.title_embedding.is_some()
}

/// Returns the title embedding of `webpage` by delegating to `title_emb` with `hidden_size`.
fn embedding(webpage: &RecallRankingWebpage, hidden_size: usize) -> Option<Embedding> {
webpage.title_emb(hidden_size)
}
Expand All @@ -216,6 +229,10 @@ impl EmbeddingSignal<ScoredWebpagePointer> for KeywordEmbeddings {
Signal::KeywordEmbeddingSimilarity
}

/// Returns `true` when the ranking data behind `webpage` holds a keyword embedding.
fn has_embedding(webpage: &ScoredWebpagePointer) -> bool {
    let ranking = webpage.as_ranking();
    ranking.keyword_embedding.is_some()
}

/// Returns the keyword embedding of `webpage` by delegating to `keyword_emb` with `hidden_size`.
fn embedding(webpage: &ScoredWebpagePointer, hidden_size: usize) -> Option<Embedding> {
webpage.keyword_emb(hidden_size)
}
Expand All @@ -237,6 +254,10 @@ impl EmbeddingSignal<RecallRankingWebpage> for KeywordEmbeddings {
Signal::KeywordEmbeddingSimilarity
}

/// Returns `true` when `webpage.keyword_embedding` is `Some`.
fn has_embedding(webpage: &RecallRankingWebpage) -> bool {
webpage.keyword_embedding.is_some()
}

/// Returns the keyword embedding of `webpage` by delegating to `keyword_emb` with `hidden_size`.
fn embedding(webpage: &RecallRankingWebpage, hidden_size: usize) -> Option<Embedding> {
webpage.keyword_emb(hidden_size)
}
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/ranking/pipeline/stages/recall.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ impl RecallRankingWebpage {
.unwrap()
.get_field_reader(pointer.address.doc_id)
.get(FastField::TitleEmbeddings)
.into();
.and_then(|v| v.into());

let keyword_embedding: Option<Vec<u8>> = aggregator
.fastfield_readers()
.unwrap()
.get_field_reader(pointer.address.doc_id)
.get(FastField::KeywordEmbeddings)
.into();
.and_then(|v| v.into());

let mut res = RecallRankingWebpage {
signals: EnumMap::new(),
Expand Down
27 changes: 16 additions & 11 deletions crates/core/src/ranking/signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,10 @@ fn idf_sum(field: &mut TextFieldData, doc: DocId) -> f64 {
impl Signal {
/// Whether this signal counts as computable before search.
///
/// A signal qualifies when it maps to a fast field (`as_fastfield()` is `Some`). The two
/// embedding-similarity signals are the exception: they map to fast fields but never qualify.
fn is_computable_before_search(&self) -> bool {
    let is_embedding_similarity = matches!(
        self,
        Signal::TitleEmbeddingSimilarity | Signal::KeywordEmbeddingSimilarity
    );

    if is_embedding_similarity {
        false
    } else {
        self.as_fastfield().is_some()
    }
}

pub fn default_coefficient(&self) -> f64 {
Expand Down Expand Up @@ -356,7 +360,7 @@ impl Signal {

let node_id = fastfield_reader
.get(FastField::HostNodeID)
.as_u64()
.and_then(|n| n.as_u64())
.unwrap();

let host_id: Option<NodeID> = if node_id == u64::MAX {
Expand All @@ -369,35 +373,35 @@ impl Signal {
Signal::HostCentrality | Signal::PageCentrality => {
let val = fastfield_reader
.get(self.as_fastfield().unwrap())
.as_u64()
.and_then(|v| v.as_u64())
.unwrap();
Some(val as f64 / FLOAT_SCALING as f64)
}
Signal::HostCentralityRank | Signal::PageCentralityRank => {
let val = fastfield_reader
.get(self.as_fastfield().unwrap())
.as_u64()
.and_then(|v| v.as_u64())
.unwrap();
Some(score_rank(val as f64))
}
Signal::IsHomepage => {
let val = fastfield_reader
.get(self.as_fastfield().unwrap())
.as_u64()
.and_then(|v| v.as_u64())
.unwrap();
Some(val as f64)
}
Signal::LinkDensity => {
let val = fastfield_reader
.get(self.as_fastfield().unwrap())
.as_u64()
.and_then(|v| v.as_u64())
.unwrap();
Some(score_link_density(val as f64 / FLOAT_SCALING as f64))
}
Signal::FetchTimeMs => {
let fetch_time_ms = fastfield_reader
.get(self.as_fastfield().unwrap())
.as_u64()
.and_then(|v| v.as_u64())
.unwrap() as usize;

if fetch_time_ms >= signal_aggregator.fetch_time_ms_cache.len() {
Expand All @@ -409,36 +413,36 @@ impl Signal {
Signal::UpdateTimestamp => {
let val = fastfield_reader
.get(self.as_fastfield().unwrap())
.as_u64()
.and_then(|v| v.as_u64())
.unwrap() as usize;

Some(score_timestamp(val, signal_aggregator))
}
Signal::TrackerScore => {
let val = fastfield_reader
.get(self.as_fastfield().unwrap())
.as_u64()
.and_then(|v| v.as_u64())
.unwrap();
Some(score_trackers(val as f64))
}
Signal::UrlDigits => {
let val = fastfield_reader
.get(self.as_fastfield().unwrap())
.as_u64()
.and_then(|v| v.as_u64())
.unwrap();
Some(score_digits(val as f64))
}
Signal::UrlSlashes => {
let val = fastfield_reader
.get(self.as_fastfield().unwrap())
.as_u64()
.and_then(|v| v.as_u64())
.unwrap();
Some(score_slashes(val as f64))
}
Signal::Region => {
let val = fastfield_reader
.get(self.as_fastfield().unwrap())
.as_u64()
.and_then(|v| v.as_u64())
.unwrap();
let region = Region::from_id(val);
Some(score_region(region, signal_aggregator))
Expand Down Expand Up @@ -621,6 +625,7 @@ impl Signal {
Signal::UrlDigits => Some(FastField::NumPathAndQueryDigits),
Signal::LinkDensity => Some(FastField::LinkDensity),
Signal::TitleEmbeddingSimilarity => Some(FastField::TitleEmbeddings),
Signal::KeywordEmbeddingSimilarity => Some(FastField::KeywordEmbeddings),
_ => None,
}
}
Expand Down

0 comments on commit 40c827d

Please sign in to comment.