Skip to content

Commit

Permalink
chore: removed hash based reconciler
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe committed Mar 1, 2024
1 parent 6ab44bc commit 0854e22
Show file tree
Hide file tree
Showing 11 changed files with 91 additions and 384 deletions.
8 changes: 8 additions & 0 deletions models/artifacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,11 @@ type Artifact struct {
DeletedAt *time.Time `json:"deleted_at,omitempty" yaml:"deleted_at,omitempty" time_format:"postgres_timestamp"`
ExpiresAt *time.Time `json:"expires_at,omitempty" yaml:"expires_at,omitempty" time_format:"postgres_timestamp"`
}

func (t Artifact) TableName() string {
return "artifacts"
}

func (t Artifact) PK() string {
return t.ID.String()
}
4 changes: 4 additions & 0 deletions models/canary.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ func (c Canary) GetCheckID(checkName string) string {
return c.Checks[checkName]
}

func (c Canary) PK() string {
return "canaries"
}

func (c Canary) TableName() string {
return "canaries"
}
Expand Down
4 changes: 4 additions & 0 deletions models/checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ type Check struct {
TotalRuns int `json:"totalRuns,omitempty" gorm:"-"`
}

func (c Check) PK() string {
return c.ID.String()
}

func (c Check) TableName() string {
return "checks"
}
Expand Down
4 changes: 4 additions & 0 deletions models/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ type ConfigScraper struct {
DeletedAt *time.Time `json:"deleted_at,omitempty"`
}

func (c ConfigScraper) PK() string {
return c.ID.String()
}

func (c ConfigScraper) TableName() string {
return "config_scrapers"
}
Expand Down
4 changes: 4 additions & 0 deletions models/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ type Topology struct {
DeletedAt *time.Time `json:"deleted_at,omitempty" time_format:"postgres_timestamp"`
}

func (t Topology) PK() string {
return t.ID.String()
}

func (Topology) TableName() string {
return "topologies"
}
Expand Down
9 changes: 2 additions & 7 deletions tests/upstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,7 @@ var _ = ginkgo.Describe("Config Changes & Analyses sync test", ginkgo.Ordered, f
})

e.Use(upstream.AgentAuthMiddleware(cache.New(time.Hour, time.Hour)))

e.POST("/upstream/push", upstream.PushHandler)
e.GET("/upstream/pull", upstream.PullHandler([]string{"config_scrapers", "config_items"}))
e.GET("/upstream/status", upstream.StatusHandler([]string{"config_scrapers", "config_items"}))

port, echoCloser = setup.RunEcho(e)

Expand All @@ -63,9 +60,7 @@ var _ = ginkgo.Describe("Config Changes & Analyses sync test", ginkgo.Ordered, f
})

ginkgo.It("should push config items first to satisfy foregin keys for changes & analyses", func() {
reconciler := upstream.NewUpstreamReconciler(upstreamConf, 100)

count, err := reconciler.Sync(DefaultContext, "config_items")
count, err := upstream.SyncIsPushedTable[models.ConfigItem](DefaultContext, upstreamConf, 100)
Expect(err).To(BeNil())
Expect(count).To(Not(BeZero()))
})
Expand Down Expand Up @@ -137,7 +132,7 @@ var _ = ginkgo.Describe("Config Changes & Analyses sync test", ginkgo.Ordered, f
Expect(err).ToNot(HaveOccurred())
Expect(artifacts).To(BeZero())

count, err := upstream.SyncArtifacts(DefaultContext, upstreamConf, 10)
count, err := upstream.SyncIsPushedTable[models.Artifact](DefaultContext, upstreamConf, 10)
Expect(err).ToNot(HaveOccurred())

err = upstreamCtx.DB().Select("COUNT(*)").Model(&models.Artifact{}).Scan(&artifacts).Error
Expand Down
7 changes: 7 additions & 0 deletions upstream/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,10 @@ func (t *UpstreamClient) push(ctx context.Context, method string, msg *PushData)
histogram.Label(StatusLabel, StatusOK).Since(start)
return nil
}

func parseResponse(body string) string {
if len(body) > 200 {
body = body[0:200]
}
return body
}
79 changes: 0 additions & 79 deletions upstream/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@ import (
"net/http"
"time"

"github.com/flanksource/commons/collections"
"github.com/flanksource/commons/logger"
"github.com/flanksource/duty/api"
"github.com/flanksource/duty/context"
"github.com/flanksource/duty/models"
"github.com/flanksource/duty/query"
"github.com/labstack/echo/v4"
"github.com/patrickmn/go-cache"
"go.opentelemetry.io/otel/attribute"
Expand Down Expand Up @@ -66,48 +64,6 @@ func AgentAuthMiddleware(agentCache *cache.Cache) func(echo.HandlerFunc) echo.Ha
}
}

// PullHandler returns a handler that returns all the ids of items it has received from the requested agent.
func PullHandler(allowedTables []string) func(echo.Context) error {
return func(c echo.Context) error {
ctx := c.Request().Context().(context.Context)
histogram := ctx.Histogram("push_queue_pull_handler")
start := time.Now()
defer func() {
histogram.Since(start)
}()
var req PaginateRequest
if err := c.Bind(&req); err != nil {
histogram = histogram.Label(StatusLabel, StatusError)
return c.JSON(http.StatusBadRequest, api.HTTPError{Error: err.Error()})
}

ctx.GetSpan().SetAttributes(
attribute.String("request.table", req.Table),
attribute.String("request.from", req.From),
attribute.Int("request.size", req.Size),
)

if !collections.Contains(allowedTables, req.Table) {
histogram = histogram.Label(StatusLabel, StatusError)
return c.JSON(http.StatusForbidden, api.HTTPError{Error: fmt.Sprintf("table=%s is not allowed", req.Table)})
}

agent := ctx.Agent()
histogram = histogram.Label(AgentLabel, agent.ID.String())

resp, err := query.GetAllResourceIDsOfAgent(ctx, req.Table, req.From, req.Size, agent.ID)
if err != nil {
histogram.Label(StatusLabel, StatusError)
return c.JSON(http.StatusInternalServerError, api.HTTPError{Error: err.Error(), Message: "failed to get resource ids"})
}
histogram.Label(StatusLabel, StatusOK)
ctx.Counter("push_queue_pull_handler_records", AgentLabel, agent.ID.String()).Add(len(resp))
ctx.GetSpan().SetAttributes(attribute.Int("response.count", len(resp)))

return c.JSON(http.StatusOK, resp)
}
}

// PushHandler returns an echo handler that saves the push data from agents.
func PushHandler(c echo.Context) error {
ctx := c.Request().Context().(context.Context)
Expand Down Expand Up @@ -179,41 +135,6 @@ func DeleteHandler(c echo.Context) error {
return nil
}

// StatusHandler returns a handler that returns the summary of all ids the upstream has received.
func StatusHandler(allowedTables []string) func(echo.Context) error {
return func(c echo.Context) error {
ctx := c.Request().Context().(context.Context)
var req PaginateRequest
if err := c.Bind(&req); err != nil {
return c.JSON(http.StatusBadRequest, api.HTTPError{Error: err.Error()})
}

start := time.Now()
ctx.GetSpan().SetAttributes(
attribute.String("request.table", req.Table),
attribute.String("request.from", req.From),
attribute.Int("request.size", req.Size),
)
if !collections.Contains(allowedTables, req.Table) {
return c.JSON(http.StatusForbidden, api.HTTPError{Error: fmt.Sprintf("table=%s is not allowed", req.Table)})
}

agent := ctx.Agent()

histogram := ctx.Histogram("push_queue_status_handler")
histogram = histogram.Label(AgentLabel, agent.ID.String())

response, err := GetPrimaryKeysHash(ctx, req, agent.ID)
if err != nil {
histogram.Label(StatusLabel, StatusError).Since(start)
return c.JSON(http.StatusInternalServerError, api.HTTPError{Error: err.Error(), Message: "failed to push status response"})
}

histogram.Label(StatusLabel, StatusOK).Since(start)
return c.JSON(http.StatusOK, response)
}
}

func PingHandler(c echo.Context) error {
start := time.Now()
ctx := c.Request().Context().(context.Context)
Expand Down
30 changes: 16 additions & 14 deletions upstream/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,33 +110,35 @@ func SyncConfigAnalyses(ctx context.Context, config UpstreamConfig, batchSize in
}
}

// SyncArtifacts pushes artifacts that haven't already been pushed to upstream.
func SyncArtifacts(ctx context.Context, config UpstreamConfig, batchSize int) (int, error) {
func SyncIsPushedTable[T dbTable](ctx context.Context, config UpstreamConfig, batchSize int) (int, error) {
client := NewUpstreamClient(config)
count := 0
var anon T
table := anon.TableName()

var count int
for {
var artifacts []models.Artifact
var items []T
if err := ctx.DB().
Where("is_pushed IS FALSE").
Limit(batchSize).
Find(&artifacts).Error; err != nil {
return 0, fmt.Errorf("failed to fetch artifacts: %w", err)
Find(&items).Error; err != nil {
return 0, fmt.Errorf("failed to fetch unpushed items for table %s: %w", table, err)
}

if len(artifacts) == 0 {
if len(items) == 0 {
return count, nil
}

ctx.Tracef("pushing %d artifacts to upstream", len(artifacts))
if err := client.Push(ctx, &PushData{Artifacts: artifacts}); err != nil {
return 0, fmt.Errorf("failed to push artifacts to upstream: %w", err)
ctx.Tracef("pushing %s %d to upstream", table, len(items))
if err := client.Push(ctx, NewPushData(items)); err != nil {
return 0, fmt.Errorf("failed to push %s to upstream: %w", table, err)
}

ids := lo.Map(artifacts, func(a models.Artifact, _ int) string { return a.ID.String() })
if err := ctx.DB().Model(&models.Artifact{}).Where("id IN ?", ids).Update("is_pushed", true).Error; err != nil {
return 0, fmt.Errorf("failed to update is_pushed on artifacts: %w", err)
ids := lo.Map(items, func(a T, _ int) string { return a.PK() })
if err := ctx.DB().Model(anon).Where("id IN ?", ids).Update("is_pushed", true).Error; err != nil {
return 0, fmt.Errorf("failed to update is_pushed on %s: %w", table, err)
}

count += len(artifacts)
count += len(items)
}
}
Loading

0 comments on commit 0854e22

Please sign in to comment.