From 5863480c25d35d4eacbf43ad7098414b346b1587 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Fri, 6 Oct 2023 19:41:11 +0800 Subject: [PATCH] refactor: it has to get a consistent view of the state machine when exporting meta-service data (#13085) In this commit, wrap sled trees for `log` and `raft-state` with an async lock. Before exporting, it has to acquire these locks to get a consistent view of the state machine. --- src/meta/service/src/store/store.rs | 22 +++++++ src/meta/service/src/store/store_inner.rs | 61 ++++++++----------- .../tests/it/meta_node/meta_node_lifecycle.rs | 6 +- src/meta/service/tests/it/store.rs | 6 +- 4 files changed, 53 insertions(+), 42 deletions(-) diff --git a/src/meta/service/src/store/store.rs b/src/meta/service/src/store/store.rs index 9124715da481..728d8c8bca18 100644 --- a/src/meta/service/src/store/store.rs +++ b/src/meta/service/src/store/store.rs @@ -97,6 +97,8 @@ impl RaftLogReader for RaftStore { match self .log + .read() + .await .range_values(range) .map_to_sto_err(ErrorSubject::Logs, ErrorVerb::Read) { @@ -132,6 +134,8 @@ impl RaftStorage for RaftStore { async fn save_committed(&mut self, committed: Option) -> Result<(), StorageError> { self.raft_state + .write() + .await .save_committed(committed) .await .map_to_sto_err(ErrorSubject::Store, ErrorVerb::Write) @@ -139,6 +143,8 @@ impl RaftStorage for RaftStore { async fn read_committed(&mut self) -> Result, StorageError> { self.raft_state + .read() + .await .read_committed() .map_to_sto_err(ErrorSubject::Store, ErrorVerb::Read) } @@ -146,6 +152,8 @@ impl RaftStorage for RaftStore { async fn get_log_state(&mut self) -> Result, StorageError> { let last_purged_log_id = match self .log + .read() + .await .get_last_purged() .map_to_sto_err(ErrorSubject::Logs, ErrorVerb::Read) { @@ -158,6 +166,8 @@ impl RaftStorage for RaftStore { let last = match self .log + .read() + .await .logs() .last() .map_to_sto_err(ErrorSubject::Logs, ErrorVerb::Read) @@ -191,6 +201,8 @@ impl RaftStorage for RaftStore { match self .raft_state + .write() + .await .save_vote(hs) .await .map_to_sto_err(ErrorSubject::Vote, ErrorVerb::Write) @@ -211,6 +223,8 @@ impl RaftStorage for RaftStore { match self .log + .write() + .await .range_remove(log_id.index..) .await .map_to_sto_err(ErrorSubject::Log(log_id), ErrorVerb::Delete) @@ -229,6 +243,8 @@ impl RaftStorage for RaftStore { if let Err(err) = self .log + .write() + .await .set_last_purged(log_id) .await .map_to_sto_err(ErrorSubject::Logs, ErrorVerb::Write) @@ -238,6 +254,8 @@ impl RaftStorage for RaftStore { }; if let Err(err) = self .log + .write() + .await .range_remove(..=log_id.index) .await .map_to_sto_err(ErrorSubject::Log(log_id), ErrorVerb::Delete) @@ -265,6 +283,8 @@ impl RaftStorage for RaftStore { match self .log + .write() + .await .append(entries) .await .map_to_sto_err(ErrorSubject::Logs, ErrorVerb::Write) @@ -396,6 +416,8 @@ impl RaftStorage for RaftStore { async fn read_vote(&mut self) -> Result, StorageError> { match self .raft_state + .read() + .await .read_vote() .map_to_sto_err(ErrorSubject::Vote, ErrorVerb::Read) { diff --git a/src/meta/service/src/store/store_inner.rs b/src/meta/service/src/store/store_inner.rs index 2c6f810d0523..619eb73dae28 100644 --- a/src/meta/service/src/store/store_inner.rs +++ b/src/meta/service/src/store/store_inner.rs @@ -41,7 +41,6 @@ use common_meta_raft_store::state_machine::StoredSnapshot; use common_meta_sled_store::get_sled_db; use common_meta_sled_store::openraft::ErrorSubject; use common_meta_sled_store::openraft::ErrorVerb; -use common_meta_sled_store::openraft::RaftLogId; use common_meta_sled_store::SledTree; use common_meta_stoerr::MetaStorageError; use common_meta_types::Endpoint; @@ -90,11 +89,12 @@ pub struct StoreInner { /// Raft state includes: /// id: NodeId, - /// current_term, - /// voted_for - pub raft_state: RaftState, + /// vote, // the last `Vote` + /// committed, // last `LogId` that is known committed + pub raft_state: RwLock, - pub log: RaftLog, + /// A series of raft logs. + pub log: RwLock, /// The Raft state machine. pub state_machine: Arc>, @@ -173,8 +173,8 @@ impl StoreInner { config: config.clone(), is_opened: is_open, db, - raft_state, - log, + raft_state: RwLock::new(raft_state), + log: RwLock::new(log), state_machine: sm, current_snapshot: RwLock::new(stored_snapshot), }) @@ -380,7 +380,6 @@ impl StoreInner { AnyError::new(&e).add_context(|| "replacing state-machine with snapshot"), ) })?; - // TODO(1): read_state_machine_id() and write_state_machine_id() is no longer used. // TODO(xp): use checksum to check consistency? @@ -390,10 +389,17 @@ impl StoreInner { /// Export data that can be used to restore a meta-service node. #[minitrace::trace] pub async fn export(&self) -> Result, io::Error> { - // NOTE: + // Lock all components so that we have a consistent view. + // // Hold the snapshot lock to prevent snapshot from being replaced until exporting finished. // Holding this lock prevent logs from being purged. + // + // Although vote and log must be consistent, + // it is OK to export RaftState and logs without transaction protection, + // if it guarantees no logs have a greater `vote` than `RaftState.HardState`. let current_snapshot = self.current_snapshot.read().await; + let raft_state = self.raft_state.read().await; + let log = self.log.read().await; let mut res = vec![]; @@ -413,21 +419,17 @@ impl StoreInner { } } - // Export RaftState - // - // Although vote and log must be consistent, - // it is OK to export RaftState and logs without transaction protection, - // if it guarantees no logs have a greater `vote` than `RaftState.HardState`. - - let exported_vote = { - let tree_name = &self.raft_state.inner.name; + // Export raft state + { + let tree_name = &raft_state.inner.name; - let ks = self.raft_state.inner.key_space::(); + let ks = raft_state.inner.key_space::(); let id = ks .get(&RaftStateKey::Id) .map_err(|e| io::Error::new(ErrorKind::InvalidData, e))? .map(NodeId::from); + if let Some(id) = id { let ent_id = RaftStoreEntry::RaftStateKV { key: RaftStateKey::Id, @@ -444,6 +446,7 @@ impl StoreInner { .get(&RaftStateKey::HardState) .map_err(|e| io::Error::new(ErrorKind::InvalidData, e))? .map(Vote::from); + if let Some(vote) = vote { let ent_vote = RaftStoreEntry::RaftStateKV { key: RaftStateKey::HardState, @@ -470,35 +473,21 @@ impl StoreInner { .map_err(|e| io::Error::new(ErrorKind::InvalidData, e))?; res.push(s); - - // NOTE: `vote` is used in the following code. - vote.unwrap_or_default() }; // Export logs that has smaller or equal leader id as `vote` { - let tree_name = &self.log.inner.name; + let tree_name = &log.inner.name; - let log_kvs = self - .log + let log_kvs = log .inner .export() .map_err(|e| io::Error::new(ErrorKind::InvalidData, e))?; + for kv in log_kvs.iter() { let kv_entry = RaftStoreEntry::deserialize(&kv[0], &kv[1]) .map_err(|e| io::Error::new(ErrorKind::InvalidData, e))?; - if let RaftStoreEntry::Logs { ref value, .. } = kv_entry { - if value.get_log_id().leader_id > exported_vote.leader_id { - warn!( - "found log({}) with greater leader id than vote({}), skip exporting logs", - value.get_log_id(), - exported_vote - ); - break; - } - } - let tree_kv = (tree_name, kv_entry); let line = serde_json::to_string(&tree_kv) .map_err(|e| io::Error::new(ErrorKind::InvalidData, e))?; @@ -510,7 +499,7 @@ impl StoreInner { // Export snapshot of state machine { // NOTE: - // This name had been used by the sled tree based sm. + // The name in form of "state_machine/[0-9]+" had been used by the sled tree based sm. // Do not change it for keeping compatibility. let tree_name = "state_machine/0"; diff --git a/src/meta/service/tests/it/meta_node/meta_node_lifecycle.rs b/src/meta/service/tests/it/meta_node/meta_node_lifecycle.rs index fc7e3c81e76e..dac3dd60a8a2 100644 --- a/src/meta/service/tests/it/meta_node/meta_node_lifecycle.rs +++ b/src/meta/service/tests/it/meta_node/meta_node_lifecycle.rs @@ -692,7 +692,7 @@ async fn test_meta_node_restart_single_node() -> anyhow::Result<()> { .await?; log_index += 1; - want_hs = leader.sto.raft_state.read_vote()?; + want_hs = leader.sto.raft_state.read().await.read_vote()?; leader.stop().await?; } @@ -724,13 +724,13 @@ async fn test_meta_node_restart_single_node() -> anyhow::Result<()> { info!("--- check hard state"); { - let hs = leader.sto.raft_state.read_vote()?; + let hs = leader.sto.raft_state.read().await.read_vote()?; assert_eq!(want_hs, hs); } info!("--- check logs"); { - let logs = leader.sto.log.range_values(..)?; + let logs = leader.sto.log.read().await.range_values(..)?; info!("logs: {:?}", logs); assert_eq!(log_index as usize + 1, logs.len()); } diff --git a/src/meta/service/tests/it/store.rs b/src/meta/service/tests/it/store.rs index 7de1fc716edd..68b7725ab6ed 100644 --- a/src/meta/service/tests/it/store.rs +++ b/src/meta/service/tests/it/store.rs @@ -134,7 +134,7 @@ async fn test_meta_store_build_snapshot() -> anyhow::Result<()> { let (logs, want) = snapshot_logs(); - sto.log.append(logs.clone()).await?; + sto.log.write().await.append(logs.clone()).await?; sto.state_machine.write().await.apply_entries(&logs).await; let curr_snap = sto.build_snapshot().await?; @@ -181,7 +181,7 @@ async fn test_meta_store_current_snapshot() -> anyhow::Result<()> { let (logs, want) = snapshot_logs(); - sto.log.append(logs.clone()).await?; + sto.log.write().await.append(logs.clone()).await?; { let mut sm = sto.state_machine.write().await; sm.apply_entries(&logs).await; @@ -226,7 +226,7 @@ async fn test_meta_store_install_snapshot() -> anyhow::Result<()> { info!("--- feed logs and state machine"); - sto.log.append(logs.clone()).await?; + sto.log.write().await.append(logs.clone()).await?; sto.state_machine.write().await.apply_entries(&logs).await; snap = sto.build_snapshot().await?;