Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add specific network/coin/balance types #619

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 21 additions & 24 deletions coordinator/src/cosign_evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ use tokio::{
use borsh::BorshSerialize;
use sp_application_crypto::RuntimePublic;
use serai_client::{
primitives::{NETWORKS, NetworkId, Signature},
validator_sets::primitives::{Session, ValidatorSet},
SeraiError, TemporalSerai, Serai,
primitives::{ExternalNetworkId, Signature, EXTERNAL_NETWORKS},
validator_sets::primitives::{ExternalValidatorSet, Session},
Serai, SeraiError, TemporalSerai,
};

use serai_db::{Get, DbTxn, Db, create_db};
Expand All @@ -28,17 +28,17 @@ use crate::{

create_db! {
CosignDb {
ReceivedCosign: (set: ValidatorSet, block: [u8; 32]) -> CosignedBlock,
LatestCosign: (network: NetworkId) -> CosignedBlock,
DistinctChain: (set: ValidatorSet) -> (),
ReceivedCosign: (set: ExternalValidatorSet, block: [u8; 32]) -> CosignedBlock,
LatestCosign: (network: ExternalNetworkId) -> CosignedBlock,
DistinctChain: (set: ExternalValidatorSet) -> (),
}
}

pub struct CosignEvaluator<D: Db> {
db: Mutex<D>,
serai: Arc<Serai>,
stakes: RwLock<Option<HashMap<NetworkId, u64>>>,
latest_cosigns: RwLock<HashMap<NetworkId, CosignedBlock>>,
stakes: RwLock<Option<HashMap<ExternalNetworkId, u64>>>,
latest_cosigns: RwLock<HashMap<ExternalNetworkId, CosignedBlock>>,
}

impl<D: Db> CosignEvaluator<D> {
Expand Down Expand Up @@ -79,15 +79,15 @@ impl<D: Db> CosignEvaluator<D> {
let serai = self.serai.as_of_latest_finalized_block().await?;

let mut stakes = HashMap::new();
for network in NETWORKS {
for network in EXTERNAL_NETWORKS {
// Use if this network has published a Batch for a short-circuit of if they've ever set a key
let set_key = serai.in_instructions().last_batch_for_network(network).await?.is_some();
if set_key {
stakes.insert(
network,
serai
.validator_sets()
.total_allocated_stake(network)
.total_allocated_stake(network.into())
.await?
.expect("network which published a batch didn't have a stake set")
.0,
Expand Down Expand Up @@ -126,23 +126,23 @@ impl<D: Db> CosignEvaluator<D> {

async fn set_with_keys_fn(
serai: &TemporalSerai<'_>,
network: NetworkId,
) -> Result<Option<ValidatorSet>, SeraiError> {
let Some(latest_session) = serai.validator_sets().session(network).await? else {
network: ExternalNetworkId,
) -> Result<Option<ExternalValidatorSet>, SeraiError> {
let Some(latest_session) = serai.validator_sets().session(network.into()).await? else {
log::warn!("received cosign from {:?}, which doesn't yet have a session", network);
return Ok(None);
};
let prior_session = Session(latest_session.0.saturating_sub(1));
Ok(Some(
if serai
.validator_sets()
.keys(ValidatorSet { network, session: prior_session })
.keys(ExternalValidatorSet { network, session: prior_session })
.await?
.is_some()
{
ValidatorSet { network, session: prior_session }
ExternalValidatorSet { network, session: prior_session }
} else {
ValidatorSet { network, session: latest_session }
ExternalValidatorSet { network, session: latest_session }
},
))
}
Expand Down Expand Up @@ -204,16 +204,12 @@ impl<D: Db> CosignEvaluator<D> {

let mut total_stake = 0;
let mut total_on_distinct_chain = 0;
for network in NETWORKS {
if network == NetworkId::Serai {
continue;
}

for network in EXTERNAL_NETWORKS {
// Get the current set for this network
let set_with_keys = {
let mut res;
while {
res = set_with_keys_fn(&serai, cosign.network).await;
res = set_with_keys_fn(&serai, network).await;
res.is_err()
} {
log::error!(
Expand All @@ -231,7 +227,8 @@ impl<D: Db> CosignEvaluator<D> {
let stake = {
let mut res;
while {
res = serai.validator_sets().total_allocated_stake(set_with_keys.network).await;
res =
serai.validator_sets().total_allocated_stake(set_with_keys.network.into()).await;
res.is_err()
} {
log::error!(
Expand Down Expand Up @@ -271,7 +268,7 @@ impl<D: Db> CosignEvaluator<D> {
#[allow(clippy::new_ret_no_self)]
pub fn new<P: P2p>(db: D, p2p: P, serai: Arc<Serai>) -> mpsc::UnboundedSender<CosignedBlock> {
let mut latest_cosigns = HashMap::new();
for network in NETWORKS {
for network in EXTERNAL_NETWORKS {
if let Some(cosign) = LatestCosign::get(&db, network) {
latest_cosigns.insert(network, cosign);
}
Expand Down
34 changes: 17 additions & 17 deletions coordinator/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ use blake2::{
use scale::Encode;
use borsh::{BorshSerialize, BorshDeserialize};
use serai_client::{
primitives::NetworkId,
validator_sets::primitives::{Session, ValidatorSet},
in_instructions::primitives::{Batch, SignedBatch},
primitives::ExternalNetworkId,
validator_sets::primitives::{ExternalValidatorSet, Session},
};

pub use serai_db::*;
Expand All @@ -18,21 +18,21 @@ use crate::tributary::{TributarySpec, Transaction, scanner::RecognizedIdType};

create_db!(
MainDb {
HandledMessageDb: (network: NetworkId) -> u64,
HandledMessageDb: (network: ExternalNetworkId) -> u64,
ActiveTributaryDb: () -> Vec<u8>,
RetiredTributaryDb: (set: ValidatorSet) -> (),
RetiredTributaryDb: (set: ExternalValidatorSet) -> (),
FirstPreprocessDb: (
network: NetworkId,
network: ExternalNetworkId,
id_type: RecognizedIdType,
id: &[u8]
) -> Vec<Vec<u8>>,
LastReceivedBatchDb: (network: NetworkId) -> u32,
ExpectedBatchDb: (network: NetworkId, id: u32) -> [u8; 32],
BatchDb: (network: NetworkId, id: u32) -> SignedBatch,
LastVerifiedBatchDb: (network: NetworkId) -> u32,
HandoverBatchDb: (set: ValidatorSet) -> u32,
LookupHandoverBatchDb: (network: NetworkId, batch: u32) -> Session,
QueuedBatchesDb: (set: ValidatorSet) -> Vec<u8>
LastReceivedBatchDb: (network: ExternalNetworkId) -> u32,
ExpectedBatchDb: (network: ExternalNetworkId, id: u32) -> [u8; 32],
BatchDb: (network: ExternalNetworkId, id: u32) -> SignedBatch,
LastVerifiedBatchDb: (network: ExternalNetworkId) -> u32,
HandoverBatchDb: (set: ExternalValidatorSet) -> u32,
LookupHandoverBatchDb: (network: ExternalNetworkId, batch: u32) -> Session,
QueuedBatchesDb: (set: ExternalValidatorSet) -> Vec<u8>
}
);

Expand Down Expand Up @@ -61,7 +61,7 @@ impl ActiveTributaryDb {
ActiveTributaryDb::set(txn, &existing_bytes);
}

pub fn retire_tributary(txn: &mut impl DbTxn, set: ValidatorSet) {
pub fn retire_tributary(txn: &mut impl DbTxn, set: ExternalValidatorSet) {
let mut active = Self::active_tributaries(txn).1;
for i in 0 .. active.len() {
if active[i].set() == set {
Expand All @@ -82,7 +82,7 @@ impl ActiveTributaryDb {
impl FirstPreprocessDb {
pub fn save_first_preprocess(
txn: &mut impl DbTxn,
network: NetworkId,
network: ExternalNetworkId,
id_type: RecognizedIdType,
id: &[u8],
preprocess: &Vec<Vec<u8>>,
Expand All @@ -108,19 +108,19 @@ impl ExpectedBatchDb {
}

impl HandoverBatchDb {
pub fn set_handover_batch(txn: &mut impl DbTxn, set: ValidatorSet, batch: u32) {
pub fn set_handover_batch(txn: &mut impl DbTxn, set: ExternalValidatorSet, batch: u32) {
Self::set(txn, set, &batch);
LookupHandoverBatchDb::set(txn, set.network, batch, &set.session);
}
}
impl QueuedBatchesDb {
pub fn queue(txn: &mut impl DbTxn, set: ValidatorSet, batch: &Transaction) {
pub fn queue(txn: &mut impl DbTxn, set: ExternalValidatorSet, batch: &Transaction) {
let mut batches = Self::get(txn, set).unwrap_or_default();
batch.write(&mut batches).unwrap();
Self::set(txn, set, &batches);
}

pub fn take(txn: &mut impl DbTxn, set: ValidatorSet) -> Vec<Transaction> {
pub fn take(txn: &mut impl DbTxn, set: ExternalValidatorSet) -> Vec<Transaction> {
let batches_vec = Self::get(txn, set).unwrap_or_default();
txn.del(Self::key(set));

Expand Down
31 changes: 15 additions & 16 deletions coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use serai_db::{DbTxn, Db};
use scale::Encode;
use borsh::BorshSerialize;
use serai_client::{
primitives::NetworkId,
validator_sets::primitives::{Session, ValidatorSet, KeyPair},
primitives::ExternalNetworkId,
validator_sets::primitives::{ExternalValidatorSet, KeyPair, Session},
Public, Serai, SeraiInInstructions,
};

Expand Down Expand Up @@ -79,7 +79,7 @@ pub struct ActiveTributary<D: Db, P: P2p> {
#[derive(Clone)]
pub enum TributaryEvent<D: Db, P: P2p> {
NewTributary(ActiveTributary<D, P>),
TributaryRetired(ValidatorSet),
TributaryRetired(ExternalValidatorSet),
}

// Creates a new tributary and sends it to all listeners.
Expand Down Expand Up @@ -145,7 +145,7 @@ async fn handle_processor_message<D: Db, P: P2p>(
p2p: &P,
cosign_channel: &mpsc::UnboundedSender<CosignedBlock>,
tributaries: &HashMap<Session, ActiveTributary<D, P>>,
network: NetworkId,
network: ExternalNetworkId,
msg: &processors::Message,
) -> bool {
#[allow(clippy::nonminimal_bool)]
Expand Down Expand Up @@ -193,7 +193,8 @@ async fn handle_processor_message<D: Db, P: P2p>(
.iter()
.map(|plan| plan.session)
.filter(|session| {
RetiredTributaryDb::get(&txn, ValidatorSet { network, session: *session }).is_none()
RetiredTributaryDb::get(&txn, ExternalValidatorSet { network, session: *session })
.is_none()
})
.collect::<HashSet<_>>();

Expand Down Expand Up @@ -265,7 +266,7 @@ async fn handle_processor_message<D: Db, P: P2p>(
}
// This causes an action on Substrate yet not on any Tributary
coordinator::ProcessorMessage::SignedSlashReport { session, signature } => {
let set = ValidatorSet { network, session: *session };
let set = ExternalValidatorSet { network, session: *session };
let signature: &[u8] = signature.as_ref();
let signature = serai_client::Signature(signature.try_into().unwrap());

Expand Down Expand Up @@ -393,7 +394,7 @@ async fn handle_processor_message<D: Db, P: P2p>(
if let Some(relevant_tributary_value) = relevant_tributary {
if RetiredTributaryDb::get(
&txn,
ValidatorSet { network: msg.network, session: relevant_tributary_value },
ExternalValidatorSet { network: msg.network, session: relevant_tributary_value },
)
.is_some()
{
Expand Down Expand Up @@ -782,7 +783,7 @@ async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
processors: Pro,
p2p: P,
cosign_channel: mpsc::UnboundedSender<CosignedBlock>,
network: NetworkId,
network: ExternalNetworkId,
mut tributary_event: mpsc::UnboundedReceiver<TributaryEvent<D, P>>,
) {
let mut tributaries = HashMap::new();
Expand Down Expand Up @@ -831,7 +832,7 @@ async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
#[allow(clippy::too_many_arguments)]
async fn handle_cosigns_and_batch_publication<D: Db, P: P2p>(
mut db: D,
network: NetworkId,
network: ExternalNetworkId,
mut tributary_event: mpsc::UnboundedReceiver<TributaryEvent<D, P>>,
) {
let mut tributaries = HashMap::new();
Expand Down Expand Up @@ -905,7 +906,7 @@ async fn handle_cosigns_and_batch_publication<D: Db, P: P2p>(
for batch in start_id ..= last_id {
let is_pre_handover = LookupHandoverBatchDb::get(&txn, network, batch + 1);
if let Some(session) = is_pre_handover {
let set = ValidatorSet { network, session };
let set = ExternalValidatorSet { network, session };
let mut queued = QueuedBatchesDb::take(&mut txn, set);
// is_handover_batch is only set for handover `Batch`s we're participating in, making
// this safe
Expand All @@ -923,7 +924,8 @@ async fn handle_cosigns_and_batch_publication<D: Db, P: P2p>(

let is_handover = LookupHandoverBatchDb::get(&txn, network, batch);
if let Some(session) = is_handover {
for queued in QueuedBatchesDb::take(&mut txn, ValidatorSet { network, session }) {
for queued in QueuedBatchesDb::take(&mut txn, ExternalValidatorSet { network, session })
{
to_publish.push((session, queued));
}
}
Expand Down Expand Up @@ -970,10 +972,7 @@ pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
mut tributary_event: broadcast::Receiver<TributaryEvent<D, P>>,
) {
let mut channels = HashMap::new();
for network in serai_client::primitives::NETWORKS {
if network == NetworkId::Serai {
continue;
}
for network in serai_client::primitives::EXTERNAL_NETWORKS {
let (processor_send, processor_recv) = mpsc::unbounded_channel();
tokio::spawn(handle_processor_messages(
db.clone(),
Expand Down Expand Up @@ -1195,7 +1194,7 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
}
});

move |set: ValidatorSet, genesis, id_type, id: Vec<u8>| {
move |set: ExternalValidatorSet, genesis, id_type, id: Vec<u8>| {
log::debug!("recognized ID {:?} {}", id_type, hex::encode(&id));
let mut raw_db = raw_db.clone();
let key = key.clone();
Expand Down
Loading
Loading