Skip to content

Commit

Permalink
feat: improve event consumers
Browse files Browse the repository at this point in the history
discard unnecessary pgnotify notifications.

For every event we're currently calling ConsumeUntilEmpty.
If 10 events are published in burst, we call ConsumeUntilEmpty for each
of those 10 notifications.

This PR fixes that behavior by simply discarding additional signals.
  • Loading branch information
adityathebe authored and moshloop committed Oct 25, 2024
1 parent ca1e45c commit 995c515
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 4 deletions.
36 changes: 33 additions & 3 deletions postq/pg/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ func defaultRouteExtractor(payload string) (string, string, error) {
// notifyRouter distributes the pgNotify event to multiple channels
// based on the payload.
type notifyRouter struct {
// when in signal mode, signals more than the channel size are simply discarded.
//
// i.e. if the channel size is 1 & 10 signals come in - we drop the last 8 signals.
// Essentially, we are squashing the last 9 signals into 1 signal and only publish 2 signals.
signalMode bool

registry map[string]chan string
routeExtractor routeExtractorFn
}
Expand All @@ -36,14 +42,27 @@ func (t *notifyRouter) WithRouteExtractor(routeExtractor routeExtractorFn) *noti
return t
}

func (t *notifyRouter) GetOrCreateChannel(routes ...string) <-chan string {
return t.getOrCreateChannel(0, routes...)
}

func (t *notifyRouter) GetOrCreateBufferedChannel(size int, routes ...string) <-chan string {
t.signalMode = size >= 0
return t.getOrCreateChannel(size, routes...)
}

// GetOrCreateChannel creates a single channel for the given routes.
//
// If any of the routes already has a channel, we use that existing for all the routes.
//
// Caution: The caller needs to ensure that the route
// groups do not overlap.
func (t *notifyRouter) GetOrCreateChannel(routes ...string) <-chan string {
pgNotifyChannel := make(chan string)
func (t *notifyRouter) getOrCreateChannel(size int, routes ...string) <-chan string {
// we create a channel with size one more than the requested amount
// so that we have one additional signal in the buffer that represents
// any further signals that come in.
pgNotifyChannel := make(chan string, size+1)

for _, we := range routes {
if existing, ok := t.registry[we]; ok {
pgNotifyChannel = existing
Expand Down Expand Up @@ -80,8 +99,19 @@ func (t *notifyRouter) start(channel chan string) {
}

if ch, ok := t.registry[route]; ok {
// publish in a go routine as we don't want any slow consumers
// to block other fast consumers.
go func() {
ch <- extractedPayload
if t.signalMode {
select {
case ch <- extractedPayload:
// message written
default:
// message dropped
}
} else {
ch <- extractedPayload
}
}()
}
}
Expand Down
1 change: 0 additions & 1 deletion postq/pg_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ func NewPGConsumer(consumerFunc ConsumerFunc, opt *ConsumerOption) (*PGConsumer,
if opt.ErrorHandler != nil {
ec.errorHandler = opt.ErrorHandler
}

}

return ec, nil
Expand Down

0 comments on commit 995c515

Please sign in to comment.