Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add listen timeouts to iroha cli #5241

Merged
merged 20 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/iroha_cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ serde = { workspace = true }
serde_json = { workspace = true }
erased-serde = "0.4.5"
supports-color = { workspace = true }
tokio = { workspace = true, features = ["rt"] }
futures = { workspace = true }

[build-dependencies]
vergen = { version = "8.3.1", default-features = false }
Expand Down
110 changes: 88 additions & 22 deletions crates/iroha_cli/src/main.rs
aoyako marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@ use std::{
io::{stdin, stdout},
path::PathBuf,
str::FromStr,
time::Duration,
};

use erased_serde::Serialize;
use error_stack::{fmt::ColorMode, IntoReportCompat, ResultExt};
use eyre::{eyre, Error, Result, WrapErr};
use futures::TryStreamExt;
use iroha::{client::Client, config::Config, data_model::prelude::*};
use iroha_primitives::json::Json;
use thiserror::Error;
use tokio::runtime::Runtime;

/// Re-usable clap `--metadata <PATH>` (`-m`) argument.
/// Should be combined with `#[command(flatten)]` attr.
Expand Down Expand Up @@ -100,7 +103,6 @@ enum Subcommand {
#[clap(subcommand)]
Peer(peer::Args),
/// The subcommand related to event streaming
#[clap(subcommand)]
Events(events::Args),
/// The subcommand related to Wasm
Wasm(wasm::Args),
Expand Down Expand Up @@ -305,9 +307,18 @@ mod events {

use super::*;

#[derive(clap::Args, Debug, Clone, Copy)]
pub struct Args {
/// Wait timeout
#[clap(short, long, global = true)]
timeout: Option<humantime::Duration>,
aoyako marked this conversation as resolved.
Show resolved Hide resolved
#[clap(subcommand)]
command: Command,
}

/// Get event stream from Iroha peer
#[derive(clap::Subcommand, Debug, Clone, Copy)]
pub enum Args {
enum Command {
/// Gets block pipeline events
BlockPipeline,
/// Gets transaction pipeline events
Expand All @@ -322,24 +333,53 @@ mod events {

impl RunArgs for Args {
fn run(self, context: &mut dyn RunContext) -> Result<()> {
match self {
Args::TransactionPipeline => listen(TransactionEventFilter::default(), context),
Args::BlockPipeline => listen(BlockEventFilter::default(), context),
Args::Data => listen(DataEventFilter::Any, context),
Args::ExecuteTrigger => listen(ExecuteTriggerEventFilter::new(), context),
Args::TriggerCompleted => listen(TriggerCompletedEventFilter::new(), context),
let timeout: Option<Duration> = self.timeout.map(Into::into);

match self.command {
Command::TransactionPipeline => {
listen(TransactionEventFilter::default(), context, timeout)
}
Command::BlockPipeline => listen(BlockEventFilter::default(), context, timeout),
Command::Data => listen(DataEventFilter::Any, context, timeout),
Command::ExecuteTrigger => {
listen(ExecuteTriggerEventFilter::new(), context, timeout)
}
Command::TriggerCompleted => {
listen(TriggerCompletedEventFilter::new(), context, timeout)
}
}
}
}

fn listen(filter: impl Into<EventFilterBox>, context: &mut dyn RunContext) -> Result<()> {
fn listen(
filter: impl Into<EventFilterBox>,
context: &mut dyn RunContext,
timeout: Option<Duration>,
) -> Result<()> {
let filter = filter.into();
let client = context.client_from_config();
eprintln!("Listening to events with filter: {filter:?}");
client
.listen_for_events([filter])
.wrap_err("Failed to listen for events.")?
.try_for_each(|event| context.print_data(&event?))?;

if let Some(timeout) = timeout {
eprintln!("Listening to events with filter: {filter:?} and timeout: {timeout:?}");
let rt = Runtime::new().wrap_err("Failed to create runtime.")?;
rt.block_on(async {
let mut stream = client
.listen_for_events_async([filter])
.await
.expect("Failed to listen for events.");
while let Ok(event) = tokio::time::timeout(timeout, stream.try_next()).await {
context.print_data(&event?)?;
}
eprintln!("Timeout period has expired.");
Result::<()>::Ok(())
})?;
} else {
eprintln!("Listening to events with filter: {filter:?}");
client
.listen_for_events([filter])
.wrap_err("Failed to listen for events.")?
.try_for_each(|event| context.print_data(&event?))?;
}
Ok(())
}
}
Expand All @@ -354,22 +394,47 @@ mod blocks {
pub struct Args {
/// Block height from which to start streaming blocks
height: NonZeroU64,

/// Wait timeout
#[clap(short, long)]
timeout: Option<humantime::Duration>,
}

impl RunArgs for Args {
fn run(self, context: &mut dyn RunContext) -> Result<()> {
let Args { height } = self;
listen(height, context)
let Args { height, timeout } = self;
let timeout: Option<Duration> = timeout.map(Into::into);
listen(height, context, timeout)
}
}

fn listen(height: NonZeroU64, context: &mut dyn RunContext) -> Result<()> {
fn listen(
height: NonZeroU64,
context: &mut dyn RunContext,
timeout: Option<Duration>,
) -> Result<()> {
let client = context.client_from_config();
eprintln!("Listening to blocks from height: {height}");
client
.listen_for_blocks(height)
.wrap_err("Failed to listen for blocks.")?
.try_for_each(|event| context.print_data(&event?))?;
if let Some(timeout) = timeout {
eprintln!("Listening to blocks from height: {height} and timeout: {timeout:?}");
let rt = Runtime::new().wrap_err("Failed to create runtime.")?;
rt.block_on(async {
let mut stream = client
.listen_for_blocks_async(height)
.await
.expect("Failed to listen for blocks.");
while let Ok(event) = tokio::time::timeout(timeout, stream.try_next()).await {
context.print_data(&event?)?;
}
eprintln!("Timeout period has expired.");
Result::<()>::Ok(())
})?;
} else {
eprintln!("Listening to blocks from height: {height}");
client
.listen_for_blocks(height)
.wrap_err("Failed to listen for blocks.")?
.try_for_each(|event| context.print_data(&event?))?;
}
Ok(())
}
}
Expand Down Expand Up @@ -1377,6 +1442,7 @@ mod multisig {
Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading
Loading