Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(subscription): support blocking cursor #18675

Merged
merged 11 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 87 additions & 41 deletions e2e_test/subscription/main.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import subprocess
import psycopg2
import threading
import time


Expand Down Expand Up @@ -54,9 +55,9 @@ def test_cursor_snapshot():
)

execute_insert("declare cur subscription cursor for sub full",conn)
row = execute_query("fetch next from cur",conn)
row = execute_query("fetch next from cur with (timeout = '2s')",conn)
xxhZs marked this conversation as resolved.
Show resolved Hide resolved
check_rows_data([1,2],row[0],"Insert")
row = execute_query("fetch next from cur",conn)
row = execute_query("fetch next from cur with (timeout = '2s')",conn)
xxhZs marked this conversation as resolved.
Show resolved Hide resolved
assert row == []
execute_insert("close cur",conn)
drop_table_subscription()
Expand All @@ -73,19 +74,19 @@ def test_cursor_snapshot_log_store():
)

execute_insert("declare cur subscription cursor for sub full",conn)
row = execute_query("fetch next from cur",conn)
row = execute_query("fetch next from cur with (timeout = '2s')",conn)
check_rows_data([1,2],row[0],"Insert")
row = execute_query("fetch next from cur",conn)
row = execute_query("fetch next from cur with (timeout = '2s')",conn)
assert row == []
execute_insert("insert into t1 values(4,4)",conn)
execute_insert("flush",conn)
execute_insert("insert into t1 values(5,5)",conn)
execute_insert("flush",conn)
row = execute_query("fetch next from cur",conn)
row = execute_query("fetch next from cur with (timeout = '2s')",conn)
check_rows_data([4,4],row[0],"Insert")
row = execute_query("fetch next from cur",conn)
row = execute_query("fetch next from cur with (timeout = '2s')",conn)
check_rows_data([5,5],row[0],"Insert")
row = execute_query("fetch next from cur",conn)
row = execute_query("fetch next from cur with (timeout = '2s')",conn)
assert row == []
execute_insert("close cur",conn)
drop_table_subscription()
Expand All @@ -107,13 +108,13 @@ def test_cursor_since_begin():
execute_insert("declare cur subscription cursor for sub since begin()",conn)
execute_insert("insert into t1 values(6,6)",conn)
execute_insert("flush",conn)
row = execute_query("fetch next from cur",conn)
row = execute_query("fetch next from cur with (timeout = '2s')",conn)
check_rows_data([4,4],row[0],"Insert")
row = execute_query("fetch next from cur",conn)
row = execute_query("fetch next from cur with (timeout = '2s')",conn)
check_rows_data([5,5],row[0],"Insert")
row = execute_query("fetch next from cur",conn)
row = execute_query("fetch next from cur with (timeout = '2s')",conn)
check_rows_data([6,6],row[0],"Insert")
row = execute_query("fetch next from cur",conn)
row = execute_query("fetch next from cur with (timeout = '2s')",conn)
assert row == []
execute_insert("close cur",conn)
drop_table_subscription()
Expand All @@ -136,9 +137,9 @@ def test_cursor_since_now():
time.sleep(2)
execute_insert("insert into t1 values(6,6)",conn)
execute_insert("flush",conn)
row = execute_query("fetch next from cur",conn)
row = execute_query("fetch next from cur with (timeout = '2s')",conn)
check_rows_data([6,6],row[0],"Insert")
row = execute_query("fetch next from cur",conn)
row = execute_query("fetch next from cur with (timeout = '2s')",conn)
assert row == []
execute_insert("close cur",conn)
drop_table_subscription()
Expand All @@ -161,9 +162,9 @@ def test_cursor_without_since():
time.sleep(2)
execute_insert("insert into t1 values(6,6)",conn)
execute_insert("flush",conn)
row = execute_query("fetch next from cur",conn)
row = execute_query("fetch next from cur with (timeout = '2s')",conn)
check_rows_data([6,6],row[0],"Insert")
row = execute_query("fetch next from cur",conn)
row = execute_query("fetch next from cur with (timeout = '2s')",conn)
assert row == []
execute_insert("close cur",conn)
drop_table_subscription()
Expand All @@ -185,34 +186,34 @@ def test_cursor_since_rw_timestamp():
execute_insert("declare cur subscription cursor for sub since begin()",conn)
execute_insert("insert into t1 values(6,6)",conn)
execute_insert("flush",conn)
row = execute_query("fetch next from cur",conn)
row = execute_query("fetch next from cur with (timeout = '2s')",conn)
valuelen = len(row[0])
rw_timestamp_1 = row[0][valuelen - 1]
check_rows_data([4,4],row[0],"Insert")
row = execute_query("fetch next from cur",conn)
row = execute_query("fetch next from cur with (timeout = '2s')",conn)
valuelen = len(row[0])
rw_timestamp_2 = row[0][valuelen - 1] - 1
check_rows_data([5,5],row[0],"Insert")
row = execute_query("fetch next from cur",conn)
row = execute_query("fetch next from cur with (timeout = '2s')",conn)
valuelen = len(row[0])
rw_timestamp_3 = row[0][valuelen - 1] + 1
check_rows_data([6,6],row[0],"Insert")
row = execute_query("fetch next from cur",conn)
row = execute_query("fetch next from cur with (timeout = '2s')",conn)
assert row == []
execute_insert("close cur",conn)

execute_insert(f"declare cur subscription cursor for sub since {rw_timestamp_1}",conn)
row = execute_query("fetch next from cur",conn)
row = execute_query("fetch next from cur with (timeout = '2s')",conn)
check_rows_data([4,4],row[0],"Insert")
execute_insert("close cur",conn)

execute_insert(f"declare cur subscription cursor for sub since {rw_timestamp_2}",conn)
row = execute_query("fetch next from cur",conn)
row = execute_query("fetch next from cur with (timeout = '2s')",conn)
check_rows_data([5,5],row[0],"Insert")
execute_insert("close cur",conn)

execute_insert(f"declare cur subscription cursor for sub since {rw_timestamp_3}",conn)
row = execute_query("fetch next from cur",conn)
row = execute_query("fetch next from cur with (timeout = '2s')",conn)
assert row == []
execute_insert("close cur",conn)

Expand All @@ -229,29 +230,29 @@ def test_cursor_op():
)

execute_insert("declare cur subscription cursor for sub full",conn)
row = execute_query("fetch next from cur",conn)
row = execute_query("fetch next from cur with (timeout = '2s')",conn)
check_rows_data([1,2],row[0],"Insert")
row = execute_query("fetch next from cur",conn)
row = execute_query("fetch next from cur with (timeout = '2s')",conn)
assert row == []

execute_insert("insert into t1 values(4,4)",conn)
execute_insert("flush",conn)
execute_insert("update t1 set v2 = 10 where v1 = 4",conn)
execute_insert("flush",conn)
row = execute_query("fetch next from cur",conn)
row = execute_query("fetch next from cur with (timeout = '2s')",conn)
check_rows_data([4,4],row[0],"Insert")
row = execute_query("fetch next from cur",conn)
row = execute_query("fetch next from cur with (timeout = '2s')",conn)
check_rows_data([4,4],row[0],"UpdateDelete")
row = execute_query("fetch next from cur",conn)
row = execute_query("fetch next from cur with (timeout = '2s')",conn)
check_rows_data([4,10],row[0],"UpdateInsert")
row = execute_query("fetch next from cur",conn)
row = execute_query("fetch next from cur with (timeout = '2s')",conn)
assert row == []

execute_insert("delete from t1 where v1 = 4",conn)
execute_insert("flush",conn)
row = execute_query("fetch next from cur",conn)
row = execute_query("fetch next from cur with (timeout = '2s')",conn)
check_rows_data([4,10],row[0],"Delete")
row = execute_query("fetch next from cur",conn)
row = execute_query("fetch next from cur with (timeout = '2s')",conn)
assert row == []

execute_insert("close cur",conn)
Expand All @@ -271,22 +272,22 @@ def test_cursor_with_table_alter():
execute_insert("alter table t1 add v3 int",conn)
execute_insert("insert into t1 values(4,4,4)",conn)
execute_insert("flush",conn)
row = execute_query("fetch next from cur",conn)
row = execute_query("fetch next from cur with (timeout = '2s')",conn)
check_rows_data([1,2],row[0],"Insert")
row = execute_query("fetch next from cur",conn)
assert(row == [])
row = execute_query("fetch next from cur",conn)
row = execute_query("fetch next from cur with (timeout = '2s')",conn)
assert row == []
row = execute_query("fetch next from cur with (timeout = '2s')",conn)
check_rows_data([4,4,4],row[0],"Insert")
execute_insert("insert into t1 values(5,5,5)",conn)
execute_insert("flush",conn)
row = execute_query("fetch next from cur",conn)
row = execute_query("fetch next from cur with (timeout = '2s')",conn)
check_rows_data([5,5,5],row[0],"Insert")
execute_insert("alter table t1 drop column v2",conn)
execute_insert("insert into t1 values(6,6)",conn)
execute_insert("flush",conn)
row = execute_query("fetch next from cur",conn)
assert(row == [])
row = execute_query("fetch next from cur",conn)
row = execute_query("fetch next from cur with (timeout = '2s')",conn)
assert row == []
row = execute_query("fetch next from cur with (timeout = '2s')",conn)
check_rows_data([6,6],row[0],"Insert")
drop_table_subscription()

Expand Down Expand Up @@ -317,15 +318,15 @@ def test_cursor_fetch_n():
execute_insert("flush",conn)
execute_insert("update t1 set v2 = 100 where v1 = 10",conn)
execute_insert("flush",conn)
row = execute_query("fetch 6 from cur",conn)
row = execute_query("fetch 6 from cur with (timeout = '5s')",conn)
assert len(row) == 6
check_rows_data([1,2],row[0],"Insert")
check_rows_data([4,4],row[1],"Insert")
check_rows_data([5,5],row[2],"Insert")
check_rows_data([6,6],row[3],"Insert")
check_rows_data([7,7],row[4],"Insert")
check_rows_data([8,8],row[5],"Insert")
row = execute_query("fetch 6 from cur",conn)
row = execute_query("fetch 6 from cur with (timeout = '5s')",conn)
assert len(row) == 4
check_rows_data([9,9],row[0],"Insert")
check_rows_data([10,10],row[1],"Insert")
Expand All @@ -348,13 +349,57 @@ def test_rebuild_table():
execute_insert("flush",conn)
execute_insert("update t2 set v2 = 100 where v1 = 1",conn)
execute_insert("flush",conn)
row = execute_query("fetch 4 from cur",conn)
row = execute_query("fetch 4 from cur with (timeout = '5s')",conn)
assert len(row) == 3
check_rows_data([1,1],row[0],"Insert")
check_rows_data([1,1],row[1],"UpdateDelete")
check_rows_data([1,100],row[2],"UpdateInsert")
drop_table_subscription()

def test_blcok_cursor():
    """End-to-end check of blocking fetches on a subscription cursor.

    Steps:
      1. Declare a full cursor on ``sub2``, insert and update a row in ``t2``,
         and expect the fetch to return Insert, UpdateDelete and UpdateInsert.
      2. Start ``insert_into_table`` on a worker thread (it sleeps before
         inserting) and fetch; the fetch must return the row (10, 10).
      3. Fetch again with a 5s timeout and expect an empty result.

    NOTE(review): ``sub2`` and ``t2`` are presumably created by
    ``create_table_subscription()`` -- confirm against that helper.
    """
    print("test_blcok_cursor")
    create_table_subscription()
    conn = psycopg2.connect(
        host="localhost",
        port="4566",
        user="root",
        database="dev"
    )
    try:
        execute_insert("declare cur subscription cursor for sub2 full",conn)
        execute_insert("insert into t2 values(1,1)",conn)
        execute_insert("flush",conn)
        execute_insert("update t2 set v2 = 100 where v1 = 1",conn)
        execute_insert("flush",conn)
        row = execute_query("fetch 100 from cur",conn)
        assert len(row) == 3
        check_rows_data([1,1],row[0],"Insert")
        check_rows_data([1,1],row[1],"UpdateDelete")
        check_rows_data([1,100],row[2],"UpdateInsert")

        # Test block cursor fetches data successfully
        thread = threading.Thread(target=insert_into_table)
        thread.start()
        try:
            row = execute_query("fetch 100 from cur",conn)
            # Fail with a readable message instead of an IndexError on row[0].
            assert row, "blocking fetch returned no rows"
            check_rows_data([10,10],row[0],"Insert")
        finally:
            # Always reap the worker, even when the assertions above fail.
            thread.join()

        # Test block cursor timeout
        row = execute_query("fetch 100 from cur with (timeout = '5s')",conn)
        assert row == []

        drop_table_subscription()
    finally:
        # Release the session whether the test passed or not.
        conn.close()

def insert_into_table():
    """Insert the row (10, 10) into ``t2`` after a two-second delay.

    Used as the thread target in ``test_blcok_cursor``. It opens its own
    connection rather than sharing the caller's, and closes it when done.
    """
    # Delay so the caller's fetch is issued before the row exists.
    time.sleep(2)
    conn = psycopg2.connect(
        host="localhost",
        port="4566",
        user="root",
        database="dev"
    )
    try:
        # NOTE(review): assumes execute_insert commits before returning -- confirm.
        execute_insert("insert into t2 values(10,10)",conn)
    finally:
        # Do not leak the connection when the worker thread exits.
        conn.close()

if __name__ == "__main__":
test_cursor_snapshot()
test_cursor_op()
Expand All @@ -366,3 +411,4 @@ def test_rebuild_table():
test_cursor_with_table_alter()
test_cursor_fetch_n()
test_rebuild_table()
test_blcok_cursor()
11 changes: 0 additions & 11 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -837,7 +837,6 @@ service HummockManagerService {
rpc ListCompactTaskAssignment(ListCompactTaskAssignmentRequest) returns (ListCompactTaskAssignmentResponse);
rpc ListCompactTaskProgress(ListCompactTaskProgressRequest) returns (ListCompactTaskProgressResponse);
rpc CancelCompactTask(CancelCompactTaskRequest) returns (CancelCompactTaskResponse);
rpc ListChangeLogEpochs(ListChangeLogEpochsRequest) returns (ListChangeLogEpochsResponse);
rpc GetVersionByEpoch(GetVersionByEpochRequest) returns (GetVersionByEpochResponse);
rpc MergeCompactionGroup(MergeCompactionGroupRequest) returns (MergeCompactionGroupResponse);
}
Expand Down Expand Up @@ -909,13 +908,3 @@ message BranchedObject {
// Compaction group id the SST belongs to.
uint64 compaction_group_id = 3;
}

message ListChangeLogEpochsRequest {
uint32 table_id = 1;
uint64 min_epoch = 2;
uint32 max_count = 3;
}

message ListChangeLogEpochsResponse {
repeated uint64 epochs = 1;
}
20 changes: 20 additions & 0 deletions src/frontend/src/catalog/root_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,14 @@ impl Catalog {
}
}

/// Collects the ids of every table in the database identified by `db_id`.
///
/// Returns an empty vector when the database lookup fails.
pub fn get_all_table_ids_in_database(&self, db_id: DatabaseId) -> Vec<TableId> {
    match self.get_database_by_id(&db_id) {
        Ok(database) => database.iter_all_table_ids().collect(),
        Err(_) => Vec::new(),
    }
}

pub fn drop_database(&mut self, db_id: DatabaseId) {
let name = self.db_name_by_id.remove(&db_id).unwrap();
let database = self.database_by_name.remove(&name).unwrap();
Expand All @@ -295,6 +303,18 @@ impl Catalog {
});
}

pub fn get_all_table_ids_in_schema(
xxhZs marked this conversation as resolved.
Show resolved Hide resolved
&self,
db_id: DatabaseId,
schema_id: SchemaId,
) -> Vec<TableId> {
if let Ok(schema) = self.get_schema_by_id(&db_id, &schema_id) {
schema.iter_all_table_ids().cloned().collect()
} else {
vec![]
}
}

pub fn drop_schema(&mut self, db_id: DatabaseId, schema_id: SchemaId) {
self.get_database_mut(db_id).unwrap().drop_schema(schema_id);
}
Expand Down
4 changes: 4 additions & 0 deletions src/frontend/src/catalog/schema_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,10 @@ impl SchemaCatalog {
.map(|(_, v)| v)
}

/// Iterates over the keys of `table_by_id`, i.e. the ids of the tables
/// registered in this schema.
///
/// The ids are yielded by reference; callers that need owned values must
/// clone them.
pub fn iter_all_table_ids(&self) -> impl Iterator<Item = &TableId> {
xxhZs marked this conversation as resolved.
Show resolved Hide resolved
self.table_by_id.keys()
}
xxhZs marked this conversation as resolved.
Show resolved Hide resolved

pub fn iter_internal_table(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
self.table_by_name
.iter()
Expand Down
15 changes: 2 additions & 13 deletions src/frontend/src/catalog/subscription_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use core::str::FromStr;

use risingwave_common::catalog::{TableId, UserId, OBJECT_ID_PLACEHOLDER};
use risingwave_common::types::Interval;
use risingwave_common::util::epoch::Epoch;
use risingwave_pb::catalog::subscription::PbSubscriptionState;
use risingwave_pb::catalog::PbSubscription;
use thiserror_ext::AsReport;

use super::OwnedByUserCatalog;
use crate::error::{ErrorCode, Result};
use crate::handler::util::convert_interval_to_logstore_u64;
use crate::WithOptions;

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -86,15 +83,7 @@ impl SubscriptionCatalog {
let retention_seconds_str = properties.get("retention").ok_or_else(|| {
ErrorCode::InternalError("Subscription retention time not set.".to_string())
})?;
let retention_seconds = (Interval::from_str(retention_seconds_str)
.map_err(|err| {
ErrorCode::InternalError(format!(
"Retention needs to be set in Interval format: {:?}",
err.to_report_string()
))
})?
.epoch_in_micros()
/ 1000000) as u64;
let retention_seconds = convert_interval_to_logstore_u64(retention_seconds_str)?;
self.retention_seconds = retention_seconds;
Ok(())
}
Expand Down
Loading
Loading