Skip to content

Commit

Permalink
Updating defragmentation to defrag both bloomfiltertype and bloomfile…
Browse files Browse the repository at this point in the history
…r structs

Signed-off-by: zackcam <[email protected]>
  • Loading branch information
zackcam committed Nov 22, 2024
1 parent a33e0e3 commit 2bf0ce8
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 24 deletions.
4 changes: 2 additions & 2 deletions src/bloom/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ pub trait ValkeyDataType {
impl ValkeyDataType for BloomFilterType {
/// Callback to load and parse RDB data of a bloom item and create it.
fn load_from_rdb(rdb: *mut raw::RedisModuleIO, encver: i32) -> Option<BloomFilterType> {
let mut filters = Vec::new();
if encver > BLOOM_FILTER_TYPE_ENCODING_VERSION {
logging::log_warning(format!("{}: Cannot load bloomfltr data type of version {} because it is higher than the loaded module's bloomfltr supported version {}", MODULE_NAME, encver, BLOOM_FILTER_TYPE_ENCODING_VERSION).as_str());
return None;
Expand All @@ -73,6 +72,7 @@ impl ValkeyDataType for BloomFilterType {
let Ok(fp_rate) = raw::load_double(rdb) else {
return None;
};
let mut filters = Vec::with_capacity(num_filters as usize);
for i in 0..num_filters {
let Ok(bitmap) = raw::load_string_buffer(rdb) else {
return None;
Expand Down Expand Up @@ -112,7 +112,7 @@ impl ValkeyDataType for BloomFilterType {
num_items as u32,
capacity as u32,
);
filters.push(filter);
filters.push(Box::new(filter));
}
BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add(
mem::size_of::<BloomFilterType>(),
Expand Down
23 changes: 12 additions & 11 deletions src/bloom/utils.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::data_type::BLOOM_TYPE_VERSION;
use crate::{
configs::{
self, BLOOM_EXPANSION_MAX, BLOOM_EXPANSION_MIN, BLOOM_FP_RATE_MAX, BLOOM_FP_RATE_MIN,
Expand All @@ -6,9 +7,8 @@ use crate::{
};
use bloomfilter;
use serde::{Deserialize, Serialize};
use std::{mem, sync::atomic::Ordering};

use super::data_type::BLOOM_TYPE_VERSION;
use std::{mem, os::raw::c_void, sync::atomic::Ordering};
use valkey_module::{logging, raw};

/// KeySpace Notification Events
pub const ADD_EVENT: &str = "bloom.add";
Expand Down Expand Up @@ -69,7 +69,7 @@ impl BloomError {
pub struct BloomFilterType {
pub expansion: u32,
pub fp_rate: f64,
pub filters: Vec<BloomFilter>,
pub filters: Vec<Box<BloomFilter>>,
}

impl BloomFilterType {
Expand All @@ -92,7 +92,7 @@ impl BloomFilterType {
);

// Create the bloom filter and add to the main BloomFilter object.
let bloom = BloomFilter::new(fp_rate, capacity);
let bloom: Box<BloomFilter> = Box::new(BloomFilter::new(fp_rate, capacity));
let filters = vec![bloom];
let bloom = BloomFilterType {
expansion,
Expand All @@ -104,14 +104,14 @@ impl BloomFilterType {

/// Create a new BloomFilterType object from an existing one.
pub fn create_copy_from(from_bf: &BloomFilterType) -> BloomFilterType {
let mut filters = Vec::new();
let mut filters: Vec<Box<BloomFilter>> = Vec::with_capacity(from_bf.filters.len());
metrics::BLOOM_NUM_OBJECTS.fetch_add(1, Ordering::Relaxed);
metrics::BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add(
mem::size_of::<BloomFilterType>(),
std::sync::atomic::Ordering::Relaxed,
);
for filter in &from_bf.filters {
let new_filter = BloomFilter::create_copy_from(filter);
let new_filter = Box::new(BloomFilter::create_copy_from(filter));
filters.push(new_filter);
}
BloomFilterType {
Expand Down Expand Up @@ -205,7 +205,7 @@ impl BloomFilterType {
if validate_size_limit && !BloomFilter::validate_size(new_capacity, new_fp_rate) {
return Err(BloomError::ExceedsMaxBloomSize);
}
let mut new_filter = BloomFilter::new(new_fp_rate, new_capacity);
let mut new_filter = Box::new(BloomFilter::new(new_fp_rate, new_capacity));
// Add item.
new_filter.set(item);
new_filter.num_items += 1;
Expand Down Expand Up @@ -245,9 +245,10 @@ impl BloomFilterType {
1 => {
// always use new version to init bloomFilterType.
// This is to ensure that the new fields can be recognized when the object is serialized and deserialized in the future.
let (expansion, fp_rate, filters): (u32, f64, Vec<BloomFilter>) =
match bincode::deserialize::<(u32, f64, Vec<BloomFilter>)>(&decoded_bytes[1..])
{
let (expansion, fp_rate, filters): (u32, f64, Vec<Box<BloomFilter>>) =
match bincode::deserialize::<(u32, f64, Vec<Box<BloomFilter>>)>(
&decoded_bytes[1..],
) {
Ok(values) => {
if !(BLOOM_EXPANSION_MIN..=BLOOM_EXPANSION_MAX).contains(&values.0) {
return Err(BloomError::BadExpansion);
Expand Down
43 changes: 32 additions & 11 deletions src/wrapper/bloom_callback.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use crate::bloom;
use crate::bloom::data_type::ValkeyDataType;
use crate::bloom::utils::BloomFilter;
use crate::bloom::utils::BloomFilterType;
use crate::configs;
use std::ffi::CString;
use std::os::raw::{c_char, c_int, c_void};
use std::ptr::null_mut;
use std::sync::atomic::Ordering;
use valkey_module::logging;
use valkey_module::logging::{log_io_error, ValkeyLogLevel};
use valkey_module::raw;
use valkey_module::{RedisModuleDefragCtx, RedisModuleString};
Expand Down Expand Up @@ -128,22 +128,43 @@ pub unsafe extern "C" fn bloom_free_effort(
curr_item.free_effort()
}

fn defrag_callback(ptr: *mut c_void) -> *mut c_void {
// Unsafe block to dereference the raw pointer
unsafe {
logging::log_warning("Starting inner defrag_callback");
logging::log_warning(format!("Before Address of allocated value. {:p}", ptr));
let v = raw::RedisModule_DefragAlloc.unwrap()(core::ptr::null_mut(), ptr);
logging::log_warning(format!("After Address of allocated value. {:p}", v));
logging::log_warning(format!("defrag_callback. Is Null? {:?}", v.is_null()));
logging::log_warning("Finished inner defrag_callback");
v
}
}

/// # Safety
/// Raw handler for the Bloom object's defrag callback.
pub unsafe extern "C" fn bloom_defrag(
_defrag_ctx: *mut RedisModuleDefragCtx,
_from_key: *mut RedisModuleString,
value: *mut *mut c_void,
) -> i32 {
let curr_item = &*(*value).cast::<BloomFilterType>();
if curr_item.memory_usage()
> configs::BLOOM_MEMORY_LIMIT_PER_FILTER.load(Ordering::Relaxed) as usize
{
return 0;
let bloom_filter_type: &mut BloomFilterType = &mut *(*value).cast::<BloomFilterType>();

let num_filts = bloom_filter_type.filters.len();

for _ in 0..num_filts {
let bloom_filter_box = bloom_filter_type.filters.remove(0);
let bloom_filter = Box::into_raw(bloom_filter_box);
let defrag_result = unsafe {
raw::RedisModule_DefragAlloc.unwrap()(
core::ptr::null_mut(),
(bloom_filter as *const BloomFilter as *mut BloomFilter) as *mut c_void,
)
};
let defragged_filter = Box::from_raw(defrag_result as *mut BloomFilter);
bloom_filter_type.filters.push(defragged_filter);
}
let new_item = BloomFilterType::create_copy_from(curr_item);
let bb = Box::new(new_item);
drop(Box::from_raw((*value).cast::<BloomFilterType>()));
*value = Box::into_raw(bb).cast::<libc::c_void>();
let val = unsafe { raw::RedisModule_DefragAlloc.unwrap()(core::ptr::null_mut(), *value) };
*value = val;
0
}

0 comments on commit 2bf0ce8

Please sign in to comment.