Skip to content

Commit

Permalink
feat: use the same reconcileTable func
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe committed Mar 1, 2024
1 parent 0854e22 commit cd7f41c
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 96 deletions.
4 changes: 4 additions & 0 deletions models/checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ func (s CheckStatus) GetTime() (time.Time, error) {
return time.Parse(time.DateTime, s.Time)
}

func (s CheckStatus) PK() string {
return s.CheckID.String() + s.Time
}

func (CheckStatus) TableName() string {
return "check_statuses"
}
Expand Down
8 changes: 8 additions & 0 deletions models/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,10 @@ type ConfigChange struct {
IsPushed bool `json:"is_pushed,omitempty"`
}

func (c ConfigChange) PK() string {
return c.ID
}

func (c ConfigChange) TableName() string {
return "config_changes"
}
Expand Down Expand Up @@ -245,6 +249,10 @@ type ConfigAnalysis struct {
IsPushed bool `json:"is_pushed,omitempty"`
}

func (a ConfigAnalysis) PK() string {
return a.ID.String()
}

func (a ConfigAnalysis) TableName() string {
return "config_analysis"
}
Expand Down
8 changes: 4 additions & 4 deletions tests/upstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/flanksource/duty/upstream"
)

var _ = ginkgo.Describe("Config Changes & Analyses sync test", ginkgo.Ordered, func() {
var _ = ginkgo.Describe("Reconcile Test", ginkgo.Ordered, func() {
var upstreamCtx *context.Context
var echoCloser, drop func()
var upstreamConf upstream.UpstreamConfig
Expand Down Expand Up @@ -60,7 +60,7 @@ var _ = ginkgo.Describe("Config Changes & Analyses sync test", ginkgo.Ordered, f
})

ginkgo.It("should push config items first to satisfy foregin keys for changes & analyses", func() {
count, err := upstream.SyncIsPushedTable[models.ConfigItem](DefaultContext, upstreamConf, 100)
count, err := upstream.ReconcileTable[models.ConfigItem](DefaultContext, upstreamConf, 100)
Expect(err).To(BeNil())
Expect(count).To(Not(BeZero()))
})
Expand Down Expand Up @@ -121,7 +121,7 @@ var _ = ginkgo.Describe("Config Changes & Analyses sync test", ginkgo.Ordered, f
}
})

ginkgo.It("should push artifacts", func() {
ginkgo.It("should sync artifacts to upstream", func() {
var pushed int
err := DefaultContext.DB().Select("COUNT(*)").Where("is_pushed = true").Model(&models.Artifact{}).Scan(&pushed).Error
Expect(err).ToNot(HaveOccurred())
Expand All @@ -132,7 +132,7 @@ var _ = ginkgo.Describe("Config Changes & Analyses sync test", ginkgo.Ordered, f
Expect(err).ToNot(HaveOccurred())
Expect(artifacts).To(BeZero())

count, err := upstream.SyncIsPushedTable[models.Artifact](DefaultContext, upstreamConf, 10)
count, err := upstream.ReconcileTable[models.Artifact](DefaultContext, upstreamConf, 10)
Expect(err).ToNot(HaveOccurred())

err = upstreamCtx.DB().Select("COUNT(*)").Model(&models.Artifact{}).Scan(&artifacts).Error
Expand Down
126 changes: 34 additions & 92 deletions upstream/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,122 +7,64 @@ import (
"github.com/flanksource/duty/models"
"github.com/google/uuid"
"github.com/samber/lo"
"gorm.io/gorm"
)

// ReconcileTable pushes all unpushed items in a table to upstream.
func ReconcileTable[T dbTable](ctx context.Context, config UpstreamConfig, batchSize int) (int, error) {
return reconcileTable[T](ctx, config, nil, batchSize)
}

// SyncCheckStatuses pushes check statuses, that haven't already been pushed, to upstream.
func SyncCheckStatuses(ctx context.Context, config UpstreamConfig, batchSize int) (int, error) {
client := NewUpstreamClient(config)
count := 0
for {
var checkStatuses []models.CheckStatus
if err := ctx.DB().Select("check_statuses.*").
Joins("LEFT JOIN checks ON checks.id = check_statuses.check_id").
Where("checks.agent_id = ?", uuid.Nil).
Where("check_statuses.is_pushed IS FALSE").
Limit(batchSize).
Find(&checkStatuses).Error; err != nil {
return 0, fmt.Errorf("failed to fetch check_statuses: %w", err)
}

if len(checkStatuses) == 0 {
return count, nil
}

ctx.Tracef("pushing %d check_statuses to upstream", len(checkStatuses))
if err := client.Push(ctx, &PushData{CheckStatuses: checkStatuses}); err != nil {
return 0, fmt.Errorf("failed to push check_statuses to upstream: %w", err)
}

for i := range checkStatuses {
checkStatuses[i].IsPushed = true
}
fetcher := ctx.DB().Select("check_statuses.*").
Joins("LEFT JOIN checks ON checks.id = check_statuses.check_id").
Where("checks.agent_id = ?", uuid.Nil).
Where("check_statuses.is_pushed IS FALSE")

if err := ctx.DB().Save(&checkStatuses).Error; err != nil {
return 0, fmt.Errorf("failed to save check_statuses: %w", err)
}
count += len(checkStatuses)
}
return reconcileTable[models.CheckStatus](ctx, config, fetcher, batchSize)
}

// SyncConfigChanges pushes config changes, that haven't already been pushed, to upstream.
func SyncConfigChanges(ctx context.Context, config UpstreamConfig, batchSize int) (int, error) {
client := NewUpstreamClient(config)
count := 0
for {
var configChanges []models.ConfigChange
if err := ctx.DB().Select("config_changes.*").
Joins("LEFT JOIN config_items ON config_items.id = config_changes.config_id").
Where("config_items.agent_id = ?", uuid.Nil).
Where("config_changes.is_pushed IS FALSE").
Limit(batchSize).
Find(&configChanges).Error; err != nil {
return 0, fmt.Errorf("failed to fetch config_changes: %w", err)
}

if len(configChanges) == 0 {
return count, nil
}
fetcher := ctx.DB().Select("config_changes.*").
Joins("LEFT JOIN config_items ON config_items.id = config_changes.config_id").
Where("config_items.agent_id = ?", uuid.Nil).
Where("config_changes.is_pushed IS FALSE")

ctx.Tracef("pushing %d config_changes to upstream", len(configChanges))
if err := client.Push(ctx, &PushData{ConfigChanges: configChanges}); err != nil {
return 0, fmt.Errorf("failed to push config_changes to upstream: %w", err)
}

ids := lo.Map(configChanges, func(c models.ConfigChange, _ int) string { return c.ID })
if err := ctx.DB().Model(&models.ConfigChange{}).Where("id IN ?", ids).Update("is_pushed", true).Error; err != nil {
return 0, fmt.Errorf("failed to update is_pushed on config_changes: %w", err)
}

count += len(configChanges)
}
return reconcileTable[models.ConfigChange](ctx, config, fetcher, batchSize)
}

// SyncConfigAnalyses pushes config analyses, that haven't already been pushed, to upstream.
func SyncConfigAnalyses(ctx context.Context, config UpstreamConfig, batchSize int) (int, error) {
client := NewUpstreamClient(config)
count := 0
for {
var analyses []models.ConfigAnalysis
if err := ctx.DB().Select("config_analysis.*").
Joins("LEFT JOIN config_items ON config_items.id = config_analysis.config_id").
Where("config_items.agent_id = ?", uuid.Nil).
Where("config_analysis.is_pushed IS FALSE").
Limit(batchSize).
Find(&analyses).Error; err != nil {
return 0, fmt.Errorf("failed to fetch config_analysis: %w", err)
}
fetcher := ctx.DB().Select("config_analysis.*").
Joins("LEFT JOIN config_items ON config_items.id = config_analysis.config_id").
Where("config_items.agent_id = ?", uuid.Nil).
Where("config_analysis.is_pushed IS FALSE")

if len(analyses) == 0 {
return count, nil
}

ctx.Tracef("pushing %d config_analyses to upstream", len(analyses))
if err := client.Push(ctx, &PushData{ConfigAnalysis: analyses}); err != nil {
return 0, fmt.Errorf("failed to push config_analysis to upstream: %w", err)
}

ids := lo.Map(analyses, func(a models.ConfigAnalysis, _ int) string { return a.ID.String() })
if err := ctx.DB().Model(&models.ConfigAnalysis{}).Where("id IN ?", ids).Update("is_pushed", true).Error; err != nil {
return 0, fmt.Errorf("failed to update is_pushed on config_analysis: %w", err)
}

count += len(analyses)
}
return reconcileTable[models.ConfigAnalysis](ctx, config, fetcher, batchSize)
}

func SyncIsPushedTable[T dbTable](ctx context.Context, config UpstreamConfig, batchSize int) (int, error) {
// ReconcileTable pushes all unpushed items in a table to upstream.
func reconcileTable[T dbTable](ctx context.Context, config UpstreamConfig, fetcher *gorm.DB, batchSize int) (int, error) {
client := NewUpstreamClient(config)
var anon T
table := anon.TableName()

var count int
for {
var items []T
if err := ctx.DB().
Where("is_pushed IS FALSE").
Limit(batchSize).
Find(&items).Error; err != nil {
return 0, fmt.Errorf("failed to fetch unpushed items for table %s: %w", table, err)
if fetcher != nil {
if err := fetcher.Limit(batchSize).Find(&items).Error; err != nil {
return 0, fmt.Errorf("failed to fetch unpushed items for table %s: %w", table, err)
}
} else {
if err := ctx.DB().
Where("is_pushed IS FALSE").
Limit(batchSize).
Find(&items).Error; err != nil {
return 0, fmt.Errorf("failed to fetch unpushed items for table %s: %w", table, err)
}
}

if len(items) == 0 {
Expand Down

0 comments on commit cd7f41c

Please sign in to comment.