Skip to content

Commit

Permalink
apply review
Browse files Browse the repository at this point in the history
this commit only purpose is to make review easy
will be squashed before merging
  • Loading branch information
lorbax committed Jul 22, 2024
1 parent bbfdc23 commit ac836f4
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 58 deletions.
2 changes: 1 addition & 1 deletion roles/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@
{
"type": "match_message_type",
"value": "0x1b",
"condition": "WaitUntil"
"condition": {"WaitUntil": 60}
}
],
"actiondoc": "Sends NewExtendedMiningJob and SetNewPrevHash and waits until a SubmitsShareExtended is submitted"
Expand Down
125 changes: 73 additions & 52 deletions utils/message-generator/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use binary_sv2::Serialize;
use codec_sv2::{StandardEitherFrame as EitherFrame, Sv2Frame};
use roles_logic_sv2::parsers::{self, AnyMessage};
use std::{collections::HashMap, convert::TryInto, sync::Arc};
use tokio::{pin, time::sleep};

use tracing::{debug, error, info};

Expand Down Expand Up @@ -200,67 +201,87 @@ impl Executor {
);

match result {
ActionResult::MatchMessageType(message_type, condition) => {
match condition {
None => {
let message = match recv.recv().await {
Ok(message) => message,
Err(_) => {
success = false;
error!("Connection closed before receiving the message");
break;
}
};

let message: Sv2Frame<AnyMessage<'static>, _> =
message.try_into().unwrap();
debug!("RECV {:#?}", message);
let header = message.get_header().unwrap();

if header.msg_type() != *message_type {
error!(
"WRONG MESSAGE TYPE expected: {} received: {}",
message_type,
header.msg_type()
);
ActionResult::MatchMessageType(message_type, condition) => match condition {
None => {
let message = match recv.recv().await {
Ok(message) => message,
Err(_) => {
success = false;
error!("Connection closed before receiving the message");
break;
} else {
info!("MATCHED MESSAGE TYPE {}", message_type);
}
};

let message: Sv2Frame<AnyMessage<'static>, _> =
message.try_into().unwrap();
debug!("RECV {:#?}", message);
let header = message.get_header().unwrap();

if header.msg_type() != *message_type {
error!(
"WRONG MESSAGE TYPE expected: {} received: {}",
message_type,
header.msg_type()
);
success = false;
break;
} else {
info!("MATCHED MESSAGE TYPE {}", message_type);
}
Some(condition_inner) => {
match condition_inner {
Condition::WaitUntil => loop {
let message = match recv.recv().await {
Ok(message) => message,
Err(_) => {
success = false;
error!("Connection closed before receiving the message");
}
Some(condition_inner) => {
match condition_inner {
Condition::WaitUntil(secs) => {
let duration = std::time::Duration::from_secs(*secs as u64);
let timer = sleep(duration);
pin!(timer);
let async_block = async {
loop {
let message = recv.recv().await;
let message = match message {
Ok(message) => message,
Err(_) => {
success = false;
error!("Connection closed before receiving the message");
break;
}
};
let message: Sv2Frame<AnyMessage<'static>, _> =
message.try_into().unwrap();
debug!("RECV {:#?}", message);
let header = match message.get_header() {
Some(header_) => header_,
None => {
error!("Failed to get message header");
success = false;
break;
}
};
if header.msg_type() != *message_type {
info!(
"RECEIVED {}, WAITING MESSAGE TYPE {}",
header.msg_type(),
message_type
);
continue;
} else {
break;
}
};

let message: Sv2Frame<AnyMessage<'static>, _> =
message.try_into().unwrap();
debug!("RECV {:#?}", message);
let header = message.get_header().unwrap();
if header.msg_type() != *message_type {
info!(
"RECEIVED {}, WAITING MESSAGE TYPE {}",
header.msg_type(),
message_type
);
continue;
} else {
info!("MATCHED WAITED MESSAGE TYPE {}", message_type);
}
};
tokio::select! {
_ = &mut timer => {
error!("Timer has elapsed before wanted message has arrived");
break;
},
_ = async_block => {
info!("MATCHED WAITED MESSAGE TYPE {}", message_type);
}
},
};
}
}
}
};
}
}
},
ActionResult::MatchMessageField((
subprotocol,
message_type,
Expand Down
5 changes: 3 additions & 2 deletions utils/message-generator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,15 +180,16 @@ pub struct SaveField {
keyword: String,
}

// enum as a placeholder for new potential conditions to be implemented in the future
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub enum Condition {
WaitUntil,
WaitUntil(u32), // timeout in seconds
}

impl Display for Condition {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
Condition::WaitUntil => write!(f, "WaitUntil"),
Condition::WaitUntil(secs) => write!(f, "WaitUntil for {} seconds", secs),
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions utils/message-generator/src/parser/actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ impl Sv2ActionParser {
serde_json::from_value::<Condition>(condition_inner.clone())
.unwrap();
match condition_ {
Condition::WaitUntil => {
Condition::WaitUntil(secs) => {
action_results.push(ActionResult::MatchMessageType(
message_type,
Some(Condition::WaitUntil),
Some(Condition::WaitUntil(secs)),
))
}
}
Expand Down

0 comments on commit ac836f4

Please sign in to comment.