Skip to content

Commit

Permalink
Updating defragmentation to defrag both bloomfiltertype and bloomfile…
Browse files Browse the repository at this point in the history
…r structs

Signed-off-by: zackcam <[email protected]>
  • Loading branch information
zackcam committed Nov 26, 2024
1 parent a33e0e3 commit d490857
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 29 deletions.
4 changes: 2 additions & 2 deletions src/bloom/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ pub trait ValkeyDataType {
impl ValkeyDataType for BloomFilterType {
/// Callback to load and parse RDB data of a bloom item and create it.
fn load_from_rdb(rdb: *mut raw::RedisModuleIO, encver: i32) -> Option<BloomFilterType> {
let mut filters = Vec::new();
if encver > BLOOM_FILTER_TYPE_ENCODING_VERSION {
logging::log_warning(format!("{}: Cannot load bloomfltr data type of version {} because it is higher than the loaded module's bloomfltr supported version {}", MODULE_NAME, encver, BLOOM_FILTER_TYPE_ENCODING_VERSION).as_str());
return None;
Expand All @@ -73,6 +72,7 @@ impl ValkeyDataType for BloomFilterType {
let Ok(fp_rate) = raw::load_double(rdb) else {
return None;
};
let mut filters = Vec::with_capacity(num_filters as usize);
for i in 0..num_filters {
let Ok(bitmap) = raw::load_string_buffer(rdb) else {
return None;
Expand Down Expand Up @@ -112,7 +112,7 @@ impl ValkeyDataType for BloomFilterType {
num_items as u32,
capacity as u32,
);
filters.push(filter);
filters.push(Box::new(filter));
}
BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add(
mem::size_of::<BloomFilterType>(),
Expand Down
33 changes: 18 additions & 15 deletions src/bloom/utils.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::data_type::BLOOM_TYPE_VERSION;
use crate::{
configs::{
self, BLOOM_EXPANSION_MAX, BLOOM_EXPANSION_MIN, BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN,
Expand All @@ -6,9 +7,8 @@ use crate::{
};
use bloomfilter;
use serde::{Deserialize, Serialize};
use std::{mem, sync::atomic::Ordering};

use super::data_type::BLOOM_TYPE_VERSION;
use std::{mem, os::raw::c_void, sync::atomic::Ordering};
use valkey_module::{logging, raw};

/// KeySpace Notification Events
pub const ADD_EVENT: &str = "bloom.add";
Expand Down Expand Up @@ -69,7 +69,7 @@ impl BloomError {
pub struct BloomFilterType {
pub expansion: u32,
pub fp_rate: f64,
pub filters: Vec<BloomFilter>,
pub filters: Vec<Box<BloomFilter>>,
}

impl BloomFilterType {
Expand All @@ -92,7 +92,7 @@ impl BloomFilterType {
);

// Create the bloom filter and add to the main BloomFilter object.
let bloom = BloomFilter::new(fp_rate, capacity);
let bloom: Box<BloomFilter> = Box::new(BloomFilter::new(fp_rate, capacity));
let filters = vec![bloom];
let bloom = BloomFilterType {
expansion,
Expand All @@ -104,14 +104,14 @@ impl BloomFilterType {

/// Create a new BloomFilterType object from an existing one.
pub fn create_copy_from(from_bf: &BloomFilterType) -> BloomFilterType {
let mut filters = Vec::new();
let mut filters: Vec<Box<BloomFilter>> = Vec::with_capacity(from_bf.filters.len());
metrics::BLOOM_NUM_OBJECTS.fetch_add(1, Ordering::Relaxed);
metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add(
mem::size_of::<BloomFilterType>(),
std::sync::atomic::Ordering::Relaxed,
);
for filter in &from_bf.filters {
let new_filter = BloomFilter::create_copy_from(filter);
let new_filter = Box::new(BloomFilter::create_copy_from(filter));
filters.push(new_filter);
}
BloomFilterType {
Expand Down Expand Up @@ -205,7 +205,7 @@ impl BloomFilterType {
if validate_size_limit && !BloomFilter::validate_size(new_capacity, new_fp_rate) {
return Err(BloomError::ExceedsMaxBloomSize);
}
let mut new_filter = BloomFilter::new(new_fp_rate, new_capacity);
let mut new_filter = Box::new(BloomFilter::new(new_fp_rate, new_capacity));
// Add item.
new_filter.set(item);
new_filter.num_items += 1;
Expand Down Expand Up @@ -245,9 +245,10 @@ impl BloomFilterType {
1 => {
// always use new version to init bloomFilterType.
// This is to ensure that the new fields can be recognized when the object is serialized and deserialized in the future.
let (expansion, fp_rate, filters): (u32, f64, Vec<BloomFilter>) =
match bincode::deserialize::<(u32, f64, Vec<BloomFilter>)>(&decoded_bytes[1..])
{
let (expansion, fp_rate, filters): (u32, f64, Vec<Box<BloomFilter>>) =
match bincode::deserialize::<(u32, f64, Vec<Box<BloomFilter>>)>(
&decoded_bytes[1..],
) {
Ok(values) => {
if !(BLOOM_EXPANSION_MIN..=BLOOM_EXPANSION_MAX).contains(&values.0) {
return Err(BloomError::BadExpansion);
Expand Down Expand Up @@ -318,7 +319,7 @@ impl BloomFilterType {
/// well within the u32::MAX limit.
#[derive(Serialize, Deserialize)]
pub struct BloomFilter {
pub bloom: bloomfilter::Bloom<[u8]>,
pub bloom: Box<bloomfilter::Bloom<[u8]>>,
pub num_items: u32,
pub capacity: u32,
}
Expand All @@ -332,7 +333,7 @@ impl BloomFilter {
&configs::FIXED_SEED,
);
let fltr = BloomFilter {
bloom,
bloom: Box::new(bloom),
num_items: 0,
capacity,
};
Expand Down Expand Up @@ -361,7 +362,7 @@ impl BloomFilter {
sip_keys,
);
let fltr = BloomFilter {
bloom,
bloom: Box::new(bloom),
num_items,
capacity,
};
Expand All @@ -377,7 +378,9 @@ impl BloomFilter {
}

pub fn number_of_bytes(&self) -> usize {
std::mem::size_of::<BloomFilter>() + (self.bloom.number_of_bits() / 8) as usize
std::mem::size_of::<BloomFilter>()
+ std::mem::size_of::<bloomfilter::Bloom<[u8]>>()
+ (self.bloom.number_of_bits() / 8) as usize
}

/// Caculates the number of bytes that the bloom filter will require to be allocated.
Expand Down
43 changes: 32 additions & 11 deletions src/wrapper/bloom_callback.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use crate::bloom;
use crate::bloom::data_type::ValkeyDataType;
use crate::bloom::utils::BloomFilter;
use crate::bloom::utils::BloomFilterType;
use crate::configs;
use std::ffi::CString;
use std::mem;
use std::os::raw::{c_char, c_int, c_void};
use std::ptr::null_mut;
use std::sync::atomic::Ordering;
use valkey_module::logging;
use valkey_module::logging::{log_io_error, ValkeyLogLevel};
use valkey_module::raw;
use valkey_module::{RedisModuleDefragCtx, RedisModuleString};
Expand Down Expand Up @@ -135,15 +136,35 @@ pub unsafe extern "C" fn bloom_defrag(
_from_key: *mut RedisModuleString,
value: *mut *mut c_void,
) -> i32 {
let curr_item = &*(*value).cast::<BloomFilterType>();
if curr_item.memory_usage()
> configs::BLOOM_MEMORY_LIMIT_PER_FILTER.load(Ordering::Relaxed) as usize
{
return 0;
let bloom_filter_type: &mut BloomFilterType = &mut *(*value).cast::<BloomFilterType>();

let num_filts = bloom_filter_type.filters.len();

for _ in 0..num_filts {
let bloom_filter_box = bloom_filter_type.filters.remove(0);
let bloom_filter = Box::into_raw(bloom_filter_box);
let defrag_result = unsafe {
raw::RedisModule_DefragAlloc.unwrap()(
core::ptr::null_mut(),
(bloom_filter as *const BloomFilter as *mut BloomFilter) as *mut c_void,
)
};
let mut defragged_filter = Box::from_raw(defrag_result as *mut BloomFilter);

let inner_bloom = mem::replace(
&mut defragged_filter.bloom,
Box::new(bloomfilter::Bloom::new(1, 1)),
);
let inner_bloom_ptr = Box::into_raw(inner_bloom);
let defragged_inner_bloom = raw::RedisModule_DefragAlloc.unwrap()(
core::ptr::null_mut(),
inner_bloom_ptr as *mut c_void,
);
defragged_filter.bloom =
Box::from_raw(defragged_inner_bloom as *mut bloomfilter::Bloom<[u8]>);
bloom_filter_type.filters.push(defragged_filter);
}
let new_item = BloomFilterType::create_copy_from(curr_item);
let bb = Box::new(new_item);
drop(Box::from_raw((*value).cast::<BloomFilterType>()));
*value = Box::into_raw(bb).cast::<libc::c_void>();
let val = unsafe { raw::RedisModule_DefragAlloc.unwrap()(core::ptr::null_mut(), *value) };
*value = val;
0
}
2 changes: 1 addition & 1 deletion tests/test_bloom_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from valkeytests.conftest import resource_port_tracker
from util.waiters import *

DEFAULT_BLOOM_FILTER_SIZE = 179960
DEFAULT_BLOOM_FILTER_SIZE = 179968
DEFAULT_BLOOM_FILTER_CAPACITY = 100000
class TestBloomMetrics(ValkeyBloomTestCaseBase):

Expand Down

0 comments on commit d490857

Please sign in to comment.