Skip to content

Commit

Permalink
fix initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
shijiesheng committed Nov 21, 2024
1 parent 02e5f11 commit 6c9af52
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 11 deletions.
3 changes: 2 additions & 1 deletion internal/internal_poller_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ type (
func newPollerScaler(
options pollerAutoScalerOptions,
logger *zap.Logger,
permit worker.Permit,
hooks ...func()) *pollerAutoScaler {
if !options.Enabled {
return nil
Expand All @@ -91,7 +92,7 @@ func newPollerScaler(
isDryRun: options.DryRun,
cooldownTime: options.Cooldown,
logger: logger,
permit: worker.NewPermit(options.InitCount),
permit: permit,
wg: &sync.WaitGroup{},
ctx: ctx,
cancel: cancel,
Expand Down
2 changes: 2 additions & 0 deletions internal/internal_poller_autoscaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"time"

"go.uber.org/cadence/internal/common/testlogger"
"go.uber.org/cadence/internal/worker"

"github.com/stretchr/testify/assert"
"go.uber.org/atomic"
Expand Down Expand Up @@ -171,6 +172,7 @@ func Test_pollerAutoscaler(t *testing.T) {
TargetUtilization: float64(tt.args.targetMilliUsage) / 1000,
},
testlogger.NewZap(t),
worker.NewPermit(tt.args.initialPollerCount),
// hook function that collects number of iterations
func() {
autoscalerEpoch.Add(1)
Expand Down
23 changes: 14 additions & 9 deletions internal/internal_worker_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,24 +168,29 @@ func createPollRetryPolicy() backoff.RetryPolicy {
func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope tally.Scope, sessionTokenBucket *sessionTokenBucket) *baseWorker {
ctx, cancel := context.WithCancel(context.Background())

dynamic := &worker.DynamicParams{
PollerPermit: worker.NewPermit(options.pollerCount),
TaskPermit: worker.NewPermit(options.maxConcurrentTask),
}

var pollerAS *pollerAutoScaler
if pollerOptions := options.pollerAutoScaler; pollerOptions.Enabled {
dynamic.PollerPermit = worker.NewPermit(pollerOptions.InitCount)
pollerAS = newPollerScaler(
pollerOptions,
logger,
dynamic.PollerPermit,
)
}

bw := &baseWorker{
options: options,
shutdownCh: make(chan struct{}),
taskLimiter: rate.NewLimiter(rate.Limit(options.maxTaskPerSecond), 1),
retrier: backoff.NewConcurrentRetrier(pollOperationRetryPolicy),
logger: logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType}),
metricsScope: tagScope(metricsScope, tagWorkerType, options.workerType),
dynamic: &worker.DynamicParams{
TaskPermit: worker.NewPermit(options.maxConcurrentTask),
},
options: options,
shutdownCh: make(chan struct{}),
taskLimiter: rate.NewLimiter(rate.Limit(options.maxTaskPerSecond), 1),
retrier: backoff.NewConcurrentRetrier(pollOperationRetryPolicy),
logger: logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType}),
metricsScope: tagScope(metricsScope, tagWorkerType, options.workerType),
dynamic: dynamic,
pollerAutoScaler: pollerAS,
taskQueueCh: make(chan interface{}), // no buffer, so poller only able to poll new task after previous is dispatched.
limiterContext: ctx,
Expand Down
3 changes: 2 additions & 1 deletion internal/worker/dynamic_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ var _ Permit = (*permit)(nil)

// Synchronization contains synchronization primitives for dynamic configuration.
type DynamicParams struct {
TaskPermit Permit
PollerPermit Permit // controls concurrency of pollers
TaskPermit Permit // controlls concurrency of task processings
}

// Permit is an adaptive
Expand Down

0 comments on commit 6c9af52

Please sign in to comment.