Skip to content

Commit

Permalink
enhance: [2.4] Add context trace for querycoord queryable check (#37524
Browse files Browse the repository at this point in the history
…) (#37534)

Cherry-pick from master
pr: #37524

When the check-health logic fails because a collection is not queryable,
the underlying reason is hard to find in the logs.

This PR adds the context to the log calls so that entries carry the trace ID,
and prints an info log for unqueryable collections.

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia authored Nov 8, 2024
1 parent 7b71411 commit cedc340
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 14 deletions.
4 changes: 2 additions & 2 deletions internal/querycoordv2/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -919,7 +919,7 @@ func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade
}, nil
}

leaders, err := utils.GetShardLeaders(s.meta, s.targetMgr, s.dist, s.nodeMgr, req.GetCollectionID())
leaders, err := utils.GetShardLeaders(ctx, s.meta, s.targetMgr, s.dist, s.nodeMgr, req.GetCollectionID())
return &querypb.GetShardLeadersResponse{
Status: merr.Status(err),
Shards: leaders,
Expand All @@ -936,7 +936,7 @@ func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthReque
return componentutil.CheckHealthRespWithErrMsg(errReasons...), nil
}

if err := utils.CheckCollectionsQueryable(s.meta, s.targetMgr, s.dist, s.nodeMgr); err != nil {
if err := utils.CheckCollectionsQueryable(ctx, s.meta, s.targetMgr, s.dist, s.nodeMgr); err != nil {
return componentutil.CheckHealthRespWithErr(err), nil
}

Expand Down
4 changes: 3 additions & 1 deletion internal/querycoordv2/utils/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/milvus-io/milvus-proto/go-api/v2/rgpb"
etcdKV "github.com/milvus-io/milvus/internal/kv/etcd"
Expand All @@ -39,14 +40,15 @@ import (
func TestSpawnReplicasWithRG(t *testing.T) {
paramtable.Init()
config := GenerateEtcdConfig()
cli, _ := etcd.GetEtcdClient(
cli, err := etcd.GetEtcdClient(
config.UseEmbedEtcd.GetAsBool(),
config.EtcdUseSSL.GetAsBool(),
config.Endpoints.GetAsStrings(),
config.EtcdTLSCert.GetValue(),
config.EtcdTLSKey.GetValue(),
config.EtcdTLSCACert.GetValue(),
config.EtcdTLSMinVersion.GetValue())
require.NoError(t, err)
kv := etcdKV.NewEtcdKV(cli, config.MetaRootPath.GetValue())

store := querycoord.NewCatalog(kv)
Expand Down
22 changes: 11 additions & 11 deletions internal/querycoordv2/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,11 @@ func CheckLeaderAvailable(nodeMgr *session.NodeManager, targetMgr meta.TargetMan
return nil
}

func checkLoadStatus(m *meta.Meta, collectionID int64) error {
func checkLoadStatus(ctx context.Context, m *meta.Meta, collectionID int64) error {
percentage := m.CollectionManager.CalculateLoadPercentage(collectionID)
if percentage < 0 {
err := merr.WrapErrCollectionNotLoaded(collectionID)
log.Warn("failed to GetShardLeaders", zap.Error(err))
log.Ctx(ctx).Warn("failed to GetShardLeaders", zap.Error(err))
return err
}
collection := m.CollectionManager.GetCollection(collectionID)
Expand All @@ -102,7 +102,7 @@ func checkLoadStatus(m *meta.Meta, collectionID int64) error {
if percentage < 100 {
err := merr.WrapErrCollectionNotFullyLoaded(collectionID)
msg := fmt.Sprintf("collection %v is not fully loaded", collectionID)
log.Warn(msg)
log.Ctx(ctx).Warn(msg)
return err
}
return nil
Expand Down Expand Up @@ -169,26 +169,26 @@ func GetShardLeadersWithChannels(m *meta.Meta, targetMgr *meta.TargetManager, di
return ret, nil
}

func GetShardLeaders(m *meta.Meta, targetMgr *meta.TargetManager, dist *meta.DistributionManager, nodeMgr *session.NodeManager, collectionID int64) ([]*querypb.ShardLeadersList, error) {
if err := checkLoadStatus(m, collectionID); err != nil {
func GetShardLeaders(ctx context.Context, m *meta.Meta, targetMgr *meta.TargetManager, dist *meta.DistributionManager, nodeMgr *session.NodeManager, collectionID int64) ([]*querypb.ShardLeadersList, error) {
if err := checkLoadStatus(ctx, m, collectionID); err != nil {
return nil, err
}

channels := targetMgr.GetDmChannelsByCollection(collectionID, meta.CurrentTarget)
if len(channels) == 0 {
msg := "loaded collection do not found any channel in target, may be in recovery"
err := merr.WrapErrCollectionOnRecovering(collectionID, msg)
log.Warn("failed to get channels", zap.Error(err))
log.Ctx(ctx).Warn("failed to get channels", zap.Error(err))
return nil, err
}
return GetShardLeadersWithChannels(m, targetMgr, dist, nodeMgr, collectionID, channels)
}

// CheckCollectionsQueryable check all channels are watched and all segments are loaded for this collection
func CheckCollectionsQueryable(m *meta.Meta, targetMgr *meta.TargetManager, dist *meta.DistributionManager, nodeMgr *session.NodeManager) error {
func CheckCollectionsQueryable(ctx context.Context, m *meta.Meta, targetMgr *meta.TargetManager, dist *meta.DistributionManager, nodeMgr *session.NodeManager) error {
maxInterval := paramtable.Get().QueryCoordCfg.UpdateCollectionLoadStatusInterval.GetAsDuration(time.Minute)
for _, coll := range m.GetAllCollections() {
err := checkCollectionQueryable(m, targetMgr, dist, nodeMgr, coll)
err := checkCollectionQueryable(ctx, m, targetMgr, dist, nodeMgr, coll)
if err != nil && !coll.IsReleasing() && time.Since(coll.UpdatedAt) >= maxInterval {
return err
}
Expand All @@ -197,17 +197,17 @@ func CheckCollectionsQueryable(m *meta.Meta, targetMgr *meta.TargetManager, dist
}

// checkCollectionQueryable check all channels are watched and all segments are loaded for this collection
func checkCollectionQueryable(m *meta.Meta, targetMgr *meta.TargetManager, dist *meta.DistributionManager, nodeMgr *session.NodeManager, coll *meta.Collection) error {
func checkCollectionQueryable(ctx context.Context, m *meta.Meta, targetMgr *meta.TargetManager, dist *meta.DistributionManager, nodeMgr *session.NodeManager, coll *meta.Collection) error {
collectionID := coll.GetCollectionID()
if err := checkLoadStatus(m, collectionID); err != nil {
if err := checkLoadStatus(ctx, m, collectionID); err != nil {
return err
}

channels := targetMgr.GetDmChannelsByCollection(collectionID, meta.CurrentTarget)
if len(channels) == 0 {
msg := "loaded collection do not found any channel in target, may be in recovery"
err := merr.WrapErrCollectionOnRecovering(collectionID, msg)
log.Warn("failed to get channels", zap.Error(err))
log.Ctx(ctx).Warn("failed to get channels", zap.Error(err))
return err
}

Expand Down

0 comments on commit cedc340

Please sign in to comment.