Skip to content

Commit

Permalink
Updating defrag and tests to use cursors and make test more robust by…
Browse files Browse the repository at this point in the history
… getting hits

Signed-off-by: zackcam <[email protected]>
  • Loading branch information
zackcam committed Nov 30, 2024
1 parent c2c1ce7 commit bda4fc6
Show file tree
Hide file tree
Showing 4 changed files with 162 additions and 183 deletions.
163 changes: 29 additions & 134 deletions src/wrapper/bloom_callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,24 @@ use crate::bloom::data_type::ValkeyDataType;
use crate::bloom::utils::BloomFilter;
use crate::bloom::utils::BloomFilterType;
use crate::configs;
use crate::wrapper::defrag;
use bit_vec::BitVec;
use bloomfilter::Bloom;
use lazy_static::lazy_static;
use std::ffi::CString;
use std::io::Cursor;
use std::mem;
use std::os::raw::{c_char, c_int, c_void};
use std::ptr;
use std::ptr::null_mut;
use std::sync::atomic::Ordering;
use std::sync::Mutex;
use std::{thread, time};
use valkey_module::logging;
use valkey_module::logging::{log_io_error, ValkeyLogLevel};
use valkey_module::raw;
use valkey_module::{RedisModuleDefragCtx, RedisModuleString};

use super::defrag::Defrag;

// Note: methods in this mod are for the bloom module data type callbacks.
// The reason they are unsafe is because the callback methods are expected to be
// "unsafe extern C" based on the Rust module API definition
Expand Down Expand Up @@ -136,98 +139,25 @@ pub unsafe extern "C" fn bloom_free_effort(
curr_item.free_effort()
}

// /// # Safety
// /// Raw handler for the Bloom object's defrag callback.
// pub unsafe extern "C" fn bloom_defrag(
// _defrag_ctx: *mut RedisModuleDefragCtx,
// _from_key: *mut RedisModuleString,
// value: *mut *mut c_void,
// ) -> i32 {
// if !configs::BLOOM_DEFRAG.load(Ordering::Relaxed) {
// return 0;
// }
// let bloom_filter_type: &mut BloomFilterType = &mut *(*value).cast::<BloomFilterType>();

// let num_filts = bloom_filter_type.filters.len();

// for _ in 0..num_filts {
// let bloom_filter_box = bloom_filter_type.filters.remove(0);
// let bloom_filter = Box::into_raw(bloom_filter_box);
// logging::log_warning(format!("Before Address: {:p}", bloom_filter));
// let defrag_result = unsafe {
// raw::RedisModule_DefragAlloc.unwrap()(
// core::ptr::null_mut(),
// bloom_filter as *mut c_void,
// )
// };
// let mut defragged_filter = {
// if !defrag_result.is_null() {
// Box::from_raw(defrag_result as *mut BloomFilter)
// } else {
// Box::from_raw(bloom_filter)
// }
// };
// logging::log_warning(format!("After Address: {:p}", defragged_filter));
// // let test = Box::leak(defragged_filter.bloom);
// // let tes = Box::into_raw(test);
// // let inner_bloom = mem::replace(
// // &mut defragged_filter.bloom,
// // Box::new(bloomfilter::Bloom::new(1, 1)),
// // );
// // let inner_bloom = mem::replace(
// // &mut defragged_filter.bloom,
// // Box::from_raw(ptr::null::<bloomfilter::Bloom<[u8]>>() as *mut bloomfilter::Bloom<[u8]>),
// // );
// let inner_bloom = mem::take(&mut defragged_filter.bloom);
// let inner_bloom_ptr = Box::into_raw(inner_bloom);
// logging::log_warning(format!("Before bloom Address: {:p}", inner_bloom_ptr));
// let defragged_inner_bloom = raw::RedisModule_DefragAlloc.unwrap()(
// core::ptr::null_mut(),
// inner_bloom_ptr as *mut c_void,
// );
// defragged_filter.bloom = {
// if !defrag_result.is_null() {
// Box::from_raw(defragged_inner_bloom as *mut bloomfilter::Bloom<[u8]>)
// } else {
// Box::from_raw(inner_bloom_ptr)
// }
// };
// logging::log_warning(format!("After bloom Address: {:p}", defragged_filter.bloom));
// bloom_filter_type.filters.push(defragged_filter);
// }
// let val = unsafe { raw::RedisModule_DefragAlloc.unwrap()(core::ptr::null_mut(), *value) };
// if !val.is_null() {
// *value = val;
// }
// 0
// }

lazy_static! {
static ref DEFRAG_BLOOM_FILTER: Mutex<Option<Box<Bloom<[u8]>>>> =
Mutex::new(Some(Box::new(Bloom::<[u8]>::new(1, 1))));
static ref DEFRAG_VEC: Mutex<Option<Vec<u32>>> = Mutex::new(Some(Vec::new()));
}

fn external_vec_defrag(mut vec: Vec<u32>) -> Vec<u32> {
let clonev = vec.clone();
fn external_vec_defrag(mut vec: Vec<u32>, priv_data: *mut c_void) -> Vec<u32> {
let defrag = Defrag::new(core::ptr::null_mut());
let len = vec.len();
let capacity = vec.capacity();
// let ptr: *mut u32 = vec.as_mut_ptr();
let vec_ptr = Box::into_raw(vec.into_boxed_slice()) as *mut c_void;
logging::log_warning(format!("Before vec_ptr start Address: {:p}", vec_ptr));

let defragged_filters_ptr =
unsafe { raw::RedisModule_DefragAlloc.unwrap()(core::ptr::null_mut(), vec_ptr) };
logging::log_warning(format!(
"After hmmm vec Address: {:p}",
defragged_filters_ptr
));
// let defragged_filters_ptr =
// unsafe { raw::RedisModule_DefragAlloc.unwrap()(core::ptr::null_mut(), vec_ptr) };
let defragged_filters_ptr = unsafe { defrag.alloc(vec_ptr) };
if !defragged_filters_ptr.is_null() {
unsafe { Vec::from_raw_parts(defragged_filters_ptr as *mut u32, len, capacity) }
} else {
unsafe { Vec::from_raw_parts(vec_ptr as *mut u32, len, capacity) }
}
// unsafe { Vec::from_raw_parts(defragged_filters_ptr as *mut u32, len, capacity) }
}

fn external_bitvec_defrag(bit_vec: BitVec) -> BitVec {
Expand All @@ -244,34 +174,22 @@ fn external_bitvec_defrag(bit_vec: BitVec) -> BitVec {
/// # Safety
/// Raw handler for the Bloom object's defrag callback.
pub unsafe extern "C" fn bloom_defrag(
_defrag_ctx: *mut RedisModuleDefragCtx,
defrag_ctx: *mut RedisModuleDefragCtx,
_from_key: *mut RedisModuleString,
value: *mut *mut c_void,
) -> i32 {
// logging::log_warning(format!("After here 0"));
let mut cursor: u64 = 0;
let defrag = Defrag::new(defrag_ctx);
defrag.curserget(&mut cursor);

let bloom_filter_type: &mut BloomFilterType = &mut *(*value).cast::<BloomFilterType>();

let num_filts = bloom_filter_type.filters.len();

logging::log_warning(format!(
"defrag in box Address: {:p}",
bloom_filter_type.filters.as_ptr()
));

for _ in 0..num_filts {
let bloom_filter_box = bloom_filter_type.filters.remove(0);
let mut one_iteration = false;
while defrag.should_stop_defrag() == 0 && cursor < num_filts.try_into().unwrap() {
let bloom_filter_box = bloom_filter_type.filters.remove(cursor.try_into().unwrap());
let bloom_filter = Box::into_raw(bloom_filter_box);

let defrag_result = unsafe {
raw::RedisModule_DefragAlloc.unwrap()(
core::ptr::null_mut(),
bloom_filter as *mut c_void,
)
};

logging::log_warning(format!("Before Vec start Address: {:p}", defrag_result));

let defrag_result = defrag.alloc(bloom_filter as *mut c_void);
let mut defragged_filter = {
if !defrag_result.is_null() {
Box::from_raw(defrag_result as *mut BloomFilter)
Expand All @@ -285,11 +203,7 @@ pub unsafe extern "C" fn bloom_defrag(
defrag_b.take().expect("We expect default to exist"),
);
let inner_bloom_ptr = Box::into_raw(inner_bloom);
let defragged_inner_bloom = raw::RedisModule_DefragAlloc.unwrap()(
core::ptr::null_mut(),
inner_bloom_ptr as *mut c_void,
);
logging::log_warning(format!("defrag in box Address: {:p}", defragged_filter));
let defragged_inner_bloom = defrag.alloc(inner_bloom_ptr as *mut c_void);
if !defragged_inner_bloom.is_null() {
let inner_bloom = mem::replace(
&mut defragged_filter.bloom,
Expand All @@ -301,38 +215,23 @@ pub unsafe extern "C" fn bloom_defrag(
mem::replace(&mut defragged_filter.bloom, Box::from_raw(inner_bloom_ptr));
*defrag_b = Some(inner_bloom); // Resetting the original static
}
// let inner_bloom = mem::replace(
// &mut defragged_filter.bloom,
// Box::from_raw(defragged_inner_bloom as *mut bloomfilter::Bloom<[u8]>),
// );
// *defrag_b = Some(inner_bloom); // Resetting the original static

// logging::log_warning(format!("1bloom filter len: {}", bloom_filter_type.filters.len()));
// let mut defrag_v = DEFRAG_VEC.lock().unwrap();
// let placeholder = defrag_v.take().unwrap();
// defragged_filter
// .bloom
// .defrag_no(external_bitvec_defrag, external_vec_defrag);
// // *defrag_v = Some(newplaceholder); // Resetting the original static
// logging::log_warning(format!("After bloom Address: {:p}", defragged_filter.bloom));

// logging::log_warning(format!("2bloom filter len: {}", bloom_filter_type.filters.len()));
defragged_filter
.bloom
.defrag_no(external_bitvec_defrag, external_vec_defrag);

bloom_filter_type.filters.push(defragged_filter);
bloom_filter_type
.filters
.insert(cursor.try_into().unwrap(), defragged_filter);
cursor += 1;
}
defrag.curserset(cursor);
if cursor < (num_filts).try_into().unwrap() {
return 1;
}
let filters_vec = mem::take(&mut bloom_filter_type.filters);
let filters_ptr = Box::into_raw(filters_vec.into_boxed_slice()) as *mut c_void;
// logging::log_warning(format!("Before Vec start Address: {:p}", filters_ptr));
let defragged_filters_ptr = defrag.alloc(filters_ptr);

let defragged_filters_ptr =
unsafe { raw::RedisModule_DefragAlloc.unwrap()(core::ptr::null_mut(), filters_ptr) };
logging::log_warning(format!(
"After Vec start Address: {:p} \n\n\n",
defragged_filters_ptr
));
if !defragged_filters_ptr.is_null() {
bloom_filter_type.filters = unsafe {
Vec::from_raw_parts(
Expand All @@ -346,13 +245,9 @@ pub unsafe extern "C" fn bloom_defrag(
Vec::from_raw_parts(filters_ptr as *mut Box<BloomFilter>, num_filts, num_filts)
};
}
// logging::log_warning(format!("After here last"));

let val = unsafe { raw::RedisModule_DefragAlloc.unwrap()(core::ptr::null_mut(), *value) };
let val = defrag.alloc(*value);
if !val.is_null() {
*value = val;
}
logging::log_warning("After here super last");

0
}
41 changes: 41 additions & 0 deletions src/wrapper/defrag.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use std::os::raw::{c_ulong, c_void};

use valkey_module::raw;

pub struct Defrag {
pub defrag_ctx: *mut raw::RedisModuleDefragCtx,
}

impl Defrag {
pub const fn new(defrag_ctx: *mut raw::RedisModuleDefragCtx) -> Self {
Self { defrag_ctx }
}

/// # Safety
///
/// This function should not be called before the horsemen are ready.
pub unsafe fn alloc(&self, ptr: *mut c_void) -> *mut c_void {
unsafe { raw::RedisModule_DefragAlloc.unwrap()(self.defrag_ctx, ptr) }
}

/// # Safety
///
/// This function should not be called before the horsemen are ready.
pub unsafe fn curserset(&self, cursor: u64) -> i32 {
unsafe { raw::RedisModule_DefragCursorSet.unwrap()(self.defrag_ctx, cursor) }
}

/// # Safety
///
/// This function should not be called before the horsemen are ready.
pub unsafe fn curserget(&self, cursor: *mut u64) -> i32 {
unsafe { raw::RedisModule_DefragCursorGet.unwrap()(self.defrag_ctx, cursor) }
}

/// # Safety
///
/// This function should not be called before the horsemen are ready.
pub unsafe fn should_stop_defrag(&self) -> i32 {
unsafe { raw::RedisModule_DefragShouldStop.unwrap()(self.defrag_ctx) }
}
}
1 change: 1 addition & 0 deletions src/wrapper/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod bloom_callback;
pub mod defrag;
Loading

0 comments on commit bda4fc6

Please sign in to comment.