Skip to content

Commit

Permalink
Move published_signed_transaction to tributary/mod.rs to reduce the s…
Browse files Browse the repository at this point in the history
…ize of main.rs
  • Loading branch information
kayabaNerve committed Dec 10, 2023
1 parent d4d977f commit cf4b1ae
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 68 deletions.
16 changes: 0 additions & 16 deletions coordinator/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ create_db!(
HandledMessageDb: (network: NetworkId) -> u64,
ActiveTributaryDb: () -> Vec<u8>,
RetiredTributaryDb: (set: ValidatorSet) -> (),
SignedTransactionDb: (order: &[u8], nonce: u32) -> Vec<u8>,
FirstPreprocessDb: (
network: NetworkId,
id_type: RecognizedIdType,
Expand Down Expand Up @@ -80,21 +79,6 @@ impl ActiveTributaryDb {
}
}

impl SignedTransactionDb {
pub fn take_signed_transaction(
txn: &mut impl DbTxn,
order: &[u8],
nonce: u32,
) -> Option<Transaction> {
let res = SignedTransactionDb::get(txn, order, nonce)
.map(|bytes| Transaction::read(&mut bytes.as_slice()).unwrap());
if res.is_some() {
Self::del(txn, order, nonce);
}
res
}
}

impl FirstPreprocessDb {
pub fn save_first_preprocess(
txn: &mut impl DbTxn,
Expand Down
50 changes: 3 additions & 47 deletions coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@ use tokio::{
time::sleep,
};

use ::tributary::{
ProvidedError, TransactionKind, TransactionError, TransactionTrait, Block, Tributary,
};
use ::tributary::{ProvidedError, TransactionKind, TransactionTrait, Block, Tributary};

mod tributary;
use crate::tributary::{
Expand Down Expand Up @@ -128,48 +126,6 @@ async fn add_tributary<D: Db, Pro: Processors, P: P2p>(
.unwrap();
}

async fn publish_signed_transaction<D: Db, P: P2p>(
txn: &mut D::Transaction<'_>,
tributary: &Tributary<D, Transaction, P>,
tx: Transaction,
) {
log::debug!("publishing transaction {}", hex::encode(tx.hash()));

let (order, signer) = if let TransactionKind::Signed(order, signed) = tx.kind() {
let signer = signed.signer;

// Safe as we should deterministically create transactions, meaning if this is already on-disk,
// it's what we're saving now
SignedTransactionDb::set(txn, &order, signed.nonce, &tx.serialize());

(order, signer)
} else {
panic!("non-signed transaction passed to publish_signed_transaction");
};

// If we're trying to publish 5, when the last transaction published was 3, this will delay
// publication until the point in time we publish 4
while let Some(tx) = SignedTransactionDb::take_signed_transaction(
txn,
&order,
tributary
.next_nonce(&signer, &order)
.await
.expect("we don't have a nonce, meaning we aren't a participant on this tributary"),
) {
// We need to return a proper error here to enable that, due to a race condition around
// multiple publications
match tributary.add_transaction(tx.clone()).await {
Ok(_) => {}
// Some asynchonicity if InvalidNonce, assumed safe to deterministic nonces
Err(TransactionError::InvalidNonce) => {
log::warn!("publishing TX {tx:?} returned InvalidNonce. was it already added?")
}
Err(e) => panic!("created an invalid transaction: {e:?}"),
}
}
}

// TODO: Find a better pattern for this
static HANDOVER_VERIFY_QUEUE_LOCK: OnceLock<Mutex<()>> = OnceLock::new();

Expand Down Expand Up @@ -726,7 +682,7 @@ async fn handle_processor_message<D: Db, P: P2p>(
}
TransactionKind::Signed(_, _) => {
tx.sign(&mut OsRng, genesis, key);
publish_signed_transaction(&mut txn, tributary, tx).await;
tributary::publish_signed_transaction(&mut txn, tributary, tx).await;
}
}
}
Expand Down Expand Up @@ -1141,7 +1097,7 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
// TODO: Should this not take a txn accordingly? It's best practice to take a txn, yet
// taking a txn fails to declare its achieved independence
let mut txn = raw_db.txn();
publish_signed_transaction(&mut txn, tributary, tx).await;
tributary::publish_signed_transaction(&mut txn, tributary, tx).await;
txn.commit();
break;
}
Expand Down
25 changes: 22 additions & 3 deletions coordinator/src/tributary/db.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
use std::collections::HashMap;

use scale::Encode;

use frost::Participant;

use serai_client::validator_sets::primitives::KeyPair;

use processor_messages::coordinator::SubstrateSignableId;

use scale::Encode;

pub use serai_db::*;

use crate::tributary::Label;
use tributary::ReadWrite;

use crate::tributary::{Label, Transaction};

#[derive(Clone, Copy, PartialEq, Eq, Debug, Encode)]
pub enum Topic {
Expand Down Expand Up @@ -55,6 +57,8 @@ create_db!(
AttemptDb: (genesis: [u8; 32], topic: &Topic) -> u32,
DataReceived: (genesis: [u8; 32], data_spec: &DataSpecification) -> u16,
DataDb: (genesis: [u8; 32], data_spec: &DataSpecification, signer_bytes: &[u8; 32]) -> Vec<u8>,

SignedTransactionDb: (order: &[u8], nonce: u32) -> Vec<u8>,
}
);

Expand Down Expand Up @@ -87,3 +91,18 @@ impl AttemptDb {
attempt
}
}

impl SignedTransactionDb {
pub fn take_signed_transaction(
txn: &mut impl DbTxn,
order: &[u8],
nonce: u32,
) -> Option<Transaction> {
let res = SignedTransactionDb::get(txn, order, nonce)
.map(|bytes| Transaction::read(&mut bytes.as_slice()).unwrap());
if res.is_some() {
Self::del(txn, order, nonce);
}
res
}
}
46 changes: 44 additions & 2 deletions coordinator/src/tributary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ use processor_messages::coordinator::SubstrateSignableId;
use serai_client::{primitives::PublicKey, validator_sets::primitives::ValidatorSet};

use tributary::{
ReadWrite,
TRANSACTION_SIZE_LIMIT, ReadWrite,
transaction::{Signed, TransactionError, TransactionKind, Transaction as TransactionTrait},
TRANSACTION_SIZE_LIMIT,
Tributary,
};

mod db;
Expand Down Expand Up @@ -812,3 +812,45 @@ impl Transaction {
}
}
}

pub async fn publish_signed_transaction<D: Db, P: crate::P2p>(
txn: &mut D::Transaction<'_>,
tributary: &Tributary<D, Transaction, P>,
tx: Transaction,
) {
log::debug!("publishing transaction {}", hex::encode(tx.hash()));

let (order, signer) = if let TransactionKind::Signed(order, signed) = tx.kind() {
let signer = signed.signer;

// Safe as we should deterministically create transactions, meaning if this is already on-disk,
// it's what we're saving now
SignedTransactionDb::set(txn, &order, signed.nonce, &tx.serialize());

(order, signer)
} else {
panic!("non-signed transaction passed to publish_signed_transaction");
};

// If we're trying to publish 5, when the last transaction published was 3, this will delay
// publication until the point in time we publish 4
while let Some(tx) = SignedTransactionDb::take_signed_transaction(
txn,
&order,
tributary
.next_nonce(&signer, &order)
.await
.expect("we don't have a nonce, meaning we aren't a participant on this tributary"),
) {
// We need to return a proper error here to enable that, due to a race condition around
// multiple publications
match tributary.add_transaction(tx.clone()).await {
Ok(_) => {}
// Some asynchonicity if InvalidNonce, assumed safe to deterministic nonces
Err(TransactionError::InvalidNonce) => {
log::warn!("publishing TX {tx:?} returned InvalidNonce. was it already added?")
}
Err(e) => panic!("created an invalid transaction: {e:?}"),
}
}
}

0 comments on commit cf4b1ae

Please sign in to comment.