From b21112f8de2d6a025eae9587ff18f7ebcc821a9f Mon Sep 17 00:00:00 2001 From: Mikhail Nozdrachev Date: Wed, 24 Jan 2024 08:41:52 +0100 Subject: [PATCH] receive: race condition in handler Close() when stopped early (#7087) Receiver hangs waiting for the HTTP Hander to shutdown if an error occurs before Handler is initialized. This might happen, for example, if the hashring is too small for a given replication factor. Signed-off-by: Mikhail Nozdrachev --- pkg/receive/handler.go | 39 +++++++++++++++++-------------------- pkg/receive/handler_test.go | 9 +++++++++ 2 files changed, 27 insertions(+), 21 deletions(-) diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index ba4b9b94ce..e10874d749 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -98,11 +98,11 @@ type Options struct { // Handler serves a Prometheus remote write receiving HTTP endpoint. type Handler struct { - logger log.Logger - writer *Writer - router *route.Router - options *Options - listener net.Listener + logger log.Logger + writer *Writer + router *route.Router + options *Options + httpSrv *http.Server mtx sync.RWMutex hashring Hashring @@ -241,6 +241,14 @@ func NewHandler(logger log.Logger, o *Options) *Handler { }) statusAPI.Register(h.router, o.Tracer, logger, ins, logging.NewHTTPServerMiddleware(logger)) + errlog := stdlog.New(log.NewStdlibAdapter(level.Error(h.logger)), "", 0) + + h.httpSrv = &http.Server{ + Handler: h.router, + ErrorLog: errlog, + TLSConfig: h.options.TLSConfig, + } + return h } @@ -364,42 +372,31 @@ func (h *Handler) getStats(r *http.Request, statsByLabelName string) ([]statusap // Close stops the Handler. func (h *Handler) Close() { - if h.listener != nil { - runutil.CloseWithLogOnErr(h.logger, h.listener, "receive HTTP listener") - } + runutil.CloseWithLogOnErr(h.logger, h.httpSrv, "receive HTTP server") } // Run serves the HTTP endpoints. func (h *Handler) Run() error { level.Info(h.logger).Log("msg", "Start listening for connections", "address", h.options.ListenAddress) - var err error - h.listener, err = net.Listen("tcp", h.options.ListenAddress) + listener, err := net.Listen("tcp", h.options.ListenAddress) if err != nil { return err } // Monitor incoming connections with conntrack. - h.listener = conntrack.NewListener(h.listener, + listener = conntrack.NewListener(listener, conntrack.TrackWithName("http"), conntrack.TrackWithTracing()) - errlog := stdlog.New(log.NewStdlibAdapter(level.Error(h.logger)), "", 0) - - httpSrv := &http.Server{ - Handler: h.router, - ErrorLog: errlog, - TLSConfig: h.options.TLSConfig, - } - if h.options.TLSConfig != nil { level.Info(h.logger).Log("msg", "Serving HTTPS", "address", h.options.ListenAddress) // Cert & Key are already being passed in via TLSConfig. - return httpSrv.ServeTLS(h.listener, "", "") + return h.httpSrv.ServeTLS(listener, "", "") } level.Info(h.logger).Log("msg", "Serving plain HTTP", "address", h.options.ListenAddress) - return httpSrv.Serve(h.listener) + return h.httpSrv.Serve(listener) } // replica encapsulates the replica number of a request and if the request is diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index a693843aff..e7e0d316c9 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -1645,3 +1645,12 @@ func TestHashringChangeCallsClose(t *testing.T) { pg := allHandlers[0].peers.(*fakePeersGroup) testutil.Assert(t, len(pg.closeCalled) > 0) } + +func TestHandlerEarlyStop(t *testing.T) { + h := NewHandler(nil, &Options{}) + h.Close() + + err := h.Run() + testutil.NotOk(t, err) + testutil.Equals(t, "http: Server closed", err.Error()) +}