From fe01d54eca69109aff34e9282759eb0a60b59ed5 Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Wed, 20 Sep 2023 22:01:27 +0800 Subject: [PATCH] Set kafka read timeout to 10s and make it configurable (#27238) Signed-off-by: bigsheeper --- configs/milvus.yaml | 1 + pkg/mq/msgstream/mqwrapper/kafka/kafka_consumer.go | 4 +++- pkg/util/paramtable/service_param.go | 9 +++++++++ pkg/util/paramtable/service_param_test.go | 2 ++ 4 files changed, 15 insertions(+), 1 deletion(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 87b7f62bbc8fd..eec18547040fe 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -123,6 +123,7 @@ pulsar: # saslPassword: # saslMechanisms: PLAIN # securityProtocol: SASL_SSL +# readTimeout: 10 # read message timeout in seconds rocksmq: # The path where the message is stored in rocksmq diff --git a/pkg/mq/msgstream/mqwrapper/kafka/kafka_consumer.go b/pkg/mq/msgstream/mqwrapper/kafka/kafka_consumer.go index edfadea7374aa..18ed40f96b93c 100644 --- a/pkg/mq/msgstream/mqwrapper/kafka/kafka_consumer.go +++ b/pkg/mq/msgstream/mqwrapper/kafka/kafka_consumer.go @@ -11,6 +11,7 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) type Consumer struct { @@ -140,7 +141,8 @@ func (kc *Consumer) Chan() <-chan mqwrapper.Message { } return default: - e, err := kc.c.ReadMessage(30 * time.Second) + readTimeout := paramtable.Get().KafkaCfg.ReadTimeout.GetAsDuration(time.Second) + e, err := kc.c.ReadMessage(readTimeout) if err != nil { // if we failed to read message in 30 Seconds, print out a warn message since there should always be a tt log.Warn("consume msg failed", zap.Any("topic", kc.topic), zap.String("groupID", kc.groupID), zap.Error(err)) diff --git a/pkg/util/paramtable/service_param.go b/pkg/util/paramtable/service_param.go index 734991d370e0b..5953a713893b7 100644 --- a/pkg/util/paramtable/service_param.go +++ b/pkg/util/paramtable/service_param.go @@ -637,6 +637,7 @@ type KafkaConfig struct { SecurityProtocol ParamItem `refreshable:"false"` ConsumerExtraConfig ParamGroup `refreshable:"false"` ProducerExtraConfig ParamGroup `refreshable:"false"` + ReadTimeout ParamItem `refreshable:"true"` } func (k *KafkaConfig) Init(base *BaseTable) { @@ -692,6 +693,14 @@ func (k *KafkaConfig) Init(base *BaseTable) { Version: "2.2.0", } k.ProducerExtraConfig.Init(base.mgr) + + k.ReadTimeout = ParamItem{ + Key: "kafka.readTimeout", + DefaultValue: "10", + Version: "2.3.1", + Export: true, + } + k.ReadTimeout.Init(base.mgr) } // ///////////////////////////////////////////////////////////////////////////// diff --git a/pkg/util/paramtable/service_param_test.go b/pkg/util/paramtable/service_param_test.go index 6036726f0ca97..847301ce93867 100644 --- a/pkg/util/paramtable/service_param_test.go +++ b/pkg/util/paramtable/service_param_test.go @@ -13,6 +13,7 @@ package paramtable import ( "testing" + "time" "github.com/stretchr/testify/assert" @@ -167,6 +168,7 @@ func TestServiceParam(t *testing.T) { assert.Empty(t, kc.Address.GetValue()) assert.Equal(t, kc.SaslMechanisms.GetValue(), "PLAIN") assert.Equal(t, kc.SecurityProtocol.GetValue(), "SASL_SSL") + assert.Equal(t, kc.ReadTimeout.GetAsDuration(time.Second), 10*time.Second) } })