Skip to content

Commit

Permalink
fix: compact limit get no affect rows (#13210)
Browse files Browse the repository at this point in the history
* fix: compact bug

* add test case

* Update src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs

Co-authored-by: Sky Fan <[email protected]>

* update

* fix lint

---------

Co-authored-by: Sky Fan <[email protected]>
Co-authored-by: dantengsky <[email protected]>
  • Loading branch information
3 people authored Oct 12, 2023
1 parent 1a8b79d commit 31bf85b
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use common_sql::executor::MutationKind;
use itertools::Itertools;
use log::debug;
use log::info;
use log::warn;
use opendal::Operator;
use storages_common_table_meta::meta::BlockMeta;
use storages_common_table_meta::meta::Location;
Expand Down Expand Up @@ -307,6 +308,7 @@ impl TableMutationAggregator {
let op = self.dal.clone();
let location_gen = self.location_gen.clone();

let mut all_perfect = false;
tasks.push(async move {
let (new_blocks, origin_summary) = if let Some(loc) = location {
// read the old segment
Expand Down Expand Up @@ -341,6 +343,9 @@ impl TableMutationAggregator {
} else {
// use by compact.
assert!(segment_mutation.deleted_blocks.is_empty());
// If there is more than one block, the blocks can no longer be compacted,
// so they can be marked as perfect blocks.
all_perfect = segment_mutation.replaced_blocks.len() > 1;
let new_blocks = segment_mutation
.replaced_blocks
.into_iter()
Expand All @@ -350,14 +355,24 @@ impl TableMutationAggregator {
(new_blocks, None)
};

let location = location_gen.gen_segment_info_location();
// re-calculate the segment statistics
let new_summary =
let mut new_summary =
reduce_block_metas(&new_blocks, thresholds, default_cluster_key_id);
if all_perfect {
// To fix issue #13217.
if new_summary.block_count > new_summary.perfect_block_count {
warn!(
"compact: generate new segment: {}, perfect_block_count: {}, block_count: {}",
location, new_summary.perfect_block_count, new_summary.block_count,
);
new_summary.perfect_block_count = new_summary.block_count;
}
}
// create new segment info
let new_segment = SegmentInfo::new(new_blocks, new_summary.clone());

// write the segment info.
let location = location_gen.gen_segment_info_location();
let serialized_segment = SerializedSegment {
path: location.clone(),
segment: Arc::new(new_segment),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -536,16 +536,26 @@ impl CompactTaskBuilder {
// The clustering table cannot compact different level blocks.
self.build_task(&mut tasks, &mut unchanged_blocks, block_idx, tail);
} else {
let (index, mut blocks) = if latest_flag {
unchanged_blocks
.pop()
.map_or((0, vec![]), |(k, v)| (k, vec![v]))
let mut blocks = if latest_flag {
unchanged_blocks.pop().map_or(vec![], |(_, v)| vec![v])
} else {
tasks.pop_back().unwrap_or((0, vec![]))
tasks.pop_back().map_or(vec![], |(_, v)| v)
};

blocks.extend(tail);
tasks.push_back((index, blocks));
let (total_rows, total_size) =
blocks.iter().chain(tail.iter()).fold((0, 0), |mut acc, x| {
acc.0 += x.row_count as usize;
acc.1 += x.block_size as usize;
acc
});
if self.thresholds.check_for_compact(total_rows, total_size) {
blocks.extend(tail);
self.build_task(&mut tasks, &mut unchanged_blocks, block_idx, blocks);
} else {
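// blocks > 2N: `check_for_compact` rejected the combined rows/size, so
// `blocks` and `tail` are built as two separate tasks instead of being merged.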
self.build_task(&mut tasks, &mut unchanged_blocks, block_idx, blocks);
self.build_task(&mut tasks, &mut unchanged_blocks, block_idx + 1, tail);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,83 @@ select segment_count, block_count from fuse_snapshot('db_09_0008', 't12') limit



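# Regression test for PR #13210 ("compact limit get no affect rows"):
# exercises `optimize table ... compact limit 2` and a full `compact` on t13.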
statement ok
create table t13(a int) row_per_block=10 block_per_segment=2;

statement ok
insert into t13 select number from numbers(7)

statement ok
insert into t13 select number from numbers(7)

statement ok
insert into t13 select number from numbers(7)

statement ok
insert into t13 select number from numbers(7)

statement ok
optimize table t13 compact limit 2

query II
select block_count, row_count from fuse_segment('db_09_0008', 't13')
----
1 14
1 7
1 7

statement ok
insert into t13 select number from numbers(7)

statement ok
optimize table t13 compact limit 2

query II
select block_count, row_count from fuse_segment('db_09_0008', 't13')
----
2 21
1 7
1 7

statement ok
optimize table t13 compact limit 2

query II
select block_count, row_count from fuse_segment('db_09_0008', 't13')
----
2 21
1 14

statement ok
insert into t13 select number from numbers(7)

statement ok
optimize table t13 compact

query I
select row_count from fuse_block('db_09_0008', 't13')
----
14
14
14

statement ok
insert into t13 select number from numbers(7)

statement ok
optimize table t13 compact

query I
select row_count from fuse_block('db_09_0008', 't13')
----
14
14
7
14



statement ok
DROP TABLE m

Expand Down Expand Up @@ -710,6 +787,9 @@ DROP TABLE t11
statement ok
DROP TABLE t12

statement ok
DROP TABLE t13

statement ok
DROP DATABASE db_09_0008

1 comment on commit 31bf85b

@vercel
Copy link

@vercel vercel bot commented on 31bf85b Oct 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.