Skip to content

Commit

Permalink
Bugfix 935: match pandas behviour when aggregating columns with nans (#…
Browse files Browse the repository at this point in the history
…1450)

Fixes #935 

Also addresses two of the bullet points from #1439:

- AggregationClause::process x2
- aggregation.cpp all finalize methods
  • Loading branch information
alexowens90 authored Mar 21, 2024
1 parent 85f4423 commit 39a2b1d
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 179 deletions.
4 changes: 2 additions & 2 deletions cpp/arcticdb/column_store/column_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ struct ColumnData {
}

// Used to construct [c]end iterators
explicit ColumnDataIterator(ColumnData* parent, typename TDT::DataTypeTag::raw_type* end_ptr):
explicit ColumnDataIterator(ColumnData* parent, RawType* end_ptr):
parent_(parent) {
data_.ptr_ = end_ptr;
}
Expand Down Expand Up @@ -304,7 +304,7 @@ struct ColumnData {
if(!data_->blocks().empty()) {
auto block = data_->blocks().at(num_blocks() - 1);
auto typed_block_data = next_typed_block<TDT>(block);
end_ptr = typed_block_data.data() + typed_block_data.row_count();
end_ptr = const_cast<RawType*>(typed_block_data.data() + typed_block_data.row_count());
}
return ColumnDataIterator<TDT, iterator_type, iterator_density, false>(this, end_ptr);
}
Expand Down
120 changes: 63 additions & 57 deletions cpp/arcticdb/processing/aggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,16 @@ namespace arcticdb
void MinMaxAggregatorData::aggregate(const ColumnWithStrings& input_column) {
details::visit_type(input_column.column_->type().data_type(), [&] (auto col_tag) {
using type_info = ScalarTypeInfo<decltype(col_tag)>;
using RawType = typename type_info::RawType;
if constexpr(!is_sequence_type(type_info::data_type)) {
Column::for_each<typename type_info::TDT>(*input_column.column_, [this](auto value) {
const auto& curr = static_cast<typename type_info::RawType>(value);
const auto& curr = static_cast<RawType>(value);
if (ARCTICDB_UNLIKELY(!min_.has_value())) {
min_ = std::make_optional<Value>(curr, type_info::data_type);
max_ = std::make_optional<Value>(curr, type_info::data_type);
} else {
min_->set(std::min(min_->get<typename type_info::RawType>(), curr));
max_->set(std::max(max_->get<typename type_info::RawType>(), curr));
min_->set(std::min(min_->get<RawType>(), curr));
max_->set(std::max(max_->get<RawType>(), curr));
}
});
} else {
Expand Down Expand Up @@ -168,21 +169,24 @@ void SumAggregatorData::aggregate(const std::optional<ColumnWithStrings>& input_
data_type_ = DataType::FLOAT64;
}
details::visit_type(*data_type_, [&input_column, unique_values, &groups, this] (auto global_tag) {
using GlobalInputType = decltype(global_tag);
if constexpr(!is_sequence_type(GlobalInputType::DataTypeTag::data_type)) {
using GlobalTypeDescriptorTag = typename OutputType<GlobalInputType>::type;
using GlobalRawType = typename GlobalTypeDescriptorTag::DataTypeTag::raw_type;
aggregated_.resize(sizeof(GlobalRawType)* unique_values);
auto out_ptr = reinterpret_cast<GlobalRawType*>(aggregated_.data());
using global_type_info = ScalarTypeInfo<decltype(global_tag)>;
using RawType = typename global_type_info::RawType;
if constexpr(!is_sequence_type(global_type_info::data_type)) {
aggregated_.resize(sizeof(RawType) * unique_values);
auto out_ptr = reinterpret_cast<RawType*>(aggregated_.data());
if (input_column.has_value()) {
details::visit_type(input_column->column_->type().data_type(), [&input_column, &groups, &out_ptr] (auto col_tag) {
using col_type_info = ScalarTypeInfo<decltype(col_tag)>;
if constexpr(!is_sequence_type(col_type_info::data_type)) {
Column::for_each_enumerated<typename col_type_info::TDT>(*input_column->column_, [&out_ptr, &groups](auto enumerating_it) {
if constexpr (std::is_same_v<GlobalRawType, bool>) {
out_ptr[groups[enumerating_it.idx()]] |= GlobalRawType(enumerating_it.value());
if constexpr (is_bool_type(global_type_info::data_type)) {
out_ptr[groups[enumerating_it.idx()]] |= RawType(enumerating_it.value());
} else if constexpr (is_floating_point_type(col_type_info::data_type)) {
if (ARCTICDB_LIKELY(!std::isnan(enumerating_it.value()))) {
out_ptr[groups[enumerating_it.idx()]] += RawType(enumerating_it.value());
}
} else {
out_ptr[groups[enumerating_it.idx()]] += GlobalRawType(enumerating_it.value());
out_ptr[groups[enumerating_it.idx()]] += RawType(enumerating_it.value());
}
});
} else {
Expand All @@ -198,8 +202,8 @@ SegmentInMemory SumAggregatorData::finalize(const ColumnName& output_column_name
SegmentInMemory res;
if(!aggregated_.empty()) {
details::visit_type(*data_type_, [that=this, &res, &output_column_name, unique_values] (auto col_tag) {
using RawType = typename decltype(col_tag)::DataTypeTag::raw_type;
that->aggregated_.resize(sizeof(RawType)* unique_values);
using col_type_info = ScalarTypeInfo<decltype(col_tag)>;
that->aggregated_.resize(sizeof(typename col_type_info::RawType)* unique_values);
auto col = std::make_shared<Column>(make_scalar_type(that->data_type_.value()), unique_values, true, false);
memcpy(col->ptr(), that->aggregated_.data(), that->aggregated_.size());
res.add_column(scalar_field(that->data_type_.value(), output_column_name.value), col);
Expand Down Expand Up @@ -248,26 +252,26 @@ namespace
) {
if(data_type_.has_value() && *data_type_ != DataType::EMPTYVAL && input_column.has_value()) {
details::visit_type(*data_type_, [&aggregated_, &input_column, unique_values, &groups] (auto global_tag) {
using GlobalInputType = decltype(global_tag);
if constexpr(!is_sequence_type(GlobalInputType::DataTypeTag::data_type)) {
using GlobalTypeDescriptorTag = typename OutputType<GlobalInputType>::type;
using GlobalRawType = typename GlobalTypeDescriptorTag::DataTypeTag::raw_type;
using global_type_info = ScalarTypeInfo<decltype(global_tag)>;
using GlobalRawType = typename global_type_info::RawType;
if constexpr(!is_sequence_type(global_type_info::data_type)) {
using MaybeValueType = MaybeValue<GlobalRawType, T>;
auto prev_size = aggregated_.size() / sizeof(MaybeValueType);
aggregated_.resize(sizeof(MaybeValueType) * unique_values);
auto out_ptr = reinterpret_cast<MaybeValueType*>(aggregated_.data());
std::fill(out_ptr + prev_size, out_ptr + unique_values, MaybeValueType{});
details::visit_type(input_column->column_->type().data_type(), [&input_column, &groups, &out_ptr] (auto col_tag) {
using col_type_info = ScalarTypeInfo<decltype(col_tag)>;
using ColRawType = typename col_type_info::RawType;
if constexpr(!is_sequence_type(col_type_info::data_type)) {
Column::for_each_enumerated<typename col_type_info::TDT>(*input_column->column_, [&groups, &out_ptr](auto enumerating_it) {
auto& val = out_ptr[groups[enumerating_it.idx()]];
if constexpr(std::is_floating_point_v<typename col_type_info::RawType>) {
if constexpr(std::is_floating_point_v<ColRawType>) {
const auto& curr = GlobalRawType(enumerating_it.value());
if (!val.written_ || std::isnan(static_cast<typename col_type_info::RawType>(val.value_))) {
if (!val.written_ || std::isnan(static_cast<ColRawType>(val.value_))) {
val.value_ = curr;
val.written_ = true;
} else if (!std::isnan(static_cast<typename col_type_info::RawType>(curr))) {
} else if (!std::isnan(static_cast<ColRawType>(curr))) {
if constexpr(T == Extremum::MAX) {
val.value_ = std::max(val.value_, curr);
} else {
Expand Down Expand Up @@ -302,36 +306,32 @@ namespace
) {
SegmentInMemory res;
if(!aggregated_.empty()) {
if(dynamic_schema) {
details::visit_type(*data_type_, [&aggregated_, &res, &output_column_name, unique_values] (auto col_tag) {
using RawType = typename decltype(col_tag)::DataTypeTag::raw_type;
using MaybeValueType = MaybeValue<RawType, T>;
constexpr auto dynamic_schema_data_type = DataType::FLOAT64;
using DynamicSchemaTDT = ScalarTagType<DataTypeTag<dynamic_schema_data_type>>;
auto col = std::make_shared<Column>(make_scalar_type(dynamic_schema ? dynamic_schema_data_type: data_type_.value()), unique_values, true, false);
auto column_data = col->data();
col->set_row_data(unique_values - 1);
res.add_column(scalar_field(dynamic_schema ? dynamic_schema_data_type : data_type_.value(), output_column_name.value), col);
details::visit_type(*data_type_, [&aggregated_, &column_data, unique_values, dynamic_schema] (auto col_tag) {
using col_type_info = ScalarTypeInfo<decltype(col_tag)>;
using MaybeValueType = MaybeValue<typename col_type_info::RawType, T>;
if(dynamic_schema) {
auto prev_size = aggregated_.size() / sizeof(MaybeValueType);
auto new_size = sizeof(MaybeValueType) * unique_values;
aggregated_.resize(new_size);
auto in_ptr = reinterpret_cast<MaybeValueType*>(aggregated_.data());
auto in_ptr = reinterpret_cast<MaybeValueType *>(aggregated_.data());
std::fill(in_ptr + prev_size, in_ptr + unique_values, MaybeValueType{});
auto col = std::make_shared<Column>(make_scalar_type(DataType::FLOAT64), unique_values, true, false);
auto out_ptr = reinterpret_cast<double*>(col->ptr());
for(auto i = 0u; i < unique_values; ++i, ++in_ptr, ++out_ptr) {
*out_ptr = in_ptr->written_ ? static_cast<double>(in_ptr->value_) : std::numeric_limits<double>::quiet_NaN(); }

col->set_row_data(unique_values - 1);
res.add_column(scalar_field(DataType::FLOAT64, output_column_name.value), col);
});
} else {
details::visit_type(*data_type_, [&aggregated_, &data_type_, &res, output_column_name, unique_values] (auto col_tag) {
using RawType = typename decltype(col_tag)::DataTypeTag::raw_type;
auto col = std::make_shared<Column>(make_scalar_type(data_type_.value()), unique_values, true, false);
const auto* in_ptr = reinterpret_cast<const MaybeValue<RawType, T>*>(aggregated_.data());
auto out_ptr = reinterpret_cast<RawType*>(col->ptr());
for(auto i = 0u; i < unique_values; ++i, ++in_ptr, ++out_ptr) {
*out_ptr = in_ptr->value_;
for (auto it = column_data.begin<DynamicSchemaTDT>(); it != column_data.end<DynamicSchemaTDT>(); ++it, ++in_ptr) {
*it = in_ptr->written_ ? static_cast<double>(in_ptr->value_)
: std::numeric_limits<double>::quiet_NaN();
}
col->set_row_data(unique_values - 1);
res.add_column(scalar_field(data_type_.value(), output_column_name.value), col);
});
}
} else {
auto in_ptr = reinterpret_cast<MaybeValueType*>(aggregated_.data());
for (auto it = column_data.begin<typename col_type_info::TDT>(); it != column_data.end<typename col_type_info::TDT>(); ++it, ++in_ptr) {
*it = in_ptr->value_;
}
}
});
}
return res;
}
Expand Down Expand Up @@ -387,8 +387,15 @@ void MeanAggregatorData::aggregate(const std::optional<ColumnWithStrings>& input
if constexpr(!is_sequence_type(col_type_info::data_type)) {
Column::for_each_enumerated<typename col_type_info::TDT>(*input_column->column_, [&groups, this](auto enumerating_it) {
auto& fraction = fractions_[groups[enumerating_it.idx()]];
fraction.numerator_ += double(enumerating_it.value());
++fraction.denominator_;
if constexpr ((is_floating_point_type(col_type_info ::data_type))) {
if (ARCTICDB_LIKELY(!std::isnan(enumerating_it.value()))) {
fraction.numerator_ += double(enumerating_it.value());
++fraction.denominator_;
}
} else {
fraction.numerator_ += double(enumerating_it.value());
++fraction.denominator_;
}
});
} else {
util::raise_rte("String aggregations not currently supported");
Expand All @@ -401,14 +408,13 @@ SegmentInMemory MeanAggregatorData::finalize(const ColumnName& output_column_nam
SegmentInMemory res;
if(!fractions_.empty()) {
fractions_.resize(unique_values);
auto pos = res.add_column(scalar_field(DataType::FLOAT64, output_column_name.value), fractions_.size(), true);
auto& column = res.column(pos);
auto ptr = reinterpret_cast<double*>(column.ptr());
column.set_row_data(fractions_.size() - 1);

for (auto idx = 0u; idx < fractions_.size(); ++idx) {
ptr[idx] = fractions_[idx].to_double();
}
auto col = std::make_shared<Column>(make_scalar_type(DataType::FLOAT64), fractions_.size(), true, false);
auto column_data = col->data();
std::transform(fractions_.cbegin(), fractions_.cend(), column_data.begin<ScalarTagType<DataTypeTag<DataType::FLOAT64>>>(), [](auto fraction) {
return fraction.to_double();
});
col->set_row_data(fractions_.size() - 1);
res.add_column(scalar_field(DataType::FLOAT64, output_column_name.value), col);
}
return res;
}
Expand All @@ -429,7 +435,7 @@ void CountAggregatorData::aggregate(const std::optional<ColumnWithStrings>& inpu
using col_type_info = ScalarTypeInfo<decltype(col_tag)>;
Column::for_each_enumerated<typename col_type_info::TDT>(*input_column->column_, [&groups, this](auto enumerating_it) {
if constexpr (is_floating_point_type(col_type_info::data_type)) {
if (!std::isnan(static_cast<double>(enumerating_it.value()))) {
if (ARCTICDB_LIKELY(!std::isnan(enumerating_it.value()))) {
auto& val = aggregated_[groups[enumerating_it.idx()]];
++val;
}
Expand Down
Loading

0 comments on commit 39a2b1d

Please sign in to comment.