Commit

addressed comments
Georgi Rusev authored and Georgi Rusev committed Dec 16, 2024
1 parent 83a058c commit ea54da2
Showing 1 changed file with 45 additions and 32 deletions.
python/tests/stress/arcticdb/version_store/test_mem_leaks.py
@@ -6,6 +6,7 @@
As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
"""

+import logging
import sys
import time
import psutil
@@ -28,6 +29,11 @@
from arcticdb.version_store._store import NativeVersionStore
from tests.util.mark import MACOS, WINDOWS, MEMRAY_SUPPORTED, MEMRAY_TESTS_MARK, SKIP_CONDA_MARK


+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger("Memory_tests")


#region HELPER functions for non-memray tests

def nice_bytes_str(bytes):
@@ -154,12 +160,13 @@ def grow_exp(df_to_grow: pd.DataFrame, num_times_xx2: int):
df_to_grow = pd.concat([df_to_grow, df_prev])
return df_to_grow


def generate_big_dataframe(rows:int=1000000):
print("Generating big dataframe")
logger.info("Generating big dataframe")
st = time.time()
df = get_sample_dataframe(rows)
df = grow_exp(df, 5)
print("Generation took :", time.time() - st)
logger.info(f"Generation took : {time.time() - st}")
return df

#endregion
@@ -174,23 +181,28 @@ def construct_df_querybuilder_tests(size: int) -> pd.DataFrame:
return df


-def query_group() -> QueryBuilder:
+def query_groupby_with_aggregations() -> QueryBuilder:
"""
    groupby composite aggregation query for QueryBuilder memory tests.
    The query aggregates roughly half of the dataframe.
"""
q = QueryBuilder()
return (
q[q["bool"] == True]
q[q["bool"]]
.groupby("uint8")
.agg({"uint32": "mean", "int32": "sum", "strings": "count", "float64": "sum", "float32": "min", "int16": "max"})
.agg({"uint32": "mean",
"int32": "sum",
"strings": "count",
"float64": "sum",
"float32": "min",
"int16": "max"})
)


-def query_group_bigger_result() -> QueryBuilder:
+def query_group_with_aggregations_bigger_result() -> QueryBuilder:
"""
    groupby composite aggregation query for QueryBuilder memory tests.
    The query aggregates the whole dataframe.
"""
q = QueryBuilder()
return (
@@ -205,14 +217,14 @@ def query_group_bigger_result() -> QueryBuilder:
)


-def query_apply(str:str) -> QueryBuilder:
+def query_apply(strng: str) -> QueryBuilder:
"""
Apply query for QueryBuilder memory tests.
    This version of apply does a couple of nested operations
with columns that were just added
"""
q = QueryBuilder()
q = q[q["strings"] != str].apply("NEW 1", q["uint32"] * q["int32"] / q["float32"])
q = q[q["strings"] != strng].apply("NEW 1", q["uint32"] * q["int32"] / q["float32"])
q = q.apply("NEW 2", q["float32"] + q["float64"] + q["NEW 1"])
q = q.apply("NEW 3", q["NEW 2"] - q["NEW 1"] + q["timestamp"])
return q
@@ -235,6 +247,7 @@ def query_resample() -> QueryBuilder:
"bool" : "sum"}
)


def query_row_range(size:int) -> QueryBuilder:
"""
Row range query for QueryBuilder memory tests
@@ -259,8 +272,8 @@ def query_date_range(start:pd.Timestamp, end: pd.Timestamp) -> QueryBuilder:


def print_info(data: pd.DataFrame, q: QueryBuilder):
print("Query", q)
print("Size of DF returned by arctic:", data.shape[0], " columns:", data.shape[1])
logger.info(f"Query {q}")
logger.info(f"Size of DF returned by arctic:{data.shape[0]} columns: {data.shape[1]}")


def gen_random_date(start:pd.Timestamp, end: pd.Timestamp):
@@ -289,7 +302,6 @@ def proc_to_examine():
del data

df = lib.read(symbol).data
-        print(df)
del df

"""
@@ -336,11 +348,11 @@ def test_mem_leak_querybuilder_standard(arctic_library_lmdb):
del df
gc.collect()

-    queries = [query_group(), query_group_bigger_result(), query_apply(random_string(10)), query_resample()]
+    queries = [query_groupby_with_aggregations(), query_group_with_aggregations_bigger_result(), query_apply(random_string(10)), query_resample()]

def proc_to_examine():
-        queries = [query_group(),
-                   query_group_bigger_result(),
+        queries = [query_groupby_with_aggregations(),
+                   query_group_with_aggregations_bigger_result(),
query_apply(random_string(10)),
query_resample(),
query_row_range(size),
@@ -374,7 +386,6 @@ def proc_to_examine():
del data

df = lib.read(symbol).data
-        print(df)
del df

"""
@@ -397,9 +408,12 @@ def proc_to_examine():
@pytest.fixture
def library_with_symbol(arctic_library_lmdb) -> Generator[Tuple[Library, pd.DataFrame], None, None]:
"""
-    As memray instruments memory, we need to take out
-    everything not relevant from mem leak measurement out of
-    test, so it works as less as possible
+    As memray instruments memory, everything that is not relevant to the
+    leak measurement must be taken out of the test itself and placed in
+    the test setup - in other words, in the fixture.
+    Otherwise memray instruments that code too, which makes execution much
+    slower and muddles the leak results: is the leak in what we test, e.g.
+    read(), or in construct_df_querybuilder_tests(), or in other code?
"""
lib: Library = arctic_library_lmdb
df = construct_df_querybuilder_tests(size=2000000)
@@ -424,16 +438,16 @@ def mem_query(lib: Library, df: pd.DataFrame, num_repetitions:int=1, read_batch:

queries = []

-    queries = [query_group(),
-               query_group_bigger_result(),
+    queries = [query_groupby_with_aggregations(),
+               query_group_with_aggregations_bigger_result(),
query_apply(random_string(10)),
query_resample(),
query_row_range(size),
query_date_range(start_date, end_date)]

for rep in range(num_repetitions):
if (read_batch):
print ("RUN read_batch() tests")
logger.info("RUN read_batch() tests")
read_requests = [ReadRequest(symbol=symbol,
query_builder=query
) for query in queries]
@@ -445,7 +459,7 @@ def mem_query(lib: Library, df: pd.DataFrame, num_repetitions:int=1, read_batch:
cnt += 1
del read_requests, results_read
else:
print ("RUN read_batch() tests")
logger.info("RUN read_batch() tests")
for q in queries:
data: pd.DataFrame = lib.read(symbol, query_builder=q).data
@@ -555,7 +569,7 @@ def library_with_big_symbol(arctic_library_lmdb) -> Generator[Library, None, None]:
    test, so that the test itself does as little work as possible
"""
lib: Library = arctic_library_lmdb
-    df = generate_big_dataframe(300000)
+    df: pd.DataFrame = generate_big_dataframe(300000)
lib.write(symbol, df)
del df
yield lib
@@ -569,15 +583,14 @@ def test_mem_leak_read_all_arctic_lib_memray(library_with_big_symbol):
big dataframe in memory
"""

print("Test starting")
logger.info("Test starting")
st = time.time()
lib: Library = library_with_big_symbol
-    data = lib.read(symbol).data
+    data: pd.DataFrame = lib.read(symbol).data
del data
print("Test took :", time.time() - st)
logger.info(f"Test took : {time.time() - st}")

print("Sleeping for 10 secs")
logger.info("Sleeping for 10 secs")
gc.collect()
time.sleep(10)

#endregion
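
For context on the memray-marked tests touched by this diff, the sketch below shows the pattern the fixtures above follow with pytest-memray: all expensive setup lives in a fixture, so memray instruments only the code under test. This is a minimal illustration, not code from this repository - the LMDB URI, library and symbol names, and the leak threshold are assumptions, and limit_leaks is pytest-memray's leak-checking marker.

# Minimal sketch, assuming pytest-memray is installed and enabled for the run.
# The URI, library/symbol names, and leak threshold below are hypothetical.
import pandas as pd
import pytest

from arcticdb import Arctic

symbol = "big_symbol"  # hypothetical symbol name


@pytest.fixture
def library_with_symbol():
    # Expensive setup stays in the fixture, so memray does not attribute
    # this memory to the test body itself.
    ac = Arctic("lmdb://memray_test_db")  # hypothetical local LMDB URI
    lib = ac.get_library("mem_tests", create_if_missing=True)
    lib.write(symbol, pd.DataFrame({"a": range(1_000_000)}))
    yield lib
    lib.delete(symbol)


@pytest.mark.limit_leaks("50 KB")  # pytest-memray marker; threshold is illustrative
def test_read_does_not_leak(library_with_symbol):
    # Only this body is attributed to the test by memray.
    data = library_with_symbol.read(symbol).data
    del data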
