Skip to content

Commit

Permalink
address comment
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Sep 23, 2024
1 parent 6e0e2b7 commit 4e897e9
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 28 deletions.
21 changes: 7 additions & 14 deletions src/frontend/src/scheduler/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use risingwave_pb::common::{batch_query_epoch, BatchQueryEpoch};
use risingwave_pb::hummock::{HummockVersionDeltas, StateTableInfoDelta};
use tokio::sync::watch;

use crate::expr::InlineNowProcTime;
use crate::meta_client::FrontendMetaClient;
use crate::scheduler::SchedulerError;

Expand Down Expand Up @@ -76,17 +75,6 @@ impl QuerySnapshot {
})
}

pub fn inline_now_proc_time(&self) -> Result<InlineNowProcTime, SchedulerError> {
let epoch = match &self.snapshot {
ReadSnapshot::FrontendPinned { snapshot, .. } => {
snapshot.batch_query_epoch(&self.scan_tables)?
}
ReadSnapshot::Other(epoch) => *epoch,
ReadSnapshot::ReadUncommitted => Epoch::now(),
};
Ok(InlineNowProcTime::new(epoch))
}

/// Returns true if this snapshot is a barrier read.
pub fn support_barrier_read(&self) -> bool {
matches!(&self.snapshot, ReadSnapshot::ReadUncommitted)
Expand Down Expand Up @@ -132,7 +120,13 @@ impl PinnedSnapshot {
}
})
})?
.unwrap_or_else(Epoch::now);
.unwrap_or_else(|| {
self.value
.state_table_info
.max_table_committed_epoch()
.map(Epoch)
.unwrap_or_else(Epoch::now)
});
Ok(epoch)
}

Expand All @@ -145,7 +139,6 @@ impl PinnedSnapshot {
fn invalid_snapshot() -> FrontendHummockVersion {
FrontendHummockVersion {
id: INVALID_VERSION_ID,
max_committed_epoch: 0,
state_table_info: HummockVersionStateTableInfo::from_protobuf(&HashMap::new()),
table_change_log: Default::default(),
}
Expand Down
6 changes: 2 additions & 4 deletions src/frontend/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -847,10 +847,8 @@ impl SessionImpl {
.acquire()
.version()
.state_table_info
.info()
.values()
.map(|info| Epoch(info.committed_epoch))
.max()
.max_table_committed_epoch()
.map(Epoch)
.unwrap_or_else(Epoch::now),
)
}
Expand Down
12 changes: 3 additions & 9 deletions src/storage/hummock_sdk/src/frontend_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::collections::{HashMap, HashSet};

use risingwave_common::catalog::TableId;
use risingwave_common::util::epoch::INVALID_EPOCH;
use risingwave_pb::hummock::hummock_version_delta::PbChangeLogDelta;
use risingwave_pb::hummock::{
PbEpochNewChangeLog, PbHummockVersion, PbHummockVersionDelta, PbTableChangeLog,
Expand All @@ -28,7 +29,6 @@ use crate::{HummockVersionId, INVALID_VERSION_ID};
#[derive(Clone, Debug)]
pub struct FrontendHummockVersion {
pub id: HummockVersionId,
pub max_committed_epoch: u64,
pub state_table_info: HummockVersionStateTableInfo,
pub table_change_log: HashMap<TableId, TableChangeLogCommon<()>>,
}
Expand All @@ -37,7 +37,6 @@ impl FrontendHummockVersion {
pub fn from_version(version: &HummockVersion) -> Self {
Self {
id: version.id,
max_committed_epoch: version.max_committed_epoch,
state_table_info: version.state_table_info.clone(),
table_change_log: version
.table_change_log
Expand Down Expand Up @@ -66,7 +65,7 @@ impl FrontendHummockVersion {
PbHummockVersion {
id: self.id.0,
levels: Default::default(),
max_committed_epoch: self.max_committed_epoch,
max_committed_epoch: INVALID_EPOCH,
table_watermarks: Default::default(),
table_change_logs: self
.table_change_log
Expand Down Expand Up @@ -95,7 +94,6 @@ impl FrontendHummockVersion {
pub fn from_protobuf(value: PbHummockVersion) -> Self {
Self {
id: HummockVersionId(value.id),
max_committed_epoch: value.max_committed_epoch,
state_table_info: HummockVersionStateTableInfo::from_protobuf(&value.state_table_info),
table_change_log: value
.table_change_logs
Expand Down Expand Up @@ -125,7 +123,6 @@ impl FrontendHummockVersion {
assert_eq!(self.id, delta.prev_id);
}
self.id = delta.id;
self.max_committed_epoch = delta.max_committed_epoch;
let (changed_table_info, _) = self
.state_table_info
.apply_delta(&delta.state_table_info_delta, &delta.removed_table_id);
Expand All @@ -142,7 +139,6 @@ impl FrontendHummockVersion {
pub struct FrontendHummockVersionDelta {
pub prev_id: HummockVersionId,
pub id: HummockVersionId,
pub max_committed_epoch: u64,
pub removed_table_id: HashSet<TableId>,
pub state_table_info_delta: HashMap<TableId, StateTableInfoDelta>,
pub change_log_delta: HashMap<TableId, ChangeLogDeltaCommon<()>>,
Expand All @@ -153,7 +149,6 @@ impl FrontendHummockVersionDelta {
Self {
prev_id: delta.prev_id,
id: delta.id,
max_committed_epoch: delta.max_committed_epoch,
removed_table_id: delta.removed_table_ids.clone(),
state_table_info_delta: delta.state_table_info_delta.clone(),
change_log_delta: delta
Expand Down Expand Up @@ -183,7 +178,7 @@ impl FrontendHummockVersionDelta {
id: self.id.to_u64(),
prev_id: self.prev_id.to_u64(),
group_deltas: Default::default(),
max_committed_epoch: self.max_committed_epoch,
max_committed_epoch: INVALID_EPOCH,
trivial_move: false,
new_table_watermarks: Default::default(),
removed_table_ids: self
Expand Down Expand Up @@ -220,7 +215,6 @@ impl FrontendHummockVersionDelta {
Self {
prev_id: HummockVersionId::new(delta.prev_id),
id: HummockVersionId::new(delta.id),
max_committed_epoch: delta.max_committed_epoch,
removed_table_id: delta
.removed_table_ids
.iter()
Expand Down
11 changes: 10 additions & 1 deletion src/storage/hummock_sdk/src/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ use crate::compaction_group::StaticCompactionGroupId;
use crate::level::LevelsCommon;
use crate::sstable_info::SstableInfo;
use crate::table_watermark::TableWatermarks;
use crate::{CompactionGroupId, HummockSstableObjectId, HummockVersionId, FIRST_VERSION_ID};
use crate::{
CompactionGroupId, HummockEpoch, HummockSstableObjectId, HummockVersionId, FIRST_VERSION_ID,
};

#[derive(Debug, Clone, PartialEq)]
pub struct HummockVersionStateTableInfo {
Expand Down Expand Up @@ -204,6 +206,13 @@ impl HummockVersionStateTableInfo {
pub fn compaction_group_member_tables(&self) -> &HashMap<CompactionGroupId, BTreeSet<TableId>> {
&self.compaction_group_member_tables
}

pub fn max_table_committed_epoch(&self) -> Option<HummockEpoch> {
self.state_table_info
.values()
.map(|info| info.committed_epoch)
.max()
}
}

#[derive(Debug, Clone, PartialEq)]
Expand Down

0 comments on commit 4e897e9

Please sign in to comment.