diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index ef8bc1fa641be..7256edc15044f 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -110,6 +110,7 @@ type Server struct { address string watchClient kv.WatchKV kv kv.MetaKv + metaRootPath string meta *meta segmentManager Manager allocator allocator.Allocator @@ -306,6 +307,15 @@ func (s *Server) Init() error { if err = s.initSession(); err != nil { return err } + if err := s.initKV(); err != nil { + return err + } + if streamingutil.IsStreamingServiceEnabled() { + s.streamingCoord = streamingcoord.NewServerBuilder(). + WithETCD(s.etcdCli). + WithMetaKV(s.kv). + WithSession(s.session).Build() + } if s.enableActiveStandBy { s.activateFunc = func() error { log.Info("DataCoord switch from standby to active, activating") @@ -365,10 +375,7 @@ func (s *Server) initDataCoord() error { // Initialize streaming coordinator. if streamingutil.IsStreamingServiceEnabled() { - s.streamingCoord = streamingcoord.NewServerBuilder(). - WithETCD(s.etcdCli). - WithMetaKV(s.kv). - WithSession(s.session).Build() + if err = s.streamingCoord.Init(context.TODO()); err != nil { return err } @@ -648,31 +655,36 @@ func (s *Server) initSegmentManager() error { return nil } -func (s *Server) initMeta(chunkManager storage.ChunkManager) error { - if s.meta != nil { +func (s *Server) initKV() error { + if s.kv != nil { return nil } s.watchClient = etcdkv.NewEtcdKV(s.etcdCli, Params.EtcdCfg.MetaRootPath.GetValue(), etcdkv.WithRequestTimeout(paramtable.Get().ServiceParam.EtcdCfg.RequestTimeout.GetAsDuration(time.Millisecond))) metaType := Params.MetaStoreCfg.MetaStoreType.GetValue() log.Info("data coordinator connecting to metadata store", zap.String("metaType", metaType)) - metaRootPath := "" if metaType == util.MetaStoreTypeTiKV { - metaRootPath = Params.TiKVCfg.MetaRootPath.GetValue() - s.kv = tikv.NewTiKV(s.tikvCli, metaRootPath, + s.metaRootPath = Params.TiKVCfg.MetaRootPath.GetValue() + s.kv = tikv.NewTiKV(s.tikvCli, s.metaRootPath, tikv.WithRequestTimeout(paramtable.Get().ServiceParam.TiKVCfg.RequestTimeout.GetAsDuration(time.Millisecond))) } else if metaType == util.MetaStoreTypeEtcd { - metaRootPath = Params.EtcdCfg.MetaRootPath.GetValue() - s.kv = etcdkv.NewEtcdKV(s.etcdCli, metaRootPath, + s.metaRootPath = Params.EtcdCfg.MetaRootPath.GetValue() + s.kv = etcdkv.NewEtcdKV(s.etcdCli, s.metaRootPath, etcdkv.WithRequestTimeout(paramtable.Get().ServiceParam.EtcdCfg.RequestTimeout.GetAsDuration(time.Millisecond))) } else { return retry.Unrecoverable(fmt.Errorf("not supported meta store: %s", metaType)) } log.Info("data coordinator successfully connected to metadata store", zap.String("metaType", metaType)) + return nil +} +func (s *Server) initMeta(chunkManager storage.ChunkManager) error { + if s.meta != nil { + return nil + } reloadEtcdFn := func() error { var err error - catalog := datacoord.NewCatalog(s.kv, chunkManager.RootPath(), metaRootPath) + catalog := datacoord.NewCatalog(s.kv, chunkManager.RootPath(), s.metaRootPath) s.meta, err = newMeta(s.ctx, catalog, chunkManager) if err != nil { return err diff --git a/internal/streamingcoord/server/builder.go b/internal/streamingcoord/server/builder.go index b39908bf35b64..f15ca49ccdbc7 100644 --- a/internal/streamingcoord/server/builder.go +++ b/internal/streamingcoord/server/builder.go @@ -4,10 +4,13 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" "github.com/milvus-io/milvus/internal/metastore/kv/streamingcoord" + "github.com/milvus-io/milvus/internal/streamingcoord/server/balancer" "github.com/milvus-io/milvus/internal/streamingcoord/server/resource" + "github.com/milvus-io/milvus/internal/streamingcoord/server/service" "github.com/milvus-io/milvus/internal/util/componentutil" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/pkg/kv" + "github.com/milvus-io/milvus/pkg/util/syncutil" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -41,8 +44,11 @@ func (s *ServerBuilder) Build() *Server { resource.OptETCD(s.etcdClient), resource.OptStreamingCatalog(streamingcoord.NewCataLog(s.metaKV)), ) + balancer := syncutil.NewFuture[balancer.Balancer]() return &Server{ session: s.session, componentStateService: componentutil.NewComponentStateService(typeutil.StreamingCoordRole), + assignmentService: service.NewAssignmentService(balancer), + balancer: balancer, } } diff --git a/internal/streamingcoord/server/server.go b/internal/streamingcoord/server/server.go index 2c739c9dfa321..9e3db0ae4172f 100644 --- a/internal/streamingcoord/server/server.go +++ b/internal/streamingcoord/server/server.go @@ -14,6 +14,7 @@ import ( "github.com/milvus-io/milvus/internal/util/streamingutil/util" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb" + "github.com/milvus-io/milvus/pkg/util/syncutil" ) // Server is the streamingcoord server. @@ -26,7 +27,7 @@ type Server struct { componentStateService *componentutil.ComponentStateService // state. // basic component variables can be used at service level. - balancer balancer.Balancer + balancer *syncutil.Future[balancer.Balancer] } // Init initializes the streamingcoord server. @@ -39,7 +40,6 @@ func (s *Server) Init(ctx context.Context) (err error) { return err } // Init all grpc service of streamingcoord server. - s.initService() s.componentStateService.OnInitialized(s.session.GetServerID()) log.Info("streamingcoord server initialized") return nil @@ -51,15 +51,14 @@ func (s *Server) initBasicComponent(ctx context.Context) error { var err error // Read new incoming topics from configuration, and register it into balancer. newIncomingTopics := util.GetAllTopicsFromConfiguration() - s.balancer, err = balancer.RecoverBalancer(ctx, "pchannel_count_fair", newIncomingTopics.Collect()...) + balancer, err := balancer.RecoverBalancer(ctx, "pchannel_count_fair", newIncomingTopics.Collect()...) + if err != nil { + return err + } + s.balancer.Set(balancer) return err } -// initService initializes the grpc service. -func (s *Server) initService() { - s.assignmentService = service.NewAssignmentService(s.balancer) -} - // registerGRPCService register all grpc service to grpc server. func (s *Server) RegisterGRPCService(grpcServer *grpc.Server) { streamingpb.RegisterStreamingCoordAssignmentServiceServer(grpcServer, s.assignmentService) @@ -76,6 +75,6 @@ func (s *Server) Start() { func (s *Server) Stop() { s.componentStateService.OnStopping() log.Info("close balancer...") - s.balancer.Close() + s.balancer.Get().Close() log.Info("streamingcoord server stopped") } diff --git a/internal/streamingcoord/server/service/assignment.go b/internal/streamingcoord/server/service/assignment.go index 4584c03e6436c..8c26cb1996205 100644 --- a/internal/streamingcoord/server/service/assignment.go +++ b/internal/streamingcoord/server/service/assignment.go @@ -8,13 +8,14 @@ import ( "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/syncutil" ) var _ streamingpb.StreamingCoordAssignmentServiceServer = (*assignmentServiceImpl)(nil) // NewAssignmentService returns a new assignment service. func NewAssignmentService( - balancer balancer.Balancer, + balancer *syncutil.Future[balancer.Balancer], ) streamingpb.StreamingCoordAssignmentServiceServer { return &assignmentServiceImpl{ balancer: balancer, @@ -28,7 +29,7 @@ type AssignmentService interface { // assignmentServiceImpl is the implementation of the assignment service. type assignmentServiceImpl struct { - balancer balancer.Balancer + balancer *syncutil.Future[balancer.Balancer] listenerTotal prometheus.Gauge } @@ -37,5 +38,9 @@ func (s *assignmentServiceImpl) AssignmentDiscover(server streamingpb.StreamingC s.listenerTotal.Inc() defer s.listenerTotal.Dec() - return discover.NewAssignmentDiscoverServer(s.balancer, server).Execute() + balancer, err := s.balancer.GetWithContext(server.Context()) + if err != nil { + return err + } + return discover.NewAssignmentDiscoverServer(balancer, server).Execute() }