Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cleanup: refactor and use bicopy everywhere #2944

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 16 additions & 66 deletions pkg/bicopy/bicopy.go
tamird marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -1,22 +1,3 @@
// From https://raw.githubusercontent.com/norouter/norouter/v0.6.5/pkg/agent/bicopy/bicopy.go
/*
Copyright (C) NoRouter authors.

Copyright (C) libnetwork authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package bicopy

import (
Expand All @@ -26,56 +7,25 @@ import (
"github.com/sirupsen/logrus"
)

// Bicopy is from https://github.com/rootless-containers/rootlesskit/blob/v0.10.1/pkg/port/builtin/parent/tcp/tcp.go#L73-L104
// (originally from libnetwork, Apache License 2.0).
func Bicopy(x, y io.ReadWriter, quit <-chan struct{}) {
tamird marked this conversation as resolved.
Show resolved Hide resolved
type closeReader interface {
CloseRead() error
}
type closeWriter interface {
CloseWrite() error
}
var wg sync.WaitGroup
broker := func(to, from io.ReadWriter) {
if _, err := io.Copy(to, from); err != nil {
logrus.WithError(err).Debug("failed to call io.Copy")
}
tamird marked this conversation as resolved.
Show resolved Hide resolved
if fromCR, ok := from.(closeReader); ok {
if err := fromCR.CloseRead(); err != nil {
logrus.WithError(err).Debug("failed to call CloseRead")
}
}
if toCW, ok := to.(closeWriter); ok {
if err := toCW.CloseWrite(); err != nil {
logrus.WithError(err).Debug("failed to call CloseWrite")
}
}
wg.Done()
}
type NamedReadWriter struct {
ReadWriter io.ReadWriter
Name string
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason to accept io.ReadWriter and not a net.Con? This is not a library for general use, but a lima helper specific to forwarding packets around.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GrpcClientRW doesn't implement net.Conn.

}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do we ensure that the entire Bicopy() finish when one of the io.Copy() finish?

I think this can be done if we do:

func broker(dst, src, dstName, srcName) {
    defer dst.Close()
    defer src.Close()
	if _, err := io.Copy(dst, src); err != nil {
		logrus.WithError(err).Errorf("io.Copy(%s, %s)", dstName, srcName)
	}   
}

Second issue, do we really need the broker function? it is pretty simple, and it may be more clear to see the entire flow in one function like we have today in vz/network_drawin.go.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do we ensure that the entire Bicopy() finish when one of the io.Copy() finish?

Why does it matter? We're always running Bicopy in a goroutine, and we don't have this guarantee on any of the code paths today.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't ensure this today, so it should not block your change. Current code assumes that if we fail to read or write from socket and io.Copy() returns, the other io.Copy() will also returns since it is trying to read of write using the same socket. This may be enough, but I'm not sure.

func Bicopy(context string, x, y NamedReadWriter) {
var wg sync.WaitGroup
wg.Add(2)
go broker(x, y)
go broker(y, x)
finish := make(chan struct{})
go func() {
wg.Wait()
close(finish)
}()

select {
case <-quit:
case <-finish:
}
if xCloser, ok := x.(io.Closer); ok {
if err := xCloser.Close(); err != nil {
logrus.WithError(err).Debug("failed to call xCloser.Close")
defer wg.Done()
if _, err := io.Copy(x.ReadWriter, y.ReadWriter); err != nil {
logrus.WithError(err).Errorf("%s: io.Copy(%s, %s)", context, x.Name, y.Name)
}
}
if yCloser, ok := y.(io.Closer); ok {
if err := yCloser.Close(); err != nil {
logrus.WithError(err).Debug("failed to call yCloser.Close")
}()
go func() {
defer wg.Done()
if _, err := io.Copy(y.ReadWriter, x.ReadWriter); err != nil {
logrus.WithError(err).Errorf("%s: io.Copy(%s, %s)", context, y.Name, x.Name)
}
}
<-finish
// TODO: return copied bytes
}()
wg.Wait()
}
21 changes: 12 additions & 9 deletions pkg/hostagent/port_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,23 +134,26 @@ func (plf *pseudoLoopbackForwarder) Serve() error {
ac.Close()
continue
}
go func(ac *net.TCPConn) {
if fErr := plf.forward(ac); fErr != nil {
logrus.Error(fErr)
}
}(ac)
go plf.forward(ac)
}
}

func (plf *pseudoLoopbackForwarder) forward(ac *net.TCPConn) error {
func (plf *pseudoLoopbackForwarder) forward(ac *net.TCPConn) {
defer ac.Close()
unixConn, err := net.DialUnix("unix", nil, plf.unixAddr)
if err != nil {
return err
logrus.WithError(err).Errorf("pseudoloopback forwarder: failed to dial %q", plf.unixAddr)
return
}
defer unixConn.Close()
bicopy.Bicopy(ac, unixConn, nil)
return nil
tamird marked this conversation as resolved.
Show resolved Hide resolved

bicopy.Bicopy("pseudoloopback forwarder", bicopy.NamedReadWriter{
ReadWriter: ac,
Name: "tcp",
}, bicopy.NamedReadWriter{
ReadWriter: unixConn,
Name: "unix",
})
tamird marked this conversation as resolved.
Show resolved Hide resolved
}

func (plf *pseudoLoopbackForwarder) Close() error {
Expand Down
27 changes: 8 additions & 19 deletions pkg/portfwd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"net"

"github.com/lima-vm/lima/pkg/bicopy"
"github.com/lima-vm/lima/pkg/guestagent/api"
guestagentclient "github.com/lima-vm/lima/pkg/guestagent/api/client"
"github.com/sirupsen/logrus"
Expand All @@ -24,27 +25,15 @@ func HandleTCPConnection(ctx context.Context, client *guestagentclient.GuestAgen
return
}

g, _ := errgroup.WithContext(ctx)

rw := &GrpcClientRW{stream: stream, id: id, addr: guestAddr}
g.Go(func() error {
_, err := io.Copy(rw, conn)
if errors.Is(err, io.EOF) {
return nil
}
tamird marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at io.Copy implementation:

func copyBuffer(dst Writer, src Reader, buf []byte) (written int64, err error) {
	// If the reader has a WriteTo method, use it to do the copy.
	// Avoids an allocation and a copy.
	if wt, ok := src.(WriterTo); ok {
		return wt.WriteTo(dst)
	}
	// Similarly, if the writer has a ReadFrom method, use it to do the copy.
	if rf, ok := dst.(ReaderFrom); ok {
		return rf.ReadFrom(src)
	}
	if buf == nil {
		size := 32 * 1024
		if l, ok := src.(*LimitedReader); ok && int64(size) > l.N {
			if l.N < 1 {
				size = 1
			} else {
				size = int(l.N)
			}
		}
		buf = make([]byte, size)
	}
	for {
		nr, er := src.Read(buf)
		if nr > 0 {
			nw, ew := dst.Write(buf[0:nr])
			if nw < 0 || nr < nw {
				nw = 0
				if ew == nil {
					ew = errInvalidWrite
				}
			}
			written += int64(nw)
			if ew != nil {
				err = ew
				break
			}
			if nr != nw {
				err = ErrShortWrite
				break
			}
		}
		if er != nil {
			if er != EOF {
				err = er
			}
			break
		}
	}
	return written, err
}

It will not return io.EOF from the copy loop - but it may delegate the copy to ReadFrom() or WriteTo(), which can be implemented by the actual types we copy. I don't think we have any guarantee that we will never get io.EOF.

So it may be better to keep this if we don't want to report it as an error.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ReadFrom is not permitted to return EOF:
https://cs.opensource.google/go/go/+/refs/tags/go1.23.4:src/io/io.go;l=186

WriteTo returns "Any error encountered during the write":
https://cs.opensource.google/go/go/+/refs/tags/go1.23.4:src/io/io.go;l=197
EOF does not occur on write, only on read.

return err
})
g.Go(func() error {
_, err := io.Copy(conn, rw)
if errors.Is(err, io.EOF) {
return nil
}
return err
})

if err := g.Wait(); err != nil {
logrus.Debugf("error in tcp tunnel for id: %s error:%v", id, err)
tamird marked this conversation as resolved.
Show resolved Hide resolved
}
bicopy.Bicopy("tcp tunnel", bicopy.NamedReadWriter{
ReadWriter: rw,
Name: guestAddr,
}, bicopy.NamedReadWriter{
ReadWriter: conn,
Name: id,
})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice but previously we got an more specific log about the tunnel.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could include the caller's file and line number in the error message?

}

func HandleUDPConnection(ctx context.Context, client *guestagentclient.GuestAgentClient, conn net.PacketConn, guestAddr string) {
Expand Down
27 changes: 8 additions & 19 deletions pkg/vz/network_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ import (
"io"
"net"
"os"
"sync"
"syscall"
"time"

"github.com/balajiv113/fd"
"github.com/lima-vm/lima/pkg/bicopy"

"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -66,24 +66,13 @@ func forwardPackets(qemuConn *qemuPacketConn, vzConn *packetConn) {
defer qemuConn.Close()
defer vzConn.Close()

var wg sync.WaitGroup
wg.Add(2)

go func() {
defer wg.Done()
if _, err := io.Copy(qemuConn, vzConn); err != nil {
logrus.Errorf("Failed to forward packets from VZ to VMNET: %s", err)
tamird marked this conversation as resolved.
Show resolved Hide resolved
}
}()

go func() {
defer wg.Done()
if _, err := io.Copy(vzConn, qemuConn); err != nil {
logrus.Errorf("Failed to forward packets from VMNET to VZ: %s", err)
tamird marked this conversation as resolved.
Show resolved Hide resolved
}
}()

wg.Wait()
bicopy.Bicopy("forwarding packets", bicopy.NamedReadWriter{
ReadWriter: qemuConn,
Name: "VMNET",
}, bicopy.NamedReadWriter{
ReadWriter: vzConn,
Name: "VZ",
})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice but previously the logs were more specific to this package.

}

// qemuPacketConn converts raw network packet to a QEMU supported network packet.
Expand Down
Loading