From c62b44a5eb4053ed1ae6007aeebd03316245f74b Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Fri, 13 Sep 2024 11:37:45 +0545 Subject: [PATCH] feat: dedup unsent notifications in history --- models/notifications.go | 18 ++++++-- schema/notifications.hcl | 14 ++++-- tests/notification_test.go | 87 ++++++++++++++++++++++++++++++++++++++ views/021_notification.sql | 37 +++++++++++++++- 4 files changed, 148 insertions(+), 8 deletions(-) create mode 100644 tests/notification_test.go diff --git a/models/notifications.go b/models/notifications.go index 9b307671..3532dbb5 100644 --- a/models/notifications.go +++ b/models/notifications.go @@ -49,20 +49,30 @@ func (n Notification) AsMap(removeFields ...string) map[string]any { } const ( - NotificationStatusError = "error" - NotificationStatusSent = "sent" - NotificationStatusSending = "sending" + NotificationStatusError = "error" + NotificationStatusSent = "sent" + NotificationStatusSending = "sending" + NotificationStatusSilenced = "silenced" + NotificationStatusRepeatInterval = "repeat-interval" ) type NotificationSendHistory struct { ID uuid.UUID `json:"id,omitempty" gorm:"default:generate_ulid()"` NotificationID uuid.UUID `json:"notification_id"` - Body string `json:"body,omitempty"` + Body *string `json:"body,omitempty"` Error *string `json:"error,omitempty"` DurationMillis int64 `json:"duration_millis,omitempty"` CreatedAt time.Time `json:"created_at" time_format:"postgres_timestamp"` Status string `json:"status,omitempty"` + // Notifications that were silenced or blocked by repeat intervals + // use this counter. + Count int `json:"count"` + + // Notifications that were silenced or blocked by repeat intervals + // use this as the first observed timestamp. + FirstObserved time.Time `json:"first_observed"` + // Name of the original event that caused this notification SourceEvent string `json:"source_event"` diff --git a/schema/notifications.hcl b/schema/notifications.hcl index 96ed279a..c21eb690 100644 --- a/schema/notifications.hcl +++ b/schema/notifications.hcl @@ -113,15 +113,23 @@ table "notification_send_history" { type = uuid } column "body" { - null = false + null = true # nullable for unsent notifications type = text } - column "status" { null = true type = text } - + column "count" { + null = false + default = 1 + type = integer + } + column "first_observed" { + null = false + type = timestamptz + default = sql("now()") + } column "source_event" { null = false type = text diff --git a/tests/notification_test.go b/tests/notification_test.go new file mode 100644 index 00000000..87f468e9 --- /dev/null +++ b/tests/notification_test.go @@ -0,0 +1,87 @@ +package tests + +import ( + "fmt" + "time" + + "github.com/flanksource/duty/models" + "github.com/google/uuid" + "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = ginkgo.Describe("unsent notification deduplication", ginkgo.Ordered, func() { + notification := models.Notification{ + Events: []string{"check.failed", "check.passed"}, + Source: models.SourceCRD, + } + + var ( + dummyResources = []uuid.UUID{uuid.New(), uuid.New(), uuid.New()} + statuses = []string{models.NotificationStatusSilenced, models.NotificationStatusRepeatInterval} + silenceWindow = time.Second * 2 + ) + + ginkgo.BeforeAll(func() { + err := DefaultContext.DB().Create(¬ification).Error + Expect(err).To(BeNil()) + }) + + for _, sourceEvent := range notification.Events { + for _, sendStatus := range statuses { + for i, dummyResource := range dummyResources { + ginkgo.It(fmt.Sprintf("Event[%s] Resource[%d] should save unsent notifications to history", sourceEvent, i+1), func() { + iteration := 10 + for j := 0; j < iteration; j++ { + query := "SELECT * FROM insert_unsent_notification_to_history(?, ?, ?, ?, ?)" + err := DefaultContext.DB().Exec(query, notification.ID, sourceEvent, dummyResource, sendStatus, silenceWindow).Error + Expect(err).To(BeNil()) + } + + var sentHistories []models.NotificationSendHistory + err := DefaultContext.DB().Model(&models.NotificationSendHistory{}). + Where("status = ?", sendStatus). + Where("resource_id = ?", dummyResource). + Where("source_event = ?", sourceEvent).Find(&sentHistories).Error + Expect(err).To(BeNil()) + Expect(len(sentHistories)).To(Equal(1)) + + sentHistory := sentHistories[0] + Expect(sentHistory.ResourceID).To(Equal(dummyResource)) + Expect(sentHistory.Status).To(Equal(sendStatus)) + Expect(sentHistory.Count).To(Equal(iteration)) + Expect(sentHistory.FirstObserved).To(BeTemporally("<", sentHistory.CreatedAt)) + }) + } + } + } + + ginkgo.It("should not dedup out of window", func() { + time.Sleep(silenceWindow) // wait for window to pass + + var ( + dummyResource = dummyResources[0] + sourceEvent = notification.Events[0] + sendStatus = models.NotificationStatusSilenced + ) + + query := "SELECT * FROM insert_unsent_notification_to_history(?, ?, ?, ?, ?)" + err := DefaultContext.DB().Exec(query, notification.ID, sourceEvent, dummyResource, models.NotificationStatusSilenced, silenceWindow).Error + Expect(err).To(BeNil()) + + var sentHistories []models.NotificationSendHistory + err = DefaultContext.DB().Model(&models.NotificationSendHistory{}). + Where("status = ?", sendStatus). + Where("resource_id = ?", dummyResource). + Where("source_event = ?", sourceEvent).Order("created_at DESC").Find(&sentHistories).Error + Expect(err).To(BeNil()) + Expect(len(sentHistories)).To(Equal(2), "Expected 2 histories for two different window") + + sentHistory := sentHistories[0] // The first one is the latest + + Expect(sentHistory.ResourceID).To(Equal(dummyResource)) + Expect(sentHistory.Status).To(Equal(models.NotificationStatusSilenced)) + Expect(sentHistory.Count).To(Equal(1)) + Expect(sentHistory.FirstObserved.Unix()).To(Equal(sentHistory.CreatedAt.Unix())) + }) +}) diff --git a/views/021_notification.sql b/views/021_notification.sql index e8432217..c34398fe 100644 --- a/views/021_notification.sql +++ b/views/021_notification.sql @@ -32,4 +32,39 @@ $$ LANGUAGE plpgsql; CREATE OR REPLACE TRIGGER reset_notification_error_before_update_trigger BEFORE UPDATE ON notifications FOR EACH ROW -EXECUTE PROCEDURE reset_notification_error_before_update(); \ No newline at end of file +EXECUTE PROCEDURE reset_notification_error_before_update(); + +--- A function to insert only those notifications that were unsent. +--- It deals with the deduplication of inserting the same notification again if it was silenced or blocked by repeatInterval. +CREATE OR REPLACE FUNCTION insert_unsent_notification_to_history( + p_notification_id UUID, + p_source_event TEXT, + p_resource_id UUID, + p_status TEXT, + p_window INTERVAL +) RETURNS VOID AS $$ +DECLARE + v_existing_id UUID; +BEGIN + IF p_status NOT IN ('silenced', 'repeat-interval') THEN + RAISE EXCEPTION 'Status must be silenced or repeat-interval'; + END IF; + + SELECT id INTO v_existing_id FROM notification_send_history + WHERE notification_id = p_notification_id + AND source_event = p_source_event + AND resource_id = p_resource_id + AND status = p_status + AND created_at > NOW() - p_window + ORDER BY created_at DESC + LIMIT 1; + + IF v_existing_id IS NOT NULL THEN + UPDATE notification_send_history SET count = count + 1, created_at = CURRENT_TIMESTAMP + WHERE id = v_existing_id; + ELSE + INSERT INTO notification_send_history (notification_id, status, source_event, resource_id) + VALUES (p_notification_id, p_status, p_source_event, p_resource_id); + END IF; +END; +$$ LANGUAGE plpgsql; \ No newline at end of file