feat(batch): support time travel for iceberg source #15866

Merged · 4 commits · Mar 26, 2024 · Changes from 3 commits
1 change: 1 addition & 0 deletions Cargo.lock


4 changes: 3 additions & 1 deletion integration_tests/iceberg-source/python/main.py
@@ -88,7 +88,9 @@ def check_risingwave_iceberg_source(docker):
config = read_config(f"{docker.case_dir()}/config.ini")

sqls = [
"select count(*) from iceberg_source"
"select count(*) from iceberg_source",
"select count(*) from iceberg_source for system_time as of '2100-01-01 00:00:00+00:00'",
"select count(*) from iceberg_source for system_time as of 4102444800"
]

rw_config = config['risingwave']
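The two new queries cover both time-travel forms accepted by `FOR SYSTEM_TIME AS OF`: an RFC 3339 timestamp string and a Unix epoch in seconds. As a quick cross-check (not part of this PR), 4102444800 seconds is exactly 2100-01-01 00:00:00 UTC, so both queries target the same point in time; a minimal sketch using chrono (assuming chrono 0.4.31+ for `DateTime::from_timestamp`):

```rust
use chrono::{DateTime, Utc};

fn main() {
    // 4102444800 seconds since the Unix epoch, as used in the test above.
    let ts = DateTime::<Utc>::from_timestamp(4_102_444_800, 0).expect("in range");
    // Matches the '2100-01-01 00:00:00+00:00' literal in the first query.
    assert_eq!(ts.to_rfc3339(), "2100-01-01T00:00:00+00:00");
}
```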
44 changes: 43 additions & 1 deletion src/connector/src/source/iceberg/mod.rs
@@ -157,16 +157,58 @@ impl SplitEnumerator for IcebergSplitEnumerator {
}
}

pub enum IcebergTimeTravelInfo {
Version(i64),
TimeStampMs(i64),
}

impl IcebergSplitEnumerator {
pub async fn list_splits_batch(
&self,
time_traval_info: Option<IcebergTimeTravelInfo>,
batch_parallelism: usize,
) -> ConnectorResult<Vec<IcebergSplit>> {
if batch_parallelism == 0 {
bail!("Batch parallelism is 0. Cannot split the iceberg files.");
}
let table = self.config.load_table().await?;
let snapshot_id = table.current_table_metadata().current_snapshot_id.unwrap();
let snapshot_id = match time_traval_info {
Some(IcebergTimeTravelInfo::Version(version)) => {
let Some(snapshot) = table.current_table_metadata().snapshot(version) else {
bail!("Cannot find the snapshot id in the iceberg table.");
};
snapshot.snapshot_id
}
Some(IcebergTimeTravelInfo::TimeStampMs(timestamp)) => {
match &table.current_table_metadata().snapshots {
Some(snapshots) => {
let snapshot = snapshots
.iter()
.filter(|snapshot| snapshot.timestamp_ms <= timestamp)
.max_by_key(|snapshot| snapshot.timestamp_ms);
Review comment (Member):
Note down an idea during offline discussion with @chenzl25:

We might offer an option to read the latest compacted (i.e. doesn't contain delete files) snapshot. This is because it seems difficult for users to figure out the correct timestamp to read.

match snapshot {
Some(snapshot) => snapshot.snapshot_id,
None => {
// convert unix time to human readable time
let time = chrono::NaiveDateTime::from_timestamp_millis(timestamp);
if time.is_some() {
bail!("Cannot find a snapshot older than {}", time.unwrap());
} else {
bail!("Cannot find a snapshot");
}
}
}
}
None => {
bail!("Cannot find the snapshots in the iceberg table.");
}
}
}
None => match table.current_table_metadata().current_snapshot_id {
Some(snapshot_id) => snapshot_id,
None => bail!("Cannot find the current snapshot id in the iceberg table."),
},
};
let mut files = vec![];
for file in table.current_data_files().await? {
if file.content != DataContentType::Data {
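For the timestamp branch above, the selection rule is: take the newest snapshot whose `timestamp_ms` is not later than the requested time, and report an error if no snapshot qualifies. A simplified, self-contained illustration of that rule (the `Snapshot` struct here is a stand-in, not the icelake type used in the diff):

```rust
struct Snapshot {
    snapshot_id: i64,
    timestamp_ms: i64,
}

/// Pick the most recent snapshot that is not newer than `timestamp_ms`,
/// mirroring the `filter` + `max_by_key` logic in `list_splits_batch`.
fn resolve_snapshot(snapshots: &[Snapshot], timestamp_ms: i64) -> Option<i64> {
    snapshots
        .iter()
        .filter(|s| s.timestamp_ms <= timestamp_ms)
        .max_by_key(|s| s.timestamp_ms)
        .map(|s| s.snapshot_id)
}

fn main() {
    let snapshots = [
        Snapshot { snapshot_id: 1, timestamp_ms: 1_000 },
        Snapshot { snapshot_id: 2, timestamp_ms: 2_000 },
        Snapshot { snapshot_id: 3, timestamp_ms: 3_000 },
    ];
    // A request at t = 2_500 resolves to snapshot 2; a request before the
    // first snapshot resolves to nothing and surfaces as an error upstream.
    assert_eq!(resolve_snapshot(&snapshots, 2_500), Some(2));
    assert_eq!(resolve_snapshot(&snapshots, 500), None);
}
```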
1 change: 1 addition & 0 deletions src/frontend/Cargo.toml
@@ -78,6 +78,7 @@ serde = { version = "1", features = ["derive"] }
serde_json = "1"
sha2 = "0.10.7"
smallvec = { version = "1.13.1", features = ["serde"] }
speedate = "0.13.0"
tempfile = "3"
thiserror = "1"
thiserror-ext = { workspace = true }
10 changes: 9 additions & 1 deletion src/frontend/src/optimizer/plan_node/batch_source.rs
@@ -17,6 +17,7 @@ use std::rc::Rc;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::SourceNode;
use risingwave_sqlparser::ast::AsOf;

use super::batch::prelude::*;
use super::utils::{childless_record, column_names_pretty, Distill};
@@ -59,6 +60,10 @@ impl BatchSource {
self.core.kafka_timestamp_range_value()
}

pub fn as_of(&self) -> Option<AsOf> {
self.core.as_of.clone()
}

pub fn clone_with_dist(&self) -> Self {
let base = self
.base
@@ -75,11 +80,14 @@ impl_plan_tree_node_for_leaf! { BatchSource }
impl Distill for BatchSource {
fn distill<'a>(&self) -> XmlNode<'a> {
let src = Pretty::from(self.source_catalog().unwrap().name.clone());
let fields = vec![
let mut fields = vec![
("source", src),
("columns", column_names_pretty(self.schema())),
("filter", Pretty::debug(&self.kafka_timestamp_range_value())),
];
if let Some(as_of) = &self.core.as_of {
fields.push(("as_of", Pretty::debug(as_of)));
}
childless_record("BatchSource", fields)
}
}
11 changes: 11 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic/source.rs
@@ -107,6 +107,17 @@ impl Source {
.is_some_and(|catalog| catalog.with_properties.is_new_fs_connector())
}

pub fn is_iceberg_connector(&self) -> bool {
self.catalog
.as_ref()
.is_some_and(|catalog| catalog.with_properties.is_iceberg_connector())
}

/// Currently, only iceberg source supports time travel.
pub fn support_time_travel(&self) -> bool {
self.is_iceberg_connector()
}

/// The columns in stream/batch source node indicate the actual columns it will produce,
/// instead of the columns defined in source catalog. The difference is generated columns.
pub fn exclude_generated_columns(mut self) -> (Self, Option<usize>) {
13 changes: 11 additions & 2 deletions src/frontend/src/optimizer/plan_node/logical_source.rs
@@ -19,6 +19,7 @@ use std::rc::Rc;

use fixedbitset::FixedBitSet;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::bail;
use risingwave_common::catalog::{
ColumnCatalog, ColumnDesc, Field, Schema, KAFKA_TIMESTAMP_COLUMN_NAME,
};
@@ -85,6 +86,10 @@ impl LogicalSource {
as_of,
};

if core.as_of.is_some() && !core.support_time_travel() {
bail!("Time travel is not supported for the source")
}

let base = PlanBase::new_logical_with_core(&core);

let output_exprs = Self::derive_output_exprs_from_generated_columns(&core.column_catalog)?;
@@ -262,11 +267,15 @@ impl Distill for LogicalSource {
let fields = if let Some(catalog) = self.source_catalog() {
let src = Pretty::from(catalog.name.clone());
let time = Pretty::debug(&self.core.kafka_timestamp_range);
vec![
let mut fields = vec![
("source", src),
("columns", column_names_pretty(self.schema())),
("time_range", time),
]
];
if let Some(as_of) = &self.core.as_of {
fields.push(("as_of", Pretty::debug(as_of)));
}
fields
} else {
vec![]
};
10 changes: 5 additions & 5 deletions src/frontend/src/planner/relation.rs
@@ -104,14 +104,14 @@ impl Planner {
} else {
let as_of = source.as_of.clone();
match as_of {
None => {}
None
| Some(AsOf::VersionNum(_))
| Some(AsOf::TimestampString(_))
| Some(AsOf::TimestampNum(_)) => {}
Some(AsOf::ProcessTime) => {
bail_not_implemented!("As Of ProcessTime() is not supported yet.")
}
Some(AsOf::TimestampString(_)) | Some(AsOf::TimestampNum(_)) => {
bail_not_implemented!("As Of Timestamp is not supported yet.")
}
Some(AsOf::VersionNum(_)) | Some(AsOf::VersionString(_)) => {
Some(AsOf::VersionString(_)) => {
bail_not_implemented!("As Of Version is not supported yet.")
}
}
29 changes: 26 additions & 3 deletions src/frontend/src/scheduler/plan_fragmenter.rs
@@ -24,14 +24,15 @@ use enum_as_inner::EnumAsInner;
use futures::TryStreamExt;
use itertools::Itertools;
use pgwire::pg_server::SessionId;
use risingwave_common::bail;
use risingwave_common::buffer::{Bitmap, BitmapBuilder};
use risingwave_common::catalog::TableDesc;
use risingwave_common::hash::table_distribution::TableDistribution;
use risingwave_common::hash::{ParallelUnitId, ParallelUnitMapping, VirtualNode};
use risingwave_common::util::scan_range::ScanRange;
use risingwave_connector::source::filesystem::opendal_source::opendal_enumerator::OpendalEnumerator;
use risingwave_connector::source::filesystem::opendal_source::{OpendalGcs, OpendalS3};
use risingwave_connector::source::iceberg::IcebergSplitEnumerator;
use risingwave_connector::source::iceberg::{IcebergSplitEnumerator, IcebergTimeTravelInfo};
use risingwave_connector::source::kafka::KafkaSplitEnumerator;
use risingwave_connector::source::reader::reader::build_opendal_fs_list_for_batch;
use risingwave_connector::source::{
@@ -41,6 +42,7 @@ use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::{ExchangeInfo, ScanRange as ScanRangeProto};
use risingwave_pb::common::Buffer;
use risingwave_pb::plan_common::Field as FieldPb;
use risingwave_sqlparser::ast::AsOf;
use serde::ser::SerializeStruct;
use serde::Serialize;
use uuid::Uuid;
@@ -50,7 +52,7 @@ use crate::catalog::catalog_service::CatalogReader;
use crate::catalog::TableId;
use crate::error::RwError;
use crate::optimizer::plan_node::generic::{GenericPlanRef, PhysicalPlanRef};
use crate::optimizer::plan_node::{PlanNodeId, PlanNodeType};
use crate::optimizer::plan_node::{BatchSource, PlanNodeId, PlanNodeType};
use crate::optimizer::property::Distribution;
use crate::optimizer::PlanRef;
use crate::scheduler::worker_node_manager::WorkerNodeSelector;
@@ -266,6 +268,7 @@ impl Query {
pub struct SourceFetchInfo {
pub connector: ConnectorProperties,
pub timebound: (Option<i64>, Option<i64>),
pub as_of: Option<AsOf>,
}

#[derive(Clone, Debug)]
@@ -328,8 +331,25 @@ impl SourceScanInfo {
IcebergSplitEnumerator::new(*prop, SourceEnumeratorContext::default().into())
.await?;

let time_travel_info = match fetch_info.as_of {
Some(AsOf::VersionNum(v)) => Some(IcebergTimeTravelInfo::Version(v)),
Some(AsOf::TimestampNum(ts)) => {
Some(IcebergTimeTravelInfo::TimeStampMs(ts * 1000))
}
Some(AsOf::VersionString(_)) => {
bail!("Unsupported version string in iceberg time travel")
}
Some(AsOf::TimestampString(ts)) => Some(
speedate::DateTime::parse_str_rfc3339(&ts)
.map(|t| IcebergTimeTravelInfo::TimeStampMs(t.timestamp_tz() * 1000))
.map_err(|_e| anyhow!("fail to parse timestamp"))?,
),
Some(AsOf::ProcessTime) => unreachable!(),
None => None,
};

let split_info = iceberg_enumerator
.list_splits_batch(batch_parallelism)
.list_splits_batch(time_travel_info, batch_parallelism)
.await?
.into_iter()
.map(SplitImpl::Iceberg)
@@ -979,16 +999,19 @@ impl BatchPlanFragmenter {
}

if let Some(source_node) = node.as_batch_source() {
let source_node: &BatchSource = source_node;
let source_catalog = source_node.source_catalog();
if let Some(source_catalog) = source_catalog {
let property = ConnectorProperties::extract(
source_catalog.with_properties.clone().into_iter().collect(),
false,
)?;
let timestamp_bound = source_node.kafka_timestamp_range_value();
let as_of = source_node.as_of();
return Ok(Some(SourceScanInfo::new(SourceFetchInfo {
connector: property,
timebound: timestamp_bound,
as_of,
})));
}
}
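The timestamp-string branch above depends on speedate (added to the frontend Cargo.toml earlier in this diff) to parse RFC 3339 literals, while numeric timestamps are taken as Unix seconds and multiplied by 1000. A minimal sketch of that conversion in isolation, assuming the `DateTime::parse_str_rfc3339` and `timestamp_tz` APIs as used in the diff (the `as_of_to_timestamp_ms` helper is hypothetical):

```rust
use anyhow::anyhow;

/// Convert a `FOR SYSTEM_TIME AS OF '<rfc3339 timestamp>'` literal into the
/// millisecond value carried by `IcebergTimeTravelInfo::TimeStampMs`,
/// mirroring the match arm in the `impl SourceScanInfo` hunk above.
fn as_of_to_timestamp_ms(ts: &str) -> anyhow::Result<i64> {
    speedate::DateTime::parse_str_rfc3339(ts)
        .map(|t| t.timestamp_tz() * 1000)
        .map_err(|_e| anyhow!("fail to parse timestamp: {ts}"))
}

fn main() -> anyhow::Result<()> {
    // 2100-01-01T00:00:00Z is 4102444800 seconds since the Unix epoch.
    assert_eq!(as_of_to_timestamp_ms("2100-01-01T00:00:00+00:00")?, 4_102_444_800_000);
    Ok(())
}
```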