Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(stalled): take in count removeOnFail option #2734

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion lib/commands/addJob-6.lua
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@ else
end
end

local opts = cmsgpack.unpack(ARGV[5])

-- Store the job.
rcall("HMSET", jobIdKey, "name", ARGV[3], "data", ARGV[4], "opts", ARGV[5], "timestamp", ARGV[6], "delay", ARGV[7], "priority", ARGV[9])
rcall("HMSET", jobIdKey, "name", ARGV[3], "data", ARGV[4], "opts", opts, "timestamp", ARGV[6], "delay", ARGV[7], "priority", ARGV[9])

-- Check if job is delayed
local delayedTimestamp = tonumber(ARGV[8])
Expand Down
53 changes: 52 additions & 1 deletion lib/commands/moveStalledJobsToWait-7.lua
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,29 @@

local rcall = redis.call

local function removeJob(jobId, baseKey)
local jobKey = baseKey .. jobId
rcall("DEL", jobKey, jobKey .. ':logs')
end

local function removeJobsByMaxAge(timestamp, maxAge, targetSet, prefix)
local start = timestamp - maxAge * 1000
local jobIds = rcall("ZREVRANGEBYSCORE", targetSet, start, "-inf")
for i, jobId in ipairs(jobIds) do
removeJob(jobId, prefix)
end
rcall("ZREMRANGEBYSCORE", targetSet, "-inf", start)
end

local function removeJobsByMaxCount(maxCount, targetSet, prefix)
local start = maxCount
local jobIds = rcall("ZREVRANGE", targetSet, start, -1)
for i, jobId in ipairs(jobIds) do
removeJob(jobId, prefix)
end
rcall("ZREMRANGEBYRANK", targetSet, 0, -(maxCount + 1))
end

local function batches(n, batchSize)
local i = 0

Expand Down Expand Up @@ -73,9 +96,37 @@ if(#stalling > 0) then
-- If this job has been stalled too many times, such as if it crashes the worker, then fail it.
local stalledCount = rcall("HINCRBY", jobKey, "stalledCounter", 1)
if(stalledCount > MAX_STALLED_JOB_COUNT) then
local rawOpts = rcall("HGET", jobKey, "opts")
local opts = cjson.decode(rawOpts)
local removeOnFailType = type(opts["removeOnFail"])
rcall("ZADD", KEYS[4], ARGV[3], jobId)
rcall("HSET", jobKey, "failedReason", "job stalled more than allowable limit")
rcall("HMSET", jobKey, "failedReason", "job stalled more than allowable limit",
"finishedOn", ARGV[3])
rcall("PUBLISH", KEYS[4], "{\"jobId\":\"" .. jobId .. "\", \"val\": \"job stalled more than maxStalledCount\"}")

if removeOnFailType == "number" then
removeJobsByMaxCount(opts["removeOnFail"],
KEYS[4], ARGV[2])
elseif removeOnFailType == "boolean" then
if opts["removeOnFail"] then
removeJob(jobId, ARGV[2])
rcall("ZREM", KEYS[4], jobId)
end
elseif removeOnFailType ~= "nil" then
local maxAge = opts["removeOnFail"]["age"]
local maxCount = opts["removeOnFail"]["count"]

if maxAge ~= nil then
removeJobsByMaxAge(ARGV[3], maxAge,
KEYS[4], ARGV[2])
end

if maxCount ~= nil and maxCount > 0 then
removeJobsByMaxCount(maxCount, KEYS[4],
ARGV[2])
end
end

table.insert(failed, jobId)
else
-- Move the job back to the wait queue, to immediately be picked up by a waiting worker.
Expand Down
2 changes: 1 addition & 1 deletion lib/scripts.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ const scripts = {
_.isUndefined(opts.customJobId) ? '' : opts.customJobId,
job.name,
job.data,
job.opts,
pack(job.opts),
job.timestamp,
job.delay,
job.delay ? job.timestamp + job.delay : 0,
Expand Down
40 changes: 40 additions & 0 deletions test/test_queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -1739,6 +1739,46 @@ describe('Queue', () => {
.catch(done);
});

it('removes failed stalled jobs that stall more than allowable stalled limit when removeOnFail is present', function(done) {
const FAILED_MESSAGE = 'job stalled more than allowable limit';
this.timeout(10000);

const queue2 = utils.buildQueue('running-stalled-job-' + uuid.v4(), {
settings: {
lockRenewTime: 2500,
lockDuration: 250,
stalledInterval: 500,
maxStalledCount: 1
}
});

let processedCount = 0;
queue2.process(job => {
processedCount++;
expect(job.data.foo).to.be.equal('bar');
return delay(1500);
});

queue2.on('completed', () => {
done(new Error('should not complete'));
});

queue2.on('failed', (job, err) => {
expect(processedCount).to.be.eql(2);
expect(job).to.be.null;
expect(err.message).to.be.eql(FAILED_MESSAGE);
done();
});

queue2
.add({ foo: 'bar' }, {removeOnFail: true})
.then(job => {
expect(job.id).to.be.ok;
expect(job.data.foo).to.be.eql('bar');
})
.catch(done);
});

it('should clear job from stalled set when job completed', done => {
const queue2 = utils.buildQueue('running-job-' + uuid.v4(), {
settings: {
Expand Down
Loading