Skip to content

Commit

Permalink
[fix] delayed_sidekiq race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
skcc321 committed Apr 26, 2024
1 parent 3d559bb commit f6e19b8
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 33 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
### Changes

### Bugs Fixed
* [#937](https://github.com/toptal/chewy/pull/937): Fix for race condition while using the `delayed_sidekiq` strategy. Also, fix for Redis bloating in case of reindexing error ([@skcc321](https://github.com/skcc321))

## 7.5.1 (2024-01-30)

Expand Down
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ gem 'sidekiq', require: false
gem 'kaminari-core', require: false

gem 'parallel', require: false
gem 'redis', require: false
gem 'ruby-progressbar', require: false

gem 'guard'
Expand Down
9 changes: 6 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -776,9 +776,12 @@ Chewy.settings[:sidekiq] = {queue: :low}

#### `:delayed_sidekiq`

It accumulates ids of records to be reindexed during the latency window in redis and then does the reindexing of all accumulated records at once.
The strategy is very useful in case of frequently mutated records.
It supports `update_fields` option, so it will try to select just enough data from the DB
It accumulates IDs of records to be reindexed during the latency window in Redis and then performs the reindexing of all accumulated records at once.
This strategy is very useful in the case of frequently mutated records.
It supports the `update_fields` option, so it will attempt to select just enough data from the database.

Keep in mind, this strategy does not guarantee reindexing in the event of Sidekiq worker termination or an error during the reindexing phase.
This behavior is intentional to prevent continuous growth of Redis db.

There are three options that can be defined in the index:
```ruby
Expand Down
8 changes: 4 additions & 4 deletions lib/chewy/strategy/delayed_sidekiq.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ class DelayedSidekiq < Sidekiq
# leak and potential flaky tests.
def self.clear_timechunks!
::Sidekiq.redis do |redis|
timechunk_sets = redis.smembers(Chewy::Strategy::DelayedSidekiq::Scheduler::ALL_SETS_KEY)
break if timechunk_sets.empty?
keys_to_delete = redis.keys("#{Scheduler::KEY_PREFIX}*")

redis.pipelined do |pipeline|
timechunk_sets.each { |set| pipeline.del(set) }
# Delete keys one by one
keys_to_delete.each do |key|
redis.del(key)
end
end
end
Expand Down
49 changes: 33 additions & 16 deletions lib/chewy/strategy/delayed_sidekiq/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,43 @@ class Strategy
class DelayedSidekiq
require_relative 'worker'

LUA_SCRIPT = <<~LUA
local timechunk_key = KEYS[1]
local timechunks_key = KEYS[2]
local serialize_data = ARGV[1]
local at = ARGV[2]
local ttl = tonumber(ARGV[3])
local schedule_job = false
-- Check if the 'sadd?' method is available
if redis.call('exists', 'sadd?') == 1 then
redis.call('sadd?', timechunk_key, serialize_data)
else
redis.call('sadd', timechunk_key, serialize_data)
end
-- Set expiration for timechunk_key
redis.call('expire', timechunk_key, ttl)
-- Check if timechunk_key exists in the sorted set
if not redis.call('zrank', timechunks_key, timechunk_key) then
-- Add timechunk_key to the sorted set
redis.call('zadd', timechunks_key, at, timechunk_key)
-- Set expiration for timechunks_key
redis.call('expire', timechunks_key, ttl)
schedule_job = true
end
return schedule_job
LUA

class Scheduler
DEFAULT_TTL = 60 * 60 * 24 # in seconds
DEFAULT_LATENCY = 10
DEFAULT_MARGIN = 2
DEFAULT_QUEUE = 'chewy'
KEY_PREFIX = 'chewy:delayed_sidekiq'
ALL_SETS_KEY = "#{KEY_PREFIX}:all_sets".freeze
FALLBACK_FIELDS = 'all'
FIELDS_IDS_SEPARATOR = ';'
IDS_SEPARATOR = ','
Expand Down Expand Up @@ -67,21 +97,8 @@ def initialize(type, ids, options = {})
# | chewy:delayed_sidekiq:CitiesIndex:1679347868
def postpone
::Sidekiq.redis do |redis|
# warning: Redis#sadd will always return an Integer in Redis 5.0.0. Use Redis#sadd? instead
if redis.respond_to?(:sadd?)
redis.sadd?(ALL_SETS_KEY, timechunks_key)
redis.sadd?(timechunk_key, serialize_data)
else
redis.sadd(ALL_SETS_KEY, timechunks_key)
redis.sadd(timechunk_key, serialize_data)
end

redis.expire(timechunk_key, ttl)

unless redis.zrank(timechunks_key, timechunk_key)
redis.zadd(timechunks_key, at, timechunk_key)
redis.expire(timechunks_key, ttl)

# do the redis stuff in a single command to avoid concurrency issues
if redis.eval(LUA_SCRIPT, keys: [timechunk_key, timechunks_key], argv: [serialize_data, at, ttl])
::Sidekiq::Client.push(
'queue' => sidekiq_queue,
'at' => at + margin,
Expand Down
36 changes: 30 additions & 6 deletions lib/chewy/strategy/delayed_sidekiq/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,40 @@ class DelayedSidekiq
class Worker
include ::Sidekiq::Worker

LUA_SCRIPT = <<~LUA
local type = ARGV[1]
local score = tonumber(ARGV[2])
local prefix = ARGV[3]
local timechunks_key = prefix .. ":" .. type .. ":timechunks"
-- Get timechunk_keys with scores less than or equal to the specified score
local timechunk_keys = redis.call('zrangebyscore', timechunks_key, '-inf', score)
-- Get all members from the sets associated with the timechunk_keys
local members = {}
for _, timechunk_key in ipairs(timechunk_keys) do
local set_members = redis.call('smembers', timechunk_key)
for _, member in ipairs(set_members) do
table.insert(members, member)
end
end
-- Remove timechunk_keys and their associated sets
for _, timechunk_key in ipairs(timechunk_keys) do
redis.call('del', timechunk_key)
end
-- Remove timechunks with scores less than or equal to the specified score
redis.call('zremrangebyscore', timechunks_key, '-inf', score)
return members
LUA

def perform(type, score, options = {})
options[:refresh] = !Chewy.disable_refresh_async if Chewy.disable_refresh_async

::Sidekiq.redis do |redis|
timechunks_key = "#{Scheduler::KEY_PREFIX}:#{type}:timechunks"
timechunk_keys = redis.zrangebyscore(timechunks_key, -1, score)
members = timechunk_keys.flat_map { |timechunk_key| redis.smembers(timechunk_key) }.compact
members = redis.eval(LUA_SCRIPT, keys: [], argv: [type, score, Scheduler::KEY_PREFIX])

# extract ids and fields & do the reset of records
ids, fields = extract_ids_and_fields(members)
Expand All @@ -22,9 +49,6 @@ def perform(type, score, options = {})
index.strategy_config.delayed_sidekiq.reindex_wrapper.call do
options.any? ? index.import!(ids, **options) : index.import!(ids)
end

redis.del(timechunk_keys)
redis.zremrangebyscore(timechunks_key, -1, score)
end
end

Expand Down
9 changes: 5 additions & 4 deletions spec/chewy/strategy/delayed_sidekiq_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,18 @@

if defined?(Sidekiq)
require 'sidekiq/testing'
require 'mock_redis'
require 'redis'

describe Chewy::Strategy::DelayedSidekiq do
around do |example|
Chewy.strategy(:bypass) { example.run }
end

before do
redis = MockRedis.new
redis = Redis.new
allow(Sidekiq).to receive(:redis).and_yield(redis)
Sidekiq::Worker.clear_all
described_class.clear_timechunks!
end

before do
Expand Down Expand Up @@ -115,7 +116,7 @@
context 'one call with update_fields another one without update_fields' do
it 'does reindex of all fields' do
Timecop.freeze do
expect(CitiesIndex).to receive(:import!).with([other_city.id, city.id]).once
expect(CitiesIndex).to receive(:import!).with([city.id, other_city.id]).once
scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [city.id], update_fields: ['name'])
scheduler.postpone
scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [other_city.id])
Expand All @@ -128,7 +129,7 @@
context 'both calls with different update fields' do
it 'deos reindex with union of fields' do
Timecop.freeze do
expect(CitiesIndex).to receive(:import!).with([other_city.id, city.id], update_fields: %w[description name]).once
expect(CitiesIndex).to receive(:import!).with([city.id, other_city.id], update_fields: %w[name description]).once
scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [city.id], update_fields: ['name'])
scheduler.postpone
scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [other_city.id], update_fields: ['description'])
Expand Down

0 comments on commit f6e19b8

Please sign in to comment.