diff --git a/src/bloom/data_type.rs b/src/bloom/data_type.rs index 215f182..c0693d7 100644 --- a/src/bloom/data_type.rs +++ b/src/bloom/data_type.rs @@ -59,7 +59,6 @@ pub trait ValkeyDataType { impl ValkeyDataType for BloomFilterType { /// Callback to load and parse RDB data of a bloom item and create it. fn load_from_rdb(rdb: *mut raw::RedisModuleIO, encver: i32) -> Option { - let mut filters = Vec::new(); if encver > BLOOM_FILTER_TYPE_ENCODING_VERSION { logging::log_warning(format!("{}: Cannot load bloomfltr data type of version {} because it is higher than the loaded module's bloomfltr supported version {}", MODULE_NAME, encver, BLOOM_FILTER_TYPE_ENCODING_VERSION).as_str()); return None; @@ -73,6 +72,7 @@ impl ValkeyDataType for BloomFilterType { let Ok(fp_rate) = raw::load_double(rdb) else { return None; }; + let mut filters = Vec::with_capacity(num_filters as usize); for i in 0..num_filters { let Ok(bitmap) = raw::load_string_buffer(rdb) else { return None; @@ -112,7 +112,7 @@ impl ValkeyDataType for BloomFilterType { num_items as u32, capacity as u32, ); - filters.push(filter); + filters.push(Box::new(filter)); } BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add( mem::size_of::(), diff --git a/src/bloom/utils.rs b/src/bloom/utils.rs index 55e327f..68d037e 100644 --- a/src/bloom/utils.rs +++ b/src/bloom/utils.rs @@ -1,3 +1,4 @@ +use super::data_type::BLOOM_TYPE_VERSION; use crate::{ configs::{ self, BLOOM_EXPANSION_MAX, BLOOM_EXPANSION_MIN, BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN, @@ -6,9 +7,8 @@ use crate::{ }; use bloomfilter; use serde::{Deserialize, Serialize}; -use std::{mem, sync::atomic::Ordering}; - -use super::data_type::BLOOM_TYPE_VERSION; +use std::{mem, os::raw::c_void, sync::atomic::Ordering}; +use valkey_module::{logging, raw}; /// KeySpace Notification Events pub const ADD_EVENT: &str = "bloom.add"; @@ -69,7 +69,7 @@ impl BloomError { pub struct BloomFilterType { pub expansion: u32, pub fp_rate: f64, - pub filters: Vec, + pub filters: Vec>, } impl BloomFilterType { @@ -92,7 +92,7 @@ impl BloomFilterType { ); // Create the bloom filter and add to the main BloomFilter object. - let bloom = BloomFilter::new(fp_rate, capacity); + let bloom: Box = Box::new(BloomFilter::new(fp_rate, capacity)); let filters = vec![bloom]; let bloom = BloomFilterType { expansion, @@ -104,14 +104,14 @@ impl BloomFilterType { /// Create a new BloomFilterType object from an existing one. pub fn create_copy_from(from_bf: &BloomFilterType) -> BloomFilterType { - let mut filters = Vec::new(); + let mut filters: Vec> = Vec::with_capacity(from_bf.filters.len()); metrics::BLOOM_NUM_OBJECTS.fetch_add(1, Ordering::Relaxed); metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add( mem::size_of::(), std::sync::atomic::Ordering::Relaxed, ); for filter in &from_bf.filters { - let new_filter = BloomFilter::create_copy_from(filter); + let new_filter = Box::new(BloomFilter::create_copy_from(filter)); filters.push(new_filter); } BloomFilterType { @@ -205,7 +205,7 @@ impl BloomFilterType { if validate_size_limit && !BloomFilter::validate_size(new_capacity, new_fp_rate) { return Err(BloomError::ExceedsMaxBloomSize); } - let mut new_filter = BloomFilter::new(new_fp_rate, new_capacity); + let mut new_filter = Box::new(BloomFilter::new(new_fp_rate, new_capacity)); // Add item. new_filter.set(item); new_filter.num_items += 1; @@ -245,9 +245,10 @@ impl BloomFilterType { 1 => { // always use new version to init bloomFilterType. // This is to ensure that the new fields can be recognized when the object is serialized and deserialized in the future. - let (expansion, fp_rate, filters): (u32, f64, Vec) = - match bincode::deserialize::<(u32, f64, Vec)>(&decoded_bytes[1..]) - { + let (expansion, fp_rate, filters): (u32, f64, Vec>) = + match bincode::deserialize::<(u32, f64, Vec>)>( + &decoded_bytes[1..], + ) { Ok(values) => { if !(BLOOM_EXPANSION_MIN..=BLOOM_EXPANSION_MAX).contains(&values.0) { return Err(BloomError::BadExpansion); diff --git a/src/wrapper/bloom_callback.rs b/src/wrapper/bloom_callback.rs index caa882b..8c56e5c 100644 --- a/src/wrapper/bloom_callback.rs +++ b/src/wrapper/bloom_callback.rs @@ -1,11 +1,11 @@ use crate::bloom; use crate::bloom::data_type::ValkeyDataType; +use crate::bloom::utils::BloomFilter; use crate::bloom::utils::BloomFilterType; -use crate::configs; use std::ffi::CString; use std::os::raw::{c_char, c_int, c_void}; use std::ptr::null_mut; -use std::sync::atomic::Ordering; +use valkey_module::logging; use valkey_module::logging::{log_io_error, ValkeyLogLevel}; use valkey_module::raw; use valkey_module::{RedisModuleDefragCtx, RedisModuleString}; @@ -128,6 +128,19 @@ pub unsafe extern "C" fn bloom_free_effort( curr_item.free_effort() } +fn defrag_callback(ptr: *mut c_void) -> *mut c_void { + // Unsafe block to dereference the raw pointer + unsafe { + logging::log_warning("Starting inner defrag_callback"); + logging::log_warning(format!("Before Address of allocated value. {:p}", ptr)); + let v = raw::RedisModule_DefragAlloc.unwrap()(core::ptr::null_mut(), ptr); + logging::log_warning(format!("After Address of allocated value. {:p}", v)); + logging::log_warning(format!("defrag_callback. Is Null? {:?}", v.is_null())); + logging::log_warning("Finished inner defrag_callback"); + v + } +} + /// # Safety /// Raw handler for the Bloom object's defrag callback. pub unsafe extern "C" fn bloom_defrag( @@ -135,15 +148,23 @@ pub unsafe extern "C" fn bloom_defrag( _from_key: *mut RedisModuleString, value: *mut *mut c_void, ) -> i32 { - let curr_item = &*(*value).cast::(); - if curr_item.memory_usage() - > configs::BLOOM_MEMORY_LIMIT_PER_FILTER.load(Ordering::Relaxed) as usize - { - return 0; + let bloom_filter_type: &mut BloomFilterType = &mut *(*value).cast::(); + + let num_filts = bloom_filter_type.filters.len(); + + for _ in 0..num_filts { + let bloom_filter_box = bloom_filter_type.filters.remove(0); + let bloom_filter = Box::into_raw(bloom_filter_box); + let defrag_result = unsafe { + raw::RedisModule_DefragAlloc.unwrap()( + core::ptr::null_mut(), + (bloom_filter as *const BloomFilter as *mut BloomFilter) as *mut c_void, + ) + }; + let defragged_filter = Box::from_raw(defrag_result as *mut BloomFilter); + bloom_filter_type.filters.push(defragged_filter); } - let new_item = BloomFilterType::create_copy_from(curr_item); - let bb = Box::new(new_item); - drop(Box::from_raw((*value).cast::())); - *value = Box::into_raw(bb).cast::(); + let val = unsafe { raw::RedisModule_DefragAlloc.unwrap()(core::ptr::null_mut(), *value) }; + *value = val; 0 }