diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go index 04398926f..b2601ec9b 100644 --- a/internal/internal_worker_base.go +++ b/internal/internal_worker_base.go @@ -28,12 +28,10 @@ import ( "errors" "fmt" "os" - "runtime" "sync" "syscall" "time" - "github.com/shirou/gopsutil/cpu" "github.com/uber-go/tally" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -140,10 +138,11 @@ type ( logger *zap.Logger metricsScope tally.Scope - pollerRequestCh chan struct{} - pollerAutoScaler *pollerAutoScaler - taskQueueCh chan interface{} - sessionTokenBucket *sessionTokenBucket + pollerRequestCh chan struct{} + pollerAutoScaler *pollerAutoScaler + workerUsageCollector *workerUsageCollector + taskQueueCh chan interface{} + sessionTokenBucket *sessionTokenBucket } polledTask struct { @@ -173,17 +172,29 @@ func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope t logger, ) } + // for now it's default to be enabled + var workerUC *workerUsageCollector + workerUC = newWorkerUsageCollector( + workerUsageCollectorOptions{ + Enabled: true, + Cooldown: 30 * time.Second, + Host: options.host, + MetricsScope: metricsScope, + }, + logger, + ) bw := &baseWorker{ - options: options, - shutdownCh: make(chan struct{}), - taskLimiter: rate.NewLimiter(rate.Limit(options.maxTaskPerSecond), 1), - retrier: backoff.NewConcurrentRetrier(pollOperationRetryPolicy), - logger: logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType}), - metricsScope: tagScope(metricsScope, tagWorkerType, options.workerType), - pollerRequestCh: make(chan struct{}, options.maxConcurrentTask), - pollerAutoScaler: pollerAS, - taskQueueCh: make(chan interface{}), // no buffer, so poller only able to poll new task after previous is dispatched. + options: options, + shutdownCh: make(chan struct{}), + taskLimiter: rate.NewLimiter(rate.Limit(options.maxTaskPerSecond), 1), + retrier: backoff.NewConcurrentRetrier(pollOperationRetryPolicy), + logger: logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType}), + metricsScope: tagScope(metricsScope, tagWorkerType, options.workerType), + pollerRequestCh: make(chan struct{}, options.maxConcurrentTask), + pollerAutoScaler: pollerAS, + workerUsageCollector: workerUC, + taskQueueCh: make(chan interface{}), // no buffer, so poller only able to poll new task after previous is dispatched. limiterContext: ctx, limiterContextCancel: cancel, @@ -207,6 +218,10 @@ func (bw *baseWorker) Start() { bw.pollerAutoScaler.Start() } + if bw.workerUsageCollector != nil { + bw.workerUsageCollector.Start() + } + for i := 0; i < bw.options.pollerCount; i++ { bw.shutdownWG.Add(1) go bw.runPoller() @@ -215,15 +230,6 @@ func (bw *baseWorker) Start() { bw.shutdownWG.Add(1) go bw.runTaskDispatcher() - // We want the emit function run once per host instead of run once per worker - //collectHardwareUsageOnce.Do(func() { - // bw.shutdownWG.Add(1) - // go bw.emitHardwareUsage() - //}) - - bw.shutdownWG.Add(1) - go bw.emitHardwareUsage() - bw.isWorkerStarted = true traceLog(func() { bw.logger.Info("Started Worker", @@ -407,6 +413,9 @@ func (bw *baseWorker) Stop() { if bw.pollerAutoScaler != nil { bw.pollerAutoScaler.Stop() } + if bw.workerUsageCollector != nil { + bw.workerUsageCollector.Stop() + } if success := util.AwaitWaitGroup(&bw.shutdownWG, bw.options.shutdownTimeout); !success { traceLog(func() { @@ -420,53 +429,3 @@ func (bw *baseWorker) Stop() { } return } - -func (bw *baseWorker) emitHardwareUsage() { - defer func() { - if p := recover(); p != nil { - bw.metricsScope.Counter(metrics.WorkerPanicCounter).Inc(1) - topLine := fmt.Sprintf("base worker for %s [panic]:", bw.options.workerType) - st := getStackTraceRaw(topLine, 7, 0) - bw.logger.Error("Unhandled panic in hardware emitting.", - zap.String(tagPanicError, fmt.Sprintf("%v", p)), - zap.String(tagPanicStack, st)) - } - }() - defer bw.shutdownWG.Done() - collectHardwareUsageOnce.Do( - func() { - ticker := time.NewTicker(hardwareMetricsCollectInterval) - for { - select { - case <-bw.shutdownCh: - ticker.Stop() - return - case <-ticker.C: - host := bw.options.host - scope := bw.metricsScope.Tagged(map[string]string{clientHostTag: host}) - - cpuPercent, err := cpu.Percent(0, false) - if err != nil { - bw.logger.Warn("Failed to get cpu percent", zap.Error(err)) - return - } - cpuCores, err := cpu.Counts(false) - if err != nil { - bw.logger.Warn("Failed to get number of cpu cores", zap.Error(err)) - return - } - scope.Gauge(metrics.NumCPUCores).Update(float64(cpuCores)) - scope.Gauge(metrics.CPUPercentage).Update(cpuPercent[0]) - - var memStats runtime.MemStats - runtime.ReadMemStats(&memStats) - - scope.Gauge(metrics.NumGoRoutines).Update(float64(runtime.NumGoroutine())) - scope.Gauge(metrics.TotalMemory).Update(float64(memStats.Sys)) - scope.Gauge(metrics.MemoryUsedHeap).Update(float64(memStats.HeapInuse)) - scope.Gauge(metrics.MemoryUsedStack).Update(float64(memStats.StackInuse)) - } - } - }) - -} diff --git a/internal/internal_worker_usage_collector.go b/internal/internal_worker_usage_collector.go new file mode 100644 index 000000000..3286526a4 --- /dev/null +++ b/internal/internal_worker_usage_collector.go @@ -0,0 +1,124 @@ +package internal + +import ( + "context" + "github.com/shirou/gopsutil/cpu" + "github.com/uber-go/tally" + "go.uber.org/cadence/internal/common/metrics" + "go.uber.org/zap" + "runtime" + "sync" + "time" +) + +type ( + workerUsageCollector struct { + cooldownTime time.Duration + logger *zap.Logger + ctx context.Context + wg *sync.WaitGroup // graceful stop + cancel context.CancelFunc + metricsScope tally.Scope + host string + } + + workerUsageCollectorOptions struct { + Enabled bool + Cooldown time.Duration + Host string + MetricsScope tally.Scope + } + + hardwareUsage struct { + NumCPUCores int + CPUPercent float64 + NumGoRoutines int + TotalMemory float64 + MemoryUsedHeap float64 + MemoryUsedStack float64 + } +) + +func newWorkerUsageCollector( + options workerUsageCollectorOptions, + logger *zap.Logger, +) *workerUsageCollector { + if !options.Enabled { + return nil + } + ctx, cancel := context.WithCancel(context.Background()) + return &workerUsageCollector{ + cooldownTime: options.Cooldown, + host: options.Host, + metricsScope: options.MetricsScope, + logger: logger, + ctx: ctx, + cancel: cancel, + wg: &sync.WaitGroup{}, + } +} + +func (w *workerUsageCollector) Start() { + w.wg.Add(1) + go func() { + defer func() { + if p := recover(); p != nil { + w.logger.Error("Unhandled panic in workerUsageCollector.") + } + }() + defer w.wg.Done() + collectHardwareUsageOnce.Do( + func() { + ticker := time.NewTicker(w.cooldownTime) + for { + select { + case <-w.ctx.Done(): + return + case <-ticker.C: + hardwareUsageData := w.collectHardwareUsage() + w.emitHardwareUsage(hardwareUsageData) + + } + } + }) + }() + return +} + +func (w *workerUsageCollector) Stop() { + w.cancel() + w.wg.Wait() +} + +func (w *workerUsageCollector) collectHardwareUsage() hardwareUsage { + cpuPercent, err := cpu.Percent(0, false) + if err != nil { + w.logger.Warn("Failed to get cpu percent", zap.Error(err)) + } + cpuCores, err := cpu.Counts(false) + if err != nil { + w.logger.Warn("Failed to get number of cpu cores", zap.Error(err)) + } + + var memStats runtime.MemStats + runtime.ReadMemStats(&memStats) + return hardwareUsage{ + NumCPUCores: cpuCores, + CPUPercent: cpuPercent[0], + NumGoRoutines: runtime.NumGoroutine(), + TotalMemory: float64(memStats.Sys), + MemoryUsedHeap: float64(memStats.HeapAlloc), + MemoryUsedStack: float64(memStats.StackInuse), + } +} + +// emitHardwareUsage emits collected hardware usage metrics to metrics scope +func (w *workerUsageCollector) emitHardwareUsage(usage hardwareUsage) { + scope := w.metricsScope.Tagged(map[string]string{clientHostTag: w.host}) + scope.Gauge(metrics.NumCPUCores).Update(float64(usage.NumCPUCores)) + scope.Gauge(metrics.CPUPercentage).Update(usage.CPUPercent) + scope.Gauge(metrics.NumGoRoutines).Update(float64(usage.NumGoRoutines)) + scope.Gauge(metrics.TotalMemory).Update(float64(usage.TotalMemory)) + scope.Gauge(metrics.MemoryUsedHeap).Update(float64(usage.MemoryUsedHeap)) + scope.Gauge(metrics.MemoryUsedStack).Update(float64(usage.MemoryUsedStack)) +}