Skip to content

Commit

Permalink
fix: fix crash when enable standby and streaming
Browse files Browse the repository at this point in the history
Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh committed Dec 5, 2024
1 parent 7944538 commit 60addad
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 24 deletions.
36 changes: 24 additions & 12 deletions internal/datacoord/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ type Server struct {
address string
watchClient kv.WatchKV
kv kv.MetaKv
metaRootPath string
meta *meta
segmentManager Manager
allocator allocator.Allocator
Expand Down Expand Up @@ -306,6 +307,15 @@ func (s *Server) Init() error {
if err = s.initSession(); err != nil {
return err
}
if err := s.initKV(); err != nil {
return err
}

Check warning on line 312 in internal/datacoord/server.go

View check run for this annotation

Codecov / codecov/patch

internal/datacoord/server.go#L311-L312

Added lines #L311 - L312 were not covered by tests
if streamingutil.IsStreamingServiceEnabled() {
s.streamingCoord = streamingcoord.NewServerBuilder().
WithETCD(s.etcdCli).
WithMetaKV(s.kv).
WithSession(s.session).Build()
}
if s.enableActiveStandBy {
s.activateFunc = func() error {
log.Info("DataCoord switch from standby to active, activating")
Expand Down Expand Up @@ -365,10 +375,7 @@ func (s *Server) initDataCoord() error {

// Initialize streaming coordinator.
if streamingutil.IsStreamingServiceEnabled() {
s.streamingCoord = streamingcoord.NewServerBuilder().
WithETCD(s.etcdCli).
WithMetaKV(s.kv).
WithSession(s.session).Build()

if err = s.streamingCoord.Init(context.TODO()); err != nil {
return err
}
Expand Down Expand Up @@ -648,31 +655,36 @@ func (s *Server) initSegmentManager() error {
return nil
}

func (s *Server) initMeta(chunkManager storage.ChunkManager) error {
if s.meta != nil {
func (s *Server) initKV() error {
if s.kv != nil {
return nil
}
s.watchClient = etcdkv.NewEtcdKV(s.etcdCli, Params.EtcdCfg.MetaRootPath.GetValue(),
etcdkv.WithRequestTimeout(paramtable.Get().ServiceParam.EtcdCfg.RequestTimeout.GetAsDuration(time.Millisecond)))
metaType := Params.MetaStoreCfg.MetaStoreType.GetValue()
log.Info("data coordinator connecting to metadata store", zap.String("metaType", metaType))
metaRootPath := ""
if metaType == util.MetaStoreTypeTiKV {
metaRootPath = Params.TiKVCfg.MetaRootPath.GetValue()
s.kv = tikv.NewTiKV(s.tikvCli, metaRootPath,
s.metaRootPath = Params.TiKVCfg.MetaRootPath.GetValue()
s.kv = tikv.NewTiKV(s.tikvCli, s.metaRootPath,
tikv.WithRequestTimeout(paramtable.Get().ServiceParam.TiKVCfg.RequestTimeout.GetAsDuration(time.Millisecond)))
} else if metaType == util.MetaStoreTypeEtcd {
metaRootPath = Params.EtcdCfg.MetaRootPath.GetValue()
s.kv = etcdkv.NewEtcdKV(s.etcdCli, metaRootPath,
s.metaRootPath = Params.EtcdCfg.MetaRootPath.GetValue()
s.kv = etcdkv.NewEtcdKV(s.etcdCli, s.metaRootPath,
etcdkv.WithRequestTimeout(paramtable.Get().ServiceParam.EtcdCfg.RequestTimeout.GetAsDuration(time.Millisecond)))
} else {
return retry.Unrecoverable(fmt.Errorf("not supported meta store: %s", metaType))
}
log.Info("data coordinator successfully connected to metadata store", zap.String("metaType", metaType))
return nil
}

func (s *Server) initMeta(chunkManager storage.ChunkManager) error {
if s.meta != nil {
return nil
}
reloadEtcdFn := func() error {
var err error
catalog := datacoord.NewCatalog(s.kv, chunkManager.RootPath(), metaRootPath)
catalog := datacoord.NewCatalog(s.kv, chunkManager.RootPath(), s.metaRootPath)
s.meta, err = newMeta(s.ctx, catalog, chunkManager)
if err != nil {
return err
Expand Down
6 changes: 6 additions & 0 deletions internal/streamingcoord/server/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"

"github.com/milvus-io/milvus/internal/metastore/kv/streamingcoord"
"github.com/milvus-io/milvus/internal/streamingcoord/server/balancer"
"github.com/milvus-io/milvus/internal/streamingcoord/server/resource"
"github.com/milvus-io/milvus/internal/streamingcoord/server/service"
"github.com/milvus-io/milvus/internal/util/componentutil"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/kv"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

Expand Down Expand Up @@ -41,8 +44,11 @@ func (s *ServerBuilder) Build() *Server {
resource.OptETCD(s.etcdClient),
resource.OptStreamingCatalog(streamingcoord.NewCataLog(s.metaKV)),
)
balancer := syncutil.NewFuture[balancer.Balancer]()
return &Server{
session: s.session,
componentStateService: componentutil.NewComponentStateService(typeutil.StreamingCoordRole),
assignmentService: service.NewAssignmentService(balancer),
balancer: balancer,
}
}
17 changes: 8 additions & 9 deletions internal/streamingcoord/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/milvus-io/milvus/internal/util/streamingutil/util"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/util/syncutil"
)

// Server is the streamingcoord server.
Expand All @@ -26,7 +27,7 @@ type Server struct {
componentStateService *componentutil.ComponentStateService // state.

// basic component variables can be used at service level.
balancer balancer.Balancer
balancer *syncutil.Future[balancer.Balancer]
}

// Init initializes the streamingcoord server.
Expand All @@ -39,7 +40,6 @@ func (s *Server) Init(ctx context.Context) (err error) {
return err
}
// Init all grpc service of streamingcoord server.
s.initService()
s.componentStateService.OnInitialized(s.session.GetServerID())
log.Info("streamingcoord server initialized")
return nil
Expand All @@ -51,15 +51,14 @@ func (s *Server) initBasicComponent(ctx context.Context) error {
var err error
// Read new incoming topics from configuration, and register it into balancer.
newIncomingTopics := util.GetAllTopicsFromConfiguration()
s.balancer, err = balancer.RecoverBalancer(ctx, "pchannel_count_fair", newIncomingTopics.Collect()...)
balancer, err := balancer.RecoverBalancer(ctx, "pchannel_count_fair", newIncomingTopics.Collect()...)
if err != nil {
return err
}

Check warning on line 57 in internal/streamingcoord/server/server.go

View check run for this annotation

Codecov / codecov/patch

internal/streamingcoord/server/server.go#L56-L57

Added lines #L56 - L57 were not covered by tests
s.balancer.Set(balancer)
return err
}

// initService initializes the grpc service.
func (s *Server) initService() {
s.assignmentService = service.NewAssignmentService(s.balancer)
}

// registerGRPCService register all grpc service to grpc server.
func (s *Server) RegisterGRPCService(grpcServer *grpc.Server) {
streamingpb.RegisterStreamingCoordAssignmentServiceServer(grpcServer, s.assignmentService)
Expand All @@ -76,6 +75,6 @@ func (s *Server) Start() {
func (s *Server) Stop() {
s.componentStateService.OnStopping()
log.Info("close balancer...")
s.balancer.Close()
s.balancer.Get().Close()
log.Info("streamingcoord server stopped")
}
11 changes: 8 additions & 3 deletions internal/streamingcoord/server/service/assignment.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ import (
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/syncutil"
)

var _ streamingpb.StreamingCoordAssignmentServiceServer = (*assignmentServiceImpl)(nil)

// NewAssignmentService returns a new assignment service.
func NewAssignmentService(
balancer balancer.Balancer,
balancer *syncutil.Future[balancer.Balancer],
) streamingpb.StreamingCoordAssignmentServiceServer {
return &assignmentServiceImpl{
balancer: balancer,
Expand All @@ -28,7 +29,7 @@ type AssignmentService interface {

// assignmentServiceImpl is the implementation of the assignment service.
type assignmentServiceImpl struct {
balancer balancer.Balancer
balancer *syncutil.Future[balancer.Balancer]
listenerTotal prometheus.Gauge
}

Expand All @@ -37,5 +38,9 @@ func (s *assignmentServiceImpl) AssignmentDiscover(server streamingpb.StreamingC
s.listenerTotal.Inc()
defer s.listenerTotal.Dec()

return discover.NewAssignmentDiscoverServer(s.balancer, server).Execute()
balancer, err := s.balancer.GetWithContext(server.Context())
if err != nil {
return err
}

Check warning on line 44 in internal/streamingcoord/server/service/assignment.go

View check run for this annotation

Codecov / codecov/patch

internal/streamingcoord/server/service/assignment.go#L43-L44

Added lines #L43 - L44 were not covered by tests
return discover.NewAssignmentDiscoverServer(balancer, server).Execute()
}

0 comments on commit 60addad

Please sign in to comment.