diff --git a/index.js b/index.js index a8053b2..1a55fc2 100644 --- a/index.js +++ b/index.js @@ -2,283 +2,16 @@ const Executor = require('screwdriver-executor-base'); const logger = require('screwdriver-logger'); -const Redis = require('ioredis'); -const Resque = require('node-resque'); -const fuses = require('circuit-fuses'); const requestretry = require('requestretry'); -const hoek = require('hoek'); -const cron = require('./lib/cron'); -const timeOutOfWindows = require('./lib/freezeWindows').timeOutOfWindows; -const Breaker = fuses.breaker; -const FuseBox = fuses.box; -const EXPIRE_TIME = 1800; // 30 mins const RETRY_LIMIT = 3; const RETRY_DELAY = 5; -const DEFAULT_BUILD_TIMEOUT = 90; class ExecutorQueue extends Executor { - /** - * Constructs a router for different Executor strategies. - * @method constructor - * @param {Object} config Object with executor and ecosystem - * @param {Object} config.redisConnection Connection details for redis - * @param {Object} config.pipelineFactory Pipeline Factory instance - * @param {String} [config.prefix] Prefix for queue name - * @param {Object} [config.breaker] Optional breaker config - */ constructor(config = {}) { - if (!config.redisConnection) { - throw new Error('No redis connection passed in'); - } - if (!config.pipelineFactory) { - throw new Error('No PipelineFactory instance passed in'); - } - - const breaker = Object.assign({}, config.breaker || {}); - super(); - - this.prefix = config.prefix || ''; - this.buildQueue = `${this.prefix}builds`; - this.periodicBuildQueue = `${this.prefix}periodicBuilds`; - this.frozenBuildQueue = `${this.prefix}frozenBuilds`; - this.buildConfigTable = `${this.prefix}buildConfigs`; - this.periodicBuildTable = `${this.prefix}periodicBuildConfigs`; - this.frozenBuildTable = `${this.prefix}frozenBuildConfigs`; - this.tokenGen = null; - this.userTokenGen = null; - this.pipelineFactory = config.pipelineFactory; - this.timeoutQueue = `${this.prefix}timeoutConfigs`; - - const redisConnection = Object.assign({}, config.redisConnection, { pkg: 'ioredis' }); - - this.redis = new Redis( - redisConnection.port, - redisConnection.host, - redisConnection.options - ); - - // eslint-disable-next-line new-cap - this.queue = new Resque.Queue({ connection: redisConnection }); - this.queueBreaker = new Breaker((funcName, ...args) => { - const callback = args.pop(); - - this.queue[funcName](...args) - .then((...results) => callback(null, ...results)) - .catch(callback); - }, breaker); - this.redisBreaker = new Breaker((funcName, ...args) => - // Use the queue's built-in connection to send redis commands instead of instantiating a new one - this.redis[funcName](...args), breaker); this.requestRetryStrategy = (err, response) => !!err || (response.statusCode !== 201 && response.statusCode !== 200); - this.requestRetryStrategyPostEvent = (err, response) => - !!err || (response.statusCode !== 201 && response.statusCode !== 200 - && response.statusCode !== 404); // postEvent can return 404 if no job to start - this.fuseBox = new FuseBox(); - this.fuseBox.addFuse(this.queueBreaker); - this.fuseBox.addFuse(this.redisBreaker); - - const retryOptions = { - plugins: ['Retry'], - pluginOptions: { - Retry: { - retryLimit: RETRY_LIMIT, - retryDelay: RETRY_DELAY - } - } - }; - // Jobs object to register the worker with - const jobs = { - startDelayed: Object.assign({ - perform: async (jobConfig) => { - try { - const fullConfig = await this.redisBreaker - .runCommand('hget', this.periodicBuildTable, jobConfig.jobId); - - return await 
this.startPeriodic( - Object.assign(JSON.parse(fullConfig), { triggerBuild: true })); - } catch (err) { - logger.error('err in startDelayed job: ', err); - throw err; - } - } - }, retryOptions), - startFrozen: Object.assign({ - perform: async (jobConfig) => { - try { - const fullConfig = await this.redisBreaker - .runCommand('hget', this.frozenBuildTable, jobConfig.jobId); - - return await this.startFrozen(JSON.parse(fullConfig)); - } catch (err) { - logger.error('err in startFrozen job: ', err); - throw err; - } - } - }, retryOptions) - }; - - // eslint-disable-next-line new-cap - this.multiWorker = new Resque.MultiWorker({ - connection: redisConnection, - queues: [this.periodicBuildQueue, this.frozenBuildQueue], - minTaskProcessors: 1, - maxTaskProcessors: 10, - checkTimeout: 1000, - maxEventLoopDelay: 10, - toDisconnectProcessors: true - }, jobs); - // eslint-disable-next-line new-cap - this.scheduler = new Resque.Scheduler({ connection: redisConnection }); - - this.multiWorker.on('start', workerId => - logger.info(`worker[${workerId}] started`)); - this.multiWorker.on('end', workerId => - logger.info(`worker[${workerId}] ended`)); - this.multiWorker.on('cleaning_worker', (workerId, worker, pid) => - logger.info(`cleaning old worker ${worker} pid ${pid}`)); - this.multiWorker.on('job', (workerId, queue, job) => - logger.info(`worker[${workerId}] working job ${queue} ${JSON.stringify(job)}`)); - this.multiWorker.on('reEnqueue', (workerId, queue, job, plugin) => - // eslint-disable-next-line max-len - logger.info(`worker[${workerId}] reEnqueue job (${plugin}) ${queue} ${JSON.stringify(job)}`)); - this.multiWorker.on('success', (workerId, queue, job, result) => - // eslint-disable-next-line max-len - logger.info(`worker[${workerId}] job success ${queue} ${JSON.stringify(job)} >> ${result}`)); - this.multiWorker.on('failure', (workerId, queue, job, failure) => - // eslint-disable-next-line max-len - logger.info(`worker[${workerId}] job failure ${queue} ${JSON.stringify(job)} >> ${failure}`)); - this.multiWorker.on('error', (workerId, queue, job, error) => - logger.error(`worker[${workerId}] error ${queue} ${JSON.stringify(job)} >> ${error}`)); - - // multiWorker emitters - this.multiWorker.on('internalError', error => - logger.error(error)); - - this.scheduler.on('start', () => - logger.info('scheduler started')); - this.scheduler.on('end', () => - logger.info('scheduler ended')); - this.scheduler.on('master', state => - logger.info(`scheduler became master ${state}`)); - this.scheduler.on('error', error => - logger.info(`scheduler error >> ${error}`)); - this.scheduler.on('workingTimestamp', timestamp => - logger.info(`scheduler working timestamp ${timestamp}`)); - this.scheduler.on('transferredJob', (timestamp, job) => - logger.info(`scheduler enqueuing job timestamp >> ${JSON.stringify(job)}`)); - - this.multiWorker.start(); - this.scheduler.connect().then(() => this.scheduler.start()); - } - - /** - * Cleanup any reladed processing - */ - async cleanUp() { - try { - await this.multiWorker.end(); - await this.scheduler.end(); - await this.queue.end(); - } catch (err) { - logger.error(`failed to end executor queue: ${err}`); - } - } - - /** - * Posts a new build event to the API - * @method postBuildEvent - * @param {Object} config Configuration - * @param {Number} [config.eventId] Optional Parent event ID (optional) - * @param {Number} config.buildId Freezed build id - * @param {Object} config.pipeline Pipeline of the job - * @param {Object} config.job Job object to create periodic 
builds for - * @param {String} config.apiUri Base URL of the Screwdriver API - * @return {Promise} - */ - async postBuildEvent({ pipeline, job, apiUri, eventId, buildId, causeMessage }) { - const pipelineInstance = await this.pipelineFactory.get(pipeline.id); - const admin = await pipelineInstance.getFirstAdmin(); - const jwt = this.userTokenGen(admin.username, {}, pipeline.scmContext); - - logger.info(`POST event for pipeline ${pipeline.id}:${job.name}` + - `using user ${admin.username}`); - const options = { - url: `${apiUri}/v4/events`, - method: 'POST', - headers: { - Authorization: `Bearer ${jwt}`, - 'Content-Type': 'application/json' - }, - json: true, - body: { - pipelineId: pipeline.id, - startFrom: job.name, - creator: { - name: 'Screwdriver scheduler', - username: 'sd:scheduler' - }, - causeMessage: causeMessage || 'Automatically started by scheduler' - }, - maxAttempts: RETRY_LIMIT, - retryDelay: RETRY_DELAY * 1000, // in ms - retryStrategy: this.requestRetryStrategyPostEvent - }; - - if (eventId) { - options.body.parentEventId = eventId; - } - - if (buildId) { - options.body.buildId = buildId; - } - - return new Promise((resolve, reject) => { - requestretry(options, (err, response) => { - if (!err && response.statusCode === 201) { - return resolve(response); - } - if (response && response.statusCode !== 201) { - return reject(JSON.stringify(response.body)); - } - - return reject(err); - }); - }); - } - - async updateBuildStatus({ buildId, status, statusMessage, token, apiUri }) { - const options = { - json: true, - method: 'PUT', - uri: `${apiUri}/v4/builds/${buildId}`, - body: { - status, - statusMessage - }, - headers: { - Authorization: `Bearer ${token}`, - 'Content-Type': 'application/json' - }, - maxAttempts: RETRY_LIMIT, - retryDelay: RETRY_DELAY * 1000, // in ms - retryStrategy: this.requestRetryStrategy - }; - - return new Promise((resolve, reject) => { - requestretry(options, (err, response) => { - if (!err && response.statusCode === 200) { - return resolve(response); - } - - if (response && response.statusCode !== 200) { - return reject(JSON.stringify(response.body)); - } - - return reject(err); - }); - }); + this.queueUri = config.ecosystem.queueUri; } /** @@ -294,70 +27,11 @@ class ExecutorQueue extends Executor { * @return {Promise} */ async _startPeriodic(config) { - const { pipeline, job, tokenGen, isUpdate, triggerBuild } = config; - // eslint-disable-next-line max-len - const buildCron = hoek.reach(job, 'permutations>0>annotations>screwdriver.cd/buildPeriodically', - { separator: '>' }); - - // Save tokenGen to current executor object so we can access it in postBuildEvent - if (!this.userTokenGen) { - this.userTokenGen = tokenGen; - } - - if (isUpdate) { - // eslint-disable-next-line no-underscore-dangle - await this._stopPeriodic({ jobId: job.id }); - } - - if (triggerBuild) { - config.causeMessage = 'Started by periodic build scheduler'; - - // Even if post event failed for this event after retry, we should still enqueue the next event - try { - await this.postBuildEvent(config); - } catch (err) { - logger.error('periodic builds: failed to post build event for job' - + `${job.id} in pipeline ${pipeline.id}: ${err}`); - } - } - - if (buildCron && job.state === 'ENABLED' && !job.archived) { - await this.connect(); - - const next = cron.next(cron.transform(buildCron, job.id)); - - // Store the config in redis - await this.redisBreaker.runCommand('hset', this.periodicBuildTable, - job.id, JSON.stringify(Object.assign(config, { - isUpdate: false, - triggerBuild: 
false - }))); - - // Note: arguments to enqueueAt are [timestamp, queue name, job name, array of args] - let shouldRetry = false; - - try { - await this.queue.enqueueAt(next, this.periodicBuildQueue, - 'startDelayed', [{ jobId: job.id }]); - } catch (err) { - // Error thrown by node-resque if there is duplicate: https://github.com/taskrabbit/node-resque/blob/master/lib/queue.js#L65 - // eslint-disable-next-line max-len - if (err && err.message !== 'Job already enqueued at this time with same arguments') { - shouldRetry = true; - } - } - if (!shouldRetry) { - return Promise.resolve(); - } - try { - await this.queueBreaker.runCommand('enqueueAt', next, - this.periodicBuildQueue, 'startDelayed', [{ jobId: job.id }]); - } catch (err) { - logger.error(`failed to add to delayed queue for job ${job.id}: ${err}`); - } - } - - return Promise.resolve(); + return this.api(config, { + path: '/v1/queue/message?type=periodic', + method: 'POST', + body: config + }); } /** @@ -368,13 +42,11 @@ class ExecutorQueue extends Executor { * @return {Promise} */ async _stopPeriodic(config) { - await this.connect(); - - await this.queueBreaker.runCommand('delDelayed', this.periodicBuildQueue, 'startDelayed', [{ - jobId: config.jobId - }]); - - return this.redisBreaker.runCommand('hdel', this.periodicBuildTable, config.jobId); + return this.api(config, { + path: '/v1/queue/message?type=periodic', + method: 'DELETE', + body: config + }); } /** @@ -384,50 +56,31 @@ class ExecutorQueue extends Executor { * @return {Promise} */ async _startFrozen(config) { - const newConfig = { - job: { - name: config.jobName - }, - causeMessage: 'Started by freeze window scheduler' - }; - - if (config.jobState === 'DISABLED' || config.jobArchived === true) { - logger.error(`job ${config.jobName} is disabled or archived`); - - return Promise.resolve(); - } - - Object.assign(newConfig, config); - - return this.postBuildEvent(newConfig) - .catch((err) => { - logger.error('frozen builds: failed to post build event for job' - + `${config.jobId}:${config.pipeline.id} ${err}`); - - return Promise.resolve(); - }); + return this.api(config, { + path: '/v1/queue/message?type=frozen', + method: 'POST', + body: config + }); } /** * Stops a previously enqueued frozen build in an executor - * @async stopFrozen + * @async _stopFrozen * @param {Object} config Configuration * @param {Integer} config.jobId ID of the job with frozen builds * @return {Promise} */ async _stopFrozen(config) { - await this.connect(); - - await this.queueBreaker.runCommand('delDelayed', this.frozenBuildQueue, 'startFrozen', [{ - jobId: config.jobId - }]); - - return this.redisBreaker.runCommand('hdel', this.frozenBuildTable, config.jobId); + return this.api(config, { + path: '/v1/queue/message?type=frozen', + method: 'DELETE', + body: config + }); } /** * Adds start time of a build to timeout queue - * @method status + * @method _startTimer * @param {Object} config Configuration * @param {String} config.buildId Unique ID for a build * @param {String} config.startTime Start time fo build @@ -435,66 +88,26 @@ class ExecutorQueue extends Executor { * @return {Promise} */ async _startTimer(config) { - try { - await this.connect(); - const { - buildId, - jobId, - buildStatus, - startTime - } = config; - - if (buildStatus === 'RUNNING') { - const buildTimeout = hoek.reach(config, 'annotations>screwdriver.cd/timeout', - { separator: '>' }); - const timeout = parseInt(buildTimeout || DEFAULT_BUILD_TIMEOUT, 10); - - const data = await this.redisBreaker.runCommand('hget', 
this.timeoutQueue, buildId); - - if (data) { - return Promise.resolve(); - } - - return await this.redisBreaker.runCommand('hset', this.timeoutQueue, buildId, - JSON.stringify({ - jobId, - startTime, - timeout - })); - } - - return Promise.resolve(); - } catch (err) { - logger.error(`Error occurred while saving to timeout queue ${err}`); - - return Promise.resolve(); - } + return this.api(config, { + path: '/v1/queue/message?type=timer', + method: 'POST', + body: config + }); } /** * Removes start time info key from timeout queue - * @method status + * @method _stopTimer * @param {Object} config Configuration * @param {String} config.buildId Unique ID for a build * @return {Promise} */ async _stopTimer(config) { - try { - await this.connect(); - - const data = await this.redisBreaker.runCommand('hget', this.timeoutQueue, - config.buildId); - - if (!data) { - return Promise.resolve(); - } - - return await this.redisBreaker.runCommand('hdel', this.timeoutQueue, config.buildId); - } catch (err) { - logger.error(`Error occurred while removing from timeout queue ${err}`); - - return Promise.resolve(); - } + return this.api(config, { + path: '/v1/queue/message?type=timer', + method: 'DELETE', + body: config + }); } /** @@ -520,97 +133,11 @@ class ExecutorQueue extends Executor { * @return {Promise} */ async _start(config) { - await this.connect(); - const { - build, - buildId, - causeMessage, - jobId, - jobState, - jobArchived, - blockedBy, - freezeWindows, - token, - apiUri - } = config; - const forceStart = /\[(force start)\]/.test(causeMessage); - - if (!this.tokenGen) { - this.tokenGen = config.tokenGen; - } - - delete config.build; - delete config.causeMessage; - - // eslint-disable-next-line no-underscore-dangle - await this._stopFrozen({ - jobId + return this.api(config, { + path: '/v1/queue/message', + method: 'POST', + body: config }); - - // Skip if job is disabled or archived - if (jobState === 'DISABLED' || jobArchived === true) { - return Promise.resolve(); - } - - const currentTime = new Date(); - const origTime = new Date(currentTime.getTime()); - - timeOutOfWindows(freezeWindows, currentTime); - - let enq; - - // Check freeze window - if (currentTime.getTime() > origTime.getTime() && !forceStart) { - await this.updateBuildStatus({ - buildId, - token, - apiUri, - status: 'FROZEN', - statusMessage: `Blocked by freeze window, re-enqueued to ${currentTime}` - }).catch((err) => { - logger.error(`failed to update build status for build ${buildId}: ${err}`); - - return Promise.resolve(); - }); - - // Remove old job from queue to collapse builds - await this.queueBreaker.runCommand('delDelayed', this.frozenBuildQueue, - 'startFrozen', [{ - jobId - }]); - - await this.redisBreaker.runCommand('hset', this.frozenBuildTable, - jobId, JSON.stringify(config)); - - // Add new job back to queue - enq = await this.queueBreaker.runCommand('enqueueAt', currentTime.getTime(), - this.frozenBuildQueue, 'startFrozen', [{ - jobId - }] - ); - } else { - // set the start time in the queue - Object.assign(config, { enqueueTime: new Date() }); - // Store the config in redis - await this.redisBreaker.runCommand('hset', this.buildConfigTable, - buildId, JSON.stringify(config)); - - // Note: arguments to enqueue are [queue name, job name, array of args] - enq = await this.queueBreaker.runCommand('enqueue', this.buildQueue, 'start', [{ - buildId, - jobId, - blockedBy: blockedBy.toString() - }]); - } - - // for backward compatibility - if (build && build.stats) { - // need to reassign so the field can be dirty - 
build.stats = hoek.merge(build.stats, { queueEnterTime: (new Date()).toISOString() }); - await build.update(); - } - - return enq; } /** @@ -623,63 +150,62 @@ class ExecutorQueue extends Executor { * @return {Promise} */ async _stop(config) { - await this.connect(); - - const { buildId, jobId } = config; // in case config contains something else - - let blockedBy; - - if (config.blockedBy !== undefined) { - blockedBy = config.blockedBy.toString(); - } - - const numDeleted = await this.queueBreaker.runCommand('del', this.buildQueue, 'start', [{ - buildId, - jobId, - blockedBy - }]); - const deleteKey = `deleted_${jobId}_${buildId}`; - let started = true; - - // This is to prevent the case where a build is aborted while still in buildQueue - // The job might be picked up by the worker, so it's not deleted from buildQueue here - // Immediately after, the job gets put back to the queue, so it's always inside buildQueue - // This key will be cleaned up automatically or when it's picked up by the worker - await this.redisBreaker.runCommand('set', deleteKey, ''); - await this.redisBreaker.runCommand('expire', deleteKey, EXPIRE_TIME); - - if (numDeleted !== 0) { // build hasn't started - started = false; - } - - return this.queueBreaker.runCommand('enqueue', this.buildQueue, 'stop', [{ - buildId, - jobId, - blockedBy, - started // call executor.stop if the job already started - }]); + return this.api(config, { + path: '/v1/queue/message', + method: 'DELETE', + body: config + }); } /** - * Connect to the queue if we haven't already - * @method connect - * @return {Promise} + * Retrieve stats for the executor + * @method stats + * @param {Response} Object Object containing stats for the executor */ - connect() { - if (this.queue.connection.connected) { - return Promise.resolve(); - } - - return this.queueBreaker.runCommand('connect'); + stats(config) { + return this.api(config, { + path: '/v1/queue/stats', + method: 'GET' + }); } /** - * Retrieve stats for the executor - * @method stats - * @param {Response} Object Object containing stats for the executor + * Makes api call to the url endpoint + * @async api + * @param {Object} args + * @param {Object} config + * @return Promise.resolve */ - stats() { - return this.queueBreaker.stats(); + async api(config, args) { + const options = { + headers: { + Authorization: `Bearer ${config.token}`, + 'Content-Type': 'application/json' + }, + url: `${this.queueUri}${args.path}`, + json: true, + maxAttempts: RETRY_LIMIT, + retryDelay: RETRY_DELAY * 1000, // in ms + retryStrategy: this.requestRetryStrategy, + ...args + }; + + logger.info(`${options.method} ${options.path} for pipeline ${config.pipelineId}:${config.jobId}`); + + return new Promise((resolve, reject) => { + requestretry(options, (err, response) => { + if (!err) { + if (response.statusCode === 200) { + return resolve(response); + } + if (response.statusCode !== 201) { + return resolve(JSON.stringify(response.body)); + } + } + + return reject(err); + }); + }); } } diff --git a/lib/cron.js b/lib/cron.js deleted file mode 100644 index 5547d71..0000000 --- a/lib/cron.js +++ /dev/null @@ -1,119 +0,0 @@ -'use strict'; - -const parser = require('cron-parser'); -const stringHash = require('string-hash'); - -/** - * Evaluate a numeric hash to a number within the range of min and max - * - * @method evaluateHash - * @param {Number} hash Hash to evaluate - * @param {Number} min Minimum evaluated value - * @param {String} max Maximum evaluated value - * @return {Number} Evaluated hash - */ -const evaluateHash = 
(hash, min, max) => (hash % ((max + 1) - min)) + min; - -/** - * Transform a cron value containing a valid 'H' symbol into a valid cron value - * @method transformValue - * @param {String} cronValue Value to transform - * @param {Number} min Minimum acceptable value - * @param {Number} max Maximum acceptable value - * @param {Number} hashValue Numeric hash to determine new value - * @return {String} Transformed cron value - */ -const transformValue = (cronValue, min, max, hashValue) => { - const values = cronValue.split(','); - - // Transform each ',' seperated value - // Ignore values that do not have a valid 'H' symbol - values.forEach((value, i) => { - // 'H' should evaluate to some value within the range (e.g. [0-59]) - if (value === 'H') { - values[i] = evaluateHash(hashValue, min, max); - - return; - } - - // e.g. H/5 -> #/5 - if (value.match(/H\/\d+/)) { - values[i] = value.replace('H', evaluateHash(hashValue, min, max)); - - return; - } - - // e.g. H(0-5) -> # - if (value.match(/H\(\d+-\d+\)/)) { - const newMin = Number(value.substring(2, value.lastIndexOf('-'))); - const newMax = Number(value.substring(value.lastIndexOf('-') + 1, - value.lastIndexOf(')'))); - - // Range is invalid, throw an error - if (newMin < min || newMax > max || newMin > newMax) { - throw new Error(`${value} has an invalid range, expected range ${min}-${max}`); - } - - values[i] = evaluateHash(hashValue, newMin, newMax); - } - }); - - return values.join(','); -}; - -/** - * Transform a cron expression containing valid 'H' symbol(s) into a valid cron expression - * @method transformCron - * @param {String} cronExp Cron expression to transform - * @param {Number} jobId Job ID - * @return {String} Transformed cron expression - */ -const transformCron = (cronExp, jobId) => { - const fields = cronExp.trim().split(/\s+/); - - // The seconds field is not allowed (e.g. '* * * * * *') - if (fields.length !== 5) { - throw new Error(`${cronExp} does not have exactly 5 fields`); - } - - const jobIdHash = stringHash(jobId.toString()); - - if (fields[0] !== 'H' && !fields[0].match(/H\(\d+-\d+\)/)) { - fields[0] = 'H'; - } - - // Minutes [0-59] - fields[0] = transformValue(fields[0], 0, 59, jobIdHash); - // Hours [0-23] - fields[1] = transformValue(fields[1], 0, 23, jobIdHash); - // Day of month [1-31] - fields[2] = transformValue(fields[2], 1, 31, jobIdHash); - // Months [1-12] - fields[3] = transformValue(fields[3], 1, 12, jobIdHash); - // Day of week [0-6] - fields[4] = transformValue(fields[4], 0, 6, jobIdHash); - - const newCronExp = fields.join(' '); - - // Perform final validation before returning - parser.parseExpression(newCronExp); - - return newCronExp; -}; - -/** - * Get the next time of execution based on the cron expression - * @method nextExecution - * @param {String} cronExp Cron expression to calculate next execution time - * @return {Number} Epoch timestamp (time of next execution). - */ -const nextExecution = (cronExp) => { - const interval = parser.parseExpression(cronExp); - - return interval.next().getTime(); -}; - -module.exports = { - transform: transformCron, - next: nextExecution -}; diff --git a/lib/freezeWindows.js b/lib/freezeWindows.js deleted file mode 100644 index bfe36e5..0000000 --- a/lib/freezeWindows.js +++ /dev/null @@ -1,185 +0,0 @@ -'use strict'; - -const parser = require('cron-parser'); - -/** - * Find the smallest missing integer in a range. 
- * If there's a missing integer greater than element return that - * Otherwise return the smallest missing integer lesser than element - * @method findLatestMissing - * @param {Array} array Array of sorted numbers - * @param {Integer} element Element to compare against - * @param {Integer} start Start of range - * @param {Integer} end End of range - * @return {Integer} Return smallest missing integer - */ -const findLatestMissing = (array, element, start, end) => { - let firstMissing; - let firstMissingAfter; - let offset = start; - - for (let i = start; i <= end; i += 1) { - if (array[i - offset] !== i) { - if (i < element) { - if (firstMissing === undefined) { - firstMissing = i; - } - } else if (i > element) { - if (firstMissingAfter === undefined) { - firstMissingAfter = i; - break; - } - } - offset += 1; - } - } - - if (firstMissingAfter === undefined) { - return firstMissing; - } - - return firstMissingAfter; -}; - -/** - * Return latest time outside of cron window for the given date. - * @method timeOutOfWindow - * @param {String} cronExp Cron expression - * @param {Date} date JavaScript Date object to check if in window - * @return {Boolean} Date Object of latest time out of window - */ -const timeOutOfWindow = (cronExp, timeToCheck) => { - const fields = cronExp.trim().split(/\s+/); - - // The seconds field is not allowed (e.g. '* * * * * *') - if (fields.length !== 5) { - throw new Error(`${cronExp} does not have exactly 5 fields`); - } - - if (fields[2] !== '?' && fields[4] !== '?') { - throw new Error(`${cronExp} cannot contain both days of month and week`); - } - - const newCronExp = `${fields[0]} ${fields[1]} ${fields[2] === '?' ? '*' : fields[2]} - ${fields[3]} ${fields[4] === '?' ? '*' : fields[4]}`; - - // Perform final validation before returning - const cronObj = parser.parseExpression(newCronExp); - let latest; - - const utcMinutes = timeToCheck.getUTCMinutes(); - const utcHours = timeToCheck.getUTCHours(); - const utcDayOfMonth = timeToCheck.getUTCDate(); - const utcDayOfWeek = timeToCheck.getUTCDay(); - const utcMonth = timeToCheck.getUTCMonth() + 1; - - /* eslint no-underscore-dangle: ["error", { "allow": ["_fields"] }] */ - const minuteField = cronObj._fields.minute; - const hourField = cronObj._fields.hour; - const dayOfMonthField = cronObj._fields.dayOfMonth; - const dayOfWeekField = cronObj._fields.dayOfWeek; - const monthField = cronObj._fields.month; - - const includesMinute = minuteField.includes(utcMinutes); - const includesHour = hourField.includes(utcHours); - const includesDayOfMonth = fields[2] === '?' || dayOfMonthField.includes(utcDayOfMonth); - const includesDayOfWeek = fields[4] === '?' 
|| dayOfWeekField.includes(utcDayOfWeek); - const includesMonth = monthField.includes(utcMonth); - - const inWindow = [includesMinute, - includesHour, - includesDayOfMonth, - includesDayOfWeek, - includesMonth].every(Boolean); - - if (!inWindow) { return timeToCheck; } - - if (includesMinute && minuteField.length !== 60) { - latest = findLatestMissing(minuteField, utcMinutes, 0, 59); - timeToCheck.setUTCMinutes(latest); - if (latest < utcMinutes) { - timeToCheck.setUTCHours(timeToCheck.getUTCHours() + 1); - } - - return timeToCheck; - } else if (!includesMinute && minuteField.length !== 60) { - return timeToCheck; - } - - if (includesHour && hourField.length !== 24) { - latest = findLatestMissing(hourField, utcHours, 0, 23); - timeToCheck.setUTCHours(latest); - if (latest < utcHours) { - timeToCheck.setUTCDate(timeToCheck.getUTCDate() + 1); - } - timeToCheck.setUTCMinutes(0); - - return timeToCheck; - } else if (!includesHour && hourField.length !== 24) { - return timeToCheck; - } - - if (includesDayOfMonth && fields[2] !== '?' && dayOfMonthField.length !== 31) { - latest = findLatestMissing(dayOfMonthField, utcDayOfMonth, 1, 31); - timeToCheck.setUTCDate(latest); - if (latest < utcDayOfMonth) { - timeToCheck.setUTCMonth(timeToCheck.getUTCMonth() + 1); - } - timeToCheck.setUTCMinutes(0); - timeToCheck.setUTCHours(0); - - return timeToCheck; - } else if (!includesDayOfMonth && fields[2] !== '?' && dayOfMonthField.length !== 31) { - return timeToCheck; - } - - if (includesDayOfWeek && fields[4] !== '?' && dayOfWeekField.length !== 8) { - latest = findLatestMissing(dayOfWeekField, utcDayOfWeek, 0, 6); - timeToCheck.setUTCDate((timeToCheck.getUTCDate() + latest) - utcDayOfWeek); - if (latest < utcDayOfWeek) { - timeToCheck.setUTCDate(timeToCheck.getUTCDate() + 7); - } - - return timeToCheck; - } else if (!includesDayOfWeek && fields[4] !== '?' && dayOfWeekField.length !== 8) { - return timeToCheck; - } - - if (includesMonth && monthField.length !== 12) { - latest = findLatestMissing(monthField, utcMonth, 1, 12); - timeToCheck.setUTCMonth(latest - 1); - if (latest < utcMonth) { - timeToCheck.setUTCFullYear(timeToCheck.getUTCFullYear() + 1); - } - timeToCheck.setUTCMinutes(0); - timeToCheck.setUTCHours(0); - timeToCheck.setUTCDate(1); - } - - return timeToCheck; -}; - -/** - * Return latest time outside of all freeze windows. 
- * @method timeOutOfWindows - * @param {Array} freezeWindows Array of cron expressions of freeze windows - * @param {Date} date JavaScript Date object to check if in all windows - * @return {Date} Date Object of latest time out of all windows - */ -const timeOutOfWindows = (freezeWindows, date) => { - let idx = 0; - - while (idx < freezeWindows.length) { - freezeWindows.map(freezeWindow => - timeOutOfWindow(freezeWindow, date) - ); - idx += 1; - } - - return date; -}; - -module.exports = { - timeOutOfWindow, - timeOutOfWindows -}; diff --git a/package.json b/package.json index 19938fd..0eb687b 100644 --- a/package.json +++ b/package.json @@ -1,11 +1,11 @@ { "name": "screwdriver-executor-queue", - "version": "1.0.0", + "version": "3.0.0", "description": "Executor plugin for Screwdriver using Resque", "main": "index.js", "scripts": { "pretest": "eslint .", - "test": "jenkins-mocha --recursive", + "test": "nyc --report-dir ./artifacts/coverage --reporter=lcov mocha --recursive --timeout 4000 --retries 1 --exit --color true", "semantic-release": "semantic-release pre && npm publish && semantic-release post" }, "repository": { @@ -34,23 +34,19 @@ ], "devDependencies": { "chai": "^4.2.0", - "eslint": "^4.19.1", - "eslint-config-screwdriver": "^3.0.1", - "jenkins-mocha": "^8.0.0", + "eslint": "^6.8.0", + "eslint-config-screwdriver": "^5.0.3", + "mocha": "^7.1.0", "mockery": "^2.0.0", + "nyc": "^15.0.0", "sinon": "^4.5.0" }, "dependencies": { - "circuit-fuses": "^4.0.0", - "cron-parser": "^2.13.0", - "hoek": "^5.0.4", - "ioredis": "^3.2.2", - "node-resque": "^5.5.3", "requestretry": "^3.1.0", - "screwdriver-data-schema": "^19.1.1", + "screwdriver-data-schema": "^19.7.0", "screwdriver-executor-base": "^7.4.0", - "string-hash": "^1.1.3", - "screwdriver-logger": "^1.0.0" + "screwdriver-logger": "^1.0.0", + "string-hash": "^1.1.3" }, "release": { "debug": false, diff --git a/test/data/fullConfig.json b/test/data/fullConfig.json index 24b70b9..b4ac4da 100644 --- a/test/data/fullConfig.json +++ b/test/data/fullConfig.json @@ -11,5 +11,6 @@ "apiUri": "http://api.com", "token": "asdf", "blockedBy": [777, 888], - "freezeWindows": ["* * ? * 1", "0-59 0-23 * 1 ?"] + "freezeWindows": ["* * ? 
* 1", "0-59 0-23 * 1 ?"], + "pipelineId": "123" } diff --git a/test/data/testConnection.json b/test/data/testConnection.json deleted file mode 100644 index 94d1259..0000000 --- a/test/data/testConnection.json +++ /dev/null @@ -1,6 +0,0 @@ -{ - "host": "127.0.0.1", - "password": "hunter2", - "port": 6379, - "database": 0 -} diff --git a/test/index.test.js b/test/index.test.js index ef08c08..b83c604 100644 --- a/test/index.test.js +++ b/test/index.test.js @@ -3,50 +3,20 @@ /* eslint-disable no-underscore-dangle */ const chai = require('chai'); -const util = require('util'); -const assert = chai.assert; +const { assert } = chai; const mockery = require('mockery'); const sinon = require('sinon'); -const testConnection = require('./data/testConnection.json'); const testConfig = require('./data/fullConfig.json'); const testPipeline = require('./data/testPipeline.json'); const testJob = require('./data/testJob.json'); -const { buildId, jobId, blockedBy } = testConfig; -const partialTestConfig = { - buildId, - jobId, - blockedBy -}; -const partialTestConfigToString = Object.assign({}, partialTestConfig, { - blockedBy: blockedBy.toString() -}); -const testAdmin = { - username: 'admin' -}; -const EventEmitter = require('events').EventEmitter; sinon.assert.expose(chai.assert, { prefix: '' }); describe('index test', () => { let Executor; let executor; - let multiWorker; - let spyMultiWorker; - let scheduler; - let spyScheduler; - let resqueMock; - let queueMock; - let redisMock; - let redisConstructorMock; - let cronMock; - let freezeWindowsMock; - let reqMock; - let pipelineMock; - let buildMock; - let pipelineFactoryMock; - let fakeResponse; - let userTokenGen; - let testDelayedConfig; + let mockRequest; + let requestOptions; before(() => { mockery.enable({ @@ -56,96 +26,34 @@ describe('index test', () => { }); beforeEach(() => { - userTokenGen = sinon.stub().returns('admintoken'); - testDelayedConfig = { + Object.assign(testConfig, { pipeline: testPipeline, - job: testJob, - apiUri: 'http://localhost', - tokenGen: userTokenGen - }; - multiWorker = function () { - this.start = () => { }; - this.end = sinon.stub().resolves(); - }; - scheduler = function () { - this.start = sinon.stub().resolves(); - this.connect = sinon.stub().resolves(); - this.end = sinon.stub().resolves(); - }; - util.inherits(multiWorker, EventEmitter); - util.inherits(scheduler, EventEmitter); - queueMock = { - connect: sinon.stub().resolves(), - enqueue: sinon.stub().resolves(), - enqueueAt: sinon.stub().resolves(), - del: sinon.stub().resolves(1), - delDelayed: sinon.stub().resolves(1), - connection: { - connected: false - }, - end: sinon.stub().resolves() - }; - resqueMock = { - Queue: sinon.stub().returns(queueMock), - MultiWorker: multiWorker, - Scheduler: scheduler - }; - spyMultiWorker = sinon.spy(resqueMock, 'MultiWorker'); - spyScheduler = sinon.spy(resqueMock, 'Scheduler'); - redisMock = { - hdel: sinon.stub().yieldsAsync(), - hset: sinon.stub().yieldsAsync(), - set: sinon.stub().yieldsAsync(), - expire: sinon.stub().yieldsAsync(), - hget: sinon.stub().yieldsAsync() - }; - redisConstructorMock = sinon.stub().returns(redisMock); - cronMock = { - transform: sinon.stub().returns('H H H H H'), - next: sinon.stub().returns(1500000) - }; - freezeWindowsMock = { - timeOutOfWindows: (windows, date) => date - }; - - fakeResponse = { - statusCode: 201, - body: { - id: '12345' - } - }; - reqMock = sinon.stub(); - pipelineMock = { - getFirstAdmin: sinon.stub().resolves(testAdmin) - }; - pipelineFactoryMock = { - get: 
sinon.stub().resolves(pipelineMock) - }; - buildMock = { - update: sinon.stub().resolves({ - id: buildId - }) - }; - - mockery.registerMock('node-resque', resqueMock); - mockery.registerMock('ioredis', redisConstructorMock); - mockery.registerMock('./lib/cron', cronMock); - mockery.registerMock('./lib/freezeWindows', freezeWindowsMock); - mockery.registerMock('requestretry', reqMock); + apiUri: 'http://localhost:3000', + token: 'admintoken' + }); + mockRequest = sinon.stub(); + mockery.registerMock('requestretry', mockRequest); /* eslint-disable global-require */ Executor = require('../index'); /* eslint-enable global-require */ executor = new Executor({ - redisConnection: testConnection, - breaker: { - retry: { - retries: 1 - } - }, - pipelineFactory: pipelineFactoryMock + ecosystem: { + queueUri: 'http://localhost' + } }); + requestOptions = { + headers: { + Authorization: 'Bearer admintoken', + 'Content-Type': 'application/json' + }, + json: true, + body: testConfig, + maxAttempts: 3, + retryDelay: 5000, + retryStrategy: executor.requestRetryStrategy + }; }); afterEach(() => { @@ -162,653 +70,220 @@ describe('index test', () => { assert.instanceOf(executor, Executor); }); - it('constructs the multiWorker', () => { - const expectedConfig = { - connection: testConnection, - queues: ['periodicBuilds', 'frozenBuilds'], - minTaskProcessors: 1, - maxTaskProcessors: 10, - checkTimeout: 1000, - maxEventLoopDelay: 10, - toDisconnectProcessors: true - }; - - assert.calledWith(spyMultiWorker, sinon.match(expectedConfig), sinon.match.any); - }); - - it('constructs the scheduler', () => { - const expectedConfig = { - connection: testConnection - }; - - assert.calledWith(spyScheduler, sinon.match(expectedConfig)); - }); - - it('constructs the executor when no breaker config is passed in', () => { - executor = new Executor({ - redisConnection: testConnection, - pipelineFactory: pipelineFactoryMock - }); - - assert.instanceOf(executor, Executor); - }); - - it('takes in a prefix', () => { - executor = new Executor({ - redisConnection: testConnection, - prefix: 'beta_', - pipelineFactory: pipelineFactoryMock - }); - + it('sets the queue uri', () => { assert.instanceOf(executor, Executor); - assert.strictEqual(executor.prefix, 'beta_'); - assert.strictEqual(executor.buildQueue, 'beta_builds'); - assert.strictEqual(executor.buildConfigTable, 'beta_buildConfigs'); - }); - - it('throws when not given a redis connection', () => { - assert.throws(() => new Executor(), 'No redis connection passed in'); - }); - - it('throws when not given a pipelineFactory', () => { - assert.throws(() => new Executor({ - redisConnection: testConnection - })); + assert.strictEqual(executor.queueUri, 'http://localhost'); }); }); - describe('_startPeriodic', () => { - beforeEach(() => { - reqMock.yieldsAsync(null, fakeResponse, fakeResponse.body); - }); - it('rejects if it can\'t establish a connection', function () { - queueMock.connect.rejects(new Error('couldn\'t connect')); - - return executor.startPeriodic(testDelayedConfig).then(() => { - assert.fail('Should not get here'); - }, (err) => { - assert.instanceOf(err, Error); - }); - }); - - it('doesn\'t call connect if there\'s already a connection', () => { - queueMock.connection.connected = true; - - return executor.startPeriodic(testDelayedConfig).then(() => { - assert.notCalled(queueMock.connect); - }); - }); - - it('enqueues a new delayed job in the queue', () => - executor.startPeriodic(testDelayedConfig).then(() => { - assert.calledOnce(queueMock.connect); - 
assert.calledWith(redisMock.hset, 'periodicBuildConfigs', testJob.id, - JSON.stringify(testDelayedConfig)); - assert.calledWith(cronMock.transform, '* * * * *', testJob.id); - assert.calledWith(cronMock.next, 'H H H H H'); - assert.calledWith(queueMock.enqueueAt, 1500000, 'periodicBuilds', 'startDelayed', [{ - jobId: testJob.id - }]); - })); - - it('do not enqueue the same delayed job in the queue', () => { - const err = new Error('Job already enqueued at this time with same arguments'); - - queueMock.enqueueAt = sinon.stub().rejects(err); - - return executor.startPeriodic(testDelayedConfig).then(() => { - assert.calledWith(cronMock.next, 'H H H H H'); - assert.calledOnce(queueMock.enqueueAt); - }); - }); - - it('stops and reEnqueues an existing job if isUpdate flag is passed', () => { - testDelayedConfig.isUpdate = true; - - return executor.startPeriodic(testDelayedConfig).then(() => { - assert.calledTwice(queueMock.connect); - assert.calledWith(redisMock.hset, 'periodicBuildConfigs', testJob.id, - JSON.stringify(testDelayedConfig)); - assert.calledWith(queueMock.enqueueAt, 1500000, 'periodicBuilds', 'startDelayed', [{ - jobId: testJob.id - }]); - assert.calledWith(queueMock.delDelayed, 'periodicBuilds', 'startDelayed', [{ - jobId: testJob.id - }]); - assert.calledWith(redisMock.hdel, 'periodicBuildConfigs', testJob.id); - }); - }); - - it('stops but does not reEnqueue an existing job if it is disabled', () => { - testDelayedConfig.isUpdate = true; - testDelayedConfig.job.state = 'DISABLED'; - testDelayedConfig.job.archived = false; - - return executor.startPeriodic(testDelayedConfig).then(() => { - assert.calledOnce(queueMock.connect); - assert.notCalled(redisMock.hset); - assert.notCalled(queueMock.enqueueAt); - assert.calledWith(queueMock.delDelayed, 'periodicBuilds', 'startDelayed', [{ - jobId: testJob.id - }]); - assert.calledWith(redisMock.hdel, 'periodicBuildConfigs', testJob.id); - }); - }); - - it('stops but does not reEnqueue an existing job if it is archived', () => { - testDelayedConfig.isUpdate = true; - testDelayedConfig.job.state = 'ENABLED'; - testDelayedConfig.job.archived = true; - - return executor.startPeriodic(testDelayedConfig).then(() => { - assert.calledOnce(queueMock.connect); - assert.notCalled(redisMock.hset); - assert.notCalled(queueMock.enqueueAt); - assert.calledWith(queueMock.delDelayed, 'periodicBuilds', 'startDelayed', [{ - jobId: testJob.id - }]); - assert.calledWith(redisMock.hdel, 'periodicBuildConfigs', testJob.id); - }); - }); - - it('trigger build and do not enqueue next job if archived', () => { - testDelayedConfig.isUpdate = true; - testDelayedConfig.job.state = 'ENABLED'; - testDelayedConfig.job.archived = true; - testDelayedConfig.triggerBuild = true; + describe('_startPeriodic', done => { + it('Calls api to start periodic build', () => { + mockRequest.yieldsAsync(null, { statusCode: 200 }); + const periodicConfig = { ...testConfig, username: 'admin' }; const options = { - url: 'http://localhost/v4/events', + ...requestOptions, + url: 'http://localhost/v1/queue/message?type=periodic', method: 'POST', - headers: { - Authorization: 'Bearer admintoken', - 'Content-Type': 'application/json' - }, - json: true, - body: { - causeMessage: 'Started by periodic build scheduler', - creator: { name: 'Screwdriver scheduler', username: 'sd:scheduler' }, - pipelineId: testDelayedConfig.pipeline.id, - startFrom: testDelayedConfig.job.name - }, - maxAttempts: 3, - retryDelay: 5000, - retryStrategy: executor.requestRetryStrategyPostEvent + body: periodicConfig }; 
- return executor.startPeriodic(testDelayedConfig).then(() => { - assert.calledOnce(queueMock.connect); - assert.notCalled(redisMock.hset); - assert.notCalled(queueMock.enqueueAt); - assert.calledWith(queueMock.delDelayed, 'periodicBuilds', 'startDelayed', [{ - jobId: testJob.id - }]); - assert.calledWith(redisMock.hdel, 'periodicBuildConfigs', testJob.id); - assert.calledOnce(userTokenGen); - assert.calledWith(reqMock, options); + return executor.startPeriodic(periodicConfig, err => { + assert.calledWithArgs(mockRequest, periodicConfig, options); + assert.isNull(err); + done(); }); }); + }); - it('trigger build and enqueue next job', () => { - testDelayedConfig.isUpdate = false; - testDelayedConfig.job.state = 'ENABLED'; - testDelayedConfig.job.archived = false; - testDelayedConfig.triggerBuild = true; + describe('_stopPeriodic', done => { + it('Calls api to stop periodic build', () => { + mockRequest.yieldsAsync(null, { statusCode: 200 }); const options = { - url: 'http://localhost/v4/events', - method: 'POST', - headers: { - Authorization: 'Bearer admintoken', - 'Content-Type': 'application/json' - }, - json: true, - body: { - causeMessage: 'Started by periodic build scheduler', - creator: { name: 'Screwdriver scheduler', username: 'sd:scheduler' }, - pipelineId: testDelayedConfig.pipeline.id, - startFrom: testDelayedConfig.job.name - }, - maxAttempts: 3, - retryDelay: 5000, - retryStrategy: executor.requestRetryStrategyPostEvent + ...requestOptions, + url: 'http://localhost/v1/queue/message?type=periodic', + method: 'DELETE', + body: testConfig }; - return executor.startPeriodic(testDelayedConfig).then(() => { - assert.notCalled(queueMock.delDelayed); - assert.calledOnce(userTokenGen); - assert.calledWith(reqMock, options); - assert.calledOnce(queueMock.connect); - assert.calledWith(redisMock.hset, 'periodicBuildConfigs', testJob.id, - JSON.stringify(testDelayedConfig)); - assert.calledWith(cronMock.transform, '* * * * *', testJob.id); - assert.calledWith(cronMock.next, 'H H H H H'); - assert.calledWith(queueMock.enqueueAt, 1500000, 'periodicBuilds', 'startDelayed', [{ - jobId: testJob.id - }]); + return executor.stopPeriodic(testConfig, err => { + assert.calledWithArgs(mockRequest, testConfig, options); + assert.isNull(err); + done(); }); }); }); - describe('_start', () => { - it('rejects if it can\'t establish a connection', () => { - queueMock.connect.rejects(new Error('couldn\'t connect')); - - return executor.start(testConfig).then(() => { - assert.fail('Should not get here'); - }, (err) => { - assert.instanceOf(err, Error); - }); - }); - - it('enqueues a build and caches the config', () => { - const dateNow = Date.now(); - const isoTime = (new Date(dateNow)).toISOString(); - const sandbox = sinon.sandbox.create({ - useFakeTimers: false - }); - - sandbox.useFakeTimers(dateNow); - buildMock.stats = {}; - testConfig.build = buildMock; - - return executor.start(testConfig).then(() => { - assert.calledTwice(queueMock.connect); - assert.calledWith(redisMock.hset, 'buildConfigs', buildId, - JSON.stringify(testConfig)); - assert.calledWith(queueMock.enqueue, 'builds', 'start', - [partialTestConfigToString]); - assert.calledOnce(buildMock.update); - assert.equal(buildMock.stats.queueEnterTime, isoTime); - sandbox.restore(); - }); - } - ); + describe('_start', done => { + it('Calls api to start build', () => { + mockRequest.yieldsAsync(null, { statusCode: 200 }); + const startConfig = { ...testConfig, pipeline: testPipeline }; - it('enqueues a build and when force start is on', () => { - 
const dateNow = Date.now(); - const isoTime = (new Date(dateNow)).toISOString(); - const sandbox = sinon.sandbox.create({ - useFakeTimers: false - }); - - sandbox.useFakeTimers(dateNow); - buildMock.stats = {}; - testConfig.build = buildMock; - testConfig.causeMessage = '[force start] Need to push hotfix'; - - return executor.start(testConfig).then(() => { - assert.calledTwice(queueMock.connect); - assert.calledWith(redisMock.hset, 'buildConfigs', buildId, - JSON.stringify(testConfig)); - assert.calledWith(queueMock.enqueue, 'builds', 'start', - [partialTestConfigToString]); - assert.calledOnce(buildMock.update); - assert.equal(buildMock.stats.queueEnterTime, isoTime); - sandbox.restore(); - }); - }); - - it('enqueues a build and with enqueueTime', () => { - buildMock.stats = {}; - testConfig.build = buildMock; - const config = Object.assign({}, testConfig, { enqueueTime: new Date() }); - - return executor.start(config).then(() => { - assert.calledTwice(queueMock.connect); - assert.calledWith(redisMock.hset, 'buildConfigs', buildId, - JSON.stringify(config)); - assert.calledWith(queueMock.enqueue, 'builds', 'start', - [partialTestConfigToString]); + Object.assign(requestOptions, { + url: 'http://localhost/v1/queue/message', + method: 'POST', + body: startConfig }); - }); - it('enqueues a build and caches the config', () => - executor.start(testConfig).then(() => { - assert.calledTwice(queueMock.connect); - assert.calledWith(redisMock.hset, 'buildConfigs', buildId, - JSON.stringify(testConfig)); - assert.calledWith(queueMock.enqueue, 'builds', 'start', - [partialTestConfigToString]); - })); - - it('doesn\'t call connect if there\'s already a connection', () => { - queueMock.connection.connected = true; - - return executor.start(testConfig).then(() => { - assert.notCalled(queueMock.connect); - assert.calledWith(queueMock.enqueue, 'builds', 'start', - [partialTestConfigToString]); + return executor.start(startConfig, err => { + assert.calledWithArgs(mockRequest, startConfig, requestOptions); + assert.isNull(err); + done(); }); }); }); - describe('_startFrozen', () => { - it('enqueues a delayed job if in freeze window', () => { - mockery.resetCache(); - reqMock.yieldsAsync(null, fakeResponse, fakeResponse.body); - - const freezeWindowsMockB = { - timeOutOfWindows: (windows, date) => { - date.setUTCMinutes(date.getUTCMinutes() + 1); - - return date; - } - }; - - mockery.deregisterMock('./lib/freezeWindows'); - mockery.registerMock('./lib/freezeWindows', freezeWindowsMockB); - - /* eslint-disable global-require */ - Executor = require('../index'); - /* eslint-enable global-require */ - - executor = new Executor({ - redisConnection: testConnection, - breaker: { - retry: { - retries: 1 - } - }, - pipelineFactory: pipelineFactoryMock - }); + describe('_startFrozen', done => { + it('Calls api to start frozen build', () => { + mockRequest.yieldsAsync(null, { statusCode: 200 }); - const dateNow = new Date(); - - const sandbox = sinon.sandbox.create({ - useFakeTimers: false + Object.assign(requestOptions, { + url: 'http://localhost/v1/queue/message?type=frozen', + method: 'POST', + body: testConfig }); - sandbox.useFakeTimers(dateNow.getTime()); - - const options = { - json: true, - method: 'PUT', - uri: `http://api.com/v4/builds/${testConfig.buildId}`, - body: { - status: 'FROZEN', - statusMessage: sinon.match('Blocked by freeze window, re-enqueued to ') - }, - headers: { - Authorization: 'Bearer asdf', - 'Content-Type': 'application/json' - }, - maxAttempts: 3, - retryDelay: 5000, - retryStrategy: 
executor.requestRetryStrategy - }; - - return executor.start(testConfig).then(() => { - assert.calledTwice(queueMock.connect); - assert.calledWith(queueMock.delDelayed, 'frozenBuilds', 'startFrozen', [{ - jobId - }]); - assert.calledWith(redisMock.hset, 'frozenBuildConfigs', jobId, - JSON.stringify(testConfig)); - assert.calledWith(queueMock.enqueueAt, dateNow.getTime() + 60000, 'frozenBuilds', - 'startFrozen', [{ - jobId - }]); - assert.calledWith(reqMock, options); - sandbox.restore(); + return executor.startFrozen(testConfig, err => { + assert.calledWithArgs(mockRequest, testConfig, requestOptions); + assert.isNull(err); + done(); }); }); }); - describe('_stop', () => { - it('rejects if it can\'t establish a connection', function () { - queueMock.connect.rejects(new Error('couldn\'t connect')); - - return executor.stop(partialTestConfig).then(() => { - assert.fail('Should not get here'); - }, (err) => { - assert.instanceOf(err, Error); - }); - }); - - it('removes a start event from the queue and the cached buildconfig', () => { - const deleteKey = `deleted_${jobId}_${buildId}`; - const stopConfig = Object.assign({ started: false }, partialTestConfigToString); - - return executor.stop(partialTestConfig).then(() => { - assert.calledOnce(queueMock.connect); - assert.calledWith(queueMock.del, 'builds', 'start', [partialTestConfigToString]); - assert.calledWith(redisMock.set, deleteKey, ''); - assert.calledWith(redisMock.expire, deleteKey, 1800); - assert.calledWith(queueMock.enqueue, 'builds', 'stop', [stopConfig]); - }); - }); - - it('adds a stop event to the queue if no start events were removed', () => { - queueMock.del.resolves(0); - const stopConfig = Object.assign({ started: true }, partialTestConfigToString); + describe('_stopFrozen', done => { + it('Calls api to stop frozen builds', () => { + mockRequest.yieldsAsync(null, { statusCode: 200 }); - return executor.stop(partialTestConfig).then(() => { - assert.calledOnce(queueMock.connect); - assert.calledWith(queueMock.del, 'builds', 'start', [partialTestConfigToString]); - assert.calledWith(queueMock.enqueue, 'builds', 'stop', [stopConfig]); + Object.assign(requestOptions, { + url: 'http://localhost/v1/queue/message?type=frozen', + method: 'DELETE', + body: testConfig }); - }); - - it('adds a stop event to the queue if it has no blocked job', () => { - queueMock.del.resolves(0); - const partialTestConfigUndefined = Object.assign({}, partialTestConfig, { - blockedBy: undefined - }); - const stopConfig = Object.assign({ started: true }, partialTestConfigUndefined); - return executor.stop(partialTestConfigUndefined).then(() => { - assert.calledOnce(queueMock.connect); - assert.calledWith(queueMock.del, 'builds', 'start', [partialTestConfigUndefined]); - assert.calledWith(queueMock.enqueue, 'builds', 'stop', [stopConfig]); + return executor.stopFrozen(testConfig, err => { + assert.calledWithArgs(mockRequest, testConfig, requestOptions); + assert.isNull(err); + done(); }); }); + }); - it('doesn\'t call connect if there\'s already a connection', () => { - queueMock.connection.connected = true; + describe('_stop', done => { + it('Calls api to stop a build', () => { + mockRequest.yieldsAsync(null, { statusCode: 200 }); + const stopConfig = { + annotations: testConfig.annotations, + blockedBy: testConfig.blockedBy, + freezeWindows: testConfig.freezeWindows, + buildId: testConfig.buildId, + buildClusterName: testConfig.buildClusterName, + jobId: testConfig.jobId, + token: testConfig.token, + pipelineId: testConfig.pipelineId + }; - return 
executor.stop(Object.assign({}, partialTestConfig, { - annotations: { - 'beta.screwdriver.cd/executor': 'screwdriver-executor-k8s' - } - })).then(() => { - assert.notCalled(queueMock.connect); - assert.calledWith(queueMock.del, 'builds', 'start', [partialTestConfigToString]); + Object.assign(requestOptions, { + url: 'http://localhost/v1/queue/message', + method: 'DELETE', + body: stopConfig }); - }); - }); - describe('cleanUp', () => { - it('worker.end() is called', () => { - executor.cleanUp().then(() => { - assert.called(spyMultiWorker); - assert.called(spyScheduler); - assert.called(queueMock.end); + executor.stop(stopConfig, err => { + assert.calledWithArgs(mockRequest, stopConfig, requestOptions); + assert.isNull(err); + done(); }); }); }); - describe('stats', () => { - it('returns the correct stats', () => { - assert.deepEqual(executor.stats(), { - requests: { - total: 0, - timeouts: 0, - success: 0, - failure: 0, - concurrent: 0, - averageTime: 0 - }, - breaker: { - isClosed: true - } - }); - }); - }); - - describe('_stopTimer', () => { - it('does not reject if it can\'t establish a connection', async () => { - queueMock.connect.rejects(new Error('couldn\'t connect')); - try { - await executor.stopTimer({}); - } catch (err) { - assert.fail('Should not get here'); - } - }); - - it('removes a key from redis for the specified buildId if it exists', async () => { - const dateNow = Date.now(); - const isoTime = (new Date(dateNow)).toISOString(); - const sandbox = sinon.sandbox.create({ - useFakeTimers: false - }); - - const timerConfig = { - buildId, - jobId, - startTime: isoTime + describe('stats', done => { + it('Calls api to get stats', () => { + mockRequest.yieldsAsync(null, { statusCode: 200 }); + const statsConfig = { + buildId: testConfig.buildId, + jobId: testConfig.jobId, + token: testConfig.token, + pipelineId: testConfig.pipelineId }; - sandbox.useFakeTimers(dateNow); - redisMock.hget.withArgs('timeoutConfigs', buildId).yieldsAsync(null, { - buildId, - jobId, - startTime: isoTime + Object.assign(requestOptions, { + url: 'http://localhost/v1/queue/message', + method: 'GET' }); - await executor.stopTimer(timerConfig); - - assert.calledOnce(queueMock.connect); - assert.calledWith(redisMock.hdel, 'timeoutConfigs', buildId); - sandbox.restore(); - }); - - it('hdel is not called if buildId does not exist in cache', async () => { - redisMock.hget.withArgs('timeoutConfigs', buildId) - .yieldsAsync(null, null); - - await executor.stopTimer(testConfig); - assert.calledOnce(queueMock.connect); - assert.notCalled(redisMock.hdel); + return executor.stats(statsConfig, err => { + assert.calledWithArgs(mockRequest, {}, requestOptions); + assert.isNull(err); + done(); + }); }); }); - describe('_startTimer', () => { - it('does not reject if it can\'t establish a connection', async () => { - queueMock.connect.rejects(new Error('couldn\'t connect')); - try { - await executor.startTimer({}); - } catch (err) { - assert.fail('Should not get here'); - } - }); - - it('adds a timeout key if status is RUNNING and caches the config', async () => { + describe('_stopTimer', done => { + it('Calls api to stop timer', () => { + mockRequest.yieldsAsync(null, { statusCode: 200 }); const dateNow = Date.now(); - const isoTime = (new Date(dateNow)).toISOString(); + const isoTime = new Date(dateNow).toISOString(); const sandbox = sinon.sandbox.create({ useFakeTimers: false }); const timerConfig = { - buildId, - jobId, - buildStatus: 'RUNNING', - startTime: isoTime + buildId: testConfig.buildId, + jobId: 
testConfig.jobId, + startTime: isoTime, + job: testJob, + pipeline: testPipeline }; sandbox.useFakeTimers(dateNow); - redisMock.hget.yieldsAsync(null, null); - await executor.startTimer(timerConfig); - assert.calledOnce(queueMock.connect); - assert.calledWith(redisMock.hset, 'timeoutConfigs', buildId, - JSON.stringify({ - jobId, - startTime: isoTime, - timeout: 90 - })); - sandbox.restore(); - }); - - it('does not add a timeout key if status is !RUNNING', async () => { - const dateNow = Date.now(); - const isoTime = (new Date(dateNow)).toISOString(); - const sandbox = sinon.sandbox.create({ - useFakeTimers: false + Object.assign(requestOptions, { + url: 'http://localhost/v1/queue/message?type=timer', + method: 'DEELTE', + body: timerConfig }); - const timerConfig = { - buildId, - jobId, - buildStatus: 'QUEUED', - startTime: isoTime - }; - - sandbox.useFakeTimers(dateNow); - redisMock.hget.yieldsAsync(null, null); - - await executor.startTimer(timerConfig); - assert.calledOnce(queueMock.connect); - assert.notCalled(redisMock.hset); - sandbox.restore(); + return executor.stopTimer(timerConfig, err => { + assert.calledWithArgs(mockRequest, timerConfig, requestOptions); + assert.isNull(err); + done(); + sandbox.restore(); + }); }); + }); - it('does not add a timeout key if buildId already exists', async () => { + describe('_startTimer', done => { + it('Calls api to start timer', () => { + mockRequest.yieldsAsync(null, { statusCode: 200 }); const dateNow = Date.now(); - const isoTime = (new Date(dateNow)).toISOString(); + const isoTime = new Date(dateNow).toISOString(); const sandbox = sinon.sandbox.create({ useFakeTimers: false }); const timerConfig = { - buildId, - jobId, - buildStatus: 'QUEUED', - startTime: isoTime + buildId: testConfig.buildId, + jobId: testConfig.jobId, + startTime: isoTime, + job: testJob, + pipeline: testPipeline }; sandbox.useFakeTimers(dateNow); - redisMock.hget.withArgs('timeoutConfigs', buildId).yieldsAsync({ - jobId, - startTime: isoTime, - timeout: 90 + Object.assign(requestOptions, { + url: 'http://localhost/v1/queue/message?type=timer', + method: 'POST', + body: testConfig }); - await executor.startTimer(timerConfig); - assert.calledOnce(queueMock.connect); - assert.notCalled(redisMock.hset); - sandbox.restore(); - }); - - it('adds a timeout config with specific timeout when annotations present', - async () => { - const dateNow = Date.now(); - const isoTime = (new Date(dateNow)).toISOString(); - const sandbox = sinon.sandbox.create({ - useFakeTimers: false - }); - - const timerConfig = { - buildId, - jobId, - buildStatus: 'RUNNING', - startTime: isoTime, - annotations: { - 'screwdriver.cd/timeout': 5 - } - }; - - sandbox.useFakeTimers(dateNow); - redisMock.hget.yieldsAsync(null, null); - await executor.startTimer(timerConfig); - assert.calledOnce(queueMock.connect); - assert.calledWith(redisMock.hset, 'timeoutConfigs', buildId, - JSON.stringify({ - jobId, - startTime: isoTime, - timeout: 5 - })); + return executor.startTimer(timerConfig, err => { + assert.calledWithArgs(mockRequest, timerConfig, requestOptions); + assert.isNull(err); + done(); sandbox.restore(); }); + }); }); }); diff --git a/test/lib/cron.js b/test/lib/cron.js deleted file mode 100644 index 9acb782..0000000 --- a/test/lib/cron.js +++ /dev/null @@ -1,59 +0,0 @@ -'use strict'; - -const { assert } = require('chai'); -const cron = require('../../lib/cron.js'); -const hash = require('string-hash'); - -const evaluateHash = (jobId, min, max) => (hash(jobId) % ((max + 1) - min)) + min; - 
-describe('cron', () => { - const jobId = '123'; - - // Evaluate the hashes for the default minutes and hours field - const minutesHash = evaluateHash(jobId, 0, 59); - const hoursHash = evaluateHash(jobId, 0, 23); - - it('should throw if the cron expession does not have 5 fields', () => { - let cronExp; - // 6 fields - - cronExp = '1 2 3 4 5 6'; - assert.throws(() => cron.transform(cronExp, jobId), - Error, '1 2 3 4 5 6 does not have exactly 5 fields'); - - // 4 fields - cronExp = '1 2 3 4'; - assert.throws(() => cron.transform(cronExp, jobId), - Error, '1 2 3 4 does not have exactly 5 fields'); - }); - - it('should transform a cron expression with valid H symbol(s)', () => { - let cronExp; - - // H * * * * - cronExp = 'H * * * *'; - assert.deepEqual(cron.transform(cronExp, jobId), `${minutesHash} * * * *`); - - // * H/2 * * * - cronExp = '* H/2 * * *'; - assert.deepEqual(cron.transform(cronExp, jobId), - `${minutesHash} ${hoursHash}/2 * * *`); - - // * H(0-5) * * * - cronExp = '* H(0-5) * * *'; - assert.deepEqual(cron.transform(cronExp, jobId), - `${minutesHash} ${evaluateHash(jobId, 0, 5)} * * *`); - - // H(0-5) * * * * - cronExp = 'H(0-5) * * * *'; - assert.deepEqual(cron.transform(cronExp, jobId), - `${evaluateHash(jobId, 0, 5)} * * * *`); - }); - - it('should throw if the cron expression has an invalid range value', () => { - const cronExp = '* H(99-100) * * *'; - - assert.throws(() => cron.transform(cronExp, jobId), - Error, 'H(99-100) has an invalid range, expected range 0-23'); - }); -}); diff --git a/test/lib/freezeWindows.js b/test/lib/freezeWindows.js deleted file mode 100644 index 7c43a53..0000000 --- a/test/lib/freezeWindows.js +++ /dev/null @@ -1,36 +0,0 @@ -'use strict'; - -const { assert } = require('chai'); -const timeOutOfWindows = require('../../lib/freezeWindows.js').timeOutOfWindows; - -describe('freeze windows', () => { - it('should return the correct date outside the freeze windows', () => { - const currentDate = new Date(Date.UTC(2018, 10, 24, 10, 33)); - - timeOutOfWindows([ - '0-31,33-35 * * * ?', - '* 5-23 * * ?', - '* * ? 11 *', - '* * ? * 6' - ], currentDate); - - const expectedDate = new Date('2018-12-02T00:32:00.000Z'); - - assert.equal(currentDate.getTime(), expectedDate.getTime()); - }); - - it('should return the same date if outside the freeze windows', () => { - const currentDate = new Date(Date.UTC(2018, 10, 24, 10, 33)); - - timeOutOfWindows([ - '0-31,34-35 * * * ?', - '* 11-17 * * ?', - '* * ? 10 *', - '* * ? * 4' - ], currentDate); - - const expectedDate = new Date('2018-11-24T10:33:00.000Z'); - - assert.equal(currentDate.getTime(), expectedDate.getTime()); - }); -});
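
For reference, a minimal sketch of how the refactored executor is driven, assuming the module's default export and a queue service running at the configured queueUri; host names, token, and IDs are placeholders modeled on the test fixtures above, and the endpoints mirror the api() helper (POST/DELETE /v1/queue/message, GET /v1/queue/stats).

'use strict';

const ExecutorQueue = require('screwdriver-executor-queue');

// The only required configuration is the queue service location (placeholder URL).
const executor = new ExecutorQueue({
    ecosystem: {
        queueUri: 'http://queue.example.com'
    }
});

// start() is forwarded by api() as POST {queueUri}/v1/queue/message, with the
// per-build JWT sent as the Authorization bearer token. Field values are placeholders.
executor.start({
    buildId: 8609,
    jobId: 777,
    pipelineId: 123,
    container: 'node:12',
    blockedBy: [777],
    freezeWindows: [],
    annotations: {},
    apiUri: 'http://api.example.com',
    token: 'build-jwt'
})
    .then(() => console.log('build enqueued'))
    .catch(err => console.error('failed to enqueue build', err));

// Aborting the same build is forwarded as DELETE {queueUri}/v1/queue/message:
// executor.stop({ buildId: 8609, jobId: 777, pipelineId: 123, token: 'build-jwt' });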