diff --git a/lib/syskit/cli/log_runtime_archive.rb b/lib/syskit/cli/log_runtime_archive.rb index 66e01a84e..ed279e4dd 100644 --- a/lib/syskit/cli/log_runtime_archive.rb +++ b/lib/syskit/cli/log_runtime_archive.rb @@ -2,6 +2,8 @@ require "archive/tar/minitar" require "sys/filesystem" +require "syskit/process_managers/remote/protocol" +require "net/ftp" module Syskit module CLI @@ -18,7 +20,7 @@ class LogRuntimeArchive DEFAULT_MAX_ARCHIVE_SIZE = 10_000_000_000 # 10G def initialize( - root_dir, target_dir, + root_dir, target_dir: nil, logger: LogRuntimeArchive.null_logger, max_archive_size: DEFAULT_MAX_ARCHIVE_SIZE ) @@ -45,6 +47,20 @@ def process_root_folder end end + # Creates a FTP server and decides which logs to transfer + # + # @param [Pathname] root_dir the log folder on the process server + # @param [Params] server_params the FTP server parameters: + # { user, password, certfile_path, interface, port } + def process_root_folder_transfer(server_params) + ftp = self.class.connect_to_remote_server(server_params) + candidates = self.class.find_all_dataset_folders(@root_dir) + candidates.each do |child| + process_dataset_transfer(child, ftp) + end + self.class.disconnect_from_remote_server(ftp) + end + # Manages folder available space # # The method will check if there is enough space to save more log files @@ -83,9 +99,10 @@ def ensure_free_space(free_space_low_limit, free_space_delete_until) def process_dataset(child, full:) use_existing = true + basename = child.basename.to_s loop do open_archive_for( - child.basename.to_s, use_existing: use_existing + basename, use_existing: use_existing ) do |io| if io.tell > @max_archive_size use_existing = false @@ -104,6 +121,15 @@ def process_dataset(child, full:) end end + def process_dataset_transfer(child, ftp) + basename = child.basename.to_s + self.class.transfer_dataset( + @root_dir / basename, + basename, + ftp + ) + end + # Create or open an archive # # The method will find an archive to open or create, do it and @@ -147,6 +173,18 @@ def find_last_archive_index(basename) end end + def self.connect_to_remote_server(server_params) + ftp = Net::FTP.new + ftp.connect(server_params[:interface], server_params[:port]) + ftp.login(server_params[:user], server_params[:password]) + ftp.passive = true + ftp + end + + def self.disconnect_from_remote_server(ftp) + ftp&.close + end + # Find all dataset-looking folders within a root log folder def self.find_all_dataset_folders(root_dir) candidates = root_dir.enum_for(:each_entry).map do |child| @@ -255,6 +293,10 @@ def self.null_logger logger end + def self.transfer_dataset(local_path, remote_path, ftp) + ftp.putbinaryfile(local_path, remote_path) + end + # Archive the given dataset # # @param [IO] archive_io the IO of the target archive diff --git a/lib/syskit/cli/log_runtime_archive_main.rb b/lib/syskit/cli/log_runtime_archive_main.rb index aa536b575..d997c8456 100644 --- a/lib/syskit/cli/log_runtime_archive_main.rb +++ b/lib/syskit/cli/log_runtime_archive_main.rb @@ -6,6 +6,7 @@ require "pathname" require "thor" require "syskit/cli/log_runtime_archive" +require "syskit/roby_app/log_transfer_server/spawn_server" module Syskit module CLI @@ -16,7 +17,6 @@ def self.exit_on_failure? end desc "watch", "watch a dataset root folder and call archiver" - option :period, type: :numeric, default: 600, desc: "polling period in seconds" option :max_size, @@ -52,7 +52,7 @@ def self.exit_on_failure? def archive(root_dir, target_dir) root_dir = validate_directory_exists(root_dir) target_dir = validate_directory_exists(target_dir) - archiver = make_archiver(root_dir, target_dir) + archiver = make_archiver(root_dir, target_dir: target_dir) archiver.ensure_free_space( options[:free_space_low_limit] * 1_000_000, @@ -61,7 +61,55 @@ def archive(root_dir, target_dir) archiver.process_root_folder end - no_commands do + desc "watch_transfer", "watches a dataset root folder \ + and periodically performs transfer" + option :period, + type: :numeric, default: 600, desc: "polling period in seconds" + option :max_size, + type: :numeric, default: 10_000, desc: "max log size in MB" + def watch_transfer( # rubocop:disable Metrics/ParameterLists + base_log_dir, user, password, certfile_path, interface, port + ) + loop do + begin + transfer(base_log_dir, user, password, certfile_path, + interface, port) + rescue Errno::ENOSPC + next + end + + puts "Transferred pending logs, sleeping #{options[:period]}s" + sleep options[:period] + end + end + + desc "transfer", "transfers the datasets" + option :max_size, + type: :numeric, default: 10_000, desc: "max log size in MB" + def transfer( # rubocop:disable Metrics/ParameterLists + base_log_dir, user, password, certfile_path, interface, port + ) + server_params = { + user: user, password: password, certfile_path: certfile_path, + interface: interface, port: port + } + base_log_dir = validate_directory_exists(base_log_dir) + archiver = make_archiver(base_log_dir) + + archiver.process_root_folder_transfer(server_params) + end + + desc "transfer_server", "creates the log transfer FTP server \ + that runs on the main computer" + def transfer_server( # rubocop:disable Metrics/ParameterLists + tgt_log_dir, user, password, certfile_path, interface, port + ) + create_server( + tgt_log_dir, user, password, certfile_path, interface, port + ) + end + + no_commands do # rubocop:disable Metrics/BlockLength def validate_directory_exists(dir) dir = Pathname.new(dir) unless dir.directory? @@ -72,13 +120,26 @@ def validate_directory_exists(dir) dir end - def make_archiver(root_dir, target_dir) + def make_archiver(root_dir, target_dir: nil) logger = Logger.new($stdout) Syskit::CLI::LogRuntimeArchive.new( - root_dir, target_dir, - logger: logger, max_archive_size: options[:max_size] * (1024**2) + root_dir, + target_dir: target_dir, logger: logger, + max_archive_size: options[:max_size] * (1024**2) + ) + end + + def create_server( # rubocop:disable Metrics/ParameterLists + tgt_log_dir, user, password, certfile_path, + interface, port + ) + server = RobyApp::LogTransferServer::SpawnServer.new( + tgt_log_dir, user, password, certfile_path, + interface: interface, port: port ) + server.run + server end end end diff --git a/lib/syskit/roby_app/log_transfer_server/spawn_server.rb b/lib/syskit/roby_app/log_transfer_server/spawn_server.rb index 1405d5c57..7b9260c45 100644 --- a/lib/syskit/roby_app/log_transfer_server/spawn_server.rb +++ b/lib/syskit/roby_app/log_transfer_server/spawn_server.rb @@ -85,7 +85,7 @@ def wait_until_stopped puts "FTP server started. Press ENTER or c-C to stop it" $stdout.flush begin - gets + $stdin.readline rescue Interrupt puts "Interrupt" end diff --git a/test/cli/test_log_runtime_archive.rb b/test/cli/test_log_runtime_archive.rb index 7bb32b97e..86e6ddb5b 100644 --- a/test/cli/test_log_runtime_archive.rb +++ b/test/cli/test_log_runtime_archive.rb @@ -365,7 +365,7 @@ module CLI describe ".process_root_folder" do before do @archive_dir = make_tmppath - @process = LogRuntimeArchive.new(@root, @archive_dir) + @process = LogRuntimeArchive.new(@root, target_dir: @archive_dir) end it "archives all folders, the last one only partially" do @@ -391,7 +391,7 @@ module CLI .write(test1 = Base64.encode64(Random.bytes(1024))) (dataset / "test.2.log").write(Base64.encode64(Random.bytes(1024))) process = LogRuntimeArchive.new( - @root, @archive_dir, max_archive_size: 1024 + @root, target_dir: @archive_dir, max_archive_size: 1024 ) process.process_root_folder @@ -419,7 +419,7 @@ module CLI (dataset / "test.2.log") .write(test2 = Base64.encode64(Random.bytes(128))) process = LogRuntimeArchive.new( - @root, @archive_dir, max_archive_size: 1024 + @root, target_dir: @archive_dir, max_archive_size: 1024 ) process.process_root_folder @@ -445,7 +445,7 @@ module CLI test1 = make_random_file "test.1.log", root: dataset test2 = make_random_file "test.2.log", root: dataset process = LogRuntimeArchive.new( - @root, @archive_dir, max_archive_size: 1024 + @root, target_dir: @archive_dir, max_archive_size: 1024 ) process.process_root_folder @@ -516,6 +516,65 @@ def should_archive_dataset(dataset, archive_basename, full:) end end + describe ".process_transfer" do + before do + @process = LogRuntimeArchive.new(@root) + interface = "127.0.0.1" + ca = RobyApp::TmpRootCA.new(interface) + @params = { + interface: interface, port: 0, + certfile_path: ca.private_certificate_path, + user: "nilvo", password: "nilvo123" + } + @target_dir = make_tmppath + @threads = [] + + create_server + end + + it "transfers datasets" do + ftp = connect_to_server + + datasets = [ + make_valid_folder("20220434-2023"), + make_valid_folder("20220434-2024"), + make_valid_folder("20220434-2025") + ] + + datasets.map do |dataset| + transfer_dataset(ftp, @root / dataset, @target_dir / dataset) + end + + datasets.each do |dataset| + assert (@target_dir / dataset).file? + end + end + + def create_server + thread = Thread.new do + server = RobyApp::LogTransferServer::SpawnServer.new( + @target_dir, @params[:user], @params[:password], + @params[:certfile_path], + interface: @params[:interface], port: @params[:port] + ) + server.run + end + thread.join + end + + def transfer_dataset(ftp, src_path, tgt_path) + ftp.putbinaryfile(src_path, tgt_path) + end + + def connect_to_server + ftp = Net::FTP.new + ftp.connect(@params[:interface], @params[:port]) + ftp.login(@params[:user], @params[:password]) + ftp.passive = true + ftp + end + end + describe "#ensure_free_space" do before do @archive_dir = make_tmppath @@ -523,7 +582,7 @@ def should_archive_dataset(dataset, archive_basename, full:) 10.times { |i| (@archive_dir / i.to_s).write(i.to_s) } - @archiver = LogRuntimeArchive.new(@root, @archive_dir) + @archiver = LogRuntimeArchive.new(@root, target_dir: @archive_dir) end it "does nothing if there is enough free space" do diff --git a/test/cli/test_log_runtime_archive_main.rb b/test/cli/test_log_runtime_archive_main.rb index e30adb9d0..7658a7d52 100644 --- a/test/cli/test_log_runtime_archive_main.rb +++ b/test/cli/test_log_runtime_archive_main.rb @@ -2,6 +2,7 @@ require "syskit/test/self" require "syskit/cli/log_runtime_archive_main" +require "syskit/roby_app/tmp_root_ca" module Syskit module CLI @@ -128,6 +129,99 @@ def call_archive(root_path, archive_path, low_limit, freed_limit) end end + describe "#transfer_server" do + before do + @tgt_log_dir = make_tmppath + interface = "127.0.0.1" + ca = RobyApp::TmpRootCA.new(interface) + + server_params = { + user: "nilvo", password: "nilvo123", + certfile_path: ca.private_certificate_path, + interface: interface, port: 0 + } + end + end + + describe "#watch_transfer" do + before do + @base_log_dir = make_tmppath + @tgt_log_dir = make_tmppath + interface = "127.0.0.1" + ca = RobyApp::TmpRootCA.new(interface) + + @server_params = { + user: "nilvo", password: "nilvo123", + certfile_path: ca.private_certificate_path, + interface: interface, port: 0 + } + @threads = [] + server = nil + flexmock(RobyApp::LogTransferServer::SpawnServer) + .should_receive(:new) + .with_any_args + .pass_thru do |arg| + server = arg + end + call_create_server + @server = server + end + + after do + @server.stop + @server.join + @threads.each(&:kill) + end + + it "calls transfer with the specified period" do + quit = Class.new(RuntimeError) + called = 0 + flexmock(LogRuntimeArchive) + .new_instances + .should_receive(:process_root_folder_transfer) + .with(@server_params) + .pass_thru do + called += 1 + raise quit if called == 3 + end + + tic = Time.now + assert_raises(quit) do + args = [ + "watch_transfer", + @base_log_dir, + *@server_params.values, + "--period", 0.5 + ] + LogRuntimeArchiveMain.start(args) + end + + assert called == 3 + assert_operator(Time.now - tic, :>, 0.9) + end + + def call_create_server + cli = LogRuntimeArchiveMain.new + cli.create_server(@tgt_log_dir, *@server_params.values) + end + end + + describe "#transfer" do + before do + @base_log_dir = make_tmppath + end + + # Call 'transfer' function instead of 'watch' to call transfer once + def call_transfer(src_dir, params) + args = [ + "transfer", + src_dir, + *params.values + ] + LogRuntimeArchiveMain.start(args) + end + end + # Mock files sizes in bytes # @param [Array] size of files in MB def mock_files_size(sizes)