Skip to content

Commit

Permalink
fix: fk error with config items & components (#854)
Browse files Browse the repository at this point in the history
* fix: fk error with config items & components

* test: add similar test for config_items as well
  • Loading branch information
adityathebe authored Jun 20, 2024
1 parent ccbeabc commit c94f795
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 10 deletions.
22 changes: 20 additions & 2 deletions models/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,29 @@ type Component struct {
}

func (t Component) UpdateParentsIsPushed(db *gorm.DB, items []DBTable) error {
parentIDs := lo.Map(items, func(item DBTable, _ int) string {
componentWithTopology := lo.Filter(items, func(item DBTable, _ int) bool { return item.(Component).TopologyID != nil })
topologyParents := lo.Map(componentWithTopology, func(item DBTable, _ int) string {
return item.(Component).TopologyID.String()
})

return db.Model(&Topology{}).Where("id IN ?", parentIDs).Update("is_pushed", false).Error
if len(topologyParents) > 0 {
if err := db.Model(&Topology{}).Where("id IN ?", topologyParents).Update("is_pushed", false).Error; err != nil {
return err
}
}

// Components can also have another components as parent
componentWithComponentParent := lo.Filter(items, func(item DBTable, _ int) bool { return item.(Component).ParentId != nil })
componentParents := lo.Map(componentWithComponentParent, func(item DBTable, _ int) string {
return item.(Component).ParentId.String()
})
if len(componentParents) > 0 {
if err := db.Model(&Component{}).Where("id IN ?", componentParents).Update("is_pushed", false).Error; err != nil {
return err
}
}

return nil
}

func (t Component) GetUnpushed(db *gorm.DB) ([]DBTable, error) {
Expand Down
24 changes: 21 additions & 3 deletions models/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,29 @@ type ConfigItem struct {
}

func (t ConfigItem) UpdateParentsIsPushed(db *gorm.DB, items []DBTable) error {
parentIDs := lo.Map(items, func(item DBTable, _ int) string {
return lo.FromPtr(item.(ConfigItem).ScraperID)
configWithScraper := lo.Filter(items, func(item DBTable, _ int) bool { return item.(ConfigItem).ScraperID != nil })
scraperParents := lo.Map(configWithScraper, func(item DBTable, _ int) string {
return *item.(ConfigItem).ScraperID
})

return db.Model(&ConfigScraper{}).Where("id IN ?", parentIDs).Update("is_pushed", false).Error
if len(scraperParents) > 0 {
if err := db.Model(&ConfigScraper{}).Where("id IN ?", scraperParents).Update("is_pushed", false).Error; err != nil {
return err
}
}

// config items can also have another config items as parent
configWithConfigParent := lo.Filter(items, func(item DBTable, _ int) bool { return item.(ConfigItem).ParentID != nil })
configParents := lo.Map(configWithConfigParent, func(item DBTable, _ int) string {
return item.(ConfigItem).ParentID.String()
})
if len(configParents) > 0 {
if err := db.Model(&ConfigItem{}).Where("id IN ?", configParents).Update("is_pushed", false).Error; err != nil {
return err
}
}

return nil
}

func (t ConfigItem) GetUnpushed(db *gorm.DB) ([]DBTable, error) {
Expand Down
113 changes: 109 additions & 4 deletions tests/upstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/flanksource/commons/utils"
"github.com/flanksource/duty/context"
"github.com/flanksource/duty/models"
"github.com/flanksource/duty/tests/fixtures/dummy"
"github.com/flanksource/duty/tests/setup"
"github.com/flanksource/duty/upstream"
)
Expand Down Expand Up @@ -65,11 +66,59 @@ var _ = ginkgo.Describe("Reconcile Test", ginkgo.Ordered, func() {
}
})

ginkgo.It("should push config items first to satisfy foreign keys for changes & analyses", func() {
count, fkFailed, err := upstream.ReconcileSome(DefaultContext, upstreamConf, 100, "config_items")
Expect(err).To(BeNil())
ginkgo.It("should sync config items to upstream & deal with fk issue", func() {
{
var pushed int
err := DefaultContext.DB().Select("COUNT(*)").Where("is_pushed = true").Model(&models.ConfigItem{}).Scan(&pushed).Error
Expect(err).ToNot(HaveOccurred())
Expect(pushed).To(BeZero())
}

{
// Falsely, mark LogisticsAPIDeployment config as pushed. It's a parent config to other config items
// so we expect reconciliation to fail.
tx := DefaultContext.DB().Model(&models.ConfigItem{}).Where("id = ?", dummy.LogisticsAPIDeployment.ID).Update("is_pushed", true)
Expect(tx.Error).ToNot(HaveOccurred())
Expect(tx.RowsAffected).To(Equal(int64(1)))
}

var totalConfigsPushed int
err := upstreamCtx.DB().Select("COUNT(*)").Model(&models.ConfigItem{}).Scan(&totalConfigsPushed).Error
Expect(err).ToNot(HaveOccurred())
Expect(totalConfigsPushed).To(BeZero(), "upstream should have 0 config items as we haven't reconciled yet")

count, fkFailed, err := upstream.ReconcileSome(DefaultContext, upstreamConf, 1000, "config_items")
Expect(err).To(HaveOccurred())
Expect(fkFailed).To(Equal(2), "logistics replicaset & pod should fail to be synced")
Expect(count).To(Not(BeZero()))
Expect(fkFailed).To(BeZero())

err = upstreamCtx.DB().Select("COUNT(*)").Model(&models.ConfigItem{}).Scan(&totalConfigsPushed).Error
Expect(err).ToNot(HaveOccurred())
Expect(totalConfigsPushed).To(Equal(count))

var parentIsPushed bool
err = DefaultContext.DB().Model(&models.ConfigItem{}).Where("id = ?", dummy.LogisticsAPIDeployment.ID).Select("is_pushed").Scan(&parentIsPushed).Error
Expect(err).ToNot(HaveOccurred())
Expect(parentIsPushed).To(BeFalse(), "after the failed reconciliation, we expect the parent config to be marked as not pushed")

{
var pending int
err := DefaultContext.DB().Select("COUNT(*)").Where("is_pushed = false").Model(&models.ConfigItem{}).Scan(&pending).Error
Expect(err).ToNot(HaveOccurred())
Expect(pending).To(BeNumerically(">=", fkFailed))
}

{
count, fkFailed, err := upstream.ReconcileSome(DefaultContext, upstreamConf, 1000, "config_items")
Expect(err).To(BeNil())
Expect(fkFailed).To(BeZero())
Expect(count).To(Not(BeZero()))

var pending int
err = DefaultContext.DB().Select("COUNT(*)").Where("is_pushed = false").Model(&models.ConfigItem{}).Scan(&pending).Error
Expect(err).ToNot(HaveOccurred())
Expect(pending).To(BeZero())
}
})

ginkgo.It("should sync config_changes to upstream", func() {
Expand Down Expand Up @@ -101,6 +150,62 @@ var _ = ginkgo.Describe("Reconcile Test", ginkgo.Ordered, func() {
}
})

ginkgo.It("should sync components to upstream & deal with fk issue", func() {
{
var pushed int
err := DefaultContext.DB().Select("COUNT(*)").Where("is_pushed = true").Model(&models.Component{}).Scan(&pushed).Error
Expect(err).ToNot(HaveOccurred())
Expect(pushed).To(BeZero())
}

{
// Falsely, mark Logistic component as pushed. It's a parent component to other components
// so we expect reconciliation to fail.
tx := DefaultContext.DB().Model(&models.Component{}).Where("id = ?", dummy.Logistics.ID).Update("is_pushed", true)
Expect(tx.Error).ToNot(HaveOccurred())
Expect(tx.RowsAffected).To(Equal(int64(1)))
}

var totalComponents int
err := upstreamCtx.DB().Select("COUNT(*)").Model(&models.Component{}).Scan(&totalComponents).Error
Expect(err).ToNot(HaveOccurred())
Expect(totalComponents).To(BeZero(), "upstream should have 0 components as we haven't reconciled yet")

count, fkFailed, err := upstream.ReconcileSome(DefaultContext, upstreamConf, 1000, "components")
Expect(err).To(HaveOccurred())

Expect(fkFailed).To(Equal(4), "logistics api, ui, database & worker should fail to be synced")
Expect(count).To(Not(BeZero()))

err = upstreamCtx.DB().Select("COUNT(*)").Model(&models.Component{}).Scan(&totalComponents).Error
Expect(err).ToNot(HaveOccurred())
Expect(totalComponents).To(Equal(count))

var parentIsPushed bool
err = DefaultContext.DB().Model(&models.Component{}).Where("id = ?", dummy.Logistics.ID).Select("is_pushed").Scan(&parentIsPushed).Error
Expect(err).ToNot(HaveOccurred())
Expect(parentIsPushed).To(BeFalse(), "after the failed reconciliation, we expect the parent component to be marked as not pushed")

{
var pending int
err := DefaultContext.DB().Select("COUNT(*)").Where("is_pushed = false").Model(&models.Component{}).Scan(&pending).Error
Expect(err).ToNot(HaveOccurred())
Expect(pending).To(BeNumerically(">=", fkFailed))
}

{
count, fkFailed, err := upstream.ReconcileSome(DefaultContext, upstreamConf, 1000, "components")
Expect(err).To(BeNil())
Expect(fkFailed).To(BeZero())
Expect(count).To(Not(BeZero()))

var pending int
err = DefaultContext.DB().Select("COUNT(*)").Where("is_pushed = false").Model(&models.Component{}).Scan(&pending).Error
Expect(err).ToNot(HaveOccurred())
Expect(pending).To(BeZero())
}
})

ginkgo.It("should sync config_analyses to upstream", func() {
{
var pushed int
Expand Down
4 changes: 3 additions & 1 deletion upstream/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,9 @@ func saveIndividuallyWithRetries[T models.DBTable](ctx context.Context, items []
}

if retries > maxRetries {
return fmt.Errorf("failed to save %d items after %d retries", len(failed), maxRetries)
return api.Errorf(api.ECONFLICT, "foreign key error").
WithData(PushFKError{IDs: lo.Map(failed, func(i T, _ int) string { return i.PK() })}).
WithDebugInfo("foreign key error for %d items after %d retries", len(failed), retries)
}

items = failed
Expand Down

0 comments on commit c94f795

Please sign in to comment.