Skip to content

Commit

Permalink
fix compact segment
Browse files Browse the repository at this point in the history
  • Loading branch information
SkyFan2002 committed Jul 22, 2024
1 parent 7dfea7c commit 6836ca9
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 29 deletions.
13 changes: 13 additions & 0 deletions src/query/storages/fuse/src/operations/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use log::info;
use log::warn;
use opendal::Operator;

use super::decorate_snapshot;
use crate::io::MetaWriter;
use crate::io::SegmentsIO;
use crate::io::TableMetaLocationGenerator;
Expand Down Expand Up @@ -312,6 +313,10 @@ impl FuseTable {

// Status
ctx.set_status_info("mutation: begin try to commit");
let base_snapshot_timestamp = ctx
.txn_mgr()
.lock()
.get_base_snapshot_timestamp(self.get_id(), base_snapshot.timestamp);

loop {
let mut snapshot_tobe_committed = TableSnapshot::from_previous(
Expand All @@ -334,6 +339,14 @@ impl FuseTable {
snapshot_tobe_committed.segments = segments_tobe_committed;
snapshot_tobe_committed.summary = statistics_tobe_committed;

decorate_snapshot(
&mut snapshot_tobe_committed,
base_snapshot_timestamp,
ctx.txn_mgr(),
Some(base_snapshot.clone()),
self.get_id(),
)?;

match Self::commit_to_meta_server(
ctx.as_ref(),
latest_table_info,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub use conflict_resolve_context::ConflictResolveContext;
pub use conflict_resolve_context::SnapshotChanges;
pub use conflict_resolve_context::SnapshotMerged;
pub use mutation_generator::MutationGenerator;
pub use snapshot_generator::decorate_snapshot;
pub use snapshot_generator::SnapshotGenerator;
pub use truncate_generator::TruncateGenerator;
pub use truncate_generator::TruncateMode;
Original file line number Diff line number Diff line change
Expand Up @@ -53,35 +53,13 @@ pub trait SnapshotGenerator {
) -> Result<TableSnapshot> {
let mut snapshot =
self.do_generate_new_snapshot(schema, cluster_key_meta, &previous, prev_table_seq)?;

// when base_snapshot_timestamp.is_none(), it means no base snapshot or base snapshot has no timestamp,
// both of them are allowed to be committed here.
if base_snapshot_timestamp
.as_ref()
// safe to unwrap, least_base_snapshot_timestamp of newly generated snapshot must be some
.is_some_and(|base| base < snapshot.least_base_snapshot_timestamp.as_ref().unwrap())
{
return Err(ErrorCode::TransactionTimeout(format!(
"The timestamp of the base snapshot is: {:?}, the timestamp of the new snapshot is: {:?}",
base_snapshot_timestamp.unwrap(),
snapshot.timestamp,
)));
}

let has_pending_transactional_mutations = {
let guard = txn_mgr.lock();
// NOTE:
// When generating a new snapshot for a mutation of table for the first time,
// there is no buffered table ID inside txn_mgr for this table.
guard.is_active() && guard.get_table_from_buffer_by_id(table_id).is_some()
};

if has_pending_transactional_mutations {
// Adjust the `prev_snapshot_id` of the newly created snapshot to match the
// `prev_snapshot_id` of the table when it first appeared in the transaction.
let previous_of_previous = previous.as_ref().and_then(|prev| prev.prev_snapshot_id);
snapshot.prev_snapshot_id = previous_of_previous;
}
decorate_snapshot(
&mut snapshot,
base_snapshot_timestamp,
txn_mgr,
previous,
table_id,
)?;
Ok(snapshot)
}

Expand All @@ -93,3 +71,41 @@ pub trait SnapshotGenerator {
prev_table_seq: Option<u64>,
) -> Result<TableSnapshot>;
}

pub fn decorate_snapshot(
snapshot: &mut TableSnapshot,
base_snapshot_timestamp: Option<DateTime<Utc>>,
txn_mgr: TxnManagerRef,
previous: Option<Arc<TableSnapshot>>,
table_id: u64,
) -> Result<()> {
// when base_snapshot_timestamp.is_none(), it means no base snapshot or base snapshot has no timestamp,
// both of them are allowed to be committed here.
if base_snapshot_timestamp
.as_ref()
// safe to unwrap, least_base_snapshot_timestamp of newly generated snapshot must be some
.is_some_and(|base| base < snapshot.least_base_snapshot_timestamp.as_ref().unwrap())
{
return Err(ErrorCode::TransactionTimeout(format!(
"The timestamp of the base snapshot is: {:?}, the timestamp of the new snapshot is: {:?}",
base_snapshot_timestamp.unwrap(),
snapshot.timestamp,
)));
}

let has_pending_transactional_mutations = {
let guard = txn_mgr.lock();
// NOTE:
// When generating a new snapshot for a mutation of table for the first time,
// there is no buffered table ID inside txn_mgr for this table.
guard.is_active() && guard.get_table_from_buffer_by_id(table_id).is_some()
};

if has_pending_transactional_mutations {
// Adjust the `prev_snapshot_id` of the newly created snapshot to match the
// `prev_snapshot_id` of the table when it first appeared in the transaction.
let previous_of_previous = previous.as_ref().and_then(|prev| prev.prev_snapshot_id);
snapshot.prev_snapshot_id = previous_of_previous;
}
Ok(())
}

0 comments on commit 6836ca9

Please sign in to comment.