Skip to content

Commit

Permalink
chore: move log transfer to log_runtime_archive
Browse files Browse the repository at this point in the history
  • Loading branch information
eduardacoppo committed Dec 4, 2024
1 parent 513e480 commit 9d849f6
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 123 deletions.
34 changes: 34 additions & 0 deletions lib/syskit/cli/log_runtime_archive.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ class CompressionFailed < RuntimeError; end
#
# It depends on the syskit instance using log rotation
class LogRuntimeArchive
# @return [LogTransferManager, nil] the log transfer support object, or nil
# before {.setup}, or if log transfer is not enabled
attr_accessor :log_transfer_manager

DEFAULT_MAX_ARCHIVE_SIZE = 10_000_000_000 # 10G

def initialize(
Expand Down Expand Up @@ -45,6 +49,36 @@ def process_root_folder
end
end

# Iterate over all datasets in a Roby log process server folder
# and transfer them
#
# The method assumes the last dataset is the current one (i.e. the running
# one), and will only transfer already rotated files.
#
# @param [Pathname] source_dir the log process server folder
# @param [Pathname] target_dir the folder in which to transfer
# the datasets to
def process_source_folder
return unless (mng = syskit_log_transfer_manager)

candidates = self.class.find_all_dataset_folders(@source_dir)
candidates.each do |child|
mng.transfer(child)
end
end

def log_transfer_prepare
return unless Syskit.conf.log_transfer.ip

conf = Syskit.conf.log_transfer
conf.target_dir ||= log_dir
@log_transfer_manager = LogTransferManager.new(conf)
end

def log_transfer_shutdown
@log_transfer_manager = nil
end

# Manages folder available space
#
# The method will check if there is enough space to save more log files
Expand Down
74 changes: 73 additions & 1 deletion lib/syskit/cli/log_runtime_archive_main.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ def self.exit_on_failure?
end

desc "watch", "watch a dataset root folder and call archiver"

option :period,
type: :numeric, default: 600, desc: "polling period in seconds"
option :max_size,
Expand Down Expand Up @@ -61,6 +60,79 @@ def archive(root_dir, target_dir)
archiver.process_root_folder
end

desc "watch_transfer", "watches a dataset root folder \
and periodically performs transfer"
option :period,
type: :numeric, default: 600, desc: "polling period in seconds"
option :max_size,
type: :numeric, default: 10_000, desc: "max log size in MB"
option :source_space_low_limit,
type: :numeric, default: 5_000, desc: "starts transfering files if \
available space in process server is below this threshold \
(threshold in MB)"
option :source_space_freed_limit,
type: :numeric, default: 25_000, desc: "stops transfering files if \
available space in process server is above this threshold \
(threshold in MB)"
default_task def watch_transfer(root_dir, target_dir)
loop do
begin
transfer(root_dir, target_dir)
rescue Errno::ENOSPC
next
end

puts "Transferred pending logs, sleeping #{options[:period]}s"
sleep options[:period]
end
end

desc "transfer", "transfers the datasets and manages disk space"
option :max_size,
type: :numeric, default: 10_000, desc: "max log size in MB"
option :source_space_low_limit,
type: :numeric, default: 5_000, desc: "starts transfering files if \
available space in process server is below this threshold \
(threshold in MB)"
option :source_space_limit,
type: :numeric, default: 25_000, desc: "stops transfering files if \
available space in process server is above this threshold \
(threshold in MB)"
def transfer(source_dir, server_dir)
source_dir = validate_directory_exists(source_dir)
server_dir = validate_directory_exists(server_dir)
central_archiver = make_archiver(source_dir, server_dir)

central_archiver.ensure_free_space(
options[:source_space_low_limit] * 1_000_000,
options[:source_space_limit] * 1_000_000
)
central_archiver.process_source_folder
end

desc "transfer_server", "the log transfer server that runs on the main computer"
option :max_size,
type: :numeric, default: 10_000, desc: "max log size in MB"
option :source_space_low_limit,
type: :numeric, default: 5_000, desc: "starts transfering files if \
available space in process server is below this threshold \
(threshold in MB)"
option :source_space_limit,
type: :numeric, default: 25_000, desc: "stops transfering files if \
available space in process server is above this threshold \
(threshold in MB)"
def transfer_server(source_dir, server_dir)
source_dir = validate_directory_exists(source_dir)
server_dir = validate_directory_exists(server_dir)
transfer_server = make_archiver(source_dir, server_dir)

transfer_server.ensure_free_space(
options[:source_space_low_limit] * 1_000_000,
options[:source_space_limit] * 1_000_000
)
transfer_server.log_transfer_prepare
end

no_commands do
def validate_directory_exists(dir)
dir = Pathname.new(dir)
Expand Down
65 changes: 1 addition & 64 deletions lib/syskit/roby_app/plugin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@ module RobyApp
# loaded models. In addition, a text notification is sent to inform
# a shell user
module Plugin
# @return [LogTransferManager,nil] the log transfer support object, or nil
# before {.setup}, or if log transfer is not enabled
attr_accessor :syskit_log_transfer_manager

attr_writer :syskit_use_update_properties

# The deployed in-process logger for ruby tasks
Expand Down Expand Up @@ -194,62 +190,6 @@ def self.define_fake_process_managers(app)
)
end

def syskit_log_transfer_prepare
return unless Syskit.conf.log_transfer.ip

unless Syskit.conf.log_rotation_period
raise ArgumentError,
"cannot enable log transfer without log rotation"
end

conf = Syskit.conf.log_transfer
conf.target_dir ||= log_dir
@syskit_log_transfer_manager = LogTransferManager.new(conf)
end

def syskit_log_transfer_shutdown
syskit_log_transfer_manager&.dispose(syskit_log_transfer_process_servers)
@syskit_log_transfer_manager = nil
end

# Returns the list of remote process servers that will take part in the
# log transfer process
#
# @return [Configuration::ProcessServerConfig]
def syskit_log_transfer_process_servers
Syskit.conf.each_process_server_config.find_all do |config|
config.supports_log_transfer? && !config.on_localhost?
end
end

def syskit_log_transfer_poll_state
syskit_log_transfer_process_servers.each do |process_server_config|
result = process_server_config.client.log_upload_state
::Robot.info "#{result.pending_count} log transfers pending or in " \
"progress from #{process_server_config.name}"
result.each_result do |r|
if r.success
::Robot.info " transferred #{r.file}"
else
::Robot.info " transfer of #{r.file} failed: #{r.message}"
end
end
end
end

# Rotate logs, and transfer the old logs if log transfer is configured
def syskit_log_perform_rotation_and_transfer
rotated_logs = syskit_rotate_logs

return unless (mng = syskit_log_transfer_manager)

handled = syskit_log_transfer_process_servers
rotated_logs.delete_if do |process_server_config, _|
!handled.include?(process_server_config)
end
mng.transfer(rotated_logs)
end

# Hook called by the main application in Application#setup after
# the main setup hooks have been called
def self.require_models(app)
Expand Down Expand Up @@ -300,16 +240,14 @@ def syskit_load_all_used_typekits

# Hook called by the main application to prepare for execution
def self.prepare(app)
app.syskit_log_transfer_prepare
@handler_ids = plug_engine_in_roby(app.execution_engine)

app.syskit_load_all_used_typekits

if Syskit.conf.log_rotation_period
@log_rotation_poll_handler =
app.execution_engine.every(Syskit.conf.log_rotation_period) do
app.syskit_log_perform_rotation_and_transfer
app.syskit_log_transfer_poll_state
app.syskit_rotate_logs
end
end
end
Expand All @@ -328,7 +266,6 @@ def self.shutdown(app)
end

@log_rotation_poll_handler&.dispose
app.syskit_log_transfer_shutdown
end

def default_loader
Expand Down
59 changes: 1 addition & 58 deletions test/roby_app/test_plugin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -216,18 +216,13 @@ def perform_app_assertion(result)
end
end

describe "log rotation and transfer" do
describe "log rotation" do
before do
Syskit.conf.log_rotation_period = 600
Syskit.conf.log_transfer.ip = "127.0.0.1"
Syskit.conf.log_transfer.self_spawned = false
Syskit.conf.log_transfer.target_dir = make_tmpdir
end

after do
Syskit.conf.log_rotation_period = nil
Syskit.conf.log_transfer.ip = nil
app.syskit_log_transfer_shutdown
end

it "rotates logs and returns which logs were rotated" do
Expand All @@ -249,58 +244,6 @@ def rotate_log
stubs = Syskit.conf.process_server_config_for("stubs")
assert_equal({ stubs => ["old_log_file.log"] }, rotated_logs)
end

it "returns an empty list of process servers " \
"if log transfer is disabled" do
conf = Syskit.conf.process_server_config_for("localhost")
flexmock(conf).should_receive(supports_log_transfer?: true)
assert_equal [], app.syskit_log_transfer_process_servers
end

it "returns the list of process servers whose logs we want to transfer" do
app.syskit_log_transfer_prepare

conf = Syskit.conf.process_server_config_for("localhost")
flexmock(conf).should_receive(on_localhost?: false)
flexmock(conf).should_receive(supports_log_transfer?: true)
assert_equal [conf], app.syskit_log_transfer_process_servers
end

it "ignores local process servers" do
Syskit.conf.log_transfer.target_dir = app.log_dir
app.syskit_log_transfer_prepare

conf = Syskit.conf.process_server_config_for("localhost")
flexmock(conf).should_receive(supports_log_transfer?: true)
assert_equal [], app.syskit_log_transfer_process_servers
end

it "transfers data for the selected process servers" do
Syskit.conf.log_transfer.user = "user"
Syskit.conf.log_transfer.password = "pass"
Syskit.conf.log_transfer.certificate = "cert"
Syskit.conf.log_transfer.port = 42
Syskit.conf.log_transfer.implicit_ftps = false
conf = Syskit.conf.process_server_config_for("localhost")
flexmock(conf).should_receive(on_localhost?: false)
flexmock(conf).should_receive(supports_log_transfer?: true)
flexmock(app)
.should_receive(:syskit_rotate_logs)
.and_return(
{ conf => ["old_log_file.log"],
Configuration::ProcessServerConfig.new => ["some_file"] }
)

app.syskit_log_transfer_prepare
flexmock(conf.client)
.should_receive(:log_upload_file).explicitly
.with("127.0.0.1", 42, "cert", "user", "pass",
Pathname("old_log_file.log"),
max_upload_rate: Float::INFINITY,
implicit_ftps: false)
.once
app.syskit_log_perform_rotation_and_transfer
end
end

describe "Syskit start all deployments" do
Expand Down

0 comments on commit 9d849f6

Please sign in to comment.