Skip to content

Commit

Permalink
fix(stalled): take in count removeOnFail option
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed May 10, 2024
1 parent 7ee8f74 commit c0bc166
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 3 deletions.
4 changes: 3 additions & 1 deletion lib/commands/addJob-6.lua
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@ else
end
end

local opts = cmsgpack.unpack(ARGV[5])

-- Store the job.
rcall("HMSET", jobIdKey, "name", ARGV[3], "data", ARGV[4], "opts", ARGV[5], "timestamp", ARGV[6], "delay", ARGV[7], "priority", ARGV[9])
rcall("HMSET", jobIdKey, "name", ARGV[3], "data", ARGV[4], "opts", opts, "timestamp", ARGV[6], "delay", ARGV[7], "priority", ARGV[9])

-- Check if job is delayed
local delayedTimestamp = tonumber(ARGV[8])
Expand Down
53 changes: 52 additions & 1 deletion lib/commands/moveStalledJobsToWait-7.lua
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,29 @@

local rcall = redis.call

local function removeJob(jobId, baseKey)
local jobKey = baseKey .. jobId
rcall("DEL", jobKey, jobKey .. ':logs')
end

local function removeJobsByMaxAge(timestamp, maxAge, targetSet, prefix)
local start = timestamp - maxAge * 1000
local jobIds = rcall("ZREVRANGEBYSCORE", targetSet, start, "-inf")
for i, jobId in ipairs(jobIds) do
removeJob(jobId, prefix)
end
rcall("ZREMRANGEBYSCORE", targetSet, "-inf", start)
end

local function removeJobsByMaxCount(maxCount, targetSet, prefix)
local start = maxCount
local jobIds = rcall("ZREVRANGE", targetSet, start, -1)
for i, jobId in ipairs(jobIds) do
removeJob(jobId, prefix)
end
rcall("ZREMRANGEBYRANK", targetSet, 0, -(maxCount + 1))
end

local function batches(n, batchSize)
local i = 0

Expand Down Expand Up @@ -73,9 +96,37 @@ if(#stalling > 0) then
-- If this job has been stalled too many times, such as if it crashes the worker, then fail it.
local stalledCount = rcall("HINCRBY", jobKey, "stalledCounter", 1)
if(stalledCount > MAX_STALLED_JOB_COUNT) then
local rawOpts = rcall("HGET", jobKey, "opts")
local opts = cjson.decode(rawOpts)
local removeOnFailType = type(opts["removeOnFail"])
rcall("ZADD", KEYS[4], ARGV[3], jobId)
rcall("HSET", jobKey, "failedReason", "job stalled more than allowable limit")
rcall("HMSET", jobKey, "failedReason", "job stalled more than allowable limit",
"finishedOn", ARGV[3])
rcall("PUBLISH", KEYS[4], "{\"jobId\":\"" .. jobId .. "\", \"val\": \"job stalled more than maxStalledCount\"}")

if removeOnFailType == "number" then
removeJobsByMaxCount(opts["removeOnFail"],
KEYS[4], ARGV[2])
elseif removeOnFailType == "boolean" then
if opts["removeOnFail"] then
removeJob(jobId, ARGV[2])
rcall("ZREM", KEYS[4], jobId)
end
elseif removeOnFailType ~= "nil" then
local maxAge = opts["removeOnFail"]["age"]
local maxCount = opts["removeOnFail"]["count"]

if maxAge ~= nil then
removeJobsByMaxAge(ARGV[3], maxAge,
KEYS[4], ARGV[2])
end

if maxCount ~= nil and maxCount > 0 then
removeJobsByMaxCount(maxCount, KEYS[4],
ARGV[2])
end
end

table.insert(failed, jobId)
else
-- Move the job back to the wait queue, to immediately be picked up by a waiting worker.
Expand Down
2 changes: 1 addition & 1 deletion lib/scripts.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ const scripts = {
_.isUndefined(opts.customJobId) ? '' : opts.customJobId,
job.name,
job.data,
job.opts,
pack(job.opts),
job.timestamp,
job.delay,
job.delay ? job.timestamp + job.delay : 0,
Expand Down
40 changes: 40 additions & 0 deletions test/test_queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -1739,6 +1739,46 @@ describe('Queue', () => {
.catch(done);
});

it('removes failed stalled jobs that stall more than allowable stalled limit when removeOnFail is present', function(done) {
const FAILED_MESSAGE = 'job stalled more than allowable limit';
this.timeout(10000);

const queue2 = utils.buildQueue('running-stalled-job-' + uuid.v4(), {
settings: {
lockRenewTime: 2500,
lockDuration: 250,
stalledInterval: 500,
maxStalledCount: 1
}
});

let processedCount = 0;
queue2.process(job => {
processedCount++;
expect(job.data.foo).to.be.equal('bar');
return delay(1500);
});

queue2.on('completed', () => {
done(new Error('should not complete'));
});

queue2.on('failed', (job, err) => {
expect(processedCount).to.be.eql(2);
expect(job).to.be.null;
expect(err.message).to.be.eql(FAILED_MESSAGE);
done();
});

queue2
.add({ foo: 'bar' }, {removeOnFail: true})
.then(job => {
expect(job.id).to.be.ok;
expect(job.data.foo).to.be.eql('bar');
})
.catch(done);
});

it('should clear job from stalled set when job completed', done => {
const queue2 = utils.buildQueue('running-job-' + uuid.v4(), {
settings: {
Expand Down

0 comments on commit c0bc166

Please sign in to comment.