Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for first and last aggregators - string columns #1151

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
8 changes: 8 additions & 0 deletions cpp/arcticdb/column_store/memory_segment.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,14 @@ class SegmentInMemory {
impl_->set_compacted(val);
}

// Returns the segment's `fixed_str_as_dyn` flag by forwarding to the implementation object.
// The flag is read by `get_string_reducer`, which passes it as the last argument of
// `get_max_string_size_in_column` when sizing a fixed-width string column.
bool fixed_str_as_dyn() const {
return impl_->fixed_str_as_dyn();
}

// Sets the segment's `fixed_str_as_dyn` flag by forwarding `val` to the implementation object.
// The First/Last aggregators' `finalize` set it to `true` on their result segment.
void set_fixed_str_as_dyn(bool val) {
impl_->set_fixed_str_as_dyn(val);
}

// Forwards a schema change for `descriptor` to the underlying implementation object.
void change_schema(const StreamDescriptor& descriptor) {
    impl_->change_schema(descriptor);
}
Expand Down
9 changes: 9 additions & 0 deletions cpp/arcticdb/column_store/memory_segment_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,14 @@ class SegmentInMemoryImpl {
compacted_ = value;
}

// Returns the stored `fixed_str_as_dyn_` flag (member default: false).
bool fixed_str_as_dyn() const {
return fixed_str_as_dyn_;
}

// Overwrites the stored `fixed_str_as_dyn_` flag with `value`.
void set_fixed_str_as_dyn(bool value) {
fixed_str_as_dyn_ = value;
}

// Runs `check()` on the `magic_` member (a `util::MagicNum<'M', 'S', 'e', 'g'>`).
// NOTE(review): presumably this detects use of a corrupted or already-destroyed segment;
// what `check()` does on a mismatch is not visible here - confirm in `util::MagicNum`.
void check_magic() const {
magic_.check();
}
Expand Down Expand Up @@ -815,6 +823,7 @@ class SegmentInMemoryImpl {
mutable std::unique_ptr<std::mutex> column_map_mutex_ = std::make_unique<std::mutex>();
bool allow_sparse_ = false;
bool compacted_ = false;
bool fixed_str_as_dyn_ = false;
util::MagicNum<'M', 'S', 'e', 'g'> magic_;
std::shared_ptr<FieldCollection> index_fields_;
std::shared_ptr<arcticdb::proto::descriptors::TimeSeriesDescriptor> tsd_;
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/pipeline/read_frame.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1061,7 +1061,7 @@ std::unique_ptr<StringReducer> get_string_reducer(
const auto alloc_width = get_max_string_size_in_column(column.data().buffer(), context, frame, frame_field, slice_map, true);
string_reducer = std::make_unique<UnicodeConvertingStringReducer>(column, context, frame, frame_field, alloc_width);
} else {
const auto alloc_width = get_max_string_size_in_column(column.data().buffer(), context, frame, frame_field, slice_map, false);
const auto alloc_width = get_max_string_size_in_column(column.data().buffer(), context, frame, frame_field, slice_map, frame.fixed_str_as_dyn());
string_reducer = std::make_unique<FixedStringReducer>(column, context, frame, frame_field, alloc_width);
}
} else {
Expand Down
58 changes: 48 additions & 10 deletions cpp/arcticdb/processing/aggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,10 @@ void FirstAggregatorData::add_data_type(DataType data_type) {
add_data_type_impl(data_type, data_type_);
}

// Stores a copy of `offset_map` in `str_offset_mapping_`. `finalize` looks entries up in this map
// to rewrite string-column values, so this has to be called before `finalize` for string columns.
void FirstAggregatorData::set_string_offset_map(const std::multimap<std::pair<entity::position_t, std::size_t>, entity::position_t>& offset_map) {
str_offset_mapping_ = offset_map;
}

void FirstAggregatorData::aggregate(const std::optional<ColumnWithStrings>& input_column, const std::vector<size_t>& groups, size_t unique_values) {
if(data_type_.has_value() && *data_type_ != DataType::EMPTYVAL && input_column.has_value()) {
details::visit_type(*data_type_, [&input_column, unique_values, &groups, this] (auto global_tag) {
Expand Down Expand Up @@ -502,13 +506,28 @@ void FirstAggregatorData::aggregate(const std::optional<ColumnWithStrings>& inpu
// Builds the result segment from the per-group values accumulated in `aggregated_`:
// one column named `output_column_name.value` with `unique_values` rows of type `*data_type_`.
// Returns a default-constructed segment when `aggregated_` is empty.
// The unnamed `bool` parameter (declared as `dynamic_schema` in the header) is not used.
// NOTE(review): this hunk is rendered without +/- markers; the `that->` lines appear to be the
// removed versions of the `this`-capturing lines that follow them.
SegmentInMemory FirstAggregatorData::finalize(const ColumnName& output_column_name, bool, size_t unique_values) {
SegmentInMemory res;
if(!aggregated_.empty()) {
details::visit_type(*data_type_, [that=this, &res, &output_column_name, unique_values] (auto col_tag) {
details::visit_type(*data_type_, [this, &res, &output_column_name, unique_values] (auto col_tag) {
using InputType = decltype(col_tag);
using RawType = typename decltype(col_tag)::DataTypeTag::raw_type;
that->aggregated_.resize(sizeof(RawType)* unique_values);
auto col = std::make_shared<Column>(make_scalar_type(that->data_type_.value()), unique_values, true, false);
memcpy(col->ptr(), that->aggregated_.data(), that->aggregated_.size());
res.add_column(scalar_field(that->data_type_.value(), output_column_name.value), col);
// Resize the byte buffer to exactly one RawType per group, then copy it wholesale into the new column.
aggregated_.resize(sizeof(RawType)* unique_values);
auto col = std::make_shared<Column>(make_scalar_type(data_type_.value()), unique_values, true, false);
memcpy(col->ptr(), aggregated_.data(), aggregated_.size());
// In case of strings, we need to set the output column to the correct offset pointing to StringPool which contains the actual string
// This is done using `str_offset_mapping_` set in `AggregationClause::process`
if constexpr(is_sequence_type(InputType::DataTypeTag::data_type)) {
auto col_ptr = reinterpret_cast<RawType*>(col->ptr());
// NOTE(review): `i` is `unsigned int` while `unique_values` is `size_t`; the index would wrap
// if `unique_values` ever exceeded the range of `unsigned int`.
for (auto i = 0u; i < unique_values; ++i, ++col_ptr) {
// NOTE(review): `lower_bound` returns the first entry whose key is not less than
// (`*col_ptr`, `i`); that entry's key is not compared for equality, so when no exact
// key exists the value of a different (offset, group) entry is written. Confirm that the
// caller guarantees an exact key for every group, or compare `first_el->first` explicitly.
auto first_el = str_offset_mapping_.lower_bound(std::make_pair(*col_ptr, i));
if(first_el != str_offset_mapping_.end()) {
*col_ptr = first_el->second;
}
}
}
res.add_column(scalar_field(data_type_.value(), output_column_name.value), col);
// NOTE(review): `unique_values - 1` wraps around if `unique_values` is 0; the only guard above is
// `!aggregated_.empty()` - confirm that it implies `unique_values > 0`.
col->set_row_data(unique_values - 1);
// Set `fixed_str_as_dyn` flag to `true` to get the max instead of first string size
// cf. `get_string_reducer` when reducing columns
res.set_fixed_str_as_dyn(true);
});
}
return res;
Expand All @@ -522,6 +541,10 @@ void LastAggregatorData::add_data_type(DataType data_type) {
add_data_type_impl(data_type, data_type_);
}

// Stores a copy of `offset_map` in `str_offset_mapping_`. `finalize` looks entries up in this map
// to rewrite string-column values, so this has to be called before `finalize` for string columns.
void LastAggregatorData::set_string_offset_map(const std::multimap<std::pair<entity::position_t, std::size_t>, entity::position_t>& offset_map) {
str_offset_mapping_ = offset_map;
}

void LastAggregatorData::aggregate(const std::optional<ColumnWithStrings>& input_column, const std::vector<size_t>& groups, size_t unique_values) {
if(data_type_.has_value() && *data_type_ != DataType::EMPTYVAL && input_column.has_value()) {
details::visit_type(*data_type_, [&input_column, unique_values, &groups, this] (auto global_tag) {
Expand Down Expand Up @@ -559,13 +582,28 @@ void LastAggregatorData::aggregate(const std::optional<ColumnWithStrings>& input
// Builds the result segment from the per-group values accumulated in `aggregated_`:
// one column named `output_column_name.value` with `unique_values` rows of type `*data_type_`.
// Returns a default-constructed segment when `aggregated_` is empty.
// The unnamed `bool` parameter (declared as `dynamic_schema` in the header) is not used.
// NOTE(review): this hunk is rendered without +/- markers; the `that->` lines appear to be the
// removed versions of the `this`-capturing lines that follow them.
SegmentInMemory LastAggregatorData::finalize(const ColumnName& output_column_name, bool, size_t unique_values) {
SegmentInMemory res;
if(!aggregated_.empty()) {
details::visit_type(*data_type_, [that=this, &res, &output_column_name, unique_values] (auto col_tag) {
details::visit_type(*data_type_, [this, &res, &output_column_name, unique_values] (auto col_tag) {
using InputType = decltype(col_tag);
using RawType = typename decltype(col_tag)::DataTypeTag::raw_type;
that->aggregated_.resize(sizeof(RawType)* unique_values);
auto col = std::make_shared<Column>(make_scalar_type(that->data_type_.value()), unique_values, true, false);
memcpy(col->ptr(), that->aggregated_.data(), that->aggregated_.size());
res.add_column(scalar_field(that->data_type_.value(), output_column_name.value), col);
// Resize the byte buffer to exactly one RawType per group, then copy it wholesale into the new column.
aggregated_.resize(sizeof(RawType)* unique_values);
auto col = std::make_shared<Column>(make_scalar_type(data_type_.value()), unique_values, true, false);
memcpy(col->ptr(), aggregated_.data(), aggregated_.size());
// In case of strings, we need to set the output column to the correct offset pointing to StringPool which contains the actual string
// This is done using `str_offset_mapping_` set in `AggregationClause::process`
if constexpr(is_sequence_type(InputType::DataTypeTag::data_type)) {
auto col_ptr = reinterpret_cast<RawType*>(col->ptr());
// NOTE(review): `i` is `unsigned int` while `unique_values` is `size_t`; the index would wrap
// if `unique_values` ever exceeded the range of `unsigned int`.
for (auto i = 0u; i < unique_values; ++i, ++col_ptr) {
// NOTE(review): `upper_bound` returns `begin()` when the map is empty or when every key is
// greater than (`*col_ptr`, `i`); decrementing `begin()` is undefined behaviour. The
// `!= end()` test below cannot detect that case, because a successfully decremented
// iterator is never `end()`. The entry found is also not compared for key equality, so it
// may belong to a different (offset, group) pair. Guard with
// `it != str_offset_mapping_.begin()` before decrementing and compare `last_el->first`.
auto last_el = --str_offset_mapping_.upper_bound(std::make_pair(*col_ptr, i));
if(last_el != str_offset_mapping_.end()) {
*col_ptr = last_el->second;
}
}
}
res.add_column(scalar_field(data_type_.value(), output_column_name.value), col);
// NOTE(review): `unique_values - 1` wraps around if `unique_values` is 0; the only guard above is
// `!aggregated_.empty()` - confirm that it implies `unique_values > 0`.
col->set_row_data(unique_values - 1);
// Set `fixed_str_as_dyn` flag to `true` to get the max instead of first string size
// cf. `get_string_reducer` when reducing columns
res.set_fixed_str_as_dyn(true);
});
}
return res;
Expand Down
13 changes: 13 additions & 0 deletions cpp/arcticdb/processing/aggregation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

#pragma once

#include <map>

#include <arcticdb/column_store/memory_segment.hpp>
#include <arcticdb/entity/types.hpp>
#include <arcticdb/entity/type_utils.hpp>
Expand Down Expand Up @@ -69,6 +71,7 @@ class SumAggregatorData : private AggregatorDataBase
public:

void add_data_type(DataType data_type);
// No-op: the argument is ignored. Presumably present only so that this class provides every member
// listed in `Members` of `IGroupingAggregatorData` - confirm.
void set_string_offset_map(const std::multimap<std::pair<entity::position_t, std::size_t>, entity::position_t>&) {}
void aggregate(const std::optional<ColumnWithStrings>& input_column, const std::vector<size_t>& groups, size_t unique_values);
SegmentInMemory finalize(const ColumnName& output_column_name, bool dynamic_schema, size_t unique_values);

Expand All @@ -83,6 +86,7 @@ class MaxAggregatorData : private AggregatorDataBase
public:

void add_data_type(DataType data_type);
// No-op: the argument is ignored. Presumably present only so that this class provides every member
// listed in `Members` of `IGroupingAggregatorData` - confirm.
void set_string_offset_map(const std::multimap<std::pair<entity::position_t, std::size_t>, entity::position_t>&) {}
void aggregate(const std::optional<ColumnWithStrings>& input_column, const std::vector<size_t>& groups, size_t unique_values);
SegmentInMemory finalize(const ColumnName& output_column_name, bool dynamic_schema, size_t unique_values);

Expand All @@ -97,6 +101,7 @@ class MinAggregatorData : private AggregatorDataBase
public:

void add_data_type(DataType data_type);
// No-op: the argument is ignored. Presumably present only so that this class provides every member
// listed in `Members` of `IGroupingAggregatorData` - confirm.
void set_string_offset_map(const std::multimap<std::pair<entity::position_t, std::size_t>, entity::position_t>&) {}
void aggregate(const std::optional<ColumnWithStrings>& input_column, const std::vector<size_t>& groups, size_t unique_values);
SegmentInMemory finalize(const ColumnName& output_column_name, bool dynamic_schema, size_t unique_values);

Expand All @@ -112,6 +117,7 @@ class MeanAggregatorData : private AggregatorDataBase

// Mean values are always doubles so this is a no-op
void add_data_type(DataType) {}
// No-op: the argument is ignored. Presumably present only so that this class provides every member
// listed in `Members` of `IGroupingAggregatorData` - confirm.
void set_string_offset_map(const std::multimap<std::pair<entity::position_t, std::size_t>, entity::position_t>&) {}
void aggregate(const std::optional<ColumnWithStrings>& input_column, const std::vector<size_t>& groups, size_t unique_values);
SegmentInMemory finalize(const ColumnName& output_column_name, bool dynamic_schema, size_t unique_values);

Expand All @@ -134,6 +140,7 @@ class CountAggregatorData : private AggregatorDataBase

// Count values are always integers so this is a no-op
void add_data_type(DataType) {}
// No-op: the argument is ignored. Presumably present only so that this class provides every member
// listed in `Members` of `IGroupingAggregatorData` - confirm.
void set_string_offset_map(const std::multimap<std::pair<entity::position_t, std::size_t>, entity::position_t>&) {}
void aggregate(const std::optional<ColumnWithStrings>& input_column, const std::vector<size_t>& groups, size_t unique_values);
SegmentInMemory finalize(const ColumnName& output_column_name, bool dynamic_schema, size_t unique_values);

Expand All @@ -147,6 +154,8 @@ class FirstAggregatorData : private AggregatorDataBase
public:

void add_data_type(DataType data_type);
// Needs to be called before finalize - only used for strings columns case
void set_string_offset_map(const std::multimap<std::pair<entity::position_t, std::size_t>, entity::position_t>& offset_map);
void aggregate(const std::optional<ColumnWithStrings>& input_column, const std::vector<size_t>& groups, size_t unique_values);
SegmentInMemory finalize(const ColumnName& output_column_name, bool dynamic_schema, size_t unique_values);

Expand All @@ -155,6 +164,7 @@ class FirstAggregatorData : private AggregatorDataBase
std::vector<uint8_t> aggregated_;
std::optional<DataType> data_type_;

std::multimap<std::pair<entity::position_t, std::size_t>, entity::position_t> str_offset_mapping_;
std::unordered_set<size_t> groups_cache_;
};

Expand All @@ -163,6 +173,8 @@ class LastAggregatorData : private AggregatorDataBase
public:

void add_data_type(DataType data_type);
// Needs to be called before finalize - only used for strings columns case
void set_string_offset_map(const std::multimap<std::pair<entity::position_t, std::size_t>, entity::position_t>& offset_map);
void aggregate(const std::optional<ColumnWithStrings>& input_column, const std::vector<size_t>& groups, size_t unique_values);
SegmentInMemory finalize(const ColumnName& output_column_name, bool dynamic_schema, size_t unique_values);

Expand All @@ -171,6 +183,7 @@ class LastAggregatorData : private AggregatorDataBase
std::vector<uint8_t> aggregated_;
std::optional<DataType> data_type_;

std::multimap<std::pair<entity::position_t, std::size_t>, entity::position_t> str_offset_mapping_;
std::unordered_set<size_t> groups_cache_;
};

Expand Down
5 changes: 4 additions & 1 deletion cpp/arcticdb/processing/aggregation_interface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

#pragma once

#include <map>
#include <utility>
#include <folly/Poly.h>

namespace arcticdb{
Expand All @@ -22,10 +24,11 @@ struct IGroupingAggregatorData {
// Dispatches to the wrapped object's `finalize` and returns its segment.
// Index 2 is the position of `&T::finalize` in the `Members` list.
[[nodiscard]] SegmentInMemory finalize(const ColumnName& output_column_name, bool dynamic_schema, size_t unique_values) {
return folly::poly_call<2>(*this, output_column_name, dynamic_schema, unique_values);
}
// Dispatches to the wrapped object's `set_string_offset_map`.
// Index 3 must stay equal to the position of `&T::set_string_offset_map` in the `Members` list.
void set_string_offset_map(const std::multimap<std::pair<entity::position_t, std::size_t>, entity::position_t>& offset_map) {
    folly::poly_call<3>(*this, offset_map);
}
};

template<class T>
using Members = folly::PolyMembers<&T::add_data_type, &T::aggregate, &T::finalize>;
using Members = folly::PolyMembers<&T::add_data_type, &T::aggregate, &T::finalize, &T::set_string_offset_map>;
};

using GroupingAggregatorData = folly::Poly<IGroupingAggregatorData>;
Expand Down
Loading
Loading