From 635ec6f6f0d3de91a0d01fa313aa5a6e093c31ff Mon Sep 17 00:00:00 2001
From: Dylan
Date: Wed, 4 Dec 2024 15:11:42 +0800
Subject: [PATCH] fix(iceberg): bypass iceberg partition table optimization
 (#19655)

---
 .../tests/testdata/output/sink.yaml      |   8 +-
 src/frontend/src/handler/create_sink.rs  | 138 +++++++++---------
 2 files changed, 75 insertions(+), 71 deletions(-)

diff --git a/src/frontend/planner_test/tests/testdata/output/sink.yaml b/src/frontend/planner_test/tests/testdata/output/sink.yaml
index 16571bb87952f..f914fb225c9fd 100644
--- a/src/frontend/planner_test/tests/testdata/output/sink.yaml
+++ b/src/frontend/planner_test/tests/testdata/output/sink.yaml
@@ -195,9 +195,7 @@
       );
     explain_output: |
       StreamSink { type: append-only, columns: [v1, v2, v3, v4, t1._row_id(hidden)] }
-      └─StreamExchange { dist: HashShard($expr1) }
-        └─StreamProject { exprs: [t1.v1, t1.v2, t1.v3, t1.v4, t1._row_id, Row(t1.v1, IcebergTransform('bucket[1]':Varchar, t1.v2), IcebergTransform('truncate[1]':Varchar, t1.v3), null:Int32) as $expr1] }
-          └─StreamTableScan { table: t1, columns: [v1, v2, v3, v4, _row_id] }
+      └─StreamTableScan { table: t1, columns: [v1, v2, v3, v4, _row_id] }
 - id: create_mock_iceberg_sink_append_only_with_range_partition
   sql: |
     create table t1 (v1 date, v2 timestamp, v3 timestamp with time zone, v4 timestamp);
@@ -237,9 +235,7 @@
       );
     explain_output: |
       StreamSink { type: upsert, columns: [v1, v2, v3, v4, t1._row_id(hidden)], downstream_pk: [t1.v1] }
-      └─StreamExchange { dist: HashShard($expr1) }
-        └─StreamProject { exprs: [t1.v1, t1.v2, t1.v3, t1.v4, t1._row_id, Row(t1.v1, IcebergTransform('bucket[1]':Varchar, t1.v2), IcebergTransform('truncate[1]':Varchar, t1.v3), null:Int32) as $expr1] }
-          └─StreamTableScan { table: t1, columns: [v1, v2, v3, v4, _row_id] }
+      └─StreamTableScan { table: t1, columns: [v1, v2, v3, v4, _row_id] }
 - id: create_mock_iceberg_sink_upsert_with_range_partition
   sql: |
     create table t1 (v1 date, v2 timestamp, v3 timestamp with time zone, v4 timestamp);
diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs
index cc587930fee5e..c516e47fc7ce3 100644
--- a/src/frontend/src/handler/create_sink.rs
+++ b/src/frontend/src/handler/create_sink.rs
@@ -326,77 +326,85 @@ pub async fn get_partition_compute_info(
 }
 
 async fn get_partition_compute_info_for_iceberg(
-    iceberg_config: &IcebergConfig,
+    _iceberg_config: &IcebergConfig,
 ) -> Result<Option<PartitionComputeInfo>> {
-    // TODO: check table if exists
-    if iceberg_config.create_table_if_not_exists {
-        return Ok(None);
-    }
-    let table = iceberg_config.load_table().await?;
-    let Some(partition_spec) = table.current_table_metadata().current_partition_spec().ok() else {
-        return Ok(None);
-    };
+    // TODO: enable partition compute for iceberg after fixing the issue of sink decoupling.
+    return Ok(None);
 
-    if partition_spec.is_unpartitioned() {
-        return Ok(None);
-    }
+    #[allow(unreachable_code)]
+    {
+        // TODO: check table if exists
+        if _iceberg_config.create_table_if_not_exists {
+            return Ok(None);
+        }
+        let table = _iceberg_config.load_table().await?;
+        let Some(partition_spec) = table.current_table_metadata().current_partition_spec().ok()
+        else {
+            return Ok(None);
+        };
 
-    // Separate the partition spec into two parts: sparse partition and range partition.
-    // Sparse partition means that the data distribution is more sparse at a given time.
-    // Range partition means that the data distribution is likely same at a given time.
-    // Only compute the partition and shuffle by them for the sparse partition.
-    let has_sparse_partition = partition_spec.fields.iter().any(|f| match f.transform {
-        // Sparse partition
-        icelake::types::Transform::Identity
-        | icelake::types::Transform::Truncate(_)
-        | icelake::types::Transform::Bucket(_) => true,
-        // Range partition
-        icelake::types::Transform::Year
-        | icelake::types::Transform::Month
-        | icelake::types::Transform::Day
-        | icelake::types::Transform::Hour
-        | icelake::types::Transform::Void => false,
-    });
+        if partition_spec.is_unpartitioned() {
+            return Ok(None);
+        }
 
-    if !has_sparse_partition {
-        return Ok(None);
-    }
+        // Separate the partition spec into two parts: sparse partition and range partition.
+        // Sparse partition means that the data distribution is more sparse at a given time.
+        // Range partition means that the data distribution is likely same at a given time.
+        // Only compute the partition and shuffle by them for the sparse partition.
+        let has_sparse_partition = partition_spec.fields.iter().any(|f| match f.transform {
+            // Sparse partition
+            icelake::types::Transform::Identity
+            | icelake::types::Transform::Truncate(_)
+            | icelake::types::Transform::Bucket(_) => true,
+            // Range partition
+            icelake::types::Transform::Year
+            | icelake::types::Transform::Month
+            | icelake::types::Transform::Day
+            | icelake::types::Transform::Hour
+            | icelake::types::Transform::Void => false,
+        });
 
-    let arrow_type: ArrowDataType = table
-        .current_partition_type()
-        .map_err(|err| RwError::from(ErrorCode::SinkError(err.into())))?
-        .try_into()
-        .map_err(|_| {
-            RwError::from(ErrorCode::SinkError(
-                "Fail to convert iceberg partition type to arrow type".into(),
-            ))
-        })?;
-    let Some(schema) = table.current_table_metadata().current_schema().ok() else {
-        return Ok(None);
-    };
-    let partition_fields = partition_spec
-        .fields
-        .iter()
-        .map(|f| {
-            let source_f = schema
-                .look_up_field_by_id(f.source_column_id)
-                .ok_or(RwError::from(ErrorCode::SinkError(
-                    "Fail to look up iceberg partition field".into(),
-                )))?;
-            Ok((source_f.name.clone(), f.transform))
-        })
-        .collect::<Result<Vec<_>>>()?;
-
-    let ArrowDataType::Struct(partition_type) = arrow_type else {
-        return Err(RwError::from(ErrorCode::SinkError(
-            "Partition type of iceberg should be a struct type".into(),
-        )));
-    };
-
-    Ok(Some(PartitionComputeInfo::Iceberg(IcebergPartitionInfo {
-        partition_type: IcebergArrowConvert.struct_from_fields(&partition_type)?,
-        partition_fields,
-    })))
+        if !has_sparse_partition {
+            return Ok(None);
+        }
+
+        let arrow_type: ArrowDataType = table
+            .current_partition_type()
+            .map_err(|err| RwError::from(ErrorCode::SinkError(err.into())))?
+            .try_into()
+            .map_err(|_| {
+                RwError::from(ErrorCode::SinkError(
+                    "Fail to convert iceberg partition type to arrow type".into(),
+                ))
+            })?;
+        let Some(schema) = table.current_table_metadata().current_schema().ok() else {
+            return Ok(None);
+        };
+        let partition_fields = partition_spec
+            .fields
+            .iter()
+            .map(|f| {
+                let source_f =
+                    schema
+                        .look_up_field_by_id(f.source_column_id)
+                        .ok_or(RwError::from(ErrorCode::SinkError(
+                            "Fail to look up iceberg partition field".into(),
+                        )))?;
+                Ok((source_f.name.clone(), f.transform))
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        let ArrowDataType::Struct(partition_type) = arrow_type else {
+            return Err(RwError::from(ErrorCode::SinkError(
+                "Partition type of iceberg should be a struct type".into(),
+            )));
+        };
+
+        Ok(Some(PartitionComputeInfo::Iceberg(IcebergPartitionInfo {
+            partition_type: IcebergArrowConvert.struct_from_fields(&partition_type)?,
+            partition_fields,
+        })))
+    }
 }
 
 pub async fn handle_create_sink(
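
Note on the shape of the fix: rather than deleting the partition-compute path, the patch short-circuits it with an unconditional return Ok(None) and parks the old body inside an #[allow(unreachable_code)] block, so it still type-checks and can be revived by removing the early return once sink decoupling is fixed. A minimal sketch of the same pattern, with hypothetical names rather than RisingWave types:

    // Return early, but keep the retired body compiling inside a block.
    fn compute_partition_info() -> Option<Vec<String>> {
        // Unconditional bypass; everything below is dead but still compiled.
        return None;

        #[allow(unreachable_code)]
        {
            // The original logic lives on here; deleting the early return
            // above re-enables it without a revert.
            let fields = vec!["v1".to_owned(), "bucket(v2)".to_owned()];
            Some(fields)
        }
    }

The cost of this choice is a permanently dead code path; the benefit is that refactors elsewhere keep it compiling until it is switched back on.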
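Background on what the bypassed path computed: Iceberg partition transforms split into two groups. Identity, truncate, and bucket ("sparse") transforms scatter rows arriving at the same time across many partitions, so pre-computing the partition value and shuffling by it (the StreamProject + StreamExchange removed from sink.yaml) lets each writer touch fewer data files. Year, month, day, and hour ("range") transforms put concurrent rows into the same partition, so the extra exchange buys nothing. A self-contained sketch of that classification, where Transform is a simplified stand-in for icelake::types::Transform:

    // Simplified stand-in for icelake::types::Transform; only the variants
    // matched by the patched code are modeled.
    #[allow(dead_code)]
    #[derive(Clone, Copy)]
    enum Transform {
        Identity,
        Truncate(u32),
        Bucket(u32),
        Year,
        Month,
        Day,
        Hour,
        Void,
    }

    // Sparse transforms spread concurrent rows across partitions; range
    // transforms (and Void) do not, so no shuffle is planned for them.
    fn is_sparse(t: Transform) -> bool {
        matches!(
            t,
            Transform::Identity | Transform::Truncate(_) | Transform::Bucket(_)
        )
    }

    fn main() {
        // Mirrors the has_sparse_partition check in the bypassed code path.
        let spec = [Transform::Bucket(16), Transform::Day];
        let has_sparse_partition = spec.iter().any(|t| is_sparse(*t));
        println!("shuffle by partition value: {has_sparse_partition}");
    }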
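The remainder of the bypassed path resolves each partition field's source column in the table schema and pairs its name with the transform, failing if the id does not resolve. A sketch of that step with simplified stand-in types (not the icelake or RisingWave APIs):

    // Each partition field references a source column by id; the lookup
    // must succeed for every field or the whole computation fails.
    struct Field {
        id: i32,
        name: String,
    }
    struct PartitionField {
        source_column_id: i32,
        transform: &'static str,
    }

    fn partition_fields(
        schema: &[Field],
        spec: &[PartitionField],
    ) -> Result<Vec<(String, &'static str)>, String> {
        spec.iter()
            .map(|f| {
                let source = schema
                    .iter()
                    .find(|s| s.id == f.source_column_id)
                    .ok_or_else(|| "Fail to look up iceberg partition field".to_owned())?;
                Ok((source.name.clone(), f.transform))
            })
            // Collecting into Result<Vec<_>, _> propagates the first error,
            // matching the .collect::<Result<Vec<_>>>()? in the patch.
            .collect()
    }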