From da2ad1cefac1b626c2d81ab2e222b1ba5b550702 Mon Sep 17 00:00:00 2001
From: Dylan Chen
Date: Fri, 22 Mar 2024 18:04:21 +0800
Subject: [PATCH 1/4] support time travel

---
 Cargo.lock                                    |  1 +
 src/connector/src/source/iceberg/mod.rs       | 56 ++++++++++++++++++-
 src/frontend/Cargo.toml                       |  1 +
 .../src/optimizer/plan_node/batch_source.rs   | 10 +++-
 .../src/optimizer/plan_node/generic/source.rs | 11 ++++
 .../src/optimizer/plan_node/logical_source.rs | 15 ++++-
 src/frontend/src/planner/relation.rs          |  7 +--
 src/frontend/src/scheduler/plan_fragmenter.rs | 29 +++++++++-
 8 files changed, 118 insertions(+), 12 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 0c000d59950b8..d2fb1c123a3e7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -9657,6 +9657,7 @@ dependencies = [
  "serde_json",
  "sha2",
  "smallvec",
+ "speedate",
  "tempfile",
  "thiserror",
  "thiserror-ext",
diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs
index 8b7f0e696e95d..2d39da56a37d9 100644
--- a/src/connector/src/source/iceberg/mod.rs
+++ b/src/connector/src/source/iceberg/mod.rs
@@ -157,16 +157,70 @@ impl SplitEnumerator for IcebergSplitEnumerator {
     }
 }
 
+pub enum IcebergTimeTravelInfo {
+    Version(i64),
+    TimeStampMs(i64),
+}
+
 impl IcebergSplitEnumerator {
     pub async fn list_splits_batch(
         &self,
+        time_travel_info: Option<IcebergTimeTravelInfo>,
         batch_parallelism: usize,
     ) -> ConnectorResult<Vec<IcebergSplit>> {
         if batch_parallelism == 0 {
             bail!("Batch parallelism is 0. Cannot split the iceberg files.");
         }
         let table = self.config.load_table().await?;
-        let snapshot_id = table.current_table_metadata().current_snapshot_id.unwrap();
+        let snapshot_id = match time_travel_info {
+            Some(IcebergTimeTravelInfo::Version(version)) => {
+                match &table.current_table_metadata().snapshots {
+                    Some(snapshots) => {
+                        let snapshot = snapshots
+                            .iter()
+                            .find(|snapshot| snapshot.snapshot_id == version);
+                        match snapshot {
+                            Some(snapshot) => snapshot.snapshot_id,
+                            None => {
+                                bail!("Cannot find the snapshot id in the iceberg table.");
+                            }
+                        }
+                    }
+                    None => {
+                        bail!("Cannot find the snapshots in the iceberg table.");
+                    }
+                }
+            }
+            Some(IcebergTimeTravelInfo::TimeStampMs(timestamp)) => {
+                match &table.current_table_metadata().snapshots {
+                    Some(snapshots) => {
+                        let snapshot = snapshots
+                            .iter()
+                            .filter(|snapshot| snapshot.timestamp_ms <= timestamp)
+                            .max_by_key(|snapshot| snapshot.timestamp_ms);
+                        match snapshot {
+                            Some(snapshot) => snapshot.snapshot_id,
+                            None => {
+                                // convert unix time to human readable time
+                                let time = chrono::NaiveDateTime::from_timestamp_millis(timestamp);
+                                if time.is_some() {
+                                    bail!("Cannot find a snapshot older than {}", time.unwrap());
+                                } else {
+                                    bail!("Cannot find a snapshot");
+                                }
+                            }
+                        }
+                    }
+                    None => {
+                        bail!("Cannot find the snapshots in the iceberg table.");
+                    }
+                }
+            }
+            None => match table.current_table_metadata().current_snapshot_id {
+                Some(snapshot_id) => snapshot_id,
+                None => bail!("Cannot find the current snapshot id in the iceberg table."),
+            },
+        };
         let mut files = vec![];
         for file in table.current_data_files().await? {
             if file.content != DataContentType::Data {
diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml
index 3ab30f4f397f2..387b370a6401a 100644
--- a/src/frontend/Cargo.toml
+++ b/src/frontend/Cargo.toml
@@ -78,6 +78,7 @@ serde = { version = "1", features = ["derive"] }
 serde_json = "1"
 sha2 = "0.10.7"
 smallvec = { version = "1.13.1", features = ["serde"] }
+speedate = "0.13.0"
 tempfile = "3"
 thiserror = "1"
 thiserror-ext = { workspace = true }
diff --git a/src/frontend/src/optimizer/plan_node/batch_source.rs b/src/frontend/src/optimizer/plan_node/batch_source.rs
index 73547957c57a6..a0ece261dcea0 100644
--- a/src/frontend/src/optimizer/plan_node/batch_source.rs
+++ b/src/frontend/src/optimizer/plan_node/batch_source.rs
@@ -17,6 +17,7 @@ use std::rc::Rc;
 use pretty_xmlish::{Pretty, XmlNode};
 use risingwave_pb::batch_plan::plan_node::NodeBody;
 use risingwave_pb::batch_plan::SourceNode;
+use risingwave_sqlparser::ast::AsOf;
 
 use super::batch::prelude::*;
 use super::utils::{childless_record, column_names_pretty, Distill};
@@ -59,6 +60,10 @@ impl BatchSource {
         self.core.kafka_timestamp_range_value()
     }
 
+    pub fn as_of(&self) -> Option<AsOf> {
+        self.core.as_of.clone()
+    }
+
     pub fn clone_with_dist(&self) -> Self {
         let base = self
             .base
@@ -75,11 +80,14 @@ impl_plan_tree_node_for_leaf! { BatchSource }
 impl Distill for BatchSource {
     fn distill<'a>(&self) -> XmlNode<'a> {
         let src = Pretty::from(self.source_catalog().unwrap().name.clone());
-        let fields = vec![
+        let mut fields = vec![
             ("source", src),
             ("columns", column_names_pretty(self.schema())),
             ("filter", Pretty::debug(&self.kafka_timestamp_range_value())),
         ];
+        if let Some(as_of) = &self.core.as_of {
+            fields.push(("as_of", Pretty::debug(as_of)));
+        }
         childless_record("BatchSource", fields)
     }
 }
diff --git a/src/frontend/src/optimizer/plan_node/generic/source.rs b/src/frontend/src/optimizer/plan_node/generic/source.rs
index fa347375bf2bf..8d2a3dc8cec0f 100644
--- a/src/frontend/src/optimizer/plan_node/generic/source.rs
+++ b/src/frontend/src/optimizer/plan_node/generic/source.rs
@@ -107,6 +107,17 @@ impl Source {
             .is_some_and(|catalog| catalog.with_properties.is_new_fs_connector())
     }
 
+    pub fn is_iceberg_connector(&self) -> bool {
+        self.catalog
+            .as_ref()
+            .is_some_and(|catalog| catalog.with_properties.is_iceberg_connector())
+    }
+
+    /// Currently, only iceberg source supports time travel.
+    pub fn support_time_travel(&self) -> bool {
+        self.is_iceberg_connector()
+    }
+
     /// The columns in stream/batch source node indicate the actual columns it will produce,
     /// instead of the columns defined in source catalog. The difference is generated columns.
     pub fn exclude_generated_columns(mut self) -> (Self, Option<usize>) {
diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs
index 54d3680778817..e61ab328ef037 100644
--- a/src/frontend/src/optimizer/plan_node/logical_source.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_source.rs
@@ -19,6 +19,7 @@ use std::rc::Rc;
 
 use fixedbitset::FixedBitSet;
 use pretty_xmlish::{Pretty, XmlNode};
+use risingwave_common::bail;
 use risingwave_common::catalog::{
     ColumnCatalog, ColumnDesc, Field, Schema, KAFKA_TIMESTAMP_COLUMN_NAME,
 };
@@ -85,6 +86,12 @@ impl LogicalSource {
             as_of,
         };
 
+        if core.as_of.is_some() {
+            if !core.support_time_travel() {
+                bail!("Time travel is not supported for the source")
+            }
+        }
+
         let base = PlanBase::new_logical_with_core(&core);
 
         let output_exprs = Self::derive_output_exprs_from_generated_columns(&core.column_catalog)?;
@@ -262,11 +269,15 @@ impl Distill for LogicalSource {
         let fields = if let Some(catalog) = self.source_catalog() {
             let src = Pretty::from(catalog.name.clone());
             let time = Pretty::debug(&self.core.kafka_timestamp_range);
-            vec![
+            let mut fields = vec![
                 ("source", src),
                 ("columns", column_names_pretty(self.schema())),
                 ("time_range", time),
-            ]
+            ];
+            if let Some(as_of) = &self.core.as_of {
+                fields.push(("as_of", Pretty::debug(as_of)));
+            }
+            fields
         } else {
             vec![]
         };
diff --git a/src/frontend/src/planner/relation.rs b/src/frontend/src/planner/relation.rs
index 16440e97c15bb..5d18e8a9ab427 100644
--- a/src/frontend/src/planner/relation.rs
+++ b/src/frontend/src/planner/relation.rs
@@ -104,14 +104,11 @@ impl Planner {
         } else {
             let as_of = source.as_of.clone();
             match as_of {
-                None => {}
+                None | Some(AsOf::VersionNum(_)) | Some(AsOf::TimestampString(_)) | Some(AsOf::TimestampNum(_)) => {}
                 Some(AsOf::ProcessTime) => {
                     bail_not_implemented!("As Of ProcessTime() is not supported yet.")
                 }
-                Some(AsOf::TimestampString(_)) | Some(AsOf::TimestampNum(_)) => {
-                    bail_not_implemented!("As Of Timestamp is not supported yet.")
-                }
-                Some(AsOf::VersionNum(_)) | Some(AsOf::VersionString(_)) => {
+                Some(AsOf::VersionString(_)) => {
                     bail_not_implemented!("As Of Version is not supported yet.")
                 }
             }
diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs
index c68cd02c2eeeb..f49a188b91f6b 100644
--- a/src/frontend/src/scheduler/plan_fragmenter.rs
+++ b/src/frontend/src/scheduler/plan_fragmenter.rs
@@ -24,6 +24,7 @@ use enum_as_inner::EnumAsInner;
 use futures::TryStreamExt;
 use itertools::Itertools;
 use pgwire::pg_server::SessionId;
+use risingwave_common::bail;
 use risingwave_common::buffer::{Bitmap, BitmapBuilder};
 use risingwave_common::catalog::TableDesc;
 use risingwave_common::hash::table_distribution::TableDistribution;
@@ -31,7 +32,7 @@ use risingwave_common::hash::{ParallelUnitId, ParallelUnitMapping, VirtualNode};
 use risingwave_common::util::scan_range::ScanRange;
 use risingwave_connector::source::filesystem::opendal_source::opendal_enumerator::OpendalEnumerator;
 use risingwave_connector::source::filesystem::opendal_source::{OpendalGcs, OpendalS3};
-use risingwave_connector::source::iceberg::IcebergSplitEnumerator;
+use risingwave_connector::source::iceberg::{IcebergSplitEnumerator, IcebergTimeTravelInfo};
 use risingwave_connector::source::kafka::KafkaSplitEnumerator;
 use risingwave_connector::source::reader::reader::build_opendal_fs_list_for_batch;
 use risingwave_connector::source::{
@@ -41,6 +42,7 @@ use risingwave_pb::batch_plan::plan_node::NodeBody;
 use risingwave_pb::batch_plan::{ExchangeInfo, ScanRange as ScanRangeProto};
 use risingwave_pb::common::Buffer;
 use risingwave_pb::plan_common::Field as FieldPb;
+use risingwave_sqlparser::ast::AsOf;
 use serde::ser::SerializeStruct;
 use serde::Serialize;
 use uuid::Uuid;
@@ -50,7 +52,7 @@ use crate::catalog::catalog_service::CatalogReader;
 use crate::catalog::TableId;
 use crate::error::RwError;
 use crate::optimizer::plan_node::generic::{GenericPlanRef, PhysicalPlanRef};
-use crate::optimizer::plan_node::{PlanNodeId, PlanNodeType};
+use crate::optimizer::plan_node::{BatchSource, PlanNodeId, PlanNodeType};
 use crate::optimizer::property::Distribution;
 use crate::optimizer::PlanRef;
 use crate::scheduler::worker_node_manager::WorkerNodeSelector;
@@ -266,6 +268,7 @@ impl Query {
 pub struct SourceFetchInfo {
     pub connector: ConnectorProperties,
     pub timebound: (Option<i64>, Option<i64>),
+    pub as_of: Option<AsOf>,
 }
 
 #[derive(Clone, Debug)]
@@ -328,8 +331,25 @@ impl SourceScanInfo {
                 IcebergSplitEnumerator::new(*prop, SourceEnumeratorContext::default().into())
                     .await?;
 
+            let time_travel_info = match fetch_info.as_of {
+                Some(AsOf::VersionNum(v)) => Some(IcebergTimeTravelInfo::Version(v)),
+                Some(AsOf::TimestampNum(ts)) => {
+                    Some(IcebergTimeTravelInfo::TimeStampMs(ts * 1000))
+                }
+                Some(AsOf::VersionString(_)) => {
+                    bail!("Unsupported version string in iceberg time travel")
+                }
+                Some(AsOf::TimestampString(ts)) => Some(
+                    speedate::DateTime::parse_str_rfc3339(&ts)
+                        .map(|t| IcebergTimeTravelInfo::TimeStampMs(t.timestamp_tz() * 1000))
+                        .map_err(|_e| anyhow!("fail to parse timestamp"))?,
+                ),
+                Some(AsOf::ProcessTime) => unreachable!(),
+                None => None,
+            };
+
             let split_info = iceberg_enumerator
-                .list_splits_batch(batch_parallelism)
+                .list_splits_batch(time_travel_info, batch_parallelism)
                 .await?
                 .into_iter()
                 .map(SplitImpl::Iceberg)
@@ -979,6 +999,7 @@ impl BatchPlanFragmenter {
         }
 
         if let Some(source_node) = node.as_batch_source() {
+            let source_node: &BatchSource = source_node;
             let source_catalog = source_node.source_catalog();
             if let Some(source_catalog) = source_catalog {
                 let property = ConnectorProperties::extract(
@@ -986,9 +1007,11 @@
                     false,
                 )?;
                 let timestamp_bound = source_node.kafka_timestamp_range_value();
+                let as_of = source_node.as_of();
                 return Ok(Some(SourceScanInfo::new(SourceFetchInfo {
                     connector: property,
                     timebound: timestamp_bound,
+                    as_of,
                 })));
             }
         }

From 6373c201f844c2b4c624fb1c60cd3bf25b637bd7 Mon Sep 17 00:00:00 2001
From: Dylan Chen
Date: Fri, 22 Mar 2024 18:39:22 +0800
Subject: [PATCH 2/4] add test

---
 integration_tests/iceberg-source/python/main.py        | 4 +++-
 src/frontend/src/optimizer/plan_node/logical_source.rs | 6 ++----
 src/frontend/src/planner/relation.rs                   | 5 ++++-
 3 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/integration_tests/iceberg-source/python/main.py b/integration_tests/iceberg-source/python/main.py
index ebfd6a6c468f5..bcc44bc77e0cf 100644
--- a/integration_tests/iceberg-source/python/main.py
+++ b/integration_tests/iceberg-source/python/main.py
@@ -88,7 +88,9 @@ def check_risingwave_iceberg_source(docker):
     config = read_config(f"{docker.case_dir()}/config.ini")
 
     sqls = [
-        "select count(*) from iceberg_source"
+        "select count(*) from iceberg_source",
+        "select count(*) from iceberg_source for system_time as of '2100-01-01 00:00:00+00:00'",
+        "select count(*) from iceberg_source for system_time as of 4102444800"
     ]
 
     rw_config = config['risingwave']
diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs
index e61ab328ef037..83fa891cb39a0 100644
--- a/src/frontend/src/optimizer/plan_node/logical_source.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_source.rs
@@ -86,10 +86,8 @@ impl LogicalSource {
             as_of,
         };
 
-        if core.as_of.is_some() {
-            if !core.support_time_travel() {
-                bail!("Time travel is not supported for the source")
-            }
+        if core.as_of.is_some() && !core.support_time_travel() {
+            bail!("Time travel is not supported for the source")
         }
 
         let base = PlanBase::new_logical_with_core(&core);
diff --git a/src/frontend/src/planner/relation.rs b/src/frontend/src/planner/relation.rs
index 5d18e8a9ab427..74060b484e9c0 100644
--- a/src/frontend/src/planner/relation.rs
+++ b/src/frontend/src/planner/relation.rs
@@ -104,7 +104,10 @@ impl Planner {
         } else {
             let as_of = source.as_of.clone();
             match as_of {
-                None | Some(AsOf::VersionNum(_)) | Some(AsOf::TimestampString(_)) | Some(AsOf::TimestampNum(_)) => {}
+                None
+                | Some(AsOf::VersionNum(_))
+                | Some(AsOf::TimestampString(_))
+                | Some(AsOf::TimestampNum(_)) => {}
                 Some(AsOf::ProcessTime) => {
                     bail_not_implemented!("As Of ProcessTime() is not supported yet.")
                 }

From c40a1e6344ada128ca587c5629752292d066be63 Mon Sep 17 00:00:00 2001
From: Dylan Chen
Date: Mon, 25 Mar 2024 11:13:03 +0800
Subject: [PATCH 3/4] refactor

---
 src/connector/src/source/iceberg/mod.rs | 20 ++++----------------
 1 file changed, 4 insertions(+), 16 deletions(-)

diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs
index 2d39da56a37d9..4889933512b49 100644
--- a/src/connector/src/source/iceberg/mod.rs
+++ b/src/connector/src/source/iceberg/mod.rs
@@ -174,22 +174,10 @@ impl IcebergSplitEnumerator {
         let table = self.config.load_table().await?;
         let snapshot_id = match time_travel_info {
             Some(IcebergTimeTravelInfo::Version(version)) => {
-                match &table.current_table_metadata().snapshots {
-                    Some(snapshots) => {
-                        let snapshot = snapshots
-                            .iter()
-                            .find(|snapshot| snapshot.snapshot_id == version);
-                        match snapshot {
-                            Some(snapshot) => snapshot.snapshot_id,
-                            None => {
-                                bail!("Cannot find the snapshot id in the iceberg table.");
-                            }
-                        }
-                    }
-                    None => {
-                        bail!("Cannot find the snapshots in the iceberg table.");
-                    }
-                }
+                let Some(snapshot) = table.current_table_metadata().snapshot(version) else {
+                    bail!("Cannot find the snapshot id in the iceberg table.");
+                };
+                snapshot.snapshot_id
             }
             Some(IcebergTimeTravelInfo::TimeStampMs(timestamp)) => {
                 match &table.current_table_metadata().snapshots {

From acf187717b552a7db2d6fbbb5d3ce45614fb0a06 Mon Sep 17 00:00:00 2001
From: Dylan Chen
Date: Tue, 26 Mar 2024 16:04:25 +0800
Subject: [PATCH 4/4] refactor

---
 src/connector/src/source/iceberg/mod.rs       | 4 ++--
 src/frontend/src/scheduler/plan_fragmenter.rs | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs
index 4889933512b49..99f4259fb3607 100644
--- a/src/connector/src/source/iceberg/mod.rs
+++ b/src/connector/src/source/iceberg/mod.rs
@@ -159,7 +159,7 @@ impl SplitEnumerator for IcebergSplitEnumerator {
 
 pub enum IcebergTimeTravelInfo {
     Version(i64),
-    TimeStampMs(i64),
+    TimestampMs(i64),
 }
 
 impl IcebergSplitEnumerator {
@@ -179,7 +179,7 @@ impl IcebergSplitEnumerator {
                 };
                 snapshot.snapshot_id
             }
-            Some(IcebergTimeTravelInfo::TimeStampMs(timestamp)) => {
+            Some(IcebergTimeTravelInfo::TimestampMs(timestamp)) => {
                 match &table.current_table_metadata().snapshots {
diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs
index f49a188b91f6b..16ccbb8164bf4 100644
--- a/src/frontend/src/scheduler/plan_fragmenter.rs
+++ b/src/frontend/src/scheduler/plan_fragmenter.rs
@@ -334,14 +334,14 @@ impl SourceScanInfo {
             let time_travel_info = match fetch_info.as_of {
                 Some(AsOf::VersionNum(v)) => Some(IcebergTimeTravelInfo::Version(v)),
                 Some(AsOf::TimestampNum(ts)) => {
-                    Some(IcebergTimeTravelInfo::TimeStampMs(ts * 1000))
+                    Some(IcebergTimeTravelInfo::TimestampMs(ts * 1000))
                 }
                 Some(AsOf::VersionString(_)) => {
                     bail!("Unsupported version string in iceberg time travel")
                 }
                 Some(AsOf::TimestampString(ts)) => Some(
                     speedate::DateTime::parse_str_rfc3339(&ts)
-                        .map(|t| IcebergTimeTravelInfo::TimeStampMs(t.timestamp_tz() * 1000))
+                        .map(|t| IcebergTimeTravelInfo::TimestampMs(t.timestamp_tz() * 1000))
                         .map_err(|_e| anyhow!("fail to parse timestamp"))?,
                 ),
                 Some(AsOf::ProcessTime) => unreachable!(),
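
Reviewer note on the series as a whole: the snapshot-resolution rule that patches 1, 3, and 4 converge on is easier to study in isolation. Below is a minimal, dependency-free Rust sketch of that rule, not the code in this series: `Snapshot` is a hypothetical stand-in for icelake's snapshot metadata type (only the two fields the patches read are modeled), and the `None` arm is approximated by picking the newest snapshot, whereas the real code reads `current_snapshot_id` from the table metadata.

// Self-contained sketch of the snapshot-resolution rule in `list_splits_batch`.
struct Snapshot {
    snapshot_id: i64,
    timestamp_ms: i64,
}

enum IcebergTimeTravelInfo {
    Version(i64),
    TimestampMs(i64),
}

/// Resolve a time-travel request against the table's snapshot list:
/// - `Version(v)`: the snapshot whose id equals `v`, or an error;
/// - `TimestampMs(ts)`: the newest snapshot taken at or before `ts`, or an error;
/// - `None`: the current snapshot (approximated here as the newest one).
fn resolve_snapshot(
    snapshots: &[Snapshot],
    time_travel: Option<IcebergTimeTravelInfo>,
) -> Result<i64, String> {
    match time_travel {
        Some(IcebergTimeTravelInfo::Version(v)) => snapshots
            .iter()
            .find(|s| s.snapshot_id == v)
            .map(|s| s.snapshot_id)
            .ok_or_else(|| format!("cannot find snapshot {v} in the iceberg table")),
        Some(IcebergTimeTravelInfo::TimestampMs(ts)) => snapshots
            .iter()
            .filter(|s| s.timestamp_ms <= ts)
            .max_by_key(|s| s.timestamp_ms)
            .map(|s| s.snapshot_id)
            .ok_or_else(|| format!("cannot find a snapshot older than {ts} ms")),
        None => snapshots
            .iter()
            .max_by_key(|s| s.timestamp_ms)
            .map(|s| s.snapshot_id)
            .ok_or_else(|| "the iceberg table has no snapshots".to_string()),
    }
}

fn main() {
    let snapshots = [
        Snapshot { snapshot_id: 1, timestamp_ms: 1_000 },
        Snapshot { snapshot_id: 2, timestamp_ms: 2_000 },
    ];
    // A numeric `FOR SYSTEM_TIME AS OF` value is Unix seconds; plan_fragmenter.rs
    // multiplies it by 1000 before this step, so 1_500 ms resolves to snapshot 1.
    assert_eq!(
        resolve_snapshot(&snapshots, Some(IcebergTimeTravelInfo::TimestampMs(1_500))),
        Ok(1)
    );
    // No AS OF clause: fall back to the current (newest) snapshot.
    assert_eq!(resolve_snapshot(&snapshots, None), Ok(2));
}

For reference, the two SQL entry points exercised by the integration test in patch 2 are `for system_time as of '2100-01-01 00:00:00+00:00'`, which plan_fragmenter.rs parses with speedate and converts to milliseconds, and `for system_time as of 4102444800`, a Unix timestamp in seconds that is multiplied by 1000 before snapshot resolution.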