Create separate worker usage data collection and move hardware emit there #1293

Open · wants to merge 22 commits into base: master
Changes from 15 of 22 commits
6 changes: 4 additions & 2 deletions internal/common/metrics/constants.go
@@ -115,8 +115,10 @@ const (
MemoryUsedStack = CadenceMetricsPrefix + "memory-used-stack"
NumGoRoutines = CadenceMetricsPrefix + "num-go-routines"

EstimatedHistorySize = CadenceMetricsPrefix + "estimated-history-size"
ServerSideHistorySize = CadenceMetricsPrefix + "server-side-history-size"
EstimatedHistorySize = CadenceMetricsPrefix + "estimated-history-size"
ServerSideHistorySize = CadenceMetricsPrefix + "server-side-history-size"
ConcurrentTaskQuota = CadenceMetricsPrefix + "concurrent-task-quota"
PollerRequestBufferUsage = CadenceMetricsPrefix + "poller-request-buffer-usage"

WorkerUsageCollectorPanic = CadenceMetricsPrefix + "worker-metrics-collector-panic"
)
105 changes: 34 additions & 71 deletions internal/internal_worker_base.go
@@ -28,12 +28,10 @@ import (
"errors"
"fmt"
"os"
"runtime"
"sync"
"syscall"
"time"

"github.com/shirou/gopsutil/cpu"
"github.com/uber-go/tally"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
@@ -57,7 +55,7 @@ var (

var errShutdown = errors.New("worker shutting down")

var collectHardwareUsageOnce sync.Once
var emitOnce sync.Once

type (
// resultHandler that returns result
@@ -140,10 +138,11 @@ type (
logger *zap.Logger
metricsScope tally.Scope

pollerRequestCh chan struct{}
pollerAutoScaler *pollerAutoScaler
taskQueueCh chan interface{}
sessionTokenBucket *sessionTokenBucket
pollerRequestCh chan struct{}
pollerAutoScaler *pollerAutoScaler
workerUsageCollector *workerUsageCollector
taskQueueCh chan interface{}
sessionTokenBucket *sessionTokenBucket
}

polledTask struct {
@@ -173,17 +172,29 @@ func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope t
logger,
)
}
// For now this is enabled by default.
workerUC := newWorkerUsageCollector(
workerUsageCollectorOptions{
Enabled: true,
Cooldown: 30 * time.Second,
MetricsScope: metricsScope,
WorkerType: options.workerType,
EmitOnce: &emitOnce,
},
logger,
)

bw := &baseWorker{
options: options,
shutdownCh: make(chan struct{}),
taskLimiter: rate.NewLimiter(rate.Limit(options.maxTaskPerSecond), 1),
retrier: backoff.NewConcurrentRetrier(pollOperationRetryPolicy),
logger: logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType}),
metricsScope: tagScope(metricsScope, tagWorkerType, options.workerType),
pollerRequestCh: make(chan struct{}, options.maxConcurrentTask),
pollerAutoScaler: pollerAS,
taskQueueCh: make(chan interface{}), // no buffer, so poller only able to poll new task after previous is dispatched.
options: options,
shutdownCh: make(chan struct{}),
taskLimiter: rate.NewLimiter(rate.Limit(options.maxTaskPerSecond), 1),
retrier: backoff.NewConcurrentRetrier(pollOperationRetryPolicy),
logger: logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType}),
metricsScope: tagScope(metricsScope, tagWorkerType, options.workerType),
pollerRequestCh: make(chan struct{}, options.maxConcurrentTask),
pollerAutoScaler: pollerAS,
workerUsageCollector: workerUC,
taskQueueCh: make(chan interface{}), // no buffer, so poller only able to poll new task after previous is dispatched.

limiterContext: ctx,
limiterContextCancel: cancel,
@@ -207,6 +218,10 @@ func (bw *baseWorker) Start() {
bw.pollerAutoScaler.Start()
}

if bw.workerUsageCollector != nil {
bw.workerUsageCollector.Start()
}

for i := 0; i < bw.options.pollerCount; i++ {
bw.shutdownWG.Add(1)
go bw.runPoller()
@@ -215,11 +230,6 @@ func (bw *baseWorker) Start() {
bw.shutdownWG.Add(1)
go bw.runTaskDispatcher()

// We want the emit function run once per host instead of run once per worker
// since the emit function is host level metric.
bw.shutdownWG.Add(1)
go bw.emitHardwareUsage()

bw.isWorkerStarted = true
traceLog(func() {
bw.logger.Info("Started Worker",
@@ -403,6 +413,9 @@ func (bw *baseWorker) Stop() {
if bw.pollerAutoScaler != nil {
bw.pollerAutoScaler.Stop()
}
if bw.workerUsageCollector != nil {
bw.workerUsageCollector.Stop()
}

if success := util.AwaitWaitGroup(&bw.shutdownWG, bw.options.shutdownTimeout); !success {
traceLog(func() {
@@ -416,53 +429,3 @@
}
return
}

func (bw *baseWorker) emitHardwareUsage() {
defer func() {
if p := recover(); p != nil {
bw.metricsScope.Counter(metrics.WorkerPanicCounter).Inc(1)
topLine := fmt.Sprintf("base worker for %s [panic]:", bw.options.workerType)
st := getStackTraceRaw(topLine, 7, 0)
bw.logger.Error("Unhandled panic in hardware emitting.",
zap.String(tagPanicError, fmt.Sprintf("%v", p)),
zap.String(tagPanicStack, st))
}
}()
defer bw.shutdownWG.Done()
collectHardwareUsageOnce.Do(
func() {
ticker := time.NewTicker(hardwareMetricsCollectInterval)
for {
select {
case <-bw.shutdownCh:
ticker.Stop()
return
case <-ticker.C:
host := bw.options.host
scope := bw.metricsScope.Tagged(map[string]string{clientHostTag: host})

cpuPercent, err := cpu.Percent(0, false)
if err != nil {
bw.logger.Warn("Failed to get cpu percent", zap.Error(err))
return
}
cpuCores, err := cpu.Counts(false)
if err != nil {
bw.logger.Warn("Failed to get number of cpu cores", zap.Error(err))
return
}
scope.Gauge(metrics.NumCPUCores).Update(float64(cpuCores))
scope.Gauge(metrics.CPUPercentage).Update(cpuPercent[0])

var memStats runtime.MemStats
runtime.ReadMemStats(&memStats)

scope.Gauge(metrics.NumGoRoutines).Update(float64(runtime.NumGoroutine()))
scope.Gauge(metrics.TotalMemory).Update(float64(memStats.Sys))
scope.Gauge(metrics.MemoryUsedHeap).Update(float64(memStats.HeapInuse))
scope.Gauge(metrics.MemoryUsedStack).Update(float64(memStats.StackInuse))
}
}
})

}
11 changes: 11 additions & 0 deletions internal/internal_worker_interfaces_test.go
@@ -56,8 +56,17 @@ type (
mockCtrl *gomock.Controller
service *workflowservicetest.MockClient
}

// fakeSyncOnce is a fake implementation of oncePerHost interface
// that does NOT ensure the function runs only once per host
fakeSyncOnce struct {
}
)

func (m *fakeSyncOnce) Do(f func()) {
f()
}

func helloWorldWorkflowFunc(ctx Context, input []byte) error {
queryResult := startingQueryValue
SetQueryHandler(ctx, queryType, func() (string, error) {
@@ -206,6 +215,7 @@ func (s *InterfacesTestSuite) TestInterface() {
registry := newRegistry()
// Launch worker.
workflowWorker := newWorkflowWorker(s.service, domain, workflowExecutionParameters, nil, registry, nil)
workflowWorker.worker.workerUsageCollector.emitOncePerHost = &fakeSyncOnce{}
defer workflowWorker.Stop()
workflowWorker.Start()

@@ -221,6 +231,7 @@

// Register activity instances and launch the worker.
activityWorker := newActivityWorker(s.service, domain, activityExecutionParameters, nil, registry, nil)
activityWorker.worker.workerUsageCollector.emitOncePerHost = &fakeSyncOnce{}
defer activityWorker.Stop()
activityWorker.Start()

151 changes: 151 additions & 0 deletions internal/internal_worker_usage_collector.go
@@ -0,0 +1,151 @@
package internal

import (
"context"
"fmt"
"github.com/shirou/gopsutil/cpu"
"github.com/uber-go/tally"
"go.uber.org/cadence/internal/common/metrics"
"go.uber.org/zap"
"runtime"
"sync"
"time"
)

type (
workerUsageCollector struct {
workerType string
cooldownTime time.Duration
logger *zap.Logger
ctx context.Context
shutdownCh chan struct{}
wg *sync.WaitGroup // graceful stop
cancel context.CancelFunc
metricsScope tally.Scope
emitOncePerHost oncePerHost
}

workerUsageCollectorOptions struct {
Enabled bool
Cooldown time.Duration
MetricsScope tally.Scope
WorkerType string
EmitOnce oncePerHost
}

hardwareUsage struct {
NumCPUCores int
CPUPercent float64
NumGoRoutines int
TotalMemory float64
MemoryUsedHeap float64
MemoryUsedStack float64
}

oncePerHost interface {
Do(func())
}
)

func newWorkerUsageCollector(
options workerUsageCollectorOptions,
logger *zap.Logger,
) *workerUsageCollector {
if !options.Enabled {
return nil
}
ctx, cancel := context.WithCancel(context.Background())
return &workerUsageCollector{
workerType: options.WorkerType,
cooldownTime: options.Cooldown,
metricsScope: options.MetricsScope,
logger: logger,
ctx: ctx,
cancel: cancel,
wg: &sync.WaitGroup{},
emitOncePerHost: options.EmitOnce,
shutdownCh: make(chan struct{}),
}
}

func (w *workerUsageCollector) Start() {
w.wg.Add(1)
go func() {
Contributor: Do we need to spawn a goroutine per worker? Why not ensure only one is running?

Member (author): Only the hardware emitting is once per host; all other metrics will be worker-specific (e.g. activity poll response vs. decision poll response).

Contributor: For now I only see w.collectHardwareUsage(), which just pushes a bunch of data into the same scope. I would suggest separating the hardware emitter from the worker-specific metrics.

Member (author): That's the current design: for each type of metric, based on its origin, I will create a separate goroutine, but they will all be contained under a single workerUsageCollector so that their results can be collected and sent in one place.
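
A minimal, hypothetical sketch of the fan-in design described above (per-source goroutines owned by one collector, with every result emitted in one place). The source names, sample type, and ticker interval are illustrative assumptions, not the PR's code:

// Hypothetical sketch: one collector, one goroutine per metric origin,
// all samples funneled into a single emit loop.
package main

import (
	"fmt"
	"sync"
	"time"
)

type sample struct {
	source string // e.g. "hardware", "activity-poll" (assumed names)
	value  float64
}

type usageCollector struct {
	wg       sync.WaitGroup
	samples  chan sample
	shutdown chan struct{}
}

func (c *usageCollector) start(sources []string) {
	for _, src := range sources {
		src := src
		c.wg.Add(1)
		go func() { // one goroutine per metric origin
			defer c.wg.Done()
			ticker := time.NewTicker(time.Second)
			defer ticker.Stop()
			for {
				select {
				case <-c.shutdown:
					return
				case <-ticker.C:
					c.samples <- sample{source: src, value: 1} // placeholder measurement
				}
			}
		}()
	}
	go func() { // the single place where results are collected and sent
		for s := range c.samples {
			fmt.Printf("emit %s=%v\n", s.source, s.value)
		}
	}()
}

func (c *usageCollector) stop() {
	close(c.shutdown)
	c.wg.Wait()      // wait for producers before closing the sample channel
	close(c.samples) // ends the emit loop
}

func main() {
	c := &usageCollector{samples: make(chan sample), shutdown: make(chan struct{})}
	c.start([]string{"hardware", "activity-poll"})
	time.Sleep(2 * time.Second)
	c.stop()
}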

defer func() {
if p := recover(); p != nil {
w.metricsScope.Counter(metrics.WorkerUsageCollectorPanic).Inc(1)
topLine := fmt.Sprintf("WorkerUsageCollector panic for workertype: %v", w.workerType)
st := getStackTraceRaw(topLine, 7, 0)
w.logger.Error("WorkerUsageCollector panic.",
zap.String(tagPanicError, fmt.Sprintf("%v", p)),
zap.String(tagPanicStack, st))
}
}()
defer w.wg.Done()
Member: There are a few things problematic about this goroutine closure:

  1. This wg.Done() will be called as soon as the goroutine for go w.runHardwareCollector() is started. It shouldn't be marked done until runHardwareCollector() terminates, so it should be moved there.
  2. No need for a panic recovery here.
  3. No need for a goroutine to invoke runHardwareCollector.
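
A hedged sketch of Start with those three points applied, reusing the workerUsageCollector fields defined in this file; runHardwareCollector already defers w.wg.Done(), so no extra bookkeeping is needed here:

// Sketch only (not the PR's final code), assuming the workerUsageCollector
// type from this file; runHardwareCollector defers w.wg.Done() itself.
func (w *workerUsageCollector) Start() {
	w.wg.Add(1)
	go w.runHardwareCollector()
}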


w.wg.Add(1)
w.logger.Info(fmt.Sprintf("Going to start hardware collector for workertype: %v", w.workerType))
go w.runHardwareCollector()

}()
return
}

func (w *workerUsageCollector) Stop() {
close(w.shutdownCh)
w.wg.Wait()
w.cancel()

}

func (w *workerUsageCollector) runHardwareCollector() {
defer w.wg.Done()
ticker := time.NewTicker(w.cooldownTime)
defer ticker.Stop()
w.emitOncePerHost.Do(func() {
w.logger.Info(fmt.Sprintf("Started worker usage collector for workertype: %v", w.workerType))
for {
select {
case <-w.shutdownCh:
return
case <-ticker.C:
hardwareUsageData := w.collectHardwareUsage()
if w.metricsScope != nil {
w.emitHardwareUsage(hardwareUsageData)
}
}
}
})
}

func (w *workerUsageCollector) collectHardwareUsage() hardwareUsage {
cpuPercent, err := cpu.Percent(0, false)
if err != nil {
w.logger.Warn("Failed to get cpu percent", zap.Error(err))
}
cpuCores, err := cpu.Counts(false)
if err != nil {
w.logger.Warn("Failed to get number of cpu cores", zap.Error(err))
}

var memStats runtime.MemStats
runtime.ReadMemStats(&memStats)
return hardwareUsage{
NumCPUCores: cpuCores,
CPUPercent: cpuPercent[0],
NumGoRoutines: runtime.NumGoroutine(),
TotalMemory: float64(memStats.Sys),
MemoryUsedHeap: float64(memStats.HeapAlloc),
MemoryUsedStack: float64(memStats.StackInuse),
}
}

// emitHardwareUsage emits collected hardware usage metrics to metrics scope
func (w *workerUsageCollector) emitHardwareUsage(usage hardwareUsage) {
w.metricsScope.Gauge(metrics.NumCPUCores).Update(float64(usage.NumCPUCores))
w.metricsScope.Gauge(metrics.CPUPercentage).Update(usage.CPUPercent)
w.metricsScope.Gauge(metrics.NumGoRoutines).Update(float64(usage.NumGoRoutines))
w.metricsScope.Gauge(metrics.TotalMemory).Update(float64(usage.TotalMemory))
w.metricsScope.Gauge(metrics.MemoryUsedHeap).Update(float64(usage.MemoryUsedHeap))
w.metricsScope.Gauge(metrics.MemoryUsedStack).Update(float64(usage.MemoryUsedStack))
}