Skip to content

Commit

Permalink
refactor: it has to get a consistent view of the state machine when e…
Browse files Browse the repository at this point in the history
…xporting meta-service data (#13085)

In this commit, wrap sled trees for `log` and `raft-state` with an async
lock. Before exporting, it has to acquire these locks to get a
consistent view of the state machine.
  • Loading branch information
drmingdrmer authored Oct 6, 2023
1 parent e934513 commit 5863480
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 42 deletions.
22 changes: 22 additions & 0 deletions src/meta/service/src/store/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ impl RaftLogReader<TypeConfig> for RaftStore {

match self
.log
.read()
.await
.range_values(range)
.map_to_sto_err(ErrorSubject::Logs, ErrorVerb::Read)
{
Expand Down Expand Up @@ -132,20 +134,26 @@ impl RaftStorage<TypeConfig> for RaftStore {

/// Save the given `committed` log id into the raft state.
///
/// Acquires the write side of the async lock around `raft_state` and
/// forwards `committed` to its `save_committed()`.
///
/// # Errors
///
/// A failure of the underlying call is converted with `map_to_sto_err`
/// into a `StorageError` tagged `ErrorSubject::Store` / `ErrorVerb::Write`.
async fn save_committed(&mut self, committed: Option<LogId>) -> Result<(), StorageError> {
self.raft_state
// Wait for exclusive access to the raft state before writing.
.write()
.await
.save_committed(committed)
.await
.map_to_sto_err(ErrorSubject::Store, ErrorVerb::Write)
}

/// Read the `committed` log id from the raft state, if one is present.
///
/// Acquires the read side of the async lock around `raft_state` and
/// returns whatever its `read_committed()` yields.
///
/// # Errors
///
/// A failure of the underlying call is converted with `map_to_sto_err`
/// into a `StorageError` tagged `ErrorSubject::Store` / `ErrorVerb::Read`.
async fn read_committed(&mut self) -> Result<Option<LogId>, StorageError> {
self.raft_state
// Shared access is enough: this only reads the raft state.
.read()
.await
.read_committed()
.map_to_sto_err(ErrorSubject::Store, ErrorVerb::Read)
}

async fn get_log_state(&mut self) -> Result<LogState<TypeConfig>, StorageError> {
let last_purged_log_id = match self
.log
.read()
.await
.get_last_purged()
.map_to_sto_err(ErrorSubject::Logs, ErrorVerb::Read)
{
Expand All @@ -158,6 +166,8 @@ impl RaftStorage<TypeConfig> for RaftStore {

let last = match self
.log
.read()
.await
.logs()
.last()
.map_to_sto_err(ErrorSubject::Logs, ErrorVerb::Read)
Expand Down Expand Up @@ -191,6 +201,8 @@ impl RaftStorage<TypeConfig> for RaftStore {

match self
.raft_state
.write()
.await
.save_vote(hs)
.await
.map_to_sto_err(ErrorSubject::Vote, ErrorVerb::Write)
Expand All @@ -211,6 +223,8 @@ impl RaftStorage<TypeConfig> for RaftStore {

match self
.log
.write()
.await
.range_remove(log_id.index..)
.await
.map_to_sto_err(ErrorSubject::Log(log_id), ErrorVerb::Delete)
Expand All @@ -229,6 +243,8 @@ impl RaftStorage<TypeConfig> for RaftStore {

if let Err(err) = self
.log
.write()
.await
.set_last_purged(log_id)
.await
.map_to_sto_err(ErrorSubject::Logs, ErrorVerb::Write)
Expand All @@ -238,6 +254,8 @@ impl RaftStorage<TypeConfig> for RaftStore {
};
if let Err(err) = self
.log
.write()
.await
.range_remove(..=log_id.index)
.await
.map_to_sto_err(ErrorSubject::Log(log_id), ErrorVerb::Delete)
Expand Down Expand Up @@ -265,6 +283,8 @@ impl RaftStorage<TypeConfig> for RaftStore {

match self
.log
.write()
.await
.append(entries)
.await
.map_to_sto_err(ErrorSubject::Logs, ErrorVerb::Write)
Expand Down Expand Up @@ -396,6 +416,8 @@ impl RaftStorage<TypeConfig> for RaftStore {
async fn read_vote(&mut self) -> Result<Option<Vote>, StorageError> {
match self
.raft_state
.read()
.await
.read_vote()
.map_to_sto_err(ErrorSubject::Vote, ErrorVerb::Read)
{
Expand Down
61 changes: 25 additions & 36 deletions src/meta/service/src/store/store_inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ use common_meta_raft_store::state_machine::StoredSnapshot;
use common_meta_sled_store::get_sled_db;
use common_meta_sled_store::openraft::ErrorSubject;
use common_meta_sled_store::openraft::ErrorVerb;
use common_meta_sled_store::openraft::RaftLogId;
use common_meta_sled_store::SledTree;
use common_meta_stoerr::MetaStorageError;
use common_meta_types::Endpoint;
Expand Down Expand Up @@ -90,11 +89,12 @@ pub struct StoreInner {

/// Raft state includes:
/// id: NodeId,
/// current_term,
/// voted_for
pub raft_state: RaftState,
/// vote, // the last `Vote`
/// committed, // last `LogId` that is known committed
pub raft_state: RwLock<RaftState>,

pub log: RaftLog,
/// A series of raft logs.
pub log: RwLock<RaftLog>,

/// The Raft state machine.
pub state_machine: Arc<RwLock<SMV002>>,
Expand Down Expand Up @@ -173,8 +173,8 @@ impl StoreInner {
config: config.clone(),
is_opened: is_open,
db,
raft_state,
log,
raft_state: RwLock::new(raft_state),
log: RwLock::new(log),
state_machine: sm,
current_snapshot: RwLock::new(stored_snapshot),
})
Expand Down Expand Up @@ -380,7 +380,6 @@ impl StoreInner {
AnyError::new(&e).add_context(|| "replacing state-machine with snapshot"),
)
})?;

// TODO(1): read_state_machine_id() and write_state_machine_id() are no longer used.
// TODO(xp): use checksum to check consistency?

Expand All @@ -390,10 +389,17 @@ impl StoreInner {
/// Export data that can be used to restore a meta-service node.
#[minitrace::trace]
pub async fn export(&self) -> Result<Vec<String>, io::Error> {
// NOTE:
// Lock all components so that we have a consistent view.
//
// Hold the snapshot lock to prevent the snapshot from being replaced until exporting has finished.
// Holding this lock also prevents logs from being purged.
//
// Although vote and log must be consistent,
// it is OK to export RaftState and logs without transaction protection,
// if it guarantees no logs have a greater `vote` than `RaftState.HardState`.
let current_snapshot = self.current_snapshot.read().await;
let raft_state = self.raft_state.read().await;
let log = self.log.read().await;

let mut res = vec![];

Expand All @@ -413,21 +419,17 @@ impl StoreInner {
}
}

// Export RaftState
//
// Although vote and log must be consistent,
// it is OK to export RaftState and logs without transaction protection,
// if it guarantees no logs have a greater `vote` than `RaftState.HardState`.

let exported_vote = {
let tree_name = &self.raft_state.inner.name;
// Export raft state
{
let tree_name = &raft_state.inner.name;

let ks = self.raft_state.inner.key_space::<RaftStateKV>();
let ks = raft_state.inner.key_space::<RaftStateKV>();

let id = ks
.get(&RaftStateKey::Id)
.map_err(|e| io::Error::new(ErrorKind::InvalidData, e))?
.map(NodeId::from);

if let Some(id) = id {
let ent_id = RaftStoreEntry::RaftStateKV {
key: RaftStateKey::Id,
Expand All @@ -444,6 +446,7 @@ impl StoreInner {
.get(&RaftStateKey::HardState)
.map_err(|e| io::Error::new(ErrorKind::InvalidData, e))?
.map(Vote::from);

if let Some(vote) = vote {
let ent_vote = RaftStoreEntry::RaftStateKV {
key: RaftStateKey::HardState,
Expand All @@ -470,35 +473,21 @@ impl StoreInner {
.map_err(|e| io::Error::new(ErrorKind::InvalidData, e))?;

res.push(s);

// NOTE: `vote` is used in the following code.
vote.unwrap_or_default()
};

// Export logs that have a leader id smaller than or equal to that of `vote`
{
let tree_name = &self.log.inner.name;
let tree_name = &log.inner.name;

let log_kvs = self
.log
let log_kvs = log
.inner
.export()
.map_err(|e| io::Error::new(ErrorKind::InvalidData, e))?;

for kv in log_kvs.iter() {
let kv_entry = RaftStoreEntry::deserialize(&kv[0], &kv[1])
.map_err(|e| io::Error::new(ErrorKind::InvalidData, e))?;

if let RaftStoreEntry::Logs { ref value, .. } = kv_entry {
if value.get_log_id().leader_id > exported_vote.leader_id {
warn!(
"found log({}) with greater leader id than vote({}), skip exporting logs",
value.get_log_id(),
exported_vote
);
break;
}
}

let tree_kv = (tree_name, kv_entry);
let line = serde_json::to_string(&tree_kv)
.map_err(|e| io::Error::new(ErrorKind::InvalidData, e))?;
Expand All @@ -510,7 +499,7 @@ impl StoreInner {
// Export snapshot of state machine
{
// NOTE:
// This name had been used by the sled tree based sm.
// A name in the form of "state_machine/[0-9]+" was used by the sled-tree-based sm.
// Do not change it for keeping compatibility.
let tree_name = "state_machine/0";

Expand Down
6 changes: 3 additions & 3 deletions src/meta/service/tests/it/meta_node/meta_node_lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@ async fn test_meta_node_restart_single_node() -> anyhow::Result<()> {
.await?;
log_index += 1;

want_hs = leader.sto.raft_state.read_vote()?;
want_hs = leader.sto.raft_state.read().await.read_vote()?;

leader.stop().await?;
}
Expand Down Expand Up @@ -724,13 +724,13 @@ async fn test_meta_node_restart_single_node() -> anyhow::Result<()> {

info!("--- check hard state");
{
let hs = leader.sto.raft_state.read_vote()?;
let hs = leader.sto.raft_state.read().await.read_vote()?;
assert_eq!(want_hs, hs);
}

info!("--- check logs");
{
let logs = leader.sto.log.range_values(..)?;
let logs = leader.sto.log.read().await.range_values(..)?;
info!("logs: {:?}", logs);
assert_eq!(log_index as usize + 1, logs.len());
}
Expand Down
6 changes: 3 additions & 3 deletions src/meta/service/tests/it/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ async fn test_meta_store_build_snapshot() -> anyhow::Result<()> {

let (logs, want) = snapshot_logs();

sto.log.append(logs.clone()).await?;
sto.log.write().await.append(logs.clone()).await?;
sto.state_machine.write().await.apply_entries(&logs).await;

let curr_snap = sto.build_snapshot().await?;
Expand Down Expand Up @@ -181,7 +181,7 @@ async fn test_meta_store_current_snapshot() -> anyhow::Result<()> {

let (logs, want) = snapshot_logs();

sto.log.append(logs.clone()).await?;
sto.log.write().await.append(logs.clone()).await?;
{
let mut sm = sto.state_machine.write().await;
sm.apply_entries(&logs).await;
Expand Down Expand Up @@ -226,7 +226,7 @@ async fn test_meta_store_install_snapshot() -> anyhow::Result<()> {

info!("--- feed logs and state machine");

sto.log.append(logs.clone()).await?;
sto.log.write().await.append(logs.clone()).await?;
sto.state_machine.write().await.apply_entries(&logs).await;

snap = sto.build_snapshot().await?;
Expand Down

1 comment on commit 5863480

@vercel
Copy link

@vercel vercel bot commented on 5863480 Oct 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.