From 38013d474413089da19f4d7a59a58c718d783ede Mon Sep 17 00:00:00 2001 From: Gautam Jethwani Date: Wed, 13 Sep 2023 06:41:23 -0700 Subject: [PATCH] feat: add option for user-provided EventEmitter to control buckets (#809) The motivation for this PR. I added an option for a user-provided EventEmitter that would handle the rotating of the buckets. The idea here is that you could do something like this in your app ``` const bucketController = new EventEmitter(); setInterval(() => { bucketController.emit('rotate') }, 1000) ``` And then pass `bucketController` into opossum. Opossum would listen for that event and then rotate the buckets. --------- Co-authored-by: Gautam Jethwani --- lib/circuit.js | 12 +++++ lib/status.js | 47 ++++++++++++++--- test/browser/generate-index.sh | 2 +- test/enable-disable-test.js | 2 +- test/rolling-event-emitter-test.js | 81 ++++++++++++++++++++++++++++++ test/status-test.js | 56 +++++++++++++++++++++ 6 files changed, 191 insertions(+), 9 deletions(-) create mode 100644 test/rolling-event-emitter-test.js diff --git a/lib/circuit.js b/lib/circuit.js index 69c21ac0..c07d1531 100644 --- a/lib/circuit.js +++ b/lib/circuit.js @@ -98,6 +98,15 @@ Please use options.errorThresholdPercentage`; * @param {AbortController} options.abortController this allows Opossum to * signal upon timeout and properly abort your on going requests instead of * leaving it in the background + * @param {boolean} options.enableSnapshots whether to enable the rolling + * stats snapshots that opossum emits at the bucketInterval. Disable this + * as an optimization if you don't listen to the 'snapshot' event to reduce + * the number of timers opossum initiates. + * @param {EventEmitter} options.rotateBucketController if you have multiple + * breakers in your app, the number of timers across breakers can get costly. + * This option allows you to provide an EventEmitter that rotates the buckets + * so you can have one global timer in your app. Make sure that you are + * emitting a 'rotate' event from this EventEmitter * * * @fires CircuitBreaker#halfOpen @@ -159,6 +168,7 @@ class CircuitBreaker extends EventEmitter { this.options.cacheGetKey = options.cacheGetKey ?? ((...args) => JSON.stringify(args)); this.options.enableSnapshots = options.enableSnapshots !== false; + this.options.rotateBucketController = options.rotateBucketController; // Set default cache transport if not provided if (this.options.cache) { @@ -789,6 +799,7 @@ class CircuitBreaker extends EventEmitter { */ enable () { this[ENABLED] = true; + this.status.startListeneningForRotateEvent(); } /** @@ -798,6 +809,7 @@ class CircuitBreaker extends EventEmitter { */ disable () { this[ENABLED] = false; + this.status.removeRotateBucketControllerListener(); } } diff --git a/lib/status.js b/lib/status.js index b95c1ac0..53a05fcc 100644 --- a/lib/status.js +++ b/lib/status.js @@ -6,6 +6,7 @@ const TIMEOUT = Symbol('timeout'); const PERCENTILES = Symbol('percentiles'); const BUCKET_INTERVAL = Symbol('bucket-interval'); const SNAPSHOT_INTERVAL = Symbol('snapshot-interval'); +const ROTATE_EVENT_NAME = Symbol('rotate-event-name'); const EventEmitter = require('events').EventEmitter; @@ -56,6 +57,7 @@ class Status extends EventEmitter { this[TIMEOUT] = options.rollingCountTimeout || 10000; this[WINDOW] = new Array(this[BUCKETS]); this[PERCENTILES] = [0.0, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99, 0.995, 1]; + this[ROTATE_EVENT_NAME] = 'rotate'; // Default this value to true this.rollingPercentilesEnabled = @@ -64,17 +66,25 @@ class Status extends EventEmitter { // Default this value to true this.enableSnapshots = options.enableSnapshots !== false; + // can be undefined + this.rotateBucketController = options.rotateBucketController; + this.rotateBucket = nextBucket(this[WINDOW]); + // prime the window with buckets for (let i = 0; i < this[BUCKETS]; i++) this[WINDOW][i] = bucket(); - // rotate the buckets periodically const bucketInterval = Math.floor(this[TIMEOUT] / this[BUCKETS]); - this[BUCKET_INTERVAL] = setInterval(nextBucket(this[WINDOW]), - bucketInterval); - // No unref() in the browser - if (typeof this[BUCKET_INTERVAL].unref === 'function') { - this[BUCKET_INTERVAL].unref(); + if (this.rotateBucketController) { + // rotate the buckets based on an optional EventEmitter + this.startListeneningForRotateEvent(); + } else { + // or rotate the buckets periodically + this[BUCKET_INTERVAL] = setInterval(this.rotateBucket, bucketInterval); + // No unref() in the browser + if (typeof this[BUCKET_INTERVAL].unref === 'function') { + this[BUCKET_INTERVAL].unref(); + } } /** @@ -173,11 +183,34 @@ class Status extends EventEmitter { shutdown () { this.removeAllListeners(); - clearInterval(this[BUCKET_INTERVAL]); + // interval is not set if rotateBucketController is provided + if (this.rotateBucketController === undefined) { + clearInterval(this[BUCKET_INTERVAL]); + } else { + this.removeRotateBucketControllerListener(); + } if (this.enableSnapshots) { clearInterval(this[SNAPSHOT_INTERVAL]); } } + + removeRotateBucketControllerListener () { + if (this.rotateBucketController) { + this.rotateBucketController.removeListener(this[ROTATE_EVENT_NAME], + this.rotateBucket); + } + } + + startListeneningForRotateEvent () { + if ( + this.rotateBucketController && + this.rotateBucketController.listenerCount(this[ROTATE_EVENT_NAME], + this.rotateBucket) === 0 + ) { + this.rotateBucketController.on(this[ROTATE_EVENT_NAME], + this.rotateBucket); + } + } } const nextBucket = window => _ => { diff --git a/test/browser/generate-index.sh b/test/browser/generate-index.sh index 560b0080..e92861bc 100755 --- a/test/browser/generate-index.sh +++ b/test/browser/generate-index.sh @@ -3,7 +3,7 @@ echo $PWD cd test -file_list=$(ls -1 | grep .js) +file_list=$(ls -1 | grep .js | grep -v rolling-event-emitter) cd .. requires="" diff --git a/test/enable-disable-test.js b/test/enable-disable-test.js index 132c4750..d76c178d 100644 --- a/test/enable-disable-test.js +++ b/test/enable-disable-test.js @@ -51,7 +51,7 @@ test('When disabled the circuit should always be closed', t => { breaker.fire(-1) .catch(e => t.equals(e, 'Error: -1 is < 0')) .then(() => { - t.ok(breaker.opened, 'should be closed'); + t.ok(breaker.opened, 'should be open'); }) .then(_ => breaker.shutdown()) .then(t.end); diff --git a/test/rolling-event-emitter-test.js b/test/rolling-event-emitter-test.js new file mode 100644 index 00000000..b928ec94 --- /dev/null +++ b/test/rolling-event-emitter-test.js @@ -0,0 +1,81 @@ +'use strict'; + +const test = require('tape'); +const CircuitBreaker = require('../'); +const { passFail } = require('./common'); +const EventEmitter = require('events').EventEmitter; + +test('When disabled, the event emitter listener should be removed', t => { + t.plan(2); + const emitter = new EventEmitter(); + const breaker = new CircuitBreaker(passFail, { + rotateBucketController: emitter + }); + t.equals(emitter.listeners('rotate').length, 1, 'listener attached automatically'); + breaker.disable(); + t.equals(emitter.listeners('rotate').length, 0, 'listener removed when breaker disabled'); + breaker.shutdown(); + t.end(); +}); + +test('Event listener should be removed only for the breaker that is disabled', t => { + t.plan(2); + const emitter = new EventEmitter(); + const breakerToBeDisabled = new CircuitBreaker(passFail, { + rotateBucketController: emitter + }); + const breakerNotToBeDisabled = new CircuitBreaker(passFail, { + rotateBucketController: emitter + }); + t.equals(emitter.listeners('rotate').length, 2, '1 listener attached for each breaker'); + breakerToBeDisabled.disable(); + t.equals(emitter.listeners('rotate').length, 1, '1 listener should be disabled and 1 should remain'); + breakerToBeDisabled.shutdown(); + breakerNotToBeDisabled.shutdown(); + t.end(); +}); + +test('Event listener should be re-added when circuit is re-enabled', t => { + t.plan(3); + const emitter = new EventEmitter(); + const breaker = new CircuitBreaker(passFail, { + rotateBucketController: emitter + }); + t.equals(emitter.listeners('rotate').length, 1, 'listener attached automatically'); + breaker.disable(); + t.equals(emitter.listeners('rotate').length, 0, 'listener removed when breaker disabled'); + breaker.enable(); + t.equals(emitter.listeners('rotate').length, 1, 'listener re-attached when breaker re-enabled'); + breaker.shutdown(); + t.end(); +}); + +test('Listener should not be attached with a call to enable if there is already a listener', t => { + t.plan(2); + const emitter = new EventEmitter(); + const breaker = new CircuitBreaker(passFail, { + rotateBucketController: emitter + }); + t.equals(emitter.listeners('rotate').length, 1, 'listener attached automatically'); + breaker.enable(); + t.equals(emitter.listeners('rotate').length, 1, 'listener should not be added again'); + breaker.shutdown(); + t.end(); +}); + +test('Listener should not be attached with a call to enable if there is already a listener and there is another breaker in the mix', t => { + t.plan(2); + const emitter = new EventEmitter(); + const breaker = new CircuitBreaker(passFail, { + rotateBucketController: emitter + }); + const anotherBreaker = new CircuitBreaker(passFail, { + rotateBucketController: emitter + }); + t.equals(emitter.listeners('rotate').length, 2, 'listener attached automatically'); + breaker.enable(); + t.equals(emitter.listeners('rotate').length, 2, 'listener should not be added again'); + breaker.shutdown(); + anotherBreaker.shutdown(); + t.end(); +}); diff --git a/test/status-test.js b/test/status-test.js index e4ee4c5a..9b5bc16a 100644 --- a/test/status-test.js +++ b/test/status-test.js @@ -1,5 +1,6 @@ 'use strict'; +const { EventEmitter } = require('events'); const test = require('tape'); const CircuitBreaker = require('../'); const Status = require('../lib/status.js'); @@ -194,3 +195,58 @@ test('CircuitBreaker status - enableSnapshots is false in Status when set to fal breaker.shutdown(); t.end(); }); + +test('CircuitBreaker status - breaker stats should not reset when rotateBucketController provided and no event emitted', t => { + t.plan(1); + + const emitter = new EventEmitter(); + const breaker = new CircuitBreaker(passFail, { + rotateBucketController: emitter, + rollingCountTimeout: 10 + }); + + breaker.fire(-1) + .catch(() => { + setTimeout(() => { + t.equal(breaker.status.stats.failures, 1, 'failures do not reset because no rotate event is emitted'); + breaker.shutdown(); + t.end(); + }, 100); + }); +}); + +test('CircuitBreaker status - breaker stats should rotate when rotateBucketController provided and "rotate" event emitted', t => { + t.plan(1); + + const emitter = new EventEmitter(); + const breaker = new CircuitBreaker(passFail, { + rotateBucketController: emitter, + rollingCountBuckets: 1, + rollingCountTimeout: 10000000 + }); + + breaker.fire(-1) + .catch(() => { + emitter.emit('rotate'); + t.equal(breaker.status.stats.failures, 0, 'failures reset buckets are rotated by EventEmitter'); + breaker.shutdown(); + t.end(); + }); +}); + +test('CircuitBreaker status - breaker stats should reset when rotateBucketController not provided', t => { + t.plan(1); + + const breaker = new CircuitBreaker(passFail, { + rollingCountTimeout: 10 + }); + + breaker.fire(-1) + .catch(() => { + setTimeout(() => { + t.equal(breaker.status.stats.failures, 0, 'failures reset because no event emitter is provided and rollingCountTimeout set to 10ms'); + breaker.shutdown(); + t.end(); + }, 100); + }); +});