Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into embassy-sync-update
Browse files Browse the repository at this point in the history
  • Loading branch information
jamesmunns committed Jul 29, 2024
2 parents 8376c19 + 7a7f312 commit b712668
Show file tree
Hide file tree
Showing 11 changed files with 517 additions and 115 deletions.
2 changes: 1 addition & 1 deletion example/firmware/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions example/workbook-host/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions example/workbook-host/src/bin/comms-01.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,18 @@ use tokio::time::interval;
#[tokio::main]
pub async fn main() {
let client = WorkbookClient::new();

tokio::select! {
_ = client.wait_closed() => {
println!("Client is closed, exiting...");
}
_ = run(&client) => {
println!("App is done")
}
}
}

async fn run(client: &WorkbookClient) {
let mut ticker = interval(Duration::from_millis(250));

for i in 0..10 {
Expand Down
4 changes: 4 additions & 0 deletions example/workbook-host/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ impl WorkbookClient {
Self { client }
}

pub async fn wait_closed(&self) {
self.client.wait_closed().await;
}

pub async fn ping(&self, id: u32) -> Result<u32, WorkbookError<Infallible>> {
let val = self.client.send_resp::<PingEndpoint>(&id).await?;
Ok(val)
Expand Down
6 changes: 3 additions & 3 deletions source/postcard-rpc-test/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion source/postcard-rpc-test/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
// I'm just here so we can write integration tests

151 changes: 148 additions & 3 deletions source/postcard-rpc-test/tests/basic.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use std::{collections::HashMap, time::Duration};

use postcard::experimental::schema::Schema;
use postcard_rpc::test_utils::local_setup;
use postcard_rpc::{
endpoint, headered::to_stdvec_keyed, topic, Dispatch, Endpoint, Key, Topic, WireHeader,
};
use postcard_rpc::test_utils::local_setup;
use serde::{Deserialize, Serialize};
use tokio::task::yield_now;
use tokio::time::timeout;

endpoint!(EndpointOne, Req1, Resp1, "endpoint/one");
Expand Down Expand Up @@ -78,7 +79,7 @@ async fn smoke_reqresp() {
});

// As the wire, get the outgoing request
let out1 = srv.from_client.recv().await.unwrap();
let out1 = srv.recv_from_client().await.unwrap();

// Does the outgoing value match what we expect?
let exp_out = to_stdvec_keyed(0, EndpointOne::REQ_KEY, &Req1 { a: 10, b: 100 }).unwrap();
Expand Down Expand Up @@ -127,7 +128,7 @@ async fn smoke_publish() {
.unwrap();

// As the wire, get the outgoing request
let out1 = srv.from_client.recv().await.unwrap();
let out1 = srv.recv_from_client().await.unwrap();

// Does the outgoing value match what we expect?
let exp_out = to_stdvec_keyed(123, TopicOne::TOPIC_KEY, &Req1 { a: 10, b: 100 }).unwrap();
Expand Down Expand Up @@ -158,3 +159,147 @@ async fn smoke_subscribe() {

assert_eq!(publ, VAL);
}

#[tokio::test]
async fn smoke_io_error() {
let (mut srv, client) = local_setup::<WireError>(8, "error");

// Do one round trip to make sure the connection works
{
let rr_rt = tokio::task::spawn({
let client = client.clone();
async move {
client
.send_resp::<EndpointOne>(&Req1 { a: 10, b: 100 })
.await
}
});

// As the wire, get the outgoing request
let out1 = srv.recv_from_client().await.unwrap();

// Does the outgoing value match what we expect?
let exp_out = to_stdvec_keyed(0, EndpointOne::REQ_KEY, &Req1 { a: 10, b: 100 }).unwrap();
let act_out = out1.to_bytes();
assert_eq!(act_out, exp_out);

// The request is still awaiting a response
assert!(!rr_rt.is_finished());

// Feed a simulated response "from the wire" back to the
// awaiting request
const RESP_001: Resp1 = Resp1 {
c: [1, 2, 3, 4, 5, 6, 7, 8],
d: -10,
};
srv.reply::<EndpointOne>(out1.header.seq_no, &RESP_001)
.await
.unwrap();

// Now wait for the request to complete
let end = rr_rt.await.unwrap().unwrap();

// We got the simulated value back
assert_eq!(end, RESP_001);
}

// Now, simulate an I/O error
srv.cause_fatal_error();

// Give the clients some time to halt
yield_now().await;

// Our server channels should now be closed - the tasks hung up
assert!(srv.from_client.recv().await.is_none());
assert!(srv.to_client.send(Vec::new()).await.is_err());

// Try again, but nothing should work because all the worker tasks just halted
{
let rr_rt = tokio::task::spawn({
let client = client.clone();
async move {
client
.send_resp::<EndpointOne>(&Req1 { a: 10, b: 100 })
.await
}
});

// As the wire, get the outgoing request - didn't happen
assert!(srv.recv_from_client().await.is_err());

// Now wait for the request to complete - it failed
rr_rt.await.unwrap().unwrap_err();
}
}

#[tokio::test]
async fn smoke_closed() {
let (mut srv, client) = local_setup::<WireError>(8, "error");

// Do one round trip to make sure the connection works
{
let rr_rt = tokio::task::spawn({
let client = client.clone();
async move {
client
.send_resp::<EndpointOne>(&Req1 { a: 10, b: 100 })
.await
}
});

// As the wire, get the outgoing request
let out1 = srv.recv_from_client().await.unwrap();

// Does the outgoing value match what we expect?
let exp_out = to_stdvec_keyed(0, EndpointOne::REQ_KEY, &Req1 { a: 10, b: 100 }).unwrap();
let act_out = out1.to_bytes();
assert_eq!(act_out, exp_out);

// The request is still awaiting a response
assert!(!rr_rt.is_finished());

// Feed a simulated response "from the wire" back to the
// awaiting request
const RESP_001: Resp1 = Resp1 {
c: [1, 2, 3, 4, 5, 6, 7, 8],
d: -10,
};
srv.reply::<EndpointOne>(out1.header.seq_no, &RESP_001)
.await
.unwrap();

// Now wait for the request to complete
let end = rr_rt.await.unwrap().unwrap();

// We got the simulated value back
assert_eq!(end, RESP_001);
}

// Now, use the *client* to close the connection
client.close();

// Give the clients some time to halt
yield_now().await;

// Our server channels should now be closed - the tasks hung up
assert!(srv.from_client.recv().await.is_none());
assert!(srv.to_client.send(Vec::new()).await.is_err());

// Try again, but nothing should work because all the worker tasks just halted
{
let rr_rt = tokio::task::spawn({
let client = client.clone();
async move {
client
.send_resp::<EndpointOne>(&Req1 { a: 10, b: 100 })
.await
}
});

// As the wire, get the outgoing request - didn't happen
assert!(srv.recv_from_client().await.is_err());

// Now wait for the request to complete - it failed
rr_rt.await.unwrap().unwrap_err();
}
}
4 changes: 2 additions & 2 deletions source/postcard-rpc/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "postcard-rpc"
version = "0.5.1"
version = "0.5.3"
authors = ["James Munns <[email protected]>"]
edition = "2021"
repository = "https://github.com/jamesmunns/postcard-rpc"
Expand Down Expand Up @@ -43,7 +43,7 @@ version = "5.4.4"
optional = true

[dependencies.maitake-sync]
version = "0.1.0"
version = "0.1.2"
optional = true

[dependencies.tokio]
Expand Down
Loading

0 comments on commit b712668

Please sign in to comment.