Skip to content

Commit

Permalink
Replication support + Update Module/Datatype name + Refactor
Browse files Browse the repository at this point in the history
Signed-off-by: KarthikSubbarao <[email protected]>
  • Loading branch information
KarthikSubbarao committed Aug 23, 2024
1 parent 87bcce5 commit c87fd97
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 67 deletions.
96 changes: 62 additions & 34 deletions src/commands/bloom.rs → src/bloom/command_handler.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,58 @@
use crate::bloom_config;
use crate::bloom_config::BLOOM_EXPANSION_MAX;
use crate::bloom_config::BLOOM_MAX_ITEM_COUNT_MAX;
use crate::commands::bloom_data_type::BLOOM_FILTER_TYPE;
use crate::commands::bloom_util::{BloomFilterType, ERROR};
use crate::bloom::data_type::BLOOM_FILTER_TYPE;
use crate::bloom::utils::{BloomFilterType, ERROR};
use crate::configs;
use crate::configs::BLOOM_CAPACITY_MAX;
use crate::configs::BLOOM_EXPANSION_MAX;
use std::sync::atomic::Ordering;
use valkey_module::{Context, ValkeyError, ValkeyResult, ValkeyString, ValkeyValue, VALKEY_OK};

// TODO: Replace string literals in error messages with static


fn bloom_single_add_helper(item: &[u8], bf: &mut BloomFilterType) -> Result<ValkeyValue, ValkeyError> {
fn bloom_single_add_helper(
ctx: &Context,
item: &[u8],
bf: &mut BloomFilterType,
replicate_on_success: bool,
) -> Result<ValkeyValue, ValkeyError> {
match bf.add_item(item) {
Ok(result) => Ok(ValkeyValue::Integer(result)),
Ok(result) => {
if replicate_on_success && result == 1 {
ctx.replicate_verbatim();
}
Ok(ValkeyValue::Integer(result))
}
Err(err) => Err(ValkeyError::Str(err.as_str())),
}
}

fn bloom_multi_add_helper(args: &[ValkeyString], argc: usize, skip_idx: usize, bf: &mut BloomFilterType) -> Vec<ValkeyValue> {
fn bloom_multi_add_helper(
ctx: &Context,
args: &[ValkeyString],
argc: usize,
skip_idx: usize,
bf: &mut BloomFilterType,
replicate_on_success: bool,
) -> Result<ValkeyValue, ValkeyError> {
let mut result = Vec::new();
let mut write_operation = false;
for item in args.iter().take(argc).skip(skip_idx) {
match bf.add_item(item.as_slice()) {
Ok(add_result) => {
if add_result == 1 {
write_operation = true;
}
result.push(ValkeyValue::Integer(add_result));
},
}
Err(err) => {
result.push(ValkeyValue::StaticError(err.as_str()));
return result;
continue;
}
};
}
result
if replicate_on_success && write_operation {
ctx.replicate_verbatim();
}
Ok(ValkeyValue::Array(result))
}

pub fn bloom_filter_add_value(
Expand Down Expand Up @@ -57,27 +80,28 @@ pub fn bloom_filter_add_value(
Some(bf) => {
if !multi {
let item = input_args[curr_cmd_idx].as_slice();
return bloom_single_add_helper(item, bf);
return bloom_single_add_helper(ctx, item, bf, true);
}
Ok(ValkeyValue::Array(bloom_multi_add_helper(input_args, argc, curr_cmd_idx, bf)))
bloom_multi_add_helper(ctx, input_args, argc, curr_cmd_idx, bf, true)
}
None => {
// Instantiate empty bloom filter.
let fp_rate = bloom_config::BLOOM_FP_RATE_DEFAULT;
let capacity = bloom_config::BLOOM_MAX_ITEM_COUNT.load(Ordering::Relaxed) as u32;
let expansion = bloom_config::BLOOM_EXPANSION.load(Ordering::Relaxed) as u32;
let fp_rate = configs::BLOOM_FP_RATE_DEFAULT;
let capacity = configs::BLOOM_CAPACITY.load(Ordering::Relaxed) as u32;
let expansion = configs::BLOOM_EXPANSION.load(Ordering::Relaxed) as u32;
let mut bf = BloomFilterType::new_reserved(fp_rate, capacity, expansion);
let result = match multi {
true => {
Ok(ValkeyValue::Array(bloom_multi_add_helper(input_args, argc, curr_cmd_idx, &mut bf)))
}
true => bloom_multi_add_helper(ctx, input_args, argc, curr_cmd_idx, &mut bf, false),
false => {
let item = input_args[curr_cmd_idx].as_slice();
return bloom_single_add_helper(item, &mut bf);
bloom_single_add_helper(ctx, item, &mut bf, false)
}
};
match filter_key.set_value(&BLOOM_FILTER_TYPE, bf) {
Ok(_) => result,
Ok(_) => {
ctx.replicate_verbatim();
result
}
Err(_) => Err(ValkeyError::Str(ERROR)),
}
}
Expand Down Expand Up @@ -170,7 +194,7 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke
curr_cmd_idx += 1;
// Parse the capacity
let capacity = match input_args[curr_cmd_idx].to_string_lossy().parse::<u32>() {
Ok(num) if num > 0 && num < BLOOM_MAX_ITEM_COUNT_MAX => num,
Ok(num) if num > 0 && num < BLOOM_CAPACITY_MAX => num,
Ok(0) => {
return Err(ValkeyError::Str("ERR (capacity should be larger than 0)"));
}
Expand All @@ -179,7 +203,7 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke
}
};
curr_cmd_idx += 1;
let mut expansion = bloom_config::BLOOM_EXPANSION.load(Ordering::Relaxed) as u32;
let mut expansion = configs::BLOOM_EXPANSION.load(Ordering::Relaxed) as u32;
if argc > 4 {
match input_args[curr_cmd_idx]
.to_string_lossy()
Expand Down Expand Up @@ -216,7 +240,10 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke
None => {
let bloom = BloomFilterType::new_reserved(fp_rate, capacity, expansion);
match filter_key.set_value(&BLOOM_FILTER_TYPE, bloom) {
Ok(_v) => VALKEY_OK,
Ok(_v) => {
ctx.replicate_verbatim();
VALKEY_OK
}
Err(_) => Err(ValkeyError::Str(ERROR)),
}
}
Expand All @@ -233,9 +260,9 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
// Parse the filter name
let filter_name = &input_args[idx];
idx += 1;
let mut fp_rate = bloom_config::BLOOM_FP_RATE_DEFAULT;
let mut capacity = bloom_config::BLOOM_MAX_ITEM_COUNT.load(Ordering::Relaxed) as u32;
let mut expansion = bloom_config::BLOOM_EXPANSION.load(Ordering::Relaxed) as u32;
let mut fp_rate = configs::BLOOM_FP_RATE_DEFAULT;
let mut capacity = configs::BLOOM_CAPACITY.load(Ordering::Relaxed) as u32;
let mut expansion = configs::BLOOM_EXPANSION.load(Ordering::Relaxed) as u32;
let mut nocreate = false;
while idx < argc {
match input_args[idx].to_string_lossy().to_uppercase().as_str() {
Expand All @@ -254,7 +281,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
"CAPACITY" if idx < (argc - 1) => {
idx += 1;
capacity = match input_args[idx].to_string_lossy().parse::<u32>() {
Ok(num) if num > 0 && num < BLOOM_MAX_ITEM_COUNT_MAX => num,
Ok(num) if num > 0 && num < BLOOM_CAPACITY_MAX => num,
Ok(0) => {
return Err(ValkeyError::Str("ERR (capacity should be larger than 0)"));
}
Expand Down Expand Up @@ -297,17 +324,18 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
}
};
match value {
Some(bf) => {
Ok(ValkeyValue::Array(bloom_multi_add_helper(input_args, argc, idx, bf)))
}
Some(bf) => bloom_multi_add_helper(ctx, input_args, argc, idx, bf, true),
None => {
if nocreate {
return Err(ValkeyError::Str("ERR not found"));
}
let mut bf = BloomFilterType::new_reserved(fp_rate, capacity, expansion);
let result = bloom_multi_add_helper(input_args, argc, idx, &mut bf);
let result = bloom_multi_add_helper(ctx, input_args, argc, idx, &mut bf, false);
match filter_key.set_value(&BLOOM_FILTER_TYPE, bf) {
Ok(_) => Ok(ValkeyValue::Array(result)),
Ok(_) => {
ctx.replicate_verbatim();
result
}
Err(_) => Err(ValkeyError::Str(ERROR)),
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/commands/bloom_data_type.rs → src/bloom/data_type.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::commands::bloom_util::BloomFilter;
use crate::commands::bloom_util::BloomFilterType;
use crate::bloom::utils::BloomFilter;
use crate::bloom::utils::BloomFilterType;
use crate::wrapper::bloom_callback;
use crate::MODULE_NAME;
use std::os::raw::c_int;
Expand All @@ -9,7 +9,7 @@ use valkey_module::{logging, raw};
const BLOOM_FILTER_TYPE_ENCODING_VERSION: i32 = 0;

pub static BLOOM_FILTER_TYPE: ValkeyType = ValkeyType::new(
"bloomtype",
"bloom----",
BLOOM_FILTER_TYPE_ENCODING_VERSION,
raw::RedisModuleTypeMethods {
version: raw::REDISMODULE_TYPE_METHOD_VERSION as u64,
Expand Down
3 changes: 3 additions & 0 deletions src/bloom/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod command_handler;
pub mod data_type;
pub mod utils;
7 changes: 3 additions & 4 deletions src/commands/bloom_util.rs → src/bloom/utils.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::configs::TIGHTENING_RATIO;
use bloomfilter;
use crate::bloom_config::TIGHTENING_RATIO;

/// Constants
pub const ERROR: &str = "ERROR";
Expand Down Expand Up @@ -98,7 +98,7 @@ impl BloomFilterType {

/// Add an item to the BloomFilterType object.
/// If scaling is enabled, this can result in a new sub filter creation.
pub fn add_item(&mut self, item: &[u8]) -> Result<i64, BloomError> {
pub fn add_item(&mut self, item: &[u8]) -> Result<i64, BloomError> {
// Check if item exists already.
if self.item_exists(item) {
return Ok(0);
Expand Down Expand Up @@ -173,8 +173,7 @@ impl BloomFilter {
}

pub fn number_of_bytes(&self) -> usize {
std::mem::size_of::<BloomFilter>()
+ (self.bloom.number_of_bits() / 8) as usize
std::mem::size_of::<BloomFilter>() + (self.bloom.number_of_bits() / 8) as usize
}

pub fn check(&self, item: &[u8]) -> bool {
Expand Down
3 changes: 0 additions & 3 deletions src/commands/mod.rs

This file was deleted.

8 changes: 4 additions & 4 deletions src/bloom_config.rs → src/configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@ use lazy_static::lazy_static;
use std::sync::atomic::AtomicI64;

/// Configurations
pub const BLOOM_MAX_ITEM_COUNT_DEFAULT: i64 = 100000;
pub const BLOOM_MAX_ITEM_COUNT_MIN: i64 = 1;
pub const BLOOM_MAX_ITEM_COUNT_MAX: u32 = u32::MAX;
pub const BLOOM_CAPACITY_DEFAULT: i64 = 100000;
pub const BLOOM_CAPACITY_MIN: i64 = 1;
pub const BLOOM_CAPACITY_MAX: u32 = u32::MAX;

pub const BLOOM_EXPANSION_DEFAULT: i64 = 2;
pub const BLOOM_EXPANSION_MIN: i64 = 1;
pub const BLOOM_EXPANSION_MAX: u32 = 10;

lazy_static! {
pub static ref BLOOM_MAX_ITEM_COUNT: AtomicI64 = AtomicI64::new(BLOOM_MAX_ITEM_COUNT_DEFAULT);
pub static ref BLOOM_CAPACITY: AtomicI64 = AtomicI64::new(BLOOM_CAPACITY_DEFAULT);
pub static ref BLOOM_EXPANSION: AtomicI64 = AtomicI64::new(BLOOM_EXPANSION_DEFAULT);
}

Expand Down
30 changes: 15 additions & 15 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use valkey_module::configuration::ConfigurationFlags;
use valkey_module::{valkey_module, Context, Status, ValkeyResult, ValkeyString};
pub mod bloom_config;
pub mod commands;
pub mod bloom;
pub mod configs;
pub mod wrapper;
use crate::commands::bloom;
use crate::commands::bloom_data_type::BLOOM_FILTER_TYPE;
use crate::bloom::command_handler;
use crate::bloom::data_type::BLOOM_FILTER_TYPE;

pub const MODULE_NAME: &str = "bloom";
pub const MODULE_NAME: &str = "bf";

fn initialize(_ctx: &Context, _args: &[ValkeyString]) -> Status {
Status::Ok
Expand All @@ -18,43 +18,43 @@ fn deinitialize(_ctx: &Context) -> Status {

/// Command handler for BF.EXISTS <key> <item>
fn bloom_exists_command(ctx: &Context, args: Vec<ValkeyString>) -> ValkeyResult {
bloom::bloom_filter_exists(ctx, &args, false)
command_handler::bloom_filter_exists(ctx, &args, false)
}

/// Command handler for BF.MEXISTS <key> <item> [<item> ...]
fn bloom_mexists_command(ctx: &Context, args: Vec<ValkeyString>) -> ValkeyResult {
bloom::bloom_filter_exists(ctx, &args, true)
command_handler::bloom_filter_exists(ctx, &args, true)
}

/// Command handler for BF.ADD <key> <item>
fn bloom_add_command(ctx: &Context, args: Vec<ValkeyString>) -> ValkeyResult {
bloom::bloom_filter_add_value(ctx, &args, false)
command_handler::bloom_filter_add_value(ctx, &args, false)
}

/// Command handler for BF.MADD <key> <item> [<item> ...]
fn bloom_madd_command(ctx: &Context, args: Vec<ValkeyString>) -> ValkeyResult {
bloom::bloom_filter_add_value(ctx, &args, true)
command_handler::bloom_filter_add_value(ctx, &args, true)
}

/// Command handler for BF.CARD <key>
fn bloom_card_command(ctx: &Context, args: Vec<ValkeyString>) -> ValkeyResult {
bloom::bloom_filter_card(ctx, &args)
command_handler::bloom_filter_card(ctx, &args)
}

/// Command handler for BF.RESERVE <key> <false_positive_rate> <capacity> [EXPANSION <expansion>] | [NONSCALING]
fn bloom_reserve_command(ctx: &Context, args: Vec<ValkeyString>) -> ValkeyResult {
bloom::bloom_filter_reserve(ctx, &args)
command_handler::bloom_filter_reserve(ctx, &args)
}

/// Command handler for BF.INFO <key> [CAPACITY | SIZE | FILTERS | ITEMS | EXPANSION]
fn bloom_info_command(ctx: &Context, args: Vec<ValkeyString>) -> ValkeyResult {
bloom::bloom_filter_info(ctx, &args)
command_handler::bloom_filter_info(ctx, &args)
}

/// Command handler for:
/// BF.INSERT <key> [ERROR <fp_error>] [CAPACITY <capacity>] [EXPANSION <expansion>] [NOCREATE] [NONSCALING] ITEMS <item> [<item> ...]
fn bloom_insert_command(ctx: &Context, args: Vec<ValkeyString>) -> ValkeyResult {
bloom::bloom_filter_insert(ctx, &args)
command_handler::bloom_filter_insert(ctx, &args)
}

//////////////////////////////////////////////////////
Expand All @@ -80,8 +80,8 @@ valkey_module! {
],
configurations: [
i64: [
["bloom_max_item_size", &*bloom_config::BLOOM_MAX_ITEM_COUNT, bloom_config::BLOOM_MAX_ITEM_COUNT_DEFAULT, bloom_config::BLOOM_MAX_ITEM_COUNT_MIN, bloom_config::BLOOM_MAX_ITEM_COUNT_MAX as i64, ConfigurationFlags::DEFAULT, None],
["bloom_expansion_rate", &*bloom_config::BLOOM_EXPANSION, bloom_config::BLOOM_EXPANSION_DEFAULT, bloom_config::BLOOM_EXPANSION_MIN, bloom_config::BLOOM_EXPANSION_MAX as i64, ConfigurationFlags::DEFAULT, None],
["bloom_max_item_size", &*configs::BLOOM_CAPACITY, configs::BLOOM_CAPACITY_DEFAULT, configs::BLOOM_CAPACITY_MIN, configs::BLOOM_CAPACITY_MAX as i64, ConfigurationFlags::DEFAULT, None],
["bloom_expansion_rate", &*configs::BLOOM_EXPANSION, configs::BLOOM_EXPANSION_DEFAULT, configs::BLOOM_EXPANSION_MIN, configs::BLOOM_EXPANSION_MAX as i64, ConfigurationFlags::DEFAULT, None],
],
string: [
],
Expand Down
8 changes: 4 additions & 4 deletions src/wrapper/bloom_callback.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::commands::bloom_data_type;
use crate::commands::bloom_util::BloomFilterType;
use crate::bloom;
use crate::bloom::utils::BloomFilterType;
use std::os::raw::{c_char, c_int, c_void};
use std::ptr::null_mut;
use valkey_module::raw;
Expand Down Expand Up @@ -41,7 +41,7 @@ pub unsafe extern "C" fn bloom_rdb_load(
rdb: *mut raw::RedisModuleIO,
encver: c_int,
) -> *mut c_void {
if let Some(item) = bloom_data_type::bloom_rdb_load_data_object(rdb, encver) {
if let Some(item) = bloom::data_type::bloom_rdb_load_data_object(rdb, encver) {
let bb = Box::new(item);
Box::into_raw(bb).cast::<libc::c_void>()
} else {
Expand All @@ -56,7 +56,7 @@ pub unsafe extern "C" fn bloom_aux_load(
_encver: c_int,
_when: c_int,
) -> c_int {
bloom_data_type::bloom_rdb_aux_load(rdb)
bloom::data_type::bloom_rdb_aux_load(rdb)
}

/// # Safety
Expand Down

0 comments on commit c87fd97

Please sign in to comment.