Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the groundwork for failover RPC support #110

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ and they can convert them back as well.

![withdraw](./res/withdraw.png)

### How to build
### How to build

Requires `rust` and `cargo`: [installation instructions.](https://www.rust-lang.org/en-US/install.html)

Expand Down Expand Up @@ -83,16 +83,20 @@ keystore = "/path/to/keystore"

[home]
account = "0x006e27b6a72e1f34c626762f3c4761547aff1421"
rpc_host = "http://localhost"
rpc_port = 8545
primary_rpc_host = "http://localhost"
primary_rpc_port = 8545
failover_rpc_host = "http://localhost"
failover_rpc_port = 8546
required_confirmations = 0
password = "home_password.txt"
default_gas_price = 1_000_000_000 # 1 GWEI

[foreign]
account = "0x006e27b6a72e1f34c626762f3c4761547aff1421"
rpc_host = "http://localhost"
rpc_port = 9545
primary_rpc_host = "http://localhost"
primary_rpc_port = 9545
failover_rpc_host = "http://localhost"
failover_rpc_port = 9546
required_confirmations = 0
gas_price_oracle_url = "https://gasprice.poa.network"
gas_price_speed = "instant"
Expand All @@ -110,13 +114,15 @@ withdraw_confirm = { gas = 3000000 }

#### Options

- `keystore` - path to a keystore directory with JSON keys
- `keystore` - path to a keystore directory with JSON keys

#### home/foreign options

- `home/foreign.account` - authority address on the home (**required**)
- `home/foreign.rpc_host` - RPC host (**required**)
- `home/foreign.rpc_port` - RPC port (**defaults to 8545**)
- `home/foreign.primary_rpc_host` - Primary RPC host (**required**)
- `home/foreign.primary_rpc_port` - Primary RPC port (**defaults to 8545**)
- `home/foreign.failover_rpc_host` - Failover RPC host used in the event the primary RPC host is not available. Must be specified if `failover_rpc_port` is set.
- `home/foreign.failover_rpc_port` - Failover RPC port used in the event the primary RPC host is not available. (**defaults to 8545** if `failover_rpc_host` is set.)
- `home/foreign.required_confirmations` - number of confirmation required to consider transaction final on home (default: **12**)
- `home/foreign.poll_interval` - specify how often home node should be polled for changes (in seconds, default: **1**)
- `home/foreign.request_timeout` - specify request timeout (in seconds, default: **3600**)
Expand Down
93 changes: 66 additions & 27 deletions bridge/src/app.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use std::path::{Path, PathBuf};
use tokio_core::reactor::{Handle};
use tokio_timer::{self, Timer};
use web3::Transport;
use error::{Error, ResultExt, ErrorKind};
use config::Config;
use error::{Error, ErrorKind};
use config::{Config, RpcUrl, RpcUrlKind};
use contracts::{home, foreign};
use web3::transports::http::Http;
use web3::{Transport, transports::http::Http, error::Error as Web3Error};
use std::time::Duration;

use std::sync::Arc;
Expand All @@ -27,54 +26,93 @@ pub struct App<T> where T: Transport {

pub struct Connections<T> where T: Transport {
pub home: T,
pub home_url: RpcUrlKind,
pub foreign: T,
pub foreign_url: RpcUrlKind,
}

impl Connections<Http> {
pub fn new_http(handle: &Handle, home: &str, home_concurrent_connections: usize, foreign: &str, foreign_concurrent_connections: usize) -> Result<Self, Error> {
/// Returns new home and foreign HTTP transport connections, falling back
/// to failover urls if necessary.
pub fn new_http(handle: &Handle, home_primary: &RpcUrl, home_failover: Option<&RpcUrl>,
home_concurrent_connections: usize, foreign_primary: &RpcUrl,
foreign_failover: Option<&RpcUrl>, foreign_concurrent_connections: usize)
-> Result<Self, Error> {
// Attempts to connect to either a primary or failover url, returning
// the transport and the url upon success.
fn connect(handle: &Handle, url_primary: &RpcUrl, url_failover: Option<&RpcUrl>,
concurrent_connections: usize) -> Result<(Http, RpcUrlKind), Web3Error> {
match Http::with_event_loop(&url_primary.to_string(), handle, concurrent_connections) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the most important part of my review. My reading of https://github.com/tomusdrw/rust-web3/blob/master/src/transports/http.rs#L78 suggests that this call will never fail if primary is not available as it doesn't attempt the connection, meaning bridge will always try primary, then fail, restart and try it again to no avail.

Would love to be proven wrong, though -- if I misread any part or something.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you're right which is why I wanted to make clear that this PR doesn't properly address the issue. It looks like the connection errors are passed up through the spawned future. I'll need to dig around and look into how those are currently handled (see below).

Ok(t) => Ok((t, RpcUrlKind::Primary(url_primary.clone()))),
Err(err) => match url_failover {
Some(fo) => {
Http::with_event_loop(&fo.to_string(), handle, concurrent_connections)
.map(|h| (h, RpcUrlKind::Failover(fo.clone())))
},
None => Err(err),
},
}
}

let home = Http::with_event_loop(home, handle,home_concurrent_connections)
.map_err(ErrorKind::Web3)
.map_err(Error::from)
.chain_err(||"Cannot connect to home node rpc")?;
let foreign = Http::with_event_loop(foreign, handle, foreign_concurrent_connections)
.map_err(ErrorKind::Web3)
.map_err(Error::from)
.chain_err(||"Cannot connect to foreign node rpc")?;
let (home, home_url) = connect(handle, home_primary, home_failover, home_concurrent_connections)
.map_err(|err| ErrorKind::HomeRpcConnection(err))?;
let (foreign, foreign_url) = connect(handle, foreign_primary, foreign_failover, foreign_concurrent_connections)
.map_err(|err| ErrorKind::ForeignRpcConnection(err))?;

let result = Connections {
Ok(Connections {
home,
foreign
};
Ok(result)
home_url,
foreign,
foreign_url,
})
}
}

impl<T: Transport> Connections<T> {
pub fn as_ref(&self) -> Connections<&T> {
Connections {
/// Contains references to the fields of a `Connection`.
pub struct ConnectionsRef<'u, T> where T: Transport {
pub home: T,
pub home_url: &'u RpcUrlKind,
pub foreign: T,
pub foreign_url: &'u RpcUrlKind,
}

impl<'u, T: Transport> ConnectionsRef<'u, T> {
pub fn as_ref(&'u self) -> ConnectionsRef<'u, &T> {
ConnectionsRef {
home: &self.home,
home_url: &self.home_url,
foreign: &self.foreign,
foreign_url: &self.foreign_url,
}
}
}

impl App<Http> {
pub fn new_http<P: AsRef<Path>>(config: Config, database_path: P, handle: &Handle, running: Arc<AtomicBool>) -> Result<Self, Error> {
let home_url:String = format!("{}:{}", config.home.rpc_host, config.home.rpc_port);
let foreign_url:String = format!("{}:{}", config.foreign.rpc_host, config.foreign.rpc_port);
pub fn new_http<P: AsRef<Path>>(config: Config, database_path: P, handle: &Handle,
running: Arc<AtomicBool>) -> Result<Self, Error> {
let connections = Connections::new_http(
handle,
&config.home.primary_rpc,
config.home.failover_rpc.as_ref(),
config.home.concurrent_http_requests,
&config.foreign.primary_rpc,
config.foreign.failover_rpc.as_ref(),
config.foreign.concurrent_http_requests,
)?;

let connections = Connections::new_http(handle, home_url.as_ref(), config.home.concurrent_http_requests, foreign_url.as_ref(), config.foreign.concurrent_http_requests)?;
let keystore = EthStore::open(Box::new(RootDiskDirectory::at(&config.keystore))).map_err(|e| ErrorKind::KeyStore(e))?;
let keystore = EthStore::open(Box::new(RootDiskDirectory::at(&config.keystore)))
.map_err(|e| ErrorKind::KeyStore(e))?;

let keystore = AccountProvider::new(Box::new(keystore), AccountProviderSettings {
enable_hardware_wallets: false,
hardware_wallet_classic_key: false,
unlock_keep_secret: true,
blacklisted_accounts: vec![],
});
keystore.unlock_account_permanently(config.home.account, config.home.password()?).map_err(|e| ErrorKind::AccountError(e))?;
keystore.unlock_account_permanently(config.foreign.account, config.foreign.password()?).map_err(|e| ErrorKind::AccountError(e))?;
keystore.unlock_account_permanently(config.home.account, config.home.password()?)
.map_err(|e| ErrorKind::AccountError(e))?;
keystore.unlock_account_permanently(config.foreign.account, config.foreign.password()?)
.map_err(|e| ErrorKind::AccountError(e))?;

let max_timeout = config.clone().home.request_timeout.max(config.clone().foreign.request_timeout);

Expand All @@ -96,3 +134,4 @@ impl App<Http> {
Ok(result)
}
}

20 changes: 14 additions & 6 deletions bridge/src/bridge/deploy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,21 @@ impl<T: Transport + Clone> Future for Deploy<T> {
data: test_data.into(),
};

let main_future = api::send_transaction_with_nonce(self.app.connections.home.clone(), self.app.clone(),
self.app.config.home.clone(), main_tx, self.home_chain_id,
TransactionWithConfirmation(self.app.connections.home.clone(), self.app.config.home.poll_interval, self.app.config.home.required_confirmations));
let main_future = api::send_transaction_with_nonce(
self.app.connections.home.clone(), self.app.connections.home_url.clone(),
self.app.clone(), self.app.config.home.clone(), main_tx, self.home_chain_id,
TransactionWithConfirmation(self.app.connections.home.clone(),
self.app.config.home.poll_interval,
self.app.config.home.required_confirmations)
);

let test_future = api::send_transaction_with_nonce(self.app.connections.foreign.clone(), self.app.clone(),
self.app.config.foreign.clone(), test_tx, self.foreign_chain_id,
TransactionWithConfirmation(self.app.connections.foreign.clone(), self.app.config.foreign.poll_interval, self.app.config.foreign.required_confirmations));
let test_future = api::send_transaction_with_nonce(
self.app.connections.foreign.clone(), self.app.connections.foreign_url.clone(),
self.app.clone(), self.app.config.foreign.clone(), test_tx, self.foreign_chain_id,
TransactionWithConfirmation(self.app.connections.foreign.clone(),
self.app.config.foreign.poll_interval,
self.app.config.foreign.required_confirmations)
);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like these are formatting changes unrelated to the actual change. They obscure the substance of the change, making it harder to understand what's important and what's not.

DeployState::Deploying(main_future.join(test_future))
}
Expand Down
8 changes: 5 additions & 3 deletions bridge/src/bridge/deposit_relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ impl<T: Transport> Stream for DepositRelay<T> {
let gas = U256::from(self.app.config.txs.deposit_relay.gas);
let gas_price = U256::from(*self.foreign_gas_price.read().unwrap());
let balance_required = gas * gas_price * U256::from(item.logs.len());

if balance_required > *foreign_balance.as_ref().unwrap() {
return Err(ErrorKind::InsufficientFunds.into())
}
Expand All @@ -110,8 +110,10 @@ impl<T: Transport> Stream for DepositRelay<T> {
nonce: U256::zero(),
action: Action::Call(self.foreign_contract.clone()),
};
api::send_transaction_with_nonce(self.app.connections.foreign.clone(), self.app.clone(), self.app.config.foreign.clone(),
tx, self.foreign_chain_id, SendRawTransaction(self.app.connections.foreign.clone()))
api::send_transaction_with_nonce(self.app.connections.foreign.clone(),
self.app.connections.foreign_url.clone(), self.app.clone(),
self.app.config.foreign.clone(), tx, self.foreign_chain_id,
SendRawTransaction(self.app.connections.foreign.clone()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like these are formatting changes unrelated to the actual change. They make sense, but they would have been a lot easier to process as a part of separate "formatting" patch, both for review and later reading.

}).collect_vec();

info!("relaying {} deposits", len);
Expand Down
24 changes: 13 additions & 11 deletions bridge/src/bridge/gas_price.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ mod tests {
use super::*;
use error::{Error, ErrorKind};
use futures::{Async, future::{err, ok, FutureResult}};
use config::{Node, NodeInfo, DEFAULT_CONCURRENCY};
use config::{Node, NodeInfo, DEFAULT_CONCURRENCY, RpcUrl};
use tokio_timer::Timer;
use std::time::Duration;
use std::path::PathBuf;
Expand All @@ -168,8 +168,8 @@ mod tests {
request_timeout: Duration::from_secs(5),
poll_interval: Duration::from_secs(1),
required_confirmations: 0,
rpc_host: "https://rpc".into(),
rpc_port: 443,
primary_rpc: RpcUrl { host: "https://rpc".into(), port: 443 },
failover_rpc: None,
password: PathBuf::from("password"),
info: NodeInfo::default(),
gas_price_oracle_url: Some("https://gas.price".into()),
Expand Down Expand Up @@ -211,8 +211,10 @@ mod tests {
request_timeout: Duration::from_secs(5),
poll_interval: Duration::from_secs(1),
required_confirmations: 0,
rpc_host: "https://rpc".into(),
rpc_port: 443,
// rpc_host: "https://rpc".into(),
// rpc_port: 443,
primary_rpc: RpcUrl { host: "https://rpc".into(), port: 443 },
failover_rpc: None,
password: PathBuf::from("password"),
info: NodeInfo::default(),
gas_price_oracle_url: Some("https://gas.price".into()),
Expand Down Expand Up @@ -254,8 +256,8 @@ mod tests {
request_timeout: Duration::from_secs(5),
poll_interval: Duration::from_secs(1),
required_confirmations: 0,
rpc_host: "https://rpc".into(),
rpc_port: 443,
primary_rpc: RpcUrl { host: "https://rpc".into(), port: 443 },
failover_rpc: None,
password: PathBuf::from("password"),
info: NodeInfo::default(),
gas_price_oracle_url: Some("https://gas.price".into()),
Expand Down Expand Up @@ -296,8 +298,8 @@ mod tests {
request_timeout: Duration::from_secs(5),
poll_interval: Duration::from_secs(1),
required_confirmations: 0,
rpc_host: "https://rpc".into(),
rpc_port: 443,
primary_rpc: RpcUrl { host: "https://rpc".into(), port: 443 },
failover_rpc: None,
password: PathBuf::from("password"),
info: NodeInfo::default(),
gas_price_oracle_url: Some("https://gas.price".into()),
Expand Down Expand Up @@ -338,8 +340,8 @@ mod tests {
request_timeout: Duration::from_secs(5),
poll_interval: Duration::from_secs(1),
required_confirmations: 0,
rpc_host: "https://rpc".into(),
rpc_port: 443,
primary_rpc: RpcUrl { host: "https://rpc".into(), port: 443 },
failover_rpc: None,
password: PathBuf::from("password"),
info: NodeInfo::default(),
gas_price_oracle_url: Some("https://gas.price".into()),
Expand Down
12 changes: 9 additions & 3 deletions bridge/src/bridge/nonce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use web3::types::{U256, H256, Bytes};
use ethcore_transaction::Transaction;
use api::{self, ApiCall};
use error::{Error, ErrorKind};
use config::Node;
use config::{Node, RpcUrlKind};
use transaction::prepare_raw_transaction;
use app::App;
use std::sync::Arc;
Expand All @@ -32,6 +32,8 @@ enum NonceCheckState<T: Transport, S: TransactionSender> {
pub struct NonceCheck<T: Transport, S: TransactionSender> {
app: Arc<App<T>>,
transport: T,
/// Used for logging:
rpc_url: RpcUrlKind,
state: NonceCheckState<T, S>,
node: Node,
transaction: Transaction,
Expand All @@ -48,11 +50,14 @@ impl<T: Transport, S: TransactionSender> Debug for NonceCheck<T, S> {

}

pub fn send_transaction_with_nonce<T: Transport + Clone, S: TransactionSender>(transport: T, app: Arc<App<T>>, node: Node, transaction: Transaction, chain_id: u64, sender: S) -> NonceCheck<T, S> {
pub fn send_transaction_with_nonce<T, S>(transport: T, rpc_url: RpcUrlKind, app: Arc<App<T>>,
node: Node, transaction: Transaction, chain_id: u64, sender: S) -> NonceCheck<T, S>
where T: Transport + Clone, S: TransactionSender {
NonceCheck {
app,
state: NonceCheckState::Ready,
transport,
rpc_url,
node,
transaction,
chain_id,
Expand Down Expand Up @@ -108,7 +113,8 @@ impl<T: Transport, S: TransactionSender> Future for NonceCheck<T, S> {
NonceCheckState::Reacquire
} else if rpc_err.code == rpc::ErrorCode::ServerError(-32010) && rpc_err.message.ends_with("already imported.") {
let hash = self.transaction.hash(Some(self.chain_id));
info!("{} already imported on {}, skipping", hash, self.node.rpc_host);
// info!("{} already imported on {}, skipping", hash, self.node.rpc_host);
info!("{} already imported on {}, skipping", hash, self.rpc_url);
return Ok(Async::Ready(self.sender.ignore(hash)))
} else {
return Err(ErrorKind::Web3(web3::error::ErrorKind::Rpc(rpc_err).into()).into());
Expand Down
6 changes: 4 additions & 2 deletions bridge/src/bridge/withdraw_confirm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,10 @@ impl<T: Transport> Stream for WithdrawConfirm<T> {
nonce: U256::zero(),
action: Action::Call(contract),
};
api::send_transaction_with_nonce(self.app.connections.foreign.clone(), self.app.clone(), self.app.config.foreign.clone(),
tx, self.foreign_chain_id, SendRawTransaction(self.app.connections.foreign.clone()))
api::send_transaction_with_nonce(self.app.connections.foreign.clone(),
self.app.connections.foreign_url.clone(), self.app.clone(),
self.app.config.foreign.clone(), tx, self.foreign_chain_id,
SendRawTransaction(self.app.connections.foreign.clone()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like these are formatting changes unrelated to the actual change.

}).collect_vec();

info!("submitting {} signatures", len);
Expand Down
4 changes: 3 additions & 1 deletion bridge/src/bridge/withdraw_relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ impl<T: Transport> Stream for WithdrawRelay<T> {
let contract = self.home_contract.clone();
let home = &self.app.config.home;
let t = &self.app.connections.home;
let t_url = &self.app.connections.home_url;
let chain_id = self.home_chain_id;

loop {
Expand Down Expand Up @@ -232,7 +233,8 @@ impl<T: Transport> Stream for WithdrawRelay<T> {
nonce: U256::zero(),
action: Action::Call(contract),
};
api::send_transaction_with_nonce(t.clone(), app.clone(), home.clone(), tx, chain_id, SendRawTransaction(t.clone()))
api::send_transaction_with_nonce(t.clone(), t_url.clone(), app.clone(),
home.clone(), tx, chain_id, SendRawTransaction(t.clone()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this is a formatting change unrelated to the actual change.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I made these formatting changes to make the code more readable (and to adhere more closely to the Rust style guidelines. I'll put them all in a separate PR.

}).collect_vec();

info!("relaying {} withdraws", len);
Expand Down
Loading