From afac153c263516aa27bba7244a31e175eeed131d Mon Sep 17 00:00:00 2001 From: Zhen Ye Date: Tue, 17 Dec 2024 11:42:44 +0800 Subject: [PATCH] enhance: move the lifetime implementation out of server level lifetime (#38442) issue: #38399 - move the lifetime implementation of common code out of the server level lifetime implementation Signed-off-by: chyezh --- .../streaming/internal/producer/producer.go | 10 +-- internal/distributed/streaming/wal.go | 23 ++++--- internal/distributed/streaming/wal_test.go | 4 +- .../client/assignment/assignment_impl.go | 12 ++-- .../client/assignment/discoverer.go | 10 ++- .../server/balancer/balancer_impl.go | 13 ++-- .../client/handler/handler_client.go | 3 +- .../client/handler/handler_client_impl.go | 11 ++- .../client/handler/handler_client_test.go | 4 +- .../client/handler/producer/producer_impl.go | 9 ++- .../client/manager/manager_client.go | 4 +- .../client/manager/manager_client_impl.go | 19 +++--- .../client/manager/manager_test.go | 4 +- .../server/wal/adaptor/opener.go | 10 ++- .../server/wal/adaptor/wal_adaptor.go | 26 ++++--- .../segment/manager/pchannel_manager.go | 20 +++--- .../server/walmanager/manager_impl.go | 46 +++++-------- .../streamingutil/service/resolver/builder.go | 10 ++- .../resolver/resolver_with_discoverer.go | 3 +- .../resolver/watch_based_grpc_resolver.go | 13 ++-- pkg/util/typeutil/lifetime.go | 68 +++++++++++++++++++ pkg/util/typeutil/lifetime_test.go | 36 ++++++++++ 22 files changed, 225 insertions(+), 133 deletions(-) create mode 100644 pkg/util/typeutil/lifetime.go create mode 100644 pkg/util/typeutil/lifetime_test.go diff --git a/internal/distributed/streaming/internal/producer/producer.go b/internal/distributed/streaming/internal/producer/producer.go index 5d487dd66322a..1feaab90a6fdf 100644 --- a/internal/distributed/streaming/internal/producer/producer.go +++ b/internal/distributed/streaming/internal/producer/producer.go @@ -15,8 +15,8 @@ import ( "github.com/milvus-io/milvus/internal/util/streamingutil/status" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/streaming/util/message" - "github.com/milvus-io/milvus/pkg/util/lifetime" "github.com/milvus-io/milvus/pkg/util/syncutil" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) var errGracefulShutdown = errors.New("graceful shutdown") @@ -35,7 +35,7 @@ func NewResumableProducer(f factory, opts *ProducerOptions) *ResumableProducer { cancel: cancel, stopResumingCh: make(chan struct{}), resumingExitCh: make(chan struct{}), - lifetime: lifetime.NewLifetime(lifetime.Working), + lifetime: typeutil.NewLifetime(), logger: log.With(zap.String("pchannel", opts.PChannel)), opts: opts, @@ -63,7 +63,7 @@ type ResumableProducer struct { // A: cancel the ctx will cancel the underlying running producer. // Use producer Close is better way to stop producer. - lifetime lifetime.Lifetime[lifetime.State] + lifetime *typeutil.Lifetime logger *log.MLogger opts *ProducerOptions @@ -78,7 +78,7 @@ type ResumableProducer struct { // Produce produce a new message to log service. 
func (p *ResumableProducer) Produce(ctx context.Context, msg message.MutableMessage) (result *producer.ProduceResult, err error) { - if p.lifetime.Add(lifetime.IsWorking) != nil { + if !p.lifetime.Add(typeutil.LifetimeStateWorking) { return nil, errors.Wrapf(errs.ErrClosed, "produce on closed producer") } metricGuard := p.metrics.StartProduce(msg.EstimateSize()) @@ -185,7 +185,7 @@ func (p *ResumableProducer) createNewProducer() (producer.Producer, error) { // gracefulClose graceful close the producer. func (p *ResumableProducer) gracefulClose() error { - p.lifetime.SetState(lifetime.Stopped) + p.lifetime.SetState(typeutil.LifetimeStateStopped) p.lifetime.Wait() // close the stop resuming background to avoid create new producer. close(p.stopResumingCh) diff --git a/internal/distributed/streaming/wal.go b/internal/distributed/streaming/wal.go index 4277568472af1..8caba0186bc05 100644 --- a/internal/distributed/streaming/wal.go +++ b/internal/distributed/streaming/wal.go @@ -16,9 +16,11 @@ import ( "github.com/milvus-io/milvus/pkg/streaming/util/types" "github.com/milvus-io/milvus/pkg/util/conc" "github.com/milvus-io/milvus/pkg/util/funcutil" - "github.com/milvus-io/milvus/pkg/util/lifetime" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) +var ErrWALAccesserClosed = status.NewOnShutdownError("wal accesser closed") + // newWALAccesser creates a new wal accesser. func newWALAccesser(c *clientv3.Client) *walAccesserImpl { // Create a new streaming coord client. @@ -26,7 +28,7 @@ func newWALAccesser(c *clientv3.Client) *walAccesserImpl { // Create a new streamingnode handler client. handlerClient := handler.NewHandlerClient(streamingCoordClient.Assignment()) return &walAccesserImpl{ - lifetime: lifetime.NewLifetime(lifetime.Working), + lifetime: typeutil.NewLifetime(), streamingCoordAssignmentClient: streamingCoordClient, handlerClient: handlerClient, producerMutex: sync.Mutex{}, @@ -40,7 +42,7 @@ func newWALAccesser(c *clientv3.Client) *walAccesserImpl { // walAccesserImpl is the implementation of WALAccesser. type walAccesserImpl struct { - lifetime lifetime.Lifetime[lifetime.State] + lifetime *typeutil.Lifetime // All services streamingCoordAssignmentClient client.Client @@ -55,8 +57,8 @@ type walAccesserImpl struct { // RawAppend writes a record to the log. func (w *walAccesserImpl) RawAppend(ctx context.Context, msg message.MutableMessage, opts ...AppendOption) (*types.AppendResult, error) { assertValidMessage(msg) - if err := w.lifetime.Add(lifetime.IsWorking); err != nil { - return nil, status.NewOnShutdownError("wal accesser closed, %s", err.Error()) + if !w.lifetime.Add(typeutil.LifetimeStateWorking) { + return nil, ErrWALAccesserClosed } defer w.lifetime.Done() @@ -66,10 +68,11 @@ func (w *walAccesserImpl) RawAppend(ctx context.Context, msg message.MutableMess // Read returns a scanner for reading records from the wal. 
func (w *walAccesserImpl) Read(_ context.Context, opts ReadOption) Scanner { - if err := w.lifetime.Add(lifetime.IsWorking); err != nil { - newErrScanner(status.NewOnShutdownError("wal accesser closed, %s", err.Error())) + if !w.lifetime.Add(typeutil.LifetimeStateWorking) { + newErrScanner(ErrWALAccesserClosed) } defer w.lifetime.Done() + if opts.VChannel == "" { return newErrScanner(status.NewInvaildArgument("vchannel is required")) } @@ -87,8 +90,8 @@ func (w *walAccesserImpl) Read(_ context.Context, opts ReadOption) Scanner { } func (w *walAccesserImpl) Txn(ctx context.Context, opts TxnOption) (Txn, error) { - if err := w.lifetime.Add(lifetime.IsWorking); err != nil { - return nil, status.NewOnShutdownError("wal accesser closed, %s", err.Error()) + if !w.lifetime.Add(typeutil.LifetimeStateWorking) { + return nil, ErrWALAccesserClosed } if opts.VChannel == "" { @@ -131,7 +134,7 @@ func (w *walAccesserImpl) Txn(ctx context.Context, opts TxnOption) (Txn, error) // Close closes all the wal accesser. func (w *walAccesserImpl) Close() { - w.lifetime.SetState(lifetime.Stopped) + w.lifetime.SetState(typeutil.LifetimeStateStopped) w.lifetime.Wait() w.producerMutex.Lock() diff --git a/internal/distributed/streaming/wal_test.go b/internal/distributed/streaming/wal_test.go index 2ed2ed37d9d97..db527c044eddb 100644 --- a/internal/distributed/streaming/wal_test.go +++ b/internal/distributed/streaming/wal_test.go @@ -18,7 +18,7 @@ import ( "github.com/milvus-io/milvus/pkg/streaming/util/types" "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/walimplstest" "github.com/milvus-io/milvus/pkg/util/conc" - "github.com/milvus-io/milvus/pkg/util/lifetime" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) const ( @@ -34,7 +34,7 @@ func TestWAL(t *testing.T) { handler.EXPECT().Close().Return() w := &walAccesserImpl{ - lifetime: lifetime.NewLifetime(lifetime.Working), + lifetime: typeutil.NewLifetime(), streamingCoordAssignmentClient: coordClient, handlerClient: handler, producerMutex: sync.Mutex{}, diff --git a/internal/streamingcoord/client/assignment/assignment_impl.go b/internal/streamingcoord/client/assignment/assignment_impl.go index 23c632e7d16db..134517d46ea15 100644 --- a/internal/streamingcoord/client/assignment/assignment_impl.go +++ b/internal/streamingcoord/client/assignment/assignment_impl.go @@ -13,8 +13,8 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb" "github.com/milvus-io/milvus/pkg/streaming/util/types" - "github.com/milvus-io/milvus/pkg/util/lifetime" "github.com/milvus-io/milvus/pkg/util/syncutil" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) // NewAssignmentService creates a new assignment service. @@ -23,7 +23,7 @@ func NewAssignmentService(service lazygrpc.Service[streamingpb.StreamingCoordAss s := &AssignmentServiceImpl{ ctx: ctx, cancel: cancel, - lifetime: lifetime.NewLifetime(lifetime.Working), + lifetime: typeutil.NewLifetime(), watcher: newWatcher(), service: service, resumingExitCh: make(chan struct{}), @@ -38,7 +38,7 @@ func NewAssignmentService(service lazygrpc.Service[streamingpb.StreamingCoordAss type AssignmentServiceImpl struct { ctx context.Context cancel context.CancelFunc - lifetime lifetime.Lifetime[lifetime.State] + lifetime *typeutil.Lifetime watcher *watcher service lazygrpc.Service[streamingpb.StreamingCoordAssignmentServiceClient] resumingExitCh chan struct{} @@ -49,7 +49,7 @@ type AssignmentServiceImpl struct { // AssignmentDiscover watches the assignment discovery. 
func (c *AssignmentServiceImpl) AssignmentDiscover(ctx context.Context, cb func(*types.VersionedStreamingNodeAssignments) error) error { - if c.lifetime.Add(lifetime.IsWorking) != nil { + if !c.lifetime.Add(typeutil.LifetimeStateWorking) { return status.NewOnShutdownError("assignment service client is closing") } defer c.lifetime.Done() @@ -59,7 +59,7 @@ func (c *AssignmentServiceImpl) AssignmentDiscover(ctx context.Context, cb func( // ReportAssignmentError reports the assignment error to server. func (c *AssignmentServiceImpl) ReportAssignmentError(ctx context.Context, pchannel types.PChannelInfo, assignmentErr error) error { - if c.lifetime.Add(lifetime.IsWorking) != nil { + if !c.lifetime.Add(typeutil.LifetimeStateWorking) { return status.NewOnShutdownError("assignment service client is closing") } defer c.lifetime.Done() @@ -75,7 +75,7 @@ func (c *AssignmentServiceImpl) ReportAssignmentError(ctx context.Context, pchan // Close closes the assignment service. func (c *AssignmentServiceImpl) Close() { - c.lifetime.SetState(lifetime.Stopped) + c.lifetime.SetState(typeutil.LifetimeStateStopped) c.lifetime.Wait() c.cancel() diff --git a/internal/streamingcoord/client/assignment/discoverer.go b/internal/streamingcoord/client/assignment/discoverer.go index 6059dc9f0d6c8..b9f92f27e5e0b 100644 --- a/internal/streamingcoord/client/assignment/discoverer.go +++ b/internal/streamingcoord/client/assignment/discoverer.go @@ -8,14 +8,13 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb" "github.com/milvus-io/milvus/pkg/streaming/util/types" - "github.com/milvus-io/milvus/pkg/util/lifetime" "github.com/milvus-io/milvus/pkg/util/typeutil" ) // newAssignmentDiscoverClient creates a new assignment discover client. func newAssignmentDiscoverClient(w *watcher, streamClient streamingpb.StreamingCoordAssignmentService_AssignmentDiscoverClient) *assignmentDiscoverClient { c := &assignmentDiscoverClient{ - lifetime: lifetime.NewLifetime(lifetime.Working), + lifetime: typeutil.NewLifetime(), w: w, streamClient: streamClient, logger: log.With(), @@ -29,7 +28,7 @@ func newAssignmentDiscoverClient(w *watcher, streamClient streamingpb.StreamingC // assignmentDiscoverClient is the client for assignment discover. type assignmentDiscoverClient struct { - lifetime lifetime.Lifetime[lifetime.State] + lifetime *typeutil.Lifetime w *watcher logger *log.MLogger requestCh chan *streamingpb.AssignmentDiscoverRequest @@ -40,7 +39,7 @@ type assignmentDiscoverClient struct { // ReportAssignmentError reports the assignment error to server. func (c *assignmentDiscoverClient) ReportAssignmentError(pchannel types.PChannelInfo, err error) { - if err := c.lifetime.Add(lifetime.IsWorking); err != nil { + if !c.lifetime.Add(typeutil.LifetimeStateWorking) { return } defer c.lifetime.Done() @@ -75,9 +74,8 @@ func (c *assignmentDiscoverClient) Available() <-chan struct{} { // Close closes the assignment discover client. 
func (c *assignmentDiscoverClient) Close() { - c.lifetime.SetState(lifetime.Stopped) + c.lifetime.SetState(typeutil.LifetimeStateStopped) c.lifetime.Wait() - c.lifetime.Close() close(c.requestCh) c.wg.Wait() diff --git a/internal/streamingcoord/server/balancer/balancer_impl.go b/internal/streamingcoord/server/balancer/balancer_impl.go index 70b087e9864f7..f40bbcb3c62b4 100644 --- a/internal/streamingcoord/server/balancer/balancer_impl.go +++ b/internal/streamingcoord/server/balancer/balancer_impl.go @@ -13,7 +13,6 @@ import ( "github.com/milvus-io/milvus/internal/util/streamingutil/status" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/streaming/util/types" - "github.com/milvus-io/milvus/pkg/util/lifetime" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/syncutil" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -32,7 +31,7 @@ func RecoverBalancer( return nil, errors.Wrap(err, "fail to recover channel manager") } b := &balancerImpl{ - lifetime: lifetime.NewLifetime(lifetime.Working), + lifetime: typeutil.NewLifetime(), logger: log.With(zap.String("policy", policy)), channelMetaManager: manager, policy: mustGetPolicy(policy), @@ -45,7 +44,7 @@ func RecoverBalancer( // balancerImpl is a implementation of Balancer. type balancerImpl struct { - lifetime lifetime.Lifetime[lifetime.State] + lifetime *typeutil.Lifetime logger *log.MLogger channelMetaManager *channel.ChannelManager policy Policy // policy is the balance policy, TODO: should be dynamic in future. @@ -55,7 +54,7 @@ type balancerImpl struct { // WatchChannelAssignments watches the balance result. func (b *balancerImpl) WatchChannelAssignments(ctx context.Context, cb func(version typeutil.VersionInt64Pair, relations []types.PChannelInfoAssigned) error) error { - if b.lifetime.Add(lifetime.IsWorking) != nil { + if !b.lifetime.Add(typeutil.LifetimeStateWorking) { return status.NewOnShutdownError("balancer is closing") } defer b.lifetime.Done() @@ -63,7 +62,7 @@ func (b *balancerImpl) WatchChannelAssignments(ctx context.Context, cb func(vers } func (b *balancerImpl) MarkAsUnavailable(ctx context.Context, pChannels []types.PChannelInfo) error { - if b.lifetime.Add(lifetime.IsWorking) != nil { + if !b.lifetime.Add(typeutil.LifetimeStateWorking) { return status.NewOnShutdownError("balancer is closing") } defer b.lifetime.Done() @@ -73,7 +72,7 @@ func (b *balancerImpl) MarkAsUnavailable(ctx context.Context, pChannels []types. // Trigger trigger a re-balance. func (b *balancerImpl) Trigger(ctx context.Context) error { - if b.lifetime.Add(lifetime.IsWorking) != nil { + if !b.lifetime.Add(typeutil.LifetimeStateWorking) { return status.NewOnShutdownError("balancer is closing") } defer b.lifetime.Done() @@ -93,7 +92,7 @@ func (b *balancerImpl) sendRequestAndWaitFinish(ctx context.Context, newReq *req // Close close the balancer. 
func (b *balancerImpl) Close() { - b.lifetime.SetState(lifetime.Stopped) + b.lifetime.SetState(typeutil.LifetimeStateStopped) b.lifetime.Wait() b.backgroundTaskNotifier.Cancel() diff --git a/internal/streamingnode/client/handler/handler_client.go b/internal/streamingnode/client/handler/handler_client.go index 29f0f1c9c9ed2..f386be11e2cb2 100644 --- a/internal/streamingnode/client/handler/handler_client.go +++ b/internal/streamingnode/client/handler/handler_client.go @@ -23,7 +23,6 @@ import ( "github.com/milvus-io/milvus/pkg/streaming/util/types" "github.com/milvus-io/milvus/pkg/tracer" "github.com/milvus-io/milvus/pkg/util/interceptor" - "github.com/milvus-io/milvus/pkg/util/lifetime" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -100,7 +99,7 @@ func NewHandlerClient(w types.AssignmentDiscoverWatcher) HandlerClient { }) watcher := assignment.NewWatcher(rb.Resolver()) return &handlerClientImpl{ - lifetime: lifetime.NewLifetime(lifetime.Working), + lifetime: typeutil.NewLifetime(), service: lazygrpc.WithServiceCreator(conn, streamingpb.NewStreamingNodeHandlerServiceClient), rb: rb, watcher: watcher, diff --git a/internal/streamingnode/client/handler/handler_client_impl.go b/internal/streamingnode/client/handler/handler_client_impl.go index b054fd8c303a6..d2a52f66fad4f 100644 --- a/internal/streamingnode/client/handler/handler_client_impl.go +++ b/internal/streamingnode/client/handler/handler_client_impl.go @@ -18,13 +18,13 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb" "github.com/milvus-io/milvus/pkg/streaming/util/types" - "github.com/milvus-io/milvus/pkg/util/lifetime" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) var errWaitNextBackoff = errors.New("wait for next backoff") type handlerClientImpl struct { - lifetime lifetime.Lifetime[lifetime.State] + lifetime *typeutil.Lifetime service lazygrpc.Service[streamingpb.StreamingNodeHandlerServiceClient] rb resolver.Builder watcher assignment.Watcher @@ -35,7 +35,7 @@ type handlerClientImpl struct { // CreateProducer creates a producer. func (hc *handlerClientImpl) CreateProducer(ctx context.Context, opts *ProducerOptions) (Producer, error) { - if hc.lifetime.Add(lifetime.IsWorking) != nil { + if !hc.lifetime.Add(typeutil.LifetimeStateWorking) { return nil, ErrClientClosed } defer hc.lifetime.Done() @@ -58,7 +58,7 @@ func (hc *handlerClientImpl) CreateProducer(ctx context.Context, opts *ProducerO // CreateConsumer creates a consumer. func (hc *handlerClientImpl) CreateConsumer(ctx context.Context, opts *ConsumerOptions) (Consumer, error) { - if hc.lifetime.Add(lifetime.IsWorking) != nil { + if !hc.lifetime.Add(typeutil.LifetimeStateWorking) { return nil, ErrClientClosed } defer hc.lifetime.Done() @@ -135,9 +135,8 @@ func (hc *handlerClientImpl) waitForNextBackoff(ctx context.Context, pchannel st // Close closes the handler client. 
func (hc *handlerClientImpl) Close() { - hc.lifetime.SetState(lifetime.Stopped) + hc.lifetime.SetState(typeutil.LifetimeStateStopped) hc.lifetime.Wait() - hc.lifetime.Close() hc.watcher.Close() hc.service.Close() diff --git a/internal/streamingnode/client/handler/handler_client_test.go b/internal/streamingnode/client/handler/handler_client_test.go index 7779b316b9ced..3aa571e142cfe 100644 --- a/internal/streamingnode/client/handler/handler_client_test.go +++ b/internal/streamingnode/client/handler/handler_client_test.go @@ -22,8 +22,8 @@ import ( "github.com/milvus-io/milvus/pkg/streaming/util/message" "github.com/milvus-io/milvus/pkg/streaming/util/options" "github.com/milvus-io/milvus/pkg/streaming/util/types" - "github.com/milvus-io/milvus/pkg/util/lifetime" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) func TestHandlerClient(t *testing.T) { @@ -50,7 +50,7 @@ func TestHandlerClient(t *testing.T) { pK := 0 handler := &handlerClientImpl{ - lifetime: lifetime.NewLifetime(lifetime.Working), + lifetime: typeutil.NewLifetime(), service: service, rb: rb, watcher: w, diff --git a/internal/streamingnode/client/handler/producer/producer_impl.go b/internal/streamingnode/client/handler/producer/producer_impl.go index 1317eaccc635d..54ec3224d0f02 100644 --- a/internal/streamingnode/client/handler/producer/producer_impl.go +++ b/internal/streamingnode/client/handler/producer/producer_impl.go @@ -15,7 +15,6 @@ import ( "github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb" "github.com/milvus-io/milvus/pkg/streaming/util/message" "github.com/milvus-io/milvus/pkg/streaming/util/types" - "github.com/milvus-io/milvus/pkg/util/lifetime" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -62,7 +61,7 @@ func CreateProducer( zap.String("pchannel", opts.Assignment.Channel.Name), zap.Int64("term", opts.Assignment.Channel.Term), zap.Int64("streamingNodeID", opts.Assignment.Node.ServerID)), - lifetime: lifetime.NewLifetime[lifetime.State](lifetime.Working), + lifetime: typeutil.NewLifetime(), idAllocator: typeutil.NewIDAllocator(), grpcStreamClient: produceClient, pendingRequests: sync.Map{}, @@ -97,7 +96,7 @@ type producerImpl struct { assignment types.PChannelInfoAssigned walName string logger *log.MLogger - lifetime lifetime.Lifetime[lifetime.State] + lifetime *typeutil.Lifetime idAllocator *typeutil.IDAllocator grpcStreamClient *produceGrpcClient @@ -126,7 +125,7 @@ func (p *producerImpl) Assignment() types.PChannelInfoAssigned { // Produce sends the produce message to server. func (p *producerImpl) Produce(ctx context.Context, msg message.MutableMessage) (*ProduceResult, error) { - if p.lifetime.Add(lifetime.IsWorking) != nil { + if !p.lifetime.Add(typeutil.LifetimeStateWorking) { return nil, status.NewOnShutdownError("producer client is shutting down") } defer p.lifetime.Done() @@ -196,7 +195,7 @@ func (p *producerImpl) Available() <-chan struct{} { // Close close the producer client. func (p *producerImpl) Close() { // Wait for all message has been sent. 
- p.lifetime.SetState(lifetime.Stopped) + p.lifetime.SetState(typeutil.LifetimeStateStopped) p.lifetime.Wait() close(p.requestCh) diff --git a/internal/streamingnode/client/manager/manager_client.go b/internal/streamingnode/client/manager/manager_client.go index 32649450786e3..1f26c532f3b5b 100644 --- a/internal/streamingnode/client/manager/manager_client.go +++ b/internal/streamingnode/client/manager/manager_client.go @@ -19,7 +19,6 @@ import ( "github.com/milvus-io/milvus/pkg/streaming/util/types" "github.com/milvus-io/milvus/pkg/tracer" "github.com/milvus-io/milvus/pkg/util/interceptor" - "github.com/milvus-io/milvus/pkg/util/lifetime" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -61,7 +60,8 @@ func NewManagerClient(etcdCli *clientv3.Client) ManagerClient { ) }) return &managerClientImpl{ - lifetime: lifetime.NewLifetime(lifetime.Working), + lifetime: typeutil.NewLifetime(), + stopped: make(chan struct{}), rb: rb, service: lazygrpc.WithServiceCreator(conn, streamingpb.NewStreamingNodeManagerServiceClient), } diff --git a/internal/streamingnode/client/manager/manager_client_impl.go b/internal/streamingnode/client/manager/manager_client_impl.go index d07b2b4ce1d30..98e352315525a 100644 --- a/internal/streamingnode/client/manager/manager_client_impl.go +++ b/internal/streamingnode/client/manager/manager_client_impl.go @@ -16,21 +16,22 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb" "github.com/milvus-io/milvus/pkg/streaming/util/types" - "github.com/milvus-io/milvus/pkg/util/lifetime" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) var _ ManagerClient = (*managerClientImpl)(nil) // managerClientImpl implements ManagerClient. type managerClientImpl struct { - lifetime lifetime.Lifetime[lifetime.State] + lifetime *typeutil.Lifetime + stopped chan struct{} rb resolver.Builder service lazygrpc.Service[streamingpb.StreamingNodeManagerServiceClient] } func (c *managerClientImpl) WatchNodeChanged(ctx context.Context) (<-chan struct{}, error) { - if c.lifetime.Add(lifetime.IsWorking) != nil { + if !c.lifetime.Add(typeutil.LifetimeStateWorking) { return nil, status.NewOnShutdownError("manager client is closing") } defer c.lifetime.Done() @@ -42,7 +43,7 @@ func (c *managerClientImpl) WatchNodeChanged(ctx context.Context) (<-chan struct select { case <-ctx.Done(): return ctx.Err() - case <-c.lifetime.CloseCh(): + case <-c.stopped: return status.NewOnShutdownError("manager client is closing") case resultCh <- struct{}{}: } @@ -54,7 +55,7 @@ func (c *managerClientImpl) WatchNodeChanged(ctx context.Context) (<-chan struct // CollectAllStatus collects status in all underlying streamingnode. func (c *managerClientImpl) CollectAllStatus(ctx context.Context) (map[int64]*types.StreamingNodeStatus, error) { - if c.lifetime.Add(lifetime.IsWorking) != nil { + if !c.lifetime.Add(typeutil.LifetimeStateWorking) { return nil, status.NewOnShutdownError("manager client is closing") } defer c.lifetime.Done() @@ -129,7 +130,7 @@ func (c *managerClientImpl) getAllStreamingNodeStatus(ctx context.Context, state // Assign a wal instance for the channel on log node of given server id. 
func (c *managerClientImpl) Assign(ctx context.Context, pchannel types.PChannelInfoAssigned) error { - if c.lifetime.Add(lifetime.IsWorking) != nil { + if !c.lifetime.Add(typeutil.LifetimeStateWorking) { return status.NewOnShutdownError("manager client is closing") } defer c.lifetime.Done() @@ -150,7 +151,7 @@ func (c *managerClientImpl) Assign(ctx context.Context, pchannel types.PChannelI // Remove the wal instance for the channel on log node of given server id. func (c *managerClientImpl) Remove(ctx context.Context, pchannel types.PChannelInfoAssigned) error { - if c.lifetime.Add(lifetime.IsWorking) != nil { + if !c.lifetime.Add(typeutil.LifetimeStateWorking) { return status.NewOnShutdownError("manager client is closing") } defer c.lifetime.Done() @@ -182,9 +183,9 @@ func (c *managerClientImpl) Remove(ctx context.Context, pchannel types.PChannelI // Close closes the manager client. func (c *managerClientImpl) Close() { - c.lifetime.SetState(lifetime.Stopped) + c.lifetime.SetState(typeutil.LifetimeStateStopped) + close(c.stopped) c.lifetime.Wait() - c.lifetime.Close() c.service.Close() c.rb.Close() diff --git a/internal/streamingnode/client/manager/manager_test.go b/internal/streamingnode/client/manager/manager_test.go index fd7bbd037d42f..3f9a4451d89ce 100644 --- a/internal/streamingnode/client/manager/manager_test.go +++ b/internal/streamingnode/client/manager/manager_test.go @@ -21,7 +21,6 @@ import ( "github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb" "github.com/milvus-io/milvus/pkg/streaming/util/types" "github.com/milvus-io/milvus/pkg/util/etcd" - "github.com/milvus-io/milvus/pkg/util/lifetime" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -30,7 +29,8 @@ func TestManager(t *testing.T) { rb := mock_resolver.NewMockBuilder(t) managerService := mock_lazygrpc.NewMockService[streamingpb.StreamingNodeManagerServiceClient](t) m := &managerClientImpl{ - lifetime: lifetime.NewLifetime(lifetime.Working), + lifetime: typeutil.NewLifetime(), + stopped: make(chan struct{}), rb: rb, service: managerService, } diff --git a/internal/streamingnode/server/wal/adaptor/opener.go b/internal/streamingnode/server/wal/adaptor/opener.go index 1551a60aff0ec..de1b7963c586a 100644 --- a/internal/streamingnode/server/wal/adaptor/opener.go +++ b/internal/streamingnode/server/wal/adaptor/opener.go @@ -10,7 +10,6 @@ import ( "github.com/milvus-io/milvus/internal/util/streamingutil/status" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/streaming/walimpls" - "github.com/milvus-io/milvus/pkg/util/lifetime" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -19,7 +18,7 @@ var _ wal.Opener = (*openerAdaptorImpl)(nil) // adaptImplsToOpener creates a new wal opener with opener impls. func adaptImplsToOpener(opener walimpls.OpenerImpls, builders []interceptors.InterceptorBuilder) wal.Opener { return &openerAdaptorImpl{ - lifetime: lifetime.NewLifetime(lifetime.Working), + lifetime: typeutil.NewLifetime(), opener: opener, idAllocator: typeutil.NewIDAllocator(), walInstances: typeutil.NewConcurrentMap[int64, wal.WAL](), @@ -29,7 +28,7 @@ func adaptImplsToOpener(opener walimpls.OpenerImpls, builders []interceptors.Int // openerAdaptorImpl is the wrapper of OpenerImpls to Opener. 
type openerAdaptorImpl struct { - lifetime lifetime.Lifetime[lifetime.State] + lifetime *typeutil.Lifetime opener walimpls.OpenerImpls idAllocator *typeutil.IDAllocator walInstances *typeutil.ConcurrentMap[int64, wal.WAL] // store all wal instances allocated by these allocator. @@ -38,7 +37,7 @@ type openerAdaptorImpl struct { // Open opens a wal instance for the channel. func (o *openerAdaptorImpl) Open(ctx context.Context, opt *wal.OpenOption) (wal.WAL, error) { - if o.lifetime.Add(lifetime.IsWorking) != nil { + if !o.lifetime.Add(typeutil.LifetimeStateWorking) { return nil, status.NewOnShutdownError("wal opener is on shutdown") } defer o.lifetime.Done() @@ -67,9 +66,8 @@ func (o *openerAdaptorImpl) Open(ctx context.Context, opt *wal.OpenOption) (wal. // Close the wal opener, release the underlying resources. func (o *openerAdaptorImpl) Close() { - o.lifetime.SetState(lifetime.Stopped) + o.lifetime.SetState(typeutil.LifetimeStateStopped) o.lifetime.Wait() - o.lifetime.Close() // close all wal instances. o.walInstances.Range(func(id int64, l wal.WAL) bool { diff --git a/internal/streamingnode/server/wal/adaptor/wal_adaptor.go b/internal/streamingnode/server/wal/adaptor/wal_adaptor.go index eaf0550f594b2..c84ec8b63aba4 100644 --- a/internal/streamingnode/server/wal/adaptor/wal_adaptor.go +++ b/internal/streamingnode/server/wal/adaptor/wal_adaptor.go @@ -16,7 +16,6 @@ import ( "github.com/milvus-io/milvus/pkg/streaming/util/types" "github.com/milvus-io/milvus/pkg/streaming/walimpls" "github.com/milvus-io/milvus/pkg/util/conc" - "github.com/milvus-io/milvus/pkg/util/lifetime" "github.com/milvus-io/milvus/pkg/util/syncutil" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -36,7 +35,8 @@ func adaptImplsToWAL( WAL: syncutil.NewFuture[wal.WAL](), } wal := &walAdaptorImpl{ - lifetime: lifetime.NewLifetime(lifetime.Working), + lifetime: typeutil.NewLifetime(), + available: make(chan struct{}), idAllocator: typeutil.NewIDAllocator(), inner: basicWAL, // TODO: make the pool size configurable. @@ -57,7 +57,8 @@ func adaptImplsToWAL( // walAdaptorImpl is a wrapper of WALImpls to extend it into a WAL interface. type walAdaptorImpl struct { - lifetime lifetime.Lifetime[lifetime.State] + lifetime *typeutil.Lifetime + available chan struct{} idAllocator *typeutil.IDAllocator inner walimpls.WALImpls appendExecutionPool *conc.Pool[struct{}] @@ -80,7 +81,7 @@ func (w *walAdaptorImpl) Channel() types.PChannelInfo { // Append writes a record to the log. func (w *walAdaptorImpl) Append(ctx context.Context, msg message.MutableMessage) (*wal.AppendResult, error) { - if w.lifetime.Add(lifetime.IsWorking) != nil { + if !w.lifetime.Add(typeutil.LifetimeStateWorking) { return nil, status.NewOnShutdownError("wal is on shutdown") } defer w.lifetime.Done() @@ -137,7 +138,7 @@ func (w *walAdaptorImpl) Append(ctx context.Context, msg message.MutableMessage) // AppendAsync writes a record to the log asynchronously. func (w *walAdaptorImpl) AppendAsync(ctx context.Context, msg message.MutableMessage, cb func(*wal.AppendResult, error)) { - if w.lifetime.Add(lifetime.IsWorking) != nil { + if !w.lifetime.Add(typeutil.LifetimeStateWorking) { cb(nil, status.NewOnShutdownError("wal is on shutdown")) return } @@ -154,7 +155,7 @@ func (w *walAdaptorImpl) AppendAsync(ctx context.Context, msg message.MutableMes // Read returns a scanner for reading records from the wal. 
func (w *walAdaptorImpl) Read(ctx context.Context, opts wal.ReadOption) (wal.Scanner, error) { - if w.lifetime.Add(lifetime.IsWorking) != nil { + if !w.lifetime.Add(typeutil.LifetimeStateWorking) { return nil, status.NewOnShutdownError("wal is on shutdown") } defer w.lifetime.Done() @@ -177,12 +178,17 @@ func (w *walAdaptorImpl) Read(ctx context.Context, opts wal.ReadOption) (wal.Sca // IsAvailable returns whether the wal is available. func (w *walAdaptorImpl) IsAvailable() bool { - return !w.lifetime.IsClosed() + select { + case <-w.available: + return false + default: + return true + } } // Available returns a channel that will be closed when the wal is shut down. func (w *walAdaptorImpl) Available() <-chan struct{} { - return w.lifetime.CloseCh() + return w.available } // Close overrides Scanner Close function. @@ -195,9 +201,9 @@ func (w *walAdaptorImpl) Close() { logger.Info("wal graceful close done, wait for operation to be finished...") // begin to close the wal. - w.lifetime.SetState(lifetime.Stopped) + w.lifetime.SetState(typeutil.LifetimeStateStopped) w.lifetime.Wait() - w.lifetime.Close() + close(w.available) logger.Info("wal begin to close scanners...") diff --git a/internal/streamingnode/server/wal/interceptors/segment/manager/pchannel_manager.go b/internal/streamingnode/server/wal/interceptors/segment/manager/pchannel_manager.go index d34778b90048a..fe30a7e2fbde2 100644 --- a/internal/streamingnode/server/wal/interceptors/segment/manager/pchannel_manager.go +++ b/internal/streamingnode/server/wal/interceptors/segment/manager/pchannel_manager.go @@ -15,9 +15,9 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb" "github.com/milvus-io/milvus/pkg/streaming/util/types" - "github.com/milvus-io/milvus/pkg/util/lifetime" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/syncutil" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) // RecoverPChannelSegmentAllocManager recovers the segment assignment manager at the specified pchannel. @@ -45,7 +45,7 @@ func RecoverPChannelSegmentAllocManager( logger := log.With(zap.Any("pchannel", pchannel)) return &PChannelSegmentAllocManager{ - lifetime: lifetime.NewLifetime(lifetime.Working), + lifetime: typeutil.NewLifetime(), logger: logger, pchannel: pchannel, managers: managers, @@ -56,7 +56,7 @@ func RecoverPChannelSegmentAllocManager( // PChannelSegmentAllocManager is a segment assign manager of determined pchannel. type PChannelSegmentAllocManager struct { - lifetime lifetime.Lifetime[lifetime.State] + lifetime *typeutil.Lifetime logger *log.MLogger pchannel types.PChannelInfo @@ -175,7 +175,7 @@ func (m *PChannelSegmentAllocManager) SealAndFenceSegmentUntil(ctx context.Conte // TryToSealSegments tries to seal the specified segments. 
func (m *PChannelSegmentAllocManager) TryToSealSegments(ctx context.Context, infos ...stats.SegmentBelongs) { - if err := m.lifetime.Add(lifetime.IsWorking); err != nil { + if !m.lifetime.Add(typeutil.LifetimeStateWorking) { return } defer m.lifetime.Done() @@ -197,7 +197,7 @@ func (m *PChannelSegmentAllocManager) TryToSealSegments(ctx context.Context, inf } func (m *PChannelSegmentAllocManager) MustSealSegments(ctx context.Context, infos ...stats.SegmentBelongs) { - if err := m.lifetime.Add(lifetime.IsWorking); err != nil { + if !m.lifetime.Add(typeutil.LifetimeStateWorking) { return } defer m.lifetime.Done() @@ -221,7 +221,7 @@ func (m *PChannelSegmentAllocManager) MustSealSegments(ctx context.Context, info // TryToSealWaitedSegment tries to seal the wait for sealing segment. func (m *PChannelSegmentAllocManager) TryToSealWaitedSegment(ctx context.Context) { - if err := m.lifetime.Add(lifetime.IsWorking); err != nil { + if !m.lifetime.Add(typeutil.LifetimeStateWorking) { return } defer m.lifetime.Done() @@ -236,7 +236,7 @@ func (m *PChannelSegmentAllocManager) IsNoWaitSeal() bool { // WaitUntilNoWaitSeal waits until no segment wait for seal. func (m *PChannelSegmentAllocManager) WaitUntilNoWaitSeal(ctx context.Context) error { - if err := m.lifetime.Add(lifetime.IsWorking); err != nil { + if err := m.checkLifetime(); err != nil { return err } defer m.lifetime.Done() @@ -246,8 +246,8 @@ func (m *PChannelSegmentAllocManager) WaitUntilNoWaitSeal(ctx context.Context) e // checkLifetime checks the lifetime of the segment manager. func (m *PChannelSegmentAllocManager) checkLifetime() error { - if err := m.lifetime.Add(lifetime.IsWorking); err != nil { - m.logger.Warn("unreachable: segment assignment manager is not working, so the wal is on closing", zap.Error(err)) + if !m.lifetime.Add(typeutil.LifetimeStateWorking) { + m.logger.Warn("unreachable: segment assignment manager is not working, so the wal is on closing") return errors.New("segment assignment manager is not working") } return nil @@ -256,7 +256,7 @@ func (m *PChannelSegmentAllocManager) checkLifetime() error { // Close try to persist all stats and invalid the manager. func (m *PChannelSegmentAllocManager) Close(ctx context.Context) { m.logger.Info("segment assignment manager start to close") - m.lifetime.SetState(lifetime.Stopped) + m.lifetime.SetState(typeutil.LifetimeStateStopped) m.lifetime.Wait() // Try to seal all wait diff --git a/internal/streamingnode/server/walmanager/manager_impl.go b/internal/streamingnode/server/walmanager/manager_impl.go index 52ffe090adea0..087e8bce44910 100644 --- a/internal/streamingnode/server/walmanager/manager_impl.go +++ b/internal/streamingnode/server/walmanager/manager_impl.go @@ -3,7 +3,6 @@ package walmanager import ( "context" - "github.com/cockroachdb/errors" "go.uber.org/zap" "github.com/milvus-io/milvus/internal/streamingnode/server/wal" @@ -12,10 +11,11 @@ import ( "github.com/milvus-io/milvus/internal/util/streamingutil/util" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/streaming/util/types" - "github.com/milvus-io/milvus/pkg/util/lifetime" "github.com/milvus-io/milvus/pkg/util/typeutil" ) +var errWALManagerClosed = status.NewOnShutdownError("wal manager is closed") + // OpenManager create a wal manager. func OpenManager() (Manager, error) { walName := util.MustSelectWALName() @@ -30,7 +30,7 @@ func OpenManager() (Manager, error) { // newManager create a wal manager. 
func newManager(opener wal.Opener) Manager { return &managerImpl{ - lifetime: lifetime.NewLifetime(managerOpenable | managerRemoveable | managerGetable), + lifetime: typeutil.NewGenericLifetime[managerState](managerOpenable | managerRemoveable | managerGetable), wltMap: typeutil.NewConcurrentMap[string, *walLifetime](), opener: opener, } @@ -38,7 +38,7 @@ func newManager(opener wal.Opener) Manager { // All management operation for a wal will be serialized with order of term. type managerImpl struct { - lifetime lifetime.Lifetime[managerState] + lifetime *typeutil.GenericLifetime[managerState] wltMap *typeutil.ConcurrentMap[string, *walLifetime] opener wal.Opener // wal allocator @@ -47,8 +47,8 @@ type managerImpl struct { // Open opens a wal instance for the channel on this Manager. func (m *managerImpl) Open(ctx context.Context, channel types.PChannelInfo) (err error) { // reject operation if manager is closing. - if err := m.lifetime.Add(isOpenable); err != nil { - return status.NewOnShutdownError("wal manager is closed, %s", err.Error()) + if !m.lifetime.AddIf(isOpenable) { + return errWALManagerClosed } defer func() { m.lifetime.Done() @@ -65,8 +65,8 @@ func (m *managerImpl) Open(ctx context.Context, channel types.PChannelInfo) (err // Remove removes the wal instance for the channel. func (m *managerImpl) Remove(ctx context.Context, channel types.PChannelInfo) (err error) { // reject operation if manager is closing. - if err := m.lifetime.Add(isRemoveable); err != nil { - return status.NewOnShutdownError("wal manager is closed, %s", err.Error()) + if !m.lifetime.AddIf(isRemoveable) { + return errWALManagerClosed } defer func() { m.lifetime.Done() @@ -84,8 +84,8 @@ func (m *managerImpl) Remove(ctx context.Context, channel types.PChannelInfo) (e // Return nil if the wal instance is not found. func (m *managerImpl) GetAvailableWAL(channel types.PChannelInfo) (wal.WAL, error) { // reject operation if manager is closing. - if err := m.lifetime.Add(isGetable); err != nil { - return nil, status.NewOnShutdownError("wal manager is closed, %s", err) + if !m.lifetime.AddIf(isGetable) { + return nil, errWALManagerClosed } defer m.lifetime.Done() @@ -104,8 +104,8 @@ func (m *managerImpl) GetAvailableWAL(channel types.PChannelInfo) (wal.WAL, erro // GetAllAvailableChannels returns all available channel info. func (m *managerImpl) GetAllAvailableChannels() ([]types.PChannelInfo, error) { // reject operation if manager is closing. - if err := m.lifetime.Add(isGetable); err != nil { - return nil, status.NewOnShutdownError("wal manager is closed, %s", err) + if !m.lifetime.AddIf(isGetable) { + return nil, errWALManagerClosed } defer m.lifetime.Done() @@ -132,7 +132,6 @@ func (m *managerImpl) Close() { }) m.lifetime.SetState(managerStopped) m.lifetime.Wait() - m.lifetime.Close() // close all underlying wal instance by allocator if there's resource leak. 
m.opener.Close() @@ -163,23 +162,14 @@ const ( managerGetable managerState = 0x1 << 2 ) -func isGetable(state managerState) error { - if state&managerGetable != 0 { - return nil - } - return errors.New("wal manager can not do get operation") +func isGetable(state managerState) bool { + return state&managerGetable != 0 } -func isRemoveable(state managerState) error { - if state&managerRemoveable != 0 { - return nil - } - return errors.New("wal manager can not do remove operation") +func isRemoveable(state managerState) bool { + return state&managerRemoveable != 0 } -func isOpenable(state managerState) error { - if state&managerOpenable != 0 { - return nil - } - return errors.New("wal manager can not do open operation") +func isOpenable(state managerState) bool { + return state&managerOpenable != 0 } diff --git a/internal/util/streamingutil/service/resolver/builder.go b/internal/util/streamingutil/service/resolver/builder.go index 165a5381c256a..9412d84e9c6a5 100644 --- a/internal/util/streamingutil/service/resolver/builder.go +++ b/internal/util/streamingutil/service/resolver/builder.go @@ -10,7 +10,6 @@ import ( "github.com/milvus-io/milvus/internal/util/streamingutil/service/discoverer" "github.com/milvus-io/milvus/pkg/streaming/util/types" - "github.com/milvus-io/milvus/pkg/util/lifetime" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -38,7 +37,7 @@ func NewSessionBuilder(c *clientv3.Client, role string) Builder { func newBuilder(scheme string, d discoverer.Discoverer) Builder { resolver := newResolverWithDiscoverer(scheme, d, 1*time.Second) // configurable. return &builderImpl{ - lifetime: lifetime.NewLifetime(lifetime.Working), + lifetime: typeutil.NewLifetime(), scheme: scheme, resolver: resolver, } @@ -46,7 +45,7 @@ func newBuilder(scheme string, d discoverer.Discoverer) Builder { // builderImpl implements resolver.Builder. type builderImpl struct { - lifetime lifetime.Lifetime[lifetime.State] + lifetime *typeutil.Lifetime scheme string resolver *resolverWithDiscoverer } @@ -60,7 +59,7 @@ type builderImpl struct { // Resolver is built when a Builder constructed. // So build operation just register a new watcher into the existed resolver to share the resolver result. func (b *builderImpl) Build(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) { - if err := b.lifetime.Add(lifetime.IsWorking); err != nil { + if !b.lifetime.Add(typeutil.LifetimeStateWorking) { return nil, errors.New("builder is closed") } defer b.lifetime.Done() @@ -84,8 +83,7 @@ func (b *builderImpl) Scheme() string { // Close closes the builder also close the underlying resolver. 
func (b *builderImpl) Close() { - b.lifetime.SetState(lifetime.Stopped) + b.lifetime.SetState(typeutil.LifetimeStateStopped) b.lifetime.Wait() - b.lifetime.Close() b.resolver.Close() } diff --git a/internal/util/streamingutil/service/resolver/resolver_with_discoverer.go b/internal/util/streamingutil/service/resolver/resolver_with_discoverer.go index bfcab754fd135..a10677c57bc36 100644 --- a/internal/util/streamingutil/service/resolver/resolver_with_discoverer.go +++ b/internal/util/streamingutil/service/resolver/resolver_with_discoverer.go @@ -10,7 +10,6 @@ import ( "github.com/milvus-io/milvus/internal/util/streamingutil/service/discoverer" "github.com/milvus-io/milvus/pkg/log" - "github.com/milvus-io/milvus/pkg/util/lifetime" "github.com/milvus-io/milvus/pkg/util/syncutil" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -115,7 +114,7 @@ func (r *resolverWithDiscoverer) doDiscover() { defer func() { // Check if all grpc resolver is stopped. for r := range grpcResolvers { - if err := lifetime.IsWorking(r.State()); err == nil { + if r.State() == typeutil.LifetimeStateWorking { r.logger.Warn("resolver is stopped before grpc watcher exist, maybe bug here") break } diff --git a/internal/util/streamingutil/service/resolver/watch_based_grpc_resolver.go b/internal/util/streamingutil/service/resolver/watch_based_grpc_resolver.go index d861870c69354..10eb98f16d33f 100644 --- a/internal/util/streamingutil/service/resolver/watch_based_grpc_resolver.go +++ b/internal/util/streamingutil/service/resolver/watch_based_grpc_resolver.go @@ -6,7 +6,7 @@ import ( "google.golang.org/grpc/resolver" "github.com/milvus-io/milvus/pkg/log" - "github.com/milvus-io/milvus/pkg/util/lifetime" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) var _ resolver.Resolver = (*watchBasedGRPCResolver)(nil) @@ -14,7 +14,7 @@ var _ resolver.Resolver = (*watchBasedGRPCResolver)(nil) // newWatchBasedGRPCResolver creates a new watch based grpc resolver. func newWatchBasedGRPCResolver(cc resolver.ClientConn, logger *log.MLogger) *watchBasedGRPCResolver { return &watchBasedGRPCResolver{ - lifetime: lifetime.NewLifetime(lifetime.Working), + lifetime: typeutil.NewLifetime(), cc: cc, logger: logger, } @@ -22,7 +22,7 @@ func newWatchBasedGRPCResolver(cc resolver.ClientConn, logger *log.MLogger) *wat // watchBasedGRPCResolver is a watch based grpc resolver. type watchBasedGRPCResolver struct { - lifetime lifetime.Lifetime[lifetime.State] + lifetime *typeutil.Lifetime cc resolver.ClientConn logger *log.MLogger @@ -38,15 +38,14 @@ func (r *watchBasedGRPCResolver) ResolveNow(_ resolver.ResolveNowOptions) { // Close closes the resolver. // Do nothing. func (r *watchBasedGRPCResolver) Close() { - r.lifetime.SetState(lifetime.Stopped) + r.lifetime.SetState(typeutil.LifetimeStateStopped) r.lifetime.Wait() - r.lifetime.Close() } // Update updates the state of the resolver. // Return error if the resolver is closed. func (r *watchBasedGRPCResolver) Update(state VersionedState) error { - if r.lifetime.Add(lifetime.IsWorking) != nil { + if !r.lifetime.Add(typeutil.LifetimeStateWorking) { return errors.New("resolver is closed") } defer r.lifetime.Done() @@ -61,6 +60,6 @@ func (r *watchBasedGRPCResolver) Update(state VersionedState) error { } // State returns the state of the resolver. 
-func (r *watchBasedGRPCResolver) State() lifetime.State { +func (r *watchBasedGRPCResolver) State() typeutil.LifetimeState { return r.lifetime.GetState() } diff --git a/pkg/util/typeutil/lifetime.go b/pkg/util/typeutil/lifetime.go new file mode 100644 index 0000000000000..22957bbfa19c1 --- /dev/null +++ b/pkg/util/typeutil/lifetime.go @@ -0,0 +1,68 @@ +package typeutil + +import "sync" + +type LifetimeState int + +var ( + LifetimeStateWorking LifetimeState = 0 + LifetimeStateStopped LifetimeState = 1 +) + +// NewLifetime returns a new instance of Lifetime with default state logic. +func NewLifetime() *Lifetime { + return NewGenericLifetime(LifetimeStateWorking) +} + +// NewGenericLifetime returns a new instance of Lifetime with init state and isHealthy logic. +// WARNING: This type is a unsafe type, the lifetime state transfer should never be a loop. +// The state is controlled by the user, and the user should ensure the state transfer is correct. +func NewGenericLifetime[State comparable](initState State) *GenericLifetime[State] { + return &GenericLifetime[State]{ + mu: sync.Mutex{}, + wg: sync.WaitGroup{}, + state: initState, + } +} + +type Lifetime = GenericLifetime[LifetimeState] + +// GenericLifetime is a common component lifetime control logic. +type GenericLifetime[State comparable] struct { + mu sync.Mutex + wg sync.WaitGroup + state State +} + +func (l *GenericLifetime[State]) GetState() State { + return l.state +} + +func (l *GenericLifetime[State]) SetState(s State) { + l.mu.Lock() + defer l.mu.Unlock() + l.state = s +} + +func (l *GenericLifetime[State]) Add(s State) bool { + return l.AddIf(func(s2 State) bool { return s == s2 }) +} + +func (l *GenericLifetime[State]) AddIf(pred func(s State) bool) bool { + l.mu.Lock() + defer l.mu.Unlock() + + if !pred(l.state) { + return false + } + l.wg.Add(1) + return true +} + +func (l *GenericLifetime[State]) Done() { + l.wg.Done() +} + +func (l *GenericLifetime[State]) Wait() { + l.wg.Wait() +} diff --git a/pkg/util/typeutil/lifetime_test.go b/pkg/util/typeutil/lifetime_test.go new file mode 100644 index 0000000000000..f05e41ace04c1 --- /dev/null +++ b/pkg/util/typeutil/lifetime_test.go @@ -0,0 +1,36 @@ +package typeutil + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestLifetime(t *testing.T) { + l := NewLifetime() + assert.True(t, l.Add(LifetimeStateWorking)) + assert.False(t, l.Add(LifetimeStateStopped)) + assert.Equal(t, l.GetState(), LifetimeStateWorking) + done := make(chan struct{}) + go func() { + l.Wait() + close(done) + }() + select { + case <-time.After(10 * time.Millisecond): + case <-done: + assert.Fail(t, "lifetime should not be stopped") + } + l.SetState(LifetimeStateStopped) + assert.Equal(t, l.GetState(), LifetimeStateStopped) + assert.False(t, l.Add(LifetimeStateWorking)) + select { + case <-time.After(10 * time.Millisecond): + case <-done: + assert.Fail(t, "lifetime should not be stopped") + } + l.Done() + <-done + l.Wait() +}
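
---

For reference (not part of the patch): a minimal sketch of how a component is expected to guard its public methods with the `typeutil.Lifetime` added above, mirroring the producer/client/WAL-adaptor call sites changed in this diff. The `component` type and its `Do`/`Close` methods below are hypothetical; only the `typeutil` calls come from the new `pkg/util/typeutil/lifetime.go`.

```go
package main

import (
	"errors"
	"fmt"

	"github.com/milvus-io/milvus/pkg/util/typeutil"
)

// component is a hypothetical component that guards its public methods
// with typeutil.Lifetime, following the pattern used throughout this patch.
type component struct {
	lifetime *typeutil.Lifetime
}

func newComponent() *component {
	// NewLifetime starts in LifetimeStateWorking.
	return &component{lifetime: typeutil.NewLifetime()}
}

// Do rejects the call once the component is stopped; otherwise it holds
// the lifetime for the duration of the operation so Close can wait for it.
func (c *component) Do() error {
	if !c.lifetime.Add(typeutil.LifetimeStateWorking) {
		return errors.New("component is closed")
	}
	defer c.lifetime.Done()
	// ... real work would go here ...
	return nil
}

// Close flips the state so new calls are rejected, then waits for all
// in-flight calls to finish before releasing resources.
func (c *component) Close() {
	c.lifetime.SetState(typeutil.LifetimeStateStopped)
	c.lifetime.Wait()
}

func main() {
	c := newComponent()
	fmt.Println(c.Do()) // <nil>
	c.Close()
	fmt.Println(c.Do()) // component is closed
}
```

Components that need more than two states (for example `walmanager`, which uses the `managerOpenable | managerRemoveable | managerGetable` bitmask) can use `typeutil.NewGenericLifetime[State](initState)` with `AddIf(pred)` instead of `Add`, passing a predicate such as `isOpenable` to decide whether the current state admits the operation.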