From b4d21e3d12d7280cac2b34974cc7c6e5611eb131 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=96zg=C3=BCr=20Akkurt?= <91746947+ozgrakkurt@users.noreply.github.com> Date: Wed, 22 Feb 2023 01:16:52 +0300 Subject: [PATCH] use xor filters instead of bloom filters (#145) --- Cargo.lock | 31 +++--- core/src/deserialize.rs | 2 +- worker/Cargo.toml | 4 +- worker/src/bloom.rs | 224 ---------------------------------------- worker/src/data_ctx.rs | 13 ++- worker/src/db.rs | 4 +- worker/src/db_writer.rs | 32 +++--- worker/src/lib.rs | 1 - 8 files changed, 41 insertions(+), 270 deletions(-) delete mode 100644 worker/src/bloom.rs diff --git a/Cargo.lock b/Cargo.lock index d2b41507..a24db4e6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -834,16 +834,6 @@ version = "3.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0d261e256854913907f67ed06efbc3338dfe6179796deefc1ff763fc1aee5535" -[[package]] -name = "bv" -version = "0.11.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8834bb1d8ee5dc048ee3124f2c7c1afcc6bc9aed03f11e9dfd8c69470a5db340" -dependencies = [ - "feature-probe", - "serde", -] - [[package]] name = "bytemuck" version = "1.13.0" @@ -1412,21 +1402,18 @@ version = "0.1.0" dependencies = [ "actix-web", "arrayvec", - "bv", "bytes", "clap", "crossbeam-channel", "derive_more", "env_logger", "eth-archive-core", - "fnv", "futures", "itertools", "log", "mimalloc", "polars", "prefix-hex", - "rand", "rayon", "rmp-serde", "rocksdb", @@ -1434,6 +1421,7 @@ dependencies = [ "serde_json", "thiserror", "tokio", + "xorf", ] [[package]] @@ -1469,12 +1457,6 @@ dependencies = [ "instant", ] -[[package]] -name = "feature-probe" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "835a3dc7d1ec9e75e2b5fb4ba75396837112d2060b03f7d43bc1897c7f7211da" - [[package]] name = "flate2" version = "1.0.25" @@ -3925,6 +3907,17 @@ version = "0.13.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4d25c75bf9ea12c4040a97f829154768bbbce366287e2dc044af160cd79a13fd" +[[package]] +name = "xorf" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57901b00e3f8e14f4d20b8955bf8087ecb545cfe2ed8741c2a2dbc89847a1a29" +dependencies = [ + "libm", + "rand", + "serde", +] + [[package]] name = "xxhash-rust" version = "0.8.6" diff --git a/core/src/deserialize.rs b/core/src/deserialize.rs index 4efd173a..8ea96fc0 100644 --- a/core/src/deserialize.rs +++ b/core/src/deserialize.rs @@ -8,7 +8,7 @@ use std::result::Result as StdResult; #[derive(Debug, Clone, derive_more::Deref, derive_more::From, PartialEq, Eq)] pub struct Bytes32(pub Box<[u8; 32]>); -#[derive(Debug, Clone, derive_more::Deref, derive_more::From, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, derive_more::Deref, derive_more::From, PartialEq, Eq)] pub struct Address(pub Box<[u8; 20]>); #[derive(Debug, Clone, derive_more::Deref, derive_more::From, PartialEq, Eq)] diff --git a/worker/Cargo.toml b/worker/Cargo.toml index ac9a200e..7e7e0f54 100644 --- a/worker/Cargo.toml +++ b/worker/Cargo.toml @@ -22,9 +22,7 @@ futures = "0.3" mimalloc = { version = "0.1.34", default-features = false } crossbeam-channel = "0.5" derive_more = "0.99" -fnv = "1" -bv = { version = "0.11", features = [ "serde" ] } -rand = "0.8" +xorf = { version = "0.8.1", features = ["serde"] } eth-archive-core = { path = "../core" } diff --git a/worker/src/bloom.rs b/worker/src/bloom.rs deleted file mode 100644 index c3ad4728..00000000 --- a/worker/src/bloom.rs +++ /dev/null @@ -1,224 +0,0 @@ -#![allow(dead_code)] - -//! Simple Bloom Filter. copy pasted from https://github.com/solana-labs/solana/blob/master/bloom/src/bloom.rs -use { - bv::BitVec, - fnv::FnvHasher, - rand::{self, Rng}, - serde::{Deserialize, Serialize}, - std::{ - cmp, fmt, - hash::Hasher, - marker::PhantomData, - sync::atomic::{AtomicU64, Ordering}, - }, -}; - -/// Generate a stable hash of `self` for each `hash_index` -/// Best effort can be made for uniqueness of each hash. -pub trait BloomHashIndex { - fn hash_at_index(&self, hash_index: u64) -> u64; -} - -#[derive(Serialize, Deserialize, Default, Clone, PartialEq, Eq)] -pub struct Bloom { - pub keys: Vec, - pub bits: BitVec, - num_bits_set: u64, - _phantom: PhantomData, -} - -impl fmt::Debug for Bloom { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - f, - "Bloom {{ keys.len: {} bits.len: {} num_set: {} bits: ", - self.keys.len(), - self.bits.len(), - self.num_bits_set - )?; - const MAX_PRINT_BITS: u64 = 10; - for i in 0..std::cmp::min(MAX_PRINT_BITS, self.bits.len()) { - if self.bits.get(i) { - write!(f, "1")?; - } else { - write!(f, "0")?; - } - } - if self.bits.len() > MAX_PRINT_BITS { - write!(f, "..")?; - } - write!(f, " }}") - } -} - -impl Bloom { - pub fn new(num_bits: usize, keys: Vec) -> Self { - let bits = BitVec::new_fill(false, num_bits as u64); - Bloom { - keys, - bits, - num_bits_set: 0, - _phantom: PhantomData::default(), - } - } - /// Create filter optimal for num size given the `FALSE_RATE`. - /// - /// The keys are randomized for picking data out of a collision resistant hash of size - /// `keysize` bytes. - /// - /// See . - pub fn random(num_items: usize, false_rate: f64, max_bits: usize) -> Self { - let m = Self::num_bits(num_items as f64, false_rate); - let num_bits = cmp::max(1, cmp::min(m as usize, max_bits)); - let num_keys = Self::num_keys(num_bits as f64, num_items as f64) as usize; - let keys: Vec = (0..num_keys).map(|_| rand::thread_rng().gen()).collect(); - Self::new(num_bits, keys) - } - fn num_bits(num_items: f64, false_rate: f64) -> f64 { - let n = num_items; - let p = false_rate; - ((n * p.ln()) / (1f64 / 2f64.powf(2f64.ln())).ln()).ceil() - } - fn num_keys(num_bits: f64, num_items: f64) -> f64 { - let n = num_items; - let m = num_bits; - // infinity as usize is zero in rust 1.43 but 2^64-1 in rust 1.45; ensure it's zero here - if n == 0.0 { - 0.0 - } else { - 1f64.max(((m / n) * 2f64.ln()).round()) - } - } - fn pos(&self, key: &T, k: u64) -> u64 { - key.hash_at_index(k).wrapping_rem(self.bits.len()) - } - pub fn clear(&mut self) { - self.bits = BitVec::new_fill(false, self.bits.len()); - self.num_bits_set = 0; - } - pub fn add(&mut self, key: &T) { - for k in &self.keys { - let pos = self.pos(key, *k); - if !self.bits.get(pos) { - self.num_bits_set = self.num_bits_set.saturating_add(1); - self.bits.set(pos, true); - } - } - } - pub fn contains(&self, key: &T) -> bool { - for k in &self.keys { - let pos = self.pos(key, *k); - if !self.bits.get(pos) { - return false; - } - } - true - } -} - -fn slice_hash(slice: &[u8], hash_index: u64) -> u64 { - let mut hasher = FnvHasher::with_key(hash_index); - hasher.write(slice); - hasher.finish() -} - -impl> BloomHashIndex for T { - fn hash_at_index(&self, hash_index: u64) -> u64 { - slice_hash(self.as_ref(), hash_index) - } -} - -pub struct AtomicBloom { - num_bits: u64, - keys: Vec, - bits: Vec, - _phantom: PhantomData, -} - -impl From> for AtomicBloom { - fn from(bloom: Bloom) -> Self { - AtomicBloom { - num_bits: bloom.bits.len(), - keys: bloom.keys, - bits: bloom - .bits - .into_boxed_slice() - .iter() - .map(|&x| AtomicU64::new(x)) - .collect(), - _phantom: PhantomData::default(), - } - } -} - -impl AtomicBloom { - fn pos(&self, key: &T, hash_index: u64) -> (usize, u64) { - let pos = key.hash_at_index(hash_index).wrapping_rem(self.num_bits); - // Divide by 64 to figure out which of the - // AtomicU64 bit chunks we need to modify. - let index = pos.wrapping_shr(6); - // (pos & 63) is equivalent to mod 64 so that we can find - // the index of the bit within the AtomicU64 to modify. - let mask = 1u64.wrapping_shl(u32::try_from(pos & 63).unwrap()); - (index as usize, mask) - } - - /// Adds an item to the bloom filter and returns true if the item - /// was not in the filter before. - pub fn add(&self, key: &T) -> bool { - let mut added = false; - for k in &self.keys { - let (index, mask) = self.pos(key, *k); - let prev_val = self.bits[index].fetch_or(mask, Ordering::Relaxed); - added = added || prev_val & mask == 0u64; - } - added - } - - pub fn contains(&self, key: &T) -> bool { - self.keys.iter().all(|k| { - let (index, mask) = self.pos(key, *k); - let bit = self.bits[index].load(Ordering::Relaxed) & mask; - bit != 0u64 - }) - } - - pub fn clear_for_tests(&mut self) { - self.bits.iter().for_each(|bit| { - bit.store(0u64, Ordering::Relaxed); - }); - } - - // Only for tests and simulations. - pub fn mock_clone(&self) -> Self { - Self { - keys: self.keys.clone(), - bits: self - .bits - .iter() - .map(|v| AtomicU64::new(v.load(Ordering::Relaxed))) - .collect(), - ..*self - } - } -} - -impl From> for Bloom { - fn from(atomic_bloom: AtomicBloom) -> Self { - let bits: Vec<_> = atomic_bloom - .bits - .into_iter() - .map(AtomicU64::into_inner) - .collect(); - let num_bits_set = bits.iter().map(|x| x.count_ones() as u64).sum(); - let mut bits: BitVec = bits.into(); - bits.truncate(atomic_bloom.num_bits); - Bloom { - keys: atomic_bloom.keys, - bits, - num_bits_set, - _phantom: PhantomData::default(), - } - } -} diff --git a/worker/src/data_ctx.rs b/worker/src/data_ctx.rs index 67ebe47e..1e25e873 100644 --- a/worker/src/data_ctx.rs +++ b/worker/src/data_ctx.rs @@ -1,6 +1,6 @@ use crate::config::Config; use crate::db::DbHandle; -use crate::db_writer::DbWriter; +use crate::db_writer::{hash_addr, DbWriter}; use crate::field_selection::FieldSelection; use crate::serialize_task::SerializeTask; use crate::types::{MiniLogSelection, MiniQuery, MiniTransactionSelection, Query}; @@ -23,6 +23,7 @@ use std::path::Path; use std::sync::Arc; use std::time::{Duration, Instant}; use std::{cmp, io}; +use xorf::Filter; pub struct DataCtx { config: Config, @@ -219,7 +220,7 @@ impl DataCtx { let address = address .iter() - .filter(|addr| parquet_idx.contains(addr)) + .filter(|addr| parquet_idx.contains(&hash_addr(addr.as_slice()))) .cloned() .collect::>(); @@ -242,7 +243,9 @@ impl DataCtx { Some(source) if !source.is_empty() => { let source = source .iter() - .filter(|addr| parquet_idx.contains(addr)) + .filter(|addr| { + parquet_idx.contains(&hash_addr(addr.as_slice())) + }) .cloned() .collect::>(); if source.is_empty() { @@ -258,7 +261,9 @@ impl DataCtx { Some(dest) if !dest.is_empty() => { let dest = dest .iter() - .filter(|addr| parquet_idx.contains(addr)) + .filter(|addr| { + parquet_idx.contains(&hash_addr(addr.as_slice())) + }) .cloned() .collect::>(); if dest.is_empty() { diff --git a/worker/src/db.rs b/worker/src/db.rs index dc7fc7d9..a8423ffa 100644 --- a/worker/src/db.rs +++ b/worker/src/db.rs @@ -1,4 +1,3 @@ -use crate::bloom::Bloom as BloomFilter; use crate::types::MiniQuery; use crate::{Error, Result}; use eth_archive_core::deserialize::Address; @@ -15,7 +14,6 @@ use std::sync::Arc; use std::time::Instant; use std::{cmp, iter, mem}; -pub type Bloom = BloomFilter
; pub type ParquetIdxIter<'a> = Box> + 'a>; pub struct DbHandle { @@ -559,7 +557,7 @@ mod cf_name { pub const ALL_CF_NAMES: [&str; 5] = [BLOCK, TX, LOG, LOG_TX, PARQUET_IDX]; } -pub type ParquetIdx = Bloom; +pub type ParquetIdx = xorf::BinaryFuse16; fn log_tx_key(block_number: u32, transaction_index: u32) -> [u8; 8] { let mut key = [0; 8]; diff --git a/worker/src/db_writer.rs b/worker/src/db_writer.rs index bc809c96..50f6c80d 100644 --- a/worker/src/db_writer.rs +++ b/worker/src/db_writer.rs @@ -1,7 +1,6 @@ use crate::data_ctx::scan_parquet_args; -use crate::db::{Bloom, DbHandle}; +use crate::db::DbHandle; use crate::{Error, Result}; -use eth_archive_core::deserialize::Address; use eth_archive_core::dir_name::DirName; use eth_archive_core::types::{Block, BlockRange, Log}; use polars::export::arrow::array::BinaryArray; @@ -11,6 +10,7 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; +use xorf::BinaryFuse16; pub struct DbWriter { tx: mpsc::Sender, @@ -99,7 +99,7 @@ impl DbWriter { .iter() .flatten() { - addrs.insert(Address::new(addr)); + addrs.insert(addr.to_vec()); } } } @@ -125,7 +125,7 @@ impl DbWriter { .iter() .flatten() { - addrs.insert(Address::new(addr)); + addrs.insert(addr.to_vec()); } } @@ -145,22 +145,16 @@ impl DbWriter { .iter() .flatten() { - addrs.insert(Address::new(addr)); + addrs.insert(addr.to_vec()); } } }; - let mut bloom = Bloom::random( - addrs.len(), - 0.000_001, - 128_000, // 16KB max size - ); + let filter = + BinaryFuse16::try_from(&addrs.iter().map(|addr| hash_addr(addr)).collect::>()) + .unwrap(); - for addr in addrs { - bloom.add(&addr); - } - - db.insert_parquet_idx(dir_name, &bloom)?; + db.insert_parquet_idx(dir_name, &filter)?; db.delete_up_to(dir_name.range.to)?; @@ -168,6 +162,14 @@ impl DbWriter { } } +pub fn hash_addr(addr: &[u8]) -> u64 { + assert_eq!(addr.len(), 20); + + u64::from_be_bytes(addr[..8].try_into().unwrap()) + ^ u64::from_be_bytes(addr[8..16].try_into().unwrap()) + ^ u64::from(u32::from_be_bytes(addr[16..].try_into().unwrap())) +} + #[derive(Clone)] enum Job { WriteBatches((Vec, Vec>, Vec>)), diff --git a/worker/src/lib.rs b/worker/src/lib.rs index edf7f9fe..2914a30e 100644 --- a/worker/src/lib.rs +++ b/worker/src/lib.rs @@ -1,4 +1,3 @@ -mod bloom; mod config; mod data_ctx; mod db;