Skip to content

Commit

Permalink
chore: perform is_pushed updates in batch
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe authored and moshloop committed Jun 14, 2024
1 parent ff45af9 commit 957f539
Showing 1 changed file with 26 additions and 23 deletions.
49 changes: 26 additions & 23 deletions upstream/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,33 +138,36 @@ func reconcileTable(ctx context.Context, config UpstreamConfig, table pushableTa

count += len(items)

backoff := retry.WithMaxRetries(3, retry.NewConstant(time.Second))
err = retry.Do(ctx, backoff, func(_ctx gocontext.Context) error {
ctx = _ctx.(context.Context)

if c, ok := table.(customIsPushedUpdater); ok {
if err := c.UpdateIsPushed(ctx.DB(), items); err != nil {
if duty.IsDeadlockError(err) {
return retry.RetryableError(err)
batchSize := ctx.Properties().Int("update_is_pushed.batch.size", 200)
for _, batch := range lo.Chunk(items, batchSize) {
backoff := retry.WithJitter(time.Second, retry.WithMaxRetries(3, retry.NewExponential(time.Second)))
err = retry.Do(ctx, backoff, func(_ctx gocontext.Context) error {
ctx = _ctx.(context.Context)

if c, ok := table.(customIsPushedUpdater); ok {
if err := c.UpdateIsPushed(ctx.DB(), batch); err != nil {
if duty.IsDeadlockError(err) {
return retry.RetryableError(err)
}

return fmt.Errorf("failed to update is_pushed on %s: %w", table.TableName(), err)
}

return fmt.Errorf("failed to update is_pushed on %s: %w", table.TableName(), err)
}
} else {
ids := lo.Map(items, func(a models.DBTable, _ int) string { return a.PK() })
if err := ctx.DB().Model(table).Where("id IN ?", ids).Update("is_pushed", true).Error; err != nil {
if duty.IsDeadlockError(err) {
return retry.RetryableError(err)
} else {
ids := lo.Map(batch, func(a models.DBTable, _ int) string { return a.PK() })
if err := ctx.DB().Model(table).Where("id IN ?", ids).Update("is_pushed", true).Error; err != nil {
if duty.IsDeadlockError(err) {
return retry.RetryableError(err)
}

return fmt.Errorf("failed to update is_pushed on %s: %w", table.TableName(), err)
}

return fmt.Errorf("failed to update is_pushed on %s: %w", table.TableName(), err)
}
}

return nil
})
if err != nil {
return count, err
return nil
})
if err != nil {
return count, err
}
}

if pushError != nil {
Expand Down

0 comments on commit 957f539

Please sign in to comment.