From d04d339058bc6b388bdfaad1e051bf2e0045d3b3 Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Tue, 17 Sep 2024 10:55:17 +0545 Subject: [PATCH] feat: notification delay & event_queue delay --- connection/zz_generated.deepcopy.go | 1 - models/event_queue.go | 1 + models/notifications.go | 1 + postq/event.go | 1 + schema/notifications.hcl | 9 +++++++-- schema/system.hcl | 5 +++++ shell/zz_generated.deepcopy.go | 1 - types/zz_generated.deepcopy.go | 6 ++++-- 8 files changed, 19 insertions(+), 6 deletions(-) diff --git a/connection/zz_generated.deepcopy.go b/connection/zz_generated.deepcopy.go index 745eac2e..d45d5f27 100644 --- a/connection/zz_generated.deepcopy.go +++ b/connection/zz_generated.deepcopy.go @@ -1,5 +1,4 @@ //go:build !ignore_autogenerated -// +build !ignore_autogenerated // Code generated by controller-gen. DO NOT EDIT. diff --git a/models/event_queue.go b/models/event_queue.go index 035a7c94..18d69ea3 100644 --- a/models/event_queue.go +++ b/models/event_queue.go @@ -16,6 +16,7 @@ type Event struct { Name string `json:"name"` CreatedAt time.Time `json:"created_at"` Properties types.JSONStringMap `json:"properties"` + Delay *time.Duration `json:"delay,omitempty"` Error *string `json:"error,omitempty"` Attempts int `json:"attempts"` LastAttempt *time.Time `json:"last_attempt"` diff --git a/models/notifications.go b/models/notifications.go index 8596b257..c412b8e8 100644 --- a/models/notifications.go +++ b/models/notifications.go @@ -16,6 +16,7 @@ type Notification struct { Namespace string `json:"namespace,omitempty"` Events pq.StringArray `json:"events" gorm:"type:[]text"` Title string `json:"title,omitempty"` + WaitFor *time.Duration `json:"wait_for,omitempty"` Template string `json:"template,omitempty"` Filter string `json:"filter,omitempty"` PersonID *uuid.UUID `json:"person_id,omitempty"` diff --git a/postq/event.go b/postq/event.go index 19edb737..5ae9135e 100644 --- a/postq/event.go +++ b/postq/event.go @@ -35,6 +35,7 @@ func fetchEvents(ctx context.Context, tx *gorm.DB, watchEvents []string, batchSi WHERE id IN ( SELECT id FROM event_queue WHERE + (delay IS NULL OR created_at + (delay * INTERVAL '1 second' / 1000000000) <= NOW()) AND attempts <= @MaxAttempts AND name = ANY(@Events) AND (last_attempt IS NULL OR last_attempt <= NOW() - INTERVAL '1 SECOND' * @BaseDelay * POWER(attempts, @Exponent)) diff --git a/schema/notifications.hcl b/schema/notifications.hcl index fb3021bc..474e0f18 100644 --- a/schema/notifications.hcl +++ b/schema/notifications.hcl @@ -6,8 +6,8 @@ table "notifications" { default = sql("generate_ulid()") } column "name" { - null = false - type = text + null = false + type = text default = sql("generate_ulid()") # temporary default value to make the migration possible. we can remove this later. } column "namespace" { @@ -54,6 +54,11 @@ table "notifications" { null = true type = text } + column "wait_for" { + null = true + type = bigint + comment = "duration in nanoseconds" + } column "group_by" { null = true type = sql("text[]") diff --git a/schema/system.hcl b/schema/system.hcl index 25272c72..69381ec4 100644 --- a/schema/system.hcl +++ b/schema/system.hcl @@ -59,6 +59,11 @@ table "event_queue" { null = true type = text } + column "delay" { + null = true + type = bigint + comment = "wait for this duration (nanoseconds) before consuming" + } column "created_at" { null = false type = timestamptz diff --git a/shell/zz_generated.deepcopy.go b/shell/zz_generated.deepcopy.go index 14ee4a8c..f7479ad4 100644 --- a/shell/zz_generated.deepcopy.go +++ b/shell/zz_generated.deepcopy.go @@ -1,5 +1,4 @@ //go:build !ignore_autogenerated -// +build !ignore_autogenerated // Code generated by controller-gen. DO NOT EDIT. diff --git a/types/zz_generated.deepcopy.go b/types/zz_generated.deepcopy.go index eac6d0d3..6a69e995 100644 --- a/types/zz_generated.deepcopy.go +++ b/types/zz_generated.deepcopy.go @@ -373,7 +373,8 @@ func (in *Summary) DeepCopyInto(out *Summary) { if val == nil { (*out)[key] = nil } else { - in, out := &val, &outVal + inVal := (*in)[key] + in, out := &inVal, &outVal *out = make(map[string]int, len(*in)) for key, val := range *in { (*out)[key] = val @@ -390,7 +391,8 @@ func (in *Summary) DeepCopyInto(out *Summary) { if val == nil { (*out)[key] = nil } else { - in, out := &val, &outVal + inVal := (*in)[key] + in, out := &inVal, &outVal *out = make(map[string]int, len(*in)) for key, val := range *in { (*out)[key] = val