Skip to content

Commit

Permalink
chore: general fixes and variable renaming
Browse files Browse the repository at this point in the history
  • Loading branch information
eduardacoppo committed Dec 9, 2024
1 parent c101f33 commit cc2964c
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 86 deletions.
54 changes: 36 additions & 18 deletions lib/syskit/cli/log_runtime_archive.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require "archive/tar/minitar"
require "sys/filesystem"
require "syskit/process_managers/remote/protocol"
require 'net/ftp'

module Syskit
module CLI
Expand All @@ -19,7 +20,7 @@ class LogRuntimeArchive
DEFAULT_MAX_ARCHIVE_SIZE = 10_000_000_000 # 10G

def initialize(
root_dir, target_dir,
root_dir, target_dir: nil,
logger: LogRuntimeArchive.null_logger,
max_archive_size: DEFAULT_MAX_ARCHIVE_SIZE
)
Expand All @@ -46,26 +47,18 @@ def process_root_folder
end
end

# Transfer logs from a process server to the main computer server
# Creates a FTP server and decides which logs to transfer
#
# @param [Pathname] src_dir the log folder on the process server
# @param [Pathname] root_dir the log folder on the process server
# @param [Params] server_params the FTP server parameters:
# { host, port, certificate, user, password }
def process_transfer(src_dir, server_params)
host = server_params[:host]
port = server_params[:port]
socket =
begin TCPSocket.new(host, port)
rescue Errno::ECONNREFUSED => e
raise e.class, "cannot contact process server at " \
"'#{host}:#{port}': #{e.message}"
end
socket.write(ProcessManagers::Remote::COMMAND_LOG_UPLOAD_FILE)

candidates = self.class.find_all_dataset_folders(src_dir)
# { user, password, certfile_path, interface, port }
def process_root_folder_transfer(server_params)
ftp = self.class.connect_to_remote_server(server_params)
candidates = self.class.find_all_dataset_folders(@root_dir)
candidates.each do |child|
Marshal.dump([server_params, Pathname(child)], socket)
process_dataset_transfer(child, ftp)
end
self.class.disconnect_from_remote_server(ftp)
end

# Manages folder available space
Expand Down Expand Up @@ -106,9 +99,10 @@ def ensure_free_space(free_space_low_limit, free_space_delete_until)

def process_dataset(child, full:)
use_existing = true
basename = child.basename.to_s
loop do
open_archive_for(
child.basename.to_s, use_existing: use_existing
basename, use_existing: use_existing
) do |io|
if io.tell > @max_archive_size
use_existing = false
Expand All @@ -127,6 +121,15 @@ def process_dataset(child, full:)
end
end

def process_dataset_transfer(child, ftp)
basename = child.basename.to_s
self.class.transfer_dataset(
@root_dir / basename,
basename,
ftp
)
end

# Create or open an archive
#
# The method will find an archive to open or create, do it and
Expand Down Expand Up @@ -170,6 +173,17 @@ def find_last_archive_index(basename)
end
end

def self.connect_to_remote_server(server_params)
ftp = Net::FTP.new
ftp.connect(server_params[:interface], server_params[:port])
ftp.login(server_params[:user], server_params[:password])
ftp
end

def self.disconnect_from_remote_server(ftp)
ftp.close if ftp
end

# Find all dataset-looking folders within a root log folder
def self.find_all_dataset_folders(root_dir)
candidates = root_dir.enum_for(:each_entry).map do |child|
Expand Down Expand Up @@ -278,6 +292,10 @@ def self.null_logger
logger
end

def self.transfer_dataset(local_path, remote_path, ftp)
ftp.putbinaryfile(local_path, remote_path)
end

# Archive the given dataset
#
# @param [IO] archive_io the IO of the target archive
Expand Down
51 changes: 34 additions & 17 deletions lib/syskit/cli/log_runtime_archive_main.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
require "pathname"
require "thor"
require "syskit/cli/log_runtime_archive"
require "lib/syskit/roby_app/log_transfer_server/spawn_server"
require "syskit/roby_app/log_transfer_server/spawn_server"

module Syskit
module CLI
Expand Down Expand Up @@ -52,7 +52,7 @@ def self.exit_on_failure?
def archive(root_dir, target_dir)
root_dir = validate_directory_exists(root_dir)
target_dir = validate_directory_exists(target_dir)
archiver = make_archiver(root_dir, target_dir)
archiver = make_archiver(root_dir, target_dir: target_dir)

archiver.ensure_free_space(
options[:free_space_low_limit] * 1_000_000,
Expand All @@ -67,10 +67,13 @@ def archive(root_dir, target_dir)
type: :numeric, default: 600, desc: "polling period in seconds"
option :max_size,
type: :numeric, default: 10_000, desc: "max log size in MB"
default_task def watch_transfer(src_dir, tgt_dir, server_params)
def watch_transfer( # rubocop:disable Metrics/ParameterLists
base_log_dir, user, password, certfile_path, interface, port
)
loop do
begin
transfer(src_dir, tgt_dir, server_params)
transfer(base_log_dir, user, password, certfile_path,
interface, port)
rescue Errno::ENOSPC
next
end
Expand All @@ -83,18 +86,27 @@ def archive(root_dir, target_dir)
desc "transfer", "transfers the datasets"
option :max_size,
type: :numeric, default: 10_000, desc: "max log size in MB"
def transfer(src_dir, tgt_dir, server_params)
src_dir = validate_directory_exists(src_dir)
tgt_dir = validate_directory_exists(tgt_dir)
archiver = make_archiver(src_dir, tgt_dir)

archiver.process_transfer(src_dir, server_params)
def transfer( # rubocop:disable Metrics/ParameterLists
base_log_dir, user, password, certfile_path, interface, port
)
server_params = {
user: user, password: password, certfile_path: certfile_path,
interface: interface, port: port
}
base_log_dir = validate_directory_exists(base_log_dir)
archiver = make_archiver(base_log_dir)

archiver.process_root_folder_transfer(server_params)
end

desc "transfer_server", "creates the log transfer FTP server \
that runs on the main computer"
def transfer_server(tgt_dir, user, password, certfile)
create_server(tgt_dir, user, password, certfile)
def transfer_server( # rubocop:disable Metrics/ParameterLists
tgt_log_dir, user, password, certfile_path, interface, port
)
create_server(
tgt_log_dir, user, password, certfile_path, interface, port
)
end

no_commands do
Expand All @@ -108,19 +120,24 @@ def validate_directory_exists(dir)
dir
end

def make_archiver(root_dir, target_dir)
def make_archiver(root_dir, target_dir: nil)
logger = Logger.new($stdout)

Syskit::CLI::LogRuntimeArchive.new(
root_dir, target_dir,
root_dir, target_dir: target_dir,
logger: logger, max_archive_size: options[:max_size] * (1024**2)
)
end

def create_server(tgt_dir, user, password, certfile)
RobyApp::LogTransferServer::SpawnServer.new(
tgt_dir, user, password, certfile
def create_server( # rubocop:disable Metrics/ParameterLists
tgt_log_dir, user, password, certfile_path,
interface, port
)
server = RobyApp::LogTransferServer::SpawnServer.new(
tgt_log_dir, user, password, certfile_path,
interface: interface, port: port
)
server.run
end
end
end
Expand Down
69 changes: 64 additions & 5 deletions test/cli/test_log_runtime_archive.rb
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ module CLI
describe ".process_root_folder" do
before do
@archive_dir = make_tmppath
@process = LogRuntimeArchive.new(@root, @archive_dir)
@process = LogRuntimeArchive.new(@root, target_dir: @archive_dir)
end

it "archives all folders, the last one only partially" do
Expand All @@ -391,7 +391,7 @@ module CLI
.write(test1 = Base64.encode64(Random.bytes(1024)))
(dataset / "test.2.log").write(Base64.encode64(Random.bytes(1024)))
process = LogRuntimeArchive.new(
@root, @archive_dir, max_archive_size: 1024
@root, target_dir: @archive_dir, max_archive_size: 1024
)
process.process_root_folder

Expand Down Expand Up @@ -419,7 +419,7 @@ module CLI
(dataset / "test.2.log")
.write(test2 = Base64.encode64(Random.bytes(128)))
process = LogRuntimeArchive.new(
@root, @archive_dir, max_archive_size: 1024
@root, target_dir: @archive_dir, max_archive_size: 1024
)
process.process_root_folder

Expand All @@ -445,7 +445,7 @@ module CLI
test1 = make_random_file "test.1.log", root: dataset
test2 = make_random_file "test.2.log", root: dataset
process = LogRuntimeArchive.new(
@root, @archive_dir, max_archive_size: 1024
@root, target_dir: @archive_dir, max_archive_size: 1024
)
process.process_root_folder

Expand Down Expand Up @@ -516,14 +516,73 @@ def should_archive_dataset(dataset, archive_basename, full:)
end
end

describe ".process_transfer" do
before do
@process = LogRuntimeArchive.new(@root)
interface = "127.0.0.1"
ca = RobyApp::TmpRootCA.new(interface)
@params = {
interface: interface, port: 0,
certfile_path: ca.private_certificate_path,
user: "nilvo", password: "nilvo123"
}
@target_dir = make_tmppath
@threads = []

create_server
end

it "transfers datasets" do
ftp = connect_to_server

datasets = [
make_valid_folder("20220434-2023"),
make_valid_folder("20220434-2024"),
make_valid_folder("20220434-2025")
]

datasets.map do |dataset|
transfer_dataset(ftp, @root / dataset, @target_dir / dataset)
end

datasets.each do |dataset|
assert (@target_dir / dataset).file?
end
end

def create_server
thread = Thread.new do
server = RobyApp::LogTransferServer::SpawnServer.new(
@target_dir, @params[:user], @params[:password],
@params[:certfile_path], interface: @params[:interface],
port: @params[:port]
)
server.run
end
thread.join
end

def transfer_dataset(ftp, src_path, tgt_path)
ftp.putbinaryfile(src_path, tgt_path)
end

def connect_to_server
ftp = Net::FTP.new
ftp.connect(@params[:interface], @params[:port])
ftp.login(@params[:user], @params[:password])
ftp.passive = true
ftp
end
end

describe "#ensure_free_space" do
before do
@archive_dir = make_tmppath
@mocked_files_sizes = []

10.times { |i| (@archive_dir / i.to_s).write(i.to_s) }

@archiver = LogRuntimeArchive.new(@root, @archive_dir)
@archiver = LogRuntimeArchive.new(@root, target_dir: @archive_dir)
end

it "does nothing if there is enough free space" do
Expand Down
Loading

0 comments on commit cc2964c

Please sign in to comment.