Skip to content

Commit

Permalink
establish dht connection in ampc
Browse files Browse the repository at this point in the history
  • Loading branch information
mikkeldenker committed Mar 27, 2024
1 parent 255c003 commit 37eee39
Show file tree
Hide file tree
Showing 7 changed files with 271 additions and 34 deletions.
21 changes: 21 additions & 0 deletions crates/core/src/ampc/dht/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,19 @@ use super::{
store::{Key, Table, Value},
};

#[derive(serde::Serialize, serde::Deserialize)]
struct Node {
api: api::RemoteClient,
}

impl Clone for Node {
fn clone(&self) -> Self {
Self {
api: api::RemoteClient::new(self.api.addr()),
}
}
}

impl Node {
fn new(addr: SocketAddr) -> Self {
let api = api::RemoteClient::new(addr);
Expand All @@ -50,6 +59,7 @@ impl Node {
}
}

#[derive(Clone, serde::Serialize, serde::Deserialize)]
struct Shard {
nodes: Vec<Node>,
}
Expand All @@ -76,6 +86,7 @@ impl Shard {
}
}

#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub struct Client {
ids: Vec<ShardId>,
shards: BTreeMap<ShardId, Shard>,
Expand Down Expand Up @@ -175,4 +186,14 @@ impl Client {

Ok(tables)
}

pub async fn clone_table(&self, from: Table, to: Table) -> Result<()> {
for shard in self.shards.values() {
for node in &shard.nodes {
node.api.clone_table(from.clone(), to.clone()).await?;
}
}

Ok(())
}
}
10 changes: 7 additions & 3 deletions crates/core/src/ampc/dht/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub mod log_store;
mod network;
pub mod store;

use network::api::{AllTables, CreateTable, DropTable, Get, Set};
use network::api::{AllTables, CloneTable, CreateTable, DropTable, Get, Set};

use std::fmt::Debug;
use std::io::Cursor;
Expand All @@ -45,6 +45,7 @@ pub use network::api::RemoteClient as ApiClient;
pub use network::raft::RemoteClient as RaftClient;

pub use client::Client;
pub use store::Table;

pub type NodeId = u64;

Expand Down Expand Up @@ -88,12 +89,15 @@ macro_rules! raft_sonic_request_response {
};
}

raft_sonic_request_response!(Server, [Get, Set, CreateTable, DropTable, AllTables]);
raft_sonic_request_response!(
Server,
[Get, Set, CreateTable, DropTable, AllTables, CloneTable]
);

#[cfg(test)]
mod tests {
use std::{collections::BTreeMap, net::SocketAddr, sync::Arc};
use tests::{network::api::RemoteClient, store::Table};
use tests::network::api::RemoteClient;
use tokio::sync::Mutex;
use tracing_test::traced_test;

Expand Down
122 changes: 109 additions & 13 deletions crates/core/src/ampc/dht/network/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ pub struct CreateTable {
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct AllTables;

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct CloneTable {
pub from: Table,
pub to: Table,
}

impl sonic::service::Message<Server> for Set {
type Response = Result<(), RaftError<NodeId, ClientWriteError<NodeId, BasicNode>>>;

Expand Down Expand Up @@ -121,19 +127,36 @@ impl sonic::service::Message<Server> for AllTables {
}
}

impl sonic::service::Message<Server> for CloneTable {
type Response = Result<(), RaftError<NodeId, ClientWriteError<NodeId, BasicNode>>>;

async fn handle(self, server: &Server) -> Self::Response {
match server.raft.client_write(self.into()).await {
Ok(_) => Ok(()),
Err(e) => Err(e),
}
}
}

#[derive(serde::Serialize, serde::Deserialize)]
pub struct RemoteClient {
self_remote: sonic::replication::RemoteClient<Server>,
likely_leader: RwLock<sonic::replication::RemoteClient<Server>>,
#[serde(skip)]
likely_leader: RwLock<Option<sonic::replication::RemoteClient<Server>>>,
}

impl RemoteClient {
pub fn new(addr: SocketAddr) -> Self {
Self {
self_remote: sonic::replication::RemoteClient::new(addr),
likely_leader: RwLock::new(sonic::replication::RemoteClient::new(addr)),
likely_leader: RwLock::new(None),
}
}

pub fn addr(&self) -> SocketAddr {
self.self_remote.addr()
}

fn retry_strat() -> impl Iterator<Item = std::time::Duration> {
RandomBackoff::new(
std::time::Duration::from_millis(200),
Expand All @@ -147,6 +170,8 @@ impl RemoteClient {
.likely_leader
.read()
.await
.as_ref()
.unwrap_or(&self.self_remote)
.send_with_timeout(
&Set {
table: table.clone(),
Expand All @@ -169,12 +194,12 @@ impl RemoteClient {
}) => match leader_node {
Some(leader_node) => {
let mut likely_leader = self.likely_leader.write().await;
*likely_leader = sonic::replication::RemoteClient::new(
*likely_leader = Some(sonic::replication::RemoteClient::new(
leader_node
.addr
.parse()
.expect("node addr should always be valid addr"),
);
));
}
None => {
tokio::time::sleep(backoff).await;
Expand Down Expand Up @@ -248,6 +273,8 @@ impl RemoteClient {
.likely_leader
.read()
.await
.as_ref()
.unwrap_or(&self.self_remote)
.send_with_timeout(
&DropTable {
table: table.clone(),
Expand All @@ -266,12 +293,12 @@ impl RemoteClient {
}) => match leader_node {
Some(leader_node) => {
let mut likely_leader = self.likely_leader.write().await;
*likely_leader = sonic::replication::RemoteClient::new(
*likely_leader = Some(sonic::replication::RemoteClient::new(
leader_node
.addr
.parse()
.expect("node addr should always be valid addr"),
);
));
}
None => {
tokio::time::sleep(backoff).await;
Expand Down Expand Up @@ -301,7 +328,7 @@ impl RemoteClient {
}
}

Err(anyhow!("failed to set key"))
Err(anyhow!("failed to drop table"))
}

pub async fn create_table(&self, table: Table) -> Result<()> {
Expand All @@ -310,6 +337,8 @@ impl RemoteClient {
.likely_leader
.read()
.await
.as_ref()
.unwrap_or(&self.self_remote)
.send_with_timeout(
&CreateTable {
table: table.clone(),
Expand All @@ -328,12 +357,12 @@ impl RemoteClient {
}) => match leader_node {
Some(leader_node) => {
let mut likely_leader = self.likely_leader.write().await;
*likely_leader = sonic::replication::RemoteClient::new(
*likely_leader = Some(sonic::replication::RemoteClient::new(
leader_node
.addr
.parse()
.expect("node addr should always be valid addr"),
);
));
}
None => {
tokio::time::sleep(backoff).await;
Expand Down Expand Up @@ -363,7 +392,7 @@ impl RemoteClient {
}
}

Err(anyhow!("failed to set key"))
Err(anyhow!("failed to create table"))
}

pub async fn all_tables(&self) -> Result<Vec<Table>> {
Expand All @@ -372,6 +401,8 @@ impl RemoteClient {
.likely_leader
.read()
.await
.as_ref()
.unwrap_or(&self.self_remote)
.send_with_timeout(&AllTables, Duration::from_secs(5))
.await;

Expand All @@ -385,12 +416,12 @@ impl RemoteClient {
}) => match leader_node {
Some(leader_node) => {
let mut likely_leader = self.likely_leader.write().await;
*likely_leader = sonic::replication::RemoteClient::new(
*likely_leader = Some(sonic::replication::RemoteClient::new(
leader_node
.addr
.parse()
.expect("node addr should always be valid addr"),
);
));
}
None => {
tokio::time::sleep(backoff).await;
Expand Down Expand Up @@ -420,6 +451,71 @@ impl RemoteClient {
}
}

Err(anyhow!("failed to set key"))
Err(anyhow!("failed to get tables"))
}

pub async fn clone_table(&self, from: Table, to: Table) -> Result<()> {
for backoff in Self::retry_strat() {
let res = self
.likely_leader
.read()
.await
.as_ref()
.unwrap_or(&self.self_remote)
.send_with_timeout(
&CloneTable {
from: from.clone(),
to: to.clone(),
},
Duration::from_secs(5),
)
.await;

match res {
Ok(res) => match res {
Ok(res) => return Ok(res),
Err(RaftError::APIError(e)) => match e {
ClientWriteError::ForwardToLeader(ForwardToLeader {
leader_id: _,
leader_node,
}) => match leader_node {
Some(leader_node) => {
let mut likely_leader = self.likely_leader.write().await;
*likely_leader = Some(sonic::replication::RemoteClient::new(
leader_node
.addr
.parse()
.expect("node addr should always be valid addr"),
));
}
None => {
tokio::time::sleep(backoff).await;
}
},
ClientWriteError::ChangeMembershipError(_) => {
unreachable!(".clone_table() should not change membership")
}
},
Err(RaftError::Fatal(e)) => return Err(e.into()),
},
Err(e) => match e {
sonic::Error::IO(_)
| sonic::Error::Serialization(_)
| sonic::Error::ConnectionTimeout
| sonic::Error::RequestTimeout
| sonic::Error::PoolCreation => {
tokio::time::sleep(backoff).await;
}
sonic::Error::BadRequest
| sonic::Error::BodyTooLarge {
body_size: _,
max_size: _,
}
| sonic::Error::Application(_) => return Err(e.into()),
},
}
}

Err(anyhow!("failed to clone table"))
}
}
3 changes: 2 additions & 1 deletion crates/core/src/ampc/dht/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
pub mod api;
pub mod raft;

use api::{AllTables, CreateTable, DropTable, Get, Set};
use api::{AllTables, CloneTable, CreateTable, DropTable, Get, Set};
use std::{collections::BTreeMap, net::SocketAddr, sync::Arc};

use openraft::{BasicNode, Raft, RaftNetworkFactory};
Expand Down Expand Up @@ -72,6 +72,7 @@ sonic_service!(
DropTable,
CreateTable,
AllTables,
CloneTable,
]
);

Expand Down
14 changes: 9 additions & 5 deletions crates/core/src/ampc/dht/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use super::{Request, Response};
pub struct Table(String);

impl Table {
pub fn as_std(&self) -> &str {
pub fn as_str(&self) -> &str {
&self.0
}
}
Expand Down Expand Up @@ -125,9 +125,9 @@ impl Db {
self.data.entry(table).or_default().insert(key, value);
}

pub fn new_table_from(&mut self, table: &Table, new_table: Table) {
let data = self.data.get(table).cloned().unwrap_or_default();
self.data.insert(new_table, data);
pub fn clone_table(&mut self, from: &Table, to: Table) {
let data = self.data.get(from).cloned().unwrap_or_default();
self.data.insert(to, data);
}

pub fn new_table(&mut self, table: Table) {
Expand Down Expand Up @@ -266,12 +266,16 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
res.push(Response::CreateTable(Ok(())))
}
Request::DropTable(api::DropTable { table }) => {
sm.db.data.remove(table);
sm.db.drop_table(table);
res.push(Response::DropTable(Ok(())))
}
Request::AllTables(api::AllTables) => {
res.push(Response::AllTables(Ok(sm.db.tables())))
}
Request::CloneTable(api::CloneTable { from, to }) => {
sm.db.clone_table(from, to.clone());
res.push(Response::CloneTable(Ok(())))
}
},
EntryPayload::Membership(ref mem) => {
sm.last_membership = StoredMembership::new(Some(entry.log_id), mem.clone());
Expand Down
Loading

0 comments on commit 37eee39

Please sign in to comment.