diff --git a/thriftbp/server.go b/thriftbp/server.go index 16b4bc73e..cc06c6e09 100644 --- a/thriftbp/server.go +++ b/thriftbp/server.go @@ -46,6 +46,12 @@ type ServerConfig struct { // If not set none of the requests will be sampled. ReportPayloadSizeMetricsSampleRate float64 + // Optional, used by NewBaseplateServer and NewServer. + // + // Report the number of clients connected to the server as a runtime gauge + // with metric name of 'thrift.connections' + ReportConnectionCount bool + // Optional, used only by NewServer. // In NewBaseplateServer the address set in bp.Config() will be used instead. // @@ -69,7 +75,7 @@ type ServerConfig struct { // and protocol to serve the given TProcessor which is wrapped with the // given ProcessorMiddlewares. func NewServer(cfg ServerConfig) (*thrift.TSimpleServer, error) { - var transport *thrift.TServerSocket + var transport thrift.TServerTransport if cfg.Socket == nil { var err error transport, err = thrift.NewTServerSocket(cfg.Addr) @@ -80,6 +86,10 @@ func NewServer(cfg ServerConfig) (*thrift.TSimpleServer, error) { transport = cfg.Socket } + if cfg.ReportConnectionCount { + transport = &CountedTServerTransport{transport} + } + server := thrift.NewTSimpleServer4( thrift.WrapProcessor(cfg.Processor, cfg.Middlewares...), transport, diff --git a/thriftbp/server_transport.go b/thriftbp/server_transport.go new file mode 100644 index 000000000..3ca43a8d9 --- /dev/null +++ b/thriftbp/server_transport.go @@ -0,0 +1,54 @@ +package thriftbp + +import ( + "sync" + + "github.com/apache/thrift/lib/go/thrift" + "github.com/go-kit/kit/metrics" + + "github.com/reddit/baseplate.go/metricsbp" +) + +const meterNameTransportConnCounter = "thrift.connections" + +type CountedTServerTransport struct { + thrift.TServerTransport +} + +func (m *CountedTServerTransport) Accept() (thrift.TTransport, error) { + transport, err := m.TServerTransport.Accept() + if err != nil { + return nil, err + } + + return newCountedTTransport(transport), nil +} + +type countedTTransport struct { + thrift.TTransport + + gauge metrics.Gauge + closeOnce sync.Once +} + +func newCountedTTransport(transport thrift.TTransport) thrift.TTransport { + return &countedTTransport{ + TTransport: transport, + gauge: metricsbp.M.RuntimeGauge(meterNameTransportConnCounter), + } +} + +func (m *countedTTransport) Close() error { + m.closeOnce.Do(func() { + m.gauge.Add(-1) + }) + return m.TTransport.Close() +} + +func (m *countedTTransport) Open() error { + if err := m.TTransport.Open(); err != nil { + return err + } + m.gauge.Add(1) + return nil +}