Skip to content

Commit

Permalink
fix: Fix stats task wrong RootPath when upload binlog (#38539)
Browse files Browse the repository at this point in the history
issue: #38336

Signed-off-by: Cai Zhang <[email protected]>
  • Loading branch information
xiaocai2333 authored Dec 17, 2024
1 parent 1aa31e2 commit 7a05b5b
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 19 deletions.
25 changes: 10 additions & 15 deletions internal/indexnode/task_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
"github.com/milvus-io/milvus/internal/proto/workerpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/indexcgowrapper"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/conc"
_ "github.com/milvus-io/milvus/pkg/util/funcutil"
Expand Down Expand Up @@ -201,7 +200,7 @@ func (st *statsTask) sortSegment(ctx context.Context) ([]*datapb.FieldBinlog, er

if (i+1)%statsBatchSize == 0 && writer.IsFullWithBinlogMaxSize(st.req.GetBinlogMaxSize()) {
serWriteStart := time.Now()
binlogNum, kvs, partialBinlogs, err := serializeWrite(ctx, st.req.GetStartLogID()+st.logIDOffset, writer)
binlogNum, kvs, partialBinlogs, err := serializeWrite(ctx, st.req.GetStorageConfig().GetRootPath(), st.req.GetStartLogID()+st.logIDOffset, writer)
if err != nil {
log.Ctx(ctx).Warn("stats wrong, failed to serialize writer", zap.Int64("taskID", st.req.GetTaskID()), zap.Error(err))
return nil, err
Expand All @@ -224,7 +223,7 @@ func (st *statsTask) sortSegment(ctx context.Context) ([]*datapb.FieldBinlog, er

if !writer.FlushAndIsEmpty() {
serWriteStart := time.Now()
binlogNum, kvs, partialBinlogs, err := serializeWrite(ctx, st.req.GetStartLogID()+st.logIDOffset, writer)
binlogNum, kvs, partialBinlogs, err := serializeWrite(ctx, st.req.GetStorageConfig().GetRootPath(), st.req.GetStartLogID()+st.logIDOffset, writer)
if err != nil {
log.Ctx(ctx).Warn("stats wrong, failed to serialize writer", zap.Int64("taskID", st.req.GetTaskID()), zap.Error(err))
return nil, err
Expand All @@ -244,7 +243,7 @@ func (st *statsTask) sortSegment(ctx context.Context) ([]*datapb.FieldBinlog, er
}

serWriteStart := time.Now()
binlogNums, sPath, err := statSerializeWrite(ctx, st.binlogIO, st.req.GetStartLogID()+st.logIDOffset, writer, numRows)
binlogNums, sPath, err := statSerializeWrite(ctx, st.req.GetStorageConfig().GetRootPath(), st.binlogIO, st.req.GetStartLogID()+st.logIDOffset, writer, numRows)
if err != nil {
log.Ctx(ctx).Warn("stats wrong, failed to serialize write segment stats", zap.Int64("taskID", st.req.GetTaskID()),
zap.Int64("remaining row count", numRows), zap.Error(err))
Expand All @@ -256,7 +255,7 @@ func (st *statsTask) sortSegment(ctx context.Context) ([]*datapb.FieldBinlog, er

var bm25StatsLogs []*datapb.FieldBinlog
if len(bm25FieldIds) > 0 {
binlogNums, bm25StatsLogs, err = bm25SerializeWrite(ctx, st.binlogIO, st.req.GetStartLogID()+st.logIDOffset, writer, numRows)
binlogNums, bm25StatsLogs, err = bm25SerializeWrite(ctx, st.req.GetStorageConfig().GetRootPath(), st.binlogIO, st.req.GetStartLogID()+st.logIDOffset, writer, numRows)
if err != nil {
log.Ctx(ctx).Warn("compact wrong, failed to serialize write segment bm25 stats", zap.Error(err))
return nil, err
Expand Down Expand Up @@ -510,7 +509,7 @@ func mergeFieldBinlogs(base, paths map[typeutil.UniqueID]*datapb.FieldBinlog) {
}
}

func serializeWrite(ctx context.Context, startID int64, writer *compaction.SegmentWriter) (binlogNum int64, kvs map[string][]byte, fieldBinlogs map[int64]*datapb.FieldBinlog, err error) {
func serializeWrite(ctx context.Context, rootPath string, startID int64, writer *compaction.SegmentWriter) (binlogNum int64, kvs map[string][]byte, fieldBinlogs map[int64]*datapb.FieldBinlog, err error) {
_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "serializeWrite")
defer span.End()

Expand All @@ -525,7 +524,7 @@ func serializeWrite(ctx context.Context, startID int64, writer *compaction.Segme
for i := range blobs {
// Blob Key is generated by Serialize from int64 fieldID in collection schema, which won't raise error in ParseInt
fID, _ := strconv.ParseInt(blobs[i].GetKey(), 10, 64)
key, _ := binlog.BuildLogPath(storage.InsertBinlog, writer.GetCollectionID(), writer.GetPartitionID(), writer.GetSegmentID(), fID, startID+int64(i))
key, _ := binlog.BuildLogPathWithRootPath(rootPath, storage.InsertBinlog, writer.GetCollectionID(), writer.GetPartitionID(), writer.GetSegmentID(), fID, startID+int64(i))

kvs[key] = blobs[i].GetValue()
fieldBinlogs[fID] = &datapb.FieldBinlog{
Expand All @@ -546,7 +545,7 @@ func serializeWrite(ctx context.Context, startID int64, writer *compaction.Segme
return
}

func statSerializeWrite(ctx context.Context, io io.BinlogIO, startID int64, writer *compaction.SegmentWriter, finalRowCount int64) (int64, *datapb.FieldBinlog, error) {
func statSerializeWrite(ctx context.Context, rootPath string, io io.BinlogIO, startID int64, writer *compaction.SegmentWriter, finalRowCount int64) (int64, *datapb.FieldBinlog, error) {
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "statslog serializeWrite")
defer span.End()
sblob, err := writer.Finish()
Expand All @@ -555,7 +554,7 @@ func statSerializeWrite(ctx context.Context, io io.BinlogIO, startID int64, writ
}

binlogNum := int64(1)
key, _ := binlog.BuildLogPath(storage.StatsBinlog, writer.GetCollectionID(), writer.GetPartitionID(), writer.GetSegmentID(), writer.GetPkID(), startID)
key, _ := binlog.BuildLogPathWithRootPath(rootPath, storage.StatsBinlog, writer.GetCollectionID(), writer.GetPartitionID(), writer.GetSegmentID(), writer.GetPkID(), startID)
kvs := map[string][]byte{key: sblob.GetValue()}
statFieldLog := &datapb.FieldBinlog{
FieldID: writer.GetPkID(),
Expand All @@ -576,7 +575,7 @@ func statSerializeWrite(ctx context.Context, io io.BinlogIO, startID int64, writ
return binlogNum, statFieldLog, nil
}

func bm25SerializeWrite(ctx context.Context, io io.BinlogIO, startID int64, writer *compaction.SegmentWriter, finalRowCount int64) (int64, []*datapb.FieldBinlog, error) {
func bm25SerializeWrite(ctx context.Context, rootPath string, io io.BinlogIO, startID int64, writer *compaction.SegmentWriter, finalRowCount int64) (int64, []*datapb.FieldBinlog, error) {
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "bm25log serializeWrite")
defer span.End()
stats, err := writer.GetBm25StatsBlob()
Expand All @@ -588,7 +587,7 @@ func bm25SerializeWrite(ctx context.Context, io io.BinlogIO, startID int64, writ
binlogs := []*datapb.FieldBinlog{}
cnt := int64(0)
for fieldID, blob := range stats {
key, _ := binlog.BuildLogPath(storage.BM25Binlog, writer.GetCollectionID(), writer.GetPartitionID(), writer.GetSegmentID(), fieldID, startID+cnt)
key, _ := binlog.BuildLogPathWithRootPath(rootPath, storage.BM25Binlog, writer.GetCollectionID(), writer.GetPartitionID(), writer.GetSegmentID(), fieldID, startID+cnt)
kvs[key] = blob.GetValue()
fieldLog := &datapb.FieldBinlog{
FieldID: fieldID,
Expand All @@ -614,10 +613,6 @@ func bm25SerializeWrite(ctx context.Context, io io.BinlogIO, startID int64, writ
return cnt, binlogs, nil
}

func buildTextLogPrefix(rootPath string, collID, partID, segID, fieldID, version int64) string {
return fmt.Sprintf("%s/%s/%d/%d/%d/%d/%d", rootPath, common.TextIndexPath, collID, partID, segID, fieldID, version)
}

func ParseStorageConfig(s *indexpb.StorageConfig) (*indexcgopb.StorageConfig, error) {
bs, err := proto.Marshal(s)
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions internal/indexnode/task_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (s *TaskStatsSuite) Testbm25SerializeWriteError() {
s.schema = genCollectionSchemaWithBM25()
s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil).Once()
s.GenSegmentWriterWithBM25(0)
cnt, binlogs, err := bm25SerializeWrite(context.Background(), s.mockBinlogIO, 0, s.segWriter, 1)
cnt, binlogs, err := bm25SerializeWrite(context.Background(), "root_path", s.mockBinlogIO, 0, s.segWriter, 1)
s.Require().NoError(err)
s.Equal(int64(1), cnt)
s.Equal(1, len(binlogs))
Expand All @@ -96,7 +96,7 @@ func (s *TaskStatsSuite) Testbm25SerializeWriteError() {
s.schema = genCollectionSchemaWithBM25()
s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(fmt.Errorf("mock error")).Once()
s.GenSegmentWriterWithBM25(0)
_, _, err := bm25SerializeWrite(context.Background(), s.mockBinlogIO, 0, s.segWriter, 1)
_, _, err := bm25SerializeWrite(context.Background(), "root_path", s.mockBinlogIO, 0, s.segWriter, 1)
s.Error(err)
})
}
Expand All @@ -105,7 +105,7 @@ func (s *TaskStatsSuite) TestSortSegmentWithBM25() {
s.Run("normal case", func() {
s.schema = genCollectionSchemaWithBM25()
s.GenSegmentWriterWithBM25(0)
_, kvs, fBinlogs, err := serializeWrite(context.TODO(), 0, s.segWriter)
_, kvs, fBinlogs, err := serializeWrite(context.TODO(), "root_path", 0, s.segWriter)
s.NoError(err)
s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, paths []string) ([][]byte, error) {
result := make([][]byte, len(paths))
Expand Down Expand Up @@ -149,7 +149,7 @@ func (s *TaskStatsSuite) TestSortSegmentWithBM25() {
s.Run("upload bm25 binlog failed", func() {
s.schema = genCollectionSchemaWithBM25()
s.GenSegmentWriterWithBM25(0)
_, kvs, fBinlogs, err := serializeWrite(context.TODO(), 0, s.segWriter)
_, kvs, fBinlogs, err := serializeWrite(context.TODO(), "root_path", 0, s.segWriter)
s.NoError(err)
s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, paths []string) ([][]byte, error) {
result := make([][]byte, len(paths))
Expand Down

0 comments on commit 7a05b5b

Please sign in to comment.