From a9b723dcbef7491efb1777cfd747747f96cb9428 Mon Sep 17 00:00:00 2001 From: Alex Owens <73388657+alexowens90@users.noreply.github.com> Date: Wed, 20 Mar 2024 16:43:29 +0000 Subject: [PATCH] Bugfix 1209: Consistently return metadata from write, append, update, write_metadata, and batch versions thereof (#1444) Fixes #1209 --- python/arcticdb/version_store/_store.py | 54 ++++--------- python/arcticdb/version_store/library.py | 5 ++ .../version_store/test_basic_version_store.py | 4 +- .../version_store/test_metadata_support.py | 77 +++++++++++++++++++ 4 files changed, 101 insertions(+), 39 deletions(-) diff --git a/python/arcticdb/version_store/_store.py b/python/arcticdb/version_store/_store.py index d81c42885d..fce6c0b278 100644 --- a/python/arcticdb/version_store/_store.py +++ b/python/arcticdb/version_store/_store.py @@ -557,15 +557,7 @@ def write( symbol, item, norm_meta, udm, prune_previous_version, sparsify_floats, validate_index ) - return VersionedItem( - symbol=vit.symbol, - library=self._library.library_path, - version=vit.version, - metadata=metadata, - data=None, - host=self.env, - timestamp=vit.timestamp - ) + return self._convert_thin_cxx_item_to_python(vit, metadata) def _resolve_dynamic_strings(self, kwargs): proto_cfg = self._lib_cfg.lib_desc.version.write_options @@ -677,15 +669,7 @@ def append( vit = self.version_store.append( symbol, item, norm_meta, udm, write_if_missing, prune_previous_version, validate_index ) - return VersionedItem( - symbol=vit.symbol, - library=self._library.library_path, - version=vit.version, - metadata=metadata, - data=None, - host=self.env, - timestamp=vit.timestamp - ) + return self._convert_thin_cxx_item_to_python(vit, metadata) def update( self, @@ -776,15 +760,7 @@ def update( vit = self.version_store.update( symbol, update_query, item, norm_meta, udm, upsert, dynamic_schema, prune_previous_version ) - return VersionedItem( - symbol=vit.symbol, - library=self._library.library_path, - version=vit.version, - metadata=metadata, - data=None, - host=self.env, - timestamp=vit.timestamp - ) + return self._convert_thin_cxx_item_to_python(vit, metadata) def create_column_stats( self, symbol: str, column_stats: Dict[str, Set[str]], as_of: Optional[VersionQueryInput] = None @@ -1110,16 +1086,16 @@ def batch_read_metadata_multi( return results_dict - def _convert_thin_cxx_item_to_python(self, v) -> VersionedItem: + def _convert_thin_cxx_item_to_python(self, cxx_versioned_item, metadata) -> VersionedItem: """Convert a cxx versioned item that does not contain data or metadata to a Python equivalent.""" return VersionedItem( - symbol=v.symbol, + symbol=cxx_versioned_item.symbol, library=self._library.library_path, data=None, - version=v.version, - metadata=None, + version=cxx_versioned_item.version, + metadata=metadata, host=self.env, - timestamp=v.timestamp + timestamp=cxx_versioned_item.timestamp ) def batch_write( @@ -1238,11 +1214,13 @@ def _batch_write_internal( symbols, items, norm_metas, udms, prune_previous_version, validate_index, throw_on_error ) write_results = [] + if metadata_vector is not None: + metadata_itr = iter(metadata_vector) for result in cxx_versioned_items: if isinstance(result, DataError): write_results.append(result) else: - write_results.append(self._convert_thin_cxx_item_to_python(result)) + write_results.append(self._convert_thin_cxx_item_to_python(result, next(metadata_itr))) return write_results def _batch_write_metadata_to_versioned_items( @@ -1257,11 +1235,11 @@ def _batch_write_metadata_to_versioned_items( symbols, normalized_meta, prune_previous_version, throw_on_error ) write_metadata_results = [] - for result in cxx_versioned_items: + for idx, result in enumerate(cxx_versioned_items): if isinstance(result, DataError): write_metadata_results.append(result) else: - write_metadata_results.append(self._convert_thin_cxx_item_to_python(result)) + write_metadata_results.append(self._convert_thin_cxx_item_to_python(result, metadata_vector[idx])) return write_metadata_results def batch_write_metadata( @@ -1407,11 +1385,13 @@ def _batch_append_to_versioned_items( throw_on_error, ) append_results = [] + if metadata_vector is not None: + metadata_itr = iter(metadata_vector) for result in cxx_versioned_items: if isinstance(result, DataError): append_results.append(result) else: - append_results.append(self._convert_thin_cxx_item_to_python(result)) + append_results.append(self._convert_thin_cxx_item_to_python(result, next(metadata_itr))) return append_results def batch_restore_version( @@ -2686,7 +2666,7 @@ def write_metadata( ) udm = normalize_metadata(metadata) if metadata is not None else None v = self.version_store.write_metadata(symbol, udm, prune_previous_version) - return self._convert_thin_cxx_item_to_python(v) + return self._convert_thin_cxx_item_to_python(v, metadata) def is_symbol_fragmented(self, symbol: str, segment_size: Optional[int] = None) -> bool: """ diff --git a/python/arcticdb/version_store/library.py b/python/arcticdb/version_store/library.py index a50d0d8fd4..b13b5dc3ef 100644 --- a/python/arcticdb/version_store/library.py +++ b/python/arcticdb/version_store/library.py @@ -809,6 +809,11 @@ def update( prune_previous_versions Removes previous (non-snapshotted) versions from the database when True. + Returns + ------- + VersionedItem + Structure containing metadata and version number of the written symbol in the store. + Examples -------- diff --git a/python/tests/integration/arcticdb/version_store/test_basic_version_store.py b/python/tests/integration/arcticdb/version_store/test_basic_version_store.py index e524c8b773..d7d2a3dadc 100644 --- a/python/tests/integration/arcticdb/version_store/test_basic_version_store.py +++ b/python/tests/integration/arcticdb/version_store/test_basic_version_store.py @@ -1342,7 +1342,7 @@ def test_batch_write(basic_store_tombstone_and_sync_passive): vitem = lmdb_version_store.read(sym) sequential_results[vitem.symbol] = vitem lmdb_version_store.batch_write( - list(multi_data.keys()), list(multi_data.values()), metadata_vector=(metadata[sym] for sym in multi_data) + list(multi_data.keys()), list(multi_data.values()), metadata_vector=list(metadata.values()) ) batch_results = lmdb_version_store.batch_read(list(multi_data.keys())) assert len(batch_results) == len(sequential_results) @@ -2122,7 +2122,7 @@ def test_batch_append(basic_store_tombstone, three_col_df): metadata = {"sym1": {"key1": "val1"}, "sym2": None, "sym3": None} lmdb_version_store.batch_write( - list(multi_data.keys()), list(multi_data.values()), metadata_vector=(metadata[sym] for sym in multi_data) + list(multi_data.keys()), list(multi_data.values()), metadata_vector=list(metadata.values()) ) multi_append = {"sym1": three_col_df(10), "sym2": three_col_df(11), "sym3": three_col_df(12)} diff --git a/python/tests/integration/arcticdb/version_store/test_metadata_support.py b/python/tests/integration/arcticdb/version_store/test_metadata_support.py index cb9457ff30..83f3215a8d 100644 --- a/python/tests/integration/arcticdb/version_store/test_metadata_support.py +++ b/python/tests/integration/arcticdb/version_store/test_metadata_support.py @@ -6,6 +6,7 @@ As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0. """ import numpy as np +import pandas as pd from pandas import DataFrame, Timestamp import pytest @@ -157,3 +158,79 @@ def test_write_metadata_preexisting_symbol_no_pruning(basic_store, sym): assert lib.read(sym).data == 1 assert lib.read(sym, as_of=0).metadata == metadata_v0 assert lib.read(sym, as_of=0).data == 1 + + +def timestamp_indexed_df(): + return pd.DataFrame({"col": [0]}, index=[pd.Timestamp("2024-01-01")]) + + +def test_rv_contains_metadata_write(lmdb_version_store_v1): + lib = lmdb_version_store_v1 + sym = "test_write_rv_contains_metadata" + assert lib.write(sym, 1).metadata is None + metadata = {"some": "metadata"} + assert lib.write(sym, 1, metadata).metadata == metadata + + +def test_rv_contains_metadata_append(lmdb_version_store_v1): + lib = lmdb_version_store_v1 + sym = "test_rv_contains_metadata_append" + assert lib.append(sym, timestamp_indexed_df(), write_if_missing=True).metadata is None + metadata = {"some": "metadata"} + assert lib.append(sym, timestamp_indexed_df(), metadata, write_if_missing=True).metadata == metadata + + +def test_rv_contains_metadata_update(lmdb_version_store_v1): + lib = lmdb_version_store_v1 + sym = "test_rv_contains_metadata_update" + assert lib.update(sym, timestamp_indexed_df(), upsert=True).metadata is None + metadata = {"some": "metadata"} + assert lib.update(sym, timestamp_indexed_df(), metadata, upsert=True).metadata == metadata + + +def test_rv_contains_metadata_write_metadata(lmdb_version_store_v1): + lib = lmdb_version_store_v1 + sym = "test_rv_contains_metadata_write_metadata" + metadata = {"some": "metadata"} + assert lib.write_metadata(sym, metadata).metadata == metadata + + +def test_rv_contains_metadata_batch_write(lmdb_version_store_v1): + lib = lmdb_version_store_v1 + sym_0 = "test_rv_contains_metadata_batch_write_0" + sym_1 = "test_rv_contains_metadata_batch_write_1" + sym_2 = "test_rv_contains_metadata_batch_write_2" + vits = lib.batch_write([sym_0, sym_1, sym_2], 3 * [1]) + assert all(vit.metadata is None for vit in vits) + metadata_0 = {"some": "metadata_0"} + metadata_2 = {"some": "metadata_2"} + vits = lib.batch_write([sym_0, sym_1, sym_2], 3 * [1], [metadata_0, None, metadata_2]) + assert vits[0].metadata == metadata_0 + assert vits[1].metadata is None + assert vits[2].metadata == metadata_2 + + +def test_rv_contains_metadata_batch_append(lmdb_version_store_v1): + lib = lmdb_version_store_v1 + sym_0 = "test_rv_contains_metadata_batch_append_0" + sym_1 = "test_rv_contains_metadata_batch_append_1" + sym_2 = "test_rv_contains_metadata_batch_append_2" + vits = lib.batch_append([sym_0, sym_1, sym_2], 3 * [timestamp_indexed_df()], write_if_missing=True) + assert all(vit.metadata is None for vit in vits) + metadata_0 = {"some": "metadata_0"} + metadata_2 = {"some": "metadata_2"} + vits = lib.batch_append([sym_0, sym_1, sym_2], 3 * [timestamp_indexed_df()], [metadata_0, None, metadata_2], write_if_missing=True) + assert vits[0].metadata == metadata_0 + assert vits[1].metadata is None + assert vits[2].metadata == metadata_2 + + +def test_rv_contains_metadata_batch_write_metadata(lmdb_version_store_v1): + lib = lmdb_version_store_v1 + sym_0 = "test_rv_contains_metadata_batch_write_metadata_0" + sym_1 = "test_rv_contains_metadata_batch_write_metadata_1" + metadata_0 = {"some": "metadata_0"} + metadata_1 = {"some": "metadata_1"} + vits = lib.batch_write_metadata([sym_0, sym_1], [metadata_0, metadata_1]) + assert vits[0].metadata == metadata_0 + assert vits[1].metadata == metadata_1