kafka replay speed: upstream concurrent fetchers
This is the second in a series of PRs to upstream the code for improving Kafka replay speed in the ingester.

In this PR I'm upstreaming the fetching code. The core of the change is in `concurrentFetchers`.

# `concurrentFetchers` Overview

* **Segmentation (fetchWant):** The fetcher divides the work into segments called fetchWants. Each fetchWant represents a range of records to fetch, defined by a start and an end offset. This segmentation allows different parts of the topic partition to be fetched concurrently (see the first sketch after this list).

* **Concurrent Fetching:** Multiple goroutines (defined by the concurrency parameter) fetch these segments simultaneously. Each goroutine runs the `run` method, which processes fetchWants from a channel.

* **Fetching Process:** For each fetchWant, the fetcher attempts to retrieve the records through the `fetchSingle` method. This method:
  * Finds the leader for the partition
  * Builds a fetch request
  * Sends the request to the Kafka broker
  * Parses the response

* **Multiple Attempts:** If a fetch attempt fails or doesn't retrieve all requested records, the fetcher retries. It uses an error backoff mechanism to avoid overwhelming the system with rapid retries. The fetcher updates the start offset of the fetchWant based on the last successfully fetched record and continues until all requested records are retrieved or the context is cancelled.

* **Overfetching Risk:** The system risks overfetching because it might retrieve records that have already been processed. This is handled by:
  * Tracking the offset of the last returned record (`lastReturnedRecord`)
  * Using `recordIndexAfterOffset` to find the first new record in each fetch result
  * Discarding any duplicate records before passing them to the consumer

* **Ordering:** The fetcher ensures that segments are processed in order (see the second sketch after this list) by:
  * Using a linked list (`pendingResults`) to keep track of fetch results in the order they were requested
  * Buffering results in `bufferedResult` and only sending them to the `orderedFetches` channel when they're next in sequence
  * Having `PollFetches`, the method consumers call to get records, receive from the `orderedFetches` channel, so records are always returned in the correct order

* **Adaptive Fetching:** The system adapts the size of fetch requests based on previous results. It estimates the bytes per record and adjusts the `MaxBytes` parameter of fetch requests accordingly, trying to optimize the amount of data fetched in each request.
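
To make the segmentation and adaptive sizing more concrete, here is a minimal Go sketch. It is not the actual Mimir code: the `fetchWant` fields shown, the `splitOffsetRange` helper, the 5% slack, and the 0.2 weighting are illustrative assumptions based on the description above.

```go
package main

import "fmt"

// fetchWant is a simplified stand-in for the unit of work handed to a fetch
// goroutine: the half-open offset range [startOffset, endOffset).
type fetchWant struct {
	startOffset             int64
	endOffset               int64
	estimatedBytesPerRecord int
}

// MaxBytes sizes the fetch request from the per-record estimate so that one
// request is expected to cover the remaining records of the segment.
func (w fetchWant) MaxBytes() int32 {
	remaining := w.endOffset - w.startOffset
	bytes := remaining * int64(w.estimatedBytesPerRecord)
	bytes += bytes / 20 // ~5% slack for batch overhead (illustrative)
	const maxFetchBytes = 1 << 30
	if bytes > maxFetchBytes {
		bytes = maxFetchBytes
	}
	return int32(bytes)
}

// UpdateBytesPerRecord folds the outcome of the last fetch into the estimate,
// weighting the new observation so the estimate adapts gradually.
func (w fetchWant) UpdateBytesPerRecord(lastFetchBytes, lastFetchRecords int) fetchWant {
	if lastFetchRecords == 0 {
		return w
	}
	const newObservationWeight = 0.2 // illustrative weighting
	observed := float64(lastFetchBytes) / float64(lastFetchRecords)
	previous := float64(w.estimatedBytesPerRecord)
	w.estimatedBytesPerRecord = int(previous*(1-newObservationWeight) + observed*newObservationWeight)
	return w
}

// splitOffsetRange carves [start, end) into consecutive fetchWants of at most
// recordsPerFetch records each.
func splitOffsetRange(start, end, recordsPerFetch int64, bytesPerRecord int) []fetchWant {
	var wants []fetchWant
	for s := start; s < end; s += recordsPerFetch {
		e := s + recordsPerFetch
		if e > end {
			e = end
		}
		wants = append(wants, fetchWant{startOffset: s, endOffset: e, estimatedBytesPerRecord: bytesPerRecord})
	}
	return wants
}

func main() {
	wants := splitOffsetRange(1_000, 8_500, 2_500, 1_000)
	for _, w := range wants {
		fmt.Printf("fetchWant [%d, %d) -> MaxBytes=%d\n", w.startOffset, w.endOffset, w.MaxBytes())
	}
	// After a fetch completes, the estimate is refined from what actually came back.
	refined := wants[0].UpdateBytesPerRecord(3_000_000, 2_500)
	fmt.Printf("refined estimate: %d bytes/record\n", refined.estimatedBytesPerRecord)
}
```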

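The ordering guarantee can be sketched the same way. Again, this is a simplified illustration rather than the real implementation: `pendingResults` and `orderedFetches` mirror the names in the description above, but the surrounding code is invented for illustration.

```go
package main

import (
	"container/list"
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// fetchResult stands in for the records fetched for one segment.
type fetchResult struct {
	startOffset int64
	records     int
}

func main() {
	const (
		concurrency = 3
		segments    = 6
	)

	// pendingResults holds one single-use result channel per dispatched
	// segment, in dispatch order. This plays the role of the fetcher's
	// pendingResults linked list.
	pendingResults := list.New()
	work := make(chan func(), segments)

	// Dispatch segments: each gets its result channel appended to
	// pendingResults before the workers start, so the delivery order is fixed
	// up front regardless of which fetch finishes first.
	for i := 0; i < segments; i++ {
		startOffset := int64(i * 2500)
		resultCh := make(chan fetchResult, 1)
		pendingResults.PushBack(resultCh)
		work <- func() {
			// Simulate a fetch with variable latency, so segments complete out of order.
			time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond)
			resultCh <- fetchResult{startOffset: startOffset, records: 2500}
		}
	}
	close(work)

	// Concurrent workers, like the fetcher's `run` goroutines.
	var wg sync.WaitGroup
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for fetch := range work {
				fetch()
			}
		}()
	}

	// orderedFetches: drain pendingResults from the head, so results are
	// delivered in the order the segments were requested.
	orderedFetches := make(chan fetchResult)
	go func() {
		for e := pendingResults.Front(); e != nil; e = e.Next() {
			orderedFetches <- <-e.Value.(chan fetchResult)
		}
		close(orderedFetches)
	}()

	for res := range orderedFetches {
		fmt.Printf("consumed segment starting at offset %d (%d records)\n", res.startOffset, res.records)
	}
	wg.Wait()
}
```

Because every result channel is registered in `pendingResults` before any fetch can complete, the consumer side never observes a gap or reordering, no matter how the concurrent fetches interleave.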
Signed-off-by: Dimitar Dimitrov <[email protected]>
dimitarvdimitrov committed Sep 27, 2024
1 parent da02ca5 commit 2938fa9
Showing 18 changed files with 1,818 additions and 87 deletions.
50 changes: 50 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -6603,6 +6603,56 @@
"fieldDefaultValue": 20000000000,
"fieldFlag": "ingest-storage.kafka.wait-strong-read-consistency-timeout",
"fieldType": "duration"
},
{
"kind": "field",
"name": "startup_fetch_concurrency",
"required": false,
"desc": "The number of concurrent fetch requests that the ingester makes when reading data from Kafka during startup. 0 to disable.",
"fieldValue": null,
"fieldDefaultValue": 0,
"fieldFlag": "ingest-storage.kafka.startup-fetch-concurrency",
"fieldType": "int"
},
{
"kind": "field",
"name": "startup_records_per_fetch",
"required": false,
"desc": "The number of records per fetch request that the ingester makes when reading data from Kafka during startup. Depends on ingest-storage.kafka.startup-fetch-concurrency being greater than 0.",
"fieldValue": null,
"fieldDefaultValue": 2500,
"fieldFlag": "ingest-storage.kafka.startup-records-per-fetch",
"fieldType": "int"
},
{
"kind": "field",
"name": "ongoing_fetch_concurrency",
"required": false,
"desc": "The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. Is disabled unless ingest-storage.kafka.startup-fetch-concurrency is greater than 0. It must be greater than 0.",
"fieldValue": null,
"fieldDefaultValue": 0,
"fieldFlag": "ingest-storage.kafka.ongoing-fetch-concurrency",
"fieldType": "int"
},
{
"kind": "field",
"name": "ongoing_records_per_fetch",
"required": false,
"desc": "The number of records per fetch request that the ingester makes when reading data continuously from Kafka after startup. Depends on ingest-storage.kafka.ongoing-fetch-concurrency being greater than 0.",
"fieldValue": null,
"fieldDefaultValue": 30,
"fieldFlag": "ingest-storage.kafka.ongoing-records-per-fetch",
"fieldType": "int"
},
{
"kind": "field",
"name": "use_compressed_bytes_as_fetch_max_bytes",
"required": false,
"desc": "When enabled, the fetch request MaxBytes field is computed using the compressed size of previous records. When disabled, MaxBytes is computed using uncompressed bytes. Different Kafka implementations interpret MaxBytes differently.",
"fieldValue": null,
"fieldDefaultValue": true,
"fieldFlag": "ingest-storage.kafka.use-compressed-bytes-as-fetch-max-bytes",
"fieldType": "boolean"
}
],
"fieldValue": null,
10 changes: 10 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
@@ -1367,14 +1367,24 @@ Usage of ./cmd/mimir/mimir:
How long to retry a failed request to get the last produced offset. (default 10s)
-ingest-storage.kafka.max-consumer-lag-at-startup duration
The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 15s)
-ingest-storage.kafka.ongoing-fetch-concurrency int
The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. Is disabled unless ingest-storage.kafka.startup-fetch-concurrency is greater than 0. It must be greater than 0.
-ingest-storage.kafka.ongoing-records-per-fetch int
The number of records per fetch request that the ingester makes when reading data continuously from Kafka after startup. Depends on ingest-storage.kafka.ongoing-fetch-concurrency being greater than 0. (default 30)
-ingest-storage.kafka.producer-max-buffered-bytes int
The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. This limit is per Kafka client. 0 to disable the limit. (default 1073741824)
-ingest-storage.kafka.producer-max-record-size-bytes int
The maximum size of a Kafka record data that should be generated by the producer. An incoming write request larger than this size is split into multiple Kafka records. We strongly recommend to not change this setting unless for testing purposes. (default 15983616)
-ingest-storage.kafka.startup-fetch-concurrency int
The number of concurrent fetch requests that the ingester makes when reading data from Kafka during startup. 0 to disable.
-ingest-storage.kafka.startup-records-per-fetch int
The number of records per fetch request that the ingester makes when reading data from Kafka during startup. Depends on ingest-storage.kafka.startup-fetch-concurrency being greater than 0. (default 2500)
-ingest-storage.kafka.target-consumer-lag-at-startup duration
The best-effort maximum lag a consumer tries to achieve at startup. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 2s)
-ingest-storage.kafka.topic string
The Kafka topic name.
-ingest-storage.kafka.use-compressed-bytes-as-fetch-max-bytes
When enabled, the fetch request MaxBytes field is computed using the compressed size of previous records. When disabled, MaxBytes is computed using uncompressed bytes. Different Kafka implementations interpret MaxBytes differently. (default true)
-ingest-storage.kafka.wait-strong-read-consistency-timeout duration
The maximum allowed for a read requests processed by an ingester to wait until strong read consistency is enforced. 0 to disable the timeout. (default 20s)
-ingest-storage.kafka.write-clients int
10 changes: 10 additions & 0 deletions cmd/mimir/help.txt.tmpl
@@ -439,14 +439,24 @@ Usage of ./cmd/mimir/mimir:
How long to retry a failed request to get the last produced offset. (default 10s)
-ingest-storage.kafka.max-consumer-lag-at-startup duration
The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 15s)
-ingest-storage.kafka.ongoing-fetch-concurrency int
The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. Is disabled unless ingest-storage.kafka.startup-fetch-concurrency is greater than 0. It must be greater than 0.
-ingest-storage.kafka.ongoing-records-per-fetch int
The number of records per fetch request that the ingester makes when reading data continuously from Kafka after startup. Depends on ingest-storage.kafka.ongoing-fetch-concurrency being greater than 0. (default 30)
-ingest-storage.kafka.producer-max-buffered-bytes int
The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. This limit is per Kafka client. 0 to disable the limit. (default 1073741824)
-ingest-storage.kafka.producer-max-record-size-bytes int
The maximum size of a Kafka record data that should be generated by the producer. An incoming write request larger than this size is split into multiple Kafka records. We strongly recommend to not change this setting unless for testing purposes. (default 15983616)
-ingest-storage.kafka.startup-fetch-concurrency int
The number of concurrent fetch requests that the ingester makes when reading data from Kafka during startup. 0 to disable.
-ingest-storage.kafka.startup-records-per-fetch int
The number of records per fetch request that the ingester makes when reading data from Kafka during startup. Depends on ingest-storage.kafka.startup-fetch-concurrency being greater than 0. (default 2500)
-ingest-storage.kafka.target-consumer-lag-at-startup duration
The best-effort maximum lag a consumer tries to achieve at startup. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 2s)
-ingest-storage.kafka.topic string
The Kafka topic name.
-ingest-storage.kafka.use-compressed-bytes-as-fetch-max-bytes
When enabled, the fetch request MaxBytes field is computed using the compressed size of previous records. When disabled, MaxBytes is computed using uncompressed bytes. Different Kafka implementations interpret MaxBytes differently. (default true)
-ingest-storage.kafka.wait-strong-read-consistency-timeout duration
The maximum allowed for a read requests processed by an ingester to wait until strong read consistency is enforced. 0 to disable the timeout. (default 20s)
-ingest-storage.kafka.write-clients int
31 changes: 31 additions & 0 deletions docs/sources/mimir/configure/configuration-parameters/index.md
@@ -3830,6 +3830,37 @@
# CLI flag: -ingest-storage.kafka.wait-strong-read-consistency-timeout
[wait_strong_read_consistency_timeout: <duration> | default = 20s]
# The number of concurrent fetch requests that the ingester makes when reading
# data from Kafka during startup. 0 to disable.
# CLI flag: -ingest-storage.kafka.startup-fetch-concurrency
[startup_fetch_concurrency: <int> | default = 0]
# The number of records per fetch request that the ingester makes when reading
# data from Kafka during startup. Depends on
# ingest-storage.kafka.startup-fetch-concurrency being greater than 0.
# CLI flag: -ingest-storage.kafka.startup-records-per-fetch
[startup_records_per_fetch: <int> | default = 2500]
# The number of concurrent fetch requests that the ingester makes when reading
# data continuously from Kafka after startup. Is disabled unless
# ingest-storage.kafka.startup-fetch-concurrency is greater than 0. It must be
# greater than 0.
# CLI flag: -ingest-storage.kafka.ongoing-fetch-concurrency
[ongoing_fetch_concurrency: <int> | default = 0]
# The number of records per fetch request that the ingester makes when reading
# data continuously from Kafka after startup. Depends on
# ingest-storage.kafka.ongoing-fetch-concurrency being greater than 0.
# CLI flag: -ingest-storage.kafka.ongoing-records-per-fetch
[ongoing_records_per_fetch: <int> | default = 30]
# When enabled, the fetch request MaxBytes field is computed using the
# compressed size of previous records. When disabled, MaxBytes is computed
# using uncompressed bytes. Different Kafka implementations interpret MaxBytes
# differently.
# CLI flag: -ingest-storage.kafka.use-compressed-bytes-as-fetch-max-bytes
[use_compressed_bytes_as_fetch_max_bytes: <boolean> | default = true]
migration:
# When both this option and ingest storage are enabled, distributors write to
# both Kafka and ingesters. A write request is considered successful only when
3 changes: 3 additions & 0 deletions go.mod
@@ -315,6 +315,9 @@ replace github.com/opentracing-contrib/go-grpc => github.com/charleskorn/go-grpc
// Replacing prometheus/alertmanager with our fork.
replace github.com/prometheus/alertmanager => github.com/grafana/prometheus-alertmanager v0.25.1-0.20240924175849-b8b7c2c74eb6

// Replacing with a fork commit based on v1.17.1 with https://github.com/twmb/franz-go/pull/803 cherry-picked.
replace github.com/twmb/franz-go => github.com/dimitarvdimitrov/franz-go v0.0.0-20240904145554-ceadc28d3bd9

// Pin Google GRPC to v1.65.0 as v1.66.0 has API changes and also potentially performance regressions.
// Following https://github.com/grafana/dskit/pull/581
replace google.golang.org/grpc => google.golang.org/grpc v1.65.0
4 changes: 2 additions & 2 deletions go.sum
@@ -947,6 +947,8 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/r
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/digitalocean/godo v1.121.0 h1:ilXiHuEnhbJs2fmFEPX0r/QQ6KfiOIMAhJN3f8NiCfI=
github.com/digitalocean/godo v1.121.0/go.mod h1:WQVH83OHUy6gC4gXpEVQKtxTd4L5oCp+5OialidkPLY=
github.com/dimitarvdimitrov/franz-go v0.0.0-20240904145554-ceadc28d3bd9 h1:jszPVGeTr25QTJ/jWiT7eXnabc4R4itChxUVFSCLjRQ=
github.com/dimitarvdimitrov/franz-go v0.0.0-20240904145554-ceadc28d3bd9/go.mod h1:NreRdJ2F7dziDY/m6VyspWd6sNxHKXdMZI42UfQ3GXM=
github.com/distribution/reference v0.5.0 h1:/FUIFXtfc/x2gpa5/VGfiGLuOIdYa1t65IKK2OFGvA0=
github.com/distribution/reference v0.5.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E=
github.com/dlclark/regexp2 v1.11.0 h1:G/nrcoOa7ZXlpoa/91N3X7mM3r8eIlMBBJZvsz/mxKI=
@@ -1680,8 +1682,6 @@ github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0h
github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk=
github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY=
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
github.com/twmb/franz-go v1.17.1 h1:0LwPsbbJeJ9R91DPUHSEd4su82WJWcTY1Zzbgbg4CeQ=
github.com/twmb/franz-go v1.17.1/go.mod h1:NreRdJ2F7dziDY/m6VyspWd6sNxHKXdMZI42UfQ3GXM=
github.com/twmb/franz-go/pkg/kadm v1.13.0 h1:bJq4C2ZikUE2jh/wl9MtMTQ/kpmnBgVFh8XMQBEC+60=
github.com/twmb/franz-go/pkg/kadm v1.13.0/go.mod h1:VMvpfjz/szpH9WB+vGM+rteTzVv0djyHFimci9qm2C0=
github.com/twmb/franz-go/pkg/kfake v0.0.0-20240821035758-b77dd13e2bfa h1:OmQ4DJhqeOPdIH60Psut1vYU8A6LGyxJbF09w5RAa2w=
23 changes: 23 additions & 0 deletions pkg/storage/ingest/config.go
@@ -92,6 +92,12 @@ type KafkaConfig struct {

// Used when logging unsampled client errors. Set from ingester's ErrorSampleRate.
FallbackClientErrorSampleRate int64 `yaml:"-"`

StartupFetchConcurrency int `yaml:"startup_fetch_concurrency"`
StartupRecordsPerFetch int `yaml:"startup_records_per_fetch"`
OngoingFetchConcurrency int `yaml:"ongoing_fetch_concurrency"`
OngoingRecordsPerFetch int `yaml:"ongoing_records_per_fetch"`
UseCompressedBytesAsFetchMaxBytes bool `yaml:"use_compressed_bytes_as_fetch_max_bytes"`
}

func (cfg *KafkaConfig) RegisterFlags(f *flag.FlagSet) {
@@ -126,6 +132,12 @@ func (cfg *KafkaConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)
f.Int64Var(&cfg.ProducerMaxBufferedBytes, prefix+".producer-max-buffered-bytes", 1024*1024*1024, "The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. This limit is per Kafka client. 0 to disable the limit.")

f.DurationVar(&cfg.WaitStrongReadConsistencyTimeout, prefix+".wait-strong-read-consistency-timeout", 20*time.Second, "The maximum allowed for a read requests processed by an ingester to wait until strong read consistency is enforced. 0 to disable the timeout.")

f.IntVar(&cfg.StartupFetchConcurrency, prefix+".startup-fetch-concurrency", 0, "The number of concurrent fetch requests that the ingester makes when reading data from Kafka during startup. 0 to disable.")
f.IntVar(&cfg.StartupRecordsPerFetch, prefix+".startup-records-per-fetch", 2500, "The number of records per fetch request that the ingester makes when reading data from Kafka during startup. Depends on "+prefix+".startup-fetch-concurrency being greater than 0.")
f.IntVar(&cfg.OngoingFetchConcurrency, prefix+".ongoing-fetch-concurrency", 0, "The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. Is disabled unless "+prefix+".startup-fetch-concurrency is greater than 0. It must be greater than 0.")
f.IntVar(&cfg.OngoingRecordsPerFetch, prefix+".ongoing-records-per-fetch", 30, "The number of records per fetch request that the ingester makes when reading data continuously from Kafka after startup. Depends on "+prefix+".ongoing-fetch-concurrency being greater than 0.")
f.BoolVar(&cfg.UseCompressedBytesAsFetchMaxBytes, prefix+".use-compressed-bytes-as-fetch-max-bytes", true, "When enabled, the fetch request MaxBytes field is computed using the compressed size of previous records. When disabled, MaxBytes is computed using uncompressed bytes. Different Kafka implementations interpret MaxBytes differently.")
}

func (cfg *KafkaConfig) Validate() error {
@@ -161,6 +173,17 @@ func (cfg *KafkaConfig) Validate() error {
return ErrInvalidMaxConsumerLagAtStartup
}

if cfg.StartupFetchConcurrency < 0 {
return fmt.Errorf("ingest-storage.kafka.startup-fetch-concurrency must be greater or equal to 0")
}

if cfg.StartupFetchConcurrency > 0 && cfg.OngoingFetchConcurrency <= 0 {
return fmt.Errorf("ingest-storage.kafka.ongoing-fetch-concurrency must be greater than 0 when startup-fetch-concurrency is greater than 0")
}

if cfg.StartupRecordsPerFetch <= 0 || cfg.OngoingRecordsPerFetch <= 0 {
return fmt.Errorf("ingest-storage.kafka.startup-records-per-fetch and ingest-storage.kafka.ongoing-records-per-fetch must be greater than 0")
}
return nil
}
