Skip to content

Commit

Permalink
receive: race condition in handler Close() when stopped early (thanos…
Browse files Browse the repository at this point in the history
…-io#7087)

Receiver hangs waiting for the HTTP Hander to shutdown if an error occurs
before Handler is initialized. This might happen, for example, if the hashring
is too small for a given replication factor.

Signed-off-by: Mikhail Nozdrachev <[email protected]>
  • Loading branch information
cincinnat authored and jnyi committed Apr 4, 2024
1 parent b8d4c36 commit b21112f
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 21 deletions.
39 changes: 18 additions & 21 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,11 @@ type Options struct {

// Handler serves a Prometheus remote write receiving HTTP endpoint.
type Handler struct {
logger log.Logger
writer *Writer
router *route.Router
options *Options
listener net.Listener
logger log.Logger
writer *Writer
router *route.Router
options *Options
httpSrv *http.Server

mtx sync.RWMutex
hashring Hashring
Expand Down Expand Up @@ -241,6 +241,14 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
})
statusAPI.Register(h.router, o.Tracer, logger, ins, logging.NewHTTPServerMiddleware(logger))

errlog := stdlog.New(log.NewStdlibAdapter(level.Error(h.logger)), "", 0)

h.httpSrv = &http.Server{
Handler: h.router,
ErrorLog: errlog,
TLSConfig: h.options.TLSConfig,
}

return h
}

Expand Down Expand Up @@ -364,42 +372,31 @@ func (h *Handler) getStats(r *http.Request, statsByLabelName string) ([]statusap

// Close stops the Handler.
func (h *Handler) Close() {
if h.listener != nil {
runutil.CloseWithLogOnErr(h.logger, h.listener, "receive HTTP listener")
}
runutil.CloseWithLogOnErr(h.logger, h.httpSrv, "receive HTTP server")
}

// Run serves the HTTP endpoints.
func (h *Handler) Run() error {
level.Info(h.logger).Log("msg", "Start listening for connections", "address", h.options.ListenAddress)

var err error
h.listener, err = net.Listen("tcp", h.options.ListenAddress)
listener, err := net.Listen("tcp", h.options.ListenAddress)
if err != nil {
return err
}

// Monitor incoming connections with conntrack.
h.listener = conntrack.NewListener(h.listener,
listener = conntrack.NewListener(listener,
conntrack.TrackWithName("http"),
conntrack.TrackWithTracing())

errlog := stdlog.New(log.NewStdlibAdapter(level.Error(h.logger)), "", 0)

httpSrv := &http.Server{
Handler: h.router,
ErrorLog: errlog,
TLSConfig: h.options.TLSConfig,
}

if h.options.TLSConfig != nil {
level.Info(h.logger).Log("msg", "Serving HTTPS", "address", h.options.ListenAddress)
// Cert & Key are already being passed in via TLSConfig.
return httpSrv.ServeTLS(h.listener, "", "")
return h.httpSrv.ServeTLS(listener, "", "")
}

level.Info(h.logger).Log("msg", "Serving plain HTTP", "address", h.options.ListenAddress)
return httpSrv.Serve(h.listener)
return h.httpSrv.Serve(listener)
}

// replica encapsulates the replica number of a request and if the request is
Expand Down
9 changes: 9 additions & 0 deletions pkg/receive/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1645,3 +1645,12 @@ func TestHashringChangeCallsClose(t *testing.T) {
pg := allHandlers[0].peers.(*fakePeersGroup)
testutil.Assert(t, len(pg.closeCalled) > 0)
}

func TestHandlerEarlyStop(t *testing.T) {
h := NewHandler(nil, &Options{})
h.Close()

err := h.Run()
testutil.NotOk(t, err)
testutil.Equals(t, "http: Server closed", err.Error())
}

0 comments on commit b21112f

Please sign in to comment.