Skip to content

Commit

Permalink
Modify to make strings agg work with append
Browse files Browse the repository at this point in the history
  • Loading branch information
Hind-M committed Dec 26, 2023
1 parent d423312 commit 05115a9
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 57 deletions.
14 changes: 8 additions & 6 deletions cpp/arcticdb/processing/aggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ void FirstAggregatorData::add_data_type(DataType data_type) {
add_data_type_impl(data_type, data_type_);
}

void FirstAggregatorData::set_string_offset_map(const std::unordered_map<entity::position_t, entity::position_t>& offset_map) {
void FirstAggregatorData::set_string_offset_map(const std::multimap<std::pair<entity::position_t, std::size_t>, entity::position_t>& offset_map) {
str_offset_mapping_ = offset_map;
}

Expand Down Expand Up @@ -581,8 +581,9 @@ SegmentInMemory FirstAggregatorData::finalize(const ColumnName& output_column_na
if constexpr(is_sequence_type(InputType::DataTypeTag::data_type)) {
auto col_ptr = reinterpret_cast<RawType*>(col->ptr());
for (auto i = 0u; i < unique_values; ++i, ++col_ptr) {
if(str_offset_mapping_.find(*col_ptr) != str_offset_mapping_.end()) {
*col_ptr = str_offset_mapping_[*col_ptr];
auto first_el = str_offset_mapping_.lower_bound(std::make_pair(*col_ptr, i));
if(first_el != str_offset_mapping_.end()) {
*col_ptr = first_el->second;
}
}
}
Expand All @@ -601,7 +602,7 @@ void LastAggregatorData::add_data_type(DataType data_type) {
add_data_type_impl(data_type, data_type_);
}

void LastAggregatorData::set_string_offset_map(const std::unordered_map<entity::position_t, entity::position_t>& offset_map) {
void LastAggregatorData::set_string_offset_map(const std::multimap<std::pair<entity::position_t, std::size_t>, entity::position_t>& offset_map) {
str_offset_mapping_ = offset_map;
}

Expand Down Expand Up @@ -653,8 +654,9 @@ SegmentInMemory LastAggregatorData::finalize(const ColumnName& output_column_nam
if constexpr(is_sequence_type(InputType::DataTypeTag::data_type)) {
auto col_ptr = reinterpret_cast<RawType*>(col->ptr());
for (auto i = 0u; i < unique_values; ++i, ++col_ptr) {
if(str_offset_mapping_.find(*col_ptr) != str_offset_mapping_.end()) {
*col_ptr = str_offset_mapping_[*col_ptr];
auto last_el = --str_offset_mapping_.upper_bound(std::make_pair(*col_ptr, i));
if(last_el != str_offset_mapping_.end()) {
*col_ptr = last_el->second;
}
}
}
Expand Down
20 changes: 10 additions & 10 deletions cpp/arcticdb/processing/aggregation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

#pragma once

#include <unordered_map>
#include <map>

#include <arcticdb/column_store/memory_segment.hpp>
#include <arcticdb/entity/types.hpp>
Expand Down Expand Up @@ -71,7 +71,7 @@ class SumAggregatorData : private AggregatorDataBase
public:

void add_data_type(DataType data_type);
void set_string_offset_map(const std::unordered_map<entity::position_t, entity::position_t>&) {}
void set_string_offset_map(const std::multimap<std::pair<entity::position_t, std::size_t>, entity::position_t>&) {}
void aggregate(const std::optional<ColumnWithStrings>& input_column, const std::vector<size_t>& groups, size_t unique_values);
SegmentInMemory finalize(const ColumnName& output_column_name, bool dynamic_schema, size_t unique_values);

Expand All @@ -86,7 +86,7 @@ class MaxAggregatorData : private AggregatorDataBase
public:

void add_data_type(DataType data_type);
void set_string_offset_map(const std::unordered_map<entity::position_t, entity::position_t>&) {}
void set_string_offset_map(const std::multimap<std::pair<entity::position_t, std::size_t>, entity::position_t>&) {}
void aggregate(const std::optional<ColumnWithStrings>& input_column, const std::vector<size_t>& groups, size_t unique_values);
SegmentInMemory finalize(const ColumnName& output_column_name, bool dynamic_schema, size_t unique_values);

Expand All @@ -101,7 +101,7 @@ class MinAggregatorData : private AggregatorDataBase
public:

void add_data_type(DataType data_type);
void set_string_offset_map(const std::unordered_map<entity::position_t, entity::position_t>&) {}
void set_string_offset_map(const std::multimap<std::pair<entity::position_t, std::size_t>, entity::position_t>&) {}
void aggregate(const std::optional<ColumnWithStrings>& input_column, const std::vector<size_t>& groups, size_t unique_values);
SegmentInMemory finalize(const ColumnName& output_column_name, bool dynamic_schema, size_t unique_values);

Expand All @@ -117,7 +117,7 @@ class MeanAggregatorData : private AggregatorDataBase

// Mean values are always doubles so this is a no-op
void add_data_type(DataType) {}
void set_string_offset_map(const std::unordered_map<entity::position_t, entity::position_t>&) {}
void set_string_offset_map(const std::multimap<std::pair<entity::position_t, std::size_t>, entity::position_t>&) {}
void aggregate(const std::optional<ColumnWithStrings>& input_column, const std::vector<size_t>& groups, size_t unique_values);
SegmentInMemory finalize(const ColumnName& output_column_name, bool dynamic_schema, size_t unique_values);

Expand All @@ -140,7 +140,7 @@ class CountAggregatorData : private AggregatorDataBase

// Count values are always integers so this is a no-op
void add_data_type(DataType) {}
void set_string_offset_map(const std::unordered_map<entity::position_t, entity::position_t>&) {}
void set_string_offset_map(const std::multimap<std::pair<entity::position_t, std::size_t>, entity::position_t>&) {}
void aggregate(const std::optional<ColumnWithStrings>& input_column, const std::vector<size_t>& groups, size_t unique_values);
SegmentInMemory finalize(const ColumnName& output_column_name, bool dynamic_schema, size_t unique_values);

Expand All @@ -155,7 +155,7 @@ class FirstAggregatorData : private AggregatorDataBase

void add_data_type(DataType data_type);
// Needs to be called before finalize - only used for strings columns case
void set_string_offset_map(const std::unordered_map<entity::position_t, entity::position_t>& offset_map);
void set_string_offset_map(const std::multimap<std::pair<entity::position_t, std::size_t>, entity::position_t>& offset_map);
void aggregate(const std::optional<ColumnWithStrings>& input_column, const std::vector<size_t>& groups, size_t unique_values);
SegmentInMemory finalize(const ColumnName& output_column_name, bool dynamic_schema, size_t unique_values);

Expand All @@ -164,7 +164,7 @@ class FirstAggregatorData : private AggregatorDataBase
std::vector<uint8_t> aggregated_;
std::optional<DataType> data_type_;

std::unordered_map<entity::position_t, entity::position_t> str_offset_mapping_;
std::multimap<std::pair<entity::position_t, std::size_t>, entity::position_t> str_offset_mapping_;
std::unordered_set<size_t> groups_cache_;
};

Expand All @@ -174,7 +174,7 @@ class LastAggregatorData : private AggregatorDataBase

void add_data_type(DataType data_type);
// Needs to be called before finalize - only used for strings columns case
void set_string_offset_map(const std::unordered_map<entity::position_t, entity::position_t>& offset_map);
void set_string_offset_map(const std::multimap<std::pair<entity::position_t, std::size_t>, entity::position_t>& offset_map);
void aggregate(const std::optional<ColumnWithStrings>& input_column, const std::vector<size_t>& groups, size_t unique_values);
SegmentInMemory finalize(const ColumnName& output_column_name, bool dynamic_schema, size_t unique_values);

Expand All @@ -183,7 +183,7 @@ class LastAggregatorData : private AggregatorDataBase
std::vector<uint8_t> aggregated_;
std::optional<DataType> data_type_;

std::unordered_map<entity::position_t, entity::position_t> str_offset_mapping_;
std::multimap<std::pair<entity::position_t, std::size_t>, entity::position_t> str_offset_mapping_;
std::unordered_set<size_t> groups_cache_;
};

Expand Down
5 changes: 3 additions & 2 deletions cpp/arcticdb/processing/aggregation_interface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@

#pragma once

#include <unordered_map>
#include <map>
#include <utility>
#include <folly/Poly.h>

namespace arcticdb{
Expand All @@ -23,7 +24,7 @@ struct IGroupingAggregatorData {
[[nodiscard]] SegmentInMemory finalize(const ColumnName& output_column_name, bool dynamic_schema, size_t unique_values) {
return folly::poly_call<2>(*this, output_column_name, dynamic_schema, unique_values);
}
void set_string_offset_map(const std::unordered_map<entity::position_t, entity::position_t>& offset_map) { folly::poly_call<3>(*this, offset_map); }
void set_string_offset_map(const std::multimap<std::pair<entity::position_t, std::size_t>, entity::position_t>& offset_map) { folly::poly_call<3>(*this, offset_map); }
};

template<class T>
Expand Down
68 changes: 31 additions & 37 deletions cpp/arcticdb/processing/clause.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@
*
* As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
*/
#include <unordered_map>
#include <vector>
#include <variant>
#include <map>

#include <folly/Poly.h>

Expand Down Expand Up @@ -399,14 +397,17 @@ Composite<EntityIds> AggregationClause::process(Composite<EntityIds>&& entity_id
DataType grouping_data_type;
GroupingMap grouping_map;
Composite<ProcessingUnit> procs(std::move(procs_as_range));

// The multimap to store old/new string offsets: <<old_offset, group_id>, new_offset>
std::multimap<std::pair<entity::position_t, std::size_t>, entity::position_t> str_offset_mapping;
procs.broadcast(
[&num_unique, &grouping_data_type, &grouping_map, &next_group_id, &aggregators_data, &string_pool, this](auto &proc) {
[&str_offset_mapping, &num_unique, &grouping_data_type, &grouping_map, &next_group_id, &aggregators_data, &string_pool, this](auto &proc) {
auto partitioning_column = proc.get(ColumnName(grouping_column_));
if (std::holds_alternative<ColumnWithStrings>(partitioning_column)) {
ColumnWithStrings col = std::get<ColumnWithStrings>(partitioning_column);
entity::details::visit_type(col.column_->type().data_type(),
[&proc_=proc, &grouping_map, &next_group_id, &aggregators_data, &string_pool, &col,
&num_unique, &grouping_data_type, this](auto data_type_tag) {
&str_offset_mapping, &num_unique, &grouping_data_type, this](auto data_type_tag) {
using DataTypeTagType = decltype(data_type_tag);
using RawType = typename DataTypeTagType::raw_type;
constexpr auto data_type = DataTypeTagType::data_type;
Expand Down Expand Up @@ -498,6 +499,31 @@ Composite<EntityIds> AggregationClause::process(Composite<EntityIds>&& entity_id
opt_input_column.emplace(std::move(column_with_strings));
}
}

// Strings case: Add the string to the output string_pool and set map of strings offsets
auto output_column_name = aggregators_.at(agg_data.index).get_output_column_name();
auto output_column = proc_.get(output_column_name);
if (std::holds_alternative<ColumnWithStrings>(output_column)) {
auto output_column_with_strings = std::get<ColumnWithStrings>(output_column);
if (is_sequence_type(output_column_with_strings.column_->type().data_type())) {
auto output_data = output_column_with_strings.column_->data();
while (auto out_block = output_data.template next<ScalarTagType<DataTypeTagType>>()) {
const auto out_row_count = out_block->row_count();
auto out_ptr = out_block->data();
for (size_t orc = 0; orc < out_row_count; ++orc, ++out_ptr) {
std::optional<std::string_view> str = output_column_with_strings.string_at_offset(*out_ptr);
if (str.has_value()) {
// Add the string view `*str` to the output `string_pool` and map the new offset to the old one
auto out_offset = string_pool->get(*str, true).offset();
str_offset_mapping.insert(std::make_pair(std::make_pair(*out_ptr, row_to_group[orc]), out_offset));
}
}
}
// Set map of string offsets before calling finalize
agg_data->set_string_offset_map(str_offset_mapping);
}
}

agg_data->aggregate(opt_input_column, row_to_group, num_unique);
}
});
Expand Down Expand Up @@ -531,38 +557,6 @@ Composite<EntityIds> AggregationClause::process(Composite<EntityIds>&& entity_id
});
index_col->set_row_data(grouping_map.size() - 1);


// Strings case: Add the string to the output string_pool and set map of strings offsets
procs.broadcast([&grouping_data_type, &aggregators_data, &string_pool, this](auto &proc) {
entity::details::visit_type(grouping_data_type, [&aggregators_data, &proc, &string_pool, this](auto data_type_tag) {
using DataTypeTagType = decltype(data_type_tag);
for (auto agg_data: folly::enumerate(aggregators_data)) {
auto output_column_name = aggregators_.at(agg_data.index).get_output_column_name();
auto output_column = proc.get(output_column_name);
if (std::holds_alternative<ColumnWithStrings>(output_column)) {
auto output_column_with_strings = std::get<ColumnWithStrings>(output_column);
if (is_sequence_type(output_column_with_strings.column_->type().data_type())) {
std::unordered_map<entity::position_t, entity::position_t> str_offset_mapping;
auto output_data = output_column_with_strings.column_->data();
while (auto out_block = output_data.template next<ScalarTagType<DataTypeTagType>>()) {
const auto out_row_count = out_block->row_count();
auto out_ptr = out_block->data();
for (size_t orc = 0; orc < out_row_count; ++orc, ++out_ptr) {
std::optional<std::string_view> str = output_column_with_strings.string_at_offset(*out_ptr);
if (str.has_value()) {
// Add the string view `*str` to the output `string_pool` and map the new offset to the old one
str_offset_mapping[*out_ptr] = string_pool->get(*str, true).offset();
}
}
}
// Set map of string offsets before calling finalize
agg_data->set_string_offset_map(str_offset_mapping);
}
}
}
});
});

for (auto agg_data: folly::enumerate(aggregators_data)) {
seg.concatenate(agg_data->finalize(aggregators_.at(agg_data.index).get_output_column_name(), processing_config_.dynamic_schema_, num_unique));
}
Expand Down
44 changes: 42 additions & 2 deletions python/tests/unit/arcticdb/version_store/test_aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ def test_first_aggregation_strings(local_object_version_store):
df = DataFrame(
{
"grouping_column": ["group_1", "group_2", "group_1", "group_3"],
"get_first": ["Hello", "this", "is", "Homer", ],
"get_first": ["Hello", "this", "is", "Homer"],
},
index=np.arange(4),
)
Expand Down Expand Up @@ -333,6 +333,26 @@ def test_first_agg_numeric_with_append(local_object_version_store):
assert_frame_equal(vit.data, df)


def test_first_agg_strings_with_append(lmdb_version_store):
lib = lmdb_version_store

symbol = "first_agg"
lib.write(symbol, pd.DataFrame({"grouping_column": ["group_1"], "get_first": ["Hi"]}))
lib.append(symbol, pd.DataFrame({"grouping_column": ["group_2"], "get_first": ["HELLO"]}))
lib.append(symbol, pd.DataFrame({"grouping_column": ["group_1"], "get_first": ["NO"]}))
lib.append(symbol, pd.DataFrame({"grouping_column": ["group_2"], "get_first": ["BLABLABLA"]}))
lib.append(symbol, pd.DataFrame({"grouping_column": ["group_3"], "get_first": ["This is it"]}))
q = QueryBuilder().groupby("grouping_column").agg({"get_first": "first"})

vit = lib.read(symbol, query_builder=q)
vit.data.sort_index(inplace=True)

df = pd.DataFrame({"get_first": ["Hi", "HELLO", "This is it"]}, index=["group_1", "group_2", "group_3"])
df.index.rename("grouping_column", inplace=True)

assert_frame_equal(vit.data, df)


def last_aggregation(lmdb_version_store, df):
lib = lmdb_version_store
assume(not df.empty)
Expand Down Expand Up @@ -409,7 +429,7 @@ def test_last_aggregation_strings(local_object_version_store):
df = DataFrame(
{
"grouping_column": ["group_1", "group_2", "group_1", "group_3"],
"get_last": ["Hello", "this", "is", "Homer", ],
"get_last": ["Hello", "this", "is", "Homer"],
},
index=np.arange(4),
)
Expand Down Expand Up @@ -446,6 +466,26 @@ def test_last_agg_numeric_with_append(local_object_version_store):
assert_frame_equal(vit.data, df)


def test_last_agg_strings_with_append(lmdb_version_store):
lib = lmdb_version_store

symbol = "last_agg"
lib.write(symbol, pd.DataFrame({"grouping_column": ["group_1"], "get_last": ["Hi"]}))
lib.append(symbol, pd.DataFrame({"grouping_column": ["group_2"], "get_last": ["HELLO"]}))
lib.append(symbol, pd.DataFrame({"grouping_column": ["group_1"], "get_last": ["NO"]}))
lib.append(symbol, pd.DataFrame({"grouping_column": ["group_2"], "get_last": ["BLABLABLA"]}))
lib.append(symbol, pd.DataFrame({"grouping_column": ["group_3"], "get_last": ["This is something else"]}))
q = QueryBuilder().groupby("grouping_column").agg({"get_last": "last"})

vit = lib.read(symbol, query_builder=q)
vit.data.sort_index(inplace=True)

df = pd.DataFrame({"get_last": ["NO", "BLABLABLA", "This is something else"]}, index=["group_1", "group_2", "group_3"])
df.index.rename("grouping_column", inplace=True)

assert_frame_equal(vit.data, df)


def test_sum_aggregation(local_object_version_store):
df = DataFrame(
{"grouping_column": ["group_1", "group_1", "group_1", "group_2", "group_2"], "to_sum": [1, 1, 2, 2, 2]},
Expand Down

0 comments on commit 05115a9

Please sign in to comment.