diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go index 42709e2d7a87a..045f04ead15b0 100644 --- a/internal/querycoordv2/services.go +++ b/internal/querycoordv2/services.go @@ -919,7 +919,7 @@ func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade }, nil } - leaders, err := utils.GetShardLeaders(s.meta, s.targetMgr, s.dist, s.nodeMgr, req.GetCollectionID()) + leaders, err := utils.GetShardLeaders(ctx, s.meta, s.targetMgr, s.dist, s.nodeMgr, req.GetCollectionID()) return &querypb.GetShardLeadersResponse{ Status: merr.Status(err), Shards: leaders, @@ -936,7 +936,7 @@ func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthReque return componentutil.CheckHealthRespWithErrMsg(errReasons...), nil } - if err := utils.CheckCollectionsQueryable(s.meta, s.targetMgr, s.dist, s.nodeMgr); err != nil { + if err := utils.CheckCollectionsQueryable(ctx, s.meta, s.targetMgr, s.dist, s.nodeMgr); err != nil { return componentutil.CheckHealthRespWithErr(err), nil } diff --git a/internal/querycoordv2/utils/meta_test.go b/internal/querycoordv2/utils/meta_test.go index 70a385cc610ef..d5412b0e69e52 100644 --- a/internal/querycoordv2/utils/meta_test.go +++ b/internal/querycoordv2/utils/meta_test.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" "github.com/milvus-io/milvus-proto/go-api/v2/rgpb" etcdKV "github.com/milvus-io/milvus/internal/kv/etcd" @@ -39,7 +40,7 @@ import ( func TestSpawnReplicasWithRG(t *testing.T) { paramtable.Init() config := GenerateEtcdConfig() - cli, _ := etcd.GetEtcdClient( + cli, err := etcd.GetEtcdClient( config.UseEmbedEtcd.GetAsBool(), config.EtcdUseSSL.GetAsBool(), config.Endpoints.GetAsStrings(), @@ -47,6 +48,7 @@ func TestSpawnReplicasWithRG(t *testing.T) { config.EtcdTLSKey.GetValue(), config.EtcdTLSCACert.GetValue(), config.EtcdTLSMinVersion.GetValue()) + require.NoError(t, err) kv := etcdKV.NewEtcdKV(cli, config.MetaRootPath.GetValue()) store := querycoord.NewCatalog(kv) diff --git a/internal/querycoordv2/utils/util.go b/internal/querycoordv2/utils/util.go index 47765f53ace43..bcb23138ba583 100644 --- a/internal/querycoordv2/utils/util.go +++ b/internal/querycoordv2/utils/util.go @@ -86,11 +86,11 @@ func CheckLeaderAvailable(nodeMgr *session.NodeManager, targetMgr meta.TargetMan return nil } -func checkLoadStatus(m *meta.Meta, collectionID int64) error { +func checkLoadStatus(ctx context.Context, m *meta.Meta, collectionID int64) error { percentage := m.CollectionManager.CalculateLoadPercentage(collectionID) if percentage < 0 { err := merr.WrapErrCollectionNotLoaded(collectionID) - log.Warn("failed to GetShardLeaders", zap.Error(err)) + log.Ctx(ctx).Warn("failed to GetShardLeaders", zap.Error(err)) return err } collection := m.CollectionManager.GetCollection(collectionID) @@ -102,7 +102,7 @@ func checkLoadStatus(m *meta.Meta, collectionID int64) error { if percentage < 100 { err := merr.WrapErrCollectionNotFullyLoaded(collectionID) msg := fmt.Sprintf("collection %v is not fully loaded", collectionID) - log.Warn(msg) + log.Ctx(ctx).Warn(msg) return err } return nil @@ -169,8 +169,8 @@ func GetShardLeadersWithChannels(m *meta.Meta, targetMgr *meta.TargetManager, di return ret, nil } -func GetShardLeaders(m *meta.Meta, targetMgr *meta.TargetManager, dist *meta.DistributionManager, nodeMgr *session.NodeManager, collectionID int64) ([]*querypb.ShardLeadersList, error) { - if err := checkLoadStatus(m, collectionID); err != nil { +func GetShardLeaders(ctx context.Context, m *meta.Meta, targetMgr *meta.TargetManager, dist *meta.DistributionManager, nodeMgr *session.NodeManager, collectionID int64) ([]*querypb.ShardLeadersList, error) { + if err := checkLoadStatus(ctx, m, collectionID); err != nil { return nil, err } @@ -178,17 +178,17 @@ func GetShardLeaders(m *meta.Meta, targetMgr *meta.TargetManager, dist *meta.Dis if len(channels) == 0 { msg := "loaded collection do not found any channel in target, may be in recovery" err := merr.WrapErrCollectionOnRecovering(collectionID, msg) - log.Warn("failed to get channels", zap.Error(err)) + log.Ctx(ctx).Warn("failed to get channels", zap.Error(err)) return nil, err } return GetShardLeadersWithChannels(m, targetMgr, dist, nodeMgr, collectionID, channels) } // CheckCollectionsQueryable check all channels are watched and all segments are loaded for this collection -func CheckCollectionsQueryable(m *meta.Meta, targetMgr *meta.TargetManager, dist *meta.DistributionManager, nodeMgr *session.NodeManager) error { +func CheckCollectionsQueryable(ctx context.Context, m *meta.Meta, targetMgr *meta.TargetManager, dist *meta.DistributionManager, nodeMgr *session.NodeManager) error { maxInterval := paramtable.Get().QueryCoordCfg.UpdateCollectionLoadStatusInterval.GetAsDuration(time.Minute) for _, coll := range m.GetAllCollections() { - err := checkCollectionQueryable(m, targetMgr, dist, nodeMgr, coll) + err := checkCollectionQueryable(ctx, m, targetMgr, dist, nodeMgr, coll) if err != nil && !coll.IsReleasing() && time.Since(coll.UpdatedAt) >= maxInterval { return err } @@ -197,9 +197,9 @@ func CheckCollectionsQueryable(m *meta.Meta, targetMgr *meta.TargetManager, dist } // checkCollectionQueryable check all channels are watched and all segments are loaded for this collection -func checkCollectionQueryable(m *meta.Meta, targetMgr *meta.TargetManager, dist *meta.DistributionManager, nodeMgr *session.NodeManager, coll *meta.Collection) error { +func checkCollectionQueryable(ctx context.Context, m *meta.Meta, targetMgr *meta.TargetManager, dist *meta.DistributionManager, nodeMgr *session.NodeManager, coll *meta.Collection) error { collectionID := coll.GetCollectionID() - if err := checkLoadStatus(m, collectionID); err != nil { + if err := checkLoadStatus(ctx, m, collectionID); err != nil { return err } @@ -207,7 +207,7 @@ func checkCollectionQueryable(m *meta.Meta, targetMgr *meta.TargetManager, dist if len(channels) == 0 { msg := "loaded collection do not found any channel in target, may be in recovery" err := merr.WrapErrCollectionOnRecovering(collectionID, msg) - log.Warn("failed to get channels", zap.Error(err)) + log.Ctx(ctx).Warn("failed to get channels", zap.Error(err)) return err }