Skip to content

Commit

Permalink
add validation to the flags
Browse files Browse the repository at this point in the history
Signed-off-by: gotjosh <[email protected]>
  • Loading branch information
gotjosh committed Sep 26, 2024
1 parent 918256c commit 553dbd5
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 8 deletions.
2 changes: 1 addition & 1 deletion cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -6672,7 +6672,7 @@
"kind": "field",
"name": "ongoing_fetch_concurrency",
"required": false,
"desc": "The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. 0 to disable.",
"desc": "The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. It is disabled unless ingest-storage.kafka.startup-fetch-concurrency is greater than 0, in which case it must also be greater than 0.",
"fieldValue": null,
"fieldDefaultValue": 0,
"fieldFlag": "ingest-storage.kafka.ongoing-fetch-concurrency",
Expand Down
2 changes: 1 addition & 1 deletion cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -1374,7 +1374,7 @@ Usage of ./cmd/mimir/mimir:
-ingest-storage.kafka.max-consumer-lag-at-startup duration
The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 15s)
-ingest-storage.kafka.ongoing-fetch-concurrency int
The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. 0 to disable.
The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. It is disabled unless ingest-storage.kafka.startup-fetch-concurrency is greater than 0, in which case it must also be greater than 0.
-ingest-storage.kafka.ongoing-records-per-fetch int
The number of records per fetch request that the ingester makes when reading data continuously from Kafka after startup. Depends on ingest-storage.kafka.ongoing-fetch-concurrency being greater than 0. (default 30)
-ingest-storage.kafka.producer-max-buffered-bytes int
Expand Down
2 changes: 1 addition & 1 deletion cmd/mimir/help.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ Usage of ./cmd/mimir/mimir:
-ingest-storage.kafka.max-consumer-lag-at-startup duration
The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 15s)
-ingest-storage.kafka.ongoing-fetch-concurrency int
The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. 0 to disable.
The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. It is disabled unless ingest-storage.kafka.startup-fetch-concurrency is greater than 0, in which case it must also be greater than 0.
-ingest-storage.kafka.ongoing-records-per-fetch int
The number of records per fetch request that the ingester makes when reading data continuously from Kafka after startup. Depends on ingest-storage.kafka.ongoing-fetch-concurrency being greater than 0. (default 30)
-ingest-storage.kafka.producer-max-buffered-bytes int
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3872,7 +3872,9 @@ kafka:
[startup_records_per_fetch: <int> | default = 2500]
# The number of concurrent fetch requests that the ingester makes when reading
# data continuously from Kafka after startup. 0 to disable.
# data continuously from Kafka after startup. It is disabled unless
# ingest-storage.kafka.startup-fetch-concurrency is greater than 0, in which
# case it must also be greater than 0.
# CLI flag: -ingest-storage.kafka.ongoing-fetch-concurrency
[ongoing_fetch_concurrency: <int> | default = 0]
Expand Down
14 changes: 13 additions & 1 deletion pkg/storage/ingest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (cfg *KafkaConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)

f.IntVar(&cfg.StartupFetchConcurrency, prefix+".startup-fetch-concurrency", 0, "The number of concurrent fetch requests that the ingester makes when reading data from Kafka during startup. 0 to disable.")
f.IntVar(&cfg.StartupRecordsPerFetch, prefix+".startup-records-per-fetch", 2500, "The number of records per fetch request that the ingester makes when reading data from Kafka during startup. Depends on "+prefix+".startup-fetch-concurrency being greater than 0.")
f.IntVar(&cfg.OngoingFetchConcurrency, prefix+".ongoing-fetch-concurrency", 0, "The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. 0 to disable.")
f.IntVar(&cfg.OngoingFetchConcurrency, prefix+".ongoing-fetch-concurrency", 0, "The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. Is disabled unless "+prefix+".startup-fetch-concurrency is greater than 0. It must be greater than 0.")
f.IntVar(&cfg.OngoingRecordsPerFetch, prefix+".ongoing-records-per-fetch", 30, "The number of records per fetch request that the ingester makes when reading data continuously from Kafka after startup. Depends on "+prefix+".ongoing-fetch-concurrency being greater than 0.")

f.BoolVar(&cfg.UseCompressedBytesAsFetchMaxBytes, prefix+".use-compressed-bytes-as-fetch-max-bytes", true, "When enabled, the fetch request MaxBytes field is computed using the compressed size of previous records. When disabled, MaxBytes is computed using uncompressed bytes. Different Kafka implementations interpret MaxBytes differently.")
Expand Down Expand Up @@ -179,6 +179,18 @@ func (cfg *KafkaConfig) Validate() error {
return ErrInvalidMaxConsumerLagAtStartup
}

if cfg.StartupFetchConcurrency < 0 {
return fmt.Errorf("ingest-storage.kafka.startup-fetch-concurrency must be greater or equal to 0")
}

if cfg.StartupFetchConcurrency > 0 && cfg.OngoingFetchConcurrency <= 0 {
return fmt.Errorf("ingest-storage.kafka.ongoing-fetch-concurrency must be greater than 0 when startup-fetch-concurrency is greater than 0")
}

if cfg.StartupRecordsPerFetch <= 0 || cfg.OngoingRecordsPerFetch <= 0 {
return fmt.Errorf("ingest-storage.kafka.startup-records-per-fetch and ingest-storage.kafka.ongoing-records-per-fetch must be greater than 0")
}

return nil
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/ingest/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,9 +243,7 @@ func (r *PartitionReader) stopDependencies() error {
}

func (r *PartitionReader) run(ctx context.Context) error {
if r.kafkaCfg.OngoingFetchConcurrency > 0 {
r.fetcher.Update(ctx, r.kafkaCfg.OngoingFetchConcurrency, r.kafkaCfg.OngoingRecordsPerFetch)
}
r.fetcher.Update(ctx, r.kafkaCfg.OngoingFetchConcurrency, r.kafkaCfg.OngoingRecordsPerFetch)

for ctx.Err() == nil {
err := r.processNextFetches(ctx, r.metrics.receiveDelayWhenRunning)
Expand Down Expand Up @@ -1279,6 +1277,8 @@ func (r *concurrentFetchers) start(ctx context.Context, startOffset int64, concu
refillBufferedResult = nil
}
select {
case <-r.done:
return
case <-ctx.Done():
return

Expand Down

0 comments on commit 553dbd5

Please sign in to comment.