Skip to content

Commit

Permalink
Bugfix 1209: Consistently return metadata from write, append, update,…
Browse files Browse the repository at this point in the history
… write_metadata, and batch versions thereof (#1444)

Fixes #1209
  • Loading branch information
alexowens90 authored Mar 20, 2024
1 parent 44713b5 commit a9b723d
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 39 deletions.
54 changes: 17 additions & 37 deletions python/arcticdb/version_store/_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -557,15 +557,7 @@ def write(
symbol, item, norm_meta, udm, prune_previous_version, sparsify_floats, validate_index
)

return VersionedItem(
symbol=vit.symbol,
library=self._library.library_path,
version=vit.version,
metadata=metadata,
data=None,
host=self.env,
timestamp=vit.timestamp
)
return self._convert_thin_cxx_item_to_python(vit, metadata)

def _resolve_dynamic_strings(self, kwargs):
proto_cfg = self._lib_cfg.lib_desc.version.write_options
Expand Down Expand Up @@ -677,15 +669,7 @@ def append(
vit = self.version_store.append(
symbol, item, norm_meta, udm, write_if_missing, prune_previous_version, validate_index
)
return VersionedItem(
symbol=vit.symbol,
library=self._library.library_path,
version=vit.version,
metadata=metadata,
data=None,
host=self.env,
timestamp=vit.timestamp
)
return self._convert_thin_cxx_item_to_python(vit, metadata)

def update(
self,
Expand Down Expand Up @@ -776,15 +760,7 @@ def update(
vit = self.version_store.update(
symbol, update_query, item, norm_meta, udm, upsert, dynamic_schema, prune_previous_version
)
return VersionedItem(
symbol=vit.symbol,
library=self._library.library_path,
version=vit.version,
metadata=metadata,
data=None,
host=self.env,
timestamp=vit.timestamp
)
return self._convert_thin_cxx_item_to_python(vit, metadata)

def create_column_stats(
self, symbol: str, column_stats: Dict[str, Set[str]], as_of: Optional[VersionQueryInput] = None
Expand Down Expand Up @@ -1110,16 +1086,16 @@ def batch_read_metadata_multi(

return results_dict

def _convert_thin_cxx_item_to_python(self, v) -> VersionedItem:
def _convert_thin_cxx_item_to_python(self, cxx_versioned_item, metadata) -> VersionedItem:
"""Convert a cxx versioned item that does not contain data or metadata to a Python equivalent."""
return VersionedItem(
symbol=v.symbol,
symbol=cxx_versioned_item.symbol,
library=self._library.library_path,
data=None,
version=v.version,
metadata=None,
version=cxx_versioned_item.version,
metadata=metadata,
host=self.env,
timestamp=v.timestamp
timestamp=cxx_versioned_item.timestamp
)

def batch_write(
Expand Down Expand Up @@ -1238,11 +1214,13 @@ def _batch_write_internal(
symbols, items, norm_metas, udms, prune_previous_version, validate_index, throw_on_error
)
write_results = []
if metadata_vector is not None:
metadata_itr = iter(metadata_vector)
for result in cxx_versioned_items:
if isinstance(result, DataError):
write_results.append(result)
else:
write_results.append(self._convert_thin_cxx_item_to_python(result))
write_results.append(self._convert_thin_cxx_item_to_python(result, next(metadata_itr)))
return write_results

def _batch_write_metadata_to_versioned_items(
Expand All @@ -1257,11 +1235,11 @@ def _batch_write_metadata_to_versioned_items(
symbols, normalized_meta, prune_previous_version, throw_on_error
)
write_metadata_results = []
for result in cxx_versioned_items:
for idx, result in enumerate(cxx_versioned_items):
if isinstance(result, DataError):
write_metadata_results.append(result)
else:
write_metadata_results.append(self._convert_thin_cxx_item_to_python(result))
write_metadata_results.append(self._convert_thin_cxx_item_to_python(result, metadata_vector[idx]))
return write_metadata_results

def batch_write_metadata(
Expand Down Expand Up @@ -1407,11 +1385,13 @@ def _batch_append_to_versioned_items(
throw_on_error,
)
append_results = []
if metadata_vector is not None:
metadata_itr = iter(metadata_vector)
for result in cxx_versioned_items:
if isinstance(result, DataError):
append_results.append(result)
else:
append_results.append(self._convert_thin_cxx_item_to_python(result))
append_results.append(self._convert_thin_cxx_item_to_python(result, next(metadata_itr)))
return append_results

def batch_restore_version(
Expand Down Expand Up @@ -2686,7 +2666,7 @@ def write_metadata(
)
udm = normalize_metadata(metadata) if metadata is not None else None
v = self.version_store.write_metadata(symbol, udm, prune_previous_version)
return self._convert_thin_cxx_item_to_python(v)
return self._convert_thin_cxx_item_to_python(v, metadata)

def is_symbol_fragmented(self, symbol: str, segment_size: Optional[int] = None) -> bool:
"""
Expand Down
5 changes: 5 additions & 0 deletions python/arcticdb/version_store/library.py
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,11 @@ def update(
prune_previous_versions
Removes previous (non-snapshotted) versions from the database when True.
Returns
-------
VersionedItem
Structure containing metadata and version number of the written symbol in the store.
Examples
--------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1342,7 +1342,7 @@ def test_batch_write(basic_store_tombstone_and_sync_passive):
vitem = lmdb_version_store.read(sym)
sequential_results[vitem.symbol] = vitem
lmdb_version_store.batch_write(
list(multi_data.keys()), list(multi_data.values()), metadata_vector=(metadata[sym] for sym in multi_data)
list(multi_data.keys()), list(multi_data.values()), metadata_vector=list(metadata.values())
)
batch_results = lmdb_version_store.batch_read(list(multi_data.keys()))
assert len(batch_results) == len(sequential_results)
Expand Down Expand Up @@ -2122,7 +2122,7 @@ def test_batch_append(basic_store_tombstone, three_col_df):
metadata = {"sym1": {"key1": "val1"}, "sym2": None, "sym3": None}

lmdb_version_store.batch_write(
list(multi_data.keys()), list(multi_data.values()), metadata_vector=(metadata[sym] for sym in multi_data)
list(multi_data.keys()), list(multi_data.values()), metadata_vector=list(metadata.values())
)

multi_append = {"sym1": three_col_df(10), "sym2": three_col_df(11), "sym3": three_col_df(12)}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
"""
import numpy as np
import pandas as pd
from pandas import DataFrame, Timestamp
import pytest

Expand Down Expand Up @@ -157,3 +158,79 @@ def test_write_metadata_preexisting_symbol_no_pruning(basic_store, sym):
assert lib.read(sym).data == 1
assert lib.read(sym, as_of=0).metadata == metadata_v0
assert lib.read(sym, as_of=0).data == 1


def timestamp_indexed_df():
return pd.DataFrame({"col": [0]}, index=[pd.Timestamp("2024-01-01")])


def test_rv_contains_metadata_write(lmdb_version_store_v1):
lib = lmdb_version_store_v1
sym = "test_write_rv_contains_metadata"
assert lib.write(sym, 1).metadata is None
metadata = {"some": "metadata"}
assert lib.write(sym, 1, metadata).metadata == metadata


def test_rv_contains_metadata_append(lmdb_version_store_v1):
lib = lmdb_version_store_v1
sym = "test_rv_contains_metadata_append"
assert lib.append(sym, timestamp_indexed_df(), write_if_missing=True).metadata is None
metadata = {"some": "metadata"}
assert lib.append(sym, timestamp_indexed_df(), metadata, write_if_missing=True).metadata == metadata


def test_rv_contains_metadata_update(lmdb_version_store_v1):
lib = lmdb_version_store_v1
sym = "test_rv_contains_metadata_update"
assert lib.update(sym, timestamp_indexed_df(), upsert=True).metadata is None
metadata = {"some": "metadata"}
assert lib.update(sym, timestamp_indexed_df(), metadata, upsert=True).metadata == metadata


def test_rv_contains_metadata_write_metadata(lmdb_version_store_v1):
lib = lmdb_version_store_v1
sym = "test_rv_contains_metadata_write_metadata"
metadata = {"some": "metadata"}
assert lib.write_metadata(sym, metadata).metadata == metadata


def test_rv_contains_metadata_batch_write(lmdb_version_store_v1):
lib = lmdb_version_store_v1
sym_0 = "test_rv_contains_metadata_batch_write_0"
sym_1 = "test_rv_contains_metadata_batch_write_1"
sym_2 = "test_rv_contains_metadata_batch_write_2"
vits = lib.batch_write([sym_0, sym_1, sym_2], 3 * [1])
assert all(vit.metadata is None for vit in vits)
metadata_0 = {"some": "metadata_0"}
metadata_2 = {"some": "metadata_2"}
vits = lib.batch_write([sym_0, sym_1, sym_2], 3 * [1], [metadata_0, None, metadata_2])
assert vits[0].metadata == metadata_0
assert vits[1].metadata is None
assert vits[2].metadata == metadata_2


def test_rv_contains_metadata_batch_append(lmdb_version_store_v1):
lib = lmdb_version_store_v1
sym_0 = "test_rv_contains_metadata_batch_append_0"
sym_1 = "test_rv_contains_metadata_batch_append_1"
sym_2 = "test_rv_contains_metadata_batch_append_2"
vits = lib.batch_append([sym_0, sym_1, sym_2], 3 * [timestamp_indexed_df()], write_if_missing=True)
assert all(vit.metadata is None for vit in vits)
metadata_0 = {"some": "metadata_0"}
metadata_2 = {"some": "metadata_2"}
vits = lib.batch_append([sym_0, sym_1, sym_2], 3 * [timestamp_indexed_df()], [metadata_0, None, metadata_2], write_if_missing=True)
assert vits[0].metadata == metadata_0
assert vits[1].metadata is None
assert vits[2].metadata == metadata_2


def test_rv_contains_metadata_batch_write_metadata(lmdb_version_store_v1):
lib = lmdb_version_store_v1
sym_0 = "test_rv_contains_metadata_batch_write_metadata_0"
sym_1 = "test_rv_contains_metadata_batch_write_metadata_1"
metadata_0 = {"some": "metadata_0"}
metadata_1 = {"some": "metadata_1"}
vits = lib.batch_write_metadata([sym_0, sym_1], [metadata_0, metadata_1])
assert vits[0].metadata == metadata_0
assert vits[1].metadata == metadata_1

0 comments on commit a9b723d

Please sign in to comment.