Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(subscription): support blocking cursor #18675

Merged
merged 11 commits into from
Oct 8, 2024
Merged

Conversation

xxhZs
Copy link
Contributor

@xxhZs xxhZs commented Sep 24, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

#18208

In this pr, we support blocking subscription cursor and all cursor timeout.
the stmt of our fetch cursor becomes
fetch next/num from cursor_name with (timeout = 'xx');

about subscription cursor's timeout :

  1. When there are a lot of values in the subscription and the fetch is set to a large number of rows, the fetch statement may take longer than timeout time to execute. This time we will returns all the values currently fetched.
  2. When subscription has no new data at the moment and fetch is used, there are two scenarios
  • If you have already fetched more than one row, the fetch statement does not block, but returns all the values currently fetched.
  • If there is currently no value, we block until there is a new value, or until the blocking time exceeds timeout

about query cursor's timeout :
only support 1

When with (timeout = 'xx') is not set,
The query timeout is u64::max
The block timeout is 0

timeout is a string in interval format.

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

Support
fetch next/num from cursor_name with (timeout = 'xx');
About subscription cursor's timeout :

  1. When there are a lot of values in the subscription and the fetch is set to a large number of rows, the fetch statement may take longer than timeout time to execute. This time we will returns all the values currently fetched.
  2. When subscription has no new data at the moment and fetch is used, there are two scenarios
  • If we have already fetched more than one row, the fetch statement does not block, but returns all the values currently fetched.
  • If there is currently no value, we block until there is a new value, or until the blocking time exceeds timeout

About query cursor's timeout :
only support 1

When with (timeout = 'xx') is not set,
In the case of 1, this corresponds to timeout = u64::max
In the case of 2, this corresponds to timeout = 0

timeout is a string in interval format.

@xxhZs xxhZs added the user-facing-changes Contains changes that are visible to users label Sep 24, 2024
@xxhZs xxhZs changed the title feat(subscription): support blcok cursor feat(subscription): support block cursor Sep 24, 2024
@xxhZs xxhZs changed the title feat(subscription): support block cursor feat(subscription): support blockingcursor Sep 24, 2024
@xxhZs xxhZs changed the title feat(subscription): support blockingcursor feat(subscription): support blocking cursor Sep 24, 2024
@graphite-app graphite-app bot requested a review from a team September 24, 2024 11:16
fix notify

fmt
rename
@xxhZs xxhZs force-pushed the xxh/support-block-cursor branch from a7749eb to c413039 Compare September 25, 2024 06:13
@xxhZs xxhZs requested review from hzxa21 and wenym1 September 25, 2024 06:14
e2e_test/subscription/main.py Outdated Show resolved Hide resolved
src/frontend/src/handler/util.rs Outdated Show resolved Hide resolved
src/frontend/src/handler/fetch_cursor.rs Show resolved Hide resolved
src/frontend/src/observer/observer_manager.rs Outdated Show resolved Hide resolved
src/frontend/src/observer/observer_manager.rs Outdated Show resolved Hide resolved
src/frontend/src/catalog/root_catalog.rs Outdated Show resolved Hide resolved
src/frontend/src/session/cursor_manager.rs Outdated Show resolved Hide resolved
src/frontend/src/session/cursor_manager.rs Outdated Show resolved Hide resolved
if table_ids.is_empty() {
return;
}
for session in self.sessions_map.read().values() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently one CursorNotifier per session and within each CursorNotifier, we have one channel per table. Assume we have N sessions and M tables in total, we can end up creating N*M channels. Not to mention that we have a task in CursorNotifier to maintain the per table channel. These seem inefficient and complicated. IMO, a better way to implement the notification is:

  • Create one and only one watch channel to listen on table changed log updates.
  • When cursor needs to wait for changelog epoch changes for a table, it only needs to subscribe to this channel.

Implementation-wise, maybe we can store HummockSnapshotManager in each cursor instance and implement a wait_for_epoch similar to the method here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I think the current cursor implementation in frontend can be greatly simplified. The most complicated part in the current cursor is the state machine. The async feature of rust is a very powerful tool to generate state machine. So for a cursor, it can be implemented as a stream

#[try_stream(...)]
async fn cursor_stream(...) {
    if need_snapshot {
         for row in create_snapshot_iter(table_id) { yield row; }
    }
    let mut current_epoch = ...;
    loop {
        for row in change_log(current_epoch, table_id) { yield row; }
        current_epoch = snapshot_manager.wait_next_epoch(current_epoch, table_id);
    }
}

When a cursor is created, we can create such stream and hold it in the session. On a fetch, we just call cursor_stream.next().await, and the timeout can be applied with simply timeout(timeout_duration, cursor_stream.next()).await.

We can be aware of table drop in snapshot_manager, when we see that the table_id does not exist in the state_table_info of FrontendHummockVersion anymore, and then changes on catalog in this PR can be totally avoided.

@@ -471,16 +487,31 @@ impl SubscriptionCursor {
ans.push(row);
}
None => {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does None here always mean no data? I saw that we will return None when field mismatches here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, none means that there is no data for this call to next_row

src/storage/hummock_sdk/src/frontend_version.rs Outdated Show resolved Hide resolved
src/storage/hummock_sdk/src/frontend_version.rs Outdated Show resolved Hide resolved
src/storage/hummock_sdk/src/change_log.rs Outdated Show resolved Hide resolved
if table_ids.is_empty() {
return;
}
for session in self.sessions_map.read().values() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I think the current cursor implementation in frontend can be greatly simplified. The most complicated part in the current cursor is the state machine. The async feature of rust is a very powerful tool to generate state machine. So for a cursor, it can be implemented as a stream

#[try_stream(...)]
async fn cursor_stream(...) {
    if need_snapshot {
         for row in create_snapshot_iter(table_id) { yield row; }
    }
    let mut current_epoch = ...;
    loop {
        for row in change_log(current_epoch, table_id) { yield row; }
        current_epoch = snapshot_manager.wait_next_epoch(current_epoch, table_id);
    }
}

When a cursor is created, we can create such stream and hold it in the session. On a fetch, we just call cursor_stream.next().await, and the timeout can be applied with simply timeout(timeout_duration, cursor_stream.next()).await.

We can be aware of table drop in snapshot_manager, when we see that the table_id does not exist in the state_table_info of FrontendHummockVersion anymore, and then changes on catalog in this PR can be totally avoided.

fix comm

fmt
@xxhZs xxhZs requested a review from hzxa21 September 29, 2024 04:58
src/frontend/src/catalog/schema_catalog.rs Outdated Show resolved Hide resolved
src/frontend/src/scheduler/snapshot.rs Outdated Show resolved Hide resolved
src/frontend/src/session/cursor_manager.rs Outdated Show resolved Hide resolved
src/frontend/src/session/cursor_manager.rs Outdated Show resolved Hide resolved
e2e_test/subscription/main.py Outdated Show resolved Hide resolved
Copy link
Collaborator

@hzxa21 hzxa21 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@xxhZs xxhZs added this pull request to the merge queue Oct 8, 2024
Merged via the queue into main with commit 20699d4 Oct 8, 2024
11 of 18 checks passed
@xxhZs xxhZs deleted the xxh/support-block-cursor branch October 8, 2024 14:43
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants