From 6b6241f88fd0fcc0c407dd13188bcf4b64d13a22 Mon Sep 17 00:00:00 2001 From: Zhiyi Pan Date: Sun, 20 Aug 2023 11:53:24 -0700 Subject: [PATCH 1/4] Add MPTCP support --- channel.go | 24 ++++++++++++++++++++++-- channel_test.go | 22 ++++++++++++++++++++++ dial_17.go | 6 ++++++ 3 files changed, 50 insertions(+), 2 deletions(-) diff --git a/channel.go b/channel.go index 1b76277d..3597949b 100644 --- a/channel.go +++ b/channel.go @@ -94,6 +94,12 @@ type ChannelOptions struct { // This is an unstable API - breaking changes are likely. RelayTimerVerification bool + // EnableMPTCP enables MPTCP for TCP network connection to increase reliability. + // It requires underlying operating system support MPTCP. + // If EnableMPTCP is false or no MPTCP support, the connection will use normal TCP. + // It's set to false by default. + EnableMPTCP bool + // The reporter to use for reporting stats for this channel. StatsReporter StatsReporter @@ -184,6 +190,7 @@ type Channel struct { relayMaxConnTimeout time.Duration relayMaxTombs uint64 relayTimerVerify bool + enableMPTCP bool internalHandlers *handlerMap handler Handler onPeerStatusChanged func(*Peer) @@ -275,8 +282,12 @@ func NewChannel(serviceName string, opts *ChannelOptions) (*Channel, error) { return nil, err } - // Default to dialContext if dialer is not passed in as an option + // Default to dialContext or dialMPTCPContex + // if dialer is not passed in as an option dialCtx := dialContext + if opts.EnableMPTCP { + dialCtx = dialMPTCPContext + } if opts.Dialer != nil { dialCtx = func(ctx context.Context, hostPort string) (net.Conn, error) { return opts.Dialer(ctx, "tcp", hostPort) @@ -306,6 +317,7 @@ func NewChannel(serviceName string, opts *ChannelOptions) (*Channel, error) { relayMaxConnTimeout: opts.RelayMaxConnectionTimeout, relayMaxTombs: opts.RelayMaxTombs, relayTimerVerify: opts.RelayTimerVerification, + enableMPTCP: opts.EnableMPTCP, dialer: dialCtx, connContext: opts.ConnContext, closed: make(chan struct{}), @@ -402,7 +414,15 @@ func (ch *Channel) ListenAndServe(hostPort string) error { return errAlreadyListening } - l, err := net.Listen("tcp", hostPort) + var l net.Listener + var err error + if ch.enableMPTCP { + lc := &net.ListenConfig{} + lc.SetMultipathTCP(true) + l, err = lc.Listen(context.Background(), "tcp", hostPort) + } else { + l, err = net.Listen("tcp", hostPort) + } if err != nil { mutable.RUnlock() return err diff --git a/channel_test.go b/channel_test.go index 3757a922..350152cb 100644 --- a/channel_test.go +++ b/channel_test.go @@ -64,6 +64,28 @@ func TestNewChannel(t *testing.T) { }, ch.PeerInfo(), "Wrong local peer info") } +func TestNewChannelEnableMPTCP(t *testing.T) { + ch, err := NewChannel("svc", &ChannelOptions{ + ProcessName: "pname", + EnableMPTCP: true, + }) + require.NoError(t, err, "NewChannel failed") + + assert.Equal(t, LocalPeerInfo{ + ServiceName: "svc", + PeerInfo: PeerInfo{ + ProcessName: "pname", + HostPort: ephemeralHostPort, + IsEphemeral: true, + Version: PeerVersion{ + Language: "go", + LanguageVersion: strings.TrimPrefix(runtime.Version(), "go"), + TChannelVersion: VersionInfo, + }, + }, + }, ch.PeerInfo(), "Wrong local peer info") +} + func TestLoggers(t *testing.T) { ch, err := NewChannel("svc", &ChannelOptions{ Logger: NewLogger(ioutil.Discard), diff --git a/dial_17.go b/dial_17.go index 313a754a..b1d72c20 100644 --- a/dial_17.go +++ b/dial_17.go @@ -32,3 +32,9 @@ func dialContext(ctx context.Context, hostPort string) (net.Conn, error) { d := net.Dialer{} return d.DialContext(ctx, "tcp", hostPort) } + +func dialMPTCPContext(ctx context.Context, hostPort string) (net.Conn, error) { + d := net.Dialer{} + d.SetMultipathTCP(true) + return d.DialContext(ctx, "tcp", hostPort) +} From 810b98d4bc612e6cc5d7db642432d276c7898a22 Mon Sep 17 00:00:00 2001 From: Zhiyi Pan Date: Mon, 21 Aug 2023 16:13:29 -0700 Subject: [PATCH 2/4] Clean code and tests --- channel.go | 13 ++++------- channel_test.go | 57 +++++++++++++++++-------------------------------- 2 files changed, 23 insertions(+), 47 deletions(-) diff --git a/channel.go b/channel.go index 3597949b..cca873de 100644 --- a/channel.go +++ b/channel.go @@ -98,6 +98,7 @@ type ChannelOptions struct { // It requires underlying operating system support MPTCP. // If EnableMPTCP is false or no MPTCP support, the connection will use normal TCP. // It's set to false by default. + // If a Dialer is passed as option, this value will be ignored. EnableMPTCP bool // The reporter to use for reporting stats for this channel. @@ -414,15 +415,9 @@ func (ch *Channel) ListenAndServe(hostPort string) error { return errAlreadyListening } - var l net.Listener - var err error - if ch.enableMPTCP { - lc := &net.ListenConfig{} - lc.SetMultipathTCP(true) - l, err = lc.Listen(context.Background(), "tcp", hostPort) - } else { - l, err = net.Listen("tcp", hostPort) - } + lc := net.ListenConfig{} + lc.SetMultipathTCP(ch.enableMPTCP) + l, err := lc.Listen(context.Background(), "tcp", hostPort) if err != nil { mutable.RUnlock() return err diff --git a/channel_test.go b/channel_test.go index 350152cb..0c09d94c 100644 --- a/channel_test.go +++ b/channel_test.go @@ -44,46 +44,27 @@ func toMap(fields LogFields) map[string]interface{} { } func TestNewChannel(t *testing.T) { - ch, err := NewChannel("svc", &ChannelOptions{ - ProcessName: "pname", - }) - require.NoError(t, err, "NewChannel failed") - - assert.Equal(t, LocalPeerInfo{ - ServiceName: "svc", - PeerInfo: PeerInfo{ - ProcessName: "pname", - HostPort: ephemeralHostPort, - IsEphemeral: true, - Version: PeerVersion{ - Language: "go", - LanguageVersion: strings.TrimPrefix(runtime.Version(), "go"), - TChannelVersion: VersionInfo, - }, - }, - }, ch.PeerInfo(), "Wrong local peer info") -} - -func TestNewChannelEnableMPTCP(t *testing.T) { - ch, err := NewChannel("svc", &ChannelOptions{ - ProcessName: "pname", - EnableMPTCP: true, - }) - require.NoError(t, err, "NewChannel failed") - - assert.Equal(t, LocalPeerInfo{ - ServiceName: "svc", - PeerInfo: PeerInfo{ + for _, mptcp := range []bool{true, false} { + ch, err := NewChannel("svc", &ChannelOptions{ ProcessName: "pname", - HostPort: ephemeralHostPort, - IsEphemeral: true, - Version: PeerVersion{ - Language: "go", - LanguageVersion: strings.TrimPrefix(runtime.Version(), "go"), - TChannelVersion: VersionInfo, + EnableMPTCP: mptcp, + }) + require.NoError(t, err, "NewChannel failed") + + assert.Equal(t, LocalPeerInfo{ + ServiceName: "svc", + PeerInfo: PeerInfo{ + ProcessName: "pname", + HostPort: ephemeralHostPort, + IsEphemeral: true, + Version: PeerVersion{ + Language: "go", + LanguageVersion: strings.TrimPrefix(runtime.Version(), "go"), + TChannelVersion: VersionInfo, + }, }, - }, - }, ch.PeerInfo(), "Wrong local peer info") + }, ch.PeerInfo(), "Wrong local peer info") + } } func TestLoggers(t *testing.T) { From d5cc467600d64402c7276fc6838409cb8f2ba1ea Mon Sep 17 00:00:00 2001 From: Zhiyi Pan Date: Mon, 21 Aug 2023 16:26:10 -0700 Subject: [PATCH 3/4] Use pointer to construct ListenConfig --- channel.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/channel.go b/channel.go index cca873de..cbe4d692 100644 --- a/channel.go +++ b/channel.go @@ -415,7 +415,7 @@ func (ch *Channel) ListenAndServe(hostPort string) error { return errAlreadyListening } - lc := net.ListenConfig{} + lc := &net.ListenConfig{} lc.SetMultipathTCP(ch.enableMPTCP) l, err := lc.Listen(context.Background(), "tcp", hostPort) if err != nil { From 890da60d7d1d939eed510fcf2695fd51cf447e7f Mon Sep 17 00:00:00 2001 From: Zhiyi Pan Date: Mon, 21 Aug 2023 16:57:02 -0700 Subject: [PATCH 4/4] Use copy to construt ListenConfig --- channel.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/channel.go b/channel.go index cbe4d692..cca873de 100644 --- a/channel.go +++ b/channel.go @@ -415,7 +415,7 @@ func (ch *Channel) ListenAndServe(hostPort string) error { return errAlreadyListening } - lc := &net.ListenConfig{} + lc := net.ListenConfig{} lc.SetMultipathTCP(ch.enableMPTCP) l, err := lc.Listen(context.Background(), "tcp", hostPort) if err != nil {