Skip to content

Commit

Permalink
multi-party comm channel
Browse files Browse the repository at this point in the history
  • Loading branch information
robinhundt committed May 15, 2024
1 parent 64a3fad commit 8ab1c3c
Show file tree
Hide file tree
Showing 18 changed files with 688 additions and 119 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/seec-channel/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ async-stream = "0.3.5"
bytes = "1.6.0"
futures = "0.3.30"
pin-project = "1.1.5"
serde = { version = "1.0.197" }
serde = { version = "1.0.197" , features = ["derive"]}
erased-serde = "0.4.4"
thiserror = "1.0.58"
tokio = { version = "1.36.0", features = ["macros", "net"] }
Expand All @@ -36,6 +36,7 @@ criterion = { version = "0.5.1", features = ["async_tokio"] }
serde = { version = "1.0.197", features = ["derive"] }
serde_json = "1.0.114"
tokio = { version = "1.36.0", features = ["rt-multi-thread", "time"] }
tracing-subscriber = { version = "0.3.18", features = ["env-filter"]}


[[bench]]
Expand Down
110 changes: 64 additions & 46 deletions crates/seec-channel/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
//! Channel abstraction for communication
use crate::util::{Counter, TrackingReader, TrackingWriter};
use async_trait::async_trait;

use remoc::rch::{base, mpsc};
use remoc::{codec, ConnectError, RemoteSend};
use serde::{Deserialize, Serialize};

use tokio::io;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::debug;

pub use seec_channel_macros::sub_channels_for;

pub mod in_memory;
pub mod multi;
pub mod tcp;
pub mod tls;
pub mod util;
Expand All @@ -27,29 +31,35 @@ pub type Channel<T> = (Sender<T>, Receiver<T>);
pub struct SyncMsg;

#[async_trait]
pub trait SenderT<T, E> {
async fn send(&mut self, item: T) -> Result<(), E>;
pub trait SenderT<T> {
type Error;
async fn send(&mut self, item: T) -> Result<(), Self::Error>;
}

#[async_trait]
pub trait ReceiverT<T, E> {
async fn recv(&mut self) -> Result<Option<T>, E>;
pub trait ReceiverT<T> {
type Error;
async fn recv(&mut self) -> Result<Option<T>, Self::Error>;
}

#[derive(thiserror::Error, Debug)]
pub enum CommunicationError {
#[error("Error sending initial value")]
BaseSend(base::SendErrorKind),
BaseSend(#[source] base::SendError<()>),
#[error("Error receiving value on base channel")]
BaseRecv(#[from] base::RecvError),
#[error("Error sending value on mpsc channel")]
Send(mpsc::SendError<()>),
Send(#[source] mpsc::SendError<()>),
#[error("Error receiving value on mpsc channel")]
Recv(#[from] mpsc::RecvError),
#[error("Error in Multi-Sender/Receiver")]
Multi(#[from] multi::Error),
#[error("Unexpected termination. Remote is closed.")]
RemoteClosed,
#[error("Received out of order message")]
UnexpectedMessage,
#[error("Unabel to establish multi-sub-channel with party {0}")]
MultiSubChannel(u32, #[source] Box<CommunicationError>),
}

pub fn channel<T: RemoteSend, const BUFFER: usize>(
Expand All @@ -65,62 +75,65 @@ pub fn channel<T: RemoteSend, const BUFFER: usize>(
}

#[tracing::instrument(skip_all)]
pub async fn sub_channel<Msg, SubMsg, SendErr, RecvErr>(
sender: &mut impl SenderT<Msg, SendErr>,
receiver: &mut impl ReceiverT<Msg, RecvErr>,
pub async fn sub_channel<S, R, Msg, SubMsg>(
sender: &mut S,
receiver: &mut R,
local_buffer: usize,
) -> Result<(Sender<SubMsg>, Receiver<SubMsg>), CommunicationError>
where
Receiver<SubMsg>: Into<Msg>,
Msg: Into<Option<Receiver<SubMsg>>> + RemoteSend,
S: SenderT<Msg>,
R: ReceiverT<Msg>,
Sender<SubMsg>: Into<Msg>,
Msg: Into<Option<Sender<SubMsg>>> + RemoteSend,
SubMsg: RemoteSend,
CommunicationError: From<SendErr> + From<RecvErr>,
CommunicationError: From<S::Error> + From<R::Error>,
{
tracing::debug!("Establishing new sub_channel");
let (sub_sender, remote_sub_receiver) = channel(local_buffer);
sender.send(remote_sub_receiver.into()).await?;
tracing::debug!("Sent remote_sub_receiver");
debug!("Establishing new sub_channel");
let (remote_sub_sender, sub_receiver) = channel(local_buffer);
sender.send(remote_sub_sender.into()).await?;
debug!("Sent remote_sub_receiver");
let msg = receiver
.recv()
.await?
.ok_or(CommunicationError::RemoteClosed)?;
let sub_receiver = msg.into().ok_or(CommunicationError::UnexpectedMessage)?;
tracing::debug!("Received sub_receiver");
let sub_sender = msg.into().ok_or(CommunicationError::UnexpectedMessage)?;
debug!("Received sub_receiver");
Ok((sub_sender, sub_receiver))
}

#[tracing::instrument(skip_all)]
pub async fn sub_channel_with<Msg, SubMsg, SendErr, RecvErr>(
sender: &mut impl SenderT<Msg, SendErr>,
receiver: &mut impl ReceiverT<Msg, RecvErr>,
pub async fn sub_channel_with<S, R, Msg, SubMsg>(
sender: &mut S,
receiver: &mut R,
local_buffer: usize,
wrap_fn: impl FnOnce(Receiver<SubMsg>) -> Msg,
extract_fn: impl FnOnce(Msg) -> Option<Receiver<SubMsg>>,
wrap_fn: impl FnOnce(Sender<SubMsg>) -> Msg,
extract_fn: impl FnOnce(Msg) -> Option<Sender<SubMsg>>,
) -> Result<(Sender<SubMsg>, Receiver<SubMsg>), CommunicationError>
where
S: SenderT<Msg>,
R: ReceiverT<Msg>,
Msg: RemoteSend,
SubMsg: RemoteSend,
CommunicationError: From<SendErr> + From<RecvErr>,
CommunicationError: From<S::Error> + From<R::Error>,
{
tracing::debug!("Establishing new sub_channel");
let (sub_sender, remote_sub_receiver) = channel(local_buffer);
sender.send(wrap_fn(remote_sub_receiver)).await?;
tracing::debug!("Sent remote_sub_receiver");
debug!("Establishing new sub_channel");
let (remote_sub_sender, sub_receiver) = channel(local_buffer);
sender.send(wrap_fn(remote_sub_sender)).await?;
debug!("Sent remote_sub_receiver");
let msg = receiver
.recv()
.await?
.ok_or(CommunicationError::RemoteClosed)?;
let sub_receiver = extract_fn(msg).ok_or(CommunicationError::UnexpectedMessage)?;
tracing::debug!("Received sub_receiver");
let sub_sender = extract_fn(msg).ok_or(CommunicationError::UnexpectedMessage)?;
debug!("Received sub_receiver");
Ok((sub_sender, sub_receiver))
}

pub async fn sync<SendErr, RecvErr>(
sender: &mut impl SenderT<SyncMsg, SendErr>,
receiver: &mut impl ReceiverT<SyncMsg, RecvErr>,
) -> Result<(), CommunicationError>
pub async fn sync<S, R>(sender: &mut S, receiver: &mut R) -> Result<(), CommunicationError>
where
CommunicationError: From<SendErr> + From<RecvErr>,
S: SenderT<SyncMsg>,
R: ReceiverT<SyncMsg>,
CommunicationError: From<S::Error> + From<R::Error>,
{
sender.send(SyncMsg).await?;
// ignore receiving a None
Expand All @@ -132,54 +145,56 @@ where
}

#[async_trait]
impl<T, Codec> SenderT<T, base::SendError<T>> for base::Sender<T, Codec>
impl<T, Codec> SenderT<T> for base::Sender<T, Codec>
where
T: RemoteSend,
Codec: codec::Codec,
{
async fn send(&mut self, item: T) -> Result<(), base::SendError<T>> {
type Error = base::SendError<T>;
async fn send(&mut self, item: T) -> Result<(), Self::Error> {
base::Sender::send(self, item).await
}
}

#[async_trait]
impl<T, Codec> ReceiverT<T, base::RecvError> for base::Receiver<T, Codec>
impl<T, Codec> ReceiverT<T> for base::Receiver<T, Codec>
where
T: RemoteSend,
Codec: codec::Codec,
{
async fn recv(&mut self) -> Result<Option<T>, base::RecvError> {
type Error = base::RecvError;
async fn recv(&mut self) -> Result<Option<T>, Self::Error> {
base::Receiver::recv(self).await
}
}

#[async_trait]
impl<T, Codec, const BUFFER: usize> SenderT<T, mpsc::SendError<T>>
for mpsc::Sender<T, Codec, BUFFER>
impl<T, Codec, const BUFFER: usize> SenderT<T> for mpsc::Sender<T, Codec, BUFFER>
where
T: RemoteSend,
Codec: codec::Codec,
{
async fn send(&mut self, item: T) -> Result<(), mpsc::SendError<T>> {
type Error = mpsc::SendError<T>;
async fn send(&mut self, item: T) -> Result<(), Self::Error> {
mpsc::Sender::send(self, item).await
}
}

#[async_trait]
impl<T, Codec, const BUFFER: usize> ReceiverT<T, mpsc::RecvError>
for mpsc::Receiver<T, Codec, BUFFER>
impl<T, Codec, const BUFFER: usize> ReceiverT<T> for mpsc::Receiver<T, Codec, BUFFER>
where
T: RemoteSend,
Codec: codec::Codec,
{
async fn recv(&mut self) -> Result<Option<T>, mpsc::RecvError> {
type Error = mpsc::RecvError;
async fn recv(&mut self) -> Result<Option<T>, Self::Error> {
mpsc::Receiver::recv(self).await
}
}

impl<T> From<base::SendError<T>> for CommunicationError {
fn from(err: base::SendError<T>) -> Self {
CommunicationError::BaseSend(err.kind)
CommunicationError::BaseSend(err.without_item())
}
}

Expand Down Expand Up @@ -216,7 +231,10 @@ where
8096,
)
.await?;

tokio::spawn(conn);

debug!("Established remoc connection");

Ok((tx, bytes_written, rx, bytes_read))
}
Loading

0 comments on commit 8ab1c3c

Please sign in to comment.