Skip to content

Commit

Permalink
feat: do not store notification events to job history. Keep track of …
Browse files Browse the repository at this point in the history
…any filter errors in the notification table itself. (#570)

* chore: always save notification to job history

* chore: only log to job history if there's a change

* feat: use notifications.error to avoid processing any failed
notifications

[skip ci]

* chore: delete pgnotify in utils

[skip ci]

* refactor: create a new expression package to evaluation CEL expressions.

[skip ci]

* chore: cleanup

[skip ci]

* chore: bump duty

* chore: refactor events/notifications.go

* refactor: addNotificationEvents
  • Loading branch information
adityathebe authored Sep 27, 2023
1 parent 9a660cc commit 72549a3
Show file tree
Hide file tree
Showing 15 changed files with 544 additions and 634 deletions.
25 changes: 25 additions & 0 deletions cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@ import (
"github.com/flanksource/incident-commander/events"
"github.com/flanksource/incident-commander/jobs"
"github.com/flanksource/incident-commander/logs"
"github.com/flanksource/incident-commander/notification"
"github.com/flanksource/incident-commander/playbook"
"github.com/flanksource/incident-commander/rbac"
"github.com/flanksource/incident-commander/responder"
"github.com/flanksource/incident-commander/snapshot"
"github.com/flanksource/incident-commander/teams"
"github.com/flanksource/incident-commander/upstream"
"github.com/flanksource/incident-commander/utils"
)
Expand Down Expand Up @@ -258,6 +261,8 @@ var Serve = &cobra.Command{
UpstreamPush: api.UpstreamConf,
})

go tableUpdatesHandler(api.DefaultContext)

go playbook.StartPlaybookRunConsumer(api.DefaultContext)

go playbook.ListenPlaybookPGNotify(api.DefaultContext)
Expand Down Expand Up @@ -335,3 +340,23 @@ func ServerCache(next echo.HandlerFunc) echo.HandlerFunc {
return next(c)
}
}

// tableUpdatesHandler handles all "table_activity" pg notifications.
func tableUpdatesHandler(ctx api.Context) {
notifyRouter := events.NewPgNotifyRouter()
go notifyRouter.Run(ctx, "table_activity")

notificationUpdateCh := notifyRouter.RegisterRoutes("notifications")
teamsUpdateChan := notifyRouter.RegisterRoutes("teams")

for {
select {
case id := <-notificationUpdateCh:
notification.PurgeCache(id)

case id := <-teamsUpdateChan:
responder.PurgeCache(id)
teams.PurgeCache(id)
}
}
}
4 changes: 4 additions & 0 deletions db/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,7 @@ func PersistNotificationFromCRD(obj *v1.Notification) error {
func DeleteNotification(id string) error {
return Gorm.Delete(&models.Notification{}, "id = ?", id).Error
}

func UpdateNotificationError(id string, err string) error {
return Gorm.Model(&models.Notification{}).Where("id = ?", id).Update("error", err).Error
}
14 changes: 3 additions & 11 deletions events/event_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,6 @@ type (
//
// These events are generated by the database in response to updates on some of the tables.
const (
EventTeamUpdate = "team.update"
EventTeamDelete = "team.delete"

EventCheckPassed = "check.passed"
EventCheckFailed = "check.failed"

Expand All @@ -46,9 +43,6 @@ const (
EventComponentStatusWarning = "component.status.warning"
EventComponentStatusError = "component.status.error"

EventNotificationUpdate = "notification.update"
EventNotificationDelete = "notification.delete"

EventPlaybookSpecApprovalUpdated = "playbook.spec.approval.updated"

EventPlaybookApprovalInserted = "playbook.approval.inserted"
Expand Down Expand Up @@ -95,18 +89,16 @@ type Config struct {
func StartConsumers(ctx api.Context, config Config) {
// We listen to all PG Notifications on one channel and distribute it to other consumers
// based on the events.
notifyRouter := newPgNotifyRouter()
notifyRouter := NewPgNotifyRouter()
go notifyRouter.Run(ctx, eventQueueUpdateChannel)

uniqEvents := make(map[string]struct{})
allSyncHandlers := []SyncEventConsumer{
NewTeamConsumerSync(),
NewCheckConsumerSync(),
NewComponentConsumerSync(),
NewResponderConsumerSync(),
NewCommentConsumerSync(),
NewNotificationSaveConsumerSync(),
NewNotificationUpdatesConsumerSync(),
NewPlaybookApprovalConsumerSync(),
NewPlaybookApprovalSpecUpdatedConsumerSync(),
}
Expand All @@ -120,7 +112,7 @@ func StartConsumers(ctx api.Context, config Config) {
uniqEvents[event] = struct{}{}
}

pgNotifyChannel := notifyRouter.RegisterRoutes(allSyncHandlers[i].watchEvents)
pgNotifyChannel := notifyRouter.RegisterRoutes(allSyncHandlers[i].watchEvents...)
go allSyncHandlers[i].EventConsumer().Listen(ctx, pgNotifyChannel)
}

Expand All @@ -141,7 +133,7 @@ func StartConsumers(ctx api.Context, config Config) {
uniqEvents[event] = struct{}{}
}

pgNotifyChannel := notifyRouter.RegisterRoutes(asyncConsumers[i].watchEvents)
pgNotifyChannel := notifyRouter.RegisterRoutes(asyncConsumers[i].watchEvents...)
go asyncConsumers[i].EventConsumer().Listen(ctx, pgNotifyChannel)
}
}
Expand Down
Loading

0 comments on commit 72549a3

Please sign in to comment.