From f6e19b88515cfc1b28055f364437cc8ff09d974c Mon Sep 17 00:00:00 2001 From: Viktor Sych Date: Tue, 19 Dec 2023 00:08:48 +0200 Subject: [PATCH] [fix] delayed_sidekiq race condition --- CHANGELOG.md | 1 + Gemfile | 1 + README.md | 9 ++-- lib/chewy/strategy/delayed_sidekiq.rb | 8 +-- .../strategy/delayed_sidekiq/scheduler.rb | 49 +++++++++++++------ lib/chewy/strategy/delayed_sidekiq/worker.rb | 36 +++++++++++--- spec/chewy/strategy/delayed_sidekiq_spec.rb | 9 ++-- 7 files changed, 80 insertions(+), 33 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 073d61140..c4fb7bdd2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ ### Changes ### Bugs Fixed +* [#937](https://github.com/toptal/chewy/pull/937): Fix for race condition while using the `delayed_sidekiq` strategy. Also, fix for Redis bloating in case of reindexing error ([@skcc321](https://github.com/skcc321)) ## 7.5.1 (2024-01-30) diff --git a/Gemfile b/Gemfile index e838bbb9d..2c146a9ed 100644 --- a/Gemfile +++ b/Gemfile @@ -8,6 +8,7 @@ gem 'sidekiq', require: false gem 'kaminari-core', require: false gem 'parallel', require: false +gem 'redis', require: false gem 'ruby-progressbar', require: false gem 'guard' diff --git a/README.md b/README.md index 4082fd038..23c13993e 100644 --- a/README.md +++ b/README.md @@ -776,9 +776,12 @@ Chewy.settings[:sidekiq] = {queue: :low} #### `:delayed_sidekiq` -It accumulates ids of records to be reindexed during the latency window in redis and then does the reindexing of all accumulated records at once. -The strategy is very useful in case of frequently mutated records. -It supports `update_fields` option, so it will try to select just enough data from the DB +It accumulates IDs of records to be reindexed during the latency window in Redis and then performs the reindexing of all accumulated records at once. +This strategy is very useful in the case of frequently mutated records. +It supports the `update_fields` option, so it will attempt to select just enough data from the database. + +Keep in mind, this strategy does not guarantee reindexing in the event of Sidekiq worker termination or an error during the reindexing phase. +This behavior is intentional to prevent continuous growth of Redis db. There are three options that can be defined in the index: ```ruby diff --git a/lib/chewy/strategy/delayed_sidekiq.rb b/lib/chewy/strategy/delayed_sidekiq.rb index 0bdd25c88..23b373b2d 100644 --- a/lib/chewy/strategy/delayed_sidekiq.rb +++ b/lib/chewy/strategy/delayed_sidekiq.rb @@ -9,11 +9,11 @@ class DelayedSidekiq < Sidekiq # leak and potential flaky tests. def self.clear_timechunks! ::Sidekiq.redis do |redis| - timechunk_sets = redis.smembers(Chewy::Strategy::DelayedSidekiq::Scheduler::ALL_SETS_KEY) - break if timechunk_sets.empty? + keys_to_delete = redis.keys("#{Scheduler::KEY_PREFIX}*") - redis.pipelined do |pipeline| - timechunk_sets.each { |set| pipeline.del(set) } + # Delete keys one by one + keys_to_delete.each do |key| + redis.del(key) end end end diff --git a/lib/chewy/strategy/delayed_sidekiq/scheduler.rb b/lib/chewy/strategy/delayed_sidekiq/scheduler.rb index f1010a3ee..d931c338b 100644 --- a/lib/chewy/strategy/delayed_sidekiq/scheduler.rb +++ b/lib/chewy/strategy/delayed_sidekiq/scheduler.rb @@ -12,13 +12,43 @@ class Strategy class DelayedSidekiq require_relative 'worker' + LUA_SCRIPT = <<~LUA + local timechunk_key = KEYS[1] + local timechunks_key = KEYS[2] + local serialize_data = ARGV[1] + local at = ARGV[2] + local ttl = tonumber(ARGV[3]) + + local schedule_job = false + + -- Check if the 'sadd?' method is available + if redis.call('exists', 'sadd?') == 1 then + redis.call('sadd?', timechunk_key, serialize_data) + else + redis.call('sadd', timechunk_key, serialize_data) + end + + -- Set expiration for timechunk_key + redis.call('expire', timechunk_key, ttl) + + -- Check if timechunk_key exists in the sorted set + if not redis.call('zrank', timechunks_key, timechunk_key) then + -- Add timechunk_key to the sorted set + redis.call('zadd', timechunks_key, at, timechunk_key) + -- Set expiration for timechunks_key + redis.call('expire', timechunks_key, ttl) + schedule_job = true + end + + return schedule_job + LUA + class Scheduler DEFAULT_TTL = 60 * 60 * 24 # in seconds DEFAULT_LATENCY = 10 DEFAULT_MARGIN = 2 DEFAULT_QUEUE = 'chewy' KEY_PREFIX = 'chewy:delayed_sidekiq' - ALL_SETS_KEY = "#{KEY_PREFIX}:all_sets".freeze FALLBACK_FIELDS = 'all' FIELDS_IDS_SEPARATOR = ';' IDS_SEPARATOR = ',' @@ -67,21 +97,8 @@ def initialize(type, ids, options = {}) # | chewy:delayed_sidekiq:CitiesIndex:1679347868 def postpone ::Sidekiq.redis do |redis| - # warning: Redis#sadd will always return an Integer in Redis 5.0.0. Use Redis#sadd? instead - if redis.respond_to?(:sadd?) - redis.sadd?(ALL_SETS_KEY, timechunks_key) - redis.sadd?(timechunk_key, serialize_data) - else - redis.sadd(ALL_SETS_KEY, timechunks_key) - redis.sadd(timechunk_key, serialize_data) - end - - redis.expire(timechunk_key, ttl) - - unless redis.zrank(timechunks_key, timechunk_key) - redis.zadd(timechunks_key, at, timechunk_key) - redis.expire(timechunks_key, ttl) - + # do the redis stuff in a single command to avoid concurrency issues + if redis.eval(LUA_SCRIPT, keys: [timechunk_key, timechunks_key], argv: [serialize_data, at, ttl]) ::Sidekiq::Client.push( 'queue' => sidekiq_queue, 'at' => at + margin, diff --git a/lib/chewy/strategy/delayed_sidekiq/worker.rb b/lib/chewy/strategy/delayed_sidekiq/worker.rb index 4d17a4cd1..af5fa793d 100644 --- a/lib/chewy/strategy/delayed_sidekiq/worker.rb +++ b/lib/chewy/strategy/delayed_sidekiq/worker.rb @@ -6,13 +6,40 @@ class DelayedSidekiq class Worker include ::Sidekiq::Worker + LUA_SCRIPT = <<~LUA + local type = ARGV[1] + local score = tonumber(ARGV[2]) + local prefix = ARGV[3] + local timechunks_key = prefix .. ":" .. type .. ":timechunks" + + -- Get timechunk_keys with scores less than or equal to the specified score + local timechunk_keys = redis.call('zrangebyscore', timechunks_key, '-inf', score) + + -- Get all members from the sets associated with the timechunk_keys + local members = {} + for _, timechunk_key in ipairs(timechunk_keys) do + local set_members = redis.call('smembers', timechunk_key) + for _, member in ipairs(set_members) do + table.insert(members, member) + end + end + + -- Remove timechunk_keys and their associated sets + for _, timechunk_key in ipairs(timechunk_keys) do + redis.call('del', timechunk_key) + end + + -- Remove timechunks with scores less than or equal to the specified score + redis.call('zremrangebyscore', timechunks_key, '-inf', score) + + return members + LUA + def perform(type, score, options = {}) options[:refresh] = !Chewy.disable_refresh_async if Chewy.disable_refresh_async ::Sidekiq.redis do |redis| - timechunks_key = "#{Scheduler::KEY_PREFIX}:#{type}:timechunks" - timechunk_keys = redis.zrangebyscore(timechunks_key, -1, score) - members = timechunk_keys.flat_map { |timechunk_key| redis.smembers(timechunk_key) }.compact + members = redis.eval(LUA_SCRIPT, keys: [], argv: [type, score, Scheduler::KEY_PREFIX]) # extract ids and fields & do the reset of records ids, fields = extract_ids_and_fields(members) @@ -22,9 +49,6 @@ def perform(type, score, options = {}) index.strategy_config.delayed_sidekiq.reindex_wrapper.call do options.any? ? index.import!(ids, **options) : index.import!(ids) end - - redis.del(timechunk_keys) - redis.zremrangebyscore(timechunks_key, -1, score) end end diff --git a/spec/chewy/strategy/delayed_sidekiq_spec.rb b/spec/chewy/strategy/delayed_sidekiq_spec.rb index c0d43e576..256eb7511 100644 --- a/spec/chewy/strategy/delayed_sidekiq_spec.rb +++ b/spec/chewy/strategy/delayed_sidekiq_spec.rb @@ -2,7 +2,7 @@ if defined?(Sidekiq) require 'sidekiq/testing' - require 'mock_redis' + require 'redis' describe Chewy::Strategy::DelayedSidekiq do around do |example| @@ -10,9 +10,10 @@ end before do - redis = MockRedis.new + redis = Redis.new allow(Sidekiq).to receive(:redis).and_yield(redis) Sidekiq::Worker.clear_all + described_class.clear_timechunks! end before do @@ -115,7 +116,7 @@ context 'one call with update_fields another one without update_fields' do it 'does reindex of all fields' do Timecop.freeze do - expect(CitiesIndex).to receive(:import!).with([other_city.id, city.id]).once + expect(CitiesIndex).to receive(:import!).with([city.id, other_city.id]).once scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [city.id], update_fields: ['name']) scheduler.postpone scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [other_city.id]) @@ -128,7 +129,7 @@ context 'both calls with different update fields' do it 'deos reindex with union of fields' do Timecop.freeze do - expect(CitiesIndex).to receive(:import!).with([other_city.id, city.id], update_fields: %w[description name]).once + expect(CitiesIndex).to receive(:import!).with([city.id, other_city.id], update_fields: %w[name description]).once scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [city.id], update_fields: ['name']) scheduler.postpone scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [other_city.id], update_fields: ['description'])