From a809738a7b6563e1261bca9cfecd3e9668a903d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mi=C5=82osz=20Sm=C3=B3=C5=82ka?= Date: Sat, 2 Nov 2024 12:04:32 +0100 Subject: [PATCH 1/2] Router: publish messages in bulk --- message/router.go | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/message/router.go b/message/router.go index 1b93e85a6..baa873328 100644 --- a/message/router.go +++ b/message/router.go @@ -825,15 +825,13 @@ func (h *handler) publishProducedMessages(producedMessages Messages, msgFields w "publish_topic": h.publishTopic, })) - for _, msg := range producedMessages { - if err := h.publisher.Publish(h.publishTopic, msg); err != nil { - // todo - how to deal with it better/transactional/retry? - h.logger.Error("Cannot publish message", err, msgFields.Add(watermill.LogFields{ - "not_sent_message": fmt.Sprintf("%#v", producedMessages), - })) - - return err - } + if err := h.publisher.Publish(h.publishTopic, producedMessages...); err != nil { + // todo - how to deal with it better/transactional/retry? + h.logger.Error("Cannot publish messages", err, msgFields.Add(watermill.LogFields{ + "not_sent_message": fmt.Sprintf("%#v", producedMessages), + })) + + return err } return nil From 9606511ef3b9c6cf65e42b52b5191bf64702d418 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mi=C5=82osz=20Sm=C3=B3=C5=82ka?= Date: Sat, 2 Nov 2024 12:12:11 +0100 Subject: [PATCH 2/2] Update router.go --- message/router.go | 1 - 1 file changed, 1 deletion(-) diff --git a/message/router.go b/message/router.go index baa873328..0ecf48c6d 100644 --- a/message/router.go +++ b/message/router.go @@ -826,7 +826,6 @@ func (h *handler) publishProducedMessages(producedMessages Messages, msgFields w })) if err := h.publisher.Publish(h.publishTopic, producedMessages...); err != nil { - // todo - how to deal with it better/transactional/retry? h.logger.Error("Cannot publish messages", err, msgFields.Add(watermill.LogFields{ "not_sent_message": fmt.Sprintf("%#v", producedMessages), }))