Commit f047028: Merge branch 'master' of https://github.com/milvus-io/milvus into 2311-import-proto-meta
bigsheeper committed Nov 28, 2023 (2 parents: 803f551 + 9c9ab08)
Showing 91 changed files with 2,662 additions and 826 deletions.
3 changes: 3 additions & 0 deletions internal/datacoord/meta.go
@@ -42,6 +42,7 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
@@ -1268,6 +1269,8 @@ func (m *meta) UpdateChannelCheckpoint(vChannel string, pos *msgpb.MsgPosition)
zap.Uint64("ts", pos.GetTimestamp()),
zap.ByteString("msgID", pos.GetMsgID()),
zap.Time("time", ts))
metrics.DataCoordCheckpointLag.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), vChannel).
Set(float64(time.Since(ts).Milliseconds()))
}
return nil
}
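For context, a minimal sketch of what the added checkpoint-lag gauge computes: the channel checkpoint carries a hybrid timestamp, its physical (wall-clock) part is extracted, and the gap to "now" is exported per node and channel. This assumes a TiDB-style timestamp layout (physical milliseconds shifted left past 18 logical bits); the metric name and helpers below are illustrative, not the Milvus API.

// Hedged sketch of the checkpoint-lag metric added in UpdateChannelCheckpoint.
// Assumption: hybrid timestamp = physical ms << 18 logical bits.
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

const logicalBits = 18 // assumed logical-bit width of the hybrid timestamp

var checkpointLag = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "milvus",
		Name:      "datacoord_channel_checkpoint_lag_ms", // illustrative name
		Help:      "Lag between now and the channel checkpoint position, in milliseconds.",
	},
	[]string{"node_id", "channel"},
)

// physicalTime extracts the wall-clock part of a hybrid timestamp.
func physicalTime(ts uint64) time.Time {
	return time.UnixMilli(int64(ts >> logicalBits))
}

// recordCheckpointLag mirrors the added lines: lag = now - physical(checkpoint ts).
func recordCheckpointLag(nodeID int64, vChannel string, checkpointTs uint64) {
	lag := time.Since(physicalTime(checkpointTs))
	checkpointLag.WithLabelValues(fmt.Sprint(nodeID), vChannel).Set(float64(lag.Milliseconds()))
}

func main() {
	prometheus.MustRegister(checkpointLag)
	// Example: a checkpoint whose physical time is ~2s in the past.
	ts := uint64(time.Now().Add(-2*time.Second).UnixMilli()) << logicalBits
	recordCheckpointLag(1, "by-dev-rootcoord-dml_0_v0", ts)
}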
7 changes: 7 additions & 0 deletions internal/datacoord/server.go
@@ -330,13 +330,15 @@ func (s *Server) initDataCoord() error {
if err = s.initRootCoordClient(); err != nil {
return err
}
log.Info("init rootcoord client done")

s.broker = NewCoordinatorBroker(s.rootCoordClient)

storageCli, err := s.newChunkManagerFactory()
if err != nil {
return err
}
log.Info("init chunk manager factory done")

if err = s.initMeta(storageCli); err != nil {
return err
@@ -347,6 +349,7 @@ func (s *Server) initDataCoord() error {
if err = s.initCluster(); err != nil {
return err
}
log.Info("init datanode cluster done")

s.allocator = newRootCoordAllocator(s.rootCoordClient)

@@ -355,21 +358,25 @@ func (s *Server) initDataCoord() error {
if err = s.initServiceDiscovery(); err != nil {
return err
}
log.Info("init service discovery done")

if Params.DataCoordCfg.EnableCompaction.GetAsBool() {
s.createCompactionHandler()
s.createCompactionTrigger()
log.Info("init compaction scheduler done")
}

if err = s.initSegmentManager(); err != nil {
return err
}
log.Info("init segment manager done")

s.initGarbageCollection(storageCli)
s.initIndexBuilder(storageCli)

s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)

log.Info("init datacoord done", zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("Address", s.address))
return nil
}

2 changes: 2 additions & 0 deletions internal/datanode/data_node.go
@@ -282,6 +282,8 @@ func (node *DataNode) Init() error {
node.syncMgr = syncMgr

node.writeBufferManager = writebuffer.NewManager(syncMgr)

log.Info("init datanode done", zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("Address", node.address))
})
return initError
}
21 changes: 12 additions & 9 deletions internal/datanode/metacache/meta_cache.go
@@ -27,6 +27,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

type MetaCache interface {
@@ -126,18 +127,20 @@ func (c *metaCacheImpl) CompactSegments(newSegmentID, partitionID int64, numOfRo
bfs: bfs,
}
}
log.Info("add compactTo segment info metacache", zap.Int64("segmentID", compactTo))
}

for _, segID := range oldSegmentIDs {
if segmentInfo, ok := c.segmentInfos[segID]; ok {
updated := segmentInfo.Clone()
oldSet := typeutil.NewSet(oldSegmentIDs...)
for _, segment := range c.segmentInfos {
if oldSet.Contain(segment.segmentID) ||
oldSet.Contain(segment.compactTo) {
updated := segment.Clone()
updated.compactTo = compactTo
c.segmentInfos[segID] = updated
} else {
log.Warn("some dropped segment not exist in meta cache",
zap.String("channel", c.vChannelName),
zap.Int64("collectionID", c.collectionID),
zap.Int64("segmentID", segID))
c.segmentInfos[segment.segmentID] = updated
log.Info("update segment compactTo",
zap.Int64("segmentID", segment.segmentID),
zap.Int64("originalCompactTo", segment.compactTo),
zap.Int64("compactTo", compactTo))
}
}
}
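The CompactSegments change above redirects not only the segments that were compacted away, but also any segment whose existing compactTo target belongs to the compacted set, so chained compactions resolve to the latest target. A simplified, self-contained sketch of that remapping, using stand-in types rather than the metacache internals:

// Hedged sketch of the compactTo remapping; segmentInfo is a stand-in type.
package main

import "fmt"

type segmentInfo struct {
	segmentID int64
	compactTo int64
}

func remapCompactTo(infos map[int64]*segmentInfo, oldSegmentIDs []int64, compactTo int64) {
	oldSet := make(map[int64]struct{}, len(oldSegmentIDs))
	for _, id := range oldSegmentIDs {
		oldSet[id] = struct{}{}
	}
	for id, seg := range infos {
		_, direct := oldSet[seg.segmentID]  // the segment itself was compacted away
		_, chained := oldSet[seg.compactTo] // or it already pointed at one that was
		if direct || chained {
			updated := *seg // clone before mutating, as the diff does
			updated.compactTo = compactTo
			infos[id] = &updated
		}
	}
}

func main() {
	infos := map[int64]*segmentInfo{
		1: {segmentID: 1},               // compacted directly
		2: {segmentID: 2, compactTo: 1}, // chained: already redirected to 1
		3: {segmentID: 3},               // untouched
	}
	remapCompactTo(infos, []int64{1}, 10)
	for id, seg := range infos {
		fmt.Println(id, "->", seg.compactTo)
	}
}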
16 changes: 10 additions & 6 deletions internal/datanode/syncmgr/task.go
@@ -31,6 +31,7 @@ type SyncTask struct {
insertData *storage.InsertData
deleteData *storage.DeleteData

segment *metacache.SegmentInfo
collectionID int64
partitionID int64
segmentID int64
@@ -81,24 +82,25 @@ func (t *SyncTask) handleError(err error) {
func (t *SyncTask) Run() error {
log := t.getLogger()
var err error
var has bool

segment, has := t.metacache.GetSegmentByID(t.segmentID)
t.segment, has = t.metacache.GetSegmentByID(t.segmentID)
if !has {
log.Warn("failed to sync data, segment not found in metacache")
t.handleError(err)
return merr.WrapErrSegmentNotFound(t.segmentID)
}

if segment.CompactTo() == metacache.NullSegment {
if t.segment.CompactTo() == metacache.NullSegment {
log.Info("segment compacted to zero-length segment, discard sync task")
return nil
}

if segment.CompactTo() > 0 {
log.Info("syncing segment compacted, update segment id", zap.Int64("compactTo", segment.CompactTo()))
if t.segment.CompactTo() > 0 {
log.Info("syncing segment compacted, update segment id", zap.Int64("compactTo", t.segment.CompactTo()))
// update sync task segment id
// it's ok to use compactTo segmentID here, since there shall be no insert for compacted segment
t.segmentID = segment.CompactTo()
t.segmentID = t.segment.CompactTo()
}

err = t.serializeInsertData()
@@ -322,7 +324,9 @@ func (t *SyncTask) serializePkStatsLog() error {
}
}

if t.isFlush {
// skip statslog for empty segment
// DO NOT use level check here since Level zero segment may contain insert data in the future
if t.isFlush && t.segment.NumOfRows() > 0 {
return t.serializeMergedPkStats(fieldID, pkField.GetDataType())
}
return nil
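The Run change above re-reads the cached segment before syncing: a task for a segment compacted into the null segment is discarded, and one compacted into another segment is redirected to that target (safe because compacted segments receive no further inserts). A minimal sketch of that decision, with simplified stand-in types and a hypothetical NullSegment sentinel:

// Hedged sketch of the compaction handling in SyncTask.Run; not the Milvus API.
package main

import (
	"errors"
	"fmt"
)

const nullSegment int64 = -1 // stand-in for a "compacted to zero-length" sentinel

type segmentInfo struct {
	segmentID int64
	compactTo int64 // 0 = not compacted, nullSegment = compacted away entirely
}

// resolveSyncTarget returns the segment ID the sync should be written under,
// or (0, false, nil) when the task should be silently dropped.
func resolveSyncTarget(cache map[int64]*segmentInfo, segmentID int64) (int64, bool, error) {
	seg, ok := cache[segmentID]
	if !ok {
		return 0, false, errors.New("segment not found in metacache")
	}
	switch {
	case seg.compactTo == nullSegment:
		return 0, false, nil // compacted to zero-length segment: discard task
	case seg.compactTo > 0:
		return seg.compactTo, true, nil // redirect to the compaction target
	default:
		return segmentID, true, nil
	}
}

func main() {
	cache := map[int64]*segmentInfo{
		1: {segmentID: 1},                         // plain segment
		2: {segmentID: 2, compactTo: 10},          // compacted into 10
		3: {segmentID: 3, compactTo: nullSegment}, // compacted away
	}
	for _, id := range []int64{1, 2, 3} {
		target, doSync, err := resolveSyncTarget(cache, id)
		fmt.Println(id, "->", target, doSync, err)
	}
}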
25 changes: 25 additions & 0 deletions internal/datanode/syncmgr/task_test.go
@@ -239,6 +239,31 @@ func (s *SyncTaskSuite) TestRunNormal() {
})
}

func (s *SyncTaskSuite) TestRunL0Segment() {
s.broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil)
bfs := metacache.NewBloomFilterSet()
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{Level: datapb.SegmentLevel_L0}, bfs)
s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(seg, true)
s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg})
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()

s.Run("pure_delete_l0_flush", func() {
task := s.getSuiteSyncTask()
task.WithDeleteData(s.getDeleteBuffer())
task.WithTimeRange(50, 100)
task.WithMetaWriter(BrokerMetaWriter(s.broker))
task.WithCheckpoint(&msgpb.MsgPosition{
ChannelName: s.channelName,
MsgID: []byte{1, 2, 3, 4},
Timestamp: 100,
})
task.WithFlush()

err := task.Run()
s.NoError(err)
})
}

func (s *SyncTaskSuite) TestCompactToNull() {
bfs := metacache.NewBloomFilterSet()
fd, err := storage.NewFieldData(schemapb.DataType_Int64, &schemapb.FieldSchema{
4 changes: 2 additions & 2 deletions internal/datanode/writebuffer/bf_write_buffer_test.go
@@ -176,7 +176,7 @@ func (s *BFWriteBufferSuite) TestAutoSync() {
s.Run("normal_auto_sync", func() {
wb, err := NewBFWriteBuffer(s.channelName, s.metacache, nil, s.syncMgr, &writeBufferOption{
syncPolicies: []SyncPolicy{
SyncFullBuffer,
GetFullBufferPolicy(),
GetSyncStaleBufferPolicy(paramtable.Get().DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)),
GetFlushingSegmentsPolicy(s.metacache),
},
@@ -248,7 +248,7 @@ func (s *BFWriteBufferSuite) TestAutoSyncWithStorageV2() {
s.Run("normal_auto_sync", func() {
wb, err := NewBFWriteBuffer(s.channelName, s.metacache, s.storageV2Cache, s.syncMgr, &writeBufferOption{
syncPolicies: []SyncPolicy{
SyncFullBuffer,
GetFullBufferPolicy(),
GetSyncStaleBufferPolicy(paramtable.Get().DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)),
GetFlushingSegmentsPolicy(s.metacache),
},
15 changes: 14 additions & 1 deletion internal/datanode/writebuffer/l0_write_buffer.go
@@ -46,12 +46,25 @@ func (wb *l0WriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsg
defer wb.mut.Unlock()

// process insert msgs
_, err := wb.bufferInsert(insertMsgs, startPos, endPos)
pkData, err := wb.bufferInsert(insertMsgs, startPos, endPos)
if err != nil {
log.Warn("failed to buffer insert data", zap.Error(err))
return err
}

// update pk oracle
for segmentID, dataList := range pkData {
segments := wb.metaCache.GetSegmentsBy(metacache.WithSegmentIDs(segmentID))
for _, segment := range segments {
for _, fieldData := range dataList {
err := segment.GetBloomFilterSet().UpdatePKRange(fieldData)
if err != nil {
return err
}
}
}
}

for _, msg := range deleteMsgs {
l0SegmentID := wb.getL0SegmentID(msg.GetPartitionID(), startPos)
pks := storage.ParseIDs2PrimaryKeys(msg.GetPrimaryKeys())
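The "update pk oracle" block above feeds each segment's buffered primary keys into that segment's per-segment filter, so later deletes can be routed only to segments that may contain the key. The sketch below shows the same flow with a simplified oracle (min/max range plus an exact set) standing in for the real bloom-filter set:

// Hedged sketch of the PK-oracle update step; the oracle type is a stand-in.
package main

import (
	"fmt"
	"math"
)

type pkOracle struct {
	min, max int64
	seen     map[int64]struct{}
}

func newPKOracle() *pkOracle {
	return &pkOracle{min: math.MaxInt64, max: math.MinInt64, seen: map[int64]struct{}{}}
}

// UpdatePKRange mirrors segment.GetBloomFilterSet().UpdatePKRange(fieldData).
func (o *pkOracle) UpdatePKRange(pks []int64) {
	for _, pk := range pks {
		if pk < o.min {
			o.min = pk
		}
		if pk > o.max {
			o.max = pk
		}
		o.seen[pk] = struct{}{}
	}
}

// MayContain is what delete routing would consult.
func (o *pkOracle) MayContain(pk int64) bool {
	if pk < o.min || pk > o.max {
		return false
	}
	_, ok := o.seen[pk]
	return ok
}

func main() {
	// pkData keyed by segment ID, as returned by bufferInsert in the diff.
	pkData := map[int64][][]int64{100: {{1, 2, 3}, {7, 8}}}
	oracles := map[int64]*pkOracle{100: newPKOracle()}

	for segmentID, dataList := range pkData {
		for _, fieldData := range dataList {
			oracles[segmentID].UpdatePKRange(fieldData)
		}
	}
	fmt.Println(oracles[100].MayContain(7), oracles[100].MayContain(42)) // true false
}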
3 changes: 3 additions & 0 deletions internal/datanode/writebuffer/l0_write_buffer_test.go
@@ -15,6 +15,7 @@ import (
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/datanode/syncmgr"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
@@ -153,6 +154,8 @@ func (s *L0WriteBufferSuite) TestBufferData() {
pks, msg := s.composeInsertMsg(1000, 10, 128)
delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))

seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet())
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
s.metacache.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false)
s.metacache.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
6 changes: 4 additions & 2 deletions internal/datanode/writebuffer/options.go
@@ -28,13 +28,15 @@ type writeBufferOption struct {
metaWriter syncmgr.MetaWriter
}

func defaultWBOption() *writeBufferOption {
func defaultWBOption(metacache metacache.MetaCache) *writeBufferOption {
return &writeBufferOption{
// TODO use l0 delta as default after implementation.
deletePolicy: paramtable.Get().DataNodeCfg.DeltaPolicy.GetValue(),
syncPolicies: []SyncPolicy{
SyncFullBuffer,
GetFullBufferPolicy(),
GetSyncStaleBufferPolicy(paramtable.Get().DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)),
GetCompactedSegmentsPolicy(metacache),
GetFlushingSegmentsPolicy(metacache),
},
}
}
55 changes: 44 additions & 11 deletions internal/datanode/writebuffer/sync_policy.go
@@ -13,34 +13,67 @@ import (
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

type SyncPolicy func(buffers []*segmentBuffer, ts typeutil.Timestamp) []int64
type SyncPolicy interface {
SelectSegments(buffers []*segmentBuffer, ts typeutil.Timestamp) []int64
Reason() string
}

type SelectSegmentFunc func(buffer []*segmentBuffer, ts typeutil.Timestamp) []int64

type SelectSegmentFnPolicy struct {
fn SelectSegmentFunc
reason string
}

func (f SelectSegmentFnPolicy) SelectSegments(buffers []*segmentBuffer, ts typeutil.Timestamp) []int64 {
return f.fn(buffers, ts)
}

func (f SelectSegmentFnPolicy) Reason() string { return f.reason }

func wrapSelectSegmentFuncPolicy(fn SelectSegmentFunc, reason string) SelectSegmentFnPolicy {
return SelectSegmentFnPolicy{
fn: fn,
reason: reason,
}
}

func GetFullBufferPolicy() SyncPolicy {
return wrapSelectSegmentFuncPolicy(
func(buffers []*segmentBuffer, _ typeutil.Timestamp) []int64 {
return lo.FilterMap(buffers, func(buf *segmentBuffer, _ int) (int64, bool) {
return buf.segmentID, buf.IsFull()
})
}, "buffer full")
}

func SyncFullBuffer(buffers []*segmentBuffer, _ typeutil.Timestamp) []int64 {
return lo.FilterMap(buffers, func(buf *segmentBuffer, _ int) (int64, bool) {
return buf.segmentID, buf.IsFull()
})
func GetCompactedSegmentsPolicy(meta metacache.MetaCache) SyncPolicy {
return wrapSelectSegmentFuncPolicy(func(buffers []*segmentBuffer, _ typeutil.Timestamp) []int64 {
segmentIDs := lo.Map(buffers, func(buffer *segmentBuffer, _ int) int64 { return buffer.segmentID })
return meta.GetSegmentIDsBy(metacache.WithSegmentIDs(segmentIDs...), metacache.WithCompacted())
}, "segment compacted")
}

func GetSyncStaleBufferPolicy(staleDuration time.Duration) SyncPolicy {
return func(buffers []*segmentBuffer, ts typeutil.Timestamp) []int64 {
return wrapSelectSegmentFuncPolicy(func(buffers []*segmentBuffer, ts typeutil.Timestamp) []int64 {
current := tsoutil.PhysicalTime(ts)
return lo.FilterMap(buffers, func(buf *segmentBuffer, _ int) (int64, bool) {
minTs := buf.MinTimestamp()
start := tsoutil.PhysicalTime(minTs)

return buf.segmentID, current.Sub(start) > staleDuration
})
}
}, "buffer stale")
}

func GetFlushingSegmentsPolicy(meta metacache.MetaCache) SyncPolicy {
return func(_ []*segmentBuffer, _ typeutil.Timestamp) []int64 {
return wrapSelectSegmentFuncPolicy(func(_ []*segmentBuffer, _ typeutil.Timestamp) []int64 {
return meta.GetSegmentIDsBy(metacache.WithSegmentState(commonpb.SegmentState_Flushing))
}
}, "segment flushing")
}

func GetFlushTsPolicy(flushTimestamp *atomic.Uint64, meta metacache.MetaCache) SyncPolicy {
return func(buffers []*segmentBuffer, ts typeutil.Timestamp) []int64 {
return wrapSelectSegmentFuncPolicy(func(buffers []*segmentBuffer, ts typeutil.Timestamp) []int64 {
flushTs := flushTimestamp.Load()
if flushTs != nonFlushTS && ts >= flushTs {
// flush segment start pos < flushTs && checkpoint > flushTs
@@ -61,5 +94,5 @@ func GetFlushTsPolicy(flushTimestamp *atomic.Uint64, meta metacache.MetaCache) S
return ids
}
return nil
}
}, "flush ts")
}
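The sync_policy.go diff turns SyncPolicy from a bare function type into an interface with SelectSegments plus a Reason string, adapting the existing functions through wrapSelectSegmentFuncPolicy so the write buffer can report why a segment was picked. A self-contained sketch of the same pattern, with simplified stand-in types rather than the real buffer and metacache types:

// Hedged sketch of the function-to-interface policy refactor; types are stand-ins.
package main

import "fmt"

type segmentBuffer struct {
	segmentID int64
	full      bool
}

type SyncPolicy interface {
	SelectSegments(buffers []*segmentBuffer, ts uint64) []int64
	Reason() string
}

type selectFnPolicy struct {
	fn     func(buffers []*segmentBuffer, ts uint64) []int64
	reason string
}

func (p selectFnPolicy) SelectSegments(buffers []*segmentBuffer, ts uint64) []int64 {
	return p.fn(buffers, ts)
}

func (p selectFnPolicy) Reason() string { return p.reason }

// fullBufferPolicy plays the role of GetFullBufferPolicy in the diff.
func fullBufferPolicy() SyncPolicy {
	return selectFnPolicy{
		fn: func(buffers []*segmentBuffer, _ uint64) []int64 {
			var ids []int64
			for _, buf := range buffers {
				if buf.full {
					ids = append(ids, buf.segmentID)
				}
			}
			return ids
		},
		reason: "buffer full",
	}
}

func main() {
	buffers := []*segmentBuffer{{segmentID: 1, full: true}, {segmentID: 2}}
	for _, policy := range []SyncPolicy{fullBufferPolicy()} {
		for _, id := range policy.SelectSegments(buffers, 0) {
			// The Reason string is what the write buffer can attach to its sync log.
			fmt.Printf("sync segment %d, reason: %s\n", id, policy.Reason())
		}
	}
}

The same wrapping keeps the old call sites nearly unchanged (the tests above swap SyncFullBuffer for GetFullBufferPolicy()), while new policies such as GetCompactedSegmentsPolicy slot into the default option list in options.go.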