Skip to content

Commit

Permalink
perf(cache): Use AddRateLimited for batch enqueue (#5228)
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Su <[email protected]>
  • Loading branch information
pingsutw authored Apr 12, 2024
1 parent 6159c27 commit 80ccda3
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 6 deletions.
2 changes: 1 addition & 1 deletion flytestdlib/cache/auto_refresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func (w *autoRefresh) enqueueBatches(ctx context.Context) error {

for _, batch := range batches {
b := batch
w.workqueue.Add(&b)
w.workqueue.AddRateLimited(&b)
}

return nil
Expand Down
15 changes: 10 additions & 5 deletions flytestdlib/cache/auto_refresh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ import (
const fakeCacheItemValueLimit = 10

type fakeCacheItem struct {
val int
val int
isTerminal bool
}

func (f fakeCacheItem) IsTerminal() bool {
return false
return f.isTerminal
}

type terminalCacheItem struct {
Expand All @@ -42,11 +43,15 @@ func syncFakeItem(_ context.Context, batch Batch) ([]ItemSyncResponse, error) {
// After the item has gone through ten update cycles, leave it unchanged
continue
}

isTerminal := false
if item.val == fakeCacheItemValueLimit-1 {
isTerminal = true
}
items = append(items, ItemSyncResponse{
ID: obj.GetID(),
Item: fakeCacheItem{
val: item.val + 1,
val: item.val + 1,
isTerminal: isTerminal,
},
Action: Update,
})
Expand All @@ -60,7 +65,7 @@ func syncTerminalItem(_ context.Context, batch Batch) ([]ItemSyncResponse, error
}

func TestCacheFour(t *testing.T) {
testResyncPeriod := time.Millisecond
testResyncPeriod := 10 * time.Millisecond
rateLimiter := workqueue.DefaultControllerRateLimiter()

t.Run("normal operation", func(t *testing.T) {
Expand Down

0 comments on commit 80ccda3

Please sign in to comment.