Feature flag the empty type using library config
Vasil Pashov committed Apr 10, 2024
1 parent e6c4f28 commit 2effe0c
Showing 8 changed files with 88 additions and 66 deletions.
2 changes: 1 addition & 1 deletion cpp/arcticdb/python/python_to_tensor_frame.cpp
@@ -267,7 +267,7 @@ std::shared_ptr<InputTensorFrame> py_ndf_to_frame(
// Currently the python layers assign a RowRange index to both empty dataframes and dataframes which do not specify
// an index explicitly. Thus we handle this case after all columns are read, so that we know how many rows there are.
if (idx_names.empty()) {
if (res->num_rows > 0) {
if (!empty_types || res->num_rows > 0) {
res->index = stream::RowCountIndex();
res->desc.set_index_type(IndexDescriptor::ROWCOUNT);
} else {
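
A minimal, self-contained Python model of the branch above (index kinds written as strings, not the real C++). The outcome of the elided else branch is an assumption: it is taken to leave the dedicated EMPTY index descriptor in place, which is the point of the feature flag.

# Sketch only. With empty_types enabled and zero rows, the implicit index stays
# EMPTY instead of being coerced to a row-count index.
def resolve_implicit_index(empty_types: bool, num_rows: int) -> str:
    if not empty_types or num_rows > 0:
        return "ROWCOUNT"  # legacy behaviour, and any frame that has rows
    return "EMPTY"         # feature-flagged behaviour for empty, index-less frames

assert resolve_implicit_index(empty_types=False, num_rows=0) == "ROWCOUNT"
assert resolve_implicit_index(empty_types=True, num_rows=0) == "EMPTY"
assert resolve_implicit_index(empty_types=True, num_rows=3) == "ROWCOUNT"
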
8 changes: 5 additions & 3 deletions cpp/arcticdb/version/local_versioned_engine.cpp
@@ -587,7 +587,8 @@ VersionedItem LocalVersionedEngine::update_internal(
query,
frame,
get_write_options(),
dynamic_schema);
dynamic_schema,
cfg().write_options().empty_types());
write_version_and_prune_previous(
prune_previous_versions, versioned_item.key_, update_info.previous_index_key_);
return versioned_item;
@@ -1398,7 +1399,8 @@ VersionedItem LocalVersionedEngine::append_internal(
update_info,
frame,
get_write_options(),
validate_index);
validate_index,
cfg().write_options().empty_types());
write_version_and_prune_previous(
prune_previous_versions, versioned_item.key_, update_info.previous_index_key_);
return versioned_item;
@@ -1446,7 +1448,7 @@ std::vector<std::variant<VersionedItem, DataError>> LocalVersionedEngine::batch_
auto index_key_fut = folly::Future<AtomKey>::makeEmpty();
auto write_options = get_write_options();
if (update_info.previous_index_key_.has_value()) {
index_key_fut = async_append_impl(store(), update_info, frame, write_options, validate_index);
index_key_fut = async_append_impl(store(), update_info, frame, write_options, validate_index, cfg().write_options().empty_types());
} else {
missing_data::check<ErrorCode::E_NO_SUCH_VERSION>(
upsert,
51 changes: 36 additions & 15 deletions cpp/arcticdb/version/schema_checks.hpp
@@ -42,29 +42,49 @@ inline IndexDescriptor::Type get_common_index_type(const IndexDescriptor::Type&
return IndexDescriptor::UNKNOWN;
}

inline void check_normalization_index_match(NormalizationOperation operation,
const StreamDescriptor &old_descriptor,
const pipelines::InputTensorFrame &frame) {
inline void check_normalization_index_match(
NormalizationOperation operation,
const StreamDescriptor& old_descriptor,
const pipelines::InputTensorFrame& frame,
bool empty_types
) {
const IndexDescriptor::Type old_idx_kind = old_descriptor.index().type();
const IndexDescriptor::Type new_idx_kind = frame.desc.index().type();
if (operation == UPDATE) {
const bool new_is_timeseries = std::holds_alternative<TimeseriesIndex>(frame.index);
util::check_rte(
(old_idx_kind == IndexDescriptor::TIMESTAMP || old_idx_kind == IndexDescriptor::EMPTY) && new_is_timeseries,
"Update will not work as expected with a non-timeseries index");
"Update will not work as expected with a non-timeseries index"
);
} else {
const IndexDescriptor::Type common_index_type = get_common_index_type(old_idx_kind, new_idx_kind);
normalization::check<ErrorCode::E_INCOMPATIBLE_INDEX>(
common_index_type != IndexDescriptor::UNKNOWN,
"Cannot append {} index to {} index",
index_type_to_str(new_idx_kind),
index_type_to_str(old_idx_kind)
);
if (empty_types) {
normalization::check<ErrorCode::E_INCOMPATIBLE_INDEX>(
common_index_type != IndexDescriptor::UNKNOWN,
"Cannot append {} index to {} index",
index_type_to_str(new_idx_kind),
index_type_to_str(old_idx_kind)
);
} else {
normalization::check<ErrorCode::E_INCOMPATIBLE_INDEX>(
common_index_type != IndexDescriptor::UNKNOWN ||
(old_idx_kind == IndexDescriptor::TIMESTAMP && new_idx_kind == IndexDescriptor::ROWCOUNT),
"Cannot append {} index to {} index",
index_type_to_str(new_idx_kind),
index_type_to_str(old_idx_kind)
);
}
}
}

inline bool columns_match(const StreamDescriptor& df_in_store_descriptor, const StreamDescriptor& new_df_descriptor) {
const int right_fields_offset = df_in_store_descriptor.index().type() == IndexDescriptor::EMPTY ? new_df_descriptor.index().field_count() : 0;
inline bool columns_match(
const StreamDescriptor& df_in_store_descriptor,
const StreamDescriptor& new_df_descriptor,
bool empty_types
) {
const int right_fields_offset = (empty_types && df_in_store_descriptor.index().type() == IndexDescriptor::EMPTY)
? new_df_descriptor.index().field_count()
: 0;
// The empty index is compatible with all other index types. Differences in the index fields are allowed in this
// case. The index fields always come first in the list.
if (df_in_store_descriptor.fields().size() + right_fields_offset != new_df_descriptor.fields().size()) {
@@ -90,16 +110,17 @@ inline void fix_descriptor_mismatch_or_throw(
NormalizationOperation operation,
bool dynamic_schema,
const pipelines::index::IndexSegmentReader &existing_isr,
const pipelines::InputTensorFrame &new_frame) {
const pipelines::InputTensorFrame &new_frame,
bool empty_types) {
const auto &old_sd = existing_isr.tsd().as_stream_descriptor();
check_normalization_index_match(operation, old_sd, new_frame);
check_normalization_index_match(operation, old_sd, new_frame, empty_types);

if (dynamic_schema)
return; // TODO: dynamic schema may need some of the checks as below

fix_normalization_or_throw(operation == APPEND, existing_isr, new_frame);

if (!columns_match(old_sd, new_frame.desc)) {
if (!columns_match(old_sd, new_frame.desc, empty_types)) {
throw StreamDescriptorMismatch(
"The columns (names and types) in the argument are not identical to that of the existing version",
StreamDescriptor{old_sd},
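
To make the two regimes concrete, here is a self-contained Python model of the append-time index check above. get_common_index_type is sketched from the comment in columns_match (the empty index is compatible with every other index type); its full C++ body is not shown in this hunk, so treat that part as an assumption.

def get_common_index_type(old: str, new: str) -> str:
    # Sketch: identical kinds are trivially compatible; EMPTY is compatible with anything.
    if old == new:
        return old
    if old == "EMPTY":
        return new
    if new == "EMPTY":
        return old
    return "UNKNOWN"

def append_index_allowed(old: str, new: str, empty_types: bool) -> bool:
    common = get_common_index_type(old, new)
    if empty_types:
        return common != "UNKNOWN"
    # Legacy path kept behind the flag: appending a row-count index to a
    # timestamp index remains allowed.
    return common != "UNKNOWN" or (old == "TIMESTAMP" and new == "ROWCOUNT")

assert append_index_allowed("TIMESTAMP", "ROWCOUNT", empty_types=False)     # legacy tolerance
assert not append_index_allowed("TIMESTAMP", "ROWCOUNT", empty_types=True)  # stricter with the flag on
assert append_index_allowed("EMPTY", "TIMESTAMP", empty_types=True)         # empty index is always compatible
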
16 changes: 10 additions & 6 deletions cpp/arcticdb/version/version_core.cpp
@@ -137,7 +137,8 @@ folly::Future<AtomKey> async_append_impl(
const UpdateInfo& update_info,
const std::shared_ptr<InputTensorFrame>& frame,
const WriteOptions& options,
bool validate_index) {
bool validate_index,
bool empty_types) {

util::check(update_info.previous_index_key_.has_value(), "Cannot append as there is no previous index key to append to");
const StreamId stream_id = frame->desc.id();
@@ -147,7 +148,7 @@ folly::Future<AtomKey> async_append_impl(
auto row_offset = index_segment_reader.tsd().proto().total_rows();
util::check_rte(!index_segment_reader.is_pickled(), "Cannot append to pickled data");
frame->set_offset(static_cast<ssize_t>(row_offset));
fix_descriptor_mismatch_or_throw(APPEND, options.dynamic_schema, index_segment_reader, *frame);
fix_descriptor_mismatch_or_throw(APPEND, options.dynamic_schema, index_segment_reader, *frame, empty_types);
if (validate_index) {
sorted_data_check_append(*frame, index_segment_reader);
}
@@ -162,14 +163,16 @@ VersionedItem append_impl(
const UpdateInfo& update_info,
const std::shared_ptr<InputTensorFrame>& frame,
const WriteOptions& options,
bool validate_index) {
bool validate_index,
bool empty_types) {

ARCTICDB_SUBSAMPLE_DEFAULT(WaitForWriteCompletion)
auto version_key_fut = async_append_impl(store,
update_info,
frame,
options,
validate_index);
validate_index,
empty_types);
auto version_key = std::move(version_key_fut).get();
auto versioned_item = VersionedItem(to_atom(std::move(version_key)));
ARCTICDB_DEBUG(log::version(), "write_dataframe_impl stream_id: {} , version_id: {}", versioned_item.symbol(), update_info.next_version_id_);
@@ -323,7 +326,8 @@ VersionedItem update_impl(
const UpdateQuery& query,
const std::shared_ptr<InputTensorFrame>& frame,
const WriteOptions&& options,
bool dynamic_schema) {
bool dynamic_schema,
bool empty_types) {
util::check(update_info.previous_index_key_.has_value(), "Cannot update as there is no previous index key to update into");
const StreamId stream_id = frame->desc.id();
ARCTICDB_DEBUG(log::version(), "Update versioned dataframe for stream_id: {} , version_id = {}", stream_id, update_info.previous_index_key_->version_id());
@@ -337,7 +341,7 @@ VersionedItem update_impl(
sorted_data_check_update(*frame, index_segment_reader);
bool bucketize_dynamic = index_segment_reader.bucketize_dynamic();
(void)check_and_mark_slices(index_segment_reader, dynamic_schema, false, std::nullopt, bucketize_dynamic);
fix_descriptor_mismatch_or_throw(UPDATE, dynamic_schema, index_segment_reader, *frame);
fix_descriptor_mismatch_or_throw(UPDATE, dynamic_schema, index_segment_reader, *frame, empty_types);

std::vector<FilterQuery<index::IndexSegmentReader>> queries =
build_update_query_filters<index::IndexSegmentReader>(query.row_filter, frame->index, frame->index_range, dynamic_schema, index_segment_reader.bucketize_dynamic());
9 changes: 6 additions & 3 deletions cpp/arcticdb/version/version_core.hpp
@@ -54,22 +54,25 @@ folly::Future<AtomKey> async_append_impl(
const UpdateInfo& update_info,
const std::shared_ptr<InputTensorFrame>& frame,
const WriteOptions& options,
bool validate_index);
bool validate_index,
bool empty_types);

VersionedItem append_impl(
const std::shared_ptr<Store>& store,
const UpdateInfo& update_info,
const std::shared_ptr<InputTensorFrame>& frame,
const WriteOptions& options,
bool validate_index);
bool validate_index,
bool empty_types);

VersionedItem update_impl(
const std::shared_ptr<Store>& store,
const UpdateInfo& update_info,
const UpdateQuery & query,
const std::shared_ptr<InputTensorFrame>& frame,
const WriteOptions&& options,
bool dynamic_schema);
bool dynamic_schema,
bool empty_types);

VersionedItem delete_range_impl(
const std::shared_ptr<Store>& store,
31 changes: 16 additions & 15 deletions python/arcticdb/version_store/_normalization.py
@@ -370,9 +370,9 @@ def _denormalize_single_index(item, norm_meta):
name = norm_meta.index.name if norm_meta.index.name else None
return RangeIndex(start=norm_meta.index.start, stop=stop, step=norm_meta.index.step, name=name)
else:
return Index([])
return None
else:
return Index([])
return RangeIndex(start=0, stop=0, step=1)
# this means that the index is not a datetime index and it's been represented as a regular field in the stream
item.index_columns.append(item.names.pop(0))

@@ -523,9 +523,10 @@ def denormalize(self, item, norm_meta):


class _PandasNormalizer(Normalizer):
def _index_to_records(self, df, pd_norm, dynamic_strings, string_max_len):
def _index_to_records(self, df, pd_norm, dynamic_strings, string_max_len, empty_types):
index = df.index
if len(index) == 0 and len(df.select_dtypes(include="category").columns) == 0:
empty_df = len(index) == 0
if empty_df and empty_types:
index_norm = pd_norm.index
index_norm.is_physically_stored = False
index = Index([])
@@ -550,12 +551,10 @@ def _index_to_records(self, df, pd_norm, dynamic_strings, string_max_len):
df.reset_index(fields, inplace=True)
index = df.index
else:
is_not_range_index = not isinstance(index, RangeIndex)
df_has_rows = not(len(index) == 0 and len(df.select_dtypes(include="category").columns) == 0)
index_norm = pd_norm.index
index_norm.is_physically_stored = is_not_range_index and df_has_rows
if not df_has_rows:
index = Index([])
index_norm.is_physically_stored = not isinstance(index, RangeIndex) and not empty_df
if empty_df:
index = DatetimeIndex([])

return _normalize_single_index(index, list(index.names), index_norm, dynamic_strings, string_max_len)

@@ -587,12 +586,13 @@ class SeriesNormalizer(_PandasNormalizer):
def __init__(self):
self._df_norm = DataFrameNormalizer()

def normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_columns=None, **kwargs):
def normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_columns=None, empty_types=False, **kwargs):
df, norm = self._df_norm.normalize(
item.to_frame(),
dynamic_strings=dynamic_strings,
string_max_len=string_max_len,
coerce_columns=coerce_columns,
empty_types=empty_types
)
norm.series.CopyFrom(norm.df)
if item.name:
@@ -843,7 +843,7 @@ def _denormalize_multi_index(df: pd.DataFrame, norm_meta: NormalizationMetadata.

return df

def normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_columns=None, **kwargs):
def normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_columns=None, empty_types=False, **kwargs):
# type: (DataFrame, Optional[int])->NormalizedInput
norm_meta = NormalizationMetadata()
norm_meta.df.common.mark = True
@@ -859,7 +859,7 @@ def normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_col
raise ArcticDbNotYetImplemented("MultiIndex column are not supported yet")

index_names, ix_vals = self._index_to_records(
item, norm_meta.df.common, dynamic_strings, string_max_len=string_max_len
item, norm_meta.df.common, dynamic_strings, string_max_len=string_max_len, empty_types=empty_types
)
# The first branch of this if is faster, but does not work with null/duplicated column names
if item.columns.is_unique and not item.columns.hasnans:
@@ -1173,7 +1173,7 @@ def __init__(self, fallback_normalizer=None, use_norm_failure_handler_known_type
self.msg_pack_denorm = MsgPackNormalizer() # must exist for deserialization
self.fallback_normalizer = fallback_normalizer

def _normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_columns=None, **kwargs):
def _normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_columns=None, empty_types=False, **kwargs):
normalizer = self.get_normalizer_for_type(item)

if not normalizer:
@@ -1185,6 +1185,7 @@ def _normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_co
string_max_len=string_max_len,
dynamic_strings=dynamic_strings,
coerce_columns=coerce_columns,
empty_types=empty_types,
**kwargs,
)

@@ -1222,7 +1223,7 @@ def get_normalizer_for_type(self, item):
return None

def normalize(
self, item, string_max_len=None, pickle_on_failure=False, dynamic_strings=False, coerce_columns=None, **kwargs
self, item, string_max_len=None, pickle_on_failure=False, dynamic_strings=False, coerce_columns=None, empty_types=False, **kwargs
):
"""
:param item: Item to be normalized to something Arctic Native understands.
@@ -1231,13 +1232,13 @@ def normalize(
:param pickle_on_failure: This will fallback to pickling the Supported objects (DataFrame, Series, TimeFrame)
even if use_norm_failure_handler_known_types was not configured at the library level.
"""

try:
return self._normalize(
item,
string_max_len=string_max_len,
dynamic_strings=dynamic_strings,
coerce_columns=coerce_columns,
empty_types=empty_types,
**kwargs,
)
except Exception as ex:
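
For frames without a MultiIndex, the _index_to_records change reduces to roughly the following pandas-level sketch (MultiIndex and categorical handling, which the real method also covers, are deliberately left out):

import pandas as pd

def sketch_index_to_records(df: pd.DataFrame, empty_types: bool):
    """Return (index used for normalisation, is_physically_stored) - a rough model only."""
    empty_df = len(df.index) == 0
    if empty_df and empty_types:
        return pd.Index([]), False           # new path: dedicated empty index, not physically stored
    if empty_df:
        return pd.DatetimeIndex([]), False   # legacy path: empty frames normalised as datetime-indexed
    return df.index, not isinstance(df.index, pd.RangeIndex)

assert isinstance(sketch_index_to_records(pd.DataFrame(), empty_types=False)[0], pd.DatetimeIndex)
assert not isinstance(sketch_index_to_records(pd.DataFrame(), empty_types=True)[0], pd.DatetimeIndex)
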
4 changes: 4 additions & 0 deletions python/arcticdb/version_store/_store.py
@@ -332,6 +332,9 @@ def _try_normalize(
dynamic_schema = self.resolve_defaults(
"dynamic_schema", self._lib_cfg.lib_desc.version.write_options, False, **kwargs
)
empty_types = self.resolve_defaults(
"empty_types", self._lib_cfg.lib_desc.version.write_options, False, **kwargs
)
try:
udm = normalize_metadata(metadata) if metadata is not None else None
opt_custom = self._custom_normalizer.normalize(dataframe)
@@ -347,6 +350,7 @@
dynamic_strings=dynamic_strings,
coerce_columns=coerce_columns,
dynamic_schema=dynamic_schema,
empty_types=empty_types,
**kwargs,
)
except ArcticDbNotYetImplemented as ex:
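
The new empty_types default in _try_normalize is resolved the same way as dynamic_schema: from the library's write_options, falling back to False. A rough model of that resolution (assumption: an explicit empty_types keyword wins over the proto field; the real resolve_defaults may also consult runtime configuration):

def resolve_empty_types(write_options_proto, **kwargs):
    # Rough model of the resolve_defaults call added above - not the real helper.
    if "empty_types" in kwargs:
        return bool(kwargs["empty_types"])
    return bool(getattr(write_options_proto, "empty_types", False))
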
33 changes: 10 additions & 23 deletions python/tests/unit/arcticdb/version_store/test_empty_writes.py
@@ -146,33 +146,20 @@ def test_empty_series(lmdb_version_store_dynamic_schema, sym):
assert_series_equal(lmdb_version_store_dynamic_schema.read(sym).data, ser, check_index_type=False)


@pytest.mark.parametrize("dtype, existing_empty, append_empty", [
("int64", True, False),
("float64", True, False),
("float64", False, True),
("float64", True, True),
@pytest.mark.parametrize("dtype, series, append_series", [
("int64", pd.Series([]), pd.Series([1, 2, 3], dtype="int64")),
("float64", pd.Series([]), pd.Series([1, 2, 3], dtype="float64")),
("float64", pd.Series([1, 2, 3], dtype="float64"), pd.Series([])),
("float64", pd.Series([]), pd.Series([])),
])
def test_append_empty_series(lmdb_version_store_dynamic_schema, sym, dtype, existing_empty, append_empty):
empty_ser = pd.Series([])
non_empty_ser = pd.Series([1, 2, 3], dtype=dtype)
if existing_empty:
ser = empty_ser
else:
ser = non_empty_ser
lmdb_version_store_dynamic_schema.write(sym, ser)
def test_append_empty_series(lmdb_version_store_dynamic_schema, sym, dtype, series, append_series):
lmdb_version_store_dynamic_schema.write(sym, series)
assert not lmdb_version_store_dynamic_schema.is_symbol_pickled(sym)

# ArcticDB stores empty columns under a dedicated `EMPTYVAL` type, so the types are not going to match with pandas
# if the series is empty.
assert_series_equal(lmdb_version_store_dynamic_schema.read(sym).data, ser, check_index_type=(len(ser) > 0))

if append_empty:
new_ser = empty_ser
else:
new_ser = non_empty_ser
lmdb_version_store_dynamic_schema.append(sym, new_ser)

result_ser = pd.concat([ser, new_ser])
assert_series_equal(lmdb_version_store_dynamic_schema.read(sym).data, series, check_index_type=(len(series) > 0))
lmdb_version_store_dynamic_schema.append(sym, append_series)
result_ser = pd.concat([series, append_series])
assert_series_equal(lmdb_version_store_dynamic_schema.read(sym).data, result_ser, check_index_type=(len(result_ser) > 0))


