Skip to content

Commit

Permalink
Record engine version for segment index
Browse files Browse the repository at this point in the history
Signed-off-by: longjiquan <[email protected]>
  • Loading branch information
longjiquan committed Sep 27, 2023
1 parent ec17e08 commit 94325ee
Show file tree
Hide file tree
Showing 16 changed files with 554 additions and 565 deletions.
38 changes: 22 additions & 16 deletions internal/datacoord/index_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,24 +77,31 @@ type indexBuilder struct {

meta *meta

policy buildIndexPolicy
nodeManager *IndexNodeManager
chunkManager storage.ChunkManager
policy buildIndexPolicy
nodeManager *IndexNodeManager
chunkManager storage.ChunkManager
indexEngineVersionManager *IndexEngineVersionManager
}

func newIndexBuilder(ctx context.Context, metaTable *meta, nodeManager *IndexNodeManager, chunkManager storage.ChunkManager) *indexBuilder {
func newIndexBuilder(
ctx context.Context,
metaTable *meta, nodeManager *IndexNodeManager,
chunkManager storage.ChunkManager,
indexEngineVersionManager *IndexEngineVersionManager,
) *indexBuilder {
ctx, cancel := context.WithCancel(ctx)

ib := &indexBuilder{
ctx: ctx,
cancel: cancel,
meta: metaTable,
tasks: make(map[int64]indexTaskState),
notifyChan: make(chan struct{}, 1),
scheduleDuration: Params.DataCoordCfg.IndexTaskSchedulerInterval.GetAsDuration(time.Millisecond),
policy: defaultBuildIndexPolicy,
nodeManager: nodeManager,
chunkManager: chunkManager,
ctx: ctx,
cancel: cancel,
meta: metaTable,
tasks: make(map[int64]indexTaskState),
notifyChan: make(chan struct{}, 1),
scheduleDuration: Params.DataCoordCfg.IndexTaskSchedulerInterval.GetAsDuration(time.Millisecond),
policy: defaultBuildIndexPolicy,
nodeManager: nodeManager,
chunkManager: chunkManager,
indexEngineVersionManager: indexEngineVersionManager,
}
ib.reloadFromKV()
return ib
Expand Down Expand Up @@ -228,7 +235,7 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
}
indexParams := ib.meta.GetIndexParams(meta.CollectionID, meta.IndexID)
if isFlatIndex(getIndexType(indexParams)) || meta.NumRows < Params.DataCoordCfg.MinSegmentNumRowsToEnableIndex.GetAsInt64() {
log.Ctx(ib.ctx).Debug("segment does not need index really", zap.Int64("buildID", buildID),
log.Ctx(ib.ctx).Info("segment does not need index really", zap.Int64("buildID", buildID),
zap.Int64("segmentID", meta.SegmentID), zap.Int64("num rows", meta.NumRows))
if err := ib.meta.FinishTask(&indexpb.IndexTaskInfo{
BuildID: buildID,
Expand Down Expand Up @@ -301,8 +308,7 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
IndexParams: indexParams,
TypeParams: typeParams,
NumRows: meta.NumRows,
CurrentIndexVersion: meta.CurrentIndexVersion,
MinimalIndexVersion: meta.MinimalIndexVersion,
CurrentIndexVersion: ib.indexEngineVersionManager.GetCurrentIndexEngineVersion(),
}
if err := ib.assignTask(client, req); err != nil {
// need to release lock then reassign, so set task state to retry
Expand Down
7 changes: 4 additions & 3 deletions internal/datacoord/index_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ func TestIndexBuilder(t *testing.T) {
chunkManager := &mocks.ChunkManager{}
chunkManager.EXPECT().RootPath().Return("root")

ib := newIndexBuilder(ctx, mt, nodeManager, chunkManager)
ib := newIndexBuilder(ctx, mt, nodeManager, chunkManager, newIndexEngineVersionManager())

assert.Equal(t, 6, len(ib.tasks))
assert.Equal(t, indexTaskInit, ib.tasks[buildID])
Expand Down Expand Up @@ -737,8 +737,9 @@ func TestIndexBuilder_Error(t *testing.T) {
tasks: map[int64]indexTaskState{
buildID: indexTaskInit,
},
meta: createMetaTable(ec),
chunkManager: chunkManager,
meta: createMetaTable(ec),
chunkManager: chunkManager,
indexEngineVersionManager: newIndexEngineVersionManager(),
}

t.Run("meta not exist", func(t *testing.T) {
Expand Down
5 changes: 4 additions & 1 deletion internal/datacoord/index_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,7 @@ func (m *meta) FinishTask(taskInfo *indexpb.IndexTaskInfo) error {
segIdx.IndexFileKeys = common.CloneStringList(taskInfo.GetIndexFileKeys())
segIdx.FailReason = taskInfo.GetFailReason()
segIdx.IndexSize = taskInfo.GetSerializedSize()
segIdx.CurrentIndexVersion = taskInfo.GetCurrentIndexVersion()
return m.alterSegmentIndexes([]*model.SegmentIndex{segIdx})
}

Expand All @@ -539,7 +540,9 @@ func (m *meta) FinishTask(taskInfo *indexpb.IndexTaskInfo) error {
}

log.Info("finish index task success", zap.Int64("buildID", taskInfo.GetBuildID()),
zap.String("state", taskInfo.GetState().String()), zap.String("fail reason", taskInfo.GetFailReason()))
zap.String("state", taskInfo.GetState().String()), zap.String("fail reason", taskInfo.GetFailReason()),
zap.Int32("current_index_version", taskInfo.GetCurrentIndexVersion()),
)
m.updateIndexTasksMetrics()
metrics.FlushedSegmentFileNum.WithLabelValues(metrics.IndexFileLabel).Observe(float64(len(taskInfo.GetIndexFileKeys())))
return nil
Expand Down
24 changes: 9 additions & 15 deletions internal/datacoord/index_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,25 +51,20 @@ func (s *Server) startIndexService(ctx context.Context) {
}

func (s *Server) createIndexForSegment(segment *SegmentInfo, indexID UniqueID) error {
indexEngineVersion := s.IndexEngineVersionManager.GetCurrentIndexEngineVersion()
log.Info("create index for segment", zap.Int64("segmentID", segment.ID), zap.Int64("indexID", indexID),
zap.Int32("index_engine_version", indexEngineVersion),
)
log.Info("create index for segment", zap.Int64("segmentID", segment.ID), zap.Int64("indexID", indexID))
buildID, err := s.allocator.allocID(context.Background())
if err != nil {
return err
}
segIndex := &model.SegmentIndex{
SegmentID: segment.ID,
CollectionID: segment.CollectionID,
PartitionID: segment.PartitionID,
NumRows: segment.NumOfRows,
IndexID: indexID,
BuildID: buildID,
CreateTime: uint64(segment.ID),
WriteHandoff: false,
CurrentIndexVersion: s.IndexEngineVersionManager.GetCurrentIndexEngineVersion(),
MinimalIndexVersion: s.IndexEngineVersionManager.GetMinimalIndexEngineVersion(),
SegmentID: segment.ID,
CollectionID: segment.CollectionID,
PartitionID: segment.PartitionID,
NumRows: segment.NumOfRows,
IndexID: indexID,
BuildID: buildID,
CreateTime: uint64(segment.ID),
WriteHandoff: false,
}
if err = s.meta.AddSegmentIndex(segIndex); err != nil {
return err
Expand Down Expand Up @@ -692,7 +687,6 @@ func (s *Server) GetIndexInfos(ctx context.Context, req *indexpb.GetIndexInfoReq
IndexVersion: segIdx.IndexVersion,
NumRows: segIdx.NumRows,
CurrentIndexVersion: segIdx.CurrentIndexVersion,
MinimalIndexVersion: segIdx.MinimalIndexVersion,
})
}
}
Expand Down
14 changes: 7 additions & 7 deletions internal/datacoord/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ type Server struct {
// segReferManager *SegmentReferenceManager
indexBuilder *indexBuilder
indexNodeManager *IndexNodeManager
IndexEngineVersionManager *IndexEngineVersionManager
indexEngineVersionManager *IndexEngineVersionManager

// manage ways that data coord access other coord
broker Broker
Expand Down Expand Up @@ -525,13 +525,13 @@ func (s *Server) initServiceDiscovery() error {
}
s.inEventCh = s.session.WatchServices(typeutil.IndexNodeRole, inRevision+1, nil)

s.IndexEngineVersionManager = newIndexEngineVersionManager()
s.indexEngineVersionManager = newIndexEngineVersionManager()
qnSessions, qnRevision, err := s.session.GetSessions(typeutil.QueryNodeRole)
if err != nil {
log.Warn("DataCoord get QueryNode sessions failed", zap.Error(err))
return err
}
s.IndexEngineVersionManager.Startup(qnSessions)
s.indexEngineVersionManager.Startup(qnSessions)
s.qnEventCh = s.session.WatchServicesWithVersionRange(typeutil.QueryNodeRole, r, qnRevision+1, nil)

return nil
Expand Down Expand Up @@ -578,7 +578,7 @@ func (s *Server) initMeta(chunkManager storage.ChunkManager) error {

// initIndexBuilder creates the server's index builder when none has been set.
// If s.indexBuilder is already non-nil it is left untouched.
func (s *Server) initIndexBuilder(manager storage.ChunkManager) {
if s.indexBuilder == nil {
s.indexBuilder = newIndexBuilder(s.ctx, s.meta, s.indexNodeManager, manager)
s.indexBuilder = newIndexBuilder(s.ctx, s.meta, s.indexNodeManager, manager, s.indexEngineVersionManager)
}
}

Expand Down Expand Up @@ -911,16 +911,16 @@ func (s *Server) handleSessionEvent(ctx context.Context, role string, event *ses
log.Info("received querynode register",
zap.String("address", event.Session.Address),
zap.Int64("serverID", event.Session.ServerID))
s.IndexEngineVersionManager.AddNode(event.Session)
s.indexEngineVersionManager.AddNode(event.Session)
case sessionutil.SessionDelEvent:
log.Info("received querynode unregister",
zap.String("address", event.Session.Address),
zap.Int64("serverID", event.Session.ServerID))
s.IndexEngineVersionManager.RemoveNode(event.Session)
s.indexEngineVersionManager.RemoveNode(event.Session)
case sessionutil.SessionUpdateEvent:
serverID := event.Session.ServerID
log.Info("received querynode SessionUpdateEvent", zap.Int64("serverID", serverID))
s.IndexEngineVersionManager.Update(event.Session)
s.indexEngineVersionManager.Update(event.Session)
default:
log.Warn("receive unknown service event type",
zap.Any("type", event.EventType))
Expand Down
8 changes: 8 additions & 0 deletions internal/indexnode/indexnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,14 @@ var _ types.IndexNodeComponent = (*IndexNode)(nil)
// Params is a GlobalParamTable singleton of indexnode
var Params *paramtable.ComponentParam = paramtable.Get()

// getCurrentIndexVersion clamps the requested index version v to the version
// reported by the cgo call C.GetCurrentIndexVersion(): it returns whichever of
// the two values is smaller.
func getCurrentIndexVersion(v int32) int32 {
	// Never hand back a version above what C.GetCurrentIndexVersion() reports.
	if local := int32(C.GetCurrentIndexVersion()); local < v {
		return local
	}
	return v
}

type taskKey struct {
ClusterID string
BuildID UniqueID
Expand Down
11 changes: 6 additions & 5 deletions internal/indexnode/indexnode_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ func (i *IndexNode) CreateJob(ctx context.Context, req *indexpb.CreateJobRequest
zap.Any("indexParams", req.GetIndexParams()),
zap.Int64("numRows", req.GetNumRows()),
zap.Int32("current_index_version", req.GetCurrentIndexVersion()),
zap.Int32("minimal_index_version", req.GetMinimalIndexVersion()),
)
ctx, sp := otel.Tracer(typeutil.IndexNodeRole).Start(ctx, "IndexNode-CreateIndex", trace.WithAttributes(
attribute.Int64("indexBuildID", req.GetBuildID()),
Expand Down Expand Up @@ -142,10 +141,11 @@ func (i *IndexNode) QueryJobs(ctx context.Context, req *indexpb.QueryJobsRequest
i.foreachTaskInfo(func(ClusterID string, buildID UniqueID, info *taskInfo) {
if ClusterID == req.GetClusterID() {
infos[buildID] = &taskInfo{
state: info.state,
fileKeys: common.CloneStringList(info.fileKeys),
serializedSize: info.serializedSize,
failReason: info.failReason,
state: info.state,
fileKeys: common.CloneStringList(info.fileKeys),
serializedSize: info.serializedSize,
failReason: info.failReason,
currentIndexVersion: info.currentIndexVersion,
}
}
})
Expand All @@ -166,6 +166,7 @@ func (i *IndexNode) QueryJobs(ctx context.Context, req *indexpb.QueryJobsRequest
ret.IndexInfos[i].IndexFileKeys = info.fileKeys
ret.IndexInfos[i].SerializedSize = info.serializedSize
ret.IndexInfos[i].FailReason = info.failReason
ret.IndexInfos[i].CurrentIndexVersion = info.currentIndexVersion
log.RatedDebug(5, "querying index build task",
zap.Int64("indexBuildID", buildID),
zap.String("state", info.state.String()),
Expand Down
59 changes: 31 additions & 28 deletions internal/indexnode/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,12 @@ var (
type Blob = storage.Blob

type taskInfo struct {
cancel context.CancelFunc
state commonpb.IndexState
fileKeys []string
serializedSize uint64
failReason string
cancel context.CancelFunc
state commonpb.IndexState
fileKeys []string
serializedSize uint64
failReason string
currentIndexVersion int32

// task statistics
statistic *indexpb.JobInfo
Expand All @@ -81,27 +82,28 @@ type indexBuildTask struct {
cancel context.CancelFunc
ctx context.Context

cm storage.ChunkManager
index indexcgowrapper.CodecIndex
savePaths []string
req *indexpb.CreateJobRequest
BuildID UniqueID
nodeID UniqueID
ClusterID string
collectionID UniqueID
partitionID UniqueID
segmentID UniqueID
fieldID UniqueID
fieldType schemapb.DataType
fieldData storage.FieldData
indexBlobs []*storage.Blob
newTypeParams map[string]string
newIndexParams map[string]string
serializedSize uint64
tr *timerecord.TimeRecorder
queueDur time.Duration
statistic indexpb.JobInfo
node *IndexNode
cm storage.ChunkManager
index indexcgowrapper.CodecIndex
savePaths []string
req *indexpb.CreateJobRequest
currentIndexVersion int32
BuildID UniqueID
nodeID UniqueID
ClusterID string
collectionID UniqueID
partitionID UniqueID
segmentID UniqueID
fieldID UniqueID
fieldType schemapb.DataType
fieldData storage.FieldData
indexBlobs []*storage.Blob
newTypeParams map[string]string
newIndexParams map[string]string
serializedSize uint64
tr *timerecord.TimeRecorder
queueDur time.Duration
statistic indexpb.JobInfo
node *IndexNode
}

func (it *indexBuildTask) Reset() {
Expand Down Expand Up @@ -337,7 +339,8 @@ func (it *indexBuildTask) BuildIndex(ctx context.Context) error {
}
}

if err := buildIndexInfo.AppendIndexEngineVersion(it.req.GetCurrentIndexVersion()); err != nil {
it.currentIndexVersion = getCurrentIndexVersion(it.req.GetCurrentIndexVersion())
if err := buildIndexInfo.AppendIndexEngineVersion(it.currentIndexVersion); err != nil {
log.Ctx(ctx).Warn("append index engine version failed", zap.Error(err))
return err
}
Expand Down Expand Up @@ -389,7 +392,7 @@ func (it *indexBuildTask) SaveIndexFiles(ctx context.Context) error {
}

it.statistic.EndTime = time.Now().UnixMicro()
it.node.storeIndexFilesAndStatistic(it.ClusterID, it.BuildID, saveFileKeys, it.serializedSize, &it.statistic)
it.node.storeIndexFilesAndStatistic(it.ClusterID, it.BuildID, saveFileKeys, it.serializedSize, &it.statistic, it.currentIndexVersion)
log.Ctx(ctx).Debug("save index files done", zap.Strings("IndexFiles", saveFileKeys))
saveIndexFileDur := it.tr.RecordSpan()
metrics.IndexNodeSaveIndexFileLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(saveIndexFileDur.Seconds())
Expand Down
10 changes: 9 additions & 1 deletion internal/indexnode/taskinfo_ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,22 @@ func (i *IndexNode) foreachTaskInfo(fn func(ClusterID string, buildID UniqueID,
}
}

func (i *IndexNode) storeIndexFilesAndStatistic(ClusterID string, buildID UniqueID, fileKeys []string, serializedSize uint64, statistic *indexpb.JobInfo) {
// storeIndexFilesAndStatistic records the outcome of an index build on the
// task registered under (ClusterID, buildID): the index file keys, the
// serialized size, the job statistic and the index version that was used.
// The file keys and the statistic are stored as copies. The update runs while
// holding i.stateLock; if no task exists for the key, nothing is changed.
func (i *IndexNode) storeIndexFilesAndStatistic(
	ClusterID string,
	buildID UniqueID,
	fileKeys []string,
	serializedSize uint64,
	statistic *indexpb.JobInfo,
	currentIndexVersion int32,
) {
	i.stateLock.Lock()
	defer i.stateLock.Unlock()

	task, found := i.tasks[taskKey{ClusterID: ClusterID, BuildID: buildID}]
	if !found {
		// Unknown task: nothing to update.
		return
	}

	task.fileKeys = common.CloneStringList(fileKeys)
	task.serializedSize = serializedSize
	task.statistic = proto.Clone(statistic).(*indexpb.JobInfo)
	task.currentIndexVersion = currentIndexVersion
}
Expand Down
4 changes: 0 additions & 4 deletions internal/metastore/model/segment_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ type SegmentIndex struct {
// deprecated
WriteHandoff bool
CurrentIndexVersion int32
MinimalIndexVersion int32
}

func UnmarshalSegmentIndexModel(segIndex *indexpb.SegmentIndex) *SegmentIndex {
Expand All @@ -49,7 +48,6 @@ func UnmarshalSegmentIndexModel(segIndex *indexpb.SegmentIndex) *SegmentIndex {
IndexSize: segIndex.SerializeSize,
WriteHandoff: segIndex.WriteHandoff,
CurrentIndexVersion: segIndex.GetCurrentIndexVersion(),
MinimalIndexVersion: segIndex.GetMinimalIndexVersion(),
}
}

Expand All @@ -75,7 +73,6 @@ func MarshalSegmentIndexModel(segIdx *SegmentIndex) *indexpb.SegmentIndex {
SerializeSize: segIdx.IndexSize,
WriteHandoff: segIdx.WriteHandoff,
CurrentIndexVersion: segIdx.CurrentIndexVersion,
MinimalIndexVersion: segIdx.MinimalIndexVersion,
}
}

Expand All @@ -97,6 +94,5 @@ func CloneSegmentIndex(segIndex *SegmentIndex) *SegmentIndex {
IndexSize: segIndex.IndexSize,
WriteHandoff: segIndex.WriteHandoff,
CurrentIndexVersion: segIndex.CurrentIndexVersion,
MinimalIndexVersion: segIndex.MinimalIndexVersion,
}
}
Loading

0 comments on commit 94325ee

Please sign in to comment.