I am trying to implement a basic server push scenario in which the server makes multiple push requests. The client is expected to process the pushed requests first and only then await the main response. I noticed that when I simulate a server-side delay between sending the push requests and the main response, push_promise() gets stuck on the client side after all of the promises have been received. It looks like receiving the main response does not wake up the waiter.
However, when I remove the delay, or make the client await the main response first and only then call push_promise(), everything works fine.
My question is: is server push in h2 expected to be used this way? I was not able to find any documentation about this, or even a basic usage example. If so, is this a bug, or am I misusing the library somewhere?
Code example:
Client:
use std::error::Error;

use h2::client;
use http::{Method, Request};
use tokio::net::TcpStream;

#[tokio::main]
pub async fn main() -> Result<(), Box<dyn Error>> {
    // Establish a TCP connection to the server.
    let tcp = TcpStream::connect("127.0.0.1:5928").await?;
    let (h2, connection) = client::handshake(tcp).await?;
    // Drive the connection in the background.
    tokio::spawn(async move {
        connection.await.unwrap();
    });
    let mut h2 = h2.ready().await?;
    // Prepare the HTTP request to send to the server.
    let request = Request::builder()
        .method(Method::GET)
        .header("test_key", "test_value")
        .uri("https://www.example.com/")
        .body(())
        .expect("Request error");
    let (mut response, _) = h2.send_request(request.clone(), true).expect("Send request error");
    // Awaiting the response consumes the future, so we need to get the stream of promises first.
    let mut pushes = response.push_promises();
    while let Some(push) = pushes.push_promise().await {
        println!("Push {:?}", push);
        let push_promise = push.unwrap();
        let (request, pushed_response_future) = push_promise.into_parts();
        println!("Pushed request: {:?}", request);
        let (head_pushed, ref mut body_pushed) = pushed_response_future.await.unwrap().into_parts();
        println!("Pushed data_head: {:?}", head_pushed);
        while let Some(chunk) = body_pushed.data().await {
            println!("Pushed data {:?}", chunk);
        }
    }
    // We never get here: the while loop above is stuck after the last promise was processed.
    println!("Finished receiving pushed promises!");
    let (head, mut body) = response.await?.into_parts();
    println!("Received response: {:?}", head);
    while let Some(chunk) = body.data().await {
        println!("Main response data {:?}", chunk);
    }
    Ok(())
}
Server:
use std::error::Error;
use std::time::Duration;

use bytes::Bytes;
use h2::server::{self, SendResponse};
use http::{Request, Response, StatusCode};
use tokio::net::{TcpListener, TcpStream};

#[tokio::main]
pub async fn main() -> Result<(), Box<dyn Error>> {
    let listener = TcpListener::bind("127.0.0.1:5928").await?;
    println!("Listening on {:?}", listener.local_addr());
    loop {
        if let Ok((socket, _peer_addr)) = listener.accept().await {
            socket.set_nodelay(true)?; // Disable Nagle's algorithm
            tokio::spawn(async move {
                if let Err(e) = handle(socket).await {
                    println!(" -> err={:?}", e);
                }
            });
        }
    }
}

async fn handle(socket: TcpStream) -> Result<(), Box<dyn Error>> {
    let mut connection = server::handshake(socket).await?;
    println!("H2 connection bound");
    while let Some(result) = connection.accept().await {
        tokio::spawn(async move {
            let (request, mut respond) = result.unwrap();
            println!("GOT request: {:?}", request);
            // Send all push promises (and their responses) before the main response.
            for i in 0..50 {
                send_push(i, &mut respond);
            }
            // Simulate a delay.
            tokio::time::sleep(Duration::from_secs(5)).await;
            println!("Sending response");
            let response = Response::builder().status(StatusCode::OK).body(()).unwrap();
            let mut send = respond.send_response(response, false).unwrap();
            send.send_data("hello world".into(), true).unwrap();
            println!("Sending response complete");
        });
    }
    println!("~~~~~~~~~~~~~~~~~~~~~~~~~~~ H2 connection CLOSE !!!!!! ~~~~~~~~~~~");
    Ok(())
}

// Pushes one request, then sends its response body in three data frames.
fn send_push<B>(id: usize, send_response: &mut SendResponse<B>)
where
    B: bytes::Buf + From<Bytes>,
{
    let uri = format!("http://www.example.com/{:?}", id);
    let pushed_req = Request::builder()
        .uri(uri)
        .body(())
        .unwrap();
    let pushed_rsp = http::Response::builder().status(200).body(()).unwrap();
    let mut send_pushed = send_response
        .push_request(pushed_req)
        .unwrap()
        .send_response(pushed_rsp, false)
        .unwrap();
    send_pushed.send_data(Bytes::from("a").into(), false).unwrap();
    send_pushed.send_data(Bytes::from("b").into(), false).unwrap();
    send_pushed.send_data(Bytes::from("c").into(), true).unwrap();
}
@seanmonstar Is this really a problem with flow control? I am sending just a few bytes of data.
I managed to solve the problem by adding stream.notify_push() to Recv::recv_headers, just below stream.notify_recv(). It looks like the wake-up for poll_push_promise is missing from some paths.
That said, I am not sure whether this is the intended behavior, or whether the caller is supposed to explicitly wait for the main response before iterating through the push promises.
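To illustrate the kind of missed wake-up I suspect, here is a minimal std-only sketch. It is a toy model, not h2's actual internals: `ToyStream`, `CountingWaker`, `push_waiter_wakeups`, and the `also_notify_push` flag are hypothetical names I made up, and only the method names `notify_recv`, `notify_push`, and `recv_headers` echo the ones mentioned above. The assumption is that the stream keeps a separate parked task for each kind of waiter, so a notification aimed at the response waiter never reaches a task parked on push promises.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Wake, Waker};

/// Counts how many times a task was woken (a stand-in for a real executor task).
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Toy per-stream state (hypothetical): one parked task per kind of waiter.
struct ToyStream {
    recv_task: Option<Waker>, // task awaiting the main response
    push_task: Option<Waker>, // task awaiting the next push promise
}

impl ToyStream {
    fn notify_recv(&mut self) {
        if let Some(waker) = self.recv_task.take() {
            waker.wake();
        }
    }

    fn notify_push(&mut self) {
        if let Some(waker) = self.push_task.take() {
            waker.wake();
        }
    }

    /// Models the arrival of the main response headers.
    /// `also_notify_push` toggles the extra wake-up proposed above.
    fn recv_headers(&mut self, also_notify_push: bool) {
        self.notify_recv();
        if also_notify_push {
            self.notify_push();
        }
    }
}

/// Parks a push-promise waiter, delivers the headers, and returns
/// how many times that waiter was woken.
fn push_waiter_wakeups(also_notify_push: bool) -> usize {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let mut stream = ToyStream {
        recv_task: None,
        push_task: Some(Waker::from(counter.clone())),
    };
    stream.recv_headers(also_notify_push);
    counter.0.load(Ordering::SeqCst)
}

fn main() {
    println!("without notify_push: {}", push_waiter_wakeups(false)); // prints 0
    println!("with notify_push: {}", push_waiter_wakeups(true)); // prints 1
}
```

In this model the push-promise waiter is woken only when `recv_headers` also calls `notify_push`, which matches the hang I see in the client loop: nothing ever tells the parked task that no more promises can arrive.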