feat(batch): support batch read iceberg source (#15214)

chenzl25 authored Feb 27, 2024
1 parent 2157ed6 commit 0b9d5af
Showing 7 changed files with 195 additions and 77 deletions.
63 changes: 42 additions & 21 deletions src/batch/src/executor/source.rs
@@ -21,6 +21,7 @@ use risingwave_common::array::{DataChunk, Op, StreamChunk};
use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId};
use risingwave_common::types::DataType;
use risingwave_connector::parser::SpecificParserConfig;
use risingwave_connector::source::iceberg::{IcebergProperties, IcebergSplit};
use risingwave_connector::source::monitor::SourceMetrics;
use risingwave_connector::source::reader::reader::SourceReader;
use risingwave_connector::source::{
@@ -30,7 +31,9 @@ use risingwave_pb::batch_plan::plan_node::NodeBody;

use super::Executor;
use crate::error::{BatchError, Result};
use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, ExecutorBuilder};
use crate::executor::{
BoxedExecutor, BoxedExecutorBuilder, ExecutorBuilder, FileSelector, IcebergScanExecutor,
};
use crate::task::BatchTaskContext;

pub struct SourceExecutor {
@@ -75,16 +78,6 @@ impl BoxedExecutorBuilder for SourceExecutor {
.map(|c| SourceColumnDesc::from(&ColumnDesc::from(c.column_desc.as_ref().unwrap())))
.collect();

let source_reader = SourceReader {
config,
columns,
parser_config,
connector_message_buffer_size: source
.context()
.get_config()
.developer
.connector_message_buffer_size,
};
let source_ctrl_opts = SourceCtrlOpts {
chunk_size: source.context().get_config().developer.chunk_size,
rate_limit: None,
@@ -110,16 +103,44 @@
.collect();
let schema = Schema::new(fields);

Ok(Box::new(SourceExecutor {
source: source_reader,
column_ids,
metrics: source.context().source_metrics(),
source_id: TableId::new(source_node.source_id),
split,
schema,
identity: source.plan_node().get_identity().clone(),
source_ctrl_opts,
}))
if let ConnectorProperties::Iceberg(iceberg_properties) = config {
let iceberg_properties: IcebergProperties = *iceberg_properties;
if let SplitImpl::Iceberg(split) = split {
let split: IcebergSplit = split;
Ok(Box::new(IcebergScanExecutor::new(
iceberg_properties.to_iceberg_config(),
Some(split.snapshot_id),
FileSelector::FileList(split.files),
source.context.get_config().developer.chunk_size,
schema,
source.plan_node().get_identity().clone(),
)))
} else {
unreachable!()
}
} else {
let source_reader = SourceReader {
config,
columns,
parser_config,
connector_message_buffer_size: source
.context()
.get_config()
.developer
.connector_message_buffer_size,
};

Ok(Box::new(SourceExecutor {
source: source_reader,
column_ids,
metrics: source.context().source_metrics(),
source_id: TableId::new(source_node.source_id),
split,
schema,
identity: source.plan_node().get_identity().clone(),
source_ctrl_opts,
}))
}
}
}

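The branching added above is the heart of the change: an Iceberg source bypasses the generic source-reader path entirely. A self-contained distillation of that dispatch — the types below are simplified stand-ins, not RisingWave's real API:

```rust
// Simplified stand-ins for the real connector types.
struct IcebergProperties;
struct IcebergSplit {
    snapshot_id: i64,
    files: Vec<String>,
}

enum ConnectorProperties {
    Iceberg(Box<IcebergProperties>),
    Other,
}

enum SplitImpl {
    Iceberg(IcebergSplit),
    Other,
}

// Iceberg sources get a dedicated scan executor that reads the split's
// data files directly; every other connector keeps the SourceReader path.
fn executor_kind(config: ConnectorProperties, split: SplitImpl) -> &'static str {
    match config {
        ConnectorProperties::Iceberg(_props) => match split {
            SplitImpl::Iceberg(_split) => "IcebergScanExecutor",
            // The planner only pairs an Iceberg source with Iceberg splits,
            // hence the unreachable!() in the real code.
            _ => unreachable!("iceberg source must carry an iceberg split"),
        },
        _ => "SourceExecutor",
    }
}

fn main() {
    let split = IcebergSplit { snapshot_id: 42, files: vec![] };
    assert_eq!(
        executor_kind(
            ConnectorProperties::Iceberg(Box::new(IcebergProperties)),
            SplitImpl::Iceberg(split),
        ),
        "IcebergScanExecutor"
    );
}
```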
6 changes: 6 additions & 0 deletions src/connector/src/parser/mod.rs
@@ -974,6 +974,8 @@ pub enum EncodingProperties {
Json(JsonProperties),
Bytes(BytesProperties),
Native,
/// Encoding can't be specified because the source determines it. Currently only used by the Iceberg source.
None,
#[default]
Unspecified,
}
@@ -987,6 +989,8 @@ pub enum ProtocolProperties {
Plain,
Upsert,
Native,
/// Protocol can't be specified because the source determines it. Currently only used by the Iceberg source.
None,
#[default]
Unspecified,
}
@@ -1004,6 +1008,7 @@ impl SpecificParserConfig {
// in the future
let protocol_config = match format {
SourceFormat::Native => ProtocolProperties::Native,
SourceFormat::None => ProtocolProperties::None,
SourceFormat::Debezium => ProtocolProperties::Debezium,
SourceFormat::DebeziumMongo => ProtocolProperties::DebeziumMongo,
SourceFormat::Maxwell => ProtocolProperties::Maxwell,
@@ -1114,6 +1119,7 @@ impl SpecificParserConfig {
EncodingProperties::Bytes(BytesProperties { column_name: None })
}
(SourceFormat::Native, SourceEncode::Native) => EncodingProperties::Native,
(SourceFormat::None, SourceEncode::None) => EncodingProperties::None,
(format, encode) => {
bail!("Unsupported format {:?} encode {:?}", format, encode);
}
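The new `None` variants follow the same pairing rule as `Native`: `FORMAT NONE` is only accepted together with `ENCODE NONE`; any other combination falls through to the `bail!` arm. A stand-in sketch of that rule (simplified enums and illustrative names, not the real types):

```rust
#[derive(Debug, Clone, Copy)]
enum SourceFormat { Native, None, Plain }

#[derive(Debug, Clone, Copy)]
enum SourceEncode { Native, None, Json }

// Mirrors the shape of the match in SpecificParserConfig::new: only a
// handful of (format, encode) pairs are valid; the rest are rejected.
fn encoding_for(format: SourceFormat, encode: SourceEncode) -> Result<&'static str, String> {
    match (format, encode) {
        (SourceFormat::Native, SourceEncode::Native) => Ok("Native"),
        (SourceFormat::None, SourceEncode::None) => Ok("None"),
        (SourceFormat::Plain, SourceEncode::Json) => Ok("Json"),
        (f, e) => Err(format!("Unsupported format {f:?} encode {e:?}")),
    }
}

fn main() {
    assert_eq!(encoding_for(SourceFormat::None, SourceEncode::None), Ok("None"));
    assert!(encoding_for(SourceFormat::None, SourceEncode::Json).is_err());
}
```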
3 changes: 3 additions & 0 deletions src/connector/src/source/base.rs
@@ -261,6 +261,7 @@ pub enum SourceFormat {
#[default]
Invalid,
Native,
None,
Debezium,
DebeziumMongo,
Maxwell,
@@ -274,6 +275,7 @@ pub enum SourceEncode {
#[default]
Invalid,
Native,
None,
Avro,
Csv,
Protobuf,
@@ -334,6 +336,7 @@ pub fn extract_source_struct(info: &PbStreamSourceInfo) -> Result<SourceStruct>
(PbFormatType::Native, PbEncodeType::Native) => {
(SourceFormat::Native, SourceEncode::Native)
}
(PbFormatType::None, PbEncodeType::None) => (SourceFormat::None, SourceEncode::None),
(PbFormatType::Debezium, PbEncodeType::Avro) => {
(SourceFormat::Debezium, SourceEncode::Avro)
}
91 changes: 83 additions & 8 deletions src/connector/src/source/iceberg/mod.rs
@@ -14,12 +14,17 @@

use std::collections::HashMap;

use anyhow::anyhow;
use async_trait::async_trait;
use icelake::types::DataContentType;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::types::JsonbVal;
use serde::{Deserialize, Serialize};

use crate::error::ConnectorResult;
use crate::parser::ParserConfig;
use crate::sink::iceberg::IcebergConfig;
use crate::source::{
BoxChunkSourceStream, Column, SourceContextRef, SourceEnumeratorContextRef, SourceProperties,
SplitEnumerator, SplitId, SplitMetaData, SplitReader, UnknownFields,
@@ -50,6 +55,22 @@ pub struct IcebergProperties {
pub unknown_fields: HashMap<String, String>,
}

impl IcebergProperties {
pub fn to_iceberg_config(&self) -> IcebergConfig {
IcebergConfig {
database_name: Some(self.database_name.clone()),
table_name: self.table_name.clone(),
catalog_type: Some(self.catalog_type.clone()),
path: self.warehouse_path.clone(),
endpoint: Some(self.endpoint.clone()),
access_key: self.s3_access.clone(),
secret_key: self.s3_secret.clone(),
region: Some(self.region_name.clone()),
..Default::default()
}
}
}

impl SourceProperties for IcebergProperties {
type Split = IcebergSplit;
type SplitEnumerator = IcebergSplitEnumerator;
@@ -65,19 +86,23 @@ impl UnknownFields for IcebergProperties {
}

#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct IcebergSplit {}
pub struct IcebergSplit {
pub split_id: i64,
pub snapshot_id: i64,
pub files: Vec<String>,
}

impl SplitMetaData for IcebergSplit {
fn id(&self) -> SplitId {
unimplemented!()
self.split_id.to_string().into()
}

fn restore_from_json(_value: JsonbVal) -> ConnectorResult<Self> {
unimplemented!()
fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
serde_json::from_value(value.take()).map_err(|e| anyhow!(e).into())
}

fn encode_to_json(&self) -> JsonbVal {
unimplemented!()
serde_json::to_value(self.clone()).unwrap().into()
}

fn update_with_offset(&mut self, _start_offset: String) -> ConnectorResult<()> {
@@ -86,25 +111,75 @@ impl SplitMetaData for IcebergSplit {
}
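With real fields in place, the split can now round-trip through JSON for recovery. A minimal standalone sketch of that round-trip — the struct is a stand-in with the same fields, and the real implementation wraps the value in `JsonbVal`:

```rust
use serde::{Deserialize, Serialize};

// Stand-in mirroring IcebergSplit's fields.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
struct IcebergSplit {
    split_id: i64,
    snapshot_id: i64,
    files: Vec<String>,
}

fn main() {
    let split = IcebergSplit {
        split_id: 0,
        snapshot_id: 42,
        files: vec!["s3://warehouse/t/data/00000-0.parquet".to_string()],
    };
    // encode_to_json boils down to serde_json::to_value ...
    let json = serde_json::to_value(split.clone()).unwrap();
    // ... and restore_from_json to serde_json::from_value.
    let restored: IcebergSplit = serde_json::from_value(json).unwrap();
    assert_eq!(split, restored);
}
```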

#[derive(Debug, Clone)]
pub struct IcebergSplitEnumerator {}
pub struct IcebergSplitEnumerator {
config: IcebergConfig,
}

#[async_trait]
impl SplitEnumerator for IcebergSplitEnumerator {
type Properties = IcebergProperties;
type Split = IcebergSplit;

async fn new(
_properties: Self::Properties,
properties: Self::Properties,
_context: SourceEnumeratorContextRef,
) -> ConnectorResult<Self> {
Ok(Self {})
let iceberg_config = properties.to_iceberg_config();
Ok(Self {
config: iceberg_config,
})
}

async fn list_splits(&mut self) -> ConnectorResult<Vec<Self::Split>> {
// Iceberg source does not support streaming queries
Ok(vec![])
}
}

impl IcebergSplitEnumerator {
pub async fn list_splits_batch(
&self,
batch_parallelism: usize,
) -> ConnectorResult<Vec<IcebergSplit>> {
if batch_parallelism == 0 {
bail!("Batch parallelism is 0. Cannot split the iceberg files.");
}
let table = self.config.load_table().await?;
let snapshot_id = table.current_table_metadata().current_snapshot_id.unwrap();
let mut files = vec![];
for file in table.current_data_files().await? {
if file.content != DataContentType::Data {
bail!("Reading iceberg table with delete files is unsupported. Please try to compact the table first.");
}
files.push(file.file_path);
}
let split_num = batch_parallelism;
// evenly split the files into splits based on the parallelism.
let split_size = files.len() / split_num;
let remaining = files.len() % split_num;
let mut splits = vec![];
for i in 0..split_num {
let start = i * split_size;
let end = (i + 1) * split_size;
let split = IcebergSplit {
split_id: i as i64,
snapshot_id,
files: files[start..end].to_vec(),
};
splits.push(split);
}
for i in 0..remaining {
splits[i]
.files
.push(files[split_num * split_size + i].clone());
}
Ok(splits
.into_iter()
.filter(|split| !split.files.is_empty())
.collect_vec())
}
}

#[derive(Debug)]
pub struct IcebergFileReader {}

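The file-distribution logic in `list_splits_batch` deserves a worked example: files are dealt out evenly across `batch_parallelism` splits, the remainder is spread one file at a time over the leading splits, and empty splits are filtered out. A standalone sketch of the same arithmetic (the function name is illustrative):

```rust
// Same distribution scheme as list_splits_batch, over plain strings.
fn distribute(files: Vec<String>, parallelism: usize) -> Vec<Vec<String>> {
    assert!(parallelism > 0, "batch parallelism must be positive");
    let split_size = files.len() / parallelism;
    let remaining = files.len() % parallelism;
    // Give every split an equal base share ...
    let mut splits: Vec<Vec<String>> = (0..parallelism)
        .map(|i| files[i * split_size..(i + 1) * split_size].to_vec())
        .collect();
    // ... then hand the leftover files to the leading splits, one each.
    for i in 0..remaining {
        splits[i].push(files[parallelism * split_size + i].clone());
    }
    // Splits with no files would be useless scan tasks; drop them.
    splits.retain(|split| !split.is_empty());
    splits
}

fn main() {
    // Seven files over three splits: sizes 3, 2, 2.
    let files: Vec<String> = (0..7).map(|i| format!("f{i}.parquet")).collect();
    let sizes: Vec<usize> = distribute(files, 3).iter().map(Vec::len).collect();
    assert_eq!(sizes, vec![3, 2, 2]);

    // Two files over four splits: two single-file splits survive the filter.
    let files: Vec<String> = (0..2).map(|i| format!("f{i}.parquet")).collect();
    assert_eq!(distribute(files, 4).len(), 2);
}
```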
3 changes: 2 additions & 1 deletion src/frontend/planner_test/tests/testdata/output/explain.yaml
@@ -75,7 +75,8 @@
},
"parent_edges": {
"0": []
}
},
"batch_parallelism": 0
}
- sql: |
create table t1(v1 int);
13 changes: 1 addition & 12 deletions src/frontend/src/handler/create_source.rs
@@ -36,7 +36,6 @@ use risingwave_connector::parser::{
use risingwave_connector::schema::schema_registry::{
name_strategy_from_str, SchemaRegistryAuth, SCHEMA_REGISTRY_PASSWORD, SCHEMA_REGISTRY_USERNAME,
};
use risingwave_connector::sink::iceberg::IcebergConfig;
use risingwave_connector::source::cdc::external::CdcTableType;
use risingwave_connector::source::cdc::{
CDC_SHARING_MODE_KEY, CDC_SNAPSHOT_BACKFILL, CDC_SNAPSHOT_MODE_KEY, CDC_TRANSACTIONAL_KEY,
@@ -1154,17 +1153,7 @@ pub async fn check_iceberg_source(
)));
};

let iceberg_config = IcebergConfig {
database_name: Some(properties.database_name),
table_name: properties.table_name,
catalog_type: Some(properties.catalog_type),
path: properties.warehouse_path,
endpoint: Some(properties.endpoint),
access_key: properties.s3_access,
secret_key: properties.s3_secret,
region: Some(properties.region_name),
..Default::default()
};
let iceberg_config = properties.to_iceberg_config();

let schema = Schema {
fields: columns