Skip to content

Commit

Permalink
enhance: extract the task step execution process
Browse files Browse the repository at this point in the history
Signed-off-by: SimFG <[email protected]>
  • Loading branch information
SimFG committed Dec 18, 2024
1 parent de88589 commit f121f1a
Show file tree
Hide file tree
Showing 10 changed files with 735 additions and 523 deletions.
133 changes: 94 additions & 39 deletions internal/rootcoord/alter_collection_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,39 +66,63 @@ func (a *alterCollectionTask) Execute(ctx context.Context) error {
return err
}

newColl := oldColl.Clone()
var newProperties []*commonpb.KeyValuePair
if len(a.Req.Properties) > 0 {
if ContainsKeyPairArray(a.Req.GetProperties(), oldColl.Properties) {
log.Info("skip to alter collection due to no changes were detected in the properties", zap.Int64("collectionID", oldColl.CollectionID))
return nil
}
newColl.Properties = MergeProperties(oldColl.Properties, a.Req.GetProperties())
newProperties = MergeProperties(oldColl.Properties, a.Req.GetProperties())
} else if len(a.Req.DeleteKeys) > 0 {
newColl.Properties = DeleteProperties(oldColl.Properties, a.Req.GetDeleteKeys())
newProperties = DeleteProperties(oldColl.Properties, a.Req.GetDeleteKeys())

Check warning on line 77 in internal/rootcoord/alter_collection_task.go

View check run for this annotation

Codecov / codecov/patch

internal/rootcoord/alter_collection_task.go#L77

Added line #L77 was not covered by tests
}

ts := a.GetTs()
redoTask := newBaseRedoTask(a.core.stepExecutor)
return executeAlterCollectionTaskSteps(ctx, a.core, oldColl, oldColl.Properties, newProperties, a.Req, ts)
}

func (a *alterCollectionTask) GetLockerKey() LockerKey {
collection := a.core.getCollectionIDStr(a.ctx, a.Req.GetDbName(), a.Req.GetCollectionName(), a.Req.GetCollectionID())
return NewLockerKeyChain(
NewClusterLockerKey(false),
NewDatabaseLockerKey(a.Req.GetDbName(), false),
NewCollectionLockerKey(collection, true),
)
}

func executeAlterCollectionTaskSteps(ctx context.Context,
core *Core,
col *model.Collection,
oldProperties []*commonpb.KeyValuePair,
newProperties []*commonpb.KeyValuePair,
request *milvuspb.AlterCollectionRequest,
ts Timestamp,
) error {
oldColl := col.Clone()
oldColl.Properties = oldProperties
newColl := col.Clone()
newColl.Properties = newProperties
redoTask := newBaseRedoTask(core.stepExecutor)
redoTask.AddSyncStep(&AlterCollectionStep{
baseStep: baseStep{core: a.core},
baseStep: baseStep{core: core},
oldColl: oldColl,
newColl: newColl,
ts: ts,
})

a.Req.CollectionID = oldColl.CollectionID
request.CollectionID = oldColl.CollectionID
redoTask.AddSyncStep(&BroadcastAlteredCollectionStep{
baseStep: baseStep{core: a.core},
req: a.Req,
core: a.core,
baseStep: baseStep{core: core},
req: request,
core: core,
})

// properties needs to be refreshed in the cache
aliases := a.core.meta.ListAliasesByID(ctx, oldColl.CollectionID)
aliases := core.meta.ListAliasesByID(ctx, oldColl.CollectionID)
redoTask.AddSyncStep(&expireCacheStep{
baseStep: baseStep{core: a.core},
dbName: a.Req.GetDbName(),
collectionNames: append(aliases, a.Req.GetCollectionName()),
baseStep: baseStep{core: core},
dbName: request.GetDbName(),
collectionNames: append(aliases, request.GetCollectionName()),
collectionID: oldColl.CollectionID,
opts: []proxyutil.ExpireCacheOpt{proxyutil.SetMsgType(commonpb.MsgType_AlterCollection)},
})
Expand All @@ -119,7 +143,7 @@ func (a *alterCollectionTask) Execute(ctx context.Context) error {
zap.Strings("newResourceGroups", newResourceGroups),
)
redoTask.AddAsyncStep(NewSimpleStep("", func(ctx context.Context) ([]nestedStep, error) {
resp, err := a.core.queryCoord.UpdateLoadConfig(ctx, &querypb.UpdateLoadConfigRequest{
resp, err := core.queryCoord.UpdateLoadConfig(ctx, &querypb.UpdateLoadConfigRequest{
CollectionIDs: []int64{oldColl.CollectionID},
ReplicaNumber: int32(newReplicaNumber),
ResourceGroups: newResourceGroups,
Expand Down Expand Up @@ -165,22 +189,13 @@ func (a *alterCollectionTask) Execute(ctx context.Context) error {
zap.String("database", newColl.DBName),
zap.String("replicateID", replicateID),
)
return nil, a.core.chanTimeTick.broadcastDmlChannels(newColl.PhysicalChannelNames, msgPack)
return nil, core.chanTimeTick.broadcastDmlChannels(newColl.PhysicalChannelNames, msgPack)
}))
}

return redoTask.Execute(ctx)
}

func (a *alterCollectionTask) GetLockerKey() LockerKey {
collection := a.core.getCollectionIDStr(a.ctx, a.Req.GetDbName(), a.Req.GetCollectionName(), a.Req.GetCollectionID())
return NewLockerKeyChain(
NewClusterLockerKey(false),
NewDatabaseLockerKey(a.Req.GetDbName(), false),
NewCollectionLockerKey(collection, true),
)
}

func DeleteProperties(oldProps []*commonpb.KeyValuePair, deleteKeys []string) []*commonpb.KeyValuePair {
propsMap := make(map[string]string)
for _, prop := range oldProps {
Expand Down Expand Up @@ -227,52 +242,92 @@ func (a *alterCollectionFieldTask) Execute(ctx context.Context) error {
return err
}

newColl := oldColl.Clone()
err = UpdateFieldProperties(newColl, a.Req.GetFieldName(), a.Req.GetProperties())
oldFieldProperties, err := GetFieldProperties(oldColl, a.Req.GetFieldName())

Check warning on line 245 in internal/rootcoord/alter_collection_task.go

View check run for this annotation

Codecov / codecov/patch

internal/rootcoord/alter_collection_task.go#L245

Added line #L245 was not covered by tests
if err != nil {
log.Warn("get field properties failed during changing collection state", zap.Error(err))

Check warning on line 247 in internal/rootcoord/alter_collection_task.go

View check run for this annotation

Codecov / codecov/patch

internal/rootcoord/alter_collection_task.go#L247

Added line #L247 was not covered by tests
return err
}
ts := a.GetTs()
redoTask := newBaseRedoTask(a.core.stepExecutor)
return executeAlterCollectionFieldTaskSteps(ctx, a.core, oldColl, oldFieldProperties, a.Req, ts)

Check warning on line 251 in internal/rootcoord/alter_collection_task.go

View check run for this annotation

Codecov / codecov/patch

internal/rootcoord/alter_collection_task.go#L251

Added line #L251 was not covered by tests
}

func (a *alterCollectionFieldTask) GetLockerKey() LockerKey {
collection := a.core.getCollectionIDStr(a.ctx, a.Req.GetDbName(), a.Req.GetCollectionName(), 0)
return NewLockerKeyChain(
NewClusterLockerKey(false),
NewDatabaseLockerKey(a.Req.GetDbName(), false),
NewCollectionLockerKey(collection, true),
)

Check warning on line 260 in internal/rootcoord/alter_collection_task.go

View check run for this annotation

Codecov / codecov/patch

internal/rootcoord/alter_collection_task.go#L254-L260

Added lines #L254 - L260 were not covered by tests
}

func executeAlterCollectionFieldTaskSteps(ctx context.Context,
core *Core,
col *model.Collection,
oldFieldProperties []*commonpb.KeyValuePair,
request *milvuspb.AlterCollectionFieldRequest,
ts Timestamp,
) error {
var err error
filedName := request.GetFieldName()
newFieldProperties := UpdateFieldPropertyParams(oldFieldProperties, request.GetProperties())
oldColl := col.Clone()
err = ResetFieldProperties(oldColl, filedName, oldFieldProperties)
if err != nil {
return err
}
newColl := col.Clone()
err = ResetFieldProperties(newColl, filedName, newFieldProperties)
if err != nil {
return err
}
redoTask := newBaseRedoTask(core.stepExecutor)

Check warning on line 283 in internal/rootcoord/alter_collection_task.go

View check run for this annotation

Codecov / codecov/patch

internal/rootcoord/alter_collection_task.go#L269-L283

Added lines #L269 - L283 were not covered by tests
redoTask.AddSyncStep(&AlterCollectionStep{
baseStep: baseStep{core: a.core},
baseStep: baseStep{core: core},

Check warning on line 285 in internal/rootcoord/alter_collection_task.go

View check run for this annotation

Codecov / codecov/patch

internal/rootcoord/alter_collection_task.go#L285

Added line #L285 was not covered by tests
oldColl: oldColl,
newColl: newColl,
ts: ts,
})

redoTask.AddSyncStep(&BroadcastAlteredCollectionStep{
baseStep: baseStep{core: a.core},
baseStep: baseStep{core: core},

Check warning on line 292 in internal/rootcoord/alter_collection_task.go

View check run for this annotation

Codecov / codecov/patch

internal/rootcoord/alter_collection_task.go#L292

Added line #L292 was not covered by tests
req: &milvuspb.AlterCollectionRequest{
Base: a.Req.Base,
DbName: a.Req.DbName,
CollectionName: a.Req.CollectionName,
Base: request.Base,
DbName: request.DbName,
CollectionName: request.CollectionName,

Check warning on line 296 in internal/rootcoord/alter_collection_task.go

View check run for this annotation

Codecov / codecov/patch

internal/rootcoord/alter_collection_task.go#L294-L296

Added lines #L294 - L296 were not covered by tests
CollectionID: oldColl.CollectionID,
},
core: a.core,
core: core,

Check warning on line 299 in internal/rootcoord/alter_collection_task.go

View check run for this annotation

Codecov / codecov/patch

internal/rootcoord/alter_collection_task.go#L299

Added line #L299 was not covered by tests
})
collectionNames := []string{}
redoTask.AddSyncStep(&expireCacheStep{
baseStep: baseStep{core: a.core},
dbName: a.Req.GetDbName(),
collectionNames: append(collectionNames, a.Req.GetCollectionName()),
baseStep: baseStep{core: core},
dbName: request.GetDbName(),
collectionNames: []string{request.GetCollectionName()},

Check warning on line 304 in internal/rootcoord/alter_collection_task.go

View check run for this annotation

Codecov / codecov/patch

internal/rootcoord/alter_collection_task.go#L302-L304

Added lines #L302 - L304 were not covered by tests
collectionID: oldColl.CollectionID,
opts: []proxyutil.ExpireCacheOpt{proxyutil.SetMsgType(commonpb.MsgType_AlterCollectionField)},
})

return redoTask.Execute(ctx)
}

func UpdateFieldProperties(coll *model.Collection, fieldName string, updatedProps []*commonpb.KeyValuePair) error {
func ResetFieldProperties(coll *model.Collection, fieldName string, newProps []*commonpb.KeyValuePair) error {

Check warning on line 312 in internal/rootcoord/alter_collection_task.go

View check run for this annotation

Codecov / codecov/patch

internal/rootcoord/alter_collection_task.go#L312

Added line #L312 was not covered by tests
for i, field := range coll.Fields {
if field.Name == fieldName {
coll.Fields[i].TypeParams = UpdateFieldPropertyParams(field.TypeParams, updatedProps)
coll.Fields[i].TypeParams = newProps

Check warning on line 315 in internal/rootcoord/alter_collection_task.go

View check run for this annotation

Codecov / codecov/patch

internal/rootcoord/alter_collection_task.go#L315

Added line #L315 was not covered by tests
return nil
}
}
return merr.WrapErrParameterInvalidMsg("field %s does not exist in collection", fieldName)
}

func GetFieldProperties(coll *model.Collection, fieldName string) ([]*commonpb.KeyValuePair, error) {
for _, field := range coll.Fields {
if field.Name == fieldName {
return field.TypeParams, nil
}

Check warning on line 326 in internal/rootcoord/alter_collection_task.go

View check run for this annotation

Codecov / codecov/patch

internal/rootcoord/alter_collection_task.go#L322-L326

Added lines #L322 - L326 were not covered by tests
}
return nil, merr.WrapErrParameterInvalidMsg("field %s does not exist in collection", fieldName)

Check warning on line 328 in internal/rootcoord/alter_collection_task.go

View check run for this annotation

Codecov / codecov/patch

internal/rootcoord/alter_collection_task.go#L328

Added line #L328 was not covered by tests
}

func UpdateFieldPropertyParams(oldProps, updatedProps []*commonpb.KeyValuePair) []*commonpb.KeyValuePair {
props := make(map[string]string)
for _, prop := range oldProps {
Expand Down
109 changes: 60 additions & 49 deletions internal/rootcoord/alter_database_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,30 +79,78 @@ func (a *alterDatabaseTask) Execute(ctx context.Context) error {
return err
}

newDB := oldDB.Clone()
var newProperties []*commonpb.KeyValuePair
if (len(a.Req.GetProperties())) > 0 {
if ContainsKeyPairArray(a.Req.GetProperties(), oldDB.Properties) {
log.Info("skip to alter database due to no changes were detected in the properties", zap.String("databaseName", a.Req.GetDbName()))
return nil
}
ret := MergeProperties(oldDB.Properties, a.Req.GetProperties())
newDB.Properties = ret
newProperties = MergeProperties(oldDB.Properties, a.Req.GetProperties())
} else if (len(a.Req.GetDeleteKeys())) > 0 {
ret := DeleteProperties(oldDB.Properties, a.Req.GetDeleteKeys())
newDB.Properties = ret
newProperties = DeleteProperties(oldDB.Properties, a.Req.GetDeleteKeys())

Check warning on line 90 in internal/rootcoord/alter_database_task.go

View check run for this annotation

Codecov / codecov/patch

internal/rootcoord/alter_database_task.go#L90

Added line #L90 was not covered by tests
}

ts := a.GetTs()
redoTask := newBaseRedoTask(a.core.stepExecutor)
return executeAlterDatabaseTaskSteps(ctx, a.core, oldDB, oldDB.Properties, newProperties, a.ts)
}

func (a *alterDatabaseTask) GetLockerKey() LockerKey {
return NewLockerKeyChain(
NewClusterLockerKey(false),
NewDatabaseLockerKey(a.Req.GetDbName(), true),
)
}

func MergeProperties(oldProps []*commonpb.KeyValuePair, updatedProps []*commonpb.KeyValuePair) []*commonpb.KeyValuePair {
_, existEndTS := common.GetReplicateEndTS(updatedProps)
if existEndTS {
updatedProps = append(updatedProps, &commonpb.KeyValuePair{
Key: common.ReplicateIDKey,
Value: "",
})
}

props := make(map[string]string)
for _, prop := range oldProps {
props[prop.Key] = prop.Value
}

for _, prop := range updatedProps {
props[prop.Key] = prop.Value
}

propKV := make([]*commonpb.KeyValuePair, 0)

for key, value := range props {
propKV = append(propKV, &commonpb.KeyValuePair{
Key: key,
Value: value,
})
}

return propKV
}

func executeAlterDatabaseTaskSteps(ctx context.Context,
core *Core,
dbInfo *model.Database,
oldProperties []*commonpb.KeyValuePair,
newProperties []*commonpb.KeyValuePair,
ts Timestamp,
) error {
oldDB := dbInfo.Clone()
oldDB.Properties = oldProperties
newDB := dbInfo.Clone()
newDB.Properties = newProperties
redoTask := newBaseRedoTask(core.stepExecutor)
redoTask.AddSyncStep(&AlterDatabaseStep{
baseStep: baseStep{core: a.core},
baseStep: baseStep{core: core},
oldDB: oldDB,
newDB: newDB,
ts: ts,
})

redoTask.AddSyncStep(&expireCacheStep{
baseStep: baseStep{core: a.core},
baseStep: baseStep{core: core},
dbName: newDB.Name,
ts: ts,
// make sure to send the "expire cache" request
Expand All @@ -129,7 +177,7 @@ func (a *alterDatabaseTask) Execute(ctx context.Context) error {
zap.Strings("newResourceGroups", newResourceGroups),
)
redoTask.AddAsyncStep(NewSimpleStep("", func(ctx context.Context) ([]nestedStep, error) {
colls, err := a.core.meta.ListCollections(ctx, oldDB.Name, a.ts, true)
colls, err := core.meta.ListCollections(ctx, oldDB.Name, ts, true)
if err != nil {
log.Ctx(ctx).Warn("failed to trigger update load config for database", zap.Int64("dbID", oldDB.ID), zap.Error(err))
return nil, err
Expand All @@ -138,7 +186,7 @@ func (a *alterDatabaseTask) Execute(ctx context.Context) error {
return nil, nil
}

resp, err := a.core.queryCoord.UpdateLoadConfig(ctx, &querypb.UpdateLoadConfigRequest{
resp, err := core.queryCoord.UpdateLoadConfig(ctx, &querypb.UpdateLoadConfigRequest{
CollectionIDs: lo.Map(colls, func(coll *model.Collection, _ int) int64 { return coll.CollectionID }),
ReplicaNumber: int32(newReplicaNumber),
ResourceGroups: newResourceGroups,
Expand Down Expand Up @@ -180,46 +228,9 @@ func (a *alterDatabaseTask) Execute(ctx context.Context) error {
}
msgPack.Msgs = append(msgPack.Msgs, msg)
log.Info("send replicate end msg for db", zap.String("db", newDB.Name), zap.String("replicateID", replicateID))
return nil, a.core.chanTimeTick.broadcastDmlChannels(a.core.chanTimeTick.listDmlChannels(), msgPack)
return nil, core.chanTimeTick.broadcastDmlChannels(core.chanTimeTick.listDmlChannels(), msgPack)
}))
}

return redoTask.Execute(ctx)
}

func (a *alterDatabaseTask) GetLockerKey() LockerKey {
return NewLockerKeyChain(
NewClusterLockerKey(false),
NewDatabaseLockerKey(a.Req.GetDbName(), true),
)
}

func MergeProperties(oldProps []*commonpb.KeyValuePair, updatedProps []*commonpb.KeyValuePair) []*commonpb.KeyValuePair {
_, existEndTS := common.GetReplicateEndTS(updatedProps)
if existEndTS {
updatedProps = append(updatedProps, &commonpb.KeyValuePair{
Key: common.ReplicateIDKey,
Value: "",
})
}

props := make(map[string]string)
for _, prop := range oldProps {
props[prop.Key] = prop.Value
}

for _, prop := range updatedProps {
props[prop.Key] = prop.Value
}

propKV := make([]*commonpb.KeyValuePair, 0)

for key, value := range props {
propKV = append(propKV, &commonpb.KeyValuePair{
Key: key,
Value: value,
})
}

return propKV
}
Loading

0 comments on commit f121f1a

Please sign in to comment.