Skip to content

Commit

Permalink
custom ispushed updater
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe committed Mar 4, 2024
1 parent 88944e7 commit 390a780
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 76 deletions.
18 changes: 18 additions & 0 deletions models/checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,15 @@ type CheckStatus struct {
IsPushed bool `json:"is_pushed,omitempty"`
}

func (s CheckStatus) UpdateIsPushed(db *gorm.DB, items []DBTable) error {
ids := lo.Map(items, func(a DBTable, _ int) []any {
c := any(a).(CheckStatus)
return []any{c.CheckID, c.Time}
})

return db.Model(&CheckStatus{}).Where("(check_id, time) IN ?", ids).Update("is_pushed", true).Error
}

func (s CheckStatus) GetUnpushed(db *gorm.DB) ([]DBTable, error) {
var items []CheckStatus
err := db.Select("check_statuses.*").
Expand Down Expand Up @@ -220,6 +229,15 @@ type CheckConfigRelationship struct {
DeletedAt *time.Time `json:"deleted_at,omitempty"`
}

func (s CheckConfigRelationship) UpdateIsPushed(db *gorm.DB, items []DBTable) error {
ids := lo.Map(items, func(a DBTable, _ int) []string {
c := any(a).(CheckConfigRelationship)
return []string{c.ConfigID.String(), c.CheckID.String(), c.CanaryID.String(), c.SelectorID}
})

return db.Model(&CheckConfigRelationship{}).Where("(config_id, check_id, canary_id, selector_id) IN ?", ids).Update("is_pushed", true).Error
}

func (c CheckConfigRelationship) GetUnpushed(db *gorm.DB) ([]DBTable, error) {
var items []CheckConfigRelationship
err := db.Select("check_config_relationships.*").
Expand Down
27 changes: 27 additions & 0 deletions models/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,15 @@ type ComponentRelationship struct {
DeletedAt *time.Time `json:"deleted_at,omitempty"`
}

func (s ComponentRelationship) UpdateIsPushed(db *gorm.DB, items []DBTable) error {
ids := lo.Map(items, func(a DBTable, _ int) []string {
c := any(a).(ComponentRelationship)
return []string{c.ComponentID.String(), c.RelationshipID.String(), c.SelectorID}
})

return db.Model(&ComponentRelationship{}).Where("(component_id, relationship_id, selector_id) IN ?", ids).Update("is_pushed", true).Error
}

func (cr ComponentRelationship) GetUnpushed(db *gorm.DB) ([]DBTable, error) {
var items []ComponentRelationship
err := db.Select("component_relationships.*").
Expand Down Expand Up @@ -484,6 +493,15 @@ type ConfigComponentRelationship struct {
DeletedAt *time.Time `json:"deleted_at,omitempty"`
}

func (s ConfigComponentRelationship) UpdateIsPushed(db *gorm.DB, items []DBTable) error {
ids := lo.Map(items, func(a DBTable, _ int) []string {
c := any(a).(ConfigComponentRelationship)
return []string{c.ComponentID.String(), c.ConfigID.String()}
})

return db.Model(&ConfigComponentRelationship{}).Where("(component_id, config_id) IN ?", ids).Update("is_pushed", true).Error
}

func (t ConfigComponentRelationship) GetUnpushed(db *gorm.DB) ([]DBTable, error) {
var items []ConfigComponentRelationship
err := db.Select("config_component_relationships.*").
Expand Down Expand Up @@ -521,6 +539,15 @@ type CheckComponentRelationship struct {
DeletedAt *time.Time `json:"deleted_at,omitempty"`
}

func (s CheckComponentRelationship) UpdateIsPushed(db *gorm.DB, items []DBTable) error {
ids := lo.Map(items, func(a DBTable, _ int) []string {
c := any(a).(CheckComponentRelationship)
return []string{c.ComponentID.String(), c.CheckID.String(), c.CanaryID.String(), c.SelectorID}
})

return db.Model(&CheckComponentRelationship{}).Where("(component_id, check_id, canary_id, selector_id) IN ?", ids).Update("is_pushed", true).Error
}

func (t CheckComponentRelationship) GetUnpushed(db *gorm.DB) ([]DBTable, error) {
var items []CheckComponentRelationship
err := db.Select("check_component_relationships.*").
Expand Down
9 changes: 9 additions & 0 deletions models/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,15 @@ type ConfigRelationship struct {
DeletedAt *time.Time `json:"deleted_at,omitempty"`
}

func (s ConfigRelationship) UpdateIsPushed(db *gorm.DB, items []DBTable) error {
ids := lo.Map(items, func(a DBTable, _ int) []string {
c := any(a).(ConfigRelationship)
return []string{c.RelatedID, c.ConfigID, c.SelectorID}
})

return db.Model(&ConfigRelationship{}).Where("(related_id, config_id, selector_id) IN ?", ids).Update("is_pushed", true).Error
}

func (t ConfigRelationship) GetUnpushed(db *gorm.DB) ([]DBTable, error) {
var items []ConfigRelationship
err := db.Select("config_relationships.*").
Expand Down
13 changes: 1 addition & 12 deletions upstream/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,19 +228,8 @@ func UpdateAgentLastReceived(ctx context.Context, id uuid.UUID) error {
}).Error
}

type dbTable interface {
PK() string
TableName() string
}

type pushableTable interface {
PK() string
TableName() string
GetUnpushed(db *gorm.DB) ([]models.DBTable, error)
}

// saveIndividuallyWithRetries saves the given records one by one and retries only on foreign key violation error.
func saveIndividuallyWithRetries[T dbTable](ctx context.Context, items []T, maxRetries int) error {
func saveIndividuallyWithRetries[T models.DBTable](ctx context.Context, items []T, maxRetries int) error {
var retries int
for {
var failed []T
Expand Down
78 changes: 15 additions & 63 deletions upstream/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,18 @@ import (
"github.com/flanksource/duty/context"
"github.com/flanksource/duty/models"
"github.com/samber/lo"
"gorm.io/gorm"
)

type pushableTable interface {
models.DBTable
GetUnpushed(db *gorm.DB) ([]models.DBTable, error)
}

type customIsPushedUpdater interface {
UpdateIsPushed(db *gorm.DB, items []models.DBTable) error
}

var reconciledTables = []pushableTable{
models.Topology{},
models.ConfigScraper{},
Expand Down Expand Up @@ -79,75 +89,17 @@ func reconcileTable(ctx context.Context, config UpstreamConfig, table pushableTa
if err := client.Push(ctx, NewPushData(items)); err != nil {
return 0, fmt.Errorf("failed to push %s to upstream: %w", table, err)
}
count += len(items)

switch table.TableName() {
case "check_statuses":
ids := lo.Map(items, func(a models.DBTable, _ int) []any {
c := any(a).(models.CheckStatus)
return []any{c.CheckID, c.Time}
})

if err := ctx.DB().Model(&models.CheckStatus{}).Where("(check_id, time) IN ?", ids).Update("is_pushed", true).Error; err != nil {
return 0, fmt.Errorf("failed to update is_pushed for check_statuses: %w", err)
}

case "component_relationships":
ids := lo.Map(items, func(a models.DBTable, _ int) []string {
c := any(a).(models.ComponentRelationship)
return []string{c.ComponentID.String(), c.RelationshipID.String(), c.SelectorID}
})

if err := ctx.DB().Model(&models.ComponentRelationship{}).Where("(component_id, relationship_id, selector_id) IN ?", ids).Update("is_pushed", true).Error; err != nil {
return 0, fmt.Errorf("failed to update is_pushed for component_relationships: %w", err)
}

case "config_component_relationships":
ids := lo.Map(items, func(a models.DBTable, _ int) []string {
c := any(a).(models.ConfigComponentRelationship)
return []string{c.ComponentID.String(), c.ConfigID.String()}
})

if err := ctx.DB().Model(&models.ConfigComponentRelationship{}).Where("(component_id, config_id) IN ?", ids).Update("is_pushed", true).Error; err != nil {
return 0, fmt.Errorf("failed to update is_pushed for config_component_relationships: %w", err)
}

case "config_relationships":
ids := lo.Map(items, func(a models.DBTable, _ int) []string {
c := any(a).(models.ConfigRelationship)
return []string{c.RelatedID, c.ConfigID, c.SelectorID}
})

if err := ctx.DB().Model(&models.ConfigRelationship{}).Where("(related_id, config_id, selector_id) IN ?", ids).Update("is_pushed", true).Error; err != nil {
return 0, fmt.Errorf("failed to update is_pushed for config_component_relationships: %w", err)
}

case "check_config_relationships":
ids := lo.Map(items, func(a models.DBTable, _ int) []string {
c := any(a).(models.CheckConfigRelationship)
return []string{c.ConfigID.String(), c.CheckID.String(), c.CanaryID.String(), c.SelectorID}
})

if err := ctx.DB().Model(&models.CheckConfigRelationship{}).Where("(config_id, check_id, canary_id, selector_id) IN ?", ids).Update("is_pushed", true).Error; err != nil {
return 0, fmt.Errorf("failed to update is_pushed for config_component_relationships: %w", err)
}

case "check_component_relationships":
ids := lo.Map(items, func(a models.DBTable, _ int) []string {
c := any(a).(models.CheckComponentRelationship)
return []string{c.ComponentID.String(), c.CheckID.String(), c.CanaryID.String(), c.SelectorID}
})

if err := ctx.DB().Model(&models.CheckComponentRelationship{}).Where("(component_id, check_id, canary_id, selector_id) IN ?", ids).Update("is_pushed", true).Error; err != nil {
return 0, fmt.Errorf("failed to update is_pushed for config_component_relationships: %w", err)
if c, ok := table.(customIsPushedUpdater); ok {
if err := c.UpdateIsPushed(ctx.DB(), items); err != nil {
return 0, fmt.Errorf("failed to update is_pushed for %s: %w", table, err)
}

default:
} else {
ids := lo.Map(items, func(a models.DBTable, _ int) string { return a.PK() })
if err := ctx.DB().Model(table).Where("id IN ?", ids).Update("is_pushed", true).Error; err != nil {
return 0, fmt.Errorf("failed to update is_pushed on %s: %w", table, err)
}
}

count += len(items)
}
}
2 changes: 1 addition & 1 deletion upstream/upstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type PushData struct {
Artifacts []models.Artifact `json:"artifacts,omitempty"`
}

func NewPushData[T dbTable](records []T) *PushData {
func NewPushData[T models.DBTable](records []T) *PushData {
var p PushData
if len(records) == 0 {
return &p
Expand Down

0 comments on commit 390a780

Please sign in to comment.