Skip to content

Commit

Permalink
Add integration tests for maxmemory scenarios and replication correct…
Browse files Browse the repository at this point in the history
…ness. Fixed others tests and updated the build.sh script for running single integration tests (#16)

Signed-off-by: Karthik Subbarao <[email protected]>
  • Loading branch information
KarthikSubbarao committed Oct 17, 2024
1 parent f5aa39e commit 57a7901
Show file tree
Hide file tree
Showing 7 changed files with 173 additions and 27 deletions.
10 changes: 8 additions & 2 deletions build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ fi
export MODULE_PATH="$SCRIPT_DIR/target/release/libvalkey_bloom.so"

echo "Running the integration tests..."
python3 -m pytest --cache-clear -v "$SCRIPT_DIR/tests/"
# TEST_PATTERN can be used to run specific tests or test patterns.
if [[ -n "$TEST_PATTERN" ]]; then
python3 -m pytest --cache-clear -v "$SCRIPT_DIR/tests/" -k $TEST_PATTERN
else
echo "TEST_PATTERN is not set. Running all integration tests."
python3 -m pytest --cache-clear -v "$SCRIPT_DIR/tests/"
fi

echo "Build and Integration Tests succeeded"
echo "Build, Format Checks, Unit tests, and Integration Tests succeeded"
6 changes: 3 additions & 3 deletions src/bloom/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ pub fn bloom_filter_add_value(
&mut add_succeeded,
);
match filter_key.set_value(&BLOOM_FILTER_TYPE, bf) {
Ok(_) => {
Ok(()) => {
replicate_and_notify_events(ctx, filter_name, add_succeeded, true);
response
}
Expand Down Expand Up @@ -262,7 +262,7 @@ pub fn bloom_filter_reserve(ctx: &Context, input_args: &[ValkeyString]) -> Valke
None => {
let bloom = BloomFilterType::new_reserved(fp_rate, capacity, expansion);
match filter_key.set_value(&BLOOM_FILTER_TYPE, bloom) {
Ok(_v) => {
Ok(()) => {
replicate_and_notify_events(ctx, filter_name, false, true);
VALKEY_OK
}
Expand Down Expand Up @@ -373,7 +373,7 @@ pub fn bloom_filter_insert(ctx: &Context, input_args: &[ValkeyString]) -> Valkey
let response =
handle_bloom_add(input_args, argc, idx, &mut bf, true, &mut add_succeeded);
match filter_key.set_value(&BLOOM_FILTER_TYPE, bf) {
Ok(_) => {
Ok(()) => {
replicate_and_notify_events(ctx, filter_name, add_succeeded, true);
response
}
Expand Down
42 changes: 42 additions & 0 deletions tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,48 @@ def test_memory_usage_cmd(self):
info_size = client.execute_command('BF.INFO filter SIZE')
assert memory_usage > info_size and info_size > 0

def test_large_allocation_when_below_maxmemory(self):
two_megabytes = 2 * 1024 * 1024
# The command below will result in an allocation greater than 2 MB.
bloom_cmd_large_allocation = 'BF.RESERVE newfilter 0.001 10000000'
client = self.server.get_new_client()
assert client.execute_command("CONFIG SET maxmemory-policy allkeys-lru") == b"OK"
assert client.execute_command("CONFIG SET maxmemory {}".format(two_megabytes)) == b"OK"
used_memory = client.info_obj().used_memory()
maxmemory = client.info_obj().maxmemory()
client.execute_command('BF.ADD filter item1')
new_used_memory = client.info_obj().used_memory()
assert new_used_memory > used_memory and new_used_memory < maxmemory
assert client.execute_command(bloom_cmd_large_allocation) == b"OK"
assert client.execute_command('DBSIZE') < 2
assert client.info("Stats")['evicted_keys'] > 0
used_memory = client.info_obj().used_memory()
assert used_memory < maxmemory
client.execute_command('FLUSHALL')
client.execute_command('BF.ADD filter item1')
assert client.execute_command("CONFIG SET maxmemory-policy volatile-lru") == b"OK"
assert client.execute_command(bloom_cmd_large_allocation) == b"OK"
assert client.execute_command('DBSIZE') == 2
used_memory = client.info_obj().used_memory()
assert used_memory > maxmemory

def test_large_allocation_when_above_maxmemory(self):
client = self.server.get_new_client()
assert client.execute_command("CONFIG SET maxmemory-policy allkeys-lru") == b"OK"
used_memory = client.info_obj().used_memory()
client.execute_command('BF.ADD filter item1')
new_used_memory = client.info_obj().used_memory()
assert new_used_memory > used_memory
# Configure the server to now be over maxmemory with allkeys-lru policy. Test that allocation fails.
assert client.execute_command("CONFIG SET maxmemory {}".format(used_memory)) == b"OK"
bloom_cmd_large_allocation = 'BF.RESERVE newfilter 0.001 10000000'
self.verify_error_response(self.client, bloom_cmd_large_allocation, "command not allowed when used memory > 'maxmemory'.")
assert client.info("Errorstats")['errorstat_OOM']['count'] == 1
# Configure the server to now be over maxmemory with volatile-lru policy. Test that allocation fails.
assert client.execute_command("CONFIG SET maxmemory-policy volatile-lru") == b"OK"
self.verify_error_response(self.client, bloom_cmd_large_allocation, "command not allowed when used memory > 'maxmemory'.")
assert client.info("Errorstats")['errorstat_OOM']['count'] == 2

def test_module_data_type(self):
# Validate the name of the Module data type.
client = self.server.get_new_client()
Expand Down
2 changes: 1 addition & 1 deletion tests/test_bloom_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def test_bloom_command_behavior(self):
('BF.INSERT TEST_EXPANSION EXPANSION 9 ITEMS ITEM', [1]),
('BF.INSERT TEST_CAPACITY CAPACITY 2000 ITEMS ITEM', [1]),
('BF.INSERT TEST_ITEMS ITEMS 1 2 3 EXPANSION 2', [1, 1, 1, 1, 0]),
('BF.INFO TEST Capacity', 100),
('BF.INFO TEST Capacity', 100000),
('BF.INFO TEST ITEMS', 5),
('BF.INFO TEST filters', 1),
('bf.info TEST expansion', 2),
Expand Down
105 changes: 103 additions & 2 deletions tests/test_replication.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import pytest
from valkey import ResponseError
from valkeytests.valkey_test_case import ReplicationTestCase
from valkeytests.conftest import resource_port_tracker
import os
Expand All @@ -13,8 +14,7 @@ def get_custom_args(self):

def test_replication_success(self):
self.setup_replication(num_replicas=1)
bf_add_result = self.client.execute_command('BF.ADD key item1')
assert bf_add_result == 1
assert self.client.execute_command('BF.ADD key item1') == 1
bf_exists_result = self.client.execute_command('BF.EXISTS key item1')
bf_non_added_exists_result = self.client.execute_command('BF.EXISTS key item2')
bf_info_result = self.client.execute_command('BF.INFO key')
Expand All @@ -26,3 +26,104 @@ def test_replication_success(self):
assert bf_non_added_exists_result == bf_replica_non_added_exists_result
bf_replica_info_result = self.replicas[0].client.execute_command('BF.INFO key')
assert bf_info_result == bf_replica_info_result

def test_replication_behavior(self):
self.setup_replication(num_replicas=1)
# Test replication for write commands.
bloom_write_cmds = [
('BF.ADD', 'BF.ADD key item', 'BF.ADD key item1', 2),
('BF.MADD', 'BF.MADD key item', 'BF.MADD key item1', 2),
('BF.RESERVE', 'BF.RESERVE key 0.001 100000', 'BF.ADD key item1', 1),
('BF.INSERT', 'BF.INSERT key items item', 'BF.INSERT key items item1', 2),
]
for test_case in bloom_write_cmds:
prefix = test_case[0]
create_cmd = test_case[1]
# New bloom object being created is replicated.
self.client.execute_command(create_cmd)
assert self.client.execute_command('EXISTS key') == 1
self.waitForReplicaToSyncUp(self.replicas[0])
assert self.replicas[0].client.execute_command('EXISTS key') == 1

# New item added to an existing bloom is replicated.
item_add_cmd = test_case[2]
self.client.execute_command(item_add_cmd)
assert self.client.execute_command('BF.EXISTS key item1') == 1
self.waitForReplicaToSyncUp(self.replicas[0])
assert self.replicas[0].client.execute_command('BF.EXISTS key item1') == 1

# Validate that the bloom object creation command and item add command was replicated.
expected_calls = test_case[3]
primary_cmd_stats = self.client.info("Commandstats")['cmdstat_' + prefix]
replica_cmd_stats = self.replicas[0].client.info("Commandstats")['cmdstat_' + prefix]
assert primary_cmd_stats["calls"] == expected_calls and replica_cmd_stats["calls"] == expected_calls

# Attempting to add an existing item to an existing bloom will NOT replicated.
self.client.execute_command(item_add_cmd)
self.waitForReplicaToSyncUp(self.replicas[0])
primary_cmd_stats = self.client.info("Commandstats")
replica_cmd_stats = self.replicas[0].client.info("Commandstats")
if prefix == 'BF.RESERVE':
assert primary_cmd_stats['cmdstat_' + prefix]["calls"] == 1 and replica_cmd_stats['cmdstat_' + prefix]["calls"] == 1
assert primary_cmd_stats['cmdstat_BF.ADD']["calls"] == 2 and replica_cmd_stats['cmdstat_BF.ADD']["calls"] == 1
else:
assert primary_cmd_stats['cmdstat_' + prefix]["calls"] == (expected_calls + 1) and replica_cmd_stats['cmdstat_' + prefix]["calls"] == expected_calls
self.client.execute_command('FLUSHALL')
self.waitForReplicaToSyncUp(self.replicas[0])
self.client.execute_command('CONFIG RESETSTAT')
self.replicas[0].client.execute_command('CONFIG RESETSTAT')

self.client.execute_command('BF.ADD key item1')
self.waitForReplicaToSyncUp(self.replicas[0])

# Read commands executed on the primary will not be replicated.
read_commands = [
('BF.EXISTS', 'BF.EXISTS key item1', 1),
('BF.MEXISTS', 'BF.MEXISTS key item1 item2', 1),
('BF.INFO', 'BF.INFO key', 1),
('BF.INFO', 'BF.INFO key Capacity', 2),
('BF.INFO', 'BF.INFO key ITEMS', 3),
('BF.INFO', 'BF.INFO key filters', 4),
('BF.INFO', 'BF.INFO key size', 5),
('BF.INFO', 'BF.INFO key expansion', 6),
('BF.CARD', 'BF.CARD key', 1)
]
for test_case in read_commands:
prefix = test_case[0]
cmd = test_case[1]
expected_primary_calls = test_case[2]
self.client.execute_command(cmd)
primary_cmd_stats = self.client.info("Commandstats")['cmdstat_' + prefix]
assert primary_cmd_stats["calls"] == expected_primary_calls
assert ('cmdstat_' + prefix) not in self.replicas[0].client.info("Commandstats")

# Deletes of bloom objects are replicated
assert self.client.execute_command("EXISTS key") == 1
assert self.replicas[0].client.execute_command('EXISTS key') == 1
assert self.client.execute_command("DEL key") == 1
self.waitForReplicaToSyncUp(self.replicas[0])
assert self.client.execute_command("EXISTS key") == 0
assert self.replicas[0].client.execute_command('EXISTS key') == 0

self.client.execute_command('CONFIG RESETSTAT')
self.replicas[0].client.execute_command('CONFIG RESETSTAT')

# Write commands with errors are not replicated.
invalid_bloom_write_cmds = [
('BF.ADD', 'BF.ADD key item1 item2'),
('BF.MADD', 'BF.MADD key'),
('BF.RESERVE', 'BF.RESERVE key 1.001 100000'),
('BF.INSERT', 'BF.INSERT key CAPACITY 0 items item'),
]
for test_case in invalid_bloom_write_cmds:
prefix = test_case[0]
cmd = test_case[1]
try:
self.client.execute_command(cmd)
assert False
except ResponseError as e:
pass
primary_cmd_stats = self.client.info("Commandstats")['cmdstat_' + prefix]
assert primary_cmd_stats["calls"] == 1
assert primary_cmd_stats["failed_calls"] == 1
assert ('cmdstat_' + prefix) not in self.replicas[0].client.info("Commandstats")
29 changes: 10 additions & 19 deletions tests/valkey_bloom_test_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,29 +22,20 @@ def verify_error_response(self, client, cmd, expected_err_reply):
assert str(e) == expected_err_reply, assert_error_msg

def verify_command_success_reply(self, client, cmd, expected_result):
try:
cmd_actual_result = client.execute_command(cmd)
assert_error_msg = f"Actual command response '{cmd_actual_result}' is different from expected response '{expected_result}'"
assert cmd_actual_result == expected_result, assert_error_msg
except:
print("Something went wrong in command behavior verification")
cmd_actual_result = client.execute_command(cmd)
assert_error_msg = f"Actual command response '{cmd_actual_result}' is different from expected response '{expected_result}'"
assert cmd_actual_result == expected_result, assert_error_msg

def verify_bloom_filter_item_existence(self, client, key, value, should_exist=True):
try:
if should_exist:
assert client.execute_command(f'BF.EXISTS {key} {value}') == 1, f"Item {key} {value} doesn't exist"
else:
assert client.execute_command(f'BF.EXISTS {key} {value}') == 0, f"Item {key} {value} exists"
except:
print("Something went wrong in bloom filter item existence verification")
if should_exist:
assert client.execute_command(f'BF.EXISTS {key} {value}') == 1, f"Item {key} {value} doesn't exist"
else:
assert client.execute_command(f'BF.EXISTS {key} {value}') == 0, f"Item {key} {value} exists"

def verify_server_key_count(self, client, expected_num_keys):
try:
actual_num_keys = client.num_keys()
assert_num_key_error_msg = f"Actual key number {actual_num_keys} is different from expected key number {expected_num_key}"
assert actual_num_keys == expected_num_keys, assert_num_key_error_msg
except:
print("Something went wrong in key number verification")
actual_num_keys = client.info_obj().num_keys()
assert_num_key_error_msg = f"Actual key number {actual_num_keys} is different from expected key number {expected_num_keys}"
assert actual_num_keys == expected_num_keys, assert_num_key_error_msg

def create_bloom_filters_and_add_items(self, client, number_of_bf=5):
""" Creates the specified number of bloom filter objects (`number_of_bf`) and adds an item to it named FOO.
Expand Down
6 changes: 6 additions & 0 deletions tests/valkeytests/valkey_test_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@ def was_save_successful(self):
def used_memory(self):
return self.info['used_memory']

def maxmemory(self):
return self.info['maxmemory']

def maxmemory_policy(self):
return self.info['maxmemory_policy']

def uptime_in_secs(self):
return self.info['uptime_in_seconds']

Expand Down

0 comments on commit 57a7901

Please sign in to comment.