Skip to content

Commit

Permalink
f
Browse files Browse the repository at this point in the history
  • Loading branch information
scx1332 committed Mar 19, 2024
1 parent 0b2aaf0 commit cd30432
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 44 deletions.
73 changes: 46 additions & 27 deletions crates/erc20_payment_lib/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ impl PaymentRuntime {
fn start_service_loop(
&self,
signer_address: Address,
netr
chain_id: i64,
notify: Arc<Notify>,
extra_testing: Option<ExtraOptionsForTesting>,
options: AdditionalOptions,
Expand Down Expand Up @@ -463,6 +463,7 @@ impl PaymentRuntime {
} else {
service_loop(
shared_state_clone,
chain_id,
signer_address,
notify,
&conn,
Expand Down Expand Up @@ -570,31 +571,45 @@ impl PaymentRuntime {
}

fn get_and_remove_tasks(&self) -> Vec<JoinHandle<()>> {
self.shared_state
.lock()
.unwrap()
.accounts
.iter_mut()
.filter_map(|a| a.jh.lock().unwrap().take())
.collect()
let mut task_handles = Vec::new();
let mut lock_shared_state = self.shared_state.lock().unwrap();

//this shouldn't end in deadlock. It just extracts all handles and removes them from the lists
for account in lock_shared_state.accounts.iter_mut() {
for jh in account.jh.lock().unwrap().iter_mut() {
if let Some(jh) = jh.take() {
task_handles.push(jh);
}
}
}

task_handles
}

pub fn is_any_task_running(&self) -> bool {
self.shared_state.lock().unwrap().accounts.iter().any(|a| {
a.jh.lock()
.unwrap()
.as_ref()
.is_some_and(|jh| !jh.is_finished())
})
let lock_shared_state = self.shared_state.lock().unwrap();

for account in lock_shared_state.accounts.iter() {
for jh in account.jh.lock().unwrap().iter().flatten() {
if !jh.is_finished() {
return true;
}
}
}
false
}

pub fn is_any_task_finished(&self) -> bool {
self.shared_state.lock().unwrap().accounts.iter().any(|a| {
a.jh.lock()
.unwrap()
.as_ref()
.is_some_and(|jh| jh.is_finished())
})
let lock_shared_state = self.shared_state.lock().unwrap();

for account in lock_shared_state.accounts.iter() {
for jh in account.jh.lock().unwrap().iter().flatten() {
if jh.is_finished() {
return true;
}
}
}
false
}

pub async fn join_tasks(&self) -> Result<(), JoinError> {
Expand Down Expand Up @@ -635,13 +650,17 @@ impl PaymentRuntime {
log::error!("Account already added: {}", payment_account);
return false;
}
let jh = self.start_service_loop(
payment_account.address,
self.wake.clone(),
extra_testing,
options,
);
*payment_account.jh.lock().unwrap() = Some(jh);
for chain_id in self.chains() {
let jh = self.start_service_loop(
payment_account.address,
chain_id,
self.wake.clone(),
extra_testing.clone(),
options.clone(),
);
payment_account.jh.lock().as_mut().unwrap().push(Some(jh));

}
sh.accounts.push(payment_account);

true
Expand Down
24 changes: 13 additions & 11 deletions crates/erc20_payment_lib/src/sender/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ pub async fn update_tx_result(

pub async fn process_transactions(
signer_account: &SignerAccount,
chain_id: i64,
event_sender: Option<tokio::sync::mpsc::Sender<DriverEvent>>,
shared_state: Arc<std::sync::Mutex<SharedState>>,
conn: &SqlitePool,
Expand All @@ -265,12 +266,11 @@ pub async fn process_transactions(

let mut current_wait_time_no_gas_token: f64 = 0.0;
loop {
let mut transactions =
get_next_transactions_to_process(conn, Some(signer_account.address), 1)
.await
.map_err(err_from!())?;
let mut trans_option = get_next_transaction(conn, chain_id, &format!("{:#x}", signer_account.address))
.await
.map_err(err_from!())?;

let Some(tx) = transactions.get_mut(0) else {
let Some(tx) = trans_option.as_mut() else {
log::debug!("No transactions to process, breaking from loop");
break;
};
Expand Down Expand Up @@ -467,6 +467,7 @@ async fn sleep_for_gather_time_or_report_alive(

pub async fn service_loop(
shared_state: Arc<std::sync::Mutex<SharedState>>,
chain_id: i64,
account: Address,
wake: Arc<tokio::sync::Notify>,
conn: &SqlitePool,
Expand All @@ -487,12 +488,12 @@ pub async fn service_loop(
let metric_label_gather_post = "erc20_payment_lib.service_loop.gather_post";
let metric_label_gather_post_error = "erc20_payment_lib.service_loop.gather_post_error";
//let metric_label_loop_duration = "erc20_payment_lib.service_loop.loop_duration";
metrics::counter!(metric_label_start, 0);
metrics::counter!(metric_label_process_allowance, 0);
metrics::counter!(metric_label_gather_pre, 0);
metrics::counter!(metric_label_gather_pre_error, 0);
metrics::counter!(metric_label_gather_post, 0);
metrics::counter!(metric_label_gather_post_error, 0);
metrics::counter!(metric_label_start, 0, "chain_id" => chain_id.to_string());
metrics::counter!(metric_label_process_allowance, 0, "chain_id" => chain_id.to_string());
metrics::counter!(metric_label_gather_pre, 0, "chain_id" => chain_id.to_string());
metrics::counter!(metric_label_gather_pre_error, 0, "chain_id" => chain_id.to_string());
metrics::counter!(metric_label_gather_post, 0, "chain_id" => chain_id.to_string());
metrics::counter!(metric_label_gather_post_error, 0, "chain_id" => chain_id.to_string());

let mut process_tx_needed;
let mut last_stats_time: Option<Instant> = None;
Expand Down Expand Up @@ -525,6 +526,7 @@ pub async fn service_loop(
log::warn!("Skipping processing transactions...");
} else if let Err(e) = process_transactions(
&signer_account,
chain_id,
event_sender.clone(),
shared_state.clone(),
conn,
Expand Down
15 changes: 9 additions & 6 deletions crates/erc20_payment_lib/src/signer/account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub struct SignerAccount {
pub signer: Arc<Box<dyn Signer + Send + Sync>>,
pub(crate) external_gather_time: Arc<Mutex<Option<DateTime<Utc>>>>,
#[serde(skip)]
pub(crate) jh: Arc<Mutex<Option<JoinHandle<()>>>>,
pub(crate) jh: Arc<Mutex<Vec<Option<JoinHandle<()>>>>>,
}

impl Debug for SignerAccount {
Expand All @@ -38,17 +38,20 @@ impl SignerAccount {
address,
signer,
external_gather_time: Arc::new(Mutex::new(None)),
jh: Arc::new(Mutex::new(None)),
jh: Arc::new(Mutex::new(Vec::new())),
}
}

pub fn is_active(&self) -> bool {
let jh_guard = self.jh.lock().unwrap();
if let Some(jh_guard) = jh_guard.as_ref() {
!jh_guard.is_finished()
} else {
false
for jh in jh_guard.iter() {
if let Some(jh) = (*jh).as_ref() {
if !jh.is_finished() {
return true;
}
}
}
false
}

pub async fn check_if_sign_possible(&self) -> Result<(), PaymentError> {
Expand Down
20 changes: 20 additions & 0 deletions crates/erc20_payment_lib_common/src/db/ops/tx_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,26 @@ pub const TRANSACTION_ORDER_BY_ID_AND_REPLACEMENT_ID: &str = "orig_tx_id DESC,id
pub const TRANSACTION_ORDER_BY_CREATE_DATE: &str = "created_date ASC";
pub const TRANSACTION_ORDER_BY_FIRST_PROCESSED_DATE_DESC: &str = "first_processed DESC";

pub async fn get_next_transaction<'c, E>(
executor: E,
chain_id: i64,
account: &str,
) -> Result<Option<TxDbObj>, sqlx::Error>
where
E: Executor<'c, Database = Sqlite>,
{
let res = sqlx::query_as::<_, TxDbObj>(
r"SELECT * FROM tx
WHERE chain_id = $1 AND from_addr = $2 AND processing > 0 AND first_processed IS NULL
ORDER BY id ASC LIMIT 1",
)
.bind(chain_id)
.bind(account)
.fetch_optional(executor)
.await?;
Ok(res)
}

pub async fn get_transactions<'c, E>(
executor: E,
account: Option<Address>,
Expand Down

0 comments on commit cd30432

Please sign in to comment.