Skip to content

Commit

Permalink
async 1st pass, Send is not happy
Browse files Browse the repository at this point in the history
  • Loading branch information
fl0rek committed Sep 19, 2023
2 parents 3eab85f + 80e424f commit e38704a
Show file tree
Hide file tree
Showing 10 changed files with 497 additions and 303 deletions.
1 change: 0 additions & 1 deletion node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ tracing = "0.1.37"

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tokio = { version = "1.32.0", features = ["rt-multi-thread"] }
dashmap = "5.5.3"

[target.'cfg(target_arch = "wasm32")'.dependencies]
getrandom = { version = "0.2.10", features = ["js"] }
Expand Down
13 changes: 7 additions & 6 deletions node/src/exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,12 @@ use tracing::instrument;
mod client;
mod server;
mod utils;
pub use utils::ExtendedHeaderExt;

use crate::exchange::client::ExchangeClientHandler;
use crate::exchange::server::{ExchangeServerHandler, RequestResponseResponder};
use crate::exchange::server::ExchangeServerHandler;
use crate::p2p::P2pError;
use crate::peer_tracker::PeerTracker;
use crate::store::Store;
use crate::store::WrappedStore;
use crate::utils::{stream_protocol_id, OneshotResultSender};

/// Max request size in bytes
Expand All @@ -50,7 +49,7 @@ pub(crate) struct ExchangeBehaviour {
pub(crate) struct ExchangeConfig<'a> {
pub network_id: &'a str,
pub peer_tracker: Arc<PeerTracker>,
pub header_store: Arc<Store>,
pub header_store: WrappedStore,
}

#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -144,9 +143,9 @@ impl ExchangeBehaviour {
},
peer,
} => {
let responder = RequestResponseResponder::new(&mut self.req_resp, channel);
//let responder = RequestResponseResponder::new(&mut self.req_resp, channel);
self.server_handler
.on_request_received(peer, request_id, request, responder);
.on_request_received(peer, request_id, request, channel);
}

// Response to inbound request was sent
Expand Down Expand Up @@ -225,6 +224,8 @@ impl NetworkBehaviour for ExchangeBehaviour {
}
}

while let Poll::Ready((channel, response)) = self.server_handler.poll(cx) {}

Poll::Pending
}
}
Expand Down
192 changes: 106 additions & 86 deletions node/src/exchange/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,17 +286,14 @@ where
#[cfg(test)]
mod tests {
use super::*;
use crate::exchange::utils::ToHeaderResponse;
use crate::exchange::utils::ExtendedHeaderExt;
use crate::store::tests::gen_extended_header;
use celestia_proto::p2p::pb::header_request::Data;
use celestia_proto::p2p::pb::StatusCode;
use celestia_types::consts::HASH_SIZE;
use celestia_types::{DataAvailabilityHeader, Hash, ValidatorSet};
use celestia_types::Hash;
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, Ordering};
use tendermint::block::header::Version;
use tendermint::block::{Commit, Header, Id};
use tendermint::hash::AppHash;
use tendermint::Time;

#[tokio::test]
async fn request_height() {
Expand All @@ -308,8 +305,8 @@ mod tests {

handler.on_send_request(&mut mock_req, HeaderRequest::with_origin(5, 1), tx);

let expected = gen_header_response(5);
let expected_header = expected.to_extended_header().unwrap();
let expected_header = gen_extended_header(5);
let expected = expected_header.to_header_response();

mock_req.send_n_responses(&mut handler, 1, vec![expected]);

Expand All @@ -325,8 +322,8 @@ mod tests {
let mut handler = ExchangeClientHandler::<MockReq>::new(peer_tracker);

let (tx, rx) = oneshot::channel();
let expected = gen_header_response(5);
let expected_header = expected.to_extended_header().unwrap();
let expected_header = gen_extended_header(5);
let expected = expected_header.to_header_response();

handler.on_send_request(
&mut mock_req,
Expand All @@ -351,14 +348,14 @@ mod tests {

handler.on_send_request(&mut mock_req, HeaderRequest::with_origin(5, 3), tx);

let expected = vec![
gen_header_response(5),
gen_header_response(6),
gen_header_response(7),
let expected_headers = vec![
gen_extended_header(5),
gen_extended_header(6),
gen_extended_header(7),
];
let expected_headers = expected
let expected = expected_headers
.iter()
.map(|header| header.to_extended_header().unwrap())
.map(|header| header.to_header_response())
.collect::<Vec<_>>();

mock_req.send_n_responses(&mut handler, 1, expected);
Expand All @@ -378,14 +375,14 @@ mod tests {

handler.on_send_request(&mut mock_req, HeaderRequest::with_origin(5, 3), tx);

let expected = vec![
gen_header_response(7),
gen_header_response(5),
gen_header_response(6),
let mut expected_headers = vec![
gen_extended_header(7),
gen_extended_header(5),
gen_extended_header(6),
];
let mut expected_headers = expected
let expected = expected_headers
.iter()
.map(|header| header.to_extended_header().unwrap())
.map(|header| header.to_header_response())
.collect::<Vec<_>>();
expected_headers.sort_by_key(|header| header.height());

Expand Down Expand Up @@ -428,7 +425,11 @@ mod tests {
let (tx, rx) = oneshot::channel();

handler.on_send_request(&mut mock_req, HeaderRequest::with_origin(5, 1), tx);
mock_req.send_n_responses(&mut handler, 1, vec![gen_header_response(4)]);
mock_req.send_n_responses(
&mut handler,
1,
vec![gen_extended_header(4).to_header_response()],
);

assert!(matches!(
rx.await,
Expand All @@ -450,9 +451,9 @@ mod tests {
&mut handler,
1,
vec![
gen_header_response(5),
gen_header_response(7),
gen_header_response(7),
gen_extended_header(5).to_header_response(),
gen_extended_header(7).to_header_response(),
gen_extended_header(7).to_header_response(),
],
);

Expand All @@ -476,7 +477,11 @@ mod tests {
tx,
);

mock_req.send_n_responses(&mut handler, 1, vec![gen_header_response(5)]);
mock_req.send_n_responses(
&mut handler,
1,
vec![gen_extended_header(5).to_header_response()],
);

assert!(matches!(
rx.await,
Expand Down Expand Up @@ -562,7 +567,11 @@ mod tests {
let (tx, rx) = oneshot::channel();

handler.on_send_request(&mut mock_req, HeaderRequest::with_origin(5, 2), tx);
mock_req.send_n_responses(&mut handler, 1, vec![gen_header_response(5)]);
mock_req.send_n_responses(
&mut handler,
1,
vec![gen_extended_header(5).to_header_response()],
);

assert!(matches!(
rx.await,
Expand Down Expand Up @@ -657,16 +666,36 @@ mod tests {

handler.on_send_request(&mut mock_req, HeaderRequest::with_origin(0, 1), tx);

let expected = gen_header_response(5);
let expected_header = expected.to_extended_header().unwrap();
let expected_header = gen_extended_header(5);
let expected = expected_header.to_header_response();

mock_req.send_n_responses(&mut handler, 1, vec![gen_header_response(3)]);
mock_req.send_n_responses(&mut handler, 2, vec![gen_header_response(4)]);
mock_req.send_n_responses(
&mut handler,
1,
vec![gen_extended_header(3).to_header_response()],
);
mock_req.send_n_responses(
&mut handler,
2,
vec![gen_extended_header(4).to_header_response()],
);
// this header also has height = 5 but has different hash
mock_req.send_n_responses(&mut handler, 1, vec![gen_header_response(5)]);
mock_req.send_n_responses(
&mut handler,
1,
vec![gen_extended_header(5).to_header_response()],
);
mock_req.send_n_responses(&mut handler, 2, vec![expected]);
mock_req.send_n_responses(&mut handler, 1, vec![gen_header_response(6)]);
mock_req.send_n_responses(&mut handler, 1, vec![gen_header_response(7)]);
mock_req.send_n_responses(
&mut handler,
1,
vec![gen_extended_header(6).to_header_response()],
);
mock_req.send_n_responses(
&mut handler,
1,
vec![gen_extended_header(7).to_header_response()],
);
mock_req.send_n_failures(&mut handler, 1, OutboundFailure::Timeout);
mock_req.send_n_failures(&mut handler, 1, OutboundFailure::ConnectionClosed);

Expand All @@ -686,15 +715,31 @@ mod tests {

handler.on_send_request(&mut mock_req, HeaderRequest::with_origin(0, 1), tx);

let expected = gen_header_response(5);
let expected_header = expected.to_extended_header().unwrap();
let expected_header = gen_extended_header(5);
let expected = expected_header.to_header_response();

// all headers have height = 5 but different hash
mock_req.send_n_responses(&mut handler, 1, vec![gen_header_response(5)]);
mock_req.send_n_responses(&mut handler, 2, vec![gen_header_response(5)]);
mock_req.send_n_responses(&mut handler, 1, vec![gen_header_response(5)]);
mock_req.send_n_responses(
&mut handler,
1,
vec![gen_extended_header(5).to_header_response()],
);
mock_req.send_n_responses(
&mut handler,
2,
vec![gen_extended_header(5).to_header_response()],
);
mock_req.send_n_responses(
&mut handler,
1,
vec![gen_extended_header(5).to_header_response()],
);
mock_req.send_n_responses(&mut handler, 4, vec![expected]);
mock_req.send_n_responses(&mut handler, 2, vec![gen_header_response(5)]);
mock_req.send_n_responses(
&mut handler,
2,
vec![gen_extended_header(5).to_header_response()],
);

let result = rx.await.unwrap().unwrap();
assert_eq!(result.len(), 1);
Expand All @@ -712,13 +757,17 @@ mod tests {

handler.on_send_request(&mut mock_req, HeaderRequest::with_origin(0, 1), tx);

let expected = gen_header_response(10);
let expected_header = expected.to_extended_header().unwrap();
let expected_header = gen_extended_header(10);
let expected = expected_header.to_header_response();

mock_req.send_n_responses(&mut handler, 1, vec![expected]);

for height in 1..10 {
mock_req.send_n_responses(&mut handler, 1, vec![gen_header_response(height)]);
mock_req.send_n_responses(
&mut handler,
1,
vec![gen_extended_header(height).to_header_response()],
);
}

let result = rx.await.unwrap().unwrap();
Expand All @@ -736,15 +785,22 @@ mod tests {

handler.on_send_request(&mut mock_req, HeaderRequest::with_origin(0, 1), tx);

let expected = gen_header_response(5);
let expected_header = expected.to_extended_header().unwrap();
let expected_header = gen_extended_header(5);
let expected = expected_header.to_header_response();

mock_req.send_n_responses(&mut handler, 1, vec![gen_header_response(5)]);
mock_req.send_n_responses(
&mut handler,
1,
vec![gen_extended_header(5).to_header_response()],
);
mock_req.send_n_responses(&mut handler, 2, vec![expected]);
mock_req.send_n_responses(
&mut handler,
7,
vec![gen_header_response(4), gen_header_response(5)],
vec![
gen_extended_header(4).to_header_response(),
gen_extended_header(5).to_header_response(),
],
);

let result = rx.await.unwrap().unwrap();
Expand Down Expand Up @@ -781,8 +837,8 @@ mod tests {

handler.on_send_request(&mut mock_req, HeaderRequest::with_origin(0, 1), tx);

let expected = gen_header_response(10);
let expected_header = expected.to_extended_header().unwrap();
let expected_header = gen_extended_header(10);
let expected = expected_header.to_header_response();

mock_req.send_n_responses(&mut handler, 1, vec![expected]);

Expand Down Expand Up @@ -870,42 +926,6 @@ mod tests {
}
}

fn gen_header_response(height: u64) -> HeaderResponse {
ExtendedHeader {
header: Header {
version: Version { block: 11, app: 1 },
chain_id: "private".to_string().try_into().unwrap(),
height: height.try_into().unwrap(),
time: Time::now(),
last_block_id: None,
last_commit_hash: Hash::default(),
data_hash: Hash::default(),
validators_hash: Hash::default(),
next_validators_hash: Hash::default(),
consensus_hash: Hash::default(),
app_hash: AppHash::default(),
last_results_hash: Hash::default(),
evidence_hash: Hash::default(),
proposer_address: tendermint::account::Id::new([0; 20]),
},
commit: Commit {
height: height.try_into().unwrap(),
block_id: Id {
hash: Hash::Sha256(rand::random()),
..Default::default()
},
..Default::default()
},
validator_set: ValidatorSet::new(Vec::new(), None),
dah: DataAvailabilityHeader {
row_roots: Vec::new(),
column_roots: Vec::new(),
hash: [0; 32],
},
}
.to_header_response()
}

fn gen_n_peers(n: usize) -> Vec<PeerId> {
(0..n).map(|_| PeerId::random()).collect()
}
Expand Down
Loading

0 comments on commit e38704a

Please sign in to comment.