From 8696800e628b9b66ef55c1eaba72de85f9db7457 Mon Sep 17 00:00:00 2001 From: Marco Ferrer <35935108+marcoferrer@users.noreply.github.com> Date: Mon, 31 Jan 2022 20:08:38 -0500 Subject: [PATCH] add thrift socket timeout impl (#470) * add thrift socket timeout impl * refactor server logger suppressor * remove thrift prefix for socket timeout config field --- thriftbp/server.go | 40 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/thriftbp/server.go b/thriftbp/server.go index cc06c6e09..b525ef88f 100644 --- a/thriftbp/server.go +++ b/thriftbp/server.go @@ -1,15 +1,21 @@ package thriftbp import ( + "strings" "time" "github.com/apache/thrift/lib/go/thrift" + "github.com/reddit/baseplate.go/metricsbp" "github.com/reddit/baseplate.go" "github.com/reddit/baseplate.go/errorsbp" "github.com/reddit/baseplate.go/log" ) +const ( + meterNameThriftSocketErrorCounter = "thrift.socket.timeout" +) + // ServerConfig is the arg struct for both NewServer and NewBaseplateServer. // // Some of the fields are only used by NewServer and some of them are only used @@ -63,6 +69,16 @@ type ServerConfig struct { // Deprecated: No-op for now, will be removed in a future release. Timeout time.Duration + // Optional, This duration is used to set both the read and write idle timeouts + // for the thrift.TServerSocket used by the baseplate server. + // + // This is an experimental configuration and is subject to change or deprecation + // without notice. When using NewBaseplateServer, setting a socket timeout will + // also override the default thrift server logger to one that emits metrics + // instead of logs in the event of a socket disconnect. A zero value means I/O + // read or write operations will not time out. + SocketTimeout time.Duration + // Optional, used only by NewServer. // In NewBaseplateServer the address and timeout set in bp.Config() will be // used instead. @@ -78,7 +94,11 @@ func NewServer(cfg ServerConfig) (*thrift.TSimpleServer, error) { var transport thrift.TServerTransport if cfg.Socket == nil { var err error - transport, err = thrift.NewTServerSocket(cfg.Addr) + if cfg.SocketTimeout > 0 { + transport, err = thrift.NewTServerSocketTimeout(cfg.Addr, cfg.SocketTimeout) + } else { + transport, err = thrift.NewTServerSocket(cfg.Addr) + } if err != nil { return nil, err } @@ -116,12 +136,18 @@ func NewBaseplateServer( ) middlewares = append(middlewares, cfg.Middlewares...) cfg.Middlewares = middlewares + cfg.Logger = log.ZapWrapper(log.ZapWrapperArgs{ Level: bp.GetConfig().Log.Level, KVPairs: map[string]interface{}{ "from": "thrift", }, }).ToThriftLogger() + + if cfg.SocketTimeout > 0 { + cfg.Logger = suppressTimeoutLogger(cfg.Logger) + } + cfg.Addr = bp.GetConfig().Addr cfg.Socket = nil srv, err := NewServer(cfg) @@ -131,6 +157,18 @@ func NewBaseplateServer( return ApplyBaseplate(bp, srv), nil } +func suppressTimeoutLogger(logger thrift.Logger) thrift.Logger { + c := metricsbp.M.Counter(meterNameThriftSocketErrorCounter) + return func(msg string) { + if strings.Contains(msg, "i/o timeout") { + c.Add(1) + return + } + + logger(msg) + } +} + // ApplyBaseplate returns the given TSimpleServer as a baseplate Server with the // given Baseplate. //