Skip to content

Commit

Permalink
add thrift socket timeout impl (#470)
Browse files Browse the repository at this point in the history
* add thrift socket timeout impl

* refactor server logger suppressor

* remove thrift prefix for socket timeout config field
  • Loading branch information
marcoferrer authored Feb 1, 2022
1 parent 2c2736a commit 8696800
Showing 1 changed file with 39 additions and 1 deletion.
40 changes: 39 additions & 1 deletion thriftbp/server.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
package thriftbp

import (
"strings"
"time"

"github.com/apache/thrift/lib/go/thrift"
"github.com/reddit/baseplate.go/metricsbp"

"github.com/reddit/baseplate.go"
"github.com/reddit/baseplate.go/errorsbp"
"github.com/reddit/baseplate.go/log"
)

const (
meterNameThriftSocketErrorCounter = "thrift.socket.timeout"
)

// ServerConfig is the arg struct for both NewServer and NewBaseplateServer.
//
// Some of the fields are only used by NewServer and some of them are only used
Expand Down Expand Up @@ -63,6 +69,16 @@ type ServerConfig struct {
// Deprecated: No-op for now, will be removed in a future release.
Timeout time.Duration

// Optional, This duration is used to set both the read and write idle timeouts
// for the thrift.TServerSocket used by the baseplate server.
//
// This is an experimental configuration and is subject to change or deprecation
// without notice. When using NewBaseplateServer, setting a socket timeout will
// also override the default thrift server logger to one that emits metrics
// instead of logs in the event of a socket disconnect. A zero value means I/O
// read or write operations will not time out.
SocketTimeout time.Duration

// Optional, used only by NewServer.
// In NewBaseplateServer the address and timeout set in bp.Config() will be
// used instead.
Expand All @@ -78,7 +94,11 @@ func NewServer(cfg ServerConfig) (*thrift.TSimpleServer, error) {
var transport thrift.TServerTransport
if cfg.Socket == nil {
var err error
transport, err = thrift.NewTServerSocket(cfg.Addr)
if cfg.SocketTimeout > 0 {
transport, err = thrift.NewTServerSocketTimeout(cfg.Addr, cfg.SocketTimeout)
} else {
transport, err = thrift.NewTServerSocket(cfg.Addr)
}
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -116,12 +136,18 @@ func NewBaseplateServer(
)
middlewares = append(middlewares, cfg.Middlewares...)
cfg.Middlewares = middlewares

cfg.Logger = log.ZapWrapper(log.ZapWrapperArgs{
Level: bp.GetConfig().Log.Level,
KVPairs: map[string]interface{}{
"from": "thrift",
},
}).ToThriftLogger()

if cfg.SocketTimeout > 0 {
cfg.Logger = suppressTimeoutLogger(cfg.Logger)
}

cfg.Addr = bp.GetConfig().Addr
cfg.Socket = nil
srv, err := NewServer(cfg)
Expand All @@ -131,6 +157,18 @@ func NewBaseplateServer(
return ApplyBaseplate(bp, srv), nil
}

func suppressTimeoutLogger(logger thrift.Logger) thrift.Logger {
c := metricsbp.M.Counter(meterNameThriftSocketErrorCounter)
return func(msg string) {
if strings.Contains(msg, "i/o timeout") {
c.Add(1)
return
}

logger(msg)
}
}

// ApplyBaseplate returns the given TSimpleServer as a baseplate Server with the
// given Baseplate.
//
Expand Down

0 comments on commit 8696800

Please sign in to comment.