Skip to content

Commit

Permalink
First attempt
Browse files Browse the repository at this point in the history
  • Loading branch information
alexowens90 committed Mar 18, 2024
1 parent 05ec9d6 commit 574b13c
Show file tree
Hide file tree
Showing 9 changed files with 42 additions and 17 deletions.
4 changes: 2 additions & 2 deletions cpp/arcticdb/column_store/memory_segment.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -385,10 +385,10 @@ class SegmentInMemory {
impl_->set_string_pool(string_pool);
}

/// Filter this segment down to the rows selected in filter_bitset.
/// The bitset is taken by rvalue reference because the implementation
/// mutates/consumes it (SegmentInMemoryImpl::filter resizes it to the
/// segment's row count before use).
/// @param filter_bitset rows to keep; consumed by this call
/// @param filter_down_stringpool if true, also shrink the string pool to
///        only the strings referenced by surviving rows
/// @param validate if true, perform extra consistency checks while filtering
SegmentInMemory filter(util::BitSet&& filter_bitset,
                       bool filter_down_stringpool=false,
                       bool validate=false) const{
    return SegmentInMemory(impl_->filter(std::move(filter_bitset), filter_down_stringpool, validate));
}

/// @see SegmentInMemoryImpl::truncate
Expand Down
16 changes: 9 additions & 7 deletions cpp/arcticdb/column_store/memory_segment_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,10 @@ void SegmentInMemoryImpl::drop_column(std::string_view name) {
column_map_->erase(name);
}

std::shared_ptr<SegmentInMemoryImpl> SegmentInMemoryImpl::filter(const util::BitSet& filter_bitset,
std::shared_ptr<SegmentInMemoryImpl> SegmentInMemoryImpl::filter(util::BitSet&& filter_bitset,
bool filter_down_stringpool,
bool validate) const {
filter_bitset.resize(row_count());
bool is_input_sparse = is_sparse();
auto num_values = filter_bitset.count();
if(num_values == 0)
Expand Down Expand Up @@ -210,18 +211,19 @@ std::shared_ptr<SegmentInMemoryImpl> SegmentInMemoryImpl::filter(const util::Bit
} else {
bitset_including_sparse.resize((*column)->row_count());
}
if (bitset_including_sparse.count() == 0) {
// No values are set in the sparse column, skip it
return;
}
output_col_idx = output->add_column(field(column.index), bitset_including_sparse.count(), true);
final_bitset = &bitset_including_sparse;
} else {
final_bitset = &filter_bitset;
}
auto& output_col = output->column(position_t(output_col_idx));
if (sparse_map)
if (sparse_map) {
output_col.opt_sparse_map() = std::make_optional<util::BitSet>();
if (final_bitset->count() == 0) {
// No values are set in the sparse column, no more work to do
return;
}
}
auto output_ptr = reinterpret_cast<RawType*>(output_col.ptr());
auto input_data = (*column)->data();

Expand Down Expand Up @@ -585,7 +587,7 @@ std::vector<std::shared_ptr<SegmentInMemoryImpl>> SegmentInMemoryImpl::split(siz
util::BitSetSizeType end = std::min(start + rows, total_rows);
// set_range is close interval on [left, right]
bitset.set_range(start, end - 1, true);
output.emplace_back(filter(bitset));
output.emplace_back(filter(std::move(bitset)));
}
return output;
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/column_store/memory_segment_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,7 @@ class SegmentInMemoryImpl {

std::shared_ptr<SegmentInMemoryImpl> get_output_segment(size_t num_values, bool pre_allocate=true) const;

std::shared_ptr<SegmentInMemoryImpl> filter(const util::BitSet& filter_bitset,
std::shared_ptr<SegmentInMemoryImpl> filter(util::BitSet&& filter_bitset,
bool filter_down_stringpool=false,
bool validate=false) const;

Expand Down
4 changes: 2 additions & 2 deletions cpp/arcticdb/pipeline/filter_segment.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
namespace arcticdb {

/// Convenience free function: produce a copy of `input` containing only the
/// rows set in `filter_bitset`. The bitset is consumed (moved into
/// SegmentInMemory::filter, which may resize it internally).
/// @param input segment to filter; not modified
/// @param filter_bitset rows to keep; consumed by this call
/// @param filter_down_stringpool if true, also shrink the string pool to
///        only the strings referenced by surviving rows
/// @param validate if true, perform extra consistency checks while filtering
inline SegmentInMemory filter_segment(const SegmentInMemory& input,
                                      util::BitSet&& filter_bitset,
                                      bool filter_down_stringpool=false,
                                      bool validate=false) {
    return input.filter(std::move(filter_bitset), filter_down_stringpool, validate);
}

inline std::vector<SegmentInMemory> partition_segment(const SegmentInMemory& input,
Expand Down
4 changes: 2 additions & 2 deletions cpp/arcticdb/processing/clause.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,9 @@ Composite<EntityIds> FilterClause::process(
proc.set_expression_context(expression_context_);
auto variant_data = proc.get(expression_context_->root_node_name_);
util::variant_match(variant_data,
[&proc, &output, this](const util::BitSet& bitset) {
[&proc, &output, this](util::BitSet& bitset) {
if (bitset.count() > 0) {
proc.apply_filter(bitset, optimisation_);
proc.apply_filter(std::move(bitset), optimisation_);
output.push_back(push_entities(component_manager_, std::move(proc)));
} else {
log::version().debug("Filter returned empty result");
Expand Down
4 changes: 2 additions & 2 deletions cpp/arcticdb/processing/processing_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@
namespace arcticdb {

void ProcessingUnit::apply_filter(
const util::BitSet& bitset,
util::BitSet&& bitset,
PipelineOptimisation optimisation) {
internal::check<ErrorCode::E_ASSERTION_FAILURE>(segments_.has_value() && row_ranges_.has_value() && col_ranges_.has_value(),
"ProcessingUnit::apply_filter requires all of segments, row_ranges, and col_ranges to be present");
auto filter_down_stringpool = optimisation == PipelineOptimisation::MEMORY;

for (auto&& [idx, segment]: folly::enumerate(*segments_)) {
auto seg = filter_segment(*segment,
bitset,
std::move(bitset),
filter_down_stringpool);
auto num_rows = seg.is_null() ? 0 : seg.row_count();
row_ranges_->at(idx) = std::make_shared<pipelines::RowRange>(row_ranges_->at(idx)->first, row_ranges_->at(idx)->first + num_rows);
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/processing/processing_unit.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ namespace arcticdb {
return *this;
}

void apply_filter(const util::BitSet& bitset, PipelineOptimisation optimisation);
void apply_filter(util::BitSet&& bitset, PipelineOptimisation optimisation);

void truncate(size_t start_row, size_t end_row);

Expand Down
2 changes: 2 additions & 0 deletions cpp/arcticdb/version/version_core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -864,6 +864,8 @@ void copy_frame_data_to_buffer(const SegmentInMemory& destination, size_t target
return;
}
auto& src_column = source.column(static_cast<position_t>(source_index));
internal::check<ErrorCode::E_INVALID_ARGUMENT>(!src_column.opt_sparse_map().has_value(),
"QueryBuilder not yet supported with sparse data");
auto& dst_column = destination.column(static_cast<position_t>(target_index));
auto& buffer = dst_column.data().buffer();
auto dst_rawtype_size = sizeof_datatype(dst_column.type());
Expand Down
21 changes: 21 additions & 0 deletions python/tests/unit/arcticdb/version_store/test_filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -2314,3 +2314,24 @@ def test_filter_with_column_slicing_defragmented(lmdb_version_store_tiny_segment
received = lmdb_version_store_tiny_segment.read(symbol, query_builder=q).data
expected = df.query(pandas_query)
assert np.array_equal(expected, received) and (not expected.empty and not received.empty)


def test_filter_sparse(lmdb_version_store):
    # Placeholder coverage for filtering sparse float columns: for now the
    # engine is expected to reject such queries with InternalException.
    # Replace with proper positive tests once this issue is fixed:
    # https://github.com/man-group/ArcticDB/issues/1404
    symbol = "test_filter_sparse"
    frame = pd.DataFrame({"col": [0.0, np.nan, 0.0]}, index=pd.date_range("2024-01-01", periods=3))
    lmdb_version_store.write(symbol, frame, sparsify_floats=True)

    # The isnull() filter and the equality filter exercise different code paths
    query = QueryBuilder()
    query = query[query["col"].isnull()]
    with pytest.raises(InternalException):
        lmdb_version_store.read(symbol, query_builder=query)

    query = QueryBuilder()
    query = query[query["col"] == np.float64(0.0)]
    with pytest.raises(InternalException):
        lmdb_version_store.read(symbol, query_builder=query)

0 comments on commit 574b13c

Please sign in to comment.