feat: support precompute partition for iceberg #14710
Generally LGTM, thanks!
@@ -177,7 +178,7 @@ impl IcebergConfig {
self.catalog_type.as_deref().unwrap_or("storage")
}

- fn build_iceberg_configs(&self) -> Result<HashMap<String, String>> {
+ pub fn build_iceberg_configs(&self) -> Result<HashMap<String, String>> {
Why do we need to make this pub?
@@ -346,6 +347,55 @@ impl IcebergConfig {
}
}

pub async fn create_catalog(config: &IcebergConfig) -> Result<CatalogRef> {
- pub async fn create_catalog(config: &IcebergConfig) -> Result<CatalogRef> {
+ async fn create_catalog(config: &IcebergConfig) -> Result<CatalogRef> {

.load_table(&table_id)
.await
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
async fn create_validated_table(&self) -> Result<Table> {
- async fn create_validated_table(&self) -> Result<Table> {
+ async fn create_and_validate_table(&self) -> Result<Table> {

let table = create_table(iceberg_config).await?;
let partition_spec = table
.current_table_metadata()
.current_partition_spec()
We should not throw an error here; a user may create a table without a partition.
@@ -133,9 +236,42 @@ impl StreamSink {
Ok(Self::new(input, sink))
}

fn derive_iceberg_sink_distribution(
Could we add some planner tests for this? I think it would be better to ensure its consistency.
I find it inconvenient to test this for Iceberg because I need to connect to the catalog and fetch the related info.
We test the sink using handle_explain, which means there is no place to insert mock partition info in the plan test.
- One way to test it is to mock the partition info in the blackhole connector. But this adds test-only code to the frontend, and I'm not sure whether that's a good approach.
pub async fn get_partition_compute_info(
with_options: &WithOptions,
) -> Result<Option<PartitionComputeInfo>> {
match connector.as_str() {
ICEBERG_SINK => {
..
}
BLACK_HOLE => { ..//generate mock info }
_ => Ok(None),
}
}
- Another way is to change how the sink is tested: maybe we can use create_sink instead of testing through explain, and add a parameter to pass the partition info so that we can mock it from outside.
explain::handle_explain(handler_args, *statement, options, analyze).await?;
I find it inconvenient to test it for Iceberg because I need to connect the catalog and get related info.
How about creating a mocked iceberg catalog?
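To make the mocked-catalog idea concrete, here is a minimal sketch in plain Rust. Everything in it is hypothetical and for illustration only: the `PartitionCatalog` trait, `MockCatalog`, and `shuffle_columns` are invented names, not the API of this PR or of any Iceberg library. It only shows that a planner-side lookup can be driven by an in-memory catalog, with no network connection.

```rust
use std::collections::HashMap;

/// Hypothetical minimal view of what a planner needs from a catalog:
/// the partition field names of a table (empty if unpartitioned).
trait PartitionCatalog {
    fn partition_fields(&self, table: &str) -> Result<Vec<String>, String>;
}

/// Hypothetical in-memory catalog that serves fixed partition info.
struct MockCatalog {
    tables: HashMap<String, Vec<String>>,
}

impl MockCatalog {
    fn new() -> Self {
        Self { tables: HashMap::new() }
    }

    /// Registers a table together with its partition field names.
    fn with_table(mut self, name: &str, fields: &[&str]) -> Self {
        let fields = fields.iter().map(|f| f.to_string()).collect();
        self.tables.insert(name.to_string(), fields);
        self
    }
}

impl PartitionCatalog for MockCatalog {
    fn partition_fields(&self, table: &str) -> Result<Vec<String>, String> {
        self.tables
            .get(table)
            .cloned()
            .ok_or_else(|| format!("table not found: {table}"))
    }
}

/// Returns the columns to shuffle by, or `None` for an unpartitioned table.
/// An unpartitioned table is not treated as an error.
fn shuffle_columns(
    catalog: &dyn PartitionCatalog,
    table: &str,
) -> Result<Option<Vec<String>>, String> {
    let fields = catalog.partition_fields(table)?;
    Ok(if fields.is_empty() { None } else { Some(fields) })
}
```

A plan test could then build a `MockCatalog` with one partitioned and one unpartitioned table and check the result of `shuffle_columns` for each, without touching a real catalog.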
Is this PR specific to Iceberg? Would you please update the PR title to make it more detailed?
Sorry, updated it.
license-eye has totally checked 4768 files.
Valid | Invalid | Ignored | Fixed |
---|---|---|---|
2077 | 2 | 2689 | 0 |
Click to see the invalid file list
- src/common/src/array/arrow/arrow_iceberg.rs
- src/connector/src/sink/iceberg/mock_catalog.rs
@@ -172,3 +172,45 @@
StreamSink { type: upsert, columns: [a, b, t2._row_id(hidden)], pk: [t2._row_id] }
└─StreamEowcSort { sort_column: t2.b }
  └─StreamTableScan { table: t2, columns: [a, b, _row_id] }
- id: create_mock_iceberg_sink_append_only_with_partition
These test cases don't seem sufficient. The RFC describes 4 cases; please add a test for each case.
not familiar with the context, waiting for renjie's approval
/// ## Why we need `PartitionComputeInfo`?
///
/// For some sink, it will write the data into different file based on the partition value. E.g. iceberg sink(<https://iceberg.apache.org/spec/#partitioning>)
/// For this kind of sink, the file num can be reduced if we can shuffle the data based on the partition value. More details can be found in <https://github.com/risingwavelabs/rfcs/pull/77>.
does this also apply to other lakehouse downstream?
Yes, Delta also seems to have a partition concept, so I guess this can apply to it as well. 🤔
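The doc comment's claim that shuffling by partition value reduces the file count can be illustrated with a small simulation. This is a sketch under stated assumptions, not the sink's actual behavior: it assumes each writer opens one file per partition value it receives, and the helper names `writer_by_partition` and `count_files` are hypothetical.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};

/// Hypothetical helper: picks the writer for a row by hashing its partition value,
/// so every row of one partition lands on the same writer.
fn writer_by_partition(partition_value: &str, num_writers: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    partition_value.hash(&mut hasher);
    (hasher.finish() % num_writers as u64) as usize
}

/// Counts distinct (writer, partition) pairs. Under the assumption that each
/// writer opens one file per partition value it sees, this is the file count.
fn count_files(rows: &[&str], num_writers: usize, shuffle_by_partition: bool) -> usize {
    let mut files: HashSet<(usize, &str)> = HashSet::new();
    for (i, partition_value) in rows.iter().enumerate() {
        let writer = if shuffle_by_partition {
            writer_by_partition(partition_value, num_writers)
        } else {
            // Stand-in for a distribution that ignores the partition value.
            i % num_writers
        };
        files.insert((writer, *partition_value));
    }
    files.len()
}
```

With shuffling, the count equals the number of distinct partition values, because each value maps to exactly one writer. Without it, the count can grow up to the number of writers times the number of partition values.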
// Separate the partition spec into two parts: sparse partition and range partition.
// Sparse partition means that the data distribution is more sparse at a given time.
// Range partition means that the data distribution is likely same at a given time.
// Only compute the partition and shuffle by them for the sparse partition.
so we are changing the distribution key to adjust each CN's hash range?
Yes. For a sparse partition, we add an exchange before the sink, as the plan shows:
└─StreamExchange { dist: HashShard($expr1) }
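The sparse/range split described in the code comment can be sketched as follows. The `Transform` enum is a simplified stand-in that mirrors the transform names in the Iceberg spec; it is not the library type used in the PR. Which transforms count as sparse is an assumption made here for illustration (value-based transforms as sparse, time-based ones as range), and `is_sparse` and `needs_exchange` are hypothetical names.

```rust
/// Simplified stand-in for the Iceberg partition transforms; not the real library type.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq)]
enum Transform {
    Identity,
    Bucket(u32),
    Truncate(u32),
    Year,
    Month,
    Day,
    Hour,
    Void,
}

/// Assumption for this sketch: value-based transforms spread concurrent rows over
/// many partitions ("sparse"), while time-based transforms put rows arriving at
/// about the same time into the same partition ("range").
fn is_sparse(transform: Transform) -> bool {
    match transform {
        Transform::Identity | Transform::Bucket(_) | Transform::Truncate(_) => true,
        Transform::Year
        | Transform::Month
        | Transform::Day
        | Transform::Hour
        | Transform::Void => false,
    }
}

/// Shuffle before the sink only if at least one partition field is sparse.
/// An empty spec (unpartitioned table) needs no exchange and is not an error.
fn needs_exchange(spec: &[Transform]) -> bool {
    spec.iter().any(|t| is_sparse(*t))
}
```

Under these assumptions, a spec such as `[Day, Bucket(16)]` would get the exchange, while `[Day, Hour]` or an empty spec would keep the existing distribution.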
LGTM, thanks!
cc @xxchan @BugenZhao PTAL for Cargo.lock
LGTM for Cargo.lock
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
This is the final part of #13898:
Checklist
./risedev check (or alias, ./risedev c)
Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.