Updating defragmentation to defrag both BloomFilterType and BloomFilter structs #24

Merged (6 commits) on Dec 10, 2024
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -100,4 +100,4 @@ jobs:
- name: Update module path
run: echo "MODULE_PATH=$(realpath target/release/libvalkey_bloom.so)" >> $GITHUB_ENV
- name: Run integration tests
run: python -m pytest --cache-clear -v "tests/"
run: python -m pytest --cache-clear -v "tests/" -m "not skip_for_asan"
18 changes: 8 additions & 10 deletions src/bloom/data_type.rs
@@ -1,12 +1,9 @@
use crate::bloom::utils::BloomFilter;
use crate::bloom::utils::BloomFilterType;
use crate::configs;
use crate::metrics::BLOOM_NUM_OBJECTS;
use crate::metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES;
use crate::wrapper::bloom_callback;
use crate::wrapper::digest::Digest;
use crate::MODULE_NAME;
use std::mem;
use std::os::raw::c_int;
use valkey_module::native_types::ValkeyType;
use valkey_module::{logging, raw};
@@ -71,14 +68,18 @@ impl ValkeyDataType for BloomFilterType {
let Ok(fp_rate) = raw::load_double(rdb) else {
return None;
};

let Ok(tightening_ratio) = raw::load_double(rdb) else {
return None;
};
let mut filters: Vec<BloomFilter> = Vec::with_capacity(num_filters as usize);
let Ok(is_seed_random_u64) = raw::load_unsigned(rdb) else {
return None;
};
let is_seed_random = is_seed_random_u64 == 1;
// We start off with capacity as 1 to match the same expansion of the vector that would have occurred during bloom
// object creation and scaling as a result of BF.* operations.
let mut filters = Vec::with_capacity(1);
KarthikSubbarao (Member) commented on Dec 10, 2024:
Can we add short documentation explaining why we use Vec::with_capacity(1) here? I think I had made this comment earlier.
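A minimal sketch (editor's illustration, not code from this PR) of the idea behind starting from Vec::with_capacity(1) on the restore path: growing the vector push-by-push reproduces the same capacity an object scaled through BF.* operations would have had, so capacity-based memory accounting comes out the same after an RDB load. Exact growth factors are an implementation detail of Vec, so the printed capacities are not guaranteed by the language.

```rust
fn main() {
    let num_filters: u64 = 5;

    // Built up through scaling (BF.* operations): starts at capacity 1 and grows on demand.
    let mut scaled: Vec<u64> = Vec::with_capacity(1);
    for i in 0..num_filters {
        scaled.push(i);
    }

    // Restored from RDB the same way: start at capacity 1 and push each loaded filter.
    let mut restored: Vec<u64> = Vec::with_capacity(1);
    for i in 0..num_filters {
        restored.push(i);
    }

    // Pre-sizing to num_filters would typically give a different capacity,
    // and therefore different capacity-based memory accounting.
    let preallocated: Vec<u64> = Vec::with_capacity(num_filters as usize);

    println!("scaled:       len={} capacity={}", scaled.len(), scaled.capacity());
    println!("restored:     len={} capacity={}", restored.len(), restored.capacity());
    println!("preallocated: len={} capacity={}", preallocated.len(), preallocated.capacity());
}
```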


for i in 0..num_filters {
let Ok(bitmap) = raw::load_string_buffer(rdb) else {
return None;
@@ -114,20 +115,17 @@ impl ValkeyDataType for BloomFilterType {
logging::log_warning("Failed to restore bloom object: Object in fixed seed mode, but seed does not match FIXED_SEED.");
return None;
}
filters.push(filter);
filters.push(Box::new(filter));
Member:
Is this to avoid re-assigning the ownership of the filter variable? I don't understand the reason for it.

Contributor (Author):
If we put the filter in a Box, this allows us to take it out later and have ownership, which means we can update the pointer's address to actually change it. Without putting it into a Box we couldn't actually change the address associated with it. See the standard library documentation for Box.
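A self-contained sketch of the point above (editor's illustration with a placeholder struct, not the module's actual defrag callback): because the element is boxed, a defrag pass can take ownership of the value, move it into a freshly allocated Box, and thereby change the heap address stored in the parent vector.

```rust
// Placeholder type standing in for BloomFilter.
struct Filter {
    bits: Vec<u8>,
}

// Re-allocate the boxed value so its heap address can change, which is the
// essence of an active-defrag pass. Owning the Box lets us move the value out.
fn defrag_one(slot: &mut Box<Filter>) {
    // Swap in a cheap temporary so we own the old Box ...
    let old: Box<Filter> = std::mem::replace(slot, Box::new(Filter { bits: Vec::new() }));
    // ... then move the value out of its old allocation into a brand-new one.
    *slot = Box::new(*old);
}

fn main() {
    let mut slot = Box::new(Filter { bits: vec![0u8; 64] });
    let before = &*slot as *const Filter;
    defrag_one(&mut slot);
    let after = &*slot as *const Filter;
    // Usually true; whether the allocator actually hands back a new address is up to it.
    println!("address changed: {}", before != after);
}
```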

}
BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add(
mem::size_of::<BloomFilterType>(),
std::sync::atomic::Ordering::Relaxed,
);
BLOOM_NUM_OBJECTS.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

let item = BloomFilterType {
expansion: expansion as u32,
fp_rate,
tightening_ratio,
is_seed_random,
filters,
};
item.bloom_filter_type_incr_metrics_on_new_create();
Some(item)
}

124 changes: 76 additions & 48 deletions src/bloom/utils.rs
@@ -6,10 +6,10 @@ use crate::{
},
metrics,
};
use bloomfilter;
use bloomfilter::Bloom;
use bloomfilter::{deserialize, serialize};
use serde::{Deserialize, Serialize};
use std::{mem, sync::atomic::Ordering};
use serde::{Deserialize, Deserializer, Serialize};
use std::sync::atomic::Ordering;

/// KeySpace Notification Events
pub const ADD_EVENT: &str = "bloom.add";
@@ -74,7 +74,7 @@ pub struct BloomFilterType {
pub fp_rate: f64,
pub tightening_ratio: f64,
pub is_seed_random: bool,
pub filters: Vec<BloomFilter>,
pub filters: Vec<Box<BloomFilter>>,
}

impl BloomFilterType {
@@ -92,15 +92,14 @@ impl BloomFilterType {
if validate_size_limit && !BloomFilter::validate_size(capacity, fp_rate) {
return Err(BloomError::ExceedsMaxBloomSize);
}
metrics::BLOOM_NUM_OBJECTS.fetch_add(1, Ordering::Relaxed);
metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add(
mem::size_of::<BloomFilterType>(),
std::sync::atomic::Ordering::Relaxed,
);
// Create the bloom filter and add to the main BloomFilter object.
let bloom = match use_random_seed {
true => BloomFilter::with_random_seed(fp_rate, capacity),
false => BloomFilter::with_fixed_seed(fp_rate, capacity, &configs::FIXED_SEED),
true => Box::new(BloomFilter::with_random_seed(fp_rate, capacity)),
false => Box::new(BloomFilter::with_fixed_seed(
fp_rate,
capacity,
&configs::FIXED_SEED,
)),
};
let filters = vec![bloom];
let bloom = BloomFilterType {
@@ -110,21 +109,18 @@ impl BloomFilterType {
filters,
is_seed_random: use_random_seed,
};
bloom.bloom_filter_type_incr_metrics_on_new_create();
Ok(bloom)
}

/// Create a new BloomFilterType object from an existing one.
pub fn create_copy_from(from_bf: &BloomFilterType) -> BloomFilterType {
let mut filters: Vec<BloomFilter> = Vec::with_capacity(from_bf.filters.len());
metrics::BLOOM_NUM_OBJECTS.fetch_add(1, Ordering::Relaxed);
metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add(
mem::size_of::<BloomFilterType>(),
std::sync::atomic::Ordering::Relaxed,
);
let mut filters: Vec<Box<BloomFilter>> = Vec::with_capacity(from_bf.filters.capacity());
for filter in &from_bf.filters {
let new_filter = BloomFilter::create_copy_from(filter);
let new_filter = Box::new(BloomFilter::create_copy_from(filter));
filters.push(new_filter);
}
from_bf.bloom_filter_type_incr_metrics_on_new_create();
BloomFilterType {
expansion: from_bf.expansion,
fp_rate: from_bf.fp_rate,
@@ -134,9 +130,9 @@ impl BloomFilterType {
}
}

/// Return the total memory usage of the BloomFilterType object.
/// Return the total memory usage of the BloomFilterType object and every allocation it contains.
pub fn memory_usage(&self) -> usize {
let mut mem: usize = std::mem::size_of::<BloomFilterType>();
let mut mem: usize = self.bloom_filter_type_memory_usage();
for filter in &self.filters {
mem += filter.number_of_bytes();
}
@@ -226,18 +222,36 @@ impl BloomFilterType {
return Err(BloomError::ExceedsMaxBloomSize);
}
let seed = self.seed();
let mut new_filter = BloomFilter::with_fixed_seed(new_fp_rate, new_capacity, &seed);
let mut new_filter = Box::new(BloomFilter::with_fixed_seed(
new_fp_rate,
new_capacity,
&seed,
));
let memory_usage_before: usize = self.bloom_filter_type_memory_usage();
// Add item.
new_filter.set(item);
new_filter.num_items += 1;
self.filters.push(new_filter);
// If we went over capacity and scaled the vec out we need to update the memory usage by the new capacity
let memory_usage_after = self.bloom_filter_type_memory_usage();

metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add(
memory_usage_after - memory_usage_before,
std::sync::atomic::Ordering::Relaxed,
);
metrics::BLOOM_NUM_ITEMS_ACROSS_OBJECTS
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
return Ok(1);
}
Ok(0)
}

/// Calculates the memory usage of a BloomFilterType object
fn bloom_filter_type_memory_usage(&self) -> usize {
std::mem::size_of::<BloomFilterType>()
+ (self.filters.capacity() * std::mem::size_of::<Box<BloomFilter>>())
}

/// Serializes bloomFilter to a byte array.
pub fn encode_bloom_filter(&self) -> Result<Vec<u8>, BloomError> {
match bincode::serialize(self) {
@@ -259,6 +273,16 @@ impl BloomFilterType {
}
}

/// Increments metrics related to Bloom filter memory usage upon creation of a new filter.
pub fn bloom_filter_type_incr_metrics_on_new_create(&self) {
metrics::BLOOM_NUM_OBJECTS.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add(
self.bloom_filter_type_memory_usage(),
std::sync::atomic::Ordering::Relaxed,
);
}

/// Deserialize a byte array to bloom filter.
/// We will need to handle any current or previous version and deserializing the bytes into a bloom object of the running Module's current version `BLOOM_TYPE_VERSION`.
pub fn decode_bloom_filter(
@@ -278,8 +302,8 @@ impl BloomFilterType {
f64,
f64,
bool,
Vec<BloomFilter>,
) = match bincode::deserialize::<(u32, f64, f64, bool, Vec<BloomFilter>)>(
Vec<Box<BloomFilter>>,
) = match bincode::deserialize::<(u32, f64, f64, bool, Vec<Box<BloomFilter>>)>(
&decoded_bytes[1..],
) {
Ok(values) => {
@@ -321,25 +345,15 @@ impl BloomFilterType {
filters,
};
// add bloom filter type metrics.
metrics::BLOOM_NUM_OBJECTS.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add(
mem::size_of::<BloomFilterType>(),
std::sync::atomic::Ordering::Relaxed,
);
item.bloom_filter_type_incr_metrics_on_new_create();
// add bloom filter metrics.

for filter in &item.filters {
metrics::BLOOM_NUM_FILTERS_ACROSS_OBJECTS
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add(
filter.number_of_bytes(),
std::sync::atomic::Ordering::Relaxed,
);
metrics::BLOOM_NUM_ITEMS_ACROSS_OBJECTS.fetch_add(
filter.num_items.into(),
std::sync::atomic::Ordering::Relaxed,
);
metrics::BLOOM_CAPACITY_ACROSS_OBJECTS
.fetch_add(filter.capacity.into(), std::sync::atomic::Ordering::Relaxed);
filter.bloom_filter_incr_metrics_on_new_create();
}
Ok(item)
}
@@ -356,37 +370,49 @@ impl BloomFilterType {
/// well within the u32::MAX limit.
#[derive(Serialize, Deserialize)]
pub struct BloomFilter {
#[serde(serialize_with = "serialize", deserialize_with = "deserialize")]
pub bloom: bloomfilter::Bloom<[u8]>,
#[serde(
serialize_with = "serialize",
deserialize_with = "deserialize_boxed_bloom"
)]
pub bloom: Box<bloomfilter::Bloom<[u8]>>,
pub num_items: u32,
pub capacity: u32,
}

pub fn deserialize_boxed_bloom<'de, D>(deserializer: D) -> Result<Box<Bloom<[u8]>>, D::Error>
where
D: Deserializer<'de>,
{
deserialize(deserializer).map(Box::new)
}

impl BloomFilter {
/// Instantiate empty BloomFilter object with a fixed seed used to create sip keys.
pub fn with_fixed_seed(fp_rate: f64, capacity: u32, fixed_seed: &[u8; 32]) -> BloomFilter {
let bloom =
bloomfilter::Bloom::new_for_fp_rate_with_seed(capacity as usize, fp_rate, fixed_seed)
.expect("We expect bloomfilter::Bloom<[u8]> creation to succeed");
let fltr = BloomFilter {
bloom,
bloom: Box::new(bloom),
num_items: 0,
capacity,
};
fltr.incr_metrics_on_new_create();
fltr.bloom_filter_incr_metrics_on_new_create();
fltr
}

/// Instantiate empty BloomFilter object with a randomly generated seed used to create sip keys.
pub fn with_random_seed(fp_rate: f64, capacity: u32) -> BloomFilter {
let bloom = bloomfilter::Bloom::new_for_fp_rate(capacity as usize, fp_rate)
.expect("We expect bloomfilter::Bloom<[u8]> creation to succeed");
let bloom = Box::new(
bloomfilter::Bloom::new_for_fp_rate(capacity as usize, fp_rate)
.expect("We expect bloomfilter::Bloom<[u8]> creation to succeed"),
);
let fltr = BloomFilter {
bloom,
num_items: 0,
capacity,
};
fltr.incr_metrics_on_new_create();
fltr.bloom_filter_incr_metrics_on_new_create();
fltr
}

@@ -396,11 +422,11 @@ impl BloomFilter {
.expect("We expect bloomfilter::Bloom<[u8]> creation to succeed");

let fltr = BloomFilter {
bloom,
bloom: Box::new(bloom),
num_items,
capacity,
};
fltr.incr_metrics_on_new_create();
fltr.bloom_filter_incr_metrics_on_new_create();
metrics::BLOOM_NUM_ITEMS_ACROSS_OBJECTS
.fetch_add(num_items.into(), std::sync::atomic::Ordering::Relaxed);
fltr
@@ -411,7 +437,7 @@ impl BloomFilter {
BloomFilter::from_existing(&bf.bloom.to_bytes(), bf.num_items, bf.capacity)
}

fn incr_metrics_on_new_create(&self) {
fn bloom_filter_incr_metrics_on_new_create(&self) {
metrics::BLOOM_NUM_FILTERS_ACROSS_OBJECTS
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES
@@ -426,7 +452,9 @@ impl BloomFilter {
}

pub fn number_of_bytes(&self) -> usize {
std::mem::size_of::<BloomFilter>() + (self.bloom.len() / 8) as usize
std::mem::size_of::<BloomFilter>()
+ std::mem::size_of::<bloomfilter::Bloom<[u8]>>()
+ (self.bloom.len() / 8) as usize
}

/// Calculates the number of bytes that the bloom filter will require to be allocated.
@@ -453,7 +481,7 @@ impl BloomFilter {
impl Drop for BloomFilterType {
fn drop(&mut self) {
metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_sub(
std::mem::size_of::<BloomFilterType>(),
self.bloom_filter_type_memory_usage(),
std::sync::atomic::Ordering::Relaxed,
);
metrics::BLOOM_NUM_OBJECTS.fetch_sub(1, Ordering::Relaxed);
5 changes: 3 additions & 2 deletions src/configs.rs
@@ -1,6 +1,5 @@
use lazy_static::lazy_static;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::{AtomicBool, AtomicI64};

/// Configurations
pub const BLOOM_CAPACITY_DEFAULT: i64 = 100000;
Expand All @@ -17,6 +16,7 @@ pub const BLOOM_FP_RATE_MAX: f64 = 1.0;

pub const BLOOM_USE_RANDOM_SEED_DEFAULT: bool = true;

pub const BLOOM_DEFRAG_DEAFULT: bool = true;
Member:
How will a user/operator change this value?

Contributor (Author):
The user can add this to the startup arguments:
--bf.bloom-defrag-enabled no
This will skip all the code related to bloom defragmentation and simply return 0.
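A hypothetical sketch (editor's illustration, not the module's actual callback) of what "simply return 0" means for the defrag entry point: when the bloom-defrag flag is off, the callback does no work and reports that nothing is pending.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// Stand-in for the module's configs::BLOOM_DEFRAG flag, driven by the
// bf.bloom-defrag-enabled startup argument.
static BLOOM_DEFRAG: AtomicBool = AtomicBool::new(true);

fn defrag_callback() -> i32 {
    if !BLOOM_DEFRAG.load(Ordering::Relaxed) {
        // Defrag disabled: skip all bloom defragmentation work.
        return 0;
    }
    // ... defragment BloomFilterType and its boxed BloomFilter structs here ...
    0
}

fn main() {
    BLOOM_DEFRAG.store(false, Ordering::Relaxed);
    assert_eq!(defrag_callback(), 0);
}
```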

// Max Memory usage allowed per bloom filter within a bloom object (64MB).
// Beyond this threshold, a bloom object is classified as large and is exempt from defrag operations.
// Also, write operations that result in bloom object allocation larger than this size will be rejected.
@@ -30,6 +30,7 @@ lazy_static! {
pub static ref BLOOM_MEMORY_LIMIT_PER_FILTER: AtomicI64 =
AtomicI64::new(BLOOM_MEMORY_LIMIT_PER_FILTER_DEFAULT);
pub static ref BLOOM_USE_RANDOM_SEED: AtomicBool = AtomicBool::default();
pub static ref BLOOM_DEFRAG: AtomicBool = AtomicBool::new(BLOOM_DEFRAG_DEAFULT);
}

/// Constants
1 change: 1 addition & 0 deletions src/lib.rs
@@ -106,6 +106,7 @@ valkey_module! {
],
bool: [
["bloom-use-random-seed", &*configs::BLOOM_USE_RANDOM_SEED, configs::BLOOM_USE_RANDOM_SEED_DEFAULT, ConfigurationFlags::DEFAULT, None],
["bloom-defrag-enabled", &*configs::BLOOM_DEFRAG, configs::BLOOM_DEFRAG_DEAFULT, ConfigurationFlags::DEFAULT, None],
],
enum: [
],