diff --git a/upstream/jobs.go b/upstream/jobs.go index e4f00ace..08cf4d8c 100644 --- a/upstream/jobs.go +++ b/upstream/jobs.go @@ -138,33 +138,36 @@ func reconcileTable(ctx context.Context, config UpstreamConfig, table pushableTa count += len(items) - backoff := retry.WithMaxRetries(3, retry.NewConstant(time.Second)) - err = retry.Do(ctx, backoff, func(_ctx gocontext.Context) error { - ctx = _ctx.(context.Context) - - if c, ok := table.(customIsPushedUpdater); ok { - if err := c.UpdateIsPushed(ctx.DB(), items); err != nil { - if duty.IsDeadlockError(err) { - return retry.RetryableError(err) + batchSize := ctx.Properties().Int("update_is_pushed.batch.size", 200) + for _, batch := range lo.Chunk(items, batchSize) { + backoff := retry.WithJitter(time.Second, retry.WithMaxRetries(3, retry.NewExponential(time.Second))) + err = retry.Do(ctx, backoff, func(_ctx gocontext.Context) error { + ctx = _ctx.(context.Context) + + if c, ok := table.(customIsPushedUpdater); ok { + if err := c.UpdateIsPushed(ctx.DB(), batch); err != nil { + if duty.IsDeadlockError(err) { + return retry.RetryableError(err) + } + + return fmt.Errorf("failed to update is_pushed on %s: %w", table.TableName(), err) } - - return fmt.Errorf("failed to update is_pushed on %s: %w", table.TableName(), err) - } - } else { - ids := lo.Map(items, func(a models.DBTable, _ int) string { return a.PK() }) - if err := ctx.DB().Model(table).Where("id IN ?", ids).Update("is_pushed", true).Error; err != nil { - if duty.IsDeadlockError(err) { - return retry.RetryableError(err) + } else { + ids := lo.Map(batch, func(a models.DBTable, _ int) string { return a.PK() }) + if err := ctx.DB().Model(table).Where("id IN ?", ids).Update("is_pushed", true).Error; err != nil { + if duty.IsDeadlockError(err) { + return retry.RetryableError(err) + } + + return fmt.Errorf("failed to update is_pushed on %s: %w", table.TableName(), err) } - - return fmt.Errorf("failed to update is_pushed on %s: %w", table.TableName(), err) } - } - return nil - }) - if err != nil { - return count, err + return nil + }) + if err != nil { + return count, err + } } if pushError != nil {