enhance: move the lifetime implementation out of server level lifetime (#38442)

issue: #38399

- move the common lifetime implementation out of the server-level lifetime implementation

Signed-off-by: chyezh <[email protected]>
chyezh authored Dec 17, 2024
1 parent 28fdbc4 commit afac153
Showing 22 changed files with 225 additions and 133 deletions.
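
Every hunk below applies the same mechanical migration: the server-level lifetime.Lifetime[lifetime.State] from pkg/util/lifetime is replaced by the shared *typeutil.Lifetime, whose Add returns a bool instead of an error. A minimal before/after sketch of the guard pattern, assuming only the typeutil API visible in the hunks; the component type and doWork body are hypothetical and not part of this commit:

package example

import (
	"context"

	"github.com/milvus-io/milvus/internal/util/streamingutil/status"
	"github.com/milvus-io/milvus/pkg/util/typeutil"
)

// component is a hypothetical holder used only to illustrate the pattern.
type component struct {
	lifetime *typeutil.Lifetime // before: lifetime.Lifetime[lifetime.State]
}

func newComponent() *component {
	return &component{
		lifetime: typeutil.NewLifetime(), // before: lifetime.NewLifetime(lifetime.Working)
	}
}

// Do guards one unit of work with the lifetime so Close can wait for it.
func (c *component) Do(ctx context.Context) error {
	// before: if c.lifetime.Add(lifetime.IsWorking) != nil { ... }
	if !c.lifetime.Add(typeutil.LifetimeStateWorking) {
		return status.NewOnShutdownError("component is closing")
	}
	defer c.lifetime.Done()
	return c.doWork(ctx)
}

func (c *component) doWork(_ context.Context) error { return nil }

// Close rejects new work and waits for in-flight calls to drain.
func (c *component) Close() {
	// before: SetState(lifetime.Stopped), then Wait(), then Close()
	c.lifetime.SetState(typeutil.LifetimeStateStopped)
	c.lifetime.Wait()
}

Note that the explicit lifetime.Close() call disappears from discoverer.go and handler_client_impl.go below; the shared typeutil.Lifetime apparently needs no separate close step once Wait returns.
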
10 changes: 5 additions & 5 deletions internal/distributed/streaming/internal/producer/producer.go
@@ -15,8 +15,8 @@ import (
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

var errGracefulShutdown = errors.New("graceful shutdown")
@@ -35,7 +35,7 @@ func NewResumableProducer(f factory, opts *ProducerOptions) *ResumableProducer {
cancel: cancel,
stopResumingCh: make(chan struct{}),
resumingExitCh: make(chan struct{}),
lifetime: lifetime.NewLifetime(lifetime.Working),
lifetime: typeutil.NewLifetime(),
logger: log.With(zap.String("pchannel", opts.PChannel)),
opts: opts,

@@ -63,7 +63,7 @@ type ResumableProducer struct {
// A: cancel the ctx will cancel the underlying running producer.
// Use producer Close is better way to stop producer.

lifetime lifetime.Lifetime[lifetime.State]
lifetime *typeutil.Lifetime
logger *log.MLogger
opts *ProducerOptions

@@ -78,7 +78,7 @@ type ResumableProducer struct {

// Produce produce a new message to log service.
func (p *ResumableProducer) Produce(ctx context.Context, msg message.MutableMessage) (result *producer.ProduceResult, err error) {
if p.lifetime.Add(lifetime.IsWorking) != nil {
if !p.lifetime.Add(typeutil.LifetimeStateWorking) {
return nil, errors.Wrapf(errs.ErrClosed, "produce on closed producer")
}
metricGuard := p.metrics.StartProduce(msg.EstimateSize())
@@ -185,7 +185,7 @@ func (p *ResumableProducer) createNewProducer() (producer.Producer, error) {

// gracefulClose graceful close the producer.
func (p *ResumableProducer) gracefulClose() error {
p.lifetime.SetState(lifetime.Stopped)
p.lifetime.SetState(typeutil.LifetimeStateStopped)
p.lifetime.Wait()
// close the stop resuming background to avoid create new producer.
close(p.stopResumingCh)
23 changes: 13 additions & 10 deletions internal/distributed/streaming/wal.go
@@ -16,17 +16,19 @@ import (
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

var ErrWALAccesserClosed = status.NewOnShutdownError("wal accesser closed")

// newWALAccesser creates a new wal accesser.
func newWALAccesser(c *clientv3.Client) *walAccesserImpl {
// Create a new streaming coord client.
streamingCoordClient := client.NewClient(c)
// Create a new streamingnode handler client.
handlerClient := handler.NewHandlerClient(streamingCoordClient.Assignment())
return &walAccesserImpl{
lifetime: lifetime.NewLifetime(lifetime.Working),
lifetime: typeutil.NewLifetime(),
streamingCoordAssignmentClient: streamingCoordClient,
handlerClient: handlerClient,
producerMutex: sync.Mutex{},
@@ -40,7 +42,7 @@ func newWALAccesser(c *clientv3.Client) *walAccesserImpl {

// walAccesserImpl is the implementation of WALAccesser.
type walAccesserImpl struct {
lifetime lifetime.Lifetime[lifetime.State]
lifetime *typeutil.Lifetime

// All services
streamingCoordAssignmentClient client.Client
@@ -55,8 +57,8 @@ type walAccesserImpl struct {
// RawAppend writes a record to the log.
func (w *walAccesserImpl) RawAppend(ctx context.Context, msg message.MutableMessage, opts ...AppendOption) (*types.AppendResult, error) {
assertValidMessage(msg)
if err := w.lifetime.Add(lifetime.IsWorking); err != nil {
return nil, status.NewOnShutdownError("wal accesser closed, %s", err.Error())
if !w.lifetime.Add(typeutil.LifetimeStateWorking) {
return nil, ErrWALAccesserClosed
}
defer w.lifetime.Done()

@@ -66,10 +68,11 @@ func (w *walAccesserImpl) Read(_ context.Context, opts ReadOption) Scanner {

// Read returns a scanner for reading records from the wal.
func (w *walAccesserImpl) Read(_ context.Context, opts ReadOption) Scanner {
if err := w.lifetime.Add(lifetime.IsWorking); err != nil {
newErrScanner(status.NewOnShutdownError("wal accesser closed, %s", err.Error()))
if !w.lifetime.Add(typeutil.LifetimeStateWorking) {
newErrScanner(ErrWALAccesserClosed)
}
defer w.lifetime.Done()

if opts.VChannel == "" {
return newErrScanner(status.NewInvaildArgument("vchannel is required"))
}
@@ -87,8 +90,8 @@ func (w *walAccesserImpl) Read(_ context.Context, opts ReadOption) Scanner {
}

func (w *walAccesserImpl) Txn(ctx context.Context, opts TxnOption) (Txn, error) {
if err := w.lifetime.Add(lifetime.IsWorking); err != nil {
return nil, status.NewOnShutdownError("wal accesser closed, %s", err.Error())
if !w.lifetime.Add(typeutil.LifetimeStateWorking) {
return nil, ErrWALAccesserClosed
}

if opts.VChannel == "" {
@@ -131,7 +134,7 @@ func (w *walAccesserImpl) Txn(ctx context.Context, opts TxnOption) (Txn, error)

// Close closes all the wal accesser.
func (w *walAccesserImpl) Close() {
w.lifetime.SetState(lifetime.Stopped)
w.lifetime.SetState(typeutil.LifetimeStateStopped)
w.lifetime.Wait()

w.producerMutex.Lock()
4 changes: 2 additions & 2 deletions internal/distributed/streaming/wal_test.go
@@ -18,7 +18,7 @@ import (
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/walimplstest"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

const (
@@ -34,7 +34,7 @@ func TestWAL(t *testing.T) {
handler.EXPECT().Close().Return()

w := &walAccesserImpl{
lifetime: lifetime.NewLifetime(lifetime.Working),
lifetime: typeutil.NewLifetime(),
streamingCoordAssignmentClient: coordClient,
handlerClient: handler,
producerMutex: sync.Mutex{},
12 changes: 6 additions & 6 deletions internal/streamingcoord/client/assignment/assignment_impl.go
@@ -13,8 +13,8 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

// NewAssignmentService creates a new assignment service.
@@ -23,7 +23,7 @@ func NewAssignmentService(service lazygrpc.Service[streamingpb.StreamingCoordAss
s := &AssignmentServiceImpl{
ctx: ctx,
cancel: cancel,
lifetime: lifetime.NewLifetime(lifetime.Working),
lifetime: typeutil.NewLifetime(),
watcher: newWatcher(),
service: service,
resumingExitCh: make(chan struct{}),
@@ -38,7 +38,7 @@ func NewAssignmentService(service lazygrpc.Service[streamingpb.StreamingCoordAss
type AssignmentServiceImpl struct {
ctx context.Context
cancel context.CancelFunc
lifetime lifetime.Lifetime[lifetime.State]
lifetime *typeutil.Lifetime
watcher *watcher
service lazygrpc.Service[streamingpb.StreamingCoordAssignmentServiceClient]
resumingExitCh chan struct{}
@@ -49,7 +49,7 @@ type AssignmentServiceImpl struct {

// AssignmentDiscover watches the assignment discovery.
func (c *AssignmentServiceImpl) AssignmentDiscover(ctx context.Context, cb func(*types.VersionedStreamingNodeAssignments) error) error {
if c.lifetime.Add(lifetime.IsWorking) != nil {
if !c.lifetime.Add(typeutil.LifetimeStateWorking) {
return status.NewOnShutdownError("assignment service client is closing")
}
defer c.lifetime.Done()
@@ -59,7 +59,7 @@ func (c *AssignmentServiceImpl) ReportAssignmentError(ctx context.Context, pchan

// ReportAssignmentError reports the assignment error to server.
func (c *AssignmentServiceImpl) ReportAssignmentError(ctx context.Context, pchannel types.PChannelInfo, assignmentErr error) error {
if c.lifetime.Add(lifetime.IsWorking) != nil {
if !c.lifetime.Add(typeutil.LifetimeStateWorking) {
return status.NewOnShutdownError("assignment service client is closing")
}
defer c.lifetime.Done()
@@ -75,7 +75,7 @@ func (c *AssignmentServiceImpl) ReportAssignmentError(ctx context.Context, pchan

// Close closes the assignment service.
func (c *AssignmentServiceImpl) Close() {
c.lifetime.SetState(lifetime.Stopped)
c.lifetime.SetState(typeutil.LifetimeStateStopped)
c.lifetime.Wait()

c.cancel()
10 changes: 4 additions & 6 deletions internal/streamingcoord/client/assignment/discoverer.go
@@ -8,14 +8,13 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

// newAssignmentDiscoverClient creates a new assignment discover client.
func newAssignmentDiscoverClient(w *watcher, streamClient streamingpb.StreamingCoordAssignmentService_AssignmentDiscoverClient) *assignmentDiscoverClient {
c := &assignmentDiscoverClient{
lifetime: lifetime.NewLifetime(lifetime.Working),
lifetime: typeutil.NewLifetime(),
w: w,
streamClient: streamClient,
logger: log.With(),
@@ -29,7 +28,7 @@

// assignmentDiscoverClient is the client for assignment discover.
type assignmentDiscoverClient struct {
lifetime lifetime.Lifetime[lifetime.State]
lifetime *typeutil.Lifetime
w *watcher
logger *log.MLogger
requestCh chan *streamingpb.AssignmentDiscoverRequest
@@ -40,7 +39,7 @@ type assignmentDiscoverClient struct {

// ReportAssignmentError reports the assignment error to server.
func (c *assignmentDiscoverClient) ReportAssignmentError(pchannel types.PChannelInfo, err error) {
if err := c.lifetime.Add(lifetime.IsWorking); err != nil {
if !c.lifetime.Add(typeutil.LifetimeStateWorking) {
return
}
defer c.lifetime.Done()
@@ -75,9 +74,8 @@ func (c *assignmentDiscoverClient) Available() <-chan struct{} {

// Close closes the assignment discover client.
func (c *assignmentDiscoverClient) Close() {
c.lifetime.SetState(lifetime.Stopped)
c.lifetime.SetState(typeutil.LifetimeStateStopped)
c.lifetime.Wait()
c.lifetime.Close()

close(c.requestCh)
c.wg.Wait()
13 changes: 6 additions & 7 deletions internal/streamingcoord/server/balancer/balancer_impl.go
@@ -13,7 +13,6 @@ import (
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
@@ -32,7 +31,7 @@ func RecoverBalancer(
return nil, errors.Wrap(err, "fail to recover channel manager")
}
b := &balancerImpl{
lifetime: lifetime.NewLifetime(lifetime.Working),
lifetime: typeutil.NewLifetime(),
logger: log.With(zap.String("policy", policy)),
channelMetaManager: manager,
policy: mustGetPolicy(policy),
@@ -45,7 +44,7 @@ func RecoverBalancer(

// balancerImpl is a implementation of Balancer.
type balancerImpl struct {
lifetime lifetime.Lifetime[lifetime.State]
lifetime *typeutil.Lifetime
logger *log.MLogger
channelMetaManager *channel.ChannelManager
policy Policy // policy is the balance policy, TODO: should be dynamic in future.
@@ -55,15 +54,15 @@ type balancerImpl struct {

// WatchChannelAssignments watches the balance result.
func (b *balancerImpl) WatchChannelAssignments(ctx context.Context, cb func(version typeutil.VersionInt64Pair, relations []types.PChannelInfoAssigned) error) error {
if b.lifetime.Add(lifetime.IsWorking) != nil {
if !b.lifetime.Add(typeutil.LifetimeStateWorking) {
return status.NewOnShutdownError("balancer is closing")
}
defer b.lifetime.Done()
return b.channelMetaManager.WatchAssignmentResult(ctx, cb)
}

func (b *balancerImpl) MarkAsUnavailable(ctx context.Context, pChannels []types.PChannelInfo) error {
if b.lifetime.Add(lifetime.IsWorking) != nil {
if !b.lifetime.Add(typeutil.LifetimeStateWorking) {
return status.NewOnShutdownError("balancer is closing")
}
defer b.lifetime.Done()
@@ -73,7 +72,7 @@ func (b *balancerImpl) MarkAsUnavailable(ctx context.Context, pChannels []types.

// Trigger trigger a re-balance.
func (b *balancerImpl) Trigger(ctx context.Context) error {
if b.lifetime.Add(lifetime.IsWorking) != nil {
if !b.lifetime.Add(typeutil.LifetimeStateWorking) {
return status.NewOnShutdownError("balancer is closing")
}
defer b.lifetime.Done()
@@ -93,7 +92,7 @@ func (b *balancerImpl) sendRequestAndWaitFinish(ctx context.Context, newReq *req

// Close close the balancer.
func (b *balancerImpl) Close() {
b.lifetime.SetState(lifetime.Stopped)
b.lifetime.SetState(typeutil.LifetimeStateStopped)
b.lifetime.Wait()

b.backgroundTaskNotifier.Cancel()
3 changes: 1 addition & 2 deletions internal/streamingnode/client/handler/handler_client.go
@@ -23,7 +23,6 @@ import (
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/tracer"
"github.com/milvus-io/milvus/pkg/util/interceptor"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@@ -100,7 +99,7 @@ func NewHandlerClient(w types.AssignmentDiscoverWatcher) HandlerClient {
})
watcher := assignment.NewWatcher(rb.Resolver())
return &handlerClientImpl{
lifetime: lifetime.NewLifetime(lifetime.Working),
lifetime: typeutil.NewLifetime(),
service: lazygrpc.WithServiceCreator(conn, streamingpb.NewStreamingNodeHandlerServiceClient),
rb: rb,
watcher: watcher,
11 changes: 5 additions & 6 deletions internal/streamingnode/client/handler/handler_client_impl.go
@@ -18,13 +18,13 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

var errWaitNextBackoff = errors.New("wait for next backoff")

type handlerClientImpl struct {
lifetime lifetime.Lifetime[lifetime.State]
lifetime *typeutil.Lifetime
service lazygrpc.Service[streamingpb.StreamingNodeHandlerServiceClient]
rb resolver.Builder
watcher assignment.Watcher
@@ -35,7 +35,7 @@ type handlerClientImpl struct {

// CreateProducer creates a producer.
func (hc *handlerClientImpl) CreateProducer(ctx context.Context, opts *ProducerOptions) (Producer, error) {
if hc.lifetime.Add(lifetime.IsWorking) != nil {
if !hc.lifetime.Add(typeutil.LifetimeStateWorking) {
return nil, ErrClientClosed
}
defer hc.lifetime.Done()
@@ -58,7 +58,7 @@ func (hc *handlerClientImpl) CreateConsumer(ctx context.Context, opts *ConsumerO

// CreateConsumer creates a consumer.
func (hc *handlerClientImpl) CreateConsumer(ctx context.Context, opts *ConsumerOptions) (Consumer, error) {
if hc.lifetime.Add(lifetime.IsWorking) != nil {
if !hc.lifetime.Add(typeutil.LifetimeStateWorking) {
return nil, ErrClientClosed
}
defer hc.lifetime.Done()
@@ -135,9 +135,8 @@ func (hc *handlerClientImpl) waitForNextBackoff(ctx context.Context, pchannel st

// Close closes the handler client.
func (hc *handlerClientImpl) Close() {
hc.lifetime.SetState(lifetime.Stopped)
hc.lifetime.SetState(typeutil.LifetimeStateStopped)
hc.lifetime.Wait()
hc.lifetime.Close()

hc.watcher.Close()
hc.service.Close()
4 changes: 2 additions & 2 deletions internal/streamingnode/client/handler/handler_client_test.go
@@ -22,8 +22,8 @@ import (
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/options"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

func TestHandlerClient(t *testing.T) {
Expand All @@ -50,7 +50,7 @@ func TestHandlerClient(t *testing.T) {

pK := 0
handler := &handlerClientImpl{
lifetime: lifetime.NewLifetime(lifetime.Working),
lifetime: typeutil.NewLifetime(),
service: service,
rb: rb,
watcher: w,