From d164a00c74fdc09d8a90afeb744107f6146ad875 Mon Sep 17 00:00:00 2001 From: sundyli <543950155@qq.com> Date: Mon, 26 Feb 2024 17:39:58 -0800 Subject: [PATCH] feat(query): new implementation of analyze table (#14725) * update * update * update * update * update * update * update * update * update tests * update tests * update --- Cargo.lock | 1 + src/query/ast/src/ast/format/ast_format.rs | 6 + src/query/ast/src/ast/format/syntax/query.rs | 8 + src/query/ast/src/ast/query.rs | 10 + .../ast/src/ast/statements/merge_into.rs | 1 + src/query/ast/src/parser/query.rs | 8 +- src/query/ast/src/parser/statement.rs | 2 + src/query/ast/src/parser/token.rs | 2 + src/query/ast/tests/it/testdata/query.txt | 65 +++ src/query/ast/tests/it/testdata/statement.txt | 80 +++ src/query/catalog/src/table.rs | 36 +- .../src/aggregates/aggregator_common.rs | 7 +- .../interpreters/interpreter_table_analyze.rs | 116 ++++- src/query/service/src/test_kits/fuse.rs | 8 +- .../sql/src/planner/binder/merge_into.rs | 1 + src/query/sql/src/planner/binder/table.rs | 16 +- src/query/sql/src/planner/dataframe.rs | 4 +- .../sql/src/planner/semantic/view_rewriter.rs | 2 + .../storages/common/table_meta/Cargo.toml | 1 + .../common/table_meta/src/meta/current/mod.rs | 4 +- .../common/table_meta/src/meta/v2/mod.rs | 3 + .../src/meta/v2/table_snapshot_statistics.rs | 67 +++ .../common/table_meta/src/meta/versions.rs | 6 + .../src/readers/versioned_reader.rs | 6 +- src/query/storages/fuse/src/fuse_table.rs | 58 ++- src/query/storages/fuse/src/io/locations.rs | 9 +- src/query/storages/fuse/src/io/snapshots.rs | 2 +- .../storages/fuse/src/io/write/meta_writer.rs | 11 - .../storages/fuse/src/operations/analyze.rs | 343 ++++++++----- src/query/storages/fuse/src/operations/mod.rs | 1 + .../storages/fuse/src/operations/navigate.rs | 22 + .../fuse/src/operations/read_partitions.rs | 88 +++- .../storages/fuse/src/operations/util.rs | 72 +++ .../fuse_statistics/fuse_statistic.rs | 2 +- src/query/storages/stream/src/stream_table.rs | 87 +--- src/tests/sqlsmith/src/sql_gen/dml.rs | 2 + src/tests/sqlsmith/src/sql_gen/query.rs | 1 + .../base/09_fuse_engine/09_0020_analyze.test | 6 +- .../suites/tpcds/tpcds_join_order.test | 455 +++++++++--------- tests/sqllogictests/suites/tpch/join.test | 13 +- tests/sqllogictests/suites/tpch/queries.test | 4 +- .../12_0004_time_travel_select_at.result | 8 +- .../12_0004_time_travel_select_at.sh | 10 +- ...vel_alter_add_drop_column_select_at.result | 12 +- ..._travel_alter_add_drop_column_select_at.sh | 12 +- 45 files changed, 1156 insertions(+), 522 deletions(-) create mode 100644 src/query/storages/common/table_meta/src/meta/v2/table_snapshot_statistics.rs diff --git a/Cargo.lock b/Cargo.lock index ad795fb59cb7..e9e8c4ec270c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4559,6 +4559,7 @@ dependencies = [ "rmp-serde", "serde", "serde_json", + "simple_hll", "snap", "typetag", "zstd", diff --git a/src/query/ast/src/ast/format/ast_format.rs b/src/query/ast/src/ast/format/ast_format.rs index 87cf36dfe327..94b49eb41f4f 100644 --- a/src/query/ast/src/ast/format/ast_format.rs +++ b/src/query/ast/src/ast/format/ast_format.rs @@ -2925,6 +2925,7 @@ impl<'ast> Visitor<'ast> for AstFormatVisitor { table, alias, travel_point, + since_point, pivot, unpivot, } => { @@ -2955,6 +2956,11 @@ impl<'ast> Visitor<'ast> for AstFormatVisitor { self.visit_time_travel_point(travel_point); children.push(self.children.pop().unwrap()); } + + if let Some(travel_point) = since_point { + self.visit_time_travel_point(travel_point); + children.push(self.children.pop().unwrap()); + } let format_ctx = if let Some(alias) = alias { AstFormatContext::with_children_alias( name, diff --git a/src/query/ast/src/ast/format/syntax/query.rs b/src/query/ast/src/ast/format/syntax/query.rs index d5c85e560890..9be982e60aad 100644 --- a/src/query/ast/src/ast/format/syntax/query.rs +++ b/src/query/ast/src/ast/format/syntax/query.rs @@ -320,6 +320,7 @@ pub(crate) fn pretty_table(table: TableReference) -> RcDoc<'static> { table, alias, travel_point, + since_point, pivot, unpivot, } => if let Some(catalog) = catalog { @@ -350,6 +351,13 @@ pub(crate) fn pretty_table(table: TableReference) -> RcDoc<'static> { } else { RcDoc::nil() }) + .append(if let Some(TimeTravelPoint::Snapshot(sid)) = since_point { + RcDoc::text(format!(" SINCE (SNAPSHOT => {sid})")) + } else if let Some(TimeTravelPoint::Timestamp(ts)) = since_point { + RcDoc::text(format!(" SINCE (TIMESTAMP => {ts})")) + } else { + RcDoc::nil() + }) .append(if let Some(alias) = alias { RcDoc::text(format!(" AS {alias}")) } else { diff --git a/src/query/ast/src/ast/query.rs b/src/query/ast/src/ast/query.rs index a2984ca59d96..0e28c3ea9ea7 100644 --- a/src/query/ast/src/ast/query.rs +++ b/src/query/ast/src/ast/query.rs @@ -270,6 +270,7 @@ pub enum TableReference { table: Identifier, alias: Option, travel_point: Option, + since_point: Option, pivot: Option>, unpivot: Option>, }, @@ -454,6 +455,7 @@ impl Display for TableReference { table, alias, travel_point, + since_point, pivot, unpivot, } => { @@ -470,6 +472,14 @@ impl Display for TableReference { write!(f, " AT (TIMESTAMP => {ts})")?; } + if let Some(TimeTravelPoint::Snapshot(sid)) = since_point { + write!(f, " SINCE (SNAPSHOT => {sid})")?; + } + + if let Some(TimeTravelPoint::Timestamp(ts)) = since_point { + write!(f, " SINCE (TIMESTAMP => {ts})")?; + } + if let Some(alias) = alias { write!(f, " AS {alias}")?; } diff --git a/src/query/ast/src/ast/statements/merge_into.rs b/src/query/ast/src/ast/statements/merge_into.rs index af452ffb4b28..da15b2eb0ef3 100644 --- a/src/query/ast/src/ast/statements/merge_into.rs +++ b/src/query/ast/src/ast/statements/merge_into.rs @@ -258,6 +258,7 @@ impl MergeSource { table: table.clone(), alias: alias.clone(), travel_point: None, + since_point: None, pivot: None, unpivot: None, }, diff --git a/src/query/ast/src/parser/query.rs b/src/query/ast/src/parser/query.rs index c951814e6ccc..12229a9a013f 100644 --- a/src/query/ast/src/parser/query.rs +++ b/src/query/ast/src/parser/query.rs @@ -627,6 +627,7 @@ pub enum TableReferenceElement { table: Identifier, alias: Option, travel_point: Option, + since_point: Option, pivot: Option>, unpivot: Option>, }, @@ -685,15 +686,16 @@ pub fn table_reference_element(i: Input) -> IResult>> PrattParser table, alias, travel_point, + since_point, pivot, unpivot, } => TableReference::Table { @@ -813,6 +816,7 @@ impl<'a, I: Iterator>> PrattParser table, alias, travel_point, + since_point, pivot, unpivot, }, diff --git a/src/query/ast/src/parser/statement.rs b/src/query/ast/src/parser/statement.rs index 7f045a56c58f..525ade5bc9ca 100644 --- a/src/query/ast/src/parser/statement.rs +++ b/src/query/ast/src/parser/statement.rs @@ -3471,6 +3471,7 @@ pub fn table_reference_with_alias(i: Input) -> IResult { columns: vec![], }), travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -3489,6 +3490,7 @@ pub fn table_reference_only(i: Input) -> IResult { table, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, diff --git a/src/query/ast/src/parser/token.rs b/src/query/ast/src/parser/token.rs index 363da2f58ff4..23cdf98b18a5 100644 --- a/src/query/ast/src/parser/token.rs +++ b/src/query/ast/src/parser/token.rs @@ -922,6 +922,8 @@ pub enum TokenKind { SHA256_PASSWORD, #[token("SHOW", ignore(ascii_case))] SHOW, + #[token("SINCE", ignore(ascii_case))] + SINCE, #[token("SIGNED", ignore(ascii_case))] SIGNED, #[token("SINGLE", ignore(ascii_case))] diff --git a/src/query/ast/tests/it/testdata/query.txt b/src/query/ast/tests/it/testdata/query.txt index 637cd4132567..5bfae46800db 100644 --- a/src/query/ast/tests/it/testdata/query.txt +++ b/src/query/ast/tests/it/testdata/query.txt @@ -146,6 +146,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -164,6 +165,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -348,6 +350,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -418,6 +421,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -436,6 +440,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -508,6 +513,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -526,6 +532,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -637,6 +644,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -655,6 +663,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -766,6 +775,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -784,6 +794,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -904,6 +915,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -922,6 +934,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -1012,6 +1025,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -1030,6 +1044,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -1119,6 +1134,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -1137,6 +1153,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -1157,6 +1174,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -1265,6 +1283,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -1337,6 +1356,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -1482,6 +1502,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -1554,6 +1575,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -1691,6 +1713,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -1763,6 +1786,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -1908,6 +1932,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -1980,6 +2005,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -2061,6 +2087,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -2218,6 +2245,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -2236,6 +2264,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -2254,6 +2283,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -2406,6 +2436,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -2461,6 +2492,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -2535,6 +2567,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -2749,6 +2782,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -2906,6 +2940,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -2935,6 +2970,7 @@ Query { }, ), travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -3111,6 +3147,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -3129,6 +3166,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -3363,6 +3401,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -3409,6 +3448,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -3481,6 +3521,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -3527,6 +3568,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -3606,6 +3648,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -3652,6 +3695,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -3700,6 +3744,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -3779,6 +3824,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -3825,6 +3871,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -3873,6 +3920,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -3945,6 +3993,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -3998,6 +4047,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -4044,6 +4094,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -4125,6 +4176,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -4171,6 +4223,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -4219,6 +4272,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -4291,6 +4345,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -4344,6 +4399,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -4390,6 +4446,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -4759,6 +4816,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -4889,6 +4947,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: Some( Pivot { aggregate: FunctionCall { @@ -5051,6 +5110,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: Some( Unpivot { @@ -5299,6 +5359,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -5574,6 +5635,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -5796,6 +5858,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -6193,6 +6256,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -6348,6 +6412,7 @@ Query { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, diff --git a/src/query/ast/tests/it/testdata/statement.txt b/src/query/ast/tests/it/testdata/statement.txt index 1d7093a42f56..6d34fb648b01 100644 --- a/src/query/ast/tests/it/testdata/statement.txt +++ b/src/query/ast/tests/it/testdata/statement.txt @@ -548,6 +548,7 @@ Explain { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -625,6 +626,7 @@ Explain { }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -793,6 +795,7 @@ CreateIndex( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -968,6 +971,7 @@ CreateIndex( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -1429,6 +1433,7 @@ CreateTable( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -3790,6 +3795,7 @@ CreateTable( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -4085,6 +4091,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -4291,6 +4298,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -4364,6 +4372,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -4429,6 +4438,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -4447,6 +4457,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -4465,6 +4476,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -4530,6 +4542,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -4548,6 +4561,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -4566,6 +4580,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -4736,6 +4751,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -4754,6 +4770,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -4883,6 +4900,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -4901,6 +4919,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -5030,6 +5049,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -5048,6 +5068,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -5177,6 +5198,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -5195,6 +5217,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -5324,6 +5347,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -5342,6 +5366,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -5471,6 +5496,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -5489,6 +5515,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -5618,6 +5645,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -5636,6 +5664,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -5765,6 +5794,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -5783,6 +5813,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -5912,6 +5943,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -5930,6 +5962,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -6059,6 +6092,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -6077,6 +6111,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -6206,6 +6241,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -6224,6 +6260,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -6308,6 +6345,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -6326,6 +6364,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -6410,6 +6449,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -6428,6 +6468,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -6512,6 +6553,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -6530,6 +6572,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -6614,6 +6657,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -6632,6 +6676,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -6699,6 +6744,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -6797,6 +6843,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -6876,6 +6923,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -6974,6 +7022,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -7053,6 +7102,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -7151,6 +7201,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -7230,6 +7281,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -7326,6 +7378,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -7958,6 +8011,7 @@ Insert( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -8551,6 +8605,7 @@ AlterTable( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -8601,6 +8656,7 @@ AlterTable( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -8632,6 +8688,7 @@ AlterTable( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -8700,6 +8757,7 @@ AlterTable( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -8747,6 +8805,7 @@ AlterTable( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -8794,6 +8853,7 @@ AlterTable( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -8854,6 +8914,7 @@ AlterTable( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -8918,6 +8979,7 @@ AlterTable( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -8964,6 +9026,7 @@ AlterTable( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -9003,6 +9066,7 @@ AlterTable( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -9042,6 +9106,7 @@ AlterTable( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -9084,6 +9149,7 @@ AlterTable( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -9125,6 +9191,7 @@ AlterTable( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -9196,6 +9263,7 @@ AlterTable( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -9273,6 +9341,7 @@ AlterTable( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -9321,6 +9390,7 @@ AlterTable( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -9369,6 +9439,7 @@ AlterTable( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -9410,6 +9481,7 @@ AlterTable( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -12773,6 +12845,7 @@ Update( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -13284,6 +13357,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -13507,6 +13581,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -13649,6 +13724,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -13789,6 +13865,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -13945,6 +14022,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -14084,6 +14162,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, @@ -14202,6 +14281,7 @@ Query( }, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }, diff --git a/src/query/catalog/src/table.rs b/src/query/catalog/src/table.rs index 52e29d8f9364..e0070c006f5e 100644 --- a/src/query/catalog/src/table.rs +++ b/src/query/catalog/src/table.rs @@ -265,13 +265,6 @@ pub trait Table: Sync + Send { Ok(None) } - #[async_backtrace::framed] - async fn analyze(&self, ctx: Arc) -> Result<()> { - let _ = ctx; - - Ok(()) - } - async fn table_statistics( &self, ctx: Arc, @@ -292,8 +285,13 @@ pub trait Table: Sync + Send { } #[async_backtrace::framed] - async fn navigate_to(&self, instant: &NavigationPoint) -> Result> { - let _ = instant; + async fn navigate_since_to( + &self, + since_point: &Option, + to_point: &Option, + ) -> Result> { + let _ = since_point; + let _ = to_point; Err(ErrorCode::Unimplemented(format!( "Time travel operation is not supported for the table '{}', which uses the '{}' engine.", @@ -449,6 +447,26 @@ pub struct TableStatistics { pub number_of_segments: Option, } +fn merge(a: Option, b: Option) -> Option { + match (a, b) { + (Some(a), Some(b)) if a > b => Some(a - b), + _ => None, + } +} + +impl TableStatistics { + pub fn increment_since_from(&self, other: &TableStatistics) -> Self { + TableStatistics { + num_rows: merge(self.num_rows, other.num_rows), + data_size: merge(self.data_size, other.data_size), + data_size_compressed: merge(self.data_size_compressed, other.data_size_compressed), + index_size: None, + number_of_blocks: merge(self.number_of_blocks, other.number_of_blocks), + number_of_segments: None, + } + } +} + #[derive(Debug, Clone)] pub struct ColumnStatistics { pub min: Scalar, diff --git a/src/query/functions/src/aggregates/aggregator_common.rs b/src/query/functions/src/aggregates/aggregator_common.rs index d9e440b82f74..abb0a1a88666 100644 --- a/src/query/functions/src/aggregates/aggregator_common.rs +++ b/src/query/functions/src/aggregates/aggregator_common.rs @@ -23,8 +23,6 @@ use databend_common_expression::types::DataType; use databend_common_expression::Column; use databend_common_expression::ColumnBuilder; use databend_common_expression::Scalar; -use databend_common_io::prelude::borsh_deserialize_from_stream; -use databend_common_io::prelude::borsh_serialize_into_buf; use super::AggregateFunctionFactory; use super::AggregateFunctionRef; @@ -160,10 +158,11 @@ pub fn borsh_serialize_state( writer: &mut W, value: &T, ) -> Result<()> { - borsh_serialize_into_buf(writer, value) + borsh::to_writer(writer, value)?; + Ok(()) } #[inline] pub fn borsh_deserialize_state(slice: &mut &[u8]) -> Result { - borsh_deserialize_from_stream(slice) + Ok(T::deserialize(slice)?) } diff --git a/src/query/service/src/interpreters/interpreter_table_analyze.rs b/src/query/service/src/interpreters/interpreter_table_analyze.rs index a7d734868ca8..389119f2d452 100644 --- a/src/query/service/src/interpreters/interpreter_table_analyze.rs +++ b/src/query/service/src/interpreters/interpreter_table_analyze.rs @@ -16,10 +16,20 @@ use std::sync::Arc; use databend_common_catalog::table::TableExt; use databend_common_exception::Result; +use databend_common_sql::executor::PhysicalPlanBuilder; use databend_common_sql::plans::AnalyzeTablePlan; +use databend_common_sql::plans::Plan; +use databend_common_sql::Planner; +use databend_common_storages_factory::NavigationPoint; +use databend_common_storages_factory::Table; +use databend_common_storages_fuse::FuseTable; +use databend_storages_common_index::Index; +use databend_storages_common_index::RangeIndex; +use itertools::Itertools; use crate::interpreters::Interpreter; use crate::pipelines::PipelineBuildResult; +use crate::schedulers::build_query_pipeline_without_render_result_set; use crate::sessions::QueryContext; use crate::sessions::TableContext; @@ -55,7 +65,111 @@ impl Interpreter for AnalyzeTableInterpreter { // check mutability table.check_mutable()?; - table.analyze(self.ctx.clone()).await?; + // Only fuse table can apply analyze + let table = match FuseTable::try_from_table(table.as_ref()) { + Ok(t) => t, + Err(_) => return Ok(PipelineBuildResult::create()), + }; + + let r = table.read_table_snapshot().await; + let snapshot_opt = match r { + Err(e) => return Err(e), + Ok(v) => v, + }; + + if let Some(snapshot) = snapshot_opt { + // plan sql + let schema = table.schema(); + let _table_info = table.get_table_info(); + + let table_statistics = table + .read_table_snapshot_statistics(Some(&snapshot)) + .await?; + + let since_str = if let Some(table_statistics) = &table_statistics { + let is_full = table + .navigate_to(&NavigationPoint::SnapshotID( + table_statistics.snapshot_id.simple().to_string(), + )) + .await + .is_err(); + + if is_full { + "".to_string() + } else { + format!( + "SINCE (snapshot => '{}')", + table_statistics.snapshot_id.simple(), + ) + } + } else { + "".to_string() + }; + + let index_cols: Vec<(u32, String)> = schema + .fields() + .iter() + .filter(|f| RangeIndex::supported_type(&f.data_type().into())) + .map(|f| (f.column_id(), f.name.clone())) + .collect(); + + // 0.01625 --> 12 buckets --> 4K size per column + // 1.04 / math.sqrt(1<<12) --> 0.01625 + const DISTINCT_ERROR_RATE: f64 = 0.01625; + let select_expr = index_cols + .iter() + .map(|c| { + format!( + "approx_count_distinct_state({DISTINCT_ERROR_RATE})({}) as ndv_{}", + c.1, c.0 + ) + }) + .join(", "); + + let sql = format!( + "SELECT {select_expr}, {} as is_full from {}.{} AT (snapshot => '{}') {since_str} ", + since_str.is_empty(), + plan.database, + plan.table, + snapshot.snapshot_id.simple(), + ); + + log::info!("Analyze via sql {:?}", sql); + + let mut planner = Planner::new(self.ctx.clone()); + let (plan, _) = planner.plan_sql(&sql).await?; + let (select_plan, schema) = match &plan { + Plan::Query { + s_expr, + metadata, + bind_context, + .. + } => { + let mut builder1 = + PhysicalPlanBuilder::new(metadata.clone(), self.ctx.clone(), false); + ( + builder1.build(s_expr, bind_context.column_set()).await?, + bind_context.output_schema(), + ) + } + _ => unreachable!(), + }; + + let mut build_res = + build_query_pipeline_without_render_result_set(&self.ctx, &select_plan).await?; + + FuseTable::do_analyze( + self.ctx.clone(), + schema, + &self.plan.catalog, + &self.plan.database, + &self.plan.table, + snapshot.snapshot_id, + &mut build_res.main_pipeline, + )?; + return Ok(build_res); + } + return Ok(PipelineBuildResult::create()); } } diff --git a/src/query/service/src/test_kits/fuse.rs b/src/query/service/src/test_kits/fuse.rs index 573647b3151d..3a92e2e83872 100644 --- a/src/query/service/src/test_kits/fuse.rs +++ b/src/query/service/src/test_kits/fuse.rs @@ -270,8 +270,12 @@ pub async fn append_sample_data(num_blocks: usize, fixture: &TestFixture) -> Res } pub async fn analyze_table(fixture: &TestFixture) -> Result<()> { - let table = fixture.latest_default_table().await?; - table.analyze(fixture.default_ctx.clone()).await + let query = format!( + "analyze table {}.{}", + fixture.default_db_name(), + fixture.default_table_name() + ); + fixture.execute_command(&query).await } pub async fn do_deletion(ctx: Arc, plan: DeletePlan) -> Result<()> { diff --git a/src/query/sql/src/planner/binder/merge_into.rs b/src/query/sql/src/planner/binder/merge_into.rs index 152b6b2b6ffc..b6dc9fcf96b8 100644 --- a/src/query/sql/src/planner/binder/merge_into.rs +++ b/src/query/sql/src/planner/binder/merge_into.rs @@ -211,6 +211,7 @@ impl Binder { table: table_ident.clone(), alias: target_alias.clone(), travel_point: None, + since_point: None, pivot: None, unpivot: None, }; diff --git a/src/query/sql/src/planner/binder/table.rs b/src/query/sql/src/planner/binder/table.rs index bb73130bc93b..9755536c8d05 100644 --- a/src/query/sql/src/planner/binder/table.rs +++ b/src/query/sql/src/planner/binder/table.rs @@ -175,6 +175,7 @@ impl Binder { table: &Identifier, alias: &Option, travel_point: &Option, + since_point: &Option, ) -> Result<(SExpr, BindContext)> { let (catalog, database, table_name) = self.normalize_object_identifier_triple(catalog, database, table); @@ -212,6 +213,11 @@ impl Binder { None => None, }; + let since_point = match since_point { + Some(tp) => Some(self.resolve_data_travel_point(bind_context, tp).await?), + None => None, + }; + // Resolve table with catalog let table_meta = match self .resolve_data_source( @@ -220,6 +226,7 @@ impl Binder { database.as_str(), table_name.as_str(), &navigation_point, + &since_point, ) .await { @@ -916,6 +923,7 @@ impl Binder { table, alias, travel_point, + since_point, pivot: _, unpivot: _, } => { @@ -927,6 +935,7 @@ impl Binder { table, alias, travel_point, + since_point, ) .await } @@ -1435,6 +1444,7 @@ impl Binder { database_name: &str, table_name: &str, travel_point: &Option, + since_point: &Option, ) -> Result> { // Resolve table with ctx // for example: select * from t1 join (select * from t1 as t2 where a > 1 and a < 13); @@ -1445,8 +1455,10 @@ impl Binder { .get_table(catalog_name, database_name, table_name) .await?; - if let Some(tp) = travel_point { - table_meta = table_meta.navigate_to(tp).await?; + if travel_point.is_some() || since_point.is_some() { + table_meta = table_meta + .navigate_since_to(since_point, travel_point) + .await?; } Ok(table_meta) } diff --git a/src/query/sql/src/planner/dataframe.rs b/src/query/sql/src/planner/dataframe.rs index c7bdc33621ec..b8ab7726dece 100644 --- a/src/query/sql/src/planner/dataframe.rs +++ b/src/query/sql/src/planner/dataframe.rs @@ -63,6 +63,7 @@ impl Dataframe { catalog: None, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }; @@ -85,7 +86,7 @@ impl Dataframe { let database = "system"; let tenant = query_ctx.get_tenant(); let table_meta: Arc = binder - .resolve_data_source(tenant.as_str(), catalog, database, "one", &None) + .resolve_data_source(tenant.as_str(), catalog, database, "one", &None, &None) .await?; let table_index = metadata.write().add_table( @@ -477,6 +478,7 @@ impl Dataframe { catalog: None, alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }; diff --git a/src/query/sql/src/planner/semantic/view_rewriter.rs b/src/query/sql/src/planner/semantic/view_rewriter.rs index 43816aed957a..b05382a37b0b 100644 --- a/src/query/sql/src/planner/semantic/view_rewriter.rs +++ b/src/query/sql/src/planner/semantic/view_rewriter.rs @@ -32,6 +32,7 @@ impl VisitorMut for ViewRewriter { table, alias, travel_point, + since_point, pivot, unpivot, } => { @@ -49,6 +50,7 @@ impl VisitorMut for ViewRewriter { table: table.clone(), alias: alias.clone(), travel_point: travel_point.clone(), + since_point: since_point.clone(), pivot: pivot.clone(), unpivot: unpivot.clone(), } diff --git a/src/query/storages/common/table_meta/Cargo.toml b/src/query/storages/common/table_meta/Cargo.toml index b5dd5d3cc26a..bd407034c9e6 100644 --- a/src/query/storages/common/table_meta/Cargo.toml +++ b/src/query/storages/common/table_meta/Cargo.toml @@ -25,6 +25,7 @@ parquet_rs = { workspace = true } rmp-serde = "1.1.1" serde = { workspace = true } serde_json = { workspace = true } +simple_hll = { version = "0.0.1", features = ["serde_borsh"] } snap = { version = "1.1.0", optional = true } typetag = { workspace = true } zstd = "0.12.3" diff --git a/src/query/storages/common/table_meta/src/meta/current/mod.rs b/src/query/storages/common/table_meta/src/meta/current/mod.rs index f7778cb97715..7cf2cf890a14 100644 --- a/src/query/storages/common/table_meta/src/meta/current/mod.rs +++ b/src/query/storages/common/table_meta/src/meta/current/mod.rs @@ -13,18 +13,18 @@ // limitations under the License. pub use v0::ColumnMeta as SingleColumnMeta; -pub use v1::TableSnapshotStatistics; pub use v2::BlockMeta; pub use v2::ClusterStatistics; pub use v2::ColumnMeta; pub use v2::ColumnStatistics; +pub use v2::MetaHLL; pub use v2::Statistics; +pub use v2::TableSnapshotStatistics; pub use v4::CompactSegmentInfo; pub use v4::SegmentInfo; pub use v4::TableSnapshot; pub use v4::TableSnapshotLite; use super::v0; -use super::v1; use super::v2; use super::v4; diff --git a/src/query/storages/common/table_meta/src/meta/v2/mod.rs b/src/query/storages/common/table_meta/src/meta/v2/mod.rs index db7086404b17..f5cca210b5aa 100644 --- a/src/query/storages/common/table_meta/src/meta/v2/mod.rs +++ b/src/query/storages/common/table_meta/src/meta/v2/mod.rs @@ -15,6 +15,7 @@ mod segment; mod snapshot; pub mod statistics; +mod table_snapshot_statistics; pub use segment::BlockMeta; pub use segment::ColumnMeta; @@ -23,3 +24,5 @@ pub use snapshot::TableSnapshot; pub use statistics::ClusterStatistics; pub use statistics::ColumnStatistics; pub use statistics::Statistics; +pub use table_snapshot_statistics::MetaHLL; +pub use table_snapshot_statistics::TableSnapshotStatistics; diff --git a/src/query/storages/common/table_meta/src/meta/v2/table_snapshot_statistics.rs b/src/query/storages/common/table_meta/src/meta/v2/table_snapshot_statistics.rs new file mode 100644 index 000000000000..17aae1a8c608 --- /dev/null +++ b/src/query/storages/common/table_meta/src/meta/v2/table_snapshot_statistics.rs @@ -0,0 +1,67 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use databend_common_expression::ColumnId; +use serde::Deserialize; +use serde::Serialize; + +use crate::meta::v1; +use crate::meta::FormatVersion; +use crate::meta::SnapshotId; +use crate::meta::Versioned; + +pub type MetaHLL = simple_hll::HyperLogLog<12>; + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct TableSnapshotStatistics { + /// format version of snapshot + pub format_version: FormatVersion, + + /// id of snapshot + pub snapshot_id: SnapshotId, + pub hll: HashMap, +} + +impl TableSnapshotStatistics { + pub fn new(hll: HashMap, snapshot_id: SnapshotId) -> Self { + Self { + format_version: TableSnapshotStatistics::VERSION, + snapshot_id, + hll, + } + } + + pub fn format_version(&self) -> u64 { + self.format_version + } + + pub fn column_distinct_values(&self) -> HashMap { + self.hll + .iter() + .map(|hll| (*hll.0, hll.1.count() as u64)) + .collect() + } +} + +impl From for TableSnapshotStatistics { + fn from(value: v1::TableSnapshotStatistics) -> Self { + Self { + format_version: TableSnapshotStatistics::VERSION, + snapshot_id: value.snapshot_id, + hll: HashMap::new(), + } + } +} diff --git a/src/query/storages/common/table_meta/src/meta/versions.rs b/src/query/storages/common/table_meta/src/meta/versions.rs index 663fe2c66d8b..42fd7ecf4225 100644 --- a/src/query/storages/common/table_meta/src/meta/versions.rs +++ b/src/query/storages/common/table_meta/src/meta/versions.rs @@ -103,17 +103,20 @@ impl SnapshotVersion { } impl Versioned<0> for v1::TableSnapshotStatistics {} +impl Versioned<2> for v2::TableSnapshotStatistics {} impl Versioned<2> for DataBlock {} pub enum TableSnapshotStatisticsVersion { V0(PhantomData), + V2(PhantomData), } impl TableSnapshotStatisticsVersion { pub fn version(&self) -> u64 { match self { TableSnapshotStatisticsVersion::V0(a) => Self::ver(a), + TableSnapshotStatisticsVersion::V2(a) => Self::ver(a), } } @@ -176,6 +179,9 @@ mod converters { 0 => Ok(TableSnapshotStatisticsVersion::V0(testify_version::<_, 0>( PhantomData, ))), + 2 => Ok(TableSnapshotStatisticsVersion::V2(testify_version::<_, 2>( + PhantomData, + ))), _ => Err(ErrorCode::Internal(format!( "unknown table snapshot statistics version {value}, versions supported: 0" ))), diff --git a/src/query/storages/common/table_meta/src/readers/versioned_reader.rs b/src/query/storages/common/table_meta/src/readers/versioned_reader.rs index 1f5ffb150b00..383365d197de 100644 --- a/src/query/storages/common/table_meta/src/readers/versioned_reader.rs +++ b/src/query/storages/common/table_meta/src/readers/versioned_reader.rs @@ -36,7 +36,11 @@ impl VersionedReader for TableSnapshotStatisticsVersion let mut buffer: Vec = vec![]; reader.read_to_end(&mut buffer).await?; let r = match self { - TableSnapshotStatisticsVersion::V0(v) => load_json(&buffer, v).await?, + TableSnapshotStatisticsVersion::V0(v) => { + let ts = load_json(&buffer, v).await?; + TableSnapshotStatistics::from(ts) + } + TableSnapshotStatisticsVersion::V2(v) => load_json(&buffer, v).await?, }; Ok(r) } diff --git a/src/query/storages/fuse/src/fuse_table.rs b/src/query/storages/fuse/src/fuse_table.rs index d7e63288704d..239ec4d98182 100644 --- a/src/query/storages/fuse/src/fuse_table.rs +++ b/src/query/storages/fuse/src/fuse_table.rs @@ -112,6 +112,9 @@ pub struct FuseTable { pub(crate) data_metrics: Arc, table_type: FuseTableType, + + // If this is set, reading from fuse_table should only returns the increment blocks + pub(crate) since_table: Option>, } impl FuseTable { @@ -226,6 +229,7 @@ impl FuseTable { storage_format: FuseStorageFormat::from_str(storage_format.as_str())?, table_compression: table_compression.as_str().try_into()?, table_type, + since_table: None, })) } @@ -293,26 +297,21 @@ impl FuseTable { Ok(table_storage_prefix(db_id, table_id)) } - pub fn table_snapshot_statistics_format_version(&self, location: &String) -> u64 { - TableMetaLocationGenerator::snapshot_version(location) - } - #[minitrace::trace] #[async_backtrace::framed] - pub(crate) async fn read_table_snapshot_statistics( + pub async fn read_table_snapshot_statistics( &self, snapshot: Option<&Arc>, ) -> Result>> { match snapshot { Some(snapshot) => { if let Some(loc) = &snapshot.table_statistics_location { - let ver = self.table_snapshot_statistics_format_version(loc); let reader = MetaReaders::table_snapshot_statistics_reader(self.get_operator()); let load_params = LoadParams { location: loc.clone(), len_hint: None, - ver, + ver: TableMetaLocationGenerator::snapshot_statistics_version(), put_cache: true, }; @@ -719,15 +718,9 @@ impl Table for FuseTable { } } - #[minitrace::trace] - #[async_backtrace::framed] - async fn analyze(&self, ctx: Arc) -> Result<()> { - self.do_analyze(&ctx).await - } - async fn table_statistics( &self, - _ctx: Arc, + ctx: Arc, ) -> Result> { let stats = match self.table_type { FuseTableType::AttachedReadOnly => { @@ -749,14 +742,22 @@ impl Table for FuseTable { } _ => { let s = &self.table_info.meta.statistics; - TableStatistics { + let mut res = TableStatistics { num_rows: Some(s.number_of_rows), data_size: Some(s.data_bytes), data_size_compressed: Some(s.compressed_data_bytes), index_size: Some(s.index_data_bytes), number_of_blocks: s.number_of_blocks, number_of_segments: s.number_of_segments, + }; + + if let Some(since) = &self.since_table { + if let Some(since_stats) = since.table_statistics(ctx).await? { + res = res.increment_since_from(&since_stats); + } } + + res } }; Ok(Some(stats)) @@ -773,7 +774,7 @@ impl Table for FuseTable { if let Some(table_statistics) = table_statistics { FuseTableColumnStatisticsProvider::new( stats.clone(), - Some(table_statistics.column_distinct_values.clone()), + Some(table_statistics.column_distinct_values()), snapshot.summary.row_count, ) } else { @@ -791,24 +792,21 @@ impl Table for FuseTable { #[minitrace::trace] #[async_backtrace::framed] - async fn navigate_to(&self, point: &NavigationPoint) -> Result> { - let snapshot_location = if let Some(loc) = self.snapshot_loc().await? { - loc + async fn navigate_since_to( + &self, + since_point: &Option, + to_point: &Option, + ) -> Result> { + let mut to_point = if let Some(to_point) = to_point { + self.navigate_to(to_point).await?.as_ref().clone() } else { - // not an error? - return Err(ErrorCode::TableHistoricalDataNotFound( - "Empty Table has no historical data", - )); + self.clone() }; - match point { - NavigationPoint::SnapshotID(snapshot_id) => Ok(self - .navigate_to_snapshot(snapshot_location, snapshot_id.as_str()) - .await?), - NavigationPoint::TimePoint(time_point) => Ok(self - .navigate_to_time_point(snapshot_location, *time_point) - .await?), + if let Some(since_point) = since_point { + to_point.since_table = Some(self.navigate_to(since_point).await?); } + Ok(Arc::new(to_point)) } fn get_block_thresholds(&self) -> BlockThresholds { diff --git a/src/query/storages/fuse/src/io/locations.rs b/src/query/storages/fuse/src/io/locations.rs index 99f5f7910dcf..dff384c5039b 100644 --- a/src/query/storages/fuse/src/io/locations.rs +++ b/src/query/storages/fuse/src/io/locations.rs @@ -39,8 +39,8 @@ static SNAPSHOT_V2: SnapshotVersion = SnapshotVersion::V2(PhantomData); static SNAPSHOT_V3: SnapshotVersion = SnapshotVersion::V3(PhantomData); static SNAPSHOT_V4: SnapshotVersion = SnapshotVersion::V4(PhantomData); -static SNAPSHOT_STATISTICS_V0: TableSnapshotStatisticsVersion = - TableSnapshotStatisticsVersion::V0(PhantomData); +static SNAPSHOT_STATISTICS_V2: TableSnapshotStatisticsVersion = + TableSnapshotStatisticsVersion::V2(PhantomData); #[derive(Clone)] pub struct TableMetaLocationGenerator { @@ -135,8 +135,8 @@ impl TableMetaLocationGenerator { Ok(statistics_version.create(id, &self.prefix)) } - pub fn snapshot_statistics_version(_location: impl AsRef) -> u64 { - SNAPSHOT_STATISTICS_V0.version() + pub fn snapshot_statistics_version() -> u64 { + SNAPSHOT_STATISTICS_V2.version() } pub fn gen_last_snapshot_hint_location(&self) -> String { @@ -197,6 +197,7 @@ impl SnapshotLocationCreator for TableSnapshotStatisticsVersion { fn suffix(&self) -> String { match self { TableSnapshotStatisticsVersion::V0(_) => "_ts_v0.json".to_string(), + TableSnapshotStatisticsVersion::V2(_) => "_ts_v2.json".to_string(), } } } diff --git a/src/query/storages/fuse/src/io/snapshots.rs b/src/query/storages/fuse/src/io/snapshots.rs index 9c187020643a..9c62c929904a 100644 --- a/src/query/storages/fuse/src/io/snapshots.rs +++ b/src/query/storages/fuse/src/io/snapshots.rs @@ -78,7 +78,7 @@ impl SnapshotsIO { data_accessor: Operator, ) -> Result<(Arc, FormatVersion)> { let reader = MetaReaders::table_snapshot_reader(data_accessor); - let ver = TableMetaLocationGenerator::snapshot_version(snapshot_location.as_str()); + let ver: u64 = TableMetaLocationGenerator::snapshot_version(snapshot_location.as_str()); let load_params = LoadParams { location: snapshot_location, len_hint: None, diff --git a/src/query/storages/fuse/src/io/write/meta_writer.rs b/src/query/storages/fuse/src/io/write/meta_writer.rs index 99d4c3ae927b..525b4cc01a5d 100644 --- a/src/query/storages/fuse/src/io/write/meta_writer.rs +++ b/src/query/storages/fuse/src/io/write/meta_writer.rs @@ -104,7 +104,6 @@ impl Marshal for TableSnapshotStatistics { #[cfg(test)] mod tests { - use std::collections::HashMap; use databend_common_base::runtime::catch_unwind; use databend_common_expression::TableSchema; @@ -164,14 +163,4 @@ mod tests { ); snapshot.marshal().unwrap(); } - - #[test] - fn test_table_snapshot_statistics_format_version_validation() { - // since there is only one version for TableSnapshotStatistics, - // we omit the checking of invalid format versions, otherwise clippy will complain about empty_ranges - - // current version allowed - let snapshot_stats = TableSnapshotStatistics::new(HashMap::new()); - snapshot_stats.marshal().unwrap(); - } } diff --git a/src/query/storages/fuse/src/operations/analyze.rs b/src/query/storages/fuse/src/operations/analyze.rs index 71c24c7f7421..fea474fab54c 100644 --- a/src/query/storages/fuse/src/operations/analyze.rs +++ b/src/query/storages/fuse/src/operations/analyze.rs @@ -16,14 +16,28 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::Instant; +use async_trait::async_trait; +use async_trait::unboxed_simple; +use databend_common_catalog::catalog::CatalogManager; use databend_common_catalog::table::Table; use databend_common_catalog::table_context::TableContext; use databend_common_exception::ErrorCode; use databend_common_exception::Result; +use databend_common_expression::DataBlock; +use databend_common_expression::DataSchema; +use databend_common_io::prelude::borsh_deserialize_from_slice; +use databend_common_pipeline_core::processors::InputPort; +use databend_common_pipeline_core::processors::ProcessorPtr; +use databend_common_pipeline_core::Pipeline; +use databend_common_pipeline_sinks::AsyncSink; +use databend_common_pipeline_sinks::AsyncSinker; +use databend_storages_common_table_meta::meta::ClusterStatistics; +use databend_storages_common_table_meta::meta::MetaHLL; use databend_storages_common_table_meta::meta::SegmentInfo; +use databend_storages_common_table_meta::meta::SnapshotId; +use databend_storages_common_table_meta::meta::StatisticsOfColumns; use databend_storages_common_table_meta::meta::TableSnapshot; use databend_storages_common_table_meta::meta::TableSnapshotStatistics; -use log::warn; use crate::io::SegmentsIO; use crate::statistics::reduce_block_statistics; @@ -31,128 +45,231 @@ use crate::statistics::reduce_cluster_statistics; use crate::FuseTable; impl FuseTable { + #[allow(clippy::too_many_arguments)] + pub fn do_analyze( + ctx: Arc, + output_schema: Arc, + catalog: &str, + database: &str, + table: &str, + snapshot_id: SnapshotId, + pipeline: &mut Pipeline, + ) -> Result<()> { + pipeline.add_sink(|input| { + SinkAnalyzeState::create( + ctx.clone(), + output_schema.clone(), + catalog, + database, + table, + snapshot_id, + input, + ) + })?; + Ok(()) + } +} + +struct SinkAnalyzeState { + ctx: Arc, + output_schema: Arc, + + catalog: String, + database: String, + table: String, + snapshot_id: SnapshotId, +} + +impl SinkAnalyzeState { + #[allow(clippy::too_many_arguments)] + pub fn create( + ctx: Arc, + output_schema: Arc, + catalog: &str, + database: &str, + table: &str, + snapshot_id: SnapshotId, + input: Arc, + ) -> Result { + let sinker = AsyncSinker::create(input, ctx.clone(), SinkAnalyzeState { + ctx, + output_schema, + catalog: catalog.to_string(), + database: database.to_string(), + table: table.to_string(), + snapshot_id, + }); + Ok(ProcessorPtr::create(sinker)) + } + + #[unboxed_simple] #[async_backtrace::framed] - pub async fn do_analyze(&self, ctx: &Arc) -> Result<()> { - // 1. Read table snapshot. - let r = self.read_table_snapshot().await; - let snapshot_opt = match r { - Err(e) if e.code() == ErrorCode::STORAGE_NOT_FOUND => { - warn!( - "concurrent statistic: snapshot {:?} already collected. table: {}, ident {}", - self.snapshot_loc().await?, - self.table_info.desc, - self.table_info.ident, - ); - return Ok(()); + pub async fn merge_analyze_states(&mut self, data_block: DataBlock) -> Result { + if data_block.num_rows() == 0 { + return Ok(false); + } + + // always use the latest table + let tenant = self.ctx.get_tenant(); + let catalog = CatalogManager::instance() + .get_catalog(&tenant, &self.catalog) + .await?; + let table = catalog + .get_table(tenant.as_str(), &self.database, &self.table) + .await?; + + let table = FuseTable::try_from_table(table.as_ref())?; + let snapshot = table.read_table_snapshot().await?; + if snapshot.is_none() { + return Ok(true); + } + let table_statistics = table + .read_table_snapshot_statistics(snapshot.as_ref()) + .await?; + + let is_full = data_block.columns()[self.output_schema.num_fields() - 1] + .value + .index(0) + .unwrap(); + + let is_full = is_full.as_boolean().unwrap(); + + let mut ndv_states = table_statistics.map(|s| s.hll.clone()).unwrap_or_default(); + + let index_num = self.output_schema.num_fields() - 1; + + for (f, col) in self + .output_schema + .fields() + .iter() + .take(index_num) + .zip(data_block.columns()) + { + let name = f.name(); + let index: u32 = name.strip_prefix("ndv_").unwrap().parse().unwrap(); + + let col = col.value.index(0).unwrap(); + let col = col.as_binary().unwrap(); + let hll: MetaHLL = borsh_deserialize_from_slice(col)?; + + if !is_full { + ndv_states + .entry(index) + .and_modify(|c| c.merge(&hll)) + .or_insert(hll); + } else { + ndv_states.insert(index, hll); } - Err(e) => return Err(e), - Ok(v) => v, - }; - - let default_cluster_key_id = self.cluster_key_id(); - - if let Some(snapshot) = snapshot_opt { - // 2. Iterator segments and blocks to estimate statistics. - let mut sum_map = HashMap::new(); - let mut row_count_sum = 0; - let mut block_count_sum: u64 = 0; - let mut read_segment_count = 0; - let mut col_stats = HashMap::new(); - let mut cluster_stats = None; - - let start = Instant::now(); - let segments_io = SegmentsIO::create(ctx.clone(), self.operator.clone(), self.schema()); - let chunk_size = ctx.get_settings().get_max_threads()? as usize * 4; - let number_segments = snapshot.segments.len(); - for chunk in snapshot.segments.chunks(chunk_size) { - let mut stats_of_columns = Vec::new(); - let mut blocks_cluster_stats = Vec::new(); - if !col_stats.is_empty() { - stats_of_columns.push(col_stats.clone()); - blocks_cluster_stats.push(cluster_stats.clone()); - } + } - let segments = segments_io - .read_segments::(chunk, true) - .await?; - for segment in segments { - let segment = segment?; - stats_of_columns.push(segment.summary.col_stats.clone()); - blocks_cluster_stats.push(segment.summary.cluster_stats.clone()); - segment.blocks.iter().for_each(|block| { - let block = block.as_ref(); - let row_count = block.row_count; - if row_count != 0 { - block_count_sum += 1; - row_count_sum += row_count; - for (i, col_stat) in block.col_stats.iter() { - let density = col_stat - .distinct_of_values - .map_or(0.0, |ndv| ndv as f64 / row_count as f64); - - match sum_map.get_mut(i) { - Some(sum) => { - *sum += density; - } - None => { - let _ = sum_map.insert(*i, density); - } - } - } - } - }); - } + let snapshot = snapshot.unwrap(); + // 3. Generate new table statistics + let table_statistics = TableSnapshotStatistics::new(ndv_states, self.snapshot_id); + let table_statistics_location = table + .meta_location_generator + .snapshot_statistics_location_from_uuid( + &table_statistics.snapshot_id, + table_statistics.format_version(), + )?; + + let (col_stats, cluster_stats) = + regenerate_statistics(table, snapshot.as_ref(), &self.ctx).await?; + // 4. Save table statistics + let mut new_snapshot = TableSnapshot::from_previous(&snapshot); + new_snapshot.summary.col_stats = col_stats; + new_snapshot.summary.cluster_stats = cluster_stats; + new_snapshot.table_statistics_location = Some(table_statistics_location); + + FuseTable::commit_to_meta_server( + self.ctx.as_ref(), + &table.table_info, + &table.meta_location_generator, + new_snapshot, + Some(table_statistics), + &None, + &table.operator, + ) + .await?; + + Ok(true) + } +} + +#[async_trait] +impl AsyncSink for SinkAnalyzeState { + const NAME: &'static str = "SinkAnalyzeState"; - // Generate new column statistics for snapshot - col_stats = reduce_block_statistics(&stats_of_columns); - cluster_stats = - reduce_cluster_statistics(&blocks_cluster_stats, default_cluster_key_id); - - // Status. - { - read_segment_count += chunk.len(); - let status = format!( - "analyze: read segment files:{}/{}, cost:{} sec", - read_segment_count, - number_segments, - start.elapsed().as_secs() - ); - ctx.set_status_info(&status); + #[unboxed_simple] + #[async_backtrace::framed] + async fn consume(&mut self, data_block: DataBlock) -> Result { + let mismatch_code = ErrorCode::TableVersionMismatched("").code(); + + loop { + if let Err(e) = self.merge_analyze_states(data_block.clone()).await { + if e.code() == mismatch_code { + log::warn!("Retry after got TableVersionMismatched"); + continue; + } else { + return Err(e); } } - let mut ndv_map = HashMap::new(); - for (i, sum) in sum_map.iter() { - let density_avg = *sum / block_count_sum as f64; - ndv_map.insert(*i, (density_avg * row_count_sum as f64) as u64); - } + break; + } + Ok(true) + } +} - // 3. Generate new table statistics - let table_statistics = TableSnapshotStatistics::new(ndv_map); - let table_statistics_location = self - .meta_location_generator - .snapshot_statistics_location_from_uuid( - &table_statistics.snapshot_id, - table_statistics.format_version(), - )?; - - // 4. Save table statistics - let mut new_snapshot = TableSnapshot::from_previous(&snapshot); - new_snapshot.summary.col_stats = col_stats; - new_snapshot.summary.cluster_stats = cluster_stats; - new_snapshot.table_statistics_location = Some(table_statistics_location); - FuseTable::commit_to_meta_server( - ctx.as_ref(), - &self.table_info, - &self.meta_location_generator, - new_snapshot, - Some(table_statistics), - &None, - &self.operator, - ) +pub async fn regenerate_statistics( + table: &FuseTable, + snapshot: &TableSnapshot, + ctx: &Arc, +) -> Result<(StatisticsOfColumns, Option)> { + // 1. Read table snapshot. + let default_cluster_key_id = table.cluster_key_id(); + + // 2. Iterator segments and blocks to estimate statistics. + let mut read_segment_count = 0; + let mut col_stats = HashMap::new(); + let mut cluster_stats = None; + + let start = Instant::now(); + let segments_io = SegmentsIO::create(ctx.clone(), table.operator.clone(), table.schema()); + let chunk_size = ctx.get_settings().get_max_threads()? as usize * 4; + let number_segments = snapshot.segments.len(); + for chunk in snapshot.segments.chunks(chunk_size) { + let mut stats_of_columns = Vec::new(); + let mut blocks_cluster_stats = Vec::new(); + if !col_stats.is_empty() { + stats_of_columns.push(col_stats.clone()); + blocks_cluster_stats.push(cluster_stats.clone()); + } + + let segments = segments_io + .read_segments::(chunk, true) .await?; + for segment in segments { + let segment = segment?; + stats_of_columns.push(segment.summary.col_stats.clone()); + blocks_cluster_stats.push(segment.summary.cluster_stats.clone()); } - Ok(()) + // Generate new column statistics for snapshot + col_stats = reduce_block_statistics(&stats_of_columns); + cluster_stats = reduce_cluster_statistics(&blocks_cluster_stats, default_cluster_key_id); + + // Status. + { + read_segment_count += chunk.len(); + let status = format!( + "analyze: read segment files:{}/{}, cost:{} sec", + read_segment_count, + number_segments, + start.elapsed().as_secs() + ); + ctx.set_status_info(&status); + } } + + Ok((col_stats, cluster_stats)) } diff --git a/src/query/storages/fuse/src/operations/mod.rs b/src/query/storages/fuse/src/operations/mod.rs index fbbad4b46af1..f6a8feaf19c6 100644 --- a/src/query/storages/fuse/src/operations/mod.rs +++ b/src/query/storages/fuse/src/operations/mod.rs @@ -44,6 +44,7 @@ pub use read::build_row_fetcher_pipeline; pub use read::need_reserve_block_info; pub use replace_into::*; pub use util::acquire_task_permit; +pub use util::collect_incremental_blocks; pub use util::column_parquet_metas; pub use util::read_block; pub use util::set_backoff; diff --git a/src/query/storages/fuse/src/operations/navigate.rs b/src/query/storages/fuse/src/operations/navigate.rs index 6e856a68c55d..9182c76363c0 100644 --- a/src/query/storages/fuse/src/operations/navigate.rs +++ b/src/query/storages/fuse/src/operations/navigate.rs @@ -53,6 +53,28 @@ impl FuseTable { .await } + #[minitrace::trace] + #[async_backtrace::framed] + pub async fn navigate_to(&self, point: &NavigationPoint) -> Result> { + let snapshot_location = if let Some(loc) = self.snapshot_loc().await? { + loc + } else { + // not an error? + return Err(ErrorCode::TableHistoricalDataNotFound( + "Empty Table has no historical data", + )); + }; + + match point { + NavigationPoint::SnapshotID(snapshot_id) => Ok(self + .navigate_to_snapshot(snapshot_location, snapshot_id.as_str()) + .await?), + NavigationPoint::TimePoint(time_point) => Ok(self + .navigate_to_time_point(snapshot_location, *time_point) + .await?), + } + } + #[async_backtrace::framed] pub async fn navigate_to_snapshot( &self, diff --git a/src/query/storages/fuse/src/operations/read_partitions.rs b/src/query/storages/fuse/src/operations/read_partitions.rs index 85077576d040..b5fe2f6a9224 100644 --- a/src/query/storages/fuse/src/operations/read_partitions.rs +++ b/src/query/storages/fuse/src/operations/read_partitions.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::HashMap; +use std::collections::HashSet; use std::sync::Arc; use std::time::Instant; @@ -36,13 +37,17 @@ use databend_storages_common_cache_manager::CachedObject; use databend_storages_common_pruner::BlockMetaIndex; use databend_storages_common_table_meta::meta::BlockMeta; use databend_storages_common_table_meta::meta::ColumnStatistics; +use databend_storages_common_table_meta::meta::Location; use log::debug; use log::info; use opendal::Operator; use sha2::Digest; use sha2::Sha256; +use super::collect_incremental_blocks; use crate::fuse_part::FusePartInfo; +use crate::io::SegmentsIO; +use crate::pruning::create_segment_location_vector; use crate::pruning::FusePruner; use crate::pruning::SegmentLocation; use crate::FuseLazyPartInfo; @@ -76,6 +81,16 @@ impl FuseTable { nodes_num = cluster.nodes.len(); } + if let Some(c) = &self.since_table { + return self + .do_read_increment_partitions( + ctx, + push_downs.clone(), + &c.snapshot_loc().await?, + ) + .await; + } + if (!dry_run && snapshot.segments.len() > nodes_num) || is_lazy { let mut segments = Vec::with_capacity(snapshot.segments.len()); for (idx, segment_location) in snapshot.segments.iter().enumerate() { @@ -97,14 +112,8 @@ impl FuseTable { let snapshot_loc = Some(snapshot_loc); let table_schema = self.schema_with_stream(); let summary = snapshot.summary.block_count as usize; - let mut segments_location = Vec::with_capacity(snapshot.segments.len()); - for (idx, segment_location) in snapshot.segments.iter().enumerate() { - segments_location.push(SegmentLocation { - segment_idx: idx, - location: segment_location.clone(), - snapshot_loc: snapshot_loc.clone(), - }); - } + let segments_location = + create_segment_location_vector(snapshot.segments.clone(), snapshot_loc); self.prune_snapshot_blocks( ctx.clone(), @@ -120,6 +129,69 @@ impl FuseTable { } } + async fn do_read_increment_partitions( + &self, + ctx: Arc, + push_downs: Option, + base_snapshot: &Option, + ) -> Result<(PartStatistics, Partitions)> { + let fuse_segment_io = SegmentsIO::create(ctx.clone(), self.get_operator(), self.schema()); + let latest_snapshot = self.snapshot_loc().await?; + + let (increment_segments, _, add_blocks) = collect_incremental_blocks( + ctx.clone(), + fuse_segment_io, + self.get_operator(), + &latest_snapshot, + base_snapshot, + ) + .await?; + + let table_schema = self.schema_with_stream(); + let segments_location = create_segment_location_vector(increment_segments, None); + + let mut pruner = if !self.is_native() || self.cluster_key_meta.is_none() { + FusePruner::create( + &ctx, + self.operator.clone(), + table_schema.clone(), + &push_downs, + self.bloom_index_cols(), + )? + } else { + let cluster_keys = self.cluster_keys(ctx.clone()); + FusePruner::create_with_pages( + &ctx, + self.operator.clone(), + table_schema, + &push_downs, + self.cluster_key_meta.clone(), + cluster_keys, + self.bloom_index_cols(), + )? + }; + + let block_metas = pruner.read_pruning(segments_location).await?; + let pruning_stats = pruner.pruning_stats(); + let add_blocks: HashSet = add_blocks.iter().map(|b| b.location.clone()).collect(); + let block_metas = block_metas + .into_iter() + .filter(|(_, block_meta)| add_blocks.contains(&block_meta.location)) + .map(|(block_meta_index, block_meta)| (Some(block_meta_index), block_meta)) + .collect::>(); + + let (stats, parts) = self.read_partitions_with_metas( + ctx.clone(), + self.schema(), + None, + &block_metas, + block_metas.len(), + pruning_stats, + )?; + + Ok((stats, parts)) + } + #[minitrace::trace] #[async_backtrace::framed] pub async fn prune_snapshot_blocks( diff --git a/src/query/storages/fuse/src/operations/util.rs b/src/query/storages/fuse/src/operations/util.rs index 65213d0884e7..b08c45fee210 100644 --- a/src/query/storages/fuse/src/operations/util.rs +++ b/src/query/storages/fuse/src/operations/util.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::HashMap; +use std::collections::HashSet; use std::sync::Arc; use std::time::Duration; @@ -23,6 +24,7 @@ use databend_common_base::base::tokio::sync::OwnedSemaphorePermit; use databend_common_base::base::tokio::sync::Semaphore; use databend_common_base::runtime::GlobalIORuntime; use databend_common_base::runtime::TrySpawn; +use databend_common_catalog::table_context::TableContext; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::ColumnId; @@ -31,10 +33,14 @@ use databend_common_expression::TableSchemaRef; use databend_storages_common_blocks::ParquetFileMeta; use databend_storages_common_table_meta::meta::BlockMeta; use databend_storages_common_table_meta::meta::ColumnMeta; +use databend_storages_common_table_meta::meta::SegmentInfo; use databend_storages_common_table_meta::meta::SingleColumnMeta; +use opendal::Operator; use crate::io::BlockReader; use crate::io::ReadSettings; +use crate::io::SegmentsIO; +use crate::io::SnapshotsIO; use crate::FuseStorageFormat; const OCC_DEFAULT_BACKOFF_INIT_DELAY_MS: Duration = Duration::from_millis(5); @@ -234,3 +240,69 @@ pub async fn read_block( .add_message_back(e.to_string()) })? } + +pub async fn collect_incremental_blocks( + ctx: Arc, + fuse_segment_io: SegmentsIO, + op: Operator, + latest: &Option, + base: &Option, +) -> Result<(Vec<(String, u64)>, Vec>, Vec>)> { + let latest_segments = if let Some(snapshot) = latest { + let (sn, _) = SnapshotsIO::read_snapshot(snapshot.to_string(), op.clone()).await?; + HashSet::from_iter(sn.segments.clone()) + } else { + HashSet::new() + }; + + let base_segments = if let Some(snapshot) = base { + let (sn, _) = SnapshotsIO::read_snapshot(snapshot.to_string(), op.clone()).await?; + HashSet::from_iter(sn.segments.clone()) + } else { + HashSet::new() + }; + + let chunk_size = ctx.get_settings().get_max_threads()? as usize * 4; + + let mut base_blocks = HashMap::new(); + let diff_in_base = base_segments + .difference(&latest_segments) + .cloned() + .collect::>(); + for chunk in diff_in_base.chunks(chunk_size) { + let segments = fuse_segment_io + .read_segments::(chunk, true) + .await?; + for segment in segments { + let segment = segment?; + segment.blocks.into_iter().for_each(|block| { + base_blocks.insert(block.location.clone(), block); + }) + } + } + + let mut add_blocks = Vec::new(); + let diff_in_latest = latest_segments + .difference(&base_segments) + .cloned() + .collect::>(); + for chunk in diff_in_latest.chunks(chunk_size) { + let segments = fuse_segment_io + .read_segments::(chunk, true) + .await?; + + for segment in segments { + let segment = segment?; + segment.blocks.into_iter().for_each(|block| { + if base_blocks.contains_key(&block.location) { + base_blocks.remove(&block.location); + } else { + add_blocks.push(block); + } + }); + } + } + + let del_blocks = base_blocks.into_values().collect::>(); + Ok((diff_in_latest, del_blocks, add_blocks)) +} diff --git a/src/query/storages/fuse/src/table_functions/fuse_statistics/fuse_statistic.rs b/src/query/storages/fuse/src/table_functions/fuse_statistics/fuse_statistic.rs index bc2d33073824..c4fc1b13228a 100644 --- a/src/query/storages/fuse/src/table_functions/fuse_statistics/fuse_statistic.rs +++ b/src/query/storages/fuse/src/table_functions/fuse_statistics/fuse_statistic.rs @@ -61,7 +61,7 @@ impl<'a> FuseStatistic<'a> { let mut col_ndvs: Vec = Vec::with_capacity(1); if let Some(table_statistics) = table_statistics { let mut ndvs: String = "".to_string(); - for (i, n) in table_statistics.column_distinct_values.iter() { + for (i, n) in table_statistics.column_distinct_values().iter() { ndvs.push_str(&format!("({},{});", *i, *n)); } col_ndvs.push(ndvs); diff --git a/src/query/storages/stream/src/stream_table.rs b/src/query/storages/stream/src/stream_table.rs index cea4e4d22c66..d0c107d48542 100644 --- a/src/query/storages/stream/src/stream_table.rs +++ b/src/query/storages/stream/src/stream_table.rs @@ -13,8 +13,6 @@ // limitations under the License. use std::any::Any; -use std::collections::HashMap; -use std::collections::HashSet; use std::sync::Arc; use std::time::Instant; @@ -51,10 +49,9 @@ use databend_common_pipeline_core::Pipeline; use databend_common_sql::binder::STREAM_COLUMN_FACTORY; use databend_common_storages_fuse::io::SegmentsIO; use databend_common_storages_fuse::io::SnapshotsIO; +use databend_common_storages_fuse::operations::collect_incremental_blocks; use databend_common_storages_fuse::pruning::FusePruner; use databend_common_storages_fuse::FuseTable; -use databend_storages_common_table_meta::meta::BlockMeta; -use databend_storages_common_table_meta::meta::SegmentInfo; use databend_storages_common_table_meta::table::ChangeType; use databend_storages_common_table_meta::table::StreamMode; use databend_storages_common_table_meta::table::OPT_KEY_DATABASE_NAME; @@ -186,73 +183,6 @@ impl StreamTable { &self.table_database } - async fn collect_incremental_blocks( - &self, - ctx: Arc, - fuse_table: &FuseTable, - ) -> Result<(Vec>, Vec>)> { - let operator = fuse_table.get_operator(); - let latest_segments = if let Some(snapshot) = fuse_table.read_table_snapshot().await? { - HashSet::from_iter(snapshot.segments.clone()) - } else { - HashSet::new() - }; - - let base_segments = if let Some(snapshot_location) = &self.snapshot_location { - let (base_snapshot, _) = - SnapshotsIO::read_snapshot(snapshot_location.clone(), operator.clone()).await?; - HashSet::from_iter(base_snapshot.segments.clone()) - } else { - HashSet::new() - }; - - let fuse_segment_io = - SegmentsIO::create(ctx.clone(), operator.clone(), fuse_table.schema()); - let chunk_size = ctx.get_settings().get_max_threads()? as usize * 4; - - let mut base_blocks = HashMap::new(); - let diff_in_base = base_segments - .difference(&latest_segments) - .cloned() - .collect::>(); - for chunk in diff_in_base.chunks(chunk_size) { - let segments = fuse_segment_io - .read_segments::(chunk, true) - .await?; - for segment in segments { - let segment = segment?; - segment.blocks.into_iter().for_each(|block| { - base_blocks.insert(block.location.clone(), block); - }) - } - } - - let mut add_blocks = Vec::new(); - let diff_in_latest = latest_segments - .difference(&base_segments) - .cloned() - .collect::>(); - for chunk in diff_in_latest.chunks(chunk_size) { - let segments = fuse_segment_io - .read_segments::(chunk, true) - .await?; - - for segment in segments { - let segment = segment?; - segment.blocks.into_iter().for_each(|block| { - if base_blocks.contains_key(&block.location) { - base_blocks.remove(&block.location); - } else { - add_blocks.push(block); - } - }); - } - } - - let del_blocks = base_blocks.into_values().collect::>(); - Ok((del_blocks, add_blocks)) - } - #[async_backtrace::framed] async fn do_read_partitions( &self, @@ -263,9 +193,18 @@ impl StreamTable { let table = self.source_table(ctx.get_default_catalog()?).await?; let fuse_table = FuseTable::try_from_table(table.as_ref())?; - let (del_blocks, add_blocks) = self - .collect_incremental_blocks(ctx.clone(), fuse_table) - .await?; + let fuse_segment_io = + SegmentsIO::create(ctx.clone(), fuse_table.get_operator(), self.schema()); + + let latest_snapshot = fuse_table.snapshot_loc().await?; + let (_, del_blocks, add_blocks) = collect_incremental_blocks( + ctx.clone(), + fuse_segment_io, + fuse_table.get_operator(), + &latest_snapshot, + &self.snapshot_location, + ) + .await?; let change_type = push_downs.as_ref().map_or(ChangeType::Append, |v| { v.change_type.clone().unwrap_or(ChangeType::Append) diff --git a/src/tests/sqlsmith/src/sql_gen/dml.rs b/src/tests/sqlsmith/src/sql_gen/dml.rs index 3f91aa02393d..e48dcc3f3122 100644 --- a/src/tests/sqlsmith/src/sql_gen/dml.rs +++ b/src/tests/sqlsmith/src/sql_gen/dml.rs @@ -296,6 +296,7 @@ impl<'a, R: Rng + 'a> SqlGenerator<'a, R> { table: Identifier::from_name(table.name.clone()), alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }; @@ -497,6 +498,7 @@ impl<'a, R: Rng + 'a> SqlGenerator<'a, R> { table: Identifier::from_name(table.name.clone()), alias: None, travel_point: None, + since_point: None, pivot: None, unpivot: None, }; diff --git a/src/tests/sqlsmith/src/sql_gen/query.rs b/src/tests/sqlsmith/src/sql_gen/query.rs index 40b7449da351..eba5ac6c4a93 100644 --- a/src/tests/sqlsmith/src/sql_gen/query.rs +++ b/src/tests/sqlsmith/src/sql_gen/query.rs @@ -475,6 +475,7 @@ impl<'a, R: Rng> SqlGenerator<'a, R> { alias: None, // TODO travel_point: None, + since_point: None, // TODO pivot: None, // TODO diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0020_analyze.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0020_analyze.test index fa37b44f62fc..e1fe31e49603 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0020_analyze.test +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0020_analyze.test @@ -74,7 +74,7 @@ analyze table `t` query T select * from fuse_statistic('db_09_0020', 't') ---- -(0,6); +(0,3); statement ok optimize table t compact @@ -87,7 +87,7 @@ select segment_count,block_count from fuse_snapshot('db_09_0020', 't') limit 1 query T select * from fuse_statistic('db_09_0020', 't') ---- -(0,6); +(0,3); statement ok analyze table `t` @@ -111,7 +111,7 @@ analyze table `t` query T select * from fuse_statistic('db_09_0020', 't') ---- -(0,2); +(0,3); statement ok DROP TABLE t diff --git a/tests/sqllogictests/suites/tpcds/tpcds_join_order.test b/tests/sqllogictests/suites/tpcds/tpcds_join_order.test index 297da1a7629f..0c90af25ead9 100644 --- a/tests/sqllogictests/suites/tpcds/tpcds_join_order.test +++ b/tests/sqllogictests/suites/tpcds/tpcds_join_order.test @@ -749,13 +749,13 @@ HashJoin: LEFT OUTER └── Probe └── HashJoin: INNER ├── Build - │ └── Scan: default.tpcds.item (#4) (read rows: 180) + │ └── HashJoin: INNER + │ ├── Build + │ │ └── Scan: default.tpcds.item (#4) (read rows: 180) + │ └── Probe + │ └── Scan: default.tpcds.store_sales (#2) (read rows: 28810) └── Probe - └── HashJoin: INNER - ├── Build - │ └── Scan: default.tpcds.store_sales (#2) (read rows: 28810) - └── Probe - └── Scan: default.tpcds.date_dim (#3) (read rows: 73049) + └── Scan: default.tpcds.date_dim (#3) (read rows: 73049) # Q7 query I @@ -786,11 +786,11 @@ LIMIT 100; ---- HashJoin: INNER ├── Build -│ └── Scan: default.tpcds.promotion (#4) (read rows: 3) +│ └── Scan: default.tpcds.item (#3) (read rows: 180) └── Probe └── HashJoin: INNER ├── Build - │ └── Scan: default.tpcds.item (#3) (read rows: 180) + │ └── Scan: default.tpcds.promotion (#4) (read rows: 3) └── Probe └── HashJoin: INNER ├── Build @@ -1768,15 +1768,15 @@ HashJoin: INNER │ │ │ │ ├── Build │ │ │ │ │ └── Scan: default.tpcds.store_sales (#0) (read rows: 28810) │ │ │ │ └── Probe -│ │ │ │ └── Scan: default.tpcds.customer_address (#4) (read rows: 500) +│ │ │ │ └── Scan: default.tpcds.customer_demographics (#2) (read rows: 19208) │ │ │ └── Probe -│ │ │ └── Scan: default.tpcds.date_dim (#5) (read rows: 73049) +│ │ │ └── Scan: default.tpcds.household_demographics (#3) (read rows: 7200) │ │ └── Probe -│ │ └── Scan: default.tpcds.household_demographics (#3) (read rows: 7200) +│ │ └── Scan: default.tpcds.store (#1) (read rows: 1) │ └── Probe -│ └── Scan: default.tpcds.store (#1) (read rows: 1) +│ └── Scan: default.tpcds.customer_address (#4) (read rows: 500) └── Probe - └── Scan: default.tpcds.customer_demographics (#2) (read rows: 19208) + └── Scan: default.tpcds.date_dim (#5) (read rows: 73049) # Q14 @@ -2310,25 +2310,25 @@ LIMIT 100 ; ---- HashJoin: INNER ├── Build -│ └── Scan: default.tpcds.store (#5) (read rows: 1) +│ └── HashJoin: INNER +│ ├── Build +│ │ └── HashJoin: INNER +│ │ ├── Build +│ │ │ └── Scan: default.tpcds.store (#5) (read rows: 1) +│ │ └── Probe +│ │ └── HashJoin: INNER +│ │ ├── Build +│ │ │ └── Scan: default.tpcds.item (#2) (read rows: 180) +│ │ └── Probe +│ │ └── HashJoin: INNER +│ │ ├── Build +│ │ │ └── Scan: default.tpcds.date_dim (#0) (read rows: 73049) +│ │ └── Probe +│ │ └── Scan: default.tpcds.store_sales (#1) (read rows: 28810) +│ └── Probe +│ └── Scan: default.tpcds.customer (#3) (read rows: 1000) └── Probe - └── HashJoin: INNER - ├── Build - │ └── HashJoin: INNER - │ ├── Build - │ │ └── HashJoin: INNER - │ │ ├── Build - │ │ │ └── Scan: default.tpcds.item (#2) (read rows: 180) - │ │ └── Probe - │ │ └── HashJoin: INNER - │ │ ├── Build - │ │ │ └── Scan: default.tpcds.date_dim (#0) (read rows: 73049) - │ │ └── Probe - │ │ └── Scan: default.tpcds.store_sales (#1) (read rows: 28810) - │ └── Probe - │ └── Scan: default.tpcds.customer (#3) (read rows: 1000) - └── Probe - └── Scan: default.tpcds.customer_address (#4) (read rows: 500) + └── Scan: default.tpcds.customer_address (#4) (read rows: 500) # Q20 query I @@ -2409,11 +2409,11 @@ HashJoin: INNER ├── Build │ └── HashJoin: INNER │ ├── Build -│ │ └── Scan: default.tpcds.item (#2) (read rows: 180) +│ │ └── Scan: default.tpcds.date_dim (#3) (read rows: 73049) │ └── Probe │ └── HashJoin: INNER │ ├── Build -│ │ └── Scan: default.tpcds.date_dim (#3) (read rows: 73049) +│ │ └── Scan: default.tpcds.item (#2) (read rows: 180) │ └── Probe │ └── Scan: default.tpcds.inventory (#0) (read rows: 23490) └── Probe @@ -2830,21 +2830,21 @@ LIMIT 100; ---- HashJoin: INNER ├── Build -│ └── Scan: default.tpcds.promotion (#4) (read rows: 3) +│ └── HashJoin: INNER +│ ├── Build +│ │ └── Scan: default.tpcds.promotion (#4) (read rows: 3) +│ └── Probe +│ └── HashJoin: INNER +│ ├── Build +│ │ └── HashJoin: INNER +│ │ ├── Build +│ │ │ └── Scan: default.tpcds.date_dim (#2) (read rows: 73049) +│ │ └── Probe +│ │ └── Scan: default.tpcds.catalog_sales (#0) (read rows: 14313) +│ └── Probe +│ └── Scan: default.tpcds.customer_demographics (#1) (read rows: 19208) └── Probe - └── HashJoin: INNER - ├── Build - │ └── HashJoin: INNER - │ ├── Build - │ │ └── HashJoin: INNER - │ │ ├── Build - │ │ │ └── Scan: default.tpcds.date_dim (#2) (read rows: 73049) - │ │ └── Probe - │ │ └── Scan: default.tpcds.catalog_sales (#0) (read rows: 14313) - │ └── Probe - │ └── Scan: default.tpcds.customer_demographics (#1) (read rows: 19208) - └── Probe - └── Scan: default.tpcds.item (#3) (read rows: 180) + └── Scan: default.tpcds.item (#3) (read rows: 180) # Q27 query I @@ -3373,13 +3373,13 @@ HashJoin: INNER ├── Build │ └── HashJoin: INNER │ ├── Build -│ │ └── HashJoin: INNER -│ │ ├── Build -│ │ │ └── Scan: default.tpcds.date_dim (#2) (read rows: 73049) -│ │ └── Probe -│ │ └── Scan: default.tpcds.catalog_sales (#0) (read rows: 14313) +│ │ └── Scan: default.tpcds.item (#1) (read rows: 0) │ └── Probe -│ └── Scan: default.tpcds.item (#1) (read rows: 0) +│ └── HashJoin: INNER +│ ├── Build +│ │ └── Scan: default.tpcds.date_dim (#2) (read rows: 73049) +│ └── Probe +│ └── Scan: default.tpcds.catalog_sales (#0) (read rows: 14313) └── Probe └── HashJoin: INNER ├── Build @@ -3723,11 +3723,11 @@ UnionAll │ ├── Left │ │ └── HashJoin: INNER │ │ ├── Build -│ │ │ └── Scan: default.tpcds.store (#3) (read rows: 1) +│ │ │ └── Scan: default.tpcds.item (#2) (read rows: 180) │ │ └── Probe │ │ └── HashJoin: INNER │ │ ├── Build -│ │ │ └── Scan: default.tpcds.item (#2) (read rows: 180) +│ │ │ └── Scan: default.tpcds.store (#3) (read rows: 1) │ │ └── Probe │ │ └── HashJoin: INNER │ │ ├── Build @@ -3737,11 +3737,11 @@ UnionAll │ └── Right │ └── HashJoin: INNER │ ├── Build -│ │ └── Scan: default.tpcds.store (#7) (read rows: 1) +│ │ └── Scan: default.tpcds.item (#6) (read rows: 180) │ └── Probe │ └── HashJoin: INNER │ ├── Build -│ │ └── Scan: default.tpcds.item (#6) (read rows: 180) +│ │ └── Scan: default.tpcds.store (#7) (read rows: 1) │ └── Probe │ └── HashJoin: INNER │ ├── Build @@ -3751,11 +3751,11 @@ UnionAll └── Right └── HashJoin: INNER ├── Build - │ └── Scan: default.tpcds.store (#11) (read rows: 1) + │ └── Scan: default.tpcds.item (#10) (read rows: 180) └── Probe └── HashJoin: INNER ├── Build - │ └── Scan: default.tpcds.item (#10) (read rows: 180) + │ └── Scan: default.tpcds.store (#11) (read rows: 1) └── Probe └── HashJoin: INNER ├── Build @@ -3793,11 +3793,11 @@ HashJoin: INNER ├── Build │ └── HashJoin: INNER │ ├── Build -│ │ └── Scan: default.tpcds.item (#0) (read rows: 180) +│ │ └── Scan: default.tpcds.date_dim (#2) (read rows: 73049) │ └── Probe │ └── HashJoin: INNER │ ├── Build -│ │ └── Scan: default.tpcds.date_dim (#2) (read rows: 73049) +│ │ └── Scan: default.tpcds.item (#0) (read rows: 180) │ └── Probe │ └── Scan: default.tpcds.inventory (#1) (read rows: 23490) └── Probe @@ -3935,11 +3935,11 @@ HashJoin: INNER ├── Build │ └── HashJoin: INNER │ ├── Build -│ │ └── Scan: default.tpcds.warehouse (#6) (read rows: 1) +│ │ └── Scan: default.tpcds.item (#5) (read rows: 180) │ └── Probe │ └── HashJoin: INNER │ ├── Build -│ │ └── Scan: default.tpcds.item (#5) (read rows: 180) +│ │ └── Scan: default.tpcds.warehouse (#6) (read rows: 1) │ └── Probe │ └── HashJoin: INNER │ ├── Build @@ -3949,11 +3949,11 @@ HashJoin: INNER └── Probe └── HashJoin: INNER ├── Build - │ └── Scan: default.tpcds.warehouse (#2) (read rows: 1) + │ └── Scan: default.tpcds.item (#1) (read rows: 180) └── Probe └── HashJoin: INNER ├── Build - │ └── Scan: default.tpcds.item (#1) (read rows: 180) + │ └── Scan: default.tpcds.warehouse (#2) (read rows: 1) └── Probe └── HashJoin: INNER ├── Build @@ -4318,11 +4318,11 @@ HashJoin: RIGHT MARK │ │ │ └── Probe │ │ │ └── Scan: default.tpcds.web_sales (#0) (read rows: 7212) │ │ └── Probe - │ │ └── Scan: default.tpcds.customer (#1) (read rows: 1000) + │ │ └── Scan: default.tpcds.item (#4) (read rows: 180) │ └── Probe - │ └── Scan: default.tpcds.customer_address (#2) (read rows: 500) + │ └── Scan: default.tpcds.customer (#1) (read rows: 1000) └── Probe - └── Scan: default.tpcds.item (#4) (read rows: 180) + └── Scan: default.tpcds.customer_address (#2) (read rows: 500) # Q46 query I @@ -4576,11 +4576,11 @@ HashJoin: INNER │ │ │ └── Probe │ │ │ └── Scan: default.tpcds.customer_address (#3) (read rows: 500) │ │ └── Probe -│ │ └── Scan: default.tpcds.date_dim (#4) (read rows: 73049) +│ │ └── Scan: default.tpcds.store (#1) (read rows: 1) │ └── Probe │ └── Scan: default.tpcds.customer_demographics (#2) (read rows: 19208) └── Probe - └── Scan: default.tpcds.store (#1) (read rows: 1) + └── Scan: default.tpcds.date_dim (#4) (read rows: 73049) # Q49 query I @@ -5460,13 +5460,13 @@ HashJoin: INNER │ ├── Build │ │ └── HashJoin: INNER │ │ ├── Build -│ │ │ └── HashJoin: INNER -│ │ │ ├── Build -│ │ │ │ └── Scan: default.tpcds.item (#6) (read rows: 180) -│ │ │ └── Probe -│ │ │ └── Scan: default.tpcds.catalog_sales (#5) (read rows: 14313) +│ │ │ └── Scan: default.tpcds.item (#6) (read rows: 180) │ │ └── Probe -│ │ └── Scan: default.tpcds.date_dim (#7) (read rows: 73049) +│ │ └── HashJoin: INNER +│ │ ├── Build +│ │ │ └── Scan: default.tpcds.catalog_sales (#5) (read rows: 14313) +│ │ └── Probe +│ │ └── Scan: default.tpcds.date_dim (#7) (read rows: 73049) │ └── Probe │ └── HashJoin: INNER │ ├── Build @@ -6116,65 +6116,65 @@ HashJoin: INNER │ │ │ │ │ │ ├── Build │ │ │ │ │ │ │ └── HashJoin: INNER │ │ │ │ │ │ │ ├── Build -│ │ │ │ │ │ │ │ └── Scan: default.tpcds.promotion (#30) (read rows: 3) +│ │ │ │ │ │ │ │ └── HashJoin: INNER +│ │ │ │ │ │ │ │ ├── Build +│ │ │ │ │ │ │ │ │ └── HashJoin: INNER +│ │ │ │ │ │ │ │ │ ├── Build +│ │ │ │ │ │ │ │ │ │ └── HashJoin: INNER +│ │ │ │ │ │ │ │ │ │ ├── Build +│ │ │ │ │ │ │ │ │ │ │ └── HashJoin: INNER +│ │ │ │ │ │ │ │ │ │ │ ├── Build +│ │ │ │ │ │ │ │ │ │ │ │ └── Scan: default.tpcds.promotion (#30) (read rows: 3) +│ │ │ │ │ │ │ │ │ │ │ └── Probe +│ │ │ │ │ │ │ │ │ │ │ └── HashJoin: INNER +│ │ │ │ │ │ │ │ │ │ │ ├── Build +│ │ │ │ │ │ │ │ │ │ │ │ └── HashJoin: INNER +│ │ │ │ │ │ │ │ │ │ │ │ ├── Build +│ │ │ │ │ │ │ │ │ │ │ │ │ └── Scan: default.tpcds.store (#26) (read rows: 1) +│ │ │ │ │ │ │ │ │ │ │ │ └── Probe +│ │ │ │ │ │ │ │ │ │ │ │ └── HashJoin: INNER +│ │ │ │ │ │ │ │ │ │ │ │ ├── Build +│ │ │ │ │ │ │ │ │ │ │ │ │ └── HashJoin: INNER +│ │ │ │ │ │ │ │ │ │ │ │ │ ├── Build +│ │ │ │ │ │ │ │ │ │ │ │ │ │ └── HashJoin: INNER +│ │ │ │ │ │ │ │ │ │ │ │ │ │ ├── Build +│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └── HashJoin: INNER +│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ ├── Build +│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └── Scan: default.tpcds.item (#37) (read rows: 180) +│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └── Probe +│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └── HashJoin: INNER +│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ ├── Build +│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └── Scan: default.tpcds.catalog_returns (#22) (read rows: 1358) +│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └── Probe +│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └── Scan: default.tpcds.catalog_sales (#21) (read rows: 14313) +│ │ │ │ │ │ │ │ │ │ │ │ │ │ └── Probe +│ │ │ │ │ │ │ │ │ │ │ │ │ │ └── Scan: default.tpcds.store_returns (#20) (read rows: 2810) +│ │ │ │ │ │ │ │ │ │ │ │ │ └── Probe +│ │ │ │ │ │ │ │ │ │ │ │ │ └── Scan: default.tpcds.store_sales (#19) (read rows: 28810) +│ │ │ │ │ │ │ │ │ │ │ │ └── Probe +│ │ │ │ │ │ │ │ │ │ │ │ └── Scan: default.tpcds.date_dim (#23) (read rows: 73049) +│ │ │ │ │ │ │ │ │ │ │ └── Probe +│ │ │ │ │ │ │ │ │ │ │ └── Scan: default.tpcds.customer_demographics (#28) (read rows: 19208) +│ │ │ │ │ │ │ │ │ │ └── Probe +│ │ │ │ │ │ │ │ │ │ └── Scan: default.tpcds.household_demographics (#31) (read rows: 7200) +│ │ │ │ │ │ │ │ │ └── Probe +│ │ │ │ │ │ │ │ │ └── Scan: default.tpcds.income_band (#35) (read rows: 20) +│ │ │ │ │ │ │ │ └── Probe +│ │ │ │ │ │ │ │ └── Scan: default.tpcds.customer_address (#33) (read rows: 500) │ │ │ │ │ │ │ └── Probe -│ │ │ │ │ │ │ └── HashJoin: INNER -│ │ │ │ │ │ │ ├── Build -│ │ │ │ │ │ │ │ └── HashJoin: INNER -│ │ │ │ │ │ │ │ ├── Build -│ │ │ │ │ │ │ │ │ └── HashJoin: INNER -│ │ │ │ │ │ │ │ │ ├── Build -│ │ │ │ │ │ │ │ │ │ └── HashJoin: INNER -│ │ │ │ │ │ │ │ │ │ ├── Build -│ │ │ │ │ │ │ │ │ │ │ └── HashJoin: INNER -│ │ │ │ │ │ │ │ │ │ │ ├── Build -│ │ │ │ │ │ │ │ │ │ │ │ └── HashJoin: INNER -│ │ │ │ │ │ │ │ │ │ │ │ ├── Build -│ │ │ │ │ │ │ │ │ │ │ │ │ └── Scan: default.tpcds.store (#26) (read rows: 1) -│ │ │ │ │ │ │ │ │ │ │ │ └── Probe -│ │ │ │ │ │ │ │ │ │ │ │ └── HashJoin: INNER -│ │ │ │ │ │ │ │ │ │ │ │ ├── Build -│ │ │ │ │ │ │ │ │ │ │ │ │ └── HashJoin: INNER -│ │ │ │ │ │ │ │ │ │ │ │ │ ├── Build -│ │ │ │ │ │ │ │ │ │ │ │ │ │ └── HashJoin: INNER -│ │ │ │ │ │ │ │ │ │ │ │ │ │ ├── Build -│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └── HashJoin: INNER -│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ ├── Build -│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └── Scan: default.tpcds.item (#37) (read rows: 180) -│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └── Probe -│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └── HashJoin: INNER -│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ ├── Build -│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └── Scan: default.tpcds.catalog_returns (#22) (read rows: 1358) -│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └── Probe -│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └── Scan: default.tpcds.catalog_sales (#21) (read rows: 14313) -│ │ │ │ │ │ │ │ │ │ │ │ │ │ └── Probe -│ │ │ │ │ │ │ │ │ │ │ │ │ │ └── Scan: default.tpcds.store_returns (#20) (read rows: 2810) -│ │ │ │ │ │ │ │ │ │ │ │ │ └── Probe -│ │ │ │ │ │ │ │ │ │ │ │ │ └── Scan: default.tpcds.store_sales (#19) (read rows: 28810) -│ │ │ │ │ │ │ │ │ │ │ │ └── Probe -│ │ │ │ │ │ │ │ │ │ │ │ └── Scan: default.tpcds.date_dim (#23) (read rows: 73049) -│ │ │ │ │ │ │ │ │ │ │ └── Probe -│ │ │ │ │ │ │ │ │ │ │ └── Scan: default.tpcds.customer (#27) (read rows: 1000) -│ │ │ │ │ │ │ │ │ │ └── Probe -│ │ │ │ │ │ │ │ │ │ └── Scan: default.tpcds.date_dim (#24) (read rows: 73049) -│ │ │ │ │ │ │ │ │ └── Probe -│ │ │ │ │ │ │ │ │ └── Scan: default.tpcds.date_dim (#25) (read rows: 73049) -│ │ │ │ │ │ │ │ └── Probe -│ │ │ │ │ │ │ │ └── Scan: default.tpcds.customer_demographics (#28) (read rows: 19208) -│ │ │ │ │ │ │ └── Probe -│ │ │ │ │ │ │ └── Scan: default.tpcds.customer_demographics (#29) (read rows: 19208) +│ │ │ │ │ │ │ └── Scan: default.tpcds.customer (#27) (read rows: 1000) │ │ │ │ │ │ └── Probe -│ │ │ │ │ │ └── Scan: default.tpcds.household_demographics (#31) (read rows: 7200) +│ │ │ │ │ │ └── Scan: default.tpcds.customer_demographics (#29) (read rows: 19208) │ │ │ │ │ └── Probe │ │ │ │ │ └── Scan: default.tpcds.household_demographics (#32) (read rows: 7200) │ │ │ │ └── Probe -│ │ │ │ └── Scan: default.tpcds.customer_address (#33) (read rows: 500) +│ │ │ │ └── Scan: default.tpcds.income_band (#36) (read rows: 20) │ │ │ └── Probe │ │ │ └── Scan: default.tpcds.customer_address (#34) (read rows: 500) │ │ └── Probe -│ │ └── Scan: default.tpcds.income_band (#35) (read rows: 20) +│ │ └── Scan: default.tpcds.date_dim (#24) (read rows: 73049) │ └── Probe -│ └── Scan: default.tpcds.income_band (#36) (read rows: 20) +│ └── Scan: default.tpcds.date_dim (#25) (read rows: 73049) └── Probe └── HashJoin: INNER ├── Build @@ -6190,65 +6190,65 @@ HashJoin: INNER │ │ │ │ │ ├── Build │ │ │ │ │ │ └── HashJoin: INNER │ │ │ │ │ │ ├── Build - │ │ │ │ │ │ │ └── Scan: default.tpcds.promotion (#11) (read rows: 3) + │ │ │ │ │ │ │ └── HashJoin: INNER + │ │ │ │ │ │ │ ├── Build + │ │ │ │ │ │ │ │ └── HashJoin: INNER + │ │ │ │ │ │ │ │ ├── Build + │ │ │ │ │ │ │ │ │ └── HashJoin: INNER + │ │ │ │ │ │ │ │ │ ├── Build + │ │ │ │ │ │ │ │ │ │ └── HashJoin: INNER + │ │ │ │ │ │ │ │ │ │ ├── Build + │ │ │ │ │ │ │ │ │ │ │ └── Scan: default.tpcds.promotion (#11) (read rows: 3) + │ │ │ │ │ │ │ │ │ │ └── Probe + │ │ │ │ │ │ │ │ │ │ └── HashJoin: INNER + │ │ │ │ │ │ │ │ │ │ ├── Build + │ │ │ │ │ │ │ │ │ │ │ └── HashJoin: INNER + │ │ │ │ │ │ │ │ │ │ │ ├── Build + │ │ │ │ │ │ │ │ │ │ │ │ └── Scan: default.tpcds.store (#7) (read rows: 1) + │ │ │ │ │ │ │ │ │ │ │ └── Probe + │ │ │ │ │ │ │ │ │ │ │ └── HashJoin: INNER + │ │ │ │ │ │ │ │ │ │ │ ├── Build + │ │ │ │ │ │ │ │ │ │ │ │ └── HashJoin: INNER + │ │ │ │ │ │ │ │ │ │ │ │ ├── Build + │ │ │ │ │ │ │ │ │ │ │ │ │ └── HashJoin: INNER + │ │ │ │ │ │ │ │ │ │ │ │ │ ├── Build + │ │ │ │ │ │ │ │ │ │ │ │ │ │ └── HashJoin: INNER + │ │ │ │ │ │ │ │ │ │ │ │ │ │ ├── Build + │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └── Scan: default.tpcds.item (#18) (read rows: 180) + │ │ │ │ │ │ │ │ │ │ │ │ │ │ └── Probe + │ │ │ │ │ │ │ │ │ │ │ │ │ │ └── HashJoin: INNER + │ │ │ │ │ │ │ │ │ │ │ │ │ │ ├── Build + │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └── Scan: default.tpcds.catalog_returns (#3) (read rows: 1358) + │ │ │ │ │ │ │ │ │ │ │ │ │ │ └── Probe + │ │ │ │ │ │ │ │ │ │ │ │ │ │ └── Scan: default.tpcds.catalog_sales (#2) (read rows: 14313) + │ │ │ │ │ │ │ │ │ │ │ │ │ └── Probe + │ │ │ │ │ │ │ │ │ │ │ │ │ └── Scan: default.tpcds.store_returns (#1) (read rows: 2810) + │ │ │ │ │ │ │ │ │ │ │ │ └── Probe + │ │ │ │ │ │ │ │ │ │ │ │ └── Scan: default.tpcds.store_sales (#0) (read rows: 28810) + │ │ │ │ │ │ │ │ │ │ │ └── Probe + │ │ │ │ │ │ │ │ │ │ │ └── Scan: default.tpcds.date_dim (#4) (read rows: 73049) + │ │ │ │ │ │ │ │ │ │ └── Probe + │ │ │ │ │ │ │ │ │ │ └── Scan: default.tpcds.customer_demographics (#9) (read rows: 19208) + │ │ │ │ │ │ │ │ │ └── Probe + │ │ │ │ │ │ │ │ │ └── Scan: default.tpcds.household_demographics (#12) (read rows: 7200) + │ │ │ │ │ │ │ │ └── Probe + │ │ │ │ │ │ │ │ └── Scan: default.tpcds.income_band (#16) (read rows: 20) + │ │ │ │ │ │ │ └── Probe + │ │ │ │ │ │ │ └── Scan: default.tpcds.customer_address (#14) (read rows: 500) │ │ │ │ │ │ └── Probe - │ │ │ │ │ │ └── HashJoin: INNER - │ │ │ │ │ │ ├── Build - │ │ │ │ │ │ │ └── HashJoin: INNER - │ │ │ │ │ │ │ ├── Build - │ │ │ │ │ │ │ │ └── HashJoin: INNER - │ │ │ │ │ │ │ │ ├── Build - │ │ │ │ │ │ │ │ │ └── HashJoin: INNER - │ │ │ │ │ │ │ │ │ ├── Build - │ │ │ │ │ │ │ │ │ │ └── HashJoin: INNER - │ │ │ │ │ │ │ │ │ │ ├── Build - │ │ │ │ │ │ │ │ │ │ │ └── HashJoin: INNER - │ │ │ │ │ │ │ │ │ │ │ ├── Build - │ │ │ │ │ │ │ │ │ │ │ │ └── Scan: default.tpcds.store (#7) (read rows: 1) - │ │ │ │ │ │ │ │ │ │ │ └── Probe - │ │ │ │ │ │ │ │ │ │ │ └── HashJoin: INNER - │ │ │ │ │ │ │ │ │ │ │ ├── Build - │ │ │ │ │ │ │ │ │ │ │ │ └── HashJoin: INNER - │ │ │ │ │ │ │ │ │ │ │ │ ├── Build - │ │ │ │ │ │ │ │ │ │ │ │ │ └── HashJoin: INNER - │ │ │ │ │ │ │ │ │ │ │ │ │ ├── Build - │ │ │ │ │ │ │ │ │ │ │ │ │ │ └── HashJoin: INNER - │ │ │ │ │ │ │ │ │ │ │ │ │ │ ├── Build - │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └── Scan: default.tpcds.item (#18) (read rows: 180) - │ │ │ │ │ │ │ │ │ │ │ │ │ │ └── Probe - │ │ │ │ │ │ │ │ │ │ │ │ │ │ └── HashJoin: INNER - │ │ │ │ │ │ │ │ │ │ │ │ │ │ ├── Build - │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └── Scan: default.tpcds.catalog_returns (#3) (read rows: 1358) - │ │ │ │ │ │ │ │ │ │ │ │ │ │ └── Probe - │ │ │ │ │ │ │ │ │ │ │ │ │ │ └── Scan: default.tpcds.catalog_sales (#2) (read rows: 14313) - │ │ │ │ │ │ │ │ │ │ │ │ │ └── Probe - │ │ │ │ │ │ │ │ │ │ │ │ │ └── Scan: default.tpcds.store_returns (#1) (read rows: 2810) - │ │ │ │ │ │ │ │ │ │ │ │ └── Probe - │ │ │ │ │ │ │ │ │ │ │ │ └── Scan: default.tpcds.store_sales (#0) (read rows: 28810) - │ │ │ │ │ │ │ │ │ │ │ └── Probe - │ │ │ │ │ │ │ │ │ │ │ └── Scan: default.tpcds.date_dim (#4) (read rows: 73049) - │ │ │ │ │ │ │ │ │ │ └── Probe - │ │ │ │ │ │ │ │ │ │ └── Scan: default.tpcds.customer (#8) (read rows: 1000) - │ │ │ │ │ │ │ │ │ └── Probe - │ │ │ │ │ │ │ │ │ └── Scan: default.tpcds.date_dim (#5) (read rows: 73049) - │ │ │ │ │ │ │ │ └── Probe - │ │ │ │ │ │ │ │ └── Scan: default.tpcds.date_dim (#6) (read rows: 73049) - │ │ │ │ │ │ │ └── Probe - │ │ │ │ │ │ │ └── Scan: default.tpcds.customer_demographics (#9) (read rows: 19208) - │ │ │ │ │ │ └── Probe - │ │ │ │ │ │ └── Scan: default.tpcds.customer_demographics (#10) (read rows: 19208) + │ │ │ │ │ │ └── Scan: default.tpcds.customer (#8) (read rows: 1000) │ │ │ │ │ └── Probe - │ │ │ │ │ └── Scan: default.tpcds.household_demographics (#12) (read rows: 7200) + │ │ │ │ │ └── Scan: default.tpcds.customer_demographics (#10) (read rows: 19208) │ │ │ │ └── Probe │ │ │ │ └── Scan: default.tpcds.household_demographics (#13) (read rows: 7200) │ │ │ └── Probe - │ │ │ └── Scan: default.tpcds.customer_address (#14) (read rows: 500) + │ │ │ └── Scan: default.tpcds.income_band (#17) (read rows: 20) │ │ └── Probe │ │ └── Scan: default.tpcds.customer_address (#15) (read rows: 500) │ └── Probe - │ └── Scan: default.tpcds.income_band (#16) (read rows: 20) + │ └── Scan: default.tpcds.date_dim (#5) (read rows: 73049) └── Probe - └── Scan: default.tpcds.income_band (#17) (read rows: 20) + └── Scan: default.tpcds.date_dim (#6) (read rows: 73049) # Q65 query I @@ -6297,21 +6297,21 @@ HashJoin: INNER │ ├── Build │ │ └── HashJoin: INNER │ │ ├── Build -│ │ │ └── Scan: default.tpcds.date_dim (#3) (read rows: 73049) +│ │ │ └── HashJoin: INNER +│ │ │ ├── Build +│ │ │ │ └── Scan: default.tpcds.date_dim (#3) (read rows: 73049) +│ │ │ └── Probe +│ │ │ └── Scan: default.tpcds.store_sales (#2) (read rows: 28810) │ │ └── Probe -│ │ └── Scan: default.tpcds.store_sales (#2) (read rows: 28810) +│ │ └── Scan: default.tpcds.store (#0) (read rows: 1) │ └── Probe -│ └── Scan: default.tpcds.store (#0) (read rows: 1) +│ └── HashJoin: INNER +│ ├── Build +│ │ └── Scan: default.tpcds.date_dim (#5) (read rows: 73049) +│ └── Probe +│ └── Scan: default.tpcds.store_sales (#4) (read rows: 28810) └── Probe - └── HashJoin: INNER - ├── Build - │ └── HashJoin: INNER - │ ├── Build - │ │ └── Scan: default.tpcds.date_dim (#5) (read rows: 73049) - │ └── Probe - │ └── Scan: default.tpcds.store_sales (#4) (read rows: 28810) - └── Probe - └── Scan: default.tpcds.item (#1) (read rows: 180) + └── Scan: default.tpcds.item (#1) (read rows: 180) # Q66 @@ -6712,7 +6712,7 @@ HashJoin: INNER # Q69 -query +query explain join SELECT cd_gender, cd_marital_status, @@ -7502,13 +7502,13 @@ UnionAll │ └── Right │ └── HashJoin: INNER │ ├── Build -│ │ └── Scan: default.tpcds.item (#4) (read rows: 180) +│ │ └── HashJoin: INNER +│ │ ├── Build +│ │ │ └── Scan: default.tpcds.item (#4) (read rows: 180) +│ │ └── Probe +│ │ └── Scan: default.tpcds.web_sales (#3) (read rows: 0) │ └── Probe -│ └── HashJoin: INNER -│ ├── Build -│ │ └── Scan: default.tpcds.web_sales (#3) (read rows: 0) -│ └── Probe -│ └── Scan: default.tpcds.date_dim (#5) (read rows: 73049) +│ └── Scan: default.tpcds.date_dim (#5) (read rows: 73049) └── Right └── HashJoin: INNER ├── Build @@ -7626,13 +7626,13 @@ UnionAll │ │ ├── Build │ │ │ └── HashJoin: INNER │ │ │ ├── Build -│ │ │ │ └── HashJoin: INNER -│ │ │ │ ├── Build -│ │ │ │ │ └── Scan: default.tpcds.date_dim (#4) (read rows: 73049) -│ │ │ │ └── Probe -│ │ │ │ └── Scan: default.tpcds.store_returns (#3) (read rows: 2810) +│ │ │ │ └── Scan: default.tpcds.store (#5) (read rows: 1) │ │ │ └── Probe -│ │ │ └── Scan: default.tpcds.store (#5) (read rows: 1) +│ │ │ └── HashJoin: INNER +│ │ │ ├── Build +│ │ │ │ └── Scan: default.tpcds.date_dim (#4) (read rows: 73049) +│ │ │ └── Probe +│ │ │ └── Scan: default.tpcds.store_returns (#3) (read rows: 2810) │ │ └── Probe │ │ └── HashJoin: INNER │ │ ├── Build @@ -7662,23 +7662,23 @@ UnionAll ├── Build │ └── HashJoin: INNER │ ├── Build - │ │ └── HashJoin: INNER - │ │ ├── Build - │ │ │ └── Scan: default.tpcds.date_dim (#14) (read rows: 73049) - │ │ └── Probe - │ │ └── Scan: default.tpcds.web_returns (#13) (read rows: 679) + │ │ └── Scan: default.tpcds.web_page (#15) (read rows: 1) │ └── Probe - │ └── Scan: default.tpcds.web_page (#15) (read rows: 1) + │ └── HashJoin: INNER + │ ├── Build + │ │ └── Scan: default.tpcds.date_dim (#14) (read rows: 73049) + │ └── Probe + │ └── Scan: default.tpcds.web_returns (#13) (read rows: 679) └── Probe └── HashJoin: INNER ├── Build - │ └── HashJoin: INNER - │ ├── Build - │ │ └── Scan: default.tpcds.date_dim (#11) (read rows: 73049) - │ └── Probe - │ └── Scan: default.tpcds.web_sales (#10) (read rows: 7212) + │ └── Scan: default.tpcds.web_page (#12) (read rows: 1) └── Probe - └── Scan: default.tpcds.web_page (#12) (read rows: 1) + └── HashJoin: INNER + ├── Build + │ └── Scan: default.tpcds.date_dim (#11) (read rows: 73049) + └── Probe + └── Scan: default.tpcds.web_sales (#10) (read rows: 7212) # Q78 query I @@ -8132,18 +8132,18 @@ HashJoin: INNER ├── Build │ └── HashJoin: INNER │ ├── Build -│ │ └── Scan: default.tpcds.item (#0) (read rows: 180) +│ │ └── Scan: default.tpcds.date_dim (#2) (read rows: 73049) │ └── Probe │ └── HashJoin: INNER │ ├── Build -│ │ └── Scan: default.tpcds.date_dim (#2) (read rows: 73049) +│ │ └── Scan: default.tpcds.item (#0) (read rows: 180) │ └── Probe │ └── Scan: default.tpcds.inventory (#1) (read rows: 23490) └── Probe └── Scan: default.tpcds.store_sales (#3) (read rows: 28810) # Q83 -query +query explain join WITH sr_items AS (SELECT i_item_id item_id, @@ -8399,15 +8399,15 @@ HashJoin: INNER │ │ │ │ │ └── Probe │ │ │ │ │ └── Scan: default.tpcds.customer_address (#5) (read rows: 500) │ │ │ │ └── Probe -│ │ │ │ └── Scan: default.tpcds.date_dim (#6) (read rows: 73049) +│ │ │ │ └── Scan: default.tpcds.web_page (#2) (read rows: 1) │ │ │ └── Probe -│ │ │ └── Scan: default.tpcds.web_page (#2) (read rows: 1) +│ │ │ └── Scan: default.tpcds.customer_demographics (#3) (read rows: 19208) │ │ └── Probe -│ │ └── Scan: default.tpcds.customer_demographics (#3) (read rows: 19208) +│ │ └── Scan: default.tpcds.customer_demographics (#4) (read rows: 19208) │ └── Probe -│ └── Scan: default.tpcds.customer_demographics (#4) (read rows: 19208) +│ └── Scan: default.tpcds.reason (#7) (read rows: 1) └── Probe - └── Scan: default.tpcds.reason (#7) (read rows: 1) + └── Scan: default.tpcds.date_dim (#6) (read rows: 73049) # Q86 query I @@ -8963,13 +8963,13 @@ HashJoin: INNER ├── Build │ └── HashJoin: INNER │ ├── Build -│ │ └── HashJoin: INNER -│ │ ├── Build -│ │ │ └── Scan: default.tpcds.date_dim (#2) (read rows: 73049) -│ │ └── Probe -│ │ └── Scan: default.tpcds.web_sales (#0) (read rows: 7212) +│ │ └── Scan: default.tpcds.item (#1) (read rows: 180) │ └── Probe -│ └── Scan: default.tpcds.item (#1) (read rows: 180) +│ └── HashJoin: INNER +│ ├── Build +│ │ └── Scan: default.tpcds.date_dim (#2) (read rows: 73049) +│ └── Probe +│ └── Scan: default.tpcds.web_sales (#0) (read rows: 7212) └── Probe └── HashJoin: INNER ├── Build @@ -9318,4 +9318,3 @@ HashJoin: INNER │ └── Scan: default.tpcds.catalog_sales (#0) (read rows: 14313) └── Probe └── Scan: default.tpcds.ship_mode (#2) (read rows: 20) - diff --git a/tests/sqllogictests/suites/tpch/join.test b/tests/sqllogictests/suites/tpch/join.test index 6beb1d25c877..a91d98d61240 100644 --- a/tests/sqllogictests/suites/tpch/join.test +++ b/tests/sqllogictests/suites/tpch/join.test @@ -331,13 +331,13 @@ select l_orderkey from (select * from lineitem order by l_orderkey limit 5000) a # LEFT OUTER / LEFT SINGEL / FULL query I -select l_orderkey, o_orderdate, o_shippriority from lineitem left join orders on l_orderkey = o_orderkey and o_orderdate < to_date('1995-03-15') order by o_orderdate limit 5; +select l_orderkey, o_orderdate, o_shippriority from lineitem left join orders on l_orderkey = o_orderkey and o_orderdate < to_date('1995-03-15') order by o_orderdate, l_orderkey limit 5; ---- -571586 1992-01-01 0 -190656 1992-01-01 0 -359170 1992-01-01 0 -414725 1992-01-01 0 -45697 1992-01-01 0 +3271 1992-01-01 0 +3271 1992-01-01 0 +3271 1992-01-01 0 +3271 1992-01-01 0 +5607 1992-01-01 0 # LEFT ANTI query I @@ -353,4 +353,3 @@ select o_custkey from orders where not exists (select * from customer where subs 1 1 4 - diff --git a/tests/sqllogictests/suites/tpch/queries.test b/tests/sqllogictests/suites/tpch/queries.test index f73b0ec40d14..9f2d13e60cc7 100644 --- a/tests/sqllogictests/suites/tpch/queries.test +++ b/tests/sqllogictests/suites/tpch/queries.test @@ -1849,13 +1849,13 @@ MaterializedCte: 0 ├── Build │ └── CTEScan │ ├── CTE index: 0, sub index: 2 - │ └── estimated rows: 81.00 + │ └── estimated rows: 80.00 └── Probe └── HashJoin: INNER ├── Build │ └── CTEScan │ ├── CTE index: 0, sub index: 1 - │ └── estimated rows: 81.00 + │ └── estimated rows: 80.00 └── Probe └── Scan: default.tpch_test.supplier (#0) (read rows: 1000) diff --git a/tests/suites/0_stateless/12_time_travel/12_0004_time_travel_select_at.result b/tests/suites/0_stateless/12_time_travel/12_0004_time_travel_select_at.result index 544653c664fa..f77dba39b1bd 100644 --- a/tests/suites/0_stateless/12_time_travel/12_0004_time_travel_select_at.result +++ b/tests/suites/0_stateless/12_time_travel/12_0004_time_travel_select_at.result @@ -3,7 +3,11 @@ latest snapshot should contain 3 rows 3 counting the data set of first insertion, which should contain 2 rows 2 -planner_v2: counting the data set of first insertion, which should contain 2 rows +counting the data set of first insertion, which should contain 2 rows 2 -planner_v2: counting the data set of first insertion by timestamp, which should contains 2 rows +counting the data since first insertion, which should contain 1 row +1 +counting the data set of first insertion by timestamp, which should contains 2 rows 2 +counting the data since of first insertion by timestamp, which should contains 1 row +1 diff --git a/tests/suites/0_stateless/12_time_travel/12_0004_time_travel_select_at.sh b/tests/suites/0_stateless/12_time_travel/12_0004_time_travel_select_at.sh index fddc069d15ce..0fc3e9940570 100755 --- a/tests/suites/0_stateless/12_time_travel/12_0004_time_travel_select_at.sh +++ b/tests/suites/0_stateless/12_time_travel/12_0004_time_travel_select_at.sh @@ -19,15 +19,21 @@ SNAPSHOT_ID=$(echo "select previous_snapshot_id from fuse_snapshot('default','t1 echo "counting the data set of first insertion, which should contain 2 rows" echo "select count(*) from t12_0004 at (snapshot => '$SNAPSHOT_ID')" | $BENDSQL_CLIENT_CONNECT -echo "planner_v2: counting the data set of first insertion, which should contain 2 rows" +echo "counting the data set of first insertion, which should contain 2 rows" echo "select count(t.c) from t12_0004 at (snapshot => '$SNAPSHOT_ID') as t" | $BENDSQL_CLIENT_CONNECT +echo "counting the data since first insertion, which should contain 1 row" +echo "select count(t.c) from t12_0004 since (snapshot => '$SNAPSHOT_ID') as t" | $BENDSQL_CLIENT_CONNECT + # Get a time point at/after the first insertion. TIMEPOINT=$(echo "select timestamp from fuse_snapshot('default', 't12_0004') where row_count=2" | $BENDSQL_CLIENT_CONNECT) -echo "planner_v2: counting the data set of first insertion by timestamp, which should contains 2 rows" +echo "counting the data set of first insertion by timestamp, which should contains 2 rows" echo "select count(t.c) from t12_0004 at (TIMESTAMP => '$TIMEPOINT'::TIMESTAMP) as t" | $BENDSQL_CLIENT_CONNECT +echo "counting the data since of first insertion by timestamp, which should contains 1 row" +echo "select count(t.c) from t12_0004 since (TIMESTAMP => '$TIMEPOINT'::TIMESTAMP) as t" | $BENDSQL_CLIENT_CONNECT + ## Drop table. echo "drop table t12_0004" | $BENDSQL_CLIENT_CONNECT diff --git a/tests/suites/0_stateless/17_altertable/17_0001_time_travel_alter_add_drop_column_select_at.result b/tests/suites/0_stateless/17_altertable/17_0001_time_travel_alter_add_drop_column_select_at.result index eb802b9f197a..43b5eb68fc7b 100644 --- a/tests/suites/0_stateless/17_altertable/17_0001_time_travel_alter_add_drop_column_select_at.result +++ b/tests/suites/0_stateless/17_altertable/17_0001_time_travel_alter_add_drop_column_select_at.result @@ -7,11 +7,11 @@ counting the data set of first insertion, which should contain 2 rows select the data set of first insertion, which should contain 2 rows 1 2 -planner_v2: counting the data set of first insertion, which should contain 2 rows +counting the data set of first insertion, which should contain 2 rows 2 -planner_v2: counting the data set of first insertion by timestamp, which should contains 2 rows +counting the data set of first insertion by timestamp, which should contains 2 rows 2 -planner_v2: select the data set of first insertion by timestamp, which should contains 2 rows +select the data set of first insertion by timestamp, which should contains 2 rows 1 2 alter table drop a column @@ -20,10 +20,10 @@ counting the data set of first insertion, which should contain 2 rows select the data set of first insertion, which should contain 2 rows 1 2 -planner_v2: counting the data set of first insertion, which should contain 2 rows +counting the data set of first insertion, which should contain 2 rows 2 -planner_v2: counting the data set of first insertion by timestamp, which should contains 2 rows +counting the data set of first insertion by timestamp, which should contains 2 rows 2 -planner_v2: select the data set of first insertion by timestamp, which should contains 2 rows +select the data set of first insertion by timestamp, which should contains 2 rows 1 2 diff --git a/tests/suites/0_stateless/17_altertable/17_0001_time_travel_alter_add_drop_column_select_at.sh b/tests/suites/0_stateless/17_altertable/17_0001_time_travel_alter_add_drop_column_select_at.sh index f628b142e160..6a9c43ee49cc 100755 --- a/tests/suites/0_stateless/17_altertable/17_0001_time_travel_alter_add_drop_column_select_at.sh +++ b/tests/suites/0_stateless/17_altertable/17_0001_time_travel_alter_add_drop_column_select_at.sh @@ -30,17 +30,17 @@ echo "select count(*) from t12_0005 at (snapshot => '$SNAPSHOT_ID')" | $BENDSQL_ echo "select the data set of first insertion, which should contain 2 rows" echo "select * from t12_0005 at (snapshot => '$SNAPSHOT_ID')" | $BENDSQL_CLIENT_CONNECT -echo "planner_v2: counting the data set of first insertion, which should contain 2 rows" +echo "counting the data set of first insertion, which should contain 2 rows" echo "select count(t.c) from t12_0005 at (snapshot => '$SNAPSHOT_ID') as t" | $BENDSQL_CLIENT_CONNECT # Get a time point at/after the first insertion. TIMEPOINT=$(echo "select timestamp from fuse_snapshot('default', 't12_0005') where row_count=2" | $BENDSQL_CLIENT_CONNECT) -echo "planner_v2: counting the data set of first insertion by timestamp, which should contains 2 rows" +echo "counting the data set of first insertion by timestamp, which should contains 2 rows" echo "select count(t.c) from t12_0005 at (TIMESTAMP => '$TIMEPOINT'::TIMESTAMP) as t" | $BENDSQL_CLIENT_CONNECT -echo "planner_v2: select the data set of first insertion by timestamp, which should contains 2 rows" +echo "select the data set of first insertion by timestamp, which should contains 2 rows" echo "select * from t12_0005 at (TIMESTAMP => '$TIMEPOINT'::TIMESTAMP) as t" | $BENDSQL_CLIENT_CONNECT # alter table drop a column @@ -57,17 +57,17 @@ echo "select count(*) from t12_0005 at (snapshot => '$SNAPSHOT_ID')" | $BENDSQL_ echo "select the data set of first insertion, which should contain 2 rows" echo "select * from t12_0005 at (snapshot => '$SNAPSHOT_ID')" | $BENDSQL_CLIENT_CONNECT -echo "planner_v2: counting the data set of first insertion, which should contain 2 rows" +echo "counting the data set of first insertion, which should contain 2 rows" echo "select count(t.c) from t12_0005 at (snapshot => '$SNAPSHOT_ID') as t" | $BENDSQL_CLIENT_CONNECT # Get a time point at/after the first insertion. TIMEPOINT=$(echo "select timestamp from fuse_snapshot('default', 't12_0005') where row_count=2 limit 1" | $BENDSQL_CLIENT_CONNECT) -echo "planner_v2: counting the data set of first insertion by timestamp, which should contains 2 rows" +echo "counting the data set of first insertion by timestamp, which should contains 2 rows" echo "select count(t.c) from t12_0005 at (TIMESTAMP => '$TIMEPOINT'::TIMESTAMP) as t" | $BENDSQL_CLIENT_CONNECT -echo "planner_v2: select the data set of first insertion by timestamp, which should contains 2 rows" +echo "select the data set of first insertion by timestamp, which should contains 2 rows" echo "select * from t12_0005 at (TIMESTAMP => '$TIMEPOINT'::TIMESTAMP) as t" | $BENDSQL_CLIENT_CONNECT ## Drop table.