Skip to content

Commit

Permalink
enhance: refine lock granularity for produers in msgstream (milvus-io…
Browse files Browse the repository at this point in the history
…#38262)

issue: milvus-io#38261

Signed-off-by: jaime <[email protected]>
  • Loading branch information
jaime0815 authored Dec 6, 2024
1 parent aa4eb2f commit a1e14d6
Showing 1 changed file with 15 additions and 8 deletions.
23 changes: 15 additions & 8 deletions pkg/mq/msgstream/mq_msgstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/samber/lo"
uatomic "go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"

Expand Down Expand Up @@ -310,7 +311,14 @@ func (ms *mqMsgStream) Produce(ctx context.Context, msgPack *MsgPack) error {
k := k
v := v
eg.Go(func() error {
ms.producerLock.RLock()
channel := ms.producerChannels[k]
producer, ok := ms.producers[channel]
ms.producerLock.RUnlock()
if !ok {
return errors.New("producer not found for channel: " + channel)
}

for i := 0; i < len(v.Msgs); i++ {
spanCtx, sp := MsgSpanFromCtx(v.Msgs[i].TraceCtx(), v.Msgs[i])
defer sp.End()
Expand All @@ -330,13 +338,10 @@ func (ms *mqMsgStream) Produce(ctx context.Context, msgPack *MsgPack) error {
}}
InjectCtx(spanCtx, msg.Properties)

ms.producerLock.RLock()
if _, err := ms.producers[channel].Send(spanCtx, msg); err != nil {
ms.producerLock.RUnlock()
if _, err := producer.Send(spanCtx, msg); err != nil {
sp.RecordError(err)
return err
}
ms.producerLock.RUnlock()
}
return nil
})
Expand Down Expand Up @@ -375,18 +380,20 @@ func (ms *mqMsgStream) Broadcast(ctx context.Context, msgPack *MsgPack) (map[str
msg := &common.ProducerMessage{Payload: m, Properties: map[string]string{}}
InjectCtx(spanCtx, msg.Properties)

ms.producerLock.Lock()
for channel, producer := range ms.producers {
ms.producerLock.RLock()
// since the element never be removed in ms.producers, so it's safe to clone and iterate producers
producers := maps.Clone(ms.producers)
ms.producerLock.RUnlock()

for channel, producer := range producers {
id, err := producer.Send(spanCtx, msg)
if err != nil {
ms.producerLock.Unlock()
sp.RecordError(err)
sp.End()
return ids, err
}
ids[channel] = append(ids[channel], id)
}
ms.producerLock.Unlock()
sp.End()
}
return ids, nil
Expand Down

0 comments on commit a1e14d6

Please sign in to comment.