From 2938fa94883788c29d9a90b8e6e11803c5fd0ac9 Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Fri, 27 Sep 2024 15:41:01 +0200 Subject: [PATCH] kafka replay speed: upstream concurrent fetchers This is the second of series of PRs to upstream the code for improving Kafka replay speed in the ingester. In this PR I'm upstreaming the fetching code. The core of the change is in `concurrentFetchers`. # `concurrentFetchers` Overview * **Segmentation (fetchWant):** The fetcher divides the work into segments called fetchWants. Each fetchWant represents a range of records to be fetched, defined by a start and end offset. This segmentation allows for concurrent fetching of different parts of the topic partition. * **Concurrent Fetching:** Multiple goroutines (defined by the concurrency parameter) work on fetching these segments simultaneously. Each goroutine runs the `run` method, which processes fetchWants from a channel. * **Fetching Process:** For each fetchWant, the fetcher attempts to retrieve the records through the `fetchSingle` method. This method: * Finds the leader for the partition * Builds a fetch request * Sends the request to the Kafka broker * Parses the response * **Multiple Attempts:** If a fetch attempt fails or doesn't retrieve all requested records, the fetcher will retry. It uses an error backoff mechanism to avoid overwhelming the system with rapid retries. The fetcher updates the start offset of the fetchWant based on the last successfully fetched record and continues until all requested records are retrieved or the context is cancelled. * **Overfetching Risk:** The system risks overfetching because it might retrieve records that have already been processed. This is handled by: * Tracking the last returned record offset (`lastReturnedRecord`) * Using `recordIndexAfterOffset` to find the first new record in each fetch result * Discarding any duplicate records before passing them to the consumer * **Ordering:** The fetcher ensures that segments are processed in order by: * Using a linked list (`pendingResults`) to keep track of fetch results in the order they were requested * Buffering results in `bufferedResult` and only sending them to `orderedFetches` channel when they're next in sequence * The `PollFetches` method, which consumers call to get records, receives from the `orderedFetches` channel, ensuring records are always returned in the correct order * **Adaptive Fetching:** The system adapts the size of fetch requests based on previous results. It estimates the bytes per record and adjusts the `MaxBytes` parameter of fetch requests accordingly, trying to optimize the amount of data fetched in each request. Signed-off-by: Dimitar Dimitrov --- cmd/mimir/config-descriptor.json | 50 ++ cmd/mimir/help-all.txt.tmpl | 10 + cmd/mimir/help.txt.tmpl | 10 + .../configuration-parameters/index.md | 31 + go.mod | 3 + go.sum | 4 +- pkg/storage/ingest/config.go | 23 + pkg/storage/ingest/fetcher.go | 712 ++++++++++++++++++ pkg/storage/ingest/fetcher_test.go | 654 ++++++++++++++++ pkg/storage/ingest/reader.go | 119 ++- pkg/storage/ingest/reader_client.go | 8 +- pkg/storage/ingest/reader_test.go | 121 ++- pkg/storage/ingest/util.go | 9 +- pkg/storage/ingest/writer_test.go | 4 + .../twmb/franz-go/pkg/kgo/client.go | 6 +- .../twmb/franz-go/pkg/kgo/compression.go | 2 + .../twmb/franz-go/pkg/kgo/source.go | 136 ++-- vendor/modules.txt | 3 +- 18 files changed, 1818 insertions(+), 87 deletions(-) create mode 100644 pkg/storage/ingest/fetcher.go create mode 100644 pkg/storage/ingest/fetcher_test.go diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index a7ff4d8a459..f02bd97ff0d 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -6603,6 +6603,56 @@ "fieldDefaultValue": 20000000000, "fieldFlag": "ingest-storage.kafka.wait-strong-read-consistency-timeout", "fieldType": "duration" + }, + { + "kind": "field", + "name": "startup_fetch_concurrency", + "required": false, + "desc": "The number of concurrent fetch requests that the ingester makes when reading data from Kafka during startup. 0 to disable.", + "fieldValue": null, + "fieldDefaultValue": 0, + "fieldFlag": "ingest-storage.kafka.startup-fetch-concurrency", + "fieldType": "int" + }, + { + "kind": "field", + "name": "startup_records_per_fetch", + "required": false, + "desc": "The number of records per fetch request that the ingester makes when reading data from Kafka during startup. Depends on ingest-storage.kafka.startup-fetch-concurrency being greater than 0.", + "fieldValue": null, + "fieldDefaultValue": 2500, + "fieldFlag": "ingest-storage.kafka.startup-records-per-fetch", + "fieldType": "int" + }, + { + "kind": "field", + "name": "ongoing_fetch_concurrency", + "required": false, + "desc": "The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. Is disabled unless ingest-storage.kafka.startup-fetch-concurrency is greater than 0. It must be greater than 0.", + "fieldValue": null, + "fieldDefaultValue": 0, + "fieldFlag": "ingest-storage.kafka.ongoing-fetch-concurrency", + "fieldType": "int" + }, + { + "kind": "field", + "name": "ongoing_records_per_fetch", + "required": false, + "desc": "The number of records per fetch request that the ingester makes when reading data continuously from Kafka after startup. Depends on ingest-storage.kafka.ongoing-fetch-concurrency being greater than 0.", + "fieldValue": null, + "fieldDefaultValue": 30, + "fieldFlag": "ingest-storage.kafka.ongoing-records-per-fetch", + "fieldType": "int" + }, + { + "kind": "field", + "name": "use_compressed_bytes_as_fetch_max_bytes", + "required": false, + "desc": "When enabled, the fetch request MaxBytes field is computed using the compressed size of previous records. When disabled, MaxBytes is computed using uncompressed bytes. Different Kafka implementations interpret MaxBytes differently.", + "fieldValue": null, + "fieldDefaultValue": true, + "fieldFlag": "ingest-storage.kafka.use-compressed-bytes-as-fetch-max-bytes", + "fieldType": "boolean" } ], "fieldValue": null, diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 5dc7e93cca9..2cc97360e2a 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -1367,14 +1367,24 @@ Usage of ./cmd/mimir/mimir: How long to retry a failed request to get the last produced offset. (default 10s) -ingest-storage.kafka.max-consumer-lag-at-startup duration The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 15s) + -ingest-storage.kafka.ongoing-fetch-concurrency int + The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. Is disabled unless ingest-storage.kafka.startup-fetch-concurrency is greater than 0. It must be greater than 0. + -ingest-storage.kafka.ongoing-records-per-fetch int + The number of records per fetch request that the ingester makes when reading data continuously from Kafka after startup. Depends on ingest-storage.kafka.ongoing-fetch-concurrency being greater than 0. (default 30) -ingest-storage.kafka.producer-max-buffered-bytes int The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. This limit is per Kafka client. 0 to disable the limit. (default 1073741824) -ingest-storage.kafka.producer-max-record-size-bytes int The maximum size of a Kafka record data that should be generated by the producer. An incoming write request larger than this size is split into multiple Kafka records. We strongly recommend to not change this setting unless for testing purposes. (default 15983616) + -ingest-storage.kafka.startup-fetch-concurrency int + The number of concurrent fetch requests that the ingester makes when reading data from Kafka during startup. 0 to disable. + -ingest-storage.kafka.startup-records-per-fetch int + The number of records per fetch request that the ingester makes when reading data from Kafka during startup. Depends on ingest-storage.kafka.startup-fetch-concurrency being greater than 0. (default 2500) -ingest-storage.kafka.target-consumer-lag-at-startup duration The best-effort maximum lag a consumer tries to achieve at startup. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 2s) -ingest-storage.kafka.topic string The Kafka topic name. + -ingest-storage.kafka.use-compressed-bytes-as-fetch-max-bytes + When enabled, the fetch request MaxBytes field is computed using the compressed size of previous records. When disabled, MaxBytes is computed using uncompressed bytes. Different Kafka implementations interpret MaxBytes differently. (default true) -ingest-storage.kafka.wait-strong-read-consistency-timeout duration The maximum allowed for a read requests processed by an ingester to wait until strong read consistency is enforced. 0 to disable the timeout. (default 20s) -ingest-storage.kafka.write-clients int diff --git a/cmd/mimir/help.txt.tmpl b/cmd/mimir/help.txt.tmpl index e1e5df266bc..f116c70b989 100644 --- a/cmd/mimir/help.txt.tmpl +++ b/cmd/mimir/help.txt.tmpl @@ -439,14 +439,24 @@ Usage of ./cmd/mimir/mimir: How long to retry a failed request to get the last produced offset. (default 10s) -ingest-storage.kafka.max-consumer-lag-at-startup duration The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 15s) + -ingest-storage.kafka.ongoing-fetch-concurrency int + The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. Is disabled unless ingest-storage.kafka.startup-fetch-concurrency is greater than 0. It must be greater than 0. + -ingest-storage.kafka.ongoing-records-per-fetch int + The number of records per fetch request that the ingester makes when reading data continuously from Kafka after startup. Depends on ingest-storage.kafka.ongoing-fetch-concurrency being greater than 0. (default 30) -ingest-storage.kafka.producer-max-buffered-bytes int The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. This limit is per Kafka client. 0 to disable the limit. (default 1073741824) -ingest-storage.kafka.producer-max-record-size-bytes int The maximum size of a Kafka record data that should be generated by the producer. An incoming write request larger than this size is split into multiple Kafka records. We strongly recommend to not change this setting unless for testing purposes. (default 15983616) + -ingest-storage.kafka.startup-fetch-concurrency int + The number of concurrent fetch requests that the ingester makes when reading data from Kafka during startup. 0 to disable. + -ingest-storage.kafka.startup-records-per-fetch int + The number of records per fetch request that the ingester makes when reading data from Kafka during startup. Depends on ingest-storage.kafka.startup-fetch-concurrency being greater than 0. (default 2500) -ingest-storage.kafka.target-consumer-lag-at-startup duration The best-effort maximum lag a consumer tries to achieve at startup. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 2s) -ingest-storage.kafka.topic string The Kafka topic name. + -ingest-storage.kafka.use-compressed-bytes-as-fetch-max-bytes + When enabled, the fetch request MaxBytes field is computed using the compressed size of previous records. When disabled, MaxBytes is computed using uncompressed bytes. Different Kafka implementations interpret MaxBytes differently. (default true) -ingest-storage.kafka.wait-strong-read-consistency-timeout duration The maximum allowed for a read requests processed by an ingester to wait until strong read consistency is enforced. 0 to disable the timeout. (default 20s) -ingest-storage.kafka.write-clients int diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index 6631bbc7cb8..540635bec9a 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -3830,6 +3830,37 @@ kafka: # CLI flag: -ingest-storage.kafka.wait-strong-read-consistency-timeout [wait_strong_read_consistency_timeout: | default = 20s] + # The number of concurrent fetch requests that the ingester makes when reading + # data from Kafka during startup. 0 to disable. + # CLI flag: -ingest-storage.kafka.startup-fetch-concurrency + [startup_fetch_concurrency: | default = 0] + + # The number of records per fetch request that the ingester makes when reading + # data from Kafka during startup. Depends on + # ingest-storage.kafka.startup-fetch-concurrency being greater than 0. + # CLI flag: -ingest-storage.kafka.startup-records-per-fetch + [startup_records_per_fetch: | default = 2500] + + # The number of concurrent fetch requests that the ingester makes when reading + # data continuously from Kafka after startup. Is disabled unless + # ingest-storage.kafka.startup-fetch-concurrency is greater than 0. It must be + # greater than 0. + # CLI flag: -ingest-storage.kafka.ongoing-fetch-concurrency + [ongoing_fetch_concurrency: | default = 0] + + # The number of records per fetch request that the ingester makes when reading + # data continuously from Kafka after startup. Depends on + # ingest-storage.kafka.ongoing-fetch-concurrency being greater than 0. + # CLI flag: -ingest-storage.kafka.ongoing-records-per-fetch + [ongoing_records_per_fetch: | default = 30] + + # When enabled, the fetch request MaxBytes field is computed using the + # compressed size of previous records. When disabled, MaxBytes is computed + # using uncompressed bytes. Different Kafka implementations interpret MaxBytes + # differently. + # CLI flag: -ingest-storage.kafka.use-compressed-bytes-as-fetch-max-bytes + [use_compressed_bytes_as_fetch_max_bytes: | default = true] + migration: # When both this option and ingest storage are enabled, distributors write to # both Kafka and ingesters. A write request is considered successful only when diff --git a/go.mod b/go.mod index d703076ddc0..5e7775c3c25 100644 --- a/go.mod +++ b/go.mod @@ -315,6 +315,9 @@ replace github.com/opentracing-contrib/go-grpc => github.com/charleskorn/go-grpc // Replacing prometheus/alertmanager with our fork. replace github.com/prometheus/alertmanager => github.com/grafana/prometheus-alertmanager v0.25.1-0.20240924175849-b8b7c2c74eb6 +// Replacing with a fork commit based on v1.17.1 with https://github.com/twmb/franz-go/pull/803 cherry-picked. +replace github.com/twmb/franz-go => github.com/dimitarvdimitrov/franz-go v0.0.0-20240904145554-ceadc28d3bd9 + // Pin Google GRPC to v1.65.0 as v1.66.0 has API changes and also potentially performance regressions. // Following https://github.com/grafana/dskit/pull/581 replace google.golang.org/grpc => google.golang.org/grpc v1.65.0 diff --git a/go.sum b/go.sum index 3c7e4bacc23..c37ca80d19c 100644 --- a/go.sum +++ b/go.sum @@ -947,6 +947,8 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/r github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/digitalocean/godo v1.121.0 h1:ilXiHuEnhbJs2fmFEPX0r/QQ6KfiOIMAhJN3f8NiCfI= github.com/digitalocean/godo v1.121.0/go.mod h1:WQVH83OHUy6gC4gXpEVQKtxTd4L5oCp+5OialidkPLY= +github.com/dimitarvdimitrov/franz-go v0.0.0-20240904145554-ceadc28d3bd9 h1:jszPVGeTr25QTJ/jWiT7eXnabc4R4itChxUVFSCLjRQ= +github.com/dimitarvdimitrov/franz-go v0.0.0-20240904145554-ceadc28d3bd9/go.mod h1:NreRdJ2F7dziDY/m6VyspWd6sNxHKXdMZI42UfQ3GXM= github.com/distribution/reference v0.5.0 h1:/FUIFXtfc/x2gpa5/VGfiGLuOIdYa1t65IKK2OFGvA0= github.com/distribution/reference v0.5.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= github.com/dlclark/regexp2 v1.11.0 h1:G/nrcoOa7ZXlpoa/91N3X7mM3r8eIlMBBJZvsz/mxKI= @@ -1680,8 +1682,6 @@ github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0h github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk= github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= -github.com/twmb/franz-go v1.17.1 h1:0LwPsbbJeJ9R91DPUHSEd4su82WJWcTY1Zzbgbg4CeQ= -github.com/twmb/franz-go v1.17.1/go.mod h1:NreRdJ2F7dziDY/m6VyspWd6sNxHKXdMZI42UfQ3GXM= github.com/twmb/franz-go/pkg/kadm v1.13.0 h1:bJq4C2ZikUE2jh/wl9MtMTQ/kpmnBgVFh8XMQBEC+60= github.com/twmb/franz-go/pkg/kadm v1.13.0/go.mod h1:VMvpfjz/szpH9WB+vGM+rteTzVv0djyHFimci9qm2C0= github.com/twmb/franz-go/pkg/kfake v0.0.0-20240821035758-b77dd13e2bfa h1:OmQ4DJhqeOPdIH60Psut1vYU8A6LGyxJbF09w5RAa2w= diff --git a/pkg/storage/ingest/config.go b/pkg/storage/ingest/config.go index 2ea46e57c1b..264978a0228 100644 --- a/pkg/storage/ingest/config.go +++ b/pkg/storage/ingest/config.go @@ -92,6 +92,12 @@ type KafkaConfig struct { // Used when logging unsampled client errors. Set from ingester's ErrorSampleRate. FallbackClientErrorSampleRate int64 `yaml:"-"` + + StartupFetchConcurrency int `yaml:"startup_fetch_concurrency"` + StartupRecordsPerFetch int `yaml:"startup_records_per_fetch"` + OngoingFetchConcurrency int `yaml:"ongoing_fetch_concurrency"` + OngoingRecordsPerFetch int `yaml:"ongoing_records_per_fetch"` + UseCompressedBytesAsFetchMaxBytes bool `yaml:"use_compressed_bytes_as_fetch_max_bytes"` } func (cfg *KafkaConfig) RegisterFlags(f *flag.FlagSet) { @@ -126,6 +132,12 @@ func (cfg *KafkaConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) f.Int64Var(&cfg.ProducerMaxBufferedBytes, prefix+".producer-max-buffered-bytes", 1024*1024*1024, "The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. This limit is per Kafka client. 0 to disable the limit.") f.DurationVar(&cfg.WaitStrongReadConsistencyTimeout, prefix+".wait-strong-read-consistency-timeout", 20*time.Second, "The maximum allowed for a read requests processed by an ingester to wait until strong read consistency is enforced. 0 to disable the timeout.") + + f.IntVar(&cfg.StartupFetchConcurrency, prefix+".startup-fetch-concurrency", 0, "The number of concurrent fetch requests that the ingester makes when reading data from Kafka during startup. 0 to disable.") + f.IntVar(&cfg.StartupRecordsPerFetch, prefix+".startup-records-per-fetch", 2500, "The number of records per fetch request that the ingester makes when reading data from Kafka during startup. Depends on "+prefix+".startup-fetch-concurrency being greater than 0.") + f.IntVar(&cfg.OngoingFetchConcurrency, prefix+".ongoing-fetch-concurrency", 0, "The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. Is disabled unless "+prefix+".startup-fetch-concurrency is greater than 0. It must be greater than 0.") + f.IntVar(&cfg.OngoingRecordsPerFetch, prefix+".ongoing-records-per-fetch", 30, "The number of records per fetch request that the ingester makes when reading data continuously from Kafka after startup. Depends on "+prefix+".ongoing-fetch-concurrency being greater than 0.") + f.BoolVar(&cfg.UseCompressedBytesAsFetchMaxBytes, prefix+".use-compressed-bytes-as-fetch-max-bytes", true, "When enabled, the fetch request MaxBytes field is computed using the compressed size of previous records. When disabled, MaxBytes is computed using uncompressed bytes. Different Kafka implementations interpret MaxBytes differently.") } func (cfg *KafkaConfig) Validate() error { @@ -161,6 +173,17 @@ func (cfg *KafkaConfig) Validate() error { return ErrInvalidMaxConsumerLagAtStartup } + if cfg.StartupFetchConcurrency < 0 { + return fmt.Errorf("ingest-storage.kafka.startup-fetch-concurrency must be greater or equal to 0") + } + + if cfg.StartupFetchConcurrency > 0 && cfg.OngoingFetchConcurrency <= 0 { + return fmt.Errorf("ingest-storage.kafka.ongoing-fetch-concurrency must be greater than 0 when startup-fetch-concurrency is greater than 0") + } + + if cfg.StartupRecordsPerFetch <= 0 || cfg.OngoingRecordsPerFetch <= 0 { + return fmt.Errorf("ingest-storage.kafka.startup-records-per-fetch and ingest-storage.kafka.ongoing-records-per-fetch must be greater than 0") + } return nil } diff --git a/pkg/storage/ingest/fetcher.go b/pkg/storage/ingest/fetcher.go new file mode 100644 index 00000000000..cfef1be052a --- /dev/null +++ b/pkg/storage/ingest/fetcher.go @@ -0,0 +1,712 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package ingest + +import ( + "container/list" + "context" + "errors" + "fmt" + "math" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/dskit/backoff" + "github.com/opentracing/opentracing-go" + "github.com/twmb/franz-go/pkg/kadm" + "github.com/twmb/franz-go/pkg/kerr" + "github.com/twmb/franz-go/pkg/kgo" + "github.com/twmb/franz-go/pkg/kmsg" + "github.com/twmb/franz-go/plugin/kotel" + + "github.com/grafana/mimir/pkg/util/spanlogger" +) + +type fetcher interface { + // PollFetches fetches records from Kafka and returns them. + // The returned context is the context of fetching. It can also be equal to the passed context. + // The returned context may contain spans that were used to fetch from Kafka. + // Each record in the returned fetches also contains a context. + // You should use that when doing something specific to a tenant or a + // record and use the returned context when doing something that is common to all records. + PollFetches(context.Context) (kgo.Fetches, context.Context) + + // Update updates the fetcher with the given concurrency and records. + Update(ctx context.Context, concurrency, records int) + + // Stop stops the fetcher. + Stop() +} + +// fetchWant represents a range of offsets to fetch. +// Based on a given number of records, it tries to estimate how many bytes we need to fetch, given there's no support for fetching offsets directly. +// fetchWant also contains the channel on which to send the fetched records for the offset range. +type fetchWant struct { + startOffset int64 // inclusive + endOffset int64 // exclusive + bytesPerRecord int + + // result should be closed when there are no more fetches for this partition. It is ok to send multiple times on the channel. + result chan fetchResult +} + +func fetchWantFrom(offset int64, recordsPerFetch int) fetchWant { + return fetchWant{ + startOffset: offset, + endOffset: offset + int64(recordsPerFetch), + result: make(chan fetchResult), + } +} + +// Next returns the fetchWant for the next numRecords starting from the last known offset. +func (w fetchWant) Next(numRecords int) fetchWant { + n := fetchWantFrom(w.endOffset, numRecords) + n.bytesPerRecord = w.bytesPerRecord + return n.trimIfTooBig() +} + +// MaxBytes returns the maximum number of bytes we can fetch in a single request. +// It's capped at math.MaxInt32 to avoid overflow, and it'll always fetch a minimum of 1MB. +func (w fetchWant) MaxBytes() int32 { + fetchBytes := w.expectedBytes() + if fetchBytes > math.MaxInt32 { + // This shouldn't happen because w should have been trimmed before sending the request. + // But we definitely don't want to request negative bytes by casting to int32, so add this safeguard. + return math.MaxInt32 + } + fetchBytes = max(1_000_000, fetchBytes) // when we're fetching few records, we can afford to over-fetch to avoid more requests. + return int32(fetchBytes) +} + +// UpdateBytesPerRecord updates the expected bytes per record based on the results of the last fetch and trims the fetchWant if MaxBytes() would now exceed math.MaxInt32. +func (w fetchWant) UpdateBytesPerRecord(lastFetchBytes int, lastFetchNumberOfRecords int) fetchWant { + // Smooth over the estimation to avoid having outlier fetches from throwing off the estimation. + // We don't want a fetch of 5 records to determine how we fetch the next fetch of 6000 records. + // Ideally we weigh the estimation on the number of records observed, but it's simpler to smooth it over with a constant factor. + const currentEstimateWeight = 0.8 + + actualBytesPerRecord := float64(lastFetchBytes) / float64(lastFetchNumberOfRecords) + w.bytesPerRecord = int(currentEstimateWeight*float64(w.bytesPerRecord) + (1-currentEstimateWeight)*actualBytesPerRecord) + + return w.trimIfTooBig() +} + +// expectedBytes returns how many bytes we'd need to accommodate the range of offsets using bytesPerRecord. +// They may be more than the kafka protocol supports (> MaxInt32). Use MaxBytes. +func (w fetchWant) expectedBytes() int { + // We over-fetch bytes to reduce the likelihood of under-fetching and having to run another request. + // Based on some testing 65% of under-estimations are by less than 5%. So we account for that. + const overFetchBytesFactor = 1.05 + return int(overFetchBytesFactor * float64(w.bytesPerRecord*int(w.endOffset-w.startOffset))) +} + +// trimIfTooBig adjusts the end offset if we expect to fetch too many bytes. +// It's capped at math.MaxInt32 bytes. +func (w fetchWant) trimIfTooBig() fetchWant { + if w.expectedBytes() <= math.MaxInt32 { + return w + } + // We are overflowing, so we need to trim the end offset. + // We do this by calculating how many records we can fetch with the max bytes, and then setting the end offset to that. + w.endOffset = w.startOffset + int64(math.MaxInt32/w.bytesPerRecord) + return w +} + +type fetchResult struct { + kgo.FetchPartition + ctx context.Context + fetchedBytes int + + waitingToBePickedUpFromOrderedFetchesSpan opentracing.Span +} + +func (fr *fetchResult) logCompletedFetch(fetchStartTime time.Time, w fetchWant) { + var logger log.Logger = spanlogger.FromContext(fr.ctx, log.NewNopLogger()) + + msg := "fetched records" + if fr.Err != nil { + msg = "received an error while fetching records; will retry after processing received records (if any)" + } + var ( + gotRecords = int64(len(fr.Records)) + askedRecords = w.endOffset - w.startOffset + ) + switch { + case fr.Err == nil, errors.Is(fr.Err, kerr.OffsetOutOfRange): + logger = level.Debug(logger) + default: + logger = level.Error(logger) + } + var firstTimestamp, lastTimestamp string + if gotRecords > 0 { + firstTimestamp = fr.Records[0].Timestamp.String() + lastTimestamp = fr.Records[gotRecords-1].Timestamp.String() + } + logger.Log( + "msg", msg, + "duration", time.Since(fetchStartTime), + "start_offset", w.startOffset, + "end_offset", w.endOffset, + "asked_records", askedRecords, + "got_records", gotRecords, + "diff_records", askedRecords-gotRecords, + "asked_bytes", w.MaxBytes(), + "got_bytes", fr.fetchedBytes, + "diff_bytes", int(w.MaxBytes())-fr.fetchedBytes, + "first_timestamp", firstTimestamp, + "last_timestamp", lastTimestamp, + "hwm", fr.HighWatermark, + "lso", fr.LogStartOffset, + "err", fr.Err, + ) +} + +func (fr *fetchResult) startWaitingForConsumption() { + fr.waitingToBePickedUpFromOrderedFetchesSpan, fr.ctx = opentracing.StartSpanFromContext(fr.ctx, "fetchResult.waitingForConsumption") +} + +func (fr *fetchResult) finishWaitingForConsumption() { + if fr.waitingToBePickedUpFromOrderedFetchesSpan == nil { + fr.waitingToBePickedUpFromOrderedFetchesSpan, fr.ctx = opentracing.StartSpanFromContext(fr.ctx, "fetchResult.noWaitingForConsumption") + } + fr.waitingToBePickedUpFromOrderedFetchesSpan.Finish() +} + +// Merge merges other with an older fetchResult. mergedWith keeps most of the fields of fr and assumes they are more up to date then other's. +func (fr *fetchResult) Merge(older fetchResult) fetchResult { + if older.ctx != nil { + level.Debug(spanlogger.FromContext(older.ctx, log.NewNopLogger())).Log("msg", "merged fetch result with the next result") + } + + // older.Records are older than fr.Records, so we append them first. + fr.Records = append(older.Records, fr.Records...) + + // We ignore HighWatermark, LogStartOffset, LastStableOffset because this result should be more up to date. + fr.fetchedBytes += older.fetchedBytes + return *fr +} + +func newEmptyFetchResult(ctx context.Context, err error) fetchResult { + return fetchResult{ + ctx: ctx, + fetchedBytes: 0, + FetchPartition: kgo.FetchPartition{Err: err}, + } +} + +type concurrentFetchers struct { + wg sync.WaitGroup + done chan struct{} + + client *kgo.Client + logger log.Logger + partitionID int32 + topicID [16]byte + topicName string + metrics *readerMetrics + tracer *kotel.Tracer + + minBytesWaitTime time.Duration + + orderedFetches chan fetchResult + lastReturnedRecord int64 + startOffsets *genericOffsetReader[int64] + + // trackCompressedBytes controls whether to calculate MaxBytes for fetch requests based on previous responses' compressed or uncompressed bytes. + trackCompressedBytes bool +} + +func (r *concurrentFetchers) Stop() { + close(r.done) + + r.wg.Wait() + level.Info(r.logger).Log("msg", "stopped concurrent fetchers", "last_returned_record", r.lastReturnedRecord) +} + +// newConcurrentFetchers creates a new concurrentFetchers. startOffset can be kafkaOffsetStart, kafkaOffsetEnd or a specific offset. +func newConcurrentFetchers( + ctx context.Context, + client *kgo.Client, + logger log.Logger, + topic string, + partition int32, + startOffset int64, + concurrency int, + recordsPerFetch int, + trackCompressedBytes bool, + minBytesWaitTime time.Duration, + offsetReader *partitionOffsetClient, + startOffsetsReader *genericOffsetReader[int64], + metrics *readerMetrics, +) (*concurrentFetchers, error) { + + var err error + switch startOffset { + case kafkaOffsetStart: + startOffset, err = offsetReader.FetchPartitionStartOffset(ctx, partition) + case kafkaOffsetEnd: + startOffset, err = offsetReader.FetchPartitionLastProducedOffset(ctx, partition) + // End (-1) means "ignore all existing records". FetchPartitionLastProducedOffset returns the offset of an existing record. + // We need to start from the next one, which is still not produced. + startOffset++ + } + if err != nil { + return nil, fmt.Errorf("resolving offset to start consuming from: %w", err) + } + f := &concurrentFetchers{ + client: client, + logger: logger, + topicName: topic, + partitionID: partition, + metrics: metrics, + minBytesWaitTime: minBytesWaitTime, + lastReturnedRecord: startOffset - 1, + startOffsets: startOffsetsReader, + trackCompressedBytes: trackCompressedBytes, + tracer: recordsTracer(), + orderedFetches: make(chan fetchResult), + done: make(chan struct{}), + } + + topics, err := kadm.NewClient(client).ListTopics(ctx, topic) + if err != nil { + return nil, fmt.Errorf("failed to find topic ID: %w", err) + } + if !topics.Has(topic) { + return nil, fmt.Errorf("failed to find topic ID: topic not found") + } + f.topicID = topics[topic].ID + + f.wg.Add(1) + go f.start(ctx, startOffset, concurrency, recordsPerFetch) + + return f, nil +} + +func (r *concurrentFetchers) Update(ctx context.Context, concurrency, records int) { + r.Stop() + r.done = make(chan struct{}) + + r.wg.Add(1) + go r.start(ctx, r.lastReturnedRecord+1, concurrency, records) +} + +func (r *concurrentFetchers) PollFetches(ctx context.Context) (kgo.Fetches, context.Context) { + waitStartTime := time.Now() + select { + case <-ctx.Done(): + return kgo.Fetches{}, ctx + case f := <-r.orderedFetches: + firstUnreturnedRecordIdx := recordIndexAfterOffset(f.Records, r.lastReturnedRecord) + r.recordOrderedFetchTelemetry(f, firstUnreturnedRecordIdx, waitStartTime) + + f.Records = f.Records[firstUnreturnedRecordIdx:] + if len(f.Records) > 0 { + r.lastReturnedRecord = f.Records[len(f.Records)-1].Offset + } + + return kgo.Fetches{{ + Topics: []kgo.FetchTopic{ + { + Topic: r.topicName, + Partitions: []kgo.FetchPartition{f.FetchPartition}, + }, + }, + }}, f.ctx + } +} + +func recordIndexAfterOffset(records []*kgo.Record, offset int64) int { + for i, r := range records { + if r.Offset > offset { + return i + } + } + return len(records) +} + +func (r *concurrentFetchers) recordOrderedFetchTelemetry(f fetchResult, firstReturnedRecordIndex int, waitStartTime time.Time) { + waitDuration := time.Since(waitStartTime) + level.Debug(r.logger).Log("msg", "received ordered fetch", "num_records", len(f.Records), "wait_duration", waitDuration) + r.metrics.fetchWaitDuration.Observe(waitDuration.Seconds()) + + doubleFetchedBytes := 0 + for i, record := range f.Records { + if i < firstReturnedRecordIndex { + doubleFetchedBytes += len(record.Value) + spanlogger.FromContext(record.Context, r.logger).DebugLog("msg", "skipping record because it has already been returned", "offset", record.Offset) + } + r.tracer.OnFetchRecordUnbuffered(record, true) + } + r.metrics.fetchedDiscardedRecordBytes.Add(float64(doubleFetchedBytes)) +} + +// fetchSingle attempts to find out the leader leader Kafka broker for a partition and then sends a fetch request to the leader of the fetchWant request and parses the responses +// fetchSingle returns a fetchResult which may or may not fulfil the entire fetchWant. +// If ctx is cancelled, fetchSingle will return an empty fetchResult without an error. +func (r *concurrentFetchers) fetchSingle(ctx context.Context, fw fetchWant) (fr fetchResult) { + defer func(fetchStartTime time.Time) { + fr.logCompletedFetch(fetchStartTime, fw) + }(time.Now()) + + leaderID, leaderEpoch, err := r.client.PartitionLeader(r.topicName, r.partitionID) + if err != nil || (leaderID == -1 && leaderEpoch == -1) { + if err != nil { + return newEmptyFetchResult(ctx, fmt.Errorf("finding leader for partition: %w", err)) + } + return newEmptyFetchResult(ctx, errUnknownPartitionLeader) + } + + req := r.buildFetchRequest(fw, leaderEpoch) + + resp, err := req.RequestWith(ctx, r.client.Broker(int(leaderID))) + if err != nil { + if errors.Is(err, context.Canceled) { + return newEmptyFetchResult(ctx, nil) + } + return newEmptyFetchResult(ctx, fmt.Errorf("fetching from kafka: %w", err)) + } + + return r.parseFetchResponse(ctx, fw.startOffset, resp) +} + +func (r *concurrentFetchers) buildFetchRequest(fw fetchWant, leaderEpoch int32) kmsg.FetchRequest { + req := kmsg.NewFetchRequest() + req.MinBytes = 1 // Warpstream ignores this field. This means that the WaitTime below is always waited and MaxBytes play a bigger role in how fast Ws responds. + req.Version = 13 + req.MaxWaitMillis = int32(r.minBytesWaitTime / time.Millisecond) + req.MaxBytes = fw.MaxBytes() + + reqTopic := kmsg.NewFetchRequestTopic() + reqTopic.Topic = r.topicName + reqTopic.TopicID = r.topicID + + reqPartition := kmsg.NewFetchRequestTopicPartition() + reqPartition.Partition = r.partitionID + reqPartition.FetchOffset = fw.startOffset + reqPartition.PartitionMaxBytes = req.MaxBytes + reqPartition.CurrentLeaderEpoch = leaderEpoch + + reqTopic.Partitions = append(reqTopic.Partitions, reqPartition) + req.Topics = append(req.Topics, reqTopic) + return req +} + +func (r *concurrentFetchers) parseFetchResponse(ctx context.Context, startOffset int64, resp *kmsg.FetchResponse) fetchResult { + // Here we ignore resp.ErrorCode. That error code was added for support for KIP-227 and is only set if we're using fetch sessions. We don't use fetch sessions. + // We also ignore rawPartitionResp.PreferredReadReplica to keep the code simpler. We don't provide any rack in the FetchRequest, so the broker _probably_ doesn't have a recommended replica for us. + + // Sanity check for the response we get. + // If we get something we didn't expect, maybe we're sending the wrong request or there's a bug in the kafka implementation. + // Even in case of errors we get the topic partition. + err := assertResponseContainsPartition(resp, r.topicID, r.partitionID) + if err != nil { + return newEmptyFetchResult(ctx, err) + } + + parseOptions := kgo.ProcessFetchPartitionOptions{ + KeepControlRecords: false, + Offset: startOffset, + IsolationLevel: kgo.ReadUncommitted(), // we don't produce in transactions, but leaving this here so it's explicit. + Topic: r.topicName, + Partition: r.partitionID, + } + + observeMetrics := func(m kgo.FetchBatchMetrics) { + brokerMeta := kgo.BrokerMetadata{} // leave it empty because kprom doesn't use it, and we don't exactly have all the metadata + r.metrics.kprom.OnFetchBatchRead(brokerMeta, r.topicName, r.partitionID, m) + } + rawPartitionResp := resp.Topics[0].Partitions[0] + partition, _ := kgo.ProcessRespPartition(parseOptions, &rawPartitionResp, observeMetrics) + partition.EachRecord(r.tracer.OnFetchRecordBuffered) + partition.EachRecord(func(r *kgo.Record) { + spanlogger.FromContext(r.Context, log.NewNopLogger()).DebugLog("msg", "received record") + }) + + fetchedBytes := len(rawPartitionResp.RecordBatches) + if !r.trackCompressedBytes { + fetchedBytes = sumRecordLengths(partition.Records) + } + + return fetchResult{ + ctx: ctx, + FetchPartition: partition, + fetchedBytes: fetchedBytes, + } +} + +func assertResponseContainsPartition(resp *kmsg.FetchResponse, topicID kadm.TopicID, partitionID int32) error { + if topics := resp.Topics; len(topics) < 1 || topics[0].TopicID != topicID { + receivedTopicID := kadm.TopicID{} + if len(topics) > 0 { + receivedTopicID = topics[0].TopicID + } + return fmt.Errorf("didn't find expected topic %s in fetch response; received topic %s", topicID, receivedTopicID) + } + if partitions := resp.Topics[0].Partitions; len(partitions) < 1 || partitions[0].Partition != partitionID { + receivedPartitionID := int32(-1) + if len(partitions) > 0 { + receivedPartitionID = partitions[0].Partition + } + return fmt.Errorf("didn't find expected partition %d in fetch response; received partition %d", partitionID, receivedPartitionID) + } + return nil +} + +func sumRecordLengths(records []*kgo.Record) (sum int) { + for _, r := range records { + sum += len(r.Value) + } + return sum +} + +func (r *concurrentFetchers) run(ctx context.Context, wants chan fetchWant, logger log.Logger) { + defer r.wg.Done() + + errBackoff := backoff.New(ctx, backoff.Config{ + MinBackoff: 250 * time.Millisecond, + MaxBackoff: 2 * time.Second, + MaxRetries: 0, // retry forever + }) + + for w := range wants { + // Start new span for each fetchWant. We want to record the lifecycle of a single record from being fetched to being ingested. + wantSpan, ctx := spanlogger.NewWithLogger(ctx, logger, "concurrentFetcher.fetch") + wantSpan.SetTag("start_offset", w.startOffset) + wantSpan.SetTag("end_offset", w.endOffset) + + var previousResult fetchResult + for attempt := 0; errBackoff.Ongoing() && w.endOffset > w.startOffset; attempt++ { + attemptSpan, ctx := spanlogger.NewWithLogger(ctx, logger, "concurrentFetcher.fetch.attempt") + attemptSpan.SetTag("attempt", attempt) + + f := r.fetchSingle(ctx, w) + f = f.Merge(previousResult) + previousResult = f + if f.Err != nil { + w = handleKafkaFetchErr(f.Err, w, errBackoff, r.startOffsets, r.client, attemptSpan) + } + + if len(f.Records) == 0 { + // Typically if we had an error, then there wouldn't be any records. + // But it's hard to verify this for all errors from the Kafka API docs, so just to be sure, we process any records we might have received. + attemptSpan.Finish() + + // There is a chance we've been told to stop even when we have no records. + select { + case <-r.done: + wantSpan.Finish() + close(w.result) + return + default: + } + + continue + } + // Next attempt will be from the last record onwards. + w.startOffset = f.Records[len(f.Records)-1].Offset + 1 + + // We reset the backoff if we received any records whatsoever. A received record means _some_ success. + // We don't want to slow down until we hit a larger error. + errBackoff.Reset() + + select { + case <-r.done: + wantSpan.Finish() + attemptSpan.Finish() + close(w.result) + return + case w.result <- f: + previousResult = fetchResult{} + case <-ctx.Done(): + default: + if w.startOffset >= w.endOffset { + // We've fetched all we were asked for the whole batch is ready, and we definitely have to wait to send on the channel now. + f.startWaitingForConsumption() + select { + case <-r.done: + wantSpan.Finish() + attemptSpan.Finish() + close(w.result) + return + case w.result <- f: + previousResult = fetchResult{} + case <-ctx.Done(): + } + } + } + attemptSpan.Finish() + } + wantSpan.Finish() + close(w.result) + } +} + +func (r *concurrentFetchers) start(ctx context.Context, startOffset int64, concurrency, recordsPerFetch int) { + level.Info(r.logger).Log("msg", "starting concurrent fetchers", "start_offset", startOffset, "concurrency", concurrency, "recordsPerFetch", recordsPerFetch) + + wants := make(chan fetchWant) + defer close(wants) + r.wg.Add(concurrency) + for i := 0; i < concurrency; i++ { + logger := log.With(r.logger, "fetcher", i) + go r.run(ctx, wants, logger) + } + + var ( + nextFetch = fetchWantFrom(startOffset, recordsPerFetch) + nextResult chan fetchResult + pendingResults = list.New() + + bufferedResult fetchResult + readyBufferedResults chan fetchResult // this is non-nil when bufferedResult is non-empty + ) + nextFetch.bytesPerRecord = 10_000 // start with an estimation, we will update it as we consume + + // We need to make sure we don't leak any goroutine given that start is called within a goroutine. + defer r.wg.Done() + for { + refillBufferedResult := nextResult + if readyBufferedResults != nil { + // We have a single result that's still not consumed. + // So we don't try to get new results from the fetchers. + refillBufferedResult = nil + } + select { + case <-r.done: + return + case <-ctx.Done(): + return + + case wants <- nextFetch: + pendingResults.PushBack(nextFetch.result) + if nextResult == nil { + // In case we previously exhausted pendingResults, we just created + nextResult = pendingResults.Front().Value.(chan fetchResult) + pendingResults.Remove(pendingResults.Front()) + } + nextFetch = nextFetch.Next(recordsPerFetch) + + case result, moreLeft := <-refillBufferedResult: + if !moreLeft { + if pendingResults.Len() > 0 { + nextResult = pendingResults.Front().Value.(chan fetchResult) + pendingResults.Remove(pendingResults.Front()) + } else { + nextResult = nil + } + continue + } + nextFetch = nextFetch.UpdateBytesPerRecord(result.fetchedBytes, len(result.Records)) + bufferedResult = result + readyBufferedResults = r.orderedFetches + + case readyBufferedResults <- bufferedResult: + bufferedResult.finishWaitingForConsumption() + readyBufferedResults = nil + bufferedResult = fetchResult{} + } + } +} + +type waiter interface { + Wait() +} + +type metadataRefresher interface { + ForceMetadataRefresh() +} + +// handleKafkaFetchErr handles all the errors listed in the franz-go documentation as possible errors when fetching records. +// For most of them we just apply a backoff. They are listed here so we can be explicit in what we're handling and how. +// It may also return an adjusted fetchWant in case the error indicated, we were consuming not yet produced records or records already deleted due to retention. +func handleKafkaFetchErr(err error, fw fetchWant, longBackoff waiter, partitionStartOffset *genericOffsetReader[int64], refresher metadataRefresher, logger log.Logger) fetchWant { + // Typically franz-go will update its own metadata when it detects a change in brokers. But it's hard to verify this. + // So we force a metadata refresh here to be sure. + // It's ok to call this from multiple fetchers concurrently. franz-go will only be sending one metadata request at a time (whether automatic, periodic, or forced). + // + // Metadata refresh is asynchronous. So even after forcing the refresh we might have outdated metadata. + // Hopefully the backoff that will follow is enough to get the latest metadata. + // If not, the fetcher will end up here again on the next attempt. + triggerMetadataRefresh := refresher.ForceMetadataRefresh + + switch { + case err == nil: + case errors.Is(err, kerr.OffsetOutOfRange): + // We're either consuming from before the first offset or after the last offset. + partitionStart, err := partitionStartOffset.CachedOffset() + logger = log.With(logger, "log_start_offset", partitionStart, "start_offset", fw.startOffset, "end_offset", fw.endOffset) + if err != nil { + level.Error(logger).Log("msg", "failed to find start offset to readjust on OffsetOutOfRange; retrying same records range", "err", err) + break + } + + if fw.startOffset < partitionStart { + // We're too far behind. + if partitionStart >= fw.endOffset { + // The next fetch want is responsible for this range. We set startOffset=endOffset to effectively mark this fetch as complete. + fw.startOffset = fw.endOffset + level.Debug(logger).Log("msg", "we're too far behind aborting fetch") + break + } + // Only some of the offsets of our want are out of range, so let's fast-forward. + fw.startOffset = partitionStart + level.Debug(logger).Log("msg", "part of fetch want is outside of available offsets, adjusted start offset") + } else { + // If the broker is behind or if we are requesting offsets which have not yet been produced, we end up here. + // We set a MaxWaitMillis on fetch requests, but even then there may be no records for some time. + // Wait for a short time to allow the broker to catch up or for new records to be produced. + level.Debug(logger).Log("msg", "offset out of range; waiting for new records to be produced") + } + case errors.Is(err, kerr.TopicAuthorizationFailed): + longBackoff.Wait() + case errors.Is(err, kerr.UnknownTopicOrPartition): + longBackoff.Wait() + case errors.Is(err, kerr.UnsupportedCompressionType): + level.Error(logger).Log("msg", "received UNSUPPORTED_COMPRESSION_TYPE from kafka; this shouldn't happen; please report this as a bug", "err", err) + longBackoff.Wait() // this shouldn't happen - only happens when the request version was under 10, but we always use 13 - log error and backoff - we can't afford to lose records + case errors.Is(err, kerr.UnsupportedVersion): + level.Error(logger).Log("msg", "received UNSUPPORTED_VERSION from kafka; the Kafka cluster is probably too old", "err", err) + longBackoff.Wait() // in this case our client is too old, not much we can do. This will probably continue logging the error until someone upgrades their Kafka cluster. + case errors.Is(err, kerr.KafkaStorageError): + longBackoff.Wait() // server-side error, effectively same as HTTP 500 + case errors.Is(err, kerr.UnknownTopicID): + longBackoff.Wait() // Maybe it wasn't created by the producers yet. + case errors.Is(err, kerr.OffsetMovedToTieredStorage): + level.Error(logger).Log("msg", "received OFFSET_MOVED_TO_TIERED_STORAGE from kafka; this shouldn't happen; please report this as a bug", "err", err) + longBackoff.Wait() // This should be only intra-broker error, and we shouldn't get it. + case errors.Is(err, kerr.NotLeaderForPartition): + // We're asking a broker which is no longer the leader. For a partition. We should refresh our metadata and try again. + triggerMetadataRefresh() + longBackoff.Wait() + case errors.Is(err, kerr.ReplicaNotAvailable): + // Maybe the replica hasn't replicated the log yet, or it is no longer a replica for this partition. + // We should refresh and try again with a leader or replica which is up to date. + triggerMetadataRefresh() + longBackoff.Wait() + case errors.Is(err, kerr.UnknownLeaderEpoch): + // Maybe there's an ongoing election. We should refresh our metadata and try again with a leader in the current epoch. + triggerMetadataRefresh() + longBackoff.Wait() + case errors.Is(err, kerr.FencedLeaderEpoch): + // We missed a new epoch (leader election). We should refresh our metadata and try again with a leader in the current epoch. + triggerMetadataRefresh() + longBackoff.Wait() + case errors.Is(err, kerr.LeaderNotAvailable): + // This isn't listed in the possible errors in franz-go, but Apache Kafka returns it when the partition has no leader. + triggerMetadataRefresh() + longBackoff.Wait() + case errors.Is(err, errUnknownPartitionLeader): + triggerMetadataRefresh() + longBackoff.Wait() + case errors.Is(err, &kgo.ErrFirstReadEOF{}): + longBackoff.Wait() + + default: + level.Error(logger).Log("msg", "received an error we're not prepared to handle; this shouldn't happen; please report this as a bug", "err", err) + longBackoff.Wait() + } + return fw +} diff --git a/pkg/storage/ingest/fetcher_test.go b/pkg/storage/ingest/fetcher_test.go new file mode 100644 index 00000000000..b4403551b76 --- /dev/null +++ b/pkg/storage/ingest/fetcher_test.go @@ -0,0 +1,654 @@ +package ingest + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/grafana/dskit/services" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/twmb/franz-go/pkg/kerr" + "github.com/twmb/franz-go/pkg/kgo" + "go.uber.org/atomic" + + "github.com/grafana/mimir/pkg/util/testkafka" +) + +func TestHandleKafkaFetchErr(t *testing.T) { + logger := log.NewNopLogger() + + tests := map[string]struct { + err error + lso int64 + fw fetchWant + + expectedFw fetchWant + expectedBackoff bool + expectedMetadataRefresh bool + }{ + "no error": { + err: nil, + lso: 1, + fw: fetchWant{ + startOffset: 1, + endOffset: 5, + }, + expectedFw: fetchWant{ + startOffset: 1, + endOffset: 5, + }, + }, + "offset out of range - fetching slightly before start": { + err: kerr.OffsetOutOfRange, + lso: 5, + fw: fetchWant{ + startOffset: 4, + endOffset: 10, + }, + expectedFw: fetchWant{ + startOffset: 5, + endOffset: 10, + }, + }, + "offset out of range - fetching completely outside of available offsets": { + err: kerr.OffsetOutOfRange, + lso: 5, + fw: fetchWant{ + startOffset: 1, + endOffset: 3, + }, + expectedFw: fetchWant{ + startOffset: 3, + endOffset: 3, + }, + }, + "recoverable error": { + err: kerr.KafkaStorageError, + lso: -1, // unknown + fw: fetchWant{ + startOffset: 11, + endOffset: 15, + }, + expectedFw: fetchWant{ + startOffset: 11, + endOffset: 15, + }, + expectedBackoff: true, + }, + "NotLeaderForPartition": { + err: kerr.NotLeaderForPartition, + lso: 5, + fw: fetchWant{ + startOffset: 11, + endOffset: 15, + }, + expectedFw: fetchWant{ + startOffset: 11, + endOffset: 15, + }, + expectedBackoff: true, + expectedMetadataRefresh: true, + }, + "ReplicaNotAvailable": { + err: kerr.ReplicaNotAvailable, + lso: 5, + fw: fetchWant{ + startOffset: 11, + endOffset: 15, + }, + expectedFw: fetchWant{ + startOffset: 11, + endOffset: 15, + }, + expectedBackoff: true, + expectedMetadataRefresh: true, + }, + "UnknownLeaderEpoch": { + err: kerr.UnknownLeaderEpoch, + lso: 5, + fw: fetchWant{ + startOffset: 11, + endOffset: 15, + }, + expectedFw: fetchWant{ + startOffset: 11, + endOffset: 15, + }, + expectedBackoff: true, + expectedMetadataRefresh: true, + }, + "FencedLeaderEpoch": { + err: kerr.FencedLeaderEpoch, + lso: 5, + fw: fetchWant{ + startOffset: 11, + endOffset: 15, + }, + expectedFw: fetchWant{ + startOffset: 11, + endOffset: 15, + }, + expectedBackoff: true, + expectedMetadataRefresh: true, + }, + "LeaderNotAvailable": { + err: kerr.LeaderNotAvailable, + lso: 5, + fw: fetchWant{ + startOffset: 11, + endOffset: 15, + }, + expectedFw: fetchWant{ + startOffset: 11, + endOffset: 15, + }, + expectedBackoff: true, + expectedMetadataRefresh: true, + }, + "errUnknownPartitionLeader": { + err: errUnknownPartitionLeader, + lso: 5, + fw: fetchWant{ + startOffset: 11, + endOffset: 15, + }, + expectedFw: fetchWant{ + startOffset: 11, + endOffset: 15, + }, + expectedBackoff: true, + expectedMetadataRefresh: true, + }, + } + + for testName, testCase := range tests { + t.Run(testName, func(t *testing.T) { + waitedBackoff := false + backoff := waiterFunc(func() { waitedBackoff = true }) + refreshed := false + refresher := refresherFunc(func() { refreshed = true }) + + offsetR := newGenericOffsetReader(func(_ context.Context) (int64, error) { + return testCase.lso, nil + }, time.Millisecond, logger) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), offsetR)) + t.Cleanup(func() { + require.NoError(t, services.StopAndAwaitTerminated(context.Background(), offsetR)) + }) + + actualFw := handleKafkaFetchErr(testCase.err, testCase.fw, backoff, offsetR, refresher, logger) + assert.Equal(t, testCase.expectedFw, actualFw) + assert.Equal(t, testCase.expectedBackoff, waitedBackoff) + assert.Equal(t, testCase.expectedMetadataRefresh, refreshed) + }) + } +} + +func TestConcurrentFetchers(t *testing.T) { + const ( + topicName = "test-topic" + partitionID = 1 + recordsPerFetch = 3 + concurrency = 2 + ) + + t.Run("respect context cancellation", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + _, clusterAddr := testkafka.CreateCluster(t, partitionID+1, topicName) + client := newKafkaProduceClient(t, clusterAddr) + + fetchers := createConcurrentFetchers(ctx, t, client, topicName, partitionID, 0, concurrency, recordsPerFetch) + + // This should not block forever now + fetches, fetchCtx := fetchers.PollFetches(ctx) + + assert.Zero(t, fetches.NumRecords()) + assert.Error(t, fetchCtx.Err(), "Expected context to be cancelled") + }) + + t.Run("cold replay", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + _, clusterAddr := testkafka.CreateCluster(t, partitionID+1, topicName) + client := newKafkaProduceClient(t, clusterAddr) + + // Produce some records before starting the fetchers + for i := 0; i < 5; i++ { + produceRecord(ctx, t, client, topicName, partitionID, []byte(fmt.Sprintf("record-%d", i))) + } + + fetchers := createConcurrentFetchers(ctx, t, client, topicName, partitionID, 0, concurrency, recordsPerFetch) + + fetches, _ := fetchers.PollFetches(ctx) + assert.Equal(t, fetches.NumRecords(), 5) + }) + + t.Run("fetch records produced after startup", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + _, clusterAddr := testkafka.CreateCluster(t, partitionID+1, topicName) + client := newKafkaProduceClient(t, clusterAddr) + + fetchers := createConcurrentFetchers(ctx, t, client, topicName, partitionID, 0, concurrency, recordsPerFetch) + + // Produce some records after starting the fetchers + for i := 0; i < 3; i++ { + produceRecord(ctx, t, client, topicName, partitionID, []byte(fmt.Sprintf("record-%d", i))) + } + + fetches, _ := fetchers.PollFetches(ctx) + assert.Equal(t, fetches.NumRecords(), 3) + }) + + t.Run("slow processing of fetches", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + _, clusterAddr := testkafka.CreateCluster(t, partitionID+1, topicName) + client := newKafkaProduceClient(t, clusterAddr) + + fetchers := createConcurrentFetchers(ctx, t, client, topicName, partitionID, 0, concurrency, recordsPerFetch) + + // Produce some records + for i := 0; i < 5; i++ { + produceRecord(ctx, t, client, topicName, partitionID, []byte(fmt.Sprintf("record-%d", i))) + } + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + consumedRecords := 0 + for consumedRecords < 10 { + fetches, _ := fetchers.PollFetches(ctx) + time.Sleep(1000 * time.Millisecond) // Simulate slow processing + consumedRecords += fetches.NumRecords() + } + assert.Equal(t, 10, consumedRecords) + }() + + // Produce more records while processing is slow + for i := 5; i < 10; i++ { + produceRecord(ctx, t, client, topicName, partitionID, []byte(fmt.Sprintf("record-%d", i))) + } + + wg.Wait() + }) + + t.Run("fast processing of fetches", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + _, clusterAddr := testkafka.CreateCluster(t, partitionID+1, topicName) + client := newKafkaProduceClient(t, clusterAddr) + + fetchers := createConcurrentFetchers(ctx, t, client, topicName, partitionID, 0, concurrency, recordsPerFetch) + + // Produce some records + for i := 0; i < 10; i++ { + produceRecord(ctx, t, client, topicName, partitionID, []byte(fmt.Sprintf("record-%d", i))) + } + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + consumedRecords := 0 + for consumedRecords < 10 { + fetches, _ := fetchers.PollFetches(ctx) + consumedRecords += fetches.NumRecords() + // no processing delay + } + assert.Equal(t, 10, consumedRecords) + }() + + wg.Wait() + }) + + t.Run("fetch with different concurrency levels", func(t *testing.T) { + for _, concurrency := range []int{1, 2, 4} { + t.Run(fmt.Sprintf("concurrency-%d", concurrency), func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + _, clusterAddr := testkafka.CreateCluster(t, partitionID+1, topicName) + client := newKafkaProduceClient(t, clusterAddr) + + fetchers := createConcurrentFetchers(ctx, t, client, topicName, partitionID, 0, concurrency, 2) + + // Produce some records + for i := 0; i < 20; i++ { + produceRecord(ctx, t, client, topicName, partitionID, []byte(fmt.Sprintf("record-%d", i))) + } + + var totalRecords int + for totalRecords < 20 { + fetches, _ := fetchers.PollFetches(ctx) + totalRecords += fetches.NumRecords() + } + + assert.Equal(t, 20, totalRecords) + }) + } + }) + + t.Run("start from mid-stream offset", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + _, clusterAddr := testkafka.CreateCluster(t, partitionID+1, topicName) + client := newKafkaProduceClient(t, clusterAddr) + + // Produce some initial records + for i := 0; i < 5; i++ { + produceRecord(ctx, t, client, topicName, partitionID, []byte(fmt.Sprintf("record-%d", i))) + } + + // Get the offset of the last produced record + lastOffset := produceRecord(ctx, t, client, topicName, partitionID, []byte("last-initial-record")) + + // Start fetchers from the offset after the initial records + fetchers := createConcurrentFetchers(ctx, t, client, topicName, partitionID, lastOffset-1, concurrency, recordsPerFetch) + + // Produce some more records + for i := 0; i < 3; i++ { + produceRecord(ctx, t, client, topicName, partitionID, []byte(fmt.Sprintf("new-record-%d", i))) + } + + const expectedRecords = 5 + fetchedRecordsContents := make([]string, 0, expectedRecords) + for len(fetchedRecordsContents) < expectedRecords { + fetches, _ := fetchers.PollFetches(ctx) + fetches.EachRecord(func(r *kgo.Record) { + fetchedRecordsContents = append(fetchedRecordsContents, string(r.Value)) + }) + } + + assert.Equal(t, []string{ + "record-4", + "last-initial-record", + "new-record-0", + "new-record-1", + "new-record-2", + }, fetchedRecordsContents) + }) + + t.Run("synchronous produce and fetch", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + _, clusterAddr := testkafka.CreateCluster(t, partitionID+1, topicName) + client := newKafkaProduceClient(t, clusterAddr) + + fetchers := createConcurrentFetchers(ctx, t, client, topicName, partitionID, 0, concurrency, recordsPerFetch) + + for round := 0; round < 3; round++ { + t.Log("starting round", round) + const recordsPerRound = 4 + // Produce a few records + expectedRecords := make([]string, 0, recordsPerRound) + for i := 0; i < recordsPerRound; i++ { + rec := []byte(fmt.Sprintf("round-%d-record-%d", round, i)) + expectedRecords = append(expectedRecords, string(rec)) + producedOffset := produceRecord(ctx, t, client, topicName, partitionID, rec) + t.Log("produced", producedOffset, string(rec)) + } + + // Poll for fetches and verify + fetchedRecords := make([]string, 0, recordsPerRound) + for len(fetchedRecords) < recordsPerRound { + fetches, _ := fetchers.PollFetches(ctx) + fetches.EachRecord(func(r *kgo.Record) { + fetchedRecords = append(fetchedRecords, string(r.Value)) + t.Log("fetched", r.Offset, string(r.Value)) + }) + } + + // Verify fetched records + assert.Equal(t, expectedRecords, fetchedRecords, "Fetched records in round %d do not match expected", round) + } + }) + + t.Run("concurrency can be updated", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + rec1 := []byte("record-1") + rec2 := []byte("record-2") + rec3 := []byte("record-3") + + _, clusterAddr := testkafka.CreateCluster(t, partitionID+1, topicName) + client := newKafkaProduceClient(t, clusterAddr) + fetchers := createConcurrentFetchers(ctx, t, client, topicName, partitionID, 0, concurrency, recordsPerFetch) + + produceRecordAndAssert := func(record []byte) { + producedOffset := produceRecord(ctx, t, client, topicName, partitionID, record) + // verify that the record is fetched. + + var fetches kgo.Fetches + require.Eventually(t, func() bool { + fetches, _ = fetchers.PollFetches(ctx) + return len(fetches.Records()) == 1 + }, 5*time.Second, 100*time.Millisecond) + + require.Equal(t, fetches.Records()[0].Value, record) + require.Equal(t, fetches.Records()[0].Offset, producedOffset) + } + + // Ensure that the fetchers work with the initial concurrency. + produceRecordAndAssert(rec1) + + // Now, update the concurrency. + fetchers.Update(ctx, 1, 1) + + // Ensure that the fetchers work with the updated concurrency. + produceRecordAndAssert(rec2) + + // Update and verify again. + fetchers.Update(ctx, 10, 10) + produceRecordAndAssert(rec3) + + }) + + t.Run("update concurrency with continuous production", func(t *testing.T) { + t.Parallel() + const ( + testDuration = 10 * time.Second + produceInterval = 10 * time.Millisecond + initialConcurrency = 2 + ) + + ctx, cancel := context.WithTimeout(context.Background(), testDuration) + defer cancel() + + produceCtx, cancelProduce := context.WithTimeout(context.Background(), testDuration) + defer cancelProduce() + + _, clusterAddr := testkafka.CreateCluster(t, partitionID+1, topicName) + client := newKafkaProduceClient(t, clusterAddr) + + producedCount := atomic.NewInt64(0) + + // Start producing records continuously + go func() { + ticker := time.NewTicker(produceInterval) + defer ticker.Stop() + + for { + select { + case <-produceCtx.Done(): + return + case <-ticker.C: + count := producedCount.Inc() + record := fmt.Sprintf("record-%d", count) + produceRecord(produceCtx, t, client, topicName, partitionID, []byte(record)) + } + } + }() + + fetchers := createConcurrentFetchers(ctx, t, client, topicName, partitionID, 0, initialConcurrency, recordsPerFetch) + + fetchedRecords := make([]*kgo.Record, 0) + fetchedCount := atomic.NewInt64(0) + + fetchRecords := func(duration time.Duration) { + deadline := time.Now().Add(duration) + for time.Now().Before(deadline) { + fetches, _ := fetchers.PollFetches(ctx) + fetches.EachRecord(func(r *kgo.Record) { + fetchedRecords = append(fetchedRecords, r) + fetchedCount.Inc() + }) + } + } + + // Initial fetch with starting concurrency + fetchRecords(2 * time.Second) + initialFetched := fetchedCount.Load() + + // Update to higher concurrency + fetchers.Update(ctx, 4, recordsPerFetch) + fetchRecords(3 * time.Second) + highConcurrencyFetched := fetchedCount.Load() - initialFetched + + // Update to lower concurrency + fetchers.Update(ctx, 1, recordsPerFetch) + fetchRecords(3 * time.Second) + + cancelProduce() + // Produce everything that's left now. + fetchRecords(time.Second) + totalProduced := producedCount.Load() + totalFetched := fetchedCount.Load() + + // Verify fetched records + assert.True(t, totalFetched > 0, "Expected to fetch some records") + assert.Equal(t, totalFetched, totalProduced, "Should not fetch more records than produced") + assert.True(t, highConcurrencyFetched > initialFetched, "Expected to fetch more records with higher concurrency") + + // Verify record contents + for i, record := range fetchedRecords { + expectedContent := fmt.Sprintf("record-%d", i+1) + assert.Equal(t, expectedContent, string(record.Value), + "Record %d has unexpected content: %s", i, string(record.Value)) + } + + // Log some statistics + t.Logf("Total produced: %d, Total fetched: %d", totalProduced, totalFetched) + t.Logf("Fetched with initial concurrency: %d", initialFetched) + t.Logf("Fetched with high concurrency: %d", highConcurrencyFetched) + t.Logf("Fetched with low concurrency: %d", totalFetched-initialFetched-highConcurrencyFetched) + }) + + t.Run("consume from end and update immediately", func(t *testing.T) { + t.Parallel() + const ( + initialRecords = 100 + additionalRecords = 50 + initialConcurrency = 2 + updatedConcurrency = 4 + ) + + ctx := context.Background() + + _, clusterAddr := testkafka.CreateCluster(t, partitionID+1, topicName) + client := newKafkaProduceClient(t, clusterAddr) + + // Produce initial records + for i := 0; i < initialRecords; i++ { + record := fmt.Sprintf("initial-record-%d", i+1) + produceRecord(ctx, t, client, topicName, partitionID, []byte(record)) + } + + // Start concurrent fetchers from the end + fetchers := createConcurrentFetchers(ctx, t, client, topicName, partitionID, kafkaOffsetEnd, initialConcurrency, recordsPerFetch) + + // Immediately update concurrency + fetchers.Update(ctx, updatedConcurrency, recordsPerFetch) + + // Produce additional records + for i := 0; i < additionalRecords; i++ { + record := fmt.Sprintf("additional-record-%d", i+1) + produceRecord(ctx, t, client, topicName, partitionID, []byte(record)) + } + + fetchedRecords := make([]*kgo.Record, 0, additionalRecords) + fetchDeadline := time.Now().Add(5 * time.Second) + + // Fetch records + for len(fetchedRecords) < additionalRecords && time.Now().Before(fetchDeadline) { + fetches, _ := fetchers.PollFetches(ctx) + fetches.EachRecord(func(r *kgo.Record) { + fetchedRecords = append(fetchedRecords, r) + }) + } + + // Verify fetched records + assert.LessOrEqual(t, len(fetchedRecords), additionalRecords, + "Should not fetch more records than produced after start") + + // Verify record contents + for i, record := range fetchedRecords { + expectedContent := fmt.Sprintf("additional-record-%d", i+1) + assert.Equal(t, expectedContent, string(record.Value), + "Record %d has unexpected content: %s", i, string(record.Value)) + } + + // Log some statistics + t.Logf("Total records produced: %d", initialRecords+additionalRecords) + t.Logf("Records produced after start: %d", additionalRecords) + t.Logf("Records fetched: %d", len(fetchedRecords)) + }) +} + +func createConcurrentFetchers(ctx context.Context, t *testing.T, client *kgo.Client, topic string, partition int32, startOffset int64, concurrency, recordsPerFetch int) *concurrentFetchers { + logger := log.NewNopLogger() + reg := prometheus.NewPedanticRegistry() + metrics := newReaderMetrics(partition, reg) + + // This instantiates the fields of kprom. + // This is usually done by franz-go, but since now we use the metrics ourselves, we need to instantiate the metrics ourselves. + metrics.kprom.OnNewClient(client) + + offsetReader := newPartitionOffsetClient(client, topic, reg, logger) + + startOffsetsReader := newGenericOffsetReader(func(ctx context.Context) (int64, error) { + return offsetReader.FetchPartitionStartOffset(ctx, partition) + }, time.Second, logger) + + f, err := newConcurrentFetchers( + ctx, + client, + logger, + topic, + partition, + startOffset, + concurrency, + recordsPerFetch, + false, + time.Second, // same order of magnitude as the real one (defaultMinBytesMaxWaitTime), but faster for tests + offsetReader, + startOffsetsReader, + &metrics, + ) + require.NoError(t, err) + t.Cleanup(f.Stop) + + return f +} + +type waiterFunc func() + +func (w waiterFunc) Wait() { w() } + +type refresherFunc func() + +func (r refresherFunc) ForceMetadataRefresh() { r() } diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go index 4cbd9df43b0..121c0362611 100644 --- a/pkg/storage/ingest/reader.go +++ b/pkg/storage/ingest/reader.go @@ -14,6 +14,7 @@ import ( "github.com/grafana/dskit/backoff" "github.com/grafana/dskit/multierror" "github.com/grafana/dskit/services" + "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -33,11 +34,17 @@ const ( // kafkaOffsetEnd is a special offset value that means the end of the partition. kafkaOffsetEnd = int64(-1) + + // defaultMinBytesMaxWaitTime is the time the Kafka broker can wait for MinBytes to be filled. + // This is usually used when there aren't enough records available to fulfil MinBytes, so the broker waits for more records to be produced. + // Warpstream clamps this between 5s and 30s. + defaultMinBytesMaxWaitTime = 5 * time.Second ) var ( errWaitStrongReadConsistencyTimeoutExceeded = errors.Wrap(context.DeadlineExceeded, "wait strong read consistency timeout exceeded") errWaitTargetLagDeadlineExceeded = errors.Wrap(context.DeadlineExceeded, "target lag deadline exceeded") + errUnknownPartitionLeader = fmt.Errorf("unknown partition leader") ) type record struct { @@ -48,7 +55,9 @@ type record struct { } type recordConsumer interface { - // consume should return an error only if there is a recoverable error. Returning an error will cause consumption to slow down. + // Consume consumes the given records in the order they are provided. We need this as samples that will be ingested, + // are also needed to be in order to avoid ingesting samples out of order. + // The function is expected to be idempotent and incremental, meaning that it can be called multiple times with the same records, and it won't respond to context cancellation. consume(context.Context, []record) error } @@ -56,11 +65,13 @@ type PartitionReader struct { services.Service dependencies *services.Manager - kafkaCfg KafkaConfig - partitionID int32 - consumerGroup string + kafkaCfg KafkaConfig + partitionID int32 + consumerGroup string + concurrentFetchersMinBytesMaxWaitTime time.Duration - client *kgo.Client + client *kgo.Client + fetcher fetcher consumer recordConsumer metrics readerMetrics @@ -76,6 +87,16 @@ type PartitionReader struct { reg prometheus.Registerer } +// Stop implements fetcher +func (r *PartitionReader) Stop() { + // Given the partition reader has no concurrency it doesn't support stopping anything. +} + +// Update implements fetcher +func (r *PartitionReader) Update(_ context.Context, _, _ int) { + // Given the partition reader has no concurrency it doesn't support updates. +} + func NewPartitionReaderForPusher(kafkaCfg KafkaConfig, partitionID int32, instanceID string, pusher Pusher, logger log.Logger, reg prometheus.Registerer) (*PartitionReader, error) { consumer := newPusherConsumer(pusher, util_log.NewSampler(kafkaCfg.FallbackClientErrorSampleRate), reg, logger) return newPartitionReader(kafkaCfg, partitionID, instanceID, consumer, logger, reg) @@ -83,14 +104,15 @@ func NewPartitionReaderForPusher(kafkaCfg KafkaConfig, partitionID int32, instan func newPartitionReader(kafkaCfg KafkaConfig, partitionID int32, instanceID string, consumer recordConsumer, logger log.Logger, reg prometheus.Registerer) (*PartitionReader, error) { r := &PartitionReader{ - kafkaCfg: kafkaCfg, - partitionID: partitionID, - consumer: consumer, - consumerGroup: kafkaCfg.GetConsumerGroup(instanceID, partitionID), - metrics: newReaderMetrics(partitionID, reg), - consumedOffsetWatcher: newPartitionOffsetWatcher(), - logger: log.With(logger, "partition", partitionID), - reg: reg, + kafkaCfg: kafkaCfg, + partitionID: partitionID, + consumer: consumer, + consumerGroup: kafkaCfg.GetConsumerGroup(instanceID, partitionID), + metrics: newReaderMetrics(partitionID, reg), + consumedOffsetWatcher: newPartitionOffsetWatcher(), + concurrentFetchersMinBytesMaxWaitTime: defaultMinBytesMaxWaitTime, + logger: log.With(logger, "partition", partitionID), + reg: reg, } r.Service = services.NewBasicService(r.start, r.run, r.stop) @@ -125,9 +147,21 @@ func (r *PartitionReader) start(ctx context.Context) (returnErr error) { } r.committer = newPartitionCommitter(r.kafkaCfg, kadm.NewClient(r.client), r.partitionID, r.consumerGroup, r.logger, r.reg) - r.offsetReader = newPartitionOffsetReader(r.client, r.kafkaCfg.Topic, r.partitionID, r.kafkaCfg.LastProducedOffsetPollInterval, r.reg, r.logger) + offsetsClient := newPartitionOffsetClient(r.client, r.kafkaCfg.Topic, r.reg, r.logger) + + // It's ok to have the start offset slightly outdated. + // We only need this offset accurate if we fall behind or if we start and the log gets truncated from beneath us. + // In both cases we should recover after receiving one updated value. + // In the more common case where this offset is used when we're fetching from after the end, there we don't need an accurate value. + const startOffsetReaderRefreshDuration = 10 * time.Second + getPartitionStart := func(ctx context.Context) (int64, error) { + return offsetsClient.FetchPartitionStartOffset(ctx, r.partitionID) + } + startOffsetReader := newGenericOffsetReader(getPartitionStart, startOffsetReaderRefreshDuration, r.logger) + + r.offsetReader = newPartitionOffsetReaderWithOffsetClient(offsetsClient, r.partitionID, r.kafkaCfg.LastProducedOffsetPollInterval, r.logger) - r.dependencies, err = services.NewManager(r.committer, r.offsetReader, r.consumedOffsetWatcher) + r.dependencies, err = services.NewManager(r.committer, r.offsetReader, r.consumedOffsetWatcher, startOffsetReader) if err != nil { return errors.Wrap(err, "creating service manager") } @@ -136,6 +170,16 @@ func (r *PartitionReader) start(ctx context.Context) (returnErr error) { return errors.Wrap(err, "starting service manager") } + if r.kafkaCfg.StartupFetchConcurrency > 0 { + f, err := newConcurrentFetchers(ctx, r.client, r.logger, r.kafkaCfg.Topic, r.partitionID, startOffset, r.kafkaCfg.StartupFetchConcurrency, r.kafkaCfg.StartupRecordsPerFetch, r.kafkaCfg.UseCompressedBytesAsFetchMaxBytes, r.concurrentFetchersMinBytesMaxWaitTime, offsetsClient, startOffsetReader, &r.metrics) + if err != nil { + return errors.Wrap(err, "creating concurrent fetchers during startup") + } + r.fetcher = f + } else { + r.fetcher = r + } + // Enforce the max consumer lag (if enabled). if targetLag, maxLag := r.kafkaCfg.TargetConsumerLagAtStartup, r.kafkaCfg.MaxConsumerLagAtStartup; targetLag > 0 && maxLag > 0 { if startOffset != kafkaOffsetEnd { @@ -163,6 +207,10 @@ func (r *PartitionReader) stopDependencies() error { } } + if r.fetcher != nil { + r.fetcher.Stop() + } + if r.client != nil { r.client.Close() } @@ -171,6 +219,8 @@ func (r *PartitionReader) stopDependencies() error { } func (r *PartitionReader) run(ctx context.Context) error { + r.fetcher.Update(ctx, r.kafkaCfg.OngoingFetchConcurrency, r.kafkaCfg.OngoingRecordsPerFetch) + for ctx.Err() == nil { err := r.processNextFetches(ctx, r.metrics.receiveDelayWhenRunning) if err != nil && !errors.Is(err, context.Canceled) { @@ -183,7 +233,9 @@ func (r *PartitionReader) run(ctx context.Context) error { } func (r *PartitionReader) processNextFetches(ctx context.Context, delayObserver prometheus.Observer) error { - fetches := r.pollFetches(ctx) + fetches, fetchCtx := r.fetcher.PollFetches(ctx) + // Propagate the fetching span to consuming the records. + ctx = opentracing.ContextWithSpan(ctx, opentracing.SpanFromContext(fetchCtx)) r.recordFetchesMetrics(fetches, delayObserver) r.logFetchErrors(fetches) fetches = filterOutErrFetches(fetches) @@ -212,14 +264,16 @@ func (r *PartitionReader) processNextFetchesUntilTargetOrMaxLagHonored(ctx conte // If the target lag hasn't been reached with the first attempt (which stops once at least the max lag // is honored) then we try to reach the (lower) target lag within a fixed time (best-effort). - // The timeout is equal to the max lag. This is done because we expect at least a 2x replay speed - // from Kafka (which means at most it takes 1s to ingest 2s of data): assuming new data is continuously - // written to the partition, we give the reader maxLag time to replay the backlog + ingest the new data - // written in the meanwhile. + // The timeout is equal to the max lag x2. This is done because the ongoing fetcher config reduces lag more slowly, + // but is better at keeping up with the partition and minimizing e2e lag. func() (time.Duration, error) { - timedCtx, cancel := context.WithTimeoutCause(ctx, maxLag, errWaitTargetLagDeadlineExceeded) + timedCtx, cancel := context.WithTimeoutCause(ctx, 2*maxLag, errWaitTargetLagDeadlineExceeded) defer cancel() + // Don't use timedCtx because we want the fetchers to continue running + // At this point we're close enough to the end of the partition that we should switch to the more responsive fetcher. + r.fetcher.Update(ctx, r.kafkaCfg.OngoingFetchConcurrency, r.kafkaCfg.OngoingRecordsPerFetch) + return r.processNextFetchesUntilLagHonored(timedCtx, targetLag, logger) }, @@ -388,6 +442,9 @@ func (r *PartitionReader) enqueueCommit(fetches kgo.Fetches) { } func (r *PartitionReader) consumeFetches(ctx context.Context, fetches kgo.Fetches) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "PartitionReader.consumeFetches") + defer span.Finish() + if fetches.NumRecords() == 0 { return nil } @@ -414,6 +471,12 @@ func (r *PartitionReader) consumeFetches(ctx context.Context, fetches kgo.Fetche MaxBackoff: 2 * time.Second, MaxRetries: 0, // retry forever }) + defer func(consumeStart time.Time) { + r.metrics.consumeLatency.Observe(time.Since(consumeStart).Seconds()) + }(time.Now()) + + logger := spanlogger.FromContext(ctx, r.logger) + for boff.Ongoing() { // If the PartitionReader is stopping and the ctx was cancelled, we don't want to interrupt the in-flight // processing midway. Instead, we let it finish, assuming it'll succeed. @@ -421,13 +484,12 @@ func (r *PartitionReader) consumeFetches(ctx context.Context, fetches kgo.Fetche // There is an edge-case when the processing gets stuck and doesn't let the stopping process. In such a case, // we expect the infrastructure (e.g. k8s) to eventually kill the process. consumeCtx := context.WithoutCancel(ctx) - consumeStart := time.Now() err := r.consumer.consume(consumeCtx, records) - r.metrics.consumeLatency.Observe(time.Since(consumeStart).Seconds()) if err == nil { + level.Debug(logger).Log("msg", "closing consumer after successful consumption") return nil } - level.Error(r.logger).Log( + level.Error(logger).Log( "msg", "encountered error while ingesting data from Kafka; should retry", "err", err, "record_min_offset", minOffset, @@ -648,11 +710,11 @@ func (r *PartitionReader) waitReadConsistency(ctx context.Context, withOffset bo return err } -func (r *PartitionReader) pollFetches(ctx context.Context) kgo.Fetches { +func (r *PartitionReader) PollFetches(ctx context.Context) (result kgo.Fetches, fetchContext context.Context) { defer func(start time.Time) { r.metrics.fetchWaitDuration.Observe(time.Since(start).Seconds()) }(time.Now()) - return r.client.PollFetches(ctx) + return r.client.PollFetches(ctx), ctx } type partitionCommitter struct { @@ -792,6 +854,7 @@ type readerMetrics struct { fetchesErrors prometheus.Counter fetchesTotal prometheus.Counter fetchWaitDuration prometheus.Histogram + fetchedDiscardedRecordBytes prometheus.Counter strongConsistencyInstrumentation *StrongReadConsistencyInstrumentation[struct{}] lastConsumedOffset prometheus.Gauge consumeLatency prometheus.Histogram @@ -841,6 +904,10 @@ func newReaderMetrics(partitionID int32, reg prometheus.Registerer) readerMetric Help: "How long a consumer spent waiting for a batch of records from the Kafka client. If fetching is faster than processing, then this will be close to 0.", NativeHistogramBucketFactor: 1.1, }), + fetchedDiscardedRecordBytes: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "cortex_ingest_storage_reader_fetched_discarded_bytes_total", + Help: "Total number of uncompressed bytes of records discarded from because they were already consumed. A higher rate means that the concurrent fetching estimations are less accurate.", + }), consumeLatency: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ Name: "cortex_ingest_storage_reader_records_batch_process_duration_seconds", Help: "How long a consumer spent processing a batch of records from Kafka.", diff --git a/pkg/storage/ingest/reader_client.go b/pkg/storage/ingest/reader_client.go index 65331aa8ab0..628fa545fac 100644 --- a/pkg/storage/ingest/reader_client.go +++ b/pkg/storage/ingest/reader_client.go @@ -3,8 +3,6 @@ package ingest import ( - "time" - "github.com/go-kit/log" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -18,15 +16,17 @@ func NewKafkaReaderClient(cfg KafkaConfig, metrics *kprom.Metrics, logger log.Lo opts = append(opts, commonKafkaClientOptions(cfg, metrics, logger)...) opts = append(opts, + // Fetch configuration is unused when using concurrent fetchers. kgo.FetchMinBytes(1), kgo.FetchMaxBytes(fetchMaxBytes), - kgo.FetchMaxWait(5*time.Second), + kgo.FetchMaxWait(defaultMinBytesMaxWaitTime), kgo.FetchMaxPartitionBytes(50_000_000), // BrokerMaxReadBytes sets the maximum response size that can be read from // Kafka. This is a safety measure to avoid OOMing on invalid responses. // franz-go recommendation is to set it 2x FetchMaxBytes. - kgo.BrokerMaxReadBytes(2*fetchMaxBytes), + // With concurrent fetchers we set FetchMaxBytes and FetchMaxPartitionBytes on a per-request basis, so here we put a high enough limit that should work for those requests. + kgo.BrokerMaxReadBytes(1_000_000_000), ) client, err := kgo.NewClient(opts...) if err != nil { diff --git a/pkg/storage/ingest/reader_test.go b/pkg/storage/ingest/reader_test.go index 14e012486de..dd6790dd546 100644 --- a/pkg/storage/ingest/reader_test.go +++ b/pkg/storage/ingest/reader_test.go @@ -219,7 +219,7 @@ func TestPartitionReader_WaitReadConsistencyUntilLastProducedOffset_And_WaitRead setup := func(t *testing.T, consumer recordConsumer, opts ...readerTestCfgOtp) (*PartitionReader, *kgo.Client, *prometheus.Registry) { reg := prometheus.NewPedanticRegistry() - _, clusterAddr := testkafka.CreateCluster(t, 1, topicName) + _, clusterAddr := testkafka.CreateCluster(t, partitionID+1, topicName) // Configure the reader to poll the "last produced offset" frequently. reader := createAndStartReader(ctx, t, clusterAddr, topicName, partitionID, consumer, @@ -876,6 +876,122 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { } }) + t.Run("should consume partition from start if position=start, and wait until target lag is honored, and then consume some records after lag is honored", func(t *testing.T) { + t.Parallel() + + var ( + cluster, clusterAddr = testkafka.CreateCluster(t, partitionID+1, topicName) + fetchRequestsCount = atomic.NewInt64(0) + fetchShouldFail = atomic.NewBool(false) + consumedRecordsMx sync.Mutex + consumedRecords []string + ) + + consumer := consumerFunc(func(_ context.Context, records []record) error { + consumedRecordsMx.Lock() + defer consumedRecordsMx.Unlock() + + for _, r := range records { + consumedRecords = append(consumedRecords, string(r.content)) + } + return nil + }) + + cluster.ControlKey(int16(kmsg.Fetch), func(kmsg.Request) (kmsg.Response, error, bool) { + cluster.KeepControl() + fetchRequestsCount.Inc() + + if fetchShouldFail.Load() { + return nil, errors.New("mocked error"), true + } + + return nil, nil, false + }) + + // Produce some records. + writeClient := newKafkaProduceClient(t, clusterAddr) + produceRecord(ctx, t, writeClient, topicName, partitionID, []byte("record-1")) + produceRecord(ctx, t, writeClient, topicName, partitionID, []byte("record-2")) + t.Log("produced 2 records") + + // Run the test twice with the same Kafka cluster to show that second time it consumes all records again. + // Reset the test. + fetchShouldFail.Store(true) + fetchRequestsCount.Store(0) + consumedRecordsMx.Lock() + consumedRecords = nil + consumedRecordsMx.Unlock() + + // Create and start the reader. + reg := prometheus.NewPedanticRegistry() + logs := &concurrency.SyncBuffer{} + reader := createReader(t, clusterAddr, topicName, partitionID, consumer, + withConsumeFromPositionAtStartup(consumeFromStart), + withTargetAndMaxConsumerLagAtStartup(time.Second, 2*time.Second), + withRegistry(reg), + withLogger(log.NewLogfmtLogger(logs))) + require.NoError(t, reader.StartAsync(ctx)) + t.Cleanup(func() { + require.NoError(t, services.StopAndAwaitTerminated(ctx, reader)) + }) + + // Wait until the Kafka cluster received few Fetch requests. + test.Poll(t, 5*time.Second, true, func() interface{} { + return fetchRequestsCount.Load() > 2 + }) + + // Since the mocked Kafka cluster is configured to fail any Fetch we expect the reader hasn't + // catched up yet, and it's still in Starting state. + assert.Equal(t, services.Starting, reader.State()) + + // Unblock the Fetch requests. Now they will succeed. + fetchShouldFail.Store(false) + + // We expect the reader to catch up, and then switch to Running state. + test.Poll(t, 5*time.Second, services.Running, func() interface{} { + return reader.State() + }) + + // We expect the reader to have switched to running because target consumer lag has been honored. + assert.Contains(t, logs.String(), "partition reader consumed partition and current lag is lower than configured target consumer lag") + + // We expect the reader to have consumed the partition from start. + test.Poll(t, time.Second, []string{"record-1", "record-2"}, func() interface{} { + consumedRecordsMx.Lock() + defer consumedRecordsMx.Unlock() + return slices.Clone(consumedRecords) + }) + + // We expect the last consumed offset to be tracked in a metric. + test.Poll(t, time.Second, nil, func() interface{} { + return promtest.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_ingest_storage_reader_last_consumed_offset The last offset successfully consumed by the partition reader. Set to -1 if not offset has been consumed yet. + # TYPE cortex_ingest_storage_reader_last_consumed_offset gauge + cortex_ingest_storage_reader_last_consumed_offset{partition="1"} 1 + `), "cortex_ingest_storage_reader_last_consumed_offset") + }) + + produceRecord(ctx, t, writeClient, topicName, partitionID, []byte("record-3")) + produceRecord(ctx, t, writeClient, topicName, partitionID, []byte("record-4")) + t.Log("produced 2 records") + + // We expect the reader to have consumed the partition from start. + test.Poll(t, time.Second, []string{"record-1", "record-2", "record-3", "record-4"}, func() interface{} { + consumedRecordsMx.Lock() + defer consumedRecordsMx.Unlock() + return slices.Clone(consumedRecords) + }) + + // We expect the last consumed offset to be tracked in a metric. + test.Poll(t, time.Second, nil, func() interface{} { + return promtest.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_ingest_storage_reader_last_consumed_offset The last offset successfully consumed by the partition reader. Set to -1 if not offset has been consumed yet. + # TYPE cortex_ingest_storage_reader_last_consumed_offset gauge + cortex_ingest_storage_reader_last_consumed_offset{partition="1"} 3 + `), "cortex_ingest_storage_reader_last_consumed_offset") + }) + }) + t.Run("should consume partition from the timestamp if position=timestamp, and wait until target lag is honored", func(t *testing.T) { t.Parallel() @@ -1733,6 +1849,9 @@ func createReader(t *testing.T, addr string, topicName string, partitionID int32 reader, err := newPartitionReader(cfg.kafka, cfg.partitionID, "test-group", cfg.consumer, cfg.logger, cfg.registry) require.NoError(t, err) + // Reduce the time the fake kafka would wait for new records. Sometimes this blocks startup. + reader.concurrentFetchersMinBytesMaxWaitTime = 500 * time.Millisecond + return reader } diff --git a/pkg/storage/ingest/util.go b/pkg/storage/ingest/util.go index b17a433aeb4..6abde06f73d 100644 --- a/pkg/storage/ingest/util.go +++ b/pkg/storage/ingest/util.go @@ -97,10 +97,7 @@ func commonKafkaClientOptions(cfg KafkaConfig, metrics *kprom.Metrics, logger lo opts = append(opts, kgo.AllowAutoTopicCreation()) } - tracer := kotel.NewTracer( - kotel.TracerPropagator(propagation.NewCompositeTextMapPropagator(onlySampledTraces{propagation.TraceContext{}})), - ) - opts = append(opts, kgo.WithHooks(kotel.NewKotel(kotel.WithTracer(tracer)).Hooks()...)) + opts = append(opts, kgo.WithHooks(kotel.NewKotel(kotel.WithTracer(recordsTracer())).Hooks()...)) if metrics != nil { opts = append(opts, kgo.WithHooks(metrics)) @@ -109,6 +106,10 @@ func commonKafkaClientOptions(cfg KafkaConfig, metrics *kprom.Metrics, logger lo return opts } +func recordsTracer() *kotel.Tracer { + return kotel.NewTracer(kotel.TracerPropagator(propagation.NewCompositeTextMapPropagator(onlySampledTraces{propagation.TraceContext{}}))) +} + // resultPromise is a simple utility to have multiple goroutines waiting for a result from another one. type resultPromise[T any] struct { // done is a channel used to wait the result. Once the channel is closed diff --git a/pkg/storage/ingest/writer_test.go b/pkg/storage/ingest/writer_test.go index 6156a9b4913..ce904e644bf 100644 --- a/pkg/storage/ingest/writer_test.go +++ b/pkg/storage/ingest/writer_test.go @@ -1078,6 +1078,10 @@ func createTestKafkaConfig(clusterAddr, topicName string) KafkaConfig { cfg.Address = clusterAddr cfg.Topic = topicName cfg.WriteTimeout = 2 * time.Second + cfg.StartupFetchConcurrency = 2 + cfg.StartupRecordsPerFetch = 2 + cfg.OngoingFetchConcurrency = 2 + cfg.OngoingRecordsPerFetch = 2 return cfg } diff --git a/vendor/github.com/twmb/franz-go/pkg/kgo/client.go b/vendor/github.com/twmb/franz-go/pkg/kgo/client.go index 775a22e6ee2..197c39336da 100644 --- a/vendor/github.com/twmb/franz-go/pkg/kgo/client.go +++ b/vendor/github.com/twmb/franz-go/pkg/kgo/client.go @@ -78,8 +78,7 @@ type Client struct { producer producer consumer consumer - compressor *compressor - decompressor *decompressor + compressor *compressor coordinatorsMu sync.Mutex coordinators map[coordinatorKey]*coordinatorLoad @@ -482,8 +481,7 @@ func NewClient(opts ...Opt) (*Client, error) { bufPool: newBufPool(), prsPool: newPrsPool(), - compressor: compressor, - decompressor: newDecompressor(), + compressor: compressor, coordinators: make(map[coordinatorKey]*coordinatorLoad), diff --git a/vendor/github.com/twmb/franz-go/pkg/kgo/compression.go b/vendor/github.com/twmb/franz-go/pkg/kgo/compression.go index fe8ad645bbd..81d9d8a7e3b 100644 --- a/vendor/github.com/twmb/franz-go/pkg/kgo/compression.go +++ b/vendor/github.com/twmb/franz-go/pkg/kgo/compression.go @@ -235,6 +235,8 @@ type decompressor struct { unzstdPool sync.Pool } +var defaultDecompressor = newDecompressor() + func newDecompressor() *decompressor { d := &decompressor{ ungzPool: sync.Pool{ diff --git a/vendor/github.com/twmb/franz-go/pkg/kgo/source.go b/vendor/github.com/twmb/franz-go/pkg/kgo/source.go index 0c475d14a94..85586a0a412 100644 --- a/vendor/github.com/twmb/franz-go/pkg/kgo/source.go +++ b/vendor/github.com/twmb/franz-go/pkg/kgo/source.go @@ -92,6 +92,26 @@ func (s *source) removeCursor(rm *cursor) { } } +type ProcessFetchPartitionOptions struct { + // KeepControlRecords sets the parser to keep control messages and return + // them with fetches, overriding the default that discards them. + // + // Generally, control messages are not useful. It is the same as kgo.KeepControlRecords(). + KeepControlRecords bool + + // Offset is the minimum offset for which we'll parse records. Records with lower offsets will not be parsed or returned. + Offset int64 + + // IsolationLevel controls whether or not to return uncomitted records. See kgo.IsolationLevel. + IsolationLevel IsolationLevel + + // Topic is used to populate the Topic field of each Record. + Topic string + + // Topic is used to populate the Partition field of each Record. + Partition int32 +} + // cursor is where we are consuming from for an individual partition. type cursor struct { topic string @@ -1068,7 +1088,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe continue } - fp := partOffset.processRespPartition(br, rp, s.cl.decompressor, s.cl.cfg.hooks) + fp := partOffset.processRespPartition(br, rp, s.cl.cfg.hooks) if fp.Err != nil { if moving := kmove.maybeAddFetchPartition(resp, rp, partOffset.from); moving { strip(topic, partition, fp.Err) @@ -1245,7 +1265,41 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe // processRespPartition processes all records in all potentially compressed // batches (or message sets). -func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchResponseTopicPartition, decompressor *decompressor, hooks hooks) FetchPartition { +func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchResponseTopicPartition, hooks hooks) (fp FetchPartition) { + if rp.ErrorCode == 0 { + o.hwm = rp.HighWatermark + } + opts := ProcessFetchPartitionOptions{ + KeepControlRecords: br.cl.cfg.keepControl, + Offset: o.offset, + IsolationLevel: IsolationLevel{br.cl.cfg.isolationLevel}, + Topic: o.from.topic, + Partition: o.from.partition, + } + observeMetrics := func(m FetchBatchMetrics) { + hooks.each(func(h Hook) { + if h, ok := h.(HookFetchBatchRead); ok { + h.OnFetchBatchRead(br.meta, o.from.topic, o.from.partition, m) + } + }) + } + fp, o.offset = ProcessRespPartition(opts, rp, observeMetrics) + if len(fp.Records) > 0 { + lastRecord := fp.Records[len(fp.Records)-1] + // We adjust the offset separately because it may be larger than the offset of the last record for compacted partitions. + o.lastConsumedEpoch = lastRecord.LeaderEpoch + o.lastConsumedTime = lastRecord.Timestamp + } + + return fp +} + +// ProcessRespPartition processes all records in all potentially compressed batches (or message sets). +// ProcessRespPartition returns the FetchPartition and the last offset of records processed. observeMetrics can be nil. +// This is useful when issuing manual Fetch requests for records. +// In case of a compacted partition, the last offset may be larger than the offset of the last record. +// If the partition response is truncated and the partiiton was compacted, then the last offset is the offset of the last record. +func ProcessRespPartition(o ProcessFetchPartitionOptions, rp *kmsg.FetchResponseTopicPartition, observeMetrics func(FetchBatchMetrics)) (FetchPartition, int64) { fp := FetchPartition{ Partition: rp.Partition, Err: kerr.ErrorForCode(rp.ErrorCode), @@ -1253,12 +1307,9 @@ func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchRespon LastStableOffset: rp.LastStableOffset, LogStartOffset: rp.LogStartOffset, } - if rp.ErrorCode == 0 { - o.hwm = rp.HighWatermark - } var aborter aborter - if br.cl.cfg.isolationLevel == 1 { + if o.IsolationLevel.level == 1 { aborter = buildAborter(rp) } @@ -1349,10 +1400,10 @@ func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchRespon default: fp.Err = fmt.Errorf("unknown magic %d; message offset is %d and length is %d, skipping and setting to next offset", magic, offset, length) - if next := offset + 1; next > o.offset { - o.offset = next + if next := offset + 1; next > o.Offset { + o.Offset = next } - return fp + return fp, o.Offset } if !check() { @@ -1367,30 +1418,27 @@ func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchRespon case *kmsg.MessageV0: m.CompressedBytes = int(length) // for message sets, we include the message set overhead in length m.CompressionType = uint8(t.Attributes) & 0b0000_0111 - m.NumRecords, m.UncompressedBytes = o.processV0OuterMessage(&fp, t, decompressor) + m.NumRecords, m.UncompressedBytes = processV0OuterMessage(&o, &fp, t, defaultDecompressor) case *kmsg.MessageV1: m.CompressedBytes = int(length) m.CompressionType = uint8(t.Attributes) & 0b0000_0111 - m.NumRecords, m.UncompressedBytes = o.processV1OuterMessage(&fp, t, decompressor) + m.NumRecords, m.UncompressedBytes = processV1OuterMessage(&o, &fp, t, defaultDecompressor) case *kmsg.RecordBatch: m.CompressedBytes = len(t.Records) // for record batches, we only track the record batch length m.CompressionType = uint8(t.Attributes) & 0b0000_0111 - m.NumRecords, m.UncompressedBytes = o.processRecordBatch(&fp, t, aborter, decompressor) + m.NumRecords, m.UncompressedBytes = processRecordBatch(&o, &fp, t, aborter, defaultDecompressor) } if m.UncompressedBytes == 0 { m.UncompressedBytes = m.CompressedBytes } - hooks.each(func(h Hook) { - if h, ok := h.(HookFetchBatchRead); ok { - h.OnFetchBatchRead(br.meta, o.from.topic, o.from.partition, m) - } - }) + if observeMetrics != nil { + observeMetrics(m) + } } - - return fp + return fp, o.Offset } type aborter map[int64][]int64 @@ -1453,7 +1501,8 @@ func readRawRecords(n int, in []byte) []kmsg.Record { return rs } -func (o *cursorOffsetNext) processRecordBatch( +func processRecordBatch( + o *ProcessFetchPartitionOptions, fp *FetchPartition, batch *kmsg.RecordBatch, aborter aborter, @@ -1464,7 +1513,7 @@ func (o *cursorOffsetNext) processRecordBatch( return 0, 0 } lastOffset := batch.FirstOffset + int64(batch.LastOffsetDelta) - if lastOffset < o.offset { + if lastOffset < o.Offset { // If the last offset in this batch is less than what we asked // for, we got a batch that we entirely do not need. We can // avoid all work (although we should not get this batch). @@ -1496,15 +1545,15 @@ func (o *cursorOffsetNext) processRecordBatch( // either advance offsets or will set to nextAskOffset. nextAskOffset := lastOffset + 1 defer func() { - if numRecords == len(krecords) && o.offset < nextAskOffset { - o.offset = nextAskOffset + if numRecords == len(krecords) && o.Offset < nextAskOffset { + o.Offset = nextAskOffset } }() abortBatch := aborter.shouldAbortBatch(batch) for i := range krecords { record := recordToRecord( - o.from.topic, + o.Topic, fp.Partition, batch, &krecords[i], @@ -1528,14 +1577,10 @@ func (o *cursorOffsetNext) processRecordBatch( // this easy, but if not, we decompress and process each inner message as // either v0 or v1. We only expect the inner message to be v1, but technically // a crazy pipeline could have v0 anywhere. -func (o *cursorOffsetNext) processV1OuterMessage( - fp *FetchPartition, - message *kmsg.MessageV1, - decompressor *decompressor, -) (int, int) { +func processV1OuterMessage(o *ProcessFetchPartitionOptions, fp *FetchPartition, message *kmsg.MessageV1, decompressor *decompressor) (int, int) { compression := byte(message.Attributes & 0x0003) if compression == 0 { - o.processV1Message(fp, message) + processV1Message(o, fp, message) return 1, 0 } @@ -1606,13 +1651,13 @@ out: case *kmsg.MessageV0: innerMessage.Offset = firstOffset + int64(i) innerMessage.Attributes |= int8(compression) - if !o.processV0Message(fp, innerMessage) { + if !processV0Message(o, fp, innerMessage) { return i, uncompressedBytes } case *kmsg.MessageV1: innerMessage.Offset = firstOffset + int64(i) innerMessage.Attributes |= int8(compression) - if !o.processV1Message(fp, innerMessage) { + if !processV1Message(o, fp, innerMessage) { return i, uncompressedBytes } } @@ -1620,7 +1665,8 @@ out: return len(innerMessages), uncompressedBytes } -func (o *cursorOffsetNext) processV1Message( +func processV1Message( + o *ProcessFetchPartitionOptions, fp *FetchPartition, message *kmsg.MessageV1, ) bool { @@ -1632,21 +1678,22 @@ func (o *cursorOffsetNext) processV1Message( fp.Err = fmt.Errorf("unknown attributes on message %d", message.Attributes) return false } - record := v1MessageToRecord(o.from.topic, fp.Partition, message) + record := v1MessageToRecord(o.Topic, fp.Partition, message) o.maybeKeepRecord(fp, record, false) return true } // Processes an outer v0 message. We expect inner messages to be entirely v0 as // well, so this only tries v0 always. -func (o *cursorOffsetNext) processV0OuterMessage( +func processV0OuterMessage( + o *ProcessFetchPartitionOptions, fp *FetchPartition, message *kmsg.MessageV0, decompressor *decompressor, ) (int, int) { compression := byte(message.Attributes & 0x0003) if compression == 0 { - o.processV0Message(fp, message) + processV0Message(o, fp, message) return 1, 0 // uncompressed bytes is 0; set to compressed bytes on return } @@ -1689,14 +1736,15 @@ func (o *cursorOffsetNext) processV0OuterMessage( innerMessage := &innerMessages[i] innerMessage.Attributes |= int8(compression) innerMessage.Offset = firstOffset + int64(i) - if !o.processV0Message(fp, innerMessage) { + if !processV0Message(o, fp, innerMessage) { return i, uncompressedBytes } } return len(innerMessages), uncompressedBytes } -func (o *cursorOffsetNext) processV0Message( +func processV0Message( + o *ProcessFetchPartitionOptions, fp *FetchPartition, message *kmsg.MessageV0, ) bool { @@ -1708,7 +1756,7 @@ func (o *cursorOffsetNext) processV0Message( fp.Err = fmt.Errorf("unknown attributes on message %d", message.Attributes) return false } - record := v0MessageToRecord(o.from.topic, fp.Partition, message) + record := v0MessageToRecord(o.Topic, fp.Partition, message) o.maybeKeepRecord(fp, record, false) return true } @@ -1717,8 +1765,8 @@ func (o *cursorOffsetNext) processV0Message( // // If the record is being aborted or the record is a control record and the // client does not want to keep control records, this does not keep the record. -func (o *cursorOffsetNext) maybeKeepRecord(fp *FetchPartition, record *Record, abort bool) { - if record.Offset < o.offset { +func (o *ProcessFetchPartitionOptions) maybeKeepRecord(fp *FetchPartition, record *Record, abort bool) { + if record.Offset < o.Offset { // We asked for offset 5, but that was in the middle of a // batch; we got offsets 0 thru 4 that we need to skip. return @@ -1726,7 +1774,7 @@ func (o *cursorOffsetNext) maybeKeepRecord(fp *FetchPartition, record *Record, a // We only keep control records if specifically requested. if record.Attrs.IsControl() { - abort = !o.from.keepControl + abort = !o.KeepControlRecords } if !abort { fp.Records = append(fp.Records, record) @@ -1734,9 +1782,7 @@ func (o *cursorOffsetNext) maybeKeepRecord(fp *FetchPartition, record *Record, a // The record offset may be much larger than our expected offset if the // topic is compacted. - o.offset = record.Offset + 1 - o.lastConsumedEpoch = record.LeaderEpoch - o.lastConsumedTime = record.Timestamp + o.Offset = record.Offset + 1 } /////////////////////////////// diff --git a/vendor/modules.txt b/vendor/modules.txt index c6015c39aee..f8938cc3ca8 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1143,7 +1143,7 @@ github.com/tklauser/go-sysconf # github.com/tklauser/numcpus v0.6.1 ## explicit; go 1.13 github.com/tklauser/numcpus -# github.com/twmb/franz-go v1.17.1 +# github.com/twmb/franz-go v1.17.1 => github.com/dimitarvdimitrov/franz-go v0.0.0-20240904145554-ceadc28d3bd9 ## explicit; go 1.21 github.com/twmb/franz-go/pkg/kbin github.com/twmb/franz-go/pkg/kerr @@ -1672,4 +1672,5 @@ sigs.k8s.io/yaml/goyaml.v3 # github.com/opentracing-contrib/go-stdlib => github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956 # github.com/opentracing-contrib/go-grpc => github.com/charleskorn/go-grpc v0.0.0-20231024023642-e9298576254f # github.com/prometheus/alertmanager => github.com/grafana/prometheus-alertmanager v0.25.1-0.20240924175849-b8b7c2c74eb6 +# github.com/twmb/franz-go => github.com/dimitarvdimitrov/franz-go v0.0.0-20240904145554-ceadc28d3bd9 # google.golang.org/grpc => google.golang.org/grpc v1.65.0