Skip to content

Commit

Permalink
add Response::Empty to raft in order to avoid having to send back hac…
Browse files Browse the repository at this point in the history
…ky Response::Set(Ok(())) for internal raft entries
  • Loading branch information
mikkeldenker committed Mar 16, 2024
1 parent 94af569 commit 5c9a681
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 19 deletions.
1 change: 1 addition & 0 deletions crates/core/src/mapreduce/dht/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ macro_rules! raft_sonic_request_response {
$(
$req(<$req as crate::distributed::sonic::service::Message<$service>>::Response),
)*
Empty,
}

$(
Expand Down
31 changes: 12 additions & 19 deletions crates/core/src/mapreduce/dht/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,26 +49,18 @@ pub struct StoredSnapshot {
pub data: Vec<u8>,
}

/// Data contained in the Raft state machine. Note that we are using `serde` to serialize the
/// `data`, which has a implementation to be serialized. Note that for this test we set both the key
/// and value as String, but you could set any type of value that has the serialization impl.
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
pub struct StateMachineData {
pub last_applied_log: Option<LogId<NodeId>>,

pub last_membership: StoredMembership<NodeId, BasicNode>,

/// Application data.
pub data: BTreeMap<String, String>,
}

/// Defines a state machine for the Raft cluster. This state machine represents a copy of the
/// data for this node. Additionally, it is responsible for storing the last snapshot of the data.
#[derive(Debug, Default)]
pub struct StateMachineStore {
/// The Raft state machine.
pub state_machine: RwLock<StateMachineData>,

snapshot_idx: Arc<Mutex<u64>>,

/// The last received snapshot.
Expand All @@ -85,7 +77,7 @@ impl RaftSnapshotBuilder<TypeConfig> for Arc<StateMachineStore> {
{
// Serialize the data of the state machine.
let state_machine = self.state_machine.read().await;
data = serde_json::to_vec(&*state_machine)
data = bincode::serialize(&*state_machine)
.map_err(|e| StorageIOError::read_state_machine(&e))?;

last_applied_log = state_machine.last_applied_log;
Expand Down Expand Up @@ -146,25 +138,23 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
where
I: IntoIterator<Item = Entry<TypeConfig>> + Send,
{
let mut res = Vec::new(); //No `with_capacity`; do not know `len` of iterator

let mut res = Vec::new();
let mut sm = self.state_machine.write().await;

for entry in entries {
tracing::debug!(%entry.log_id, "replicate to sm");

if let Some(ref last) = sm.last_applied_log {
if last >= &entry.log_id {
res.push(Response::Set(Ok(())));

res.push(Response::Empty);
continue;
}
}

sm.last_applied_log = Some(entry.log_id);

match entry.payload {
EntryPayload::Blank => res.push(Response::Set(Ok(()))),
EntryPayload::Blank => res.push(Response::Empty),
EntryPayload::Normal(ref req) => match req {
Request::Set(api::Set { key, value }) => {
sm.data.insert(key.clone(), value.clone());
Expand All @@ -177,7 +167,7 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
},
EntryPayload::Membership(ref mem) => {
sm.last_membership = StoredMembership::new(Some(entry.log_id), mem.clone());
res.push(Response::Set(Ok(())))
res.push(Response::Empty)
}
};
}
Expand Down Expand Up @@ -209,12 +199,15 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {

// Update the state machine.
{
let updated_state_machine: StateMachineData =
serde_json::from_slice(&new_snapshot.data).map_err(|e| {
let data: BTreeMap<String, String> =
bincode::deserialize(&new_snapshot.data).map_err(|e| {
StorageIOError::read_snapshot(Some(new_snapshot.meta.signature()), &e)
})?;

let mut state_machine = self.state_machine.write().await;
*state_machine = updated_state_machine;
state_machine.data = data;
state_machine.last_applied_log = meta.last_log_id;
state_machine.last_membership = meta.last_membership.clone();
}

// Update current snapshot.
Expand All @@ -240,7 +233,7 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
}

async fn get_snapshot_builder(&mut self) -> Self::SnapshotBuilder {
self.clone()
Arc::clone(self)
}
}

Expand Down

0 comments on commit 5c9a681

Please sign in to comment.