Skip to content

Commit

Permalink
Merge pull request #64 from wasmCloud/fix/handle-rpc-in-task
Browse files Browse the repository at this point in the history
Moved RPC dispatch to tokio task
  • Loading branch information
brooksmtownsend authored Dec 23, 2021
2 parents 4d6b70b + 68de5cb commit e816815
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 62 deletions.
2 changes: 1 addition & 1 deletion rpc-rs/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "wasmbus-rpc"
version = "0.6.0"
version = "0.6.1"
authors = [ "wasmcloud Team" ]
license = "Apache-2.0"
description = "Runtime library for actors and capability providers"
Expand Down
152 changes: 91 additions & 61 deletions rpc-rs/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
HealthCheckRequest, HealthCheckResponse, HostData, Invocation, InvocationResponse,
LinkDefinition,
},
Message, MessageDispatch, RpcError,
Message, MessageDispatch, RpcClient, RpcError,
};
use async_trait::async_trait;
use futures::{future::JoinAll, StreamExt};
Expand Down Expand Up @@ -283,83 +283,90 @@ impl HostBridge {
self.add_subscription(sid).await;
pin_mut!(sub);
let provider = provider.clone();

while let Some(msg) = sub.next().await {
// parse and validate incoming message
let response = match crate::deserialize::<Invocation>(&msg.payload) {
match crate::deserialize::<Invocation>(&msg.payload) {
Ok(inv) => match self.validate_invocation(&inv).await {
Ok(()) => {
trace!(
"RPC Invocation: op:{} from:{}",
&inv.operation,
&inv.origin.public_key
);
match provider
.dispatch(
&crate::Context {
actor: Some(inv.origin.public_key.clone()),
..Default::default()
},
Message {
method: &inv.operation,
arg: Cow::from(inv.msg),
},
)
.await
{
Ok(msg) => InvocationResponse {
invocation_id: inv.id,
error: None,
msg: msg.arg.to_vec(),
},
Err(e) => {
error!(
"RPC Invocation failed: op:{} from:{}: {}",
&inv.operation, &inv.origin.public_key, e
);
InvocationResponse {
let provider = provider.clone();
let rpc_client = self.rpc_client().clone();
tokio::task::spawn(async move {
trace!(
"RPC Invocation: op:{} from:{}",
&inv.operation,
&inv.origin.public_key
);
let response = match provider
.dispatch(
&crate::Context {
actor: Some(inv.origin.public_key.clone()),
..Default::default()
},
Message {
method: &inv.operation,
arg: Cow::from(inv.msg),
},
)
.await
{
Ok(msg) => InvocationResponse {
invocation_id: inv.id,
error: Some(e.to_string()),
msg: Vec::new(),
error: None,
msg: msg.arg.to_vec(),
},
Err(e) => {
error!(
"RPC Invocation failed: op:{} from:{}: {}",
&inv.operation, &inv.origin.public_key, e
);
InvocationResponse {
invocation_id: inv.id,
error: Some(e.to_string()),
msg: Vec::new(),
}
}
};
if let Some(reply_to) = msg.reply_to {
// Errors are published from inside the function, safe to ignore Result
let _ =
publish_invocation_response(&rpc_client, reply_to, response)
.await;
}
}
});
}
Err(s) => {
error!(
"Invocation validation failure: op:{} from:{} id:{} host:{}: {}",
&inv.operation, &inv.origin.public_key, &inv.id, &inv.host_id, &s
);
InvocationResponse {
invocation_id: inv.id,
error: Some(s),
msg: Vec::new(),
if let Some(reply_to) = msg.reply_to {
// Errors are published from inside the function, safe to ignore Result
let _ = publish_invocation_response(
self.rpc_client(),
reply_to,
InvocationResponse {
invocation_id: inv.id,
error: Some(s),
msg: Vec::new(),
},
)
.await;
}
}
},
Err(e) => {
error!("Invocation deserialization failure: {}", e.to_string());
InvocationResponse {
invocation_id: "invalid".to_string(),
error: Some(format!("Corrupt invocation: {}", e)),
msg: Vec::new(),
}
}
};
// return response
if let Some(reply_to) = msg.reply_to {
match crate::serialize(&response) {
Ok(t) => {
if let Err(e) = self.rpc_client().publish(&reply_to, &t).await {
error!(
"failed sending rpc response to {}: {}",
&reply_to,
e.to_string()
);
}
}
Err(e) => {
// extremely unlikely that InvocationResponse would fail to serialize
error!("failed serializing InvocationResponse: {}", e.to_string());
if let Some(reply_to) = msg.reply_to {
publish_invocation_response(
&self.rpc_client(),
reply_to,
InvocationResponse {
invocation_id: "invalid".to_string(),
error: Some(format!("Corrupt invocation: {}", e)),
msg: Vec::new(),
},
)
.await?
}
}
}
Expand Down Expand Up @@ -625,6 +632,29 @@ impl HostBridge {
}
}

async fn publish_invocation_response(
rpc_client: &RpcClient,
reply_to: String,
response: InvocationResponse,
) -> Result<(), String> {
match crate::serialize(&response) {
Ok(t) => {
if let Err(e) = rpc_client.publish(&reply_to, &t).await {
error!(
"failed sending rpc response to {}: {}",
&reply_to,
e.to_string()
);
}
}
Err(e) => {
// extremely unlikely that InvocationResponse would fail to serialize
error!("failed serializing InvocationResponse: {}", e.to_string());
}
}
Ok(())
}

pub struct ProviderTransport<'send> {
pub bridge: &'send HostBridge,
pub ld: &'send LinkDefinition,
Expand Down

0 comments on commit e816815

Please sign in to comment.