Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] feat: copy log to central system #441

Open
wants to merge 4 commits into
base: transition-to-runkit
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 44 additions & 2 deletions lib/syskit/cli/log_runtime_archive.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

require "archive/tar/minitar"
require "sys/filesystem"
require "syskit/process_managers/remote/protocol"
require "net/ftp"

module Syskit
module CLI
Expand All @@ -18,7 +20,7 @@ class LogRuntimeArchive
DEFAULT_MAX_ARCHIVE_SIZE = 10_000_000_000 # 10G

def initialize(
root_dir, target_dir,
root_dir, target_dir: nil,
logger: LogRuntimeArchive.null_logger,
max_archive_size: DEFAULT_MAX_ARCHIVE_SIZE
)
Expand All @@ -45,6 +47,20 @@ def process_root_folder
end
end

# Creates a FTP server and decides which logs to transfer
#
# @param [Pathname] root_dir the log folder on the process server
# @param [Params] server_params the FTP server parameters:
# { user, password, certfile_path, interface, port }
def process_root_folder_transfer(server_params)
ftp = self.class.connect_to_remote_server(server_params)
candidates = self.class.find_all_dataset_folders(@root_dir)
candidates.each do |child|
process_dataset_transfer(child, ftp)
end
self.class.disconnect_from_remote_server(ftp)
end

# Manages folder available space
#
# The method will check if there is enough space to save more log files
Expand Down Expand Up @@ -83,9 +99,10 @@ def ensure_free_space(free_space_low_limit, free_space_delete_until)

def process_dataset(child, full:)
use_existing = true
basename = child.basename.to_s
loop do
open_archive_for(
child.basename.to_s, use_existing: use_existing
basename, use_existing: use_existing
) do |io|
if io.tell > @max_archive_size
use_existing = false
Expand All @@ -104,6 +121,15 @@ def process_dataset(child, full:)
end
end

def process_dataset_transfer(child, ftp)
basename = child.basename.to_s
self.class.transfer_dataset(
@root_dir / basename,
basename,
ftp
)
end

# Create or open an archive
#
# The method will find an archive to open or create, do it and
Expand Down Expand Up @@ -147,6 +173,18 @@ def find_last_archive_index(basename)
end
end

def self.connect_to_remote_server(server_params)
ftp = Net::FTP.new
ftp.connect(server_params[:interface], server_params[:port])
ftp.login(server_params[:user], server_params[:password])
ftp.passive = true
ftp
end

def self.disconnect_from_remote_server(ftp)
ftp&.close
end

# Find all dataset-looking folders within a root log folder
def self.find_all_dataset_folders(root_dir)
candidates = root_dir.enum_for(:each_entry).map do |child|
Expand Down Expand Up @@ -255,6 +293,10 @@ def self.null_logger
logger
end

def self.transfer_dataset(local_path, remote_path, ftp)
ftp.putbinaryfile(local_path, remote_path)
end

# Archive the given dataset
#
# @param [IO] archive_io the IO of the target archive
Expand Down
73 changes: 67 additions & 6 deletions lib/syskit/cli/log_runtime_archive_main.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
require "pathname"
require "thor"
require "syskit/cli/log_runtime_archive"
require "syskit/roby_app/log_transfer_server/spawn_server"

module Syskit
module CLI
Expand All @@ -16,7 +17,6 @@ def self.exit_on_failure?
end

desc "watch", "watch a dataset root folder and call archiver"

option :period,
type: :numeric, default: 600, desc: "polling period in seconds"
option :max_size,
Expand Down Expand Up @@ -52,7 +52,7 @@ def self.exit_on_failure?
def archive(root_dir, target_dir)
root_dir = validate_directory_exists(root_dir)
target_dir = validate_directory_exists(target_dir)
archiver = make_archiver(root_dir, target_dir)
archiver = make_archiver(root_dir, target_dir: target_dir)

archiver.ensure_free_space(
options[:free_space_low_limit] * 1_000_000,
Expand All @@ -61,7 +61,55 @@ def archive(root_dir, target_dir)
archiver.process_root_folder
end

no_commands do
desc "watch_transfer", "watches a dataset root folder \
and periodically performs transfer"
option :period,
type: :numeric, default: 600, desc: "polling period in seconds"
option :max_size,
type: :numeric, default: 10_000, desc: "max log size in MB"
def watch_transfer( # rubocop:disable Metrics/ParameterLists
base_log_dir, user, password, certfile_path, interface, port
)
loop do
begin
transfer(base_log_dir, user, password, certfile_path,
interface, port)
rescue Errno::ENOSPC
next
end

puts "Transferred pending logs, sleeping #{options[:period]}s"
sleep options[:period]
end
end

desc "transfer", "transfers the datasets"
option :max_size,
type: :numeric, default: 10_000, desc: "max log size in MB"
def transfer( # rubocop:disable Metrics/ParameterLists
base_log_dir, user, password, certfile_path, interface, port
)
server_params = {
user: user, password: password, certfile_path: certfile_path,
interface: interface, port: port
}
base_log_dir = validate_directory_exists(base_log_dir)
archiver = make_archiver(base_log_dir)

archiver.process_root_folder_transfer(server_params)
end

desc "transfer_server", "creates the log transfer FTP server \
that runs on the main computer"
def transfer_server( # rubocop:disable Metrics/ParameterLists
tgt_log_dir, user, password, certfile_path, interface, port
)
create_server(
tgt_log_dir, user, password, certfile_path, interface, port
)
end

no_commands do # rubocop:disable Metrics/BlockLength
def validate_directory_exists(dir)
dir = Pathname.new(dir)
unless dir.directory?
Expand All @@ -72,13 +120,26 @@ def validate_directory_exists(dir)
dir
end

def make_archiver(root_dir, target_dir)
def make_archiver(root_dir, target_dir: nil)
logger = Logger.new($stdout)

Syskit::CLI::LogRuntimeArchive.new(
root_dir, target_dir,
logger: logger, max_archive_size: options[:max_size] * (1024**2)
root_dir,
target_dir: target_dir, logger: logger,
max_archive_size: options[:max_size] * (1024**2)
)
end

def create_server( # rubocop:disable Metrics/ParameterLists
tgt_log_dir, user, password, certfile_path,
interface, port
)
server = RobyApp::LogTransferServer::SpawnServer.new(
tgt_log_dir, user, password, certfile_path,
interface: interface, port: port
)
server.run
server
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/syskit/roby_app/log_transfer_server/spawn_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def wait_until_stopped
puts "FTP server started. Press ENTER or c-C to stop it"
$stdout.flush
begin
gets
$stdin.readline
rescue Interrupt
puts "Interrupt"
end
Expand Down
69 changes: 64 additions & 5 deletions test/cli/test_log_runtime_archive.rb
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ module CLI
describe ".process_root_folder" do
before do
@archive_dir = make_tmppath
@process = LogRuntimeArchive.new(@root, @archive_dir)
@process = LogRuntimeArchive.new(@root, target_dir: @archive_dir)
end

it "archives all folders, the last one only partially" do
Expand All @@ -391,7 +391,7 @@ module CLI
.write(test1 = Base64.encode64(Random.bytes(1024)))
(dataset / "test.2.log").write(Base64.encode64(Random.bytes(1024)))
process = LogRuntimeArchive.new(
@root, @archive_dir, max_archive_size: 1024
@root, target_dir: @archive_dir, max_archive_size: 1024
)
process.process_root_folder

Expand Down Expand Up @@ -419,7 +419,7 @@ module CLI
(dataset / "test.2.log")
.write(test2 = Base64.encode64(Random.bytes(128)))
process = LogRuntimeArchive.new(
@root, @archive_dir, max_archive_size: 1024
@root, target_dir: @archive_dir, max_archive_size: 1024
)
process.process_root_folder

Expand All @@ -445,7 +445,7 @@ module CLI
test1 = make_random_file "test.1.log", root: dataset
test2 = make_random_file "test.2.log", root: dataset
process = LogRuntimeArchive.new(
@root, @archive_dir, max_archive_size: 1024
@root, target_dir: @archive_dir, max_archive_size: 1024
)
process.process_root_folder

Expand Down Expand Up @@ -516,14 +516,73 @@ def should_archive_dataset(dataset, archive_basename, full:)
end
end

describe ".process_transfer" do
before do
@process = LogRuntimeArchive.new(@root)
interface = "127.0.0.1"
ca = RobyApp::TmpRootCA.new(interface)
@params = {
interface: interface, port: 0,
certfile_path: ca.private_certificate_path,
user: "nilvo", password: "nilvo123"
}
@target_dir = make_tmppath
@threads = []

create_server
end

it "transfers datasets" do
ftp = connect_to_server

datasets = [
make_valid_folder("20220434-2023"),
make_valid_folder("20220434-2024"),
make_valid_folder("20220434-2025")
]

datasets.map do |dataset|
transfer_dataset(ftp, @root / dataset, @target_dir / dataset)
end

datasets.each do |dataset|
assert (@target_dir / dataset).file?
end
end

def create_server
thread = Thread.new do
server = RobyApp::LogTransferServer::SpawnServer.new(
@target_dir, @params[:user], @params[:password],
@params[:certfile_path],
interface: @params[:interface], port: @params[:port]
)
server.run
end
thread.join
end

def transfer_dataset(ftp, src_path, tgt_path)
ftp.putbinaryfile(src_path, tgt_path)
end

def connect_to_server
ftp = Net::FTP.new
ftp.connect(@params[:interface], @params[:port])
ftp.login(@params[:user], @params[:password])
ftp.passive = true
ftp
end
end

describe "#ensure_free_space" do
before do
@archive_dir = make_tmppath
@mocked_files_sizes = []

10.times { |i| (@archive_dir / i.to_s).write(i.to_s) }

@archiver = LogRuntimeArchive.new(@root, @archive_dir)
@archiver = LogRuntimeArchive.new(@root, target_dir: @archive_dir)
end

it "does nothing if there is enough free space" do
Expand Down
Loading