From c8e7582bac6e3b56c851ec9382c8afbf4559e99c Mon Sep 17 00:00:00 2001 From: Tamir Duberstein Date: Mon, 25 Nov 2024 14:39:15 -0500 Subject: [PATCH] cleanup: refactor and use bicopy everywhere There are 3 instances of this pattern and each does this slightly differently. Clean up the implementation to return errors using `errors.Join` (which wasn't available when the original was written) and use it everywhere. This doesn't change behavior because the error return is always just logged (see the only called of `(*pseudoLoopbackForwarder).forward`. Note that the removal of the special handling of `io.EOF` returned from `io.Copy` doesn't change behavior because it can never happen per the latter's documentation. Signed-off-by: Tamir Duberstein --- pkg/bicopy/bicopy.go | 89 +++++++++++------------------------- pkg/hostagent/port_darwin.go | 8 ++-- pkg/portfwd/client.go | 19 +------- pkg/vz/network_darwin.go | 23 ++-------- 4 files changed, 36 insertions(+), 103 deletions(-) diff --git a/pkg/bicopy/bicopy.go b/pkg/bicopy/bicopy.go index 3db68521620..cd18b9632d7 100644 --- a/pkg/bicopy/bicopy.go +++ b/pkg/bicopy/bicopy.go @@ -1,81 +1,44 @@ -// From https://raw.githubusercontent.com/norouter/norouter/v0.6.5/pkg/agent/bicopy/bicopy.go -/* - Copyright (C) NoRouter authors. - - Copyright (C) libnetwork authors. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - package bicopy import ( + "errors" "io" "sync" - - "github.com/sirupsen/logrus" ) -// Bicopy is from https://github.com/rootless-containers/rootlesskit/blob/v0.10.1/pkg/port/builtin/parent/tcp/tcp.go#L73-L104 -// (originally from libnetwork, Apache License 2.0). -func Bicopy(x, y io.ReadWriter, quit <-chan struct{}) { - type closeReader interface { +func broker(to, from io.ReadWriter) error { + _, errCopy := io.Copy(to, from) + var errCloseRead, errCloseWrite error + if from, ok := from.(interface { CloseRead() error + }); ok { + errCloseRead = from.CloseRead() } - type closeWriter interface { + if to, ok := to.(interface { CloseWrite() error + }); ok { + errCloseWrite = to.CloseWrite() } - var wg sync.WaitGroup - broker := func(to, from io.ReadWriter) { - if _, err := io.Copy(to, from); err != nil { - logrus.WithError(err).Debug("failed to call io.Copy") - } - if fromCR, ok := from.(closeReader); ok { - if err := fromCR.CloseRead(); err != nil { - logrus.WithError(err).Debug("failed to call CloseRead") - } - } - if toCW, ok := to.(closeWriter); ok { - if err := toCW.CloseWrite(); err != nil { - logrus.WithError(err).Debug("failed to call CloseWrite") - } - } - wg.Done() - } + return errors.Join(errCopy, errCloseRead, errCloseWrite) +} +func Bicopy(x, y io.ReadWriter) error { + var wg sync.WaitGroup wg.Add(2) - go broker(x, y) - go broker(y, x) - finish := make(chan struct{}) + var errLeft, errRight error go func() { - wg.Wait() - close(finish) + errLeft = broker(x, y) }() - - select { - case <-quit: - case <-finish: - } - if xCloser, ok := x.(io.Closer); ok { - if err := xCloser.Close(); err != nil { - logrus.WithError(err).Debug("failed to call xCloser.Close") - } + go func() { + errRight = broker(x, y) + }() + wg.Wait() + var errLeftClose, errRightClose error + if x, ok := x.(io.Closer); ok { + errLeftClose = x.Close() } - if yCloser, ok := y.(io.Closer); ok { - if err := yCloser.Close(); err != nil { - logrus.WithError(err).Debug("failed to call yCloser.Close") - } + if y, ok := y.(io.Closer); ok { + errRightClose = y.Close() } - <-finish - // TODO: return copied bytes + return errors.Join(errLeft, errRight, errLeftClose, errRightClose) } diff --git a/pkg/hostagent/port_darwin.go b/pkg/hostagent/port_darwin.go index a99897b6f3e..f25e43b5808 100644 --- a/pkg/hostagent/port_darwin.go +++ b/pkg/hostagent/port_darwin.go @@ -135,8 +135,8 @@ func (plf *pseudoLoopbackForwarder) Serve() error { continue } go func(ac *net.TCPConn) { - if fErr := plf.forward(ac); fErr != nil { - logrus.Error(fErr) + if err := plf.forward(ac); err != nil { + logrus.Error(err) } }(ac) } @@ -149,8 +149,8 @@ func (plf *pseudoLoopbackForwarder) forward(ac *net.TCPConn) error { return err } defer unixConn.Close() - bicopy.Bicopy(ac, unixConn, nil) - return nil + + return bicopy.Bicopy(ac, unixConn) } func (plf *pseudoLoopbackForwarder) Close() error { diff --git a/pkg/portfwd/client.go b/pkg/portfwd/client.go index b877850cccf..472d587f522 100644 --- a/pkg/portfwd/client.go +++ b/pkg/portfwd/client.go @@ -7,6 +7,7 @@ import ( "io" "net" + "github.com/lima-vm/lima/pkg/bicopy" "github.com/lima-vm/lima/pkg/guestagent/api" guestagentclient "github.com/lima-vm/lima/pkg/guestagent/api/client" "github.com/sirupsen/logrus" @@ -24,25 +25,9 @@ func HandleTCPConnection(ctx context.Context, client *guestagentclient.GuestAgen return } - g, _ := errgroup.WithContext(ctx) - rw := &GrpcClientRW{stream: stream, id: id, addr: guestAddr} - g.Go(func() error { - _, err := io.Copy(rw, conn) - if errors.Is(err, io.EOF) { - return nil - } - return err - }) - g.Go(func() error { - _, err := io.Copy(conn, rw) - if errors.Is(err, io.EOF) { - return nil - } - return err - }) - if err := g.Wait(); err != nil { + if err := bicopy.Bicopy(rw, conn); err != nil { logrus.Debugf("error in tcp tunnel for id: %s error:%v", id, err) } } diff --git a/pkg/vz/network_darwin.go b/pkg/vz/network_darwin.go index d7c8fbfb845..ce8b80ad807 100644 --- a/pkg/vz/network_darwin.go +++ b/pkg/vz/network_darwin.go @@ -8,11 +8,11 @@ import ( "io" "net" "os" - "sync" "syscall" "time" "github.com/balajiv113/fd" + "github.com/lima-vm/lima/pkg/bicopy" "github.com/sirupsen/logrus" ) @@ -62,24 +62,9 @@ func forwardPackets(qemuConn *qemuPacketConn, vzConn *packetConn) { defer qemuConn.Close() defer vzConn.Close() - var wg sync.WaitGroup - wg.Add(2) - - go func() { - defer wg.Done() - if _, err := io.Copy(qemuConn, vzConn); err != nil { - logrus.Errorf("Failed to forward packets from VZ to VMNET: %s", err) - } - }() - - go func() { - defer wg.Done() - if _, err := io.Copy(vzConn, qemuConn); err != nil { - logrus.Errorf("Failed to forward packets from VMNET to VZ: %s", err) - } - }() - - wg.Wait() + if err := bicopy.Bicopy(qemuConn, vzConn); err != nil { + logrus.Errorf("Failed to forward packets between QEMU and VZ: %s", err) + } } // qemuPacketConn converts raw network packet to a QEMU supported network packet.