Skip to content

Commit

Permalink
[Wasm] Omit materialization in wasm::SortMergeJoin if possible.
Browse files Browse the repository at this point in the history
Materialization of an input can be omitted if the respective child
operator is a table scan and that input does not have to be sorted.
In this case, we directly iterate over the underlying
table store instead of allocating a buffer.

Note that omitting materialization by reusing an
already filled buffer is possible in general; however,
our current implementation is restricted to passing
only tuples to parent operators. Thus, this change
was easy to implement and will benefit our benchmarks
that assume an existing ordering on base relations.
  • Loading branch information
lucagretscher committed Apr 12, 2024
1 parent 3c8b2dd commit 1858b9c
Showing 1 changed file with 64 additions and 33 deletions.
97 changes: 64 additions & 33 deletions src/backend/WasmOperator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4213,38 +4213,47 @@ void SortMergeJoin<SortLeft, SortRight, Predicated, CmpPredicated>::execute(
teardown_t teardown)
{
auto &env = CodeGenContext::Get().env();
const bool needs_buffer_parent = not is<const ScanOperator>(M.parent) or SortLeft;
const bool needs_buffer_child = not is<const ScanOperator>(M.child) or SortRight;

/*----- Create infinite buffers to materialize the current results. -----*/
/*----- Create infinite buffers to materialize the current results (if necessary). -----*/
M_insist(bool(M.left_materializing_factory),
"`wasm::SortMergeJoin` must have a factory for the materialized left child");
M_insist(bool(M.right_materializing_factory),
"`wasm::SortMergeJoin` must have a factory for the materialized right child");
const auto schema_parent = M.parent.schema().drop_constants().deduplicate();
const auto schema_child = M.child.schema().drop_constants().deduplicate();
GlobalBuffer buffer_parent(schema_parent, *M.left_materializing_factory),
buffer_child(schema_child, *M.right_materializing_factory);
std::optional<GlobalBuffer> buffer_parent, buffer_child;
if (needs_buffer_parent)
buffer_parent.emplace(schema_parent, *M.left_materializing_factory);
if (needs_buffer_child)
buffer_child.emplace(schema_child, *M.right_materializing_factory);

/*----- Create child functions. -----*/
FUNCTION(sort_merge_join_parent_pipeline, void(void)) // create function for parent pipeline
{
auto S = CodeGenContext::Get().scoped_environment(); // create scoped environment for this function
M.children[0]->execute(
/* setup= */ setup_t::Make_Without_Parent([&](){ buffer_parent.setup(); }),
/* pipeline= */ [&](){ buffer_parent.consume(); },
/* teardown= */ teardown_t::Make_Without_Parent([&](){ buffer_parent.teardown(); })
);
if (needs_buffer_parent) {
FUNCTION(sort_merge_join_parent_pipeline, void(void)) // create function for parent pipeline
{
auto S = CodeGenContext::Get().scoped_environment(); // create scoped environment for this function
M.children[0]->execute(
/* setup= */ setup_t::Make_Without_Parent([&](){ buffer_parent->setup(); }),
/* pipeline= */ [&](){ buffer_parent->consume(); },
/* teardown= */ teardown_t::Make_Without_Parent([&](){ buffer_parent->teardown(); })
);
}
sort_merge_join_parent_pipeline(); // call parent function
}
FUNCTION(sort_merge_join_child_pipeline, void(void)) // create function for child pipeline
{
auto S = CodeGenContext::Get().scoped_environment(); // create scoped environment for this function
M.children[1]->execute(
/* setup= */ setup_t::Make_Without_Parent([&](){ buffer_child.setup(); }),
/* pipeline= */ [&](){ buffer_child.consume(); },
/* teardown= */ teardown_t::Make_Without_Parent([&](){ buffer_child.teardown(); })
);
if (needs_buffer_child) {
FUNCTION(sort_merge_join_child_pipeline, void(void)) // create function for child pipeline
{
auto S = CodeGenContext::Get().scoped_environment(); // create scoped environment for this function
M.children[1]->execute(
/* setup= */ setup_t::Make_Without_Parent([&](){ buffer_child->setup(); }),
/* pipeline= */ [&](){ buffer_child->consume(); },
/* teardown= */ teardown_t::Make_Without_Parent([&](){ buffer_child->teardown(); })
);
}
sort_merge_join_child_pipeline(); // call child function
}
sort_merge_join_parent_pipeline(); // call parent function
sort_merge_join_child_pipeline(); // call child function

/*----- Decompose each clause of the join predicate of the form `A.x = B.y` into parts `A.x` and `B.y`. -----*/
std::vector<SortingOperator::order_type> order_parent, order_child;
Expand All @@ -4266,9 +4275,9 @@ void SortMergeJoin<SortLeft, SortRight, Predicated, CmpPredicated>::execute(

/*----- If necessary, invoke sorting algorithm with buffer to sort. -----*/
if constexpr (SortLeft)
quicksort<CmpPredicated>(buffer_parent, order_parent);
quicksort<CmpPredicated>(*buffer_parent, order_parent);
if constexpr (SortRight)
quicksort<CmpPredicated>(buffer_child, order_child);
quicksort<CmpPredicated>(*buffer_child, order_child);

/*----- Create predicate to check if child co-group is smaller or equal than the one of the parent relation. -----*/
auto child_smaller_equal = [&]() -> Boolx1 {
Expand Down Expand Up @@ -4297,20 +4306,40 @@ void SortMergeJoin<SortLeft, SortRight, Predicated, CmpPredicated>::execute(
/*----- Compile data layouts to generate sequential loads from buffers. -----*/
static Schema empty_schema;
Var<U32x1> tuple_id_parent, tuple_id_child; // default initialized to 0
auto [inits_parent, loads_parent, _jumps_parent] =
compile_load_sequential(buffer_parent.schema(), empty_schema, buffer_parent.base_address(),
buffer_parent.layout(), 1, buffer_parent.schema(), tuple_id_parent);
auto [inits_child, loads_child, _jumps_child] =
compile_load_sequential(buffer_child.schema(), empty_schema, buffer_child.base_address(),
buffer_child.layout(), 1, buffer_child.schema(), tuple_id_child);
auto [inits_parent, loads_parent, _jumps_parent] = [&](){
if (needs_buffer_parent) {
return compile_load_sequential(buffer_parent->schema(), empty_schema, buffer_parent->base_address(),
buffer_parent->layout(), 1, buffer_parent->schema(), tuple_id_parent);
} else {
auto &scan = as<const ScanOperator>(M.parent);
return compile_load_sequential(schema_parent, empty_schema, get_base_address(scan.store().table().name()),
scan.store().table().layout(), 1, scan.store().table().schema(scan.alias()),
tuple_id_parent);
}
}();
auto [inits_child, loads_child, _jumps_child] = [&](){
if (needs_buffer_child) {
return compile_load_sequential(buffer_child->schema(), empty_schema, buffer_child->base_address(),
buffer_child->layout(), 1, buffer_child->schema(), tuple_id_child);
} else {
auto &scan = as<const ScanOperator>(M.child);
return compile_load_sequential(schema_child, empty_schema, get_base_address(scan.store().table().name()),
scan.store().table().layout(), 1, scan.store().table().schema(scan.alias()),
tuple_id_child);
}
}();
/* since structured bindings cannot be used in lambda capture */
Block jumps_parent(std::move(_jumps_parent)), jumps_child(std::move(_jumps_child));

/*----- Process both buffers together. -----*/
setup();
inits_parent.attach_to_current();
inits_child.attach_to_current();
WHILE (tuple_id_parent < buffer_parent.size() and tuple_id_child < buffer_child.size()) { // neither end reached
U32x1 size_parent = needs_buffer_parent ? buffer_parent->size()
: get_num_rows(as<const ScanOperator>(M.parent).store().table().name());
U32x1 size_child = needs_buffer_child ? buffer_child->size()
: get_num_rows(as<const ScanOperator>(M.child).store().table().name());
WHILE (tuple_id_parent < size_parent and tuple_id_child < size_child) { // neither end reached
loads_parent.attach_to_current();
loads_child.attach_to_current();
if constexpr (Predicated) {
Expand Down Expand Up @@ -5309,11 +5338,13 @@ void Match<m::wasm::SortMergeJoin<SortLeft, SortRight, Predicated, CmpPredicated
case 2: out << "sorting left input " << (CmpPredicated ? "predicated " : ""); break;
case 3: out << "sorting both inputs " << (CmpPredicated ? "predicated " : ""); break;
}
if (this->left_materializing_factory and this->right_materializing_factory)
const bool needs_buffer_parent = not is<const ScanOperator>(this->parent) or SortLeft;
const bool needs_buffer_child = not is<const ScanOperator>(this->child) or SortRight;
if (needs_buffer_parent and needs_buffer_child)
out << "and materializing both inputs ";
else if (this->left_materializing_factory)
else if (needs_buffer_parent)
out << "and materializing left input ";
else if (this->right_materializing_factory)
else if (needs_buffer_child)
out << "and materializing right input ";
out << this->join.schema() << print_info(this->join) << " (cumulative cost " << cost() << ')';

Expand Down

0 comments on commit 1858b9c

Please sign in to comment.