Skip to content

Commit

Permalink
Set kafka read timeout to 10s and make it configurable (#27238)
Browse files Browse the repository at this point in the history
Signed-off-by: bigsheeper <[email protected]>
  • Loading branch information
bigsheeper authored Sep 20, 2023
1 parent 93e2eb7 commit fe01d54
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 1 deletion.
1 change: 1 addition & 0 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ pulsar:
# saslPassword:
# saslMechanisms: PLAIN
# securityProtocol: SASL_SSL
# readTimeout: 10 # read message timeout in seconds

rocksmq:
# The path where the message is stored in rocksmq
Expand Down
4 changes: 3 additions & 1 deletion pkg/mq/msgstream/mqwrapper/kafka/kafka_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)

type Consumer struct {
Expand Down Expand Up @@ -140,7 +141,8 @@ func (kc *Consumer) Chan() <-chan mqwrapper.Message {
}
return
default:
e, err := kc.c.ReadMessage(30 * time.Second)
readTimeout := paramtable.Get().KafkaCfg.ReadTimeout.GetAsDuration(time.Second)
e, err := kc.c.ReadMessage(readTimeout)
if err != nil {
// if we failed to read message in 30 Seconds, print out a warn message since there should always be a tt
log.Warn("consume msg failed", zap.Any("topic", kc.topic), zap.String("groupID", kc.groupID), zap.Error(err))
Expand Down
9 changes: 9 additions & 0 deletions pkg/util/paramtable/service_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,7 @@ type KafkaConfig struct {
SecurityProtocol ParamItem `refreshable:"false"`
ConsumerExtraConfig ParamGroup `refreshable:"false"`
ProducerExtraConfig ParamGroup `refreshable:"false"`
ReadTimeout ParamItem `refreshable:"true"`
}

func (k *KafkaConfig) Init(base *BaseTable) {
Expand Down Expand Up @@ -692,6 +693,14 @@ func (k *KafkaConfig) Init(base *BaseTable) {
Version: "2.2.0",
}
k.ProducerExtraConfig.Init(base.mgr)

k.ReadTimeout = ParamItem{
Key: "kafka.readTimeout",
DefaultValue: "10",
Version: "2.3.1",
Export: true,
}
k.ReadTimeout.Init(base.mgr)
}

// /////////////////////////////////////////////////////////////////////////////
Expand Down
2 changes: 2 additions & 0 deletions pkg/util/paramtable/service_param_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package paramtable

import (
"testing"
"time"

"github.com/stretchr/testify/assert"

Expand Down Expand Up @@ -167,6 +168,7 @@ func TestServiceParam(t *testing.T) {
assert.Empty(t, kc.Address.GetValue())
assert.Equal(t, kc.SaslMechanisms.GetValue(), "PLAIN")
assert.Equal(t, kc.SecurityProtocol.GetValue(), "SASL_SSL")
assert.Equal(t, kc.ReadTimeout.GetAsDuration(time.Second), 10*time.Second)
}
})

Expand Down

0 comments on commit fe01d54

Please sign in to comment.