Skip to content

Commit

Permalink
cleanup: refactor and use bicopy everywhere
Browse files Browse the repository at this point in the history
There are 3 instances of this pattern and each does this slightly
differently. Clean up the implementation to return errors using
`errors.Join` (which wasn't available when the original was written) and
use it everywhere.

This doesn't change behavior because the error return is always just
logged (see the only called of `(*pseudoLoopbackForwarder).forward`.

Note that the removal of the special handling of `io.EOF` returned from
`io.Copy` doesn't change behavior because it can never happen per the
latter's documentation.

Signed-off-by: Tamir Duberstein <[email protected]>
  • Loading branch information
tamird committed Nov 27, 2024
1 parent ca778e3 commit c8e7582
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 103 deletions.
89 changes: 26 additions & 63 deletions pkg/bicopy/bicopy.go
Original file line number Diff line number Diff line change
@@ -1,81 +1,44 @@
// From https://raw.githubusercontent.com/norouter/norouter/v0.6.5/pkg/agent/bicopy/bicopy.go
/*
Copyright (C) NoRouter authors.
Copyright (C) libnetwork authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package bicopy

import (
"errors"
"io"
"sync"

"github.com/sirupsen/logrus"
)

// Bicopy is from https://github.com/rootless-containers/rootlesskit/blob/v0.10.1/pkg/port/builtin/parent/tcp/tcp.go#L73-L104
// (originally from libnetwork, Apache License 2.0).
func Bicopy(x, y io.ReadWriter, quit <-chan struct{}) {
type closeReader interface {
func broker(to, from io.ReadWriter) error {
_, errCopy := io.Copy(to, from)
var errCloseRead, errCloseWrite error
if from, ok := from.(interface {
CloseRead() error
}); ok {
errCloseRead = from.CloseRead()
}
type closeWriter interface {
if to, ok := to.(interface {
CloseWrite() error
}); ok {
errCloseWrite = to.CloseWrite()
}
var wg sync.WaitGroup
broker := func(to, from io.ReadWriter) {
if _, err := io.Copy(to, from); err != nil {
logrus.WithError(err).Debug("failed to call io.Copy")
}
if fromCR, ok := from.(closeReader); ok {
if err := fromCR.CloseRead(); err != nil {
logrus.WithError(err).Debug("failed to call CloseRead")
}
}
if toCW, ok := to.(closeWriter); ok {
if err := toCW.CloseWrite(); err != nil {
logrus.WithError(err).Debug("failed to call CloseWrite")
}
}
wg.Done()
}
return errors.Join(errCopy, errCloseRead, errCloseWrite)
}

func Bicopy(x, y io.ReadWriter) error {
var wg sync.WaitGroup
wg.Add(2)
go broker(x, y)
go broker(y, x)
finish := make(chan struct{})
var errLeft, errRight error
go func() {
wg.Wait()
close(finish)
errLeft = broker(x, y)
}()

select {
case <-quit:
case <-finish:
}
if xCloser, ok := x.(io.Closer); ok {
if err := xCloser.Close(); err != nil {
logrus.WithError(err).Debug("failed to call xCloser.Close")
}
go func() {
errRight = broker(x, y)
}()
wg.Wait()
var errLeftClose, errRightClose error
if x, ok := x.(io.Closer); ok {
errLeftClose = x.Close()
}
if yCloser, ok := y.(io.Closer); ok {
if err := yCloser.Close(); err != nil {
logrus.WithError(err).Debug("failed to call yCloser.Close")
}
if y, ok := y.(io.Closer); ok {
errRightClose = y.Close()
}
<-finish
// TODO: return copied bytes
return errors.Join(errLeft, errRight, errLeftClose, errRightClose)
}
8 changes: 4 additions & 4 deletions pkg/hostagent/port_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ func (plf *pseudoLoopbackForwarder) Serve() error {
continue
}
go func(ac *net.TCPConn) {
if fErr := plf.forward(ac); fErr != nil {
logrus.Error(fErr)
if err := plf.forward(ac); err != nil {
logrus.Error(err)
}
}(ac)
}
Expand All @@ -149,8 +149,8 @@ func (plf *pseudoLoopbackForwarder) forward(ac *net.TCPConn) error {
return err
}
defer unixConn.Close()
bicopy.Bicopy(ac, unixConn, nil)
return nil

return bicopy.Bicopy(ac, unixConn)
}

func (plf *pseudoLoopbackForwarder) Close() error {
Expand Down
19 changes: 2 additions & 17 deletions pkg/portfwd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"net"

"github.com/lima-vm/lima/pkg/bicopy"
"github.com/lima-vm/lima/pkg/guestagent/api"
guestagentclient "github.com/lima-vm/lima/pkg/guestagent/api/client"
"github.com/sirupsen/logrus"
Expand All @@ -24,25 +25,9 @@ func HandleTCPConnection(ctx context.Context, client *guestagentclient.GuestAgen
return
}

g, _ := errgroup.WithContext(ctx)

rw := &GrpcClientRW{stream: stream, id: id, addr: guestAddr}
g.Go(func() error {
_, err := io.Copy(rw, conn)
if errors.Is(err, io.EOF) {
return nil
}
return err
})
g.Go(func() error {
_, err := io.Copy(conn, rw)
if errors.Is(err, io.EOF) {
return nil
}
return err
})

if err := g.Wait(); err != nil {
if err := bicopy.Bicopy(rw, conn); err != nil {
logrus.Debugf("error in tcp tunnel for id: %s error:%v", id, err)
}
}
Expand Down
23 changes: 4 additions & 19 deletions pkg/vz/network_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ import (
"io"
"net"
"os"
"sync"
"syscall"
"time"

"github.com/balajiv113/fd"
"github.com/lima-vm/lima/pkg/bicopy"

"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -62,24 +62,9 @@ func forwardPackets(qemuConn *qemuPacketConn, vzConn *packetConn) {
defer qemuConn.Close()
defer vzConn.Close()

var wg sync.WaitGroup
wg.Add(2)

go func() {
defer wg.Done()
if _, err := io.Copy(qemuConn, vzConn); err != nil {
logrus.Errorf("Failed to forward packets from VZ to VMNET: %s", err)
}
}()

go func() {
defer wg.Done()
if _, err := io.Copy(vzConn, qemuConn); err != nil {
logrus.Errorf("Failed to forward packets from VMNET to VZ: %s", err)
}
}()

wg.Wait()
if err := bicopy.Bicopy(qemuConn, vzConn); err != nil {
logrus.Errorf("Failed to forward packets between QEMU and VZ: %s", err)
}
}

// qemuPacketConn converts raw network packet to a QEMU supported network packet.
Expand Down

0 comments on commit c8e7582

Please sign in to comment.