Skip to content

Commit

Permalink
feat: add is_pushed to relationship tables
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe committed Mar 4, 2024
1 parent 9f1c672 commit 66d7f04
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 6 deletions.
4 changes: 4 additions & 0 deletions models/checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ func (c *CheckConfigRelationship) Save(db *gorm.DB) error {
}).Create(c).Error
}

func (c CheckConfigRelationship) PK() string {
return c.ConfigID.String() + "," + c.CheckID.String() + "," + c.CanaryID.String() + "," + c.SelectorID
}

func (CheckConfigRelationship) TableName() string {
return "check_config_relationships"
}
22 changes: 17 additions & 5 deletions models/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,11 @@ type ComponentRelationship struct {
DeletedAt *time.Time `json:"deleted_at,omitempty"`
}

func (cr ComponentRelationship) TableName() string {
func (cr ComponentRelationship) PK() string {
return cr.ComponentID.String() + "," + cr.RelationshipID.String() + "," + cr.SelectorID
}

func (ComponentRelationship) TableName() string {
return "component_relationships"
}

Expand All @@ -462,6 +466,14 @@ type ConfigComponentRelationship struct {
DeletedAt *time.Time `json:"deleted_at,omitempty"`
}

func (t ConfigComponentRelationship) PK() string {
return t.ComponentID.String() + "," + t.ConfigID.String()
}

func (ConfigComponentRelationship) TableName() string {
return "config_component_relationships"
}

var ConfigID = func(c ConfigComponentRelationship, i int) string {
return c.ConfigID.String()
}
Expand All @@ -470,10 +482,6 @@ var ConfigSelectorID = func(c ConfigComponentRelationship, i int) string {
return c.SelectorID
}

func (cr ConfigComponentRelationship) TableName() string {
return "config_component_relationships"
}

type CheckComponentRelationship struct {
ComponentID uuid.UUID `json:"component_id,omitempty"`
CheckID uuid.UUID `json:"check_id,omitempty"`
Expand All @@ -491,6 +499,10 @@ func (c *CheckComponentRelationship) Save(db *gorm.DB) error {
}).Create(c).Error
}

func (c CheckComponentRelationship) PK() string {
return c.ComponentID.String() + "," + c.CheckID.String() + "," + c.CanaryID.String() + "," + c.SelectorID
}

func (CheckComponentRelationship) TableName() string {
return "check_component_relationships"
}
4 changes: 4 additions & 0 deletions models/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ type ConfigRelationship struct {
DeletedAt *time.Time `json:"deleted_at,omitempty"`
}

func (cr ConfigRelationship) PK() string {
return cr.RelatedID + "," + cr.ConfigID + cr.SelectorID
}

func (cr ConfigRelationship) TableName() string {
return "config_relationships"
}
Expand Down
28 changes: 27 additions & 1 deletion schema/components.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ table "component_relationships" {
null = true
type = text
}
column "is_pushed" {
null = false
default = false
type = bool
}

foreign_key "component_relationships_component_id_fkey" {
columns = [column.component_id]
Expand All @@ -130,6 +135,10 @@ table "component_relationships" {
on_delete = CASCADE
}

index "component_relationships_is_pushed_idx" {
columns = [column.is_pushed]
where = "is_pushed IS FALSE"
}
index "component_relationships_component_id_relationship_id_select_key" {
unique = true
columns = [column.component_id, column.relationship_id, column.selector_id]
Expand Down Expand Up @@ -386,6 +395,11 @@ table "check_component_relationships" {
null = true
type = text
}
column "is_pushed" {
null = false
default = false
type = bool
}
foreign_key "check_component_relationships_canary_id_fkey" {
columns = [column.canary_id]
ref_columns = [table.canaries.column.id]
Expand All @@ -408,7 +422,10 @@ table "check_component_relationships" {
unique = true
columns = [column.component_id, column.check_id, column.canary_id, column.selector_id]
}

index "check_component_relationships_is_pushed_idx" {
columns = [column.is_pushed]
where = "is_pushed IS FALSE"
}
index "idx_check_component_relationships_deleted_at" {
columns = [column.deleted_at]
}
Expand Down Expand Up @@ -443,6 +460,11 @@ table "config_component_relationships" {
null = true
type = text
}
column "is_pushed" {
null = false
default = false
type = bool
}
foreign_key "config_component_relationships_component_id_fkey" {
columns = [column.component_id]
ref_columns = [table.components.column.id]
Expand All @@ -462,4 +484,8 @@ table "config_component_relationships" {
index "idx_config_component_relationships_deleted_at" {
columns = [column.deleted_at]
}
index "config_component_relationships_is_pushed_idx" {
columns = [column.is_pushed]
where = "is_pushed IS FALSE"
}
}
18 changes: 18 additions & 0 deletions schema/config.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,11 @@ table "config_relationships" {
null = true
type = text
}
column "is_pushed" {
null = false
default = false
type = bool
}
foreign_key "config_relationships_config_id_fkey" {
columns = [column.config_id]
ref_columns = [table.config_items.column.id]
Expand All @@ -381,6 +386,10 @@ table "config_relationships" {
index "idx_config_relationships_deleted_at" {
columns = [column.deleted_at]
}
index "config_relationships_is_pushed_idx" {
columns = [column.is_pushed]
where = "is_pushed IS FALSE"
}
}

table "check_config_relationships" {
Expand Down Expand Up @@ -415,6 +424,11 @@ table "check_config_relationships" {
null = true
type = text
}
column "is_pushed" {
null = false
default = false
type = bool
}
foreign_key "check_config_relationships_canary_id_fkey" {
columns = [column.canary_id]
ref_columns = [table.canaries.column.id]
Expand All @@ -437,6 +451,10 @@ table "check_config_relationships" {
unique = true
columns = [column.config_id, column.check_id, column.canary_id, column.selector_id]
}
index "check_config_relationships_is_pushed_idx" {
columns = [column.is_pushed]
where = "is_pushed IS FALSE"
}
}

table "config_scrapers" {
Expand Down
128 changes: 128 additions & 0 deletions upstream/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,36 @@ func ReconcileAll(ctx context.Context, config UpstreamConfig, batchSize int) (in
count += c
}

if c, err := reconcileComponentRelationships(ctx, config, batchSize); err != nil {
return c, err
} else {
count += c
}

if c, err := reconcileConfigComponentRelationship(ctx, config, batchSize); err != nil {
return c, err
} else {
count += c
}

if c, err := reconcileCheckComponentRelationship(ctx, config, batchSize); err != nil {
return c, err
} else {
count += c
}

if c, err := reconcileConfigRelationship(ctx, config, batchSize); err != nil {
return c, err
} else {
count += c
}

if c, err := reconcileCheckConfigRelationship(ctx, config, batchSize); err != nil {
return c, err
} else {
count += c
}

return count, nil
}

Expand Down Expand Up @@ -108,6 +138,54 @@ func SyncConfigAnalyses(ctx context.Context, config UpstreamConfig, batchSize in
return reconcileTable[models.ConfigAnalysis](ctx, config, fetcher, batchSize)
}

func reconcileComponentRelationships(ctx context.Context, config UpstreamConfig, batchSize int) (int, error) {
fetcher := ctx.DB().Select("component_relationships.*").
Joins("LEFT JOIN components c ON component_relationships.component_id = c.id").
Joins("LEFT JOIN components rel ON component_relationships.relationship_id = rel.id").
Where("c.agent_id = ? AND rel.agent_id = ?", uuid.Nil, uuid.Nil).
Where("component_relationships.is_pushed IS FALSE")

return reconcileTable[models.ComponentRelationship](ctx, config, fetcher, batchSize)
}

func reconcileConfigComponentRelationship(ctx context.Context, config UpstreamConfig, batchSize int) (int, error) {
fetcher := ctx.DB().Select("config_component_relationships.*").
Joins("LEFT JOIN components c ON config_component_relationships.component_id = c.id").
Joins("LEFT JOIN config_items ci ON config_component_relationships.config_id = ci.id").
Where("c.agent_id = ? AND ci.agent_id = ?", uuid.Nil, uuid.Nil).
Where("config_component_relationships.is_pushed IS FALSE")

return reconcileTable[models.ConfigComponentRelationship](ctx, config, fetcher, batchSize)
}

func reconcileCheckComponentRelationship(ctx context.Context, config UpstreamConfig, batchSize int) (int, error) {
fetcher := ctx.DB().Select("check_component_relationships.*").
Joins("LEFT JOIN components c ON check_component_relationships.component_id = c.id").
Joins("LEFT JOIN canaries ON check_component_relationships.canary_id = canaries.id").
Where("c.agent_id = ? AND canaries.agent_id = ?", uuid.Nil, uuid.Nil).
Where("check_component_relationships.is_pushed IS FALSE")

return reconcileTable[models.CheckComponentRelationship](ctx, config, fetcher, batchSize)
}

func reconcileConfigRelationship(ctx context.Context, config UpstreamConfig, batchSize int) (int, error) {
fetcher := ctx.DB().Select("config_relationships.*").
Joins("LEFT JOIN config_items ci ON config_relationships.config_id = ci.id").
Where("ci.agent_id = ?", uuid.Nil, uuid.Nil).
Where("config_relationships.is_pushed IS FALSE")

return reconcileTable[models.ConfigRelationship](ctx, config, fetcher, batchSize)
}

func reconcileCheckConfigRelationship(ctx context.Context, config UpstreamConfig, batchSize int) (int, error) {
fetcher := ctx.DB().Select("check_config_relationships.*").
Joins("LEFT JOIN config_items ci ON check_config_relationships.config_id = ci.id").
Where("ci.agent_id = ?", uuid.Nil, uuid.Nil).
Where("check_config_relationships.is_pushed IS FALSE")

return reconcileTable[models.CheckConfigRelationship](ctx, config, fetcher, batchSize)
}

// ReconcileTable pushes all unpushed items in a table to upstream.
func reconcileTable[T dbTable](ctx context.Context, config UpstreamConfig, fetcher *gorm.DB, batchSize int) (int, error) {
client := NewUpstreamClient(config)
Expand Down Expand Up @@ -162,6 +240,56 @@ func reconcileTable[T dbTable](ctx context.Context, config UpstreamConfig, fetch
return 0, fmt.Errorf("failed to update is_pushed for check_statuses: %w", err)
}

case "component_relationships":
ids := lo.Map(items, func(a T, _ int) []string {
c := any(a).(models.ComponentRelationship)
return []string{c.ComponentID.String(), c.RelationshipID.String(), c.SelectorID}
})

if err := ctx.DB().Model(&models.ComponentRelationship{}).Where("(component_id, relationship_id, selector_id) IN ?", ids).Update("is_pushed", true).Error; err != nil {
return 0, fmt.Errorf("failed to update is_pushed for component_relationships: %w", err)
}

case "config_component_relationships":
ids := lo.Map(items, func(a T, _ int) []string {
c := any(a).(models.ConfigComponentRelationship)
return []string{c.ComponentID.String(), c.ConfigID.String()}
})

if err := ctx.DB().Model(&models.ConfigComponentRelationship{}).Where("(component_id, config_id) IN ?", ids).Update("is_pushed", true).Error; err != nil {
return 0, fmt.Errorf("failed to update is_pushed for config_component_relationships: %w", err)
}

case "config_relationships":
ids := lo.Map(items, func(a T, _ int) []string {
c := any(a).(models.ConfigRelationship)
return []string{c.RelatedID, c.ConfigID, c.SelectorID}
})

if err := ctx.DB().Model(&models.ConfigRelationship{}).Where("(related_id, config_id, selector_id) IN ?", ids).Update("is_pushed", true).Error; err != nil {
return 0, fmt.Errorf("failed to update is_pushed for config_component_relationships: %w", err)
}

case "check_config_relationships":
ids := lo.Map(items, func(a T, _ int) []string {
c := any(a).(models.CheckConfigRelationship)
return []string{c.ConfigID.String(), c.CheckID.String(), c.CanaryID.String(), c.SelectorID}
})

if err := ctx.DB().Model(&models.CheckConfigRelationship{}).Where("(config_id, check_id, canary_id, selector_id) IN ?", ids).Update("is_pushed", true).Error; err != nil {
return 0, fmt.Errorf("failed to update is_pushed for config_component_relationships: %w", err)
}

case "check_component_relationships":
ids := lo.Map(items, func(a T, _ int) []string {
c := any(a).(models.CheckComponentRelationship)
return []string{c.ComponentID.String(), c.CheckID.String(), c.CanaryID.String(), c.SelectorID}
})

if err := ctx.DB().Model(&models.CheckComponentRelationship{}).Where("(component_id, check_id, canary_id, selector_id) IN ?", ids).Update("is_pushed", true).Error; err != nil {
return 0, fmt.Errorf("failed to update is_pushed for config_component_relationships: %w", err)
}

default:
ids := lo.Map(items, func(a T, _ int) string { return a.PK() })
if err := ctx.DB().Model(anon).Where("id IN ?", ids).Update("is_pushed", true).Error; err != nil {
Expand Down

0 comments on commit 66d7f04

Please sign in to comment.