Skip to content

Commit

Permalink
Merge branch 'main' into xxh/commit_checkpoint_interval_sink_decouple
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs authored Sep 9, 2024
2 parents 7f0d2f7 + 9a03718 commit 21cc365
Show file tree
Hide file tree
Showing 41 changed files with 995 additions and 623 deletions.
1 change: 1 addition & 0 deletions .typos.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,5 @@ extend-exclude = [
# We don't want to fix "fals" here, but may want in other places.
# Ideally, we should just ignore that line: https://github.com/crate-ci/typos/issues/316
"src/common/src/cast/mod.rs",
"src/tests/simulation/tests/integration_tests/scale/shared_source.rs",
]
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ RisingWave is a Postgres-compatible SQL database engineered to provide the <i><b

RisingWave can <b>ingest</b> millions of events per second, continuously <b>join and analyze</b> live data streams with historical tables, <b>serve</b> ad-hoc queries in real-time, and <b>deliver</b> fresh, consistent results wherever needed.

![RisingWave](./docs/dev/src/images/architecture_20240814.png)
![RisingWave](./docs/dev/src/images/architecture_20240908.png)

## Try it out in 60 seconds

Expand Down
1 change: 1 addition & 0 deletions ci/scripts/e2e-iceberg-sink-v2-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ poetry run python main.py -t ./test_case/partition_upsert.toml
poetry run python main.py -t ./test_case/range_partition_append_only.toml
poetry run python main.py -t ./test_case/range_partition_upsert.toml
poetry run python main.py -t ./test_case/append_only_with_checkpoint_interval.toml
poetry run python main.py -t ./test_case/iceberg_select_empty_table.toml


echo "--- Kill cluster"
Expand Down
2 changes: 1 addition & 1 deletion ci/scripts/e2e-source-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ echo "> inserted new rows into postgres"

# start cluster w/o clean-data
unset RISINGWAVE_CI
export RUST_LOG="risingwave_stream=debug,risingwave_batch=info,risingwave_storage=info" \
export RUST_LOG="risingwave_stream=debug,risingwave_batch=info,risingwave_storage=info"

risedev dev ci-1cn-1fe-with-recovery
echo "> wait for cluster recovery finish"
Expand Down
Binary file added docs/dev/src/images/architecture_20240908.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
3 changes: 3 additions & 0 deletions e2e_test/commands/risectl
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/usr/bin/env bash

RUST_LOG="error" .risingwave/bin/risingwave/risectl "$@"
60 changes: 60 additions & 0 deletions e2e_test/iceberg/test_case/iceberg_select_empty_table.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
statement ok
set sink_decouple = false;

statement ok
set streaming_parallelism=4;

statement ok
CREATE TABLE s1 (i1 int, i2 varchar, i3 varchar);

statement ok
CREATE MATERIALIZED VIEW mv1 AS SELECT * FROM s1;

statement ok
CREATE SINK sink1 AS select * from mv1 WITH (
connector = 'iceberg',
type = 'append-only',
force_append_only = 'true',
database.name = 'demo_db',
table.name = 't1',
catalog.name = 'demo',
catalog.type = 'storage',
warehouse.path = 's3a://icebergdata/demo',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
commit_checkpoint_interval = 1,
create_table_if_not_exists = 'true'
);

statement ok
CREATE SOURCE iceberg_t1_source
WITH (
connector = 'iceberg',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
catalog.type = 'storage',
warehouse.path = 's3a://icebergdata/demo',
database.name = 'demo_db',
table.name = 't1',
);

statement ok
flush;

query I
select count(*) from iceberg_t1_source;
----
0

statement ok
DROP SINK sink1;

statement ok
DROP SOURCE iceberg_t1_source;

statement ok
DROP TABLE s1 cascade;
11 changes: 11 additions & 0 deletions e2e_test/iceberg/test_case/iceberg_select_empty_table.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
init_sqls = [
'CREATE SCHEMA IF NOT EXISTS demo_db',
'DROP TABLE IF EXISTS demo_db.t1',
]

slt = 'test_case/iceberg_select_empty_table.slt'

drop_sqls = [
'DROP TABLE IF EXISTS demo_db.t1',
'DROP SCHEMA IF EXISTS demo_db',
]
81 changes: 81 additions & 0 deletions e2e_test/source_inline/kafka/shared_source.slt
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,87 @@ internal_table.mjs --name s0 --type source
3,"{""split_info"": {""partition"": 3, ""start_offset"": 11, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"


# # Note: the parallelism depends on the risedev profile.
# # So scale tests below are commented out.

# query ???
# select name, flags, parallelism from rw_fragments JOIN rw_relations ON rw_fragments.table_id = rw_relations.id order by name;
# ----
# mv_1 {MVIEW,SOURCE_SCAN} 5
# mv_2 {MVIEW,SOURCE_SCAN} 5
# s0 {SOURCE} 5


# system ok
# risectl meta source-split-info --ignore-id
# ----
# Table
# Fragment (Source)
# Actor (1 splits): [0]
# Actor (1 splits): [2]
# Actor (1 splits): [3]
# Actor (1 splits): [1]
# Actor (0 splits): []
# Table
# Fragment (SourceScan)
# Actor (1 splits): [0] <- Upstream Actor #1055: [0]
# Actor (1 splits): [2] <- Upstream Actor #1056: [2]
# Actor (1 splits): [3] <- Upstream Actor #1057: [3]
# Actor (1 splits): [1] <- Upstream Actor #1058: [1]
# Actor (0 splits): [] <- Upstream Actor #1059: []
# Table
# Fragment (SourceScan)
# Actor (1 splits): [0] <- Upstream Actor #1055: [0]
# Actor (1 splits): [2] <- Upstream Actor #1056: [2]
# Actor (1 splits): [3] <- Upstream Actor #1057: [3]
# Actor (1 splits): [1] <- Upstream Actor #1058: [1]
# Actor (0 splits): [] <- Upstream Actor #1059: []


# # scale down
# statement ok
# ALTER MATERIALIZED VIEW mv_1 SET PARALLELISM TO 2;

# # should have no effect, because of NoShuffle
# # TODO: support ALTER SOURCE SET PARALLELISM, then we can
# query ???
# select name, flags, parallelism from rw_fragments JOIN rw_relations ON rw_fragments.table_id = rw_relations.id order by name;
# ----
# mv_1 {MVIEW,SOURCE_SCAN} 5
# mv_2 {MVIEW,SOURCE_SCAN} 5
# s0 {SOURCE} 5

# system ok
# risectl meta source-split-info --ignore-id
# ----
# Table
# Fragment (Source)
# Actor (1 splits): [0]
# Actor (1 splits): [2]
# Actor (1 splits): [3]
# Actor (1 splits): [1]
# Actor (0 splits): []
# Table
# Fragment (SourceScan)
# Actor (1 splits): [0] <- Upstream Actor #1055: [0]
# Actor (1 splits): [2] <- Upstream Actor #1056: [2]
# Actor (1 splits): [3] <- Upstream Actor #1057: [3]
# Actor (1 splits): [1] <- Upstream Actor #1058: [1]
# Actor (0 splits): [] <- Upstream Actor #1059: []
# Table
# Fragment (SourceScan)
# Actor (1 splits): [0] <- Upstream Actor #1055: [0]
# Actor (1 splits): [2] <- Upstream Actor #1056: [2]
# Actor (1 splits): [3] <- Upstream Actor #1057: [3]
# Actor (1 splits): [1] <- Upstream Actor #1058: [1]
# Actor (0 splits): [] <- Upstream Actor #1059: []


# # Manual test: change the parallelism of the compute node, kill and restart, and check
# # risedev ctl meta source-split-info --ignore-id
# # risedev psql -c "select name, flags, parallelism from rw_fragments JOIN rw_relations ON rw_fragments.table_id = rw_relations.id order by name;"


statement ok
drop source s0 cascade;

Expand Down
11 changes: 0 additions & 11 deletions proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,6 @@ message BuildActorInfo {
map<uint32, SubscriptionIds> related_subscriptions = 2;
}

message DropActorsRequest {
string request_id = 1;
repeated uint32 actor_ids = 2;
}

message DropActorsResponse {
string request_id = 1;
common.Status status = 2;
}

message InjectBarrierRequest {
string request_id = 1;
stream_plan.Barrier barrier = 2;
Expand Down Expand Up @@ -109,7 +99,6 @@ message StreamingControlStreamResponse {
}

service StreamService {
rpc DropActors(DropActorsRequest) returns (DropActorsResponse);
rpc WaitEpochCommit(WaitEpochCommitRequest) returns (WaitEpochCommitResponse);
rpc StreamingControlStream(stream StreamingControlStreamRequest) returns (stream StreamingControlStreamResponse);
}
Expand Down
14 changes: 0 additions & 14 deletions src/compute/src/rpc/service/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,6 @@ impl StreamService for StreamServiceImpl {
type StreamingControlStreamStream =
impl Stream<Item = std::result::Result<StreamingControlStreamResponse, tonic::Status>>;

#[cfg_attr(coverage, coverage(off))]
async fn drop_actors(
&self,
request: Request<DropActorsRequest>,
) -> std::result::Result<Response<DropActorsResponse>, Status> {
let req = request.into_inner();
let actors = req.actor_ids;
self.mgr.drop_actors(actors).await?;
Ok(Response::new(DropActorsResponse {
request_id: req.request_id,
status: None,
}))
}

#[cfg_attr(coverage, coverage(off))]
async fn wait_epoch_commit(
&self,
Expand Down
19 changes: 15 additions & 4 deletions src/connector/src/source/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,17 @@ impl IcebergSplitEnumerator {
bail!("Batch parallelism is 0. Cannot split the iceberg files.");
}
let table = self.config.load_table_v2().await?;
let current_snapshot = table.metadata().current_snapshot();
if current_snapshot.is_none() {
// If there is no snapshot, we will return a mock `IcebergSplit` with empty files.
return Ok(vec![IcebergSplit {
split_id: 0,
snapshot_id: 0, // unused
table_meta: TableMetadataJsonStr::serialize(table.metadata()),
files: vec![],
}]);
}

let snapshot_id = match time_traval_info {
Some(IcebergTimeTravelInfo::Version(version)) => {
let Some(snapshot) = table.metadata().snapshot_by_id(version) else {
Expand All @@ -232,10 +243,10 @@ impl IcebergSplitEnumerator {
}
}
}
None => match table.metadata().current_snapshot() {
Some(snapshot) => snapshot.snapshot_id(),
None => bail!("Cannot find the current snapshot id in the iceberg table."),
},
None => {
assert!(current_snapshot.is_some());
current_snapshot.unwrap().snapshot_id()
}
};
let mut files = vec![];

Expand Down
Loading

0 comments on commit 21cc365

Please sign in to comment.