diff --git a/python/tests/integration/toolbox/test_storage_mover.py b/python/tests/integration/toolbox/test_storage_mover.py new file mode 100644 index 00000000000..b9c94c0af4f --- /dev/null +++ b/python/tests/integration/toolbox/test_storage_mover.py @@ -0,0 +1,299 @@ +import numpy as np +import pytest + +from typing import List, Tuple +from hypothesis import given, strategies as st, settings +from arcticdb.config import Defaults +from arcticdb.version_store.helper import ArcticcMemConf, get_lib_cfg +from arcticdb.toolbox.storage import SymbolVersionsPair, KeyType, get_library_tool, Key +from arcticcxx.tools import StorageMover +from pandas import DataFrame +from pandas.testing import assert_frame_equal +from arcticc.util.test import sample_dataframe + +# configure_test_logger("DEBUG") + + +def add_data(version_store): + version_store.write("symbol", sample_dataframe()) + version_store.write("pickled", {"a": 1}, pickle_on_failure=True) + version_store.snapshot("mysnap") + version_store.write("rec_norm", data={"a": np.arange(5), "b": np.arange(8), "c": None}, recursive_normalizers=True) + version_store.write("symbol", sample_dataframe()) + version_store.snapshot("mysnap2") + + +def compare_two_libs(lib1, lib2): + ver1 = lib1.list_versions() + ver2 = lib2.list_versions() + + print(ver1) + print(ver2) + + assert len(lib1.list_versions()) == len(lib2.list_versions()) + assert lib1.list_versions() == lib2.list_versions() + assert lib1.list_snapshots() == lib2.list_snapshots() + + assert_frame_equal(lib1.read("symbol", as_of=0).data, lib2.read("symbol", as_of=0).data) + assert_frame_equal(lib1.read("symbol", as_of=1).data, lib2.read("symbol", as_of=1).data) + assert_frame_equal(lib1.read("symbol", as_of="mysnap").data, lib2.read("symbol", as_of="mysnap").data) + assert_frame_equal(lib1.read("symbol", as_of="mysnap2").data, lib2.read("symbol", as_of="mysnap2").data) + + assert lib1.read("pickled").data == lib2.read("pickled").data + assert lib1.read("pickled", as_of="mysnap").data == lib2.read("pickled", as_of="mysnap").data + assert lib1.read("pickled", as_of="mysnap2").data == lib2.read("pickled", as_of="mysnap2").data + + assert lib1.read("rec_norm").data.keys() == lib2.read("rec_norm").data.keys() + assert all(lib1.read("rec_norm").data["a"] == lib2.read("rec_norm").data["a"]) + assert all(lib1.read("rec_norm").data["b"] == lib2.read("rec_norm").data["b"]) + assert lib1.read("rec_norm").data["c"] == lib2.read("rec_norm").data["c"] + assert lib1.read("rec_norm", as_of="mysnap2").data.keys() == lib2.read("rec_norm", as_of="mysnap2").data.keys() + + +def test_storage_mover_single_go(lmdb_version_store, arcticc_native_local_lib_cfg_extra): + add_data(lmdb_version_store) + arcticc = ArcticcMemConf(arcticc_native_local_lib_cfg_extra(), env=Defaults.ENV) + lib_cfg = get_lib_cfg(arcticc, Defaults.ENV, "local.extra") + lib_cfg.version.symbol_list = True + dst_lib = arcticc["local.extra"] + + s = StorageMover(lmdb_version_store._library, dst_lib._library) + s.go() + + compare_two_libs(lmdb_version_store, dst_lib) + + +def test_storage_mover_key_by_key(lmdb_version_store, arcticc_native_local_lib_cfg_extra): + add_data(lmdb_version_store) + arcticc = ArcticcMemConf(arcticc_native_local_lib_cfg_extra(), env=Defaults.ENV) + lib_cfg = get_lib_cfg(arcticc, Defaults.ENV, "local.extra") + lib_cfg.version.symbol_list = True + dst_lib = arcticc["local.extra"] + + s = StorageMover(lmdb_version_store._library, dst_lib._library) + all_keys = s.get_all_source_keys() + for key in all_keys: + s.write_keys_from_source_to_target([key], 2) + + compare_two_libs(lmdb_version_store, dst_lib) + + +def test_storage_mover_symbol_tree(arcticc_native_local_lib_cfg_extra, arcticc_native_local_lib_cfg, lib_name): + col_per_group = 5 + row_per_segment = 10 + local_lib_cfg = arcticc_native_local_lib_cfg(lib_name) + lib = local_lib_cfg.env_by_id[Defaults.ENV].lib_by_path[lib_name] + lib.version.write_options.column_group_size = col_per_group + lib.version.write_options.segment_row_size = row_per_segment + lib.version.symbol_list = True + lmdb_version_store_symbol_list = ArcticcMemConf(local_lib_cfg, Defaults.ENV)[lib_name] + + lmdb_version_store_symbol_list.write("symbol", sample_dataframe(), metadata="yolo") + lmdb_version_store_symbol_list.write("symbol", sample_dataframe(), metadata="yolo2") + lmdb_version_store_symbol_list.write("snapshot_test", 1) + lmdb_version_store_symbol_list.snapshot("my_snap") + lmdb_version_store_symbol_list.snapshot("my_snap2") + lmdb_version_store_symbol_list.snapshot("snapshot_test", 2) + lmdb_version_store_symbol_list._delete_version("snapshot_test", 0) + lmdb_version_store_symbol_list.write("pickled", {"a": 1}, metadata="cantyolo", pickle_on_failure=True) + lmdb_version_store_symbol_list.write("pickled", {"b": 1}, metadata="cantyolo2", pickle_on_failure=True) + lmdb_version_store_symbol_list.write("pickled", {"c": 1}, metadata="yoloded", pickle_on_failure=True) + lmdb_version_store_symbol_list.write( + "rec_norm", + data={"a": np.arange(1000), "b": np.arange(8000), "c": None}, + metadata="realyolo", + recursive_normalizers=True, + ) + lmdb_version_store_symbol_list.write( + "rec_norm", + data={"e": np.arange(1000), "f": np.arange(8000), "g": None}, + metadata="realyolo2", + recursive_normalizers=True, + ) + + lmdb_version_store_symbol_list.write("dup_data", np.array(["YOLO"] * 10000)) + + arcticc = ArcticcMemConf(arcticc_native_local_lib_cfg_extra(), env=Defaults.ENV) + lib_cfg = get_lib_cfg(arcticc, Defaults.ENV, "local.extra") + lib_cfg.version.symbol_list = True + dst_lib = arcticc["local.extra"] + + s = StorageMover(lmdb_version_store_symbol_list._library, dst_lib._library) + sv1 = SymbolVersionsPair("symbol", [1, 0]) + sv2 = SymbolVersionsPair("pickled", [2, 0]) + sv3 = SymbolVersionsPair("rec_norm", [1, 0]) + sv4 = SymbolVersionsPair("dup_data", [0]) + sv5 = SymbolVersionsPair("snapshot_test", ["my_snap", "my_snap2"]) + res = s.write_symbol_trees_from_source_to_target([sv1, sv2, sv3, sv4, sv5], False) + assert len(res) == 5 + for r in res: + for v in res[r]: + assert type(res[r][v]) == int + + assert len(dst_lib.list_versions()) == 8 + assert_frame_equal(lmdb_version_store_symbol_list.read("symbol").data, dst_lib.read("symbol").data) + assert_frame_equal(lmdb_version_store_symbol_list.read("symbol", 0).data, dst_lib.read("symbol", 0).data) + assert lmdb_version_store_symbol_list.read("symbol").metadata == dst_lib.read("symbol").metadata + assert lmdb_version_store_symbol_list.read("symbol", 0).metadata == dst_lib.read("symbol", 0).metadata + + assert lmdb_version_store_symbol_list.read("pickled").data == dst_lib.read("pickled").data + assert lmdb_version_store_symbol_list.read("pickled", 0).data == dst_lib.read("pickled", 0).data + assert lmdb_version_store_symbol_list.read("pickled").metadata == dst_lib.read("pickled").metadata + assert lmdb_version_store_symbol_list.read("pickled", 0).metadata == dst_lib.read("pickled", 0).metadata + + def comp_dict(d1, d2): + assert len(d1) == len(d2) + for k in d1: + if isinstance(d1[k], np.ndarray): + assert (d1[k] == d2[k]).all() + else: + assert d1[k] == d2[k] + + comp_dict(lmdb_version_store_symbol_list.read("rec_norm").data, dst_lib.read("rec_norm").data) + comp_dict(lmdb_version_store_symbol_list.read("rec_norm", 0).data, dst_lib.read("rec_norm", 0).data) + assert lmdb_version_store_symbol_list.read("rec_norm").metadata == dst_lib.read("rec_norm").metadata + assert lmdb_version_store_symbol_list.read("rec_norm", 0).metadata == dst_lib.read("rec_norm", 0).metadata + + np.testing.assert_equal(lmdb_version_store_symbol_list.read("dup_data").data, dst_lib.read("dup_data").data) + assert lmdb_version_store_symbol_list.read("dup_data").metadata == dst_lib.read("dup_data").metadata + + assert lmdb_version_store_symbol_list.read("snapshot_test", "my_snap").data, dst_lib.read("snapshot_test", 0).data + + lmdb_version_store_symbol_list.write("new_symbol", 1) + lmdb_version_store_symbol_list.snapshot("new_snap") + lmdb_version_store_symbol_list.write("new_symbol", 2) + lmdb_version_store_symbol_list.snapshot("new_snap2") + lmdb_version_store_symbol_list.write("new_symbol", 3) + lmdb_version_store_symbol_list._delete_version("new_symbol", 1) + sv6 = SymbolVersionsPair("new_symbol", [2, 0, "new_snap", "new_snap2"]) + dst_lib.write("new_symbol", 0) + + res = s.write_symbol_trees_from_source_to_target([sv6], True) + assert len(res) == 1 + assert "new_symbol" in res + assert res["new_symbol"][2] == 3 + assert res["new_symbol"][0] == 1 + assert res["new_symbol"]["new_snap"] == 1 + assert res["new_symbol"]["new_snap2"] == 2 + + assert dst_lib.read("new_symbol", 0).data == 0 + assert dst_lib.read("new_symbol", 1).data == 1 + assert dst_lib.read("new_symbol", 2).data == 2 + assert dst_lib.read("new_symbol", 3).data == 3 + + +def test_storage_mover_and_key_checker(lmdb_version_store, arcticc_native_local_lib_cfg_extra): + add_data(lmdb_version_store) + arcticc = ArcticcMemConf(arcticc_native_local_lib_cfg_extra(), env=Defaults.ENV) + lib_cfg = get_lib_cfg(arcticc, Defaults.ENV, "local.extra") + lib_cfg.version.symbol_list = True + dst_lib = arcticc["local.extra"] + + s = StorageMover(lmdb_version_store._library, dst_lib._library) + s.go() + + keys = s.get_keys_in_source_only() + assert len(keys) == 0 + + +def test_storage_mover_clone_keys_for_symbol(lmdb_version_store, arcticc_native_local_lib_cfg_extra): + add_data(lmdb_version_store) + lmdb_version_store.write("a", 1) + lmdb_version_store.write("a", 2) + lmdb_version_store.write("b", 1) + arcticc = ArcticcMemConf(arcticc_native_local_lib_cfg_extra(), env=Defaults.ENV) + lib_cfg = get_lib_cfg(arcticc, Defaults.ENV, "local.extra") + lib_cfg.version.symbol_list = True + dst_lib = arcticc["local.extra"] + + s = StorageMover(lmdb_version_store._library, dst_lib._library) + s.clone_all_keys_for_symbol("a", 1000) + assert dst_lib.read("a").data == 2 + + +@pytest.fixture() +def lib_with_gaps_and_reused_keys(version_store_factory): + lib = version_store_factory(name="source", de_duplication=True) + + lib.write("x", 0) + lib.write("x", 1) + lib.write("x", 2) + lib.snapshot("s2") + lib.write("x", DataFrame({"c": [0, 1]}, index=[0, 1])) + lib.write("x", DataFrame({"c": list(range(5))}, index=list(range(5))), prune_previous_version=True) # 2 slices + lib.write("x", 5) + lib.delete_version("x", 5) + lib.write("x", 6) + + return lib + + +@pytest.mark.parametrize("mode", ("check assumptions", "go", "no force")) +def test_correct_versions_in_destination(mode, lib_with_gaps_and_reused_keys, lmdb_version_store): + s = StorageMover(lib_with_gaps_and_reused_keys._library, lmdb_version_store._library) + if mode == "check assumptions": + check = lib_with_gaps_and_reused_keys + elif mode == "go": + s.go() + check = lmdb_version_store + else: + s.write_symbol_trees_from_source_to_target([SymbolVersionsPair("x", ["s2", 4, 6])], False) + check = lmdb_version_store + + lt = get_library_tool(check) + assert {vi["version"] for vi in check.list_versions("x")} == {2, 4, 6} + assert len(lt.find_keys(KeyType.TABLE_INDEX)) == 3 + assert [k.version_id for k in lt.find_keys(KeyType.TABLE_DATA)] == [2, 3, 4, 4, 6] + + +@settings(deadline=None) +@given(to_copy=st.permutations(["s2", 4, 6]), existing=st.booleans()) +def test_correct_versions_in_destination_force(to_copy, existing, lib_with_gaps_and_reused_keys, version_store_factory): + try: + _tmp_test_body(to_copy, existing, lib_with_gaps_and_reused_keys, version_store_factory) + except: + import traceback + + traceback.print_exc() + raise + + +def _tmp_test_body(to_copy, existing, lib_with_gaps_and_reused_keys, version_store_factory): + # mongoose_copy_data's force mode rewrite version numbers in the target + source = lib_with_gaps_and_reused_keys + target = version_store_factory(name="_unique_") + + if existing: + target.write("x", 0) + + s = StorageMover(source._library, target._library) + s.write_symbol_trees_from_source_to_target([SymbolVersionsPair("x", to_copy)], True) + + actual_vers = sorted(vi["version"] for vi in target.list_versions("x")) + print(to_copy, existing, "->", actual_vers) + + lt = get_library_tool(target) + start = 0 if existing else 2 # mover starts at the first input version if target is empty.... + n = int(existing) + len(to_copy) + assert actual_vers == list(range(start, start + n)) + assert len(lt.find_keys(KeyType.TABLE_INDEX)) == n + + source_keys: List[Key] = get_library_tool(source).find_keys(KeyType.TABLE_DATA) + expected_target: List[Key] = [] + for item in to_copy: + if item == "s2": + expected_target.append(source_keys[0]) + elif item == 4: + expected_target.extend(source_keys[1:4]) + else: + expected_target.append(source_keys[-1]) + expected_target.sort() # key=lambda k: (k.version_id, k.start_index)) + + target_keys: List[Key] = lt.find_keys(KeyType.TABLE_DATA) + target_keys.sort() + if existing: + target_keys.pop(0) + + for a, e in zip(target_keys, expected_target): + assert a.content_hash == e.content_hash + assert a.creation_ts >= source_keys[-1].creation_ts