diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go index 73ca9461b..4602a1f97 100644 --- a/internal/internal_worker_base.go +++ b/internal/internal_worker_base.go @@ -142,7 +142,7 @@ type ( logger *zap.Logger metricsScope tally.Scope - concurrency *worker.Concurrency + concurrency *worker.ConcurrencyLimit pollerAutoScaler *pollerAutoScaler taskQueueCh chan interface{} sessionTokenBucket *sessionTokenBucket @@ -168,7 +168,7 @@ func createPollRetryPolicy() backoff.RetryPolicy { func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope tally.Scope, sessionTokenBucket *sessionTokenBucket) *baseWorker { ctx, cancel := context.WithCancel(context.Background()) - concurrency := &worker.Concurrency{ + concurrency := &worker.ConcurrencyLimit{ PollerPermit: worker.NewPermit(options.pollerCount), TaskPermit: worker.NewPermit(options.maxConcurrentTask), } @@ -190,7 +190,7 @@ func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope t retrier: backoff.NewConcurrentRetrier(pollOperationRetryPolicy), logger: logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType}), metricsScope: tagScope(metricsScope, tagWorkerType, options.workerType), - concurrency: concurrency, + concurrency: concurrency, pollerAutoScaler: pollerAS, taskQueueCh: make(chan interface{}), // no buffer, so poller only able to poll new task after previous is dispatched. limiterContext: ctx, diff --git a/internal/worker/concurrency.go b/internal/worker/concurrency.go index 45d0dadc8..65c0f695f 100644 --- a/internal/worker/concurrency.go +++ b/internal/worker/concurrency.go @@ -30,13 +30,13 @@ import ( var _ Permit = (*permit)(nil) -// Concurrency contains synchronization primitives for dynamically controlling the concurrencies in workers -type Concurrency struct { +// ConcurrencyLimit contains synchronization primitives for dynamically controlling the concurrencies in workers +type ConcurrencyLimit struct { PollerPermit Permit // controls concurrency of pollers TaskPermit Permit // controlls concurrency of task processings } -// Permit is an adaptive +// Permit is an adaptive permit issuer to control concurrency type Permit interface { Acquire(context.Context, int) error AcquireChan(context.Context, *sync.WaitGroup) <-chan struct{}