Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add option for user-provided EventEmitter to control buckets #809

Merged
merged 18 commits into from
Sep 13, 2023
Merged
12 changes: 12 additions & 0 deletions lib/circuit.js
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,15 @@ Please use options.errorThresholdPercentage`;
* @param {AbortController} options.abortController this allows Opossum to
* signal upon timeout and properly abort your on going requests instead of
* leaving it in the background
* @param {boolean} options.enableSnapshots whether to enable the rolling
* stats snapshots that opossum emits at the bucketInterval. Disable this
* as an optimization if you don't listen to the 'snapshot' event to reduce
* the number of timers opossum initiates.
* @param {EventEmitter} options.rotateBucketController if you have multiple
* breakers in your app, the number of timers across breakers can get costly.
* This option allows you to provide an EventEmitter that rotates the buckets
* so you can have one global timer in your app. Make sure that you are
* emitting a 'rotate' event from this EventEmitter
*
*
* @fires CircuitBreaker#halfOpen
Expand Down Expand Up @@ -159,6 +168,7 @@ class CircuitBreaker extends EventEmitter {
this.options.cacheGetKey = options.cacheGetKey ??
((...args) => JSON.stringify(args));
this.options.enableSnapshots = options.enableSnapshots !== false;
this.options.rotateBucketController = options.rotateBucketController;

// Set default cache transport if not provided
if (this.options.cache) {
Expand Down Expand Up @@ -789,6 +799,7 @@ class CircuitBreaker extends EventEmitter {
*/
enable () {
this[ENABLED] = true;
this.status.startListeneningForRotateEvent();
}

/**
Expand All @@ -798,6 +809,7 @@ class CircuitBreaker extends EventEmitter {
*/
disable () {
this[ENABLED] = false;
this.status.removeRotateBucketControllerListener();
}
}

Expand Down
47 changes: 40 additions & 7 deletions lib/status.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const TIMEOUT = Symbol('timeout');
const PERCENTILES = Symbol('percentiles');
const BUCKET_INTERVAL = Symbol('bucket-interval');
const SNAPSHOT_INTERVAL = Symbol('snapshot-interval');
const ROTATE_EVENT_NAME = Symbol('rotate-event-name');

const EventEmitter = require('events').EventEmitter;

Expand Down Expand Up @@ -56,6 +57,7 @@ class Status extends EventEmitter {
this[TIMEOUT] = options.rollingCountTimeout || 10000;
this[WINDOW] = new Array(this[BUCKETS]);
this[PERCENTILES] = [0.0, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99, 0.995, 1];
this[ROTATE_EVENT_NAME] = 'rotate';

// Default this value to true
this.rollingPercentilesEnabled =
Expand All @@ -64,17 +66,25 @@ class Status extends EventEmitter {
// Default this value to true
this.enableSnapshots = options.enableSnapshots !== false;

// can be undefined
this.rotateBucketController = options.rotateBucketController;
this.rotateBucket = nextBucket(this[WINDOW]);

// prime the window with buckets
for (let i = 0; i < this[BUCKETS]; i++) this[WINDOW][i] = bucket();

// rotate the buckets periodically
const bucketInterval = Math.floor(this[TIMEOUT] / this[BUCKETS]);
this[BUCKET_INTERVAL] = setInterval(nextBucket(this[WINDOW]),
bucketInterval);

// No unref() in the browser
if (typeof this[BUCKET_INTERVAL].unref === 'function') {
this[BUCKET_INTERVAL].unref();
if (this.rotateBucketController) {
// rotate the buckets based on an optional EventEmitter
this.startListeneningForRotateEvent();
} else {
// or rotate the buckets periodically
this[BUCKET_INTERVAL] = setInterval(this.rotateBucket, bucketInterval);
// No unref() in the browser
if (typeof this[BUCKET_INTERVAL].unref === 'function') {
this[BUCKET_INTERVAL].unref();
}
}

/**
Expand Down Expand Up @@ -173,11 +183,34 @@ class Status extends EventEmitter {

shutdown () {
this.removeAllListeners();
clearInterval(this[BUCKET_INTERVAL]);
// interval is not set if rotateBucketController is provided
if (this.rotateBucketController === undefined) {
clearInterval(this[BUCKET_INTERVAL]);
} else {
this.removeRotateBucketControllerListener();
}
if (this.enableSnapshots) {
clearInterval(this[SNAPSHOT_INTERVAL]);
}
}

removeRotateBucketControllerListener () {
if (this.rotateBucketController) {
this.rotateBucketController.removeListener(this[ROTATE_EVENT_NAME],
this.rotateBucket);
}
}

startListeneningForRotateEvent () {
if (
this.rotateBucketController &&
this.rotateBucketController.listenerCount(this[ROTATE_EVENT_NAME],
this.rotateBucket) === 0
) {
this.rotateBucketController.on(this[ROTATE_EVENT_NAME],
this.rotateBucket);
}
}
}

const nextBucket = window => _ => {
Expand Down
2 changes: 1 addition & 1 deletion test/browser/generate-index.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
echo $PWD

cd test
file_list=$(ls -1 | grep .js)
file_list=$(ls -1 | grep .js | grep -v rolling-event-emitter)
cd ..
requires=""

Expand Down
2 changes: 1 addition & 1 deletion test/enable-disable-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ test('When disabled the circuit should always be closed', t => {
breaker.fire(-1)
.catch(e => t.equals(e, 'Error: -1 is < 0'))
.then(() => {
t.ok(breaker.opened, 'should be closed');
t.ok(breaker.opened, 'should be open');
Copy link
Contributor Author

@gjethwani gjethwani Sep 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lholmquist Was this a typo? If yes, I took the liberty of correcting it :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤣 most likely. damn copy/paste

})
.then(_ => breaker.shutdown())
.then(t.end);
Expand Down
81 changes: 81 additions & 0 deletions test/rolling-event-emitter-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
'use strict';

const test = require('tape');
const CircuitBreaker = require('../');
const { passFail } = require('./common');
const EventEmitter = require('events').EventEmitter;

test('When disabled, the event emitter listener should be removed', t => {
t.plan(2);
const emitter = new EventEmitter();
const breaker = new CircuitBreaker(passFail, {
rotateBucketController: emitter
});
t.equals(emitter.listeners('rotate').length, 1, 'listener attached automatically');
breaker.disable();
t.equals(emitter.listeners('rotate').length, 0, 'listener removed when breaker disabled');
breaker.shutdown();
t.end();
});

test('Event listener should be removed only for the breaker that is disabled', t => {
t.plan(2);
const emitter = new EventEmitter();
const breakerToBeDisabled = new CircuitBreaker(passFail, {
rotateBucketController: emitter
});
const breakerNotToBeDisabled = new CircuitBreaker(passFail, {
rotateBucketController: emitter
});
t.equals(emitter.listeners('rotate').length, 2, '1 listener attached for each breaker');
breakerToBeDisabled.disable();
t.equals(emitter.listeners('rotate').length, 1, '1 listener should be disabled and 1 should remain');
breakerToBeDisabled.shutdown();
breakerNotToBeDisabled.shutdown();
t.end();
});

test('Event listener should be re-added when circuit is re-enabled', t => {
t.plan(3);
const emitter = new EventEmitter();
const breaker = new CircuitBreaker(passFail, {
rotateBucketController: emitter
});
t.equals(emitter.listeners('rotate').length, 1, 'listener attached automatically');
breaker.disable();
t.equals(emitter.listeners('rotate').length, 0, 'listener removed when breaker disabled');
breaker.enable();
t.equals(emitter.listeners('rotate').length, 1, 'listener re-attached when breaker re-enabled');
breaker.shutdown();
t.end();
});

test('Listener should not be attached with a call to enable if there is already a listener', t => {
t.plan(2);
const emitter = new EventEmitter();
const breaker = new CircuitBreaker(passFail, {
rotateBucketController: emitter
});
t.equals(emitter.listeners('rotate').length, 1, 'listener attached automatically');
breaker.enable();
t.equals(emitter.listeners('rotate').length, 1, 'listener should not be added again');
breaker.shutdown();
t.end();
});

test('Listener should not be attached with a call to enable if there is already a listener and there is another breaker in the mix', t => {
t.plan(2);
const emitter = new EventEmitter();
const breaker = new CircuitBreaker(passFail, {
rotateBucketController: emitter
});
const anotherBreaker = new CircuitBreaker(passFail, {
rotateBucketController: emitter
});
t.equals(emitter.listeners('rotate').length, 2, 'listener attached automatically');
breaker.enable();
t.equals(emitter.listeners('rotate').length, 2, 'listener should not be added again');
breaker.shutdown();
anotherBreaker.shutdown();
t.end();
});
56 changes: 56 additions & 0 deletions test/status-test.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
'use strict';

const { EventEmitter } = require('events');
const test = require('tape');
const CircuitBreaker = require('../');
const Status = require('../lib/status.js');
Expand Down Expand Up @@ -194,3 +195,58 @@ test('CircuitBreaker status - enableSnapshots is false in Status when set to fal
breaker.shutdown();
t.end();
});

test('CircuitBreaker status - breaker stats should not reset when rotateBucketController provided and no event emitted', t => {
t.plan(1);

const emitter = new EventEmitter();
const breaker = new CircuitBreaker(passFail, {
rotateBucketController: emitter,
rollingCountTimeout: 10
});

breaker.fire(-1)
.catch(() => {
setTimeout(() => {
t.equal(breaker.status.stats.failures, 1, 'failures do not reset because no rotate event is emitted');
breaker.shutdown();
t.end();
}, 100);
});
});

test('CircuitBreaker status - breaker stats should rotate when rotateBucketController provided and "rotate" event emitted', t => {
t.plan(1);

const emitter = new EventEmitter();
const breaker = new CircuitBreaker(passFail, {
rotateBucketController: emitter,
rollingCountBuckets: 1,
rollingCountTimeout: 10000000
});

breaker.fire(-1)
.catch(() => {
emitter.emit('rotate');
t.equal(breaker.status.stats.failures, 0, 'failures reset buckets are rotated by EventEmitter');
breaker.shutdown();
t.end();
});
});

test('CircuitBreaker status - breaker stats should reset when rotateBucketController not provided', t => {
t.plan(1);

const breaker = new CircuitBreaker(passFail, {
rollingCountTimeout: 10
});

breaker.fire(-1)
.catch(() => {
setTimeout(() => {
t.equal(breaker.status.stats.failures, 0, 'failures reset because no event emitter is provided and rollingCountTimeout set to 10ms');
breaker.shutdown();
t.end();
}, 100);
});
});