Skip to content

Commit

Permalink
refactor: deprecate max_committed_epoch of hummock version
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Sep 29, 2024
1 parent d492843 commit f47530f
Show file tree
Hide file tree
Showing 29 changed files with 239 additions and 229 deletions.
3 changes: 2 additions & 1 deletion proto/backup_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ message MetaSnapshotManifest {
message MetaSnapshotMetadata {
uint64 id = 1;
uint64 hummock_version_id = 2;
uint64 max_committed_epoch = 3;
reserved 3;
reserved 'max_committed_epoch';
reserved 4;
reserved 'safe_epoch';
optional uint32 format_version = 5;
Expand Down
4 changes: 2 additions & 2 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ message HummockVersion {
uint64 id = 1;
// Levels of each compaction group
map<uint64, Levels> levels = 2;
uint64 max_committed_epoch = 3;
uint64 max_committed_epoch = 3 [deprecated = true];
reserved 4;
reserved 'safe_epoch';
map<uint32, TableWatermarks> table_watermarks = 5;
Expand All @@ -191,7 +191,7 @@ message HummockVersionDelta {
uint64 prev_id = 2;
// Levels of each compaction group
map<uint64, GroupDeltas> group_deltas = 3;
uint64 max_committed_epoch = 4;
uint64 max_committed_epoch = 4 [deprecated = true];
reserved 5;
reserved 'safe_epoch';
bool trivial_move = 6;
Expand Down
2 changes: 1 addition & 1 deletion src/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ workspace-hack = { path = "../workspace-hack" }
assert_matches = "1"
expect-test = "1.5"
rand = { workspace = true }
risingwave_hummock_sdk = { workspace = true }
risingwave_hummock_sdk = { workspace = true, features = ["test"] }
risingwave_test_runner = { workspace = true }

[features]
Expand Down
1 change: 0 additions & 1 deletion src/meta/model_v2/src/hummock_version_delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ impl From<Model> for PbHummockVersionDelta {
let ret = value.full_version_delta.to_protobuf();
assert_eq!(value.id, ret.id as i64);
assert_eq!(value.prev_id, ret.prev_id as i64);
assert_eq!(value.max_committed_epoch, ret.max_committed_epoch as i64);
assert_eq!(value.trivial_move, ret.trivial_move);
ret
}
Expand Down
4 changes: 4 additions & 0 deletions src/meta/src/barrier/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ impl InflightGraphInfo {
}
}

pub fn is_empty(&self) -> bool {
self.fragment_infos.is_empty()
}

/// Update worker nodes snapshot. We need to support incremental updates for it in the future.
pub fn on_new_worker_node_map(&self, node_map: &HashMap<WorkerId, WorkerNode>) {
for (node_id, actors) in &self.actor_map {
Expand Down
14 changes: 9 additions & 5 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use prometheus::HistogramTimer;
use risingwave_common::catalog::TableId;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::system_param::PAUSE_ON_NEXT_BOOTSTRAP_KEY;
use risingwave_common::util::epoch::{Epoch, INVALID_EPOCH};
use risingwave_common::{bail, must_match};
use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
Expand Down Expand Up @@ -597,7 +596,7 @@ impl GlobalBarrierManager {
let in_flight_barrier_nums = env.opts.in_flight_barrier_nums;

let initial_invalid_state = BarrierManagerState::new(
TracedEpoch::new(Epoch(INVALID_EPOCH)),
None,
InflightGraphInfo::default(),
InflightSubscriptionInfo::default(),
None,
Expand Down Expand Up @@ -949,7 +948,14 @@ impl GlobalBarrierManager {
}
}

let (prev_epoch, curr_epoch) = self.state.next_epoch_pair();
let Some((prev_epoch, curr_epoch)) = self.state.next_epoch_pair(&command) else {
// skip the command when there is nothing to do with the barrier
for mut notifier in notifiers {
notifier.notify_started();
notifier.notify_collected();
}
return Ok(());
};

// Insert newly added creating job
if let Command::CreateStreamingJob {
Expand Down Expand Up @@ -1175,7 +1181,6 @@ impl GlobalBarrierManagerContext {
change_log_delta: Default::default(),
committed_epoch: epoch,
tables_to_commit,
is_visible_table_committed_epoch: false,
};
self.hummock_manager.commit_epoch(info).await?;
Ok(())
Expand Down Expand Up @@ -1770,6 +1775,5 @@ fn collect_commit_epoch_info(
change_log_delta: table_new_change_log,
committed_epoch: epoch,
tables_to_commit,
is_visible_table_committed_epoch: true,
}
}
87 changes: 55 additions & 32 deletions src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,16 +326,33 @@ impl GlobalBarrierManager {
.context
.hummock_manager
.on_current_version(|version| {
let max_committed_epoch = version.max_committed_epoch_for_meta();
for (table_id, info) in version.state_table_info.info() {
assert_eq!(
info.committed_epoch, max_committed_epoch,
"table {} with invisible epoch is not purged",
table_id
let state_table_info = version.state_table_info.info();
let committed_epoch = state_table_info
.values()
.map(|info| info.committed_epoch)
.next();
let existing_table_ids = info.existing_table_ids();
for table_id in existing_table_ids {
assert!(
state_table_info.contains_key(&table_id),
"table id {table_id} not registered to hummock but in recovered job {:?}. hummock table info{:?}",
info.existing_table_ids().collect_vec(),
state_table_info
);
}
if let Some(committed_epoch) = committed_epoch {
for (table_id, info) in version.state_table_info.info() {
assert_eq!(
info.committed_epoch, committed_epoch,
"table {} with invisible epoch is not purged",
table_id
);
}
}
(
TracedEpoch::new(Epoch::from(max_committed_epoch)),
committed_epoch.map(|committed_epoch| {
TracedEpoch::new(Epoch::from(committed_epoch))
}),
version.id,
)
})
Expand Down Expand Up @@ -388,30 +405,36 @@ impl GlobalBarrierManager {
subscriptions_to_add: Default::default(),
});

// Use a different `curr_epoch` for each recovery attempt.
let new_epoch = prev_epoch.next();

let mut node_to_collect = control_stream_manager.inject_barrier(
None,
Some(mutation),
(&new_epoch, &prev_epoch),
&BarrierKind::Initial,
&info,
Some(&info),
Some(node_actors),
vec![],
vec![],
)?;
debug!(?node_to_collect, "inject initial barrier");
while !node_to_collect.is_empty() {
let (worker_id, result) = control_stream_manager
.next_complete_barrier_response()
.await;
let resp = result?;
assert_eq!(resp.epoch, prev_epoch.value().0);
assert!(node_to_collect.remove(&worker_id));
}
debug!("collected initial barrier");
let new_epoch = if let Some(prev_epoch) = &prev_epoch {
// Use a different `curr_epoch` for each recovery attempt.
let new_epoch = prev_epoch.next();

let mut node_to_collect = control_stream_manager.inject_barrier(
None,
Some(mutation),
(&new_epoch, prev_epoch),
&BarrierKind::Initial,
&info,
Some(&info),
Some(node_actors),
vec![],
vec![],
)?;
debug!(?node_to_collect, "inject initial barrier");
while !node_to_collect.is_empty() {
let (worker_id, result) = control_stream_manager
.next_complete_barrier_response()
.await;
let resp = result?;
assert_eq!(resp.epoch, prev_epoch.value().0);
assert!(node_to_collect.remove(&worker_id));
}
debug!("collected initial barrier");
Some(new_epoch)
} else {
assert!(info.is_empty());
None
};

(
BarrierManagerState::new(new_epoch, info, subscription_info, paused_reason),
Expand Down Expand Up @@ -446,7 +469,7 @@ impl GlobalBarrierManager {
CheckpointControl::new(self.context.clone(), create_mview_tracker).await;

tracing::info!(
epoch = self.state.in_flight_prev_epoch().value().0,
epoch = self.state.in_flight_prev_epoch().map(|epoch| epoch.value().0),
paused = ?self.state.paused_reason(),
"recovery success"
);
Expand Down
25 changes: 17 additions & 8 deletions src/meta/src/barrier/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::util::epoch::Epoch;
use risingwave_pb::meta::PausedReason;

use crate::barrier::info::{InflightGraphInfo, InflightSubscriptionInfo};
Expand All @@ -23,7 +24,7 @@ pub struct BarrierManagerState {
///
/// There's no need to persist this field. On recovery, we will restore this from the latest
/// committed snapshot in `HummockManager`.
in_flight_prev_epoch: TracedEpoch,
in_flight_prev_epoch: Option<TracedEpoch>,

/// Inflight running actors info.
pub(crate) inflight_graph_info: InflightGraphInfo,
Expand All @@ -36,7 +37,7 @@ pub struct BarrierManagerState {

impl BarrierManagerState {
pub fn new(
in_flight_prev_epoch: TracedEpoch,
in_flight_prev_epoch: Option<TracedEpoch>,
inflight_graph_info: InflightGraphInfo,
inflight_subscription_info: InflightSubscriptionInfo,
paused_reason: Option<PausedReason>,
Expand All @@ -60,16 +61,24 @@ impl BarrierManagerState {
}
}

pub fn in_flight_prev_epoch(&self) -> &TracedEpoch {
&self.in_flight_prev_epoch
pub fn in_flight_prev_epoch(&self) -> Option<&TracedEpoch> {
self.in_flight_prev_epoch.as_ref()
}

/// Returns the epoch pair for the next barrier, and updates the state.
pub fn next_epoch_pair(&mut self) -> (TracedEpoch, TracedEpoch) {
let prev_epoch = self.in_flight_prev_epoch.clone();
pub fn next_epoch_pair(&mut self, command: &Command) -> Option<(TracedEpoch, TracedEpoch)> {
if self.inflight_graph_info.is_empty()
&& !matches!(&command, Command::CreateStreamingJob { .. })
{
return None;
};
let in_flight_prev_epoch = self
.in_flight_prev_epoch
.get_or_insert_with(|| TracedEpoch::new(Epoch::now()));
let prev_epoch = in_flight_prev_epoch.clone();
let next_epoch = prev_epoch.next();
self.in_flight_prev_epoch = next_epoch.clone();
(prev_epoch, next_epoch)
*in_flight_prev_epoch = next_epoch.clone();
Some((prev_epoch, next_epoch))
}

/// Returns the inflight actor infos that have included the newly added actors in the given command. The dropped actors
Expand Down
6 changes: 2 additions & 4 deletions src/meta/src/hummock/manager/commit_epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ pub struct CommitEpochInfo {
pub change_log_delta: HashMap<TableId, ChangeLogDelta>,
pub committed_epoch: u64,
pub tables_to_commit: HashSet<TableId>,
pub is_visible_table_committed_epoch: bool,
}

impl HummockManager {
Expand All @@ -79,7 +78,6 @@ impl HummockManager {
change_log_delta,
committed_epoch,
tables_to_commit,
is_visible_table_committed_epoch,
} = commit_info;
let mut versioning_guard = self.versioning.write().await;
let _timer = start_measure_real_process_timer!(self, "commit_epoch");
Expand All @@ -88,11 +86,12 @@ impl HummockManager {
return Ok(());
}

assert!(!tables_to_commit.is_empty());

let versioning: &mut Versioning = &mut versioning_guard;
self.commit_epoch_sanity_check(
committed_epoch,
&tables_to_commit,
is_visible_table_committed_epoch,
&sstables,
&sst_to_context,
&versioning.current_version,
Expand Down Expand Up @@ -194,7 +193,6 @@ impl HummockManager {
let time_travel_delta = version.pre_commit_epoch(
committed_epoch,
&tables_to_commit,
is_visible_table_committed_epoch,
new_compaction_group,
commit_sstables,
&new_table_ids,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,16 @@ impl HummockManager {
self.env.notification_manager(),
&self.metrics,
);
let mut new_version_delta = version.new_delta(None);
let mut new_version_delta = version.new_delta();

let committed_epoch = new_version_delta
.latest_version()
.state_table_info
.info()
.values()
.map(|info| info.committed_epoch)
.max()
.unwrap_or(INVALID_EPOCH);

for (table_id, raw_group_id) in pairs {
let mut group_id = *raw_group_id;
Expand Down Expand Up @@ -265,7 +274,7 @@ impl HummockManager {
.insert(
TableId::new(*table_id),
PbStateTableInfoDelta {
committed_epoch: INVALID_EPOCH,
committed_epoch,
compaction_group_id: *raw_group_id,
}
)
Expand Down Expand Up @@ -293,7 +302,7 @@ impl HummockManager {
self.env.notification_manager(),
&self.metrics,
);
let mut new_version_delta = version.new_delta(None);
let mut new_version_delta = version.new_delta();
let mut modified_groups: HashMap<CompactionGroupId, /* #member table */ u64> =
HashMap::new();
// Remove member tables
Expand Down Expand Up @@ -481,7 +490,7 @@ impl HummockManager {
self.env.notification_manager(),
&self.metrics,
);
let mut new_version_delta = version.new_delta(None);
let mut new_version_delta = version.new_delta();

let new_sst_start_id = next_sstable_object_id(
&self.env,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ impl HummockManager {
self.env.notification_manager(),
&self.metrics,
);
let mut new_version_delta = version.new_delta(None);
let mut new_version_delta = version.new_delta();

let target_compaction_group_id = {
// merge right_group_id to left_group_id and remove right_group_id
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/hummock/manager/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ fn init_selectors() -> HashMap<compact_task::TaskType, Box<dyn CompactionSelecto

impl<'a> HummockVersionTransaction<'a> {
fn apply_compact_task(&mut self, compact_task: &CompactTask) {
let mut version_delta = self.new_delta(None);
let mut version_delta = self.new_delta();
let trivial_move = CompactStatus::is_trivial_move_task(compact_task);
version_delta.trivial_move = trivial_move;

Expand Down
12 changes: 0 additions & 12 deletions src/meta/src/hummock/manager/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,6 @@ impl HummockManager {
&self,
committed_epoch: HummockEpoch,
tables_to_commit: &HashSet<TableId>,
is_visible_table_committed_epoch: bool,
sstables: &[LocalSstableInfo],
sst_to_context: &HashMap<HummockSstableObjectId, HummockContextId>,
current_version: &HummockVersion,
Expand All @@ -215,17 +214,6 @@ impl HummockManager {
}
}

if is_visible_table_committed_epoch
&& committed_epoch <= current_version.max_committed_epoch_for_meta()
{
return Err(anyhow::anyhow!(
"Epoch {} <= max_committed_epoch {}",
committed_epoch,
current_version.max_committed_epoch_for_meta()
)
.into());
}

// sanity check on monotonically increasing table committed epoch
for table_id in tables_to_commit {
if let Some(info) = current_version.state_table_info.info().get(table_id) {
Expand Down
Loading

0 comments on commit f47530f

Please sign in to comment.