Skip to content

Commit

Permalink
Merge branch 'main' into rpc-middleware
Browse files Browse the repository at this point in the history
  • Loading branch information
dinhani-cw authored Jun 10, 2024
2 parents b12351c + c15a782 commit 8f347ac
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 24 deletions.
29 changes: 18 additions & 11 deletions src/eth/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ pub struct Consensus {
direct_peers: Vec<String>,
voted_for: Mutex<Option<PeerAddress>>, //essential to ensure that a server only votes once per term
current_term: AtomicU64,
last_arrived_block_number: AtomicU64, //TODO use a true index for both executions and blocks, currently we use something like Bully algorithm so block number is fine
last_arrived_block_number: AtomicU64,
role: RwLock<Role>,
heartbeat_timeout: Duration,
my_address: PeerAddress,
Expand All @@ -174,7 +174,7 @@ impl Consensus {
let (sender, receiver) = mpsc::channel::<Block>(32);
let receiver = Arc::new(Mutex::new(receiver));
let (broadcast_sender, _) = broadcast::channel(32);
let last_arrived_block_number = AtomicU64::new(storage.read_mined_block_number().await.unwrap_or(BlockNumber::from(0)).into());
let last_arrived_block_number = AtomicU64::new(std::u64::MAX); //we use the max value to ensure that only after receiving the first appendEntry we can start the consensus
let peers = Arc::new(RwLock::new(HashMap::new()));
let my_address = Self::discover_my_address(jsonrpc_address.port(), grpc_address.port());

Expand Down Expand Up @@ -221,6 +221,7 @@ impl Consensus {
/// to avoid starting an election too soon (due to the leader not being discovered yet)
fn initialize_heartbeat_timer(consensus: Arc<Consensus>) {
named_spawn("consensus::heartbeat_timer", async move {
Self::discover_peers(Arc::clone(&consensus)).await;
if consensus.peers.read().await.is_empty() {
tracing::info!("no peers, starting hearbeat timer immediately");
Self::start_election(Arc::clone(&consensus)).await;
Expand Down Expand Up @@ -323,6 +324,8 @@ impl Consensus {
async fn become_leader(&self) {
*self.role.write().await = Role::Leader;

self.last_arrived_block_number.store(std::u64::MAX, Ordering::SeqCst); //as leader, we don't have a last block number

//TODO XXX // Initialize leader-specific tasks such as sending appendEntries
//TODO XXX self.send_append_entries().await;
}
Expand Down Expand Up @@ -375,8 +378,8 @@ impl Consensus {
if consensus.is_leader().await {
tracing::info!(number = data.header.number.as_u64(), "received block to send to followers");

if let Err(e) = consensus.broadcast_sender.send(data) {
tracing::warn!("failed to broadcast block: {:?}", e);
if consensus.broadcast_sender.send(data).is_err() {
tracing::error!("failed to broadcast block");
}
}
}
Expand Down Expand Up @@ -447,7 +450,17 @@ impl Consensus {

//TODO for now the block number is the index, but it should be a separate index wiht the execution AND the block
pub async fn should_serve(&self) -> bool {
if self.is_leader().await {
return true;
}

let last_arrived_block_number = self.last_arrived_block_number.load(Ordering::SeqCst);

if last_arrived_block_number == std::u64::MAX {
tracing::warn!("no appendEntry has been received yet");
return false;
}

let storage_block_number: u64 = self.storage.read_mined_block_number().await.unwrap_or(BlockNumber::from(0)).into();

tracing::info!(
Expand All @@ -456,10 +469,6 @@ impl Consensus {
storage_block_number
);

if self.peers.read().await.len() == 0 {
return self.is_leader().await;
}

(last_arrived_block_number - 2) <= storage_block_number
}

Expand Down Expand Up @@ -524,7 +533,6 @@ impl Consensus {
GlobalState::shutdown_from("consensus", "failed to discover peers from Kubernetes");
}

// Optionally, sleep for a bit before retrying
sleep(Duration::from_millis(100)).await;
}
}
Expand Down Expand Up @@ -811,7 +819,7 @@ impl AppendEntryService for AppendEntryServiceImpl {
);
return Ok(Response::new(RequestVoteResponse {
term: current_term,
vote_granted: false, //XXX check how we are dealing with vote_granted false
vote_granted: false,
}));
}

Expand All @@ -822,7 +830,6 @@ impl AppendEntryService for AppendEntryServiceImpl {
}

let mut voted_for = consensus.voted_for.lock().await;
//XXX for some reason candidate_id is going wrong
let candidate_address = PeerAddress::from_string(request.candidate_id.clone()).unwrap(); //XXX FIXME replace with rpc error
if voted_for.is_none() {
*voted_for = Some(candidate_address.clone());
Expand Down
22 changes: 9 additions & 13 deletions src/eth/relayer/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,10 @@ impl ExternalRelayer {
let block: Block = row.payload.try_into()?;
let block_number = block.header.number;

tracing::debug!(?block_number, "relaying block");
tracing::info!(?block_number, "relaying block");

// fill span
let span = Span::current();
span.rec_str("block_number", &block_number);
Span::with(|s| s.rec_str("block_number", &block_number));

// TODO: Replace failed transactions with transactions that will for sure fail in substrate (need access to primary keys)
let dag = TransactionDag::new(block.transactions);
Expand Down Expand Up @@ -138,8 +137,7 @@ impl ExternalRelayer {
tracing::info!(?tx_hash, "comparing receipts");

// fill span
let span = Span::current();
span.rec_str("hash", &tx_hash);
Span::with(|s| s.rec_str("hash", &tx_hash));

let start = Instant::now();
let mut substrate_receipt = substrate_pending_transaction;
Expand Down Expand Up @@ -229,23 +227,22 @@ impl ExternalRelayer {
#[tracing::instrument(name = "external_relayer::relay_and_check_mempool", skip_all, fields(hash))]
pub async fn relay_and_check_mempool(&self, tx_mined: TransactionMined) -> (PendingTransaction, ExternalReceipt) {
let tx_hash = tx_mined.input.hash;
tracing::debug!(?tx_hash, "relaying transaction");
tracing::info!(?tx_hash, "relaying transaction");

// fill span
let span = Span::current();
span.rec_str("hash", &tx_hash);
Span::with(|s| s.rec_str("hash", &tx_hash));

let ethers_tx = Transaction::from(tx_mined.input.clone());
let tx = loop {
match self.substrate_chain.send_raw_transaction(tx_hash, ethers_tx.rlp()).await {
Ok(tx) => break tx,
Err(err) => {
tracing::debug!(
tracing::info!(
?tx_hash,
"substrate_chain.send_raw_transaction returned an error, checking if transaction was sent anyway"
);
if self.substrate_chain.fetch_transaction(tx_hash).await.unwrap_or(None).is_some() {
tracing::debug!(?tx_hash, "transaction found on substrate");
tracing::info!(?tx_hash, "transaction found on substrate");
return (PendingTransaction::new(tx_hash, &self.substrate_chain), ExternalReceipt(tx_mined.into()));
}
tracing::warn!(?tx_hash, ?err, "failed to send raw transaction, retrying...");
Expand Down Expand Up @@ -320,11 +317,10 @@ impl ExternalRelayerClient {
#[tracing::instrument(name = "external_relayer_client::send_to_relayer", skip_all, fields(block_number))]
pub async fn send_to_relayer(&self, block: Block) -> anyhow::Result<()> {
let block_number = block.header.number;
tracing::debug!(?block_number, "sending block to relayer");
tracing::info!(?block_number, "sending block to relayer");

// fill span
let span = Span::current();
span.rec_str("block_number", &block_number);
Span::with(|s| s.rec_str("block_number", &block_number));

sqlx::query!(
"INSERT INTO relayer_blocks (number, payload) VALUES ($1, $2)",
Expand Down

0 comments on commit 8f347ac

Please sign in to comment.