diff --git a/postq/pg/router.go b/postq/pg/router.go index 7dd72a72..98a09d99 100644 --- a/postq/pg/router.go +++ b/postq/pg/router.go @@ -20,6 +20,12 @@ func defaultRouteExtractor(payload string) (string, string, error) { // notifyRouter distributes the pgNotify event to multiple channels // based on the payload. type notifyRouter struct { + // when in signal mode, signals more than the channel size are simply discarded. + // + // i.e. if the channel size is 1 & 10 signals come in - we drop the last 8 signals. + // Essentially, we are squashing the last 9 signals into 1 signal and only publish 2 signals. + signalMode bool + registry map[string]chan string routeExtractor routeExtractorFn } @@ -36,14 +42,27 @@ func (t *notifyRouter) WithRouteExtractor(routeExtractor routeExtractorFn) *noti return t } +func (t *notifyRouter) GetOrCreateChannel(routes ...string) <-chan string { + return t.getOrCreateChannel(0, routes...) +} + +func (t *notifyRouter) GetOrCreateBufferedChannel(size int, routes ...string) <-chan string { + t.signalMode = size >= 0 + return t.getOrCreateChannel(size, routes...) +} + // GetOrCreateChannel creates a single channel for the given routes. // // If any of the routes already has a channel, we use that existing for all the routes. // // Caution: The caller needs to ensure that the route // groups do not overlap. -func (t *notifyRouter) GetOrCreateChannel(routes ...string) <-chan string { - pgNotifyChannel := make(chan string) +func (t *notifyRouter) getOrCreateChannel(size int, routes ...string) <-chan string { + // we create a channel with size one more than the requested amount + // so that we have one additional signal in the buffer that represents + // any further signals that come in. + pgNotifyChannel := make(chan string, size+1) + for _, we := range routes { if existing, ok := t.registry[we]; ok { pgNotifyChannel = existing @@ -80,8 +99,19 @@ func (t *notifyRouter) start(channel chan string) { } if ch, ok := t.registry[route]; ok { + // publish in a go routine as we don't want any slow consumers + // to block other fast consumers. go func() { - ch <- extractedPayload + if t.signalMode { + select { + case ch <- extractedPayload: + // message written + default: + // message dropped + } + } else { + ch <- extractedPayload + } }() } } diff --git a/postq/pg_consumer.go b/postq/pg_consumer.go index a8a9b2c9..8e80dfa4 100644 --- a/postq/pg_consumer.go +++ b/postq/pg_consumer.go @@ -65,7 +65,6 @@ func NewPGConsumer(consumerFunc ConsumerFunc, opt *ConsumerOption) (*PGConsumer, if opt.ErrorHandler != nil { ec.errorHandler = opt.ErrorHandler } - } return ec, nil