Skip to content

Commit

Permalink
Abstract Future generics using new trait definitions in coordinator
Browse files Browse the repository at this point in the history
  • Loading branch information
kayabaNerve committed Dec 10, 2023
1 parent ef26b66 commit d4d977f
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 66 deletions.
14 changes: 7 additions & 7 deletions coordinator/src/tests/tributary/dkg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto};
use frost::Participant;

use sp_runtime::traits::Verify;
use serai_client::validator_sets::primitives::KeyPair;
use serai_client::validator_sets::primitives::{ValidatorSet, KeyPair};

use tokio::time::sleep;

Expand Down Expand Up @@ -97,7 +97,7 @@ async fn dkg_test() {
tributary: &Tributary<MemDb, Transaction, LocalP2p>,
) -> MemProcessors {
let processors = MemProcessors::new();
handle_new_blocks::<_, _, _, _, _, _, _, _, LocalP2p>(
handle_new_blocks::<_, _, _, _, _, LocalP2p>(
db,
key,
&|_, _, _, _| async {
Expand Down Expand Up @@ -128,7 +128,7 @@ async fn dkg_test() {
sleep(Duration::from_secs(Tributary::<MemDb, Transaction, LocalP2p>::block_time().into())).await;

// Verify the scanner emits a KeyGen::Commitments message
handle_new_blocks::<_, _, _, _, _, _, _, _, LocalP2p>(
handle_new_blocks::<_, _, _, _, _, LocalP2p>(
&mut dbs[0],
&keys[0],
&|_, _, _, _| async {
Expand Down Expand Up @@ -214,7 +214,7 @@ async fn dkg_test() {
}

// With just 4 sets of shares, nothing should happen yet
handle_new_blocks::<_, _, _, _, _, _, _, _, LocalP2p>(
handle_new_blocks::<_, _, _, _, _, LocalP2p>(
&mut dbs[0],
&keys[0],
&|_, _, _, _| async {
Expand Down Expand Up @@ -268,7 +268,7 @@ async fn dkg_test() {

// Any scanner which has handled the prior blocks should only emit the new event
for (i, key) in keys.iter().enumerate() {
handle_new_blocks::<_, _, _, _, _, _, _, _, LocalP2p>(
handle_new_blocks::<_, _, _, _, _, LocalP2p>(
&mut dbs[i],
key,
&|_, _, _, _| async { panic!("provided TX caused recognized_id to be called after shares") },
Expand Down Expand Up @@ -339,14 +339,14 @@ async fn dkg_test() {
}

// The scanner should successfully try to publish a transaction with a validly signed signature
handle_new_blocks::<_, _, _, _, _, _, _, _, LocalP2p>(
handle_new_blocks::<_, _, _, _, _, LocalP2p>(
&mut dbs[0],
&keys[0],
&|_, _, _, _| async {
panic!("provided TX caused recognized_id to be called after DKG confirmation")
},
&processors,
&|set, tx_type, tx| {
&|set: ValidatorSet, tx_type, tx: serai_client::Transaction| {
assert_eq!(tx_type, PstTxType::SetKeys);

let spec = spec.clone();
Expand Down
50 changes: 24 additions & 26 deletions coordinator/src/tributary/handle.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use core::{ops::Deref, future::Future};
use core::ops::Deref;
use std::collections::HashMap;

use rand_core::OsRng;
Expand All @@ -10,9 +10,7 @@ use frost::dkg::Participant;

use scale::{Encode, Decode};
use serai_client::{
Public, SeraiAddress, Signature,
validator_sets::primitives::{ValidatorSet, KeyPair},
SeraiValidatorSets,
Public, SeraiAddress, Signature, validator_sets::primitives::KeyPair, SeraiValidatorSets,
};

use tributary::{Signed, TransactionKind, TransactionTrait};
Expand All @@ -31,7 +29,7 @@ use crate::{
SignData, Transaction, TributarySpec, SeraiBlockNumber, Topic, Label, DataSpecification,
DataSet, Accumulation,
signing_protocol::{DkgConfirmer, DkgRemoval},
scanner::{RecognizedIdType, RIDTrait, PstTxType, TributaryBlockHandler},
scanner::{RecognizedIdType, RIDTrait, PstTxType, PSTTrait, PTTTrait, TributaryBlockHandler},
FatallySlashed, DkgShare, DkgCompleted, PlanIds, ConfirmationNonces, RemovalNonces, DkgKeyPair,
AttemptDb, DataReceived, DataDb,
},
Expand Down Expand Up @@ -99,17 +97,8 @@ fn unflatten(spec: &TributarySpec, data: &mut HashMap<Participant, Vec<u8>>) {
}
}

impl<
T: DbTxn,
Pro: Processors,
FPst: Future<Output = ()>,
PST: Fn(ValidatorSet, PstTxType, serai_client::Transaction) -> FPst,
FPtt: Future<Output = ()>,
PTT: Fn(Transaction) -> FPtt,
FRid: Future<Output = ()>,
RID: RIDTrait<FRid>,
P: P2p,
> TributaryBlockHandler<'_, T, Pro, FPst, PST, FPtt, PTT, FRid, RID, P>
impl<T: DbTxn, Pro: Processors, PST: PSTTrait, PTT: PTTTrait, RID: RIDTrait, P: P2p>
TributaryBlockHandler<'_, T, Pro, PST, PTT, RID, P>
{
fn accumulate(
&mut self,
Expand Down Expand Up @@ -483,12 +472,14 @@ impl<

DkgCompleted::set(self.txn, genesis, &());

(self.publish_serai_tx)(
self.spec.set(),
PstTxType::SetKeys,
SeraiValidatorSets::set_keys(self.spec.set().network, key_pair, Signature(sig)),
)
.await;
self
.publish_serai_tx
.publish_serai_tx(
self.spec.set(),
PstTxType::SetKeys,
SeraiValidatorSets::set_keys(self.spec.set().network, key_pair, Signature(sig)),
)
.await;
}
Accumulation::Ready(DataSet::NotParticipating) => {
panic!("wasn't a participant in DKG confirmination shares")
Expand Down Expand Up @@ -537,7 +528,7 @@ impl<
signed: Transaction::empty_signed(),
});
tx.sign(&mut OsRng, genesis, self.our_key);
(self.publish_tributary_tx)(tx).await;
self.publish_tributary_tx.publish_tributary_tx(tx).await;
}
Label::Share => {
let preprocesses =
Expand Down Expand Up @@ -569,7 +560,9 @@ impl<
signers,
Signature(signature),
);
(self.publish_serai_tx)(self.spec.set(), PstTxType::RemoveParticipant(data.plan), tx)
self
.publish_serai_tx
.publish_serai_tx(self.spec.set(), PstTxType::RemoveParticipant(data.plan), tx)
.await;
}
}
Expand Down Expand Up @@ -602,7 +595,9 @@ impl<
genesis,
Topic::SubstrateSign(SubstrateSignableId::Batch(batch)),
);
(self.recognized_id)(self.spec.set(), genesis, RecognizedIdType::Batch, batch.to_vec())
self
.recognized_id
.recognized_id(self.spec.set(), genesis, RecognizedIdType::Batch, batch.to_vec())
.await;
}

Expand All @@ -614,7 +609,10 @@ impl<

for id in plan_ids.into_iter() {
AttemptDb::recognize_topic(self.txn, genesis, Topic::Sign(id));
(self.recognized_id)(self.spec.set(), genesis, RecognizedIdType::Plan, id.to_vec()).await;
self
.recognized_id
.recognized_id(self.spec.set(), genesis, RecognizedIdType::Plan, id.to_vec())
.await;
}
}

Expand Down
105 changes: 72 additions & 33 deletions coordinator/src/tributary/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,28 +39,82 @@ pub enum RecognizedIdType {
Plan,
}

pub(crate) trait RIDTrait<FRid>:
Fn(ValidatorSet, [u8; 32], RecognizedIdType, Vec<u8>) -> FRid
#[async_trait::async_trait]
pub trait RIDTrait {
async fn recognized_id(
&self,
set: ValidatorSet,
genesis: [u8; 32],
kind: RecognizedIdType,
id: Vec<u8>,
);
}
#[async_trait::async_trait]
impl<
FRid: Send + Future<Output = ()>,
F: Sync + Fn(ValidatorSet, [u8; 32], RecognizedIdType, Vec<u8>) -> FRid,
> RIDTrait for F
{
async fn recognized_id(
&self,
set: ValidatorSet,
genesis: [u8; 32],
kind: RecognizedIdType,
id: Vec<u8>,
) {
(self)(set, genesis, kind, id).await
}
}
impl<FRid, F: Fn(ValidatorSet, [u8; 32], RecognizedIdType, Vec<u8>) -> FRid> RIDTrait<FRid> for F {}

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum PstTxType {
SetKeys,
RemoveParticipant([u8; 32]),
}

#[async_trait::async_trait]
pub trait PSTTrait {
async fn publish_serai_tx(
&self,
set: ValidatorSet,
kind: PstTxType,
tx: serai_client::Transaction,
);
}
#[async_trait::async_trait]
impl<
FPst: Send + Future<Output = ()>,
F: Sync + Fn(ValidatorSet, PstTxType, serai_client::Transaction) -> FPst,
> PSTTrait for F
{
async fn publish_serai_tx(
&self,
set: ValidatorSet,
kind: PstTxType,
tx: serai_client::Transaction,
) {
(self)(set, kind, tx).await
}
}

#[async_trait::async_trait]
pub trait PTTTrait {
async fn publish_tributary_tx(&self, tx: Transaction);
}
#[async_trait::async_trait]
impl<FPtt: Send + Future<Output = ()>, F: Sync + Fn(Transaction) -> FPtt> PTTTrait for F {
async fn publish_tributary_tx(&self, tx: Transaction) {
(self)(tx).await
}
}

pub struct TributaryBlockHandler<
'a,
T: DbTxn,
Pro: Processors,
FPst: Future<Output = ()>,
PST: Fn(ValidatorSet, PstTxType, serai_client::Transaction) -> FPst,
FPtt: Future<Output = ()>,
PTT: Fn(Transaction) -> FPtt,
FRid: Future<Output = ()>,
RID: RIDTrait<FRid>,
PST: PSTTrait,
PTT: PTTTrait,
RID: RIDTrait,
P: P2p,
> {
pub txn: &'a mut T,
Expand All @@ -71,21 +125,11 @@ pub struct TributaryBlockHandler<
pub publish_tributary_tx: &'a PTT,
pub spec: &'a TributarySpec,
block: Block<Transaction>,
_frid: PhantomData<FRid>,
_p2p: PhantomData<P>,
}

impl<
T: DbTxn,
Pro: Processors,
FPst: Future<Output = ()>,
PST: Fn(ValidatorSet, PstTxType, serai_client::Transaction) -> FPst,
FPtt: Future<Output = ()>,
PTT: Fn(Transaction) -> FPtt,
FRid: Future<Output = ()>,
RID: RIDTrait<FRid>,
P: P2p,
> TributaryBlockHandler<'_, T, Pro, FPst, PST, FPtt, PTT, FRid, RID, P>
impl<T: DbTxn, Pro: Processors, PST: PSTTrait, PTT: PTTTrait, RID: RIDTrait, P: P2p>
TributaryBlockHandler<'_, T, Pro, PST, PTT, RID, P>
{
pub async fn fatal_slash(&mut self, slashing: [u8; 32], reason: &str) {
let genesis = self.spec.genesis();
Expand Down Expand Up @@ -116,7 +160,7 @@ impl<
signed: Transaction::empty_signed(),
});
tx.sign(&mut OsRng, genesis, self.our_key);
(self.publish_tributary_tx)(tx).await;
self.publish_tributary_tx.publish_tributary_tx(tx).await;
}
}

Expand Down Expand Up @@ -187,12 +231,9 @@ impl<
pub(crate) async fn handle_new_blocks<
D: Db,
Pro: Processors,
FPst: Future<Output = ()>,
PST: Fn(ValidatorSet, PstTxType, serai_client::Transaction) -> FPst,
FPtt: Future<Output = ()>,
PTT: Fn(Transaction) -> FPtt,
FRid: Future<Output = ()>,
RID: RIDTrait<FRid>,
PST: PSTTrait,
PTT: PTTTrait,
RID: RIDTrait,
P: P2p,
>(
db: &mut D,
Expand Down Expand Up @@ -232,7 +273,6 @@ pub(crate) async fn handle_new_blocks<
publish_serai_tx,
publish_tributary_tx,
block,
_frid: PhantomData::<_>,
_p2p: PhantomData::<P>,
})
.handle::<D>()
Expand All @@ -247,8 +287,7 @@ pub(crate) async fn scan_tributaries_task<
D: Db,
Pro: Processors,
P: P2p,
FRid: Send + Future<Output = ()>,
RID: 'static + Send + Sync + Clone + RIDTrait<FRid>,
RID: 'static + Send + Sync + Clone + RIDTrait,
>(
raw_db: D,
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
Expand Down Expand Up @@ -283,7 +322,7 @@ pub(crate) async fn scan_tributaries_task<
// the next block occurs
let next_block_notification = tributary.next_block_notification().await;

handle_new_blocks::<_, _, _, _, _, _, _, _, P>(
handle_new_blocks::<_, _, _, _, _, P>(
&mut tributary_db,
&key,
&recognized_id,
Expand Down Expand Up @@ -357,7 +396,7 @@ pub(crate) async fn scan_tributaries_task<
}
}
},
&|tx| {
&|tx: Transaction| {
let tributary = tributary.clone();
async move {
match tributary.add_transaction(tx.clone()).await {
Expand Down

0 comments on commit d4d977f

Please sign in to comment.