Custom transactions from JDC removed from mempool during update #868

Closed
59 changes: 57 additions & 2 deletions roles/jd-server/src/lib/job_declarator/message_handler.rs
@@ -7,14 +7,17 @@ use roles_logic_sv2::{
ProvideMissingTransactions, ProvideMissingTransactionsSuccess, SubmitSolutionJd,
},
parsers::JobDeclaration,
utils::Mutex,
};
use std::{convert::TryInto, io::Cursor};
use std::{convert::TryInto, io::Cursor, sync::Arc};
use stratum_common::bitcoin::{Transaction, Txid};
pub type SendTo = SendTo_<JobDeclaration<'static>, ()>;
use crate::mempool::JDsMempool;

use super::{signed_token, TransactionState};
use roles_logic_sv2::{errors::Error, parsers::PoolMessages as AllMessages};
use stratum_common::bitcoin::consensus::Decodable;
use tracing::info;
use tracing::{debug, info};

use super::JobDeclaratorDownstream;

@@ -38,6 +41,53 @@ impl JobDeclaratorDownstream {
}
}

pub fn clear_declared_mining_job(
mining_job: DeclareMiningJob,
mempool: Arc<Mutex<JDsMempool>>,
) -> Result<(), Error> {
// If there is an old declared mining job, remove its transactions from the mempool
// Retrieve necessary data from the old job
let transactions_to_remove = mining_job.tx_short_hash_list.inner_as_ref();
if transactions_to_remove.is_empty() {
info!("No transactions to remove from mempool");
return Ok(());
}

let nonce = mining_job.tx_short_hash_nonce;

for short_id in transactions_to_remove {
let result = mempool.safe_lock(|mempool_| -> Result<(), Error> {
// Instead of unwrapping, use .ok_or() to return the proper error
let short_ids_map = mempool_
Contributor:

Do we need to call mempool.safe_lock in each iteration? Couldn't we retrieve short_ids_map before the for loop and just use it?

Contributor Author:

This was feedback from lorban that I implemented and modified. If you have a better approach I can compare it with this one.

Collaborator:

@jbesraa is right.
The better approach is to get short_ids_map once and for all outside the loop, namely

    let short_ids_map = mempool.safe_lock(|a| a.to_short_ids(nonce).unwrap()).unwrap();

right before the

    for short_id in transactions_to_remove {
    ...
    }

loop. If you do not succeed in a reasonable time, ask me or @jbesraa for help.

Contributor Author:

That means I would still need to handle the double unwraps properly? @lorbax

Collaborator:

Yes, but I suggest writing the code as simply as possible first, testing it, then adding error handling, and then testing it deeply. You should check that the code you introduced does what it is supposed to do. In particular, the old job must be removed from the JDS mempool, and while updating the JDS mempool the fat transactions must not be discarded.
You can do that using testnet4 or a custom signet.
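
A rough sketch of the suggestion above with the unwraps replaced by error returns; this is illustrative only, not the code merged in this PR. It reuses names that already appear in the diff (safe_lock, to_short_ids, Error::JDSMissingTransactions, Error::PoisonLock) and assumes it sits inside clear_declared_mining_job, which returns Result<(), Error>:

    // Sketch: take the lock once to build the short-id map, then drop the listed txs.
    let short_ids_map = mempool
        .safe_lock(|m| m.to_short_ids(nonce))
        .map_err(|e| Error::PoisonLock(e.to_string()))?
        .ok_or(Error::JDSMissingTransactions)?;

    for short_id in transactions_to_remove {
        if let Some(tx_with_hash) = short_ids_map.get(short_id) {
            let txid = tx_with_hash.id;
            // Remove the (fat or thin) transaction from the JDS mempool.
            let _removed = mempool
                .safe_lock(|m| m.mempool.remove(&txid))
                .map_err(|e| Error::PoisonLock(e.to_string()))?;
        }
    }
    Ok(())

Locking once turns to_short_ids into a single call, at the cost of holding the mutex while the map is built, which is exactly the contention concern raised in the next comment.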

Collaborator:

I'm worried that to_short_ids is very expensive; I'm not sure it is really better to lock the mutex for all that time. By the way, we should have a benchmark for cases where there is contention for the lock.

Collaborator:

This should look into the newly received DeclareMiningJob and remove only the txs that are in the old one but not in the new one. Even better would be to have a refcount for each tx and decrement it every time the tx is in the old job and not in the new one (we have multiple downstreams). When the count reaches 0 we remove the tx.

Collaborator @Fi3 (Jul 23, 2024):

Something like that: pub mempool: HashMap<Txid, Option<(Transaction, u64)>>
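
A hypothetical sketch of that reference-counted layout, combined with the old-vs-new diffing suggested above. The field type is the one proposed in the comment; the release_tx helper and its name are illustrative and not part of this PR; hashbrown::HashMap is the map type already used in mempool/mod.rs:

    use hashbrown::HashMap;
    use stratum_common::bitcoin::{Transaction, Txid};

    /// Called once for each txid that appears in a downstream's old
    /// DeclareMiningJob but not in its newly received one.
    fn release_tx(mempool: &mut HashMap<Txid, Option<(Transaction, u64)>>, txid: &Txid) {
        let drop_entry = match mempool.get_mut(txid) {
            // Another downstream still declares this fat tx: just decrement.
            Some(Some((_, count))) if *count > 1 => {
                *count -= 1;
                false
            }
            // Last declaring downstream dropped it: the fat tx can go.
            Some(Some(_)) => true,
            // Thin entry (None) or unknown txid: leave it to update_mempool.
            _ => false,
        };
        if drop_entry {
            mempool.remove(txid);
        }
    }

Symmetrically, the count would be incremented whenever a newly declared job references a tx that is already fat, so an entry is removed only once no downstream references it.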

Collaborator @plebhash (Aug 13, 2024):

> This should look into the newly received DeclareMiningJob and remove only the txs that are in the old one but not in the new one. Even better would be to have a refcount for each tx and decrement it every time the tx is in the old job and not in the new one (we have multiple downstreams). When the count reaches 0 we remove the tx.

I implemented this suggestion here: plebhash@d5ec026

Contributor @jbesraa (Aug 13, 2024):

> Something like that: pub mempool: HashMap<Txid, Option<(Transaction, u64)>>

Could we take advantage of the token or another property in JobDeclaratorDownstream in order to do that?

.to_short_ids(nonce)
.ok_or(Error::JDSMissingTransactions)?;
let transaction_with_hash = short_ids_map
.get(short_id);

match transaction_with_hash {
Some(transaction_with_hash) => {
let txid = transaction_with_hash.id;
match mempool_.mempool.remove(&txid) {
Some(transaction) => {
debug!("Fat transaction {:?} in job with request id {:?} removed from mempool", transaction, mining_job.request_id);
info!("Fat transaction {:?} in job with request id {:?} removed from mempool", txid, mining_job.request_id);
Contributor:

Can you remove this info log?

Contributor Author:

Okay, I will.

},
None => info!("Thin transaction {:?} in job with request id {:?} removed from mempool", txid, mining_job.request_id),
}
},
None => debug!("Transaction with short id {:?} not found in mempool while clearing old jobs", short_id),
}
Ok(()) // Explicitly return Ok(()) inside the closure for proper flow control
});

// Propagate any error from the closure
if let Err(err) = result {
return Err(Error::PoisonLock(err.to_string()));
}
}
Ok(())
}

impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream {
fn handle_allocate_mining_job_token(
&mut self,
Expand All @@ -61,6 +111,11 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream {
}

fn handle_declare_mining_job(&mut self, message: DeclareMiningJob) -> Result<SendTo, Error> {
// Clone the old declared mining job to retain its data
if let Some(old_declare_mining_job_) = self.declared_mining_job.0.clone() {
clear_declared_mining_job(old_declare_mining_job_, self.mempool.clone())?;
}

// the transactions that are present in the mempool are stored here, that is sent to the
// mempool which use the rpc client to retrieve the whole data for each transaction.
// The unknown transactions is a vector that contains the transactions that are not in the
39 changes: 20 additions & 19 deletions roles/jd-server/src/lib/mempool/mod.rs
@@ -7,7 +7,7 @@ use hashbrown::HashMap;
use roles_logic_sv2::utils::Mutex;
use rpc_sv2::{mini_rpc_client, mini_rpc_client::RpcError};
use std::{convert::TryInto, str::FromStr, sync::Arc};
use stratum_common::{bitcoin, bitcoin::hash_types::Txid};
use stratum_common::bitcoin::{self, hash_types::Txid};

#[derive(Clone, Debug)]
pub struct TransactionWithHash {
@@ -95,31 +95,32 @@ impl JDsMempool {
}

pub async fn update_mempool(self_: Arc<Mutex<Self>>) -> Result<(), JdsMempoolError> {
let mut mempool_ordered: HashMap<Txid, Option<Transaction>> = HashMap::new();

let client = self_
.safe_lock(|x| x.get_client())?
.ok_or(JdsMempoolError::NoClient)?;

let mempool: Vec<String> = client.get_raw_mempool().await?;
for id in &mempool {
let node_mempool: Vec<String> = client.get_raw_mempool().await?;
let mut node_mempool_deserialized: Vec<Txid> = vec![];
for id in &node_mempool {
let key_id = Txid::from_str(id)
.map_err(|err| JdsMempoolError::Rpc(RpcError::Deserialization(err.to_string())))?;

let tx = self_.safe_lock(|x| match x.mempool.get(&key_id) {
Some(entry) => entry.clone(),
None => None,
})?;

mempool_ordered.insert(key_id, tx);
node_mempool_deserialized.push(key_id);
}

if mempool_ordered.is_empty() {
Err(JdsMempoolError::EmptyMempool)
} else {
let _ = self_.safe_lock(|x| x.mempool = mempool_ordered);
Ok(())
}
self_.safe_lock(|x| {
let jds_mempool = &mut x.mempool;
// the fat transactions in the jds-mempool are those declared by some downstream and we
// don't want to remove them, but we can get rid of the others
jds_mempool.retain(|_, val| val.is_some());
// here we add all the new transactions
for id in &node_mempool_deserialized {
jds_mempool.entry(*id).or_insert(None);
}
if jds_mempool.is_empty() {
Err(JdsMempoolError::EmptyMempool)
} else {
Ok(())
}
})?
}

pub async fn on_submit(self_: Arc<Mutex<Self>>) -> Result<(), JdsMempoolError> {