Skip to content

Commit

Permalink
fix: the issue of replicate message exception when the ttMsgEnable co…
Browse files Browse the repository at this point in the history
…nfig is changed dynamically

Signed-off-by: SimFG <[email protected]>
  • Loading branch information
SimFG committed Dec 6, 2024
1 parent d7a5ad4 commit 0e11cdb
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 5 deletions.
4 changes: 2 additions & 2 deletions internal/rootcoord/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func CheckTimeTickLagExceeded(ctx context.Context, queryCoord types.QueryCoordCl
delay := now.Sub(minTt)

if delay.Milliseconds() >= maxDelay.Milliseconds() {
queryNodeTTDelay.Insert(qm.Fgm.MinFlowGraphChannel, delay)
queryNodeTTDelay.Insert(qm.Fgm.MinFlowGraphChannel+"/query", delay)

Check warning on line 311 in internal/rootcoord/util.go

View check run for this annotation

Codecov / codecov/patch

internal/rootcoord/util.go#L311

Added line #L311 was not covered by tests
}
}
}
Expand All @@ -331,7 +331,7 @@ func CheckTimeTickLagExceeded(ctx context.Context, queryCoord types.QueryCoordCl
delay := now.Sub(minTt)

if delay.Milliseconds() >= maxDelay.Milliseconds() {
dataNodeTTDelay.Insert(dm.Fgm.MinFlowGraphChannel, delay)
dataNodeTTDelay.Insert(dm.Fgm.MinFlowGraphChannel+"/data", delay)
}
}
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/mq/msgstream/mq_msgstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type mqMsgStream struct {
consumerLock *sync.Mutex
closed int32
onceChan sync.Once
ttMsgEnable atomic.Value
enableProduce atomic.Value
configEvent config.EventHandler
}
Expand Down Expand Up @@ -104,14 +105,15 @@ func NewMqMsgStream(ctx context.Context,
closed: 0,
}
ctxLog := log.Ctx(ctx)
stream.enableProduce.Store(paramtable.Get().CommonCfg.TTMsgEnabled.GetAsBool())
stream.enableProduce.Store(false)
stream.ttMsgEnable.Store(paramtable.Get().CommonCfg.TTMsgEnabled.GetAsBool())
stream.configEvent = config.NewHandler("enable send tt msg "+fmt.Sprint(streamCounter.Inc()), func(event *config.Event) {
value, err := strconv.ParseBool(event.Value)
if err != nil {
ctxLog.Warn("Failed to parse bool value", zap.String("v", event.Value), zap.Error(err))
return
}
stream.enableProduce.Store(value)
stream.ttMsgEnable.Store(value)
ctxLog.Info("Msg Stream state updated", zap.Bool("can_produce", stream.isEnabledProduce()))
})
paramtable.Get().Watch(paramtable.Get().CommonCfg.TTMsgEnabled.Key, stream.configEvent)
Expand Down Expand Up @@ -270,7 +272,7 @@ func (ms *mqMsgStream) EnableProduce(can bool) {
}

func (ms *mqMsgStream) isEnabledProduce() bool {
return ms.enableProduce.Load().(bool)
return ms.enableProduce.Load().(bool) || ms.ttMsgEnable.Load().(bool)
}

func (ms *mqMsgStream) Produce(ctx context.Context, msgPack *MsgPack) error {
Expand Down
4 changes: 4 additions & 0 deletions pkg/mq/msgstream/mq_msgstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ func TestStream_ConfigEvent(t *testing.T) {
}

func TestStream_PulsarMsgStream_Insert(t *testing.T) {
Params.Save(Params.CommonCfg.TTMsgEnabled.Key, "false")
defer Params.Remove(Params.CommonCfg.TTMsgEnabled.Key)
pulsarAddress := getPulsarAddress()
c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8)
producerChannels := []string{c1, c2}
Expand Down Expand Up @@ -187,6 +189,8 @@ func TestStream_PulsarMsgStream_TimeTick(t *testing.T) {
}

func TestStream_PulsarMsgStream_BroadCast(t *testing.T) {
Params.Save(Params.CommonCfg.TTMsgEnabled.Key, "false")
defer Params.Remove(Params.CommonCfg.TTMsgEnabled.Key)
pulsarAddress := getPulsarAddress()
c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8)
producerChannels := []string{c1, c2}
Expand Down

0 comments on commit 0e11cdb

Please sign in to comment.