From 42372fb3f8d6497afd3ba3748c012d1a17679531 Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Thu, 24 Oct 2024 15:37:01 +0545 Subject: [PATCH] feat: improve event consumers discard unnecessary pgnotify notifications. For every event we're currently calling ConsumeUntilEmpty. If 10 events are published in burst, we call ConsumeUntilEmpty for each of those 10 notifications. This PR fixes that behavior by simply discarding additional signals. --- postq/pg/router.go | 36 +++++++++++++++++++++++++++++++++--- postq/pg_consumer.go | 1 - 2 files changed, 33 insertions(+), 4 deletions(-) diff --git a/postq/pg/router.go b/postq/pg/router.go index 7dd72a72..98a09d99 100644 --- a/postq/pg/router.go +++ b/postq/pg/router.go @@ -20,6 +20,12 @@ func defaultRouteExtractor(payload string) (string, string, error) { // notifyRouter distributes the pgNotify event to multiple channels // based on the payload. type notifyRouter struct { + // when in signal mode, signals more than the channel size are simply discarded. + // + // i.e. if the channel size is 1 & 10 signals come in - we drop the last 8 signals. + // Essentially, we are squashing the last 9 signals into 1 signal and only publish 2 signals. + signalMode bool + registry map[string]chan string routeExtractor routeExtractorFn } @@ -36,14 +42,27 @@ func (t *notifyRouter) WithRouteExtractor(routeExtractor routeExtractorFn) *noti return t } +func (t *notifyRouter) GetOrCreateChannel(routes ...string) <-chan string { + return t.getOrCreateChannel(0, routes...) +} + +func (t *notifyRouter) GetOrCreateBufferedChannel(size int, routes ...string) <-chan string { + t.signalMode = size >= 0 + return t.getOrCreateChannel(size, routes...) +} + // GetOrCreateChannel creates a single channel for the given routes. // // If any of the routes already has a channel, we use that existing for all the routes. // // Caution: The caller needs to ensure that the route // groups do not overlap. -func (t *notifyRouter) GetOrCreateChannel(routes ...string) <-chan string { - pgNotifyChannel := make(chan string) +func (t *notifyRouter) getOrCreateChannel(size int, routes ...string) <-chan string { + // we create a channel with size one more than the requested amount + // so that we have one additional signal in the buffer that represents + // any further signals that come in. + pgNotifyChannel := make(chan string, size+1) + for _, we := range routes { if existing, ok := t.registry[we]; ok { pgNotifyChannel = existing @@ -80,8 +99,19 @@ func (t *notifyRouter) start(channel chan string) { } if ch, ok := t.registry[route]; ok { + // publish in a go routine as we don't want any slow consumers + // to block other fast consumers. go func() { - ch <- extractedPayload + if t.signalMode { + select { + case ch <- extractedPayload: + // message written + default: + // message dropped + } + } else { + ch <- extractedPayload + } }() } } diff --git a/postq/pg_consumer.go b/postq/pg_consumer.go index a8a9b2c9..8e80dfa4 100644 --- a/postq/pg_consumer.go +++ b/postq/pg_consumer.go @@ -65,7 +65,6 @@ func NewPGConsumer(consumerFunc ConsumerFunc, opt *ConsumerOption) (*PGConsumer, if opt.ErrorHandler != nil { ec.errorHandler = opt.ErrorHandler } - } return ec, nil