diff --git a/rpc-rs/Cargo.toml b/rpc-rs/Cargo.toml index d8d1310..8eb833e 100644 --- a/rpc-rs/Cargo.toml +++ b/rpc-rs/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "wasmbus-rpc" -version = "0.6.0" +version = "0.6.1" authors = [ "wasmcloud Team" ] license = "Apache-2.0" description = "Runtime library for actors and capability providers" diff --git a/rpc-rs/src/provider.rs b/rpc-rs/src/provider.rs index 88f20ed..f1a2f02 100644 --- a/rpc-rs/src/provider.rs +++ b/rpc-rs/src/provider.rs @@ -8,7 +8,7 @@ use crate::{ HealthCheckRequest, HealthCheckResponse, HostData, Invocation, InvocationResponse, LinkDefinition, }, - Message, MessageDispatch, RpcError, + Message, MessageDispatch, RpcClient, RpcError, }; use async_trait::async_trait; use futures::{future::JoinAll, StreamExt}; @@ -283,83 +283,90 @@ impl HostBridge { self.add_subscription(sid).await; pin_mut!(sub); let provider = provider.clone(); + while let Some(msg) = sub.next().await { - // parse and validate incoming message - let response = match crate::deserialize::(&msg.payload) { + match crate::deserialize::(&msg.payload) { Ok(inv) => match self.validate_invocation(&inv).await { Ok(()) => { - trace!( - "RPC Invocation: op:{} from:{}", - &inv.operation, - &inv.origin.public_key - ); - match provider - .dispatch( - &crate::Context { - actor: Some(inv.origin.public_key.clone()), - ..Default::default() - }, - Message { - method: &inv.operation, - arg: Cow::from(inv.msg), - }, - ) - .await - { - Ok(msg) => InvocationResponse { - invocation_id: inv.id, - error: None, - msg: msg.arg.to_vec(), - }, - Err(e) => { - error!( - "RPC Invocation failed: op:{} from:{}: {}", - &inv.operation, &inv.origin.public_key, e - ); - InvocationResponse { + let provider = provider.clone(); + let rpc_client = self.rpc_client().clone(); + tokio::task::spawn(async move { + trace!( + "RPC Invocation: op:{} from:{}", + &inv.operation, + &inv.origin.public_key + ); + let response = match provider + .dispatch( + &crate::Context { + actor: Some(inv.origin.public_key.clone()), + ..Default::default() + }, + Message { + method: &inv.operation, + arg: Cow::from(inv.msg), + }, + ) + .await + { + Ok(msg) => InvocationResponse { invocation_id: inv.id, - error: Some(e.to_string()), - msg: Vec::new(), + error: None, + msg: msg.arg.to_vec(), + }, + Err(e) => { + error!( + "RPC Invocation failed: op:{} from:{}: {}", + &inv.operation, &inv.origin.public_key, e + ); + InvocationResponse { + invocation_id: inv.id, + error: Some(e.to_string()), + msg: Vec::new(), + } } + }; + if let Some(reply_to) = msg.reply_to { + // Errors are published from inside the function, safe to ignore Result + let _ = + publish_invocation_response(&rpc_client, reply_to, response) + .await; } - } + }); } Err(s) => { error!( "Invocation validation failure: op:{} from:{} id:{} host:{}: {}", &inv.operation, &inv.origin.public_key, &inv.id, &inv.host_id, &s ); - InvocationResponse { - invocation_id: inv.id, - error: Some(s), - msg: Vec::new(), + if let Some(reply_to) = msg.reply_to { + // Errors are published from inside the function, safe to ignore Result + let _ = publish_invocation_response( + self.rpc_client(), + reply_to, + InvocationResponse { + invocation_id: inv.id, + error: Some(s), + msg: Vec::new(), + }, + ) + .await; } } }, Err(e) => { error!("Invocation deserialization failure: {}", e.to_string()); - InvocationResponse { - invocation_id: "invalid".to_string(), - error: Some(format!("Corrupt invocation: {}", e)), - msg: Vec::new(), - } - } - }; - // return response - if let Some(reply_to) = msg.reply_to { - match crate::serialize(&response) { - Ok(t) => { - if let Err(e) = self.rpc_client().publish(&reply_to, &t).await { - error!( - "failed sending rpc response to {}: {}", - &reply_to, - e.to_string() - ); - } - } - Err(e) => { - // extremely unlikely that InvocationResponse would fail to serialize - error!("failed serializing InvocationResponse: {}", e.to_string()); + if let Some(reply_to) = msg.reply_to { + publish_invocation_response( + &self.rpc_client(), + reply_to, + InvocationResponse { + invocation_id: "invalid".to_string(), + error: Some(format!("Corrupt invocation: {}", e)), + msg: Vec::new(), + }, + ) + .await? } } } @@ -625,6 +632,29 @@ impl HostBridge { } } +async fn publish_invocation_response( + rpc_client: &RpcClient, + reply_to: String, + response: InvocationResponse, +) -> Result<(), String> { + match crate::serialize(&response) { + Ok(t) => { + if let Err(e) = rpc_client.publish(&reply_to, &t).await { + error!( + "failed sending rpc response to {}: {}", + &reply_to, + e.to_string() + ); + } + } + Err(e) => { + // extremely unlikely that InvocationResponse would fail to serialize + error!("failed serializing InvocationResponse: {}", e.to_string()); + } + } + Ok(()) +} + pub struct ProviderTransport<'send> { pub bridge: &'send HostBridge, pub ld: &'send LinkDefinition,