Finalize stage data tests (#2015)
#### Reference Issues/PRs
<!--Example: Fixes #1234. See also #3456.-->

A stress test that exercises finalize_staged_data(), iterating through rounds of different
finalization setups with different sizes of chunked data. The test may take 1-2 hours. Overall
it should stretch the product and probe its weak points, such as memory leaks. A minimal sketch
of the staged-write/finalize cycle it exercises is shown below.
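
For reference, a minimal sketch of that cycle, assuming the public `Arctic` API with `write(..., staged=True)`; the URI, symbol name, chunk sizes and round counts are illustrative, not the values used in the actual test, and the `StagedDataFinalizeMethod` import path is assumed:

```python
import numpy as np
import pandas as pd

from arcticdb import Arctic
from arcticdb.version_store.library import StagedDataFinalizeMethod  # import path assumed

ac = Arctic("lmdb://stress_finalize?map_size=40GB")
lib = ac.get_library("stress_finalize", create_if_missing=True)
symbol = "staged_symbol"

for num_chunks in (10, 100, 1000):  # one "round" per chunk count
    start = pd.Timestamp("2024-01-01")
    for _ in range(num_chunks):
        idx = pd.date_range(start, periods=10_000, freq="s")
        chunk = pd.DataFrame({"value": np.random.rand(len(idx))}, index=idx)
        lib.write(symbol, chunk, staged=True)        # stage only, nothing is readable yet
        start = idx[-1] + pd.Timedelta(seconds=1)    # keep the staged index sorted
    # A single finalize merges every staged chunk into one new version of the symbol.
    lib.finalize_staged_data(symbol, mode=StagedDataFinalizeMethod.WRITE)
```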

An ASV test for time and peak memory to benchmark the performance of
finalize_staged_data(), sketched below. NOTE: The ASV test will be moved to a different PR
after this PR is approved, and removed from here, as ASV cannot produce results when modules
the tests depend on are not available in the repo. This PR includes two utility modules,
test.py and utils.py, which are used by both the stress test and the ASV test.
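
For context, a simplified, self-contained sketch of the ASV structure used. The actual benchmark classes, which rely on `CachedDFGenerator` and `stage_chunks` from the new utility modules, are recorded verbatim in `benchmarks.json` below; the names, URIs and values here are illustrative:

```python
import numpy as np
import pandas as pd

from arcticdb import Arctic
from arcticdb.version_store.library import StagedDataFinalizeMethod  # import path assumed


class FinalizeStagedDataSketch:
    # ASV runs the benchmark once per value in params (here: the number of staged chunks).
    params = [1000, 2000]
    param_names = ["num_chunks"]
    timeout = 600
    number = 1   # finalize consumes the staged segments, so each measurement is a single call
    repeat = 1
    rounds = 1

    def setup_cache(self):
        # Built once per run; ASV passes the return value to setup() and to every benchmark.
        idx = pd.date_range("2024-01-01", periods=10_000, freq="s")
        return pd.DataFrame({"value": np.random.rand(len(idx))}, index=idx)

    def setup(self, chunk_df, num_chunks):
        # Stage num_chunks shifted copies of the cached chunk into a fresh LMDB library.
        self.ac = Arctic(f"lmdb://finalize_sketch_{num_chunks}?map_size=40GB")
        self.lib = self.ac.get_library("finalize_sketch", create_if_missing=True)
        self.symbol = "staged_symbol"
        span = chunk_df.index[-1] - chunk_df.index[0] + pd.Timedelta(seconds=1)
        for i in range(num_chunks):
            shifted = chunk_df.copy()
            shifted.index = chunk_df.index + i * span
            self.lib.write(self.symbol, shifted, staged=True)

    # ASV measures wall-clock time for time_* methods and peak memory for peakmem_* methods.
    def time_finalize_staged_data(self, chunk_df, num_chunks):
        self.lib.finalize_staged_data(self.symbol, mode=StagedDataFinalizeMethod.WRITE)

    def peakmem_finalize_staged_data(self, chunk_df, num_chunks):
        self.lib.finalize_staged_data(self.symbol, mode=StagedDataFinalizeMethod.WRITE)
```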

Added several functional tests.

#### What does this implement or fix?

#### Any other comments?

#### Checklist

<details>
  <summary>
   Checklist for code changes...
  </summary>
 
- [ ] Have you updated the relevant docstrings, documentation and
copyright notice?
- [ ] Is this contribution tested against [all ArcticDB's
features](../docs/mkdocs/docs/technical/contributing.md)?
- [ ] Do all exceptions introduced raise appropriate [error
messages](https://docs.arcticdb.io/error_messages/)?
- [ ] Are API changes highlighted in the PR description?
- [ ] Is the PR labelled as enhancement or bug so it appears in
autogenerated release notes?
</details>

<!--
Thanks for contributing a Pull Request to ArcticDB! Please ensure you
have taken a look at:
- ArcticDB's Code of Conduct:
https://github.com/man-group/ArcticDB/blob/master/CODE_OF_CONDUCT.md
- ArcticDB's Contribution Licensing:
https://github.com/man-group/ArcticDB/blob/master/docs/mkdocs/docs/technical/contributing.md#contribution-licensing
-->

---------

Co-authored-by: Georgi Rusev <Georgi Rusev>
grusev authored Dec 10, 2024
1 parent 2a6b3cd commit 4c12a0b
Showing 9 changed files with 1,128 additions and 10 deletions.
84 changes: 84 additions & 0 deletions python/.asv/results/benchmarks.json
@@ -914,6 +914,90 @@
"version": "c746faf05e4dbb872efa770cbe5ae057dafe3ecc1fb8969d1026db2dee7bfd99",
"warmup_time": -1
},
"finalize_staged_data.FinalizeStagedData.peakmem_finalize_staged_data": {
"code": "class FinalizeStagedData:\n def peakmem_finalize_staged_data(self, cache:CachedDFGenerator, param:int):\n print(\">>> Library:\", self.lib)\n print(\">>> Symbol:\", self.symbol)\n self.lib.finalize_staged_data(self.symbol, mode=StagedDataFinalizeMethod.WRITE)\n\n def setup(self, cache:CachedDFGenerator, param:int):\n cachedDF = cache\n \n # Unfortunately there is no way to tell asv to run single time\n # each of finalize_stage_data() tests if we do the large setup in the\n # setup_cache() method. We can only force it to work with single execution\n # if the symbol setup with stage data is in the setup() method\n \n self.ac = Arctic(f\"lmdb://{self.lib_name}{param}?map_size=40GB\")\n self.ac.delete_library(self.lib_name)\n self.lib = self.ac.create_library(self.lib_name)\n \n INITIAL_TIMESTAMP: TimestampNumber = TimestampNumber(0, cachedDF.TIME_UNIT) # Synchronize index frequency\n \n df = cachedDF.generate_dataframe_timestamp_indexed(200, 0, cachedDF.TIME_UNIT)\n list_of_chunks = [10000] * param\n self.symbol\n \n self.lib.write(self.symbol, data=df, prune_previous_versions=True)\n stage_chunks(self.lib, self.symbol, cachedDF, INITIAL_TIMESTAMP, list_of_chunks)\n\n def setup_cache(self):\n # Generating dataframe with all kind of supported data types\n cachedDF = CachedDFGenerator(350000, [5])\n return cachedDF",
"name": "finalize_staged_data.FinalizeStagedData.peakmem_finalize_staged_data",
"param_names": [
"param1"
],
"params": [
[
"1000",
"2000"
]
],
"setup_cache_key": "finalize_staged_data:38",
"timeout": 600,
"type": "peakmemory",
"unit": "bytes",
"version": "9dece3813ff661e5876028ee105ad9549ad62e7997ade7bf3ed4cb43f77854a7"
},
"finalize_staged_data.FinalizeStagedData.time_finalize_staged_data": {
"code": "class FinalizeStagedData:\n def time_finalize_staged_data(self, cache:CachedDFGenerator, param:int):\n print(\">>> Library:\", self.lib)\n print(\">>> Symbol:\", self.symbol)\n self.lib.finalize_staged_data(self.symbol, mode=StagedDataFinalizeMethod.WRITE)\n\n def setup(self, cache:CachedDFGenerator, param:int):\n cachedDF = cache\n \n # Unfortunately there is no way to tell asv to run single time\n # each of finalize_stage_data() tests if we do the large setup in the\n # setup_cache() method. We can only force it to work with single execution\n # if the symbol setup with stage data is in the setup() method\n \n self.ac = Arctic(f\"lmdb://{self.lib_name}{param}?map_size=40GB\")\n self.ac.delete_library(self.lib_name)\n self.lib = self.ac.create_library(self.lib_name)\n \n INITIAL_TIMESTAMP: TimestampNumber = TimestampNumber(0, cachedDF.TIME_UNIT) # Synchronize index frequency\n \n df = cachedDF.generate_dataframe_timestamp_indexed(200, 0, cachedDF.TIME_UNIT)\n list_of_chunks = [10000] * param\n self.symbol\n \n self.lib.write(self.symbol, data=df, prune_previous_versions=True)\n stage_chunks(self.lib, self.symbol, cachedDF, INITIAL_TIMESTAMP, list_of_chunks)\n\n def setup_cache(self):\n # Generating dataframe with all kind of supported data types\n cachedDF = CachedDFGenerator(350000, [5])\n return cachedDF",
"min_run_count": 1,
"name": "finalize_staged_data.FinalizeStagedData.time_finalize_staged_data",
"number": 1,
"param_names": [
"param1"
],
"params": [
[
"1000",
"2000"
]
],
"repeat": 1,
"rounds": 1,
"sample_time": 0.01,
"setup_cache_key": "finalize_staged_data:38",
"timeout": 600,
"type": "time",
"unit": "seconds",
"version": "670c39a4321a96cffa7d92609c5817cd36f3cba1cce9929e9c41e246005a0b62",
"warmup_time": -1
},
"finalize_staged_data.FinalizeStagedDataWiderDataframeX3.peakmem_finalize_staged_data": {
"code": "class FinalizeStagedDataWiderDataframeX3:\n def peakmem_finalize_staged_data(self, cache:CachedDFGenerator, param:int):\n if (not SLOW_TESTS):\n raise SkipNotImplemented (\"Slow tests are skipped\")\n super().peakmem_finalize_staged_data(cache,param)\n\n def setup(self, cache:CachedDFGenerator, param:int):\n if (not SLOW_TESTS):\n raise SkipNotImplemented (\"Slow tests are skipped\")\n super().setup(cache,param)\n\n def setup_cache(self):\n # Generating dataframe with all kind of supported data type\n cachedDF = CachedDFGenerator(350000, [5, 25, 50]) # 3 times wider DF with bigger string columns\n return cachedDF",
"name": "finalize_staged_data.FinalizeStagedDataWiderDataframeX3.peakmem_finalize_staged_data",
"param_names": [
"param1"
],
"params": [
[
"1000",
"2000"
]
],
"setup_cache_key": "finalize_staged_data:82",
"timeout": 600,
"type": "peakmemory",
"unit": "bytes",
"version": "34e03b8d818b1f727b831791e307ca5dd4a1b434643fa5bc0c98c49a0b455523"
},
"finalize_staged_data.FinalizeStagedDataWiderDataframeX3.time_finalize_staged_data": {
"code": "class FinalizeStagedDataWiderDataframeX3:\n def time_finalize_staged_data(self, cache:CachedDFGenerator, param:int):\n if (not SLOW_TESTS):\n raise SkipNotImplemented (\"Slow tests are skipped\")\n super().time_finalize_staged_data(cache,param)\n\n def setup(self, cache:CachedDFGenerator, param:int):\n if (not SLOW_TESTS):\n raise SkipNotImplemented (\"Slow tests are skipped\")\n super().setup(cache,param)\n\n def setup_cache(self):\n # Generating dataframe with all kind of supported data type\n cachedDF = CachedDFGenerator(350000, [5, 25, 50]) # 3 times wider DF with bigger string columns\n return cachedDF",
"min_run_count": 1,
"name": "finalize_staged_data.FinalizeStagedDataWiderDataframeX3.time_finalize_staged_data",
"number": 1,
"param_names": [
"param1"
],
"params": [
[
"1000",
"2000"
]
],
"repeat": 1,
"rounds": 1,
"sample_time": 0.01,
"setup_cache_key": "finalize_staged_data:82",
"timeout": 600,
"type": "time",
"unit": "seconds",
"version": "317bc16b3c8e30237836ae85a390b11995fe361c69b31984f0e5d32a344cd571",
"warmup_time": -1
},
"list_functions.ListFunctions.peakmem_list_symbols": {
"code": "class ListFunctions:\n def peakmem_list_symbols(self, num_symbols):\n self.lib.list_symbols()\n\n def setup(self, num_symbols):\n self.ac = Arctic(\"lmdb://list_functions\")\n self.lib = self.ac[f\"{num_symbols}_num_symbols\"]\n\n def setup_cache(self):\n self.ac = Arctic(\"lmdb://list_functions\")\n \n num_symbols = ListFunctions.params\n for syms in num_symbols:\n lib_name = f\"{syms}_num_symbols\"\n self.ac.delete_library(lib_name)\n lib = self.ac.create_library(lib_name)\n for sym in range(syms):\n lib.write(f\"{sym}_sym\", generate_benchmark_df(ListFunctions.rows))",
"name": "list_functions.ListFunctions.peakmem_list_symbols",
27 changes: 23 additions & 4 deletions python/arcticdb/util/test.py
@@ -8,11 +8,11 @@

import os
from contextlib import contextmanager
from typing import Mapping, Any, Optional, Iterable, NamedTuple, List, AnyStr, Sequence
from typing import Mapping, Any, Optional, NamedTuple, List, AnyStr, Union
import numpy as np
import pandas as pd
from pandas.core.series import Series
from pandas import Index
from pandas import DateOffset, Index, Timedelta
from pandas._typing import Scalar
import datetime as dt
import string
@@ -46,6 +46,7 @@ def create_df(start=0, columns=1) -> pd.DataFrame:

index = np.arange(start, start + 10, dtype=np.int64)
return pd.DataFrame(data, index=index)


def create_df_index_rownum(num_columns: int, start_index: int, end_index : int) -> pd.DataFrame:
"""
@@ -91,6 +92,18 @@ def create_df_index_rownum(num_columns: int, start_index: int, end_index : int)
df.index = np.arange(start_index, end_index, 1)
return df

def create_datetime_index(df: pd.DataFrame, name_col:str, freq:Union[str , dt.timedelta , Timedelta , DateOffset],
start_time: pd.Timestamp = pd.Timestamp(0)):
"""
creates a datetime index to a dataframe. The index will start at specified timestamp
and will be generated using specified frequency having same number of periods as the rows
in the table
"""
periods = len(df)
index = pd.date_range(start=start_time, periods=periods, freq=freq, name=name_col)
df.index = index


def create_df_index_datetime(num_columns: int, start_hour: int, end_hour : int) -> pd.DataFrame:
"""
Creates data frame with specified number of columns
@@ -117,6 +130,7 @@ def create_df_index_datetime(num_columns: int, start_hour: int, end_hour : int)
df.index = dr
return df


def dataframe_dump_to_log(label_for_df, df: pd.DataFrame):
"""
Useful for printing in log content and data types of columns of
@@ -131,6 +145,7 @@ def dataframe_dump_to_log(label_for_df, df: pd.DataFrame):
print(df.dtypes)
print("-" * 80)


def dataframe_simulate_arcticdb_update_static(existing_df: pd.DataFrame, update_df: pd.DataFrame) -> pd.DataFrame:
"""
Does implement arctic logic of update() method functionality over pandas dataframes.
@@ -155,6 +170,7 @@ def dataframe_simulate_arcticdb_update_static(existing_df: pd.DataFrame, update_
result_df = pd.concat(chunks)
return result_df


def dataframe_single_column_string(length=1000, column_label='string_short', seed=0, string_len=1):
"""
creates a dataframe with one column, which label can be changed, containing string
@@ -163,6 +179,7 @@ def dataframe_single_column_string(length=1000, column_label='string_short', see
np.random.seed(seed)
return pd.DataFrame({ column_label : [random_string(string_len) for _ in range(length)] })


def dataframe_filter_with_datetime_index(df: pd.DataFrame, start_timestamp:Scalar, end_timestamp:Scalar, inclusive='both') -> pd.DataFrame:
"""
Filters dataframe which has datetime index, and selects dates from start till end,
@@ -175,6 +192,7 @@ def dataframe_filter_with_datetime_index(df: pd.DataFrame, start_timestamp:Scala
.between(start_timestamp, end_timestamp, inclusive='both')
]


def maybe_not_check_freq(f):
"""Ignore frequency when pandas is newer as starts to check frequency which it did not previously do."""

@@ -215,16 +233,17 @@ def assert_frame_equal_rebuild_index_first(expected : pd.DataFrame, actual : pd.
actual.reset_index(inplace = True, drop = True)
assert_frame_equal(left=expected, right=actual)


def random_string(length: int):
return "".join(random.choice(string.ascii_uppercase + string.digits) for _ in range(length))


def get_sample_dataframe(size=1000, seed=0):
def get_sample_dataframe(size=1000, seed=0, str_size=10):
np.random.seed(seed)
df = pd.DataFrame(
{
"uint8": random_integers(size, np.uint8),
"strings": [random_string(10) for _ in range(size)],
"strings": [random_string(str_size) for _ in range(size)],
"uint16": random_integers(size, np.uint16),
"uint32": random_integers(size, np.uint32),
"uint64": random_integers(size, np.uint64),
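For illustration, a hypothetical snippet showing how the helpers added to python/arcticdb/util/test.py above might be used together; the helper names and parameters come from the diff, the values are illustrative:

```python
from arcticdb.util.test import create_datetime_index, get_sample_dataframe

# Sample frame covering the supported column types; str_size controls the length
# of the values generated for the "strings" column.
df = get_sample_dataframe(size=1_000, seed=0, str_size=25)

# Replace the default row-number index with a datetime index named "timestamp",
# starting at the epoch and advancing one second per row.
create_datetime_index(df, name_col="timestamp", freq="s")

print(df.index[:3])
```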
