From deaa234b2a97b5109cae5af3440e5790f5a496f1 Mon Sep 17 00:00:00 2001 From: Ilya Ozherelyev Date: Mon, 11 Nov 2024 15:07:06 +0100 Subject: [PATCH] Remove worker hardware utilization code --- internal/common/metrics/constants.go | 7 ---- internal/internal_worker.go | 5 --- internal/internal_worker_base.go | 58 +--------------------------- internal/internal_worker_test.go | 17 -------- internal/worker.go | 4 -- 5 files changed, 1 insertion(+), 90 deletions(-) diff --git a/internal/common/metrics/constants.go b/internal/common/metrics/constants.go index 33081e059..a20298443 100644 --- a/internal/common/metrics/constants.go +++ b/internal/common/metrics/constants.go @@ -110,13 +110,6 @@ const ( ReplaySkippedCounter = CadenceMetricsPrefix + "replay-skipped" ReplayLatency = CadenceMetricsPrefix + "replay-latency" - NumCPUCores = CadenceMetricsPrefix + "num-cpu-cores" - CPUPercentage = CadenceMetricsPrefix + "cpu-percentage" - TotalMemory = CadenceMetricsPrefix + "total-memory" - MemoryUsedHeap = CadenceMetricsPrefix + "memory-used-heap" - MemoryUsedStack = CadenceMetricsPrefix + "memory-used-stack" - NumGoRoutines = CadenceMetricsPrefix + "num-go-routines" - EstimatedHistorySize = CadenceMetricsPrefix + "estimated-history-size" ServerSideHistorySize = CadenceMetricsPrefix + "server-side-history-size" ConcurrentTaskQuota = CadenceMetricsPrefix + "concurrent-task-quota" diff --git a/internal/internal_worker.go b/internal/internal_worker.go index 07331b761..4319d0b54 100644 --- a/internal/internal_worker.go +++ b/internal/internal_worker.go @@ -83,7 +83,6 @@ const ( testTagsContextKey = "cadence-testTags" clientVersionTag = "cadence_client_version" clientGauge = "client_version_metric" - clientHostTag = "cadence_client_host" ) type ( @@ -330,9 +329,6 @@ func newWorkflowTaskWorkerInternal( // 3) the result pushed to laTunnel will be send as task to workflow worker to process. worker.taskQueueCh = laTunnel.resultCh - worker.options.host = params.Host - localActivityWorker.options.host = params.Host - return &workflowWorker{ executionParameters: params, workflowService: service, @@ -507,7 +503,6 @@ func newActivityTaskWorker( workerParams.MetricsScope, sessionTokenBucket, ) - base.options.host = workerParams.Host return &activityWorker{ executionParameters: workerParams, diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go index 1dbe1429a..ae185a4f6 100644 --- a/internal/internal_worker_base.go +++ b/internal/internal_worker_base.go @@ -28,14 +28,12 @@ import ( "errors" "fmt" "os" - "runtime" "sync" "syscall" "time" "go.uber.org/cadence/internal/common/debug" - "github.com/shirou/gopsutil/cpu" "github.com/uber-go/tally" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -50,7 +48,6 @@ import ( const ( retryPollOperationInitialInterval = 20 * time.Millisecond retryPollOperationMaxInterval = 10 * time.Second - hardwareMetricsCollectInterval = 30 * time.Second ) var ( @@ -59,8 +56,6 @@ var ( var errShutdown = errors.New("worker shutting down") -var collectHardwareUsageOnce sync.Once - type ( // resultHandler that returns result resultHandler func(result []byte, err error) @@ -223,7 +218,6 @@ func (bw *baseWorker) Start() { // We want the emit function run once per host instead of run once per worker // since the emit function is host level metric. bw.shutdownWG.Add(1) - go bw.emitHardwareUsage() bw.isWorkerStarted = true traceLog(func() { @@ -400,7 +394,7 @@ func (bw *baseWorker) Run() { bw.Stop() } -// Shutdown is a blocking call and cleans up all the resources associated with worker. +// Stop is a blocking call and cleans up all the resources associated with worker. func (bw *baseWorker) Stop() { if !bw.isWorkerStarted { return @@ -423,53 +417,3 @@ func (bw *baseWorker) Stop() { } return } - -func (bw *baseWorker) emitHardwareUsage() { - defer func() { - if p := recover(); p != nil { - bw.metricsScope.Counter(metrics.WorkerPanicCounter).Inc(1) - topLine := fmt.Sprintf("base worker for %s [panic]:", bw.options.workerType) - st := getStackTraceRaw(topLine, 7, 0) - bw.logger.Error("Unhandled panic in hardware emitting.", - zap.String(tagPanicError, fmt.Sprintf("%v", p)), - zap.String(tagPanicStack, st)) - } - }() - defer bw.shutdownWG.Done() - collectHardwareUsageOnce.Do( - func() { - ticker := time.NewTicker(hardwareMetricsCollectInterval) - for { - select { - case <-bw.shutdownCh: - ticker.Stop() - return - case <-ticker.C: - host := bw.options.host - scope := bw.metricsScope.Tagged(map[string]string{clientHostTag: host}) - - cpuPercent, err := cpu.Percent(0, false) - if err != nil { - bw.logger.Warn("Failed to get cpu percent", zap.Error(err)) - return - } - cpuCores, err := cpu.Counts(false) - if err != nil { - bw.logger.Warn("Failed to get number of cpu cores", zap.Error(err)) - return - } - scope.Gauge(metrics.NumCPUCores).Update(float64(cpuCores)) - scope.Gauge(metrics.CPUPercentage).Update(cpuPercent[0]) - - var memStats runtime.MemStats - runtime.ReadMemStats(&memStats) - - scope.Gauge(metrics.NumGoRoutines).Update(float64(runtime.NumGoroutine())) - scope.Gauge(metrics.TotalMemory).Update(float64(memStats.Sys)) - scope.Gauge(metrics.MemoryUsedHeap).Update(float64(memStats.HeapInuse)) - scope.Gauge(metrics.MemoryUsedStack).Update(float64(memStats.StackInuse)) - } - } - }) - -} diff --git a/internal/internal_worker_test.go b/internal/internal_worker_test.go index f23741e65..b18ae0183 100644 --- a/internal/internal_worker_test.go +++ b/internal/internal_worker_test.go @@ -239,16 +239,6 @@ func (s *internalWorkerTestSuite) TestCreateWorker_WithStrictNonDeterminism() { worker.Stop() } -func (s *internalWorkerTestSuite) TestCreateWorker_WithHost() { - worker := createWorkerWithHost(s.T(), s.service) - err := worker.Start() - require.NoError(s.T(), err) - time.Sleep(time.Millisecond * 200) - assert.Equal(s.T(), "test_host", worker.activityWorker.worker.options.host) - assert.Equal(s.T(), "test_host", worker.workflowWorker.worker.options.host) - worker.Stop() -} - func (s *internalWorkerTestSuite) TestCreateWorkerRun() { // Create service endpoint mockCtrl := gomock.NewController(s.T()) @@ -446,13 +436,6 @@ func createWorkerWithStrictNonDeterminismDisabled( return createWorkerWithThrottle(t, service, 0, WorkerOptions{WorkerBugPorts: WorkerBugPorts{DisableStrictNonDeterminismCheck: true}}) } -func createWorkerWithHost( - t *testing.T, - service *workflowservicetest.MockClient, -) *aggregatedWorker { - return createWorkerWithThrottle(t, service, 0, WorkerOptions{Host: "test_host"}) -} - func (s *internalWorkerTestSuite) testCompleteActivityHelper(opt *ClientOptions) { t := s.T() mockService := s.service diff --git a/internal/worker.go b/internal/worker.go index e246ab080..004239057 100644 --- a/internal/worker.go +++ b/internal/worker.go @@ -266,10 +266,6 @@ type ( // default: No provider Authorization auth.AuthorizationProvider - // Optional: Host is just string on the machine running the client - // default: empty string - Host string - // Optional: See WorkerBugPorts for more details // // Deprecated: All bugports are always deprecated and may be removed at any time.