Resampling MVP #1495

Merged · 34 commits · May 30, 2024
Changes from 32 commits

Commits
b256370
Enhancement 1010: Resampling MVP
alexowens90 Apr 3, 2024
2e67393
Revert to C++17
alexowens90 May 9, 2024
e752b65
Comment changes
alexowens90 May 10, 2024
03df99c
Revert change to lmdb_version_store_tiny_segment
alexowens90 May 10, 2024
6c3739c
Remove check that input has initial expected get calls in split_by_ro…
alexowens90 May 10, 2024
519e56c
Use Bucket class in aggregation as well
alexowens90 May 10, 2024
37d9810
Remove Pandas date_range timing/logging, and modify some formatting
alexowens90 May 10, 2024
8ac1c3b
Move all sorted aggregation stuff to own files
alexowens90 May 10, 2024
cac141e
Renaming refactor
alexowens90 May 13, 2024
28404a8
Renaming refactor
alexowens90 May 13, 2024
6464068
Remove unused function
alexowens90 May 13, 2024
9e79d0f
Remove summing timestamps from supported aggregations
alexowens90 May 13, 2024
76e0baf
Started refactoring aggregation
alexowens90 May 13, 2024
b4e6aa7
Added push_to_aggregator method
alexowens90 May 13, 2024
39ab291
Use push_to_aggregator in the other relevant place
alexowens90 May 14, 2024
91080f6
Fixed test_resampling_unsupported_aggregation_type_combos
alexowens90 May 14, 2024
d2369eb
Factor out finalize_aggregator
alexowens90 May 14, 2024
32547a2
Presize output index column in blocks, and trim unused blocks at the end
alexowens90 May 14, 2024
868ffa0
Use constexpr where possible
alexowens90 May 14, 2024
108867a
Reinstate all tests, reorder source files
alexowens90 May 14, 2024
91e02f4
Comment changes
alexowens90 May 14, 2024
de3bd8e
Use ColumnDataIterator in copy_frame_data_to_buffer
alexowens90 May 14, 2024
7f1d178
Revert accidentally committed change to task scheduler
alexowens90 May 14, 2024
e04b124
Comment updates
alexowens90 May 14, 2024
bb3639c
Move profile_resample.py out of tests directory
alexowens90 May 14, 2024
d9c0506
Resample docstring
alexowens90 May 14, 2024
1d5ab87
Fix mac build?
alexowens90 May 14, 2024
13f598d
Fix tests
alexowens90 May 15, 2024
b7c7a1d
Make resamply.py in ASV benchmarks dir
alexowens90 May 15, 2024
05b92ca
Dummy commit
alexowens90 May 16, 2024
4ecf145
Resampling ASV benchmarks
alexowens90 May 16, 2024
0264dc4
Update benchmarks.json file too
alexowens90 May 16, 2024
cf6772f
Remove ASV features added in 0.6.0
alexowens90 May 17, 2024
c2d994c
Address review comments
alexowens90 May 30, 2024
1 change: 1 addition & 0 deletions .gitignore
@@ -8,6 +8,7 @@ __pycache__/
 .vscode/
 .vs/
 .project
+.idea

 *.so
 *.a
13 changes: 10 additions & 3 deletions cpp/arcticdb/CMakeLists.txt
@@ -251,7 +251,7 @@ set(arcticdb_srcs
         pipeline/value_set.hpp
         pipeline/write_frame.hpp
         pipeline/write_options.hpp
-        processing/aggregation.hpp
+        processing/aggregation_utils.hpp
         processing/component_manager.hpp
         processing/operation_dispatch.hpp
         processing/operation_dispatch_binary.hpp
@@ -263,6 +263,8 @@
         processing/clause.hpp
         processing/expression_context.hpp
         processing/expression_node.hpp
+        processing/sorted_aggregation.hpp
+        processing/unsorted_aggregation.hpp
         storage/constants.hpp
         storage/common.hpp
         storage/config_resolvers.hpp
@@ -425,7 +427,7 @@ set(arcticdb_srcs
         pipeline/write_frame.cpp
         python/normalization_checks.cpp
         processing/processing_unit.cpp
-        processing/aggregation.cpp
+        processing/aggregation_utils.cpp
         processing/clause.cpp
         processing/component_manager.cpp
         processing/expression_node.cpp
@@ -436,6 +438,8 @@
         processing/operation_dispatch_binary_gt.cpp
         processing/operation_dispatch_binary_lt.cpp
         processing/operation_dispatch_binary_operator.cpp
+        processing/sorted_aggregation.cpp
+        processing/unsorted_aggregation.cpp
         python/python_to_tensor_frame.cpp
         storage/config_resolvers.cpp
         storage/failure_simulation.cpp
@@ -786,6 +790,7 @@ if(${TEST})
         processing/test/test_filter_and_project_sparse.cpp
         processing/test/test_has_valid_type_promotion.cpp
         processing/test/test_operation_dispatch.cpp
+        processing/test/test_resample.cpp
         processing/test/test_set_membership.cpp
         processing/test/test_signed_unsigned_comparison.cpp
         processing/test/test_type_comparison.cpp
@@ -931,12 +936,14 @@ if(${TEST})
         column_store/test/rapidcheck_column_data_random_accessor.cpp
         column_store/test/rapidcheck_column_map.cpp
         column_store/test/test_chunked_buffer.cpp
+        processing/test/rapidcheck_resample.cpp
         stream/test/stream_test_common.cpp
         util/test/rapidcheck_decimal.cpp
         util/test/rapidcheck_generators.cpp
         util/test/rapidcheck_string_pool.cpp
         util/test/rapidcheck_main.cpp
-        version/test/rapidcheck_version_map.cpp)
+        version/test/rapidcheck_version_map.cpp
+        )

 add_executable(arcticdb_rapidcheck_tests ${rapidcheck_srcs})
 install(TARGETS arcticdb_rapidcheck_tests RUNTIME
2 changes: 1 addition & 1 deletion cpp/arcticdb/async/task_scheduler.hpp
@@ -150,7 +150,7 @@ class TaskScheduler {
     using CPUSchedulerType = folly::FutureExecutor<folly::CPUThreadPoolExecutor>;
     using IOSchedulerType = folly::FutureExecutor<folly::IOThreadPoolExecutor>;

-    explicit TaskScheduler(const std::optional<size_t>& cpu_thread_count = std::nullopt, const std::optional<size_t>& io_thread_count = std::nullopt) :
+    explicit TaskScheduler(const std::optional<size_t>& cpu_thread_count = std::nullopt, const std::optional<size_t>& io_thread_count = std::nullopt) :
         cpu_thread_count_(cpu_thread_count ? *cpu_thread_count : ConfigsMap::instance()->get_int("VersionStore.NumCPUThreads", get_default_num_cpus())),
         io_thread_count_(io_thread_count ? *io_thread_count : ConfigsMap::instance()->get_int("VersionStore.NumIOThreads", std::min(100, (int) (cpu_thread_count_ * 1.5)))),
         cpu_exec_(cpu_thread_count_, std::make_shared<InstrumentedNamedFactory>("CPUPool")) ,
15 changes: 12 additions & 3 deletions cpp/arcticdb/async/tasks.hpp
@@ -25,6 +25,7 @@
 #include <arcticdb/codec/codec.hpp>

 #include <type_traits>
+#include <ranges>

 namespace arcticdb::async {

@@ -405,12 +406,20 @@ struct MemSegmentProcessingTask : BaseTask {
     ARCTICDB_MOVE_ONLY_DEFAULT(MemSegmentProcessingTask)

     Composite<EntityIds> operator()() {
-        for(const auto& clause : clauses_) {
-            entity_ids_ = clause->process(std::move(entity_ids_));
+        // TODO: Replace with commented out code once C++20 is reinstated
+        for (auto clause = clauses_.crbegin(); clause != clauses_.crend(); ++clause) {
+            entity_ids_ = (*clause)->process(std::move(entity_ids_));

-            if(clause->clause_info().requires_repartition_)
+            if((*clause)->clause_info().requires_repartition_)
                 break;
         }
+//        std::ranges::reverse_view reversed_clauses{clauses_};
+//        for (const auto& clause: reversed_clauses) {
+//            entity_ids_ = clause->process(std::move(entity_ids_));
+//
+//            if(clause->clause_info().requires_repartition_)
+//                break;
+//        }
         return std::move(entity_ids_);
     }

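The merged loop above walks the clause pipeline with C++17 reverse iterators; the commented-out block is the equivalent C++20 form to be restored later (see commit 2e67393, "Revert to C++17"). A minimal, self-contained sketch of the two equivalent idioms, using a hypothetical `Clause` stand-in rather than ArcticDB's actual clause types:

```cpp
#include <iostream>
#include <ranges>
#include <vector>

// Hypothetical stand-in for the elements of clauses_, for illustration only.
struct Clause { int id; };

int main() {
    const std::vector<Clause> clauses{{1}, {2}, {3}};

    // C++17 form, as merged: explicit reverse iterators.
    for (auto it = clauses.crbegin(); it != clauses.crend(); ++it)
        std::cout << it->id << ' ';                     // prints: 3 2 1
    std::cout << '\n';

    // C++20 form, as in the commented-out code above (requires -std=c++20).
    for (const auto& clause : std::ranges::reverse_view{clauses})
        std::cout << clause.id << ' ';                  // prints: 3 2 1
    std::cout << '\n';
}
```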
19 changes: 19 additions & 0 deletions cpp/arcticdb/column_store/chunked_buffer.hpp
@@ -214,6 +214,25 @@ class ChunkedBufferImpl {
         return res;
     }

+    // Trim reduces the size of the chunked buffer to the specified size by dropping blocks that are wholly unneeded,
+    // i.e. no allocation/memcpy is involved, only deallocation.
+    // Use in performance-critical code where a column is being created and the final size is unknown at construction
+    // time but a maximum size is known: create a Column using a chunked buffer that is presized in blocks. This
+    // unlocks ColumnDataIterator usage (more performant than repeated calls to Column::push_back). Once the column is
+    // created and the number of elements known, use this to drop unneeded blocks.
+    void trim(size_t requested_size) {
+        internal::check<ErrorCode::E_ASSERTION_FAILURE>(requested_size <= bytes_,
+                "Cannot trim ChunkedBuffer with {} bytes to {} bytes",
+                bytes_,
+                requested_size);
+        while (bytes_ - last_block().bytes() >= requested_size) {
+            bytes_ -= last_block().bytes();
+            free_last_block();
+        }
+        last_block().resize(last_block().bytes() - (bytes_ - requested_size));
+        bytes_ = requested_size;
+    }
+
     struct BlockAndOffset {
         MemBlock* block_;
         size_t offset_;
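As context for the trim comment above, here is a minimal sketch of the presize-then-trim pattern it describes, using a simplified block buffer rather than ArcticDB's `ChunkedBufferImpl`; the fixed block size and member names are assumptions for illustration only:

```cpp
#include <cstddef>
#include <cstdint>
#include <list>
#include <vector>

// Simplified stand-in for ChunkedBufferImpl: a list of fixed-size blocks.
struct SimpleChunkedBuffer {
    static constexpr std::size_t block_bytes = 64;
    std::list<std::vector<std::uint8_t>> blocks_;
    std::size_t bytes_ = 0;

    // Presize: allocate whole blocks up front for the maximum possible size,
    // so the buffer can be filled via an iterator without repeated push_backs.
    explicit SimpleChunkedBuffer(std::size_t max_bytes) {
        for (std::size_t allocated = 0; allocated < max_bytes; allocated += block_bytes) {
            blocks_.emplace_back(block_bytes);
            bytes_ += block_bytes;
        }
    }

    // Trim: drop wholly unneeded tail blocks (deallocation only, no memcpy),
    // then shrink the final block to the exact requested size.
    void trim(std::size_t requested_size) {
        while (bytes_ - blocks_.back().size() >= requested_size) {
            bytes_ -= blocks_.back().size();
            blocks_.pop_back();
        }
        blocks_.back().resize(blocks_.back().size() - (bytes_ - requested_size));
        bytes_ = requested_size;
    }
};

int main() {
    SimpleChunkedBuffer buffer(1000);   // worst case: 16 blocks, 1024 bytes
    buffer.trim(130);                   // only 130 bytes were actually written
    // Left with three blocks (64 + 64 + 2 bytes); 13 blocks were freed outright.
    return buffer.bytes_ == 130 ? 0 : 1;
}
```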
12 changes: 8 additions & 4 deletions cpp/arcticdb/column_store/column.hpp
@@ -768,17 +768,19 @@ class Column {
             // One sparse, one dense. Use the enumerating forward iterator over the sparse column as it is more efficient than random access
             auto right_accessor = random_accessor<right_input_tdt>(&right_input_data);
             const auto right_column_row_count = right_input_column.row_count();
+            const auto left_input_data_cend = left_input_data.cend<left_input_tdt, IteratorType::ENUMERATED, IteratorDensity::SPARSE>();
             for (auto left_it = left_input_data.cbegin<left_input_tdt, IteratorType::ENUMERATED, IteratorDensity::SPARSE>();
-                 left_it != left_input_data.cend<left_input_tdt, IteratorType::ENUMERATED, IteratorDensity::SPARSE>() && left_it->idx() < right_column_row_count;
+                 left_it != left_input_data_cend && left_it->idx() < right_column_row_count;
                  ++left_it) {
                 *output_it++ = f(left_it->value(), right_accessor.at(left_it->idx()));
             }
         } else if (!left_input_column.is_sparse() && right_input_column.is_sparse()) {
             // One sparse, one dense. Use the enumerating forward iterator over the sparse column as it is more efficient than random access
             auto left_accessor = random_accessor<left_input_tdt>(&left_input_data);
             const auto left_column_row_count = left_input_column.row_count();
+            const auto right_input_data_cend = right_input_data.cend<right_input_tdt, IteratorType::ENUMERATED, IteratorDensity::SPARSE>();
             for (auto right_it = right_input_data.cbegin<right_input_tdt, IteratorType::ENUMERATED, IteratorDensity::SPARSE>();
-                 right_it != right_input_data.cend<right_input_tdt, IteratorType::ENUMERATED, IteratorDensity::SPARSE>() && right_it->idx() < left_column_row_count;
+                 right_it != right_input_data_cend && right_it->idx() < left_column_row_count;
                  ++right_it) {
                 *output_it++ = f(left_accessor.at(right_it->idx()), right_it->value());
             }
@@ -871,8 +873,9 @@ class Column {
             initialise_output_bitset(left_input_column.sparse_map(), sparse_missing_value_output, output_bitset);
             auto right_accessor = random_accessor<right_input_tdt>(&right_input_data);
             const auto right_column_row_count = right_input_column.row_count();
+            const auto left_input_data_cend = left_input_data.cend<left_input_tdt, IteratorType::ENUMERATED, IteratorDensity::SPARSE>();
             for (auto left_it = left_input_data.cbegin<left_input_tdt, IteratorType::ENUMERATED, IteratorDensity::SPARSE>();
-                 left_it != left_input_data.cend<left_input_tdt, IteratorType::ENUMERATED, IteratorDensity::SPARSE>() && left_it->idx() < right_column_row_count;
+                 left_it != left_input_data_cend && left_it->idx() < right_column_row_count;
                  ++left_it) {
                 if(f(left_it->value(), right_accessor.at(left_it->idx()))) {
                     inserter = left_it->idx();
@@ -883,8 +886,9 @@
             initialise_output_bitset(right_input_column.sparse_map(), sparse_missing_value_output, output_bitset);
             auto left_accessor = random_accessor<left_input_tdt>(&left_input_data);
             const auto left_column_row_count = left_input_column.row_count();
+            const auto right_input_data_cend = right_input_data.cend<right_input_tdt, IteratorType::ENUMERATED, IteratorDensity::SPARSE>();
             for (auto right_it = right_input_data.cbegin<right_input_tdt, IteratorType::ENUMERATED, IteratorDensity::SPARSE>();
-                 right_it != right_input_data.cend<right_input_tdt, IteratorType::ENUMERATED, IteratorDensity::SPARSE>() && right_it->idx() < left_column_row_count;
+                 right_it != right_input_data_cend && right_it->idx() < left_column_row_count;
                  ++right_it) {
                 if(f(left_accessor.at(right_it->idx()), right_it->value())) {
                     inserter = right_it->idx();
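The column.hpp changes above all apply the same micro-optimization: the loop-invariant `cend()` call is hoisted out of the loop condition so it is evaluated once rather than on every iteration. A generic sketch of the pattern (illustrative container and bound, not ArcticDB code):

```cpp
#include <cstddef>
#include <iostream>
#include <vector>

int main() {
    const std::vector<int> values{1, 2, 3, 4, 5};
    const std::size_t row_count = 3;          // analogous to right_column_row_count

    const auto values_cend = values.cend();   // hoisted: evaluated once
    std::size_t idx = 0;
    for (auto it = values.cbegin(); it != values_cend && idx < row_count; ++it, ++idx)
        std::cout << *it << ' ';              // prints: 1 2 3
    std::cout << '\n';
}
```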
2 changes: 1 addition & 1 deletion cpp/arcticdb/column_store/test/test_memory_segment.cpp
@@ -210,7 +210,7 @@ TEST(MemSegment, StdFindIf) {
     auto num_rows = 100u;
     auto frame_wrapper = get_test_timeseries_frame("modify", num_rows, 0);
     auto &segment = frame_wrapper.segment_;
-    auto it = std::find_if(std::begin(segment), std::end(segment), [] (SegmentInMemory::Row& row) { return row.template index<TimeseriesIndex>() == 50; });
+    const auto it = std::find_if(std::begin(segment), std::end(segment), [] (SegmentInMemory::Row& row) { return row.template index<TimeseriesIndex>() == 50; });
     auto val_it = it->begin();
     ASSERT_EQ(it->index<TimeseriesIndex>(), 50);
     std::advance(val_it, 1);
2 changes: 1 addition & 1 deletion cpp/arcticdb/pipeline/column_stats.cpp
@@ -1,7 +1,7 @@
 #include <pipeline/column_stats.hpp>

 #include <arcticdb/processing/aggregation_interface.hpp>
-#include <arcticdb/processing/aggregation.hpp>
+#include <arcticdb/processing/unsorted_aggregation.hpp>
 #include <arcticdb/entity/type_utils.hpp>
 #include <arcticdb/util/preconditions.hpp>

8 changes: 8 additions & 0 deletions cpp/arcticdb/pipeline/frame_slice.hpp
@@ -188,6 +188,14 @@ struct RangesAndKey {
     RangesAndKey() = delete;
     ARCTICDB_MOVE_COPY_DEFAULT(RangesAndKey)

+    bool operator==(const RangesAndKey& right) const {
+        return row_range_ == right.row_range_ && col_range_ == right.col_range_ && key_ == right.key_;
+    }
+
+    bool operator!=(const RangesAndKey& right) const {
+        return !(*this == right);
+    }
+
     RowRange row_range_;
     ColRange col_range_;
     entity::AtomKey key_;
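Since the build was reverted to C++17 (commit 2e67393), `operator!=` has to be spelled out by hand as above; in C++20 it would be synthesized automatically from `operator==`. A small sketch of the same pattern on a hypothetical type:

```cpp
#include <cassert>

// Hypothetical type, for illustration only: the hand-written C++17 pattern
// used by RangesAndKey above. Under C++20, defining operator== alone would
// let the compiler rewrite a != b as !(a == b).
struct IntRange {
    int start_;
    int end_;

    bool operator==(const IntRange& right) const {
        return start_ == right.start_ && end_ == right.end_;
    }

    bool operator!=(const IntRange& right) const {
        return !(*this == right);
    }
};

int main() {
    assert((IntRange{0, 5} == IntRange{0, 5}));
    assert((IntRange{0, 5} != IntRange{0, 6}));
}
```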
27 changes: 27 additions & 0 deletions cpp/arcticdb/processing/aggregation_utils.cpp
@@ -0,0 +1,27 @@
+/* Copyright 2024 Man Group Operations Limited
+ *
+ * Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.
+ *
+ * As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
+ */
+
+#include <arcticdb/entity/type_utils.hpp>
+#include <arcticdb/processing/aggregation_utils.hpp>
+
+namespace arcticdb {
+
+void add_data_type_impl(entity::DataType data_type, std::optional<entity::DataType>& current_data_type) {
+    if (current_data_type.has_value()) {
+        auto common_type = has_valid_common_type(entity::TypeDescriptor(*current_data_type, 0),
+                                                 entity::TypeDescriptor(data_type, 0));
+        schema::check<ErrorCode::E_UNSUPPORTED_COLUMN_TYPE>(
+                common_type.has_value(),
+                "Cannot perform aggregation on column, incompatible types present: {} and {}",
+                entity::TypeDescriptor(*current_data_type, 0), entity::TypeDescriptor(data_type, 0));
+        current_data_type = common_type->data_type();
+    } else {
+        current_data_type = data_type;
+    }
+}
+
+} // namespace arcticdb
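The intended use of `add_data_type_impl` is to fold the data types seen across the row-slices of a column into a single common output type, failing loudly when none exists. A behavioural sketch with simplified promotion rules standing in for `has_valid_common_type` (the enum and `promote()` below are illustrative assumptions, not ArcticDB's real type lattice):

```cpp
#include <optional>
#include <stdexcept>

enum class DataType { INT32, INT64, FLOAT64, UTF8 };

// Simplified stand-in for has_valid_common_type: widen numerics, and refuse
// to mix strings with numerics.
std::optional<DataType> promote(DataType a, DataType b) {
    if (a == b)
        return a;
    if (a == DataType::UTF8 || b == DataType::UTF8)
        return std::nullopt;                 // no common type across string/numeric
    return a < b ? b : a;                    // enum order doubles as widening order
}

void add_data_type_impl(DataType data_type, std::optional<DataType>& current_data_type) {
    if (current_data_type.has_value()) {
        auto common_type = promote(*current_data_type, data_type);
        if (!common_type)
            throw std::runtime_error("incompatible types present in aggregation column");
        current_data_type = *common_type;
    } else {
        current_data_type = data_type;       // first slice seen sets the initial type
    }
}

int main() {
    std::optional<DataType> current;                  // empty until the first slice
    add_data_type_impl(DataType::INT32, current);     // -> INT32
    add_data_type_impl(DataType::INT64, current);     // -> INT64 (widened)
    add_data_type_impl(DataType::FLOAT64, current);   // -> FLOAT64 (widened)
    return current == DataType::FLOAT64 ? 0 : 1;
}
```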
16 changes: 16 additions & 0 deletions cpp/arcticdb/processing/aggregation_utils.hpp
@@ -0,0 +1,16 @@
+/* Copyright 2024 Man Group Operations Limited
+ *
+ * Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.
+ *
+ * As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
+ */
+
+#pragma once
+
+#include <arcticdb/entity/types.hpp>
+
+namespace arcticdb {
+
+void add_data_type_impl(entity::DataType data_type, std::optional<entity::DataType>& current_data_type);
+
+} // namespace arcticdb