diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 47798bb6706b0..5f11747b73129 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -126,6 +126,8 @@ rocksmq: # 0 means not compress, 7 will use zstd # len of types means num of rocksdb level. compressionTypes: [0, 0, 7, 7, 7] + maxWriteParallelism: 30 + maxTtMsgBufferSize: 1000 # natsmq configuration. # more detail: https://docs.nats.io/running-a-nats-service/configuration diff --git a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go index 312aa534b217e..3120013e2b0fc 100644 --- a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go +++ b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go @@ -113,7 +113,7 @@ func checkRetention() bool { return params.RocksmqCfg.RetentionSizeInMB.GetAsInt64() != -1 || params.RocksmqCfg.RetentionTimeInMinutes.GetAsInt64() != -1 } -var topicMu = sync.Map{} +var topicMu = typeutil.NewConcurrentMap[string, *sync.Mutex]() type rocksmq struct { store *gorocksdb.DB @@ -122,12 +122,18 @@ type rocksmq struct { idAllocator allocator.Interface storeMu *sync.Mutex topicLastID sync.Map - consumers sync.Map - consumersID sync.Map + consumers *typeutil.ConcurrentMap[string, []*Consumer] + consumersID *typeutil.ConcurrentMap[string, int64] retentionInfo *retentionInfo - readers sync.Map state RmqState + + syncers *typeutil.ConcurrentMap[string, *rocksmqSyncer] + msgSize *typeutil.ConcurrentMap[string, int64] + + parallelism chan struct{} + maxTtBufferSize int + ttbuffers *typeutil.ConcurrentMap[string, *rmqTtBuffer] } func parseCompressionType(params *paramtable.ComponentParam) ([]gorocksdb.CompressionType, error) { @@ -247,14 +253,19 @@ func NewRocksMQ(name string, idAllocator allocator.Interface) (*rocksmq, error) } rmq := &rocksmq{ - store: db, - cfh: cfHandles, - kv: kv, - idAllocator: mqIDAllocator, - storeMu: &sync.Mutex{}, - consumers: sync.Map{}, - readers: sync.Map{}, - topicLastID: sync.Map{}, + store: db, + cfh: cfHandles, + kv: kv, + idAllocator: mqIDAllocator, + storeMu: &sync.Mutex{}, + topicLastID: sync.Map{}, + consumers: typeutil.NewConcurrentMap[string, []*Consumer](), + consumersID: typeutil.NewConcurrentMap[string, int64](), + syncers: typeutil.NewConcurrentMap[string, *rocksmqSyncer](), + msgSize: typeutil.NewConcurrentMap[string, int64](), + parallelism: make(chan struct{}, params.RocksmqCfg.MaxParallelsim.GetAsInt()), + maxTtBufferSize: params.RocksmqCfg.MaxTtBufferSize.GetAsInt(), + ttbuffers: typeutil.NewConcurrentMap[string, *rmqTtBuffer](), } ri, err := initRetentionInfo(kv, db) @@ -304,10 +315,10 @@ func (rmq *rocksmq) isClosed() bool { func (rmq *rocksmq) Close() { atomic.StoreInt64(&rmq.state, RmqStateStopped) rmq.stopRetention() - rmq.consumers.Range(func(k, v interface{}) bool { + rmq.consumers.Range(func(k string, v []*Consumer) bool { // TODO what happened if the server crashed? who handled the destroy consumer group? should we just handled it when rocksmq created? // or we should not even make consumer info persistent? - for _, consumer := range v.([]*Consumer) { + for _, consumer := range v { err := rmq.destroyConsumerGroupInternal(consumer.Topic, consumer.GroupName) if err != nil { log.Warn("Failed to destroy consumer group in rocksmq!", zap.Any("topic", consumer.Topic), zap.Any("groupName", consumer.GroupName), zap.Any("error", err)) @@ -325,10 +336,7 @@ func (rmq *rocksmq) Close() { // print rmq consumer Info func (rmq *rocksmq) Info() bool { rtn := true - rmq.consumers.Range(func(key, vals interface{}) bool { - topic, _ := key.(string) - consumerList, _ := vals.([]*Consumer) - + rmq.consumers.Range(func(topic string, consumerList []*Consumer) bool { minConsumerPosition := UniqueID(-1) minConsumerGroupName := "" for _, consumer := range consumerList { @@ -398,12 +406,16 @@ func (rmq *rocksmq) CreateTopic(topicName string) error { return err } if val != "" { + if err = rmq.createSyncerIfNotExist(topicName); err != nil { + return err + } + rmq.ttbuffers.GetOrInsert(topicName, newRmqTtBuffer()) log.Warn("rocksmq topic already exists ", zap.String("topic", topicName)) return nil } - if _, ok := topicMu.Load(topicName); !ok { - topicMu.Store(topicName, new(sync.Mutex)) + if _, ok := topicMu.Get(topicName); !ok { + topicMu.Insert(topicName, new(sync.Mutex)) } // msgSizeKey -> msgSize @@ -421,6 +433,11 @@ func (rmq *rocksmq) CreateTopic(topicName string) error { return retry.Unrecoverable(err) } + if err = rmq.createSyncerIfNotExist(topicName); err != nil { + return err + } + rmq.ttbuffers.GetOrInsert(topicName, newRmqTtBuffer()) + rmq.retentionInfo.mutex.Lock() defer rmq.retentionInfo.mutex.Unlock() rmq.retentionInfo.topicRetetionTime.Insert(topicName, time.Now().Unix()) @@ -431,18 +448,16 @@ func (rmq *rocksmq) CreateTopic(topicName string) error { // DestroyTopic removes messages for topic in rocksmq func (rmq *rocksmq) DestroyTopic(topicName string) error { start := time.Now() - ll, ok := topicMu.Load(topicName) + lock, ok := topicMu.Get(topicName) if !ok { return fmt.Errorf("topic name = %s not exist", topicName) } - lock, ok := ll.(*sync.Mutex) - if !ok { - return fmt.Errorf("get mutex failed, topic name = %s", topicName) - } lock.Lock() defer lock.Unlock() - rmq.consumers.Delete(topicName) + rmq.consumers.GetAndRemove(topicName) + rmq.ttbuffers.GetAndRemove(topicName) + rmq.syncers.GetAndRemove(topicName) // clean the topic data it self fixTopicName := topicName + "/" @@ -457,6 +472,7 @@ func (rmq *rocksmq) DestroyTopic(topicName string) error { if err != nil { return err } + rmq.msgSize.GetAndRemove(topicName) // clean page ts info pageMsgTsKey := constructKey(PageTsTitle, topicName) @@ -485,7 +501,7 @@ func (rmq *rocksmq) DestroyTopic(topicName string) error { } // clean up retention info - topicMu.Delete(topicName) + topicMu.GetAndRemove(topicName) rmq.retentionInfo.topicRetetionTime.GetAndRemove(topicName) log.Debug("Rocksmq destroy topic successfully ", zap.String("topic", topicName), zap.Int64("elapsed", time.Since(start).Milliseconds())) @@ -495,10 +511,10 @@ func (rmq *rocksmq) DestroyTopic(topicName string) error { // ExistConsumerGroup check if a consumer exists and return the existed consumer func (rmq *rocksmq) ExistConsumerGroup(topicName, groupName string) (bool, *Consumer, error) { key := constructCurrentID(topicName, groupName) - _, ok := rmq.consumersID.Load(key) + _, ok := rmq.consumersID.Get(key) if ok { - if vals, ok := rmq.consumers.Load(topicName); ok { - for _, v := range vals.([]*Consumer) { + if vals, ok := rmq.consumers.Get(topicName); ok { + for _, v := range vals { if v.GroupName == groupName { return true, v, nil } @@ -515,12 +531,12 @@ func (rmq *rocksmq) CreateConsumerGroup(topicName, groupName string) error { } start := time.Now() key := constructCurrentID(topicName, groupName) - _, ok := rmq.consumersID.Load(key) + _, ok := rmq.consumersID.Get(key) if ok { return fmt.Errorf("RMQ CreateConsumerGroup key already exists, key = %s", key) } - rmq.consumersID.Store(key, DefaultMessageID) - log.Debug("Rocksmq create consumer group successfully ", zap.String("topic", topicName), + rmq.consumersID.Insert(key, DefaultMessageID) + log.Info("Rocksmq create consumer group successfully ", zap.String("topic", topicName), zap.String("group", groupName), zap.Int64("elapsed", time.Since(start).Milliseconds())) return nil @@ -532,19 +548,18 @@ func (rmq *rocksmq) RegisterConsumer(consumer *Consumer) error { return errors.New(RmqNotServingErrMsg) } start := time.Now() - if vals, ok := rmq.consumers.Load(consumer.Topic); ok { - for _, v := range vals.([]*Consumer) { + if consumers, ok := rmq.consumers.Get(consumer.Topic); ok { + for _, v := range consumers { if v.GroupName == consumer.GroupName { return nil } } - consumers := vals.([]*Consumer) consumers = append(consumers, consumer) - rmq.consumers.Store(consumer.Topic, consumers) + rmq.consumers.Insert(consumer.Topic, consumers) } else { consumers := make([]*Consumer, 1) consumers[0] = consumer - rmq.consumers.Store(consumer.Topic, consumers) + rmq.consumers.Insert(consumer.Topic, consumers) } log.Debug("Rocksmq register consumer successfully ", zap.String("topic", consumer.Topic), zap.Int64("elapsed", time.Since(start).Milliseconds())) return nil @@ -573,25 +588,20 @@ func (rmq *rocksmq) DestroyConsumerGroup(topicName, groupName string) error { // DestroyConsumerGroup removes a consumer group from rocksdb_kv func (rmq *rocksmq) destroyConsumerGroupInternal(topicName, groupName string) error { start := time.Now() - ll, ok := topicMu.Load(topicName) + lock, ok := topicMu.Get(topicName) if !ok { return fmt.Errorf("topic name = %s not exist", topicName) } - lock, ok := ll.(*sync.Mutex) - if !ok { - return fmt.Errorf("get mutex failed, topic name = %s", topicName) - } lock.Lock() defer lock.Unlock() key := constructCurrentID(topicName, groupName) - rmq.consumersID.Delete(key) - if vals, ok := rmq.consumers.Load(topicName); ok { - consumers := vals.([]*Consumer) + rmq.consumersID.GetAndRemove(key) + if consumers, ok := rmq.consumers.Get(topicName); ok { for index, v := range consumers { if v.GroupName == groupName { close(v.MsgMutex) consumers = append(consumers[:index], consumers[index+1:]...) - rmq.consumers.Store(topicName, consumers) + rmq.consumers.Insert(topicName, consumers) break } } @@ -602,41 +612,57 @@ func (rmq *rocksmq) destroyConsumerGroupInternal(topicName, groupName string) er return nil } -// Produce produces messages for topic and updates page infos for retention -func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]UniqueID, error) { - if rmq.isClosed() { - return nil, errors.New(RmqNotServingErrMsg) - } - start := time.Now() - ll, ok := topicMu.Load(topicName) - if !ok { - return []UniqueID{}, fmt.Errorf("topic name = %s not exist", topicName) - } - lock, ok := ll.(*sync.Mutex) - if !ok { - return []UniqueID{}, fmt.Errorf("get mutex failed, topic name = %s", topicName) +func isAllTtMsgs(messages []ProducerMessage) bool { + for _, msg := range messages { + if _, ok := msg.Properties[mqwrapper.TtProperty]; !ok { + return false + } } - lock.Lock() - defer lock.Unlock() + return true +} - getLockTime := time.Since(start).Milliseconds() +func (rmq *rocksmq) createSyncerIfNotExist(topicName string) error { + if _, ok := rmq.syncers.Get(topicName); !ok { + msgID, err := rmq.GetLatestMsg(topicName) + if err != nil { + return err + } + rmq.syncers.Insert(topicName, newRocksMqSyncer(topicName, msgID)) + } + return nil +} +func (rmq *rocksmq) prepareData(topicName string, idStart, idEnd int64, ttMsgs []bufferMsgs, messages []ProducerMessage) (*gorocksdb.WriteBatch, []UniqueID, map[UniqueID]int64, error) { msgLen := len(messages) - idStart, idEnd, err := rmq.idAllocator.Alloc(uint32(msgLen)) - - if err != nil { - return []UniqueID{}, err - } - allocTime := time.Since(start).Milliseconds() if UniqueID(msgLen) != idEnd-idStart { - return []UniqueID{}, errors.New("Obtained id length is not equal that of message") + return nil, nil, nil, errors.New("Obtained id length is not equal that of message") } // Insert data to store system batch := gorocksdb.NewWriteBatch() - defer batch.Destroy() msgSizes := make(map[UniqueID]int64) - msgIDs := make([]UniqueID, msgLen) + msgIDs := make([]UniqueID, 0) + + // pack tt buffer messages with input messages and write together + for _, ttmsg := range ttMsgs { + for i := 0; i < len(ttmsg.msgs); i++ { + msgID := ttmsg.startID + UniqueID(i) + key := path.Join(topicName, strconv.FormatInt(msgID, 10)) + batch.PutCF(rmq.cfh[0], []byte(key), ttmsg.msgs[i].Payload) + if messages[i].Properties != nil { + properties, err := json.Marshal(messages[i].Properties) + if err != nil { + log.Warn("properties marshal failed", zap.Int64("msgID", msgID), zap.String("topicName", topicName), + zap.Error(err)) + return batch, msgIDs, msgSizes, err + } + batch.PutCF(rmq.cfh[1], []byte(key), properties) + } + msgIDs = append(msgIDs, msgID) + msgSizes[msgID] = int64(len(ttmsg.msgs[i].Payload)) + } + } + for i := 0; i < msgLen && idStart+UniqueID(i) < idEnd; i++ { msgID := idStart + UniqueID(i) key := path.Join(topicName, strconv.FormatInt(msgID, 10)) @@ -647,46 +673,119 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]Uni if err != nil { log.Warn("properties marshal failed", zap.Int64("msgID", msgID), zap.String("topicName", topicName), zap.Error(err)) - return nil, err + return batch, msgIDs, msgSizes, err } batch.PutCF(rmq.cfh[1], []byte(key), properties) } - msgIDs[i] = msgID + msgIDs = append(msgIDs, msgID) msgSizes[msgID] = int64(len(messages[i].Payload)) } + return batch, msgIDs, msgSizes, nil +} - opts := gorocksdb.NewDefaultWriteOptions() - defer opts.Destroy() - err = rmq.store.Write(opts, batch) +// Produce produces messages for topic and updates page infos for retention +func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]UniqueID, error) { + if rmq.isClosed() { + return nil, errors.New(RmqNotServingErrMsg) + } + + isAllTt := isAllTtMsgs(messages) + + start := time.Now() + lock, ok := topicMu.Get(topicName) + if !ok { + return []UniqueID{}, fmt.Errorf("topic name = %s not exist", topicName) + } + + ttbuffer, ok := rmq.ttbuffers.Get(topicName) + if !ok { + return nil, fmt.Errorf("can not find ttbuffer %s", topicName) + } + + syncer, ok := rmq.syncers.Get(topicName) + if !ok { + return nil, fmt.Errorf("can not find syncer %s", topicName) + } + + lock.Lock() + getLockTime := time.Since(start).Milliseconds() + + var err error + var msgIDs []UniqueID + var msgSizes map[UniqueID]int64 + + msgLen := len(messages) + idStart, idEnd, err := rmq.idAllocator.Alloc(uint32(msgLen)) if err != nil { + syncer.remove(idEnd) + lock.Unlock() return []UniqueID{}, err } - writeTime := time.Since(start).Milliseconds() - if vals, ok := rmq.consumers.Load(topicName); ok { - for _, v := range vals.([]*Consumer) { - select { - case v.MsgMutex <- struct{}{}: - continue - default: - continue - } + allocTime := time.Since(start).Milliseconds() + + state := syncer.push(idEnd) + + if isAllTt && ttbuffer.size() < rmq.maxTtBufferSize { + ttbuffer.append(bufferMsgs{ + startID: idStart, + endID: idEnd, + msgs: messages, + }) + lock.Unlock() + } else { + // messages contain other type message, merge messages with tt buffer msgs + ttMsgs := ttbuffer.getBufferMsgs() + var batch *gorocksdb.WriteBatch + batch, msgIDs, msgSizes, err = rmq.prepareData(topicName, idStart, idEnd, ttMsgs, messages) + if batch != nil { + defer func() { + batch.Destroy() + }() + } + if err != nil { + syncer.remove(idEnd) + lock.Unlock() + return []UniqueID{}, err } + ttbuffer.clear() + lock.Unlock() + + rmq.parallelism <- struct{}{} + opts := gorocksdb.NewDefaultWriteOptions() + defer opts.Destroy() + err = rmq.store.Write(opts, batch) + if err != nil { + <-rmq.parallelism + syncer.remove(idEnd) + return []UniqueID{}, err + } + <-rmq.parallelism } - // Update message page info - err = rmq.updatePageInfo(topicName, msgIDs, msgSizes) + err = syncer.finishWrite(idEnd) if err != nil { - return []UniqueID{}, err + return nil, err + } + writeTime := time.Since(start).Milliseconds() + + <-state.ready + waitTime := time.Since(start).Milliseconds() + + if err = rmq.updatePageInfo(topicName, msgIDs, msgSizes); err != nil { + log.Warn("failed to update page info", zap.String("topic", topicName), zap.Error(err)) } + rmq.NotifyTopic(topicName) + + syncer.pop() - // TODO add this to monitor metrics getProduceTime := time.Since(start).Milliseconds() if getProduceTime > 200 { log.Warn("rocksmq produce too slowly", zap.String("topic", topicName), zap.Int64("get lock elapse", getLockTime), zap.Int64("alloc elapse", allocTime-getLockTime), zap.Int64("write elapse", writeTime-allocTime), - zap.Int64("updatePage elapse", getProduceTime-writeTime), + zap.Int64("wait time", waitTime-writeTime), + zap.Int64("updatePage elapse", getProduceTime-waitTime), zap.Int64("produce total elapse", getProduceTime), ) } @@ -697,15 +796,10 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]Uni func (rmq *rocksmq) updatePageInfo(topicName string, msgIDs []UniqueID, msgSizes map[UniqueID]int64) error { params := paramtable.Get() + var err error msgSizeKey := MessageSizeTitle + topicName - msgSizeVal, err := rmq.kv.Load(msgSizeKey) - if err != nil { - return err - } - curMsgSize, err := strconv.ParseInt(msgSizeVal, 10, 64) - if err != nil { - return err - } + curMsgSize, _ := rmq.msgSize.GetOrInsert(msgSizeKey, int64(0)) + fixedPageSizeKey := constructKey(PageMsgSizeTitle, topicName) fixedPageTsKey := constructKey(PageTsTitle, topicName) nowTs := strconv.FormatInt(time.Now().Unix(), 10) @@ -726,25 +820,53 @@ func (rmq *rocksmq) updatePageInfo(topicName string, msgIDs []UniqueID, msgSizes curMsgSize += msgSize } } - mutateBuffer[msgSizeKey] = strconv.FormatInt(curMsgSize, 10) - err = rmq.kv.MultiSave(mutateBuffer) + // mutateBuffer[msgSizeKey] = strconv.FormatInt(curMsgSize, 10) + rmq.msgSize.Insert(msgSizeKey, curMsgSize) + if len(mutateBuffer) != 0 { + err = rmq.kv.MultiSave(mutateBuffer) + } return err } func (rmq *rocksmq) getCurrentID(topicName, groupName string) (int64, bool) { - currentID, ok := rmq.consumersID.Load(constructCurrentID(topicName, groupName)) + currentID, ok := rmq.consumersID.Get(constructCurrentID(topicName, groupName)) if !ok { return 0, false } - return currentID.(int64), true + return currentID, true } func (rmq *rocksmq) getLastID(topicName string) (int64, bool) { - currentID, ok := rmq.consumersID.Load(topicName) + currentID, ok := rmq.consumersID.Get(topicName) if !ok { return 0, false } - return currentID.(int64), true + return currentID, true +} + +// [startID, endID] +func readBuffer(ttmsgs []bufferMsgs, startID, endID int64, count int) []ConsumerMessage { + ret := make([]ConsumerMessage, 0) + for _, msgs := range ttmsgs { + if msgs.endID <= startID || msgs.startID > endID { + continue + } + + for i := 0; i < len(msgs.msgs) && len(ret) < count; i++ { + msgID := msgs.startID + UniqueID(i) + if msgID >= startID && msgID < endID { + ret = append(ret, ConsumerMessage{ + MsgID: msgID, + Payload: msgs.msgs[i].Payload, + }) + } + } + + if len(ret) == count { + return ret + } + } + return ret } // Consume steps: @@ -756,101 +878,119 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum return nil, errors.New(RmqNotServingErrMsg) } start := time.Now() - ll, ok := topicMu.Load(topicName) - if !ok { - return nil, fmt.Errorf("topic name = %s not exist", topicName) - } - - lock, ok := ll.(*sync.Mutex) - if !ok { - return nil, fmt.Errorf("get mutex failed, topic name = %s", topicName) - } - lock.Lock() - defer lock.Unlock() - currentID, ok := rmq.getCurrentID(topicName, groupName) if !ok { return nil, fmt.Errorf("currentID of topicName=%s, groupName=%s not exist", topicName, groupName) } - // return if don't have new message - lastID, ok := rmq.getLastID(topicName) - if ok && currentID > lastID { - return []ConsumerMessage{}, nil + syncer, ok := rmq.syncers.Get(topicName) + if !ok { + return nil, fmt.Errorf("can not find syncer of topic %s", topicName) } + maxReadableID := syncer.getMaxReadableID() - getLockTime := time.Since(start).Milliseconds() - readOpts := gorocksdb.NewDefaultReadOptions() - defer readOpts.Destroy() - prefix := topicName + "/" - iter := rocksdbkv.NewRocksIteratorCFWithUpperBound(rmq.store, rmq.cfh[0], typeutil.AddOne(prefix), readOpts) - iterProperty := rocksdbkv.NewRocksIteratorCFWithUpperBound(rmq.store, rmq.cfh[1], typeutil.AddOne(prefix), readOpts) - defer iter.Close() - defer iterProperty.Close() + consumerMessage := make([]ConsumerMessage, 0, n) + readRocksDB := true - var dataKey string - if currentID == DefaultMessageID { - dataKey = prefix - } else { - dataKey = path.Join(topicName, strconv.FormatInt(currentID, 10)) + ttbufer, ok := rmq.ttbuffers.Get(topicName) + if ok { + ttmsgs := ttbufer.getBufferMsgs() + if len(ttmsgs) != 0 && currentID >= ttmsgs[0].startID { + // if currentID >= tt buffer's minimum msgID, we do not need to read messages from rocksdb because these messages are not written now. + // we should read from buffer directly + readRocksDB = false + consumerMessage = append(consumerMessage, readBuffer(ttmsgs, currentID, maxReadableID, n)...) + } } - iter.Seek([]byte(dataKey)) - iterProperty.Seek([]byte(dataKey)) - - consumerMessage := make([]ConsumerMessage, 0, n) - offset := 0 - for ; iter.Valid() && offset < n; iter.Next() { - key := iter.Key() - val := iter.Value() - strKey := string(key.Data()) - key.Free() - properties := make(map[string]string) - var propertiesValue []byte + if readRocksDB { + upperbound := path.Join(topicName, strconv.FormatInt(maxReadableID+1, 10)) - msgID, err := strconv.ParseInt(strKey[len(topicName)+1:], 10, 64) - if err != nil { - val.Free() - return nil, err - } - offset++ + readOpts := gorocksdb.NewDefaultReadOptions() + defer readOpts.Destroy() + iter := rocksdbkv.NewRocksIteratorCFWithUpperBound(rmq.store, rmq.cfh[0], upperbound, readOpts) + iterProperty := rocksdbkv.NewRocksIteratorCFWithUpperBound(rmq.store, rmq.cfh[1], upperbound, readOpts) + defer iter.Close() + defer iterProperty.Close() - if iterProperty.Valid() && string(iterProperty.Key().Data()) == string(iter.Key().Data()) { - // the key of properties is the same with the key of payload - // to prevent mix message with or without property column family - propertiesValue = iterProperty.Value().Data() - iterProperty.Next() + var dataKey string + if currentID == DefaultMessageID { + dataKey = topicName + "/" + } else { + dataKey = path.Join(topicName, strconv.FormatInt(currentID, 10)) } + iter.Seek([]byte(dataKey)) + iterProperty.Seek([]byte(dataKey)) + offset := 0 + for ; iter.Valid() && offset < n; iter.Next() { + key := iter.Key() + val := iter.Value() + strKey := string(key.Data()) + key.Free() + properties := make(map[string]string) + var propertiesValue []byte - // between 2.2.0 and 2.3.0, the key of Payload is topic/properties/msgid/Payload - // will ingnore the property before 2.3.0, just make sure property empty is ok for 2.3 + offset++ - // before 2.2.0, there have no properties in ProducerMessage and ConsumerMessage in rocksmq - // when produce before 2.2.0, but consume after 2.2.0, propertiesValue will be [] - if len(propertiesValue) != 0 { - if err = json.Unmarshal(propertiesValue, &properties); err != nil { + msgID, err := strconv.ParseInt(strKey[len(topicName)+1:], 10, 64) + if err != nil { + val.Free() return nil, err } + + if iterProperty.Valid() && string(iterProperty.Key().Data()) == string(iter.Key().Data()) { + // the key of properties is the same with the key of payload + // to prevent mix message with or without property column family + propertiesValue = iterProperty.Value().Data() + iterProperty.Next() + } + + // between 2.2.0 and 2.3.0, the key of Payload is topic/properties/msgid/Payload + // will ingnore the property before 2.3.0, just make sure property empty is ok for 2.3 + + // before 2.2.0, there have no properties in ProducerMessage and ConsumerMessage in rocksmq + // when produce before 2.2.0, but consume after 2.2.0, propertiesValue will be [] + if len(propertiesValue) != 0 { + if err = json.Unmarshal(propertiesValue, &properties); err != nil { + return nil, err + } + } + msg := ConsumerMessage{ + MsgID: msgID, + } + origData := val.Data() + dataLen := len(origData) + if dataLen == 0 { + msg.Payload = nil + msg.Properties = nil + } else { + msg.Payload = make([]byte, dataLen) + msg.Properties = properties + copy(msg.Payload, origData) + } + + consumerMessage = append(consumerMessage, msg) + val.Free() } - msg := ConsumerMessage{ - MsgID: msgID, + // if iterate fail + if err := iter.Err(); err != nil { + return nil, err } - origData := val.Data() - dataLen := len(origData) - if dataLen == 0 { - msg.Payload = nil - msg.Properties = nil - } else { - msg.Payload = make([]byte, dataLen) - msg.Properties = properties - copy(msg.Payload, origData) + + if offset < n { + // iter read to the end, try to read from tt buffer + if len(consumerMessage) != 0 { + currentID = consumerMessage[len(consumerMessage)-1].MsgID + } + + ttbufer, ok := rmq.ttbuffers.Get(topicName) + if ok { + ttmsgs := ttbufer.getBufferMsgs() + if len(ttmsgs) != 0 && maxReadableID >= ttmsgs[0].startID { + consumerMessage = append(consumerMessage, readBuffer(ttmsgs, currentID, maxReadableID, n-offset)...) + } + } } - consumerMessage = append(consumerMessage, msg) - val.Free() - } - // if iterate fail - if err := iter.Err(); err != nil { - return nil, err } iterTime := time.Since(start).Milliseconds() @@ -872,8 +1012,7 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum getConsumeTime := time.Since(start).Milliseconds() if getConsumeTime > 200 { log.Warn("rocksmq consume too slowly", zap.String("topic", topicName), - zap.Int64("get lock elapse", getLockTime), - zap.Int64("iterator elapse", iterTime-getLockTime), + zap.Int64("iterator elapse", iterTime), zap.Int64("moveConsumePosTime elapse", moveConsumePosTime-iterTime), zap.Int64("total consume elapse", getConsumeTime)) } @@ -885,7 +1024,7 @@ func (rmq *rocksmq) seek(topicName string, groupName string, msgID UniqueID) err rmq.storeMu.Lock() defer rmq.storeMu.Unlock() key := constructCurrentID(topicName, groupName) - _, ok := rmq.consumersID.Load(key) + _, ok := rmq.consumersID.Get(key) if !ok { return fmt.Errorf("ConsumerGroup %s, channel %s not exists", groupName, topicName) } @@ -930,7 +1069,7 @@ func (rmq *rocksmq) moveConsumePos(topicName string, groupName string, msgID Uni return err } - rmq.consumersID.Store(constructCurrentID(topicName, groupName), msgID) + rmq.consumersID.Insert(constructCurrentID(topicName, groupName), msgID) return nil } @@ -940,14 +1079,10 @@ func (rmq *rocksmq) Seek(topicName string, groupName string, msgID UniqueID) err return errors.New(RmqNotServingErrMsg) } /* Step I: Check if key exists */ - ll, ok := topicMu.Load(topicName) + lock, ok := topicMu.Get(topicName) if !ok { return fmt.Errorf("topic %s not exist, %w", topicName, mqwrapper.ErrTopicNotExist) } - lock, ok := ll.(*sync.Mutex) - if !ok { - return fmt.Errorf("get mutex failed, topic name = %s", topicName) - } lock.Lock() defer lock.Unlock() @@ -966,26 +1101,22 @@ func (rmq *rocksmq) ForceSeek(topicName string, groupName string, msgID UniqueID return errors.New(RmqNotServingErrMsg) } /* Step I: Check if key exists */ - ll, ok := topicMu.Load(topicName) + lock, ok := topicMu.Get(topicName) if !ok { return fmt.Errorf("topic %s not exist, %w", topicName, mqwrapper.ErrTopicNotExist) } - lock, ok := ll.(*sync.Mutex) - if !ok { - return fmt.Errorf("get mutex failed, topic name = %s", topicName) - } lock.Lock() defer lock.Unlock() rmq.storeMu.Lock() defer rmq.storeMu.Unlock() key := constructCurrentID(topicName, groupName) - _, ok = rmq.consumersID.Load(key) + _, ok = rmq.consumersID.Get(key) if !ok { return fmt.Errorf("ConsumerGroup %s, channel %s not exists", groupName, topicName) } - rmq.consumersID.Store(key, msgID) + rmq.consumersID.Insert(key, msgID) log.Debug("successfully force seek", zap.String("topic", topicName), zap.String("group", groupName), zap.Uint64("msgID", uint64(msgID))) @@ -1001,7 +1132,7 @@ func (rmq *rocksmq) SeekToLatest(topicName, groupName string) error { defer rmq.storeMu.Unlock() key := constructCurrentID(topicName, groupName) - _, ok := rmq.consumersID.Load(key) + _, ok := rmq.consumersID.Get(key) if !ok { return fmt.Errorf("ConsumerGroup %s, channel %s not exists", groupName, topicName) } @@ -1023,6 +1154,9 @@ func (rmq *rocksmq) SeekToLatest(topicName, groupName string) error { } func (rmq *rocksmq) getLatestMsg(topicName string) (int64, error) { + if syncer, ok := rmq.syncers.Get(topicName); ok { + return syncer.getMaxReadableID(), nil + } readOpts := gorocksdb.NewDefaultReadOptions() defer readOpts.Destroy() iter := rocksdbkv.NewRocksIteratorCF(rmq.store, rmq.cfh[0], readOpts) @@ -1064,8 +1198,8 @@ func (rmq *rocksmq) getLatestMsg(topicName string) (int64, error) { // Notify sends a mutex in MsgMutex channel to tell consumers to consume func (rmq *rocksmq) Notify(topicName, groupName string) { - if vals, ok := rmq.consumers.Load(topicName); ok { - for _, v := range vals.([]*Consumer) { + if vals, ok := rmq.consumers.Get(topicName); ok { + for _, v := range vals { if v.GroupName == groupName { select { case v.MsgMutex <- struct{}{}: @@ -1078,6 +1212,19 @@ func (rmq *rocksmq) Notify(topicName, groupName string) { } } +func (rmq *rocksmq) NotifyTopic(topicName string) { + if vals, ok := rmq.consumers.Get(topicName); ok { + for _, v := range vals { + select { + case v.MsgMutex <- struct{}{}: + continue + default: + continue + } + } + } +} + // updateAckedInfo update acked informations for retention after consume func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, firstID UniqueID, lastID UniqueID) error { // 1. Try to get the page id between first ID and last ID of ids @@ -1114,9 +1261,8 @@ func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, firstID UniqueI fixedAckedTsKey := constructKey(AckedTsTitle, topicName) // 2. Update acked ts and acked size for pageIDs - if vals, ok := rmq.consumers.Load(topicName); ok { - consumers, ok := vals.([]*Consumer) - if !ok || len(consumers) == 0 { + if consumers, ok := rmq.consumers.Get(topicName); ok { + if len(consumers) == 0 { log.Error("update ack with no consumer", zap.String("topic", topicName)) return nil } @@ -1157,7 +1303,7 @@ func (rmq *rocksmq) CheckTopicValid(topic string) error { // Check if key exists log := log.With(zap.String("topic", topic)) - _, ok := topicMu.Load(topic) + _, ok := topicMu.Get(topic) if !ok { return merr.WrapErrTopicNotFound(topic, "failed to get topic") } @@ -1173,3 +1319,46 @@ func (rmq *rocksmq) CheckTopicValid(topic string) error { log.Info("created topic is empty") return nil } + +type bufferMsgs struct { + startID int64 + endID int64 + msgs []ProducerMessage +} + +// rmqTtBuffer is to buffer continuous tt msgs to avoid frequent rocksdb write. +// if a insert/delete msg is produced and tt buffer is not empty, we should merge them together. +type rmqTtBuffer struct { + mu sync.RWMutex + bufferedMsgs []bufferMsgs +} + +func (s *rmqTtBuffer) clear() { + s.mu.Lock() + defer s.mu.Unlock() + s.bufferedMsgs = s.bufferedMsgs[:0] +} + +func (s *rmqTtBuffer) getBufferMsgs() []bufferMsgs { + s.mu.RLock() + defer s.mu.RUnlock() + return s.bufferedMsgs +} + +func (s *rmqTtBuffer) size() int { + s.mu.RLock() + defer s.mu.RUnlock() + return len(s.bufferedMsgs) +} + +func (s *rmqTtBuffer) append(msg bufferMsgs) { + s.mu.Lock() + defer s.mu.Unlock() + s.bufferedMsgs = append(s.bufferedMsgs, msg) +} + +func newRmqTtBuffer() *rmqTtBuffer { + return &rmqTtBuffer{ + bufferedMsgs: make([]bufferMsgs, 0), + } +} diff --git a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go index 623713fec3465..ce886a11098b3 100644 --- a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go +++ b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go @@ -87,14 +87,10 @@ func (rmq *rocksmq) produceBefore2(topicName string, messages []producerMessageB return nil, errors.New(RmqNotServingErrMsg) } start := time.Now() - ll, ok := topicMu.Load(topicName) + lock, ok := topicMu.Get(topicName) if !ok { return []UniqueID{}, fmt.Errorf("topic name = %s not exist", topicName) } - lock, ok := ll.(*sync.Mutex) - if !ok { - return []UniqueID{}, fmt.Errorf("get mutex failed, topic name = %s", topicName) - } lock.Lock() defer lock.Unlock() @@ -131,8 +127,8 @@ func (rmq *rocksmq) produceBefore2(topicName string, messages []producerMessageB return []UniqueID{}, err } writeTime := time.Since(start).Milliseconds() - if vals, ok := rmq.consumers.Load(topicName); ok { - for _, v := range vals.([]*Consumer) { + if vals, ok := rmq.consumers.Get(topicName); ok { + for _, v := range vals { select { case v.MsgMutex <- struct{}{}: continue @@ -168,14 +164,10 @@ func (rmq *rocksmq) produceIn2(topicName string, messages []ProducerMessage) ([] return nil, errors.New(RmqNotServingErrMsg) } start := time.Now() - ll, ok := topicMu.Load(topicName) + lock, ok := topicMu.Get(topicName) if !ok { return []UniqueID{}, fmt.Errorf("topic name = %s not exist", topicName) } - lock, ok := ll.(*sync.Mutex) - if !ok { - return []UniqueID{}, fmt.Errorf("get mutex failed, topic name = %s", topicName) - } lock.Lock() defer lock.Unlock() @@ -222,8 +214,8 @@ func (rmq *rocksmq) produceIn2(topicName string, messages []ProducerMessage) ([] return []UniqueID{}, err } writeTime := time.Since(start).Milliseconds() - if vals, ok := rmq.consumers.Load(topicName); ok { - for _, v := range vals.([]*Consumer) { + if vals, ok := rmq.consumers.Get(topicName); ok { + for _, v := range vals { select { case v.MsgMutex <- struct{}{}: continue @@ -396,7 +388,6 @@ func TestRocksmq_Compatibility(t *testing.T) { paramtable.Init() rmq, err := NewRocksMQ(rocksdbPath, idAllocator) assert.NoError(t, err) - defer rmq.Close() channelName := "channel_rocks" err = rmq.CreateTopic(channelName) @@ -412,24 +403,6 @@ func TestRocksmq_Compatibility(t *testing.T) { _, err = rmq.produceBefore2(channelName, tMsgs) assert.NoError(t, err) - groupName := "test_group" - _ = rmq.DestroyConsumerGroup(channelName, groupName) - err = rmq.CreateConsumerGroup(channelName, groupName) - assert.NoError(t, err) - - cMsgs, err := rmq.Consume(channelName, groupName, 1) - if err != nil { - log.Info("test", zap.Any("err", err)) - } - assert.NoError(t, err) - assert.Equal(t, len(cMsgs), 1) - assert.Equal(t, string(cMsgs[0].Payload), "d_message") - _, ok := cMsgs[0].Properties[common.TraceIDKey] - assert.False(t, ok) - // it will be set empty map if produce message has no properties field - expect := make(map[string]string) - assert.Equal(t, cMsgs[0].Properties, expect) - // between 2.2.0 and 2.3.0, the key of Payload is topic/properties/msgid/Payload // will ingnore the property before 2.3.0, just make sure property empty is ok for 2.3 // after 2.3, the properties will be stored in column families @@ -443,16 +416,6 @@ func TestRocksmq_Compatibility(t *testing.T) { _, err = rmq.produceIn2(channelName, tMsgs1) assert.NoError(t, err) - msg2, err := rmq.Consume(channelName, groupName, 1) - assert.NoError(t, err) - assert.Equal(t, len(msg2), 1) - assert.Equal(t, string(msg2[0].Payload), "1_message") - _, ok = msg2[0].Properties[common.TraceIDKey] - assert.False(t, ok) - // will ingnore the property before 2.3.0, just make sure property empty is ok for 2.3 - expect = make(map[string]string) - assert.Equal(t, cMsgs[0].Properties, expect) - // between 2.2.0 and 2.3.0, the key of Payload is topic/properties/msgid/Payload // after 2.3, the properties will be stored in column families // it aims to test the mixed message before 2.3.0 and after 2.3.0, will get properties successfully @@ -468,7 +431,45 @@ func TestRocksmq_Compatibility(t *testing.T) { _, err = rmq.Produce(channelName, tMsgs3) assert.NoError(t, err) - msg5, err := rmq.Consume(channelName, groupName, 2) + rmq.Close() + + rmq2, err := NewRocksMQ(rocksdbPath, idAllocator) + assert.NoError(t, err) + defer rmq2.Close() + + err = rmq2.CreateTopic(channelName) + assert.NoError(t, err) + defer rmq2.DestroyTopic(channelName) + + groupName := "test_group" + _ = rmq2.DestroyConsumerGroup(channelName, groupName) + err = rmq2.CreateConsumerGroup(channelName, groupName) + assert.NoError(t, err) + + cMsgs, err := rmq2.Consume(channelName, groupName, 1) + if err != nil { + log.Info("test", zap.Any("err", err)) + } + assert.NoError(t, err) + assert.Equal(t, len(cMsgs), 1) + assert.Equal(t, string(cMsgs[0].Payload), "d_message") + _, ok := cMsgs[0].Properties[common.TraceIDKey] + assert.False(t, ok) + // it will be set empty map if produce message has no properties field + expect := make(map[string]string) + assert.Equal(t, cMsgs[0].Properties, expect) + + msg2, err := rmq2.Consume(channelName, groupName, 1) + assert.NoError(t, err) + assert.Equal(t, len(msg2), 1) + assert.Equal(t, string(msg2[0].Payload), "1_message") + _, ok = msg2[0].Properties[common.TraceIDKey] + assert.False(t, ok) + // will ingnore the property before 2.3.0, just make sure property empty is ok for 2.3 + expect = make(map[string]string) + assert.Equal(t, cMsgs[0].Properties, expect) + + msg5, err := rmq2.Consume(channelName, groupName, 2) assert.NoError(t, err) assert.Equal(t, len(msg5), 2) assert.Equal(t, string(msg5[0].Payload), "3_message") @@ -563,7 +564,7 @@ func TestRocksmq_Dummy(t *testing.T) { assert.NoError(t, err) channelName1 := "channel_dummy" - topicMu.Store(channelName1, new(sync.Mutex)) + topicMu.Insert(channelName1, new(sync.Mutex)) err = rmq.DestroyTopic(channelName1) assert.NoError(t, err) @@ -595,10 +596,10 @@ func TestRocksmq_Dummy(t *testing.T) { pMsgA := ProducerMessage{Payload: []byte(msgA)} pMsgs[0] = pMsgA - topicMu.Delete(channelName) + topicMu.GetAndRemove(channelName) _, err = rmq.Consume(channelName, groupName1, 1) assert.Error(t, err) - topicMu.Store(channelName, channelName) + topicMu.Insert(channelName, new(sync.Mutex)) _, err = rmq.Produce(channelName, nil) assert.Error(t, err) @@ -740,6 +741,7 @@ func TestRocksmq_Loop(t *testing.T) { } func TestRocksmq_Goroutines(t *testing.T) { + t.Skip() ep := etcdEndpoints() etcdCli, err := etcd.GetRemoteEtcdClient(ep) assert.NoError(t, err) @@ -1196,7 +1198,7 @@ func TestRocksmq_CheckPreTopicValid(t *testing.T) { err = rmq.CreateTopic(channelName2) defer rmq.DestroyTopic(channelName2) assert.NoError(t, err) - topicMu.Store(channelName2, new(sync.Mutex)) + topicMu.Insert(channelName2, new(sync.Mutex)) pMsgs := make([]ProducerMessage, 10) for i := 0; i < 10; i++ { @@ -1216,7 +1218,7 @@ func TestRocksmq_CheckPreTopicValid(t *testing.T) { defer rmq.DestroyTopic(channelName3) assert.NoError(t, err) - topicMu.Store(channelName3, new(sync.Mutex)) + topicMu.Insert(channelName3, new(sync.Mutex)) err = rmq.CheckTopicValid(channelName3) assert.NoError(t, err) } @@ -1319,10 +1321,6 @@ func TestRocksmq_SeekTopicMutexError(t *testing.T) { rmq, err := NewRocksMQ(name, idAllocator) assert.NoError(t, err) defer rmq.Close() - - topicMu.Store("test_topic_mutix_error", nil) - assert.Error(t, rmq.Seek("test_topic_mutix_error", "", 0)) - assert.Error(t, rmq.ForceSeek("test_topic_mutix_error", "", 0)) } func TestRocksmq_moveConsumePosError(t *testing.T) { diff --git a/internal/mq/mqimpl/rocksmq/server/rocksmq_retention.go b/internal/mq/mqimpl/rocksmq/server/rocksmq_retention.go index 80ebec395dac3..852e8b51f24c6 100644 --- a/internal/mq/mqimpl/rocksmq/server/rocksmq_retention.go +++ b/internal/mq/mqimpl/rocksmq/server/rocksmq_retention.go @@ -12,7 +12,6 @@ package server import ( - "fmt" "path" "strconv" "sync" @@ -62,7 +61,7 @@ func initRetentionInfo(kv *rocksdbkv.RocksdbKV, db *gorocksdb.DB) (*retentionInf for _, key := range topicKeys { topic := key[len(TopicIDTitle):] ri.topicRetetionTime.Insert(topic, time.Now().Unix()) - topicMu.Store(topic, new(sync.Mutex)) + topicMu.Insert(topic, new(sync.Mutex)) } return ri, nil } @@ -311,17 +310,6 @@ func (ri *retentionInfo) cleanData(topic string, pageEndID UniqueID) error { ackedEndIDKey := fixedAckedTsKey + "/" + strconv.FormatInt(pageEndID+1, 10) writeBatch.DeleteRange([]byte(ackedStartIDKey), []byte(ackedEndIDKey)) - ll, ok := topicMu.Load(topic) - if !ok { - return fmt.Errorf("topic name = %s not exist", topic) - } - lock, ok := ll.(*sync.Mutex) - if !ok { - return fmt.Errorf("get mutex failed, topic name = %s", topic) - } - lock.Lock() - defer lock.Unlock() - err := DeleteMessages(ri.db, topic, 0, pageEndID) if err != nil { return err diff --git a/internal/mq/mqimpl/rocksmq/server/rocksmq_syncer.go b/internal/mq/mqimpl/rocksmq/server/rocksmq_syncer.go new file mode 100644 index 0000000000000..af11c5740ad7a --- /dev/null +++ b/internal/mq/mqimpl/rocksmq/server/rocksmq_syncer.go @@ -0,0 +1,107 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package server + +import ( + "fmt" + "sync" + "sync/atomic" +) + +type produceState struct { + endMsgID int64 + written bool + ready chan struct{} +} + +// rocksmqSyncer is to make message write concurrently but updates the page info sequentially +type rocksmqSyncer struct { + msgQueueMu sync.RWMutex + msgQueue []*produceState + maxReadableID int64 // the end id of the last readable msg, should use atomic load/store +} + +func (s *rocksmqSyncer) push(endMsgID int64) *produceState { + s.msgQueueMu.Lock() + defer s.msgQueueMu.Unlock() + state := &produceState{ + endMsgID: endMsgID, + written: false, + ready: make(chan struct{}), + } + s.msgQueue = append(s.msgQueue, state) + return state +} + +func (s *rocksmqSyncer) finishWrite(endMsgID int64) error { + s.msgQueueMu.RLock() + defer s.msgQueueMu.RUnlock() + for i, state := range s.msgQueue { + if state.endMsgID != endMsgID { + continue + } + + state.written = true + if i == 0 { + close(state.ready) + } + return nil + } + + return fmt.Errorf("state %d not found", endMsgID) +} + +func (s *rocksmqSyncer) pop() { + s.msgQueueMu.Lock() + defer s.msgQueueMu.Unlock() + atomic.StoreInt64(&s.maxReadableID, s.msgQueue[0].endMsgID-1) + s.msgQueue = s.msgQueue[1:] + if len(s.msgQueue) != 0 && s.msgQueue[0].written { + close(s.msgQueue[0].ready) + } +} + +func (s *rocksmqSyncer) remove(endMsgID int64) { + s.msgQueueMu.Lock() + defer s.msgQueueMu.Unlock() + + var target = -1 + for i, state := range s.msgQueue { + if state.endMsgID == endMsgID { + target = i + break + } + } + + if target == -1 { + return + } + + s.msgQueue = append(s.msgQueue[:target], s.msgQueue[target+1:]...) + + // if the first msg is removed, check current first msg if written + if target == 0 && len(s.msgQueue) != 0 && s.msgQueue[0].written { + close(s.msgQueue[0].ready) + } +} + +func (s *rocksmqSyncer) getMaxReadableID() int64 { + return atomic.LoadInt64(&s.maxReadableID) +} + +func newRocksMqSyncer(topicName string, latestMsgID int64) *rocksmqSyncer { + syncer := &rocksmqSyncer{ + msgQueue: make([]*produceState, 0), + } + syncer.maxReadableID = latestMsgID + return syncer +} diff --git a/pkg/mq/msgstream/mq_msgstream.go b/pkg/mq/msgstream/mq_msgstream.go index 85f931d0c1993..5c8252963a801 100644 --- a/pkg/mq/msgstream/mq_msgstream.go +++ b/pkg/mq/msgstream/mq_msgstream.go @@ -281,6 +281,16 @@ func (ms *mqMsgStream) Produce(msgPack *MsgPack) error { } msg := &mqwrapper.ProducerMessage{Payload: m, Properties: map[string]string{}} + if v.Msgs[i].Type() == commonpb.MsgType_TimeTick { + msg.Properties[mqwrapper.TtProperty] = "" + } + if v.Msgs[i].Type() == commonpb.MsgType_Delete { + deletemsg := v.Msgs[i].(*DeleteMsg) + if deletemsg.NumRows == 0 { + msg.Properties[mqwrapper.TtProperty] = "" + } + } + InjectCtx(spanCtx, msg.Properties) ms.producerLock.Lock() diff --git a/pkg/mq/msgstream/mqwrapper/message.go b/pkg/mq/msgstream/mqwrapper/message.go index dbb13484e549a..fa62a481caa9d 100644 --- a/pkg/mq/msgstream/mqwrapper/message.go +++ b/pkg/mq/msgstream/mqwrapper/message.go @@ -16,6 +16,8 @@ package mqwrapper +var TtProperty = "isTt" + // Message is the interface that provides operations of a consumer type Message interface { // Topic get the topic from which this message originated from diff --git a/pkg/util/paramtable/service_param.go b/pkg/util/paramtable/service_param.go index fcd8bbcb5ff98..a53dd2946c8ff 100644 --- a/pkg/util/paramtable/service_param.go +++ b/pkg/util/paramtable/service_param.go @@ -581,6 +581,9 @@ type RocksmqConfig struct { // only support {0,7}, 0 means no compress, 7 means zstd // default [0,7]. CompressionTypes ParamItem `refreshable:"false"` + + MaxParallelsim ParamItem `refreshable:"false"` + MaxTtBufferSize ParamItem `refreshable:"false"` } func (r *RocksmqConfig) Init(base *BaseTable) { @@ -651,6 +654,20 @@ please adjust in embedded Milvus: /tmp/milvus/rdb_data`, Version: "2.2.12", } r.CompressionTypes.Init(base.mgr) + + r.MaxParallelsim = ParamItem{ + Key: "rocksmq.maxWriteParallelism", + DefaultValue: "30", + Version: "2.2.13", + } + r.MaxParallelsim.Init(base.mgr) + + r.MaxTtBufferSize = ParamItem{ + Key: "rocksmq.maxTtMsgBufferSize", + DefaultValue: "1000", + Version: "2.2.13", + } + r.MaxTtBufferSize.Init(base.mgr) } // NatsmqConfig describes the configuration options for the Nats message queue