Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add MPTCP support #898

Open
wants to merge 5 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,13 @@ type ChannelOptions struct {
// This is an unstable API - breaking changes are likely.
RelayTimerVerification bool

// EnableMPTCP enables MPTCP for TCP network connection to increase reliability.
// It requires underlying operating system support MPTCP.
// If EnableMPTCP is false or no MPTCP support, the connection will use normal TCP.
// It's set to false by default.
zhiyipanuber marked this conversation as resolved.
Show resolved Hide resolved
// If a Dialer is passed as option, this value will be ignored.
EnableMPTCP bool

// The reporter to use for reporting stats for this channel.
StatsReporter StatsReporter

Expand Down Expand Up @@ -184,6 +191,7 @@ type Channel struct {
relayMaxConnTimeout time.Duration
relayMaxTombs uint64
relayTimerVerify bool
enableMPTCP bool
internalHandlers *handlerMap
handler Handler
onPeerStatusChanged func(*Peer)
Expand Down Expand Up @@ -275,8 +283,12 @@ func NewChannel(serviceName string, opts *ChannelOptions) (*Channel, error) {
return nil, err
}

// Default to dialContext if dialer is not passed in as an option
// Default to dialContext or dialMPTCPContex
// if dialer is not passed in as an option
dialCtx := dialContext
if opts.EnableMPTCP {
dialCtx = dialMPTCPContext
}
if opts.Dialer != nil {
dialCtx = func(ctx context.Context, hostPort string) (net.Conn, error) {
return opts.Dialer(ctx, "tcp", hostPort)
Comment on lines 292 to 294
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens if I set both EnableMPTCP and Dialer?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we added a comment regarding this

Expand Down Expand Up @@ -306,6 +318,7 @@ func NewChannel(serviceName string, opts *ChannelOptions) (*Channel, error) {
relayMaxConnTimeout: opts.RelayMaxConnectionTimeout,
relayMaxTombs: opts.RelayMaxTombs,
relayTimerVerify: opts.RelayTimerVerification,
enableMPTCP: opts.EnableMPTCP,
dialer: dialCtx,
connContext: opts.ConnContext,
closed: make(chan struct{}),
Expand Down Expand Up @@ -402,7 +415,9 @@ func (ch *Channel) ListenAndServe(hostPort string) error {
return errAlreadyListening
}

l, err := net.Listen("tcp", hostPort)
lc := &net.ListenConfig{}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@AllenLuUber @jronak do you know how we handle configuring ingress mTLS?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We create listener wrapped around tls server around it and pass the wrapped listener into tchannel Serve. To enable mptcp, we need to set the flags on listener in yarpc

lc.SetMultipathTCP(ch.enableMPTCP)
l, err := lc.Listen(context.Background(), "tcp", hostPort)
if err != nil {
mutable.RUnlock()
return err
Expand Down
35 changes: 19 additions & 16 deletions channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,24 +44,27 @@ func toMap(fields LogFields) map[string]interface{} {
}

func TestNewChannel(t *testing.T) {
ch, err := NewChannel("svc", &ChannelOptions{
ProcessName: "pname",
})
require.NoError(t, err, "NewChannel failed")

assert.Equal(t, LocalPeerInfo{
ServiceName: "svc",
PeerInfo: PeerInfo{
for _, mptcp := range []bool{true, false} {
ch, err := NewChannel("svc", &ChannelOptions{
ProcessName: "pname",
HostPort: ephemeralHostPort,
IsEphemeral: true,
Version: PeerVersion{
Language: "go",
LanguageVersion: strings.TrimPrefix(runtime.Version(), "go"),
TChannelVersion: VersionInfo,
EnableMPTCP: mptcp,
})
require.NoError(t, err, "NewChannel failed")

assert.Equal(t, LocalPeerInfo{
ServiceName: "svc",
PeerInfo: PeerInfo{
ProcessName: "pname",
HostPort: ephemeralHostPort,
IsEphemeral: true,
Version: PeerVersion{
Language: "go",
LanguageVersion: strings.TrimPrefix(runtime.Version(), "go"),
TChannelVersion: VersionInfo,
},
},
},
}, ch.PeerInfo(), "Wrong local peer info")
}, ch.PeerInfo(), "Wrong local peer info")
}
}
zhiyipanuber marked this conversation as resolved.
Show resolved Hide resolved

func TestLoggers(t *testing.T) {
Expand Down
6 changes: 6 additions & 0 deletions dial_17.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,9 @@ func dialContext(ctx context.Context, hostPort string) (net.Conn, error) {
d := net.Dialer{}
return d.DialContext(ctx, "tcp", hostPort)
}

func dialMPTCPContext(ctx context.Context, hostPort string) (net.Conn, error) {
d := net.Dialer{}
d.SetMultipathTCP(true)
return d.DialContext(ctx, "tcp", hostPort)
}