Skip to content

Commit

Permalink
Added handling of quit signals
Browse files Browse the repository at this point in the history
  • Loading branch information
rakshith-ravi committed May 5, 2020
1 parent 1900f35 commit 74cf699
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 7 deletions.
38 changes: 38 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ lazy_static = "1.4.0"
colored = "1.9.3"
cli-table = "0.3.0"
chrono = "0.4.11"
ctrlc = "3.1.4"
nix = "0.17.0"
winapi = "0.3.8"

[dependencies.async-std]
version = "1.5.0"
Expand Down
2 changes: 2 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,8 @@ pub async fn get_module_info(config: ConfigValue, args: &ArgMatches<'_>) {
let _module = module.as_object().unwrap();
}

pub async fn on_exit() {}

fn get_date_time(timestamp: i64) -> String {
Utc.timestamp_millis(timestamp)
.format("%a %b %e %T %Y")
Expand Down
17 changes: 16 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,19 @@ extern crate chrono;
extern crate clap;
extern crate cli_table;
extern crate colored;
extern crate ctrlc;
extern crate futures;
extern crate futures_timer;
extern crate juno;
extern crate serde;
extern crate serde_json;

#[cfg(target_family = "unix")]
extern crate nix;

#[cfg(target_family = "windows")]
extern crate winapi;

mod cli;
mod juno_module;
mod logger;
Expand All @@ -22,8 +29,9 @@ mod parser;
mod process_runner;
mod runner;

use async_std::{fs, path::Path};
use async_std::{fs, path::Path, task};
use clap::{App, Arg, SubCommand};
use futures::future;

#[async_std::main]
async fn main() {
Expand Down Expand Up @@ -66,6 +74,8 @@ async fn main() {
)
.get_matches();

ctrlc::set_handler(|| task::block_on(on_exit())).expect("Error setting the CtrlC handler");

let config_path = Path::new(args.value_of("config").unwrap_or("./config.json"));

if !config_path.exists().await {
Expand Down Expand Up @@ -95,3 +105,8 @@ async fn main() {
(cmd, _) => println!("Unknown command '{}'", cmd),
}
}

async fn on_exit() {
logger::info("Recieved exit code. Closing all modules");
future::join(runner::on_exit(), cli::on_exit()).await;
}
65 changes: 64 additions & 1 deletion src/process_runner.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::logger;

use serde_derive::Deserialize;
use std::{
process::{Child, Command},
process::{Child, Command, Stdio},
time::{SystemTime, UNIX_EPOCH},
};

Expand Down Expand Up @@ -96,6 +98,8 @@ impl ProcessRunner {
Command::new(&self.config.command)
.args(self.config.args.as_ref().unwrap_or(&vec![]))
.envs(self.config.envs.as_ref().unwrap_or(&vec![]).clone())
.stdin(Stdio::null())
.stdout(Stdio::null())
.spawn()
} else {
Command::new(self.config.interpreter.as_ref().unwrap())
Expand All @@ -117,6 +121,65 @@ impl ProcessRunner {
self.last_started_at = get_current_time();
}

#[cfg(target_family = "unix")]
pub fn send_quit_signal(&mut self) {
if self.process.is_none() {
return;
}
// Send SIGINT to a process in unix
use nix::{
sys::signal::{self, Signal},
unistd::Pid,
};

// send SIGINT to the child
let result = signal::kill(
Pid::from_raw(self.process.as_ref().unwrap().id() as i32),
Signal::SIGINT,
);
if result.is_err() {
logger::error(&format!(
"Error sending SIGINT to child process '{}': {}",
self.config.name,
result.unwrap_err()
));
}
}

#[cfg(target_family = "windows")]
pub fn send_quit_signal(&mut self) {
if self.process.is_none() {
return;
}
// Send ctrl-c event to a process in windows
// Ref: https://blog.codetitans.pl/post/sending-ctrl-c-signal-to-another-application-on-windows/
use winapi::um::{
consoleapi::SetConsoleCtrlHandler,
wincon::{AttachConsole, FreeConsole, GenerateConsoleCtrlEvent},
};

let pid = self.process.as_ref().unwrap().id();
const CTRL_C_EVENT: u32 = 0;

unsafe {
FreeConsole();
if AttachConsole(pid) > 0 {
SetConsoleCtrlHandler(None, 1);
GenerateConsoleCtrlEvent(CTRL_C_EVENT, 0);
}
}
}

pub fn kill(&mut self) {
if self.process.is_none() {
return;
}
let result = self.process.as_mut().unwrap().kill();
if result.is_err() {
logger::error(&format!("Error killing process: {}", result.unwrap_err()));
}
}

pub fn copy(&self) -> ProcessRunner {
ProcessRunner {
module_id: self.module_id,
Expand Down
80 changes: 75 additions & 5 deletions src/runner.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,30 @@
use crate::{
juno_module,
juno_module, logger,
misc::GuillotineMessage,
parser::ConfigValue,
process_runner::{ModuleConfig, ProcessRunner},
};
use std::time::Duration;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use async_std::{
fs::{self, DirEntry},
io::Error,
net::TcpStream,
path::Path,
prelude::*,
sync::Mutex,
task,
};
use futures::{
channel::mpsc::unbounded,
future::{self, Either},
};
use futures_timer::Delay;

lazy_static! {
static ref CLOSE_FLAG: Mutex<bool> = Mutex::new(false);
}

pub async fn run(config: ConfigValue) {
let juno_path = config.juno.path.clone();
let mut pid = 0;
Expand Down Expand Up @@ -74,6 +80,10 @@ pub async fn run(config: ConfigValue) {
keep_processes_alive(juno_process, config, tracked_modules).await;
}

pub async fn on_exit() {
*CLOSE_FLAG.lock().await = true;
}

async fn get_module_from_path(
expected_pid: u64,
path: Result<DirEntry, Error>,
Expand Down Expand Up @@ -115,11 +125,14 @@ async fn keep_processes_alive(

let mut timer_future = Delay::new(Duration::from_millis(100));
let mut command_future = command_receiver.next();

loop {
let selection = future::select(timer_future, command_future);
match selection.await {
let selection = future::select(timer_future, command_future).await;
match selection {
Either::Left((_, next_command_future)) => {
if *CLOSE_FLAG.lock().await {
break;
}

// Timer expired
command_future = next_command_future;
timer_future = Delay::new(Duration::from_millis(100));
Expand Down Expand Up @@ -158,6 +171,56 @@ async fn keep_processes_alive(
}
}
}

// Execute exit actions
// Kill all modules first
processes.iter_mut().for_each(|module| {
logger::info(&format!("Quitting process: {}", module.config.name));
module.send_quit_signal();
});
let quit_time = get_current_millis();
loop {
// Give the processes some time to die.
task::sleep(Duration::from_millis(100)).await;

// If all of the processes are not running, then break
if processes
.iter_mut()
.all(|module| !module.is_process_running())
{
break;
}
// If some of the processes are running, check if they've been given enough time.
if get_current_millis() > quit_time + 1000 {
// They've been trying to quit for more than 1 second. Kill them all and quit
processes.iter_mut().for_each(|module| {
logger::info(&format!("Killing process: {}", module.config.name));
module.kill();
});
break;
}
}

// Now quit juno similarly
logger::info(&format!("Quitting process: {}", juno_process.config.name));
juno_process.send_quit_signal();
let quit_time = get_current_millis();
loop {
// Give the process some time to die.
task::sleep(Duration::from_millis(100)).await;

// If the process is not running, then break
if !juno_process.is_process_running() {
break;
}
// If the processes is running, check if it's been given enough time.
if get_current_millis() > quit_time + 1000 {
// It's been trying to quit for more than 1 second. Kill it and quit
logger::info(&format!("Killing process: {}", juno_process.config.name));
juno_process.kill();
break;
}
}
}

async fn ensure_juno_initialized(config: ConfigValue) {
Expand Down Expand Up @@ -194,3 +257,10 @@ async fn connect_to_unix_socket(socket_path: &str) -> Result<(), Error> {
async fn connect_to_unix_socket(_: &str) -> Result<(), Error> {
panic!("Unix sockets are not supported on Windows");
}

fn get_current_millis() -> u128 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards. Wtf?")
.as_millis()
}

0 comments on commit 74cf699

Please sign in to comment.