Skip to content

Commit

Permalink
chore: move log transfer to log_runtime_archive
Browse files Browse the repository at this point in the history
  • Loading branch information
eduardacoppo committed Dec 4, 2024
1 parent 513e480 commit d86cc46
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 123 deletions.
30 changes: 30 additions & 0 deletions lib/syskit/cli/log_runtime_archive.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ class CompressionFailed < RuntimeError; end
#
# It depends on the syskit instance using log rotation
class LogRuntimeArchive
# @return [LogTransferManager, nil] the log transfer support object, or nil
# before {.setup}, or if log transfer is not enabled
attr_accessor :log_transfer_manager

DEFAULT_MAX_ARCHIVE_SIZE = 10_000_000_000 # 10G

def initialize(
Expand Down Expand Up @@ -45,6 +49,32 @@ def process_root_folder
end
end

# Transfer logs from a process server to the main computer server
#
# @param [Pathname] src_dir the log folder on the process server
# @param [Pathname] dst_dir the folder on the main computer
# in which to transfer the logs to
def process_transfer(src_dir, dst_dir)
return unless (mng = log_transfer_manager)

candidates = self.class.find_all_dataset_folders(@src_dir)
candidates.each do |child|
mng.transfer(child)
end
end

def log_transfer_prepare
return unless Syskit.conf.log_transfer.ip

conf = Syskit.conf.log_transfer
conf.target_dir ||= log_dir
@log_transfer_manager = LogTransferManager.new(conf)
end

def log_transfer_shutdown
@log_transfer_manager = nil
end

# Manages folder available space
#
# The method will check if there is enough space to save more log files
Expand Down
43 changes: 42 additions & 1 deletion lib/syskit/cli/log_runtime_archive_main.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ def self.exit_on_failure?
end

desc "watch", "watch a dataset root folder and call archiver"

option :period,
type: :numeric, default: 600, desc: "polling period in seconds"
option :max_size,
Expand Down Expand Up @@ -61,6 +60,44 @@ def archive(root_dir, target_dir)
archiver.process_root_folder
end

desc "watch_transfer", "watches a dataset root folder \
and periodically performs transfer"
option :period,
type: :numeric, default: 600, desc: "polling period in seconds"
option :max_size,
type: :numeric, default: 10_000, desc: "max log size in MB"
default_task def watch_transfer(src_dir, dst_dir)
loop do
begin
transfer(src_dir, dst_dir)
rescue Errno::ENOSPC
next
end

puts "Transferred pending logs, sleeping #{options[:period]}s"
sleep options[:period]
end
end

desc "transfer", "transfers the datasets"
# TODO: manage disk space
def transfer(src_dir, dst_dir)
src_dir = validate_directory_exists(src_dir)
dst_dir = validate_directory_exists(dst_dir)
archiver = make_archiver(src_dir, dst_dir)

archiver.log_transfer_prepare
archiver.process_transfer(src_dir, dst_dir)
end

desc "transfer_server", "creates the log transfer server \
that runs on the main computer"
option :max_size,
type: :numeric, default: 10_000, desc: "max log size in MB"
def transfer_server(host, port)
create_server(host, port)
end

no_commands do
def validate_directory_exists(dir)
dir = Pathname.new(dir)
Expand All @@ -80,6 +117,10 @@ def make_archiver(root_dir, target_dir)
logger: logger, max_archive_size: options[:max_size] * (1024**2)
)
end

def create_server(host, port)
ProcessManagers::Remote::Manager.new(host, port)
end
end
end
end
Expand Down
65 changes: 1 addition & 64 deletions lib/syskit/roby_app/plugin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@ module RobyApp
# loaded models. In addition, a text notification is sent to inform
# a shell user
module Plugin
# @return [LogTransferManager,nil] the log transfer support object, or nil
# before {.setup}, or if log transfer is not enabled
attr_accessor :syskit_log_transfer_manager

attr_writer :syskit_use_update_properties

# The deployed in-process logger for ruby tasks
Expand Down Expand Up @@ -194,62 +190,6 @@ def self.define_fake_process_managers(app)
)
end

def syskit_log_transfer_prepare
return unless Syskit.conf.log_transfer.ip

unless Syskit.conf.log_rotation_period
raise ArgumentError,
"cannot enable log transfer without log rotation"
end

conf = Syskit.conf.log_transfer
conf.target_dir ||= log_dir
@syskit_log_transfer_manager = LogTransferManager.new(conf)
end

def syskit_log_transfer_shutdown
syskit_log_transfer_manager&.dispose(syskit_log_transfer_process_servers)
@syskit_log_transfer_manager = nil
end

# Returns the list of remote process servers that will take part in the
# log transfer process
#
# @return [Configuration::ProcessServerConfig]
def syskit_log_transfer_process_servers
Syskit.conf.each_process_server_config.find_all do |config|
config.supports_log_transfer? && !config.on_localhost?
end
end

def syskit_log_transfer_poll_state
syskit_log_transfer_process_servers.each do |process_server_config|
result = process_server_config.client.log_upload_state
::Robot.info "#{result.pending_count} log transfers pending or in " \
"progress from #{process_server_config.name}"
result.each_result do |r|
if r.success
::Robot.info " transferred #{r.file}"
else
::Robot.info " transfer of #{r.file} failed: #{r.message}"
end
end
end
end

# Rotate logs, and transfer the old logs if log transfer is configured
def syskit_log_perform_rotation_and_transfer
rotated_logs = syskit_rotate_logs

return unless (mng = syskit_log_transfer_manager)

handled = syskit_log_transfer_process_servers
rotated_logs.delete_if do |process_server_config, _|
!handled.include?(process_server_config)
end
mng.transfer(rotated_logs)
end

# Hook called by the main application in Application#setup after
# the main setup hooks have been called
def self.require_models(app)
Expand Down Expand Up @@ -300,16 +240,14 @@ def syskit_load_all_used_typekits

# Hook called by the main application to prepare for execution
def self.prepare(app)
app.syskit_log_transfer_prepare
@handler_ids = plug_engine_in_roby(app.execution_engine)

app.syskit_load_all_used_typekits

if Syskit.conf.log_rotation_period
@log_rotation_poll_handler =
app.execution_engine.every(Syskit.conf.log_rotation_period) do
app.syskit_log_perform_rotation_and_transfer
app.syskit_log_transfer_poll_state
app.syskit_rotate_logs
end
end
end
Expand All @@ -328,7 +266,6 @@ def self.shutdown(app)
end

@log_rotation_poll_handler&.dispose
app.syskit_log_transfer_shutdown
end

def default_loader
Expand Down
31 changes: 31 additions & 0 deletions test/cli/test_log_runtime_archive.rb
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,37 @@ def should_archive_dataset(dataset, archive_basename, full:)
end
end

describe ".process_transfer" do
before do
@dst_dir = make_tmppath
@process = LogRuntimeArchive.new(@root, @dst_dir)
end

it "transfers all folders" do
dataset0 = make_valid_folder("20220434-2023")
dataset1 = make_valid_folder("20220434-2024")
dataset2 = make_valid_folder("20220434-2025")

should_archive_dataset(dataset0, "20220434-2023.0.tar")
should_archive_dataset(dataset1, "20220434-2024.0.tar")
should_archive_dataset(dataset2, "20220434-2025.0.tar")
@process.process_transfer

assert (@dst_dir / "20220434-2023.0.tar").file?
assert (@dst_dir / "20220434-2024.0.tar").file?
assert (@dst_dir / "20220434-2025.0.tar").file?
end

def should_transfer_dataset(dataset, dst_basename, full:)
flexmock(LogRuntimeArchive)
.should_receive(:transfer_dataset)
.with(
->(p) { p.path == (@dst_dir / dst_basename).to_s },
dataset, hsh(full: full)
).once.pass_thru
end
end

describe "#ensure_free_space" do
before do
@archive_dir = make_tmppath
Expand Down
69 changes: 69 additions & 0 deletions test/cli/test_log_runtime_archive_main.rb
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,75 @@ def call_archive(root_path, archive_path, low_limit, freed_limit)
end
end

describe "#watch_transfer" do
before do
@src_dir = make_tmppath
@dst_dir = make_tmppath

@mocked_files_sizes = []
5.times { |i| (@dst_dir / i.to_s).write(i.to_s) }
end

it "calls transfer with the specified period" do
quit = Class.new(RuntimeError)
called = 0
flexmock(LogRuntimeArchive)
.new_instances
.should_receive(:log_transfer_prepare)
.pass_thru do
called += 1
raise quit if called == 3
end

tic = Time.now
assert_raises(quit) do
LogRuntimeArchiveMain.start(
["watch_transfer", @src_dir, @dst_dir, "--period", 0.5]
)
end

assert called == 3
assert_operator(Time.now - tic, :>, 0.9)
end

it "creates LogTransferManager" do
flexmock(LogRuntimeArchive)
.new_instances
.should_receive(:log_transfer_prepare)
.and_return(RobyApp::LogTransferManager)
end
end

describe "#transfer" do
before do
@src_dir = make_tmppath
@dst_dir = make_tmppath
end

it "raises ArgumentError if src_dir does not exist" do
e = assert_raises ArgumentError do
call_transfer("/does/not/exist", @dst_dir)
end
assert_equal "/does/not/exist does not exist, or is not a directory",
e.message
end

it "raises ArgumentError if dst_dir does not exist" do
e = assert_raises ArgumentError do
call_transfer(@src_dir, "/does/not/exist")
end
assert_equal "/does/not/exist does not exist, or is not a directory",
e.message
end

# Call 'transfer' function instead of 'watch_transfer' to call archiver once
def call_transfer(src_path, dst_path)
LogRuntimeArchiveMain.start(
["transfer", src_path, dst_path]
)
end
end

# Mock files sizes in bytes
# @param [Array] size of files in MB
def mock_files_size(sizes)
Expand Down
Loading

0 comments on commit d86cc46

Please sign in to comment.