diff --git a/.gitignore b/.gitignore index 47c683cf..5a79fa32 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ .bin *.out.json ginkgo.report +.vscode \ No newline at end of file diff --git a/context/properties.go b/context/properties.go index 8a3d37b1..a9dd6e1b 100644 --- a/context/properties.go +++ b/context/properties.go @@ -34,10 +34,9 @@ func (p Properties) Duration(key string, def time.Duration) time.Duration { } else if dur, err := time.ParseDuration(d); err != nil { logger.Warnf("property[%s] invalid duration %s", key, d) return def - } else if err == nil { + } else { return dur } - return def } func (p Properties) Int(key string, def int) int { @@ -46,10 +45,9 @@ func (p Properties) Int(key string, def int) int { } else if i, err := strconv.Atoi(d); err != nil { logger.Warnf("property[%s] invalid int %s", key, d) return def - } else if err == nil { + } else { return i } - return def } func (p Properties) Off(key string) bool { @@ -84,7 +82,11 @@ func (k Context) Properties() Properties { return props } -func SetLocalProperty(ctx Context, property, value string) { +func SetLocalProperty(property, value string) { + if Local == nil { + Local = make(map[string]string) + } + Local[property] = value } diff --git a/models/artifacts.go b/models/artifacts.go index 0e987a77..48622841 100644 --- a/models/artifacts.go +++ b/models/artifacts.go @@ -4,6 +4,8 @@ import ( "time" "github.com/google/uuid" + "github.com/samber/lo" + "gorm.io/gorm" ) // Artifact represents the artifacts table @@ -25,3 +27,17 @@ type Artifact struct { DeletedAt *time.Time `json:"deleted_at,omitempty" yaml:"deleted_at,omitempty" time_format:"postgres_timestamp"` ExpiresAt *time.Time `json:"expires_at,omitempty" yaml:"expires_at,omitempty" time_format:"postgres_timestamp"` } + +func (t Artifact) TableName() string { + return "artifacts" +} + +func (t Artifact) PK() string { + return t.ID.String() +} + +func (t Artifact) GetUnpushed(db *gorm.DB) ([]DBTable, error) { + var items []Artifact + err := db.Where("is_pushed IS FALSE").Find(&items).Error + return lo.Map(items, func(i Artifact, _ int) DBTable { return i }), err +} diff --git a/models/canary.go b/models/canary.go index 02f47fae..17378532 100644 --- a/models/canary.go +++ b/models/canary.go @@ -5,6 +5,8 @@ import ( "github.com/flanksource/duty/types" "github.com/google/uuid" + "github.com/samber/lo" + "gorm.io/gorm" ) type Canary struct { @@ -22,10 +24,20 @@ type Canary struct { DeletedAt *time.Time `json:"deleted_at,omitempty" yaml:"deleted_at,omitempty" time_format:"postgres_timestamp"` } +func (t Canary) GetUnpushed(db *gorm.DB) ([]DBTable, error) { + var items []Canary + err := db.Where("is_pushed IS FALSE").Find(&items).Error + return lo.Map(items, func(i Canary, _ int) DBTable { return i }), err +} + func (c Canary) GetCheckID(checkName string) string { return c.Checks[checkName] } +func (c Canary) PK() string { + return c.ID.String() +} + func (c Canary) TableName() string { return "canaries" } diff --git a/models/checks.go b/models/checks.go index 173b13da..318fcaf8 100644 --- a/models/checks.go +++ b/models/checks.go @@ -6,6 +6,7 @@ import ( "github.com/flanksource/duty/types" "github.com/google/uuid" + "github.com/samber/lo" "gorm.io/gorm" "gorm.io/gorm/clause" ) @@ -59,6 +60,16 @@ type Check struct { TotalRuns int `json:"totalRuns,omitempty" gorm:"-"` } +func (t Check) GetUnpushed(db *gorm.DB) ([]DBTable, error) { + var items []Check + err := db.Where("is_pushed IS FALSE").Find(&items).Error + return lo.Map(items, func(i Check, _ int) DBTable { 
+		return i
+	}), err
+}
+
+func (c Check) PK() string {
+	return c.ID.String()
+}
+
 func (c Check) TableName() string {
 	return "checks"
 }
@@ -112,6 +123,29 @@ type CheckStatus struct {
 	IsPushed bool `json:"is_pushed,omitempty"`
 }
 
+func (s CheckStatus) UpdateIsPushed(db *gorm.DB, items []DBTable) error {
+	ids := lo.Map(items, func(a DBTable, _ int) []any {
+		c := any(a).(CheckStatus)
+		return []any{c.CheckID, c.Time}
+	})
+
+	return db.Model(&CheckStatus{}).Where("(check_id, time) IN ?", ids).Update("is_pushed", true).Error
+}
+
+func (s CheckStatus) GetUnpushed(db *gorm.DB) ([]DBTable, error) {
+	var items []CheckStatus
+	err := db.Select("check_statuses.*").
+		Joins("LEFT JOIN checks ON checks.id = check_statuses.check_id").
+		Where("checks.agent_id = ?", uuid.Nil).
+		Where("check_statuses.is_pushed IS FALSE").
+		Find(&items).Error
+	return lo.Map(items, func(i CheckStatus, _ int) DBTable { return i }), err
+}
+
+func (s CheckStatus) PK() string {
+	return s.CheckID.String() + "," + s.Time
+}
+
 func (s CheckStatus) GetTime() (time.Time, error) {
 	return time.Parse(time.DateTime, s.Time)
 }
@@ -195,6 +229,25 @@ type CheckConfigRelationship struct {
 	DeletedAt *time.Time `json:"deleted_at,omitempty"`
 }
 
+func (s CheckConfigRelationship) UpdateIsPushed(db *gorm.DB, items []DBTable) error {
+	ids := lo.Map(items, func(a DBTable, _ int) []string {
+		c := any(a).(CheckConfigRelationship)
+		return []string{c.ConfigID.String(), c.CheckID.String(), c.CanaryID.String(), c.SelectorID}
+	})
+
+	return db.Model(&CheckConfigRelationship{}).Where("(config_id, check_id, canary_id, selector_id) IN ?", ids).Update("is_pushed", true).Error
+}
+
+func (c CheckConfigRelationship) GetUnpushed(db *gorm.DB) ([]DBTable, error) {
+	var items []CheckConfigRelationship
+	err := db.Select("check_config_relationships.*").
+		Joins("LEFT JOIN config_items ci ON check_config_relationships.config_id = ci.id").
+		Where("ci.agent_id = ?", uuid.Nil).
+		Where("check_config_relationships.is_pushed IS FALSE").
+ Find(&items).Error + return lo.Map(items, func(i CheckConfigRelationship, _ int) DBTable { return i }), err +} + func (c *CheckConfigRelationship) Save(db *gorm.DB) error { return db.Clauses(clause.OnConflict{ Columns: []clause.Column{{Name: "canary_id"}, {Name: "check_id"}, {Name: "config_id"}, {Name: "selector_id"}}, @@ -202,6 +255,10 @@ func (c *CheckConfigRelationship) Save(db *gorm.DB) error { }).Create(c).Error } +func (c CheckConfigRelationship) PK() string { + return c.ConfigID.String() + "," + c.CheckID.String() + "," + c.CanaryID.String() + "," + c.SelectorID +} + func (CheckConfigRelationship) TableName() string { return "check_config_relationships" } diff --git a/models/common.go b/models/common.go index 4957ef28..93c1f8a8 100644 --- a/models/common.go +++ b/models/common.go @@ -26,3 +26,8 @@ func asMap(t any, removeFields ...string) map[string]any { return m } + +type DBTable interface { + PK() string + TableName() string +} diff --git a/models/components.go b/models/components.go index ba68b771..5be67388 100644 --- a/models/components.go +++ b/models/components.go @@ -12,6 +12,7 @@ import ( "github.com/flanksource/commons/logger" "github.com/flanksource/duty/types" "github.com/google/uuid" + "github.com/samber/lo" "gorm.io/gorm" "gorm.io/gorm/clause" "gorm.io/gorm/schema" @@ -72,6 +73,12 @@ type Component struct { NodeProcessed bool `json:"-" gorm:"-"` } +func (t Component) GetUnpushed(db *gorm.DB) ([]DBTable, error) { + var items []Component + err := db.Where("is_pushed IS FALSE").Find(&items).Error + return lo.Map(items, func(i Component, _ int) DBTable { return i }), err +} + func (c Component) PK() string { return c.ID.String() } @@ -449,7 +456,31 @@ type ComponentRelationship struct { DeletedAt *time.Time `json:"deleted_at,omitempty"` } -func (cr ComponentRelationship) TableName() string { +func (s ComponentRelationship) UpdateIsPushed(db *gorm.DB, items []DBTable) error { + ids := lo.Map(items, func(a DBTable, _ int) []string { + c := any(a).(ComponentRelationship) + return []string{c.ComponentID.String(), c.RelationshipID.String(), c.SelectorID} + }) + + return db.Model(&ComponentRelationship{}).Where("(component_id, relationship_id, selector_id) IN ?", ids).Update("is_pushed", true).Error +} + +func (cr ComponentRelationship) GetUnpushed(db *gorm.DB) ([]DBTable, error) { + var items []ComponentRelationship + err := db.Select("component_relationships.*"). + Joins("LEFT JOIN components c ON component_relationships.component_id = c.id"). + Joins("LEFT JOIN components rel ON component_relationships.relationship_id = rel.id"). + Where("c.agent_id = ? AND rel.agent_id = ?", uuid.Nil, uuid.Nil). + Where("component_relationships.is_pushed IS FALSE"). 
+ Find(&items).Error + return lo.Map(items, func(i ComponentRelationship, _ int) DBTable { return i }), err +} + +func (cr ComponentRelationship) PK() string { + return cr.ComponentID.String() + "," + cr.RelationshipID.String() + "," + cr.SelectorID +} + +func (ComponentRelationship) TableName() string { return "component_relationships" } @@ -462,6 +493,34 @@ type ConfigComponentRelationship struct { DeletedAt *time.Time `json:"deleted_at,omitempty"` } +func (s ConfigComponentRelationship) UpdateIsPushed(db *gorm.DB, items []DBTable) error { + ids := lo.Map(items, func(a DBTable, _ int) []string { + c := any(a).(ConfigComponentRelationship) + return []string{c.ComponentID.String(), c.ConfigID.String()} + }) + + return db.Model(&ConfigComponentRelationship{}).Where("(component_id, config_id) IN ?", ids).Update("is_pushed", true).Error +} + +func (t ConfigComponentRelationship) GetUnpushed(db *gorm.DB) ([]DBTable, error) { + var items []ConfigComponentRelationship + err := db.Select("config_component_relationships.*"). + Joins("LEFT JOIN components c ON config_component_relationships.component_id = c.id"). + Joins("LEFT JOIN config_items ci ON config_component_relationships.config_id = ci.id"). + Where("c.agent_id = ? AND ci.agent_id = ?", uuid.Nil, uuid.Nil). + Where("config_component_relationships.is_pushed IS FALSE"). + Find(&items).Error + return lo.Map(items, func(i ConfigComponentRelationship, _ int) DBTable { return i }), err +} + +func (t ConfigComponentRelationship) PK() string { + return t.ComponentID.String() + "," + t.ConfigID.String() +} + +func (ConfigComponentRelationship) TableName() string { + return "config_component_relationships" +} + var ConfigID = func(c ConfigComponentRelationship, i int) string { return c.ConfigID.String() } @@ -470,10 +529,6 @@ var ConfigSelectorID = func(c ConfigComponentRelationship, i int) string { return c.SelectorID } -func (cr ConfigComponentRelationship) TableName() string { - return "config_component_relationships" -} - type CheckComponentRelationship struct { ComponentID uuid.UUID `json:"component_id,omitempty"` CheckID uuid.UUID `json:"check_id,omitempty"` @@ -484,6 +539,26 @@ type CheckComponentRelationship struct { DeletedAt *time.Time `json:"deleted_at,omitempty"` } +func (s CheckComponentRelationship) UpdateIsPushed(db *gorm.DB, items []DBTable) error { + ids := lo.Map(items, func(a DBTable, _ int) []string { + c := any(a).(CheckComponentRelationship) + return []string{c.ComponentID.String(), c.CheckID.String(), c.CanaryID.String(), c.SelectorID} + }) + + return db.Model(&CheckComponentRelationship{}).Where("(component_id, check_id, canary_id, selector_id) IN ?", ids).Update("is_pushed", true).Error +} + +func (t CheckComponentRelationship) GetUnpushed(db *gorm.DB) ([]DBTable, error) { + var items []CheckComponentRelationship + err := db.Select("check_component_relationships.*"). + Joins("LEFT JOIN components c ON check_component_relationships.component_id = c.id"). + Joins("LEFT JOIN canaries ON check_component_relationships.canary_id = canaries.id"). + Where("c.agent_id = ? AND canaries.agent_id = ?", uuid.Nil, uuid.Nil). + Where("check_component_relationships.is_pushed IS FALSE"). 
+		Find(&items).Error
+	return lo.Map(items, func(i CheckComponentRelationship, _ int) DBTable { return i }), err
+}
+
 func (c *CheckComponentRelationship) Save(db *gorm.DB) error {
 	return db.Clauses(clause.OnConflict{
 		Columns:   []clause.Column{{Name: "canary_id"}, {Name: "check_id"}, {Name: "component_id"}, {Name: "selector_id"}},
@@ -491,6 +566,10 @@ func (c *CheckComponentRelationship) Save(db *gorm.DB) error {
 	}).Create(c).Error
 }
 
+func (c CheckComponentRelationship) PK() string {
+	return c.ComponentID.String() + "," + c.CheckID.String() + "," + c.CanaryID.String() + "," + c.SelectorID
+}
+
 func (CheckComponentRelationship) TableName() string {
 	return "check_component_relationships"
 }
diff --git a/models/config.go b/models/config.go
index d8bc517a..2506ec5e 100644
--- a/models/config.go
+++ b/models/config.go
@@ -10,6 +10,7 @@ import (
 	"github.com/flanksource/duty/types"
 	"github.com/google/uuid"
 	"github.com/lib/pq"
+	"github.com/samber/lo"
 	"gorm.io/gorm"
 	"gorm.io/gorm/clause"
 )
@@ -93,6 +94,12 @@ type ConfigItem struct {
 	DeleteReason string `json:"delete_reason,omitempty"`
 }
 
+func (t ConfigItem) GetUnpushed(db *gorm.DB) ([]DBTable, error) {
+	var items []ConfigItem
+	err := db.Where("is_pushed IS FALSE").Find(&items).Error
+	return lo.Map(items, func(i ConfigItem, _ int) DBTable { return i }), err
+}
+
 func (t ConfigItem) PK() string {
 	return t.ID.String()
 }
@@ -147,6 +154,16 @@ type ConfigScraper struct {
 	DeletedAt *time.Time `json:"deleted_at,omitempty"`
 }
 
+func (t ConfigScraper) GetUnpushed(db *gorm.DB) ([]DBTable, error) {
+	var items []ConfigScraper
+	err := db.Where("is_pushed IS FALSE").Find(&items).Error
+	return lo.Map(items, func(i ConfigScraper, _ int) DBTable { return i }), err
+}
+
+func (c ConfigScraper) PK() string {
+	return c.ID.String()
+}
+
 func (c ConfigScraper) TableName() string {
 	return "config_scrapers"
 }
@@ -173,6 +190,29 @@ type ConfigRelationship struct {
 	DeletedAt *time.Time `json:"deleted_at,omitempty"`
 }
 
+func (s ConfigRelationship) UpdateIsPushed(db *gorm.DB, items []DBTable) error {
+	ids := lo.Map(items, func(a DBTable, _ int) []string {
+		c := any(a).(ConfigRelationship)
+		return []string{c.RelatedID, c.ConfigID, c.SelectorID}
+	})
+
+	return db.Model(&ConfigRelationship{}).Where("(related_id, config_id, selector_id) IN ?", ids).Update("is_pushed", true).Error
+}
+
+func (t ConfigRelationship) GetUnpushed(db *gorm.DB) ([]DBTable, error) {
+	var items []ConfigRelationship
+	err := db.Select("config_relationships.*").
+		Joins("LEFT JOIN config_items ci ON config_relationships.config_id = ci.id").
+		Where("ci.agent_id = ?", uuid.Nil).
+		Where("config_relationships.is_pushed IS FALSE").
+		Find(&items).Error
+	return lo.Map(items, func(i ConfigRelationship, _ int) DBTable { return i }), err
+}
+
+func (cr ConfigRelationship) PK() string {
+	return cr.RelatedID + "," + cr.ConfigID + "," + cr.SelectorID
+}
+
 func (cr ConfigRelationship) TableName() string {
 	return "config_relationships"
 }
@@ -196,6 +236,19 @@ type ConfigChange struct {
 	IsPushed bool `json:"is_pushed,omitempty"`
 }
 
+func (t ConfigChange) GetUnpushed(db *gorm.DB) ([]DBTable, error) {
+	var items []ConfigChange
+	err := db.Select("config_changes.*").
+		Joins("LEFT JOIN config_items ON config_items.id = config_changes.config_id").
+		Where("config_items.agent_id = ?", uuid.Nil).
+ Where("config_changes.is_pushed IS FALSE").Find(&items).Error + return lo.Map(items, func(i ConfigChange, _ int) DBTable { return i }), err +} + +func (c ConfigChange) PK() string { + return c.ID +} + func (c ConfigChange) TableName() string { return "config_changes" } @@ -242,6 +295,20 @@ type ConfigAnalysis struct { IsPushed bool `json:"is_pushed,omitempty"` } +func (ConfigAnalysis) GetUnpushed(db *gorm.DB) ([]DBTable, error) { + var items []ConfigAnalysis + err := db.Select("config_analysis.*"). + Joins("LEFT JOIN config_items ON config_items.id = config_analysis.config_id"). + Where("config_items.agent_id = ?", uuid.Nil). + Where("config_analysis.is_pushed IS FALSE"). + Find(&items).Error + return lo.Map(items, func(i ConfigAnalysis, _ int) DBTable { return i }), err +} + +func (a ConfigAnalysis) PK() string { + return a.ID.String() +} + func (a ConfigAnalysis) TableName() string { return "config_analysis" } diff --git a/models/topology.go b/models/topology.go index 6fc6a331..5c91ce28 100644 --- a/models/topology.go +++ b/models/topology.go @@ -5,6 +5,8 @@ import ( "github.com/flanksource/duty/types" "github.com/google/uuid" + "github.com/samber/lo" + "gorm.io/gorm" ) // Topology represents the topologies database table @@ -23,6 +25,16 @@ type Topology struct { DeletedAt *time.Time `json:"deleted_at,omitempty" time_format:"postgres_timestamp"` } +func (t Topology) GetUnpushed(db *gorm.DB) ([]DBTable, error) { + var items []Topology + err := db.Where("is_pushed IS FALSE").Find(&items).Error + return lo.Map(items, func(i Topology, _ int) DBTable { return i }), err +} + +func (t Topology) PK() string { + return t.ID.String() +} + func (Topology) TableName() string { return "topologies" } diff --git a/schema/checks.hcl b/schema/checks.hcl index f0426e07..a2d01fd3 100644 --- a/schema/checks.hcl +++ b/schema/checks.hcl @@ -34,6 +34,11 @@ table "canaries" { null = true type = text } + column "is_pushed" { + null = false + default = false + type = bool + } column "created_at" { null = true type = timestamptz @@ -71,6 +76,10 @@ table "canaries" { unique = true columns = [column.agent_id, column.name, column.namespace, column.source] } + index "canaries_is_pushed_idx" { + columns = [column.is_pushed] + where = "is_pushed IS FALSE" + } } table "check_statuses" { @@ -196,6 +205,11 @@ table "checks" { null = true type = text } + column "is_pushed" { + null = false + default = false + type = bool + } column "transformed" { null = true type = boolean @@ -254,6 +268,10 @@ table "checks" { index "checks_canary_id_transformed_idx" { columns = [column.canary_id, column.transformed] } + index "checks_is_pushed_idx" { + columns = [column.is_pushed] + where = "is_pushed IS FALSE" + } index "idx_checks_agent" { columns = [column.agent_id] diff --git a/schema/components.hcl b/schema/components.hcl index 02f6e3a9..04152af8 100644 --- a/schema/components.hcl +++ b/schema/components.hcl @@ -41,6 +41,11 @@ table "topologies" { type = timestamptz default = sql("now()") } + column "is_pushed" { + null = false + default = false + type = bool + } column "schedule" { null = true type = text @@ -72,6 +77,10 @@ table "topologies" { unique = true columns = [column.agent_id, column.name, column.namespace] } + index "topologies_is_pushed_idx" { + columns = [column.is_pushed] + where = "is_pushed IS FALSE" + } } @@ -107,6 +116,11 @@ table "component_relationships" { null = true type = text } + column "is_pushed" { + null = false + default = false + type = bool + } foreign_key "component_relationships_component_id_fkey" { 
columns = [column.component_id] @@ -121,6 +135,10 @@ table "component_relationships" { on_delete = CASCADE } + index "component_relationships_is_pushed_idx" { + columns = [column.is_pushed] + where = "is_pushed IS FALSE" + } index "component_relationships_component_id_relationship_id_select_key" { unique = true columns = [column.component_id, column.relationship_id, column.selector_id] @@ -184,6 +202,11 @@ table "components" { type = boolean default = false } + column "is_pushed" { + null = false + default = false + type = bool + } column "status" { null = false type = text @@ -322,6 +345,10 @@ table "components" { unique = true columns = [column.topology_id, column.type, column.name, column.parent_id] } + index "components_is_pushed_idx" { + columns = [column.is_pushed] + where = "is_pushed IS FALSE" + } index "idx_components_deleted_at" { columns = [column.deleted_at] @@ -368,6 +395,11 @@ table "check_component_relationships" { null = true type = text } + column "is_pushed" { + null = false + default = false + type = bool + } foreign_key "check_component_relationships_canary_id_fkey" { columns = [column.canary_id] ref_columns = [table.canaries.column.id] @@ -390,7 +422,10 @@ table "check_component_relationships" { unique = true columns = [column.component_id, column.check_id, column.canary_id, column.selector_id] } - + index "check_component_relationships_is_pushed_idx" { + columns = [column.is_pushed] + where = "is_pushed IS FALSE" + } index "idx_check_component_relationships_deleted_at" { columns = [column.deleted_at] } @@ -425,6 +460,11 @@ table "config_component_relationships" { null = true type = text } + column "is_pushed" { + null = false + default = false + type = bool + } foreign_key "config_component_relationships_component_id_fkey" { columns = [column.component_id] ref_columns = [table.components.column.id] @@ -444,4 +484,8 @@ table "config_component_relationships" { index "idx_config_component_relationships_deleted_at" { columns = [column.deleted_at] } + index "config_component_relationships_is_pushed_idx" { + columns = [column.is_pushed] + where = "is_pushed IS FALSE" + } } diff --git a/schema/config.hcl b/schema/config.hcl index 8cee673a..72bf9713 100644 --- a/schema/config.hcl +++ b/schema/config.hcl @@ -258,6 +258,11 @@ table "config_items" { null = true type = text } + column "is_pushed" { + null = false + default = false + type = bool + } column "created_by" { null = true type = uuid @@ -305,6 +310,10 @@ table "config_items" { on_update = NO_ACTION on_delete = NO_ACTION } + index "config_items_is_pushed_idx" { + columns = [column.is_pushed] + where = "is_pushed IS FALSE" + } index "idx_config_items_scraper_id" { columns = [column.scraper_id] } @@ -357,6 +366,11 @@ table "config_relationships" { null = true type = text } + column "is_pushed" { + null = false + default = false + type = bool + } foreign_key "config_relationships_config_id_fkey" { columns = [column.config_id] ref_columns = [table.config_items.column.id] @@ -376,6 +390,10 @@ table "config_relationships" { index "idx_config_relationships_deleted_at" { columns = [column.deleted_at] } + index "config_relationships_is_pushed_idx" { + columns = [column.is_pushed] + where = "is_pushed IS FALSE" + } } table "check_config_relationships" { @@ -410,6 +428,11 @@ table "check_config_relationships" { null = true type = text } + column "is_pushed" { + null = false + default = false + type = bool + } foreign_key "check_config_relationships_canary_id_fkey" { columns = [column.canary_id] ref_columns = 
[table.canaries.column.id]
@@ -432,6 +455,10 @@
     unique  = true
     columns = [column.config_id, column.check_id, column.canary_id, column.selector_id]
   }
+  index "check_config_relationships_is_pushed_idx" {
+    columns = [column.is_pushed]
+    where   = "is_pushed IS FALSE"
+  }
 }
 
 table "config_scrapers" {
@@ -478,6 +505,11 @@
     null = true
     type = timestamptz
   }
+  column "is_pushed" {
+    null    = false
+    default = false
+    type    = bool
+  }
   primary_key {
     columns = [column.id]
   }
@@ -493,4 +525,8 @@
     on_update   = NO_ACTION
     on_delete   = NO_ACTION
   }
+  index "config_scrapers_is_pushed_idx" {
+    columns = [column.is_pushed]
+    where   = "is_pushed IS FALSE"
+  }
 }
diff --git a/tests/upstream_test.go b/tests/upstream_test.go
index 22c0126b..85d5e0e7 100644
--- a/tests/upstream_test.go
+++ b/tests/upstream_test.go
@@ -15,13 +15,16 @@ import (
 	"github.com/flanksource/duty/upstream"
 )
 
-var _ = ginkgo.Describe("Config Changes & Analyses sync test", ginkgo.Ordered, func() {
+var _ = ginkgo.Describe("Reconcile Test", ginkgo.Ordered, func() {
 	var upstreamCtx *context.Context
 	var echoCloser, drop func()
 	var upstreamConf upstream.UpstreamConfig
 	const agentName = "my-agent"
 
 	ginkgo.BeforeAll(func() {
+		DefaultContext.ClearCache()
+		context.SetLocalProperty("upstream.reconcile.pre-check", "false")
+
 		var err error
 		upstreamCtx, drop, err = setup.NewDB(DefaultContext, "upstream")
 		Expect(err).ToNot(HaveOccurred())
@@ -49,10 +52,7 @@
 		})
 
 		e.Use(upstream.AgentAuthMiddleware(cache.New(time.Hour, time.Hour)))
 		e.POST("/upstream/push", upstream.PushHandler)
-
-		e.GET("/upstream/pull", upstream.PullHandler([]string{"config_scrapers", "config_items"}))
-		e.GET("/upstream/status", upstream.StatusHandler([]string{"config_scrapers", "config_items"}))
 
 		port, echoCloser = setup.RunEcho(e)
@@ -63,9 +63,7 @@
 	})
 
 	ginkgo.It("should push config items first to satisfy foreign keys for changes & analyses", func() {
-		reconciler := upstream.NewUpstreamReconciler(upstreamConf, 100)
-
-		count, err := reconciler.Sync(DefaultContext, "config_items")
+		count, err := upstream.ReconcileSome(DefaultContext, upstreamConf, 100, "config_items")
 		Expect(err).To(BeNil())
 		Expect(count).To(Not(BeZero()))
 	})
@@ -83,7 +81,7 @@
 		Expect(err).ToNot(HaveOccurred())
 		Expect(changes).To(BeZero())
 
-		count, err := upstream.SyncConfigChanges(DefaultContext, upstreamConf, 10)
+		count, err := upstream.ReconcileSome(DefaultContext, upstreamConf, 10, "config_changes")
 		Expect(err).ToNot(HaveOccurred())
 
 		err = upstreamCtx.DB().Select("COUNT(*)").Model(&models.ConfigChange{}).Scan(&changes).Error
@@ -111,7 +109,7 @@
 		Expect(err).ToNot(HaveOccurred())
 		Expect(analyses).To(BeZero())
 
-		count, err := upstream.SyncConfigAnalyses(DefaultContext, upstreamConf, 10)
+		count, err := upstream.ReconcileSome(DefaultContext, upstreamConf, 10, "config_analysis")
 		Expect(err).ToNot(HaveOccurred())
 
 		err = upstreamCtx.DB().Select("COUNT(*)").Model(&models.ConfigAnalysis{}).Scan(&analyses).Error
@@ -126,7 +124,7 @@
 		}
 	})
 
-	ginkgo.It("should push artifacts", func() {
+	ginkgo.It("should sync artifacts to upstream", func() {
 		var pushed int
 		err := DefaultContext.DB().Select("COUNT(*)").Where("is_pushed = true").Model(&models.Artifact{}).Scan(&pushed).Error
 		Expect(err).ToNot(HaveOccurred())
@@ -137,7 +135,7 @@
 		Expect(err).ToNot(HaveOccurred())
 		Expect(artifacts).To(BeZero())
 
-		count, err := upstream.SyncArtifacts(DefaultContext, upstreamConf, 10)
+		count, err := upstream.ReconcileSome(DefaultContext, upstreamConf, 10, "artifacts")
 		Expect(err).ToNot(HaveOccurred())
 
 		err = upstreamCtx.DB().Select("COUNT(*)").Model(&models.Artifact{}).Scan(&artifacts).Error
diff --git a/upstream/client.go b/upstream/client.go
index 2dc48374..b98c97df 100644
--- a/upstream/client.go
+++ b/upstream/client.go
@@ -102,3 +102,10 @@ func (t *UpstreamClient) push(ctx context.Context, method string, msg *PushData) {
 	histogram.Label(StatusLabel, StatusOK).Since(start)
 	return nil
 }
+
+func parseResponse(body string) string {
+	if len(body) > 200 {
+		body = body[0:200]
+	}
+	return body
+}
diff --git a/upstream/commands.go b/upstream/commands.go
index e60cfadd..4df6f23e 100644
--- a/upstream/commands.go
+++ b/upstream/commands.go
@@ -228,13 +228,8 @@ func UpdateAgentLastReceived(ctx context.Context, id uuid.UUID) error {
 	}).Error
 }
 
-type dbTable interface {
-	PK() string
-	TableName() string
-}
-
 // saveIndividuallyWithRetries saves the given records one by one and retries only on foreign key violation error.
-func saveIndividuallyWithRetries[T dbTable](ctx context.Context, items []T, maxRetries int) error {
+func saveIndividuallyWithRetries[T models.DBTable](ctx context.Context, items []T, maxRetries int) error {
 	var retries int
 	for {
 		var failed []T
diff --git a/upstream/controllers.go b/upstream/controllers.go
index 134ec523..5cb3014e 100644
--- a/upstream/controllers.go
+++ b/upstream/controllers.go
@@ -6,12 +6,10 @@ import (
 	"net/http"
 	"time"
 
-	"github.com/flanksource/commons/collections"
 	"github.com/flanksource/commons/logger"
 	"github.com/flanksource/duty/api"
 	"github.com/flanksource/duty/context"
 	"github.com/flanksource/duty/models"
-	"github.com/flanksource/duty/query"
 	"github.com/labstack/echo/v4"
 	"github.com/patrickmn/go-cache"
 	"go.opentelemetry.io/otel/attribute"
@@ -66,48 +64,6 @@
 	}
 }
 
-// PullHandler returns a handler that returns all the ids of items it has received from the requested agent.
-func PullHandler(allowedTables []string) func(echo.Context) error { - return func(c echo.Context) error { - ctx := c.Request().Context().(context.Context) - histogram := ctx.Histogram("push_queue_pull_handler") - start := time.Now() - defer func() { - histogram.Since(start) - }() - var req PaginateRequest - if err := c.Bind(&req); err != nil { - histogram = histogram.Label(StatusLabel, StatusError) - return c.JSON(http.StatusBadRequest, api.HTTPError{Error: err.Error()}) - } - - ctx.GetSpan().SetAttributes( - attribute.String("request.table", req.Table), - attribute.String("request.from", req.From), - attribute.Int("request.size", req.Size), - ) - - if !collections.Contains(allowedTables, req.Table) { - histogram = histogram.Label(StatusLabel, StatusError) - return c.JSON(http.StatusForbidden, api.HTTPError{Error: fmt.Sprintf("table=%s is not allowed", req.Table)}) - } - - agent := ctx.Agent() - histogram = histogram.Label(AgentLabel, agent.ID.String()) - - resp, err := query.GetAllResourceIDsOfAgent(ctx, req.Table, req.From, req.Size, agent.ID) - if err != nil { - histogram.Label(StatusLabel, StatusError) - return c.JSON(http.StatusInternalServerError, api.HTTPError{Error: err.Error(), Message: "failed to get resource ids"}) - } - histogram.Label(StatusLabel, StatusOK) - ctx.Counter("push_queue_pull_handler_records", AgentLabel, agent.ID.String()).Add(len(resp)) - ctx.GetSpan().SetAttributes(attribute.Int("response.count", len(resp))) - - return c.JSON(http.StatusOK, resp) - } -} - // PushHandler returns an echo handler that saves the push data from agents. func PushHandler(c echo.Context) error { ctx := c.Request().Context().(context.Context) @@ -179,41 +135,6 @@ func DeleteHandler(c echo.Context) error { return nil } -// StatusHandler returns a handler that returns the summary of all ids the upstream has received. -func StatusHandler(allowedTables []string) func(echo.Context) error { - return func(c echo.Context) error { - ctx := c.Request().Context().(context.Context) - var req PaginateRequest - if err := c.Bind(&req); err != nil { - return c.JSON(http.StatusBadRequest, api.HTTPError{Error: err.Error()}) - } - - start := time.Now() - ctx.GetSpan().SetAttributes( - attribute.String("request.table", req.Table), - attribute.String("request.from", req.From), - attribute.Int("request.size", req.Size), - ) - if !collections.Contains(allowedTables, req.Table) { - return c.JSON(http.StatusForbidden, api.HTTPError{Error: fmt.Sprintf("table=%s is not allowed", req.Table)}) - } - - agent := ctx.Agent() - - histogram := ctx.Histogram("push_queue_status_handler") - histogram = histogram.Label(AgentLabel, agent.ID.String()) - - response, err := GetPrimaryKeysHash(ctx, req, agent.ID) - if err != nil { - histogram.Label(StatusLabel, StatusError).Since(start) - return c.JSON(http.StatusInternalServerError, api.HTTPError{Error: err.Error(), Message: "failed to push status response"}) - } - - histogram.Label(StatusLabel, StatusOK).Since(start) - return c.JSON(http.StatusOK, response) - } -} - func PingHandler(c echo.Context) error { start := time.Now() ctx := c.Request().Context().(context.Context) diff --git a/upstream/event_consumer.go b/upstream/event_consumer.go index 9c01cb02..1fecefe6 100644 --- a/upstream/event_consumer.go +++ b/upstream/event_consumer.go @@ -10,8 +10,6 @@ import ( ) const ( - EventPushQueueCreate = "push_queue.create" - // EventPushQueueDelete is fired when a record, on one of the the tables we're tracking, // is hard deleted. 
EventPushQueueDelete = "push_queue.delete" @@ -86,6 +84,14 @@ func DeleteFromUpstream(ctx context.Context, config UpstreamConfig, events []pos SelectorID: cl.ItemIDs[i][2], }) } + + case "check_statuses": + for i := range cl.ItemIDs { + upstreamMsg.CheckStatuses = append(upstreamMsg.CheckStatuses, models.CheckStatus{ + CheckID: uuid.MustParse(cl.ItemIDs[i][0]), + Time: cl.ItemIDs[i][1], + }) + } } } @@ -108,106 +114,6 @@ func DeleteFromUpstream(ctx context.Context, config UpstreamConfig, events []pos return failedEvents } -// getPushUpstreamConsumer acts as an adapter to supply PushToUpstream event consumer. -func NewPushUpstreamConsumer(config UpstreamConfig) func(ctx context.Context, events postq.Events) postq.Events { - return func(ctx context.Context, events postq.Events) postq.Events { - return PushToUpstream(ctx, config, events) - } -} - -// PushToUpstream fetches records specified in events from this instance and sends them to the upstream instance. -func PushToUpstream(ctx context.Context, config UpstreamConfig, events []postq.Event) []postq.Event { - ctx, span := ctx.StartSpan("PushToUpstream") - defer span.End() - upstreamMsg := &PushData{} - - var failedEvents []postq.Event - for _, cl := range GroupChangelogsByTables(events) { - switch cl.TableName { - case "topologies": - if err := ctx.DB().Omit("created_by").Where("id IN ?", cl.ItemIDs).Find(&upstreamMsg.Topologies).Error; err != nil { - errMsg := fmt.Errorf("error fetching topologies: %w", err) - failedEvents = append(failedEvents, addErrorToFailedEvents(cl.Events, errMsg)...) - } - - case "components": - if err := ctx.DB().Omit("created_by").Where("id IN ?", cl.ItemIDs).Find(&upstreamMsg.Components).Error; err != nil { - errMsg := fmt.Errorf("error fetching components: %w", err) - failedEvents = append(failedEvents, addErrorToFailedEvents(cl.Events, errMsg)...) - } - - case "canaries": - if err := ctx.DB().Omit("created_by").Where("id IN ?", cl.ItemIDs).Find(&upstreamMsg.Canaries).Error; err != nil { - errMsg := fmt.Errorf("error fetching canaries: %w", err) - failedEvents = append(failedEvents, addErrorToFailedEvents(cl.Events, errMsg)...) - } - - case "checks": - if err := ctx.DB().Omit("created_by").Where("id IN ?", cl.ItemIDs).Find(&upstreamMsg.Checks).Error; err != nil { - errMsg := fmt.Errorf("error fetching checks: %w", err) - failedEvents = append(failedEvents, addErrorToFailedEvents(cl.Events, errMsg)...) - } - - case "config_scrapers": - if err := ctx.DB().Omit("created_by").Where("id IN ?", cl.ItemIDs).Find(&upstreamMsg.ConfigScrapers).Error; err != nil { - errMsg := fmt.Errorf("error fetching config_scrapers: %w", err) - failedEvents = append(failedEvents, addErrorToFailedEvents(cl.Events, errMsg)...) - } - - case "config_items": - if err := ctx.DB().Omit("created_by").Where("id IN ?", cl.ItemIDs).Find(&upstreamMsg.ConfigItems).Error; err != nil { - errMsg := fmt.Errorf("error fetching config_items: %w", err) - failedEvents = append(failedEvents, addErrorToFailedEvents(cl.Events, errMsg)...) - } - - case "config_component_relationships": - if err := ctx.DB().Where("(component_id, config_id) IN ?", cl.ItemIDs).Find(&upstreamMsg.ConfigComponentRelationships).Error; err != nil { - errMsg := fmt.Errorf("error fetching config_component_relationships: %w", err) - failedEvents = append(failedEvents, addErrorToFailedEvents(cl.Events, errMsg)...) 
- } - - case "component_relationships": - if err := ctx.DB().Where("(component_id, relationship_id, selector_id) IN ?", cl.ItemIDs).Find(&upstreamMsg.ComponentRelationships).Error; err != nil { - errMsg := fmt.Errorf("error fetching component_relationships: %w", err) - failedEvents = append(failedEvents, addErrorToFailedEvents(cl.Events, errMsg)...) - } - - case "config_relationships": - if err := ctx.DB().Where("(related_id, config_id, selector_id) IN ?", cl.ItemIDs).Find(&upstreamMsg.ConfigRelationships).Error; err != nil { - errMsg := fmt.Errorf("error fetching config_relationships: %w", err) - failedEvents = append(failedEvents, addErrorToFailedEvents(cl.Events, errMsg)...) - } - } - } - - upstreamMsg.ApplyLabels(config.LabelsMap()) - - upstreamClient := NewUpstreamClient(config) - err := upstreamClient.Push(ctx, upstreamMsg) - if err == nil { - return failedEvents - } - - if len(events) == 1 { - errMsg := fmt.Errorf("failed to push to upstream: %w", err) - failedEvents = append(failedEvents, addErrorToFailedEvents(events, errMsg)...) - } else { - // Error encountered while pushing could be an SQL or Application error - // Since we do not know which event in the bulk is failing - // Process each event individually since upsteam.Push is idempotent - - for _, e := range events { - failedEvents = append(failedEvents, PushToUpstream(ctx, config, []postq.Event{e})...) - } - } - - if len(events) > 0 || len(failedEvents) > 0 { - ctx.Tracef("processed %d events, %d errors", len(events), len(failedEvents)) - } - - return failedEvents -} - func addErrorToFailedEvents(events []postq.Event, err error) []postq.Event { var failedEvents []postq.Event for _, e := range events { @@ -237,6 +143,8 @@ func GroupChangelogsByTables(events []postq.Event) []GroupedPushEvents { itemIDs = []string{cl.Properties["component_id"], cl.Properties["config_id"]} case "config_relationships": itemIDs = []string{cl.Properties["related_id"], cl.Properties["config_id"], cl.Properties["selector_id"]} + case "check_statuses": + itemIDs = []string{cl.Properties["check_id"], cl.Properties["time"]} default: itemIDs = []string{cl.Properties["id"]} } diff --git a/upstream/jobs.go b/upstream/jobs.go index b01c7bc6..d4785e5c 100644 --- a/upstream/jobs.go +++ b/upstream/jobs.go @@ -5,138 +5,100 @@ import ( "github.com/flanksource/duty/context" "github.com/flanksource/duty/models" - "github.com/google/uuid" "github.com/samber/lo" + "gorm.io/gorm" ) -// SyncCheckStatuses pushes check statuses, that haven't already been pushed, to upstream. -func SyncCheckStatuses(ctx context.Context, config UpstreamConfig, batchSize int) (int, error) { - client := NewUpstreamClient(config) - count := 0 - for { - var checkStatuses []models.CheckStatus - if err := ctx.DB().Select("check_statuses.*"). - Joins("LEFT JOIN checks ON checks.id = check_statuses.check_id"). - Where("checks.agent_id = ?", uuid.Nil). - Where("check_statuses.is_pushed IS FALSE"). - Limit(batchSize). 
- Find(&checkStatuses).Error; err != nil { - return 0, fmt.Errorf("failed to fetch check_statuses: %w", err) - } - - if len(checkStatuses) == 0 { - return count, nil - } - - ctx.Tracef("pushing %d check_statuses to upstream", len(checkStatuses)) - if err := client.Push(ctx, &PushData{CheckStatuses: checkStatuses}); err != nil { - return 0, fmt.Errorf("failed to push check_statuses to upstream: %w", err) - } - - for i := range checkStatuses { - checkStatuses[i].IsPushed = true - } - - if err := ctx.DB().Save(&checkStatuses).Error; err != nil { - return 0, fmt.Errorf("failed to save check_statuses: %w", err) - } - count += len(checkStatuses) - } +type pushableTable interface { + models.DBTable + GetUnpushed(db *gorm.DB) ([]models.DBTable, error) } -// SyncConfigChanges pushes config changes, that haven't already been pushed, to upstream. -func SyncConfigChanges(ctx context.Context, config UpstreamConfig, batchSize int) (int, error) { - client := NewUpstreamClient(config) - count := 0 - for { - var configChanges []models.ConfigChange - if err := ctx.DB().Select("config_changes.*"). - Joins("LEFT JOIN config_items ON config_items.id = config_changes.config_id"). - Where("config_items.agent_id = ?", uuid.Nil). - Where("config_changes.is_pushed IS FALSE"). - Limit(batchSize). - Find(&configChanges).Error; err != nil { - return 0, fmt.Errorf("failed to fetch config_changes: %w", err) - } - - if len(configChanges) == 0 { - return count, nil - } - - ctx.Tracef("pushing %d config_changes to upstream", len(configChanges)) - if err := client.Push(ctx, &PushData{ConfigChanges: configChanges}); err != nil { - return 0, fmt.Errorf("failed to push config_changes to upstream: %w", err) - } +type customIsPushedUpdater interface { + UpdateIsPushed(db *gorm.DB, items []models.DBTable) error +} - ids := lo.Map(configChanges, func(c models.ConfigChange, _ int) string { return c.ID }) - if err := ctx.DB().Model(&models.ConfigChange{}).Where("id IN ?", ids).Update("is_pushed", true).Error; err != nil { - return 0, fmt.Errorf("failed to update is_pushed on config_changes: %w", err) - } +var reconciledTables = []pushableTable{ + models.Topology{}, + models.ConfigScraper{}, + models.Canary{}, + models.Artifact{}, + + models.ConfigItem{}, + models.Check{}, + models.Component{}, + + models.ConfigChange{}, + models.ConfigAnalysis{}, + models.CheckStatus{}, + + models.CheckComponentRelationship{}, + models.CheckConfigRelationship{}, + models.ComponentRelationship{}, + models.ConfigComponentRelationship{}, + models.ConfigRelationship{}, +} - count += len(configChanges) +func ReconcileAll(ctx context.Context, config UpstreamConfig, batchSize int) (int, error) { + if ctx.Properties().Off("upstream.reconcile.pre-check") { + return ReconcileSome(ctx, config, batchSize) } -} -// SyncConfigAnalyses pushes config analyses, that haven't already been pushed, to upstream. -func SyncConfigAnalyses(ctx context.Context, config UpstreamConfig, batchSize int) (int, error) { - client := NewUpstreamClient(config) - count := 0 - for { - var analyses []models.ConfigAnalysis - if err := ctx.DB().Select("config_analysis.*"). - Joins("LEFT JOIN config_items ON config_items.id = config_analysis.config_id"). - Where("config_items.agent_id = ?", uuid.Nil). - Where("config_analysis.is_pushed IS FALSE"). - Limit(batchSize). 
-			Find(&analyses).Error; err != nil {
-			return 0, fmt.Errorf("failed to fetch config_analysis: %w", err)
-		}
+	var tablesToReconcile []string
+	if err := ctx.DB().Table("unpushed_tables").Scan(&tablesToReconcile).Error; err != nil {
+		return 0, err
+	}
 
-		if len(analyses) == 0 {
-			return count, nil
-		}
+	return ReconcileSome(ctx, config, batchSize, tablesToReconcile...)
+}
 
-		ctx.Tracef("pushing %d config_analyses to upstream", len(analyses))
-		if err := client.Push(ctx, &PushData{ConfigAnalysis: analyses}); err != nil {
-			return 0, fmt.Errorf("failed to push config_analysis to upstream: %w", err)
+// ReconcileSome pushes the unpushed records of only the given tables to upstream.
+func ReconcileSome(ctx context.Context, config UpstreamConfig, batchSize int, runOnly ...string) (int, error) {
+	var count int
+	for _, table := range reconciledTables {
+		if len(runOnly) > 0 && !lo.Contains(runOnly, table.TableName()) {
+			continue
 		}
 
-		ids := lo.Map(analyses, func(a models.ConfigAnalysis, _ int) string { return a.ID.String() })
-		if err := ctx.DB().Model(&models.ConfigAnalysis{}).Where("id IN ?", ids).Update("is_pushed", true).Error; err != nil {
-			return 0, fmt.Errorf("failed to update is_pushed on config_analysis: %w", err)
+		if c, err := reconcileTable(ctx, config, table, batchSize); err != nil {
+			return count, fmt.Errorf("failed to reconcile table %s: %w", table.TableName(), err)
+		} else {
+			count += c
 		}
-
-		count += len(analyses)
 	}
+
+	return count, nil
 }
 
-// SyncArtifacts pushes artifacts that haven't already been pushed to upstream.
-func SyncArtifacts(ctx context.Context, config UpstreamConfig, batchSize int) (int, error) {
+// reconcileTable pushes all unpushed items in a table to upstream.
+func reconcileTable(ctx context.Context, config UpstreamConfig, table pushableTable, batchSize int) (int, error) {
 	client := NewUpstreamClient(config)
-	count := 0
+
+	var count int
 	for {
-		var artifacts []models.Artifact
-		if err := ctx.DB().
-			Where("is_pushed IS FALSE").
-			Limit(batchSize).
-			Find(&artifacts).Error; err != nil {
-			return 0, fmt.Errorf("failed to fetch artifacts: %w", err)
+		items, err := table.GetUnpushed(ctx.DB().Limit(batchSize))
+		if err != nil {
+			return 0, fmt.Errorf("failed to fetch unpushed items for table %s: %w", table.TableName(), err)
 		}
 
-		if len(artifacts) == 0 {
+		if len(items) == 0 {
 			return count, nil
 		}
 
-		ctx.Tracef("pushing %d artifacts to upstream", len(artifacts))
-		if err := client.Push(ctx, &PushData{Artifacts: artifacts}); err != nil {
-			return 0, fmt.Errorf("failed to push artifacts to upstream: %w", err)
+		ctx.Tracef("pushing %d %s to upstream", len(items), table.TableName())
+		if err := client.Push(ctx, NewPushData(items)); err != nil {
+			return 0, fmt.Errorf("failed to push %s to upstream: %w", table.TableName(), err)
 		}
+		count += len(items)
 
-		ids := lo.Map(artifacts, func(a models.Artifact, _ int) string { return a.ID.String() })
-		if err := ctx.DB().Model(&models.Artifact{}).Where("id IN ?", ids).Update("is_pushed", true).Error; err != nil {
-			return 0, fmt.Errorf("failed to update is_pushed on artifacts: %w", err)
+		if c, ok := table.(customIsPushedUpdater); ok {
+			if err := c.UpdateIsPushed(ctx.DB(), items); err != nil {
+				return 0, fmt.Errorf("failed to update is_pushed for %s: %w", table.TableName(), err)
+			}
+		} else {
+			ids := lo.Map(items, func(a models.DBTable, _ int) string { return a.PK() })
+			if err := ctx.DB().Model(table).Where("id IN ?", ids).Update("is_pushed", true).Error; err != nil {
+				return 0, fmt.Errorf("failed to update is_pushed on %s: %w", table.TableName(), err)
+			}
 		}
-
-		count += len(artifacts)
 	}
 }
diff --git a/upstream/reconcile.go b/upstream/reconcile.go
deleted file mode 100644
index 626777e5..00000000
--- a/upstream/reconcile.go
+++ /dev/null
@@ -1,196 +0,0 @@
-package upstream
-
-import (
-	gocontext "context"
-	"encoding/json"
-	"errors"
-	"fmt"
-	"time"
-
-	"github.com/flanksource/commons/http"
-	"github.com/flanksource/commons/logger"
-	"github.com/flanksource/duty/context"
-	"github.com/google/uuid"
-)
-
-type PaginateRequest struct {
-	Table string `query:"table"`
-	From  string `query:"from"`
-	Size  int    `query:"size"`
-}
-
-type PaginateResponse struct {
-	Hash  string `gorm:"column:sha256sum"`
-	Next  string `gorm:"column:last_id"`
-	Total int    `gorm:"column:total"`
-}
-
-func (p PaginateRequest) String() string {
-	return fmt.Sprintf("table=%s next=%s, size=%d", p.Table, p.From, p.Size)
-}
-
-func (p PaginateResponse) String() string {
-	return fmt.Sprintf("hash=%s, next=%s, count=%d", p.Hash, p.Next, p.Total)
-}
-
-// UpstreamReconciler pushes missing resources from an agent to the upstream.
-type UpstreamReconciler struct {
-	upstreamConf   UpstreamConfig
-	upstreamClient *UpstreamClient
-
-	// the max number of resources the agent fetches
-	// from the upstream in one request.
-	pageSize int
-}
-
-func NewUpstreamReconciler(upstreamConf UpstreamConfig, pageSize int) *UpstreamReconciler {
-	return &UpstreamReconciler{
-		upstreamConf:   upstreamConf,
-		pageSize:       pageSize,
-		upstreamClient: NewUpstreamClient(upstreamConf),
-	}
-}
-
-// Sync compares all the resource of the given table against
-// the upstream server and pushes any missing resources to the upstream.
-func (t *UpstreamReconciler) Sync(ctx context.Context, table string) (int, error) { - logger.Debugf("Reconciling table %q with upstream", table) - - // Empty starting cursor, so we sync everything - return t.sync(ctx, table, uuid.Nil.String()) -} - -// SyncAfter pushes all the records of the given table that were updated in the given duration -func (t *UpstreamReconciler) SyncAfter(ctx context.Context, table string, after time.Duration) (int, error) { - logger.WithValues("since", time.Now().Add(-after).Format(time.RFC3339Nano)).Debugf("Reconciling table %q with upstream", table) - - // We start with a nil UUID and calculate hash in batches - next := uuid.Nil.String() - return t.sync(ctx, table, next) -} - -// Sync compares all the resource of the given table against -// the upstream server and pushes any missing resources to the upstream. -func (t *UpstreamReconciler) sync(ctx context.Context, table, next string) (int, error) { - var errorList []error - // We keep this counter to keep a track of attempts for a batch - pushed := 0 - for { - paginateRequest := PaginateRequest{From: next, Table: table, Size: t.pageSize} - - localStatus, err := GetPrimaryKeysHash(ctx, paginateRequest, uuid.Nil) - if err != nil { - return 0, fmt.Errorf("failed to fetch hash of primary keys from local db: %w", err) - } - - // Nothing left to push - if localStatus.Total == 0 { - break - } - - if localStatus.Hash == "" { - return 0, fmt.Errorf("empty row hash returned") - } - - upstreamStatus, err := t.fetchUpstreamStatus(ctx, paginateRequest) - if err != nil { - return 0, fmt.Errorf("failed to fetch upstream status: %w", err) - } - - next = localStatus.Next - - if upstreamStatus.Hash == localStatus.Hash { - logger.Debugf("[%s] pages matched, local(%s) == upstream(%s)", paginateRequest, localStatus, upstreamStatus) - continue - } - logger.Debugf("[%s] local(%s) == upstream(%s)", paginateRequest, localStatus, upstreamStatus) - - resp, err := t.fetchUpstreamResourceIDs(ctx, paginateRequest) - if err != nil { - return 0, fmt.Errorf("failed to fetch upstream resource ids: %w", err) - } - - pushData, err := GetMissingResourceIDs(ctx, resp, paginateRequest) - if err != nil { - return 0, fmt.Errorf("failed to fetch missing resource ids: %w", err) - } - - if pushData != nil && pushData.Count() > 0 { - logger.WithValues("table", table).Debugf("Pushing %d items to upstream. Next: %q", pushData.Count(), next) - - if err := t.upstreamClient.Push(ctx, pushData); err != nil { - errorList = append(errorList, fmt.Errorf("failed to push missing resource ids: %w", err)) - } - pushed += pushData.Count() - } - if next == "" { - break - } - } - - return pushed, errors.Join(errorList...) -} - -// fetchUpstreamResourceIDs requests all the existing resource ids from the upstream -// that were sent by this agent. 
-func (t *UpstreamReconciler) fetchUpstreamResourceIDs(ctx context.Context, request PaginateRequest) ([]string, error) { - httpReq := t.createPaginateRequest(ctx, request).QueryParam(AgentNameQueryParam, t.upstreamConf.AgentName) - httpResponse, err := httpReq.Get("pull") - if err != nil { - return nil, fmt.Errorf("error making request: %w", err) - } - - body, err := httpResponse.AsString() - if err != nil { - return nil, fmt.Errorf("error reading body: %w", err) - } - - if !httpResponse.IsOK() { - return nil, fmt.Errorf("upstream server returned error status[%d]: %s", httpResponse.StatusCode, parseResponse(body)) - } - - var response []string - if err := json.Unmarshal([]byte(body), &response); err != nil { - return nil, fmt.Errorf("invalid response format: %s", parseResponse(body)) - } - - return response, nil -} - -func (t *UpstreamReconciler) fetchUpstreamStatus(ctx gocontext.Context, request PaginateRequest) (*PaginateResponse, error) { - httpReq := t.createPaginateRequest(ctx, request).QueryParam(AgentNameQueryParam, t.upstreamConf.AgentName) - httpResponse, err := httpReq.Get("status") - if err != nil { - return nil, fmt.Errorf("error making request: %w", err) - } - - body, err := httpResponse.AsString() - if err != nil { - return nil, fmt.Errorf("error reading body: %w", err) - } - - if !httpResponse.IsOK() { - return nil, fmt.Errorf("upstream server returned error status[%d]: %s", httpResponse.StatusCode, parseResponse(body)) - } - - var response PaginateResponse - if err := json.Unmarshal([]byte(body), &response); err != nil { - return nil, fmt.Errorf("invalid response format: %s: %v", parseResponse(body), err) - } - - return &response, nil -} - -func (t *UpstreamReconciler) createPaginateRequest(ctx gocontext.Context, request PaginateRequest) *http.Request { - return t.upstreamClient.R(ctx). - QueryParam("table", request.Table). - QueryParam("from", request.From). 
- QueryParam("size", fmt.Sprintf("%d", request.Size)) -} - -func parseResponse(body string) string { - if len(body) > 200 { - body = body[0:200] - } - return body -} diff --git a/upstream/upstream.go b/upstream/upstream.go index dc4ad86d..fe46012e 100644 --- a/upstream/upstream.go +++ b/upstream/upstream.go @@ -76,6 +76,48 @@ type PushData struct { Artifacts []models.Artifact `json:"artifacts,omitempty"` } +func NewPushData[T models.DBTable](records []T) *PushData { + var p PushData + if len(records) == 0 { + return &p + } + + for i := range records { + switch t := any(records[i]).(type) { + case models.Canary: + p.Canaries = append(p.Canaries, t) + case models.Check: + p.Checks = append(p.Checks, t) + case models.Component: + p.Components = append(p.Components, t) + case models.ConfigScraper: + p.ConfigScrapers = append(p.ConfigScrapers, t) + case models.ConfigAnalysis: + p.ConfigAnalysis = append(p.ConfigAnalysis, t) + case models.ConfigChange: + p.ConfigChanges = append(p.ConfigChanges, t) + case models.ConfigItem: + p.ConfigItems = append(p.ConfigItems, t) + case models.CheckStatus: + p.CheckStatuses = append(p.CheckStatuses, t) + case models.ConfigRelationship: + p.ConfigRelationships = append(p.ConfigRelationships, t) + case models.ComponentRelationship: + p.ComponentRelationships = append(p.ComponentRelationships, t) + case models.ConfigComponentRelationship: + p.ConfigComponentRelationships = append(p.ConfigComponentRelationships, t) + case models.Topology: + p.Topologies = append(p.Topologies, t) + case models.PlaybookRunAction: + p.PlaybookActions = append(p.PlaybookActions, t) + case models.Artifact: + p.Artifacts = append(p.Artifacts, t) + } + } + + return &p +} + func (p *PushData) AddMetrics(counter context.Counter) { counter.Label("table", "artifacts").Add(len(p.Artifacts)) counter.Label("table", "canaries").Add(len(p.Canaries)) @@ -202,91 +244,3 @@ func (t *PushData) ApplyLabels(labels map[string]string) { t.Topologies[i].Labels = collections.MergeMap(t.Topologies[i].Labels, labels) } } - -func GetPrimaryKeysHash(ctx context.Context, req PaginateRequest, agentID uuid.UUID) (*PaginateResponse, error) { - var orderByClauses []string - if collections.Contains([]string{"components", "config_items"}, req.Table) { - orderByClauses = append(orderByClauses, "LENGTH(COALESCE(path, ''))") - } - orderByClauses = append(orderByClauses, "id") - query := fmt.Sprintf(` - WITH p_keys AS ( - SELECT id::TEXT, COALESCE(updated_at::text, '') as updated_at - FROM %s - WHERE id::TEXT > ? AND agent_id = ? - ORDER BY %s - LIMIT ? 
- ) - SELECT - encode(digest(string_agg(id || updated_at, ''), 'sha256'), 'hex') as sha256sum, - MAX(id) as last_id, - COUNT(*) as total - FROM - p_keys`, req.Table, strings.Join(orderByClauses, ",")) - - var resp PaginateResponse - err := ctx.DB().Raw(query, req.From, agentID, req.Size).Scan(&resp).Error - return &resp, err -} - -func GetMissingResourceIDs(ctx context.Context, ids []string, paginateReq PaginateRequest) (*PushData, error) { - var pushData PushData - - tx := ctx.DB().Where("agent_id = ?", uuid.Nil) - switch paginateReq.Table { - case "topologies": - if err := tx.Not(ids).Where("id::TEXT > ?", paginateReq.From).Limit(paginateReq.Size).Order("id").Find(&pushData.Topologies).Error; err != nil { - return nil, fmt.Errorf("error fetching topologies: %w", err) - } - - case "canaries": - if err := tx.Not(ids).Where("id::TEXT > ?", paginateReq.From).Limit(paginateReq.Size).Order("id").Find(&pushData.Canaries).Error; err != nil { - return nil, fmt.Errorf("error fetching canaries: %w", err) - } - - case "checks": - if err := tx.Not(ids).Where("id::TEXT > ?", paginateReq.From).Limit(paginateReq.Size).Order("id").Find(&pushData.Checks).Error; err != nil { - return nil, fmt.Errorf("error fetching checks: %w", err) - } - - case "components": - if err := tx.Not(ids).Where("id::TEXT > ?", paginateReq.From).Limit(paginateReq.Size).Order("LENGTH(COALESCE(path, ''))").Order("id").Find(&pushData.Components).Error; err != nil { - return nil, fmt.Errorf("error fetching components: %w", err) - } - - case "config_scrapers": - if err := tx.Not(ids).Where("id::TEXT > ?", paginateReq.From).Limit(paginateReq.Size).Order("id").Find(&pushData.ConfigScrapers).Error; err != nil { - return nil, fmt.Errorf("error fetching config scrapers: %w", err) - } - - case "config_items": - if err := tx.Not(ids).Where("id::TEXT > ?", paginateReq.From).Limit(paginateReq.Size).Order("LENGTH(COALESCE(path, ''))").Order("id").Find(&pushData.ConfigItems).Error; err != nil { - return nil, fmt.Errorf("error fetching config items: %w", err) - } - - case "check_statuses": - parts := strings.Split(paginateReq.From, ",") - if len(parts) != 2 { - return nil, fmt.Errorf("%s is not a valid next cursor. It must consist of check_id and time separated by a comma", paginateReq.From) - } - - tx := ctx.DB().Where("(check_id::TEXT, time::TEXT) > (?, ?)", parts[0], parts[1]) - - // Attach a Not IN query only if required - if len(ids) != 0 { - var pKeys = make([][]string, 0, len(ids)) - for _, pkey := range ids { - parts := strings.Split(pkey, ",") - pKeys = append(pKeys, parts) - } - - tx = tx.Where("(check_id::TEXT, time::TEXT) NOT IN (?)", pKeys) - } - - if err := tx.Limit(paginateReq.Size).Order("check_id, time").Find(&pushData.CheckStatuses).Error; err != nil { - return nil, fmt.Errorf("error fetching config items: %w", err) - } - } - - return &pushData, nil -} diff --git a/views/003_analysis_views.sql b/views/003_analysis_views.sql index 7271b85f..29159616 100644 --- a/views/003_analysis_views.sql +++ b/views/003_analysis_views.sql @@ -1,16 +1 @@ -CREATE OR REPLACE FUNCTION reset_is_pushed_before_update() -RETURNS TRIGGER AS $$ -BEGIN - -- If any column other than is_pushed is changed, reset is_pushed to false. 
- IF NEW IS DISTINCT FROM OLD AND NEW.is_pushed IS NOT DISTINCT FROM OLD.is_pushed THEN - NEW.is_pushed = false; - END IF; - - RETURN NEW; -END -$$ LANGUAGE plpgsql; - -CREATE OR REPLACE TRIGGER reset_is_pushed_before_update -BEFORE UPDATE ON config_analysis -FOR EACH ROW -EXECUTE PROCEDURE reset_is_pushed_before_update(); +-- Empty \ No newline at end of file diff --git a/views/006_config_views.sql b/views/006_config_views.sql index 02c673dc..2cb23e25 100644 --- a/views/006_config_views.sql +++ b/views/006_config_views.sql @@ -401,6 +401,7 @@ FOR EACH ROW EXECUTE FUNCTION insert_config_create_update_delete_in_event_queue(); DROP VIEW IF EXISTS config_detail; + CREATE OR REPLACE VIEW config_detail AS SELECT ci.*, diff --git a/views/012_changelog_trigger_func.sql b/views/012_changelog_trigger_func.sql index f93790e7..0b9bc491 100644 --- a/views/012_changelog_trigger_func.sql +++ b/views/012_changelog_trigger_func.sql @@ -1,85 +1,78 @@ --- Push table changes to event queue +-- Push hard deletes for tables that we sync to upstream CREATE -OR REPLACE FUNCTION push_changes_to_event_queue () RETURNS TRIGGER AS $$ +OR REPLACE FUNCTION push_deletes_to_event_queue () RETURNS TRIGGER AS $$ DECLARE rec RECORD; payload JSONB; - event_name TEXT := 'push_queue.create'; priority integer := 0; priority_table JSONB := '{ "topologies": 20, "canaries": 20, "config_scrapers": 20, - "checks": 10, - "components": 10, - "config_items": 10, - "config_analysis": 5, - "config_changes": 5, - "config_component_relationships": 5, - "component_relationships": 5, - "config_relationships": 5 + "checks": 40, + "components": 40, + "config_items": 40, + "config_component_relationships": 50, + "component_relationships": 50, + "config_relationships": 50, + "check_statuses": 50 }'; BEGIN - rec = NEW; - - IF TG_OP = 'DELETE' THEN - rec = OLD; - event_name = 'push_queue.delete'; + IF TG_OP != 'DELETE' THEN + RETURN NULL; END IF; + rec = OLD; CASE TG_TABLE_NAME WHEN 'component_relationships' THEN - IF TG_OP != 'DELETE' THEN - -- Set these fields to null for component_relationships to prevent excessive pushes - rec.updated_at = NULL; - OLD.updated_at = NULL; - - -- If it is same as the old record, then no action required - IF rec IS NOT DISTINCT FROM OLD THEN - RETURN NULL; - END IF; - END IF; - payload = jsonb_build_object('component_id', rec.component_id, 'relationship_id', rec.relationship_id, 'selector_id', rec.selector_id); WHEN 'config_component_relationships' THEN payload = jsonb_build_object('component_id', rec.component_id, 'config_id', rec.config_id); WHEN 'config_relationships' THEN payload = jsonb_build_object('related_id', rec.related_id, 'config_id', rec.config_id, 'selector_id', rec.selector_id); - WHEN 'checks' THEN - IF TG_OP != 'DELETE' THEN - -- Set these fields to null for checks to prevent excessive pushes - rec.updated_at = NULL; - OLD.updated_at = NULL; - - -- If it is same as the old record, then no action required - IF rec IS NOT DISTINCT FROM OLD THEN - RETURN NULL; - END IF; - END IF; - - payload = jsonb_build_object('id', rec.id); - WHEN 'canaries' THEN - IF TG_OP != 'DELETE' THEN - -- Set these fields to null for canaries to prevent excessive pushes - rec.updated_at = NULL; - OLD.updated_at = NULL; - - -- If it is same as the old record, then no action required - IF rec IS NOT DISTINCT FROM OLD THEN - RETURN NULL; - END IF; - END IF; - - payload = jsonb_build_object('id', rec.id); + WHEN 'check_statuses' THEN + payload = jsonb_build_object('check_id', rec.check_id, 'time', rec.time); ELSE payload = 
diff --git a/views/012_changelog_trigger_func.sql b/views/012_changelog_trigger_func.sql
index f93790e7..0b9bc491 100644
--- a/views/012_changelog_trigger_func.sql
+++ b/views/012_changelog_trigger_func.sql
@@ -1,85 +1,78 @@
--- Push table changes to event queue
+-- Push hard deletes for tables that we sync to upstream
 CREATE
-OR REPLACE FUNCTION push_changes_to_event_queue () RETURNS TRIGGER AS $$
+OR REPLACE FUNCTION push_deletes_to_event_queue () RETURNS TRIGGER AS $$
 DECLARE
     rec RECORD;
     payload JSONB;
-    event_name TEXT := 'push_queue.create';
     priority integer := 0;
     priority_table JSONB := '{
         "topologies": 20,
         "canaries": 20,
         "config_scrapers": 20,
-        "checks": 10,
-        "components": 10,
-        "config_items": 10,
-        "config_analysis": 5,
-        "config_changes": 5,
-        "config_component_relationships": 5,
-        "component_relationships": 5,
-        "config_relationships": 5
+        "checks": 40,
+        "components": 40,
+        "config_items": 40,
+        "config_component_relationships": 50,
+        "component_relationships": 50,
+        "config_relationships": 50,
+        "check_statuses": 50
     }';
 BEGIN
-    rec = NEW;
-
-    IF TG_OP = 'DELETE' THEN
-        rec = OLD;
-        event_name = 'push_queue.delete';
+    IF TG_OP != 'DELETE' THEN
+        RETURN NULL;
     END IF;
+    rec = OLD;
 
     CASE TG_TABLE_NAME
     WHEN 'component_relationships' THEN
-        IF TG_OP != 'DELETE' THEN
-            -- Set these fields to null for component_relationships to prevent excessive pushes
-            rec.updated_at = NULL;
-            OLD.updated_at = NULL;
-
-            -- If it is same as the old record, then no action required
-            IF rec IS NOT DISTINCT FROM OLD THEN
-                RETURN NULL;
-            END IF;
-        END IF;
-
         payload = jsonb_build_object('component_id', rec.component_id, 'relationship_id', rec.relationship_id, 'selector_id', rec.selector_id);
     WHEN 'config_component_relationships' THEN
         payload = jsonb_build_object('component_id', rec.component_id, 'config_id', rec.config_id);
     WHEN 'config_relationships' THEN
         payload = jsonb_build_object('related_id', rec.related_id, 'config_id', rec.config_id, 'selector_id', rec.selector_id);
-    WHEN 'checks' THEN
-        IF TG_OP != 'DELETE' THEN
-            -- Set these fields to null for checks to prevent excessive pushes
-            rec.updated_at = NULL;
-            OLD.updated_at = NULL;
-
-            -- If it is same as the old record, then no action required
-            IF rec IS NOT DISTINCT FROM OLD THEN
-                RETURN NULL;
-            END IF;
-        END IF;
-
-        payload = jsonb_build_object('id', rec.id);
-    WHEN 'canaries' THEN
-        IF TG_OP != 'DELETE' THEN
-            -- Set these fields to null for canaries to prevent excessive pushes
-            rec.updated_at = NULL;
-            OLD.updated_at = NULL;
-
-            -- If it is same as the old record, then no action required
-            IF rec IS NOT DISTINCT FROM OLD THEN
-                RETURN NULL;
-            END IF;
-        END IF;
-
-        payload = jsonb_build_object('id', rec.id);
+    WHEN 'check_statuses' THEN
+        payload = jsonb_build_object('check_id', rec.check_id, 'time', rec.time);
     ELSE
         payload = jsonb_build_object('id', rec.id);
     END CASE;
 
     -- Log changes to event queue
     priority = (priority_table->>TG_TABLE_NAME)::integer;
-    INSERT INTO event_queue (name, properties, priority) VALUES (event_name, jsonb_build_object('table', TG_TABLE_NAME) || payload, priority)
+    INSERT INTO event_queue (name, properties, priority) VALUES ('push_queue.delete', jsonb_build_object('table', TG_TABLE_NAME) || payload, priority)
     ON CONFLICT (name, properties) DO UPDATE SET created_at = NOW(), last_attempt = NULL, attempts = 0;
 
     RETURN NULL;
 END;
-$$ LANGUAGE 'plpgsql' SECURITY DEFINER;
\ No newline at end of file
+$$ LANGUAGE 'plpgsql' SECURITY DEFINER;
+
+-- Apply trigger
+DO $$
+DECLARE
+    table_name TEXT;
+BEGIN
+    FOR table_name IN
+        SELECT t.table_name
+        FROM information_schema.tables t
+        WHERE t.table_schema = 'public' AND t.table_type = 'BASE TABLE'
+        AND t.table_name IN (
+            'topologies',
+            'canaries',
+            'config_scrapers',
+            'checks',
+            'components',
+            'config_items',
+            'config_component_relationships',
+            'component_relationships',
+            'config_relationships',
+            'check_statuses'
+        )
+    LOOP
+        EXECUTE format('
+            CREATE OR REPLACE TRIGGER %1$I_change_to_event_queue
+            AFTER DELETE ON %1$I
+            FOR EACH ROW
+            EXECUTE PROCEDURE push_deletes_to_event_queue()',
+            table_name
+        );
+    END LOOP;
+END $$;
\ No newline at end of file
diff --git a/views/012_changelog_triggers_checks.sql b/views/012_changelog_triggers_checks.sql
deleted file mode 100644
index 0a0ea75e..00000000
--- a/views/012_changelog_triggers_checks.sql
+++ /dev/null
@@ -1,22 +0,0 @@
--- Apply trigger
-DO $$
-DECLARE
-    table_name TEXT;
-BEGIN
-    FOR table_name IN
-        SELECT t.table_name
-        FROM information_schema.tables t
-        WHERE t.table_schema = 'public' AND t.table_type = 'BASE TABLE'
-        AND t.table_name IN (
-            'checks'
-        )
-    LOOP
-        EXECUTE format('
-            CREATE OR REPLACE TRIGGER %1$I_change_to_event_queue
-            AFTER INSERT OR UPDATE OR DELETE ON %1$I
-            FOR EACH ROW
-            EXECUTE PROCEDURE push_changes_to_event_queue()',
-            table_name
-        );
-    END LOOP;
-END $$;
diff --git a/views/012_changelog_triggers_others.sql b/views/012_changelog_triggers_others.sql
deleted file mode 100644
index 148f10f2..00000000
--- a/views/012_changelog_triggers_others.sql
+++ /dev/null
@@ -1,28 +0,0 @@
--- Apply trigger
-DO $$
-DECLARE
-    table_name TEXT;
-BEGIN
-    FOR table_name IN
-        SELECT t.table_name
-        FROM information_schema.tables t
-        WHERE t.table_schema = 'public' AND t.table_type = 'BASE TABLE'
-        AND t.table_name IN (
-            'canaries',
-            'components',
-            'config_scrapers',
-            'config_component_relationships',
-            'component_relationships',
-            'config_relationships',
-            'topologies'
-        )
-    LOOP
-        EXECUTE format('
-            CREATE OR REPLACE TRIGGER %1$I_change_to_event_queue
-            AFTER INSERT OR UPDATE OR DELETE ON %1$I
-            FOR EACH ROW
-            EXECUTE PROCEDURE push_changes_to_event_queue()',
-            table_name
-        );
-    END LOOP;
-END $$;
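The remaining per-table trigger file below is deleted for the same reason as the two deletions above: the DO block now bundled into 012_changelog_trigger_func.sql stamps out an identical trigger for every table. The %1$I placeholder in format() reuses the first argument and quotes it as a SQL identifier, which is what lets one template serve all tables. Calling format() directly previews the DDL a single loop iteration executes, here with 'check_statuses' as a sample argument:

    SELECT format('
        CREATE OR REPLACE TRIGGER %1$I_change_to_event_queue
        AFTER DELETE ON %1$I
        FOR EACH ROW
        EXECUTE PROCEDURE push_deletes_to_event_queue()',
        'check_statuses');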
diff --git a/views/012_changelog_triggers_scrapers.sql b/views/012_changelog_triggers_scrapers.sql
deleted file mode 100644
index 036064b6..00000000
--- a/views/012_changelog_triggers_scrapers.sql
+++ /dev/null
@@ -1,22 +0,0 @@
--- Apply trigger
-DO $$
-DECLARE
-    table_name TEXT;
-BEGIN
-    FOR table_name IN
-        SELECT t.table_name
-        FROM information_schema.tables t
-        WHERE t.table_schema = 'public' AND t.table_type = 'BASE TABLE'
-        AND t.table_name IN (
-            'config_items'
-        )
-    LOOP
-        EXECUTE format('
-            CREATE OR REPLACE TRIGGER %1$I_change_to_event_queue
-            AFTER INSERT OR UPDATE OR DELETE ON %1$I
-            FOR EACH ROW
-            EXECUTE PROCEDURE push_changes_to_event_queue()',
-            table_name
-        );
-    END LOOP;
-END $$;
diff --git a/views/013_updated_at_column_trigger.sql b/views/013_updated_at_column_trigger.sql
index b68ce873..861501e1 100644
--- a/views/013_updated_at_column_trigger.sql
+++ b/views/013_updated_at_column_trigger.sql
@@ -16,6 +16,13 @@ BEGIN
         RETURN NEW;
     END IF;
 
+    -- If we're updating the `is_pushed` column, don't update the `updated_at` column
+    IF exist(oldrow, 'is_pushed') THEN
+        IF OLD.is_pushed != NEW.is_pushed THEN
+            RETURN NEW;
+        END IF;
+    END IF;
+
     IF to_jsonb(NEW) ? 'deleted_at' THEN
         IF NEW.deleted_at IS NOT NULL THEN
             RETURN NEW;
diff --git a/views/026_is_pushed_trigger.sql b/views/026_is_pushed_trigger.sql
new file mode 100644
index 00000000..baa04984
--- /dev/null
+++ b/views/026_is_pushed_trigger.sql
@@ -0,0 +1,47 @@
+CREATE
+OR REPLACE FUNCTION reset_is_pushed_before_update() RETURNS TRIGGER AS $$
+BEGIN
+    -- If any column other than is_pushed is changed, reset is_pushed to false.
+    IF NEW IS DISTINCT FROM OLD AND NEW.is_pushed IS NOT DISTINCT FROM OLD.is_pushed THEN
+        NEW.is_pushed = false;
+    END IF;
+
+    RETURN NEW;
+END
+$$ LANGUAGE plpgsql;
+
+DO $$
+DECLARE
+    table_name TEXT;
+BEGIN
+    FOR table_name IN
+        SELECT t.table_name
+        FROM information_schema.tables t
+        WHERE t.table_schema = current_schema() AND t.table_type = 'BASE TABLE'
+        AND t.table_name IN (
+            'artifacts',
+            'topologies',
+            'config_scrapers',
+            'canaries',
+            'components',
+            'checks',
+            'config_items',
+            'config_analysis',
+            'config_changes',
+            'check_statuses',
+            'check_component_relationships',
+            'check_config_relationships',
+            'component_relationships',
+            'config_component_relationships',
+            'config_relationships'
+        )
+    LOOP
+        EXECUTE format('
+            CREATE OR REPLACE TRIGGER %I_reset_is_pushed_before_update
+            BEFORE UPDATE ON %I
+            FOR EACH ROW
+            EXECUTE PROCEDURE reset_is_pushed_before_update()',
+            table_name, table_name
+        );
+    END LOOP;
+END $$;
\ No newline at end of file
diff --git a/views/027_upstream.sql b/views/027_upstream.sql
new file mode 100644
index 00000000..3d314161
--- /dev/null
+++ b/views/027_upstream.sql
@@ -0,0 +1,11 @@
+CREATE OR REPLACE VIEW unpushed_tables
+AS
+SELECT
+    c.relname
+FROM
+    pg_index i
+    JOIN pg_class c ON c.oid = i.indrelid
+    JOIN pg_class ic ON i.indexrelid = ic.oid
+WHERE
+    i.indexrelid :: regclass :: TEXT LIKE '%_is_pushed_idx'
+    AND ic.reltuples > 0
\ No newline at end of file
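The unpushed_tables view leans on two Postgres details: each *_is_pushed_idx index is presumably a partial index covering only rows where is_pushed is false, and pg_class.reltuples for an index estimates the number of entries the index contains, refreshed by VACUUM, ANALYZE, and CREATE INDEX. A non-zero estimate therefore suggests the table still has unpushed rows, without scanning the table itself. A sketch under those assumptions; the real index definitions live elsewhere in the schema:

    -- Hypothetical: the index shape the view's LIKE pattern discovers
    CREATE INDEX IF NOT EXISTS checks_is_pushed_idx ON checks (id) WHERE is_pushed IS FALSE;

    -- Cheap reconciliation probe: which tables still have something to push?
    SELECT relname FROM unpushed_tables;

Since reltuples is only an estimate, the view may briefly over- or under-report until autovacuum refreshes the statistics; it trades exactness for a near-free lookup.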