Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix crash when enable standby and streaming #38239

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 24 additions & 12 deletions internal/datacoord/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
address string
watchClient kv.WatchKV
kv kv.MetaKv
metaRootPath string
meta *meta
segmentManager Manager
allocator allocator.Allocator
Expand Down Expand Up @@ -306,6 +307,15 @@
if err = s.initSession(); err != nil {
return err
}
if err := s.initKV(); err != nil {
return err
}

Check warning on line 312 in internal/datacoord/server.go

View check run for this annotation

Codecov / codecov/patch

internal/datacoord/server.go#L311-L312

Added lines #L311 - L312 were not covered by tests
if streamingutil.IsStreamingServiceEnabled() {
s.streamingCoord = streamingcoord.NewServerBuilder().
WithETCD(s.etcdCli).
WithMetaKV(s.kv).
WithSession(s.session).Build()
}
if s.enableActiveStandBy {
s.activateFunc = func() error {
log.Info("DataCoord switch from standby to active, activating")
Expand Down Expand Up @@ -365,10 +375,7 @@

// Initialize streaming coordinator.
if streamingutil.IsStreamingServiceEnabled() {
s.streamingCoord = streamingcoord.NewServerBuilder().
WithETCD(s.etcdCli).
WithMetaKV(s.kv).
WithSession(s.session).Build()

if err = s.streamingCoord.Init(context.TODO()); err != nil {
return err
}
Expand Down Expand Up @@ -648,31 +655,36 @@
return nil
}

func (s *Server) initMeta(chunkManager storage.ChunkManager) error {
if s.meta != nil {
func (s *Server) initKV() error {
if s.kv != nil {
return nil
}
s.watchClient = etcdkv.NewEtcdKV(s.etcdCli, Params.EtcdCfg.MetaRootPath.GetValue(),
etcdkv.WithRequestTimeout(paramtable.Get().ServiceParam.EtcdCfg.RequestTimeout.GetAsDuration(time.Millisecond)))
metaType := Params.MetaStoreCfg.MetaStoreType.GetValue()
log.Info("data coordinator connecting to metadata store", zap.String("metaType", metaType))
metaRootPath := ""
if metaType == util.MetaStoreTypeTiKV {
metaRootPath = Params.TiKVCfg.MetaRootPath.GetValue()
s.kv = tikv.NewTiKV(s.tikvCli, metaRootPath,
s.metaRootPath = Params.TiKVCfg.MetaRootPath.GetValue()
s.kv = tikv.NewTiKV(s.tikvCli, s.metaRootPath,
tikv.WithRequestTimeout(paramtable.Get().ServiceParam.TiKVCfg.RequestTimeout.GetAsDuration(time.Millisecond)))
} else if metaType == util.MetaStoreTypeEtcd {
metaRootPath = Params.EtcdCfg.MetaRootPath.GetValue()
s.kv = etcdkv.NewEtcdKV(s.etcdCli, metaRootPath,
s.metaRootPath = Params.EtcdCfg.MetaRootPath.GetValue()
s.kv = etcdkv.NewEtcdKV(s.etcdCli, s.metaRootPath,
etcdkv.WithRequestTimeout(paramtable.Get().ServiceParam.EtcdCfg.RequestTimeout.GetAsDuration(time.Millisecond)))
} else {
return retry.Unrecoverable(fmt.Errorf("not supported meta store: %s", metaType))
}
log.Info("data coordinator successfully connected to metadata store", zap.String("metaType", metaType))
return nil
}

func (s *Server) initMeta(chunkManager storage.ChunkManager) error {
if s.meta != nil {
return nil
}
reloadEtcdFn := func() error {
var err error
catalog := datacoord.NewCatalog(s.kv, chunkManager.RootPath(), metaRootPath)
catalog := datacoord.NewCatalog(s.kv, chunkManager.RootPath(), s.metaRootPath)
s.meta, err = newMeta(s.ctx, catalog, chunkManager)
if err != nil {
return err
Expand Down
6 changes: 6 additions & 0 deletions internal/streamingcoord/server/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"

"github.com/milvus-io/milvus/internal/metastore/kv/streamingcoord"
"github.com/milvus-io/milvus/internal/streamingcoord/server/balancer"
"github.com/milvus-io/milvus/internal/streamingcoord/server/resource"
"github.com/milvus-io/milvus/internal/streamingcoord/server/service"
"github.com/milvus-io/milvus/internal/util/componentutil"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/kv"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

Expand Down Expand Up @@ -41,8 +44,11 @@ func (s *ServerBuilder) Build() *Server {
resource.OptETCD(s.etcdClient),
resource.OptStreamingCatalog(streamingcoord.NewCataLog(s.metaKV)),
)
balancer := syncutil.NewFuture[balancer.Balancer]()
return &Server{
session: s.session,
componentStateService: componentutil.NewComponentStateService(typeutil.StreamingCoordRole),
assignmentService: service.NewAssignmentService(balancer),
balancer: balancer,
}
}
17 changes: 8 additions & 9 deletions internal/streamingcoord/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"github.com/milvus-io/milvus/internal/util/streamingutil/util"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/util/syncutil"
)

// Server is the streamingcoord server.
Expand All @@ -26,7 +27,7 @@
componentStateService *componentutil.ComponentStateService // state.

// basic component variables can be used at service level.
balancer balancer.Balancer
balancer *syncutil.Future[balancer.Balancer]
}

// Init initializes the streamingcoord server.
Expand All @@ -39,7 +40,6 @@
return err
}
// Init all grpc service of streamingcoord server.
s.initService()
s.componentStateService.OnInitialized(s.session.GetServerID())
log.Info("streamingcoord server initialized")
return nil
Expand All @@ -51,15 +51,14 @@
var err error
// Read new incoming topics from configuration, and register them into the balancer.
newIncomingTopics := util.GetAllTopicsFromConfiguration()
s.balancer, err = balancer.RecoverBalancer(ctx, "pchannel_count_fair", newIncomingTopics.Collect()...)
balancer, err := balancer.RecoverBalancer(ctx, "pchannel_count_fair", newIncomingTopics.Collect()...)
if err != nil {
return err
}

Check warning on line 57 in internal/streamingcoord/server/server.go

View check run for this annotation

Codecov / codecov/patch

internal/streamingcoord/server/server.go#L56-L57

Added lines #L56 - L57 were not covered by tests
s.balancer.Set(balancer)
return err
}

// initService initializes the grpc service.
// It builds the assignment service from s.balancer via
// service.NewAssignmentService and stores it in s.assignmentService.
func (s *Server) initService() {
s.assignmentService = service.NewAssignmentService(s.balancer)
}

// RegisterGRPCService registers all gRPC services to the gRPC server.
func (s *Server) RegisterGRPCService(grpcServer *grpc.Server) {
streamingpb.RegisterStreamingCoordAssignmentServiceServer(grpcServer, s.assignmentService)
Expand All @@ -76,6 +75,6 @@
func (s *Server) Stop() {
s.componentStateService.OnStopping()
log.Info("close balancer...")
s.balancer.Close()
s.balancer.Get().Close()
log.Info("streamingcoord server stopped")
}
11 changes: 8 additions & 3 deletions internal/streamingcoord/server/service/assignment.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/syncutil"
)

var _ streamingpb.StreamingCoordAssignmentServiceServer = (*assignmentServiceImpl)(nil)

// NewAssignmentService returns a new assignment service.
func NewAssignmentService(
balancer balancer.Balancer,
balancer *syncutil.Future[balancer.Balancer],
) streamingpb.StreamingCoordAssignmentServiceServer {
return &assignmentServiceImpl{
balancer: balancer,
Expand All @@ -28,7 +29,7 @@

// assignmentServiceImpl is the implementation of the assignment service.
type assignmentServiceImpl struct {
balancer balancer.Balancer
balancer *syncutil.Future[balancer.Balancer]
listenerTotal prometheus.Gauge
}

Expand All @@ -37,5 +38,9 @@
s.listenerTotal.Inc()
defer s.listenerTotal.Dec()

return discover.NewAssignmentDiscoverServer(s.balancer, server).Execute()
balancer, err := s.balancer.GetWithContext(server.Context())
if err != nil {
return err
}

Check warning on line 44 in internal/streamingcoord/server/service/assignment.go

View check run for this annotation

Codecov / codecov/patch

internal/streamingcoord/server/service/assignment.go#L43-L44

Added lines #L43 - L44 were not covered by tests
return discover.NewAssignmentDiscoverServer(balancer, server).Execute()
}
Loading