Skip to content
This repository has been archived by the owner on Jun 3, 2020. It is now read-only.

Commit

Permalink
Merge branch 'master' into issues/154
Browse files Browse the repository at this point in the history
  • Loading branch information
thanethomson committed Jan 24, 2019
2 parents 5dd5c63 + 418cd7e commit 04c4fca
Show file tree
Hide file tree
Showing 16 changed files with 131 additions and 82 deletions.
8 changes: 8 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## [0.3.0] (2019-01-23)

- Add ability to terminate on SIGTERM or SIGINT ([#161])
- Remove `PoisonPillMsg` ([#162])

## [0.2.4] (2019-01-18)

- Refactor client/tests to always dial out to tendermint/gaiad ([#149], [#150])
Expand Down Expand Up @@ -34,6 +39,9 @@

- Initial "preview" release

[0.3.0]: https://github.com/tendermint/kms/pull/165
[#161]: https://github.com/tendermint/kms/pull/161
[#162]: https://github.com/tendermint/kms/pull/162
[0.2.4]: https://github.com/tendermint/kms/pull/156
[#149]: https://github.com/tendermint/kms/pull/149
[#150]: https://github.com/tendermint/kms/pull/150
Expand Down
27 changes: 22 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "tmkms"
description = "Tendermint Key Management System"
version = "0.2.4"
version = "0.3.0"
authors = ["Tony Arcieri <[email protected]>", "Ismail Khoffi <[email protected]>"]
license = "Apache-2.0"
homepage = "https://github.com/tendermint/kms/"
Expand All @@ -26,16 +26,17 @@ failure_derive = "0.1"
lazy_static = "1"
prost-amino = "0.4.0"
prost-amino-derive = "0.4.0"
rand = "0.5"
rand = "0.6"
serde = "1"
serde_derive = "1"
serde_json = "1"
sha2 = "0.8"
signal-hook = "0.1.7"
signatory = { version = "0.10", features = ["ed25519"] }
signatory-dalek = "0.10"
signatory-yubihsm = { version = "0.10", optional = true }
subtle-encoding = "0.3"
tendermint = { version = "0.1.5", path = "tendermint-rs" }
tendermint = { version = "0.2,0", path = "tendermint-rs" }

[dev-dependencies]
tempfile = "3"
Expand Down
31 changes: 23 additions & 8 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
use signatory::{ed25519, Decode, Encode};
use signatory_dalek::Ed25519Signer;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::{
panic,
path::Path,
Expand Down Expand Up @@ -38,9 +40,9 @@ pub struct Client {

impl Client {
/// Spawn a new client, returning a handle so it can be joined
pub fn spawn(config: ValidatorConfig) -> Self {
pub fn spawn(config: ValidatorConfig, should_term: Arc<AtomicBool>) -> Self {
Self {
handle: thread::spawn(move || client_loop(config)),
handle: thread::spawn(move || client_loop(config, &should_term)),
}
}

Expand All @@ -51,7 +53,7 @@ impl Client {
}

/// Main loop for all clients. Handles reconnecting in the event of an error
fn client_loop(config: ValidatorConfig) {
fn client_loop(config: ValidatorConfig, should_term: &Arc<AtomicBool>) {
let ValidatorConfig {
addr,
chain_id,
Expand All @@ -60,9 +62,15 @@ fn client_loop(config: ValidatorConfig) {
} = config;

loop {
// If we've already received a shutdown signal from outside
if should_term.load(Ordering::Relaxed) {
info!("[{}@{}] shutdown request received", chain_id, &addr);
return;
}

let session_result = match &addr {
ValidatorAddr::Tcp { host, port } => match &secret_key {
Some(path) => tcp_session(chain_id, host, *port, path),
Some(path) => tcp_session(chain_id, host, *port, path, should_term),
None => {
error!(
"config error: missing field `secret_key` for validator {}",
Expand All @@ -71,7 +79,7 @@ fn client_loop(config: ValidatorConfig) {
return;
}
},
ValidatorAddr::Unix { socket_path } => unix_session(chain_id, socket_path),
ValidatorAddr::Unix { socket_path } => unix_session(chain_id, socket_path, should_term),
};

if let Err(e) = session_result {
Expand All @@ -85,6 +93,8 @@ fn client_loop(config: ValidatorConfig) {
}
} else {
info!("[{}@{}] session closed gracefully", chain_id, &addr);
// Indicate to the outer thread it's time to terminate
should_term.swap(true, Ordering::Relaxed);

return;
}
Expand All @@ -97,6 +107,7 @@ fn tcp_session(
host: &str,
port: u16,
secret_key_path: &Path,
should_term: &Arc<AtomicBool>,
) -> Result<(), KmsError> {
let secret_key = load_secret_connection_key(secret_key_path)?;

Expand All @@ -113,13 +124,17 @@ fn tcp_session(
chain_id, host, port
);

session.request_loop()
session.request_loop(should_term)
})
.unwrap_or_else(|ref e| Err(KmsError::from_panic(e)))
}

/// Create a validator session over a Unix domain socket
fn unix_session(chain_id: chain::Id, socket_path: &Path) -> Result<(), KmsError> {
fn unix_session(
chain_id: chain::Id,
socket_path: &Path,
should_term: &Arc<AtomicBool>,
) -> Result<(), KmsError> {
panic::catch_unwind(move || {
let mut session = Session::connect_unix(chain_id, socket_path)?;

Expand All @@ -129,7 +144,7 @@ fn unix_session(chain_id: chain::Id, socket_path: &Path) -> Result<(), KmsError>
socket_path.display()
);

session.request_loop()
session.request_loop(should_term)
})
.unwrap_or_else(|ref e| Err(KmsError::from_panic(e)))
}
Expand Down
32 changes: 27 additions & 5 deletions src/commands/start.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use abscissa::{Callable, GlobalConfig};
use std::process;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::{thread, time};

use crate::{
client::Client,
Expand Down Expand Up @@ -44,11 +47,27 @@ impl Callable for StartCommand {
process::exit(1);
});

// Should we terminate yet?
let should_term = Arc::new(AtomicBool::new(false));
// Spawn the validator client threads
let validator_clients = spawn_validator_clients(&config.validator);
let validator_clients = spawn_validator_clients(&config.validator, &should_term);
let catch_signals = [signal_hook::SIGTERM, signal_hook::SIGINT];

// Wait for the validator client threads to exit
// TODO: Find something more useful for this thread to do
// Listen for the relevant signals so we can gracefully shut down
for sig in catch_signals.iter() {
signal_hook::flag::register(*sig, Arc::clone(&should_term)).unwrap_or_else(|e| {
status_err!("couldn't register signal hook: {}", e);
process::exit(1);
});
}

// Keep checking in on whether or not we need to terminate
while !should_term.load(Ordering::Relaxed) {
thread::sleep(time::Duration::from_millis(100));
}

// Wait for all of the validator client threads to exit
info!("Waiting for client threads to stop...");
for client in validator_clients {
client.join();
}
Expand All @@ -57,9 +76,12 @@ impl Callable for StartCommand {

/// Spawn validator client threads (which provide KMS service to the
/// validators they connect to)
fn spawn_validator_clients(config: &[ValidatorConfig]) -> Vec<Client> {
fn spawn_validator_clients(
config: &[ValidatorConfig],
should_term: &Arc<AtomicBool>,
) -> Vec<Client> {
config
.iter()
.map(|validator| Client::spawn(validator.clone()))
.map(|validator| Client::spawn(validator.clone(), Arc::clone(should_term)))
.collect()
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ extern crate serde;
extern crate serde_derive;
extern crate serde_json;
extern crate sha2;
extern crate signal_hook;
extern crate signatory;
extern crate signatory_dalek;
#[cfg(feature = "yubihsm")]
Expand Down
5 changes: 0 additions & 5 deletions src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ pub enum Request {

// PingRequest is a PrivValidatorSocket message to keep the connection alive.
ReplyPing(PingRequest),

/// Instruct the KMS to terminate
PoisonPill(PoisonPillMsg),
}

/// Responses from the KMS
Expand Down Expand Up @@ -62,7 +59,6 @@ fn compute_prefix(name: &str) -> (Vec<u8>) {
// pre-compute registered types prefix (this is probably sth. our amino library should
// provide instead)
lazy_static! {
static ref PP_PREFIX: Vec<u8> = compute_prefix(POISON_PILL_AMINO_NAME);
static ref VOTE_PREFIX: Vec<u8> = compute_prefix(VOTE_AMINO_NAME);
static ref PROPOSAL_PREFIX: Vec<u8> = compute_prefix(PROPOSAL_AMINO_NAME);
static ref PUBKEY_PREFIX: Vec<u8> = compute_prefix(PUBKEY_AMINO_NAME);
Expand Down Expand Up @@ -93,7 +89,6 @@ impl Request {
let total_len = encoded_len_varint(len).checked_add(len as usize).unwrap();
let rem = buff.get_ref()[..total_len].to_vec();
match amino_pre {
ref pp if *pp == *PP_PREFIX => Ok(Request::PoisonPill(PoisonPillMsg {})),
ref vt if *vt == *VOTE_PREFIX => Ok(Request::SignVote(SignVoteRequest::decode(&rem)?)),
ref pr if *pr == *PROPOSAL_PREFIX => {
Ok(Request::SignProposal(SignProposalRequest::decode(&rem)?))
Expand Down
13 changes: 9 additions & 4 deletions src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use std::{
net::TcpStream,
os::unix::net::UnixStream,
path::Path,
sync::atomic::{AtomicBool, Ordering},
sync::Arc,
};
use tendermint::{
amino_types::{PingRequest, PingResponse, PubKeyRequest, PubKeyResponse},
Expand Down Expand Up @@ -79,22 +81,25 @@ where
Connection: Read + Write + Sync + Send,
{
/// Main request loop
pub fn request_loop(&mut self) -> Result<(), KmsError> {
pub fn request_loop(&mut self, should_term: &Arc<AtomicBool>) -> Result<(), KmsError> {
debug!("starting handle request loop ... ");
while self.handle_request()? {}
while self.handle_request(should_term)? {}
Ok(())
}

/// Handle an incoming request from the validator
fn handle_request(&mut self) -> Result<bool, KmsError> {
fn handle_request(&mut self, should_term: &Arc<AtomicBool>) -> Result<bool, KmsError> {
if should_term.load(Ordering::Relaxed) {
info!("terminate signal received");
return Ok(false);
}
debug!("started handling request ... ");
let response = match Request::read(&mut self.connection)? {
Request::SignProposal(req) => self.sign(req)?,
Request::SignVote(req) => self.sign(req)?,
// non-signable requests:
Request::ReplyPing(ref req) => self.reply_ping(req),
Request::ShowPublicKey(ref req) => self.get_public_key(req)?,
Request::PoisonPill(_req) => return Ok(false),
};

let mut buf = vec![];
Expand Down
10 changes: 10 additions & 0 deletions tendermint-rs/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
## 0.2.0 (2019-01-23)

This release is compatible with tendermint [v0.29]

- Update to x25519-dalek v0.4.4 (#158)
- Consistent ordering of `BlockID` and `Timestamps` in vote and proposal messages (#159)
- Remove `PoisonPillMsg` previously used to shut-down the kms (#162)

[v0.29]: https://github.com/tendermint/tendermint/blob/master/CHANGELOG.md#v0290

## 0.1.5 (2019-01-18)

This release is compatible with tendermint [v0.28]
Expand Down
Loading

0 comments on commit 04c4fca

Please sign in to comment.