Skip to content

Commit

Permalink
feat: dedup unsent notifications in history
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe authored and moshloop committed Sep 13, 2024
1 parent 4b8f34c commit c62b44a
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 8 deletions.
18 changes: 14 additions & 4 deletions models/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,30 @@ func (n Notification) AsMap(removeFields ...string) map[string]any {
}

const (
NotificationStatusError = "error"
NotificationStatusSent = "sent"
NotificationStatusSending = "sending"
NotificationStatusError = "error"
NotificationStatusSent = "sent"
NotificationStatusSending = "sending"
NotificationStatusSilenced = "silenced"
NotificationStatusRepeatInterval = "repeat-interval"
)

type NotificationSendHistory struct {
ID uuid.UUID `json:"id,omitempty" gorm:"default:generate_ulid()"`
NotificationID uuid.UUID `json:"notification_id"`
Body string `json:"body,omitempty"`
Body *string `json:"body,omitempty"`
Error *string `json:"error,omitempty"`
DurationMillis int64 `json:"duration_millis,omitempty"`
CreatedAt time.Time `json:"created_at" time_format:"postgres_timestamp"`
Status string `json:"status,omitempty"`

// Notifications that were silenced or blocked by repeat intervals
// use this counter.
Count int `json:"count"`

// Notifications that were silenced or blocked by repeat intervals
// use this as the first observed timestamp.
FirstObserved time.Time `json:"first_observed"`

// Name of the original event that caused this notification
SourceEvent string `json:"source_event"`

Expand Down
14 changes: 11 additions & 3 deletions schema/notifications.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -113,15 +113,23 @@ table "notification_send_history" {
type = uuid
}
column "body" {
null = false
null = true # nullable for unsent notifications
type = text
}

column "status" {
null = true
type = text
}

column "count" {
null = false
default = 1
type = integer
}
column "first_observed" {
null = false
type = timestamptz
default = sql("now()")
}
column "source_event" {
null = false
type = text
Expand Down
87 changes: 87 additions & 0 deletions tests/notification_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package tests

import (
"fmt"
"time"

"github.com/flanksource/duty/models"
"github.com/google/uuid"
"github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

var _ = ginkgo.Describe("unsent notification deduplication", ginkgo.Ordered, func() {
notification := models.Notification{
Events: []string{"check.failed", "check.passed"},
Source: models.SourceCRD,
}

var (
dummyResources = []uuid.UUID{uuid.New(), uuid.New(), uuid.New()}
statuses = []string{models.NotificationStatusSilenced, models.NotificationStatusRepeatInterval}
silenceWindow = time.Second * 2
)

ginkgo.BeforeAll(func() {
err := DefaultContext.DB().Create(&notification).Error
Expect(err).To(BeNil())
})

for _, sourceEvent := range notification.Events {
for _, sendStatus := range statuses {
for i, dummyResource := range dummyResources {
ginkgo.It(fmt.Sprintf("Event[%s] Resource[%d] should save unsent notifications to history", sourceEvent, i+1), func() {
iteration := 10
for j := 0; j < iteration; j++ {
query := "SELECT * FROM insert_unsent_notification_to_history(?, ?, ?, ?, ?)"
err := DefaultContext.DB().Exec(query, notification.ID, sourceEvent, dummyResource, sendStatus, silenceWindow).Error
Expect(err).To(BeNil())
}

var sentHistories []models.NotificationSendHistory
err := DefaultContext.DB().Model(&models.NotificationSendHistory{}).
Where("status = ?", sendStatus).
Where("resource_id = ?", dummyResource).
Where("source_event = ?", sourceEvent).Find(&sentHistories).Error
Expect(err).To(BeNil())
Expect(len(sentHistories)).To(Equal(1))

sentHistory := sentHistories[0]
Expect(sentHistory.ResourceID).To(Equal(dummyResource))
Expect(sentHistory.Status).To(Equal(sendStatus))
Expect(sentHistory.Count).To(Equal(iteration))
Expect(sentHistory.FirstObserved).To(BeTemporally("<", sentHistory.CreatedAt))
})
}
}
}

ginkgo.It("should not dedup out of window", func() {
time.Sleep(silenceWindow) // wait for window to pass

var (
dummyResource = dummyResources[0]
sourceEvent = notification.Events[0]
sendStatus = models.NotificationStatusSilenced
)

query := "SELECT * FROM insert_unsent_notification_to_history(?, ?, ?, ?, ?)"
err := DefaultContext.DB().Exec(query, notification.ID, sourceEvent, dummyResource, models.NotificationStatusSilenced, silenceWindow).Error
Expect(err).To(BeNil())

var sentHistories []models.NotificationSendHistory
err = DefaultContext.DB().Model(&models.NotificationSendHistory{}).
Where("status = ?", sendStatus).
Where("resource_id = ?", dummyResource).
Where("source_event = ?", sourceEvent).Order("created_at DESC").Find(&sentHistories).Error
Expect(err).To(BeNil())
Expect(len(sentHistories)).To(Equal(2), "Expected 2 histories for two different window")

sentHistory := sentHistories[0] // The first one is the latest

Expect(sentHistory.ResourceID).To(Equal(dummyResource))
Expect(sentHistory.Status).To(Equal(models.NotificationStatusSilenced))
Expect(sentHistory.Count).To(Equal(1))
Expect(sentHistory.FirstObserved.Unix()).To(Equal(sentHistory.CreatedAt.Unix()))
})
})
37 changes: 36 additions & 1 deletion views/021_notification.sql
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,39 @@ $$ LANGUAGE plpgsql;
CREATE OR REPLACE TRIGGER reset_notification_error_before_update_trigger
BEFORE UPDATE ON notifications
FOR EACH ROW
EXECUTE PROCEDURE reset_notification_error_before_update();
EXECUTE PROCEDURE reset_notification_error_before_update();

--- A function to insert only those notifications that were unsent.
--- It deals with the deduplication of inserting the same notification again if it was silenced or blocked by repeatInterval.
CREATE OR REPLACE FUNCTION insert_unsent_notification_to_history(
p_notification_id UUID,
p_source_event TEXT,
p_resource_id UUID,
p_status TEXT,
p_window INTERVAL
) RETURNS VOID AS $$
DECLARE
v_existing_id UUID;
BEGIN
IF p_status NOT IN ('silenced', 'repeat-interval') THEN
RAISE EXCEPTION 'Status must be silenced or repeat-interval';
END IF;

SELECT id INTO v_existing_id FROM notification_send_history
WHERE notification_id = p_notification_id
AND source_event = p_source_event
AND resource_id = p_resource_id
AND status = p_status
AND created_at > NOW() - p_window
ORDER BY created_at DESC
LIMIT 1;

IF v_existing_id IS NOT NULL THEN
UPDATE notification_send_history SET count = count + 1, created_at = CURRENT_TIMESTAMP
WHERE id = v_existing_id;
ELSE
INSERT INTO notification_send_history (notification_id, status, source_event, resource_id)
VALUES (p_notification_id, p_status, p_source_event, p_resource_id);
END IF;
END;
$$ LANGUAGE plpgsql;

0 comments on commit c62b44a

Please sign in to comment.