kafka replay speed: upstream concurrent fetchers #9452

Merged
50 changes: 50 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -6603,6 +6603,56 @@
"fieldDefaultValue": 20000000000,
"fieldFlag": "ingest-storage.kafka.wait-strong-read-consistency-timeout",
"fieldType": "duration"
},
{
"kind": "field",
"name": "startup_fetch_concurrency",
"required": false,
"desc": "The number of concurrent fetch requests that the ingester makes when reading data from Kafka during startup. 0 to disable.",
"fieldValue": null,
"fieldDefaultValue": 0,
"fieldFlag": "ingest-storage.kafka.startup-fetch-concurrency",
"fieldType": "int"
},
{
"kind": "field",
"name": "startup_records_per_fetch",
"required": false,
"desc": "The number of records per fetch request that the ingester makes when reading data from Kafka during startup. Depends on ingest-storage.kafka.startup-fetch-concurrency being greater than 0.",
"fieldValue": null,
"fieldDefaultValue": 2500,
"fieldFlag": "ingest-storage.kafka.startup-records-per-fetch",
"fieldType": "int"
},
{
"kind": "field",
"name": "ongoing_fetch_concurrency",
"required": false,
"desc": "The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. Is disabled unless ingest-storage.kafka.startup-fetch-concurrency is greater than 0. It must be greater than 0.",
"fieldValue": null,
"fieldDefaultValue": 0,
"fieldFlag": "ingest-storage.kafka.ongoing-fetch-concurrency",
"fieldType": "int"
},
{
"kind": "field",
"name": "ongoing_records_per_fetch",
"required": false,
"desc": "The number of records per fetch request that the ingester makes when reading data continuously from Kafka after startup. Depends on ingest-storage.kafka.ongoing-fetch-concurrency being greater than 0.",
"fieldValue": null,
"fieldDefaultValue": 30,
"fieldFlag": "ingest-storage.kafka.ongoing-records-per-fetch",
"fieldType": "int"
},
{
"kind": "field",
"name": "use_compressed_bytes_as_fetch_max_bytes",
"required": false,
"desc": "When enabled, the fetch request MaxBytes field is computed using the compressed size of previous records. When disabled, MaxBytes is computed using uncompressed bytes. Different Kafka implementations interpret MaxBytes differently.",
"fieldValue": null,
"fieldDefaultValue": true,
"fieldFlag": "ingest-storage.kafka.use-compressed-bytes-as-fetch-max-bytes",
"fieldType": "boolean"
}
],
"fieldValue": null,
10 changes: 10 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
@@ -1367,14 +1367,24 @@ Usage of ./cmd/mimir/mimir:
How long to retry a failed request to get the last produced offset. (default 10s)
-ingest-storage.kafka.max-consumer-lag-at-startup duration
The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 15s)
-ingest-storage.kafka.ongoing-fetch-concurrency int
The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. This setting is used only when ingest-storage.kafka.startup-fetch-concurrency is greater than 0, in which case it must also be greater than 0.
-ingest-storage.kafka.ongoing-records-per-fetch int
The number of records per fetch request that the ingester makes when reading data continuously from Kafka after startup. Depends on ingest-storage.kafka.ongoing-fetch-concurrency being greater than 0. (default 30)
-ingest-storage.kafka.producer-max-buffered-bytes int
The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. This limit is per Kafka client. 0 to disable the limit. (default 1073741824)
-ingest-storage.kafka.producer-max-record-size-bytes int
The maximum size of a Kafka record data that should be generated by the producer. An incoming write request larger than this size is split into multiple Kafka records. We strongly recommend to not change this setting unless for testing purposes. (default 15983616)
-ingest-storage.kafka.startup-fetch-concurrency int
The number of concurrent fetch requests that the ingester makes when reading data from Kafka during startup. 0 to disable.
-ingest-storage.kafka.startup-records-per-fetch int
The number of records per fetch request that the ingester makes when reading data from Kafka during startup. Depends on ingest-storage.kafka.startup-fetch-concurrency being greater than 0. (default 2500)
-ingest-storage.kafka.target-consumer-lag-at-startup duration
The best-effort maximum lag a consumer tries to achieve at startup. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 2s)
-ingest-storage.kafka.topic string
The Kafka topic name.
-ingest-storage.kafka.use-compressed-bytes-as-fetch-max-bytes
When enabled, the fetch request MaxBytes field is computed using the compressed size of previous records. When disabled, MaxBytes is computed using uncompressed bytes. Different Kafka implementations interpret MaxBytes differently. (default true)
-ingest-storage.kafka.wait-strong-read-consistency-timeout duration
The maximum amount of time a read request processed by an ingester is allowed to wait until strong read consistency is enforced. 0 to disable the timeout. (default 20s)
-ingest-storage.kafka.write-clients int
10 changes: 10 additions & 0 deletions cmd/mimir/help.txt.tmpl
@@ -439,14 +439,24 @@ Usage of ./cmd/mimir/mimir:
How long to retry a failed request to get the last produced offset. (default 10s)
-ingest-storage.kafka.max-consumer-lag-at-startup duration
The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 15s)
-ingest-storage.kafka.ongoing-fetch-concurrency int
The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. This setting is used only when ingest-storage.kafka.startup-fetch-concurrency is greater than 0, in which case it must also be greater than 0.
-ingest-storage.kafka.ongoing-records-per-fetch int
The number of records per fetch request that the ingester makes when reading data continuously from Kafka after startup. Depends on ingest-storage.kafka.ongoing-fetch-concurrency being greater than 0. (default 30)
-ingest-storage.kafka.producer-max-buffered-bytes int
The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. This limit is per Kafka client. 0 to disable the limit. (default 1073741824)
-ingest-storage.kafka.producer-max-record-size-bytes int
The maximum size of a Kafka record data that should be generated by the producer. An incoming write request larger than this size is split into multiple Kafka records. We strongly recommend to not change this setting unless for testing purposes. (default 15983616)
-ingest-storage.kafka.startup-fetch-concurrency int
The number of concurrent fetch requests that the ingester makes when reading data from Kafka during startup. 0 to disable.
-ingest-storage.kafka.startup-records-per-fetch int
The number of records per fetch request that the ingester makes when reading data from Kafka during startup. Depends on ingest-storage.kafka.startup-fetch-concurrency being greater than 0. (default 2500)
-ingest-storage.kafka.target-consumer-lag-at-startup duration
The best-effort maximum lag a consumer tries to achieve at startup. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 2s)
-ingest-storage.kafka.topic string
The Kafka topic name.
-ingest-storage.kafka.use-compressed-bytes-as-fetch-max-bytes
When enabled, the fetch request MaxBytes field is computed using the compressed size of previous records. When disabled, MaxBytes is computed using uncompressed bytes. Different Kafka implementations interpret MaxBytes differently. (default true)
-ingest-storage.kafka.wait-strong-read-consistency-timeout duration
The maximum amount of time a read request processed by an ingester is allowed to wait until strong read consistency is enforced. 0 to disable the timeout. (default 20s)
-ingest-storage.kafka.write-clients int
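The new flags are designed to be set together: enabling concurrent fetching at startup requires a non-zero ongoing concurrency, and each records-per-fetch setting only applies while its matching concurrency is greater than 0. The following is an illustrative sketch rather than a recommendation — the concurrency values are arbitrary assumptions, while 2500 and 30 are the documented defaults:

```
-ingest-storage.kafka.startup-fetch-concurrency=15
-ingest-storage.kafka.startup-records-per-fetch=2500
-ingest-storage.kafka.ongoing-fetch-concurrency=2
-ingest-storage.kafka.ongoing-records-per-fetch=30
```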
31 changes: 31 additions & 0 deletions docs/sources/mimir/configure/configuration-parameters/index.md
@@ -3830,6 +3830,37 @@ kafka:
# CLI flag: -ingest-storage.kafka.wait-strong-read-consistency-timeout
[wait_strong_read_consistency_timeout: <duration> | default = 20s]

# The number of concurrent fetch requests that the ingester makes when reading
# data from Kafka during startup. 0 to disable.
# CLI flag: -ingest-storage.kafka.startup-fetch-concurrency
[startup_fetch_concurrency: <int> | default = 0]

# The number of records per fetch request that the ingester makes when reading
# data from Kafka during startup. Depends on
# ingest-storage.kafka.startup-fetch-concurrency being greater than 0.
# CLI flag: -ingest-storage.kafka.startup-records-per-fetch
[startup_records_per_fetch: <int> | default = 2500]

# The number of concurrent fetch requests that the ingester makes when reading
# data continuously from Kafka after startup. This setting is used only when
# ingest-storage.kafka.startup-fetch-concurrency is greater than 0, in which
# case it must also be greater than 0.
# CLI flag: -ingest-storage.kafka.ongoing-fetch-concurrency
[ongoing_fetch_concurrency: <int> | default = 0]

# The number of records per fetch request that the ingester makes when reading
# data continuously from Kafka after startup. Depends on
# ingest-storage.kafka.ongoing-fetch-concurrency being greater than 0.
# CLI flag: -ingest-storage.kafka.ongoing-records-per-fetch
[ongoing_records_per_fetch: <int> | default = 30]

# When enabled, the fetch request MaxBytes field is computed using the
# compressed size of previous records. When disabled, MaxBytes is computed
# using uncompressed bytes. Different Kafka implementations interpret MaxBytes
# differently.
# CLI flag: -ingest-storage.kafka.use-compressed-bytes-as-fetch-max-bytes
[use_compressed_bytes_as_fetch_max_bytes: <boolean> | default = true]

migration:
# When both this option and ingest storage are enabled, distributors write to
# both Kafka and ingesters. A write request is considered successful only when
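For YAML-based configurations, the same options live in the `kafka` block documented above. A minimal sketch, assuming the block sits under `ingest_storage` and using illustrative concurrency values (the records-per-fetch values are the defaults):

```yaml
ingest_storage:
  kafka:
    startup_fetch_concurrency: 15
    startup_records_per_fetch: 2500
    ongoing_fetch_concurrency: 2
    ongoing_records_per_fetch: 30
```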
35 changes: 35 additions & 0 deletions docs/sources/mimir/configure/configure-kafka-backend.md
@@ -0,0 +1,35 @@
---
aliases:
- ../operators-guide/configure/configure-kafka-backend/
description: Learn how to configure Grafana Mimir to use Kafka for ingest storage.
menuTitle: Kafka
title: Configure the Grafana Mimir Kafka backend
weight: 66
---

# Configure the Grafana Mimir Kafka backend

Grafana Mimir supports using Kafka as the first layer of ingestion. This is an experimental feature released in Mimir 2.14.
This page is incomplete and will be updated as the ingest storage feature matures and moves out of the experimental phase.

## Different Kafka backend implementations

Some Kafka-compatible implementations behave differently in parts of the Kafka API.
To make Mimir work with a particular Kafka backend, you might need to configure some additional parameters.
The following sections list the supported Kafka flavors and the extra configuration each one needs in Mimir.

### Apache Kafka

Use the default options with Apache Kafka. No additional configuration is needed.

### Confluent Kafka

Use the default options with Confluent Kafka. No additional configuration is needed.

### WarpStream

Set the following CLI flag, or its YAML equivalent shown below.

```
-ingest-storage.kafka.use-compressed-bytes-as-fetch-max-bytes=false
```
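If you configure Mimir through YAML rather than CLI flags, the equivalent setting is the following sketch (assuming the standard `ingest_storage.kafka` block):

```yaml
ingest_storage:
  kafka:
    use_compressed_bytes_as_fetch_max_bytes: false
```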
3 changes: 3 additions & 0 deletions go.mod
@@ -315,6 +315,9 @@ replace github.com/opentracing-contrib/go-grpc => github.com/charleskorn/go-grpc
// Replacing prometheus/alertmanager with our fork.
replace github.com/prometheus/alertmanager => github.com/grafana/prometheus-alertmanager v0.25.1-0.20240924175849-b8b7c2c74eb6

// Replacing with a fork commit based on v1.17.1 with https://github.com/twmb/franz-go/pull/803 cherry-picked.
replace github.com/twmb/franz-go => github.com/dimitarvdimitrov/franz-go v0.0.0-20240904145554-ceadc28d3bd9

// Pin Google GRPC to v1.65.0 as v1.66.0 has API changes and also potentially performance regressions.
// Following https://github.com/grafana/dskit/pull/581
replace google.golang.org/grpc => google.golang.org/grpc v1.65.0
4 changes: 2 additions & 2 deletions go.sum
@@ -947,6 +947,8 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/r
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/digitalocean/godo v1.121.0 h1:ilXiHuEnhbJs2fmFEPX0r/QQ6KfiOIMAhJN3f8NiCfI=
github.com/digitalocean/godo v1.121.0/go.mod h1:WQVH83OHUy6gC4gXpEVQKtxTd4L5oCp+5OialidkPLY=
github.com/dimitarvdimitrov/franz-go v0.0.0-20240904145554-ceadc28d3bd9 h1:jszPVGeTr25QTJ/jWiT7eXnabc4R4itChxUVFSCLjRQ=
github.com/dimitarvdimitrov/franz-go v0.0.0-20240904145554-ceadc28d3bd9/go.mod h1:NreRdJ2F7dziDY/m6VyspWd6sNxHKXdMZI42UfQ3GXM=
github.com/distribution/reference v0.5.0 h1:/FUIFXtfc/x2gpa5/VGfiGLuOIdYa1t65IKK2OFGvA0=
github.com/distribution/reference v0.5.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E=
github.com/dlclark/regexp2 v1.11.0 h1:G/nrcoOa7ZXlpoa/91N3X7mM3r8eIlMBBJZvsz/mxKI=
@@ -1680,8 +1682,6 @@ github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0h
github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk=
github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY=
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
github.com/twmb/franz-go v1.17.1 h1:0LwPsbbJeJ9R91DPUHSEd4su82WJWcTY1Zzbgbg4CeQ=
github.com/twmb/franz-go v1.17.1/go.mod h1:NreRdJ2F7dziDY/m6VyspWd6sNxHKXdMZI42UfQ3GXM=
github.com/twmb/franz-go/pkg/kadm v1.13.0 h1:bJq4C2ZikUE2jh/wl9MtMTQ/kpmnBgVFh8XMQBEC+60=
github.com/twmb/franz-go/pkg/kadm v1.13.0/go.mod h1:VMvpfjz/szpH9WB+vGM+rteTzVv0djyHFimci9qm2C0=
github.com/twmb/franz-go/pkg/kfake v0.0.0-20240821035758-b77dd13e2bfa h1:OmQ4DJhqeOPdIH60Psut1vYU8A6LGyxJbF09w5RAa2w=
23 changes: 23 additions & 0 deletions pkg/storage/ingest/config.go
@@ -92,6 +92,12 @@ type KafkaConfig struct {

// Used when logging unsampled client errors. Set from ingester's ErrorSampleRate.
FallbackClientErrorSampleRate int64 `yaml:"-"`

StartupFetchConcurrency int `yaml:"startup_fetch_concurrency"`
StartupRecordsPerFetch int `yaml:"startup_records_per_fetch"`
OngoingFetchConcurrency int `yaml:"ongoing_fetch_concurrency"`
OngoingRecordsPerFetch int `yaml:"ongoing_records_per_fetch"`
UseCompressedBytesAsFetchMaxBytes bool `yaml:"use_compressed_bytes_as_fetch_max_bytes"`
}

func (cfg *KafkaConfig) RegisterFlags(f *flag.FlagSet) {
@@ -126,6 +132,12 @@ func (cfg *KafkaConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)
f.Int64Var(&cfg.ProducerMaxBufferedBytes, prefix+".producer-max-buffered-bytes", 1024*1024*1024, "The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. This limit is per Kafka client. 0 to disable the limit.")

f.DurationVar(&cfg.WaitStrongReadConsistencyTimeout, prefix+".wait-strong-read-consistency-timeout", 20*time.Second, "The maximum amount of time a read request processed by an ingester is allowed to wait until strong read consistency is enforced. 0 to disable the timeout.")

f.IntVar(&cfg.StartupFetchConcurrency, prefix+".startup-fetch-concurrency", 0, "The number of concurrent fetch requests that the ingester makes when reading data from Kafka during startup. 0 to disable.")
f.IntVar(&cfg.StartupRecordsPerFetch, prefix+".startup-records-per-fetch", 2500, "The number of records per fetch request that the ingester makes when reading data from Kafka during startup. Depends on "+prefix+".startup-fetch-concurrency being greater than 0.")
f.IntVar(&cfg.OngoingFetchConcurrency, prefix+".ongoing-fetch-concurrency", 0, "The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. This setting is used only when "+prefix+".startup-fetch-concurrency is greater than 0, in which case it must also be greater than 0.")
f.IntVar(&cfg.OngoingRecordsPerFetch, prefix+".ongoing-records-per-fetch", 30, "The number of records per fetch request that the ingester makes when reading data continuously from Kafka after startup. Depends on "+prefix+".ongoing-fetch-concurrency being greater than 0.")
f.BoolVar(&cfg.UseCompressedBytesAsFetchMaxBytes, prefix+".use-compressed-bytes-as-fetch-max-bytes", true, "When enabled, the fetch request MaxBytes field is computed using the compressed size of previous records. When disabled, MaxBytes is computed using uncompressed bytes. Different Kafka implementations interpret MaxBytes differently.")
}

func (cfg *KafkaConfig) Validate() error {
@@ -161,6 +173,17 @@ func (cfg *KafkaConfig) Validate() error {
return ErrInvalidMaxConsumerLagAtStartup
}

if cfg.StartupFetchConcurrency < 0 {
return fmt.Errorf("ingest-storage.kafka.startup-fetch-concurrency must be greater or equal to 0")
}

if cfg.StartupFetchConcurrency > 0 && cfg.OngoingFetchConcurrency <= 0 {
return fmt.Errorf("ingest-storage.kafka.ongoing-fetch-concurrency must be greater than 0 when ingest-storage.kafka.startup-fetch-concurrency is greater than 0")
}

if cfg.StartupRecordsPerFetch <= 0 || cfg.OngoingRecordsPerFetch <= 0 {
return fmt.Errorf("ingest-storage.kafka.startup-records-per-fetch and ingest-storage.kafka.ongoing-records-per-fetch must be greater than 0")
}
return nil
}
