Implement basic equivocations detection loop #2367

Merged
merged 6 commits into from
Aug 23, 2023
Changes from 4 commits
5 changes: 5 additions & 0 deletions Cargo.lock


6 changes: 5 additions & 1 deletion primitives/header-chain/src/justification/mod.rs
@@ -97,7 +97,11 @@ impl<H: HeaderT> GrandpaJustification<H> {
}
}

impl<H: HeaderT> crate::FinalityProof<H::Number> for GrandpaJustification<H> {
impl<H: HeaderT> crate::FinalityProof<H::Hash, H::Number> for GrandpaJustification<H> {
fn target_header_hash(&self) -> H::Hash {
self.commit.target_hash
}

fn target_header_number(&self) -> H::Number {
self.commit.target_number
}
7 changes: 5 additions & 2 deletions primitives/header-chain/src/lib.rs
@@ -142,7 +142,10 @@ pub struct InitializationData<H: HeaderT> {
}

/// Abstract finality proof that is justifying block finality.
pub trait FinalityProof<Number>: Clone + Send + Sync + Debug {
pub trait FinalityProof<Hash, Number>: Clone + Send + Sync + Debug {
/// Return hash of header that this proof is generated for.
fn target_header_hash(&self) -> Hash;

/// Return number of header that this proof is generated for.
fn target_header_number(&self) -> Number;
}
@@ -227,7 +230,7 @@ impl<Header: HeaderT> TryFrom<StoredHeaderGrandpaInfo<Header>> for HeaderGrandpa
/// Helper trait for finding equivocations in finality proofs.
pub trait FindEquivocations<FinalityProof, FinalityVerificationContext, EquivocationProof> {
/// The type returned when encountering an error while looking for equivocations.
type Error;
type Error: Debug;

/// Find equivocations.
fn find_equivocations(
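The hunk is collapsed at this point. For orientation, a minimal sketch of what the full trait plausibly looks like, reconstructed from the call site in relays/equivocation/src/equivocation_loop.rs below (the parameter names are assumptions, not a verbatim copy of the source):

use std::fmt::Debug;

/// Helper trait for finding equivocations in finality proofs (sketch).
pub trait FindEquivocations<FinalityProof, FinalityVerificationContext, EquivocationProof> {
	/// The type returned when encountering an error while looking for equivocations.
	type Error: Debug;

	/// Find equivocations: check the finality proof synced to the target chain
	/// against the finality proofs observed on the source chain, under the given
	/// verification context.
	fn find_equivocations(
		verification_context: &FinalityVerificationContext,
		synced_proof: &FinalityProof,
		source_proofs: &[FinalityProof],
	) -> Result<Vec<EquivocationProof>, Self::Error>;
}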
5 changes: 5 additions & 0 deletions relays/equivocation/Cargo.toml
@@ -7,7 +7,12 @@ license = "GPL-3.0-or-later WITH Classpath-exception-2.0"
description = "Equivocation detector"

[dependencies]
async-std = "1.6.5"
async-trait = "0.1"
bp-header-chain = { path = "../../primitives/header-chain" }
finality-relay = { path = "../finality" }
frame-support = { git = "https://github.com/paritytech/substrate", branch = "master" }
futures = "0.3.28"
log = "0.4.20"
num-traits = "0.2"
relay-utils = { path = "../utils" }
327 changes: 327 additions & 0 deletions relays/equivocation/src/equivocation_loop.rs
@@ -0,0 +1,327 @@
// Copyright 2019-2023 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.

// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.

use crate::{
reporter::EquivocationsReporter, EquivocationDetectionPipeline, HeaderFinalityInfo,
SourceClient, TargetClient,
};

use bp_header_chain::{FinalityProof, FindEquivocations};
use finality_relay::{FinalityProofsBuf, FinalityProofsStream};
use futures::{select, FutureExt};
use num_traits::Saturating;
use relay_utils::{
relay_loop::{reconnect_failed_client, RECONNECT_DELAY},
FailedClient, MaybeConnectionError,
};
use std::{future::Future, time::Duration};

struct EquivocationReportingContext<P: EquivocationDetectionPipeline> {
synced_header_hash: P::Hash,
synced_verification_context: P::FinalityVerificationContext,
}

impl<P: EquivocationDetectionPipeline> EquivocationReportingContext<P> {
async fn try_read_from_target<TC: TargetClient<P>>(
target_client: &TC,
at: P::TargetNumber,
) -> Result<Option<Self>, TC::Error> {
let maybe_best_synced_header_hash = target_client.best_synced_header_hash(at).await?;
Ok(match maybe_best_synced_header_hash {
Some(best_synced_header_hash) => Some(EquivocationReportingContext {
synced_header_hash: best_synced_header_hash,
synced_verification_context: target_client
.finality_verification_context(at)
.await?,
}),
None => None,
})
}

fn update(&mut self, info: HeaderFinalityInfo<P>) {
if let Some(new_verification_context) = info.new_verification_context {
self.synced_header_hash = info.finality_proof.target_header_hash();
self.synced_verification_context = new_verification_context;
}
}
}

/// Finality synchronization loop state.
struct EquivocationDetectionLoop<
P: EquivocationDetectionPipeline,
SC: SourceClient<P>,
TC: TargetClient<P>,
> {
source_client: SC,
target_client: TC,

from_block_num: P::TargetNumber,
until_block_num: P::TargetNumber,

reporter: EquivocationsReporter<P, SC>,

finality_proofs_stream: FinalityProofsStream<P, SC>,
finality_proofs_buf: FinalityProofsBuf<P>,
}

impl<P: EquivocationDetectionPipeline, SC: SourceClient<P>, TC: TargetClient<P>>
EquivocationDetectionLoop<P, SC, TC>
{
async fn reconnect_source_client(&mut self, e: SC::Error) -> bool {
if e.is_connection_error() {
reconnect_failed_client(
FailedClient::Source,
RECONNECT_DELAY,
&mut self.source_client,
&mut self.target_client,
)
.await;

return true
}

false
}

async fn reconnect_target_client(&mut self, e: TC::Error) -> bool {
if e.is_connection_error() {
reconnect_failed_client(
FailedClient::Target,
RECONNECT_DELAY,
&mut self.source_client,
&mut self.target_client,
)
.await;

return true
}

false
}

async fn update_until_block_num(&mut self) {
self.until_block_num = match self.target_client.best_finalized_header_number().await {
Ok(hdr_num) => hdr_num,
Err(e) => {
log::error!(
target: "bridge",
"Could not read best finalized header number from {}: {e:?}",
P::TARGET_NAME,
);

// Reconnect target client and move on
self.reconnect_target_client(e).await;
return
},
};
}

async fn build_context(
&mut self,
block_num: P::TargetNumber,
) -> Option<EquivocationReportingContext<P>> {
match EquivocationReportingContext::try_read_from_target(
&self.target_client,
block_num.saturating_sub(1.into()),
)
.await
{
Ok(Some(context)) => Some(context),
Ok(None) => None,
Err(e) => {
log::error!(
target: "bridge",
"Could not read {} `EquivocationReportingContext` from {} at block {block_num}: {e:?}",
P::SOURCE_NAME,
P::TARGET_NAME,
);

// Reconnect target client if needed and move on.
self.reconnect_target_client(e).await;
None
},
}
}

async fn synced_source_headers_at_target(
&mut self,
at: P::TargetNumber,
) -> Vec<HeaderFinalityInfo<P>> {
match self.target_client.synced_headers_finality_info(at).await {
Ok(synced_headers) => synced_headers,
Err(e) => {
log::error!(
target: "bridge",
"Could not get {} headers synced to {} at block {at:?}",
P::SOURCE_NAME,
P::TARGET_NAME
);

// Reconnect in case of a connection error.
self.reconnect_target_client(e).await;
// And move on to the next block.
vec![]
},
}
}

async fn report_equivocation(&mut self, at: P::Hash, equivocation: P::EquivocationProof) {
match self.reporter.submit_report(&self.source_client, at, equivocation.clone()).await {
Ok(_) => {},
Err(e) => {
log::error!(
target: "bridge",
"Could not submit equivocation report to {} for {equivocation:?}: {e:?}",
P::SOURCE_NAME,
);

// Reconnect source client and move on
self.reconnect_source_client(e).await;
},
}
}

async fn check_block(
&mut self,
block_num: P::TargetNumber,
context: &mut EquivocationReportingContext<P>,
) {
let synced_headers = self.synced_source_headers_at_target(block_num).await;

for synced_header in synced_headers {
self.finality_proofs_buf.fill(&mut self.finality_proofs_stream);

let equivocations = match P::EquivocationsFinder::find_equivocations(
&context.synced_verification_context,
&synced_header.finality_proof,
self.finality_proofs_buf.buf().as_slice(),
) {
Ok(equivocations) => equivocations,
Err(e) => {
log::error!(
target: "bridge",
"Could not search for equivocations in the finality proof \
for source header {:?} synced at target block {block_num:?}: {e:?}",
synced_header.finality_proof.target_header_hash()
);
continue
},
};
for equivocation in equivocations {
self.report_equivocation(context.synced_header_hash, equivocation).await;
}

self.finality_proofs_buf
.prune(synced_header.finality_proof.target_header_number(), None);
context.update(synced_header);
}
}

async fn run(&mut self, tick: Duration, exit_signal: impl Future<Output = ()>) {
let exit_signal = exit_signal.fuse();
futures::pin_mut!(exit_signal);

loop {
// Make sure that we are connected to the source finality proofs stream.
match self.finality_proofs_stream.ensure_stream(&self.source_client).await {
Ok(_) => {},
Err(e) => {
log::error!(
target: "bridge",
"Could not connect to the {} `FinalityProofsStream`: {e:?}",
P::SOURCE_NAME,
);

// Reconnect to the source client if needed
match self.reconnect_source_client(e).await {
true => {
// Connection error. Move on.
continue
},
false => {
// Irrecoverable error. End the loop.
Contributor

I know - you'll be polishing the error handling in future PRs, I just wanted to leave a comment here because it may be a bit confusing (and I already see that the non-connection error is called "irrecoverable" here). So in relays we have two kinds of errors. Connection errors are likely network errors or jsonrpsee internal errors. If you see a connection error, you need to reconnect the client - no other solution is possible. Other errors (non-connection errors) are assumed to be recoverable - they are caused e.g. by us submitting an obsolete transaction, or by the server rejecting our RPC because it is overloaded.

The relay_utils::relay_loop function is supposed to handle the connection errors. So when you start a loop using relay_utils::relay_loop, your "run_loop" function is expected to return an error when a connection error is met. Then the RelayLoop itself reconnects to the failed client and restarts run_loop. Other (non-connection) errors are supposed to be handled by the run_loop function itself (e.g. by retrying with some exponential backoff or simply sleeping for some time). Right now error handling here is implemented the opposite way - you yield to RelayLoop on non-connection errors and do the reconnect yourself.

This isn't a big issue - let's see the future error-handling PR, I just wanted to share that ^^^. Maybe it'll allow you to remove some code here in favor of already existing code.
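A minimal sketch of the convention described above, for illustration only: the helper name, the generic error type and the fixed 5-second retry delay are assumptions, while MaybeConnectionError::is_connection_error and FailedClient are the relay_utils items already used in this PR.

use relay_utils::{FailedClient, MaybeConnectionError};
use std::time::Duration;

/// Hypothetical handling of one "run_loop" iteration result, following the convention above.
async fn handle_iteration_result<E: MaybeConnectionError + std::fmt::Debug>(
	result: Result<(), E>,
) -> Result<(), FailedClient> {
	match result {
		Ok(()) => Ok(()),
		// Connection error: yield to the `RelayLoop`, which reconnects the failed
		// client and restarts `run_loop`.
		Err(e) if e.is_connection_error() => Err(FailedClient::Source),
		// Non-connection error: assumed recoverable, so handle it locally
		// (here: log it, wait a bit and let the caller retry).
		Err(e) => {
			log::warn!(target: "bridge", "Recoverable error: {e:?}, retrying later");
			async_std::task::sleep(Duration::from_secs(5)).await;
			Ok(())
		},
	}
}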

Collaborator Author

Thanks for the explanations. Personally I thought that any error other than a connection error is irrecoverable. For example:

  • we can't generate the key ownership proof, because we're too many blocks ahead;
  • let's say that the chains were updated and the relayer has old data structures, so it can't encode/decode them, etc.

I think we should take this into consideration, because otherwise we risk getting stuck in a retry loop. But it's true that other errors could be recoverable. So I don't know what would be best here. It seems like error handling will be a complex problem for a future PR.

For the moment I'm changing the strategy to consider every error recoverable: just sleep a bit and skip the item that led to the error when possible.

Contributor

we risk getting stuck in a retry loop

That's true. Normally (in other relay loops) we'll eventually break out of this loop when we read updated state and realize that something has changed. But until then we'll keep retrying. Which is fine (imo), unless we spam the RPC server with failing RPC requests without some backoff.

In your examples:

  • we can't generate the key ownership proof, because we're too many blocks ahead: then the loop should just log an error and go further (imo);
  • let's say that the chains were updated and the relayer has old data structures, so it can't encode/decode them, etc.: that's definitely a maintenance flaw - the best we could do here is to have a backoff mechanism. Your current loop implementation would just exit the process if you encounter any non-connection error. Is that good? E.g. if we can't generate the ownership proof, then we could just keep working with the next justifications.

But as I said - in the end it is up to you how to handle issues :)
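For the backoff mechanism mentioned above, a minimal sketch; the retry_with_backoff helper, the process_item closure and the one-second/64-second bounds are hypothetical and not part of this PR.

use std::time::Duration;

/// Hypothetical retry helper: double the delay after every failure, up to a cap.
async fn retry_with_backoff<T, E: std::fmt::Debug>(
	mut process_item: impl FnMut() -> Result<T, E>,
) -> T {
	let mut delay = Duration::from_secs(1);
	loop {
		match process_item() {
			Ok(result) => return result,
			Err(e) => {
				log::warn!(target: "bridge", "Recoverable error: {e:?}, retrying in {delay:?}");
				async_std::task::sleep(delay).await;
				delay = std::cmp::min(delay * 2, Duration::from_secs(64));
			},
		}
	}
}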

return
},
}
},
}
// Check the status of the pending equivocation reports
self.reporter.process_pending_reports().await;

// Check the next block
self.update_until_block_num().await;
if self.from_block_num <= self.until_block_num {
let mut context = match self.build_context(self.from_block_num).await {
Some(context) => context,
None => return,
};

self.check_block(self.from_block_num, &mut context).await;

self.from_block_num = self.from_block_num.saturating_add(1.into());
}
Collaborator

This could be a while self.from_block_num <= self.until_block_num {} loop that does full catch-up checks in a single tick.

It doesn't look like that much work to have latency concerns.

Also, I would actually suggest skipping the check of the full (sub)chain and just checking the latest (source highest/best finalized as seen by target) - only if that has a different hash than the block on the source at the same height do you know it forked somewhere since the last tick, and you can then iterate to find the fork. WDYT?

Contributor

while self.from_block_num <= self.until_block_num {}

👍 Also @serban300 please note that from_block_num is initialized to zero, so it'll start from genesis - I think it should be initialized to the best target block at the beginning of the loop?

Also, I would actually suggest skipping the check of the full (sub)chain and just checking the latest (source highest/best finalized as seen by target) - only if that has a different hash than the block on the source at the same height do you know it forked somewhere since the last tick, and you can then iterate to find the fork. WDYT?

I think attacker(s) may be able to submit { submit_finality_proof(forked_header#100), submit_message(malicious_message), submit_finality_proof(good_header#101) } in a single block. So if you just look at the latest, you may skip this forked_header submission. So imo we should look at all headers here.

Collaborator

I think attacker(s) may be able to submit { submit_finality_proof(forked_header#100), submit_message(malicious_message), submit_finality_proof(good_header#101) } in a single block. So if you just look at the latest, you may skip this forked_header submission. So imo we should look at all headers here.

but how would good_header#101 have the correct hash if it's been built on top of bad_header#100? Once the relayer introduces any forked header, the whole subsequent child chain will be a fork, no?

Contributor

We don't check the connection between headers anywhere. So it could be built on top of another "normal" header. E.g. in the following schema:

99 ---> 100 ---> 101
     \--> 100'

relayer1 may track the bad fork and submit 100' first, and relayer2 may then submit 101.

Collaborator
@acatangiu Aug 23, 2023

We don't check the connection between headers anywhere.

oh, ok, I thought (without checking the code) we did. (I now realize we don't check chain continuity because we don't want to import every header, and we're happy with every Nth header as long as it has a GRANDPA justification).

Valid scenario then, let's keep the equivocation check for every submitted header 👍

Collaborator Author

This could be a while self.from_block_num <= self.until_block_num {} loop that does full catch-up checks in a single tick.

Sounds good. Done.

Also @serban300 please note that from_block_num is initialized to zero, so it'll start from genesis - I think it should be initialized to the best target block at the beginning of the loop?

Yes, you're right. That was the intention, but somehow I forgot to implement it.

Also, I would actually suggest skipping the check of the full (sub)chain and just checking the latest (source highest/best finalized as seen by target) - only if that has a different hash than the block on the source at the same height do you know it forked somewhere since the last tick, and you can then iterate to find the fork. WDYT?

I think attacker(s) may be able to submit { submit_finality_proof(forked_header#100), submit_message(malicious_message), submit_finality_proof(good_header#101) } in a single block. So if you just look at the latest, you may skip this forked_header submission. So imo we should look at all headers here.

That's true. Also I think it's good to check for equivocations even if the synced hash == the source hash. There might be cases where we could still find equivocations. And it shouldn't add a big overhead.
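For reference, a rough sketch of the agreed catch-up behaviour: process every target block up to the current best finalized one within a single tick instead of one block per tick. The do_catch_up method name is hypothetical and this is not the exact committed code.

// Sketch only: a method on EquivocationDetectionLoop replacing the single-block `if` above inside `run()`.
async fn do_catch_up(&mut self) {
	self.update_until_block_num().await;
	while self.from_block_num <= self.until_block_num {
		let mut context = match self.build_context(self.from_block_num).await {
			Some(context) => context,
			None => return,
		};

		self.check_block(self.from_block_num, &mut context).await;

		self.from_block_num = self.from_block_num.saturating_add(1.into());
	}
}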


select! {
_ = async_std::task::sleep(tick).fuse() => {},
_ = exit_signal => return,
}
}
}

pub async fn spawn(
source_client: SC,
target_client: TC,
tick: Duration,
exit_signal: impl Future<Output = ()>,
) -> Result<(), FailedClient> {
let mut equivocation_detection_loop = Self {
source_client,
target_client,
from_block_num: 0.into(),
until_block_num: 0.into(),
reporter: EquivocationsReporter::<P, SC>::new(),
finality_proofs_stream: FinalityProofsStream::new(),
finality_proofs_buf: FinalityProofsBuf::new(vec![]),
};

equivocation_detection_loop.run(tick, exit_signal).await;
Ok(())
}
}

/// TODO: remove `#[allow(dead_code)]`
#[allow(dead_code)]
pub async fn run<P: EquivocationDetectionPipeline>(
source_client: impl SourceClient<P>,
target_client: impl TargetClient<P>,
tick: Duration,
exit_signal: impl Future<Output = ()> + 'static + Send,
) -> Result<(), relay_utils::Error> {
let exit_signal = exit_signal.shared();
relay_utils::relay_loop(source_client, target_client)
.run(
format!("{}_to_{}_EquivocationDetection", P::SOURCE_NAME, P::TARGET_NAME),
move |source_client, target_client, _metrics| {
EquivocationDetectionLoop::spawn(
source_client,
target_client,
tick,
exit_signal.clone(),
)
},
)
.await
}