diff --git a/bin/lvmsync b/bin/lvmsync index 324fce1..1fb177f 100755 --- a/bin/lvmsync +++ b/bin/lvmsync @@ -112,13 +112,13 @@ def run_apply(opts) infile = opts[:apply] == '-' ? $stdin : File.open(opts[:apply], 'r') destdev = opts[:device] - process_dumpdata(infile, destdev, snapfile) + process_dumpdata(infile, destdev, snapfile, opts) ensure snapfile.close unless snapfile.nil? infile.close unless infile.nil? or infile == $stdin end -def process_dumpdata(instream, destdev, snapback = nil) +def process_dumpdata(instream, destdev, snapback = nil, opts) handshake = instream.readline.chomp unless handshake == PROTOCOL_VERSION $stderr.puts "Handshake failed; protocol mismatch? (saw '#{handshake}' expected '#{PROTOCOL_VERSION}'" @@ -190,25 +190,38 @@ def run_client(opts) if opts[:stdout] dump_changes(lv, $stdout, opts) else + verbose = opts[:verbose] ? '-v' : '' server_cmd = if desthost - "ssh #{desthost} lvmsync --apply - #{snapback} #{destdev}" + "ssh #{desthost} lvmsync --apply - #{snapback} #{verbose} #{destdev}" else - "#{$0} --apply - #{snapback} #{destdev}" + "#{$0} --apply - #{snapback} #{verbose} #{destdev}" end exit_status = nil errors = nil Open3.popen3(server_cmd) do |stdin_fd, stdout_fd, stderr_fd, wait_thr| - dump_changes(lv, stdin_fd, opts) + dump_changes(lv, stdin_fd, opts) do + more_to_read = true + while more_to_read + more_to_read = false + (IO.select([stdout_fd, stderr_fd], [], [], 0) || [[]])[0].each do |fd| + more_to_read = true + $stderr.puts "\e[2K\rremote:#{fd.readline}" + end + end + end stdin_fd.close - errors = stderr_fd.read + [stderr_fd, stdout_fd].each do |fd| + until fd.eof? + $stderr.puts "\e[2K\rremote:#{fd.readline}" + end + end exit_status = wait_thr.value if wait_thr end if (exit_status or $?).exitstatus != 0 $stderr.puts "APPLY FAILED." - $stderr.puts errors.split("\n").map { |l| "remote: #{l}" }.join("\n") end end end @@ -220,6 +233,7 @@ def dump_changes(lv, outfd, opts) xfer_count = 0 xfer_size = 0 total_size = 0 + change_count = lv.changes.length File.open(lv.origin.path, 'r') do |origindev| lv.changes.each do |r| @@ -228,21 +242,28 @@ def dump_changes(lv, outfd, opts) xfer_size += chunk_size $stderr.puts "Sending chunk #{r.to_s}..." if opts[:verbose] - $stderr.puts "Seeking to #{r.first} in #{originfile}" if opts[:verbose] + $stderr.puts "Seeking to #{r.first} in #{lv.origin.path}" if opts[:verbose] origindev.seek(r.first, IO::SEEK_SET) - outfd.print [htonq(r.first), chunk_size].pack("QN") - outfd.print origindev.read(chunk_size) + begin + outfd.print [htonq(r.first), chunk_size].pack("QN") + outfd.print origindev.read(chunk_size) + rescue Errno::EPIPE + $stderr.puts "Remote prematurely closed the connection" + yield if block_given? + return + end # Progress bar! if xfer_count % 100 == 50 $stderr.printf "\e[2K\rSending chunk %i of %i, %.2fMB/s", xfer_count, - snap.differences.length, + change_count, xfer_size / (Time.now - start_time) / 1048576 $stderr.flush end + yield if block_given? end origindev.seek(0, IO::SEEK_END)