diff --git a/pkg/rsqueue/impls/database/go.mod b/pkg/rsqueue/impls/database/go.mod index 50aa5de..a7355ac 100644 --- a/pkg/rsqueue/impls/database/go.mod +++ b/pkg/rsqueue/impls/database/go.mod @@ -5,9 +5,9 @@ go 1.20 require ( github.com/fortytw2/leaktest v1.3.0 github.com/google/uuid v1.1.2 - github.com/rstudio/platform-lib/pkg/rsnotify v1.4.0 + github.com/rstudio/platform-lib/pkg/rsnotify v1.5.1 github.com/rstudio/platform-lib/pkg/rsnotify/listeners/local v1.4.1 - github.com/rstudio/platform-lib/pkg/rsqueue v0.1.1 + github.com/rstudio/platform-lib/pkg/rsqueue v0.2.2 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c ) diff --git a/pkg/rsqueue/impls/database/go.sum b/pkg/rsqueue/impls/database/go.sum index bf867fe..6a0fe1a 100644 --- a/pkg/rsqueue/impls/database/go.sum +++ b/pkg/rsqueue/impls/database/go.sum @@ -9,13 +9,11 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= -github.com/rstudio/platform-lib/pkg/rsnotify v1.4.0 h1:tMeO7PckDZzDROBmu5nZJ1pORN3dNDWMWv57X0xpMf4= -github.com/rstudio/platform-lib/pkg/rsnotify v1.4.0/go.mod h1:OTMZNgESF0Y2THisqq2gd4P/yF9YLCUW1GNXy5ainYM= +github.com/rstudio/platform-lib/pkg/rsnotify v1.5.1 h1:hkLLKye0iBqW5r5ae6qnX5viU9qj0Eiwh+vUpkuy2zI= +github.com/rstudio/platform-lib/pkg/rsnotify v1.5.1/go.mod h1:OTMZNgESF0Y2THisqq2gd4P/yF9YLCUW1GNXy5ainYM= github.com/rstudio/platform-lib/pkg/rsnotify/listeners/local v1.4.1 h1:65BsZFFHKW9Bm0MBi3Jw7a72C2XqRnmA+519Cm+6zFk= github.com/rstudio/platform-lib/pkg/rsnotify/listeners/local v1.4.1/go.mod h1:3fj43uPHSrY30ZJmAgbSmmy9d7heQVnL/MXVwOz+5Pc= -github.com/rstudio/platform-lib/pkg/rsqueue v0.1.1 h1:y6hxq3z1F98KFEnfSSYDm5BP4C9MlSxwnNPKHGluV7w= -github.com/rstudio/platform-lib/pkg/rsqueue v0.1.1/go.mod h1:v47Q+KI/vBl6NLgrMLry6x3J651uhppVvCCGffq/bDI= -gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +github.com/rstudio/platform-lib/pkg/rsqueue v0.2.2 h1:nt1/zcuMLGiCExbmQLww27Tj2jGWZgyxXI6+FOVcODM= +github.com/rstudio/platform-lib/pkg/rsqueue v0.2.2/go.mod h1:ac7W+fPG4pcb8yYp/npchGewPJezAu6UjlN0DjPEuH8= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/pkg/rsqueue/impls/database/queue.go b/pkg/rsqueue/impls/database/queue.go index 8516362..888f80c 100644 --- a/pkg/rsqueue/impls/database/queue.go +++ b/pkg/rsqueue/impls/database/queue.go @@ -46,6 +46,8 @@ type DatabaseQueue struct { chunkMatcher dbqueuetypes.DatabaseQueueChunkMatcher wrapper metrics.JobLifecycleWrapper + + metrics metrics.Metrics } type DatabaseQueueConfig struct { @@ -62,6 +64,7 @@ type DatabaseQueueConfig struct { ChunkMsgsChan <-chan listener.Notification StopChan chan bool JobLifecycleWrapper metrics.JobLifecycleWrapper + Metrics metrics.Metrics } func NewDatabaseQueue(cfg DatabaseQueueConfig) (queue.Queue, error) { @@ -83,6 +86,8 @@ func NewDatabaseQueue(cfg DatabaseQueueConfig) (queue.Queue, error) { unsubscribe: make(chan (<-chan listener.Notification)), wrapper: cfg.JobLifecycleWrapper, + + metrics: cfg.Metrics, } go rq.broadcast(cfg.StopChan, cfg.QueueMsgsChan, cfg.WorkMsgsChan, cfg.ChunkMsgsChan) @@ -246,6 +251,7 @@ func (q *DatabaseQueue) PollAddress(address string) <-chan error { errCh := make(chan error) go func() { + var done, ticked bool for { isDone, err := q.store.IsQueueAddressComplete(address) if err != nil { @@ -258,11 +264,16 @@ func (q *DatabaseQueue) PollAddress(address string) <-chan error { } else if isDone { q.debugLogger.Debugf("Queue work with address %s completed", address) close(errCh) + if ticked && q.metrics != nil { + // We detected this work was done after a ticker tick instead of in response + // to a work complete or chunk notification. + q.metrics.QueueNotificationMiss(q.name, address) + } return } // Wait for a notification or an interval, then poll again - done := func() bool { + done, ticked = func() (bool, bool) { completedMsgs := q.SubscribeOne(q.notifyTypeWorkComplete, func(n listener.Notification) bool { if wn, ok := n.(*agenttypes.WorkCompleteNotification); ok { return wn.Address == address @@ -281,12 +292,12 @@ func (q *DatabaseQueue) PollAddress(address string) <-chan error { select { case <-completedMsgs: q.debugLogger.Debugf("Queue was notified that work with address %s completed", address) - return false + return false, false case <-chunkMsgs: q.debugLogger.Debugf("Queue was notified that chunk with address %s is ready", address) - return true + return true, false case <-tick.C: - return false + return false, true } } }() diff --git a/pkg/rsqueue/impls/database/queue_test.go b/pkg/rsqueue/impls/database/queue_test.go index a612761..de61257 100644 --- a/pkg/rsqueue/impls/database/queue_test.go +++ b/pkg/rsqueue/impls/database/queue_test.go @@ -39,6 +39,14 @@ func (s *QueueSuite) SetUpTest(c *check.C) { s.store.hasAddress = true } +type fakeMetrics struct { + count int +} + +func (f *fakeMetrics) QueueNotificationMiss(name, address string) { + f.count++ +} + type QueueTestStore struct { llf *local.ListenerProvider err error @@ -523,6 +531,7 @@ func (s *QueueSuite) TestPollTickOk(c *check.C) { defer leaktest.Check(c) s.store.poll = false + fm := &fakeMetrics{} q := &DatabaseQueue{ store: s.store, debugLogger: &fakeLogger{}, @@ -530,6 +539,7 @@ func (s *QueueSuite) TestPollTickOk(c *check.C) { subscribe: make(chan broadcaster.Subscription), unsubscribe: make(chan (<-chan listener.Notification)), wrapper: &fakeWrapper{}, + metrics: fm, } queueMsgs := make(chan listener.Notification) @@ -552,6 +562,7 @@ func (s *QueueSuite) TestPollTickOk(c *check.C) { <-errCh c.Assert(s.store.polled > 1, check.Equals, true) + c.Assert(fm.count, check.Equals, 1) } func (s *QueueSuite) TestPollNotifyOk(c *check.C) {