From 31bf85bd10fba4d2dbf78e2013121e2a6041887d Mon Sep 17 00:00:00 2001 From: zhyass Date: Thu, 12 Oct 2023 14:46:15 +0800 Subject: [PATCH] fix: compact limit get no affect rows (#13210) * fix: compact bug * add test case * Update src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs Co-authored-by: Sky Fan <3374614481@qq.com> * update * fix lint --------- Co-authored-by: Sky Fan <3374614481@qq.com> Co-authored-by: dantengsky --- .../transform_mutation_aggregator.rs | 19 ++++- .../mutation/compact/block_compact_mutator.rs | 24 ++++-- .../09_0008_fuse_optimize_table | 80 +++++++++++++++++++ 3 files changed, 114 insertions(+), 9 deletions(-) diff --git a/src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs b/src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs index b648d643aaef..0d6ade3d2f72 100644 --- a/src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs +++ b/src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs @@ -31,6 +31,7 @@ use common_sql::executor::MutationKind; use itertools::Itertools; use log::debug; use log::info; +use log::warn; use opendal::Operator; use storages_common_table_meta::meta::BlockMeta; use storages_common_table_meta::meta::Location; @@ -307,6 +308,7 @@ impl TableMutationAggregator { let op = self.dal.clone(); let location_gen = self.location_gen.clone(); + let mut all_perfect = false; tasks.push(async move { let (new_blocks, origin_summary) = if let Some(loc) = location { // read the old segment @@ -341,6 +343,9 @@ impl TableMutationAggregator { } else { // use by compact. assert!(segment_mutation.deleted_blocks.is_empty()); + // There are more than 1 blocks, means that the blocks can no longer be compacted. + // They can be marked as perfect blocks. + all_perfect = segment_mutation.replaced_blocks.len() > 1; let new_blocks = segment_mutation .replaced_blocks .into_iter() @@ -350,14 +355,24 @@ impl TableMutationAggregator { (new_blocks, None) }; + let location = location_gen.gen_segment_info_location(); // re-calculate the segment statistics - let new_summary = + let mut new_summary = reduce_block_metas(&new_blocks, thresholds, default_cluster_key_id); + if all_perfect { + // To fix issue #13217. + if new_summary.block_count > new_summary.perfect_block_count { + warn!( + "compact: generate new segment: {}, perfect_block_count: {}, block_count: {}", + location, new_summary.perfect_block_count, new_summary.block_count, + ); + new_summary.perfect_block_count = new_summary.block_count; + } + } // create new segment info let new_segment = SegmentInfo::new(new_blocks, new_summary.clone()); // write the segment info. - let location = location_gen.gen_segment_info_location(); let serialized_segment = SerializedSegment { path: location.clone(), segment: Arc::new(new_segment), diff --git a/src/query/storages/fuse/src/operations/mutation/compact/block_compact_mutator.rs b/src/query/storages/fuse/src/operations/mutation/compact/block_compact_mutator.rs index ddccd71cf9ba..f9a197bd6a1e 100644 --- a/src/query/storages/fuse/src/operations/mutation/compact/block_compact_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/compact/block_compact_mutator.rs @@ -536,16 +536,26 @@ impl CompactTaskBuilder { // The clustering table cannot compact different level blocks. self.build_task(&mut tasks, &mut unchanged_blocks, block_idx, tail); } else { - let (index, mut blocks) = if latest_flag { - unchanged_blocks - .pop() - .map_or((0, vec![]), |(k, v)| (k, vec![v])) + let mut blocks = if latest_flag { + unchanged_blocks.pop().map_or(vec![], |(_, v)| vec![v]) } else { - tasks.pop_back().unwrap_or((0, vec![])) + tasks.pop_back().map_or(vec![], |(_, v)| v) }; - blocks.extend(tail); - tasks.push_back((index, blocks)); + let (total_rows, total_size) = + blocks.iter().chain(tail.iter()).fold((0, 0), |mut acc, x| { + acc.0 += x.row_count as usize; + acc.1 += x.block_size as usize; + acc + }); + if self.thresholds.check_for_compact(total_rows, total_size) { + blocks.extend(tail); + self.build_task(&mut tasks, &mut unchanged_blocks, block_idx, blocks); + } else { + // blocks > 2N + self.build_task(&mut tasks, &mut unchanged_blocks, block_idx, blocks); + self.build_task(&mut tasks, &mut unchanged_blocks, block_idx + 1, tail); + } } } diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0008_fuse_optimize_table b/tests/sqllogictests/suites/base/09_fuse_engine/09_0008_fuse_optimize_table index c238f4d50981..5dfdea336e97 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0008_fuse_optimize_table +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0008_fuse_optimize_table @@ -668,6 +668,83 @@ select segment_count, block_count from fuse_snapshot('db_09_0008', 't12') limit +# For PR#13210 +statement ok +create table t13(a int) row_per_block=10 block_per_segment=2; + +statement ok +insert into t13 select number from numbers(7) + +statement ok +insert into t13 select number from numbers(7) + +statement ok +insert into t13 select number from numbers(7) + +statement ok +insert into t13 select number from numbers(7) + +statement ok +optimize table t13 compact limit 2 + +query II +select block_count, row_count from fuse_segment('db_09_0008', 't13') +---- +1 14 +1 7 +1 7 + +statement ok +insert into t13 select number from numbers(7) + +statement ok +optimize table t13 compact limit 2 + +query II +select block_count, row_count from fuse_segment('db_09_0008', 't13') +---- +2 21 +1 7 +1 7 + +statement ok +optimize table t13 compact limit 2 + +query II +select block_count, row_count from fuse_segment('db_09_0008', 't13') +---- +2 21 +1 14 + +statement ok +insert into t13 select number from numbers(7) + +statement ok +optimize table t13 compact + +query I +select row_count from fuse_block('db_09_0008', 't13') +---- +14 +14 +14 + +statement ok +insert into t13 select number from numbers(7) + +statement ok +optimize table t13 compact + +query I +select row_count from fuse_block('db_09_0008', 't13') +---- +14 +14 +7 +14 + + + statement ok DROP TABLE m @@ -710,6 +787,9 @@ DROP TABLE t11 statement ok DROP TABLE t12 +statement ok +DROP TABLE t13 + statement ok DROP DATABASE db_09_0008