Skip to content

Commit

Permalink
rename
Browse files Browse the repository at this point in the history
  • Loading branch information
shijiesheng committed Nov 26, 2024
1 parent b99b8ce commit dc99021
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 6 deletions.
6 changes: 3 additions & 3 deletions internal/internal_worker_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ type (
logger *zap.Logger
metricsScope tally.Scope

concurrency *worker.Concurrency
concurrency *worker.ConcurrencyLimit
pollerAutoScaler *pollerAutoScaler
taskQueueCh chan interface{}
sessionTokenBucket *sessionTokenBucket
Expand All @@ -168,7 +168,7 @@ func createPollRetryPolicy() backoff.RetryPolicy {
func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope tally.Scope, sessionTokenBucket *sessionTokenBucket) *baseWorker {
ctx, cancel := context.WithCancel(context.Background())

concurrency := &worker.Concurrency{
concurrency := &worker.ConcurrencyLimit{
PollerPermit: worker.NewPermit(options.pollerCount),
TaskPermit: worker.NewPermit(options.maxConcurrentTask),
}
Expand All @@ -190,7 +190,7 @@ func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope t
retrier: backoff.NewConcurrentRetrier(pollOperationRetryPolicy),
logger: logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType}),
metricsScope: tagScope(metricsScope, tagWorkerType, options.workerType),
concurrency: concurrency,
concurrency: concurrency,
pollerAutoScaler: pollerAS,
taskQueueCh: make(chan interface{}), // no buffer, so poller only able to poll new task after previous is dispatched.
limiterContext: ctx,
Expand Down
6 changes: 3 additions & 3 deletions internal/worker/concurrency.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ import (

var _ Permit = (*permit)(nil)

// Concurrency contains synchronization primitives for dynamically controlling the concurrencies in workers
type Concurrency struct {
// ConcurrencyLimit contains synchronization primitives for dynamically controlling the concurrencies in workers
type ConcurrencyLimit struct {
PollerPermit Permit // controls concurrency of pollers
TaskPermit Permit // controlls concurrency of task processings
}

// Permit is an adaptive
// Permit is an adaptive permit issuer to control concurrency
type Permit interface {
Acquire(context.Context, int) error
AcquireChan(context.Context, *sync.WaitGroup) <-chan struct{}
Expand Down

0 comments on commit dc99021

Please sign in to comment.