diff --git a/go.mod b/go.mod index d6b86633628..b182f6e0d4f 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/golang/snappy v0.0.4 github.com/google/gopacket v1.1.19 github.com/gorilla/mux v1.8.0 - github.com/grafana/dskit v0.0.0-20231016230140-3866fa1df2cb + github.com/grafana/dskit v0.0.0-20231021095337-736bca5d13d8 github.com/grafana/e2e v0.1.1 github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/json-iterator/go v1.1.12 diff --git a/go.sum b/go.sum index d6abee30d65..2b7b05407ca 100644 --- a/go.sum +++ b/go.sum @@ -536,8 +536,8 @@ github.com/gosimple/slug v1.1.1 h1:fRu/digW+NMwBIP+RmviTK97Ho/bEj/C9swrCspN3D4= github.com/gosimple/slug v1.1.1/go.mod h1:ER78kgg1Mv0NQGlXiDe57DpCyfbNywXXZ9mIorhxAf0= github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc h1:PXZQA2WCxe85Tnn+WEvr8fDpfwibmEPgfgFEaC87G24= github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc/go.mod h1:AHHlOEv1+GGQ3ktHMlhuTUwo3zljV3QJbC0+8o2kn+4= -github.com/grafana/dskit v0.0.0-20231016230140-3866fa1df2cb h1:QEm4X/VDJyTQIWMNgIHgAynpEYXpqRA/R2ng+vF1bog= -github.com/grafana/dskit v0.0.0-20231016230140-3866fa1df2cb/go.mod h1:byPCvaG/pqi33Kq+Wvkp7WhLfmrlyy0RAoYG4yRh01I= +github.com/grafana/dskit v0.0.0-20231021095337-736bca5d13d8 h1:Kyu05lVxS/yyZDjLoGa/O/HovpakRjVRH1jZyoU3+T0= +github.com/grafana/dskit v0.0.0-20231021095337-736bca5d13d8/go.mod h1:byPCvaG/pqi33Kq+Wvkp7WhLfmrlyy0RAoYG4yRh01I= github.com/grafana/e2e v0.1.1 h1:/b6xcv5BtoBnx8cZnCiey9DbjEc8z7gXHO5edoeRYxc= github.com/grafana/e2e v0.1.1/go.mod h1:RpNLgae5VT+BUHvPE+/zSypmOXKwEu4t+tnEMS1ATaE= github.com/grafana/goautoneg v0.0.0-20231010094147-47ce5e72a9ae h1:Yxbw9jKGJVC6qAK5Ubzzb/qZwM6rRMMqaDc/d4Vp3pM= diff --git a/vendor/github.com/grafana/dskit/grpcclient/grpcclient.go b/vendor/github.com/grafana/dskit/grpcclient/grpcclient.go index 75b1040946d..b171889d0a0 100644 --- a/vendor/github.com/grafana/dskit/grpcclient/grpcclient.go +++ b/vendor/github.com/grafana/dskit/grpcclient/grpcclient.go @@ -37,6 +37,9 @@ type Config struct { // https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md ConnectBackoffBaseDelay time.Duration `yaml:"connect_backoff_base_delay" category:"advanced"` ConnectBackoffMaxDelay time.Duration `yaml:"connect_backoff_max_delay" category:"advanced"` + + Middleware []grpc.UnaryClientInterceptor `yaml:"-"` + StreamMiddleware []grpc.StreamClientInterceptor `yaml:"-"` } // RegisterFlags registers flags. @@ -91,7 +94,8 @@ func (cfg *Config) CallOptions() []grpc.CallOption { return opts } -// DialOption returns the config as a grpc.DialOptions. +// DialOption returns the config as a grpc.DialOptions. The passed inceptors +// wrap around the configured middleware. func (cfg *Config) DialOption(unaryClientInterceptors []grpc.UnaryClientInterceptor, streamClientInterceptors []grpc.StreamClientInterceptor) ([]grpc.DialOption, error) { var opts []grpc.DialOption tlsOpts, err := cfg.TLS.GetGRPCDialOptions(cfg.TLSEnabled) @@ -100,6 +104,9 @@ func (cfg *Config) DialOption(unaryClientInterceptors []grpc.UnaryClientIntercep } opts = append(opts, tlsOpts...) + unaryClientInterceptors = append(unaryClientInterceptors, cfg.Middleware...) + streamClientInterceptors = append(streamClientInterceptors, cfg.StreamMiddleware...) + if cfg.BackoffOnRatelimits { unaryClientInterceptors = append([]grpc.UnaryClientInterceptor{NewBackoffRetry(cfg.BackoffConfig)}, unaryClientInterceptors...) } diff --git a/vendor/github.com/grafana/dskit/modules/module_service.go b/vendor/github.com/grafana/dskit/modules/module_service.go index 8ca4e25714d..5a93f61005e 100644 --- a/vendor/github.com/grafana/dskit/modules/module_service.go +++ b/vendor/github.com/grafana/dskit/modules/module_service.go @@ -85,7 +85,14 @@ func (w *moduleService) start(serviceContext context.Context) error { return errors.Wrapf(err, "error starting module: %s", w.name) } - return w.service.AwaitRunning(serviceContext) + err = w.service.AwaitRunning(serviceContext) + if errors.Is(serviceContext.Err(), context.Canceled) && errors.Is(err, context.Canceled) { + // We were asked to cancel, but the underlying service may continue starting and complete its startup after we return from moduleService.start. + // If we return an error, then we would be considered Failed and won't have moduleService.stop invoked. + // We need to invoke moduleService.stop to stop the underlying service in case it finishes starting successfully later. + return nil + } + return err } func (w *moduleService) run(serviceContext context.Context) error { @@ -97,14 +104,19 @@ func (w *moduleService) run(serviceContext context.Context) error { func (w *moduleService) stop(_ error) error { var err error - if w.service.State() == services.Running { + switch w.service.State() { + case services.Running: // Only wait for other modules, if underlying service is still running. w.waitForModulesToStop() - + // Then proceed to also stop this service if we didn't wait it out during moduleService.start. + fallthrough + case services.Starting: + // If upstream services can tolerate this service being failed and/or not started, and they have already started, + // they should track the state of this service and change state accordingly. level.Debug(w.logger).Log("msg", "stopping", "module", w.name) err = services.StopAndAwaitTerminated(context.Background(), w.service) - } else { + default: err = w.service.FailureCase() } diff --git a/vendor/github.com/grafana/dskit/ring/replication_set.go b/vendor/github.com/grafana/dskit/ring/replication_set.go index cc43331e44d..f389f4766fc 100644 --- a/vendor/github.com/grafana/dskit/ring/replication_set.go +++ b/vendor/github.com/grafana/dskit/ring/replication_set.go @@ -9,6 +9,7 @@ import ( kitlog "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/opentracing/opentracing-go/ext" "github.com/grafana/dskit/spanlogger" ) @@ -294,7 +295,7 @@ func DoUntilQuorumWithoutSuccessfulContextCancellation[T any](ctx context.Contex terminate := func(err error) ([]T, error) { if cfg.Logger != nil { - _ = cfg.Logger.Error(err) + ext.Error.Set(cfg.Logger.Span, true) } contextTracker.cancelAllContexts() @@ -325,7 +326,7 @@ func DoUntilQuorumWithoutSuccessfulContextCancellation[T any](ctx context.Contex resultsRemaining-- if result.err != nil && cfg.IsTerminalError != nil && cfg.IsTerminalError(result.err) { - level.Error(logger).Log("msg", "cancelling all outstanding requests because a terminal error occurred", "err", result.err) + level.Warn(logger).Log("msg", "cancelling all outstanding requests because a terminal error occurred", "err", result.err) // We must return before calling resultTracker.done() below, otherwise done() might start further requests if request minimisation is enabled. return terminate(result.err) } diff --git a/vendor/github.com/grafana/dskit/ring/util.go b/vendor/github.com/grafana/dskit/ring/util.go index b5ee485ef25..a21c0f2fe2c 100644 --- a/vendor/github.com/grafana/dskit/ring/util.go +++ b/vendor/github.com/grafana/dskit/ring/util.go @@ -7,6 +7,7 @@ import ( "time" "github.com/go-kit/log" + "golang.org/x/exp/slices" "github.com/grafana/dskit/backoff" "github.com/grafana/dskit/netutil" @@ -127,9 +128,11 @@ func getZones(tokens map[string][]uint32) []string { // searchToken returns the offset of the tokens entry holding the range for the provided key. func searchToken(tokens []uint32, key uint32) int { - i := sort.Search(len(tokens), func(x int) bool { - return tokens[x] > key - }) + i, found := slices.BinarySearch(tokens, key) + if found { + // we want the first token > key, not >= key + i = i + 1 + } if i >= len(tokens) { i = 0 } diff --git a/vendor/modules.txt b/vendor/modules.txt index 4a3fa2afb8c..fd22cbbbf18 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -534,7 +534,7 @@ github.com/gosimple/slug # github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc ## explicit; go 1.13 github.com/grafana-tools/sdk -# github.com/grafana/dskit v0.0.0-20231016230140-3866fa1df2cb +# github.com/grafana/dskit v0.0.0-20231021095337-736bca5d13d8 ## explicit; go 1.19 github.com/grafana/dskit/backoff github.com/grafana/dskit/cache