diff --git a/conmon-rs/server/src/container_io.rs b/conmon-rs/server/src/container_io.rs index ff8f5c260f..fdcc53af7a 100644 --- a/conmon-rs/server/src/container_io.rs +++ b/conmon-rs/server/src/container_io.rs @@ -82,7 +82,7 @@ pub enum ContainerIOType { } /// A message to be sent through the ContainerIO. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq, Eq)] pub enum Message { Data(Vec), Done, diff --git a/conmon-rs/server/src/streams.rs b/conmon-rs/server/src/streams.rs index 5ffcd74f41..ffb845ae8a 100644 --- a/conmon-rs/server/src/streams.rs +++ b/conmon-rs/server/src/streams.rs @@ -6,17 +6,16 @@ use crate::{ container_log::SharedContainerLog, }; use anyhow::Result; -use getset::{Getters, MutGetters}; +use getset::Getters; use std::os::unix::io::AsRawFd; use tokio::{ process::{ChildStderr, ChildStdin, ChildStdout}, - sync::mpsc, + sync::mpsc::{self, UnboundedReceiver, UnboundedSender}, task, }; use tracing::{debug, debug_span, error, Instrument}; -#[derive(Debug, Getters, MutGetters)] -#[getset(get)] +#[derive(Debug, Getters)] pub struct Streams { #[getset(get = "pub")] logger: SharedContainerLog, @@ -24,17 +23,15 @@ pub struct Streams { #[getset(get = "pub")] attach: SharedContainerAttach, - #[getset(get = "pub")] - pub message_rx_stdout: mpsc::UnboundedReceiver, + pub message_rx_stdout: UnboundedReceiver, #[getset(get = "pub")] - message_tx_stdout: mpsc::UnboundedSender, + message_tx_stdout: UnboundedSender, - #[getset(get = "pub")] - pub message_rx_stderr: mpsc::UnboundedReceiver, + pub message_rx_stderr: UnboundedReceiver, #[getset(get = "pub")] - message_tx_stderr: mpsc::UnboundedSender, + message_tx_stderr: UnboundedSender, } impl Streams { @@ -110,3 +107,64 @@ impl Streams { } } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::{attach::SharedContainerAttach, container_log::ContainerLog}; + use anyhow::{bail, Context}; + use std::{process::Stdio, str::from_utf8}; + use tokio::process::Command; + + fn msg_string(message: Message) -> Result { + match message { + Message::Data(v) => Ok(from_utf8(&v)?.into()), + _ => bail!("no data in message"), + } + } + + #[tokio::test(flavor = "multi_thread")] + async fn new_success() -> Result<()> { + let logger = ContainerLog::new(); + let attach = SharedContainerAttach::default(); + + let mut sut = Streams::new(logger, attach)?; + + let expected = "hello world"; + let mut child = Command::new("echo") + .arg("-n") + .arg(expected) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn()?; + + sut.handle_stdio_receive(child.stdin.take(), child.stdout.take(), child.stderr.take()); + + let msg = sut + .message_rx_stdout + .recv() + .await + .context("no message on stdout")?; + + assert_eq!(msg_string(msg)?, expected); + + let msg = sut + .message_rx_stdout + .recv() + .await + .context("no message on stdout")?; + assert_eq!(msg, Message::Done); + assert!(sut.message_rx_stdout.try_recv().is_err()); + + let msg = sut + .message_rx_stderr + .recv() + .await + .context("no message on stderr")?; + assert_eq!(msg, Message::Done); + assert!(sut.message_rx_stderr.try_recv().is_err()); + + Ok(()) + } +}