
Commit

fix(iceberg): bypass iceberg partition table optimization (#19655)
chenzl25 authored and committed on Dec 4, 2024
1 parent dc164bd · commit 635ec6f
Showing 2 changed files with 75 additions and 71 deletions.
src/frontend/planner_test/tests/testdata/output/sink.yaml (8 changes: 2 additions & 6 deletions)
@@ -195,9 +195,7 @@
);
explain_output: |
StreamSink { type: append-only, columns: [v1, v2, v3, v4, t1._row_id(hidden)] }
- └─StreamExchange { dist: HashShard($expr1) }
- └─StreamProject { exprs: [t1.v1, t1.v2, t1.v3, t1.v4, t1._row_id, Row(t1.v1, IcebergTransform('bucket[1]':Varchar, t1.v2), IcebergTransform('truncate[1]':Varchar, t1.v3), null:Int32) as $expr1] }
- └─StreamTableScan { table: t1, columns: [v1, v2, v3, v4, _row_id] }
+ └─StreamTableScan { table: t1, columns: [v1, v2, v3, v4, _row_id] }
- id: create_mock_iceberg_sink_append_only_with_range_partition
sql: |
create table t1 (v1 date, v2 timestamp, v3 timestamp with time zone, v4 timestamp);
@@ -237,9 +235,7 @@
);
explain_output: |
StreamSink { type: upsert, columns: [v1, v2, v3, v4, t1._row_id(hidden)], downstream_pk: [t1.v1] }
- └─StreamExchange { dist: HashShard($expr1) }
- └─StreamProject { exprs: [t1.v1, t1.v2, t1.v3, t1.v4, t1._row_id, Row(t1.v1, IcebergTransform('bucket[1]':Varchar, t1.v2), IcebergTransform('truncate[1]':Varchar, t1.v3), null:Int32) as $expr1] }
- └─StreamTableScan { table: t1, columns: [v1, v2, v3, v4, _row_id] }
+ └─StreamTableScan { table: t1, columns: [v1, v2, v3, v4, _row_id] }
- id: create_mock_iceberg_sink_upsert_with_range_partition
sql: |
create table t1 (v1 date, v2 timestamp, v3 timestamp with time zone, v4 timestamp);
src/frontend/src/handler/create_sink.rs (138 changes: 73 additions & 65 deletions)
@@ -326,77 +326,85 @@ pub async fn get_partition_compute_info(
}

async fn get_partition_compute_info_for_iceberg(
- iceberg_config: &IcebergConfig,
+ _iceberg_config: &IcebergConfig,
) -> Result<Option<PartitionComputeInfo>> {
- // TODO: check table if exists
- if iceberg_config.create_table_if_not_exists {
- return Ok(None);
- }
- let table = iceberg_config.load_table().await?;
- let Some(partition_spec) = table.current_table_metadata().current_partition_spec().ok() else {
- return Ok(None);
- };
+ // TODO: enable partition compute for iceberg after fixing the issue of sink decoupling.
+ return Ok(None);

- if partition_spec.is_unpartitioned() {
- return Ok(None);
- }
+ #[allow(unreachable_code)]
+ {
+ // TODO: check table if exists
+ if _iceberg_config.create_table_if_not_exists {
+ return Ok(None);
+ }
+ let table = _iceberg_config.load_table().await?;
+ let Some(partition_spec) = table.current_table_metadata().current_partition_spec().ok()
+ else {
+ return Ok(None);
+ };

- // Separate the partition spec into two parts: sparse partition and range partition.
- // Sparse partition means that the data distribution is more sparse at a given time.
- // Range partition means that the data distribution is likely same at a given time.
- // Only compute the partition and shuffle by them for the sparse partition.
- let has_sparse_partition = partition_spec.fields.iter().any(|f| match f.transform {
- // Sparse partition
- icelake::types::Transform::Identity
- | icelake::types::Transform::Truncate(_)
- | icelake::types::Transform::Bucket(_) => true,
- // Range partition
- icelake::types::Transform::Year
- | icelake::types::Transform::Month
- | icelake::types::Transform::Day
- | icelake::types::Transform::Hour
- | icelake::types::Transform::Void => false,
- });
+ if partition_spec.is_unpartitioned() {
+ return Ok(None);
+ }

- if !has_sparse_partition {
- return Ok(None);
- }
+ // Separate the partition spec into two parts: sparse partition and range partition.
+ // Sparse partition means that the data distribution is more sparse at a given time.
+ // Range partition means that the data distribution is likely same at a given time.
+ // Only compute the partition and shuffle by them for the sparse partition.
+ let has_sparse_partition = partition_spec.fields.iter().any(|f| match f.transform {
+ // Sparse partition
+ icelake::types::Transform::Identity
+ | icelake::types::Transform::Truncate(_)
+ | icelake::types::Transform::Bucket(_) => true,
+ // Range partition
+ icelake::types::Transform::Year
+ | icelake::types::Transform::Month
+ | icelake::types::Transform::Day
+ | icelake::types::Transform::Hour
+ | icelake::types::Transform::Void => false,
+ });

- let arrow_type: ArrowDataType = table
- .current_partition_type()
- .map_err(|err| RwError::from(ErrorCode::SinkError(err.into())))?
- .try_into()
- .map_err(|_| {
- RwError::from(ErrorCode::SinkError(
- "Fail to convert iceberg partition type to arrow type".into(),
- ))
- })?;
- let Some(schema) = table.current_table_metadata().current_schema().ok() else {
- return Ok(None);
- };
- let partition_fields = partition_spec
- .fields
- .iter()
- .map(|f| {
- let source_f = schema
- .look_up_field_by_id(f.source_column_id)
- .ok_or(RwError::from(ErrorCode::SinkError(
- "Fail to look up iceberg partition field".into(),
- )))?;
- Ok((source_f.name.clone(), f.transform))
- })
- .collect::<Result<Vec<_>>>()?;

- let ArrowDataType::Struct(partition_type) = arrow_type else {
- return Err(RwError::from(ErrorCode::SinkError(
- "Partition type of iceberg should be a struct type".into(),
- )));
- };
+ if !has_sparse_partition {
+ return Ok(None);
+ }

- Ok(Some(PartitionComputeInfo::Iceberg(IcebergPartitionInfo {
- partition_type: IcebergArrowConvert.struct_from_fields(&partition_type)?,
- partition_fields,
- })))
+ let arrow_type: ArrowDataType = table
+ .current_partition_type()
+ .map_err(|err| RwError::from(ErrorCode::SinkError(err.into())))?
+ .try_into()
+ .map_err(|_| {
+ RwError::from(ErrorCode::SinkError(
+ "Fail to convert iceberg partition type to arrow type".into(),
+ ))
+ })?;
+ let Some(schema) = table.current_table_metadata().current_schema().ok() else {
+ return Ok(None);
+ };
+ let partition_fields = partition_spec
+ .fields
+ .iter()
+ .map(|f| {
+ let source_f =
+ schema
+ .look_up_field_by_id(f.source_column_id)
+ .ok_or(RwError::from(ErrorCode::SinkError(
+ "Fail to look up iceberg partition field".into(),
+ )))?;
+ Ok((source_f.name.clone(), f.transform))
+ })
+ .collect::<Result<Vec<_>>>()?;

+ let ArrowDataType::Struct(partition_type) = arrow_type else {
+ return Err(RwError::from(ErrorCode::SinkError(
+ "Partition type of iceberg should be a struct type".into(),
+ )));
+ };

+ Ok(Some(PartitionComputeInfo::Iceberg(IcebergPartitionInfo {
+ partition_type: IcebergArrowConvert.struct_from_fields(&partition_type)?,
+ partition_fields,
+ })))
+ }
}

pub async fn handle_create_sink(
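Note on the pattern in the diff above: the fix disables partition compute by returning Ok(None) at the top of get_partition_compute_info_for_iceberg, keeps the original body inside an #[allow(unreachable_code)] block so it still compiles, and underscore-prefixes the now effectively unused parameter. The following is a minimal, hypothetical sketch of that bypass pattern with made-up names (compute_info, _config), not code from this repository:

// Hypothetical, minimal sketch of the "early return + unreachable_code" bypass.
fn compute_info(_config: &str) -> Option<String> {
    // TODO: re-enable once the blocking issue is resolved.
    return None;

    #[allow(unreachable_code)]
    {
        // Original logic, preserved so it keeps type-checking; restore it by
        // deleting the early return above.
        if _config.is_empty() {
            return None;
        }
        Some(format!("partition info for {}", _config))
    }
}

fn main() {
    // Prints "None" while the bypass is in place.
    println!("{:?}", compute_info("my_table"));
}

Compared with commenting the body out or deleting it, this keeps the disabled path checked by the compiler against the current APIs, so it cannot silently drift out of date before it is re-enabled.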
