Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support reading partitioned Delta table. #14084

Merged
merged 21 commits into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

82 changes: 82 additions & 0 deletions src/query/expression/src/expression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,88 @@ impl<Index: ColumnIndex> Expr<Index> {
}
}

pub fn fill_const_column(&self, consts: &HashMap<Index, Scalar>) -> Expr<Index> {
sundy-li marked this conversation as resolved.
Show resolved Hide resolved
match self {
Expr::Constant {
span,
scalar,
data_type,
} => Expr::Constant {
span: *span,
scalar: scalar.clone(),
data_type: data_type.clone(),
},
Expr::ColumnRef {
span,
id,
data_type,
display_name,
} => {
if let Some(v) = consts.get(id) {
Expr::Constant {
span: *span,
scalar: v.clone(),
data_type: data_type.clone(),
}
} else {
Expr::ColumnRef {
span: *span,
id: id.clone(),
data_type: data_type.clone(),
display_name: display_name.clone(),
}
}
}
Expr::Cast {
span,
is_try,
expr,
dest_type,
} => Expr::Cast {
span: *span,
is_try: *is_try,
expr: Box::new(expr.fill_const_column(consts)),
dest_type: dest_type.clone(),
},
Expr::FunctionCall {
span,
id,
function,
generics,
args,
return_type,
} => Expr::FunctionCall {
span: *span,
id: id.clone(),
function: function.clone(),
generics: generics.clone(),
args: args
.iter()
.map(|expr| expr.fill_const_column(consts))
.collect(),
return_type: return_type.clone(),
},
Expr::LambdaFunctionCall {
span,
name,
args,
lambda_expr,
lambda_display,
return_type,
} => Expr::LambdaFunctionCall {
span: *span,
name: name.clone(),
args: args
.iter()
.map(|expr| expr.fill_const_column(consts))
.collect(),
lambda_expr: lambda_expr.clone(),
lambda_display: lambda_display.clone(),
return_type: return_type.clone(),
},
}
}

pub fn as_remote_expr(&self) -> RemoteExpr<Index> {
match self {
Expr::Constant {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ impl CreateTableInterpreter {
storage_params: self.plan.storage_params.clone(),
part_prefix: self.plan.part_prefix.clone(),
options: self.plan.options.clone(),
engine_options: self.plan.engine_options.clone(),
default_cluster_key: None,
field_comments,
drop_on: None,
Expand Down
4 changes: 4 additions & 0 deletions src/query/service/src/test_kits/fixture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ impl TestFixture {
table: self.default_table_name(),
schema: TestFixture::default_table_schema(),
engine: Engine::Fuse,
engine_options: Default::default(),
storage_params: None,
read_only_attach: false,
part_prefix: "".to_string(),
Expand All @@ -331,6 +332,7 @@ impl TestFixture {
table: self.default_table_name(),
schema: TestFixture::default_table_schema(),
engine: Engine::Fuse,
engine_options: Default::default(),
storage_params: None,
read_only_attach: false,
part_prefix: "".to_string(),
Expand Down Expand Up @@ -366,6 +368,7 @@ impl TestFixture {
table: self.default_table_name(),
schema: TestFixture::variant_table_schema(),
engine: Engine::Fuse,
engine_options: Default::default(),
storage_params: None,
read_only_attach: false,
part_prefix: "".to_string(),
Expand Down Expand Up @@ -410,6 +413,7 @@ impl TestFixture {
table: self.default_table_name(),
schema: TestFixture::computed_table_schema(),
engine: Engine::Fuse,
engine_options: Default::default(),
storage_params: None,
read_only_attach: false,
part_prefix: "".to_string(),
Expand Down
3 changes: 2 additions & 1 deletion src/query/service/tests/it/parquet_rs/prune_pages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,13 @@ async fn test(scenario: Scenario, predicate: &str, expected_selection: RowSelect
ParquetReadOptions::default()
.with_prune_row_groups(false)
.with_prune_pages(true),
vec![],
)
.unwrap();

let row_groups = (0..parquet_meta.num_row_groups()).collect::<Vec<_>>();
let selection = pruner
.prune_pages(parquet_meta, &row_groups)
.prune_pages(parquet_meta, &row_groups, None)
.unwrap()
.unwrap();

Expand Down
3 changes: 2 additions & 1 deletion src/query/service/tests/it/parquet_rs/prune_row_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,11 @@ async fn test_impl(scenario: Scenario, predicate: &str, expected_rgs: Vec<usize>
ParquetReadOptions::default()
.with_prune_row_groups(prune)
.with_prune_pages(false),
vec![],
)
.unwrap();

let (rgs, _) = pruner.prune_row_groups(&parquet_meta, None).unwrap();
let (rgs, _) = pruner.prune_row_groups(&parquet_meta, None, None).unwrap();

assert_eq!(
expected_rgs, rgs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ fn create_table_plan(fixture: &TestFixture, format: &str) -> CreateTablePlan {
TableField::new("c", TableDataType::Number(NumberDataType::Int32)),
]),
engine: Engine::Fuse,
engine_options: Default::default(),
storage_params: None,
read_only_attach: false,
part_prefix: "".to_string(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ async fn test_fuse_alter_table_cluster_key() -> databend_common_exception::Resul
table: fixture.default_table_name(),
schema: TestFixture::default_table_schema(),
engine: Engine::Fuse,
engine_options: Default::default(),
storage_params: None,
read_only_attach: false,
part_prefix: "".to_string(),
Expand Down
1 change: 1 addition & 0 deletions src/query/service/tests/it/storages/fuse/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ async fn test_block_pruner() -> Result<()> {
table: test_tbl_name.to_string(),
schema: test_schema.clone(),
engine: Engine::Fuse,
engine_options: Default::default(),
storage_params: None,
read_only_attach: false,
part_prefix: "".to_string(),
Expand Down
7 changes: 6 additions & 1 deletion src/query/sql/src/planner/binder/ddl/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ use databend_common_storages_view::view_table::QUERY;
use databend_common_storages_view::view_table::VIEW_ENGINE;
use databend_storages_common_table_meta::table::is_reserved_opt_key;
use databend_storages_common_table_meta::table::OPT_KEY_DATABASE_ID;
use databend_storages_common_table_meta::table::OPT_KEY_ENGINE_META;
use databend_storages_common_table_meta::table::OPT_KEY_STORAGE_FORMAT;
use databend_storages_common_table_meta::table::OPT_KEY_STORAGE_PREFIX;
use databend_storages_common_table_meta::table::OPT_KEY_TABLE_ATTACHED_DATA_URI;
Expand Down Expand Up @@ -412,6 +413,7 @@ impl Binder {
// Take FUSE engine AS default engine
let engine = engine.unwrap_or(Engine::Fuse);
let mut options: BTreeMap<String, String> = BTreeMap::new();
let mut engine_options: BTreeMap<String, String> = BTreeMap::new();
for table_option in table_options.iter() {
self.insert_table_option_with_validation(
&mut options,
Expand Down Expand Up @@ -519,11 +521,12 @@ impl Binder {
let sp =
get_storage_params_from_options(self.ctx.as_ref(), &options).await?;
let table = DeltaTable::load(&sp).await?;
let table_schema = DeltaTable::get_schema(&table).await?;
let (table_schema, meta) = DeltaTable::get_meta(&table).await?;
// The first version of the Delta/Iceberg table engine does not need to persist the
// storage_params, since we derive them from the table options (location and connection)
// each time the table is loaded. We still persist them in case we change this approach.
storage_params = Some(sp);
engine_options.insert(OPT_KEY_ENGINE_META.to_lowercase().to_string(), meta);
(Arc::new(table_schema), vec![])
}
_ => Err(ErrorCode::BadArguments(
Expand Down Expand Up @@ -620,6 +623,7 @@ impl Binder {
table,
schema: schema.clone(),
engine,
engine_options,
storage_params,
read_only_attach: false,
part_prefix,
Expand Down Expand Up @@ -699,6 +703,7 @@ impl Binder {
table,
schema: Arc::new(TableSchema::default()),
engine: Engine::Fuse,
engine_options: BTreeMap::new(),
storage_params: Some(sp),
read_only_attach: stmt.read_only,
part_prefix,
Expand Down
1 change: 1 addition & 0 deletions src/query/sql/src/planner/plans/ddl/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub struct CreateTablePlan {

pub schema: TableSchemaRef,
pub engine: Engine,
pub engine_options: TableOptions,
pub storage_params: Option<StorageParams>,
pub read_only_attach: bool,
pub part_prefix: String,
Expand Down
18 changes: 18 additions & 0 deletions src/query/storages/common/index/src/range_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use databend_common_exception::Result;
use databend_common_expression::is_internal_column;
use databend_common_expression::is_stream_column;
Expand Down Expand Up @@ -135,6 +137,22 @@ impl RangeIndex {
..
}))
}

#[minitrace::trace]
pub fn apply_with_partition_columns(
&self,
stats: &StatisticsOfColumns,
partition_columns: &HashMap<String, Scalar>,
) -> Result<bool> {
let expr = self.expr.fill_const_column(partition_columns);
RangeIndex {
expr,
func_ctx: self.func_ctx.clone(),
schema: self.schema.clone(),
default_stats: self.default_stats.clone(),
}
.apply(stats, |_| false)
}
}

pub fn statistics_to_domain(mut stats: Vec<&ColumnStatistics>, data_type: &DataType) -> Domain {
Expand Down
28 changes: 28 additions & 0 deletions src/query/storages/common/pruner/src/range_pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use databend_common_exception::Result;
use databend_common_expression::ColumnId;
use databend_common_expression::Expr;
use databend_common_expression::FunctionContext;
use databend_common_expression::Scalar;
use databend_common_expression::TableSchemaRef;
use databend_storages_common_index::RangeIndex;
use databend_storages_common_table_meta::meta::ColumnMeta;
Expand All @@ -32,6 +33,14 @@ pub trait RangePruner {
input: &StatisticsOfColumns,
metas: Option<&HashMap<ColumnId, ColumnMeta>>,
) -> bool;

    /// Like `should_keep`, but additionally considers constant partition-column
    /// values (which carry no per-block statistics).
    ///
    /// Default implementation ignores both arguments and keeps everything;
    /// pruners that understand partition columns (e.g. `RangeIndex`) override it.
    fn should_keep_with_partition_columns(
        &self,
        _stats: &StatisticsOfColumns,
        _partition_columns: Option<&HashMap<String, Scalar>>,
    ) -> bool {
        true
    }
}

struct KeepTrue;
Expand Down Expand Up @@ -79,6 +88,25 @@ impl RangePruner for RangeIndex {
}
}
}
fn should_keep_with_partition_columns(
&self,
stats: &StatisticsOfColumns,
partition_columns: Option<&HashMap<String, Scalar>>,
) -> bool {
match partition_columns {
None => self.should_keep(stats, None),
Some(partition_columns) => {
match self.apply_with_partition_columns(stats, partition_columns) {
Ok(r) => r,
Err(e) => {
// swallow exceptions intentionally, corrupted index should not prevent execution
warn!("failed to range filter, returning true. {}", e);
true
}
}
}
}
}
}

pub struct RangePrunerCreator;
Expand Down
5 changes: 5 additions & 0 deletions src/query/storages/common/table_meta/src/table/table_keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@ pub const OPT_KEY_TABLE_ATTACHED_DATA_URI: &str = "table_data_uri";
// Read only attached table options.
pub const OPT_KEY_TABLE_ATTACHED_READ_ONLY: &str = "read_only_attached";

// The following options are used by the Delta and Iceberg engines.
pub const OPT_KEY_LOCATION: &str = "location";
pub const OPT_KEY_CONNECTION_NAME: &str = "connection_name";
// TableMeta needs to contain all the info required to create a Table; engine-specific
// info (e.g. the partition columns of a Delta table) is stored under this internal key as JSON.
pub const OPT_KEY_ENGINE_META: &str = "engine_meta";

/// Legacy table snapshot location key
///
Expand All @@ -57,6 +61,7 @@ pub static INTERNAL_TABLE_OPTION_KEYS: LazyLock<HashSet<&'static str>> = LazyLoc
let mut r = HashSet::new();
r.insert(OPT_KEY_LEGACY_SNAPSHOT_LOC);
r.insert(OPT_KEY_DATABASE_ID);
r.insert(OPT_KEY_ENGINE_META);
r
});

Expand Down
7 changes: 6 additions & 1 deletion src/query/storages/delta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ publish = false
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
databend-common-arrow = { path = "../../../common/arrow" }
databend-common-base = { path = "../../../common/base" }
databend-common-catalog = { path = "../../catalog" }
databend-common-exception = { path = "../../../common/exception" }
Expand All @@ -18,6 +17,7 @@ databend-common-meta-app = { path = "../../../meta/app" }
databend-common-pipeline-core = { path = "../../pipeline/core" }
databend-common-storage = { path = "../../../common/storage" }
databend-common-storages-parquet = { path = "../parquet" }
databend-storages-common-table-meta = { path = "../common/table_meta" }

arrow-schema = { workspace = true }
async-backtrace = { workspace = true }
Expand All @@ -29,11 +29,16 @@ match-template = "0.0.1"
minitrace = { workspace = true }
object_store = "0.7"
opendal = { workspace = true }
ordered-float = { workspace = true }
parquet = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
typetag = "0.2"
url = "2.4.1"

[dev-dependencies]
maplit = "1.0.2"

[package.metadata.cargo-machete]
ignored = ["match-template"]
Loading
Loading