Skip to content

Commit

Permalink
Implement ExchangeServerHandler (#72)
Browse files Browse the repository at this point in the history
* Implement ExchangeServerHandler

* less polling

* Fitter, happier, more productive associated types

* nicer looking code

* improvements

* unneeded loop

* fix moved test utils

* futures->channels

* fix wasm32, cleanup

* poll_fn everywhere

* Apply suggestions from Yiannis code review

Co-authored-by: Yiannis Marangos <[email protected]>

* Yiannis PR review

* check early

* code improvements

* fix and cleanup unittests

* missed change

* allocate exactly what we need

---------

Co-authored-by: Yiannis Marangos <[email protected]>
  • Loading branch information
fl0rek and oblique authored Oct 4, 2023
1 parent a7a37ad commit 521833f
Show file tree
Hide file tree
Showing 5 changed files with 404 additions and 30 deletions.
23 changes: 16 additions & 7 deletions node/src/exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@ const RESPONSE_SIZE_MAXIMUM: usize = 10 * 1024 * 1024;
/// Maximum length of the protobuf length delimiter in bytes
const PROTOBUF_MAX_LENGTH_DELIMITER_LEN: usize = 10;

type RequestType = HeaderRequest;
type ResponseType = Vec<HeaderResponse>;
type ReqRespBehaviour = request_response::Behaviour<HeaderCodec>;
type ReqRespEvent = request_response::Event<HeaderRequest, Vec<HeaderResponse>>;
type ReqRespMessage = request_response::Message<HeaderRequest, Vec<HeaderResponse>>;
type ReqRespEvent = request_response::Event<RequestType, ResponseType>;
type ReqRespMessage = request_response::Message<RequestType, ResponseType>;

pub(crate) struct ExchangeBehaviour<S>
where
Expand Down Expand Up @@ -251,13 +253,20 @@ where
cx: &mut Context<'_>,
params: &mut impl PollParameters,
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
while let Poll::Ready(ev) = self.req_resp.poll(cx, params) {
if let Some(ev) = self.on_to_swarm(ev) {
return Poll::Ready(ev);
loop {
if let Poll::Ready(ev) = self.req_resp.poll(cx, params) {
if let Some(ev) = self.on_to_swarm(ev) {
return Poll::Ready(ev);
}

continue;
}
if self.server_handler.poll(cx, &mut self.req_resp).is_ready() {
continue;
}
}

Poll::Pending
return Poll::Pending;
}
}
}

Expand Down
10 changes: 5 additions & 5 deletions node/src/exchange/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::utils::{OneshotResultSender, OneshotResultSenderExt};

pub(super) struct ExchangeClientHandler<S = ReqRespBehaviour>
where
S: Sender,
S: RequestSender,
{
reqs: HashMap<S::RequestId, State>,
peer_tracker: Arc<PeerTracker>,
Expand All @@ -33,13 +33,13 @@ struct State {
respond_to: OneshotResultSender<Vec<ExtendedHeader>, P2pError>,
}

pub(super) trait Sender {
pub(super) trait RequestSender {
type RequestId: Hash + Eq + Debug;

fn send_request(&mut self, peer: &PeerId, request: HeaderRequest) -> Self::RequestId;
}

impl Sender for ReqRespBehaviour {
impl RequestSender for ReqRespBehaviour {
type RequestId = RequestId;

fn send_request(&mut self, peer: &PeerId, request: HeaderRequest) -> RequestId {
Expand All @@ -49,7 +49,7 @@ impl Sender for ReqRespBehaviour {

impl<S> ExchangeClientHandler<S>
where
S: Sender,
S: RequestSender,
{
pub(super) fn new(peer_tracker: Arc<PeerTracker>) -> Self {
ExchangeClientHandler {
Expand Down Expand Up @@ -1004,7 +1004,7 @@ mod tests {
peer: PeerId,
}

impl Sender for MockReq {
impl RequestSender for MockReq {
type RequestId = MockReqId;

fn send_request(&mut self, peer: &PeerId, _request: HeaderRequest) -> Self::RequestId {
Expand Down
Loading

0 comments on commit 521833f

Please sign in to comment.