Skip to content

Commit

Permalink
change key/value in dht to be arbitrary bytes
Browse files Browse the repository at this point in the history
  • Loading branch information
mikkeldenker committed Mar 16, 2024
1 parent 5c9a681 commit 0abc924
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 36 deletions.
59 changes: 32 additions & 27 deletions crates/core/src/mapreduce/dht/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,20 +160,22 @@ mod tests {
let c2 = RemoteClient::new(addr2);
let c3 = RemoteClient::new(addr3);

c1.set("hello".to_string(), "world".to_string()).await?;
c1.set("hello".as_bytes().to_vec(), "world".as_bytes().to_vec())
.await?;

let res = c1.get("hello".to_string()).await?;
assert_eq!(res, Some("world".to_string()));
let res = c1.get("hello".as_bytes().to_vec()).await?;
assert_eq!(res, Some("world".as_bytes().to_vec()));

let res = c2.get("hello".to_string()).await?;
assert_eq!(res, Some("world".to_string()));
let res = c2.get("hello".as_bytes().to_vec()).await?;
assert_eq!(res, Some("world".as_bytes().to_vec()));

c2.set("hello".to_string(), "world2".to_string()).await?;
let res = c3.get("hello".to_string()).await?;
assert_eq!(res, Some("world2".to_string()));
c2.set("hello".as_bytes().to_vec(), "world2".as_bytes().to_vec())
.await?;
let res = c3.get("hello".as_bytes().to_vec()).await?;
assert_eq!(res, Some("world2".as_bytes().to_vec()));

let res = c1.get("hello".to_string()).await?;
assert_eq!(res, Some("world2".to_string()));
let res = c1.get("hello".as_bytes().to_vec()).await?;
assert_eq!(res, Some("world2".as_bytes().to_vec()));

Ok(())
}
Expand Down Expand Up @@ -206,11 +208,12 @@ mod tests {
let c1 = RemoteClient::new(addr1);
let c2 = RemoteClient::new(addr2);

c1.set("hello".to_string(), "world".to_string()).await?;
c1.set("hello".as_bytes().to_vec(), "world".as_bytes().to_vec())
.await?;

let res = c2.get("hello".to_string()).await?;
let res = c2.get("hello".as_bytes().to_vec()).await?;

assert_eq!(res, Some("world".to_string()));
assert_eq!(res, Some("world".as_bytes().to_vec()));

let members: BTreeMap<u64, _> = vec![(1, addr1), (2, addr2), (3, addr3)]
.into_iter()
Expand All @@ -222,8 +225,8 @@ mod tests {
rc1.join(3, addr3, members.clone()).await?;

let c3 = RemoteClient::new(addr3);
let res = c3.get("hello".to_string()).await?;
assert_eq!(res, Some("world".to_string()));
let res = c3.get("hello".as_bytes().to_vec()).await?;
assert_eq!(res, Some("world".as_bytes().to_vec()));

Ok(())
}
Expand Down Expand Up @@ -261,13 +264,14 @@ mod tests {

let rc1 = network::raft::RemoteClient::new(1, BasicNode::new(addr1));

c1.set("hello".to_string(), "world".to_string()).await?;
c1.set("hello".as_bytes().to_vec(), "world".as_bytes().to_vec())
.await?;

let res = c1.get("hello".to_string()).await?;
assert_eq!(res, Some("world".to_string()));
let res = c1.get("hello".as_bytes().to_vec()).await?;
assert_eq!(res, Some("world".as_bytes().to_vec()));

let res = c2.get("hello".to_string()).await?;
assert_eq!(res, Some("world".to_string()));
let res = c2.get("hello".as_bytes().to_vec()).await?;
assert_eq!(res, Some("world".as_bytes().to_vec()));

// crash node 2
handles[1].abort();
Expand All @@ -284,16 +288,17 @@ mod tests {

let c2 = RemoteClient::new(addr2);

let res = c2.get("hello".to_string()).await?;
assert_eq!(res, Some("world".to_string()));
let res = c2.get("hello".as_bytes().to_vec()).await?;
assert_eq!(res, Some("world".as_bytes().to_vec()));

// crash node 2 again
handles[1].abort();
drop(raft2);

c3.set("hello".to_string(), "world2".to_string()).await?;
let res = c1.get("hello".to_string()).await?;
assert_eq!(res, Some("world2".to_string()));
c3.set("hello".as_bytes().to_vec(), "world2".as_bytes().to_vec())
.await?;
let res = c1.get("hello".as_bytes().to_vec()).await?;
assert_eq!(res, Some("world2".as_bytes().to_vec()));

let (raft2, server2, addr2) = server(2).await?;
handles[1] = tokio::spawn(async move {
Expand All @@ -306,8 +311,8 @@ mod tests {

let c2 = RemoteClient::new(addr2);

let res = c2.get("hello".to_string()).await?;
assert_eq!(res, Some("world2".to_string()));
let res = c2.get("hello".as_bytes().to_vec()).await?;
assert_eq!(res, Some("world2".as_bytes().to_vec()));

Ok(())
}
Expand Down
12 changes: 6 additions & 6 deletions crates/core/src/mapreduce/dht/network/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ use super::Server;

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Set {
pub key: String,
pub value: String,
pub key: Vec<u8>,
pub value: Vec<u8>,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Get {
pub key: String,
pub key: Vec<u8>,
}

impl sonic::service::Message<Server> for Set {
Expand All @@ -53,7 +53,7 @@ impl sonic::service::Message<Server> for Set {
}

impl sonic::service::Message<Server> for Get {
type Response = Option<String>;
type Response = Option<Vec<u8>>;

async fn handle(self, server: &Server) -> Self::Response {
server
Expand Down Expand Up @@ -87,7 +87,7 @@ impl RemoteClient {
)
}

pub async fn set(&self, key: String, value: String) -> Result<()> {
pub async fn set(&self, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
for backoff in Self::retry_strat() {
let res = self
.likely_leader
Expand Down Expand Up @@ -152,7 +152,7 @@ impl RemoteClient {
Err(anyhow!("failed to set key"))
}

pub async fn get(&self, key: String) -> Result<Option<String>> {
pub async fn get(&self, key: Vec<u8>) -> Result<Option<Vec<u8>>> {
for backoff in Self::retry_strat() {
match self
.self_remote
Expand Down
6 changes: 3 additions & 3 deletions crates/core/src/mapreduce/dht/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub struct StateMachineData {
pub last_membership: StoredMembership<NodeId, BasicNode>,

/// Application data.
pub data: BTreeMap<String, String>,
pub data: BTreeMap<Vec<u8>, Vec<u8>>,
}

#[derive(Debug, Default)]
Expand Down Expand Up @@ -199,8 +199,8 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {

// Update the state machine.
{
let data: BTreeMap<String, String> =
bincode::deserialize(&new_snapshot.data).map_err(|e| {
let data: BTreeMap<Vec<u8>, Vec<u8>> = bincode::deserialize(&new_snapshot.data)
.map_err(|e| {
StorageIOError::read_snapshot(Some(new_snapshot.meta.signature()), &e)
})?;

Expand Down

0 comments on commit 0abc924

Please sign in to comment.