From 39a2b1d509e7792ac5b378c98958195757539bd1 Mon Sep 17 00:00:00 2001 From: Alex Owens <73388657+alexowens90@users.noreply.github.com> Date: Thu, 21 Mar 2024 15:38:31 +0000 Subject: [PATCH] Bugfix 935: match pandas behaviour when aggregating columns with nans (#1450) Fixes #935 Also addresses two of the bullet points from #1439: - AggregationClause::process x2 - aggregation.cpp all finalize methods --- cpp/arcticdb/column_store/column_data.hpp | 4 +- cpp/arcticdb/processing/aggregation.cpp | 120 +++++------ cpp/arcticdb/processing/clause.cpp | 187 +++++++++--------- .../version_store/test_aggregation.py | 45 ++--- 4 files changed, 177 insertions(+), 179 deletions(-) diff --git a/cpp/arcticdb/column_store/column_data.hpp b/cpp/arcticdb/column_store/column_data.hpp index bba7fa81c5..5a1955e2a7 100644 --- a/cpp/arcticdb/column_store/column_data.hpp +++ b/cpp/arcticdb/column_store/column_data.hpp @@ -213,7 +213,7 @@ struct ColumnData { } // Used to construct [c]end iterators - explicit ColumnDataIterator(ColumnData* parent, typename TDT::DataTypeTag::raw_type* end_ptr): + explicit ColumnDataIterator(ColumnData* parent, RawType* end_ptr): parent_(parent) { data_.ptr_ = end_ptr; } @@ -304,7 +304,7 @@ struct ColumnData { if(!data_->blocks().empty()) { auto block = data_->blocks().at(num_blocks() - 1); auto typed_block_data = next_typed_block(block); - end_ptr = typed_block_data.data() + typed_block_data.row_count(); + end_ptr = const_cast(typed_block_data.data() + typed_block_data.row_count()); } return ColumnDataIterator(this, end_ptr); } diff --git a/cpp/arcticdb/processing/aggregation.cpp b/cpp/arcticdb/processing/aggregation.cpp index b52dd4b971..2d27449960 100644 --- a/cpp/arcticdb/processing/aggregation.cpp +++ b/cpp/arcticdb/processing/aggregation.cpp @@ -15,15 +15,16 @@ namespace arcticdb void MinMaxAggregatorData::aggregate(const ColumnWithStrings& input_column) { details::visit_type(input_column.column_->type().data_type(), [&] (auto col_tag) { using type_info = 
ScalarTypeInfo; + using RawType = typename type_info::RawType; if constexpr(!is_sequence_type(type_info::data_type)) { Column::for_each(*input_column.column_, [this](auto value) { - const auto& curr = static_cast(value); + const auto& curr = static_cast(value); if (ARCTICDB_UNLIKELY(!min_.has_value())) { min_ = std::make_optional(curr, type_info::data_type); max_ = std::make_optional(curr, type_info::data_type); } else { - min_->set(std::min(min_->get(), curr)); - max_->set(std::max(max_->get(), curr)); + min_->set(std::min(min_->get(), curr)); + max_->set(std::max(max_->get(), curr)); } }); } else { @@ -168,21 +169,24 @@ void SumAggregatorData::aggregate(const std::optional& input_ data_type_ = DataType::FLOAT64; } details::visit_type(*data_type_, [&input_column, unique_values, &groups, this] (auto global_tag) { - using GlobalInputType = decltype(global_tag); - if constexpr(!is_sequence_type(GlobalInputType::DataTypeTag::data_type)) { - using GlobalTypeDescriptorTag = typename OutputType::type; - using GlobalRawType = typename GlobalTypeDescriptorTag::DataTypeTag::raw_type; - aggregated_.resize(sizeof(GlobalRawType)* unique_values); - auto out_ptr = reinterpret_cast(aggregated_.data()); + using global_type_info = ScalarTypeInfo; + using RawType = typename global_type_info::RawType; + if constexpr(!is_sequence_type(global_type_info::data_type)) { + aggregated_.resize(sizeof(RawType) * unique_values); + auto out_ptr = reinterpret_cast(aggregated_.data()); if (input_column.has_value()) { details::visit_type(input_column->column_->type().data_type(), [&input_column, &groups, &out_ptr] (auto col_tag) { using col_type_info = ScalarTypeInfo; if constexpr(!is_sequence_type(col_type_info::data_type)) { Column::for_each_enumerated(*input_column->column_, [&out_ptr, &groups](auto enumerating_it) { - if constexpr (std::is_same_v) { - out_ptr[groups[enumerating_it.idx()]] |= GlobalRawType(enumerating_it.value()); + if constexpr (is_bool_type(global_type_info::data_type)) { + 
out_ptr[groups[enumerating_it.idx()]] |= RawType(enumerating_it.value()); + } else if constexpr (is_floating_point_type(col_type_info::data_type)) { + if (ARCTICDB_LIKELY(!std::isnan(enumerating_it.value()))) { + out_ptr[groups[enumerating_it.idx()]] += RawType(enumerating_it.value()); + } } else { - out_ptr[groups[enumerating_it.idx()]] += GlobalRawType(enumerating_it.value()); + out_ptr[groups[enumerating_it.idx()]] += RawType(enumerating_it.value()); } }); } else { @@ -198,8 +202,8 @@ SegmentInMemory SumAggregatorData::finalize(const ColumnName& output_column_name SegmentInMemory res; if(!aggregated_.empty()) { details::visit_type(*data_type_, [that=this, &res, &output_column_name, unique_values] (auto col_tag) { - using RawType = typename decltype(col_tag)::DataTypeTag::raw_type; - that->aggregated_.resize(sizeof(RawType)* unique_values); + using col_type_info = ScalarTypeInfo; + that->aggregated_.resize(sizeof(typename col_type_info::RawType)* unique_values); auto col = std::make_shared(make_scalar_type(that->data_type_.value()), unique_values, true, false); memcpy(col->ptr(), that->aggregated_.data(), that->aggregated_.size()); res.add_column(scalar_field(that->data_type_.value(), output_column_name.value), col); @@ -248,10 +252,9 @@ namespace ) { if(data_type_.has_value() && *data_type_ != DataType::EMPTYVAL && input_column.has_value()) { details::visit_type(*data_type_, [&aggregated_, &input_column, unique_values, &groups] (auto global_tag) { - using GlobalInputType = decltype(global_tag); - if constexpr(!is_sequence_type(GlobalInputType::DataTypeTag::data_type)) { - using GlobalTypeDescriptorTag = typename OutputType::type; - using GlobalRawType = typename GlobalTypeDescriptorTag::DataTypeTag::raw_type; + using global_type_info = ScalarTypeInfo; + using GlobalRawType = typename global_type_info::RawType; + if constexpr(!is_sequence_type(global_type_info::data_type)) { using MaybeValueType = MaybeValue; auto prev_size = aggregated_.size() / 
sizeof(MaybeValueType); aggregated_.resize(sizeof(MaybeValueType) * unique_values); @@ -259,15 +262,16 @@ namespace std::fill(out_ptr + prev_size, out_ptr + unique_values, MaybeValueType{}); details::visit_type(input_column->column_->type().data_type(), [&input_column, &groups, &out_ptr] (auto col_tag) { using col_type_info = ScalarTypeInfo; + using ColRawType = typename col_type_info::RawType; if constexpr(!is_sequence_type(col_type_info::data_type)) { Column::for_each_enumerated(*input_column->column_, [&groups, &out_ptr](auto enumerating_it) { auto& val = out_ptr[groups[enumerating_it.idx()]]; - if constexpr(std::is_floating_point_v) { + if constexpr(std::is_floating_point_v) { const auto& curr = GlobalRawType(enumerating_it.value()); - if (!val.written_ || std::isnan(static_cast(val.value_))) { + if (!val.written_ || std::isnan(static_cast(val.value_))) { val.value_ = curr; val.written_ = true; - } else if (!std::isnan(static_cast(curr))) { + } else if (!std::isnan(static_cast(curr))) { if constexpr(T == Extremum::MAX) { val.value_ = std::max(val.value_, curr); } else { @@ -302,36 +306,32 @@ namespace ) { SegmentInMemory res; if(!aggregated_.empty()) { - if(dynamic_schema) { - details::visit_type(*data_type_, [&aggregated_, &res, &output_column_name, unique_values] (auto col_tag) { - using RawType = typename decltype(col_tag)::DataTypeTag::raw_type; - using MaybeValueType = MaybeValue; + constexpr auto dynamic_schema_data_type = DataType::FLOAT64; + using DynamicSchemaTDT = ScalarTagType>; + auto col = std::make_shared(make_scalar_type(dynamic_schema ? dynamic_schema_data_type: data_type_.value()), unique_values, true, false); + auto column_data = col->data(); + col->set_row_data(unique_values - 1); + res.add_column(scalar_field(dynamic_schema ? 
dynamic_schema_data_type : data_type_.value(), output_column_name.value), col); + details::visit_type(*data_type_, [&aggregated_, &column_data, unique_values, dynamic_schema] (auto col_tag) { + using col_type_info = ScalarTypeInfo; + using MaybeValueType = MaybeValue; + if(dynamic_schema) { auto prev_size = aggregated_.size() / sizeof(MaybeValueType); auto new_size = sizeof(MaybeValueType) * unique_values; aggregated_.resize(new_size); - auto in_ptr = reinterpret_cast(aggregated_.data()); + auto in_ptr = reinterpret_cast(aggregated_.data()); std::fill(in_ptr + prev_size, in_ptr + unique_values, MaybeValueType{}); - auto col = std::make_shared(make_scalar_type(DataType::FLOAT64), unique_values, true, false); - auto out_ptr = reinterpret_cast(col->ptr()); - for(auto i = 0u; i < unique_values; ++i, ++in_ptr, ++out_ptr) { - *out_ptr = in_ptr->written_ ? static_cast(in_ptr->value_) : std::numeric_limits::quiet_NaN(); } - - col->set_row_data(unique_values - 1); - res.add_column(scalar_field(DataType::FLOAT64, output_column_name.value), col); - }); - } else { - details::visit_type(*data_type_, [&aggregated_, &data_type_, &res, output_column_name, unique_values] (auto col_tag) { - using RawType = typename decltype(col_tag)::DataTypeTag::raw_type; - auto col = std::make_shared(make_scalar_type(data_type_.value()), unique_values, true, false); - const auto* in_ptr = reinterpret_cast*>(aggregated_.data()); - auto out_ptr = reinterpret_cast(col->ptr()); - for(auto i = 0u; i < unique_values; ++i, ++in_ptr, ++out_ptr) { - *out_ptr = in_ptr->value_; + for (auto it = column_data.begin(); it != column_data.end(); ++it, ++in_ptr) { + *it = in_ptr->written_ ? 
static_cast(in_ptr->value_) + : std::numeric_limits::quiet_NaN(); } - col->set_row_data(unique_values - 1); - res.add_column(scalar_field(data_type_.value(), output_column_name.value), col); - }); - } + } else { + auto in_ptr = reinterpret_cast(aggregated_.data()); + for (auto it = column_data.begin(); it != column_data.end(); ++it, ++in_ptr) { + *it = in_ptr->value_; + } + } + }); } return res; } @@ -387,8 +387,15 @@ void MeanAggregatorData::aggregate(const std::optional& input if constexpr(!is_sequence_type(col_type_info::data_type)) { Column::for_each_enumerated(*input_column->column_, [&groups, this](auto enumerating_it) { auto& fraction = fractions_[groups[enumerating_it.idx()]]; - fraction.numerator_ += double(enumerating_it.value()); - ++fraction.denominator_; + if constexpr ((is_floating_point_type(col_type_info ::data_type))) { + if (ARCTICDB_LIKELY(!std::isnan(enumerating_it.value()))) { + fraction.numerator_ += double(enumerating_it.value()); + ++fraction.denominator_; + } + } else { + fraction.numerator_ += double(enumerating_it.value()); + ++fraction.denominator_; + } }); } else { util::raise_rte("String aggregations not currently supported"); @@ -401,14 +408,13 @@ SegmentInMemory MeanAggregatorData::finalize(const ColumnName& output_column_nam SegmentInMemory res; if(!fractions_.empty()) { fractions_.resize(unique_values); - auto pos = res.add_column(scalar_field(DataType::FLOAT64, output_column_name.value), fractions_.size(), true); - auto& column = res.column(pos); - auto ptr = reinterpret_cast(column.ptr()); - column.set_row_data(fractions_.size() - 1); - - for (auto idx = 0u; idx < fractions_.size(); ++idx) { - ptr[idx] = fractions_[idx].to_double(); - } + auto col = std::make_shared(make_scalar_type(DataType::FLOAT64), fractions_.size(), true, false); + auto column_data = col->data(); + std::transform(fractions_.cbegin(), fractions_.cend(), column_data.begin>>(), [](auto fraction) { + return fraction.to_double(); + }); + 
col->set_row_data(fractions_.size() - 1); + res.add_column(scalar_field(DataType::FLOAT64, output_column_name.value), col); } return res; } @@ -429,7 +435,7 @@ void CountAggregatorData::aggregate(const std::optional& inpu using col_type_info = ScalarTypeInfo; Column::for_each_enumerated(*input_column->column_, [&groups, this](auto enumerating_it) { if constexpr (is_floating_point_type(col_type_info::data_type)) { - if (!std::isnan(static_cast(enumerating_it.value()))) { + if (ARCTICDB_LIKELY(!std::isnan(enumerating_it.value()))) { auto& val = aggregated_[groups[enumerating_it.idx()]]; ++val; } diff --git a/cpp/arcticdb/processing/clause.cpp b/cpp/arcticdb/processing/clause.cpp index 430fbe8a54..a4889f5e32 100644 --- a/cpp/arcticdb/processing/clause.cpp +++ b/cpp/arcticdb/processing/clause.cpp @@ -400,103 +400,94 @@ Composite AggregationClause::process(Composite&& entity_id auto partitioning_column = proc.get(ColumnName(grouping_column_)); if (std::holds_alternative(partitioning_column)) { ColumnWithStrings col = std::get(partitioning_column); - entity::details::visit_type(col.column_->type().data_type(), - [&proc_=proc, &grouping_map, &next_group_id, &aggregators_data, &string_pool, &col, - &num_unique, &grouping_data_type, this](auto data_type_tag) { - using DataTypeTagType = decltype(data_type_tag); - using RawType = typename DataTypeTagType::raw_type; - constexpr auto data_type = DataTypeTagType::data_type; - grouping_data_type = data_type; - std::vector row_to_group; - row_to_group.reserve(col.column_->row_count()); - auto input_data = col.column_->data(); - auto hash_to_group = grouping_map.get(); - // For string grouping columns, keep a local map within this ProcessingUnit - // from offsets to groups, to avoid needless calls to col.string_at_offset and - // string_pool->get - // This could be slower in cases where there aren't many repeats in string - // grouping columns. Maybe track hit ratio of finds and stop using it if it is - // too low? 
- // Tested with 100,000,000 row dataframe with 100,000 unique values in the grouping column. Timings: - // 11.14 seconds without caching - // 11.01 seconds with caching - // Not worth worrying about right now - ankerl::unordered_dense::map offset_to_group; - - const bool is_sparse = col.column_->is_sparse(); - using optional_iter_type = std::optionalfirst())>; - optional_iter_type iter = std::nullopt; - size_t previous_value_index = 0; - constexpr size_t missing_value_group_id = 0; - - if (is_sparse) - { - iter = std::make_optional(input_data.bit_vector()->first()); - // We use 0 for the missing value group id - next_group_id++; - } - - while (auto block = input_data.next>()) { - const auto row_count = block->row_count(); - auto ptr = block->data(); - for (size_t i = 0; i < row_count; ++i, ++ptr) { - RawType val; - if constexpr(is_sequence_type(data_type)) { - auto offset = *ptr; - if (auto it = offset_to_group.find(offset); it != offset_to_group.end()) { - val = it->second; - } else { - std::optional str = col.string_at_offset(offset); - if (str.has_value()) { - val = string_pool->get(*str, true).offset(); - } else { - val = offset; - } - RawType val_copy(val); - offset_to_group.insert(std::make_pair(std::forward(offset), std::forward(val_copy))); - } + details::visit_type(col.column_->type().data_type(), + [&proc_=proc, &grouping_map, &next_group_id, &aggregators_data, &string_pool, &col, + &num_unique, &grouping_data_type, this](auto data_type_tag) { + using col_type_info = ScalarTypeInfo; + grouping_data_type = col_type_info::data_type; + std::vector row_to_group; + row_to_group.reserve(col.column_->row_count()); + auto hash_to_group = grouping_map.get(); + // For string grouping columns, keep a local map within this ProcessingUnit + // from offsets to groups, to avoid needless calls to col.string_at_offset and + // string_pool->get + // This could be slower in cases where there aren't many repeats in string + // grouping columns. 
Maybe track hit ratio of finds and stop using it if it is + // too low? + // Tested with 100,000,000 row dataframe with 100,000 unique values in the grouping column. Timings: + // 11.14 seconds without caching + // 11.01 seconds with caching + // Not worth worrying about right now + ankerl::unordered_dense::map offset_to_group; + + const bool is_sparse = col.column_->is_sparse(); + if (is_sparse && next_group_id == 0) { + // We use 0 for the missing value group id + ++next_group_id; + } + ssize_t previous_value_index = 0; + constexpr size_t missing_value_group_id = 0; + + Column::for_each_enumerated( + *col.column_, + [&](auto enumerating_it) { + typename col_type_info::RawType val; + if constexpr(is_sequence_type(col_type_info::data_type)) { + auto offset = enumerating_it.value(); + if (auto it = offset_to_group.find(offset); it != offset_to_group.end()) { + val = it->second; } else { - val = *ptr; - } - if (is_sparse) { - for (size_t j = previous_value_index; j != *(iter.value()); ++j) { - row_to_group.emplace_back(missing_value_group_id); + std::optional str = col.string_at_offset(offset); + if (str.has_value()) { + val = string_pool->get(*str, true).offset(); + } else { + val = offset; } - previous_value_index = *(iter.value()) + 1; - ++(iter.value()); + typename col_type_info::RawType val_copy(val); + offset_to_group.insert(std::make_pair(std::forward(offset), std::forward(val_copy))); } + } else { + val = enumerating_it.value(); + } - if (auto it = hash_to_group->find(val); it == hash_to_group->end()) { - row_to_group.emplace_back(next_group_id); - auto group_id = next_group_id++; - hash_to_group->insert(std::make_pair(std::forward(val), std::forward(group_id))); - } else { - row_to_group.emplace_back(it->second); + if (is_sparse) { + for (auto j = previous_value_index; j != enumerating_it.idx(); ++j) { + row_to_group.emplace_back(missing_value_group_id); } + previous_value_index = enumerating_it.idx() + 1; } - } - - // Marking all the last non-represented 
values as missing. - for (size_t i = row_to_group.size(); i <= size_t(col.column_->last_row()); ++i) { - row_to_group.emplace_back(missing_value_group_id); - } - num_unique = next_group_id; - util::check(num_unique != 0, "Got zero unique values"); - for (auto agg_data: folly::enumerate(aggregators_data)) { - auto input_column_name = aggregators_.at(agg_data.index).get_input_column_name(); - auto input_column = proc_.get(input_column_name); - std::optional opt_input_column; - if (std::holds_alternative(input_column)) { - auto column_with_strings = std::get(input_column); - // Empty columns don't contribute to aggregations - if (!is_empty_type(column_with_strings.column_->type().data_type())) { - opt_input_column.emplace(std::move(column_with_strings)); - } + if (auto it = hash_to_group->find(val); it == hash_to_group->end()) { + row_to_group.emplace_back(next_group_id); + auto group_id = next_group_id++; + hash_to_group->insert(std::make_pair(std::forward(val), std::forward(group_id))); + } else { + row_to_group.emplace_back(it->second); } - agg_data->aggregate(opt_input_column, row_to_group, num_unique); } - }); + ); + + // Marking all the last non-represented values as missing. 
+ for (size_t i = row_to_group.size(); i <= size_t(col.column_->last_row()); ++i) { + row_to_group.emplace_back(missing_value_group_id); + } + + num_unique = next_group_id; + util::check(num_unique != 0, "Got zero unique values"); + for (auto agg_data: folly::enumerate(aggregators_data)) { + auto input_column_name = aggregators_.at(agg_data.index).get_input_column_name(); + auto input_column = proc_.get(input_column_name); + std::optional opt_input_column; + if (std::holds_alternative(input_column)) { + auto column_with_strings = std::get(input_column); + // Empty columns don't contribute to aggregations + if (!is_empty_type(column_with_strings.column_->type().data_type())) { + opt_input_column.emplace(std::move(column_with_strings)); + } + } + agg_data->aggregate(opt_input_column, row_to_group, num_unique); + } + }); } else { util::raise_rte("Expected single column from expression"); } @@ -504,26 +495,26 @@ Composite AggregationClause::process(Composite&& entity_id SegmentInMemory seg; auto index_col = std::make_shared(make_scalar_type(grouping_data_type), grouping_map.size(), true, false); - auto index_pos = seg.add_column(scalar_field(grouping_data_type, grouping_column_), index_col); + seg.add_column(scalar_field(grouping_data_type, grouping_column_), index_col); seg.descriptor().set_index(IndexDescriptor(0, IndexDescriptor::ROWCOUNT)); - entity::details::visit_type(grouping_data_type, [&seg, &grouping_map, index_pos](auto data_type_tag) { - using DataTypeTagType = decltype(data_type_tag); - using RawType = typename DataTypeTagType::raw_type; - auto hashes = grouping_map.get(); - auto index_ptr = reinterpret_cast(seg.column(index_pos).ptr()); - std::vector> elements; + details::visit_type(grouping_data_type, [&grouping_map, &index_col](auto data_type_tag) { + using col_type_info = ScalarTypeInfo; + auto hashes = grouping_map.get(); + std::vector> elements; for (const auto &hash : *hashes) elements.push_back(std::make_pair(hash.first, hash.second)); 
std::sort(std::begin(elements), std::end(elements), - [](const std::pair &l, const std::pair &r) { + [](const std::pair &l, const std::pair &r) { return l.second < r.second; }); - for (const auto &element : elements) - *index_ptr++ = element.first; + auto column_data = index_col->data(); + std::transform(elements.cbegin(), elements.cend(), column_data.begin(), [](const auto& element) { + return element.first; + }); }); index_col->set_row_data(grouping_map.size() - 1); diff --git a/python/tests/unit/arcticdb/version_store/test_aggregation.py b/python/tests/unit/arcticdb/version_store/test_aggregation.py index ed4dbdf674..e811cae25d 100644 --- a/python/tests/unit/arcticdb/version_store/test_aggregation.py +++ b/python/tests/unit/arcticdb/version_store/test_aggregation.py @@ -36,6 +36,29 @@ def test_group_on_float_column_with_nans(lmdb_version_store): assert_frame_equal(expected, received) +# TODO: Add first and last once un-feature flagged +@pytest.mark.parametrize("aggregator", ("sum", "min", "max", "mean", "count")) +def test_aggregate_float_columns_with_nans(lmdb_version_store, aggregator): + lib = lmdb_version_store + sym = "test_aggregate_float_columns_with_nans" + df = pd.DataFrame( + { + "grouping_column": 3 * ["some nans", "only nans"], + "agg_column": [1.0, np.nan, 2.0, np.nan, np.nan, np.nan], + } + ) + lib.write(sym, df) + expected = df.groupby("grouping_column").agg({"agg_column": aggregator}) + # We count in unsigned integers for obvious reasons + if aggregator == "count": + expected = expected.astype(np.uint64) + q = QueryBuilder() + q = q.groupby("grouping_column").agg({"agg_column": aggregator}) + received = lib.read(sym, query_builder=q).data + received.sort_index(inplace=True) + assert_frame_equal(expected, received) + + @use_of_function_scoped_fixtures_in_hypothesis_checked @settings(deadline=None) @given( @@ -428,28 +451,6 @@ def test_mean_aggregation_float(local_object_version_store): assert_frame_equal(res.data, df) -def 
test_mean_aggregation_float_nan(lmdb_version_store): - df = DataFrame( - { - "grouping_column": ["group_1", "group_1", "group_1", "group_2", "group_2"], - "to_mean": [1.1, 1.4, 2.5, np.nan, 2.2], - }, - index=np.arange(5), - ) - q = QueryBuilder() - q = q.groupby("grouping_column").agg({"to_mean": "mean"}) - symbol = "test_aggregation" - lmdb_version_store.write(symbol, df) - - res = lmdb_version_store.read(symbol, query_builder=q) - - df = pd.DataFrame({"to_mean": [(1.1 + 1.4 + 2.5) / 3, np.nan]}, index=["group_1", "group_2"]) - df.index.rename("grouping_column", inplace=True) - res.data.sort_index(inplace=True) - - assert_frame_equal(res.data, df) - - def test_max_minus_one(lmdb_version_store): symbol = "minus_one" lib = lmdb_version_store