Skip to content

Commit

Permalink
Implement ExchangeServerHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
fl0rek committed Sep 21, 2023
1 parent 9f1adfd commit 600c52d
Show file tree
Hide file tree
Showing 4 changed files with 372 additions and 21 deletions.
1 change: 1 addition & 0 deletions node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ license = "Apache-2.0"
[dependencies]
celestia-proto = { workspace = true }
celestia-types = { workspace = true }
tendermint = { workspace = true }
tendermint-proto = { workspace = true }

async-trait = "0.1.73"
Expand Down
15 changes: 12 additions & 3 deletions node/src/exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::io;
use std::sync::Arc;
use std::task::{Context, Poll};

use crate::exchange::request_response::ResponseChannel;
use async_trait::async_trait;
use celestia_proto::p2p::pb::{HeaderRequest, HeaderResponse};
use celestia_types::ExtendedHeader;
Expand Down Expand Up @@ -36,17 +37,19 @@ const RESPONSE_SIZE_MAXIMUM: usize = 10 * 1024 * 1024;
/// Maximum length of the protobuf length delimiter in bytes
const PROTOBUF_MAX_LENGTH_DELIMITER_LEN: usize = 10;

type RequestType = HeaderRequest;
type ResponseType = Vec<HeaderResponse>;
type ReqRespBehaviour = request_response::Behaviour<HeaderCodec>;
type ReqRespEvent = request_response::Event<HeaderRequest, Vec<HeaderResponse>>;
type ReqRespMessage = request_response::Message<HeaderRequest, Vec<HeaderResponse>>;
type ReqRespEvent = request_response::Event<RequestType, ResponseType>;
type ReqRespMessage = request_response::Message<RequestType, ResponseType>;

pub(crate) struct ExchangeBehaviour<S>
where
S: Store + 'static,
{
req_resp: ReqRespBehaviour,
client_handler: ExchangeClientHandler,
server_handler: ExchangeServerHandler<S>,
server_handler: ExchangeServerHandler<S, ResponseChannel<ResponseType>>,
}

pub(crate) struct ExchangeConfig<'a, S> {
Expand Down Expand Up @@ -232,6 +235,12 @@ where
}
}

while let Poll::Ready((channel, response)) = self.server_handler.poll(cx) {
// response was prepared specifically for the request, we can drop it
// in case of error we'll get Event::InboundFailure
self.req_resp.send_response(channel, response).ok();
}

Poll::Pending
}
}
Expand Down
Loading

0 comments on commit 600c52d

Please sign in to comment.