Skip to content

Commit

Permalink
cleanup: refactor and use bicopy everywhere
Browse files Browse the repository at this point in the history
There are 3 instances of this pattern and each does this slightly
differently. Clean up the implementation to return errors using
`errors.Join` (which wasn't available when the original was written) and
use it everywhere.

This doesn't change behavior because the error return is always just
logged (see the only called of `(*pseudoLoopbackForwarder).forward`.

Note that the removal of the special handling of `io.EOF` returned from
`io.Copy` doesn't change behavior because it can never happen per the
latter's documentation.

Signed-off-by: Tamir Duberstein <[email protected]>
  • Loading branch information
tamird committed Dec 3, 2024
1 parent 7cea6f8 commit 553ebd9
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 113 deletions.
82 changes: 16 additions & 66 deletions pkg/bicopy/bicopy.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,3 @@
// From https://raw.githubusercontent.com/norouter/norouter/v0.6.5/pkg/agent/bicopy/bicopy.go
/*
Copyright (C) NoRouter authors.
Copyright (C) libnetwork authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package bicopy

import (
Expand All @@ -26,56 +7,25 @@ import (
"github.com/sirupsen/logrus"
)

// Bicopy is from https://github.com/rootless-containers/rootlesskit/blob/v0.10.1/pkg/port/builtin/parent/tcp/tcp.go#L73-L104
// (originally from libnetwork, Apache License 2.0).
func Bicopy(x, y io.ReadWriter, quit <-chan struct{}) {
type closeReader interface {
CloseRead() error
}
type closeWriter interface {
CloseWrite() error
}
var wg sync.WaitGroup
broker := func(to, from io.ReadWriter) {
if _, err := io.Copy(to, from); err != nil {
logrus.WithError(err).Debug("failed to call io.Copy")
}
if fromCR, ok := from.(closeReader); ok {
if err := fromCR.CloseRead(); err != nil {
logrus.WithError(err).Debug("failed to call CloseRead")
}
}
if toCW, ok := to.(closeWriter); ok {
if err := toCW.CloseWrite(); err != nil {
logrus.WithError(err).Debug("failed to call CloseWrite")
}
}
wg.Done()
}
type NamedReadWriter struct {
ReadWriter io.ReadWriter
Name string
}

func Bicopy(context string, x, y NamedReadWriter) {
var wg sync.WaitGroup
wg.Add(2)
go broker(x, y)
go broker(y, x)
finish := make(chan struct{})
go func() {
wg.Wait()
close(finish)
}()

select {
case <-quit:
case <-finish:
}
if xCloser, ok := x.(io.Closer); ok {
if err := xCloser.Close(); err != nil {
logrus.WithError(err).Debug("failed to call xCloser.Close")
defer wg.Done()
if _, err := io.Copy(x.ReadWriter, y.ReadWriter); err != nil {
logrus.WithError(err).Errorf("%s: io.Copy(%s, %s)", context, x.Name, y.Name)
}
}
if yCloser, ok := y.(io.Closer); ok {
if err := yCloser.Close(); err != nil {
logrus.WithError(err).Debug("failed to call yCloser.Close")
}()
go func() {
defer wg.Done()
if _, err := io.Copy(y.ReadWriter, x.ReadWriter); err != nil {
logrus.WithError(err).Errorf("%s: io.Copy(%s, %s)", context, y.Name, x.Name)
}
}
<-finish
// TODO: return copied bytes
}()
wg.Wait()
}
21 changes: 12 additions & 9 deletions pkg/hostagent/port_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,23 +134,26 @@ func (plf *pseudoLoopbackForwarder) Serve() error {
ac.Close()
continue
}
go func(ac *net.TCPConn) {
if fErr := plf.forward(ac); fErr != nil {
logrus.Error(fErr)
}
}(ac)
go plf.forward(ac)
}
}

func (plf *pseudoLoopbackForwarder) forward(ac *net.TCPConn) error {
func (plf *pseudoLoopbackForwarder) forward(ac *net.TCPConn) {
defer ac.Close()
unixConn, err := net.DialUnix("unix", nil, plf.unixAddr)
if err != nil {
return err
logrus.WithError(err).Errorf("pseudoloopback forwarder: failed to dial %q", plf.unixAddr)
return
}
defer unixConn.Close()
bicopy.Bicopy(ac, unixConn, nil)
return nil

bicopy.Bicopy("pseudoloopback forwarder", bicopy.NamedReadWriter{
ReadWriter: ac,
Name: "tcp",
}, bicopy.NamedReadWriter{
ReadWriter: unixConn,
Name: "unix",
})
}

func (plf *pseudoLoopbackForwarder) Close() error {
Expand Down
27 changes: 8 additions & 19 deletions pkg/portfwd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"net"

"github.com/lima-vm/lima/pkg/bicopy"
"github.com/lima-vm/lima/pkg/guestagent/api"
guestagentclient "github.com/lima-vm/lima/pkg/guestagent/api/client"
"github.com/sirupsen/logrus"
Expand All @@ -24,27 +25,15 @@ func HandleTCPConnection(ctx context.Context, client *guestagentclient.GuestAgen
return
}

g, _ := errgroup.WithContext(ctx)

rw := &GrpcClientRW{stream: stream, id: id, addr: guestAddr}
g.Go(func() error {
_, err := io.Copy(rw, conn)
if errors.Is(err, io.EOF) {
return nil
}
return err
})
g.Go(func() error {
_, err := io.Copy(conn, rw)
if errors.Is(err, io.EOF) {
return nil
}
return err
})

if err := g.Wait(); err != nil {
logrus.Debugf("error in tcp tunnel for id: %s error:%v", id, err)
}
bicopy.Bicopy("tcp tunnel", bicopy.NamedReadWriter{
ReadWriter: rw,
Name: guestAddr,
}, bicopy.NamedReadWriter{
ReadWriter: conn,
Name: id,
})
}

func HandleUDPConnection(ctx context.Context, client *guestagentclient.GuestAgentClient, conn net.PacketConn, guestAddr string) {
Expand Down
27 changes: 8 additions & 19 deletions pkg/vz/network_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ import (
"io"
"net"
"os"
"sync"
"syscall"
"time"

"github.com/balajiv113/fd"
"github.com/lima-vm/lima/pkg/bicopy"

"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -66,24 +66,13 @@ func forwardPackets(qemuConn *qemuPacketConn, vzConn *packetConn) {
defer qemuConn.Close()
defer vzConn.Close()

var wg sync.WaitGroup
wg.Add(2)

go func() {
defer wg.Done()
if _, err := io.Copy(qemuConn, vzConn); err != nil {
logrus.Errorf("Failed to forward packets from VZ to VMNET: %s", err)
}
}()

go func() {
defer wg.Done()
if _, err := io.Copy(vzConn, qemuConn); err != nil {
logrus.Errorf("Failed to forward packets from VMNET to VZ: %s", err)
}
}()

wg.Wait()
bicopy.Bicopy("forwarding packets", bicopy.NamedReadWriter{
ReadWriter: qemuConn,
Name: "VMNET",
}, bicopy.NamedReadWriter{
ReadWriter: vzConn,
Name: "VZ",
})
}

// qemuPacketConn converts raw network packet to a QEMU supported network packet.
Expand Down

0 comments on commit 553ebd9

Please sign in to comment.