Skip to content

Commit

Permalink
Updating defrag and tests to use cursors and make test more robust by…
Browse files Browse the repository at this point in the history
… getting hits

Signed-off-by: zackcam <[email protected]>
  • Loading branch information
zackcam committed Dec 4, 2024
1 parent c2c1ce7 commit 17b961d
Show file tree
Hide file tree
Showing 10 changed files with 365 additions and 287 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,4 @@ jobs:
- name: Update module path
run: echo "MODULE_PATH=$(realpath target/release/libvalkey_bloom.so)" >> $GITHUB_ENV
- name: Run integration tests
run: python -m pytest --cache-clear -v "tests/"
run: python -m pytest --cache-clear -v "tests/" -m "not skip_for_asan"
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ homepage = "https://github.com/valkey-io/valkey-bloom"
valkey-module = "0.1.2"
valkey-module-macros = "0"
linkme = "0"
bloomfilter = { path = "../rust-bloom-filter", features = ["serde"] }
bloomfilter = { version = "3.0.1", features = ["serde"] }
lazy_static = "1.4.0"
bit-vec = "0.8.0"
libc = "0.2"
Expand Down
13 changes: 4 additions & 9 deletions src/bloom/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,18 +104,13 @@ impl ValkeyDataType for BloomFilterType {
(FIXED_SIP_KEY_ONE_A, FIXED_SIP_KEY_ONE_B),
(FIXED_SIP_KEY_TWO_A, FIXED_SIP_KEY_TWO_B),
];
let filter = BloomFilter::from_existing(
bitmap.as_ref(),
number_of_bits,
number_of_hash_functions as u32,
sip_keys,
num_items as u32,
capacity as u32,
);
let filter =
BloomFilter::from_existing(bitmap.as_ref(), num_items as u32, capacity as u32);
filters.push(Box::new(filter));
}
BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add(
mem::size_of::<BloomFilterType>(),
mem::size_of::<BloomFilterType>()
+ (filters.capacity() * std::mem::size_of::<Box<BloomFilter>>()),
std::sync::atomic::Ordering::Relaxed,
);
BLOOM_NUM_OBJECTS.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
Expand Down
94 changes: 51 additions & 43 deletions src/bloom/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{
metrics,
};
use bloomfilter;
use bloomfilter::{deserialize, serialize};
use serde::{Deserialize, Serialize};
use std::{mem, os::raw::c_void, sync::atomic::Ordering};
use valkey_module::{logging, raw};
Expand Down Expand Up @@ -86,14 +87,15 @@ impl BloomFilterType {
return Err(BloomError::ExceedsMaxBloomSize);
}
metrics::BLOOM_NUM_OBJECTS.fetch_add(1, Ordering::Relaxed);
metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add(
mem::size_of::<BloomFilterType>(),
std::sync::atomic::Ordering::Relaxed,
);

// Create the bloom filter and add to the main BloomFilter object.
let bloom: Box<BloomFilter> = Box::new(BloomFilter::new(fp_rate, capacity));
let filters = vec![bloom];
metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add(
mem::size_of::<BloomFilterType>()
+ (filters.capacity() * std::mem::size_of::<Box<BloomFilter>>()),
std::sync::atomic::Ordering::Relaxed,
);
let bloom = BloomFilterType {
expansion,
fp_rate,
Expand All @@ -104,16 +106,17 @@ impl BloomFilterType {

/// Create a new BloomFilterType object from an existing one.
pub fn create_copy_from(from_bf: &BloomFilterType) -> BloomFilterType {
let mut filters: Vec<Box<BloomFilter>> = Vec::with_capacity(from_bf.filters.len());
let mut filters: Vec<Box<BloomFilter>> = Vec::with_capacity(from_bf.filters.capacity());
metrics::BLOOM_NUM_OBJECTS.fetch_add(1, Ordering::Relaxed);
metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add(
mem::size_of::<BloomFilterType>(),
std::sync::atomic::Ordering::Relaxed,
);
for filter in &from_bf.filters {
let new_filter = Box::new(BloomFilter::create_copy_from(filter));
filters.push(new_filter);
}
metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add(
mem::size_of::<BloomFilterType>()
+ (filters.capacity() * std::mem::size_of::<Box<BloomFilter>>()),
std::sync::atomic::Ordering::Relaxed,
);
BloomFilterType {
expansion: from_bf.expansion,
fp_rate: from_bf.fp_rate,
Expand Down Expand Up @@ -208,10 +211,19 @@ impl BloomFilterType {
return Err(BloomError::ExceedsMaxBloomSize);
}
let mut new_filter = Box::new(BloomFilter::new(new_fp_rate, new_capacity));
let capacity_before = self.filters.capacity();
// Add item.
new_filter.set(item);
new_filter.num_items += 1;
self.filters.push(new_filter);
// If we went over capacity and scaled the vec out we need to update the memory usage by the new capacity
if capacity_before != self.filters.capacity() {
metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add(
(self.filters.capacity() - capacity_before)
* std::mem::size_of::<Box<BloomFilter>>(),
std::sync::atomic::Ordering::Relaxed,
);
}

metrics::BLOOM_NUM_ITEMS_ACROSS_OBJECTS
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
Expand Down Expand Up @@ -286,7 +298,8 @@ impl BloomFilterType {
// add bloom filter type metrics.
metrics::BLOOM_NUM_OBJECTS.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add(
mem::size_of::<BloomFilterType>(),
mem::size_of::<BloomFilterType>()
+ (item.filters.capacity() * std::mem::size_of::<Box<BloomFilter>>()),
std::sync::atomic::Ordering::Relaxed,
);
// add bloom filter metrics.
Expand Down Expand Up @@ -321,19 +334,34 @@ impl BloomFilterType {
/// well within the u32::MAX limit.
#[derive(Serialize, Deserialize)]
pub struct BloomFilter {
#[serde(
serialize_with = "serialize",
deserialize_with = "deserialize_boxed_bloom"
)]
pub bloom: Box<bloomfilter::Bloom<[u8]>>,
pub num_items: u32,
pub capacity: u32,
}

use bloomfilter::Bloom;
use serde::Deserializer;

pub fn deserialize_boxed_bloom<'de, D>(deserializer: D) -> Result<Box<Bloom<[u8]>>, D::Error>
where
D: Deserializer<'de>,
{
deserialize(deserializer).map(Box::new)
}

impl BloomFilter {
/// Instantiate empty BloomFilter object.
pub fn new(fp_rate: f64, capacity: u32) -> BloomFilter {
let bloom = bloomfilter::Bloom::new_for_fp_rate_with_seed(
capacity as usize,
fp_rate,
&configs::FIXED_SEED,
);
)
.unwrap();
let fltr = BloomFilter {
bloom: Box::new(bloom),
num_items: 0,
Expand All @@ -349,20 +377,8 @@ impl BloomFilter {
}

/// Create a new BloomFilter from dumped information (RDB load).
pub fn from_existing(
bitmap: &[u8],
number_of_bits: u64,
number_of_hash_functions: u32,
sip_keys: [(u64, u64); 2],
num_items: u32,
capacity: u32,
) -> BloomFilter {
let bloom = bloomfilter::Bloom::from_existing(
bitmap,
number_of_bits,
number_of_hash_functions,
sip_keys,
);
pub fn from_existing(bitmap: &[u8], num_items: u32, capacity: u32) -> BloomFilter {
let bloom = bloomfilter::Bloom::from_slice(bitmap).unwrap();
let fltr = BloomFilter {
bloom: Box::new(bloom),
num_items,
Expand All @@ -382,7 +398,7 @@ impl BloomFilter {
pub fn number_of_bytes(&self) -> usize {
std::mem::size_of::<BloomFilter>()
+ std::mem::size_of::<bloomfilter::Bloom<[u8]>>()
+ (self.bloom.number_of_bits() / 8) as usize
+ (self.bloom.len() / 8) as usize
}

/// Caculates the number of bytes that the bloom filter will require to be allocated.
Expand Down Expand Up @@ -418,21 +434,15 @@ impl BloomFilter {

/// Create a new BloomFilter from an existing BloomFilter object (COPY command).
pub fn create_copy_from(bf: &BloomFilter) -> BloomFilter {
BloomFilter::from_existing(
&bf.bloom.bitmap(),
bf.bloom.number_of_bits(),
bf.bloom.number_of_hash_functions(),
bf.bloom.sip_keys(),
bf.num_items,
bf.capacity,
)
BloomFilter::from_existing(bf.bloom.as_slice(), bf.num_items, bf.capacity)
}
}

impl Drop for BloomFilterType {
fn drop(&mut self) {
metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_sub(
std::mem::size_of::<BloomFilterType>(),
std::mem::size_of::<BloomFilterType>()
+ (self.filters.capacity() * std::mem::size_of::<Box<BloomFilter>>()),
std::sync::atomic::Ordering::Relaxed,
);
metrics::BLOOM_NUM_OBJECTS.fetch_sub(1, Ordering::Relaxed);
Expand All @@ -457,6 +467,7 @@ mod tests {
use crate::configs::{
FIXED_SIP_KEY_ONE_A, FIXED_SIP_KEY_ONE_B, FIXED_SIP_KEY_TWO_A, FIXED_SIP_KEY_TWO_B,
};
use configs::FIXED_SEED;
use rand::{distributions::Alphanumeric, Rng};

/// Returns random string with specified number of characters.
Expand Down Expand Up @@ -573,8 +584,8 @@ mod tests {
.filters
.iter()
.any(
|filter| (filter.bloom.sip_keys() == restore_filter.bloom.sip_keys())
&& (restore_filter.bloom.sip_keys() == expected_sip_keys)
|filter| (filter.bloom.seed() == restore_filter.bloom.seed())
&& (restore_filter.bloom.seed() == FIXED_SEED)
)));
assert!(restored_bloom_filter_type
.filters
Expand All @@ -590,7 +601,7 @@ mod tests {
.all(|restore_filter| original_bloom_filter_type
.filters
.iter()
.any(|filter| filter.bloom.bitmap() == restore_filter.bloom.bitmap())));
.any(|filter| filter.bloom.as_slice() == restore_filter.bloom.as_slice())));
let (error_count, _) = check_items_exist(
restored_bloom_filter_type,
1,
Expand Down Expand Up @@ -739,11 +750,8 @@ mod tests {
fn test_sip_keys() {
// The value of sip keys generated by the sip_keys with fixed seed should be equal to the constant in configs.rs
let test_bloom_filter = BloomFilter::new(0.5_f64, 1000_u32);
let test_sip_keys = test_bloom_filter.bloom.sip_keys();
assert_eq!(test_sip_keys[0].0, FIXED_SIP_KEY_ONE_A);
assert_eq!(test_sip_keys[0].1, FIXED_SIP_KEY_ONE_B);
assert_eq!(test_sip_keys[1].0, FIXED_SIP_KEY_TWO_A);
assert_eq!(test_sip_keys[1].1, FIXED_SIP_KEY_TWO_B);
let test_sip_keys = test_bloom_filter.bloom.seed();
assert!(test_sip_keys == FIXED_SEED);
}

#[test]
Expand Down
Loading

0 comments on commit 17b961d

Please sign in to comment.