diff --git a/lib/syskit/cli/log_runtime_archive.rb b/lib/syskit/cli/log_runtime_archive.rb index 6ba025189..e612d1b74 100644 --- a/lib/syskit/cli/log_runtime_archive.rb +++ b/lib/syskit/cli/log_runtime_archive.rb @@ -3,6 +3,7 @@ require "archive/tar/minitar" require "sys/filesystem" require "syskit/process_managers/remote/protocol" +require 'net/ftp' module Syskit module CLI @@ -19,7 +20,7 @@ class LogRuntimeArchive DEFAULT_MAX_ARCHIVE_SIZE = 10_000_000_000 # 10G def initialize( - root_dir, target_dir, + root_dir, target_dir: nil, logger: LogRuntimeArchive.null_logger, max_archive_size: DEFAULT_MAX_ARCHIVE_SIZE ) @@ -46,26 +47,18 @@ def process_root_folder end end - # Transfer logs from a process server to the main computer server + # Creates a FTP server and decides which logs to transfer # - # @param [Pathname] src_dir the log folder on the process server + # @param [Pathname] root_dir the log folder on the process server # @param [Params] server_params the FTP server parameters: - # { host, port, certificate, user, password } - def process_transfer(src_dir, server_params) - host = server_params[:host] - port = server_params[:port] - socket = - begin TCPSocket.new(host, port) - rescue Errno::ECONNREFUSED => e - raise e.class, "cannot contact process server at " \ - "'#{host}:#{port}': #{e.message}" - end - socket.write(ProcessManagers::Remote::COMMAND_LOG_UPLOAD_FILE) - - candidates = self.class.find_all_dataset_folders(src_dir) + # { user, password, certfile_path, interface, port } + def process_root_folder_transfer(server_params) + ftp = self.class.connect_to_remote_server(server_params) + candidates = self.class.find_all_dataset_folders(@root_dir) candidates.each do |child| - Marshal.dump([server_params, Pathname(child)], socket) + process_dataset_transfer(child, ftp) end + self.class.disconnect_from_remote_server(ftp) end # Manages folder available space @@ -106,9 +99,10 @@ def ensure_free_space(free_space_low_limit, free_space_delete_until) def process_dataset(child, full:) use_existing = true + basename = child.basename.to_s loop do open_archive_for( - child.basename.to_s, use_existing: use_existing + basename, use_existing: use_existing ) do |io| if io.tell > @max_archive_size use_existing = false @@ -127,6 +121,15 @@ def process_dataset(child, full:) end end + def process_dataset_transfer(child, ftp) + basename = child.basename.to_s + self.class.transfer_dataset( + @root_dir / basename, + basename, + ftp + ) + end + # Create or open an archive # # The method will find an archive to open or create, do it and @@ -170,6 +173,17 @@ def find_last_archive_index(basename) end end + def self.connect_to_remote_server(server_params) + ftp = Net::FTP.new + ftp.connect(server_params[:interface], server_params[:port]) + ftp.login(server_params[:user], server_params[:password]) + ftp + end + + def self.disconnect_from_remote_server(ftp) + ftp.close if ftp + end + # Find all dataset-looking folders within a root log folder def self.find_all_dataset_folders(root_dir) candidates = root_dir.enum_for(:each_entry).map do |child| @@ -278,6 +292,10 @@ def self.null_logger logger end + def self.transfer_dataset(local_path, remote_path, ftp) + ftp.putbinaryfile(local_path, remote_path) + end + # Archive the given dataset # # @param [IO] archive_io the IO of the target archive diff --git a/lib/syskit/cli/log_runtime_archive_main.rb b/lib/syskit/cli/log_runtime_archive_main.rb index 84fa4a49d..c8dff5b9f 100644 --- a/lib/syskit/cli/log_runtime_archive_main.rb +++ b/lib/syskit/cli/log_runtime_archive_main.rb @@ -6,7 +6,7 @@ require "pathname" require "thor" require "syskit/cli/log_runtime_archive" -require "lib/syskit/roby_app/log_transfer_server/spawn_server" +require "syskit/roby_app/log_transfer_server/spawn_server" module Syskit module CLI @@ -52,7 +52,7 @@ def self.exit_on_failure? def archive(root_dir, target_dir) root_dir = validate_directory_exists(root_dir) target_dir = validate_directory_exists(target_dir) - archiver = make_archiver(root_dir, target_dir) + archiver = make_archiver(root_dir, target_dir: target_dir) archiver.ensure_free_space( options[:free_space_low_limit] * 1_000_000, @@ -67,10 +67,13 @@ def archive(root_dir, target_dir) type: :numeric, default: 600, desc: "polling period in seconds" option :max_size, type: :numeric, default: 10_000, desc: "max log size in MB" - default_task def watch_transfer(src_dir, tgt_dir, server_params) + def watch_transfer( # rubocop:disable Metrics/ParameterLists + base_log_dir, user, password, certfile_path, interface, port + ) loop do begin - transfer(src_dir, tgt_dir, server_params) + transfer(base_log_dir, user, password, certfile_path, + interface, port) rescue Errno::ENOSPC next end @@ -83,18 +86,27 @@ def archive(root_dir, target_dir) desc "transfer", "transfers the datasets" option :max_size, type: :numeric, default: 10_000, desc: "max log size in MB" - def transfer(src_dir, tgt_dir, server_params) - src_dir = validate_directory_exists(src_dir) - tgt_dir = validate_directory_exists(tgt_dir) - archiver = make_archiver(src_dir, tgt_dir) - - archiver.process_transfer(src_dir, server_params) + def transfer( # rubocop:disable Metrics/ParameterLists + base_log_dir, user, password, certfile_path, interface, port + ) + server_params = { + user: user, password: password, certfile_path: certfile_path, + interface: interface, port: port + } + base_log_dir = validate_directory_exists(base_log_dir) + archiver = make_archiver(base_log_dir) + + archiver.process_root_folder_transfer(server_params) end desc "transfer_server", "creates the log transfer FTP server \ that runs on the main computer" - def transfer_server(tgt_dir, user, password, certfile) - create_server(tgt_dir, user, password, certfile) + def transfer_server( # rubocop:disable Metrics/ParameterLists + tgt_log_dir, user, password, certfile_path, interface, port + ) + create_server( + tgt_log_dir, user, password, certfile_path, interface, port + ) end no_commands do @@ -108,19 +120,24 @@ def validate_directory_exists(dir) dir end - def make_archiver(root_dir, target_dir) + def make_archiver(root_dir, target_dir: nil) logger = Logger.new($stdout) Syskit::CLI::LogRuntimeArchive.new( - root_dir, target_dir, + root_dir, target_dir: target_dir, logger: logger, max_archive_size: options[:max_size] * (1024**2) ) end - def create_server(tgt_dir, user, password, certfile) - RobyApp::LogTransferServer::SpawnServer.new( - tgt_dir, user, password, certfile + def create_server( # rubocop:disable Metrics/ParameterLists + tgt_log_dir, user, password, certfile_path, + interface, port + ) + server = RobyApp::LogTransferServer::SpawnServer.new( + tgt_log_dir, user, password, certfile_path, + interface: interface, port: port ) + server.run end end end diff --git a/test/cli/test_log_runtime_archive.rb b/test/cli/test_log_runtime_archive.rb index 7bb32b97e..b268b7f81 100644 --- a/test/cli/test_log_runtime_archive.rb +++ b/test/cli/test_log_runtime_archive.rb @@ -365,7 +365,7 @@ module CLI describe ".process_root_folder" do before do @archive_dir = make_tmppath - @process = LogRuntimeArchive.new(@root, @archive_dir) + @process = LogRuntimeArchive.new(@root, target_dir: @archive_dir) end it "archives all folders, the last one only partially" do @@ -391,7 +391,7 @@ module CLI .write(test1 = Base64.encode64(Random.bytes(1024))) (dataset / "test.2.log").write(Base64.encode64(Random.bytes(1024))) process = LogRuntimeArchive.new( - @root, @archive_dir, max_archive_size: 1024 + @root, target_dir: @archive_dir, max_archive_size: 1024 ) process.process_root_folder @@ -419,7 +419,7 @@ module CLI (dataset / "test.2.log") .write(test2 = Base64.encode64(Random.bytes(128))) process = LogRuntimeArchive.new( - @root, @archive_dir, max_archive_size: 1024 + @root, target_dir: @archive_dir, max_archive_size: 1024 ) process.process_root_folder @@ -445,7 +445,7 @@ module CLI test1 = make_random_file "test.1.log", root: dataset test2 = make_random_file "test.2.log", root: dataset process = LogRuntimeArchive.new( - @root, @archive_dir, max_archive_size: 1024 + @root, target_dir: @archive_dir, max_archive_size: 1024 ) process.process_root_folder @@ -516,6 +516,65 @@ def should_archive_dataset(dataset, archive_basename, full:) end end + describe ".process_transfer" do + before do + @process = LogRuntimeArchive.new(@root) + interface = "127.0.0.1" + ca = RobyApp::TmpRootCA.new(interface) + @params = { + interface: interface, port: 0, + certfile_path: ca.private_certificate_path, + user: "nilvo", password: "nilvo123" + } + @target_dir = make_tmppath + @threads = [] + + create_server + end + + it "transfers datasets" do + ftp = connect_to_server + + datasets = [ + make_valid_folder("20220434-2023"), + make_valid_folder("20220434-2024"), + make_valid_folder("20220434-2025") + ] + + datasets.map do |dataset| + transfer_dataset(ftp, @root / dataset, @target_dir / dataset) + end + + datasets.each do |dataset| + assert (@target_dir / dataset).file? + end + end + + def create_server + thread = Thread.new do + server = RobyApp::LogTransferServer::SpawnServer.new( + @target_dir, @params[:user], @params[:password], + @params[:certfile_path], interface: @params[:interface], + port: @params[:port] + ) + server.run + end + thread.join + end + + def transfer_dataset(ftp, src_path, tgt_path) + ftp.putbinaryfile(src_path, tgt_path) + end + + def connect_to_server + ftp = Net::FTP.new + ftp.connect(@params[:interface], @params[:port]) + ftp.login(@params[:user], @params[:password]) + ftp.passive = true + ftp + end + end + describe "#ensure_free_space" do before do @archive_dir = make_tmppath @@ -523,7 +582,7 @@ def should_archive_dataset(dataset, archive_basename, full:) 10.times { |i| (@archive_dir / i.to_s).write(i.to_s) } - @archiver = LogRuntimeArchive.new(@root, @archive_dir) + @archiver = LogRuntimeArchive.new(@root, target_dir: @archive_dir) end it "does nothing if there is enough free space" do diff --git a/test/cli/test_log_runtime_archive_main.rb b/test/cli/test_log_runtime_archive_main.rb index ff58dc361..8adad2461 100644 --- a/test/cli/test_log_runtime_archive_main.rb +++ b/test/cli/test_log_runtime_archive_main.rb @@ -131,20 +131,20 @@ def call_archive(root_path, archive_path, low_limit, freed_limit) describe "#watch_transfer" do before do - @src_dir = make_tmppath - @tgt_dir = make_tmppath - host = "127.0.0.1" - ca = RobyApp::TmpRootCA.new(host) + @base_log_dir = make_tmppath + @tgt_log_dir = make_tmppath + interface = "127.0.0.1" + port = 0 + ca = RobyApp::TmpRootCA.new(interface) user = "nilvo" password = "nilvo123" - server = spawn_server(@tgt_dir, user, password, ca) - port = server.port - @server_params = { - host: host, port: port, certificate: "", - user: user, password: password + user: user, password: password, + certfile_path: ca.private_certificate_path, + interface: interface, port: port } + server = spawn_server end it "calls transfer with the specified period" do @@ -152,7 +152,8 @@ def call_archive(root_path, archive_path, low_limit, freed_limit) called = 0 flexmock(LogRuntimeArchive) .new_instances - .should_receive(:process_transfer) + .should_receive(:process_root_folder_transfer) + .with(@server_params) .pass_thru do called += 1 raise quit if called == 3 @@ -160,10 +161,14 @@ def call_archive(root_path, archive_path, low_limit, freed_limit) tic = Time.now assert_raises(quit) do - LogRuntimeArchiveMain.start( - ["watch_transfer", - @src_dir, @tgt_dir, @server_params, "--period", 0.5] - ) + args = [ + "watch_transfer", + @base_log_dir, + *@server_params.values, + "--period", 0.5 + ] + pp "***", args + LogRuntimeArchiveMain.start(args) end assert called == 3 @@ -176,7 +181,8 @@ def call_archive(root_path, archive_path, low_limit, freed_limit) called = 0 flexmock(LogRuntimeArchive) .new_instances - .should_receive(:process_transfer) + .should_receive(:process_root_folder_transfer) + .with(@server_params) .pass_thru do called += 1 raise quit if called == 3 @@ -186,10 +192,13 @@ def call_archive(root_path, archive_path, low_limit, freed_limit) tic = Time.now assert_raises(quit) do - LogRuntimeArchiveMain.start( - ["watch_transfer", - @src_dir, @tgt_dir, @server_params, "--period", 0.5] - ) + args = [ + "watch_transfer", + @base_log_dir, + *@server_params.values, + "--period", 0.5 + ] + LogRuntimeArchiveMain.start(args) end assert_operator(Time.now - tic, :<, 1) end @@ -199,31 +208,17 @@ def call_archive(root_path, archive_path, low_limit, freed_limit) describe "#transfer" do before do - @src_dir = make_tmppath - @tgt_dir = make_tmppath - end - - it "raises ArgumentError if src_dir does not exist" do - e = assert_raises ArgumentError do - call_transfer("/does/not/exist", @tgt_dir, {}) - end - assert_equal "/does/not/exist does not exist, or is not a directory", - e.message - end - - it "raises ArgumentError if tgt_dir does not exist" do - e = assert_raises ArgumentError do - call_transfer(@src_dir, "/does/not/exist", {}) - end - assert_equal "/does/not/exist does not exist, or is not a directory", - e.message + @base_log_dir = make_tmppath end # Call 'transfer' function instead of 'watch' to call transfer once - def call_transfer(src_dir, tgt_dir, params) - LogRuntimeArchiveMain.start( - ["transfer", src_dir, tgt_dir, params] - ) + def call_transfer(src_dir, params) + args = [ + "transfer", + src_dir, + *params.values + ] + LogRuntimeArchiveMain.start(args) end end @@ -248,11 +243,13 @@ def mock_available_space(total_available_disk_space) end end - def spawn_server(tgt_dir, user, password, cert) - LogRuntimeArchiveMain.start( - ["transfer_server", - tgt_dir, user, password, cert.private_certificate_path] - ) + def spawn_server + args = [ + "transfer_server", + @tgt_log_dir, + *@server_params.values + ] + LogRuntimeArchiveMain.start(args) end def assert_deleted_files(deleted_files)