From 14c2cb36f9b3ed42ab5448e7c6a5dcf178f0ec04 Mon Sep 17 00:00:00 2001 From: Georgi Rusev Date: Mon, 25 Nov 2024 20:21:06 +0200 Subject: [PATCH] asv file + lfs rule for gzip and parquet files --- .gitattributes | 2 + python/.asv/results/benchmarks.json | 206 +++++++++++++++++++++++++++- python/benchmarks/bi_benchmarks.py | 168 +++++++++++++++++++++++ python/benchmarks/common.py | 152 ++++++++++++++++++++ 4 files changed, 522 insertions(+), 6 deletions(-) create mode 100644 .gitattributes create mode 100644 python/benchmarks/bi_benchmarks.py diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000000..0508cddce6 --- /dev/null +++ b/.gitattributes @@ -0,0 +1,2 @@ +*.gzip filter=lfs diff=lfs merge=lfs -text +*.parquet filter=lfs diff=lfs merge=lfs -text diff --git a/python/.asv/results/benchmarks.json b/python/.asv/results/benchmarks.json index ff9122eb97..e2e8e7ab25 100644 --- a/python/.asv/results/benchmarks.json +++ b/python/.asv/results/benchmarks.json @@ -746,6 +746,174 @@ "version": "80de9b1982a498c300177d02874a8626152eccb57cd0ba4228a5bb168e7608c8", "warmup_time": -1 }, + "bi_benchmarks.BIBenchmarks.peakmem_query_groupby_city_count_all": { + "code": "class BIBenchmarks:\n def peakmem_query_groupby_city_count_all(self, times_bigger) -> pd.DataFrame:\n return self.query_groupby_city_count_all(times_bigger)\n\n def setup(self, num_rows):\n self.ac = Arctic(f\"lmdb://opensource_datasets_{self.lib_name}?map_size=20GB\")\n self.lib = self.ac.get_library(self.lib_name)\n\n def setup_cache(self):\n \n file = os.path.join(Path(__file__).resolve().parent.parent, BIBenchmarks.CITY_BI_FILE2)\n if (not os.path.exists(file)) :\n dfo = download_and_process_city_to_parquet(file)\n dff = pd.read_parquet(file)\n pd.testing.assert_frame_equal(dfo,dff)\n else:\n print(\"Parquet file exists!\")\n \n # read data from bz.2 file\n # abs_path = os.path.join(Path(__file__).resolve().parent.parent,BIBenchmarks.CITY_BI_FILE)\n # self.df : pd.DataFrame = process_city(abs_path)\n \n self.df : pd.DataFrame = pd.read_parquet(file)\n \n self.ac = Arctic(f\"lmdb://opensource_datasets_{self.lib_name}?map_size=20GB\")\n self.ac.delete_library(self.lib_name)\n self.lib = self.ac.create_library(self.lib_name)\n \n print(\"The procedure is creating N times larger dataframes\")\n print(\"by concatenating original DF N times\")\n for num in BIBenchmarks.params:\n _df = self.df.copy(deep=True)\n if (num > 1):\n # lets create N times bigger DF\n dfcum = self.df.copy(deep=True)\n for i in range(1, (BIBenchmarks.params[-1])):\n dfcum = pd.concat([dfcum, self.df])\n _df = dfcum\n print(\"DF for iterration xSize original ready: \", num)\n _df.info(verbose=True,memory_usage='deep')\n self.lib.write(f\"{self.symbol}{num}\", _df)\n \n print(\"If pandas query produces different dataframe than arctic one stop tests!\")\n print(\"This will mean query problem is there most likely\")\n \n print(\"Pre-check correctness for query_groupby_city_count_all\")\n _df = self.df.copy(deep=True)\n arctic_df = self.time_query_groupby_city_count_all(BIBenchmarks.params[0])\n _df = get_query_groupby_city_count_all(_df)\n assert_frame_equal(_df, arctic_df)\n \n print(\"Pre-check correctness for query_groupby_city_count_isin_filter\")\n _df = self.df.copy(deep=True)\n arctic_df = self.time_query_groupby_city_count_isin_filter(BIBenchmarks.params[0])\n _df = get_query_groupby_city_count_isin_filter(_df)\n assert_frame_equal(_df, arctic_df)\n \n print(\"Pre-check correctness for query_groupby_city_count_filter_two_aggregations\")\n _df 
= self.df.copy(deep=True)\n arctic_df = self.time_query_groupby_city_count_filter_two_aggregations(BIBenchmarks.params[0])\n _df = get_query_groupby_city_count_filter_two_aggregations(_df)\n assert_frame_equal(_df, arctic_df)\n \n print(\"All pre-checks completed SUCCESSFULLY\")\n \n del self.ac", + "name": "bi_benchmarks.BIBenchmarks.peakmem_query_groupby_city_count_all", + "param_names": [ + "param1" + ], + "params": [ + [ + "1", + "10" + ] + ], + "setup_cache_key": "bi_benchmarks:61", + "timeout": 6000, + "type": "peakmemory", + "unit": "bytes", + "version": "576958b39e1560f56e73fa558989d2e101eecf9f5f36f4cc70604777fa4855b2" + }, + "bi_benchmarks.BIBenchmarks.peakmem_query_groupby_city_count_filter_two_aggregations": { + "code": "class BIBenchmarks:\n def peakmem_query_groupby_city_count_filter_two_aggregations(self, times_bigger):\n return self.query_groupby_city_count_filter_two_aggregations(times_bigger)\n\n def setup(self, num_rows):\n self.ac = Arctic(f\"lmdb://opensource_datasets_{self.lib_name}?map_size=20GB\")\n self.lib = self.ac.get_library(self.lib_name)\n\n def setup_cache(self):\n \n file = os.path.join(Path(__file__).resolve().parent.parent, BIBenchmarks.CITY_BI_FILE2)\n if (not os.path.exists(file)) :\n dfo = download_and_process_city_to_parquet(file)\n dff = pd.read_parquet(file)\n pd.testing.assert_frame_equal(dfo,dff)\n else:\n print(\"Parquet file exists!\")\n \n # read data from bz.2 file\n # abs_path = os.path.join(Path(__file__).resolve().parent.parent,BIBenchmarks.CITY_BI_FILE)\n # self.df : pd.DataFrame = process_city(abs_path)\n \n self.df : pd.DataFrame = pd.read_parquet(file)\n \n self.ac = Arctic(f\"lmdb://opensource_datasets_{self.lib_name}?map_size=20GB\")\n self.ac.delete_library(self.lib_name)\n self.lib = self.ac.create_library(self.lib_name)\n \n print(\"The procedure is creating N times larger dataframes\")\n print(\"by concatenating original DF N times\")\n for num in BIBenchmarks.params:\n _df = self.df.copy(deep=True)\n if (num > 1):\n # lets create N times bigger DF\n dfcum = self.df.copy(deep=True)\n for i in range(1, (BIBenchmarks.params[-1])):\n dfcum = pd.concat([dfcum, self.df])\n _df = dfcum\n print(\"DF for iterration xSize original ready: \", num)\n _df.info(verbose=True,memory_usage='deep')\n self.lib.write(f\"{self.symbol}{num}\", _df)\n \n print(\"If pandas query produces different dataframe than arctic one stop tests!\")\n print(\"This will mean query problem is there most likely\")\n \n print(\"Pre-check correctness for query_groupby_city_count_all\")\n _df = self.df.copy(deep=True)\n arctic_df = self.time_query_groupby_city_count_all(BIBenchmarks.params[0])\n _df = get_query_groupby_city_count_all(_df)\n assert_frame_equal(_df, arctic_df)\n \n print(\"Pre-check correctness for query_groupby_city_count_isin_filter\")\n _df = self.df.copy(deep=True)\n arctic_df = self.time_query_groupby_city_count_isin_filter(BIBenchmarks.params[0])\n _df = get_query_groupby_city_count_isin_filter(_df)\n assert_frame_equal(_df, arctic_df)\n \n print(\"Pre-check correctness for query_groupby_city_count_filter_two_aggregations\")\n _df = self.df.copy(deep=True)\n arctic_df = self.time_query_groupby_city_count_filter_two_aggregations(BIBenchmarks.params[0])\n _df = get_query_groupby_city_count_filter_two_aggregations(_df)\n assert_frame_equal(_df, arctic_df)\n \n print(\"All pre-checks completed SUCCESSFULLY\")\n \n del self.ac", + "name": "bi_benchmarks.BIBenchmarks.peakmem_query_groupby_city_count_filter_two_aggregations", + "param_names": [ + "param1" + 
], + "params": [ + [ + "1", + "10" + ] + ], + "setup_cache_key": "bi_benchmarks:61", + "timeout": 6000, + "type": "peakmemory", + "unit": "bytes", + "version": "00ae811ef6427d56921273b8d93c7443a1c71ed305edc73cf2375a167813bd53" + }, + "bi_benchmarks.BIBenchmarks.peakmem_query_groupby_city_count_isin_filter": { + "code": "class BIBenchmarks:\n def peakmem_query_groupby_city_count_isin_filter(self, times_bigger) -> pd.DataFrame:\n return self.query_groupby_city_count_isin_filter(times_bigger)\n\n def setup(self, num_rows):\n self.ac = Arctic(f\"lmdb://opensource_datasets_{self.lib_name}?map_size=20GB\")\n self.lib = self.ac.get_library(self.lib_name)\n\n def setup_cache(self):\n \n file = os.path.join(Path(__file__).resolve().parent.parent, BIBenchmarks.CITY_BI_FILE2)\n if (not os.path.exists(file)) :\n dfo = download_and_process_city_to_parquet(file)\n dff = pd.read_parquet(file)\n pd.testing.assert_frame_equal(dfo,dff)\n else:\n print(\"Parquet file exists!\")\n \n # read data from bz.2 file\n # abs_path = os.path.join(Path(__file__).resolve().parent.parent,BIBenchmarks.CITY_BI_FILE)\n # self.df : pd.DataFrame = process_city(abs_path)\n \n self.df : pd.DataFrame = pd.read_parquet(file)\n \n self.ac = Arctic(f\"lmdb://opensource_datasets_{self.lib_name}?map_size=20GB\")\n self.ac.delete_library(self.lib_name)\n self.lib = self.ac.create_library(self.lib_name)\n \n print(\"The procedure is creating N times larger dataframes\")\n print(\"by concatenating original DF N times\")\n for num in BIBenchmarks.params:\n _df = self.df.copy(deep=True)\n if (num > 1):\n # lets create N times bigger DF\n dfcum = self.df.copy(deep=True)\n for i in range(1, (BIBenchmarks.params[-1])):\n dfcum = pd.concat([dfcum, self.df])\n _df = dfcum\n print(\"DF for iterration xSize original ready: \", num)\n _df.info(verbose=True,memory_usage='deep')\n self.lib.write(f\"{self.symbol}{num}\", _df)\n \n print(\"If pandas query produces different dataframe than arctic one stop tests!\")\n print(\"This will mean query problem is there most likely\")\n \n print(\"Pre-check correctness for query_groupby_city_count_all\")\n _df = self.df.copy(deep=True)\n arctic_df = self.time_query_groupby_city_count_all(BIBenchmarks.params[0])\n _df = get_query_groupby_city_count_all(_df)\n assert_frame_equal(_df, arctic_df)\n \n print(\"Pre-check correctness for query_groupby_city_count_isin_filter\")\n _df = self.df.copy(deep=True)\n arctic_df = self.time_query_groupby_city_count_isin_filter(BIBenchmarks.params[0])\n _df = get_query_groupby_city_count_isin_filter(_df)\n assert_frame_equal(_df, arctic_df)\n \n print(\"Pre-check correctness for query_groupby_city_count_filter_two_aggregations\")\n _df = self.df.copy(deep=True)\n arctic_df = self.time_query_groupby_city_count_filter_two_aggregations(BIBenchmarks.params[0])\n _df = get_query_groupby_city_count_filter_two_aggregations(_df)\n assert_frame_equal(_df, arctic_df)\n \n print(\"All pre-checks completed SUCCESSFULLY\")\n \n del self.ac", + "name": "bi_benchmarks.BIBenchmarks.peakmem_query_groupby_city_count_isin_filter", + "param_names": [ + "param1" + ], + "params": [ + [ + "1", + "10" + ] + ], + "setup_cache_key": "bi_benchmarks:61", + "timeout": 6000, + "type": "peakmemory", + "unit": "bytes", + "version": "2ae348f65721858288f1940833c76de99d61d33fd8e21a5e9ef2958b208c8320" + }, + "bi_benchmarks.BIBenchmarks.peakmem_query_readall": { + "code": "class BIBenchmarks:\n def peakmem_query_readall(self, times_bigger):\n self.lib.read(f\"{self.symbol}{times_bigger}\")\n\n def setup(self, 
num_rows):\n self.ac = Arctic(f\"lmdb://opensource_datasets_{self.lib_name}?map_size=20GB\")\n self.lib = self.ac.get_library(self.lib_name)\n\n def setup_cache(self):\n \n file = os.path.join(Path(__file__).resolve().parent.parent, BIBenchmarks.CITY_BI_FILE2)\n if (not os.path.exists(file)) :\n dfo = download_and_process_city_to_parquet(file)\n dff = pd.read_parquet(file)\n pd.testing.assert_frame_equal(dfo,dff)\n else:\n print(\"Parquet file exists!\")\n \n # read data from bz.2 file\n # abs_path = os.path.join(Path(__file__).resolve().parent.parent,BIBenchmarks.CITY_BI_FILE)\n # self.df : pd.DataFrame = process_city(abs_path)\n \n self.df : pd.DataFrame = pd.read_parquet(file)\n \n self.ac = Arctic(f\"lmdb://opensource_datasets_{self.lib_name}?map_size=20GB\")\n self.ac.delete_library(self.lib_name)\n self.lib = self.ac.create_library(self.lib_name)\n \n print(\"The procedure is creating N times larger dataframes\")\n print(\"by concatenating original DF N times\")\n for num in BIBenchmarks.params:\n _df = self.df.copy(deep=True)\n if (num > 1):\n # lets create N times bigger DF\n dfcum = self.df.copy(deep=True)\n for i in range(1, (BIBenchmarks.params[-1])):\n dfcum = pd.concat([dfcum, self.df])\n _df = dfcum\n print(\"DF for iterration xSize original ready: \", num)\n _df.info(verbose=True,memory_usage='deep')\n self.lib.write(f\"{self.symbol}{num}\", _df)\n \n print(\"If pandas query produces different dataframe than arctic one stop tests!\")\n print(\"This will mean query problem is there most likely\")\n \n print(\"Pre-check correctness for query_groupby_city_count_all\")\n _df = self.df.copy(deep=True)\n arctic_df = self.time_query_groupby_city_count_all(BIBenchmarks.params[0])\n _df = get_query_groupby_city_count_all(_df)\n assert_frame_equal(_df, arctic_df)\n \n print(\"Pre-check correctness for query_groupby_city_count_isin_filter\")\n _df = self.df.copy(deep=True)\n arctic_df = self.time_query_groupby_city_count_isin_filter(BIBenchmarks.params[0])\n _df = get_query_groupby_city_count_isin_filter(_df)\n assert_frame_equal(_df, arctic_df)\n \n print(\"Pre-check correctness for query_groupby_city_count_filter_two_aggregations\")\n _df = self.df.copy(deep=True)\n arctic_df = self.time_query_groupby_city_count_filter_two_aggregations(BIBenchmarks.params[0])\n _df = get_query_groupby_city_count_filter_two_aggregations(_df)\n assert_frame_equal(_df, arctic_df)\n \n print(\"All pre-checks completed SUCCESSFULLY\")\n \n del self.ac", + "name": "bi_benchmarks.BIBenchmarks.peakmem_query_readall", + "param_names": [ + "param1" + ], + "params": [ + [ + "1", + "10" + ] + ], + "setup_cache_key": "bi_benchmarks:61", + "timeout": 6000, + "type": "peakmemory", + "unit": "bytes", + "version": "45dc0723cbde50cbd213a97e50084ae8457ff69fb12a842d9c48469fcda2caa3" + }, + "bi_benchmarks.BIBenchmarks.time_query_groupby_city_count_all": { + "code": "class BIBenchmarks:\n def time_query_groupby_city_count_all(self, times_bigger) -> pd.DataFrame:\n return self.query_groupby_city_count_all(times_bigger)\n\n def setup(self, num_rows):\n self.ac = Arctic(f\"lmdb://opensource_datasets_{self.lib_name}?map_size=20GB\")\n self.lib = self.ac.get_library(self.lib_name)\n\n def setup_cache(self):\n \n file = os.path.join(Path(__file__).resolve().parent.parent, BIBenchmarks.CITY_BI_FILE2)\n if (not os.path.exists(file)) :\n dfo = download_and_process_city_to_parquet(file)\n dff = pd.read_parquet(file)\n pd.testing.assert_frame_equal(dfo,dff)\n else:\n print(\"Parquet file exists!\")\n \n # read data from bz.2 file\n 
# abs_path = os.path.join(Path(__file__).resolve().parent.parent,BIBenchmarks.CITY_BI_FILE)\n # self.df : pd.DataFrame = process_city(abs_path)\n \n self.df : pd.DataFrame = pd.read_parquet(file)\n \n self.ac = Arctic(f\"lmdb://opensource_datasets_{self.lib_name}?map_size=20GB\")\n self.ac.delete_library(self.lib_name)\n self.lib = self.ac.create_library(self.lib_name)\n \n print(\"The procedure is creating N times larger dataframes\")\n print(\"by concatenating original DF N times\")\n for num in BIBenchmarks.params:\n _df = self.df.copy(deep=True)\n if (num > 1):\n # lets create N times bigger DF\n dfcum = self.df.copy(deep=True)\n for i in range(1, (BIBenchmarks.params[-1])):\n dfcum = pd.concat([dfcum, self.df])\n _df = dfcum\n print(\"DF for iterration xSize original ready: \", num)\n _df.info(verbose=True,memory_usage='deep')\n self.lib.write(f\"{self.symbol}{num}\", _df)\n \n print(\"If pandas query produces different dataframe than arctic one stop tests!\")\n print(\"This will mean query problem is there most likely\")\n \n print(\"Pre-check correctness for query_groupby_city_count_all\")\n _df = self.df.copy(deep=True)\n arctic_df = self.time_query_groupby_city_count_all(BIBenchmarks.params[0])\n _df = get_query_groupby_city_count_all(_df)\n assert_frame_equal(_df, arctic_df)\n \n print(\"Pre-check correctness for query_groupby_city_count_isin_filter\")\n _df = self.df.copy(deep=True)\n arctic_df = self.time_query_groupby_city_count_isin_filter(BIBenchmarks.params[0])\n _df = get_query_groupby_city_count_isin_filter(_df)\n assert_frame_equal(_df, arctic_df)\n \n print(\"Pre-check correctness for query_groupby_city_count_filter_two_aggregations\")\n _df = self.df.copy(deep=True)\n arctic_df = self.time_query_groupby_city_count_filter_two_aggregations(BIBenchmarks.params[0])\n _df = get_query_groupby_city_count_filter_two_aggregations(_df)\n assert_frame_equal(_df, arctic_df)\n \n print(\"All pre-checks completed SUCCESSFULLY\")\n \n del self.ac", + "min_run_count": 2, + "name": "bi_benchmarks.BIBenchmarks.time_query_groupby_city_count_all", + "number": 2, + "param_names": [ + "param1" + ], + "params": [ + [ + "1", + "10" + ] + ], + "repeat": 0, + "rounds": 2, + "sample_time": 0.01, + "setup_cache_key": "bi_benchmarks:61", + "timeout": 6000, + "type": "time", + "unit": "seconds", + "version": "cc034dbad83f8695c4a670878f73e49b8ccb7548eb237cdbaeed0321fe4787ba", + "warmup_time": -1 + }, + "bi_benchmarks.BIBenchmarks.time_query_groupby_city_count_filter_two_aggregations": { + "code": "class BIBenchmarks:\n def time_query_groupby_city_count_filter_two_aggregations(self, times_bigger) -> pd.DataFrame:\n return self.query_groupby_city_count_filter_two_aggregations(times_bigger)\n\n def setup(self, num_rows):\n self.ac = Arctic(f\"lmdb://opensource_datasets_{self.lib_name}?map_size=20GB\")\n self.lib = self.ac.get_library(self.lib_name)\n\n def setup_cache(self):\n \n file = os.path.join(Path(__file__).resolve().parent.parent, BIBenchmarks.CITY_BI_FILE2)\n if (not os.path.exists(file)) :\n dfo = download_and_process_city_to_parquet(file)\n dff = pd.read_parquet(file)\n pd.testing.assert_frame_equal(dfo,dff)\n else:\n print(\"Parquet file exists!\")\n \n # read data from bz.2 file\n # abs_path = os.path.join(Path(__file__).resolve().parent.parent,BIBenchmarks.CITY_BI_FILE)\n # self.df : pd.DataFrame = process_city(abs_path)\n \n self.df : pd.DataFrame = pd.read_parquet(file)\n \n self.ac = Arctic(f\"lmdb://opensource_datasets_{self.lib_name}?map_size=20GB\")\n 
self.ac.delete_library(self.lib_name)\n self.lib = self.ac.create_library(self.lib_name)\n \n print(\"The procedure is creating N times larger dataframes\")\n print(\"by concatenating original DF N times\")\n for num in BIBenchmarks.params:\n _df = self.df.copy(deep=True)\n if (num > 1):\n # lets create N times bigger DF\n dfcum = self.df.copy(deep=True)\n for i in range(1, (BIBenchmarks.params[-1])):\n dfcum = pd.concat([dfcum, self.df])\n _df = dfcum\n print(\"DF for iterration xSize original ready: \", num)\n _df.info(verbose=True,memory_usage='deep')\n self.lib.write(f\"{self.symbol}{num}\", _df)\n \n print(\"If pandas query produces different dataframe than arctic one stop tests!\")\n print(\"This will mean query problem is there most likely\")\n \n print(\"Pre-check correctness for query_groupby_city_count_all\")\n _df = self.df.copy(deep=True)\n arctic_df = self.time_query_groupby_city_count_all(BIBenchmarks.params[0])\n _df = get_query_groupby_city_count_all(_df)\n assert_frame_equal(_df, arctic_df)\n \n print(\"Pre-check correctness for query_groupby_city_count_isin_filter\")\n _df = self.df.copy(deep=True)\n arctic_df = self.time_query_groupby_city_count_isin_filter(BIBenchmarks.params[0])\n _df = get_query_groupby_city_count_isin_filter(_df)\n assert_frame_equal(_df, arctic_df)\n \n print(\"Pre-check correctness for query_groupby_city_count_filter_two_aggregations\")\n _df = self.df.copy(deep=True)\n arctic_df = self.time_query_groupby_city_count_filter_two_aggregations(BIBenchmarks.params[0])\n _df = get_query_groupby_city_count_filter_two_aggregations(_df)\n assert_frame_equal(_df, arctic_df)\n \n print(\"All pre-checks completed SUCCESSFULLY\")\n \n del self.ac", + "min_run_count": 2, + "name": "bi_benchmarks.BIBenchmarks.time_query_groupby_city_count_filter_two_aggregations", + "number": 2, + "param_names": [ + "param1" + ], + "params": [ + [ + "1", + "10" + ] + ], + "repeat": 0, + "rounds": 2, + "sample_time": 0.01, + "setup_cache_key": "bi_benchmarks:61", + "timeout": 6000, + "type": "time", + "unit": "seconds", + "version": "9cdc08e3b0b8d92ffa8e4c6922e90417d82cdc653f3596ae38b729eac2cf00bb", + "warmup_time": -1 + }, + "bi_benchmarks.BIBenchmarks.time_query_groupby_city_count_isin_filter": { + "code": "class BIBenchmarks:\n def time_query_groupby_city_count_isin_filter(self, times_bigger) -> pd.DataFrame:\n return self.query_groupby_city_count_isin_filter(times_bigger)\n\n def setup(self, num_rows):\n self.ac = Arctic(f\"lmdb://opensource_datasets_{self.lib_name}?map_size=20GB\")\n self.lib = self.ac.get_library(self.lib_name)\n\n def setup_cache(self):\n \n file = os.path.join(Path(__file__).resolve().parent.parent, BIBenchmarks.CITY_BI_FILE2)\n if (not os.path.exists(file)) :\n dfo = download_and_process_city_to_parquet(file)\n dff = pd.read_parquet(file)\n pd.testing.assert_frame_equal(dfo,dff)\n else:\n print(\"Parquet file exists!\")\n \n # read data from bz.2 file\n # abs_path = os.path.join(Path(__file__).resolve().parent.parent,BIBenchmarks.CITY_BI_FILE)\n # self.df : pd.DataFrame = process_city(abs_path)\n \n self.df : pd.DataFrame = pd.read_parquet(file)\n \n self.ac = Arctic(f\"lmdb://opensource_datasets_{self.lib_name}?map_size=20GB\")\n self.ac.delete_library(self.lib_name)\n self.lib = self.ac.create_library(self.lib_name)\n \n print(\"The procedure is creating N times larger dataframes\")\n print(\"by concatenating original DF N times\")\n for num in BIBenchmarks.params:\n _df = self.df.copy(deep=True)\n if (num > 1):\n # lets create N times bigger DF\n dfcum 
= self.df.copy(deep=True)\n for i in range(1, (BIBenchmarks.params[-1])):\n dfcum = pd.concat([dfcum, self.df])\n _df = dfcum\n print(\"DF for iterration xSize original ready: \", num)\n _df.info(verbose=True,memory_usage='deep')\n self.lib.write(f\"{self.symbol}{num}\", _df)\n \n print(\"If pandas query produces different dataframe than arctic one stop tests!\")\n print(\"This will mean query problem is there most likely\")\n \n print(\"Pre-check correctness for query_groupby_city_count_all\")\n _df = self.df.copy(deep=True)\n arctic_df = self.time_query_groupby_city_count_all(BIBenchmarks.params[0])\n _df = get_query_groupby_city_count_all(_df)\n assert_frame_equal(_df, arctic_df)\n \n print(\"Pre-check correctness for query_groupby_city_count_isin_filter\")\n _df = self.df.copy(deep=True)\n arctic_df = self.time_query_groupby_city_count_isin_filter(BIBenchmarks.params[0])\n _df = get_query_groupby_city_count_isin_filter(_df)\n assert_frame_equal(_df, arctic_df)\n \n print(\"Pre-check correctness for query_groupby_city_count_filter_two_aggregations\")\n _df = self.df.copy(deep=True)\n arctic_df = self.time_query_groupby_city_count_filter_two_aggregations(BIBenchmarks.params[0])\n _df = get_query_groupby_city_count_filter_two_aggregations(_df)\n assert_frame_equal(_df, arctic_df)\n \n print(\"All pre-checks completed SUCCESSFULLY\")\n \n del self.ac", + "min_run_count": 2, + "name": "bi_benchmarks.BIBenchmarks.time_query_groupby_city_count_isin_filter", + "number": 2, + "param_names": [ + "param1" + ], + "params": [ + [ + "1", + "10" + ] + ], + "repeat": 0, + "rounds": 2, + "sample_time": 0.01, + "setup_cache_key": "bi_benchmarks:61", + "timeout": 6000, + "type": "time", + "unit": "seconds", + "version": "79b7c695f5c71eff57c7734047eb6b2d359b077c243444bb3ae2069cdfbc1011", + "warmup_time": -1 + }, + "bi_benchmarks.BIBenchmarks.time_query_readall": { + "code": "class BIBenchmarks:\n def time_query_readall(self, times_bigger):\n self.lib.read(f\"{self.symbol}{times_bigger}\")\n\n def setup(self, num_rows):\n self.ac = Arctic(f\"lmdb://opensource_datasets_{self.lib_name}?map_size=20GB\")\n self.lib = self.ac.get_library(self.lib_name)\n\n def setup_cache(self):\n \n file = os.path.join(Path(__file__).resolve().parent.parent, BIBenchmarks.CITY_BI_FILE2)\n if (not os.path.exists(file)) :\n dfo = download_and_process_city_to_parquet(file)\n dff = pd.read_parquet(file)\n pd.testing.assert_frame_equal(dfo,dff)\n else:\n print(\"Parquet file exists!\")\n \n # read data from bz.2 file\n # abs_path = os.path.join(Path(__file__).resolve().parent.parent,BIBenchmarks.CITY_BI_FILE)\n # self.df : pd.DataFrame = process_city(abs_path)\n \n self.df : pd.DataFrame = pd.read_parquet(file)\n \n self.ac = Arctic(f\"lmdb://opensource_datasets_{self.lib_name}?map_size=20GB\")\n self.ac.delete_library(self.lib_name)\n self.lib = self.ac.create_library(self.lib_name)\n \n print(\"The procedure is creating N times larger dataframes\")\n print(\"by concatenating original DF N times\")\n for num in BIBenchmarks.params:\n _df = self.df.copy(deep=True)\n if (num > 1):\n # lets create N times bigger DF\n dfcum = self.df.copy(deep=True)\n for i in range(1, (BIBenchmarks.params[-1])):\n dfcum = pd.concat([dfcum, self.df])\n _df = dfcum\n print(\"DF for iterration xSize original ready: \", num)\n _df.info(verbose=True,memory_usage='deep')\n self.lib.write(f\"{self.symbol}{num}\", _df)\n \n print(\"If pandas query produces different dataframe than arctic one stop tests!\")\n print(\"This will mean query problem is there most 
likely\")\n \n print(\"Pre-check correctness for query_groupby_city_count_all\")\n _df = self.df.copy(deep=True)\n arctic_df = self.time_query_groupby_city_count_all(BIBenchmarks.params[0])\n _df = get_query_groupby_city_count_all(_df)\n assert_frame_equal(_df, arctic_df)\n \n print(\"Pre-check correctness for query_groupby_city_count_isin_filter\")\n _df = self.df.copy(deep=True)\n arctic_df = self.time_query_groupby_city_count_isin_filter(BIBenchmarks.params[0])\n _df = get_query_groupby_city_count_isin_filter(_df)\n assert_frame_equal(_df, arctic_df)\n \n print(\"Pre-check correctness for query_groupby_city_count_filter_two_aggregations\")\n _df = self.df.copy(deep=True)\n arctic_df = self.time_query_groupby_city_count_filter_two_aggregations(BIBenchmarks.params[0])\n _df = get_query_groupby_city_count_filter_two_aggregations(_df)\n assert_frame_equal(_df, arctic_df)\n \n print(\"All pre-checks completed SUCCESSFULLY\")\n \n del self.ac", + "min_run_count": 2, + "name": "bi_benchmarks.BIBenchmarks.time_query_readall", + "number": 2, + "param_names": [ + "param1" + ], + "params": [ + [ + "1", + "10" + ] + ], + "repeat": 0, + "rounds": 2, + "sample_time": 0.01, + "setup_cache_key": "bi_benchmarks:61", + "timeout": 6000, + "type": "time", + "unit": "seconds", + "version": "fc198dfac3e8e832aaa7e0d3355d4038a4acf2ada7cbf9bc3ff34bf0f7c433b8", + "warmup_time": -1 + }, "list_functions.ListFunctions.peakmem_list_symbols": { "code": "class ListFunctions:\n def peakmem_list_symbols(self, num_symbols):\n self.lib.list_symbols()\n\n def setup(self, num_symbols):\n self.ac = Arctic(\"lmdb://list_functions\")\n self.lib = self.ac[f\"{num_symbols}_num_symbols\"]\n\n def setup_cache(self):\n self.ac = Arctic(\"lmdb://list_functions\")\n \n num_symbols = ListFunctions.params\n for syms in num_symbols:\n lib_name = f\"{syms}_num_symbols\"\n self.ac.delete_library(lib_name)\n lib = self.ac.create_library(lib_name)\n for sym in range(syms):\n lib.write(f\"{sym}_sym\", generate_benchmark_df(ListFunctions.rows))", "name": "list_functions.ListFunctions.peakmem_list_symbols", @@ -1245,7 +1413,7 @@ "warmup_time": -1 }, "resample.Resample.peakmem_resample": { - "code": "class Resample:\n def peakmem_resample(self, num_rows, downsampling_factor, col_type, aggregation):\n if col_type == \"datetime\" and aggregation == \"sum\" or col_type == \"str\" and aggregation in [\"sum\", \"mean\", \"min\", \"max\"]:\n raise SkipNotImplemented(f\"{aggregation} not supported on columns of type {col_type}\")\n else:\n self.lib.read(col_type, date_range=self.date_range, query_builder=self.query_builder)\n\n def setup(self, num_rows, downsampling_factor, col_type, aggregation):\n self.ac = Arctic(self.CONNECTION_STRING)\n self.lib = self.ac[self.LIB_NAME]\n self.date_range = (pd.Timestamp(0), pd.Timestamp(num_rows, unit=\"us\"))\n self.query_builder = QueryBuilder().resample(f\"{downsampling_factor}us\").agg({\"col\": aggregation})\n\n def setup_cache(self):\n ac = Arctic(self.CONNECTION_STRING)\n ac.delete_library(self.LIB_NAME)\n lib = ac.create_library(self.LIB_NAME)\n rng = np.random.default_rng()\n col_types = self.params[2]\n rows = max(self.params[0])\n for col_type in col_types:\n if col_type == \"str\":\n num_unique_strings = 100\n unique_strings = random_strings_of_length(num_unique_strings, 10, True)\n sym = col_type\n num_segments = rows // self.ROWS_PER_SEGMENT\n for idx in range(num_segments):\n index = pd.date_range(pd.Timestamp(idx * self.ROWS_PER_SEGMENT, unit=\"us\"), freq=\"us\", periods=self.ROWS_PER_SEGMENT)\n 
if col_type == \"int\":\n col_data = rng.integers(0, 100_000, self.ROWS_PER_SEGMENT)\n elif col_type == \"bool\":\n col_data = rng.integers(0, 2, self.ROWS_PER_SEGMENT)\n col_data = col_data.astype(bool)\n elif col_type == \"float\":\n col_data = 100_000 * rng.random(self.ROWS_PER_SEGMENT)\n elif col_type == \"datetime\":\n col_data = rng.integers(0, 100_000, self.ROWS_PER_SEGMENT)\n col_data = col_data.astype(\"datetime64[s]\")\n elif col_type == \"str\":\n col_data = np.random.choice(unique_strings, self.ROWS_PER_SEGMENT)\n df = pd.DataFrame({\"col\": col_data}, index=index)\n lib.append(sym, df)", + "code": "class Resample:\n def peakmem_resample(self, num_rows, downsampling_factor, col_type, aggregation):\n if col_type == \"datetime\" and aggregation == \"sum\" or col_type == \"str\" and aggregation in [\"sum\", \"mean\", \"min\", \"max\"]:\n pass\n # Use this when upgrading to ASV 0.6.0 or later\n # raise SkipNotImplemented(f\"{aggregation} not supported on columns of type {col_type}\")\n else:\n self.lib.read(col_type, date_range=self.date_range, query_builder=self.query_builder)\n\n def setup(self, num_rows, downsampling_factor, col_type, aggregation):\n self.ac = Arctic(self.CONNECTION_STRING)\n self.lib = self.ac[self.LIB_NAME]\n self.date_range = (pd.Timestamp(0), pd.Timestamp(num_rows, unit=\"us\"))\n self.query_builder = QueryBuilder().resample(f\"{downsampling_factor}us\").agg({\"col\": aggregation})\n\n def setup_cache(self):\n ac = Arctic(self.CONNECTION_STRING)\n ac.delete_library(self.LIB_NAME)\n lib = ac.create_library(self.LIB_NAME)\n rng = np.random.default_rng()\n col_types = self.params[2]\n rows = max(self.params[0])\n for col_type in col_types:\n if col_type == \"str\":\n num_unique_strings = 100\n unique_strings = random_strings_of_length(num_unique_strings, 10, True)\n sym = col_type\n num_segments = rows // self.ROWS_PER_SEGMENT\n for idx in range(num_segments):\n index = pd.date_range(pd.Timestamp(idx * self.ROWS_PER_SEGMENT, unit=\"us\"), freq=\"us\", periods=self.ROWS_PER_SEGMENT)\n if col_type == \"int\":\n col_data = rng.integers(0, 100_000, self.ROWS_PER_SEGMENT)\n elif col_type == \"bool\":\n col_data = rng.integers(0, 2, self.ROWS_PER_SEGMENT)\n col_data = col_data.astype(bool)\n elif col_type == \"float\":\n col_data = 100_000 * rng.random(self.ROWS_PER_SEGMENT)\n elif col_type == \"datetime\":\n col_data = rng.integers(0, 100_000, self.ROWS_PER_SEGMENT)\n col_data = col_data.astype(\"datetime64[s]\")\n elif col_type == \"str\":\n col_data = np.random.choice(unique_strings, self.ROWS_PER_SEGMENT)\n df = pd.DataFrame({\"col\": col_data}, index=index)\n lib.append(sym, df)", "name": "resample.Resample.peakmem_resample", "param_names": [ "num_rows", @@ -1280,13 +1448,13 @@ "'count'" ] ], - "setup_cache_key": "resample:38", + "setup_cache_key": "resample:37", "type": "peakmemory", "unit": "bytes", - "version": "e64300ebb5bd625e1a0f3774aadd035e5738b41295ec2a8ce082d2e9add9b580" + "version": "760c9d62e17a5467f1e93abb258d89057e8fdf9ee67d98ceb376e731157a4d2e" }, "resample.Resample.time_resample": { - "code": "class Resample:\n def time_resample(self, num_rows, downsampling_factor, col_type, aggregation):\n if col_type == \"datetime\" and aggregation == \"sum\" or col_type == \"str\" and aggregation in [\"sum\", \"mean\", \"min\", \"max\"]:\n raise SkipNotImplemented(f\"{aggregation} not supported on columns of type {col_type}\")\n else:\n self.lib.read(col_type, date_range=self.date_range, query_builder=self.query_builder)\n\n def setup(self, num_rows, 
downsampling_factor, col_type, aggregation):\n self.ac = Arctic(self.CONNECTION_STRING)\n self.lib = self.ac[self.LIB_NAME]\n self.date_range = (pd.Timestamp(0), pd.Timestamp(num_rows, unit=\"us\"))\n self.query_builder = QueryBuilder().resample(f\"{downsampling_factor}us\").agg({\"col\": aggregation})\n\n def setup_cache(self):\n ac = Arctic(self.CONNECTION_STRING)\n ac.delete_library(self.LIB_NAME)\n lib = ac.create_library(self.LIB_NAME)\n rng = np.random.default_rng()\n col_types = self.params[2]\n rows = max(self.params[0])\n for col_type in col_types:\n if col_type == \"str\":\n num_unique_strings = 100\n unique_strings = random_strings_of_length(num_unique_strings, 10, True)\n sym = col_type\n num_segments = rows // self.ROWS_PER_SEGMENT\n for idx in range(num_segments):\n index = pd.date_range(pd.Timestamp(idx * self.ROWS_PER_SEGMENT, unit=\"us\"), freq=\"us\", periods=self.ROWS_PER_SEGMENT)\n if col_type == \"int\":\n col_data = rng.integers(0, 100_000, self.ROWS_PER_SEGMENT)\n elif col_type == \"bool\":\n col_data = rng.integers(0, 2, self.ROWS_PER_SEGMENT)\n col_data = col_data.astype(bool)\n elif col_type == \"float\":\n col_data = 100_000 * rng.random(self.ROWS_PER_SEGMENT)\n elif col_type == \"datetime\":\n col_data = rng.integers(0, 100_000, self.ROWS_PER_SEGMENT)\n col_data = col_data.astype(\"datetime64[s]\")\n elif col_type == \"str\":\n col_data = np.random.choice(unique_strings, self.ROWS_PER_SEGMENT)\n df = pd.DataFrame({\"col\": col_data}, index=index)\n lib.append(sym, df)", + "code": "class Resample:\n def time_resample(self, num_rows, downsampling_factor, col_type, aggregation):\n if col_type == \"datetime\" and aggregation == \"sum\" or col_type == \"str\" and aggregation in [\"sum\", \"mean\", \"min\", \"max\"]:\n pass\n # Use this when upgrading to ASV 0.6.0 or later\n # raise SkipNotImplemented(f\"{aggregation} not supported on columns of type {col_type}\")\n else:\n self.lib.read(col_type, date_range=self.date_range, query_builder=self.query_builder)\n\n def setup(self, num_rows, downsampling_factor, col_type, aggregation):\n self.ac = Arctic(self.CONNECTION_STRING)\n self.lib = self.ac[self.LIB_NAME]\n self.date_range = (pd.Timestamp(0), pd.Timestamp(num_rows, unit=\"us\"))\n self.query_builder = QueryBuilder().resample(f\"{downsampling_factor}us\").agg({\"col\": aggregation})\n\n def setup_cache(self):\n ac = Arctic(self.CONNECTION_STRING)\n ac.delete_library(self.LIB_NAME)\n lib = ac.create_library(self.LIB_NAME)\n rng = np.random.default_rng()\n col_types = self.params[2]\n rows = max(self.params[0])\n for col_type in col_types:\n if col_type == \"str\":\n num_unique_strings = 100\n unique_strings = random_strings_of_length(num_unique_strings, 10, True)\n sym = col_type\n num_segments = rows // self.ROWS_PER_SEGMENT\n for idx in range(num_segments):\n index = pd.date_range(pd.Timestamp(idx * self.ROWS_PER_SEGMENT, unit=\"us\"), freq=\"us\", periods=self.ROWS_PER_SEGMENT)\n if col_type == \"int\":\n col_data = rng.integers(0, 100_000, self.ROWS_PER_SEGMENT)\n elif col_type == \"bool\":\n col_data = rng.integers(0, 2, self.ROWS_PER_SEGMENT)\n col_data = col_data.astype(bool)\n elif col_type == \"float\":\n col_data = 100_000 * rng.random(self.ROWS_PER_SEGMENT)\n elif col_type == \"datetime\":\n col_data = rng.integers(0, 100_000, self.ROWS_PER_SEGMENT)\n col_data = col_data.astype(\"datetime64[s]\")\n elif col_type == \"str\":\n col_data = np.random.choice(unique_strings, self.ROWS_PER_SEGMENT)\n df = pd.DataFrame({\"col\": col_data}, index=index)\n 
lib.append(sym, df)", "min_run_count": 2, "name": "resample.Resample.time_resample", "number": 5, @@ -1326,10 +1494,36 @@ "repeat": 0, "rounds": 2, "sample_time": 0.01, - "setup_cache_key": "resample:38", + "setup_cache_key": "resample:37", + "type": "time", + "unit": "seconds", + "version": "1381d2db90e66cb5cd04febf62398827a3ac9928795eaced908daec35d5c0c31", + "warmup_time": -1 + }, + "resample.ResampleWide.peakmem_resample_wide": { + "code": "class ResampleWide:\n def peakmem_resample_wide(self):\n self.lib.read(self.SYM, query_builder=self.query_builder)\n\n def setup(self):\n self.ac = Arctic(self.CONNECTION_STRING)\n self.lib = self.ac[self.LIB_NAME]\n aggs = dict()\n for col in self.COLS:\n aggs[col] = \"last\"\n self.query_builder = QueryBuilder().resample(\"30us\").agg(aggs)\n\n def setup_cache(self):\n ac = Arctic(self.CONNECTION_STRING)\n ac.delete_library(self.LIB_NAME)\n lib = ac.create_library(self.LIB_NAME)\n rng = np.random.default_rng()\n num_rows = 3000\n index = pd.date_range(pd.Timestamp(0, unit=\"us\"), freq=\"us\", periods=num_rows)\n data = dict()\n for col in self.COLS:\n data[col] = 100 * rng.random(num_rows, dtype=np.float64)\n df = pd.DataFrame(data, index=index)\n lib.write(self.SYM, df)", + "name": "resample.ResampleWide.peakmem_resample_wide", + "param_names": [], + "params": [], + "setup_cache_key": "resample:103", + "type": "peakmemory", + "unit": "bytes", + "version": "53f042192048c92d282637c1bbcee9e52dacec9086c534782de30d7ff67e77eb" + }, + "resample.ResampleWide.time_resample_wide": { + "code": "class ResampleWide:\n def time_resample_wide(self):\n self.lib.read(self.SYM, query_builder=self.query_builder)\n\n def setup(self):\n self.ac = Arctic(self.CONNECTION_STRING)\n self.lib = self.ac[self.LIB_NAME]\n aggs = dict()\n for col in self.COLS:\n aggs[col] = \"last\"\n self.query_builder = QueryBuilder().resample(\"30us\").agg(aggs)\n\n def setup_cache(self):\n ac = Arctic(self.CONNECTION_STRING)\n ac.delete_library(self.LIB_NAME)\n lib = ac.create_library(self.LIB_NAME)\n rng = np.random.default_rng()\n num_rows = 3000\n index = pd.date_range(pd.Timestamp(0, unit=\"us\"), freq=\"us\", periods=num_rows)\n data = dict()\n for col in self.COLS:\n data[col] = 100 * rng.random(num_rows, dtype=np.float64)\n df = pd.DataFrame(data, index=index)\n lib.write(self.SYM, df)", + "min_run_count": 2, + "name": "resample.ResampleWide.time_resample_wide", + "number": 5, + "param_names": [], + "params": [], + "repeat": 0, + "rounds": 2, + "sample_time": 0.01, + "setup_cache_key": "resample:103", "type": "time", "unit": "seconds", - "version": "2d10a27f3668632f382e90783829b4bb08cabb656c02754c00d5953ee42f3794", + "version": "ece714f981e8de31ee8296644624bf8f5fb895e6bf48d64a6ae2a9c50c5db7a2", "warmup_time": -1 }, "version": 2, diff --git a/python/benchmarks/bi_benchmarks.py b/python/benchmarks/bi_benchmarks.py new file mode 100644 index 0000000000..733244abef --- /dev/null +++ b/python/benchmarks/bi_benchmarks.py @@ -0,0 +1,168 @@ +""" +Copyright 2023 Man Group Operations Limited + +Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt. + +As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0. 
+""" +import os +from pathlib import Path +from arcticdb import Arctic +from arcticdb.version_store.processing import QueryBuilder + +from .common import * + + +def get_query_groupby_city_count_all(q): + return q.groupby("City").agg({"Keyword": "count"}) + + +def get_query_groupby_city_count_isin_filter(q): + return q[q["Keyword"].isin(["bimbo", "twat", "faggot"])].groupby("City").agg({"Keyword": "count"}) + + +def get_query_groupby_city_count_filter_two_aggregations(q): + return q[q["Keyword"] == "faggot" ].groupby("City").agg({"Keyword": "count", "Number of Records" : "sum"}) + +def assert_frame_equal(pandas_df:pd.DataFrame, arctic_df:pd.DataFrame): + arctic_df.sort_index(inplace=True) + pd.testing.assert_frame_equal(pandas_df, + arctic_df, + check_column_type=False, + check_dtype=False) + + +class BIBenchmarks: + ''' + Sample test benchmark for using one opensource BI CSV source. + The logic of a test is + - download if parquet file does not exists source in .bz2 format + - convert it to parquet format + - prepare library with it containing several symbols that are constructed based on this DF + - for each query we want to benchmark do a pre-check that this query produces SAME result on Pandas and arcticDB + - run the benchmark tests + ''' + + + number = 2 + timeout = 6000 + LIB_NAME = "BI_benchmark_lib" + # We use dataframe in this file + CITY_BI_FILE = "data/CityMaxCapita_1.csv.bz2" + CITY_BI_FILE2 = "data/CityMaxCapita_1.parquet.gzip" + + #Defines how many times bigger the database is + params = [1, 10] + + def __init__(self): + self.lib_name = BIBenchmarks.LIB_NAME + self.symbol = self.lib_name + + def setup_cache(self): + + file = os.path.join(Path(__file__).resolve().parent.parent, BIBenchmarks.CITY_BI_FILE2) + if (not os.path.exists(file)) : + dfo = download_and_process_city_to_parquet(file) + dff = pd.read_parquet(file) + pd.testing.assert_frame_equal(dfo,dff) + else: + print("Parquet file exists!") + + # read data from bz.2 file + # abs_path = os.path.join(Path(__file__).resolve().parent.parent,BIBenchmarks.CITY_BI_FILE) + # self.df : pd.DataFrame = process_city(abs_path) + + self.df : pd.DataFrame = pd.read_parquet(file) + + self.ac = Arctic(f"lmdb://opensource_datasets_{self.lib_name}?map_size=20GB") + self.ac.delete_library(self.lib_name) + self.lib = self.ac.create_library(self.lib_name) + + print("The procedure is creating N times larger dataframes") + print("by concatenating original DF N times") + for num in BIBenchmarks.params: + _df = self.df.copy(deep=True) + if (num > 1): + # lets create N times bigger DF + dfcum = self.df.copy(deep=True) + for i in range(1, (BIBenchmarks.params[-1])): + dfcum = pd.concat([dfcum, self.df]) + _df = dfcum + print("DF for iterration xSize original ready: ", num) + _df.info(verbose=True,memory_usage='deep') + self.lib.write(f"{self.symbol}{num}", _df) + + print("If pandas query produces different dataframe than arctic one stop tests!") + print("This will mean query problem is there most likely") + + print("Pre-check correctness for query_groupby_city_count_all") + _df = self.df.copy(deep=True) + arctic_df = self.time_query_groupby_city_count_all(BIBenchmarks.params[0]) + _df = get_query_groupby_city_count_all(_df) + assert_frame_equal(_df, arctic_df) + + print("Pre-check correctness for query_groupby_city_count_isin_filter") + _df = self.df.copy(deep=True) + arctic_df = self.time_query_groupby_city_count_isin_filter(BIBenchmarks.params[0]) + _df = get_query_groupby_city_count_isin_filter(_df) + assert_frame_equal(_df, arctic_df) + + 
print("Pre-check correctness for query_groupby_city_count_filter_two_aggregations") + _df = self.df.copy(deep=True) + arctic_df = self.time_query_groupby_city_count_filter_two_aggregations(BIBenchmarks.params[0]) + _df = get_query_groupby_city_count_filter_two_aggregations(_df) + assert_frame_equal(_df, arctic_df) + + print("All pre-checks completed SUCCESSFULLY") + + del self.ac + + def setup(self, num_rows): + self.ac = Arctic(f"lmdb://opensource_datasets_{self.lib_name}?map_size=20GB") + self.lib = self.ac.get_library(self.lib_name) + + def teardown(self, num_rows): + del self.ac + + def time_query_readall(self, times_bigger): + self.lib.read(f"{self.symbol}{times_bigger}") + + def peakmem_query_readall(self, times_bigger): + self.lib.read(f"{self.symbol}{times_bigger}") + + def query_groupby_city_count_all(self, times_bigger) -> pd.DataFrame: + q = QueryBuilder() + q = get_query_groupby_city_count_all( q) + df = self.lib.read(f"{self.symbol}{times_bigger}", query_builder=q) + return df.data + + def time_query_groupby_city_count_all(self, times_bigger) -> pd.DataFrame: + return self.query_groupby_city_count_all(times_bigger) + + def peakmem_query_groupby_city_count_all(self, times_bigger) -> pd.DataFrame: + return self.query_groupby_city_count_all(times_bigger) + + def query_groupby_city_count_isin_filter(self, times_bigger) -> pd.DataFrame: + q = QueryBuilder() + q = get_query_groupby_city_count_isin_filter(q) + df = self.lib.read(f"{self.symbol}{times_bigger}", query_builder=q) + return df.data + + def time_query_groupby_city_count_isin_filter(self, times_bigger) -> pd.DataFrame: + return self.query_groupby_city_count_isin_filter(times_bigger) + + def peakmem_query_groupby_city_count_isin_filter(self, times_bigger) -> pd.DataFrame: + return self.query_groupby_city_count_isin_filter(times_bigger) + + def query_groupby_city_count_filter_two_aggregations(self, times_bigger) -> pd.DataFrame: + q = QueryBuilder() + q = get_query_groupby_city_count_filter_two_aggregations(q) + df = self.lib.read(f"{self.symbol}{times_bigger}", query_builder=q) + return df.data + + def time_query_groupby_city_count_filter_two_aggregations(self, times_bigger) -> pd.DataFrame: + return self.query_groupby_city_count_filter_two_aggregations(times_bigger) + + def peakmem_query_groupby_city_count_filter_two_aggregations(self, times_bigger): + return self.query_groupby_city_count_filter_two_aggregations(times_bigger) + diff --git a/python/benchmarks/common.py b/python/benchmarks/common.py index e538309b27..d5839f2bf0 100644 --- a/python/benchmarks/common.py +++ b/python/benchmarks/common.py @@ -5,9 +5,14 @@ As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0. """ +import urllib.parse import pandas as pd import numpy as np +import os +import bz2 +import urllib.request + def generate_pseudo_random_dataframe(n, freq="s", end_timestamp="1/1/2023"): """ @@ -68,3 +73,150 @@ def generate_benchmark_df(n, freq="min", end_timestamp="1/1/2023"): def get_prewritten_lib_name(rows): return f"prewritten_{rows}" + + +def get_filename_from_url(url): + parsed_url = urllib.parse.urlparse(url) + return os.path.basename(parsed_url.path) + + +def download_file(url: str) -> str: + """ + Downloads file from specific location and then saves + it under same name at current directory. 
+ Returns the name of file just saved + """ + print("Downloading file from: ", url) + name = get_filename_from_url(url) + urllib.request.urlretrieve(url, name) + print("File downloaded: ", name) + return name + +def download_and_process_city_to_parquet(save_to_file:str) -> pd.DataFrame : + ''' + Downloads CSV from a location then saves it in gziped parqet + ''' + name = download_file("http://www.cwi.nl/~boncz/PublicBIbenchmark/CityMaxCapita/CityMaxCapita_1.csv.bz2") + name = decompress_bz2_file(name) + df : pd.DataFrame = read_city(name) + location = os.path.join(save_to_file) + directory = os.path.dirname(location) + if not os.path.exists(directory): + os.makedirs(directory) + print("Saving dataframe to gzip/parquet file: " ,location) + df.to_parquet(location, + compression='gzip', + index=True) + return df + +def decompress_bz2_file(name: str) -> str: + """ + Decompresses a bz2 file and saves content in + a text file having same name (without bz.2 extensions) + in current directory. + Returns the name of the saved file + """ + print("Decompressing file: ", name) + nn = name.replace(".bz2", "") + new_name = os.path.basename(nn) + + with bz2.open(name, 'rb') as input_file: + decompressed_data = input_file.read() + + with open(new_name, 'wb') as output_file: + output_file.write(decompressed_data) + + print("Decompressed file: ", new_name) + + return new_name + +def read_city(file1:str): + """ + Data source: + https://github.com/cwida/public_bi_benchmark/blob/master/benchmark/CityMaxCapita/queries/11.sql + + As CSV file contains nulls in int and float we fix those programatically + """ + columns =[ + "City/Admin", + "City/State", + "City", + "Created Date/Time", + "Date Joined", + "FF Ratio", + "Favorites", + "First Link in Tweet", + "Followers", + "Following", + "Gender", + "Influencer?", + "Keyword", + "LPF", + "Language", + "Lat", + "Listed Number", + "Long Domain", + "Long", + "Number of Records", + "Region", + "Short Domain", + "State/Country", + "State", + "Tweet Text", + "Tweets", + "Twitter Client", + "User Bio", + "User Loc", + "Username 1", + "Username" + ] + types = { + "City/Admin" : str, + "City/State" : str, + "City" : str, + "Created Date/Time" : np.float64, + "Date Joined" : np.float64, + "FF Ratio" : np.float64, + "Favorites" : np.int32, + "First Link in Tweet" : str, + "Followers" : np.int32, + "Following" : np.int32, + "Gender" : str, + "Influencer?" : pd.Int32Dtype(), + "Keyword" : str, + "LPF" : np.float64, + "Language" : str, + "Lat" : np.float64, + "Listed Number" : pd.Int32Dtype(), + "Long Domain" : str, + "Long" : np.float64, + "Number of Records" : np.int32, + "Region" : str, + "Short Domain" : str, + "State/Country" : str, + "State" : str, + "Tweet Text" : str, + "Tweets" : np.int32, + "Twitter Client" : str, + "User Bio" : str, + "User Loc" : str, + "Username 1" : str, + "Username" : str + } + + df = pd.read_csv(file1, sep="|", + header=None, + dtype=types, + names=columns, + ) + + df["Influencer?"]=df["Influencer?"].fillna(0).astype(np.int32) + df["Listed Number"]=df["Listed Number"].fillna(0).astype(np.int32) + + return df + +def process_city(fileloc:str) -> pd.DataFrame : + # read data from bz.2 file + name = decompress_bz2_file(fileloc) + df : pd.DataFrame = read_city(name) + return df
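
Reviewer note (not part of the patch): the pre-checks in setup_cache rely on the pandas and ArcticDB groupby/aggregation paths producing the same frame. A minimal, self-contained sketch of that equivalence check, using only APIs already exercised by the patch (the LMDB URI, library name, symbol, and toy dataframe below are made-up examples, not the CityMaxCapita data), might look like:

# Minimal sketch of the pandas-vs-ArcticDB pre-check; assumes arcticdb and pandas are installed.
import pandas as pd

from arcticdb import Arctic
from arcticdb.version_store.processing import QueryBuilder

# Tiny stand-in for the benchmark dataframe (hypothetical values).
df = pd.DataFrame({"City": ["NY", "NY", "LA", "LA", "SF"],
                   "Keyword": ["a", "b", "c", "d", "e"]})

ac = Arctic("lmdb://bi_precheck_example")
ac.delete_library("example_lib")
lib = ac.create_library("example_lib")
lib.write("cities1", df)

# The same groupby/count, expressed once in pandas and once as an ArcticDB QueryBuilder.
expected = df.groupby("City").agg({"Keyword": "count"})

q = QueryBuilder()
q = q.groupby("City").agg({"Keyword": "count"})
actual = lib.read("cities1", query_builder=q).data.sort_index()

pd.testing.assert_frame_equal(expected, actual, check_dtype=False)
print("pandas and ArcticDB agree")

As in the patch's assert_frame_equal helper, the ArcticDB result is sorted by index and dtype checks are relaxed, which suggests group ordering and the integer width of counts can differ between the two engines even when the values match.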