Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set timeout for helper subprocesses to enhance stability #11125

Merged
merged 11 commits into from
Dec 18, 2024
226 changes: 226 additions & 0 deletions common/lib/dependabot/command_helpers.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
# typed: strict
# frozen_string_literal: true

require "open3"
require "timeout"
require "sorbet-runtime"
require "shellwords"

module Dependabot
module CommandHelpers
extend T::Sig

module TIMEOUTS
NO_TIME_OUT = -1
LOCAL = 30
NETWORK = 120
LONG_RUNNING = 300
DEFAULT = NETWORK
end

class ProcessStatus
extend T::Sig

sig { params(process_status: Process::Status, custom_exitstatus: T.nilable(Integer)).void }
def initialize(process_status, custom_exitstatus = nil)
@process_status = process_status
@custom_exitstatus = custom_exitstatus
end

# Return the exit status, either from the process status or the custom one
sig { returns(Integer) }
def exitstatus
@custom_exitstatus || @process_status.exitstatus || 0
end

# Determine if the process was successful
sig { returns(T::Boolean) }
def success?
@custom_exitstatus.nil? ? @process_status.success? || false : @custom_exitstatus.zero?
end

# Return the PID of the process (if available)
sig { returns(T.nilable(Integer)) }
def pid
@process_status.pid
end

sig { returns(T.nilable(Integer)) }
def termsig
@process_status.termsig
end

# String representation of the status
sig { returns(String) }
def to_s
if @custom_exitstatus
"pid #{pid || 'unknown'}: exit #{@custom_exitstatus} (custom status)"
else
@process_status.to_s
end
end
end

# rubocop:disable Metrics/AbcSize
# rubocop:disable Metrics/MethodLength
# rubocop:disable Metrics/PerceivedComplexity
# rubocop:disable Metrics/CyclomaticComplexity
sig do
params(
env_cmd: T::Array[T.any(T::Hash[String, String], String)],
stdin_data: T.nilable(String),
stderr_to_stdout: T::Boolean,
timeout: Integer
).returns([T.nilable(String), T.nilable(String), T.nilable(ProcessStatus), Float])
end
def self.capture3_with_timeout(
env_cmd,
stdin_data: nil,
stderr_to_stdout: false,
timeout: TIMEOUTS::DEFAULT
)

stdout = T.let("", String)
stderr = T.let("", String)
status = T.let(nil, T.nilable(ProcessStatus))
pid = T.let(nil, T.untyped)
start_time = Time.now

begin
T.unsafe(Open3).popen3(*env_cmd) do |stdin, stdout_io, stderr_io, wait_thr| # rubocop:disable Metrics/BlockLength
pid = wait_thr.pid
Dependabot.logger.info("Started process PID: #{pid} with command: #{env_cmd.join(' ')}")

# Write to stdin if input data is provided
stdin&.write(stdin_data) if stdin_data
stdin&.close

stdout_io.sync = true
stderr_io.sync = true

# Array to monitor both stdout and stderr
ios = [stdout_io, stderr_io]

last_output_time = Time.now # Track the last time output was received

until ios.empty?
if timeout.positive?
# Calculate remaining timeout dynamically
remaining_timeout = timeout - (Time.now - last_output_time)

# Raise an error if timeout is exceeded
if remaining_timeout <= 0
Dependabot.logger.warn("Process PID: #{pid} timed out after #{timeout}s. Terminating...")
terminate_process(pid)
status = ProcessStatus.new(wait_thr.value, 124)
raise Timeout::Error, "Timed out due to inactivity after #{timeout} seconds"
end
end

# Use IO.select with a dynamically calculated short timeout
ready_ios = IO.select(ios, nil, nil, 0)

# Process ready IO streams
ready_ios&.first&.each do |io|
# 1. Read data from the stream
io.set_encoding("BINARY")
data = io.read_nonblock(1024)

# 2. Force encoding to UTF-8 (for proper conversion)
data.force_encoding("UTF-8")

# 3. Convert to UTF-8 safely, handling invalid/undefined bytes
data = data.encode("UTF-8", invalid: :replace, undef: :replace, replace: "?")

# Reset the timeout if data is received
last_output_time = Time.now unless data.empty?

# 4. Append data to the appropriate stream
if io == stdout_io
stdout += data
else
stderr += data unless stderr_to_stdout
stdout += data if stderr_to_stdout
end
rescue EOFError
# Remove the stream when EOF is reached
ios.delete(io)
rescue IO::WaitReadable
# Continue when IO is not ready yet
next
end
end

status = ProcessStatus.new(wait_thr.value)
Dependabot.logger.info("Process PID: #{pid} completed with status: #{status}")
end
rescue Timeout::Error => e
Dependabot.logger.error("Process PID: #{pid} failed due to timeout: #{e.message}")
terminate_process(pid)

# Append timeout message only to stderr without interfering with stdout
stderr += "\n#{e.message}" unless stderr_to_stdout
stdout += "\n#{e.message}" if stderr_to_stdout
rescue Errno::ENOENT => e
Dependabot.logger.error("Command failed: #{e.message}")
stderr += e.message unless stderr_to_stdout
stdout += e.message if stderr_to_stdout
end

elapsed_time = Time.now - start_time
Dependabot.logger.info("Total execution time: #{elapsed_time.round(2)} seconds")
[stdout, stderr, status, elapsed_time]
end
# rubocop:enable Metrics/AbcSize
# rubocop:enable Metrics/MethodLength
# rubocop:enable Metrics/PerceivedComplexity
# rubocop:enable Metrics/CyclomaticComplexity

# Terminate a process by PID
sig { params(pid: T.nilable(Integer)).void }
def self.terminate_process(pid)
return unless pid

begin
if process_alive?(pid)
Process.kill("TERM", pid) # Attempt graceful termination
sleep(0.5) # Allow process to terminate
end
if process_alive?(pid)
Process.kill("KILL", pid) # Forcefully kill if still running
end
rescue Errno::EPERM
Dependabot.logger.error("Insufficient permissions to terminate process: #{pid}")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens in this case? Will the job still terminate?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In natural way if it terminates (old way) we don't do anything. If it doesn't then the operation is going to get canceled after hanging out as how it was before. But generally we should have permission to do it since we initiated the command in this thread.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would be surprised if this ever triggers TBH. This is only ever going to terminate a sub-process, and AFAIK the parent process should always have perms to terminate a child process.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I aggree.

ensure
begin
Process.waitpid(pid)
rescue Errno::ESRCH, Errno::ECHILD
# Process has already exited
end
end
end

# Check if the process is still alive
sig { params(pid: T.nilable(Integer)).returns(T::Boolean) }
def self.process_alive?(pid)
return false if pid.nil?

begin
Process.kill(0, pid) # Check if the process exists
true
rescue Errno::ESRCH
false
rescue Errno::EPERM
Dependabot.logger.error("Insufficient permissions to check process: #{pid}")
false
end
end

# Escape shell commands to ensure safe execution
sig { params(command: String).returns(String) }
def self.escape_command(command)
command_parts = command.split.map(&:strip).reject(&:empty?)
Shellwords.join(command_parts)
end
end
end
50 changes: 35 additions & 15 deletions common/lib/dependabot/shared_helpers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
require "fileutils"
require "json"
require "open3"
require "shellwords"
require "sorbet-runtime"
require "tmpdir"

Expand All @@ -17,9 +16,10 @@
require "dependabot/errors"
require "dependabot/workspace"
require "dependabot"
require "dependabot/command_helpers"

module Dependabot
module SharedHelpers
module SharedHelpers # rubocop:disable Metrics/ModuleLength
extend T::Sig

GIT_CONFIG_GLOBAL_PATH = T.let(File.expand_path(".gitconfig", Utils::BUMP_TMP_DIR_PATH), String)
Expand Down Expand Up @@ -121,8 +121,7 @@ def sentry_context
# Escapes all special characters, e.g. = & | <>
sig { params(command: String).returns(String) }
def self.escape_command(command)
command_parts = command.split.map(&:strip).reject(&:empty?)
Shellwords.join(command_parts)
CommandHelpers.escape_command(command)
end

# rubocop:disable Metrics/MethodLength
Expand All @@ -135,14 +134,16 @@ def self.escape_command(command)
env: T.nilable(T::Hash[String, String]),
stderr_to_stdout: T::Boolean,
allow_unsafe_shell_command: T::Boolean,
error_class: T.class_of(HelperSubprocessFailed)
error_class: T.class_of(HelperSubprocessFailed),
timeout: Integer
)
.returns(T.nilable(T.any(String, T::Hash[String, T.untyped], T::Array[T::Hash[String, T.untyped]])))
end
def self.run_helper_subprocess(command:, function:, args:, env: nil,
stderr_to_stdout: false,
allow_unsafe_shell_command: false,
error_class: HelperSubprocessFailed)
error_class: HelperSubprocessFailed,
timeout: CommandHelpers::TIMEOUTS::DEFAULT)
start = Time.now
stdin_data = JSON.dump(function: function, args: args)
cmd = allow_unsafe_shell_command ? command : escape_command(command)
Expand All @@ -157,7 +158,15 @@ def self.run_helper_subprocess(command:, function:, args:, env: nil,
end

env_cmd = [env, cmd].compact
stdout, stderr, process = T.unsafe(Open3).capture3(*env_cmd, stdin_data: stdin_data)
if Experiments.enabled?(:enable_shared_helpers_command_timeout)
stdout, stderr, process = CommandHelpers.capture3_with_timeout(
env_cmd,
stdin_data: stdin_data,
timeout: timeout
)
else
stdout, stderr, process = T.unsafe(Open3).capture3(*env_cmd, stdin_data: stdin_data)
end
time_taken = Time.now - start

if ENV["DEBUG_HELPERS"] == "true"
Expand All @@ -177,16 +186,16 @@ def self.run_helper_subprocess(command:, function:, args:, env: nil,
function: function,
args: args,
time_taken: time_taken,
stderr_output: stderr ? stderr[0..50_000] : "", # Truncate to ~100kb
stderr_output: stderr[0..50_000], # Truncate to ~100kb
process_exit_value: process.to_s,
process_termsig: process.termsig
process_termsig: process&.termsig
}

check_out_of_memory_error(stderr, error_context, error_class)

begin
response = JSON.parse(stdout)
return response["result"] if process.success?
return response["result"] if process&.success?

raise error_class.new(
message: response["error"],
Expand Down Expand Up @@ -415,22 +424,25 @@ def self.find_safe_directories
safe_directories
end

# rubocop:disable Metrics/PerceivedComplexity
sig do
params(
command: String,
allow_unsafe_shell_command: T::Boolean,
cwd: T.nilable(String),
env: T.nilable(T::Hash[String, String]),
fingerprint: T.nilable(String),
stderr_to_stdout: T::Boolean
stderr_to_stdout: T::Boolean,
timeout: Integer
).returns(String)
end
def self.run_shell_command(command,
allow_unsafe_shell_command: false,
cwd: nil,
env: {},
fingerprint: nil,
stderr_to_stdout: true)
stderr_to_stdout: true,
timeout: CommandHelpers::TIMEOUTS::DEFAULT)
start = Time.now
cmd = allow_unsafe_shell_command ? command : escape_command(command)

Expand All @@ -439,7 +451,14 @@ def self.run_shell_command(command,
opts = {}
opts[:chdir] = cwd if cwd

if stderr_to_stdout
env_cmd = [env || {}, cmd, opts].compact
if Experiments.enabled?(:enable_shared_helpers_command_timeout)
stdout, stderr, process, _elapsed_time = CommandHelpers.capture3_with_timeout(
env_cmd,
stderr_to_stdout: stderr_to_stdout,
timeout: timeout
)
elsif stderr_to_stdout
stdout, process = Open3.capture2e(env || {}, cmd, opts)
else
stdout, stderr, process = Open3.capture3(env || {}, cmd, opts)
Expand All @@ -449,7 +468,7 @@ def self.run_shell_command(command,

# Raise an error with the output from the shell session if the
# command returns a non-zero status
return stdout if process.success?
return stdout || "" if process&.success?

error_context = {
command: cmd,
Expand All @@ -461,10 +480,11 @@ def self.run_shell_command(command,
check_out_of_disk_memory_error(stderr, error_context)

raise SharedHelpers::HelperSubprocessFailed.new(
message: stderr_to_stdout ? stdout : "#{stderr}\n#{stdout}",
message: stderr_to_stdout ? (stdout || "") : "#{stderr}\n#{stdout}",
error_context: error_context
)
end
# rubocop:enable Metrics/PerceivedComplexity

sig { params(stderr: T.nilable(String), error_context: T::Hash[Symbol, String]).void }
def self.check_out_of_disk_memory_error(stderr, error_context)
Expand Down
Loading
Loading