
Commit

Fix integration tests
G-D-Petrov committed Dec 17, 2024
1 parent d30fe12 commit 0b8d85e
Showing 3 changed files with 34 additions and 31 deletions.
6 changes: 4 additions & 2 deletions python/tests/integration/arcticdb/test_storage_lock.py
@@ -1,6 +1,7 @@
 import pandas as pd
 import numpy as np
 import pytest
+import sys

 from arcticdb_ext.tools import ReliableStorageLock, ReliableStorageLockManager
 from tests.util.mark import PERSISTENT_STORAGE_TESTS_ENABLED, REAL_S3_TESTS_MARK
@@ -23,6 +24,7 @@ def slow_increment_task(lib, symbol, sleep_time, lock_manager, lock):
     lock_manager.free_lock_guard()


+@pytest.mark.skipif(sys.platform == "win32", reason="There is some issue with this test on the windows runner")
 @pytest.mark.parametrize("num_processes,max_sleep", [(100, 1), (5, 20)])
 @REAL_S3_TESTS_MARK
 def test_many_increments(real_s3_version_store, num_processes, max_sleep):
@@ -31,11 +33,11 @@ def test_many_increments(real_s3_version_store, num_processes, max_sleep):
     symbol = "counter"
     lib.version_store.force_delete_symbol(symbol)
     lib.write(symbol, init_df)
-    lock = ReliableStorageLock("test_lock", lib._library, 10*one_sec)
+    lock = ReliableStorageLock("test_lock", lib._library, 10 * one_sec)
     lock_manager = ReliableStorageLockManager()

     processes = [
-        Process(target=slow_increment_task, args=(lib, symbol, 0 if i%2==0 else max_sleep, lock_manager, lock))
+        Process(target=slow_increment_task, args=(lib, symbol, 0 if i % 2 == 0 else max_sleep, lock_manager, lock))
         for i in range(num_processes)
     ]
     for p in processes:
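Note: test_many_increments spawns num_processes OS processes that each increment a "counter" symbol while holding the ReliableStorageLock, with every other process sleeping for max_sleep (the 0 if i % 2 == 0 else max_sleep spread) so that fast and slow lock holders interleave. The sketch below is not part of this commit; it reproduces the pattern with only the standard library, using multiprocessing.Lock in place of the storage lock, to show what the test asserts: with correct locking, no increment is lost.

    # Plain-multiprocessing analogue of the lock-guarded increment; it does not touch ArcticDB.
    import time
    from multiprocessing import Lock, Process, Value


    def slow_increment(counter, lock, sleep_time):
        with lock:                  # guard the read-modify-write, as the storage lock does
            value = counter.value
            time.sleep(sleep_time)  # hold the lock for a while so the others must wait
            counter.value = value + 1


    if __name__ == "__main__":
        num_processes, max_sleep = 10, 1
        counter, lock = Value("i", 0), Lock()
        processes = [
            Process(target=slow_increment, args=(counter, lock, 0 if i % 2 == 0 else max_sleep))
            for i in range(num_processes)
        ]
        for p in processes:
            p.start()
        for p in processes:
            p.join()
        assert counter.value == num_processes  # every increment survived the contention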
@@ -1116,7 +1116,7 @@ def test_read_ts(basic_store):

 def test_negative_strides(basic_store_tiny_segment):
     lmdb_version_store = basic_store_tiny_segment
-    negative_stride_np = np.array([[1, 2, 3, 4, 5, 6], [7, 8, 9, 10, 11, 12, 13]], np.int32)[::-1]
+    negative_stride_np = np.array([[1, 2, 3, 4, 5, 6], [7, 8, 9, 10, 11, 12]], np.int32)[::-1]
     lmdb_version_store.write("negative_strides", negative_stride_np)
     vit = lmdb_version_store.read("negative_strides")
     assert_array_equal(negative_stride_np, vit.data)
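Note: the data fix above matters for two reasons. The old second row had seven elements against six in the first, and NumPy refuses to build a rectangular int32 array from ragged rows, so the row is trimmed to twelve values; the [::-1] slice is what gives the test its name, producing a view over the same buffer whose row stride is negative. A minimal sketch, not part of this commit:

    import numpy as np

    # Ragged rows (6 vs 7 elements) cannot form an int32 ndarray; NumPy raises ValueError.
    try:
        np.array([[1, 2, 3, 4, 5, 6], [7, 8, 9, 10, 11, 12, 13]], np.int32)
    except ValueError as exc:
        print("ragged input rejected:", exc)

    # The corrected, rectangular data reversed with [::-1] is a negative-stride view.
    arr = np.array([[1, 2, 3, 4, 5, 6], [7, 8, 9, 10, 11, 12]], np.int32)[::-1]
    print(arr.strides)           # e.g. (-24, 4)
    assert arr.strides[0] < 0    # the layout the write/read round trip exercises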
@@ -25,68 +25,72 @@
 # Uncomment for logging
 # set_log_level(default_level="DEBUG", console_output=False, file_output_path="/tmp/arcticdb.log")

-def generate_chunk_sizes(number_chunks:np.uint32, min_rows:np.uint32=100, max_rows:np.uint32=10000) -> List[np.uint32]:
+
+def generate_chunk_sizes(
+    number_chunks: np.uint32, min_rows: np.uint32 = 100, max_rows: np.uint32 = 10000
+) -> List[np.uint32]:
     return np.random.randint(min_rows, max_rows, number_chunks, dtype=np.uint32)

-class Results:
+
+class Results:
     def __init__(self):
         self.options = None
         self.iteration = None
         self.number_staged_chunks = 0
         self.total_rows_finalized = 0
         self.finalization_time = None

-    def __str__(self):
-        return f"Options: {self.options}\nIteration: {self.iteration}\n# staged chunks: {self.number_staged_chunks}\ntotal rows finalized: {self.total_rows_finalized}\ntime for finalization (s): {self.finalization_time}"
+    def __str__(self):
+        return f"Options: {self.options}\nIteration: {self.iteration}\n# staged chunks: {self.number_staged_chunks}\ntotal rows finalized: {self.total_rows_finalized}\ntime for finalization (s): {self.finalization_time}"


 @SLOW_TESTS_MARK
-@SKIP_CONDA_MARK # Conda CI runner doesn't have enough storage to perform these stress tests
+@SKIP_CONDA_MARK  # Conda CI runner doesn't have enough storage to perform these stress tests
 @pytest.mark.skipif(sys.platform == "win32", reason="Not enough storage on Windows runners")
-def test_finalize_monotonic_unique_chunks(basic_arctic_library):
+def test_finalize_monotonic_unique_chunks(arctic_library_lmdb):
     """
-    The test is designed to staged thousands of chunks with variable chunk size.
-    To experiment on local computer you can move up to 20k number of chunks approx 10k each
+    The test is designed to staged thousands of chunks with variable chunk size.
+    To experiment on local computer you can move up to 20k number of chunks approx 10k each
-    For stress testing this number is reduced due to github runner HDD size - 16 GB only
+    For stress testing this number is reduced due to github runner HDD size - 16 GB only
-    On local disk you must use "arctic_library_lmdb" fixture as it sets 100 GB limit.
-    If you use "basic_arctic_library" you might end with much more space taken eating all your space
-    if you want to experiment with more number of chunks
+    On local disk you must use "arctic_library_lmdb" fixture as it sets 100 GB limit.
+    If you use "basic_arctic_library" you might end with much more space taken eating all your space
+    if you want to experiment with more number of chunks
     """

     options = [
-        {"chunks_descending" : True, "finalization_mode" : StagedDataFinalizeMethod.APPEND},
-        {"chunks_descending" : True, "finalization_mode" : StagedDataFinalizeMethod.WRITE},
-        {"chunks_descending" : False, "finalization_mode" : StagedDataFinalizeMethod.WRITE},
-        {"chunks_descending" : False, "finalization_mode" : StagedDataFinalizeMethod.APPEND},
-    ]
+        {"chunks_descending": True, "finalization_mode": StagedDataFinalizeMethod.APPEND},
+        {"chunks_descending": True, "finalization_mode": StagedDataFinalizeMethod.WRITE},
+        {"chunks_descending": False, "finalization_mode": StagedDataFinalizeMethod.WRITE},
+        {"chunks_descending": False, "finalization_mode": StagedDataFinalizeMethod.APPEND},
+    ]

     # Will hold the results after each iteration (instance of Results class)
     results = []

-    lib : Library = basic_arctic_library
+    lib: Library = arctic_library_lmdb

     total_start_time = time.time()

     # We would need to generate as fast as possible kind of random
-    # dataframes. To do that we build a large cache and will
+    # dataframes. To do that we build a large cache and will
     # sample rows from there as we need to run as fast as we can
     cachedDF = CachedDFGenerator(250000)

     total_number_rows_all_iterations: int = 0

     # This will serve us as a counter and at the same time it provides unique index for each row
-    total_number_rows: TimestampNumber = TimestampNumber(0, cachedDF.TIME_UNIT) # Synchronize index frequency
-    INITIAL_TIMESTAMP: TimestampNumber = TimestampNumber(0, cachedDF.TIME_UNIT) # Synchronize index frequency
-    symbol="staged"
+    total_number_rows: TimestampNumber = TimestampNumber(0, cachedDF.TIME_UNIT)  # Synchronize index frequency
+    INITIAL_TIMESTAMP: TimestampNumber = TimestampNumber(0, cachedDF.TIME_UNIT)  # Synchronize index frequency
+    symbol = "staged"

     num_rows_initially = 99999
     print(f"Writing to symbol initially {num_rows_initially} rows")
     df = cachedDF.generate_dataframe_timestamp_indexed(num_rows_initially, total_number_rows, cachedDF.TIME_UNIT)

     cnt = 0
-    for iter in [500, 1000, 1500, 2000] :
+    for iter in [500, 1000, 1500, 2000]:
         res = Results()

         total_number_rows = INITIAL_TIMESTAMP + num_rows_initially
@@ -98,12 +102,11 @@ def test_finalize_monotonic_unique_chunks(basic_arctic_library):
         print(f"Chunks to stage {len(chunk_list)} ")
         stage_chunks(lib, symbol, cachedDF, total_number_rows, chunk_list, options[cnt % 4]["chunks_descending"])

-        if (options[cnt % 4]["finalization_mode"] == StagedDataFinalizeMethod.APPEND):
+        if options[cnt % 4]["finalization_mode"] == StagedDataFinalizeMethod.APPEND:
             total_number_rows = total_number_rows + sum(chunk_list)
         else:
             total_number_rows = INITIAL_TIMESTAMP + sum(chunk_list)

-
         print("--" * 50)
         print(f"STAGED ROWS {total_number_rows.get_value()} after iteration {cnt}")
         print(f"SYMBOL ACTUAL ROWS before finalization - {lib._nvs.get_num_rows(symbol)} ")
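Note: the branch above keeps the timestamp counter in step with what finalization leaves in the symbol: APPEND extends the existing data, so the counter keeps accumulating, while WRITE replaces it, so the counter restarts from the staged rows alone. A worked example, not part of this commit, treating the counter as a plain integer and assuming the staged chunks sum to 10,000 rows on top of the initial 99,999:

    INITIAL_TIMESTAMP = 0
    num_rows_initially = 99_999
    chunks_total = 10_000

    total_after_append = (INITIAL_TIMESTAMP + num_rows_initially) + chunks_total  # APPEND: 109,999 rows
    total_after_write = INITIAL_TIMESTAMP + chunks_total                          # WRITE: 10,000 rows
    assert (total_after_append, total_after_write) == (109_999, 10_000)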
@@ -128,13 +131,11 @@ def test_finalize_monotonic_unique_chunks(basic_arctic_library):

         results.append(res)

-        total_number_rows.to_zero() # next iteration start from 0
+        total_number_rows.to_zero()  # next iteration start from 0

     for res in results:
         print("_" * 100)
         print(res)

     total_time = time.time() - total_start_time
     print("TOTAL TIME: ", total_time)
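Note: the pattern this stress test exercises, staging many chunks and then finalizing them in one shot, can be sketched against a local LMDB-backed library as below; this is not part of the commit. The sketch assumes the public staging API (staged=True on Library.write followed by finalize_staged_data) and does not reproduce the test's own helpers (CachedDFGenerator, stage_chunks, TimestampNumber); the library name and LMDB path are made up for illustration.

    import numpy as np
    import pandas as pd
    from arcticdb import Arctic
    from arcticdb.version_store.library import StagedDataFinalizeMethod  # import path may vary by version

    ac = Arctic("lmdb:///tmp/arcticdb_staging_demo")       # illustrative local LMDB store
    lib = ac.get_library("staging_demo", create_if_missing=True)

    symbol = "staged"
    start = pd.Timestamp("2024-01-01")
    chunk_sizes = np.random.randint(100, 1000, 5)          # cf. generate_chunk_sizes(5)

    row = 0
    for n in chunk_sizes:
        idx = pd.date_range(start + pd.Timedelta(seconds=row), periods=n, freq="s")
        df = pd.DataFrame({"value": np.arange(n)}, index=idx)
        lib.write(symbol, df, staged=True)                 # chunks accumulate as staged segments
        row += int(n)

    # WRITE builds a fresh version from the staged chunks; APPEND would extend an existing one.
    lib.finalize_staged_data(symbol, mode=StagedDataFinalizeMethod.WRITE)
    assert lib.read(symbol).data.shape[0] == int(chunk_sizes.sum())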

