diff --git a/.gitignore b/.gitignore index 63910807a9..0d6fc84887 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,7 @@ __pycache__/ .vscode/ .vs/ .project +.idea *.so *.a diff --git a/cpp/arcticdb/CMakeLists.txt b/cpp/arcticdb/CMakeLists.txt index 6149a0709d..fa16183855 100644 --- a/cpp/arcticdb/CMakeLists.txt +++ b/cpp/arcticdb/CMakeLists.txt @@ -786,6 +786,7 @@ if(${TEST}) processing/test/test_filter_and_project_sparse.cpp processing/test/test_has_valid_type_promotion.cpp processing/test/test_operation_dispatch.cpp + processing/test/test_resample.cpp processing/test/test_set_membership.cpp processing/test/test_signed_unsigned_comparison.cpp processing/test/test_type_comparison.cpp @@ -931,12 +932,14 @@ if(${TEST}) column_store/test/rapidcheck_column_data_random_accessor.cpp column_store/test/rapidcheck_column_map.cpp column_store/test/test_chunked_buffer.cpp + processing/test/rapidcheck_resample.cpp stream/test/stream_test_common.cpp util/test/rapidcheck_decimal.cpp util/test/rapidcheck_generators.cpp util/test/rapidcheck_string_pool.cpp util/test/rapidcheck_main.cpp - version/test/rapidcheck_version_map.cpp) + version/test/rapidcheck_version_map.cpp + ) add_executable(arcticdb_rapidcheck_tests ${rapidcheck_srcs}) install(TARGETS arcticdb_rapidcheck_tests RUNTIME diff --git a/cpp/arcticdb/async/tasks.hpp b/cpp/arcticdb/async/tasks.hpp index 5e870a3eae..0dd40c610a 100644 --- a/cpp/arcticdb/async/tasks.hpp +++ b/cpp/arcticdb/async/tasks.hpp @@ -25,6 +25,7 @@ #include #include +#include namespace arcticdb::async { @@ -405,7 +406,8 @@ struct MemSegmentProcessingTask : BaseTask { ARCTICDB_MOVE_ONLY_DEFAULT(MemSegmentProcessingTask) Composite operator()() { - for(const auto& clause : clauses_) { + std::ranges::reverse_view reversed_clauses{clauses_}; + for (const auto& clause: reversed_clauses) { entity_ids_ = clause->process(std::move(entity_ids_)); if(clause->clause_info().requires_repartition_) diff --git a/cpp/arcticdb/column_store/column.hpp b/cpp/arcticdb/column_store/column.hpp index 2eab601111..f86aa88ef1 100644 --- a/cpp/arcticdb/column_store/column.hpp +++ b/cpp/arcticdb/column_store/column.hpp @@ -768,8 +768,9 @@ class Column { // One sparse, one dense. Use the enumerating forward iterator over the sparse column as it is more efficient than random access auto right_accessor = random_accessor(&right_input_data); const auto right_column_row_count = right_input_column.row_count(); + const auto left_input_data_cend = left_input_data.cend(); for (auto left_it = left_input_data.cbegin(); - left_it != left_input_data.cend() && left_it->idx() < right_column_row_count; + left_it != left_input_data_cend && left_it->idx() < right_column_row_count; ++left_it) { *output_it++ = f(left_it->value(), right_accessor.at(left_it->idx())); } @@ -777,8 +778,9 @@ class Column { // One sparse, one dense. 
Use the enumerating forward iterator over the sparse column as it is more efficient than random access auto left_accessor = random_accessor(&left_input_data); const auto left_column_row_count = left_input_column.row_count(); + const auto right_input_data_cend = right_input_data.cend(); for (auto right_it = right_input_data.cbegin(); - right_it != right_input_data.cend() && right_it->idx() < left_column_row_count; + right_it != right_input_data_cend && right_it->idx() < left_column_row_count; ++right_it) { *output_it++ = f(left_accessor.at(right_it->idx()), right_it->value()); } @@ -871,8 +873,9 @@ class Column { initialise_output_bitset(left_input_column.sparse_map(), sparse_missing_value_output, output_bitset); auto right_accessor = random_accessor(&right_input_data); const auto right_column_row_count = right_input_column.row_count(); + const auto left_input_data_cend = left_input_data.cend(); for (auto left_it = left_input_data.cbegin(); - left_it != left_input_data.cend() && left_it->idx() < right_column_row_count; + left_it != left_input_data_cend && left_it->idx() < right_column_row_count; ++left_it) { if(f(left_it->value(), right_accessor.at(left_it->idx()))) { inserter = left_it->idx(); @@ -883,8 +886,9 @@ class Column { initialise_output_bitset(right_input_column.sparse_map(), sparse_missing_value_output, output_bitset); auto left_accessor = random_accessor(&left_input_data); const auto left_column_row_count = left_input_column.row_count(); + const auto right_input_data_cend = right_input_data.cend(); for (auto right_it = right_input_data.cbegin(); - right_it != right_input_data.cend() && right_it->idx() < left_column_row_count; + right_it != right_input_data_cend && right_it->idx() < left_column_row_count; ++right_it) { if(f(left_accessor.at(right_it->idx()), right_it->value())) { inserter = right_it->idx(); diff --git a/cpp/arcticdb/column_store/test/test_memory_segment.cpp b/cpp/arcticdb/column_store/test/test_memory_segment.cpp index d49e02168c..4936535ca1 100644 --- a/cpp/arcticdb/column_store/test/test_memory_segment.cpp +++ b/cpp/arcticdb/column_store/test/test_memory_segment.cpp @@ -210,7 +210,7 @@ TEST(MemSegment, StdFindIf) { auto num_rows = 100u; auto frame_wrapper = get_test_timeseries_frame("modify", num_rows, 0); auto &segment = frame_wrapper.segment_; - auto it = std::find_if(std::begin(segment), std::end(segment), [] (SegmentInMemory::Row& row) { return row.template index() == 50; }); + const auto it = std::find_if(std::begin(segment), std::end(segment), [] (SegmentInMemory::Row& row) { return row.template index() == 50; }); auto val_it = it->begin(); ASSERT_EQ(it->index(), 50); std::advance(val_it, 1); diff --git a/cpp/arcticdb/pipeline/frame_slice.hpp b/cpp/arcticdb/pipeline/frame_slice.hpp index 393ce7a862..6e0ca829cc 100644 --- a/cpp/arcticdb/pipeline/frame_slice.hpp +++ b/cpp/arcticdb/pipeline/frame_slice.hpp @@ -188,6 +188,14 @@ struct RangesAndKey { RangesAndKey() = delete; ARCTICDB_MOVE_COPY_DEFAULT(RangesAndKey) + bool operator==(const RangesAndKey& right) const { + return row_range_ == right.row_range_ && col_range_ == right.col_range_ && key_ == right.key_; + } + + bool operator!=(const RangesAndKey& right) const { + return !(*this == right); + } + RowRange row_range_; ColRange col_range_; entity::AtomKey key_; diff --git a/cpp/arcticdb/processing/aggregation.cpp b/cpp/arcticdb/processing/aggregation.cpp index 2d27449960..51a20b2b79 100644 --- a/cpp/arcticdb/processing/aggregation.cpp +++ b/cpp/arcticdb/processing/aggregation.cpp @@ -12,6 +12,20 @@ 
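[Editor's note on the MemSegmentProcessingTask hunk in tasks.hpp above: the clause loop now walks clauses_ back-to-front via std::ranges::reverse_view; whatever reordering of clauses_ motivates this is not visible in these hunks. A minimal standalone illustration of the view, with hypothetical names and not ArcticDB code:]

    #include <iostream>
    #include <ranges>
    #include <vector>

    int main() {
        // Hypothetical clause identifiers stored back-to-front.
        std::vector<int> clauses{3, 2, 1};
        // reverse_view adapts the container without copying or modifying it,
        // so the loop below visits 1, 2, 3.
        std::ranges::reverse_view reversed_clauses{clauses};
        for (int clause : reversed_clauses) {
            std::cout << clause << ' ';
        }
        std::cout << '\n';
    }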
namespace arcticdb { +void add_data_type_impl(DataType data_type, std::optional& current_data_type) { + if (current_data_type.has_value()) { + auto common_type = has_valid_common_type(entity::TypeDescriptor(*current_data_type, 0), + entity::TypeDescriptor(data_type, 0)); + schema::check( + common_type.has_value(), + "Cannot perform aggregation on column, incompatible types present: {} and {}", + entity::TypeDescriptor(*current_data_type, 0), entity::TypeDescriptor(data_type, 0)); + current_data_type = common_type->data_type(); + } else { + current_data_type = data_type; + } +} + void MinMaxAggregatorData::aggregate(const ColumnWithStrings& input_column) { details::visit_type(input_column.column_->type().data_type(), [&] (auto col_tag) { using type_info = ScalarTypeInfo; @@ -58,20 +72,6 @@ SegmentInMemory MinMaxAggregatorData::finalize(const std::vector& ou namespace { - void add_data_type_impl(DataType data_type, std::optional& current_data_type) { - if (current_data_type.has_value()) { - auto common_type = has_valid_common_type(entity::TypeDescriptor(*current_data_type, 0), - entity::TypeDescriptor(data_type, 0)); - schema::check( - common_type.has_value(), - "Cannot perform aggregation on column, incompatible types present: {} and {}", - entity::TypeDescriptor(*current_data_type, 0), entity::TypeDescriptor(data_type, 0)); - current_data_type = common_type->data_type(); - } else { - current_data_type = data_type; - } - } - inline util::BitMagic::enumerator::value_type deref(util::BitMagic::enumerator iter) { return *iter; } @@ -577,4 +577,227 @@ SegmentInMemory LastAggregatorData::finalize(const ColumnName& output_column_nam return res; } +template +Column SortedAggregator::aggregate(const std::vector>& input_index_columns, + const std::vector>& input_agg_columns, + const std::vector& bucket_boundaries, + const Column& output_index_column, + StringPool& string_pool) const { + std::optional res; + std::optional common_input_type; + for (const auto& opt_input_agg_column: input_agg_columns) { + if (opt_input_agg_column.has_value()) { + auto input_data_type = opt_input_agg_column->column_->type().data_type(); + schema::check(is_numeric_type(input_data_type) || is_bool_type(input_data_type) || + (is_sequence_type(input_data_type) && + (aggregation_operator == SortedAggregationOperator::FIRST || + aggregation_operator == SortedAggregationOperator::LAST || + aggregation_operator == SortedAggregationOperator::COUNT)), + "Resample: Unsupported aggregation type {} on column '{}' of type {}", + aggregation_operator, get_input_column_name().value, input_data_type); + add_data_type_impl(input_data_type, common_input_type); + } else { + // Column is missing from this row-slice due to dynamic schema, currently unsupported + schema::raise("Resample: Cannot aggregate column '{}' as it is missing from some row slices", + get_input_column_name().value); + } + } + if (common_input_type.has_value()) { + DataType output_type{*common_input_type}; + if constexpr (aggregation_operator == SortedAggregationOperator::SUM) { + schema::check(!is_time_type(*common_input_type), + "Resample: Unsupported aggregation type {} on column '{}' of type {}", + aggregation_operator, get_input_column_name().value, *common_input_type); + // Deal with overflow as best we can + if (is_unsigned_type(*common_input_type) || is_bool_type(*common_input_type)) { + output_type = DataType::UINT64; + } else if (is_signed_type(*common_input_type)) { + output_type = DataType::INT64; + } else if (is_floating_point_type(*common_input_type)) { + 
output_type = DataType::FLOAT64; + } + } else if constexpr (aggregation_operator == SortedAggregationOperator::MEAN) { + if (!is_time_type(*common_input_type)) { + output_type = DataType::FLOAT64; + } + } else if constexpr (aggregation_operator == SortedAggregationOperator::COUNT) { + output_type = DataType::UINT64; + } + + res.emplace(TypeDescriptor(output_type, Dimension::Dim0), output_index_column.row_count(), true, false); + details::visit_type( + res->type().data_type(), + [this, + &input_index_columns, + &input_agg_columns, + &bucket_boundaries, + &string_pool, + &res](auto output_type_desc_tag) { + using output_type_info = ScalarTypeInfo; + auto output_data = res->data(); + auto output_it = output_data.begin(); + auto output_end_it = output_data.end(); + using IndexTDT = ScalarTagType>; + // Need this here to only generate valid get_bucket_aggregator code + constexpr bool supported_aggregation_type_combo = is_numeric_type(output_type_info::data_type) || + is_bool_type(output_type_info::data_type) || + (is_sequence_type(output_type_info::data_type) && + (aggregation_operator == SortedAggregationOperator::FIRST || + aggregation_operator == SortedAggregationOperator::LAST)); + if constexpr (supported_aggregation_type_combo) { + auto bucket_aggregator = get_bucket_aggregator(); + bool reached_end_of_buckets{false}; + auto bucket_start_it = bucket_boundaries.cbegin(); + auto bucket_end_it = std::next(bucket_start_it); + const auto bucket_boundaries_end = bucket_boundaries.cend(); + for (auto [idx, input_agg_column]: folly::enumerate(input_agg_columns)) { + // Always true right now due to check at the top of this function + if (input_agg_column.has_value()) { + details::visit_type( + input_agg_column->column_->type().data_type(), + [this, + &output_it, + &bucket_aggregator, + &agg_column = *input_agg_column, + &input_index_column = input_index_columns.at(idx), + &bucket_boundaries_end, + &string_pool, + &bucket_start_it, + &bucket_end_it, + &reached_end_of_buckets](auto input_type_desc_tag) { + using input_type_info = ScalarTypeInfo; + if constexpr ((is_numeric_type(input_type_info::data_type) && is_numeric_type(output_type_info::data_type)) || + (is_sequence_type(input_type_info::data_type) && (is_sequence_type(output_type_info::data_type) || aggregation_operator == SortedAggregationOperator::COUNT)) || + (is_bool_type(input_type_info::data_type) && (is_bool_type(output_type_info::data_type) || is_numeric_type(output_type_info::data_type)))) { + schema::check( + !agg_column.column_->is_sparse() && agg_column.column_->row_count() == input_index_column->row_count(), + "Resample: Cannot aggregate column '{}' as it is sparse", + get_input_column_name().value); + auto index_data = input_index_column->data(); + const auto index_cend = index_data.template cend(); + auto agg_data = agg_column.column_->data(); + auto agg_it = agg_data.template cbegin(); + for (auto index_it = index_data.template cbegin(); + index_it != index_cend && !reached_end_of_buckets; + ++index_it, ++agg_it) { + if (ARCTICDB_LIKELY(index_value_in_bucket(*index_it, *bucket_start_it, *bucket_end_it))) { + if constexpr(is_time_type(input_type_info::data_type) && aggregation_operator == SortedAggregationOperator::COUNT) { + bucket_aggregator.template push(*agg_it); + } else if constexpr (is_numeric_type(input_type_info::data_type) || is_bool_type(input_type_info::data_type)) { + bucket_aggregator.push(*agg_it); + } else if constexpr (is_sequence_type(input_type_info::data_type)) { + 
bucket_aggregator.push(agg_column.string_at_offset(*agg_it)); + } + } else if (ARCTICDB_LIKELY(index_value_past_end_of_bucket(*index_it, *bucket_end_it))) { + if constexpr (is_numeric_type(output_type_info::data_type) || + is_bool_type(output_type_info::data_type) || + aggregation_operator == + SortedAggregationOperator::COUNT) { + *output_it++ = bucket_aggregator.finalize(); + } else if constexpr (is_sequence_type(output_type_info::data_type)) { + auto opt_string_view = bucket_aggregator.finalize(); + if (ARCTICDB_LIKELY(opt_string_view.has_value())) { + *output_it++ = string_pool.get(*opt_string_view).offset(); + } else { + *output_it++ = string_none; + } + } + + // The following code is equivalent to: + // if constexpr (closed_boundary == ResampleBoundary::LEFT) { + // bucket_end_it = std::upper_bound(bucket_end_it, bucket_boundaries_end, *index_it); + // } else { + // bucket_end_it = std::upper_bound(bucket_end_it, bucket_boundaries_end, *index_it, std::less_equal{}); + // } + // bucket_start_it = std::prev(bucket_end_it); + // reached_end_of_buckets = bucket_end_it == bucket_boundaries_end; + // The above code will be more performant when the vast majority of buckets are empty + // See comment in ResampleClause::advance_bucket_past_value for mathematical and experimental bounds + ++bucket_start_it; + if (ARCTICDB_UNLIKELY(++bucket_end_it == bucket_boundaries_end)) { + reached_end_of_buckets = true; + } else { + while (ARCTICDB_UNLIKELY(index_value_past_end_of_bucket(*index_it, *bucket_end_it))) { + ++bucket_start_it; + if (ARCTICDB_UNLIKELY(++bucket_end_it == bucket_boundaries_end)) { + reached_end_of_buckets = true; + break; + } + } + } + if (ARCTICDB_LIKELY(!reached_end_of_buckets && index_value_in_bucket(*index_it, *bucket_start_it, *bucket_end_it))) { + if constexpr(is_time_type(input_type_info::data_type) && aggregation_operator == SortedAggregationOperator::COUNT) { + bucket_aggregator.template push(*agg_it); + } else if constexpr (is_numeric_type(input_type_info::data_type) || is_bool_type(input_type_info::data_type)) { + bucket_aggregator.push(*agg_it); + } else if constexpr (is_sequence_type(input_type_info::data_type)) { + bucket_aggregator.push(agg_column.string_at_offset(*agg_it)); + } + } + } + } + } + } + ); + } + } + // We were in the middle of aggregating a bucket when we ran out of index values + if (output_it != output_end_it) { + if constexpr (is_numeric_type(output_type_info::data_type) || + is_bool_type(output_type_info::data_type) || + aggregation_operator == SortedAggregationOperator::COUNT) { + *output_it++ = bucket_aggregator.finalize(); + } else if constexpr (is_sequence_type(output_type_info::data_type)) { + auto opt_string_view = bucket_aggregator.finalize(); + if (opt_string_view.has_value()) { + *output_it++ = string_pool.get(*opt_string_view).offset(); + } else { + *output_it++ = string_none; + } + } + } + } + } + ); + } + internal::check(res.has_value(), + "Should not be able to reach end of SortedAggregator without a column to return"); + return std::move(*res); +} + +template +bool SortedAggregator::index_value_past_end_of_bucket(timestamp index_value, timestamp bucket_end) const { + if constexpr (closed_boundary == ResampleBoundary::LEFT) { + return index_value >= bucket_end; + } else { + // closed_boundary == ResampleBoundary::RIGHT + return index_value > bucket_end; + } +} + +template +bool SortedAggregator::index_value_in_bucket(timestamp index_value, timestamp bucket_start, timestamp bucket_end) const { + if constexpr (closed_boundary == 
ResampleBoundary::LEFT) { + return index_value >= bucket_start && index_value < bucket_end; + } else { + // closed_boundary == ResampleBoundary::RIGHT + return index_value > bucket_start && index_value <= bucket_end; + } +} + +template class SortedAggregator; +template class SortedAggregator; +template class SortedAggregator; +template class SortedAggregator; +template class SortedAggregator; +template class SortedAggregator; +template class SortedAggregator; +template class SortedAggregator; +template class SortedAggregator; +template class SortedAggregator; +template class SortedAggregator; +template class SortedAggregator; +template class SortedAggregator; +template class SortedAggregator; + } //namespace arcticdb diff --git a/cpp/arcticdb/processing/aggregation.hpp b/cpp/arcticdb/processing/aggregation.hpp index 468774f6b4..e61e4ccff5 100644 --- a/cpp/arcticdb/processing/aggregation.hpp +++ b/cpp/arcticdb/processing/aggregation.hpp @@ -14,6 +14,13 @@ namespace arcticdb { +enum class ResampleBoundary { + LEFT, + RIGHT +}; + +void add_data_type_impl(DataType data_type, std::optional& current_data_type); + class MinMaxAggregatorData { public: @@ -205,4 +212,384 @@ using CountAggregator = GroupingAggregatorImpl; using FirstAggregator = GroupingAggregatorImpl; using LastAggregator = GroupingAggregatorImpl; +enum class SortedAggregationOperator { + SUM, + MEAN, + MIN, + MAX, + FIRST, + LAST, + COUNT +}; + +template +class SumBucketAggregator { +public: + void push(T value) { + if constexpr (std::is_floating_point_v) { + if (ARCTICDB_LIKELY(!std::isnan(value))) { + sum_ += value; + } + } else { + sum_ += value; + } + } + + T finalize() { + T res{sum_}; + sum_ = 0; + return res; + } +private: + T sum_{0}; +}; + +template +class MeanBucketAggregator { +public: + void push(T value) { + if constexpr (std::is_floating_point_v) { + if (ARCTICDB_LIKELY(!std::isnan(value))) { + sum_ += value; + ++count_; + } + } else if constexpr (std::is_same_v && TimeType) { + if (ARCTICDB_LIKELY(value != NaT)) { + sum_ += value; + ++count_; + } + } else { + sum_ += value; + ++count_; + } + } + + std::conditional_t finalize() { + if constexpr (std::is_same_v && TimeType) { + timestamp res; + if (ARCTICDB_LIKELY(count_ > 0)) { + res = static_cast(sum_ / static_cast(count_)); + sum_ = 0; + count_ = 0; + } else { + res = NaT; + } + return res; + } else { + double res; + if (ARCTICDB_LIKELY(count_ > 0)) { + res = sum_ / static_cast(count_); + sum_ = 0; + count_ = 0; + } else { + res = std::numeric_limits::quiet_NaN(); + } + return res; + } + } +private: + double sum_{0}; + uint64_t count_{0}; +}; + +template +class MinBucketAggregator { +public: + MinBucketAggregator() { + if constexpr (!std::is_floating_point_v && !TimeType) { + min_ = std::numeric_limits::max(); + } + } + + void push(T value) { + if constexpr (std::is_floating_point_v) { + if (ARCTICDB_LIKELY(!std::isnan(value))) { + min_ = std::min(min_.value_or(std::numeric_limits::max()), value); + } + } else if constexpr (std::is_same_v && TimeType) { + if (ARCTICDB_LIKELY(value != NaT)) { + min_ = std::min(min_.value_or(std::numeric_limits::max()), value); + } + } else { + min_ = std::min(min_, value); + } + } + + T finalize() { + T res; + if constexpr (std::is_floating_point_v) { + res = min_.value_or(std::numeric_limits::quiet_NaN()); + min_.reset(); + } else if constexpr (std::is_same_v && TimeType) { + res = min_.value_or(NaT); + min_.reset(); + } else { + res = min_; + min_ = std::numeric_limits::max(); + } + return res; + } +private: + // Floats and 
timestamps need a special case for when only nan/nat values are pushed + std::conditional_t || TimeType, std::optional,T> min_; +}; + +template +class MaxBucketAggregator { +public: + MaxBucketAggregator() { + if constexpr (!std::is_floating_point_v && !TimeType) { + max_ = std::numeric_limits::lowest(); + } + } + + void push(T value) { + if constexpr (std::is_floating_point_v) { + if (ARCTICDB_LIKELY(!std::isnan(value))) { + max_ = std::max(max_.value_or(std::numeric_limits::lowest()), value); + } + } else if constexpr (std::is_same_v && TimeType) { + if (ARCTICDB_LIKELY(value != NaT)) { + max_ = std::max(max_.value_or(std::numeric_limits::lowest()), value); + } + } else { + max_ = std::max(max_, value); + } + } + + T finalize() { + T res; + if constexpr (std::is_floating_point_v) { + res = max_.value_or(std::numeric_limits::quiet_NaN()); + max_.reset(); + } else if constexpr (std::is_same_v && TimeType) { + res = max_.value_or(NaT); + max_.reset(); + } else { + res = max_; + max_ = std::numeric_limits::min(); + } + return res; + } +private: + // Floats and timestamps need a special case for when only nan/nat values are pushed + std::conditional_t || TimeType, std::optional,T> max_; +}; + +template +class FirstBucketAggregator { +public: + void push(T value) { + if constexpr (std::is_same_v>) { + if (ARCTICDB_UNLIKELY(!first_.has_value() || !(*first_).has_value())) { + first_ = value; + } + } else if constexpr (std::is_floating_point_v) { + if (ARCTICDB_UNLIKELY(!first_.has_value() || std::isnan(*first_))) { + first_ = value; + } + } else if constexpr (std::is_same_v && TimeType) { + if (ARCTICDB_UNLIKELY(!first_.has_value() || *first_ == NaT)) { + first_ = value; + } + } else { + if (ARCTICDB_UNLIKELY(!first_.has_value())) { + first_ = value; + } + } + } + + T finalize() { + T res; + if constexpr (std::is_floating_point_v) { + res = first_.value_or(std::numeric_limits::quiet_NaN()); + } else if constexpr(std::is_same_v && TimeType) { + res = first_.value_or(NaT); + } else { + debug::check(first_.has_value(), "FirstBucketAggregator::finalize called with no values pushed"); + res = *first_; + } + first_.reset(); + return res; + } +private: + std::optional first_; +}; + +template +class LastBucketAggregator { +public: + void push(T value) { + if constexpr (std::is_same_v>) { + if (ARCTICDB_LIKELY(!last_.has_value() || value.has_value())) { + last_ = value; + } + } else if constexpr (std::is_floating_point_v) { + if (ARCTICDB_LIKELY(!last_.has_value() || !std::isnan(value))) { + last_ = value; + } + } else if constexpr (std::is_same_v && TimeType) { + if (ARCTICDB_LIKELY(!last_.has_value() || value != NaT)) { + last_ = value; + } + } else { + last_ = value; + } + } + + T finalize() { + T res; + if constexpr (std::is_floating_point_v) { + res = last_.value_or(std::numeric_limits::quiet_NaN()); + } else if constexpr(std::is_same_v && TimeType) { + res = last_.value_or(NaT); + } else { + debug::check(last_.has_value(), "LastBucketAggregator::finalize called with no values pushed"); + res = *last_; + } + last_.reset(); + return res; + } +private: + std::optional last_; +}; + +class CountBucketAggregator { +public: + template + void push(T value) { + if constexpr (std::is_same_v>) { + if (ARCTICDB_LIKELY(value.has_value())) { + ++count_; + } + } else if constexpr (std::is_floating_point_v) { + if (ARCTICDB_LIKELY(!std::isnan(value))) { + ++count_; + } + } else if constexpr (std::is_same_v && TimeType) { + if (ARCTICDB_LIKELY(value != NaT)) { + ++count_; + } + } else { + ++count_; + } + } + + 
uint64_t finalize() { + uint64_t res{count_}; + count_ = 0; + return res; + } +private: + uint64_t count_{0}; +}; + +template +class SortedAggregator +{ +public: + + explicit SortedAggregator(ColumnName input_column_name, ColumnName output_column_name) + : input_column_name_(std::move(input_column_name)) + , output_column_name_(std::move(output_column_name)) + {} + ARCTICDB_MOVE_COPY_DEFAULT(SortedAggregator) + + [[nodiscard]] ColumnName get_input_column_name() const { return input_column_name_; } + [[nodiscard]] ColumnName get_output_column_name() const { return output_column_name_; } + + [[nodiscard]] Column aggregate(const std::vector>& input_index_columns, + const std::vector>& input_agg_columns, + const std::vector& bucket_boundaries, + const Column& output_index_column, + StringPool& string_pool) const; +private: + [[nodiscard]] bool index_value_past_end_of_bucket(timestamp index_value, timestamp bucket_end) const; + [[nodiscard]] bool index_value_in_bucket(timestamp index_value, timestamp bucket_start, timestamp bucket_end) const; + + template + [[nodiscard]] auto get_bucket_aggregator() const { + if constexpr (aggregation_operator == SortedAggregationOperator::SUM) { + if constexpr (is_bool_type(scalar_type_info::data_type)) { + // Sum of bool column is just the count of true values + return SumBucketAggregator(); + } else { + return SumBucketAggregator(); + } + } else if constexpr (aggregation_operator == SortedAggregationOperator::MEAN) { + if constexpr (is_time_type(scalar_type_info::data_type)) { + return MeanBucketAggregator(); + } else { + return MeanBucketAggregator(); + } + } else if constexpr (aggregation_operator == SortedAggregationOperator::MIN) { + if constexpr (is_time_type(scalar_type_info::data_type)) { + return MinBucketAggregator(); + } else { + return MinBucketAggregator(); + } + } else if constexpr (aggregation_operator == SortedAggregationOperator::MAX) { + if constexpr (is_time_type(scalar_type_info::data_type)) { + return MaxBucketAggregator(); + } else { + return MaxBucketAggregator(); + } + } else if constexpr (aggregation_operator == SortedAggregationOperator::FIRST) { + if constexpr (is_numeric_type(scalar_type_info::data_type) || is_bool_type(scalar_type_info::data_type)) { + if constexpr (is_time_type(scalar_type_info::data_type)) { + return FirstBucketAggregator(); + } else { + return FirstBucketAggregator(); + } + } else if constexpr (is_sequence_type(scalar_type_info::data_type)) { + return FirstBucketAggregator>(); + } + } else if constexpr (aggregation_operator == SortedAggregationOperator::LAST) { + if constexpr (is_numeric_type(scalar_type_info::data_type) || is_bool_type(scalar_type_info::data_type)) { + if constexpr (is_time_type(scalar_type_info::data_type)) { + return LastBucketAggregator(); + } else { + return LastBucketAggregator(); + } + } else if constexpr (is_sequence_type(scalar_type_info::data_type)) { + return LastBucketAggregator>(); + } + } else if constexpr (aggregation_operator == SortedAggregationOperator::COUNT) { + return CountBucketAggregator(); + } + } + + ColumnName input_column_name_; + ColumnName output_column_name_; +}; + } //namespace arcticdb + +namespace fmt { +template<> +struct formatter { + template + constexpr auto parse(ParseContext &ctx) { return ctx.begin(); } + + template + auto format(const arcticdb::SortedAggregationOperator& agg, FormatContext &ctx) const { + switch(agg) { + case arcticdb::SortedAggregationOperator::SUM: + return fmt::format_to(ctx.out(), "SUM"); + case 
arcticdb::SortedAggregationOperator::MEAN: + return fmt::format_to(ctx.out(), "MEAN"); + case arcticdb::SortedAggregationOperator::MIN: + return fmt::format_to(ctx.out(), "MIN"); + case arcticdb::SortedAggregationOperator::MAX: + return fmt::format_to(ctx.out(), "MAX"); + case arcticdb::SortedAggregationOperator::FIRST: + return fmt::format_to(ctx.out(), "FIRST"); + case arcticdb::SortedAggregationOperator::LAST: + return fmt::format_to(ctx.out(), "LAST"); + case arcticdb::SortedAggregationOperator::COUNT: + default: + return fmt::format_to(ctx.out(), "COUNT"); + } + } +}; +} //namespace fmt diff --git a/cpp/arcticdb/processing/aggregation_interface.hpp b/cpp/arcticdb/processing/aggregation_interface.hpp index a2dcd99fa6..9291c2ed3c 100644 --- a/cpp/arcticdb/processing/aggregation_interface.hpp +++ b/cpp/arcticdb/processing/aggregation_interface.hpp @@ -73,4 +73,24 @@ struct IColumnStatsAggregator { using ColumnStatsAggregator = folly::Poly; +struct ISortedAggregator { + template + struct Interface : Base { + [[nodiscard]] ColumnName get_input_column_name() const { return folly::poly_call<0>(*this); }; + [[nodiscard]] ColumnName get_output_column_name() const { return folly::poly_call<1>(*this); }; + [[nodiscard]] Column aggregate(const std::vector>& input_index_columns, + const std::vector>& input_agg_columns, + const std::vector& bucket_boundaries, + const Column& output_index_column, + StringPool& string_pool) const { + return folly::poly_call<2>(*this, input_index_columns, input_agg_columns, bucket_boundaries, output_index_column, string_pool); + } + }; + + template + using Members = folly::PolyMembers<&T::get_input_column_name, &T::get_output_column_name, &T::aggregate>; +}; + +using SortedAggregatorInterface = folly::Poly; + } //namespace arcticdb diff --git a/cpp/arcticdb/processing/clause.cpp b/cpp/arcticdb/processing/clause.cpp index 8e4a050a0c..5e92d3619a 100644 --- a/cpp/arcticdb/processing/clause.cpp +++ b/cpp/arcticdb/processing/clause.cpp @@ -72,8 +72,9 @@ std::vector> structure_by_column_slice(std::vector gather_entities(std::shared_ptr component_manager, Composite&& entity_ids, bool include_atom_keys, - bool include_bucket) { - return entity_ids.transform([&component_manager, include_atom_keys, include_bucket] + bool include_bucket, + bool include_initial_expected_get_calls) { + return entity_ids.transform([&component_manager, include_atom_keys, include_bucket, include_initial_expected_get_calls] (const EntityIds& entity_ids) -> ProcessingUnit { ProcessingUnit res; std::vector> segments; @@ -114,6 +115,14 @@ Composite gather_entities(std::shared_ptr comp res.set_bucket(buckets.at(0)); } } + if (include_initial_expected_get_calls) { + std::vector segment_initial_expected_get_calls; + segment_initial_expected_get_calls.reserve(entity_ids.size()); + for (auto entity_id: entity_ids) { + segment_initial_expected_get_calls.emplace_back(component_manager->get_initial_expected_get_calls>(entity_id)); + } + res.set_segment_initial_expected_get_calls(std::move(segment_initial_expected_get_calls)); + } return res; }); } @@ -253,9 +262,8 @@ struct SegmentWrapper { } }; -Composite PassthroughClause::process(Composite &&p) const { - auto procs = std::move(p); - return procs; +Composite PassthroughClause::process(Composite&& entity_ids) const { + return std::move(entity_ids); } Composite FilterClause::process( @@ -533,6 +541,324 @@ Composite AggregationClause::process(Composite&& entity_id return str_; } +template +void ResampleClause::set_aggregations(const std::vector& 
named_aggregators) { + clause_info_.input_columns_ = std::make_optional>(); + str_ = fmt::format("RESAMPLE({}) | AGGREGATE {{", rule()); + for (const auto& named_aggregator: named_aggregators) { + str_.append(fmt::format("{}: ({}, {}), ", + named_aggregator.output_column_name_, + named_aggregator.input_column_name_, + named_aggregator.aggregation_operator_)); + clause_info_.input_columns_->insert(named_aggregator.input_column_name_); + auto typed_input_column_name = ColumnName(named_aggregator.input_column_name_); + auto typed_output_column_name = ColumnName(named_aggregator.output_column_name_); + if (named_aggregator.aggregation_operator_ == "sum") { + aggregators_.emplace_back( + SortedAggregator(typed_input_column_name, + typed_output_column_name)); + } else if (named_aggregator.aggregation_operator_ == "mean") { + aggregators_.emplace_back(SortedAggregator(typed_input_column_name, typed_output_column_name)); + } else if (named_aggregator.aggregation_operator_ == "min") { + aggregators_.emplace_back(SortedAggregator(typed_input_column_name, typed_output_column_name)); + } else if (named_aggregator.aggregation_operator_ == "max") { + aggregators_.emplace_back(SortedAggregator(typed_input_column_name, typed_output_column_name)); + } else if (named_aggregator.aggregation_operator_ == "first") { + aggregators_.emplace_back(SortedAggregator(typed_input_column_name, typed_output_column_name)); + } else if (named_aggregator.aggregation_operator_ == "last") { + aggregators_.emplace_back(SortedAggregator(typed_input_column_name, typed_output_column_name)); + } else if (named_aggregator.aggregation_operator_ == "count") { + aggregators_.emplace_back(SortedAggregator(typed_input_column_name, typed_output_column_name)); + } else { + user_input::raise("Unknown aggregation operator provided to resample: {}", named_aggregator.aggregation_operator_); + } + } + str_.append("}"); +} + +template +void ResampleClause::set_processing_config(const ProcessingConfig& processing_config) { + processing_config_ = processing_config; +} + +template +std::vector> ResampleClause::structure_for_processing( + std::vector& ranges_and_keys, + ARCTICDB_UNUSED size_t start_from) { + if (ranges_and_keys.empty()) { + return {}; + } + TimestampRange index_range( + std::min_element(ranges_and_keys.begin(), ranges_and_keys.end(), + [](const RangesAndKey& left, const RangesAndKey& right) { + return left.key_.start_time() < right.key_.start_time(); + })->key_.start_time(), + std::max_element(ranges_and_keys.begin(), ranges_and_keys.end(), + [](const RangesAndKey& left, const RangesAndKey& right) { + return left.key_.end_time() < right.key_.end_time(); + })->key_.end_time() - 1 + ); + if (date_range_.has_value()) { + date_range_->first = std::max(date_range_->first, index_range.first); + date_range_->second = std::min(date_range_->second, index_range.second); + } else { + date_range_ = index_range; + } + + auto start = std::chrono::steady_clock::now(); + bucket_boundaries_ = generate_bucket_boundaries_(date_range_->first, date_range_->second, rule_, closed_boundary); + auto end = std::chrono::steady_clock::now(); + auto duration = std::chrono::duration_cast(end - start); + log::version().warn("Pandas date range took {}ms", duration.count()); + + if (bucket_boundaries_.size() < 2) { + return {}; + } + debug::check(std::is_sorted(bucket_boundaries_.begin(), bucket_boundaries_.end()), + "Resampling expects provided bucket boundaries to be strictly monotonically increasing"); + std::erase_if(ranges_and_keys, [this](const RangesAndKey 
&ranges_and_key) { + auto [start_index, end_index] = ranges_and_key.key_.time_range(); + // end_index from the key is 1 nanosecond larger than the index value of the last row in the row-slice + end_index--; + return index_range_outside_bucket_range(start_index, end_index); + }); + auto res = structure_by_row_slice(ranges_and_keys, 0); + // Element i of res also needs the values from element i+1 if there is a bucket which incorporates the last index + // value of row-slice i and the first value of row-slice i+1 + // Element i+1 should be removed if the last bucket involved in element i covers all the index values in element i+1 + auto bucket_boundaries_it = std::cbegin(bucket_boundaries_); + // Exit if res_it == std::prev(res.end()) as this implies the last row slice was not incorporated into an earlier processing unit + for (auto res_it = res.begin(); res_it != res.end() && res_it != std::prev(res.end());) { + auto last_index_value_in_row_slice = ranges_and_keys[res_it->at(0)].key_.end_time() - 1; + advance_bucket_past_value(bucket_boundaries_, bucket_boundaries_it, last_index_value_in_row_slice); + // bucket_boundaries_it now contains the end value of the last bucket covering the row-slice in res_it, or an end iterator if the last bucket ends before the end of this row-slice + if (bucket_boundaries_it != bucket_boundaries_.end()) { + Bucket current_bucket{*std::prev(bucket_boundaries_it), *bucket_boundaries_it}; + auto next_row_slice_it = std::next(res_it); + while (next_row_slice_it != res.end()) { + // end_index from the key is 1 nanosecond larger than the index value of the last row in the row-slice + TimestampRange next_row_slice_timestamp_range{ + ranges_and_keys[next_row_slice_it->at(0)].key_.start_time(), + ranges_and_keys[next_row_slice_it->at(0)].key_.end_time() - 1}; + if (current_bucket.contains(next_row_slice_timestamp_range.first)) { + // The last bucket in the current processing unit overlaps with the first index value in the next row slice, so add segments into current processing unit + res_it->insert(res_it->end(), next_row_slice_it->begin(), next_row_slice_it->end()); + if (current_bucket.contains(next_row_slice_timestamp_range.second)) { + // The last bucket in the current processing unit wholly contains the next row slice, so remove it from the result + next_row_slice_it = res.erase(next_row_slice_it); + } else { + break; + } + } else { + break; + } + } + // This is the last bucket, and all the required row-slices have been incorporated into the current processing unit, so erase the rest + if (bucket_boundaries_it == std::prev(bucket_boundaries_.end())) { + res.erase(next_row_slice_it, res.end()); + break; + } + res_it = next_row_slice_it; + } + } + return res; +} + +template +bool ResampleClause::index_range_outside_bucket_range(timestamp start_index, timestamp end_index) const { + if constexpr (closed_boundary == ResampleBoundary::LEFT) { + return start_index >= bucket_boundaries_.back() || end_index < bucket_boundaries_.front(); + } else { + // closed_boundary == ResampleBoundary::RIGHT + return start_index > bucket_boundaries_.back() || end_index <= bucket_boundaries_.front(); + } +} + +template +void ResampleClause::advance_bucket_past_value(const std::vector& bucket_boundaries, + std::vector::const_iterator& bucket_boundaries_it, + timestamp value) const { + // These loops are equivalent to bucket_boundaries_it = std::upper_bound(bucket_boundaries_it, bucket_boundaries.end(), value, std::less[_equal]{}) + // but optimised for the case where most buckets are 
non-empty. + // Mathematically, this will be faster when b / log_2(b) < n, where b is the number of buckets and n is the number of index values + // Even if n is only 1000, this corresponds to 7/8 buckets being empty, rising to 19/20 for n=100,000 + // Experimentally, this implementation is around 10x faster when every bucket contains values, and 3x slower when 99.9% of buckets are empty + // If we wanted to speed this up when most buckets are empty, we could make this method adaptive to the number of buckets and rows + if constexpr(closed_boundary == ResampleBoundary::LEFT) { + while(bucket_boundaries_it != bucket_boundaries.end() && *bucket_boundaries_it <= value) { + ++bucket_boundaries_it; + } + } else { + // closed_boundary == ResampleBoundary::RIGHT + while(bucket_boundaries_it != bucket_boundaries.end() && *bucket_boundaries_it < value) { + ++bucket_boundaries_it; + } + } +} + +template +Composite ResampleClause::process(Composite&& entity_ids) const { + auto procs = gather_entities(component_manager_, std::move(entity_ids), false, false, true).as_range(); + internal::check(procs.size() == 1, "Expected a single ProcessingUnit on entry to ResampleClause::process"); + auto row_slices = split_by_row_slice(std::move(procs[0])); + // If the expected get calls for the segments in the first row slice are 2, the first bucket overlapping this row + // slice is being computed by the call to process dealing with the row slices above these. Otherwise, this call + // should do it + bool responsible_for_first_overlapping_bucket = row_slices.front().segment_initial_expected_get_calls_->at(0) == 1; + // Find the iterators into bucket_boundaries_ of the start of the first and the end of the last bucket this call to process is + // responsible for calculating + // All segments in a given row slice contain the same index column, so just grab info from the first one + const auto& index_column_name = row_slices.front().segments_->at(0)->field(0).name(); + const auto& first_row_slice_index_col = row_slices.front().segments_->at(0)->column(0); + // Resampling only makes sense for timestamp indexes + internal::check(is_time_type(first_row_slice_index_col.type().data_type()), + "Cannot resample data with index column of non-timestamp type"); + auto first_ts = first_row_slice_index_col.scalar_at(0).value(); + // We can use the last timestamp from the first row-slice's index column, as by construction (structure_for_processing) the bucket covering + // this value will cover the remaining index values this call is responsible for + auto last_ts = first_row_slice_index_col.scalar_at(first_row_slice_index_col.row_count() - 1).value(); + auto bucket_boundaries = generate_bucket_boundaries(first_ts, last_ts, responsible_for_first_overlapping_bucket); + // Construct the output index column and the bucket boundaries this call to process is responsible for + std::vector> input_index_columns; + input_index_columns.reserve(row_slices.size()); + for (const auto& row_slice: row_slices) { + input_index_columns.emplace_back(row_slice.segments_->at(0)->column_ptr(0)); + } + auto output_index_column = generate_output_index_column(input_index_columns, bucket_boundaries); + // Bucket boundaries can be wider than the date range specified by the user, narrow the first and last buckets here if necessary + bucket_boundaries.front() = std::max(bucket_boundaries.front(), date_range_->first - (closed_boundary == ResampleBoundary::RIGHT ? 
1 : 0)); + bucket_boundaries.back() = std::min(bucket_boundaries.back(), date_range_->second + (closed_boundary == ResampleBoundary::LEFT ? 1 : 0)); + SegmentInMemory seg; + RowRange output_row_range(row_slices.front().row_ranges_->at(0)->start(), + row_slices.front().row_ranges_->at(0)->start() + output_index_column->row_count()); + seg.add_column(scalar_field(DataType::NANOSECONDS_UTC64, index_column_name), output_index_column); + seg.descriptor().set_index(IndexDescriptor(1, IndexDescriptor::TIMESTAMP)); + auto& string_pool = seg.string_pool(); + for (const auto& aggregator: aggregators_) { + std::vector> input_agg_columns; + input_agg_columns.reserve(row_slices.size()); + for (auto& row_slice: row_slices) { + auto variant_data = row_slice.get(aggregator.get_input_column_name()); + util::variant_match(variant_data, + [&input_agg_columns](const ColumnWithStrings& column_with_strings) { + input_agg_columns.emplace_back(column_with_strings); + }, + [&input_agg_columns](const EmptyResult&) { + // Dynamic schema, missing column from this row-slice + // Not currently supported, but will be, hence the argument to aggregate being a vector of optionals + input_agg_columns.emplace_back(); + }, + [](const auto&) { + internal::raise("Unexpected return type from ProcessingUnit::get, expected column-like"); + } + ); + } + auto aggregated_column = std::make_shared(aggregator.aggregate(input_index_columns, input_agg_columns, bucket_boundaries, *output_index_column, string_pool)); + seg.add_column(scalar_field(aggregated_column->type().data_type(), aggregator.get_output_column_name().value), aggregated_column); + } + seg.set_row_data(output_index_column->row_count() - 1); + return Composite(push_entities(component_manager_, ProcessingUnit(std::move(seg), std::move(output_row_range)))); +} + +template +[[nodiscard]] std::string ResampleClause::to_string() const { + return str_; +} + +template +std::vector ResampleClause::generate_bucket_boundaries(timestamp first_ts, + timestamp last_ts, + bool responsible_for_first_overlapping_bucket) const { + auto first_it = std::lower_bound(bucket_boundaries_.begin(), bucket_boundaries_.end(), first_ts, + [](timestamp boundary, timestamp first_ts) { + if constexpr(closed_boundary == ResampleBoundary::LEFT) { + return boundary <= first_ts; + } else { + // closed_boundary == ResampleBoundary::RIGHT + return boundary < first_ts; + } + }); + if (responsible_for_first_overlapping_bucket && first_it != bucket_boundaries_.begin()) { + --first_it; + } + auto last_it = std::upper_bound(first_it, bucket_boundaries_.end(), last_ts, + [](timestamp last_ts, timestamp boundary) { + if constexpr(closed_boundary == ResampleBoundary::LEFT) { + return last_ts < boundary; + } else { + // closed_boundary == ResampleBoundary::RIGHT + return last_ts <= boundary; + } + }); + if (last_it != bucket_boundaries_.end()) { + ++last_it; + } + std::vector bucket_boundaries(first_it, last_it); + internal::check(bucket_boundaries.size() >= 2, + "Always expect at least bucket boundaries in ResampleClause::generate_bucket_boundaries"); + return bucket_boundaries; +} + +template +std::shared_ptr ResampleClause::generate_output_index_column(const std::vector>& input_index_columns, + const std::vector& bucket_boundaries) const { + auto data_type = DataType::NANOSECONDS_UTC64; + using IndexTDT = ScalarTagType>; + + // bucket_boundaries.size() - 1 is the maximum number of buckets, and therefore the maximum number of index values in the output + // This could be wasteful if there are many empty buckets in 
between present index values + // TODO: Presize in blocks instead so unused ones can be trimmed at the end + auto output_index_column = std::make_shared(TypeDescriptor(data_type, Dimension::Dim0), + bucket_boundaries.size() - 1, + false, + false); + auto output_index_column_data = output_index_column->data(); + auto output_index_column_it = output_index_column_data.begin(); + size_t output_index_column_row_count{0}; + + auto bucket_end_it = std::next(bucket_boundaries.cbegin()); + Bucket current_bucket{*std::prev(bucket_end_it), *bucket_end_it}; + bool current_bucket_added_to_index{false}; + // Only include buckets that have at least one index value in range + for (const auto& input_index_column: input_index_columns) { + auto index_column_data = input_index_column->data(); + const auto cend = index_column_data.cend(); + for (auto it = index_column_data.cbegin(); it != cend; ++it) { + if (ARCTICDB_LIKELY(current_bucket.contains(*it))) { + if (ARCTICDB_UNLIKELY(!current_bucket_added_to_index)) { + *output_index_column_it++ = + label_boundary_ == ResampleBoundary::LEFT ? *std::prev(bucket_end_it) : *bucket_end_it; + ++output_index_column_row_count; + current_bucket_added_to_index = true; + } + } else { + advance_bucket_past_value(bucket_boundaries, bucket_end_it, *it); + if (ARCTICDB_UNLIKELY(bucket_end_it == bucket_boundaries.end())) { + break; + } else { + current_bucket.set_boundaries(*std::prev(bucket_end_it), *bucket_end_it); + current_bucket_added_to_index = false; + if (ARCTICDB_LIKELY(current_bucket.contains(*it))) { + *output_index_column_it++ = + label_boundary_ == ResampleBoundary::LEFT ? *std::prev(bucket_end_it) : *bucket_end_it; + ++output_index_column_row_count; + current_bucket_added_to_index = true; + } + } + } + } + } + // TODO: Trim unused blocks + // We preallocated the chunked buffer so this won't alloc, but just a hacky way to get bytes_ correct in the buffer + // TODO: Add API for this to remove hack + output_index_column->allocate_data(output_index_column_row_count * sizeof(timestamp)); + output_index_column->set_row_data(output_index_column_row_count - 1); + return output_index_column; +} + +template struct ResampleClause; +template struct ResampleClause; + [[nodiscard]] Composite RemoveColumnPartitioningClause::process(Composite&& entity_ids) const { auto procs = gather_entities(component_manager_, std::move(entity_ids)); Composite output; @@ -696,7 +1022,7 @@ std::optional>> MergeClause::repartition( } Composite ColumnStatsGenerationClause::process(Composite&& entity_ids) const { - auto procs = gather_entities(component_manager_, std::move(entity_ids), true, false); + auto procs = gather_entities(component_manager_, std::move(entity_ids), true); std::vector aggregators_data; internal::check( static_cast(column_stats_aggregators_), @@ -761,7 +1087,7 @@ Composite ColumnStatsGenerationClause::process(Composite&& std::vector> RowRangeClause::structure_for_processing( std::vector& ranges_and_keys, - ARCTICDB_UNUSED size_t start_from) const { + ARCTICDB_UNUSED size_t start_from) { ranges_and_keys.erase(std::remove_if(ranges_and_keys.begin(), ranges_and_keys.end(), [this](const RangesAndKey& ranges_and_key) { return ranges_and_key.row_range_.start() >= end_ || ranges_and_key.row_range_.end() <= start_; }), ranges_and_keys.end()); @@ -847,7 +1173,7 @@ std::string RowRangeClause::to_string() const { std::vector> DateRangeClause::structure_for_processing( std::vector& ranges_and_keys, - size_t start_from) const { + size_t start_from) { 
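[Editor's note: a short standalone sketch of the boundary-search rule used above, mirroring the commented-out std::upper_bound equivalent in SortedAggregator::aggregate and the comparators in ResampleClause::generate_bucket_boundaries. With left-closed buckets [b, e) the search advances to the first boundary strictly greater than the index value; with right-closed buckets (b, e] it advances to the first boundary greater than or equal to it. Names below are illustrative, not ArcticDB code.]

    #include <algorithm>
    #include <cstdint>
    #include <functional>
    #include <vector>

    using timestamp = std::int64_t;

    // Returns an iterator to the end boundary of the bucket containing `value`.
    // May return boundaries.cend() if `value` lies past the last boundary,
    // matching the reached_end_of_buckets handling in the aggregation loop above.
    std::vector<timestamp>::const_iterator bucket_end(
            const std::vector<timestamp>& boundaries,   // sorted ascending
            timestamp value,
            bool left_closed) {
        return left_closed
            // [b, e): first boundary > value
            ? std::upper_bound(boundaries.cbegin(), boundaries.cend(), value)
            // (b, e]: first boundary >= value
            : std::upper_bound(boundaries.cbegin(), boundaries.cend(), value,
                               std::less_equal<timestamp>{});
    }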
ranges_and_keys.erase(std::remove_if(ranges_and_keys.begin(), ranges_and_keys.end(), [this](const RangesAndKey& ranges_and_key) { auto [start_index, end_index] = ranges_and_key.key_.time_range(); return start_index > end_ || end_index <= start_; @@ -856,7 +1182,7 @@ std::vector> DateRangeClause::structure_for_processing( } Composite DateRangeClause::process(Composite &&entity_ids) const { - auto procs = gather_entities(component_manager_, std::move(entity_ids), true, false); + auto procs = gather_entities(component_manager_, std::move(entity_ids), true); Composite output; procs.broadcast([&output, this](ProcessingUnit &proc) { // We are only interested in the index, which is in every SegmentInMemory in proc.segments_, so just use the first diff --git a/cpp/arcticdb/processing/clause.hpp b/cpp/arcticdb/processing/clause.hpp index d6b6e2841d..37fa4800fc 100644 --- a/cpp/arcticdb/processing/clause.hpp +++ b/cpp/arcticdb/processing/clause.hpp @@ -73,7 +73,7 @@ struct IClause { // TODO #732: Factor out start_from as part of https://github.com/man-group/ArcticDB/issues/732 [[nodiscard]] std::vector> structure_for_processing(std::vector& ranges_and_keys, - size_t start_from) const { + size_t start_from) { return std::move(folly::poly_call<0>(*this, ranges_and_keys, start_from)); } @@ -117,7 +117,8 @@ std::vector> structure_by_column_slice(std::vector gather_entities(std::shared_ptr component_manager, Composite&& entity_ids, bool include_atom_keys = false, - bool include_bucket = false); + bool include_bucket = false, + bool include_initial_expected_get_calls = false); EntityIds push_entities(std::shared_ptr component_manager, ProcessingUnit&& proc); @@ -128,7 +129,7 @@ struct PassthroughClause { [[nodiscard]] std::vector> structure_for_processing( std::vector& ranges_and_keys, - size_t start_from) const { + size_t start_from) { return structure_by_row_slice(ranges_and_keys, start_from); } @@ -170,7 +171,7 @@ struct FilterClause { [[nodiscard]] std::vector> structure_for_processing( std::vector& ranges_and_keys, - size_t start_from) const { + size_t start_from) { return structure_by_row_slice(ranges_and_keys, start_from); } @@ -223,7 +224,7 @@ struct ProjectClause { [[nodiscard]] std::vector> structure_for_processing( std::vector& ranges_and_keys, - size_t start_from) const { + size_t start_from) { return structure_by_row_slice(ranges_and_keys, start_from); } @@ -270,7 +271,7 @@ struct PartitionClause { [[nodiscard]] std::vector> structure_for_processing( std::vector& ranges_and_keys, - size_t start_from) const { + size_t start_from) { return structure_by_row_slice(ranges_and_keys, start_from); } @@ -380,7 +381,7 @@ struct AggregationClause { [[noreturn]] std::vector> structure_for_processing( ARCTICDB_UNUSED const std::vector&, - ARCTICDB_UNUSED size_t) const { + ARCTICDB_UNUSED size_t) { internal::raise( "AggregationClause::structure_for_processing should never be called" ); @@ -409,6 +410,109 @@ struct AggregationClause { [[nodiscard]] std::string to_string() const; }; +template +class Bucket { +public: + Bucket(timestamp start, timestamp end): + start_(start), end_(end){} + + void set_boundaries(timestamp start, timestamp end) { + start_ = start; + end_ = end; + } + + bool contains(timestamp ts) const { + if constexpr (closed_boundary == ResampleBoundary::LEFT) { + return ts >= start_ && ts < end_; + } else { + // closed_boundary == ResampleBoundary::RIGHT + return ts > start_ && ts <= end_; + } + } + +private: + timestamp start_; + timestamp end_; +}; + +template +struct ResampleClause { + 
ClauseInfo clause_info_; + std::shared_ptr component_manager_; + ProcessingConfig processing_config_; + std::string rule_; + ResampleBoundary label_boundary_; + // This will either hold the date range specified by the user, or the first and last timestamps present in the index column + std::optional date_range_; + // Inject this as a callback in the ctor to avoid language-specific dependencies this low down in the codebase + std::function(timestamp, timestamp, std::string_view, ResampleBoundary)> generate_bucket_boundaries_; + std::vector bucket_boundaries_; + std::vector aggregators_; + std::string str_; + + ResampleClause() = delete; + + ARCTICDB_MOVE_COPY_DEFAULT(ResampleClause) + + ResampleClause(const std::string& rule, + ResampleBoundary label_boundary, + std::function(timestamp, timestamp, std::string_view, ResampleBoundary)>&& generate_bucket_boundaries): + rule_(rule), + label_boundary_(label_boundary), + generate_bucket_boundaries_(std::move(generate_bucket_boundaries)){ + clause_info_.can_combine_with_column_selection_ = false; + clause_info_.modifies_output_descriptor_ = true; + } + + [[nodiscard]] std::vector> structure_for_processing( + std::vector& ranges_and_keys, + size_t start_from); + + [[nodiscard]] Composite process(Composite&& entity_ids) const; + + [[nodiscard]] std::optional>> repartition( + ARCTICDB_UNUSED std::vector>&& + ) const { + return std::nullopt; + } + + [[nodiscard]] const ClauseInfo& clause_info() const { + return clause_info_; + } + + void set_processing_config(const ProcessingConfig& processing_config); + + void set_component_manager(std::shared_ptr component_manager) { + component_manager_ = component_manager; + } + + [[nodiscard]] std::string to_string() const; + + [[nodiscard]] std::string rule() const { + return rule_; + } + + void set_aggregations(const std::vector& named_aggregators); + + void set_date_range(timestamp date_range_start, timestamp date_range_end) { + date_range_.emplace(date_range_start, date_range_end); + } + + bool index_range_outside_bucket_range(timestamp start_index, timestamp end_index) const; + + // Advances the bucket boundary iterator to the end of the last bucket that includes a value from a row slice with the given last index value + void advance_bucket_past_value(const std::vector& bucket_boundaries, + std::vector::const_iterator& bucket_boundaries_it, + timestamp value) const; + + std::vector generate_bucket_boundaries(timestamp first_ts, + timestamp last_ts, + bool responsible_for_first_overlapping_bucket) const; + + std::shared_ptr generate_output_index_column(const std::vector>& input_index_columns, + const std::vector& bucket_boundaries) const; +}; + struct RemoveColumnPartitioningClause { ClauseInfo clause_info_; std::shared_ptr component_manager_; @@ -421,7 +525,7 @@ struct RemoveColumnPartitioningClause { [[nodiscard]] std::vector> structure_for_processing( std::vector& ranges_and_keys, - size_t start_from) const { + size_t start_from) { return structure_by_row_slice(ranges_and_keys, start_from); } @@ -455,7 +559,7 @@ struct SplitClause { [[nodiscard]] std::vector> structure_for_processing( std::vector& ranges_and_keys, - size_t start_from) const { + size_t start_from) { return structure_by_row_slice(ranges_and_keys, start_from); } @@ -488,7 +592,7 @@ struct SortClause { [[nodiscard]] std::vector> structure_for_processing( std::vector& ranges_and_keys, - size_t start_from) const { + size_t start_from) { return structure_by_row_slice(ranges_and_keys, start_from); } @@ -534,7 +638,7 @@ struct MergeClause { 
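[Editor's note: for orientation, here is one way the injected bucket-boundary callback in the ResampleClause declaration above might be satisfied. The elided std::function signature appears to take (timestamp, timestamp, std::string_view, ResampleBoundary) and return std::vector<timestamp>; in the PR it is supplied from the Python layer and apparently wraps a pandas date_range call (per the timing log in structure_for_processing). The fixed-interval stand-in below is purely illustrative. The clause itself only requires at least two boundaries, sorted in increasing order, as the debug check in structure_for_processing asserts.]

    #include <cstdint>
    #include <string_view>
    #include <vector>

    using timestamp = std::int64_t;
    enum class ResampleBoundary { LEFT, RIGHT };  // mirrors the enum added in aggregation.hpp

    // Hypothetical stand-in for generate_bucket_boundaries_: emit boundaries
    // start, start+step, ... until [start, end] is covered. The rule string and
    // closed-boundary arguments are accepted but ignored in this sketch.
    std::vector<timestamp> fixed_interval_boundaries(
            timestamp start, timestamp end,
            std::string_view /*rule*/, ResampleBoundary /*closed*/) {
        const timestamp step = 1'000'000'000;  // 1 second in nanoseconds, arbitrary choice
        std::vector<timestamp> boundaries;
        for (timestamp b = start; ; b += step) {
            boundaries.push_back(b);
            if (b >= end) {
                break;
            }
        }
        return boundaries;
    }

[Assuming the elided template parameter of ResampleClause is the closed ResampleBoundary, as the two explicit instantiations in clause.cpp suggest, a caller would construct the clause with this callback plus the rule string and label boundary, then call set_aggregations and optionally set_date_range before processing.]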
[[nodiscard]] std::vector> structure_for_processing( std::vector& ranges_and_keys, - size_t start_from) const { + size_t start_from) { return structure_by_row_slice(ranges_and_keys, start_from); } @@ -575,7 +679,7 @@ struct ColumnStatsGenerationClause { [[nodiscard]] std::vector> structure_for_processing( std::vector& ranges_and_keys, - size_t start_from) const { + size_t start_from) { return structure_by_row_slice(ranges_and_keys, start_from); } @@ -643,7 +747,7 @@ struct RowRangeClause { [[nodiscard]] std::vector> structure_for_processing( std::vector& ranges_and_keys, - ARCTICDB_UNUSED size_t start_from) const; + ARCTICDB_UNUSED size_t start_from); [[nodiscard]] Composite process(Composite&& entity_ids) const; @@ -684,7 +788,7 @@ struct DateRangeClause { [[nodiscard]] std::vector> structure_for_processing( std::vector& ranges_and_keys, - size_t start_from) const; + size_t start_from); [[nodiscard]] Composite process(Composite&& entity_ids) const; diff --git a/cpp/arcticdb/processing/component_manager.hpp b/cpp/arcticdb/processing/component_manager.hpp index 859d996e97..51698e4b1c 100644 --- a/cpp/arcticdb/processing/component_manager.hpp +++ b/cpp/arcticdb/processing/component_manager.hpp @@ -66,13 +66,26 @@ class ComponentManager { } } + template + uint64_t get_initial_expected_get_calls(EntityId id) { + // Only applies to ComponentMaps tracking expected get calls + if constexpr(std::is_same_v>) { + return segment_map_.get_initial_expected_get_calls(id); + } else { + // Hacky workaround for static_assert(false) not being allowed + // See https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2022/p2593r0.html + static_assert(sizeof(T) == 0, "Unsupported component type passed to ComponentManager::get_initial_expected_get_calls"); + } + } + private: template class ComponentMap { public: explicit ComponentMap(std::string&& entity_type, bool track_expected_gets): entity_type_(std::move(entity_type)), - opt_expected_get_calls_map_(track_expected_gets ? std::make_optional>() : std::nullopt){ + opt_expected_get_calls_map_(track_expected_gets ? std::make_optional>() : std::nullopt), + opt_expected_get_calls_initial_map_(track_expected_gets ? 
std::make_optional>() : std::nullopt){ }; ARCTICDB_NO_MOVE_OR_COPY(ComponentMap) @@ -89,6 +102,9 @@ class ComponentManager { internal::check(opt_expected_get_calls_map_->try_emplace(id, *expected_get_calls).second, "Failed to insert {} with ID {}, already exists", entity_type_, id); + internal::check(opt_expected_get_calls_initial_map_->try_emplace(id, *expected_get_calls).second, + "Failed to insert {} with ID {}, already exists", + entity_type_, id); } } T get(EntityId id) { @@ -114,6 +130,17 @@ class ComponentManager { } return res; } + uint64_t get_initial_expected_get_calls(EntityId id) { + std::lock_guard lock(mtx_); + ARCTICDB_DEBUG(log::storage(), "Getting initial expected get calls of {} with id {}", entity_type_, id); + internal::check(opt_expected_get_calls_initial_map_.has_value(), + "Cannot get initial expected get calls for {} as they are not being tracked", entity_type_); + auto it = opt_expected_get_calls_initial_map_->find(id); + internal::check(it != opt_expected_get_calls_initial_map_->end(), + "Requested non-existent {} with ID {}", + entity_type_, id); + return it->second; + } private: // Just used for logging/exception messages std::string entity_type_; @@ -121,6 +148,7 @@ class ComponentManager { // If not nullopt, tracks the number of calls to get for each entity id, and erases from maps when it has been // called this many times std::optional> opt_expected_get_calls_map_; + std::optional> opt_expected_get_calls_initial_map_; std::mutex mtx_; }; diff --git a/cpp/arcticdb/processing/processing_unit.cpp b/cpp/arcticdb/processing/processing_unit.cpp index 2430022b6c..21e5f85aa3 100644 --- a/cpp/arcticdb/processing/processing_unit.cpp +++ b/cpp/arcticdb/processing/processing_unit.cpp @@ -89,7 +89,7 @@ VariantData ProcessingUnit::get(const VariantNode &name) { return computed->second; } else { auto expr = expression_context_->expression_nodes_.get_value(expression_name.value); - auto data = expr->compute(self()); + auto data = expr->compute(*this); computed_data_.try_emplace(expression_name.value, data); return data; } @@ -100,4 +100,59 @@ VariantData ProcessingUnit::get(const VariantNode &name) { ); } +std::vector split_by_row_slice(ProcessingUnit&& proc) { + auto input = std::move(proc); + internal::check(input.segments_.has_value(), "split_by_row_slice needs Segments"); + internal::check(input.row_ranges_.has_value(), "split_by_row_slice needs RowRanges"); + internal::check(input.col_ranges_.has_value(), "split_by_row_slice needs ColRanges"); + auto include_expected_get_calls = input.segment_initial_expected_get_calls_.has_value(); + internal::check(input.segment_initial_expected_get_calls_.has_value(), "split_by_row_slice needs expected get calls"); + + std::map output_map; + for (auto [idx, row_range_ptr]: folly::enumerate(*input.row_ranges_)) { + if (auto it = output_map.find(*row_range_ptr); it != output_map.end()) { + it->second.segments_->emplace_back(input.segments_->at(idx)); + it->second.row_ranges_->emplace_back(input.row_ranges_->at(idx)); + it->second.col_ranges_->emplace_back(input.col_ranges_->at(idx)); + if (include_expected_get_calls) { + it->second.segment_initial_expected_get_calls_->emplace_back( + input.segment_initial_expected_get_calls_->at(idx)); + } + } else { + auto [inserted_it, _] = output_map.emplace(*row_range_ptr, ProcessingUnit{}); + inserted_it->second.segments_.emplace(1, input.segments_->at(idx)); + inserted_it->second.row_ranges_.emplace(1, input.row_ranges_->at(idx)); + inserted_it->second.col_ranges_.emplace(1, 
input.col_ranges_->at(idx)); + if (include_expected_get_calls) { + inserted_it->second.segment_initial_expected_get_calls_.emplace(1, + input.segment_initial_expected_get_calls_->at( + idx)); + } + } + } + std::vector output; + output.reserve(output_map.size()); + for (auto&& [_, processing_unit]: output_map) { + output.emplace_back(std::move(processing_unit)); + } + // The expected get counts for all segments in a row slice should be the same + // This should always be 1 or 2 for the first row slice, and 1 for all of the others + internal::check(!output.empty(), "Unexpected empty output in split_by_row_slice"); + if (include_expected_get_calls) { + for (const auto &row_slice: output) { + auto expected_get_calls = row_slice.segment_initial_expected_get_calls_->front(); + internal::check(0 < expected_get_calls && expected_get_calls <= 2, + "expected_get_calls in split_by_row_slice should be 1 or 2, got {}", + expected_get_calls); + internal::check( + std::all_of(row_slice.segment_initial_expected_get_calls_->begin(), + row_slice.segment_initial_expected_get_calls_->end(), + [&expected_get_calls](uint64_t i) { return i == expected_get_calls; }), + "All segments in same row slice should have same expected_get_calls in split_by_row_slice"); + } + } + + return output; +} + } //namespace arcticdb \ No newline at end of file diff --git a/cpp/arcticdb/processing/processing_unit.hpp b/cpp/arcticdb/processing/processing_unit.hpp index eb7f6494d6..a275b9d42f 100644 --- a/cpp/arcticdb/processing/processing_unit.hpp +++ b/cpp/arcticdb/processing/processing_unit.hpp @@ -51,6 +51,7 @@ namespace arcticdb { std::optional>> col_ranges_; std::optional>> atom_keys_; std::optional bucket_; + std::optional> segment_initial_expected_get_calls_; std::shared_ptr expression_context_; std::unordered_map computed_data_; @@ -89,8 +90,8 @@ namespace arcticdb { bucket_.emplace(bucket); } - ProcessingUnit &self() { - return *this; + void set_segment_initial_expected_get_calls(std::vector&& segment_initial_expected_get_calls) { + segment_initial_expected_get_calls_.emplace(segment_initial_expected_get_calls); } void apply_filter(util::BitSet&& bitset, PipelineOptimisation optimisation); @@ -107,6 +108,8 @@ namespace arcticdb { VariantData get(const VariantNode &name); }; + std::vector split_by_row_slice(ProcessingUnit&& proc); + inline std::vector collect_segments(Composite&& p) { auto procs = std::move(p); std::vector output; diff --git a/cpp/arcticdb/processing/test/rapidcheck_resample.cpp b/cpp/arcticdb/processing/test/rapidcheck_resample.cpp new file mode 100644 index 0000000000..bd4041fc3e --- /dev/null +++ b/cpp/arcticdb/processing/test/rapidcheck_resample.cpp @@ -0,0 +1,122 @@ +/* Copyright 2023 Man Group Operations Limited + * + * Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt. + * + * As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0. 
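split_by_row_slice, added above in processing_unit.cpp, regroups the parallel segments/row-ranges/col-ranges vectors of one ProcessingUnit into one unit per row slice. A rough Python sketch of just that regrouping step (the expected-get-calls invariant checks are omitted):

def split_by_row_slice(segments, row_ranges, col_ranges):
    groups = {}
    for idx, row_range in enumerate(row_ranges):
        groups.setdefault(row_range, []).append(idx)
    # Ordered by row range, as the C++ std::map keyed on RowRange is.
    return [
        {
            "segments": [segments[i] for i in groups[rr]],
            "row_ranges": [row_ranges[i] for i in groups[rr]],
            "col_ranges": [col_ranges[i] for i in groups[rr]],
        }
        for rr in sorted(groups)
    ]

# Two column slices of the same row slice [0, 100) end up in a single output unit:
units = split_by_row_slice(["seg_a", "seg_b"], [(0, 100), (0, 100)], [(1, 2), (2, 3)])
assert len(units) == 1 and units[0]["col_ranges"] == [(1, 2), (2, 3)]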
+ */ + +#include + +#include "gtest/gtest.h" +#include + +#include +#include +#include + +using namespace arcticdb; + +auto generate_bucket_boundaries(std::vector&& bucket_boundaries) { + return [bucket_boundaries = std::move(bucket_boundaries)](timestamp, timestamp, std::string_view, ResampleBoundary) mutable { + return bucket_boundaries; + }; +} + +RC_GTEST_PROP(Resample, StructureForProcessing, ()) { + StreamId sym{"sym"}; + const auto unsorted_index_values = *rc::gen::unique>(rc::gen::arbitrary()); + RC_PRE(unsorted_index_values.size() > 0); + const auto unsorted_bucket_boundaries = *rc::gen::unique>(rc::gen::arbitrary()); + RC_PRE(unsorted_bucket_boundaries.size() > 1); + const auto left_boundary_closed = *rc::gen::arbitrary(); + + // Index values and bucket boundaries are sorted + auto index_values = unsorted_index_values; + std::sort(index_values.begin(), index_values.end()); + auto bucket_boundaries = unsorted_bucket_boundaries; + std::sort(bucket_boundaries.begin(), bucket_boundaries.end()); + + // Use a single column slice as the interesting behaviour to test is around row-slicing + ColRange col_range{1, 2}; + + // Only the start and end index values of each row-slice is used in structure_for_processing, so use 2 rows + // per row-slice + auto num_row_slices = index_values.size() / 2 + index_values.size() % 2; + std::vector sorted_ranges_and_keys; + sorted_ranges_and_keys.reserve(num_row_slices); + for (size_t idx = 0; idx < num_row_slices; idx++) { + auto row_range_start = idx * 2; + auto row_range_end = std::min((idx + 1) * 2, index_values.size()); + RowRange row_range{row_range_start, row_range_end}; + auto start_idx_value = index_values[row_range_start]; + auto end_idx_value = index_values[row_range_end - 1] + 1; + auto key = AtomKeyBuilder().start_index(start_idx_value).end_index(end_idx_value).build(sym); + sorted_ranges_and_keys.emplace_back(row_range, col_range, key); + } + auto ranges_and_keys = sorted_ranges_and_keys; + std::random_shuffle(ranges_and_keys.begin(), ranges_and_keys.end()); + + // Create vector of bucket boundary pairs, inclusive at both ends + // bucket_id will be used to refer to the index in these vectors of a specific bucket + std::vector> bucket_boundary_pairs; + bucket_boundary_pairs.reserve(bucket_boundaries.size() - 1); + for (size_t idx = 0; idx < bucket_boundaries.size() - 1; idx++) { + if (left_boundary_closed) { + bucket_boundary_pairs.emplace_back(bucket_boundaries[idx], bucket_boundaries[idx + 1] - 1); + } else { // right_boundary_closed + bucket_boundary_pairs.emplace_back(bucket_boundaries[idx] + 1, bucket_boundaries[idx + 1]); + } + } + + // Eliminate ranges that do not overlap with the buckets + for (auto it = sorted_ranges_and_keys.begin(); it != sorted_ranges_and_keys.end();) { + if (it->key_.start_time() > bucket_boundary_pairs.back().second || + it->key_.end_time() <= bucket_boundary_pairs.front().first) { + it = sorted_ranges_and_keys.erase(it); + } else { + it++; + } + } + + // Map from bucket_id to indexes in sorted_ranges_and_keys of row-slices needed for this bucket + std::vector> bucket_to_row_range_map(bucket_boundary_pairs.size(), std::vector()); + for (const auto& [bucket_id, bucket_boundary_pair]: folly::enumerate(bucket_boundary_pairs)) { + for (const auto& [idx, range]: folly::enumerate(sorted_ranges_and_keys)) { + if (range.key_.start_time() <= bucket_boundary_pair.second && + range.key_.end_time() > bucket_boundary_pair.first) { + bucket_to_row_range_map[bucket_id].emplace_back(idx); + } + } + } + + std::vector> 
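The property test converts the half-open bucket boundaries into pairs that are inclusive at both ends before doing overlap checks. A small Python illustration of that conversion, with arbitrary boundary values:

def bucket_pairs(boundaries, left_closed: bool):
    if left_closed:   # [b, next_b)  ->  inclusive (b, next_b - 1)
        return [(lo, hi - 1) for lo, hi in zip(boundaries, boundaries[1:])]
    else:             # (b, next_b]  ->  inclusive (b + 1, next_b)
        return [(lo + 1, hi) for lo, hi in zip(boundaries, boundaries[1:])]

print(bucket_pairs([0, 10, 20, 30], left_closed=True))   # [(0, 9), (10, 19), (20, 29)]
print(bucket_pairs([0, 10, 20, 30], left_closed=False))  # [(1, 10), (11, 20), (21, 30)]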
expected_result; + std::optional current_range_idx; + for (const auto& row_range_ids: bucket_to_row_range_map) { + if (!row_range_ids.empty()) { + if (current_range_idx.has_value() && row_range_ids.front() == *current_range_idx) { + if (row_range_ids.front() != expected_result.back().front()) { + expected_result.emplace_back(row_range_ids); + } else { + for (const auto &id: row_range_ids) { + if (id > expected_result.back().back()) { + expected_result.back().emplace_back(id); + } + } + } + } else { + expected_result.emplace_back(row_range_ids); + current_range_idx = row_range_ids.back(); + } + } + } + + if (left_boundary_closed) { + ResampleClause resample_clause{"dummy", ResampleBoundary::LEFT, generate_bucket_boundaries(std::move(bucket_boundaries))}; + auto result = resample_clause.structure_for_processing(ranges_and_keys, 0); + RC_ASSERT(expected_result == result); + } else { + ResampleClause resample_clause{"dummy", ResampleBoundary::RIGHT, generate_bucket_boundaries(std::move(bucket_boundaries))}; + auto result = resample_clause.structure_for_processing(ranges_and_keys, 0); + RC_ASSERT(expected_result == result); + } +} diff --git a/cpp/arcticdb/processing/test/test_component_manager.cpp b/cpp/arcticdb/processing/test/test_component_manager.cpp index 181e060435..367bcf3953 100644 --- a/cpp/arcticdb/processing/test/test_component_manager.cpp +++ b/cpp/arcticdb/processing/test/test_component_manager.cpp @@ -40,6 +40,7 @@ TEST(ComponentManager, Simple) { ASSERT_EQ(id_1, 1); ASSERT_EQ(component_manager.get>(id_0), segment_0); + ASSERT_EQ(component_manager.get_initial_expected_get_calls>(id_0), expected_get_calls_0); ASSERT_EQ(component_manager.get>(id_0), row_range_0); ASSERT_EQ(component_manager.get>(id_0), col_range_0); ASSERT_EQ(component_manager.get>(id_0), key_0); @@ -47,6 +48,7 @@ TEST(ComponentManager, Simple) { ASSERT_EQ(component_manager.get>(id_1), segment_1); ASSERT_EQ(component_manager.get>(id_1), segment_1); + ASSERT_EQ(component_manager.get_initial_expected_get_calls>(id_1), expected_get_calls_1); ASSERT_EQ(component_manager.get>(id_1), row_range_1); ASSERT_EQ(component_manager.get>(id_1), col_range_1); ASSERT_EQ(component_manager.get>(id_1), key_1); diff --git a/cpp/arcticdb/processing/test/test_resample.cpp b/cpp/arcticdb/processing/test/test_resample.cpp new file mode 100644 index 0000000000..2da42f8430 --- /dev/null +++ b/cpp/arcticdb/processing/test/test_resample.cpp @@ -0,0 +1,376 @@ +/* Copyright 2023 Man Group Operations Limited + * + * Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt. + * + * As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0. 
+ */ + +#include + +#include +#include +#include + +using namespace arcticdb; + +auto generate_bucket_boundaries(std::vector&& bucket_boundaries) { + return [bucket_boundaries = std::move(bucket_boundaries)](timestamp, timestamp, std::string_view, ResampleBoundary) { + return bucket_boundaries; + }; +} + +TEST(Resample, StructureForProcessingBasic) { + // Bucket boundaries such that the first processing unit does not need any rows from the second row slice + // No column slicing + StreamId sym{"sym"}; + ColRange col_range{1, 2}; + RowRange row_range_1{0, 100}; + RowRange row_range_2{100, 200}; + auto key_1 = AtomKeyBuilder().start_index(0).end_index(1000).build(sym); + auto key_2 = AtomKeyBuilder().start_index(2000).end_index(3000).build(sym); + RangesAndKey top(row_range_1, col_range, key_1); + RangesAndKey bottom(row_range_2, col_range, key_2); + // Insert into vector "out of order" to ensure structure_for_processing reorders correctly + std::vector ranges_and_keys{bottom, top}; + + ResampleClause resample_clause{"dummy", ResampleBoundary::LEFT, generate_bucket_boundaries({1, 500, 1500, 2500, 2999})}; + auto proc_unit_ids = resample_clause.structure_for_processing(ranges_and_keys, 0); + ASSERT_EQ(ranges_and_keys.size(), 2); + ASSERT_EQ(ranges_and_keys[0], top); + ASSERT_EQ(ranges_and_keys[1], bottom); + std::vector> expected_proc_unit_ids{{0}, {1}}; + ASSERT_EQ(expected_proc_unit_ids, proc_unit_ids); +} + +TEST(Resample, StructureForProcessingColumnSlicing) { + // Bucket boundaries such that the first processing unit does not need any rows from the second row slice + // Two column slices + StreamId sym{"sym"}; + ColRange col_range_1{1, 2}; + ColRange col_range_2{2, 3}; + RowRange row_range_1{0, 100}; + RowRange row_range_2{100, 200}; + auto key_1 = AtomKeyBuilder().start_index(0).end_index(1000).build(sym); + auto key_2 = AtomKeyBuilder().start_index(0).end_index(1000).build(sym); + auto key_3 = AtomKeyBuilder().start_index(2000).end_index(3000).build(sym); + auto key_4 = AtomKeyBuilder().start_index(2000).end_index(3000).build(sym); + RangesAndKey top_left(row_range_1, col_range_1, key_1); + RangesAndKey top_right(row_range_1, col_range_2, key_2); + RangesAndKey bottom_left(row_range_2, col_range_1, key_3); + RangesAndKey bottom_right(row_range_2, col_range_2, key_4); + // Insert into vector "out of order" to ensure structure_for_processing reorders correctly + std::vector ranges_and_keys{top_right, bottom_left, bottom_right, top_left}; + + ResampleClause resample_clause{"dummy", ResampleBoundary::LEFT, generate_bucket_boundaries({1, 500, 1500, 2500, 2999})}; + auto proc_unit_ids = resample_clause.structure_for_processing(ranges_and_keys, 0); + ASSERT_EQ(ranges_and_keys.size(), 4); + ASSERT_EQ(ranges_and_keys[0], top_left); + ASSERT_EQ(ranges_and_keys[1], top_right); + ASSERT_EQ(ranges_and_keys[2], bottom_left); + ASSERT_EQ(ranges_and_keys[3], bottom_right); + std::vector> expected_proc_unit_ids{{0, 1}, {2, 3}}; + ASSERT_EQ(expected_proc_unit_ids, proc_unit_ids); +} + +TEST(Resample, StructureForProcessingOverlap) { + // Bucket boundaries such that the first processing unit needs rows from the second row slice + // No column slicing + StreamId sym{"sym"}; + ColRange col_range{1, 2}; + RowRange row_range_1{0, 100}; + RowRange row_range_2{100, 200}; + auto key_1 = AtomKeyBuilder().start_index(0).end_index(1000).build(sym); + auto key_2 = AtomKeyBuilder().start_index(2000).end_index(3000).build(sym); + RangesAndKey top(row_range_1, col_range, key_1); + RangesAndKey bottom(row_range_2, 
col_range, key_2); + // Insert into vector "out of order" to ensure structure_for_processing reorders correctly + std::vector ranges_and_keys{bottom, top}; + + ResampleClause resample_clause{"dummy", ResampleBoundary::LEFT, generate_bucket_boundaries({1, 500, 2500, 2999})}; + auto proc_unit_ids = resample_clause.structure_for_processing(ranges_and_keys, 0); + ASSERT_EQ(ranges_and_keys.size(), 2); + ASSERT_EQ(ranges_and_keys[0], top); + ASSERT_EQ(ranges_and_keys[1], bottom); + std::vector> expected_proc_unit_ids{{0, 1}, {1}}; + ASSERT_EQ(expected_proc_unit_ids, proc_unit_ids); +} + +TEST(Resample, StructureForProcessingSubsumed) { + // Bucket boundaries such that the first processing unit needs all of the rows from the second and third row slices, + // such that there is only one element in the returned vector + // No column slicing + StreamId sym{"sym"}; + ColRange col_range{1, 2}; + RowRange row_range_1{0, 100}; + RowRange row_range_2{100, 200}; + RowRange row_range_3{200, 300}; + auto key_1 = AtomKeyBuilder().start_index(0).end_index(1000).build(sym); + auto key_2 = AtomKeyBuilder().start_index(2000).end_index(3000).build(sym); + auto key_3 = AtomKeyBuilder().start_index(3000).end_index(4000).build(sym); + RangesAndKey top(row_range_1, col_range, key_1); + RangesAndKey middle(row_range_2, col_range, key_2); + RangesAndKey bottom(row_range_3, col_range, key_3); + // Insert into vector "out of order" to ensure structure_for_processing reorders correctly + std::vector ranges_and_keys{bottom, middle, top}; + + ResampleClause resample_clause{"dummy", ResampleBoundary::LEFT, generate_bucket_boundaries({1, 500, 4500})}; + auto proc_unit_ids = resample_clause.structure_for_processing(ranges_and_keys, 0); + ASSERT_EQ(ranges_and_keys.size(), 3); + ASSERT_EQ(ranges_and_keys[0], top); + ASSERT_EQ(ranges_and_keys[1], middle); + ASSERT_EQ(ranges_and_keys[2], bottom); + std::vector> expected_proc_unit_ids{{0, 1, 2}}; + ASSERT_EQ(expected_proc_unit_ids, proc_unit_ids); +} + +TEST(Resample, StructureForProcessingExactBoundary) { + // Bucket boundaries such that the first processing unit needs rows from the second row slice when the right + // boundary is closed, but does not when the left boundary is closed + // No column slicing + StreamId sym{"sym"}; + ColRange col_range{1, 2}; + RowRange row_range_1{0, 100}; + RowRange row_range_2{100, 200}; + auto key_1 = AtomKeyBuilder().start_index(0).end_index(1000).build(sym); + auto key_2 = AtomKeyBuilder().start_index(2000).end_index(3000).build(sym); + RangesAndKey top(row_range_1, col_range, key_1); + RangesAndKey bottom(row_range_2, col_range, key_2); + // Insert into vector "out of order" to ensure structure_for_processing reorders correctly + std::vector ranges_and_keys{bottom, top}; + + ResampleClause resample_clause_left{"dummy", ResampleBoundary::LEFT, generate_bucket_boundaries({1, 500, 2000, 2500, 2999})}; + auto proc_unit_ids = resample_clause_left.structure_for_processing(ranges_and_keys, 0); + ASSERT_EQ(ranges_and_keys.size(), 2); + ASSERT_EQ(ranges_and_keys[0], top); + ASSERT_EQ(ranges_and_keys[1], bottom); + std::vector> expected_proc_unit_ids_left{{0}, {1}}; + ASSERT_EQ(expected_proc_unit_ids_left, proc_unit_ids); + + ResampleClause resample_clause_right{"dummy", ResampleBoundary::LEFT, generate_bucket_boundaries({1, 500, 2000, 2500, 2999})}; + proc_unit_ids = resample_clause_right.structure_for_processing(ranges_and_keys, 0); + ASSERT_EQ(ranges_and_keys.size(), 2); + ASSERT_EQ(ranges_and_keys[0], top); + ASSERT_EQ(ranges_and_keys[1], bottom); + 
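The groupings asserted in these tests follow from a simple overlap rule: a row slice feeds a bucket if its key's index range intersects the bucket, and adjacent buckets that share a leading row slice collapse into one processing unit. A Python sketch using the index ranges [0, 1000) and [2000, 3000) from the Basic and Overlap tests, purely to show why the expected IDs differ:

def overlapping_slices(bucket_pairs, slices):
    # slices: (start_index, end_index) half-open key ranges, already sorted by row range.
    # A slice feeds a bucket if its range intersects the inclusive bucket pair.
    return [
        [i for i, (start, end) in enumerate(slices) if start <= hi and end > lo]
        for lo, hi in bucket_pairs
    ]

slices = [(0, 1000), (2000, 3000)]
# Basic test, left-closed buckets from {1, 500, 1500, 2500, 2999}: the slices stay separate.
print(overlapping_slices([(1, 499), (500, 1499), (1500, 2499), (2500, 2998)], slices))
# [[0], [0], [1], [1]]  -> processing units [[0], [1]]
# Overlap test, boundaries {1, 500, 2500, 2999}: the middle bucket spans both slices.
print(overlapping_slices([(1, 499), (500, 2499), (2500, 2998)], slices))
# [[0], [0, 1], [1]]    -> processing units [[0, 1], [1]]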
std::vector> expected_proc_unit_ids_right{{0, 1}, {1}}; + ASSERT_EQ(expected_proc_unit_ids_right, proc_unit_ids); +} + +TEST(Resample, FindBuckets) { + // Enough bucket boundaries to test all the interesting cases + ResampleClause resample_left("left", ResampleBoundary::LEFT, generate_bucket_boundaries({0, 10, 20, 30, 40})); + ResampleClause resample_right("right", ResampleBoundary::RIGHT, generate_bucket_boundaries({0, 10, 20, 30, 40})); + + resample_left.bucket_boundaries_ = resample_left.generate_bucket_boundaries_(0, 0, "dummy", ResampleBoundary::LEFT); + resample_right.bucket_boundaries_ = resample_right.generate_bucket_boundaries_(0, 0, "dummy", ResampleBoundary::RIGHT); + + std::vector res; + + // Wholly contained in first bucket + res = resample_left.generate_bucket_boundaries(5, 6, true); + ASSERT_EQ(res.front(), 0); + ASSERT_EQ(res.back(), 10); + // Wholly contained in middle bucket + res = resample_left.generate_bucket_boundaries(12, 13, true); + ASSERT_EQ(res.front(), 10); + ASSERT_EQ(res.back(), 20); + // Wholly contained in last bucket + res = resample_left.generate_bucket_boundaries(35, 37, true); + ASSERT_EQ(res.front(), 30); + ASSERT_EQ(res.back(), 40); + + // Spanning multiple buckets + res = resample_left.generate_bucket_boundaries(5, 15, true); + ASSERT_EQ(res.front(), 0); + ASSERT_EQ(res.back(), 20); + res = resample_left.generate_bucket_boundaries(15, 25, true); + ASSERT_EQ(res.front(), 10); + ASSERT_EQ(res.back(), 30); + res = resample_left.generate_bucket_boundaries(15, 35, true); + ASSERT_EQ(res.front(), 10); + ASSERT_EQ(res.back(), 40); + + // Spanning multiple buckets, not responsible for the first bucket + res = resample_left.generate_bucket_boundaries(5, 15, false); + ASSERT_EQ(res.front(), 10); + ASSERT_EQ(res.back(), 20); + res = resample_left.generate_bucket_boundaries(15, 25, false); + ASSERT_EQ(res.front(), 20); + ASSERT_EQ(res.back(), 30); + res = resample_left.generate_bucket_boundaries(15, 35, false); + ASSERT_EQ(res.front(), 20); + ASSERT_EQ(res.back(), 40); + + // First bucket starts after the first timestamp + res = resample_left.generate_bucket_boundaries(-5, 15, true); + ASSERT_EQ(res.front(), 0); + ASSERT_EQ(res.back(), 20); + // Last bucket ends before the last timestamp + res = resample_left.generate_bucket_boundaries(15, 45, true); + ASSERT_EQ(res.front(), 10); + ASSERT_EQ(res.back(), 40); + + // Bucket boundary matching first and last timestamps + res = resample_left.generate_bucket_boundaries(10, 20, true); + ASSERT_EQ(res.front(), 10); + ASSERT_EQ(res.back(), 30); + res = resample_right.generate_bucket_boundaries(10, 20, true); + ASSERT_EQ(res.front(), 0); + ASSERT_EQ(res.back(), 20); +} + +TEST(Resample, ProcessOneSegment) { + auto component_manager = std::make_shared(); + + ResampleClause resample("dummy", ResampleBoundary::LEFT, generate_bucket_boundaries({-1, 2, 5})); + resample.bucket_boundaries_ = resample.generate_bucket_boundaries_(0, 0, "dummy", ResampleBoundary::LEFT); + resample.date_range_ = {0, 5}; + resample.set_component_manager(component_manager); + resample.set_aggregations({{"sum", "sum_column", "sum_column"}}); + + using index_TDT = TypeDescriptorTag, DimensionTag>; + auto index_column = std::make_shared(static_cast(index_TDT{}), 0, false, true); + using col_TDT = TypeDescriptorTag, DimensionTag>; + auto sum_column = std::make_shared(static_cast(col_TDT{}), 0, false, true); + size_t num_rows{5}; + for(size_t idx = 0; idx < num_rows; ++idx) { + index_column->set_scalar(static_cast(idx), static_cast(idx)); + 
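The values asserted further down for ProcessOneSegment can be reproduced with plain pandas as a sanity check (this is not the code under test): five rows whose index and sum_column are both 0..4, bucketed into the left-closed intervals from boundaries {-1, 2, 5} and labelled by the left edge.

import numpy as np
import pandas as pd

# Index values (in ns) and sum_column values are both 0, 1, 2, 3, 4.
values = np.arange(5, dtype=np.int64)
# Left-closed buckets from boundaries {-1, 2, 5}: [-1, 2) and [2, 5), labelled by the left edge.
buckets = pd.cut(values, bins=[-1, 2, 5], right=False, labels=[-1, 2])
print(pd.Series(values).groupby(buckets).sum())
# -1    1    <- rows 0 and 1
#  2    9    <- rows 2, 3 and 4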
sum_column->set_scalar(static_cast(idx), static_cast(idx)); + } + SegmentInMemory seg; + seg.add_column(scalar_field(index_column->type().data_type(), "index"), index_column); + seg.add_column(scalar_field(sum_column->type().data_type(), "sum_column"), sum_column); + seg.set_row_id(num_rows - 1); + + auto proc_unit = ProcessingUnit{std::move(seg)}; + auto entity_ids = Composite(push_entities(component_manager, std::move(proc_unit))); + + auto resampled = gather_entities(component_manager, resample.process(std::move(entity_ids))).as_range(); + ASSERT_EQ(1, resampled.size()); + ASSERT_TRUE(resampled[0].segments_.has_value()); + auto segments = resampled[0].segments_.value(); + ASSERT_EQ(1, segments.size()); + auto resampled_seg = *segments[0]; + + auto index_column_index = resampled_seg.column_index("index"); + ASSERT_TRUE(index_column_index.has_value()); + auto& resampled_index_column = resampled_seg.column(*index_column_index); + ASSERT_EQ(-1, resampled_index_column.scalar_at(0)); + ASSERT_EQ(2, resampled_index_column.scalar_at(1)); + + auto sum_column_index = resampled_seg.column_index("sum_column"); + ASSERT_TRUE(sum_column_index.has_value()); + auto& resampled_sum_column = resampled_seg.column(*sum_column_index); + ASSERT_EQ(1, resampled_sum_column.scalar_at(0)); + ASSERT_EQ(9, resampled_sum_column.scalar_at(1)); +} + +TEST(Resample, ProcessMultipleSegments) { + auto component_manager = std::make_shared(); + + ResampleClause resample("dummy", ResampleBoundary::LEFT, generate_bucket_boundaries({-15, -5, 5, 6, 25, 35, 45, 46, 55, 65})); + resample.bucket_boundaries_ = resample.generate_bucket_boundaries_(0, 0, "dummy", ResampleBoundary::LEFT); + resample.date_range_ = {0, 51}; + resample.set_component_manager(component_manager); + resample.set_aggregations({{"sum", "sum_column", "sum_column"}}); + // Index values of segments will be as follows: + // 0, 10 + // 20, 30, 40 + // 50 + // Therefore the buckets will be structured such that: + // -15 - -5: Before the range of the segments + // -5 - 5: Covers just the first value in the first segment + // 5 - 6: Within the range of the first segment, but no index values in this range + // 6 - 25: Covers the last value from the first segment and the first value of the second segment + // 25 - 35: Covers just the middle value of the second segment + // 35 - 45: Covers just the last value of the second segment + // 45 - 46: Covers a gap between two segments + // 46 - 55: Covers the third segment + // 55 - 65: After the range of the segments + + + using index_TDT = TypeDescriptorTag, DimensionTag>; + using col_TDT = TypeDescriptorTag, DimensionTag>; + + std::vector> segs; + std::vector> row_ranges; + std::vector> col_ranges; + + auto index_column = std::make_shared(static_cast(index_TDT{}), 0, false, true); + auto sum_column = std::make_shared(static_cast(col_TDT{}), 0, false, true); + index_column->set_scalar(0, 0); + index_column->set_scalar(1, 10); + sum_column->set_scalar(0, 0); + sum_column->set_scalar(1, 10); + auto seg_0 = std::make_shared(); + seg_0->add_column(scalar_field(index_column->type().data_type(), "index"), index_column); + seg_0->add_column(scalar_field(sum_column->type().data_type(), "sum_column"), sum_column); + seg_0->set_row_id(1); + auto row_range_0 = std::make_shared(0, 2); + auto col_range_0 = std::make_shared(1, 2); + + index_column = std::make_shared(static_cast(index_TDT{}), 0, false, true); + sum_column = std::make_shared(static_cast(col_TDT{}), 0, false, true); + index_column->set_scalar(0, 20); + 
index_column->set_scalar(1, 30); + index_column->set_scalar(2, 40); + sum_column->set_scalar(0, 20); + sum_column->set_scalar(1, 30); + sum_column->set_scalar(2, 40); + auto seg_1 = std::make_shared(); + seg_1->add_column(scalar_field(index_column->type().data_type(), "index"), index_column); + seg_1->add_column(scalar_field(sum_column->type().data_type(), "sum_column"), sum_column); + seg_1->set_row_id(2); + auto row_range_1 = std::make_shared(2, 5); + auto col_range_1 = std::make_shared(1, 2); + + index_column = std::make_shared(static_cast(index_TDT{}), 0, false, true); + sum_column = std::make_shared(static_cast(col_TDT{}), 0, false, true); + index_column->set_scalar(0, 50); + sum_column->set_scalar(0, 50); + auto seg_2 = std::make_shared(); + seg_2->add_column(scalar_field(index_column->type().data_type(), "index"), index_column); + seg_2->add_column(scalar_field(sum_column->type().data_type(), "sum_column"), sum_column); + seg_2->set_row_id(2); + auto row_range_2 = std::make_shared(5, 6); + auto col_range_2 = std::make_shared(1, 2); + + auto id_0 = component_manager->add(seg_0, std::nullopt, 1); + component_manager->add(row_range_0, id_0); + component_manager->add(col_range_0, id_0); + auto id_1 = component_manager->add(seg_1, std::nullopt, 2); + component_manager->add(row_range_1, id_1); + component_manager->add(col_range_1, id_1); + auto id_2 = component_manager->add(seg_2, std::nullopt, 1); + component_manager->add(row_range_2, id_2); + component_manager->add(col_range_2, id_2); + + + auto ids_0 = Composite(EntityIds{id_0, id_1}); + auto ids_1 = Composite(EntityIds{id_1}); + auto ids_2 = Composite(EntityIds{id_2}); + + auto resampled_0 = gather_entities(component_manager, resample.process(std::move(ids_0))).as_range(); + auto resampled_seg_0 = *resampled_0[0].segments_.value()[0]; + auto& resampled_index_column_0 = resampled_seg_0.column(0); + auto& resampled_sum_column_0 = resampled_seg_0.column(1); + ASSERT_EQ(-5, resampled_index_column_0.scalar_at(0)); + ASSERT_EQ(6, resampled_index_column_0.scalar_at(1)); + ASSERT_EQ(0, resampled_sum_column_0.scalar_at(0)); + ASSERT_EQ(30, resampled_sum_column_0.scalar_at(1)); + + auto resampled_1 = gather_entities(component_manager, resample.process(std::move(ids_1))).as_range(); + auto resampled_seg_1 = *resampled_1[0].segments_.value()[0]; + auto& resampled_index_column_1 = resampled_seg_1.column(0); + auto& resampled_sum_column_1 = resampled_seg_1.column(1); + ASSERT_EQ(25, resampled_index_column_1.scalar_at(0)); + ASSERT_EQ(35, resampled_index_column_1.scalar_at(1)); + ASSERT_EQ(30, resampled_sum_column_1.scalar_at(0)); + ASSERT_EQ(40, resampled_sum_column_1.scalar_at(1)); + + auto resampled_2 = gather_entities(component_manager, resample.process(std::move(ids_2))).as_range(); + auto resampled_seg_2 = *resampled_2[0].segments_.value()[0]; + auto& resampled_index_column_2 = resampled_seg_2.column(0); + auto& resampled_sum_column_2 = resampled_seg_2.column(1); + ASSERT_EQ(46, resampled_index_column_2.scalar_at(0)); + ASSERT_EQ(50, resampled_sum_column_2.scalar_at(0)); +} diff --git a/cpp/arcticdb/python/python_utils.hpp b/cpp/arcticdb/python/python_utils.hpp index 7148c6eae2..8c3d23e248 100644 --- a/cpp/arcticdb/python/python_utils.hpp +++ b/cpp/arcticdb/python/python_utils.hpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -226,4 +227,25 @@ inline py::list adapt_read_dfs(std::vector>& return lst; } +// aggregations is a dict similar to that accepted by Pandas agg method +// The key-value pairs come in 
2 forms: +// 1: key is the column name to aggregate, value is the aggregation operator. Output column name will be the same as input column name +// 2: key is the column name to output, value is a pair where the first element is the input column name, and the second element is the aggregation operator +// These 2 styles can be mixed and matched +inline std::vector named_aggregators_from_dict(const std::unordered_map>> aggregations) { + std::vector named_aggregators; + for (const auto& [output_column_name, var_agg_named_agg]: aggregations) { + util::variant_match( + var_agg_named_agg, + [&named_aggregators, &output_column_name] (const std::string& agg_operator) { + named_aggregators.emplace_back(agg_operator, output_column_name, output_column_name); + }, + [&named_aggregators, &output_column_name] (const std::pair& input_col_and_agg) { + named_aggregators.emplace_back(input_col_and_agg.second, input_col_and_agg.first, output_column_name); + } + ); + } + return named_aggregators; +} + } // namespace arcticdb::python_util diff --git a/cpp/arcticdb/version/python_bindings.cpp b/cpp/arcticdb/version/python_bindings.cpp index a75911105a..81c7ab6edf 100644 --- a/cpp/arcticdb/version/python_bindings.cpp +++ b/cpp/arcticdb/version/python_bindings.cpp @@ -27,6 +27,39 @@ namespace arcticdb::version_store { +template +void declare_resample_clause(py::module& version) { + std::string class_name; + if constexpr (closed_boundary == ResampleBoundary::LEFT) { + class_name = "ResampleClauseLeftClosed"; + } else { + // closed_boundary == ResampleBoundary::RIGHT + class_name = "ResampleClauseRightClosed"; + } + py::class_, std::shared_ptr>>(version, class_name.c_str()) + .def(py::init([](std::string rule, ResampleBoundary label_boundary){ + return ResampleClause(rule, + label_boundary, + [](timestamp start, timestamp end, std::string_view rule, ResampleBoundary closed_boundary_arg) -> std::vector { + py::gil_scoped_acquire acquire_gil; + auto py_start = python_util::pd_Timestamp()(start - (closed_boundary_arg == ResampleBoundary::RIGHT ? 1 : 0)).attr("floor")(rule); + auto py_end = python_util::pd_Timestamp()(end + (closed_boundary_arg == ResampleBoundary::LEFT ? 
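The dict format described in the comment above, and unpacked by named_aggregators_from_dict, is the same one QueryBuilder.agg accepts on the Python side. For illustration only; the column names are invented and an ArcticDB build that includes this change is assumed:

from arcticdb import QueryBuilder

q = QueryBuilder()
q = q.resample("min").agg({
    "price": "mean",                     # form 1: output "price" = mean of input "price"
    "volume_total": ("volume", "sum"),   # form 2: output "volume_total" = sum of input "volume"
})
# Roughly the pandas equivalent of:
#   df.resample("min").agg(price=("price", "mean"), volume_total=("volume", "sum"))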
1 : 0)).attr("ceil")(rule); + static py::object date_range_function = py::module::import("pandas").attr("date_range"); + auto py_bucket_boundaries = date_range_function(py_start, py_end, nullptr, rule, nullptr, false).attr("values").cast>(); + return std::vector(py_bucket_boundaries.data(), py_bucket_boundaries.data() + py_bucket_boundaries.size()); + }); + })) + .def_property_readonly("rule", &ResampleClause::rule) + .def("set_aggregations", [](ResampleClause& self, + const std::unordered_map>> aggregations) { + self.set_aggregations(python_util::named_aggregators_from_dict(aggregations)); + }) + .def("set_date_range", [](ResampleClause& self, timestamp date_range_start, timestamp date_range_end) { + self.set_date_range(date_range_start, date_range_end); + }) + .def("__str__", &ResampleClause::to_string); +} + void register_bindings(py::module &version, py::exception& base_exception) { py::register_exception(version, "StreamDescriptorMismatch", base_exception.ptr()); @@ -266,22 +299,17 @@ void register_bindings(py::module &version, py::exception>> aggregations) { - std::vector named_aggregators; - for (const auto& [output_column_name, var_agg_named_agg]: aggregations) { - util::variant_match( - var_agg_named_agg, - [&named_aggregators, &output_column_name] (const std::string& agg_operator) { - named_aggregators.emplace_back(agg_operator, output_column_name, output_column_name); - }, - [&named_aggregators, &output_column_name] (const std::pair& input_col_and_agg) { - named_aggregators.emplace_back(input_col_and_agg.second, input_col_and_agg.first, output_column_name); - } - ); - } - return AggregationClause(grouping_colum, named_aggregators); + return AggregationClause(grouping_colum, python_util::named_aggregators_from_dict(aggregations)); })) .def("__str__", &AggregationClause::to_string); + declare_resample_clause(version); + declare_resample_clause(version); + + py::enum_(version, "ResampleBoundary") + .value("LEFT", ResampleBoundary::LEFT) + .value("RIGHT", ResampleBoundary::RIGHT); + py::enum_(version, "RowRangeType") .value("HEAD", RowRangeClause::RowRangeType::HEAD) .value("TAIL", RowRangeClause::RowRangeType::TAIL) @@ -310,6 +338,8 @@ void register_bindings(py::module &version, py::exception, std::shared_ptr, std::shared_ptr, + std::shared_ptr>, + std::shared_ptr>, std::shared_ptr, std::shared_ptr>> clauses) { std::vector> _clauses; diff --git a/cpp/arcticdb/version/version_core.cpp b/cpp/arcticdb/version/version_core.cpp index 7d33413955..5fa3b5c32a 100644 --- a/cpp/arcticdb/version/version_core.cpp +++ b/cpp/arcticdb/version/version_core.cpp @@ -472,6 +472,8 @@ Composite process_clauses( std::vector entity_added(segment_and_slice_futures.size(), false); std::vector>> futures; bool first_clause{true}; + // Reverse the order of clauses and iterate through them backwards so that the erase is efficient + std::reverse(clauses.begin(), clauses.end()); while (!clauses.empty()) { for (auto&& comp_entity_ids: vec_comp_entity_ids) { if (first_clause) { @@ -523,13 +525,12 @@ Composite process_clauses( first_clause = false; vec_comp_entity_ids = folly::collect(futures).get(); futures.clear(); - // Erasing from front of vector not ideal, but they're just shared_ptr and there shouldn't be loads of clauses - while (clauses.size() > 0 && !clauses[0]->clause_info().requires_repartition_) { - clauses.erase(clauses.begin()); + while (clauses.size() > 0 && !clauses.back()->clause_info().requires_repartition_) { + clauses.erase(clauses.end() - 1); } - if (clauses.size() > 0 && 
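The process_clauses change above reverses the clause vector once so that finished clauses are removed from the back of the vector instead of being erased from the front. A schematic Python sketch of that control flow; process, repartition and requires_repartition are stand-ins for the real clause interface and the future-based scheduling:

def run_pipeline(clauses, work):
    # Reverse once; the back of the list is now the next clause to run, so finished
    # clauses can be popped in O(1) rather than erased from the front.
    clauses = list(reversed(clauses))
    while clauses:
        # Run clauses in pipeline order (back of the reversed list first) until one
        # needs a repartition; placeholder for the work scheduled per entity batch.
        ran = 0
        for clause in reversed(clauses):
            work = clause.process(work)
            ran += 1
            if clause.requires_repartition:
                break
        # Pop exactly what has been run; a repartitioning clause also gets its repartition step.
        for _ in range(ran):
            top = clauses.pop()
            if top.requires_repartition:
                work = top.repartition(work)
    return work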
clauses[0]->clause_info().requires_repartition_) { - vec_comp_entity_ids = clauses[0]->repartition(std::move(vec_comp_entity_ids)).value(); - clauses.erase(clauses.begin()); + if (clauses.size() > 0 && clauses.back()->clause_info().requires_repartition_) { + vec_comp_entity_ids = clauses.back()->repartition(std::move(vec_comp_entity_ids)).value(); + clauses.erase(clauses.end() - 1); } } return merge_composites(std::move(vec_comp_entity_ids)); @@ -891,7 +892,7 @@ void copy_frame_data_to_buffer(const SegmentInMemory& destination, size_t target auto total_size = dst_rawtype_size * num_rows; buffer.assert_size(offset + total_size); - auto src_ptr = src_column.data().buffer().data(); + auto src_data = src_column.data(); auto dst_ptr = buffer.data() + offset; auto type_promotion_error_msg = fmt::format("Can't promote type {} to type {} in field {}", @@ -901,17 +902,29 @@ void copy_frame_data_to_buffer(const SegmentInMemory& destination, size_t target util::default_initialize(dst_ptr, num_rows * dst_rawtype_size); }); } else if (trivially_compatible_types(src_column.type(), dst_column.type())) { - memcpy(dst_ptr, src_ptr, total_size); + details::visit_type(src_column.type().data_type() ,[&src_data, &dst_ptr] (auto src_desc_tag) { + using SourceTDT = ScalarTagType; + using SourceType = typename decltype(src_desc_tag)::DataTypeTag::raw_type; + while (auto block = src_data.template next()) { + const auto row_count = block->row_count(); + memcpy(dst_ptr, block->data(), row_count * sizeof(SourceType)); + dst_ptr += row_count * sizeof(SourceType); + } + }); } else if (has_valid_type_promotion(src_column.type(), dst_column.type())) { - dst_column.type().visit_tag([&src_ptr, &dst_ptr, &src_column, &type_promotion_error_msg, num_rows] (auto dest_desc_tag) { + details::visit_type(dst_column.type().data_type() ,[&src_data, &dst_ptr, &src_column, &type_promotion_error_msg] (auto dest_desc_tag) { using DestinationType = typename decltype(dest_desc_tag)::DataTypeTag::raw_type; - src_column.type().visit_tag([&src_ptr, &dst_ptr, &type_promotion_error_msg, num_rows] (auto src_desc_tag ) { + auto typed_dst_ptr = reinterpret_cast(dst_ptr); + details::visit_type(src_column.type().data_type() ,[&src_data, &typed_dst_ptr, &type_promotion_error_msg] (auto src_desc_tag) { + using SourceTDT = ScalarTagType; using SourceType = typename decltype(src_desc_tag)::DataTypeTag::raw_type; if constexpr(std::is_arithmetic_v && std::is_arithmetic_v) { - auto typed_src_ptr = reinterpret_cast(src_ptr); - auto typed_dst_ptr = reinterpret_cast(dst_ptr); - for (auto i = 0u; i < num_rows; ++i) { - *typed_dst_ptr++ = static_cast(*typed_src_ptr++); + while (auto block = src_data.template next()) { + const auto row_count = block->row_count(); + auto src_ptr = reinterpret_cast(block->data()); + for (auto i = 0u; i < row_count; ++i) { + *typed_dst_ptr++ = static_cast(*src_ptr++); + } } } else { util::raise_rte(type_promotion_error_msg.c_str()); diff --git a/python/arcticdb/version_store/_store.py b/python/arcticdb/version_store/_store.py index 3c12b64a47..b93b5ca3e0 100644 --- a/python/arcticdb/version_store/_store.py +++ b/python/arcticdb/version_store/_store.py @@ -1521,7 +1521,15 @@ def _get_read_query( check(date_range is None or row_range is None, "Date range and row range both specified") read_query = _PythonVersionStoreReadQuery() + if date_range is not None and query_builder is not None and not query_builder.is_resample(): + query_builder = QueryBuilder().date_range(date_range).then(query_builder) + + if row_range is not None and 
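copy_frame_data_to_buffer now walks the source column block by block instead of assuming a single contiguous buffer, casting element-wise when a type promotion is needed. A rough numpy illustration of the two paths, with arbitrary block sizes and dtypes:

import numpy as np

def copy_blocks(blocks, dst_dtype):
    total_rows = sum(len(b) for b in blocks)
    dst = np.empty(total_rows, dtype=dst_dtype)
    offset = 0
    for block in blocks:
        n = len(block)
        if block.dtype == dst_dtype:
            dst[offset:offset + n] = block                    # trivially compatible: copy per block
        else:
            dst[offset:offset + n] = block.astype(dst_dtype)  # valid promotion: cast per element
        offset += n
    return dst

blocks = [np.arange(3, dtype=np.int32), np.arange(3, 5, dtype=np.int32)]
print(copy_blocks(blocks, np.int64))  # [0 1 2 3 4]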
query_builder is not None: + q = QueryBuilder() + query_builder = q._row_range(row_range).then(query_builder) + if query_builder: + query_builder.set_date_range(date_range) read_query.add_clauses(query_builder.clauses) if row_range is not None: @@ -1667,14 +1675,6 @@ def read( ------- VersionedItem """ - if date_range is not None and query_builder is not None: - q = QueryBuilder() - query_builder = q.date_range(date_range).then(query_builder) - - if row_range is not None and query_builder is not None: - q = QueryBuilder() - query_builder = q._row_range(row_range).then(query_builder) - version_query, read_options, read_query = self._get_queries( symbol=symbol, as_of=as_of, diff --git a/python/arcticdb/version_store/processing.py b/python/arcticdb/version_store/processing.py index dba4a507ae..1d390baf7a 100644 --- a/python/arcticdb/version_store/processing.py +++ b/python/arcticdb/version_store/processing.py @@ -6,15 +6,16 @@ As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0. """ from collections import namedtuple +from dataclasses import dataclass import datetime from math import inf import numpy as np import pandas as pd -from typing import Dict, NamedTuple, Tuple, Union +from typing import Dict, NamedTuple, Optional, Tuple, Union -from arcticdb.exceptions import ArcticNativeException, UserInputException +from arcticdb.exceptions import ArcticDbNotYetImplemented, ArcticNativeException, UserInputException from arcticdb.version_store._normalization import normalize_dt_range_to_ts from arcticdb.preconditions import check from arcticdb.supported_types import DateRangeInput, time_types as supported_time_types @@ -25,6 +26,9 @@ from arcticdb_ext.version_store import ProjectClause as _ProjectClause from arcticdb_ext.version_store import GroupByClause as _GroupByClause from arcticdb_ext.version_store import AggregationClause as _AggregationClause +from arcticdb_ext.version_store import ResampleClauseLeftClosed as _ResampleClauseLeftClosed +from arcticdb_ext.version_store import ResampleClauseRightClosed as _ResampleClauseRightClosed +from arcticdb_ext.version_store import ResampleBoundary as _ResampleBoundary from arcticdb_ext.version_store import RowRangeClause as _RowRangeClause from arcticdb_ext.version_store import DateRangeClause as _DateRangeClause from arcticdb_ext.version_store import RowRangeType as _RowRangeType @@ -299,6 +303,14 @@ class PythonRowRangeClause(NamedTuple): start: int = None end: int = None +# Would be cleaner if all Python*Clause classes were dataclasses, but this is used for pickling, so hard to change now +@dataclass +class PythonResampleClause: + rule: str + closed: _ResampleBoundary + label: _ResampleBoundary + aggregations: Dict[str, Union[str, Tuple[str, str]]] = None + class QueryBuilder: """ @@ -530,8 +542,8 @@ def groupby(self, name: str): def agg(self, aggregations: Dict[str, Union[str, Tuple[str, str]]]): # Only makes sense if previous stage is a group-by check( - len(self.clauses) and isinstance(self.clauses[-1], _GroupByClause), - f"Aggregation only makes sense after groupby", + len(self.clauses) and isinstance(self.clauses[-1], (_GroupByClause, _ResampleClauseLeftClosed, _ResampleClauseRightClosed)), + f"Aggregation only makes sense after groupby or resample", ) for k, v in aggregations.items(): check(isinstance(v, (str, tuple)), f"Values in agg dict expected to be strings or tuples, received {v} of type {type(v)}") @@ -544,10 +556,63 @@ def 
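With the _get_read_query change above, a date_range passed to read() is no longer prepended as a DateRangeClause when the pipeline starts with a resample; it is handed to the clause via set_date_range so that only buckets inside the range are generated. Illustrative usage only; the library object, symbol and column name are invented, and an ArcticDB build that includes this change is assumed:

from arcticdb import QueryBuilder

q = QueryBuilder().resample("h").agg({"price": "last"})  # "price" is an invented column
# lib.read("my_symbol",
#          date_range=(pd.Timestamp("2024-01-01"), pd.Timestamp("2024-01-02")),
#          query_builder=q)
# The date_range is forwarded to ResampleClause.set_date_range rather than being wrapped
# in a separate DateRangeClause.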
agg(self, aggregations: Dict[str, Union[str, Tuple[str, str]]]): ) aggregations[k] = (v[0], v[1].lower()) - self.clauses.append(_AggregationClause(self.clauses[-1].grouping_column, aggregations)) - self._python_clauses.append(PythonAggregationClause(aggregations)) + if isinstance(self.clauses[-1], _GroupByClause): + self.clauses.append(_AggregationClause(self.clauses[-1].grouping_column, aggregations)) + self._python_clauses.append(PythonAggregationClause(aggregations)) + else: + self.clauses[-1].set_aggregations(aggregations) + self._python_clauses[-1].aggregations = aggregations + # self._python_clauses[-1] = self._python_clauses[-1]._replace(aggregations=aggregations) + return self + + + def resample( + self, + rule: Union[str, pd.DateOffset], + closed: Optional[str] = None, + label: Optional[str] = None, + ): + check(not len(self.clauses), "resample only supported as first clause in the pipeline") + rule = rule.freqstr if isinstance(rule, pd.DateOffset) else rule + # We use floor and ceiling later to round user-provided date ranges and or start/end index values of the symbol + # before calling pandas.date_range to generate the bucket boundaries, but floor and ceiling only work with + # well-defined intervals that are multiples of whole ns/us/ms/s/min/h/D + try: + pd.Timestamp(0).floor(rule) + except ValueError: + raise ArcticDbNotYetImplemented(f"Frequency string '{rule}' not yet supported. Valid frequency strings " + f"are ns, us, ms, s, min, h, D, and multiples/combinations thereof such " + f"as 1h30min") + + # This set is documented here: + # https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Series.resample.html#pandas.Series.resample + # and lifted directly from pandas.core.resample.TimeGrouper.__init__, and so is inherently fragile to upstream + # changes + end_types = {"M", "A", "Q", "BM", "BA", "BQ", "W"} + boundary_map = { + "left": _ResampleBoundary.LEFT, + "right": _ResampleBoundary.RIGHT, + None: _ResampleBoundary.RIGHT if rule in end_types else _ResampleBoundary.LEFT + } + check(closed in boundary_map.keys(), f"closed kwarg to resample must be `left`, 'right', or None, but received '{closed}'") + check(label in boundary_map.keys(), f"label kwarg to resample must be `left`, 'right', or None, but received '{closed}'") + if boundary_map[closed] == _ResampleBoundary.LEFT: + self.clauses.append(_ResampleClauseLeftClosed(rule, boundary_map[label])) + else: + # boundary_map[closed] == _ResampleBoundary.RIGHT + self.clauses.append(_ResampleClauseRightClosed(rule, boundary_map[label])) + self._python_clauses.append(PythonResampleClause(rule=rule, closed=boundary_map[closed], label=boundary_map[label])) return self + def is_resample(self): + return len(self.clauses) and isinstance(self.clauses[0], (_ResampleClauseLeftClosed, _ResampleClauseRightClosed)) + + def set_date_range(self, date_range:Optional[DateRangeInput]): + if self.is_resample() and date_range is not None: + start, end = normalize_dt_range_to_ts(date_range) + self.clauses[0].set_date_range(start.value, end.value) + + # TODO: specify type of other must be QueryBuilder with from __future__ import annotations once only Python 3.7+ # supported def then(self, other): @@ -662,6 +727,14 @@ def __setstate__(self, state): self.clauses.append(_GroupByClause(python_clause.name)) elif isinstance(python_clause, PythonAggregationClause): self.clauses.append(_AggregationClause(self.clauses[-1].grouping_column, python_clause.aggregations)) + elif isinstance(python_clause, PythonResampleClause): + if python_clause.closed 
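resample() above probes pandas directly to reject calendar-based rules: if pd.Timestamp(0).floor(rule) raises, the rule has no fixed nanosecond width and ArcticDbNotYetImplemented is raised. The default for closed likewise follows pandas, right-closed only for the end-anchored frequencies in end_types. A quick pandas-only check of both behaviours:

import pandas as pd

# Rules with a fixed nanosecond width floor cleanly; calendar-based ones raise ValueError.
for rule in ("min", "1h30min", "M"):
    try:
        pd.Timestamp(0).floor(rule)
        print(f"{rule!r}: fixed-width buckets, supported")
    except ValueError:
        print(f"{rule!r}: calendar-based, rejected with ArcticDbNotYetImplemented")

# Default bucket closure mirrors pandas: right-closed only for end-anchored frequencies.
end_types = {"M", "A", "Q", "BM", "BA", "BQ", "W"}

def default_closed(rule: str) -> str:
    return "right" if rule in end_types else "left"

print(default_closed("min"), default_closed("M"))  # left right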
== _ResampleBoundary.LEFT: + self.clauses.append(_ResampleClauseLeftClosed(python_clause.rule, python_clause.label)) + else: + # python_clause.closed == _ResampleBoundary.RIGHT + self.clauses.append(_ResampleClauseRightClosed(python_clause.rule, python_clause.label)) + if python_clause.aggregations is not None: + self.clauses[-1].set_aggregations(python_clause.aggregations) elif isinstance(python_clause, PythonRowRangeClause): if python_clause.start is not None and python_clause.end is not None: self.clauses.append(_RowRangeClause(python_clause.start, python_clause.end)) @@ -701,7 +774,7 @@ def optimise_for_memory(self): clause.set_pipeline_optimisation(_Optimisation.MEMORY) def needs_post_processing(self): - return not any(isinstance(clause, (_RowRangeClause, _DateRangeClause)) for clause in self.clauses) + return not any(isinstance(clause, (_RowRangeClause, _DateRangeClause, _ResampleClauseLeftClosed, _ResampleClauseRightClosed)) for clause in self.clauses) CONSTRUCTOR_MAP = { diff --git a/python/tests/conftest.py b/python/tests/conftest.py index b3e0c9b473..77c5c2298f 100644 --- a/python/tests/conftest.py +++ b/python/tests/conftest.py @@ -615,7 +615,7 @@ def lmdb_version_store_small_segment(version_store_factory): @pytest.fixture def lmdb_version_store_tiny_segment(version_store_factory): - return version_store_factory(column_group_size=2, segment_row_size=2, lmdb_config={"map_size": 2**30}) + return version_store_factory(column_group_size=2, segment_row_size=2, dynamic_strings=True, lmdb_config={"map_size": 2**30}) @pytest.fixture diff --git a/python/tests/unit/arcticdb/version_store/profile_resample.py b/python/tests/unit/arcticdb/version_store/profile_resample.py new file mode 100644 index 0000000000..b71a9b819a --- /dev/null +++ b/python/tests/unit/arcticdb/version_store/profile_resample.py @@ -0,0 +1,205 @@ +from math import log10 +import time + +import numpy as np +import pandas as pd +import pytest + +from arcticdb import QueryBuilder, Arctic +from arcticdb.util.test import assert_frame_equal, random_strings_of_length + +rows_per_segment = 100_000 +rng = np.random.default_rng() + +ac = Arctic("lmdb:///tmp/arcticdb") +lib = ac.get_library("resample_profiling", create_if_missing=True) +lib = lib._nvs + + +@pytest.mark.parametrize("num_rows", [100_000, 1_000_000, 10_000_000, 100_000_000, 1_000_000_000]) +@pytest.mark.parametrize( + "col_type", + ["bool", "int", "float", "float_with_nans", "datetime", "datetime_with_nats", + "str10", "str100", "str1000", "str10000", "str100000", "str_with_nones10"] +) +def test_write_data(num_rows, col_type): + power_of_ten = round(log10(num_rows)) + sym = f"10^{power_of_ten}_{col_type}" + lib.delete(sym) + + num_segments = num_rows // rows_per_segment + for idx in range(num_segments): + index = pd.date_range(pd.Timestamp(idx * rows_per_segment, unit="us"), freq="us", periods=rows_per_segment) + if col_type == "int": + col_data = rng.integers(0, 100_000, rows_per_segment) + elif col_type == "bool": + col_data = rng.integers(0, 2, rows_per_segment) + col_data = col_data.astype(np.bool) + elif col_type.startswith("float"): + col_data = 100_000 * rng.random(rows_per_segment) + if col_type == "float_with_nans": + col_data[:rows_per_segment // 2] = np.nan + rng.shuffle(col_data) + elif col_type.startswith("datetime"): + col_data = rng.integers(0, 100_000, rows_per_segment) + col_data = col_data.astype("datetime64[s]") + if col_type == "datetime_with_nats": + col_data[:rows_per_segment // 2] = np.datetime64('NaT') + rng.shuffle(col_data) + elif 
col_type.startswith("str"): + num_unique_strings = int(col_type.lstrip("str_with_nones")) + unique_strings = random_strings_of_length(num_unique_strings, 10, True) + col_data = np.random.choice(unique_strings, rows_per_segment) + if col_type.startswith("str_with_nones"): + col_data[:rows_per_segment // 2] = None + rng.shuffle(col_data) + df = pd.DataFrame({"col": col_data}, index=index) + lib.append(sym, df, write_if_missing=True) + + +@pytest.mark.parametrize("num_rows", [100_000, 1_000_000, 10_000_000, 100_000_000, 1_000_000_000]) +@pytest.mark.parametrize( + "col_type", + ["bool", "int", "float", "float_with_nans", "datetime", "datetime_with_nats", + "str10", "str100", "str1000", "str10000", "str100000", "str_with_nones10"] +) +@pytest.mark.parametrize("freq", ["1us", "10us", "100us", "1ms", "10ms", "100ms", "1s", "10s", "100s", "1000s"]) +@pytest.mark.parametrize("agg", ["sum", "mean", "min", "max", "first", "last", "count"]) +def test_resample_data(num_rows, col_type, freq, agg): + if col_type in ["datetime", "datetime_with_nats"] and agg == "sum": + pytest.skip() + if col_type.startswith("str") and agg in ["sum", "mean", "min", "max"]: + pytest.skip() + if num_rows == 100_000 and "freq" in ["1s", "10s", "100s", "1000s"]: + pytest.skip() + if num_rows == 1_000_000 and "freq" in ["10s", "100s", "1000s"]: + pytest.skip() + if num_rows == 10_000_000 and "freq" in ["100s", "1000s"]: + pytest.skip() + if num_rows == 100_000_000 and "freq" in ["1000s"]: + pytest.skip() + + input_power_of_ten = round(log10(num_rows)) + sym = f"10^{input_power_of_ten}_{col_type}" + + start = time.time() + df = lib.read(sym).data + end = time.time() + read_time = end - start + + q = QueryBuilder() + q = q.resample(freq).agg({"col": agg}) + start = time.time() + df = lib.read(sym, query_builder=q).data + end = time.time() + arcticdb_resample_time = end - start + + output_power_of_ten = round(log10(len(df))) + + results_sym = f"results_1_core_10^{input_power_of_ten}_to_10^{output_power_of_ten}_{col_type}_{agg}" + results_df = pd.DataFrame( + { + "Input rows": [num_rows], + "Output rows": [len(df)], + "Column type": [col_type], + "Aggregation": [agg], + "Read time": [read_time], + "Resample time": [arcticdb_resample_time], + } + ) + lib.write(results_sym, results_df) + print(f"Downsampling ({agg}) 10^{input_power_of_ten}->10^{output_power_of_ten} rows of {col_type} took {end - start}") + + +@pytest.mark.parametrize("num_rows", [100_000]) +@pytest.mark.parametrize("col_type", ["int"]) +@pytest.mark.parametrize("freq", ["10ns"]) +@pytest.mark.parametrize("agg", ["sum"]) +def test_resample_mostly_missing_buckets(num_rows, col_type, freq, agg): + input_power_of_ten = round(log10(num_rows)) + sym = f"10^{input_power_of_ten}_{col_type}" + + q = QueryBuilder() + q = q.resample(freq).agg({"col": agg}) + start = time.time() + df = lib.read(sym, query_builder=q).data + end = time.time() + + output_power_of_ten = round(log10(len(df))) + + print(f"Downsampling ({agg}) 10^{input_power_of_ten}->10^{output_power_of_ten} rows of {col_type} took {end - start}") + + +@pytest.mark.parametrize("num_rows", [100_000_000]) +@pytest.mark.parametrize("col_type", ["int"]) +@pytest.mark.parametrize("freq", ["10us"]) +def test_resample_all_aggs_one_column(num_rows, col_type, freq): + input_power_of_ten = round(log10(num_rows)) + sym = f"10^{input_power_of_ten}_{col_type}" + + q = QueryBuilder() + q = q.resample(freq).agg( + { + "sum": ("col", "sum"), + "mean": ("col", "mean"), + "min": ("col", "min"), + "max": ("col", "max"), + "first": 
("col", "first"), + "last": ("col", "last"), + "count": ("col", "count"), + } + ) + start = time.time() + df = lib.read(sym, query_builder=q).data + end = time.time() + + output_power_of_ten = round(log10(len(df))) + + print(f"Downsampling (all aggregators) 10^{input_power_of_ten}->10^{output_power_of_ten} rows of {col_type} took {end - start}") + + +@pytest.mark.parametrize("num_rows", [10_000_000, 100_000_000]) +def test_write_ohlcvt(num_rows): + power_of_ten = round(log10(num_rows)) + sym = f"10^{power_of_ten}_ohlcvt" + lib.delete(sym) + + num_segments = num_rows // rows_per_segment + for idx in range(num_segments): + index = pd.date_range(pd.Timestamp(idx * rows_per_segment, unit="m"), freq="min", periods=rows_per_segment) + df = pd.DataFrame( + { + "open": 100 * rng.random(rows_per_segment), + "high": 100 * rng.random(rows_per_segment), + "low": 100 * rng.random(rows_per_segment), + "close": 100 * rng.random(rows_per_segment), + "volume": rng.integers(0, 100_000, rows_per_segment), + "trades": rng.integers(0, 1_000, rows_per_segment), + }, + index=index + ) + lib.append(sym, df, write_if_missing=True) + + +@pytest.mark.parametrize("num_rows", [10_000_000, 100_000_000]) +@pytest.mark.parametrize("freq", ["5min", "15min", "H", "D"]) +def test_resample_ohlcvt(num_rows, freq): + power_of_ten = round(log10(num_rows)) + sym = f"10^{power_of_ten}_ohlcvt" + + q = QueryBuilder() + q = q.resample(freq).agg( + { + "open": "first", + "high": "max", + "low": "min", + "close": "last", + "volume": "sum", + "trades": "sum", + } + ) + start = time.time() + df = lib.read(sym, query_builder=q).data + end = time.time() + + print(f"Downsampling OHLCVT {num_rows}->{len(df)} rows took {end - start}") diff --git a/python/tests/unit/arcticdb/version_store/test_query_builder.py b/python/tests/unit/arcticdb/version_store/test_query_builder.py index 56c15b90f9..a9bbd4cee4 100644 --- a/python/tests/unit/arcticdb/version_store/test_query_builder.py +++ b/python/tests/unit/arcticdb/version_store/test_query_builder.py @@ -7,6 +7,7 @@ """ import numpy as np import pandas as pd +import pickle import pytest import datetime import dateutil @@ -416,6 +417,69 @@ def test_querybuilder_groupby_then_groupby(lmdb_version_store_tiny_segment): assert_frame_equal(expected, received) +def test_querybuilder_resample_then_filter(lmdb_version_store_tiny_segment): + lib = lmdb_version_store_tiny_segment + symbol = "test_querybuilder_resample_then_filter" + idx = [0, 1, 2, 3, 1000, 1001] + idx = np.array(idx, dtype="datetime64[ns]") + df = pd.DataFrame({"col": np.arange(6)}, index=idx) + lib.write(symbol, df) + + q = QueryBuilder() + q = q.resample("us").agg({"col": "sum"}) + q = q[q["col"] == 9] + + received = lib.read(symbol, query_builder=q).data + + expected = df.resample("us").agg({"col": "sum"}) + expected = expected.query("col == 9") + assert_frame_equal(expected, received) + + +def test_querybuilder_resample_then_project(lmdb_version_store_tiny_segment): + lib = lmdb_version_store_tiny_segment + symbol = "test_querybuilder_resample_then_project" + idx = [0, 1, 2, 3, 1000, 1001] + idx = np.array(idx, dtype="datetime64[ns]") + df = pd.DataFrame({"col": np.arange(6)}, index=idx) + lib.write(symbol, df) + + q = QueryBuilder() + q = q.resample("us").agg({"col": "sum"}) + q = q.apply("new_col", q["col"] * 3) + + received = lib.read(symbol, query_builder=q).data + + expected = df.resample("us").agg({"col": "sum"}) + expected["new_col"] = expected["col"] * 3 + assert_frame_equal(expected, received) + + +def 
test_querybuilder_resample_then_groupby(lmdb_version_store_tiny_segment): + lib = lmdb_version_store_tiny_segment + symbol = "test_querybuilder_resample_then_groupby" + idx = [0, 1, 1000, 1001, 2000, 2001, 3000, 3001] + idx = np.array(idx, dtype="datetime64[ns]") + # After downsampling and summing, grouping_col will be [0, 1, 1, 0] + df = pd.DataFrame( + { + "grouping_col": [0, 0, 10, -9, 20, -19, 30, -30], + "agg_col": np.arange(8), + }, + index=idx) + lib.write(symbol, df) + + q = QueryBuilder() + q = q.resample("us").agg({"grouping_col": "sum", "agg_col": "sum"}) + q = q.groupby("grouping_col").agg({"agg_col": "sum"}) + + received = lib.read(symbol, query_builder=q).data + + expected = df.resample("us").agg({"grouping_col": "sum", "agg_col": "sum"}) + expected = expected.groupby("grouping_col").agg({"agg_col": "sum"}) + assert_frame_equal(expected, received) + + def test_querybuilder_pickling(): """QueryBuilder must be pickleable with all possible clauses.""" @@ -424,10 +488,10 @@ def test_querybuilder_pickling(): # PythonDateRangeClause q = q.date_range((pd.Timestamp("2000-01-04"), pd.Timestamp("2000-01-07"))) - # PythonProjectionClause + # PythonFilterClause q = q[q["col1"].isin(2, 3, 7)] - # PythonFilterClause + # PythonProjectionClause q = q.apply("new_col", (q["col1"] * q["col2"]) + 13) # PythonGroupByClause @@ -436,7 +500,15 @@ def test_querybuilder_pickling(): # PythonAggregationClause q = q.agg({"col2": "sum", "new_col": ("col2", "mean")}) - import pickle + assert pickle.loads(pickle.dumps(q)) == q + + # PythonResampleClause + q = QueryBuilder() + q = q.resample("T", "right", "left") + + assert pickle.loads(pickle.dumps(q)) == q + + q = q.agg({"col2": "sum", "new_col": ("col2", "sum")}) assert pickle.loads(pickle.dumps(q)) == q diff --git a/python/tests/unit/arcticdb/version_store/test_resample.py b/python/tests/unit/arcticdb/version_store/test_resample.py new file mode 100644 index 0000000000..9c41aeb552 --- /dev/null +++ b/python/tests/unit/arcticdb/version_store/test_resample.py @@ -0,0 +1,425 @@ +from functools import partial +import numpy as np +import pandas as pd +import pytest + +from arcticdb import QueryBuilder +from arcticdb.exceptions import ArcticDbNotYetImplemented, SchemaException +from arcticdb.util.test import assert_frame_equal +from arcticdb.util._versions import IS_PANDAS_TWO + +def generic_resample_test(lib, sym, rule, aggregations, date_range=None, closed=None, label=None): + # Pandas doesn't have a good date_range equivalent in resample, so just use read for that + expected = lib.read(sym, date_range=date_range).data + # Pandas 1.X needs None as the first argument to agg with named aggregators + expected = expected.resample(rule, closed=closed, label=label).agg(None, **aggregations) + expected = expected.reindex(columns=sorted(expected.columns)) + + q = QueryBuilder() + q = q.resample(rule, closed=closed, label=label).agg(aggregations) + received = lib.read(sym, date_range=date_range, query_builder=q).data + received = received.reindex(columns=sorted(received.columns)) + + assert_frame_equal(expected, received, check_dtype=False) + + +@pytest.mark.parametrize("freq", ("min", "h", "D", "1h30min")) +@pytest.mark.parametrize("date_range", (None, (pd.Timestamp("2024-01-02T12:00:00"), pd.Timestamp("2024-01-03T12:00:00")))) +@pytest.mark.parametrize("closed", ("left", "right")) +@pytest.mark.parametrize("label", ("left", "right")) +def test_resampling(lmdb_version_store_v1, freq, date_range, closed, label): + lib = lmdb_version_store_v1 + sym = "test_resampling" 
+ # Want an index with data every minute for 2 days, with additional data points 1 nanosecond before and after each + # minute to catch off-by-one errors + idx_start_base = pd.Timestamp("2024-01-02") + idx_end_base = pd.Timestamp("2024-01-04") + + idx = pd.date_range(idx_start_base, idx_end_base, freq="min") + idx_1_nano_before = pd.date_range(idx_start_base - pd.Timedelta(1), idx_end_base - pd.Timedelta(1), freq="min") + idx_1_nano_after = pd.date_range(idx_start_base + pd.Timedelta(1), idx_end_base + pd.Timedelta(1), freq="min") + idx = idx.join(idx_1_nano_before, how="outer").join(idx_1_nano_after, how="outer") + rng = np.random.default_rng() + df = pd.DataFrame({"col": rng.integers(0, 100, len(idx))}, index=idx) + lib.write(sym, df) + + generic_resample_test( + lib, + sym, + freq, + { + "sum": ("col", "sum"), + "min": ("col", "min"), + "max": ("col", "max"), + "mean": ("col", "mean"), + "count": ("col", "count"), + "first": ("col", "first"), + "last": ("col", "last"), + }, + date_range=date_range, + closed=closed, + label=label + ) + + +@pytest.mark.parametrize("closed", ("left", "right")) +def test_resampling_duplicated_index_value_on_segment_boundary(lmdb_version_store_v1, closed): + lib = lmdb_version_store_v1 + sym = "test_resampling_duplicated_index_value_on_segment_boundary" + # Will group on microseconds + df_0 = pd.DataFrame({"col": np.arange(4)}, index=np.array([0, 1, 2, 1000], dtype="datetime64[ns]")) + df_1 = pd.DataFrame({"col": np.arange(4, 8)}, index=np.array([1000, 1000, 1000, 1000], dtype="datetime64[ns]")) + df_2 = pd.DataFrame({"col": np.arange(8, 12)}, index=np.array([1000, 1001, 2000, 2001], dtype="datetime64[ns]")) + lib.write(sym, df_0) + lib.append(sym, df_1) + lib.append(sym, df_2) + + generic_resample_test( + lib, + sym, + "us", + {"sum": ("col", "sum")}, + closed=closed, + ) + + +def test_resampling_timezones(lmdb_version_store_v1): + lib = lmdb_version_store_v1 + sym = "test_resampling_timezones" + # UK clocks go forward at 1am on March 31st in 2024 + index = pd.date_range("2024-03-31T00:00:00", freq="min", periods=240, tz="Europe/London") + df = pd.DataFrame({"col": np.arange(len(index))}, index=index) + lib.write(sym, df) + generic_resample_test( + lib, + sym, + "h", + {"sum": ("col", "sum")}, + ) + + # UK clocks go back at 2am on October 27th in 2024 + index = pd.date_range("2024-10-27T00:00:00", freq="min", periods=240, tz="Europe/London") + df = pd.DataFrame({"col": np.arange(len(index))}, index=index) + lib.write(sym, df) + generic_resample_test( + lib, + sym, + "h", + {"sum": ("col", "sum")}, + ) + + +def test_resampling_nan_correctness(lmdb_version_store_tiny_segment): + lib = lmdb_version_store_tiny_segment + sym = "test_resampling_nan_correctness" + # NaN here means NaT for datetime columns and NaN/None in string columns + # Create 5 buckets worth of data, each containing 3 values: + # - No nans + # - All nans + # - First value nan + # - Middle value nan + # - Last value nan + # Will group on microseconds + idx = [0, 1, 2, 1000, 1001, 1002, 2000, 2001, 2002, 3000, 3001, 3002, 4000, 4001, 4002] + idx = np.array(idx, dtype="datetime64[ns]") + float_col = np.arange(15, dtype=np.float64) + string_col = [f"str {str(i)}" for i in range(15)] + datetime_col = np.array(np.arange(0, 30, 2), dtype="datetime64[ns]") + for i in [3, 4, 5, 6, 10, 14]: + float_col[i] = np.nan + string_col[i] = None if i % 2 == 0 else np.nan + datetime_col[i] = np.datetime64('NaT') + + df = pd.DataFrame({"float_col": float_col, "string_col": string_col, "datetime_col": 
datetime_col}, index=idx) + lib.write(sym, df) + + agg_dict = { + "float_sum": ("float_col", "sum"), + "float_mean": ("float_col", "mean"), + "float_min": ("float_col", "min"), + "float_max": ("float_col", "max"), + "float_first": ("float_col", "first"), + "float_last": ("float_col", "last"), + "float_count": ("float_col", "count"), + } + + # Pandas 1.X does not support all of these aggregators/behaviours + if IS_PANDAS_TWO: + agg_dict.update( + { + "string_first": ("string_col", "first"), + "string_last": ("string_col", "last"), + "string_count": ("string_col", "count"), + "datetime_mean": ("datetime_col", "mean"), + "datetime_min": ("datetime_col", "min"), + "datetime_max": ("datetime_col", "max"), + "datetime_first": ("datetime_col", "first"), + "datetime_last": ("datetime_col", "last"), + "datetime_count": ("datetime_col", "count"), + } + ) + + generic_resample_test(lib, sym, "us", agg_dict) + + +def test_resampling_bool_columns(lmdb_version_store_tiny_segment): + lib = lmdb_version_store_tiny_segment + sym = "test_resampling_bool_columns" + + idx = [0, 1, 1000, 1001, 2000, 2001, 3000, 3001] + idx = np.array(idx, dtype="datetime64[ns]") + + col = [True, True, True, False, False, True, False, False] + + df = pd.DataFrame({"col": col}, index=idx) + lib.write(sym, df) + + generic_resample_test( + lib, + sym, + "us", + { + "sum": ("col", "sum"), + "mean": ("col", "mean"), + "min": ("col", "min"), + "max": ("col", "max"), + "first": ("col", "first"), + "last": ("col", "last"), + "count": ("col", "count"), + }, + ) + + +def test_resampling_dynamic_schema_types_changing(lmdb_version_store_dynamic_schema_v1): + lib = lmdb_version_store_dynamic_schema_v1 + sym = "test_resampling_dynamic_schema_types_changing" + # Will group on microseconds + idx_0 = [0, 1, 2, 1000] + idx_0 = np.array(idx_0, dtype="datetime64[ns]") + col_0 = np.arange(4, dtype=np.uint8) + df_0 = pd.DataFrame({"col": col_0}, index=idx_0) + lib.write(sym, df_0) + + idx_1 = [1001, 1002, 2000, 2001] + idx_1 = np.array(idx_1, dtype="datetime64[ns]") + col_1 = np.arange(1000, 1004, dtype=np.int64) + df_1 = pd.DataFrame({"col": col_1}, index=idx_1) + lib.append(sym, df_1) + + generic_resample_test( + lib, + sym, + "us", + { + "sum": ("col", "sum"), + "mean": ("col", "mean"), + "min": ("col", "min"), + "max": ("col", "max"), + "first": ("col", "first"), + "last": ("col", "last"), + "count": ("col", "count"), + }, + ) + + +def test_resampling_empty_bucket_in_range(lmdb_version_store_v1): + lib = lmdb_version_store_v1 + sym = "test_resampling_empty_bucket_in_range" + # Group on microseconds, so bucket 1000-1999 will be empty + idx = [0, 1, 2000, 2001] + idx = np.array(idx, dtype="datetime64[ns]") + col = np.arange(4, dtype=np.float64) + + df = pd.DataFrame({"col": col}, index=idx) + rng = np.random.default_rng() + df = pd.DataFrame( + { + "to_sum": rng.integers(0, 100, len(idx)), + "to_min": rng.integers(0, 100, len(idx)), + "to_max": rng.integers(0, 100, len(idx)), + "to_mean": rng.integers(0, 100, len(idx)), + "to_count": rng.integers(0, 100, len(idx)), + "to_first": rng.integers(0, 100, len(idx)), + "to_last": rng.integers(0, 100, len(idx)), + }, + index=idx, + ) + lib.write(sym, df) + + # Pandas recommended way to resample and exclude buckets with no index values, which is our behaviour + # See https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#sparse-resampling + def round(t, freq): + freq = pd.tseries.frequencies.to_offset(freq) + td = pd.Timedelta(freq) + return pd.Timestamp((t.value // td.value) * td.value) + 
+ expected = df.groupby(partial(round, freq="us")).agg( + { + "to_sum": "sum", + "to_mean": "mean", + "to_min": "min", + "to_max": "max", + "to_first": "first", + "to_last": "last", + "to_count": "count", + } + ) + expected = expected.reindex(columns=sorted(expected.columns)) + expected["to_count"] = expected["to_count"].astype(np.uint64) + + q = QueryBuilder() + q = q.resample("us").agg( + { + "to_sum": "sum", + "to_mean": "mean", + "to_min": "min", + "to_max": "max", + "to_first": "first", + "to_last": "last", + "to_count": "count", + } + ) + received = lib.read(sym, query_builder=q).data + received = received.reindex(columns=sorted(received.columns)) + assert_frame_equal(expected, received, check_dtype=False) + + +@pytest.mark.parametrize("use_date_range", (True, False)) +@pytest.mark.parametrize("single_query", (True, False)) +def test_resampling_batch_read_query(lmdb_version_store_v1, use_date_range, single_query): + lib = lmdb_version_store_v1 + sym_0 = "test_resampling_batch_read_query_0" + sym_1 = "test_resampling_batch_read_query_1" + + if use_date_range: + date_range_0 = (pd.Timestamp("2024-01-01T01:00:00"), pd.Timestamp("2024-01-01T12:00:00")) + date_range_1 = (pd.Timestamp("2024-01-02T01:00:00"), pd.Timestamp("2024-01-02T11:00:00")) + date_ranges = [date_range_0, date_range_1] + else: + date_range_0 = None + date_range_1 = None + date_ranges = None + + df_0 = pd.DataFrame({"col": np.arange(2000)}, index=pd.date_range("2024-01-01", freq="min", periods=2000)) + df_1 = pd.DataFrame({"col": np.arange(1000)}, index=pd.date_range("2024-01-02", freq="min", periods=1000)) + lib.batch_write([sym_0, sym_1], [df_0, df_1]) + + if single_query: + agg_dict_0 = {"col": "sum"} + agg_dict_1 = agg_dict_0 + q = QueryBuilder().resample("h").agg(agg_dict_0) + else: + agg_dict_0 = {"col": "sum"} + agg_dict_1 = {"col": "mean"} + q_0 = QueryBuilder().resample("h").agg(agg_dict_0) + q_1 = QueryBuilder().resample("h").agg(agg_dict_1) + q = [q_0, q_1] + + # Date range filtering in Pandas is painful, so use our read call for that bit + expected_0 = lib.read(sym_0, date_range=date_range_0).data.resample("h").agg(agg_dict_0) + expected_1 = lib.read(sym_1, date_range=date_range_1).data.resample("h").agg(agg_dict_1) + expected_0 = expected_0.reindex(columns=sorted(expected_0.columns)) + expected_1 = expected_1.reindex(columns=sorted(expected_1.columns)) + + res = lib.batch_read([sym_0, sym_1], date_ranges=date_ranges, query_builder=q) + received_0 = res[sym_0].data + received_1 = res[sym_1].data + + received_0 = received_0.reindex(columns=sorted(received_0.columns)) + received_1 = received_1.reindex(columns=sorted(received_1.columns)) + assert_frame_equal(expected_0, received_0, check_dtype=False) + assert_frame_equal(expected_1, received_1, check_dtype=False) + + +# All following tests cover that an appropriate exception is thrown when unsupported operations are attempted + +@pytest.mark.parametrize("freq", ("B", "W", "M", "Q", "Y", "cbh", "bh", "BYS", "YS", "BYE", "YE", "BQS", "QS", "BQE", + "QE", "CBMS", "BMS", "SMS", "MS", "CBME", "BME", "SME", "ME", "C")) +def test_resample_rejects_unsupported_frequency_strings(freq): + with pytest.raises(ArcticDbNotYetImplemented): + QueryBuilder().resample(freq) + with pytest.raises(ArcticDbNotYetImplemented): + QueryBuilder().resample("2" + freq) + # Pandas 1.X throws an attribute error here + if IS_PANDAS_TWO: + with pytest.raises(ArcticDbNotYetImplemented): + QueryBuilder().resample(freq + "1h") + + +def 
test_resampling_unsupported_aggregation_type_combos(lmdb_version_store_v1): + lib = lmdb_version_store_v1 + sym = "test_resampling_unsupported_aggregation_type_combos" + + df = pd.DataFrame({"string": ["hello"], "datetime": [pd.Timestamp(0)]}, index=[pd.Timestamp(0)]) + lib.write(sym, df) + + for agg in ["sum", "mean", "min", "max"]: + q = QueryBuilder() + q = q.resample("min").agg({"string": agg}) + with pytest.raises(SchemaException): + lib.read(sym, query_builder=q) + + q = QueryBuilder() + q = q.resample("min").agg({"datetime": "sum"}) + with pytest.raises(SchemaException): + lib.read(sym, query_builder=q) + + +def test_resampling_dynamic_schema_missing_column(lmdb_version_store_dynamic_schema_v1): + lib = lmdb_version_store_dynamic_schema_v1 + sym = "test_resampling_dynamic_schema_missing_column" + + lib.write(sym, pd.DataFrame({"col_0": [0]}, index=[pd.Timestamp(0)])) + lib.append(sym, pd.DataFrame({"col_1": [1000]}, index=[pd.Timestamp(2000)])) + + # Schema exception should be thrown regardless of whether there are any buckets that span segments or not + q = QueryBuilder() + q = q.resample("us").agg({"col_0": "sum"}) + with pytest.raises(SchemaException): + lib.read(sym, query_builder=q) + + q = QueryBuilder() + q = q.resample("s").agg({"col_1": "sum"}) + with pytest.raises(SchemaException): + lib.read(sym, query_builder=q) + + +def test_resampling_sparse_data(lmdb_version_store_v1): + lib = lmdb_version_store_v1 + sym = "test_resampling_sparse_data" + + # col_1 will be dense, but with fewer rows than the index column, and so semantically sparse + data = { + "col_0": [np.nan, 1.0], + "col_1": [2.0, np.nan] + } + lib.write(sym, pd.DataFrame(data, index=[pd.Timestamp(0), pd.Timestamp(1000)]), sparsify_floats=True) + + q = QueryBuilder() + q = q.resample("us").agg({"col_0": "sum"}) + with pytest.raises(SchemaException): + lib.read(sym, query_builder=q) + + q = QueryBuilder() + q = q.resample("s").agg({"col_1": "sum"}) + with pytest.raises(SchemaException): + lib.read(sym, query_builder=q) + + +def test_resampling_empty_type_column(lmdb_version_store_empty_types_v1): + lib = lmdb_version_store_empty_types_v1 + sym = "test_resampling_empty_type_column" + + lib.write(sym, pd.DataFrame({"col": ["hello"]}, index=[pd.Timestamp(0)])) + lib.append(sym, pd.DataFrame({"col": [None]}, index=[pd.Timestamp(2000)])) + + # Schema exception should be thrown regardless of whether there are any buckets that span segments or not + q = QueryBuilder() + q = q.resample("us").agg({"col": "first"}) + with pytest.raises(SchemaException): + lib.read(sym, query_builder=q) + + q = QueryBuilder() + q = q.resample("s").agg({"col": "first"}) + with pytest.raises(SchemaException): + lib.read(sym, query_builder=q) \ No newline at end of file
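
For context, a minimal sketch of how the resample clause exercised by the new tests is used end to end; the library URI, library name, and symbol below are hypothetical, while the QueryBuilder.resample(...).agg(...) and read(..., query_builder=...) pattern follows the tests above.

    import numpy as np
    import pandas as pd
    from arcticdb import Arctic, QueryBuilder

    # Hypothetical setup: an LMDB-backed library holding a minutely indexed symbol
    ac = Arctic("lmdb://resample_demo")
    lib = ac.get_library("demo", create_if_missing=True)
    idx = pd.date_range("2024-01-01", freq="min", periods=1440)
    lib.write("prices", pd.DataFrame({"price": np.arange(len(idx), dtype=np.float64)}, index=idx))

    # Downsample to hourly buckets server-side, using the named-aggregation form from the tests
    q = QueryBuilder()
    q = q.resample("h", closed="left", label="left").agg({"open": ("price", "first"), "close": ("price", "last")})
    hourly = lib.read("prices", query_builder=q).data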