From 555010fc0907ad80687ab68f4b4270aead8ed481 Mon Sep 17 00:00:00 2001 From: Mikkel Denker Date: Fri, 15 Mar 2024 14:19:33 +0100 Subject: [PATCH] join existing raft cluster --- .../core/src/distributed/sonic/replication.rs | 11 +- crates/core/src/mapreduce/dht/mod.rs | 4 +- crates/core/src/mapreduce/dht/network/mod.rs | 19 +- crates/core/src/mapreduce/dht/network/raft.rs | 168 +++++++++++++++++- 4 files changed, 189 insertions(+), 13 deletions(-) diff --git a/crates/core/src/distributed/sonic/replication.rs b/crates/core/src/distributed/sonic/replication.rs index b5134c63..680b1ebf 100644 --- a/crates/core/src/distributed/sonic/replication.rs +++ b/crates/core/src/distributed/sonic/replication.rs @@ -21,12 +21,21 @@ use super::Result; use crate::distributed::{retry_strategy::ExponentialBackoff, sonic}; use std::{net::SocketAddr, time::Duration}; -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct RemoteClient { addr: SocketAddr, _phantom: std::marker::PhantomData, } +impl Clone for RemoteClient +where + S: sonic::service::Service, +{ + fn clone(&self) -> Self { + Self::create(self.addr) + } +} + impl RemoteClient where S: sonic::service::Service, diff --git a/crates/core/src/mapreduce/dht/mod.rs b/crates/core/src/mapreduce/dht/mod.rs index ae0ce896..36b9c62c 100644 --- a/crates/core/src/mapreduce/dht/mod.rs +++ b/crates/core/src/mapreduce/dht/mod.rs @@ -179,7 +179,6 @@ mod tests { #[tokio::test] #[traced_test] - #[ignore = "[WIP] need to figure out how to add a new node to the cluster"] async fn test_member_join() -> anyhow::Result<()> { let (raft1, server1, addr1) = server(1).await?; let (raft2, server2, addr2) = server(2).await?; @@ -220,7 +219,8 @@ mod tests { .collect(); raft3.initialize(members.clone()).await?; - tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + let rc1 = network::raft::RemoteClient::new(1, BasicNode::new(addr1)); + rc1.join(3, addr3, members.clone()).await?; let c3 = RemoteClient::new(addr3); let res = c3.get("hello".to_string()).await?; diff --git a/crates/core/src/mapreduce/dht/network/mod.rs b/crates/core/src/mapreduce/dht/network/mod.rs index b5ba6622..b6efc5a1 100644 --- a/crates/core/src/mapreduce/dht/network/mod.rs +++ b/crates/core/src/mapreduce/dht/network/mod.rs @@ -15,10 +15,10 @@ // along with this program. If not, see pub mod api; -mod raft; +pub mod raft; use api::{Get, Set}; -use std::sync::Arc; +use std::{collections::BTreeMap, net::SocketAddr, sync::Arc}; use openraft::{BasicNode, Raft, RaftNetworkFactory}; @@ -48,14 +48,27 @@ pub type InstallSnapshotResponse = openraft::raft::InstallSnapshotResponse; pub type VoteResponse = openraft::raft::VoteResponse; +#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)] +pub struct AddLearnerRequest { + pub id: NodeId, + pub addr: SocketAddr, +} + +#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)] +pub struct AddNodesRequest { + members: BTreeMap, +} + sonic_service!( Server, [ AppendEntriesRequest, InstallSnapshotRequest, VoteRequest, + AddLearnerRequest, + AddNodesRequest, Get, - Set + Set, ] ); diff --git a/crates/core/src/mapreduce/dht/network/raft.rs b/crates/core/src/mapreduce/dht/network/raft.rs index 8ad662ff..1ff5c15d 100644 --- a/crates/core/src/mapreduce/dht/network/raft.rs +++ b/crates/core/src/mapreduce/dht/network/raft.rs @@ -14,22 +14,27 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see -use std::net::SocketAddr; +use std::{collections::BTreeMap, net::SocketAddr, time::Duration}; use openraft::{ - error::{InstallSnapshotError, RaftError}, + error::{ClientWriteError, ForwardToLeader, InstallSnapshotError, RaftError}, network::RPCOption, - BasicNode, RaftNetwork, + BasicNode, ChangeMembers, RaftNetwork, }; +use tokio::sync::RwLock; use crate::{ - distributed::sonic::{self, service::ResilientConnection}, + distributed::{ + retry_strategy::ExponentialBackoff, + sonic::{self, service::ResilientConnection}, + }, mapreduce::dht::{NodeId, TypeConfig}, + Result, }; use super::{ - AppendEntriesRequest, AppendEntriesResponse, InstallSnapshotRequest, InstallSnapshotResponse, - Server, VoteRequest, VoteResponse, + AddLearnerRequest, AddNodesRequest, AppendEntriesRequest, AppendEntriesResponse, + InstallSnapshotRequest, InstallSnapshotResponse, Server, VoteRequest, VoteResponse, }; impl sonic::service::Message for AppendEntriesRequest { @@ -59,6 +64,32 @@ impl sonic::service::Message for VoteRequest { } } +impl sonic::service::Message for AddLearnerRequest { + type Response = Result<(), RaftError>>; + + async fn handle(self, server: &Server) -> Self::Response { + tracing::debug!("received add learner request: {:?}", self); + let node = BasicNode::new(self.addr); + server.raft.add_learner(self.id, node, false).await?; + + Ok(()) + } +} + +impl sonic::service::Message for AddNodesRequest { + type Response = Result<(), RaftError>>; + + async fn handle(self, server: &Server) -> Self::Response { + tracing::debug!("received add nodes request: {:?}", self); + server + .raft + .change_membership(ChangeMembers::AddNodes(self.members), true) + .await?; + + Ok(()) + } +} + type RPCError = openraft::error::RPCError>; @@ -66,17 +97,20 @@ pub struct RemoteClient { target: NodeId, node: BasicNode, inner: sonic::replication::RemoteClient, + likely_leader: RwLock>, } impl RemoteClient { pub fn new(target: NodeId, node: BasicNode) -> Self { let addr: SocketAddr = node.addr.parse().expect("addr is not a valid address"); let inner = sonic::replication::RemoteClient::new(addr); + let likely_leader = RwLock::new(inner.clone()); Self { target, node, inner, + likely_leader, } } async fn raft_conn( @@ -89,7 +123,7 @@ impl RemoteClient { } async fn send_raft_rpc( - &mut self, + &self, rpc: R, option: RPCOption, ) -> Result> @@ -109,6 +143,126 @@ impl RemoteClient { } }) } + + async fn add_learner(&self, id: NodeId, addr: SocketAddr) -> Result<()> { + let rpc = AddLearnerRequest { id, addr }; + let retry = ExponentialBackoff::from_millis(500) + .with_limit(Duration::from_secs(60)) + .take(5); + + for backoff in retry { + let res = self.likely_leader.read().await.send(&rpc).await; + + match res { + Ok(res) => match res { + Ok(_) => return Ok(()), + Err(RaftError::APIError(e)) => match e { + ClientWriteError::ForwardToLeader(ForwardToLeader { + leader_id: _, + leader_node, + }) => match leader_node { + Some(leader_node) => { + let mut likely_leader = self.likely_leader.write().await; + *likely_leader = sonic::replication::RemoteClient::new( + leader_node + .addr + .parse() + .expect("node addr should always be valid addr"), + ); + } + None => tokio::time::sleep(backoff).await, + }, + ClientWriteError::ChangeMembershipError(_) => { + tokio::time::sleep(backoff).await + } + }, + Err(RaftError::Fatal(e)) => return Err(e.into()), + }, + Err(e) => match e { + sonic::Error::IO(_) + | sonic::Error::Serialization(_) + | sonic::Error::ConnectionTimeout + | sonic::Error::RequestTimeout + | sonic::Error::PoolCreation => { + tokio::time::sleep(backoff).await; + } + sonic::Error::BadRequest + | sonic::Error::BodyTooLarge { + body_size: _, + max_size: _, + } + | sonic::Error::Application(_) => return Err(e.into()), + }, + } + } + + Err(anyhow::anyhow!("failed to add learner")) + } + + async fn add_nodes(&self, members: BTreeMap) -> Result<()> { + let rpc = AddNodesRequest { members }; + let retry = ExponentialBackoff::from_millis(500).with_limit(Duration::from_secs(10)); + + for backoff in retry { + let res = self.likely_leader.read().await.send(&rpc).await; + + match res { + Ok(res) => match res { + Ok(_) => return Ok(()), + Err(RaftError::APIError(e)) => match e { + ClientWriteError::ForwardToLeader(ForwardToLeader { + leader_id: _, + leader_node, + }) => match leader_node { + Some(leader_node) => { + let mut likely_leader = self.likely_leader.write().await; + *likely_leader = sonic::replication::RemoteClient::new( + leader_node + .addr + .parse() + .expect("node addr should always be valid addr"), + ); + } + None => tokio::time::sleep(backoff).await, + }, + ClientWriteError::ChangeMembershipError(_) => { + tokio::time::sleep(backoff).await + } + }, + Err(RaftError::Fatal(e)) => return Err(e.into()), + }, + Err(e) => match e { + sonic::Error::IO(_) + | sonic::Error::Serialization(_) + | sonic::Error::ConnectionTimeout + | sonic::Error::RequestTimeout + | sonic::Error::PoolCreation => { + tokio::time::sleep(backoff).await; + } + sonic::Error::BadRequest + | sonic::Error::BodyTooLarge { + body_size: _, + max_size: _, + } + | sonic::Error::Application(_) => return Err(e.into()), + }, + } + } + + unreachable!("should continue to retry"); + } + + pub async fn join( + &self, + id: NodeId, + addr: SocketAddr, + new_all_nodes: BTreeMap, + ) -> Result<()> { + self.add_learner(id, addr).await?; + self.add_nodes(new_all_nodes).await?; + + Ok(()) + } } impl RaftNetwork for RemoteClient {