diff --git a/cpp/arcticdb/column_store/memory_segment.hpp b/cpp/arcticdb/column_store/memory_segment.hpp
index 561feaee0f..472bd48fcc 100644
--- a/cpp/arcticdb/column_store/memory_segment.hpp
+++ b/cpp/arcticdb/column_store/memory_segment.hpp
@@ -426,6 +426,14 @@ class SegmentInMemory {
         impl_->set_compacted(val);
     }

+    bool fixed_str_as_dyn() const {
+        return impl_->fixed_str_as_dyn();
+    }
+
+    void set_fixed_str_as_dyn(bool val) {
+        impl_->set_fixed_str_as_dyn(val);
+    }
+
     void change_schema(const StreamDescriptor& descriptor) {
         return impl_->change_schema(descriptor);
     }
diff --git a/cpp/arcticdb/column_store/memory_segment_impl.hpp b/cpp/arcticdb/column_store/memory_segment_impl.hpp
index b09b726d56..1b390606bd 100644
--- a/cpp/arcticdb/column_store/memory_segment_impl.hpp
+++ b/cpp/arcticdb/column_store/memory_segment_impl.hpp
@@ -733,6 +733,14 @@ class SegmentInMemoryImpl {
         compacted_ = value;
     }

+    bool fixed_str_as_dyn() const {
+        return fixed_str_as_dyn_;
+    }
+
+    void set_fixed_str_as_dyn(bool value) {
+        fixed_str_as_dyn_ = value;
+    }
+
     void check_magic() const {
         magic_.check();
     }
@@ -815,6 +823,7 @@ class SegmentInMemoryImpl {
     mutable std::unique_ptr<std::mutex> column_map_mutex_ = std::make_unique<std::mutex>();
     bool allow_sparse_ = false;
     bool compacted_ = false;
+    bool fixed_str_as_dyn_ = false;
     util::MagicNum<'M', 'S', 'e', 'g'> magic_;
     std::shared_ptr<FieldCollection> index_fields_;
     std::shared_ptr<TimeseriesDescriptor> tsd_;
diff --git a/cpp/arcticdb/pipeline/read_frame.cpp b/cpp/arcticdb/pipeline/read_frame.cpp
index a7d63f3407..b7064209f5 100644
--- a/cpp/arcticdb/pipeline/read_frame.cpp
+++ b/cpp/arcticdb/pipeline/read_frame.cpp
@@ -1061,7 +1061,7 @@ std::unique_ptr<StringReducer> get_string_reducer(
         const auto alloc_width = get_max_string_size_in_column(column.data().buffer(), context, frame, frame_field, slice_map, true);
         string_reducer = std::make_unique<UnicodeConvertingStringReducer>(column, context, frame, frame_field, alloc_width);
     } else {
-        const auto alloc_width = get_max_string_size_in_column(column.data().buffer(), context, frame, frame_field, slice_map, false);
+        const auto alloc_width = get_max_string_size_in_column(column.data().buffer(), context, frame, frame_field, slice_map, frame.fixed_str_as_dyn());
         string_reducer = std::make_unique<FixedStringReducer>(column, context, frame, frame_field, alloc_width);
     }
 } else {
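Context for the `read_frame.cpp` change above: with `fixed_str_as_dyn` unset, the fixed-width reducer sizes the output buffer from the first string in the column; after a group-by `first`/`last` the first row no longer bounds the others, so the flag forces a scan for the maximum. A minimal sketch of the two sizing strategies, where `OffsetToString` is an illustrative stand-in for the real `StringPool` lookup, not ArcticDB's API:

```cpp
#include <algorithm>
#include <cstddef>
#include <functional>
#include <string_view>
#include <vector>

// Hypothetical stand-in: maps a string-pool offset to its string.
using OffsetToString = std::function<std::string_view(std::size_t)>;

// With fixed_str_as_dyn == false the fixed-width buffer is sized from the
// first offset only; with true, every offset is scanned for the maximum.
std::size_t alloc_width(const std::vector<std::size_t>& offsets,
                        const OffsetToString& at,
                        bool fixed_str_as_dyn) {
    if (offsets.empty())
        return 0;
    if (!fixed_str_as_dyn)
        return at(offsets.front()).size();
    std::size_t max_len = 0;
    for (auto offset : offsets)
        max_len = std::max(max_len, at(offset).size());
    return max_len;
}
```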
diff --git a/cpp/arcticdb/processing/aggregation.cpp b/cpp/arcticdb/processing/aggregation.cpp
index b52dd4b971..2d5c17f6d3 100644
--- a/cpp/arcticdb/processing/aggregation.cpp
+++ b/cpp/arcticdb/processing/aggregation.cpp
@@ -463,6 +463,10 @@ void FirstAggregatorData::add_data_type(DataType data_type) {
     add_data_type_impl(data_type, data_type_);
 }

+void FirstAggregatorData::set_string_offset_map(const std::multimap<std::pair<entity::position_t, size_t>, entity::position_t>& offset_map) {
+    str_offset_mapping_ = offset_map;
+}
+
 void FirstAggregatorData::aggregate(const std::optional<ColumnWithStrings>& input_column, const std::vector<size_t>& groups, size_t unique_values) {
     if(data_type_.has_value() && *data_type_ != DataType::EMPTYVAL && input_column.has_value()) {
         details::visit_type(*data_type_, [&input_column, unique_values, &groups, this] (auto global_tag) {
@@ -502,13 +506,28 @@ SegmentInMemory FirstAggregatorData::finalize(const ColumnName& output_column_name, bool, size_t unique_values) {
     SegmentInMemory res;
     if(!aggregated_.empty()) {
-        details::visit_type(*data_type_, [that=this, &res, &output_column_name, unique_values] (auto col_tag) {
+        details::visit_type(*data_type_, [this, &res, &output_column_name, unique_values] (auto col_tag) {
+            using InputType = decltype(col_tag);
             using RawType = typename decltype(col_tag)::DataTypeTag::raw_type;
-            that->aggregated_.resize(sizeof(RawType)* unique_values);
-            auto col = std::make_shared<Column>(make_scalar_type(that->data_type_.value()), unique_values, true, false);
-            memcpy(col->ptr(), that->aggregated_.data(), that->aggregated_.size());
-            res.add_column(scalar_field(that->data_type_.value(), output_column_name.value), col);
+            aggregated_.resize(sizeof(RawType)* unique_values);
+            auto col = std::make_shared<Column>(make_scalar_type(data_type_.value()), unique_values, true, false);
+            memcpy(col->ptr(), aggregated_.data(), aggregated_.size());
+            // For string columns, rewrite each output value to the offset in the output StringPool that holds the
+            // actual string, using `str_offset_mapping_` populated in `AggregationClause::process`
+            if constexpr(is_sequence_type(InputType::DataTypeTag::data_type)) {
+                auto col_ptr = reinterpret_cast<RawType*>(col->ptr());
+                for (auto i = 0u; i < unique_values; ++i, ++col_ptr) {
+                    // lower_bound yields the first (and therefore earliest inserted) entry for this offset/group pair
+                    auto first_el = str_offset_mapping_.lower_bound(std::make_pair(*col_ptr, i));
+                    if(first_el != str_offset_mapping_.end()) {
+                        *col_ptr = first_el->second;
+                    }
+                }
+            }
+            res.add_column(scalar_field(data_type_.value(), output_column_name.value), col);
             col->set_row_data(unique_values - 1);
+            // Set `fixed_str_as_dyn` so that fixed-width string columns are sized by their longest string rather
+            // than the first one - cf. `get_string_reducer` when reducing columns
+            res.set_fixed_str_as_dyn(true);
         });
     }
     return res;
@@ -522,6 +541,10 @@ void LastAggregatorData::add_data_type(DataType data_type) {
     add_data_type_impl(data_type, data_type_);
 }

+void LastAggregatorData::set_string_offset_map(const std::multimap<std::pair<entity::position_t, size_t>, entity::position_t>& offset_map) {
+    str_offset_mapping_ = offset_map;
+}
+
 void LastAggregatorData::aggregate(const std::optional<ColumnWithStrings>& input_column, const std::vector<size_t>& groups, size_t unique_values) {
     if(data_type_.has_value() && *data_type_ != DataType::EMPTYVAL && input_column.has_value()) {
         details::visit_type(*data_type_, [&input_column, unique_values, &groups, this] (auto global_tag) {
@@ -559,13 +582,28 @@ SegmentInMemory LastAggregatorData::finalize(const ColumnName& output_column_name, bool, size_t unique_values) {
     SegmentInMemory res;
     if(!aggregated_.empty()) {
-        details::visit_type(*data_type_, [that=this, &res, &output_column_name, unique_values] (auto col_tag) {
+        details::visit_type(*data_type_, [this, &res, &output_column_name, unique_values] (auto col_tag) {
+            using InputType = decltype(col_tag);
             using RawType = typename decltype(col_tag)::DataTypeTag::raw_type;
-            that->aggregated_.resize(sizeof(RawType)* unique_values);
-            auto col = std::make_shared<Column>(make_scalar_type(that->data_type_.value()), unique_values, true, false);
-            memcpy(col->ptr(), that->aggregated_.data(), that->aggregated_.size());
-            res.add_column(scalar_field(that->data_type_.value(), output_column_name.value), col);
+            aggregated_.resize(sizeof(RawType)* unique_values);
+            auto col = std::make_shared<Column>(make_scalar_type(data_type_.value()), unique_values, true, false);
+            memcpy(col->ptr(), aggregated_.data(), aggregated_.size());
+            // For string columns, rewrite each output value to the offset in the output StringPool that holds the
+            // actual string, using `str_offset_mapping_` populated in `AggregationClause::process`
+            if constexpr(is_sequence_type(InputType::DataTypeTag::data_type)) {
+                auto col_ptr = reinterpret_cast<RawType*>(col->ptr());
+                for (auto i = 0u; i < unique_values; ++i, ++col_ptr) {
+                    // upper_bound yields the first entry past this offset/group pair; the entry before it (if any)
+                    // is the last one inserted, so guard against begin() before stepping back
+                    auto past_el = str_offset_mapping_.upper_bound(std::make_pair(*col_ptr, i));
+                    if(past_el != str_offset_mapping_.begin()) {
+                        *col_ptr = std::prev(past_el)->second;
+                    }
+                }
+            }
+            res.add_column(scalar_field(data_type_.value(), output_column_name.value), col);
             col->set_row_data(unique_values - 1);
+            // Set `fixed_str_as_dyn` so that fixed-width string columns are sized by their longest string rather
+            // than the first one - cf. `get_string_reducer` when reducing columns
+            res.set_fixed_str_as_dyn(true);
         });
     }
     return res;
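The `first`/`last` lookups in `finalize` above rely on two properties of `std::multimap` with a `std::pair` key: pairs compare lexicographically, and entries with equivalent keys keep their insertion order (guaranteed since C++11), so `lower_bound` and `upper_bound` bracket the equal range in arrival order. A standalone sketch with toy offsets:

```cpp
#include <cassert>
#include <cstddef>
#include <iterator>
#include <map>
#include <utility>

int main() {
    // Key: (offset in the input string pool, group id); value: offset in the
    // output string pool. Entries with the same key stay in insertion order.
    std::multimap<std::pair<std::size_t, std::size_t>, std::size_t> mapping{
        {{7, 0}, 100}, {{7, 0}, 101}, {{7, 1}, 102},
    };

    // "first": lower_bound yields the earliest entry for the key.
    auto first_it = mapping.lower_bound({7, 0});
    assert(first_it != mapping.end() && first_it->second == 100);

    // "last": upper_bound yields the first entry past the key; stepping back
    // one (after checking we are not at begin()) yields the latest entry.
    auto past = mapping.upper_bound({7, 0});
    assert(past != mapping.begin());
    assert(std::prev(past)->second == 101);
}
```

Stepping back from `upper_bound` is only well-defined when the iterator is not `begin()`, which is why the `last` branch above guards before decrementing instead of comparing the decremented iterator against `end()`.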
diff --git a/cpp/arcticdb/processing/aggregation.hpp b/cpp/arcticdb/processing/aggregation.hpp
index 468774f6b4..e86b703956 100644
--- a/cpp/arcticdb/processing/aggregation.hpp
+++ b/cpp/arcticdb/processing/aggregation.hpp
@@ -7,6 +7,8 @@
 #pragma once

+#include <map>
+
 #include
 #include
 #include
@@ -69,6 +71,7 @@ class SumAggregatorData : private AggregatorDataBase
 public:

     void add_data_type(DataType data_type);
+    void set_string_offset_map(const std::multimap<std::pair<entity::position_t, size_t>, entity::position_t>&) {}
     void aggregate(const std::optional<ColumnWithStrings>& input_column, const std::vector<size_t>& groups, size_t unique_values);
     SegmentInMemory finalize(const ColumnName& output_column_name, bool dynamic_schema, size_t unique_values);
@@ -83,6 +86,7 @@ class MaxAggregatorData : private AggregatorDataBase
 public:

     void add_data_type(DataType data_type);
+    void set_string_offset_map(const std::multimap<std::pair<entity::position_t, size_t>, entity::position_t>&) {}
     void aggregate(const std::optional<ColumnWithStrings>& input_column, const std::vector<size_t>& groups, size_t unique_values);
     SegmentInMemory finalize(const ColumnName& output_column_name, bool dynamic_schema, size_t unique_values);
@@ -97,6 +101,7 @@ class MinAggregatorData : private AggregatorDataBase
 public:

     void add_data_type(DataType data_type);
+    void set_string_offset_map(const std::multimap<std::pair<entity::position_t, size_t>, entity::position_t>&) {}
     void aggregate(const std::optional<ColumnWithStrings>& input_column, const std::vector<size_t>& groups, size_t unique_values);
     SegmentInMemory finalize(const ColumnName& output_column_name, bool dynamic_schema, size_t unique_values);
@@ -112,6 +117,7 @@ class MeanAggregatorData : private AggregatorDataBase

     // Mean values are always doubles so this is a no-op
     void add_data_type(DataType) {}
+    void set_string_offset_map(const std::multimap<std::pair<entity::position_t, size_t>, entity::position_t>&) {}
     void aggregate(const std::optional<ColumnWithStrings>& input_column, const std::vector<size_t>& groups, size_t unique_values);
     SegmentInMemory finalize(const ColumnName& output_column_name, bool dynamic_schema, size_t unique_values);
@@ -134,6 +140,7 @@ class CountAggregatorData : private AggregatorDataBase

     // Count values are always integers so this is a no-op
     void add_data_type(DataType) {}
+    void set_string_offset_map(const std::multimap<std::pair<entity::position_t, size_t>, entity::position_t>&) {}
     void aggregate(const std::optional<ColumnWithStrings>& input_column, const std::vector<size_t>& groups, size_t unique_values);
     SegmentInMemory finalize(const ColumnName& output_column_name, bool dynamic_schema, size_t unique_values);
@@ -147,6 +154,8 @@ class FirstAggregatorData : private AggregatorDataBase
 public:

     void add_data_type(DataType data_type);
+    // Needs to be called before `finalize` - only used for string columns
+    void set_string_offset_map(const std::multimap<std::pair<entity::position_t, size_t>, entity::position_t>& offset_map);
     void aggregate(const std::optional<ColumnWithStrings>& input_column, const std::vector<size_t>& groups, size_t unique_values);
     SegmentInMemory finalize(const ColumnName& output_column_name, bool dynamic_schema, size_t unique_values);

 private:

     std::vector<uint8_t> aggregated_;
     std::optional<DataType> data_type_;
+    std::multimap<std::pair<entity::position_t, size_t>, entity::position_t> str_offset_mapping_;
     std::unordered_set<size_t> groups_cache_;
 };

@@ -163,6 +173,8 @@ class LastAggregatorData : private AggregatorDataBase
 public:

     void add_data_type(DataType data_type);
+    // Needs to be called before `finalize` - only used for string columns
+    void set_string_offset_map(const std::multimap<std::pair<entity::position_t, size_t>, entity::position_t>& offset_map);
     void aggregate(const std::optional<ColumnWithStrings>& input_column, const std::vector<size_t>& groups, size_t unique_values);
     SegmentInMemory finalize(const ColumnName& output_column_name, bool dynamic_schema, size_t unique_values);

 private:

     std::vector<uint8_t> aggregated_;
     std::optional<DataType> data_type_;
+    std::multimap<std::pair<entity::position_t, size_t>, entity::position_t> str_offset_mapping_;
     std::unordered_set<size_t> groups_cache_;
 };

diff --git a/cpp/arcticdb/processing/aggregation_interface.hpp b/cpp/arcticdb/processing/aggregation_interface.hpp
index a2dcd99fa6..29002947e5 100644
--- a/cpp/arcticdb/processing/aggregation_interface.hpp
+++ b/cpp/arcticdb/processing/aggregation_interface.hpp
@@ -7,6 +7,8 @@
 #pragma once

+#include <map>
+#include <arcticdb/entity/types.hpp>
 #include <folly/Poly.h>

 namespace arcticdb{
@@ -22,10 +24,11 @@ struct IGroupingAggregatorData {
         [[nodiscard]] SegmentInMemory finalize(const ColumnName& output_column_name, bool dynamic_schema, size_t unique_values) {
             return folly::poly_call<2>(*this, output_column_name, dynamic_schema, unique_values);
         }
+        void set_string_offset_map(const std::multimap<std::pair<entity::position_t, size_t>, entity::position_t>& offset_map) { folly::poly_call<3>(*this, offset_map); }
     };

     template<class T>
-    using Members = folly::PolyMembers<&T::add_data_type, &T::aggregate, &T::finalize>;
+    using Members = folly::PolyMembers<&T::add_data_type, &T::aggregate, &T::finalize, &T::set_string_offset_map>;
 };

 using GroupingAggregatorData = folly::Poly<IGroupingAggregatorData>;
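`GroupingAggregatorData` is a `folly::Poly`, which erases types structurally: every aggregator stored behind it must expose each member listed in `PolyMembers`, which is why the numeric aggregators above gain empty no-op `set_string_offset_map` overloads. A trimmed-down sketch of the pattern (names simplified and `int` standing in for the real argument types):

```cpp
#include <folly/Poly.h>

// Miniature version of IGroupingAggregatorData with two members.
struct IAggregator {
    template <class Base>
    struct Interface : Base {
        void aggregate(int v) { folly::poly_call<0>(*this, v); }
        void set_string_offset_map(int m) { folly::poly_call<1>(*this, m); }
    };
    template <class T>
    using Members = folly::PolyMembers<&T::aggregate, &T::set_string_offset_map>;
};

using Aggregator = folly::Poly<IAggregator>;

// A numeric aggregator still has to provide the member, as a no-op.
struct SumLike {
    void aggregate(int v) { total += v; }
    void set_string_offset_map(int) {}  // no-op: strings are irrelevant here
    int total = 0;
};

int main() {
    Aggregator agg = SumLike{};
    agg.aggregate(3);
    agg.set_string_offset_map(0);  // dispatches to the no-op
}
```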
diff --git a/cpp/arcticdb/processing/clause.cpp b/cpp/arcticdb/processing/clause.cpp
index 5d38a013c4..6add000125 100644
--- a/cpp/arcticdb/processing/clause.cpp
+++ b/cpp/arcticdb/processing/clause.cpp
@@ -4,22 +4,19 @@
  *
  * As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
  */
-
-#include
-#include
+#include
 #include
-#include
 #include
-#include
-#include
-
-#include
 #include
-#include
 #include
+#include
+#include
+#include
 #include
+#include
+#include
 #include

 namespace arcticdb {
@@ -350,6 +347,10 @@ AggregationClause::AggregationClause(const std::string& grouping_column,
             aggregators_.emplace_back(MinAggregator(typed_column_name, typed_column_name));
         } else if (aggregation_operator == "count") {
             aggregators_.emplace_back(CountAggregator(typed_column_name, typed_column_name));
+        } else if (aggregation_operator == "first") {
+            aggregators_.emplace_back(FirstAggregator(typed_column_name, typed_column_name));
+        } else if (aggregation_operator == "last") {
+            aggregators_.emplace_back(LastAggregator(typed_column_name, typed_column_name));
         } else {
             user_input::raise<ErrorCode::E_INVALID_USER_ARGUMENT>("Unknown aggregation operator provided: {}", aggregation_operator);
         }
@@ -395,14 +396,17 @@ Composite<EntityIds> AggregationClause::process(Composite<EntityIds>&& entity_ids) {
         DataType grouping_data_type;
         GroupingMap grouping_map;
         Composite<ProcessingUnit> procs(std::move(procs_as_range));
+
+        // Multimap from old to new string offsets: <<old_offset, group_id>, new_offset>
+        std::multimap<std::pair<entity::position_t, size_t>, entity::position_t> str_offset_mapping;
         procs.broadcast(
-            [&num_unique, &grouping_data_type, &grouping_map, &next_group_id, &aggregators_data, &string_pool, this](auto &proc) {
+            [&str_offset_mapping, &num_unique, &grouping_data_type, &grouping_map, &next_group_id, &aggregators_data, &string_pool, this](auto &proc) {
                 auto partitioning_column = proc.get(ColumnName(grouping_column_));
                 if (std::holds_alternative<ColumnWithStrings>(partitioning_column)) {
                     ColumnWithStrings col = std::get<ColumnWithStrings>(partitioning_column);
                     entity::details::visit_type(col.column_->type().data_type(),
                                                 [&proc_=proc, &grouping_map, &next_group_id, &aggregators_data, &string_pool, &col,
-                                                &num_unique, &grouping_data_type, this](auto data_type_tag) {
+                                                &str_offset_mapping, &num_unique, &grouping_data_type, this](auto data_type_tag) {
                         using DataTypeTagType = decltype(data_type_tag);
                         using RawType = typename DataTypeTagType::raw_type;
                         constexpr auto data_type = DataTypeTagType::data_type;
@@ -494,6 +498,31 @@ Composite<EntityIds> AggregationClause::process(Composite<EntityIds>&& entity_ids) {
                                 opt_input_column.emplace(std::move(column_with_strings));
                             }
                         }
+
+                        // String columns: intern each output string in the shared output string_pool and record the
+                        // old-to-new offset mapping
+                        auto output_column_name = aggregators_.at(agg_data.index).get_output_column_name();
+                        auto output_column = proc_.get(output_column_name);
+                        if (std::holds_alternative<ColumnWithStrings>(output_column)) {
+                            auto output_column_with_strings = std::get<ColumnWithStrings>(output_column);
+                            if (is_sequence_type(output_column_with_strings.column_->type().data_type())) {
+                                auto output_data = output_column_with_strings.column_->data();
+                                while (auto out_block = output_data.template next<ScalarTagType<DataTypeTag<DataType::UINT64>>>()) {
+                                    const auto out_row_count = out_block->row_count();
+                                    auto out_ptr = out_block->data();
+                                    for (size_t orc = 0; orc < out_row_count; ++orc, ++out_ptr) {
+                                        std::optional<std::string_view> str = output_column_with_strings.string_at_offset(*out_ptr);
+                                        if (str.has_value()) {
+                                            // Intern the string view `*str` in the output `string_pool` and map the new offset to the old one
+                                            auto out_offset = string_pool->get(*str, true).offset();
+                                            str_offset_mapping.insert(std::make_pair(std::make_pair(*out_ptr, row_to_group[orc]), out_offset));
+                                        }
+                                    }
+                                }
+                                // Hand the offset map to the aggregator; this must happen before `finalize` is called
+                                agg_data->set_string_offset_map(str_offset_mapping);
+                            }
+                        }
+
                         agg_data->aggregate(opt_input_column, row_to_group, num_unique);
                     }
                 });
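Each processing unit carries its own string pool while the aggregation result uses one shared output pool, so the loop above re-interns every output string and records `(old offset, group id) -> new offset` for `finalize` to consume. A self-contained sketch of that bookkeeping with a toy pool (the linear-scan `get` is a stand-in for `StringPool::get`, which returns an interned entry whose `offset()` is stable):

```cpp
#include <cstddef>
#include <map>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

// Toy stand-in for StringPool: get() interns a string and returns its offset.
struct ToyPool {
    std::vector<std::string> strings;
    std::size_t get(std::string_view s) {
        for (std::size_t i = 0; i < strings.size(); ++i)
            if (strings[i] == s)
                return i;
        strings.emplace_back(s);
        return strings.size() - 1;
    }
};

int main() {
    ToyPool input{{"apple", "pear"}};  // offsets 0, 1 in the slice's own pool
    ToyPool output;                    // shared pool for the result segment

    // (old offset, group id) -> new offset, as built in AggregationClause::process.
    std::multimap<std::pair<std::size_t, std::size_t>, std::size_t> mapping;
    std::vector<std::size_t> column_offsets{0, 1, 0};  // raw column values per row
    std::vector<std::size_t> row_to_group{0, 1, 0};

    for (std::size_t row = 0; row < column_offsets.size(); ++row) {
        auto old_offset = column_offsets[row];
        auto new_offset = output.get(input.strings[old_offset]);
        mapping.insert({{old_offset, row_to_group[row]}, new_offset});
    }
}
```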
@@ -528,7 +557,11 @@ Composite<EntityIds> AggregationClause::process(Composite<EntityIds>&& entity_ids) {
         index_col->set_row_data(grouping_map.size() - 1);

     for (auto agg_data: folly::enumerate(aggregators_data)) {
-        seg.concatenate(agg_data->finalize(aggregators_.at(agg_data.index).get_output_column_name(), processing_config_.dynamic_schema_, num_unique));
+        auto inter_seg = agg_data->finalize(aggregators_.at(agg_data.index).get_output_column_name(), processing_config_.dynamic_schema_, num_unique);
+        if (inter_seg.fixed_str_as_dyn()) {
+            seg.set_fixed_str_as_dyn(true);
+        }
+        seg.concatenate(std::move(inter_seg));
     }

     seg.set_string_pool(string_pool);
diff --git a/cpp/arcticdb/processing/test/test_clause.cpp b/cpp/arcticdb/processing/test/test_clause.cpp
index 33456233ef..ad7bbfccd7 100644
--- a/cpp/arcticdb/processing/test/test_clause.cpp
+++ b/cpp/arcticdb/processing/test/test_clause.cpp
@@ -62,7 +62,7 @@ TEST(Clause, AggregationEmptyColumn) {
     using namespace arcticdb;
     auto component_manager = std::make_shared<ComponentManager>();

-    AggregationClause aggregation("int_repeated_values", {{"empty_sum", "sum"}, {"empty_min", "min"}, {"empty_max", "max"}, {"empty_mean", "mean"}, {"empty_count", "count"}});
+    AggregationClause aggregation("int_repeated_values", {{"empty_sum", "sum"}, {"empty_min", "min"}, {"empty_max", "max"}, {"empty_mean", "mean"}, {"empty_count", "count"}, {"empty_first", "first"}, {"empty_last", "last"}});
     aggregation.set_component_manager(component_manager);

     size_t num_rows{100};
diff --git a/cpp/arcticdb/version/version_core.cpp b/cpp/arcticdb/version/version_core.cpp
index b31c957826..57b9faf8df 100644
--- a/cpp/arcticdb/version/version_core.cpp
+++ b/cpp/arcticdb/version/version_core.cpp
@@ -858,7 +858,7 @@ void check_incompletes_index_ranges_dont_overlap(

-void copy_segments_to_frame(const std::shared_ptr<Store>& store, const std::shared_ptr<PipelineContext>& pipeline_context, const SegmentInMemory& frame) {
+void copy_segments_to_frame(const std::shared_ptr<Store>& store, const std::shared_ptr<PipelineContext>& pipeline_context, SegmentInMemory& frame) {
     for (auto context_row : folly::enumerate(*pipeline_context)) {
         auto& slice_and_key = context_row->slice_and_key();
         auto& segment = slice_and_key.segment(store);
diff --git a/python/arcticdb/version_store/processing.py b/python/arcticdb/version_store/processing.py
index b37aa91697..4d0f588b6e 100644
--- a/python/arcticdb/version_store/processing.py
+++ b/python/arcticdb/version_store/processing.py
@@ -422,13 +422,15 @@ def apply(self, name, expr):
     def groupby(self, name: str):
         """
-        Group symbol by column name. GroupBy operations must be followed by an aggregation operator. Currently the following five aggregation
+        Group symbol by column name. GroupBy operations must be followed by an aggregation operator. Currently the following seven aggregation
         operators are supported:
             * "mean" - compute the mean of the group
             * "sum" - compute the sum of the group
             * "min" - compute the min of the group
             * "max" - compute the max of the group
             * "count" - compute the count of group
+            * "first" - compute the first element of the group
+            * "last" - compute the last element of the group

         For usage examples, see below.
diff --git a/python/tests/unit/arcticdb/version_store/test_aggregation.py b/python/tests/unit/arcticdb/version_store/test_aggregation.py
index ed4dbdf674..69d1359c23 100644
--- a/python/tests/unit/arcticdb/version_store/test_aggregation.py
+++ b/python/tests/unit/arcticdb/version_store/test_aggregation.py
@@ -217,19 +217,7 @@ def test_count_aggregation(local_object_version_store):
     assert_frame_equal(res.data, df)


-@use_of_function_scoped_fixtures_in_hypothesis_checked
-@settings(deadline=None)
-@given(
-    df=data_frames(
-        [
-            column("grouping_column", elements=string_strategy, fill=string_strategy),
-            column("a", elements=numeric_type_strategies()),
-        ],
-        index=range_indexes(),
-    )
-)
-@pytest.mark.skip(reason="Feature flagged off until working with string columns and dynamic schema")
-def test_hypothesis_first_agg_numeric(lmdb_version_store, df):
+def first_aggregation(lmdb_version_store, df):
     lib = lmdb_version_store
     assume(not df.empty)
@@ -248,14 +236,43 @@ def first_aggregation(lmdb_version_store, df):
     assert_frame_equal(expected, vit.data)


-@pytest.mark.skip(reason="Feature flagged off until working with string columns and dynamic schema")
-def test_first_aggregation(local_object_version_store):
+@use_of_function_scoped_fixtures_in_hypothesis_checked
+@settings(deadline=None)
+@given(
+    df=data_frames(
+        [
+            column("grouping_column", elements=string_strategy, fill=string_strategy),
+            column("a", elements=numeric_type_strategies()),
+        ],
+        index=range_indexes(),
+    )
+)
+def test_hypothesis_first_agg_numeric(lmdb_version_store, df):
+    first_aggregation(lmdb_version_store, df)
+
+
+@use_of_function_scoped_fixtures_in_hypothesis_checked
+@settings(deadline=None)
+@given(
+    df=data_frames(
+        [
+            column("grouping_column", elements=string_strategy, fill=string_strategy),
+            column("a", elements=string_strategy),
+        ],
+        index=range_indexes(),
+    )
+)
+def test_hypothesis_first_agg_strings(lmdb_version_store, df):
+    first_aggregation(lmdb_version_store, df)
+
+
+def test_first_aggregation_numeric(local_object_version_store):
     df = DataFrame(
         {
-            "grouping_column": ["group_1", "group_2", "group_4", "group_2", "group_1", "group_3", "group_1"],
-            "get_first": [100.0, np.nan, np.nan, 2.7, 1.4, 5.8, 3.45],
+            "grouping_column": ["group_1", "group_2", "group_4", "group_2", "group_1", "group_3", "group_1", "group_5", "group_6", "group_6"],
+            "get_first": [100.0, np.nan, np.nan, 2.7, 1.4, 5.8, 3.45, None, None, 9.5],
         },
-        index=np.arange(7),
+        index=np.arange(10),
     )
     q = QueryBuilder()
     q = q.groupby("grouping_column").agg({"get_first": "first"})
@@ -265,36 +282,58 @@ def test_first_aggregation_numeric(local_object_version_store):
     res = local_object_version_store.read(symbol, query_builder=q)
     res.data.sort_index(inplace=True)

-    df = pd.DataFrame({"get_first": [100.0, 2.7, 5.8, np.nan]}, index=["group_1", "group_2", "group_3", "group_4"])
+    df = pd.DataFrame({"get_first": [100.0, 2.7, 5.8, np.nan, None, 9.5]}, index=["group_1", "group_2", "group_3", "group_4", "group_5", "group_6"])
     df.index.rename("grouping_column", inplace=True)
     res.data.sort_index(inplace=True)

     assert_frame_equal(res.data, df)


-@pytest.mark.skip(reason="Feature flagged off until working with string columns and dynamic schema")
-def test_first_agg_with_append(local_object_version_store):
-    lib = local_object_version_store
+@pytest.mark.parametrize("dynamic_strings", [True, False])
+def test_first_aggregation_strings(version_store_factory, dynamic_strings):
+    lib = version_store_factory(dynamic_strings=dynamic_strings)

-    symbol = "first_agg"
-    lib.write(symbol, pd.DataFrame({"grouping_column": [0], "get_first": [10.0]}))
-    lib.append(symbol, pd.DataFrame({"grouping_column": [1], "get_first": [30.0]}))
-    lib.append(symbol, pd.DataFrame({"grouping_column": [0], "get_first": [20.0]}))
-    q = QueryBuilder().groupby("grouping_column").agg({"get_first": "first"})
+    df = DataFrame(
+        {
+            "grouping_column": ["group_1", "group_2", "group_1", "group_3"],
+            "get_first": ["Hello", "this", "is", "Homer"],
+        },
+        index=np.arange(4),
+    )
+    q = QueryBuilder()
+    q = q.groupby("grouping_column").agg({"get_first": "first"})
+    symbol = "test_first_aggregation"
+    lib.write(symbol, df)

-    vit = lib.read(symbol, query_builder=q)
-    vit.data.sort_index(inplace=True)
+    res = lib.read(symbol, query_builder=q)
+    res.data.sort_index(inplace=True)

-    df = pd.DataFrame({"get_first": [10.0, 30.0]}, index=[0, 1])
+    df = pd.DataFrame({"get_first": ["Hello", "this", "Homer"]}, index=["group_1", "group_2", "group_3"])
     df.index.rename("grouping_column", inplace=True)

-    assert_frame_equal(vit.data, df)
+    assert_frame_equal(res.data, df)


+# TODO: add a hypothesis test with append for strings as well (once numeric works); same for the last aggregation
 @use_of_function_scoped_fixtures_in_hypothesis_checked
 @settings(deadline=None)
 @given(
-    df=data_frames(
+    df1=data_frames(
+        [
+            column("grouping_column", elements=string_strategy, fill=string_strategy),
+            column("a", elements=numeric_type_strategies()),
+        ],
+        index=range_indexes(),
+    ),
+    df2=data_frames(
+        [
+            column("grouping_column", elements=string_strategy, fill=string_strategy),
+            column("a", elements=numeric_type_strategies()),
+        ],
+        index=range_indexes(),
+    ),
+    df3=data_frames(
         [
             column("grouping_column", elements=string_strategy, fill=string_strategy),
             column("a", elements=numeric_type_strategies()),
@@ -302,8 +341,71 @@
         index=range_indexes(),
     )
 )
-@pytest.mark.skip(reason="Feature flagged off until working with string columns and dynamic schema")
-def test_hypothesis_last_agg_numeric(lmdb_version_store, df):
+def test_hypothesis_first_agg_numeric_with_append(lmdb_version_store_tiny_segment, df1, df2, df3):
+    lib = lmdb_version_store_tiny_segment
+    assume(not df1.empty)
+    assume(not df2.empty)
+    assume(not df3.empty)
+
+    q = QueryBuilder()
+    q = q.groupby("grouping_column").agg({"a": "first"})
+    df = pd.concat([df1, df2, df3], ignore_index=True)
+    expected = df.groupby("grouping_column").agg({"a": "first"})
+    # Newer pandas versions turn values that exceed the type's limits into np.nan, whereas older pandas and ArcticDB
+    # produce np.inf
+    expected.replace(np.nan, np.inf, inplace=True)
+
+    symbol = "first_agg"
+    lib.write(symbol, df1)
+    lib.append(symbol, df2)
+    lib.append(symbol, df3)
+
+    vit = lib.read(symbol, query_builder=q)
+    vit.data.sort_index(inplace=True)
+
+    assert_frame_equal(expected, vit.data)
+
+
+def test_first_agg_numeric_with_append(local_object_version_store):
+    lib = local_object_version_store
+
+    symbol = "first_agg"
+    lib.write(symbol, pd.DataFrame({"grouping_column": [0], "get_first": [10.0]}))
+    lib.append(symbol, pd.DataFrame({"grouping_column": [1], "get_first": [30.0]}))
+    lib.append(symbol, pd.DataFrame({"grouping_column": [0], "get_first": [20.0]}))
+    q = QueryBuilder().groupby("grouping_column").agg({"get_first": "first"})
+
+    vit = lib.read(symbol, query_builder=q)
+    vit.data.sort_index(inplace=True)
+
+    df = pd.DataFrame({"get_first": [10.0, 30.0]}, index=[0, 1])
+    df.index.rename("grouping_column", inplace=True)
+
+    assert_frame_equal(vit.data, df)
+
+
+@pytest.mark.parametrize("dynamic_strings", [True, False])
+def test_first_agg_strings_with_append(version_store_factory, dynamic_strings):
+    lib = version_store_factory(dynamic_strings=dynamic_strings)
+
+    symbol = "first_agg"
+    lib.write(symbol, pd.DataFrame({"grouping_column": ["group_1"], "get_first": ["Hi"]}))
+    lib.append(symbol, pd.DataFrame({"grouping_column": ["group_2"], "get_first": ["HELLO"]}))
+    lib.append(symbol, pd.DataFrame({"grouping_column": ["group_1"], "get_first": ["NO"]}))
+    lib.append(symbol, pd.DataFrame({"grouping_column": ["group_2"], "get_first": ["BLABLABLA"]}))
+    lib.append(symbol, pd.DataFrame({"grouping_column": ["group_3"], "get_first": ["This is it"]}))
+    q = QueryBuilder().groupby("grouping_column").agg({"get_first": "first"})
+
+    vit = lib.read(symbol, query_builder=q)
+    vit.data.sort_index(inplace=True)
+
+    df = pd.DataFrame({"get_first": ["Hi", "HELLO", "This is it"]}, index=["group_1", "group_2", "group_3"])
+    df.index.rename("grouping_column", inplace=True)
+
+    assert_frame_equal(vit.data, df)
+
+
+def last_aggregation(lmdb_version_store, df):
     lib = lmdb_version_store
     assume(not df.empty)
@@ -322,14 +424,43 @@
     assert_frame_equal(expected, vit.data)


-@pytest.mark.skip(reason="Feature flagged off until working with string columns and dynamic schema")
-def test_last_aggregation(local_object_version_store):
+@use_of_function_scoped_fixtures_in_hypothesis_checked
+@settings(deadline=None)
+@given(
+    df=data_frames(
+        [
+            column("grouping_column", elements=string_strategy, fill=string_strategy),
+            column("a", elements=numeric_type_strategies()),
+        ],
+        index=range_indexes(),
+    )
+)
+def test_hypothesis_last_agg_numeric(lmdb_version_store, df):
+    last_aggregation(lmdb_version_store, df)
+
+
+@use_of_function_scoped_fixtures_in_hypothesis_checked
+@settings(deadline=None)
+@given(
+    df=data_frames(
+        [
+            column("grouping_column", elements=string_strategy, fill=string_strategy),
+            column("a", elements=string_strategy),
+        ],
+        index=range_indexes(),
+    )
+)
+def test_hypothesis_last_agg_strings(lmdb_version_store, df):
+    last_aggregation(lmdb_version_store, df)
+
+
+def test_last_aggregation_numeric(local_object_version_store):
     df = DataFrame(
         {
-            "grouping_column": ["group_1", "group_2", "group_4", "group_5", "group_2", "group_1", "group_3", "group_1", "group_5"],
-            "get_last": [100.0, 2.7, np.nan, np.nan, np.nan, 1.4, 5.8, 3.45, 6.9],
+            "grouping_column": ["group_1", "group_2", "group_4", "group_5", "group_2", "group_1", "group_3", "group_1", "group_5", "group_6", "group_7", "group_7"],
+            "get_last": [100.0, 2.7, np.nan, np.nan, np.nan, 1.4, 5.8, 3.45, 6.9, None, None, 7.8],
         },
-        index=np.arange(9),
+        index=np.arange(12),
     )
     q = QueryBuilder()
     q = q.groupby("grouping_column").agg({"get_last": "last"})
@@ -339,15 +470,90 @@
     res = local_object_version_store.read(symbol, query_builder=q)
     res.data.sort_index(inplace=True)

-    df = pd.DataFrame({"get_last": [3.45, 2.7, 5.8, np.nan, 6.9]}, index=["group_1", "group_2", "group_3", "group_4", "group_5"])
+    df = pd.DataFrame({"get_last": [3.45, 2.7, 5.8, np.nan, 6.9, None, 7.8]}, index=["group_1", "group_2", "group_3", "group_4", "group_5", "group_6", "group_7"])
     df.index.rename("grouping_column", inplace=True)
     res.data.sort_index(inplace=True)

     assert_frame_equal(res.data, df)


-@pytest.mark.skip(reason="Feature flagged off until working with string columns and dynamic schema")
-def test_last_agg_with_append(local_object_version_store):
+@pytest.mark.parametrize("dynamic_strings", [True, False])
+def test_last_aggregation_strings(version_store_factory, dynamic_strings):
+    lib = version_store_factory(dynamic_strings=dynamic_strings)
+
+    df = DataFrame(
+        {
+            "grouping_column": ["group_1", "group_2", "group_1", "group_3"],
+            "get_last": ["Hello", "this", "is", "Homer"],
+        },
+        index=np.arange(4),
+    )
+    q = QueryBuilder()
+    q = q.groupby("grouping_column").agg({"get_last": "last"})
+    symbol = "test_last_aggregation"
+    lib.write(symbol, df)
+
+    res = lib.read(symbol, query_builder=q)
+    res.data.sort_index(inplace=True)
+
+    df = pd.DataFrame({"get_last": ["is", "this", "Homer"]}, index=["group_1", "group_2", "group_3"])
+    df.index.rename("grouping_column", inplace=True)
+    res.data.sort_index(inplace=True)
+
+    assert_frame_equal(res.data, df)
+
+
+@use_of_function_scoped_fixtures_in_hypothesis_checked
+@settings(deadline=None)
+@given(
+    df1=data_frames(
+        [
+            column("grouping_column", elements=string_strategy, fill=string_strategy),
+            column("a", elements=numeric_type_strategies()),
+        ],
+        index=range_indexes(),
+    ),
+    df2=data_frames(
+        [
+            column("grouping_column", elements=string_strategy, fill=string_strategy),
+            column("a", elements=numeric_type_strategies()),
+        ],
+        index=range_indexes(),
+    ),
+    df3=data_frames(
+        [
+            column("grouping_column", elements=string_strategy, fill=string_strategy),
+            column("a", elements=numeric_type_strategies()),
+        ],
+        index=range_indexes(),
+    )
+)
+def test_hypothesis_last_agg_numeric_with_append(lmdb_version_store_tiny_segment, df1, df2, df3):
+    lib = lmdb_version_store_tiny_segment
+    assume(not df1.empty)
+    assume(not df2.empty)
+    assume(not df3.empty)
+
+    q = QueryBuilder()
+    q = q.groupby("grouping_column").agg({"a": "last"})
+    df = pd.concat([df1, df2, df3], ignore_index=True)
+    expected = df.groupby("grouping_column").agg({"a": "last"})
+    # Newer pandas versions turn values that exceed the type's limits into np.nan, whereas older pandas and ArcticDB
+    # produce np.inf
+    expected.replace(np.nan, np.inf, inplace=True)
+
+    symbol = "last_agg"
+    lib.write(symbol, df1)
+    lib.append(symbol, df2)
+    lib.append(symbol, df3)
+
+    vit = lib.read(symbol, query_builder=q)
+    vit.data.sort_index(inplace=True)
+
+    assert_frame_equal(expected, vit.data)
+
+
+def test_last_agg_numeric_with_append(local_object_version_store):
     lib = local_object_version_store

     symbol = "last_agg"
@@ -365,6 +571,27 @@
     assert_frame_equal(vit.data, df)


+@pytest.mark.parametrize("dynamic_strings", [True, False])
+def test_last_agg_strings_with_append(version_store_factory, dynamic_strings):
+    lib = version_store_factory(dynamic_strings=dynamic_strings)
+
+    symbol = "last_agg"
+    lib.write(symbol, pd.DataFrame({"grouping_column": ["group_1"], "get_last": ["Hi"]}))
+    lib.append(symbol, pd.DataFrame({"grouping_column": ["group_2"], "get_last": ["HELLO"]}))
+    lib.append(symbol, pd.DataFrame({"grouping_column": ["group_1"], "get_last": ["NO"]}))
+    lib.append(symbol, pd.DataFrame({"grouping_column": ["group_2"], "get_last": ["BLABLABLA"]}))
+    lib.append(symbol, pd.DataFrame({"grouping_column": ["group_3"], "get_last": ["This is something else"]}))
+    q = QueryBuilder().groupby("grouping_column").agg({"get_last": "last"})
+
+    vit = lib.read(symbol, query_builder=q)
+    vit.data.sort_index(inplace=True)
+
+    df = pd.DataFrame({"get_last": ["NO", "BLABLABLA", "This is something else"]}, index=["group_1", "group_2", "group_3"])
+    df.index.rename("grouping_column", inplace=True)
+
+    assert_frame_equal(vit.data, df)
+
+
 def test_sum_aggregation(local_object_version_store):
     df = DataFrame(
         {"grouping_column": ["group_1", "group_1", "group_1", "group_2", "group_2"], "to_sum": [1, 1, 2, 2, 2]},
diff --git a/python/tests/unit/arcticdb/version_store/test_aggregation_dynamic.py b/python/tests/unit/arcticdb/version_store/test_aggregation_dynamic.py
index db096beea7..dda012b746 100644
--- a/python/tests/unit/arcticdb/version_store/test_aggregation_dynamic.py
+++ b/python/tests/unit/arcticdb/version_store/test_aggregation_dynamic.py
@@ -250,15 +250,17 @@ def test_count_aggregation_dynamic(s3_version_store_dynamic_schema_v2):
         index=range_indexes(),
     )
 )
-@pytest.mark.xfail(reason="Not supported yet")
 def test_hypothesis_first_agg_dynamic_numeric(lmdb_version_store_dynamic_schema_v1, df):
     lib = lmdb_version_store_dynamic_schema_v1
     assume(not df.empty)

     symbol = f"first_agg-{uuid.uuid4().hex}"
     expected, slices = make_dynamic(df)
-    for df_slice in slices:
-        lib.append(symbol, df_slice, write_if_missing=True)
+    # Append `expected` row by row rather than appending `slices`: `expected` is the concatenation of the slices
+    # with missing columns filled with NaN (cf. `pandas.concat` in `make_dynamic`), so appending the raw slices
+    # would produce a mismatch and fail the test
+    for idx in range(len(expected.index)):
+        lib.append(symbol, expected.iloc[[idx],:], write_if_missing=True)

     try:
         q = QueryBuilder()
@@ -275,17 +277,44 @@ def test_hypothesis_first_agg_dynamic_numeric(lmdb_version_store_dynamic_schema_v1, df):
         pass


-@pytest.mark.xfail(reason="Not supported yet")
-def test_first_aggregation_dynamic(s3_version_store_dynamic_schema_v2):
+def test_first_aggregation_dynamic_numeric(s3_version_store_dynamic_schema_v2):
     lib = s3_version_store_dynamic_schema_v2
     df = DataFrame(
         {
-            "grouping_column": ["group_1", "group_2", "group_4", "group_2", "group_1", "group_3", "group_1"],
-            "get_first": [100.0, np.nan, np.nan, 2.7, 1.4, 5.8, 3.45],
+            "grouping_column": ["group_1", "group_2", "group_4", "group_2", "group_1", "group_3", "group_1", "group_5", "group_6", "group_6"],
+            "get_first": [100.0, np.nan, np.nan, 2.7, 1.4, 5.8, 3.45, None, None, 8.7],
         },
-        index=np.arange(7),
+        index=np.arange(10),
+    )
+    symbol = "test_first_aggregation_dynamic_numeric"
+    expected, slices = make_dynamic(df)
+    # Append `expected` row by row rather than appending `slices` (same reasoning as in
+    # test_hypothesis_first_agg_dynamic_numeric)
+    for idx in range(len(expected.index)):
+        lib.append(symbol, expected.iloc[[idx],:], write_if_missing=True)
+
+    q = QueryBuilder()
+    q = q.groupby("grouping_column").agg({"get_first": "first"})
+
+    received = lib.read(symbol, query_builder=q).data
+    received.sort_index(inplace=True)
+
+    expected = expected.groupby("grouping_column").agg({"get_first": "first"})
+
+    assert_frame_equal(received, expected)
+
+
+def test_first_aggregation_strings_dynamic(lmdb_version_store_dynamic_schema_v1):
+    lib = lmdb_version_store_dynamic_schema_v1
+    df = DataFrame(
+        {
+            "grouping_column": ["group_1", "group_2", "group_1", "group_2", "group_3"],
+            "get_first": ["Hi", "HELLO", "NO", "BLABLABLA", "This is something else"],
+        },
+        index=np.arange(5),
     )
-    symbol = "test_first_aggregation_dynamic"
+    symbol = "test_first_aggregation_strings_dynamic"
     expected, slices = make_dynamic(df)
     for df_slice in slices:
         lib.append(symbol, df_slice, write_if_missing=True)
@@ -312,15 +341,17 @@ def test_first_aggregation_dynamic(s3_version_store_dynamic_schema_v2):
         index=range_indexes(),
     )
 )
-@pytest.mark.xfail(reason="Not supported yet")
 def test_hypothesis_last_agg_dynamic_numeric(lmdb_version_store_dynamic_schema_v1, df):
     lib = lmdb_version_store_dynamic_schema_v1
     assume(not df.empty)

     symbol = f"last_agg-{uuid.uuid4().hex}"
     expected, slices = make_dynamic(df)
-    for df_slice in slices:
-        lib.append(symbol, df_slice, write_if_missing=True)
+    # Append `expected` row by row rather than appending `slices`: `expected` is the concatenation of the slices
+    # with missing columns filled with NaN (cf. `pandas.concat` in `make_dynamic`), so appending the raw slices
+    # would produce a mismatch and fail the test
+    for idx in range(len(expected.index)):
+        lib.append(symbol, expected.iloc[[idx],:], write_if_missing=True)

     try:
         q = QueryBuilder()
@@ -337,17 +368,44 @@ def test_hypothesis_last_agg_dynamic_numeric(lmdb_version_store_dynamic_schema_v1, df):
         pass


-@pytest.mark.xfail(reason="Not supported yet")
-def test_last_aggregation_dynamic(s3_version_store_dynamic_schema_v2):
+def test_last_aggregation_dynamic_numeric(s3_version_store_dynamic_schema_v2):
     lib = s3_version_store_dynamic_schema_v2
     df = DataFrame(
         {
-            "grouping_column": ["group_1", "group_2", "group_4", "group_5", "group_2", "group_1", "group_3", "group_1", "group_5"],
-            "get_last": [100.0, 2.7, np.nan, np.nan, np.nan, 1.4, 5.8, 3.45, 6.9],
+            "grouping_column": ["group_1", "group_2", "group_4", "group_5", "group_2", "group_1", "group_3", "group_1", "group_5", "group_6", "group_7", "group_7"],
+            "get_last": [100.0, 2.7, np.nan, np.nan, np.nan, 1.4, 5.8, 3.45, 6.9, None, None, 8.4],
         },
-        index=np.arange(9),
+        index=np.arange(12),
+    )
+    symbol = "test_last_aggregation_dynamic_numeric"
+    expected, slices = make_dynamic(df)
+    # Append `expected` row by row rather than appending `slices` (same reasoning as in
+    # test_hypothesis_last_agg_dynamic_numeric)
+    for idx in range(len(expected.index)):
+        lib.append(symbol, expected.iloc[[idx],:], write_if_missing=True)
+
+    q = QueryBuilder()
+    q = q.groupby("grouping_column").agg({"get_last": "last"})
+
+    received = lib.read(symbol, query_builder=q).data
+    received.sort_index(inplace=True)
+
+    expected = expected.groupby("grouping_column").agg({"get_last": "last"})
+
+    assert_frame_equal(received, expected)
+
+
+def test_last_aggregation_strings_dynamic(s3_version_store_dynamic_schema_v2):
+    lib = s3_version_store_dynamic_schema_v2
+    df = DataFrame(
+        {
+            "grouping_column": ["group_1", "group_2", "group_1", "group_2", "group_3"],
+            "get_last": ["Hi", "HELLO", "NO", "BLABLABLA", "This is something else"],
+        },
+        index=np.arange(5),
     )
-    symbol = "test_last_aggregation_dynamic"
+    symbol = "test_last_aggregation_strings_dynamic"
     expected, slices = make_dynamic(df)
     for df_slice in slices:
         lib.append(symbol, df_slice, write_if_missing=True)