diff --git a/CHANGELOG.md b/CHANGELOG.md index 50457bba8b5..9db8b06f0b0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -50,6 +50,15 @@ * [FEATURE] Query frontend: added new query pruning middleware to enable pruning dead code (eg. expressions that cannot produce any results) and simplifying expressions (eg. expressions that can be evaluated immediately) in queries. #9086 * [FEATURE] Ruler: added experimental configuration, `-ruler.rule-evaluation-write-enabled`, to disable writing the result of rule evaluation to ingesters. This feature can be used for testing purposes. #9060 * [FEATURE] Ingester: added experimental configuration `ingester.ignore-ooo-exemplars`. When set to `true` out of order exemplars are no longer reported to the remote write client. #9151 +* [FEATURE] gRPC: Support S2 compression. #9322 + * `-alertmanager.alertmanager-client.grpc-compression=s2` + * `-ingester.client.grpc-compression=s2` + * `-querier.frontend-client.grpc-compression=s2` + * `-querier.scheduler-client.grpc-compression=s2` + * `-query-frontend.grpc-client-config.grpc-compression=s2` + * `-query-scheduler.grpc-client-config.grpc-compression=s2` + * `-ruler.client.grpc-compression=s2` + * `-ruler.query-frontend.grpc-client-config.grpc-compression=s2` * [ENHANCEMENT] Compactor: Add `cortex_compactor_compaction_job_duration_seconds` and `cortex_compactor_compaction_job_blocks` histogram metrics to track duration of individual compaction jobs and number of blocks per job. #8371 * [ENHANCEMENT] Rules: Added per namespace max rules per rule group limit. The maximum number of rules per rule groups for all namespaces continues to be configured by `-ruler.max-rules-per-rule-group`, but now, this can be superseded by the new `-ruler.max-rules-per-rule-group-by-namespace` option on a per namespace basis. This new limit can be overridden using the overrides mechanism to be applied per-tenant. #8378 * [ENHANCEMENT] Rules: Added per namespace max rule groups per tenant limit. The maximum number of rule groups per rule tenant for all namespaces continues to be configured by `-ruler.max-rule-groups-per-tenant`, but now, this can be superseded by the new `-ruler.max-rule-groups-per-tenant-by-namespace` option on a per namespace basis. This new limit can be overridden using the overrides mechanism to be applied per-tenant. #8425 diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index a3f2bef7da0..4f77e1e8c86 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -2101,7 +2101,7 @@ "kind": "field", "name": "grpc_compression", "required": false, - "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)", + "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2' and '' (disable compression)", "fieldValue": null, "fieldDefaultValue": "", "fieldFlag": "ingester.client.grpc-compression", @@ -4838,7 +4838,7 @@ "kind": "field", "name": "grpc_compression", "required": false, - "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)", + "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2' and '' (disable compression)", "fieldValue": null, "fieldDefaultValue": "", "fieldFlag": "querier.frontend-client.grpc-compression", @@ -5100,7 +5100,7 @@ "kind": "field", "name": "grpc_compression", "required": false, - "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)", + "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2' and '' (disable compression)", "fieldValue": null, "fieldDefaultValue": "", "fieldFlag": "querier.scheduler-client.grpc-compression", @@ -5491,7 +5491,7 @@ "kind": "field", "name": "grpc_compression", "required": false, - "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)", + "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2' and '' (disable compression)", "fieldValue": null, "fieldDefaultValue": "", "fieldFlag": "query-frontend.grpc-client-config.grpc-compression", @@ -11455,7 +11455,7 @@ "kind": "field", "name": "grpc_compression", "required": false, - "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)", + "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2' and '' (disable compression)", "fieldValue": null, "fieldDefaultValue": "", "fieldFlag": "ruler.client.grpc-compression", @@ -12420,7 +12420,7 @@ "kind": "field", "name": "grpc_compression", "required": false, - "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)", + "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2' and '' (disable compression)", "fieldValue": null, "fieldDefaultValue": "", "fieldFlag": "ruler.query-frontend.grpc-client-config.grpc-compression", @@ -14677,7 +14677,7 @@ "kind": "field", "name": "grpc_compression", "required": false, - "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)", + "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2' and '' (disable compression)", "fieldValue": null, "fieldDefaultValue": "", "fieldFlag": "alertmanager.alertmanager-client.grpc-compression", @@ -16254,7 +16254,7 @@ "kind": "field", "name": "grpc_compression", "required": false, - "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)", + "desc": "Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2' and '' (disable compression)", "fieldValue": null, "fieldDefaultValue": "", "fieldFlag": "query-scheduler.grpc-client-config.grpc-compression", diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 3bc1816bdec..d33ac244065 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -154,7 +154,7 @@ Usage of ./cmd/mimir/mimir: -alertmanager.alertmanager-client.grpc-client-rate-limit-burst int Rate limit burst for gRPC client. -alertmanager.alertmanager-client.grpc-compression string - Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression) + Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2' and '' (disable compression) -alertmanager.alertmanager-client.grpc-max-recv-msg-size int gRPC client max receive message size (bytes). (default 104857600) -alertmanager.alertmanager-client.grpc-max-send-msg-size int @@ -1414,7 +1414,7 @@ Usage of ./cmd/mimir/mimir: -ingester.client.grpc-client-rate-limit-burst int Rate limit burst for gRPC client. -ingester.client.grpc-compression string - Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression) + Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2' and '' (disable compression) -ingester.client.grpc-max-recv-msg-size int gRPC client max receive message size (bytes). (default 104857600) -ingester.client.grpc-max-send-msg-size int @@ -1868,7 +1868,7 @@ Usage of ./cmd/mimir/mimir: -querier.frontend-client.grpc-client-rate-limit-burst int Rate limit burst for gRPC client. -querier.frontend-client.grpc-compression string - Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression) + Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2' and '' (disable compression) -querier.frontend-client.grpc-max-recv-msg-size int gRPC client max receive message size (bytes). (default 104857600) -querier.frontend-client.grpc-max-send-msg-size int @@ -1972,7 +1972,7 @@ Usage of ./cmd/mimir/mimir: -querier.scheduler-client.grpc-client-rate-limit-burst int Rate limit burst for gRPC client. -querier.scheduler-client.grpc-compression string - Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression) + Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2' and '' (disable compression) -querier.scheduler-client.grpc-max-recv-msg-size int gRPC client max receive message size (bytes). (default 104857600) -querier.scheduler-client.grpc-max-send-msg-size int @@ -2050,7 +2050,7 @@ Usage of ./cmd/mimir/mimir: -query-frontend.grpc-client-config.grpc-client-rate-limit-burst int Rate limit burst for gRPC client. -query-frontend.grpc-client-config.grpc-compression string - Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression) + Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2' and '' (disable compression) -query-frontend.grpc-client-config.grpc-max-recv-msg-size int gRPC client max receive message size (bytes). (default 104857600) -query-frontend.grpc-client-config.grpc-max-send-msg-size int @@ -2256,7 +2256,7 @@ Usage of ./cmd/mimir/mimir: -query-scheduler.grpc-client-config.grpc-client-rate-limit-burst int Rate limit burst for gRPC client. -query-scheduler.grpc-client-config.grpc-compression string - Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression) + Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2' and '' (disable compression) -query-scheduler.grpc-client-config.grpc-max-recv-msg-size int gRPC client max receive message size (bytes). (default 104857600) -query-scheduler.grpc-client-config.grpc-max-send-msg-size int @@ -2628,7 +2628,7 @@ Usage of ./cmd/mimir/mimir: -ruler.client.grpc-client-rate-limit-burst int Rate limit burst for gRPC client. -ruler.client.grpc-compression string - Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression) + Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2' and '' (disable compression) -ruler.client.grpc-max-recv-msg-size int gRPC client max receive message size (bytes). (default 104857600) -ruler.client.grpc-max-send-msg-size int @@ -2716,7 +2716,7 @@ Usage of ./cmd/mimir/mimir: -ruler.query-frontend.grpc-client-config.grpc-client-rate-limit-burst int Rate limit burst for gRPC client. -ruler.query-frontend.grpc-client-config.grpc-compression string - Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression) + Use compression when sending messages. Supported values are: 'gzip', 'snappy', 's2' and '' (disable compression) -ruler.query-frontend.grpc-client-config.grpc-max-recv-msg-size int gRPC client max receive message size (bytes). (default 104857600) -ruler.query-frontend.grpc-client-config.grpc-max-send-msg-size int diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index 873efb02df3..3ae87a15d31 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -2337,7 +2337,7 @@ alertmanager_client: [max_send_msg_size: | default = 104857600] # (advanced) Use compression when sending messages. Supported values are: - # 'gzip', 'snappy' and '' (disable compression) + # 'gzip', 'snappy', 's2' and '' (disable compression) # CLI flag: -alertmanager.alertmanager-client.grpc-compression [grpc_compression: | default = ""] @@ -2592,7 +2592,7 @@ The `grpc_client` block configures the gRPC client used to communicate between t [max_send_msg_size: | default = 104857600] # (advanced) Use compression when sending messages. Supported values are: -# 'gzip', 'snappy' and '' (disable compression) +# 'gzip', 'snappy', 's2' and '' (disable compression) # CLI flag: -.grpc-compression [grpc_compression: | default = ""] diff --git a/go.mod b/go.mod index 42d25c4996e..09adc1e7ab3 100644 --- a/go.mod +++ b/go.mod @@ -201,7 +201,7 @@ require ( github.com/google/go-querystring v1.1.0 // indirect github.com/google/pprof v0.0.0-20240711041743-f6c9dda6c6da // indirect github.com/google/s2a-go v0.1.8 // indirect - github.com/googleapis/enterprise-certificate-proxy v0.3.3 // indirect + github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect github.com/googleapis/gax-go/v2 v2.13.0 // indirect github.com/gosimple/slug v1.1.1 // indirect github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56 // indirect diff --git a/go.sum b/go.sum index 7c705bc13b2..ee5939f82df 100644 --- a/go.sum +++ b/go.sum @@ -1218,8 +1218,8 @@ github.com/googleapis/enterprise-certificate-proxy v0.1.0/go.mod h1:17drOmN3MwGY github.com/googleapis/enterprise-certificate-proxy v0.2.0/go.mod h1:8C0jb7/mgJe/9KK8Lm7X9ctZC2t60YyIpYEI16jx0Qg= github.com/googleapis/enterprise-certificate-proxy v0.2.1/go.mod h1:AwSRAtLfXpU5Nm3pW+v7rGDHp09LsPtGY9MduiEsR9k= github.com/googleapis/enterprise-certificate-proxy v0.2.3/go.mod h1:AwSRAtLfXpU5Nm3pW+v7rGDHp09LsPtGY9MduiEsR9k= -github.com/googleapis/enterprise-certificate-proxy v0.3.3 h1:QRje2j5GZimBzlbhGA2V2QlGNgL8G6e+wGo/+/2bWI0= -github.com/googleapis/enterprise-certificate-proxy v0.3.3/go.mod h1:YKe7cfqYXjKGpGvmSg28/fFvhNzinZQm8DGnaburhGA= +github.com/googleapis/enterprise-certificate-proxy v0.3.4 h1:XYIDZApgAnrN1c855gTgghdIA6Stxb52D5RnLI1SLyw= +github.com/googleapis/enterprise-certificate-proxy v0.3.4/go.mod h1:YKe7cfqYXjKGpGvmSg28/fFvhNzinZQm8DGnaburhGA= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/googleapis/gax-go/v2 v2.1.0/go.mod h1:Q3nei7sK6ybPYH7twZdmQpAd1MKb7pfu6SK+H1/DsU0= diff --git a/pkg/alertmanager/alertmanager_client.go b/pkg/alertmanager/alertmanager_client.go index dfb4b0d7ea5..d17d21f270b 100644 --- a/pkg/alertmanager/alertmanager_client.go +++ b/pkg/alertmanager/alertmanager_client.go @@ -20,6 +20,7 @@ import ( "google.golang.org/grpc/health/grpc_health_v1" "github.com/grafana/mimir/pkg/alertmanager/alertmanagerpb" + "github.com/grafana/mimir/pkg/util/grpcencoding/s2" ) // ClientsPool is the interface used to get the client from the pool for a specified address. @@ -45,6 +46,7 @@ type ClientConfig struct { // RegisterFlagsWithPrefix registers flags with prefix. func (cfg *ClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + cfg.GRPCClientConfig.CustomCompressors = []string{s2.Name} cfg.GRPCClientConfig.RegisterFlagsWithPrefix(prefix, f) f.DurationVar(&cfg.RemoteTimeout, prefix+".remote-timeout", 2*time.Second, "Timeout for downstream alertmanagers.") } diff --git a/pkg/frontend/v2/frontend.go b/pkg/frontend/v2/frontend.go index 81ae91baf8a..ce6ac2c6ff6 100644 --- a/pkg/frontend/v2/frontend.go +++ b/pkg/frontend/v2/frontend.go @@ -38,6 +38,7 @@ import ( "github.com/grafana/mimir/pkg/querier/stats" "github.com/grafana/mimir/pkg/scheduler/schedulerdiscovery" "github.com/grafana/mimir/pkg/util/globalerror" + "github.com/grafana/mimir/pkg/util/grpcencoding/s2" "github.com/grafana/mimir/pkg/util/httpgrpcutil" "github.com/grafana/mimir/pkg/util/spanlogger" ) @@ -76,6 +77,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) { f.StringVar(&cfg.Addr, "query-frontend.instance-addr", "", "IP address to advertise to the querier (via scheduler) (default is auto-detected from network interfaces).") f.IntVar(&cfg.Port, "query-frontend.instance-port", 0, "Port to advertise to querier (via scheduler) (defaults to server.grpc-listen-port).") + cfg.GRPCClientConfig.CustomCompressors = []string{s2.Name} cfg.GRPCClientConfig.RegisterFlagsWithPrefix("query-frontend.grpc-client-config", f) } diff --git a/pkg/ingester/client/client.go b/pkg/ingester/client/client.go index 84b3ed00c5e..262dcdda789 100644 --- a/pkg/ingester/client/client.go +++ b/pkg/ingester/client/client.go @@ -16,6 +16,7 @@ import ( "github.com/grafana/mimir/pkg/mimirpb" querierapi "github.com/grafana/mimir/pkg/querier/api" + "github.com/grafana/mimir/pkg/util/grpcencoding/s2" ) // HealthAndIngesterClient is the union of IngesterClient and grpc_health_v1.HealthClient. @@ -70,6 +71,7 @@ type Config struct { // RegisterFlags registers configuration settings used by the ingester client config. func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + cfg.GRPCClientConfig.CustomCompressors = []string{s2.Name} cfg.GRPCClientConfig.RegisterFlagsWithPrefix("ingester.client", f) } diff --git a/pkg/querier/worker/worker.go b/pkg/querier/worker/worker.go index da801030ba1..dcf22a10e9f 100644 --- a/pkg/querier/worker/worker.go +++ b/pkg/querier/worker/worker.go @@ -24,6 +24,7 @@ import ( "google.golang.org/grpc" "github.com/grafana/mimir/pkg/scheduler/schedulerdiscovery" + "github.com/grafana/mimir/pkg/util/grpcencoding/s2" "github.com/grafana/mimir/pkg/util/math" ) @@ -48,7 +49,9 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.QuerierID, "querier.id", "", "Querier ID, sent to the query-frontend to identify requests from the same querier. Defaults to hostname.") f.BoolVar(&cfg.ResponseStreamingEnabled, "querier.response-streaming-enabled", false, "Enables streaming of responses from querier to query-frontend for response types that support it (currently only `active_series` responses do).") + cfg.QueryFrontendGRPCClientConfig.CustomCompressors = []string{s2.Name} cfg.QueryFrontendGRPCClientConfig.RegisterFlagsWithPrefix("querier.frontend-client", f) + cfg.QuerySchedulerGRPCClientConfig.CustomCompressors = []string{s2.Name} cfg.QuerySchedulerGRPCClientConfig.RegisterFlagsWithPrefix("querier.scheduler-client", f) } diff --git a/pkg/ruler/remotequerier.go b/pkg/ruler/remotequerier.go index e6329ce1352..3a25145c960 100644 --- a/pkg/ruler/remotequerier.go +++ b/pkg/ruler/remotequerier.go @@ -35,6 +35,7 @@ import ( "google.golang.org/grpc/codes" "github.com/grafana/mimir/pkg/querier/api" + "github.com/grafana/mimir/pkg/util/grpcencoding/s2" "github.com/grafana/mimir/pkg/util/spanlogger" "github.com/grafana/mimir/pkg/util/version" ) @@ -75,6 +76,7 @@ func (c *QueryFrontendConfig) RegisterFlags(f *flag.FlagSet) { "GRPC listen address of the query-frontend(s). Must be a DNS address (prefixed with dns:///) "+ "to enable client side load balancing.") + c.GRPCClientConfig.CustomCompressors = []string{s2.Name} c.GRPCClientConfig.RegisterFlagsWithPrefix("ruler.query-frontend.grpc-client-config", f) f.StringVar(&c.QueryResultResponseFormat, "ruler.query-frontend.query-result-response-format", formatProtobuf, fmt.Sprintf("Format to use when retrieving query results from query-frontends. Supported values: %s", strings.Join(allFormats, ", "))) diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index daea0f20c98..9ccc2a00f23 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -40,6 +40,7 @@ import ( "github.com/grafana/mimir/pkg/ruler/rulestore" "github.com/grafana/mimir/pkg/storage/tsdb/bucketcache" "github.com/grafana/mimir/pkg/util" + "github.com/grafana/mimir/pkg/util/grpcencoding/s2" util_log "github.com/grafana/mimir/pkg/util/log" "github.com/grafana/mimir/pkg/util/spanlogger" "github.com/grafana/mimir/pkg/util/validation" @@ -167,6 +168,7 @@ func (cfg *Config) Validate(limits validation.Limits) error { // RegisterFlags adds the flags required to config this to the given FlagSet func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) { + cfg.ClientTLSConfig.CustomCompressors = []string{s2.Name} cfg.ClientTLSConfig.RegisterFlagsWithPrefix("ruler.client", f) cfg.Ring.RegisterFlags(f, logger) cfg.Notifier.RegisterFlags(f) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 05692da39e5..573fbced0fb 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -38,6 +38,7 @@ import ( "github.com/grafana/mimir/pkg/scheduler/schedulerdiscovery" "github.com/grafana/mimir/pkg/scheduler/schedulerpb" "github.com/grafana/mimir/pkg/util" + "github.com/grafana/mimir/pkg/util/grpcencoding/s2" "github.com/grafana/mimir/pkg/util/httpgrpcutil" "github.com/grafana/mimir/pkg/util/validation" ) @@ -106,6 +107,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) { f.BoolVar(&cfg.PrioritizeQueryComponents, "query-scheduler.prioritize-query-components", false, "When enabled, the query scheduler primarily prioritizes dequeuing fairly from queue components and secondarily prioritizes dequeuing fairly across tenants. When disabled, the query scheduler primarily prioritizes tenant fairness. You must enable the `query-scheduler.use-multi-algorithm-query-queue` setting to use this flag.") f.DurationVar(&cfg.QuerierForgetDelay, "query-scheduler.querier-forget-delay", 0, "If a querier disconnects without sending notification about graceful shutdown, the query-scheduler will keep the querier in the tenant's shard until the forget delay has passed. This feature is useful to reduce the blast radius when shuffle-sharding is enabled.") + cfg.GRPCClientConfig.CustomCompressors = []string{s2.Name} cfg.GRPCClientConfig.RegisterFlagsWithPrefix("query-scheduler.grpc-client-config", f) cfg.ServiceDiscovery.RegisterFlags(f, logger) } diff --git a/pkg/util/grpcencoding/s2/s2.go b/pkg/util/grpcencoding/s2/s2.go new file mode 100644 index 00000000000..2bb4ea0f4af --- /dev/null +++ b/pkg/util/grpcencoding/s2/s2.go @@ -0,0 +1,88 @@ +// SPDX-License-Identifier: AGPL-3.0-only +// Provenance-includes-location: https://github.com/mostynb/go-grpc-compression/blob/f7e92b39057ca421a6485f650243a3e804036498/internal/s2/s2.go +// Provenance-includes-license: Apache-2.0 +// Provenance-includes-copyright: Copyright 2022 Mostyn Bramley-Moore. + +// Package s2 is an experimental wrapper for using +// github.com/klauspost/compress/s2 stream compression with gRPC. +package s2 + +import ( + "errors" + "io" + "sync" + + "github.com/klauspost/compress/s2" + "google.golang.org/grpc/encoding" +) + +const ( + // Name is the name of the S2 compressor. + Name = "s2" +) + +type compressor struct { + name string + poolCompressor sync.Pool + poolDecompressor sync.Pool +} + +type writer struct { + *s2.Writer + pool *sync.Pool +} + +type reader struct { + *s2.Reader + pool *sync.Pool +} + +func init() { + encoding.RegisterCompressor(newCompressor()) +} + +func newCompressor() *compressor { + opts := []s2.WriterOption{s2.WriterConcurrency(1)} + c := &compressor{ + name: Name, + } + c.poolCompressor.New = func() interface{} { + w := s2.NewWriter(io.Discard, opts...) + return &writer{Writer: w, pool: &c.poolCompressor} + } + return c +} + +func (c *compressor) Compress(w io.Writer) (io.WriteCloser, error) { + s := c.poolCompressor.Get().(*writer) + s.Writer.Reset(w) + return s, nil +} + +func (c *compressor) Decompress(r io.Reader) (io.Reader, error) { + s, inPool := c.poolDecompressor.Get().(*reader) + if !inPool { + newR := s2.NewReader(r) + return &reader{Reader: newR, pool: &c.poolDecompressor}, nil + } + s.Reset(r) + return s, nil +} + +func (c *compressor) Name() string { + return c.name +} + +func (s *writer) Close() error { + err := s.Writer.Close() + s.pool.Put(s) + return err +} + +func (s *reader) Read(p []byte) (n int, err error) { + n, err = s.Reader.Read(p) + if errors.Is(err, io.EOF) { + s.pool.Put(s) + } + return n, err +} diff --git a/pkg/util/grpcencoding/s2/s2_test.go b/pkg/util/grpcencoding/s2/s2_test.go new file mode 100644 index 00000000000..e87961cdd86 --- /dev/null +++ b/pkg/util/grpcencoding/s2/s2_test.go @@ -0,0 +1,119 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package s2 + +import ( + "bytes" + "io" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/encoding" +) + +func TestCompressor(t *testing.T) { + c := newCompressor() + require.Equal(t, "s2", c.Name()) + + testCases := []struct { + name string + input string + }{ + { + name: "empty", + input: "", + }, + { + name: "short", + input: "hello world", + }, + { + name: "long", + input: strings.Repeat("123456789", 1024), + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + var buf bytes.Buffer + // Compress + w, err := c.Compress(&buf) + require.NoError(t, err) + n, err := w.Write([]byte(tc.input)) + require.NoError(t, w.Close()) + require.NoError(t, err) + assert.Equal(t, len(tc.input), n) + + // Decompress + r, err := c.Decompress(&buf) + require.NoError(t, err) + out, err := io.ReadAll(r) + require.NoError(t, err) + assert.Equal(t, tc.input, string(out)) + }) + } +} + +func BenchmarkS2Compress(b *testing.B) { + data := []byte(strings.Repeat("123456789", 1024)) + c := newCompressor() + b.ResetTimer() + for i := 0; i < b.N; i++ { + w, err := c.Compress(io.Discard) + require.NoError(b, err) + _, err = w.Write(data) + require.NoError(b, err) + require.NoError(b, w.Close()) + } +} + +func BenchmarkS2Decompress(b *testing.B) { + data := []byte(strings.Repeat("123456789", 1024)) + c := newCompressor() + var buf bytes.Buffer + w, err := c.Compress(&buf) + require.NoError(b, err) + b.Cleanup(func() { + require.NoError(b, w.Close()) + }) + _, err = w.Write(data) + require.NoError(b, err) + reader := bytes.NewReader(buf.Bytes()) + b.ResetTimer() + for i := 0; i < b.N; i++ { + r, err := c.Decompress(reader) + require.NoError(b, err) + _, err = io.ReadAll(r) + require.NoError(b, err) + _, err = reader.Seek(0, io.SeekStart) + require.NoError(b, err) + } +} + +func BenchmarkS2GRPCCompressionPerf(b *testing.B) { + data := []byte(strings.Repeat("123456789", 1024)) + grpcc := encoding.GetCompressor(Name) + + // Reset the timer to exclude setup time from the measurements + b.ResetTimer() + + for i := 0; i < b.N; i++ { + for j := 0; j < 10; j++ { + var buf bytes.Buffer + writer, err := grpcc.Compress(&buf) + require.NoError(b, err) + _, err = writer.Write(data) + require.NoError(b, err) + err = writer.Close() + require.NoError(b, err) + + compressedData := buf.Bytes() + reader, err := grpcc.Decompress(bytes.NewReader(compressedData)) + require.NoError(b, err) + var result bytes.Buffer + _, err = result.ReadFrom(reader) + require.NoError(b, err) + } + } +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 3403ee7d40a..b9cd301bb1e 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -557,7 +557,7 @@ github.com/google/s2a-go/stream # github.com/google/uuid v1.6.0 ## explicit github.com/google/uuid -# github.com/googleapis/enterprise-certificate-proxy v0.3.3 +# github.com/googleapis/enterprise-certificate-proxy v0.3.4 ## explicit; go 1.19 github.com/googleapis/enterprise-certificate-proxy/client github.com/googleapis/enterprise-certificate-proxy/client/util