Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(streaming): introduce streaming AsOf JOIN executor #18242

Merged
merged 13 commits into from
Sep 13, 2024
Merged

Conversation

yuhao-su
Copy link
Contributor

@yuhao-su yuhao-su commented Aug 26, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

Introduce streaming AsOf join executor. https://duckdb.org/docs/guides/sql_features/asof_join.html

When a record comes from the left side, it will look up the first matched record on the right side.

When a record comes from the right side, we first find out which previously matched record will be affected, and then send delete/insert those records, and send insert/delete the new matched result.

We use a secondary index in JoinEntryState to do the range scan.

#17765

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

@fuyufjh fuyufjh changed the title feat(streaming): introduce streaming as of join executor feat(streaming): introduce streaming AS OF JOIN executor Aug 28, 2024
@yuhao-su yuhao-su changed the title feat(streaming): introduce streaming AS OF JOIN executor feat(streaming): introduce streaming AsOf JOIN executor Aug 28, 2024
@chenzl25 chenzl25 requested a review from xxchan September 3, 2024 06:22
Comment on lines 472 to 480
// Left deduped input pk indices. The pk of the left_table and
// left_degree_table is [left_join_key | left_deduped_input_pk_indices]
// and is expected to be the shortest key which starts with
// the join key and satisfies unique constrain.
repeated uint32 left_deduped_input_pk_indices = 9;
// Right deduped input pk indices. The pk of the right_table and
// right_degree_table is [right_join_key | right_deduped_input_pk_indices]
// and is expected to be the shortest key which starts with
// the join key and satisfies unique constrain.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

offline discussion: the pk of the state table will be join key | inequality key | stream key and inequality key will be the first field in deduped_input_pk_indice. The inequality key will be used for further optimization. But Currently executor implementation will not handle it specially.

Copy link
Contributor Author

@yuhao-su yuhao-su Sep 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will update the comment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

inequality key will be the first field in deduped_input_pk_indice

This is not necessary.

use crate::executor::prelude::*;

/// Evict the cache every n rows.
const EVICT_EVERY_N_ROWS: u32 = 16;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let the hash join and asof join refer to the same variable, or use different names

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The should be different variables to control the different eviction frequencies. Consider they will only be used inside a file, using the same name is totally okay


impl<K: HashKey, S: StateStore, const T: AsOfJoinTypePrimitive> AsOfJoinExecutor<K, S, T> {
#[allow(clippy::too_many_arguments)]
pub fn new(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add a JoinBuilder and implement JoinBuilder::build_asof_join and build_hash_join to extract some common logic

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are a lot of subtle differences between building asof join and hash join. Extracting common logic costs more than it earns.

src/stream/src/executor/asof_join.rs Outdated Show resolved Hide resolved
src/stream/src/executor/asof_join.rs Outdated Show resolved Hide resolved
side_r,
asof_desc,
actual_output_data_types,
// inequality_watermarks,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

create a issue for it?

} else {
// Row which violates null-safe bitmap will never be matched so we need not
// store.
// Noop here because we only support left outer AsOf join.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

have we assert this somewhere?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the planner I'm working on.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we still should add one in the executor part such as in from_proto. To prevent confusion

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will use JoinType instead of AsOfJoinType in proto in planner PR, so I can do the check then

src/stream/src/executor/asof_join.rs Show resolved Hide resolved
@yuhao-su yuhao-su requested a review from st1page September 12, 2024 10:16
@yuhao-su yuhao-su added this pull request to the merge queue Sep 13, 2024
Merged via the queue into main with commit bc9c2ad Sep 13, 2024
31 of 32 checks passed
@yuhao-su yuhao-su deleted the yuhao/as-of-join branch September 13, 2024 05:18
@fuyufjh fuyufjh mentioned this pull request Sep 23, 2024
4 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants