Skip to content

Commit

Permalink
chore: inline postq
Browse files Browse the repository at this point in the history
  • Loading branch information
moshloop committed Aug 14, 2024
1 parent 3e4b71d commit c2a770b
Show file tree
Hide file tree
Showing 12 changed files with 549 additions and 25 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ require (
github.com/flanksource/commons v1.26.1
github.com/flanksource/gomplate/v3 v3.24.22
github.com/flanksource/kommons v0.31.4
github.com/flanksource/postq v0.1.3
github.com/google/cel-go v0.20.1
github.com/google/go-cmp v0.6.0
github.com/google/gops v0.3.28
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -792,8 +792,6 @@ github.com/flanksource/kommons v0.31.4 h1:zksAgYjZuwPgS8XTejDIWEYB0nPSU1i3Jxcavm
github.com/flanksource/kommons v0.31.4/go.mod h1:70BPMzjTvejsqRyVyAm/ZCeZ176toCvauaZjU03svnE=
github.com/flanksource/kubectl-neat v1.0.4 h1:t5/9CqgE84oEtB0KitgJ2+WIeLfD+RhXSxYrqb4X8yI=
github.com/flanksource/kubectl-neat v1.0.4/go.mod h1:Un/Voyh3cmiZNKQrW/TkAl28nAA7vwnwDGVjRErKjOw=
github.com/flanksource/postq v0.1.3 h1:eTslG04hwxAvntZv8gIUsXKQPLGeLiRPNkZC+kQdL7c=
github.com/flanksource/postq v0.1.3/go.mod h1:AAuaPRhpqxvyF7JPs8X1NMsJVenh80ldpJPDVgWvFf8=
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
Expand Down
1 change: 0 additions & 1 deletion hack/migrate/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ require (
github.com/flanksource/is-healthy v1.0.26 // indirect
github.com/flanksource/kommons v0.31.4 // indirect
github.com/flanksource/kubectl-neat v1.0.4 // indirect
github.com/flanksource/postq v0.1.3 // indirect
github.com/ghodss/yaml v1.0.0 // indirect
github.com/go-errors/errors v1.4.2 // indirect
github.com/go-logr/logr v1.4.1 // indirect
Expand Down
2 changes: 0 additions & 2 deletions hack/migrate/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -784,8 +784,6 @@ github.com/flanksource/kommons v0.31.4 h1:zksAgYjZuwPgS8XTejDIWEYB0nPSU1i3Jxcavm
github.com/flanksource/kommons v0.31.4/go.mod h1:70BPMzjTvejsqRyVyAm/ZCeZ176toCvauaZjU03svnE=
github.com/flanksource/kubectl-neat v1.0.4 h1:t5/9CqgE84oEtB0KitgJ2+WIeLfD+RhXSxYrqb4X8yI=
github.com/flanksource/kubectl-neat v1.0.4/go.mod h1:Un/Voyh3cmiZNKQrW/TkAl28nAA7vwnwDGVjRErKjOw=
github.com/flanksource/postq v0.1.3 h1:eTslG04hwxAvntZv8gIUsXKQPLGeLiRPNkZC+kQdL7c=
github.com/flanksource/postq v0.1.3/go.mod h1:AAuaPRhpqxvyF7JPs8X1NMsJVenh80ldpJPDVgWvFf8=
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
Expand Down
46 changes: 27 additions & 19 deletions models/event_queue.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package models

import (
"context"
"time"

"github.com/flanksource/duty/types"
"github.com/flanksource/postq"
"github.com/google/uuid"
"gorm.io/gorm"
)

// Event represents the event queue table.

type Event struct {
ID uuid.UUID `gorm:"default:generate_ulid()"`
Name string `json:"name"`
Expand All @@ -19,38 +22,43 @@ type Event struct {
Priority int `json:"priority"`
}

func (t Event) ToPostQEvent() postq.Event {
return postq.Event{
ID: t.ID,
Name: t.Name,
Error: t.Error,
Attempts: t.Attempts,
LastAttempt: t.LastAttempt,
Properties: t.Properties,
CreatedAt: t.CreatedAt,
}
}

// We are using the term `Event` as it represents an event in the
// event_queue table, but the table is named event_queue
// to signify it's usage as a queue
func (Event) TableName() string {
return "event_queue"
}

func (e Event) PK() string {
return e.ID.String()
func (t *Event) SetError(err string) {
t.Error = &err
}

type Events []Event

func (events Events) ToPostQEvents() postq.Events {
var output []postq.Event
// Recreate creates the given failed events in batches after updating the
// attempts count.
func (events Events) Recreate(ctx context.Context, tx *gorm.DB) error {
if len(events) == 0 {
return nil
}

var batch Events
for _, event := range events {
output = append(output, event.ToPostQEvent())
batch = append(batch, Event{
Name: event.Name,
Properties: event.Properties,
Error: event.Error,
Attempts: event.Attempts + 1,
LastAttempt: event.LastAttempt,
Priority: event.Priority,
})
}

return output
return tx.CreateInBatches(batch, 100).Error
}

func (e Event) PK() string {
return e.ID.String()
}

type EventQueueSummary struct {
Expand Down
81 changes: 81 additions & 0 deletions postq/async_consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package postq

import (
"container/ring"
gocontext "context"
"fmt"

"github.com/flanksource/duty/context"
"github.com/flanksource/duty/models"
)

// AsyncEventHandlerFunc processes multiple events and returns the failed ones
type AsyncEventHandlerFunc func(context.Context, models.Events) models.Events

type AsyncEventConsumer struct {
eventLog *ring.Ring

// Name of the events in the push queue to watch for.
WatchEvents []string

// Number of events to be fetched and processed at a time.
BatchSize int

// An async event handler that consumes events.
Consumer AsyncEventHandlerFunc

// ConsumerOption is the configuration for the PGConsumer.
ConsumerOption *ConsumerOption

// EventFetcherOption contains configuration on how the events should be fetched.
EventFetcherOption *EventFetcherOption
}

// RecordEvents will record all the events fetched by the consumer in a ring buffer.
func (t *AsyncEventConsumer) RecordEvents(size int) {
t.eventLog = ring.New(size)
}

func (t AsyncEventConsumer) GetRecords() ([]models.Event, error) {
if t.eventLog == nil {
return nil, fmt.Errorf("event log is not initialized")
}

return getRecords(t.eventLog), nil
}

func (t *AsyncEventConsumer) Handle(ctx context.Context) (int, error) {
ctx = ctx.WithName("postq").Fast()
tx := ctx.DB().Begin()
defer tx.Rollback() //nolint:errcheck

events, err := fetchEvents(ctx, tx, t.WatchEvents, t.BatchSize, t.EventFetcherOption)
if err != nil {
return 0, fmt.Errorf("error fetching events: %w", err)
}

if t.eventLog != nil {
for _, event := range events {
t.eventLog.Value = event
t.eventLog = t.eventLog.Next()
}
}
c := ctx.Wrap(gocontext.Background())
failedEvents := t.Consumer(c, events)
if err := failedEvents.Recreate(ctx, tx); err != nil {
ctx.Debugf("error saving event attempt updates to event_queue: %v\n", err)
}

return len(events), tx.Commit().Error
}

func (t AsyncEventConsumer) EventConsumer() (*PGConsumer, error) {
return NewPGConsumer(t.Handle, t.ConsumerOption)
}

// AsyncHandler converts the given user defined handler into a async event handler.
func AsyncHandler(fn func(ctx context.Context, e models.Events) models.Events) AsyncEventHandlerFunc {
return func(ctx context.Context, e models.Events) models.Events {
return fn(ctx, e)
}
}
87 changes: 87 additions & 0 deletions postq/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package postq

import (
"strings"

"github.com/flanksource/duty/context"
"github.com/flanksource/duty/models"
"github.com/lib/pq"
"github.com/samber/oops"
"gorm.io/gorm"
)

type EventFetcherOption struct {
// MaxAttempts is the number of times an event is attempted to process
// default: 3
MaxAttempts int

// BaseDelay is the base delay between retries
// default: 60 seconds
BaseDelay int

// Exponent is the exponent of the base delay
// default: 5 (along with baseDelay = 60, the retries are 1, 6, 31, 156 (in minutes))
Exponent int
}

// fetchEvents fetches given watch events from the `event_queue` table.
func fetchEvents(ctx context.Context, tx *gorm.DB, watchEvents []string, batchSize int, opts *EventFetcherOption) ([]models.Event, error) {
if batchSize == 0 {
batchSize = 1
}

const selectEventsQuery = `
DELETE FROM event_queue
WHERE id IN (
SELECT id FROM event_queue
WHERE
attempts <= @MaxAttempts AND
name = ANY(@Events) AND
(last_attempt IS NULL OR last_attempt <= NOW() - INTERVAL '1 SECOND' * @BaseDelay * POWER(attempts, @Exponent))
ORDER BY priority DESC, created_at ASC
FOR UPDATE SKIP LOCKED
LIMIT @BatchSize
)
RETURNING *
`

type EventArgs struct {
Events pq.StringArray
BatchSize int
MaxAttempts int
BaseDelay int
Exponent int
}

args := EventArgs{
Events: watchEvents,
BatchSize: batchSize,
MaxAttempts: 3,
BaseDelay: 60,
Exponent: 5,
}

if opts != nil {
if opts.MaxAttempts > 0 {
args.MaxAttempts = opts.MaxAttempts
}

if opts.BaseDelay > 0 {
args.BaseDelay = opts.BaseDelay
}

if opts.Exponent > 0 {
args.Exponent = opts.Exponent
}
}
var events []models.Event

if err := tx.Raw(selectEventsQuery, args).Scan(&events).Error; err != nil {
return nil, oops.Tags("db").Wrap(err)
}

if len(events) > 0 {
ctx.Tracef("queue=%s fetched=%d", strings.Join(watchEvents, ","), len(events))
}
return events, nil
}
57 changes: 57 additions & 0 deletions postq/pg/notify.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package pg

import (
gocontext "context"
"fmt"
"time"

"github.com/flanksource/duty/context"
"github.com/sethvargo/go-retry"
)

// Defaults ...
var (
DBReconnectMaxDuration = time.Minute * 5
DBReconnectBackoffBaseDuration = time.Second
)

// Listen listens to postgres notifications.
// On failure, it'll keep retrying with backoff
func Listen(ctx context.Context, channel string, pgNotify chan<- string) {
var listen = func(ctx context.Context, pgNotify chan<- string) error {
conn, err := ctx.Pool().Acquire(ctx)
if err != nil {
return fmt.Errorf("error acquiring database connection: %v", err)
}
defer conn.Release()

_, err = conn.Exec(ctx, fmt.Sprintf("LISTEN %s", channel))
if err != nil {
return fmt.Errorf("error listening to database notifications: %v", err)
}

for {
n, err := conn.Conn().WaitForNotification(ctx)
if err != nil {
return fmt.Errorf("error listening to database notifications: %v", err)
}

pgNotify <- n.Payload
}
}

// retry on failure.
for {
backoff := retry.WithMaxDuration(DBReconnectMaxDuration, retry.NewExponential(DBReconnectBackoffBaseDuration))
err := retry.Do(ctx, backoff, func(retryContext gocontext.Context) error {
ctx := retryContext.(context.Context)
if err := listen(ctx, pgNotify); err != nil {
return retry.RetryableError(err)
}

return nil
})

fmt.Printf("failed to connect to database: %v\n", err)
}
}
50 changes: 50 additions & 0 deletions postq/pg/router.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package pg

import (
"strings"

"github.com/flanksource/duty/context"
)

// notifyRouter distributes the pgNotify event to multiple channels
// based on the payload.
type notifyRouter struct {
registry map[string]chan string
}

func NewNotifyRouter() *notifyRouter {
return &notifyRouter{
registry: make(map[string]chan string),
}
}

// RegisterRoutes creates a single channel for the given routes and returns it.
func (t *notifyRouter) RegisterRoutes(routes ...string) <-chan string {
pgNotifyChannel := make(chan string)
for _, we := range routes {
t.registry[we] = pgNotifyChannel
}

return pgNotifyChannel
}

func (t *notifyRouter) Run(ctx context.Context, channel string) {
eventQueueNotifyChannel := make(chan string)
go Listen(ctx, channel, eventQueueNotifyChannel)

for payload := range eventQueueNotifyChannel {
if _, ok := t.registry[payload]; !ok || payload == "" {
continue
}

// The original payload is expected to be in the form of
// <route> <...optional payload>
fields := strings.Fields(payload)
route := fields[0]
derivedPayload := strings.Join(fields[1:], " ")

if ch, ok := t.registry[route]; ok {
ch <- derivedPayload
}
}
}
Loading

0 comments on commit c2a770b

Please sign in to comment.