Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update signal handling #3

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 169 additions & 43 deletions lib/sproc/core.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,57 @@
require "open3"

module SProc
class Signal
def initialize(sproc)
@sproc = sproc
end

# send the 'kill' signal to the subprocess.
#
# return:: true if the signal was sent, false if the sending failed
def kill
signal("KILL")
end

# send the 'abort' signal to the subprocess.
#
# return:: true if the signal was sent, false if the sending failed
def abort
signal("ABRT")
end

# send the 'interrup' signal to the subprocess.
#
# return:: true if the signal was sent, false if the sending failed
def interrupt
signal("INT")
end

# send the 'terminate' signal to the subprocess.
#
# return:: true if the signal was sent, false if the sending failed
def terminate
signal("TERM")
end

private

# Try to send a signal to the subprocess and returns true
# if the sending succeeded. If the sending fails
# for any reason at all (process not started, wrong argument type, ...)
# false is returned.
#
# return:: true if the signal was sent, false otherwise
def signal(sig)
return false if @sproc.pid.nil?

Process.kill(sig, @sproc.pid)
true
rescue
false
end
end

# Defines the supported shell environments in which a subprocess
# can be run.
module ShellType
Expand All @@ -18,19 +69,40 @@ module ShellType
# running within an SProc instance.
module ExecutionState
# The process is initiated but does not yet run
NOT_STARTED = 0
class NotStarted; end

# The process is running
RUNNING = 1
class Running; end

# The process has previously been running but is now aborted
ABORTED = 2
class Aborted; end

# The process has previously been running but has now run to completion
COMPLETED = 3
class Completed; end

# The process failed to start and thus, have never been running
FAILED_TO_START = 4
class FailedToStart; end
end

# Execute a command in a subprocess, either synchronuously or asyncronously.
class SProc
include ShellType
include ExecutionState

# A SProc::Signal instance that can be used to send signals to the subprocess.
#
# === Example
#
# # start an async subprocess running "sleep"
# s = SProc.new.exec_async("sleep", "1"])
#
# # clobber the subprocess and wait for it to die
# s.signal.kill
# s.wait_on_completion
#
# ...
attr_reader :signal

# A struct that represents queryable info about the task run by this SProc
#
# cmd_str:: the invokation string used to start the process
Expand All @@ -47,8 +119,20 @@ class SProc
:process_status, # the ProcessStatus object (see Ruby docs)
:popen_thread, # the thread created by the popen call, nil before started
:stdout, # a String containing all output from the process' stdout
:stderr # a String containing all output from the process' stderr
)
:stderr, # a String containing all output from the process' stderr
:pid # the process id of the sub process or nil if no process id has yet been allocated
) {
def to_s
str = "<Status: pid: #{pid} "
if process_status
ps = process_status
str += " caught signal #{ps.termsig} " if ps.signaled?
str += " coredumped! " if ps.coredump?
str += " exited with status #{ps.exitstatus} " if ps.exited?
end
str + " > <cmd_str: #{cmd_str}> "
end
}

@logger = nil
class << self
Expand All @@ -60,9 +144,6 @@ def logger
self.class.logger
end

include ShellType
include ExecutionState

# prepare to run a sub process
# type:: the ShellType used to run the process within
# stdout_callback:: a callback that will receive all stdout output
Expand All @@ -87,6 +168,7 @@ def initialize(type: ShellType::NONE, stdout_callback: nil,
@runner = TaskRunner.new(@run_opts)
@execution_thread = nil
@env = env
@signal = Signal.new(self)
end

# Start the sub-process and block until it has completed.
Expand Down Expand Up @@ -116,55 +198,48 @@ def exec_async(cmd, *args, **opts)
exec(false, @env, cmd, *args, **opts)
end

# return:: the process id of the running/completed subprocess or nil
# if no subprocess has started.
def pid
task_info[:pid]
end

# return:: +true+ if this process has completed with exit code 0
# (success). +false+ otherwise
def exit_zero?
return false unless execution_state == ExecutionState::COMPLETED
return false unless execution_state == ExecutionState::Completed

task_info[:process_status].exitstatus.zero?
end

# Block the caller as long as this subprocess is running.
# If this SProc has not been started, the call returns
# immediately
#
# return:: the TaskInfo struct of the completed process or
# nil if the subprocess has not yet been started.
def wait_on_completion
return nil if @execution_thread.nil?

@execution_thread.join
task_info
end

# Return the execution state of this SProc. Note that it is not
# identical with the life-cycle of the underlying ProcessStatus object
#
# return:: current ExecutionState
def execution_state
return ExecutionState::NOT_STARTED if @execution_thread.nil?
return ExecutionState::NotStarted if @execution_thread.nil?

# Count this SProc as running as long as the thread
# that executes it is alive (this includes book keeping
# chores within this class as well)
return ExecutionState::RUNNING if @execution_thread.alive?
return ExecutionState::Running if @execution_thread.alive?

status = task_info[:process_status]

# an execution thread that has run but not generated a task_info
# means that we tried to start a process but failed
return ExecutionState::FAILED_TO_START if status.nil?
return ExecutionState::FailedToStart if status.nil?

# a process can terminate for different reasons:
# - its done
# - an uncaught exception-
# - an uncaught signal

# this should take care of uncaught signals
return ExecutionState::ABORTED if status.signaled?
return ExecutionState::Aborted if status.signaled?

# If the process completed (either successfully or not)
return ExecutionState::COMPLETED if status.exited?
return ExecutionState::Completed if status.exited?

# We don't currently handle a process that has been stopped...
raise NotImplementedError("Unhandled process 'stopped' status!") if status.stopped?
Expand All @@ -174,11 +249,28 @@ def execution_state
end

# return:: the TaskInfo representing this SProc, nil if
# process has not started
# the subprocess has not started
def task_info
@runner.task_info
end

# Block the caller as long as this subprocess is running.
# If this SProc has not been started, the call returns
# immediately
#
# return:: the TaskInfo struct of the completed process or
# nil if the subprocess has not yet been started.
def wait_on_completion
return nil if @execution_thread.nil?

@execution_thread.join
task_info
end

def to_s
"<subprocess: #{execution_state} #{@runner}> <env: #{@env.inspect}>"
end

# Blocks until all processes in the given array are completed/aborted.
#
# If the caller submits a block, that block is called once for each
Expand All @@ -202,14 +294,17 @@ def task_info
# assert(completed.exit_zero?)
# end
def self.wait_on_all(running_proc, polling_interval = 100, &block)
ok = true
until running_proc.empty?
done = get_finished(running_proc)
running_proc -= done
done.each { |sp| ok &&= sp.exit_zero? }
next unless block

done.each(&block) if block
sleep polling_interval / 1000
end
ok
end

# Wait for subprocesses to complete and give a block an opportunity to
Expand Down Expand Up @@ -269,9 +364,9 @@ def self.wait_or_back_to_back(running_proc, polling_interval = 100)
# return:: an array of SProc objects not running (but previously started)
def self.get_finished(sproc_array)
sproc_array.select do |p|
[ExecutionState::COMPLETED,
ExecutionState::ABORTED,
ExecutionState::FAILED_TO_START].include?(p.execution_state)
[ExecutionState::Completed,
ExecutionState::Aborted,
ExecutionState::FailedToStart].include?(p.execution_state)
end
end

Expand All @@ -287,6 +382,9 @@ def exec(synch, env, cmd, *args, **opts)
@execution_thread = Thread.new do
@runner.execute(env, cmd, *args, **opts)
end
@runner.mutex.synchronize {
@runner.resource.wait(@runner.mutex)
}
@execution_thread.join if synch
self
end
Expand All @@ -298,6 +396,7 @@ def exec(synch, env, cmd, *args, **opts)
# :nodoc: all
class TaskRunner
attr_reader :task_info
attr_accessor :mutex, :resource

include ShellType

Expand All @@ -311,8 +410,10 @@ class TaskRunner
}.freeze

def initialize(opts)
@task_info = TaskInfo.new("", nil, 0, nil, nil, "", "")
@task_info = TaskInfo.new("", nil, 0, nil, nil, "", "", nil)
@opts = DEFAULT_OPTS.dup.merge!(opts)
@mutex = Mutex.new
@resource = ConditionVariable.new
end

# Runs the process and blocks until it is completed or aborted.
Expand All @@ -324,10 +425,16 @@ def execute(env, cmd, *args, **opts)
shell_out_via_popen(env, cmd, *args, **opts)
rescue => e
@task_info[:exception] = e
@resource.signal
ensure
@task_info[:wall_time] = (Process.clock_gettime(
Process::CLOCK_MONOTONIC
) - start_time)
end
@task_info[:wall_time] = (Process.clock_gettime(
Process::CLOCK_MONOTONIC
) - start_time)
end

def to_s
task_info.to_s + "<opts: #{@opts.inspect}>"
end

private
Expand All @@ -351,12 +458,31 @@ def shell_out_via_popen(env, cmd, *args, **opts)
SProc.logger&.debug { "Start: #{task_info[:cmd_str]}" }
SProc.logger&.debug { "Supplying env: #{env}" } unless env.nil?
SProc.logger&.debug { "Spawn options: #{opts}" } unless opts.nil?
Open3.popen3(env, *args) do |stdin, stdout, stderr, thread|
@task_info[:popen_thread] = thread
threads = do_while_process_running(stdin, stdout, stderr)
@task_info[:process_status] = thread.value
threads.each(&:join)
end

# non-blocking kick-off of the subprocess
stdin, stdout, stderr, wait_thread = Open3.popen3(env, *args)
# get info that is available while the process is running
@task_info[:popen_thread] = wait_thread
@task_info[:pid] = wait_thread.pid
@resource.signal

# kick-off the processing of the output io streams for the process
threads = do_while_process_running(stdin, stdout, stderr)
# block here until the process is completed/aborted
@task_info[:process_status] = wait_thread.value
# block until the remaining stream info has been processed
threads.each(&:join)
# explicitly close the streams
stdin.close
stdout.close
stderr.close

# Open3.popen3(env, *args) do |stdin, stdout, stderr, thread|
# @task_info[:popen_thread] = thread
# threads = do_while_process_running(stdin, stdout, stderr)
# @task_info[:process_status] = thread.value
# threads.each(&:join)
# end
end

def get_args_native(cmd, *args, **opts)
Expand Down
Loading