From cf4b1aed98eccec64fadc6f9bcfddbf43ecdf803 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Sun, 10 Dec 2023 18:16:48 -0500 Subject: [PATCH] Move published_signed_transaction to tributary/mod.rs to reduce the size of main.rs --- coordinator/src/db.rs | 16 ---------- coordinator/src/main.rs | 50 ++------------------------------ coordinator/src/tributary/db.rs | 25 ++++++++++++++-- coordinator/src/tributary/mod.rs | 46 +++++++++++++++++++++++++++-- 4 files changed, 69 insertions(+), 68 deletions(-) diff --git a/coordinator/src/db.rs b/coordinator/src/db.rs index 9e0c5d919..810bc2751 100644 --- a/coordinator/src/db.rs +++ b/coordinator/src/db.rs @@ -21,7 +21,6 @@ create_db!( HandledMessageDb: (network: NetworkId) -> u64, ActiveTributaryDb: () -> Vec, RetiredTributaryDb: (set: ValidatorSet) -> (), - SignedTransactionDb: (order: &[u8], nonce: u32) -> Vec, FirstPreprocessDb: ( network: NetworkId, id_type: RecognizedIdType, @@ -80,21 +79,6 @@ impl ActiveTributaryDb { } } -impl SignedTransactionDb { - pub fn take_signed_transaction( - txn: &mut impl DbTxn, - order: &[u8], - nonce: u32, - ) -> Option { - let res = SignedTransactionDb::get(txn, order, nonce) - .map(|bytes| Transaction::read(&mut bytes.as_slice()).unwrap()); - if res.is_some() { - Self::del(txn, order, nonce); - } - res - } -} - impl FirstPreprocessDb { pub fn save_first_preprocess( txn: &mut impl DbTxn, diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index a587ad6ed..840af4a2f 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -31,9 +31,7 @@ use tokio::{ time::sleep, }; -use ::tributary::{ - ProvidedError, TransactionKind, TransactionError, TransactionTrait, Block, Tributary, -}; +use ::tributary::{ProvidedError, TransactionKind, TransactionTrait, Block, Tributary}; mod tributary; use crate::tributary::{ @@ -128,48 +126,6 @@ async fn add_tributary( .unwrap(); } -async fn publish_signed_transaction( - txn: &mut D::Transaction<'_>, - tributary: &Tributary, - tx: Transaction, -) { - log::debug!("publishing transaction {}", hex::encode(tx.hash())); - - let (order, signer) = if let TransactionKind::Signed(order, signed) = tx.kind() { - let signer = signed.signer; - - // Safe as we should deterministically create transactions, meaning if this is already on-disk, - // it's what we're saving now - SignedTransactionDb::set(txn, &order, signed.nonce, &tx.serialize()); - - (order, signer) - } else { - panic!("non-signed transaction passed to publish_signed_transaction"); - }; - - // If we're trying to publish 5, when the last transaction published was 3, this will delay - // publication until the point in time we publish 4 - while let Some(tx) = SignedTransactionDb::take_signed_transaction( - txn, - &order, - tributary - .next_nonce(&signer, &order) - .await - .expect("we don't have a nonce, meaning we aren't a participant on this tributary"), - ) { - // We need to return a proper error here to enable that, due to a race condition around - // multiple publications - match tributary.add_transaction(tx.clone()).await { - Ok(_) => {} - // Some asynchonicity if InvalidNonce, assumed safe to deterministic nonces - Err(TransactionError::InvalidNonce) => { - log::warn!("publishing TX {tx:?} returned InvalidNonce. was it already added?") - } - Err(e) => panic!("created an invalid transaction: {e:?}"), - } - } -} - // TODO: Find a better pattern for this static HANDOVER_VERIFY_QUEUE_LOCK: OnceLock> = OnceLock::new(); @@ -726,7 +682,7 @@ async fn handle_processor_message( } TransactionKind::Signed(_, _) => { tx.sign(&mut OsRng, genesis, key); - publish_signed_transaction(&mut txn, tributary, tx).await; + tributary::publish_signed_transaction(&mut txn, tributary, tx).await; } } } @@ -1141,7 +1097,7 @@ pub async fn run( // TODO: Should this not take a txn accordingly? It's best practice to take a txn, yet // taking a txn fails to declare its achieved independence let mut txn = raw_db.txn(); - publish_signed_transaction(&mut txn, tributary, tx).await; + tributary::publish_signed_transaction(&mut txn, tributary, tx).await; txn.commit(); break; } diff --git a/coordinator/src/tributary/db.rs b/coordinator/src/tributary/db.rs index 0ab07b3bd..562c04e54 100644 --- a/coordinator/src/tributary/db.rs +++ b/coordinator/src/tributary/db.rs @@ -1,16 +1,18 @@ use std::collections::HashMap; +use scale::Encode; + use frost::Participant; use serai_client::validator_sets::primitives::KeyPair; use processor_messages::coordinator::SubstrateSignableId; -use scale::Encode; - pub use serai_db::*; -use crate::tributary::Label; +use tributary::ReadWrite; + +use crate::tributary::{Label, Transaction}; #[derive(Clone, Copy, PartialEq, Eq, Debug, Encode)] pub enum Topic { @@ -55,6 +57,8 @@ create_db!( AttemptDb: (genesis: [u8; 32], topic: &Topic) -> u32, DataReceived: (genesis: [u8; 32], data_spec: &DataSpecification) -> u16, DataDb: (genesis: [u8; 32], data_spec: &DataSpecification, signer_bytes: &[u8; 32]) -> Vec, + + SignedTransactionDb: (order: &[u8], nonce: u32) -> Vec, } ); @@ -87,3 +91,18 @@ impl AttemptDb { attempt } } + +impl SignedTransactionDb { + pub fn take_signed_transaction( + txn: &mut impl DbTxn, + order: &[u8], + nonce: u32, + ) -> Option { + let res = SignedTransactionDb::get(txn, order, nonce) + .map(|bytes| Transaction::read(&mut bytes.as_slice()).unwrap()); + if res.is_some() { + Self::del(txn, order, nonce); + } + res + } +} diff --git a/coordinator/src/tributary/mod.rs b/coordinator/src/tributary/mod.rs index 5364f1843..8478ce989 100644 --- a/coordinator/src/tributary/mod.rs +++ b/coordinator/src/tributary/mod.rs @@ -24,9 +24,9 @@ use processor_messages::coordinator::SubstrateSignableId; use serai_client::{primitives::PublicKey, validator_sets::primitives::ValidatorSet}; use tributary::{ - ReadWrite, + TRANSACTION_SIZE_LIMIT, ReadWrite, transaction::{Signed, TransactionError, TransactionKind, Transaction as TransactionTrait}, - TRANSACTION_SIZE_LIMIT, + Tributary, }; mod db; @@ -812,3 +812,45 @@ impl Transaction { } } } + +pub async fn publish_signed_transaction( + txn: &mut D::Transaction<'_>, + tributary: &Tributary, + tx: Transaction, +) { + log::debug!("publishing transaction {}", hex::encode(tx.hash())); + + let (order, signer) = if let TransactionKind::Signed(order, signed) = tx.kind() { + let signer = signed.signer; + + // Safe as we should deterministically create transactions, meaning if this is already on-disk, + // it's what we're saving now + SignedTransactionDb::set(txn, &order, signed.nonce, &tx.serialize()); + + (order, signer) + } else { + panic!("non-signed transaction passed to publish_signed_transaction"); + }; + + // If we're trying to publish 5, when the last transaction published was 3, this will delay + // publication until the point in time we publish 4 + while let Some(tx) = SignedTransactionDb::take_signed_transaction( + txn, + &order, + tributary + .next_nonce(&signer, &order) + .await + .expect("we don't have a nonce, meaning we aren't a participant on this tributary"), + ) { + // We need to return a proper error here to enable that, due to a race condition around + // multiple publications + match tributary.add_transaction(tx.clone()).await { + Ok(_) => {} + // Some asynchonicity if InvalidNonce, assumed safe to deterministic nonces + Err(TransactionError::InvalidNonce) => { + log::warn!("publishing TX {tx:?} returned InvalidNonce. was it already added?") + } + Err(e) => panic!("created an invalid transaction: {e:?}"), + } + } +}