diff --git a/internal/common/debug/example_test.go b/internal/common/debug/example_test.go index bb6be1f70..168f09063 100644 --- a/internal/common/debug/example_test.go +++ b/internal/common/debug/example_test.go @@ -25,18 +25,19 @@ import ( "fmt" "sort" "sync" - - "go.uber.org/atomic" ) type ( // pollerTrackerImpl implements the PollerTracker interface pollerTrackerImpl struct { - pollerCount atomic.Int32 + sync.RWMutex + count map[string]int64 } // stopperImpl implements the Stopper interface stopperImpl struct { + sync.Once + workerType string pollerTracker *pollerTrackerImpl } @@ -93,40 +94,67 @@ func (asi *activityStopperImpl) Stop() { }) } -func (p *pollerTrackerImpl) Start() Stopper { - p.pollerCount.Inc() - return &stopperImpl{ - pollerTracker: p, - } +func (p *pollerTrackerImpl) Start(workerType string) Stopper { + p.Lock() + defer p.Unlock() + p.count[workerType]++ + return &stopperImpl{pollerTracker: p, workerType: workerType} } -func (p *pollerTrackerImpl) Stats() int32 { - return p.pollerCount.Load() +func (p *pollerTrackerImpl) Stats() Pollers { + var pollers Pollers + p.RLock() + defer p.RUnlock() + for pollerType, count := range p.count { + if count > 0 { + pollers = append(pollers, struct { + Type string + Count int64 + }{Type: pollerType, Count: count}) + } + } + sort.Slice(pollers, func(i, j int) bool { + return pollers[i].Type < pollers[j].Type + }) + return pollers } func (s *stopperImpl) Stop() { - s.pollerTracker.pollerCount.Dec() + s.Do(func() { + s.pollerTracker.Lock() + defer s.pollerTracker.Unlock() + s.pollerTracker.count[s.workerType]-- + if s.pollerTracker.count[s.workerType] == 0 { + delete(s.pollerTracker.count, s.workerType) + } + }) } func Example() { var pollerTracker PollerTracker - pollerTracker = &pollerTrackerImpl{} + pollerTracker = &pollerTrackerImpl{count: make(map[string]int64)} // Initially, poller count should be 0 - fmt.Println(fmt.Sprintf("poller stats: %d", pollerTracker.Stats())) - - // Start a poller and verify that the count increments - stopper1 := pollerTracker.Start() - fmt.Println(fmt.Sprintf("poller stats: %d", pollerTracker.Stats())) - - // Start another poller and verify that the count increments again - stopper2 := pollerTracker.Start() - fmt.Println(fmt.Sprintf("poller stats: %d", pollerTracker.Stats())) - + jsonPollers, _ := json.MarshalIndent(pollerTracker.Stats(), "", " ") + fmt.Println(string(jsonPollers)) + + // Start pollers and verify that the count increments + stopper1 := pollerTracker.Start("ActivityWorker") + jsonPollers, _ = json.MarshalIndent(pollerTracker.Stats(), "", " ") + fmt.Println(string(jsonPollers)) + stopper2 := pollerTracker.Start("ActivityWorker") + jsonPollers, _ = json.MarshalIndent(pollerTracker.Stats(), "", " ") + fmt.Println(string(jsonPollers)) + // Start another poller type and verify that the count increments + stopper3 := pollerTracker.Start("WorkflowWorker") + jsonPollers, _ = json.MarshalIndent(pollerTracker.Stats(), "", " ") + fmt.Println(string(jsonPollers)) // Stop the pollers and verify the counter stopper1.Stop() stopper2.Stop() - fmt.Println(fmt.Sprintf("poller stats: %d", pollerTracker.Stats())) + stopper3.Stop() + jsonPollers, _ = json.MarshalIndent(pollerTracker.Stats(), "", " ") + fmt.Println(string(jsonPollers)) var activityTracker ActivityTracker activityTracker = &activityTrackerImpl{activityCount: make(map[ActivityInfo]int64)} @@ -157,10 +185,30 @@ func Example() { fmt.Println(string(jsonActivities)) // Output: - // poller stats: 0 - // poller stats: 1 - // poller stats: 2 - // poller stats: 0 + // null + // [ + // { + // "Type": "ActivityWorker", + // "Count": 1 + // } + // ] + // [ + // { + // "Type": "ActivityWorker", + // "Count": 2 + // } + // ] + // [ + // { + // "Type": "ActivityWorker", + // "Count": 2 + // }, + // { + // "Type": "WorkflowWorker", + // "Count": 1 + // } + // ] + // null // [ // { // "Info": { diff --git a/internal/common/debug/types.go b/internal/common/debug/types.go index 7438a5202..31afaaaf0 100644 --- a/internal/common/debug/types.go +++ b/internal/common/debug/types.go @@ -34,9 +34,9 @@ type ( PollerTracker interface { // Start collects information on poller start up. // consumers should provide a concurrency-safe implementation. - Start() Stopper + Start(workerType string) Stopper // Stats return the number or running pollers - Stats() int32 + Stats() Pollers } // WorkerStats provides a set of methods that can be used to collect @@ -70,6 +70,11 @@ type ( Count int64 } + Pollers []struct { + Type string + Count int64 + } + // Debugger exposes stats collected on a running Worker // Deprecated: in development and very likely to change Debugger interface { diff --git a/internal/common/debug/workerstats_noop.go b/internal/common/debug/workerstats_noop.go index d6e9b09e0..f31affaab 100644 --- a/internal/common/debug/workerstats_noop.go +++ b/internal/common/debug/workerstats_noop.go @@ -29,9 +29,9 @@ type ( activityTrackerNoopImpl struct{} ) -func (lc *pollerTrackerNoopImpl) Start() Stopper { return &stopperNoopImpl{} } -func (lc *pollerTrackerNoopImpl) Stats() int32 { return 0 } -func (r *stopperNoopImpl) Stop() {} +func (lc *pollerTrackerNoopImpl) Start(workerType string) Stopper { return &stopperNoopImpl{} } +func (lc *pollerTrackerNoopImpl) Stats() Pollers { return nil } +func (r *stopperNoopImpl) Stop() {} // NewNoopPollerTracker creates a new PollerTracker instance func NewNoopPollerTracker() PollerTracker { return &pollerTrackerNoopImpl{} } diff --git a/internal/common/debug/workerstats_noop_test.go b/internal/common/debug/workerstats_noop_test.go index 7177410d9..403032339 100644 --- a/internal/common/debug/workerstats_noop_test.go +++ b/internal/common/debug/workerstats_noop_test.go @@ -30,9 +30,9 @@ func TestWorkerStats(t *testing.T) { pollerTracker := NewNoopPollerTracker() activityTracker := NewNoopActivityTracker() assert.NotNil(t, pollerTracker) - assert.NotNil(t, pollerTracker.Start()) - assert.Equal(t, int32(0), pollerTracker.Stats()) - assert.NotPanics(t, pollerTracker.Start().Stop) + assert.NotNil(t, pollerTracker.Start("")) + assert.Nil(t, pollerTracker.Stats()) + assert.NotPanics(t, pollerTracker.Start("").Stop) assert.NotNil(t, activityTracker.Start(ActivityInfo{})) assert.Nil(t, activityTracker.Stats()) } diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go index ba9da7818..85f777481 100644 --- a/internal/internal_worker_base.go +++ b/internal/internal_worker_base.go @@ -236,7 +236,7 @@ func (bw *baseWorker) isShutdown() bool { func (bw *baseWorker) runPoller() { defer bw.shutdownWG.Done() - defer bw.options.pollerTracker.Start().Stop() + defer bw.options.pollerTracker.Start(bw.options.workerType).Stop() bw.metricsScope.Counter(metrics.PollerStartCounter).Inc(1)