Skip to content

Commit

Permalink
Fixing merge conflicts
Browse files Browse the repository at this point in the history
Signed-off-by: zackcam <[email protected]>
  • Loading branch information
zackcam committed Dec 4, 2024
1 parent fdee5d6 commit d8b41c2
Show file tree
Hide file tree
Showing 9 changed files with 19 additions and 50 deletions.
6 changes: 0 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ valkey-module-macros = "0"
linkme = "0"
bloomfilter = { version = "3.0.1", features = ["serde"] }
lazy_static = "1.4.0"
bit-vec = "0.8.0"
libc = "0.2"
serde = { version = "1.0", features = ["derive"] }
bincode = "1.3"
Expand All @@ -34,10 +33,5 @@ opt-level = 0
debug = 2
debug-assertions = true

[profile.release]
opt-level = 0
debug = 2
debug-assertions = true

[features]
enable-system-alloc = ["valkey-module/enable-system-alloc"]
2 changes: 1 addition & 1 deletion build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ echo "Running cargo build release..."
cargo build --all --all-targets --release

echo "Running unit tests..."
# cargo test --features enable-system-alloc
cargo test --features enable-system-alloc

# Ensure SERVER_VERSION environment variable is set
if [ -z "$SERVER_VERSION" ]; then
Expand Down
4 changes: 2 additions & 2 deletions src/bloom/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ impl ValkeyDataType for BloomFilterType {
} else {
Vec::new()
};
// let mut filters = Vec::new();
for i in 0..num_filters {
let Ok(bitmap) = raw::load_string_buffer(rdb) else {
return None;
Expand Down Expand Up @@ -108,13 +107,14 @@ impl ValkeyDataType for BloomFilterType {
};
let filter =
BloomFilter::from_existing(bitmap.as_ref(), num_items as u32, capacity as u32);
filters.push(filter);
filters.push(Box::new(filter));
}
BLOOM_OBJECT_TOTAL_MEMORY_BYTES.fetch_add(
mem::size_of::<BloomFilterType>()
+ (filters.capacity() * std::mem::size_of::<Box<BloomFilter>>()),
std::sync::atomic::Ordering::Relaxed,
);

BLOOM_NUM_OBJECTS.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let item = BloomFilterType {
expansion: expansion as u32,
Expand Down
38 changes: 10 additions & 28 deletions src/bloom/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ use crate::{
},
metrics,
};
use bloomfilter;
use bloomfilter::Bloom;
use bloomfilter::{deserialize, serialize};
use serde::{Deserialize, Serialize};
use std::{mem, os::raw::c_void, sync::atomic::Ordering};
use valkey_module::{logging, raw};
use serde::{Deserialize, Deserializer, Serialize};
use std::{mem, sync::atomic::Ordering};

/// KeySpace Notification Events
pub const ADD_EVENT: &str = "bloom.add";
Expand Down Expand Up @@ -106,8 +105,6 @@ impl BloomFilterType {

/// Create a new BloomFilterType object from an existing one.
pub fn create_copy_from(from_bf: &BloomFilterType) -> BloomFilterType {
let mut filters: Vec<BloomFilter> = Vec::with_capacity(from_bf.filters.len());
let mut filters: Vec<Box<BloomFilter>> = Vec::with_capacity(from_bf.filters.len());
let mut filters: Vec<Box<BloomFilter>> = Vec::with_capacity(from_bf.filters.capacity());
metrics::BLOOM_NUM_OBJECTS.fetch_add(1, Ordering::Relaxed);
for filter in &from_bf.filters {
Expand All @@ -128,7 +125,8 @@ impl BloomFilterType {

/// Return the total memory usage of the BloomFilterType object.
pub fn memory_usage(&self) -> usize {
let mut mem: usize = std::mem::size_of::<BloomFilterType>();
let mut mem: usize = std::mem::size_of::<BloomFilterType>()
+ (self.filters.capacity() * std::mem::size_of::<Box<BloomFilter>>());
for filter in &self.filters {
mem += filter.number_of_bytes();
}
Expand Down Expand Up @@ -208,7 +206,6 @@ impl BloomFilterType {
if validate_size_limit && !BloomFilter::validate_size(new_capacity, new_fp_rate) {
return Err(BloomError::ExceedsMaxBloomSize);
}
let mut new_filter = BloomFilter::new(new_fp_rate, new_capacity);
let mut new_filter = Box::new(BloomFilter::new(new_fp_rate, new_capacity));
let capacity_before = self.filters.capacity();
// Add item.
Expand Down Expand Up @@ -244,6 +241,7 @@ impl BloomFilterType {
}
}

/// Calculate the false positive rate for the Nth filter using tightening ratio.
pub fn calculate_fp_rate(fp_rate: f64, num_filters: i32) -> Result<f64, BloomError> {
match fp_rate * configs::TIGHTENING_RATIO.powi(num_filters) {
x if x > f64::MIN_POSITIVE => Ok(x),
Expand All @@ -265,9 +263,10 @@ impl BloomFilterType {
1 => {
// always use new version to init bloomFilterType.
// This is to ensure that the new fields can be recognized when the object is serialized and deserialized in the future.
let (expansion, fp_rate, filters): (u32, f64, Vec<BloomFilter>) =
match bincode::deserialize::<(u32, f64, Vec<BloomFilter>)>(&decoded_bytes[1..])
{
let (expansion, fp_rate, filters): (u32, f64, Vec<Box<BloomFilter>>) =
match bincode::deserialize::<(u32, f64, Vec<Box<BloomFilter>>)>(
&decoded_bytes[1..],
) {
Ok(values) => {
if !(BLOOM_EXPANSION_MIN..=BLOOM_EXPANSION_MAX).contains(&values.0) {
return Err(BloomError::BadExpansion);
Expand Down Expand Up @@ -339,8 +338,6 @@ impl BloomFilterType {
/// well within the u32::MAX limit.
#[derive(Serialize, Deserialize)]
pub struct BloomFilter {
#[serde(serialize_with = "serialize", deserialize_with = "deserialize")]
pub bloom: bloomfilter::Bloom<[u8]>,
#[serde(
serialize_with = "serialize",
deserialize_with = "deserialize_boxed_bloom"
Expand All @@ -350,9 +347,6 @@ pub struct BloomFilter {
pub capacity: u32,
}

use bloomfilter::Bloom;
use serde::Deserializer;

pub fn deserialize_boxed_bloom<'de, D>(deserializer: D) -> Result<Box<Bloom<[u8]>>, D::Error>
where
D: Deserializer<'de>,
Expand All @@ -368,7 +362,6 @@ impl BloomFilter {
fp_rate,
&configs::FIXED_SEED,
)
.unwrap()
.expect("We expect bloomfilter::Bloom<[u8]> creation to succeed");
let fltr = BloomFilter {
bloom: Box::new(bloom),
Expand Down Expand Up @@ -406,7 +399,6 @@ impl BloomFilter {
}

pub fn number_of_bytes(&self) -> usize {
std::mem::size_of::<BloomFilter>() + (self.bloom.len() / 8) as usize
std::mem::size_of::<BloomFilter>()
+ std::mem::size_of::<bloomfilter::Bloom<[u8]>>()
+ (self.bloom.len() / 8) as usize
Expand Down Expand Up @@ -434,7 +426,6 @@ impl BloomFilter {

/// Create a new BloomFilter from an existing BloomFilter object (COPY command).
pub fn create_copy_from(bf: &BloomFilter) -> BloomFilter {
BloomFilter::from_existing(&bf.bloom.to_bytes(), bf.num_items, bf.capacity)
BloomFilter::from_existing(bf.bloom.as_slice(), bf.num_items, bf.capacity)
}
}
Expand Down Expand Up @@ -466,10 +457,6 @@ impl Drop for BloomFilter {
mod tests {
use super::*;
use configs::FIXED_SEED;
use crate::configs::{
FIXED_SIP_KEY_ONE_A, FIXED_SIP_KEY_ONE_B, FIXED_SIP_KEY_TWO_A, FIXED_SIP_KEY_TWO_B,
};
use configs::FIXED_SEED;
use rand::{distributions::Alphanumeric, Rng};

/// Returns random string with specified number of characters.
Expand Down Expand Up @@ -584,8 +571,6 @@ mod tests {
.any(
|filter| (filter.bloom.seed() == restore_filter.bloom.seed())
&& (restore_filter.bloom.seed() == FIXED_SEED)
|filter| (filter.bloom.seed() == restore_filter.bloom.seed())
&& (restore_filter.bloom.seed() == FIXED_SEED)
)));
assert!(restored_bloom_filter_type
.filters
Expand All @@ -602,7 +587,6 @@ mod tests {
.filters
.iter()
.any(|filter| filter.bloom.as_slice() == restore_filter.bloom.as_slice())));
.any(|filter| filter.bloom.as_slice() == restore_filter.bloom.as_slice())));
let (error_count, _) = check_items_exist(
restored_bloom_filter_type,
1,
Expand Down Expand Up @@ -753,8 +737,6 @@ mod tests {
let test_bloom_filter = BloomFilter::new(0.5_f64, 1000_u32);
let seed = test_bloom_filter.bloom.seed();
assert_eq!(seed, FIXED_SEED);
let test_sip_keys = test_bloom_filter.bloom.seed();
assert!(test_sip_keys == FIXED_SEED);
}

#[test]
Expand Down
3 changes: 0 additions & 3 deletions src/wrapper/bloom_callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ use crate::bloom::utils::BloomFilter;
use crate::bloom::utils::BloomFilterType;
use crate::configs;
use crate::wrapper::digest::Digest;
use crate::configs;
use bit_vec::BitVec;
use bloomfilter::Bloom;
use lazy_static::lazy_static;
use std::ffi::CString;
Expand All @@ -14,7 +12,6 @@ use std::os::raw::{c_char, c_int, c_void};
use std::ptr::null_mut;
use std::sync::atomic::Ordering;
use std::sync::Mutex;
use valkey_module::logging;
use valkey_module::logging::{log_io_error, ValkeyLogLevel};
use valkey_module::raw;
use valkey_module::{RedisModuleDefragCtx, RedisModuleString};
Expand Down
8 changes: 4 additions & 4 deletions src/wrapper/defrag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,28 @@ impl Defrag {

/// # Safety
///
/// This function should not be called before the horsemen are ready.
/// This function is temporary and will be removed once implemented in valkeymodule-rs .
pub unsafe fn alloc(&self, ptr: *mut c_void) -> *mut c_void {
unsafe { raw::RedisModule_DefragAlloc.unwrap()(self.defrag_ctx, ptr) }
}

/// # Safety
///
/// This function should not be called before the horsemen are ready.
/// This function sis temporary and will be removed once implemented in valkeymodule-rs .
pub unsafe fn curserset(&self, cursor: u64) -> i32 {
unsafe { raw::RedisModule_DefragCursorSet.unwrap()(self.defrag_ctx, cursor) }
}

/// # Safety
///
/// This function should not be called before the horsemen are ready.
/// This function sis temporary and will be removed once implemented in valkeymodule-rs .
pub unsafe fn curserget(&self, cursor: *mut u64) -> i32 {
unsafe { raw::RedisModule_DefragCursorGet.unwrap()(self.defrag_ctx, cursor) }
}

/// # Safety
///
/// This function should not be called before the horsemen are ready.
/// This function sis temporary and will be removed once implemented in valkeymodule-rs .
pub unsafe fn should_stop_defrag(&self) -> i32 {
unsafe { raw::RedisModule_DefragShouldStop.unwrap()(self.defrag_ctx) }
}
Expand Down
2 changes: 1 addition & 1 deletion src/wrapper/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
pub mod bloom_callback;
pub mod digest;
pub mod defrag;
pub mod digest;
2 changes: 0 additions & 2 deletions tests/test_bloom_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,4 @@ def test_save_and_restore_metrics(self):

# Compare original and loaded scaled bloomfilter infos
new_client = self.server.get_new_client()
# When we scale out in the original it scales the vec by a factor of four. However when we load from an rdb we create an exact sized vec, this means the last
# two 8 byte sized allocations for a vec are no longer in memory so we have 16 less bytes in memory now. Figure out if we want to do this
self.verify_bloom_metrics(new_client.execute_command("INFO bf"), original_info_obj[3] + DEFAULT_BLOOM_FILTER_SIZE, 2, 3, 7501, 21000 + DEFAULT_BLOOM_FILTER_CAPACITY)
4 changes: 1 addition & 3 deletions tests/test_save_and_restore.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,12 @@ def test_basic_save_and_restore(self):
self.server.restart(remove_rdb=False, remove_nodes_conf=False, connect_client=True)

assert self.server.is_alive()
assert uptime_in_sec_1 > uptime_in_sec_2
assert self.server.is_rdb_done_loading()
wait_for_equal(lambda: self.server.is_rdb_done_loading(), True)
restored_server_digest = client.debug_digest()
restored_object_digest = client.execute_command('DEBUG DIGEST-VALUE testSave')
assert restored_server_digest == server_digest
assert restored_object_digest == object_digest
self.server.verify_string_in_logfile("Loading RDB produced by Valkey")
wait_for_equal(lambda: self.server.is_rdb_done_loading(), True)
self.server.verify_string_in_logfile("Done loading RDB, keys loaded: 1, keys expired: 0")

# verify restore results
Expand Down

0 comments on commit d8b41c2

Please sign in to comment.