From ac8c77e273ec5f3729ad92b68458c8094c814f47 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Florkiewicz?= Date: Thu, 11 Apr 2024 09:30:08 +0200 Subject: [PATCH] Type safe finish --- Cargo.lock | 1 + node-wasm/Cargo.toml | 9 +- node-wasm/src/node.rs | 18 +- node-wasm/src/utils.rs | 8 +- node-wasm/src/worker.rs | 457 ++++++++++++++++++++-------------------- 5 files changed, 257 insertions(+), 236 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c64f49ec..c2a21fd4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2882,6 +2882,7 @@ dependencies = [ "anyhow", "celestia-types", "console_error_panic_hook", + "futures", "gloo-timers 0.3.0", "instant", "js-sys", diff --git a/node-wasm/Cargo.toml b/node-wasm/Cargo.toml index e8b4462d..9e5d32ee 100644 --- a/node-wasm/Cargo.toml +++ b/node-wasm/Cargo.toml @@ -29,17 +29,18 @@ lumina-node = { workspace = true } anyhow = "1.0.71" console_error_panic_hook = "0.1.7" +futures = "0.3" +gloo-timers = "0.3" +instant = "0.1" js-sys = "0.3.64" serde = { version = "1.0.164", features = ["derive"] } -serde_repr = "0.1" serde-wasm-bindgen = "0.6.0" +serde_repr = "0.1" time = { version = "0.3", features = ["wasm-bindgen"] } +tokio = { version = "*", features = ["sync"]} tracing = "0.1.37" tracing-subscriber = { version = "0.3.18", features = ["time"] } tracing-web = "0.1.2" wasm-bindgen = "0.2.88" wasm-bindgen-futures = "0.4.37" web-sys = { version = "0.3.69", features = ["BroadcastChannel", "MessageEvent", "Worker", "WorkerOptions", "WorkerType", "SharedWorker", "MessagePort", "SharedWorkerGlobalScope"]} -tokio = { version = "*", features = ["sync"]} -gloo-timers = "0.3" -instant = "0.1" diff --git a/node-wasm/src/node.rs b/node-wasm/src/node.rs index 3c2ebbad..8433b521 100644 --- a/node-wasm/src/node.rs +++ b/node-wasm/src/node.rs @@ -20,7 +20,6 @@ use crate::utils::js_value_from_display; use crate::utils::BChannel; use crate::utils::JsContext; use crate::utils::Network; -use crate::utils::NodeCommandType; use crate::worker::{MultipleHeaderQuery, NodeCommand, NodeResponse, SingleHeaderQuery}; use crate::wrapper::libp2p::NetworkInfoSnapshot; use crate::Result; @@ -172,7 +171,14 @@ impl NodeDriver { let command = RequestMultipleHeaders(MultipleHeaderQuery::GetVerified { from, amount }); let response = self.channel.send(command); - Ok(response.await.unwrap()) + let result = response + .await + .unwrap() + .iter() + .map(|h| to_value(&h).unwrap()) // XXX + .collect(); + + Ok(result) } pub async fn syncer_info(&mut self) -> Result { let response = self.channel.send(GetSyncerInfo); @@ -217,8 +223,14 @@ impl NodeDriver { end_height, }); let response = self.channel.send(command); + let result = response + .await + .unwrap() + .iter() + .map(|h| to_value(&h).unwrap()) + .collect(); - Ok(response.await.unwrap()) + Ok(result) } pub async fn get_sampling_metadata(&mut self, height: u64) -> Result { let command = GetSamplingMetadata { height }; diff --git a/node-wasm/src/utils.rs b/node-wasm/src/utils.rs index ba420df3..2008f4ed 100644 --- a/node-wasm/src/utils.rs +++ b/node-wasm/src/utils.rs @@ -14,7 +14,6 @@ use tracing_web::{performance_layer, MakeConsoleWriter}; use wasm_bindgen::prelude::*; use crate::worker::NodeCommand; -use crate::worker::NodeResponse; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use serde_wasm_bindgen::from_value; @@ -27,10 +26,9 @@ use web_sys::MessagePort; use web_sys::SharedWorker; use web_sys::SharedWorkerGlobalScope; -pub type CommandResponseChannel = oneshot::Sender; - +pub type CommandResponseChannel = oneshot::Sender<::Output>; #[derive(Serialize, Deserialize, Debug)] -pub struct NodeCommandResponse(T::Output) +pub struct NodeCommandResponse(pub T::Output) where T: NodeCommandType, T::Output: Debug + Serialize; @@ -54,8 +52,6 @@ impl NodeCommandSender { pub trait NodeCommandType: Debug + Into { type Output; - - //fn response(&self, output: Self::Output) -> NodeResponse; } pub struct BChannel { diff --git a/node-wasm/src/worker.rs b/node-wasm/src/worker.rs index 127ed312..3a1ae326 100644 --- a/node-wasm/src/worker.rs +++ b/node-wasm/src/worker.rs @@ -1,37 +1,26 @@ use std::fmt::Debug; +use futures::future::{BoxFuture, FutureExt, TryFutureExt}; +use instant::Instant; +use libp2p::Multiaddr; use serde::{Deserialize, Serialize}; +use serde_wasm_bindgen::{from_value, to_value}; +use tokio::sync::{mpsc, oneshot}; use tracing::{info, trace, warn}; use wasm_bindgen::prelude::*; +use wasm_bindgen_futures::spawn_local; +use web_sys::{MessageEvent, MessagePort, SharedWorker}; use celestia_types::hash::Hash; use celestia_types::ExtendedHeader; -use instant::Instant; -use js_sys::Array; -use serde_wasm_bindgen::{from_value, to_value}; -use tokio::sync::mpsc; -use wasm_bindgen_futures::spawn_local; -use web_sys::MessageEvent; -use web_sys::MessagePort; -use web_sys::SharedWorker; - use lumina_node::node::Node; -use lumina_node::store::{IndexedDbStore, Store}; +use lumina_node::peer_tracker::PeerTrackerInfo; +use lumina_node::store::{IndexedDbStore, SamplingMetadata, Store}; use lumina_node::syncer::SyncingInfo; use crate::node::WasmNodeConfig; -use crate::utils::js_value_from_display; -use crate::utils::CommandResponseChannel; -use crate::utils::NodeCommandResponse; -use crate::utils::NodeCommandType; -use crate::utils::WorkerSelf; -//use crate::worker::NodeResponse; +use crate::utils::{CommandResponseChannel, NodeCommandResponse, NodeCommandType, WorkerSelf}; use crate::wrapper::libp2p::NetworkInfoSnapshot; -use crate::Result; -use libp2p::Multiaddr; -use lumina_node::peer_tracker::PeerTrackerInfo; -use lumina_node::store::SamplingMetadata; -use tokio::sync::oneshot; #[derive(Debug, Serialize, Deserialize)] pub struct IsRunning; @@ -45,7 +34,7 @@ impl From for NodeCommand { } #[derive(Debug, Serialize, Deserialize)] -enum NodeState { +pub enum NodeState { NodeStopped, NodeStarted, AlreadyRunning(u64), @@ -156,8 +145,8 @@ impl From for NodeCommand { #[derive(Debug, Serialize, Deserialize)] pub struct SetPeerTrust { - peer_id: String, - is_trusted: bool, + pub peer_id: String, + pub is_trusted: bool, } impl NodeCommandType for SetPeerTrust { type Output = (); @@ -170,7 +159,7 @@ impl From for NodeCommand { #[derive(Debug, Serialize, Deserialize)] pub struct WaitConnected { - trusted: bool, + pub trusted: bool, } impl NodeCommandType for WaitConnected { type Output = (); @@ -206,7 +195,7 @@ impl From for NodeCommand { #[derive(Debug, Serialize, Deserialize)] pub struct RequestMultipleHeaders(pub MultipleHeaderQuery); impl NodeCommandType for RequestMultipleHeaders { - type Output = Array; + type Output = Vec; } impl From for NodeCommand { fn from(command: RequestMultipleHeaders) -> NodeCommand { @@ -228,7 +217,7 @@ impl From for NodeCommand { #[derive(Debug, Serialize, Deserialize)] pub struct GetMultipleHeaders(pub MultipleHeaderQuery); impl NodeCommandType for GetMultipleHeaders { - type Output = Array; + type Output = Vec; } impl From for NodeCommand { fn from(command: GetMultipleHeaders) -> NodeCommand { @@ -259,10 +248,7 @@ pub enum NodeCommand { GetPeerTrackerInfo(GetPeerTrackerInfo), GetNetworkInfo(GetNetworkInfo), GetConnectedPeers(GetConnectedPeers), - //GetNetworkHeadHeader, - //GetLocalHeadHeader, SetPeerTrust(SetPeerTrust), - //RequestHeadHeader(RequestHeadHeader), WaitConnected(WaitConnected), GetListeners(GetListeners), RequestHeader(RequestHeader), @@ -272,6 +258,191 @@ pub enum NodeCommand { GetSamplingMetadata(GetSamplingMetadata), } +#[derive(Serialize, Deserialize, Debug)] +pub enum SingleHeaderQuery { + Head, + ByHash(Hash), + ByHeight(u64), +} + +#[derive(Serialize, Deserialize, Debug)] +pub enum MultipleHeaderQuery { + GetVerified { + #[serde(with = "serde_wasm_bindgen::preserve")] + from: JsValue, + amount: u64, + }, + Range { + start_height: Option, + end_height: Option, + }, +} + +impl NodeCommand { + fn add_response_channel( + self, + ) -> ( + NodeCommandWithChannel, + BoxFuture<'static, Result>, // XXX + // type cleanup + ) { + match self { + NodeCommand::IsRunning(cmd) => { + let (tx, rx) = oneshot::channel(); + ( + NodeCommandWithChannel::IsRunning((cmd, tx)), + rx.map_ok(|r| NodeResponse::IsRunning(NodeCommandResponse::(r))) + .boxed(), + ) + } + NodeCommand::StartNode(cmd) => { + let (tx, rx) = oneshot::channel(); + ( + NodeCommandWithChannel::StartNode((cmd, tx)), + rx.map_ok(|r| NodeResponse::StartNode(NodeCommandResponse::(r))) + .boxed(), + ) + } + NodeCommand::GetLocalPeerId(cmd) => { + let (tx, rx) = oneshot::channel(); + ( + NodeCommandWithChannel::GetLocalPeerId((cmd, tx)), + rx.map_ok(|r| { + NodeResponse::GetLocalPeerId(NodeCommandResponse::(r)) + }) + .boxed(), + ) + } + NodeCommand::GetSyncerInfo(cmd) => { + let (tx, rx) = oneshot::channel(); + ( + NodeCommandWithChannel::GetSyncerInfo((cmd, tx)), + rx.map_ok(|r| { + NodeResponse::GetSyncerInfo(NodeCommandResponse::(r)) + }) + .boxed(), + ) + } + NodeCommand::GetPeerTrackerInfo(cmd) => { + let (tx, rx) = oneshot::channel(); + ( + NodeCommandWithChannel::GetPeerTrackerInfo((cmd, tx)), + rx.map_ok(|r| { + NodeResponse::GetPeerTrackerInfo(NodeCommandResponse::( + r, + )) + }) + .boxed(), + ) + } + NodeCommand::GetNetworkInfo(cmd) => { + let (tx, rx) = oneshot::channel(); + ( + NodeCommandWithChannel::GetNetworkInfo((cmd, tx)), + rx.map_ok(|r| { + NodeResponse::GetNetworkInfo(NodeCommandResponse::(r)) + }) + .boxed(), + ) + } + NodeCommand::GetConnectedPeers(cmd) => { + let (tx, rx) = oneshot::channel(); + ( + NodeCommandWithChannel::GetConnectedPeers((cmd, tx)), + rx.map_ok(|r| { + NodeResponse::GetConnectedPeers(NodeCommandResponse::(r)) + }) + .boxed(), + ) + } + NodeCommand::SetPeerTrust(cmd) => { + let (tx, rx) = oneshot::channel(); + ( + NodeCommandWithChannel::SetPeerTrust((cmd, tx)), + rx.map_ok(|r| { + NodeResponse::SetPeerTrust(NodeCommandResponse::(r)) + }) + .boxed(), + ) + } + NodeCommand::WaitConnected(cmd) => { + let (tx, rx) = oneshot::channel(); + ( + NodeCommandWithChannel::WaitConnected((cmd, tx)), + rx.map_ok(|r| { + NodeResponse::WaitConnected(NodeCommandResponse::(r)) + }) + .boxed(), + ) + } + NodeCommand::GetListeners(cmd) => { + let (tx, rx) = oneshot::channel(); + ( + NodeCommandWithChannel::GetListeners((cmd, tx)), + rx.map_ok(|r| { + NodeResponse::GetListeners(NodeCommandResponse::(r)) + }) + .boxed(), + ) + } + NodeCommand::RequestHeader(cmd) => { + let (tx, rx) = oneshot::channel(); + ( + NodeCommandWithChannel::RequestHeader((cmd, tx)), + rx.map_ok(|r| { + NodeResponse::RequestHeader(NodeCommandResponse::(r)) + }) + .boxed(), + ) + } + NodeCommand::RequestMultipleHeaders(cmd) => { + let (tx, rx) = oneshot::channel(); + ( + NodeCommandWithChannel::RequestMultipleHeaders((cmd, tx)), + rx.map_ok(|r| { + NodeResponse::RequestMultipleHeaders(NodeCommandResponse::< + RequestMultipleHeaders, + >(r)) + }) + .boxed(), + ) + } + NodeCommand::GetHeader(cmd) => { + let (tx, rx) = oneshot::channel(); + ( + NodeCommandWithChannel::GetHeader((cmd, tx)), + rx.map_ok(|r| NodeResponse::GetHeader(NodeCommandResponse::(r))) + .boxed(), + ) + } + NodeCommand::GetMultipleHeaders(cmd) => { + let (tx, rx) = oneshot::channel(); + ( + NodeCommandWithChannel::GetMultipleHeaders((cmd, tx)), + rx.map_ok(|r| { + NodeResponse::GetMultipleHeaders(NodeCommandResponse::( + r, + )) + }) + .boxed(), + ) + } + NodeCommand::GetSamplingMetadata(cmd) => { + let (tx, rx) = oneshot::channel(); + ( + NodeCommandWithChannel::GetSamplingMetadata((cmd, tx)), + rx.map_ok(|r| { + NodeResponse::GetSamplingMetadata( + NodeCommandResponse::(r), + ) + }) + .boxed(), + ) + } + } + } +} + #[derive(Debug)] pub enum NodeCommandWithChannel { IsRunning((IsRunning, CommandResponseChannel)), @@ -286,10 +457,7 @@ pub enum NodeCommandWithChannel { ), GetNetworkInfo((GetNetworkInfo, CommandResponseChannel)), GetConnectedPeers((GetConnectedPeers, CommandResponseChannel)), - //GetNetworkHeadHeader, - //GetLocalHeadHeader, SetPeerTrust((SetPeerTrust, CommandResponseChannel)), - //RequestHeadHeader((RequestHeadHeader, CommandResponseChannel)), WaitConnected((WaitConnected, CommandResponseChannel)), GetListeners((GetListeners, CommandResponseChannel)), RequestHeader((RequestHeader, CommandResponseChannel)), @@ -314,26 +482,6 @@ pub enum NodeCommandWithChannel { ), } -#[derive(Serialize, Deserialize, Debug)] -pub enum SingleHeaderQuery { - Head, - ByHash(Hash), - ByHeight(u64), -} - -#[derive(Serialize, Deserialize, Debug)] -pub enum MultipleHeaderQuery { - GetVerified { - #[serde(with = "serde_wasm_bindgen::preserve")] - from: JsValue, - amount: u64, - }, - Range { - start_height: Option, - end_height: Option, - }, -} - #[derive(Serialize, Deserialize, Debug)] pub enum NodeResponse { IsRunning(NodeCommandResponse), @@ -343,48 +491,16 @@ pub enum NodeResponse { GetPeerTrackerInfo(NodeCommandResponse), GetNetworkInfo(NodeCommandResponse), GetConnectedPeers(NodeCommandResponse), - //GetNetworkHeadHeader, - //GetLocalHeadHeader, SetPeerTrust(NodeCommandResponse), - //RequestHeadHeader(NodeCommandResponse), WaitConnected(NodeCommandResponse), GetListeners(NodeCommandResponse), RequestHeader(NodeCommandResponse), + RequestMultipleHeaders(NodeCommandResponse), GetHeader(NodeCommandResponse), + GetMultipleHeaders(NodeCommandResponse), GetSamplingMetadata(NodeCommandResponse), } -/* -#[derive(Serialize, Deserialize, Debug)] -pub enum NodeResponse { - Running(bool), - Started(u64), - LocalPeerId(String), - Connected(bool), - #[serde(with = "serde_wasm_bindgen::preserve")] - SyncerInfo(JsValue), - #[serde(with = "serde_wasm_bindgen::preserve")] - PeerTrackerInfo(JsValue), - NetworkInfo(NetworkInfoSnapshot), - #[serde(with = "serde_wasm_bindgen::preserve")] - ConnectedPeers(Array), - PeerTrust { - peer_id: String, - is_trusted: bool, - }, - #[serde(with = "serde_wasm_bindgen::preserve")] - Header(JsValue), - #[serde(with = "serde_wasm_bindgen::preserve")] - HeaderArray(Array), - #[serde(with = "serde_wasm_bindgen::preserve")] - VerifiedHeaders(Array), - #[serde(with = "serde_wasm_bindgen::preserve")] - SamplingMetadata(JsValue), - #[serde(with = "serde_wasm_bindgen::preserve")] - Listeners(Array), -} -*/ - struct NodeWorker { node: Node, start_timestamp: Instant, @@ -408,101 +524,6 @@ impl NodeWorker { } } - /* - fn local_peer_id(&self) -> String { - self.node.local_peer_id().to_string() - } - - fn peer_tracker_info(&self) -> Result { - Ok(to_value(&self.node.peer_tracker_info())?) - } - - async fn syncer_info(&self) -> Result { - Ok(to_value(&self.node.syncer_info().await?)?) - } - - async fn network_info(&self) -> Result { - Ok(self.node.network_info().await?.into()) - } - - async fn request_header_by_hash(&self, hash: Hash) -> Result { - Ok(to_value(&self.node.request_header_by_hash(&hash).await?)?) - } - - async fn request_header_by_height(&self, height: u64) -> Result { - Ok(to_value( - &self.node.request_header_by_height(height).await?, - )?) - } - - async fn get_header_by_hash(&self, hash: Hash) -> Result { - Ok(to_value(&self.node.get_header_by_hash(&hash).await?)?) - } - - async fn get_header_by_height(&self, height: u64) -> Result { - Ok(to_value(&self.node.get_header_by_height(height).await?)?) - } - - async fn get_headers( - &self, - start_height: Option, - end_height: Option, - ) -> Result { - let headers = match (start_height, end_height) { - (None, None) => self.node.get_headers(..).await, - (Some(start), None) => self.node.get_headers(start..).await, - (None, Some(end)) => self.node.get_headers(..=end).await, - (Some(start), Some(end)) => self.node.get_headers(start..=end).await, - }?; - - Ok(to_value(&headers)?.into()) - } - - async fn request_verified_headers(&self, from: ExtendedHeader, amount: u64) -> Result { - Ok(to_value(&self.node.request_verified_headers(&from, amount).await?)?.into()) - } - - async fn get_sampling_metadata(&self, height: u64) -> Result { - Ok(to_value(&self.node.get_sampling_metadata(height).await?)?) - } - - async fn set_peer_trust(&self, peer_id: String, is_trusted: bool) -> Result<()> { - Ok(self - .node - .set_peer_trust(peer_id.parse()?, is_trusted) - .await?) - } - - async fn connected_peers(&self) -> Result { - Ok(self - .node - .connected_peers() - .await? - .iter() - .map(js_value_from_display) - .collect()) - } - - async fn network_head_header(&self) -> Result { - Ok(to_value(&self.node.get_network_head_header())?) - } - - async fn wait_connected(&self, trusted: bool) { - if trusted { - self.node.wait_connected().await; - } else { - self.node.wait_connected_trusted().await; - } - } - - async fn local_head_header(&self) -> Result { - Ok(to_value(&self.node.get_local_head_header().await?)?) - } - - async fn request_head_header(&self) -> Result { - Ok(to_value(&self.node.request_head_header().await?)?) - } - */ async fn process_command(&mut self, command: NodeCommandWithChannel) { match command { // TODO: order @@ -538,12 +559,6 @@ impl NodeWorker { } NodeCommandWithChannel::GetConnectedPeers((_, response)) => { let connected_peers = self.node.connected_peers().await.expect("TODO"); - /* - .expect("TODO") - .iter() - .map(js_value_from_display) - .collect(); - */ response .send(connected_peers.iter().map(|id| id.to_string()).collect()) .expect("channel_dropped"); @@ -579,9 +594,9 @@ impl NodeWorker { NodeCommandWithChannel::WaitConnected((parameters, response)) => { // TODO: nonblocking on channels if parameters.trusted { - self.node.wait_connected().await; + let _ = self.node.wait_connected().await; } else { - self.node.wait_connected_trusted().await; + let _ = self.node.wait_connected_trusted().await; } response.send(()).expect("channel_dropped") } @@ -601,7 +616,6 @@ impl NodeWorker { .ok() .unwrap(), }; - //let jsvalue = to_value(&header).ok().unwrap(); response.send(header).expect("channel_dropped"); } NodeCommandWithChannel::RequestMultipleHeaders((command, response)) => { @@ -613,23 +627,10 @@ impl NodeWorker { .await .unwrap() } - MultipleHeaderQuery::Range { - start_height, - end_height, - } => todo!(), /* - * match (start_height, end_height) { - (None, None) => node.get_headers(..).await, - (Some(start), None) => node.get_headers(start..).await, - (None, Some(end)) => node.get_headers(..=end).await, - (Some(start), Some(end)) => node.get_headers(start..=end).await, - } - .ok() - .unwrap(), - */ + MultipleHeaderQuery::Range { .. } => unreachable!(), }; - let jsvalue = to_value(&headers).ok().unwrap().into(); // TODO: array fix? - response.send(jsvalue).expect("channel_dropped"); + response.send(headers).expect("channel_dropped"); } NodeCommandWithChannel::GetHeader((command, response)) => { let header = match command.0 { @@ -641,7 +642,6 @@ impl NodeWorker { self.node.get_header_by_height(height).await.ok().unwrap() } }; - //let jsvalue = to_value(&header).ok().unwrap(); response.send(header).expect("channel_dropped"); } NodeCommandWithChannel::GetMultipleHeaders((command, response)) => { @@ -666,8 +666,7 @@ impl NodeWorker { .unwrap(), }; - let jsvalue = to_value(&headers).ok().unwrap().into(); // TODO: array fix? - response.send(jsvalue).expect("channel_dropped"); + response.send(headers).expect("channel_dropped"); } NodeCommandWithChannel::GetSamplingMetadata((command, response)) => { let metadata = self @@ -683,11 +682,15 @@ impl NodeWorker { } } -//type WorkerChannel = BChannel; - enum WorkerMessage { NewConnection(MessagePort), Command(NodeCommandWithChannel), + ResponseChannel( + ( + ClientId, + BoxFuture<'static, Result>, + ), + ), } #[derive(Debug)] @@ -737,22 +740,26 @@ impl WorkerConnector { let local_tx = near_tx.clone(); spawn_local(async move { let message_data = ev.data(); - let data: NodeCommand = from_value(message_data).expect("could not from value"); - - let (tx, rx) = oneshot::channel(); - let command_with_channel = todo!(); + let Ok(node_command) = from_value::(message_data) else { + warn!("could not deserialize message from client {client_id}"); + return; + }; + let (command_with_channel, response_channel) = + node_command.add_response_channel(); local_tx .send(WorkerMessage::Command(command_with_channel)) .await .expect("send3 err"); - let response = rx.await.expect("forwardding channel error"); - let v = to_value(&response).expect("could not to_value"); - - self.ports[client_id] - .post_message(&v) - .expect("error posting"); + // TODO: something cleaner? + local_tx + .send(WorkerMessage::ResponseChannel(( + ClientId(client_id), + response_channel, + ))) + .await + .expect("send4 err"); }) }); port.set_onmessage(Some(client_message_callback.as_ref().unchecked_ref())); @@ -803,8 +810,12 @@ pub async fn run_worker(queued_connections: Vec) { }; trace!("received: {command_with_channel:?}"); - let response = worker.process_command(command_with_channel).await; - //connector.respond_to(client, response); + worker.process_command(command_with_channel).await; + } + WorkerMessage::ResponseChannel((client_id, channel)) => { + // XXX: properly + let response = channel.await.expect("forwardding channel error"); + connector.respond_to(client_id, response); } } }