diff --git a/lib/commands/addJob-6.lua b/lib/commands/addJob-6.lua index 59158dc44..9288f0b67 100644 --- a/lib/commands/addJob-6.lua +++ b/lib/commands/addJob-6.lua @@ -51,8 +51,10 @@ else end end +local opts = cmsgpack.unpack(ARGV[5]) + -- Store the job. -rcall("HMSET", jobIdKey, "name", ARGV[3], "data", ARGV[4], "opts", ARGV[5], "timestamp", ARGV[6], "delay", ARGV[7], "priority", ARGV[9]) +rcall("HMSET", jobIdKey, "name", ARGV[3], "data", ARGV[4], "opts", opts, "timestamp", ARGV[6], "delay", ARGV[7], "priority", ARGV[9]) -- Check if job is delayed local delayedTimestamp = tonumber(ARGV[8]) diff --git a/lib/commands/moveStalledJobsToWait-7.lua b/lib/commands/moveStalledJobsToWait-7.lua index 8167b1d0e..702176ae5 100644 --- a/lib/commands/moveStalledJobsToWait-7.lua +++ b/lib/commands/moveStalledJobsToWait-7.lua @@ -22,6 +22,29 @@ local rcall = redis.call +local function removeJob(jobId, baseKey) + local jobKey = baseKey .. jobId + rcall("DEL", jobKey, jobKey .. ':logs') +end + +local function removeJobsByMaxAge(timestamp, maxAge, targetSet, prefix) + local start = timestamp - maxAge * 1000 + local jobIds = rcall("ZREVRANGEBYSCORE", targetSet, start, "-inf") + for i, jobId in ipairs(jobIds) do + removeJob(jobId, prefix) + end + rcall("ZREMRANGEBYSCORE", targetSet, "-inf", start) +end + +local function removeJobsByMaxCount(maxCount, targetSet, prefix) + local start = maxCount + local jobIds = rcall("ZREVRANGE", targetSet, start, -1) + for i, jobId in ipairs(jobIds) do + removeJob(jobId, prefix) + end + rcall("ZREMRANGEBYRANK", targetSet, 0, -(maxCount + 1)) +end + local function batches(n, batchSize) local i = 0 @@ -73,9 +96,37 @@ if(#stalling > 0) then -- If this job has been stalled too many times, such as if it crashes the worker, then fail it. local stalledCount = rcall("HINCRBY", jobKey, "stalledCounter", 1) if(stalledCount > MAX_STALLED_JOB_COUNT) then + local rawOpts = rcall("HGET", jobKey, "opts") + local opts = cjson.decode(rawOpts) + local removeOnFailType = type(opts["removeOnFail"]) rcall("ZADD", KEYS[4], ARGV[3], jobId) - rcall("HSET", jobKey, "failedReason", "job stalled more than allowable limit") + rcall("HMSET", jobKey, "failedReason", "job stalled more than allowable limit", + "finishedOn", ARGV[3]) rcall("PUBLISH", KEYS[4], "{\"jobId\":\"" .. jobId .. "\", \"val\": \"job stalled more than maxStalledCount\"}") + + if removeOnFailType == "number" then + removeJobsByMaxCount(opts["removeOnFail"], + KEYS[4], ARGV[2]) + elseif removeOnFailType == "boolean" then + if opts["removeOnFail"] then + removeJob(jobId, ARGV[2]) + rcall("ZREM", KEYS[4], jobId) + end + elseif removeOnFailType ~= "nil" then + local maxAge = opts["removeOnFail"]["age"] + local maxCount = opts["removeOnFail"]["count"] + + if maxAge ~= nil then + removeJobsByMaxAge(ARGV[3], maxAge, + KEYS[4], ARGV[2]) + end + + if maxCount ~= nil and maxCount > 0 then + removeJobsByMaxCount(maxCount, KEYS[4], + ARGV[2]) + end + end + table.insert(failed, jobId) else -- Move the job back to the wait queue, to immediately be picked up by a waiting worker. diff --git a/lib/scripts.js b/lib/scripts.js index 267d9c416..9175401bb 100644 --- a/lib/scripts.js +++ b/lib/scripts.js @@ -37,7 +37,7 @@ const scripts = { _.isUndefined(opts.customJobId) ? '' : opts.customJobId, job.name, job.data, - job.opts, + pack(job.opts), job.timestamp, job.delay, job.delay ? job.timestamp + job.delay : 0, diff --git a/test/test_queue.js b/test/test_queue.js index 22061fb22..092eb5fdc 100644 --- a/test/test_queue.js +++ b/test/test_queue.js @@ -1739,6 +1739,46 @@ describe('Queue', () => { .catch(done); }); + it('removes failed stalled jobs that stall more than allowable stalled limit when removeOnFail is present', function(done) { + const FAILED_MESSAGE = 'job stalled more than allowable limit'; + this.timeout(10000); + + const queue2 = utils.buildQueue('running-stalled-job-' + uuid.v4(), { + settings: { + lockRenewTime: 2500, + lockDuration: 250, + stalledInterval: 500, + maxStalledCount: 1 + } + }); + + let processedCount = 0; + queue2.process(job => { + processedCount++; + expect(job.data.foo).to.be.equal('bar'); + return delay(1500); + }); + + queue2.on('completed', () => { + done(new Error('should not complete')); + }); + + queue2.on('failed', (job, err) => { + expect(processedCount).to.be.eql(2); + expect(job).to.be.null; + expect(err.message).to.be.eql(FAILED_MESSAGE); + done(); + }); + + queue2 + .add({ foo: 'bar' }, {removeOnFail: true}) + .then(job => { + expect(job.id).to.be.ok; + expect(job.data.foo).to.be.eql('bar'); + }) + .catch(done); + }); + it('should clear job from stalled set when job completed', done => { const queue2 = utils.buildQueue('running-job-' + uuid.v4(), { settings: {