Skip to content

Commit

Permalink
Merge pull request #121 from rstudio/jon-metrics-2
Browse files Browse the repository at this point in the history
Database queue: Support queue notifications miss counter
  • Loading branch information
jonyoder authored Aug 2, 2023
2 parents fd56dae + 056deec commit 57e957a
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 12 deletions.
4 changes: 2 additions & 2 deletions pkg/rsqueue/impls/database/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ go 1.20
require (
github.com/fortytw2/leaktest v1.3.0
github.com/google/uuid v1.1.2
github.com/rstudio/platform-lib/pkg/rsnotify v1.4.0
github.com/rstudio/platform-lib/pkg/rsnotify v1.5.1
github.com/rstudio/platform-lib/pkg/rsnotify/listeners/local v1.4.1
github.com/rstudio/platform-lib/pkg/rsqueue v0.1.1
github.com/rstudio/platform-lib/pkg/rsqueue v0.2.2
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c
)

Expand Down
10 changes: 4 additions & 6 deletions pkg/rsqueue/impls/database/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,11 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/rstudio/platform-lib/pkg/rsnotify v1.4.0 h1:tMeO7PckDZzDROBmu5nZJ1pORN3dNDWMWv57X0xpMf4=
github.com/rstudio/platform-lib/pkg/rsnotify v1.4.0/go.mod h1:OTMZNgESF0Y2THisqq2gd4P/yF9YLCUW1GNXy5ainYM=
github.com/rstudio/platform-lib/pkg/rsnotify v1.5.1 h1:hkLLKye0iBqW5r5ae6qnX5viU9qj0Eiwh+vUpkuy2zI=
github.com/rstudio/platform-lib/pkg/rsnotify v1.5.1/go.mod h1:OTMZNgESF0Y2THisqq2gd4P/yF9YLCUW1GNXy5ainYM=
github.com/rstudio/platform-lib/pkg/rsnotify/listeners/local v1.4.1 h1:65BsZFFHKW9Bm0MBi3Jw7a72C2XqRnmA+519Cm+6zFk=
github.com/rstudio/platform-lib/pkg/rsnotify/listeners/local v1.4.1/go.mod h1:3fj43uPHSrY30ZJmAgbSmmy9d7heQVnL/MXVwOz+5Pc=
github.com/rstudio/platform-lib/pkg/rsqueue v0.1.1 h1:y6hxq3z1F98KFEnfSSYDm5BP4C9MlSxwnNPKHGluV7w=
github.com/rstudio/platform-lib/pkg/rsqueue v0.1.1/go.mod h1:v47Q+KI/vBl6NLgrMLry6x3J651uhppVvCCGffq/bDI=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
github.com/rstudio/platform-lib/pkg/rsqueue v0.2.2 h1:nt1/zcuMLGiCExbmQLww27Tj2jGWZgyxXI6+FOVcODM=
github.com/rstudio/platform-lib/pkg/rsqueue v0.2.2/go.mod h1:ac7W+fPG4pcb8yYp/npchGewPJezAu6UjlN0DjPEuH8=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
19 changes: 15 additions & 4 deletions pkg/rsqueue/impls/database/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ type DatabaseQueue struct {
chunkMatcher dbqueuetypes.DatabaseQueueChunkMatcher

wrapper metrics.JobLifecycleWrapper

metrics metrics.Metrics
}

type DatabaseQueueConfig struct {
Expand All @@ -62,6 +64,7 @@ type DatabaseQueueConfig struct {
ChunkMsgsChan <-chan listener.Notification
StopChan chan bool
JobLifecycleWrapper metrics.JobLifecycleWrapper
Metrics metrics.Metrics
}

func NewDatabaseQueue(cfg DatabaseQueueConfig) (queue.Queue, error) {
Expand All @@ -83,6 +86,8 @@ func NewDatabaseQueue(cfg DatabaseQueueConfig) (queue.Queue, error) {
unsubscribe: make(chan (<-chan listener.Notification)),

wrapper: cfg.JobLifecycleWrapper,

metrics: cfg.Metrics,
}

go rq.broadcast(cfg.StopChan, cfg.QueueMsgsChan, cfg.WorkMsgsChan, cfg.ChunkMsgsChan)
Expand Down Expand Up @@ -246,6 +251,7 @@ func (q *DatabaseQueue) PollAddress(address string) <-chan error {
errCh := make(chan error)

go func() {
var done, ticked bool
for {
isDone, err := q.store.IsQueueAddressComplete(address)
if err != nil {
Expand All @@ -258,11 +264,16 @@ func (q *DatabaseQueue) PollAddress(address string) <-chan error {
} else if isDone {
q.debugLogger.Debugf("Queue work with address %s completed", address)
close(errCh)
if ticked && q.metrics != nil {
// We detected this work was done after a ticker tick instead of in response
// to a work complete or chunk notification.
q.metrics.QueueNotificationMiss(q.name, address)
}
return
}

// Wait for a notification or an interval, then poll again
done := func() bool {
done, ticked = func() (bool, bool) {
completedMsgs := q.SubscribeOne(q.notifyTypeWorkComplete, func(n listener.Notification) bool {
if wn, ok := n.(*agenttypes.WorkCompleteNotification); ok {
return wn.Address == address
Expand All @@ -281,12 +292,12 @@ func (q *DatabaseQueue) PollAddress(address string) <-chan error {
select {
case <-completedMsgs:
q.debugLogger.Debugf("Queue was notified that work with address %s completed", address)
return false
return false, false
case <-chunkMsgs:
q.debugLogger.Debugf("Queue was notified that chunk with address %s is ready", address)
return true
return true, false
case <-tick.C:
return false
return false, true
}
}
}()
Expand Down
11 changes: 11 additions & 0 deletions pkg/rsqueue/impls/database/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ func (s *QueueSuite) SetUpTest(c *check.C) {
s.store.hasAddress = true
}

type fakeMetrics struct {
count int
}

func (f *fakeMetrics) QueueNotificationMiss(name, address string) {
f.count++
}

type QueueTestStore struct {
llf *local.ListenerProvider
err error
Expand Down Expand Up @@ -523,13 +531,15 @@ func (s *QueueSuite) TestPollTickOk(c *check.C) {
defer leaktest.Check(c)

s.store.poll = false
fm := &fakeMetrics{}
q := &DatabaseQueue{
store: s.store,
debugLogger: &fakeLogger{},
addressPollInterval: time.Millisecond * 50,
subscribe: make(chan broadcaster.Subscription),
unsubscribe: make(chan (<-chan listener.Notification)),
wrapper: &fakeWrapper{},
metrics: fm,
}

queueMsgs := make(chan listener.Notification)
Expand All @@ -552,6 +562,7 @@ func (s *QueueSuite) TestPollTickOk(c *check.C) {

<-errCh
c.Assert(s.store.polled > 1, check.Equals, true)
c.Assert(fm.count, check.Equals, 1)
}

func (s *QueueSuite) TestPollNotifyOk(c *check.C) {
Expand Down

0 comments on commit 57e957a

Please sign in to comment.