Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: internal listeners infinite retry loop #284

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Fixed
- Avoid risk of infinte retry loops when fetching new blocks ([#284](https://github.com/MetaMask/eth-block-tracker/pull/284))
mikesposito marked this conversation as resolved.
Show resolved Hide resolved

## [11.0.2]
### Fixed
Expand Down
27 changes: 27 additions & 0 deletions src/PollingBlockTracker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,33 @@ describe('PollingBlockTracker', () => {
);
});

it('should not retry failed requests after the block tracker is stopped', async () => {
Copy link
Contributor

@mcmire mcmire Nov 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, this test does not seem to test that requests are not retried. I believe what it tests is that the promise that getLatestBlock returns eventually resolves or is rejected. Should we be more clear here? Something like:

Suggested change
it('should not retry failed requests after the block tracker is stopped', async () => {
it('should return a promise that rejects if the request for the block number fails and the block tracker is then stopped', async () => {

Or do we have a way to test that requests are not retried?

Copy link
Contributor

@mcmire mcmire Nov 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, perhaps the original test name is fine. I guess the test would fail if the polling loop continued because we are using numAutomaticCalls: 1. Maybe the new test name I am suggesting is appropriate for the case you mentioned above, or something.

recordCallsToSetTimeout({ numAutomaticCalls: 1 });

await withPollingBlockTracker(
{
provider: {
stubs: [
{
methodName: 'eth_blockNumber',
error: 'boom',
},
],
},
},
async ({ blockTracker }) => {
const latestBlockPromise = blockTracker.getLatestBlock();

expect(blockTracker.isRunning()).toBe(true);
await blockTracker.destroy();
await expect(latestBlockPromise).rejects.toThrow(
'Block tracker ended before latest block was available',
);
expect(blockTracker.isRunning()).toBe(false);
},
);
});

it('request the latest block number with `skipCache: true` if the block tracker was initialized with `setSkipCacheFlag: true`', async () => {
recordCallsToSetTimeout();

Expand Down
83 changes: 74 additions & 9 deletions src/PollingBlockTracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ const log = createModuleLogger(projectLogger, 'polling-block-tracker');
const createRandomId = getCreateRandomId();
const sec = 1000;

const calculateSum = (accumulator: number, currentValue: number) =>
accumulator + currentValue;
const blockTrackerEvents: (string | symbol)[] = ['sync', 'latest'];

export interface PollingBlockTrackerOptions {
Expand All @@ -28,6 +26,8 @@ interface ExtendedJsonRpcRequest extends JsonRpcRequest<[]> {
skipCache?: boolean;
}

type InternalListener = (value: string | PromiseLike<string>) => void;

export class PollingBlockTracker
extends SafeEventEmitter
implements BlockTracker
Expand All @@ -54,6 +54,8 @@ export class PollingBlockTracker

private readonly _setSkipCacheFlag: boolean;

readonly #internalEventListeners: InternalListener[] = [];

constructor(opts: PollingBlockTrackerOptions = {}) {
// parse + validate args
if (!opts.provider) {
Expand Down Expand Up @@ -89,7 +91,19 @@ export class PollingBlockTracker
async destroy() {
this._cancelBlockResetTimeout();
this._maybeEnd();
super.removeAllListeners();
this.eventNames().forEach((eventName) =>
Copy link
Contributor

@mcmire mcmire Nov 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to explain why excluding internal listeners are necessary? Perhaps something like:

Suggested change
this.eventNames().forEach((eventName) =>
// The `getLatestBlock` method waits for the latest block to be fetched in
// the next iteration of the polling loop. It does this by listening to the
// `latest` event, and it handles errors that occur during fetching by
// listening to the `error` event. Because `getLatestBlock` is actively
// relying on these listeners to get called in order to resolve or reject a
// promise, we don't want to remove them (otherwise the polling loop will
// run forever and the promise will never get fulfilled). We will handle
// removing them manually.
this.eventNames().forEach((eventName) =>

Copy link
Contributor

@mcmire mcmire Nov 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the way, I was playing around with this PR and it seems that if you revert the changes to destroy and _getBlockTrackerEventCount, then your test still works. It seems that listening to error solved at least one bug. But perhaps there is another test we need to write to verify the endless loop bug? I can take another look tomorrow to see what that would be.

this.listeners(eventName).forEach((listener) => {
if (
this.#internalEventListeners.every(
(internalListener) => !Object.is(internalListener, listener),
)
) {
mcmire marked this conversation as resolved.
Show resolved Hide resolved
// @ts-expect-error this listener comes from SafeEventEmitter itself, though
// its type differs between `.listeners()` and `.removeListener()`
this.removeListener(eventName, listener);
}
}),
);
}

isRunning(): boolean {
Expand All @@ -106,9 +120,32 @@ export class PollingBlockTracker
return this._currentBlock;
}
// wait for a new latest block
const latestBlock: string = await new Promise((resolve) =>
this.once('latest', resolve),
);
const latestBlock: string = await new Promise((resolve, reject) => {
// eslint-disable-next-line prefer-const
let onLatestBlockUnavailable: InternalListener;
mikesposito marked this conversation as resolved.
Show resolved Hide resolved
const onLatestBlockAvailable = (value: string | PromiseLike<string>) => {
this.#removeInternalListener(onLatestBlockAvailable);
this.removeListener('error', onLatestBlockUnavailable);
resolve(value);
};
onLatestBlockUnavailable = () => {
// if the block tracker is no longer running, reject
// and remove the listeners
if (!this._isRunning) {
this.#removeInternalListener(onLatestBlockAvailable);
this.#removeInternalListener(onLatestBlockUnavailable);
this.removeListener('latest', onLatestBlockAvailable);
this.removeListener('error', onLatestBlockUnavailable);
reject(
new Error('Block tracker ended before latest block was available'),
);
}
};
this.#addInternalListener(onLatestBlockAvailable);
this.#addInternalListener(onLatestBlockUnavailable);
this.once('latest', onLatestBlockAvailable);
this.on('error', onLatestBlockUnavailable);
mikesposito marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I'm thinking that this would not cover the scenario where the block tracker is destroyed before fetching the first block and without throwing errors.

We can probably listen to _ended instead

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like _ended could present other issues, because of the weird order of events received:

  1. _start is emitted
  2. Latest block is fetched from the provider
  3. _ended event listener is executed
  4. latest event listener is executed

Copy link
Contributor

@mcmire mcmire Nov 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly, because getLatestBlock now relies on latest or error to fire in order to resolve the promise it creates, if that never happens because the fetch call never happens, then getLatestBlock will just hang, even if destroy is called?

I think listening to _ended could make sense. Perhaps we add a check in onLatestBlockAvailable that only resolves the promise if the block runner is still running?

Copy link
Contributor

@mcmire mcmire Nov 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am slow today :/ It seems that Mark's solution in #286 may solve that case, because it grants destroy access to the promise that getLatestBlock creates, and we can just force-reject it if it hasn't resolved yet.

});
// return newly set current block
return latestBlock;
}
Expand Down Expand Up @@ -179,9 +216,17 @@ export class PollingBlockTracker
}

private _getBlockTrackerEventCount(): number {
return blockTrackerEvents
.map((eventName) => this.listenerCount(eventName))
.reduce(calculateSum);
return (
blockTrackerEvents
.map((eventName) => this.listeners(eventName))
.flat()
// internal listeners are not included in the count
.filter((listener) =>
this.#internalEventListeners.every(
(internalListener) => !Object.is(internalListener, listener),
),
mikesposito marked this conversation as resolved.
Show resolved Hide resolved
).length
);
}

private _shouldUseNewBlock(newBlock: string) {
Expand Down Expand Up @@ -299,6 +344,15 @@ export class PollingBlockTracker

try {
this.emit('error', newErr);
if (
this.listeners('error').filter((listener) =>
this.#internalEventListeners.every(
(internalListener) => !Object.is(listener, internalListener),
),
mcmire marked this conversation as resolved.
Show resolved Hide resolved
).length === 0
) {
console.error(newErr);
}
mikesposito marked this conversation as resolved.
Show resolved Hide resolved
} catch (emitErr) {
console.error(newErr);
}
Expand Down Expand Up @@ -333,6 +387,17 @@ export class PollingBlockTracker
this._pollingTimeout = undefined;
}
}

#addInternalListener(listener: InternalListener) {
this.#internalEventListeners.push(listener);
}

#removeInternalListener(listener: InternalListener) {
this.#internalEventListeners.splice(
this.#internalEventListeners.indexOf(listener),
1,
);
}
}

/**
Expand Down
27 changes: 27 additions & 0 deletions src/SubscribeBlockTracker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,33 @@ describe('SubscribeBlockTracker', () => {
});
});

it('should not retry failed requests after the block tracker is stopped', async () => {
recordCallsToSetTimeout({ numAutomaticCalls: 1 });

await withSubscribeBlockTracker(
{
provider: {
stubs: [
{
methodName: 'eth_blockNumber',
error: 'boom',
},
],
},
},
async ({ blockTracker }) => {
const latestBlockPromise = blockTracker[methodToGetLatestBlock]();

expect(blockTracker.isRunning()).toBe(true);
await blockTracker.destroy();
await expect(latestBlockPromise).rejects.toThrow(
'Block tracker ended before latest block was available',
);
expect(blockTracker.isRunning()).toBe(false);
},
);
});

it('should fetch the latest block number', async () => {
recordCallsToSetTimeout();

Expand Down
76 changes: 67 additions & 9 deletions src/SubscribeBlockTracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ const createRandomId = getCreateRandomId();

const sec = 1000;

const calculateSum = (accumulator: number, currentValue: number) =>
accumulator + currentValue;
const blockTrackerEvents: (string | symbol)[] = ['sync', 'latest'];

export interface SubscribeBlockTrackerOptions {
Expand All @@ -25,6 +23,8 @@ interface SubscriptionNotificationParams {
result: { number: string };
}

type InternalListener = (value: string | PromiseLike<string>) => void;

export class SubscribeBlockTracker
extends SafeEventEmitter
implements BlockTracker
Expand All @@ -43,6 +43,10 @@ export class SubscribeBlockTracker

private _subscriptionId: string | null;

readonly #internalEventListeners: ((
value: string | PromiseLike<string>,
) => void)[] = [];

constructor(opts: SubscribeBlockTrackerOptions = {}) {
// parse + validate args
if (!opts.provider) {
Expand Down Expand Up @@ -74,7 +78,19 @@ export class SubscribeBlockTracker
async destroy() {
this._cancelBlockResetTimeout();
await this._maybeEnd();
super.removeAllListeners();
this.eventNames().forEach((eventName) =>
this.listeners(eventName).forEach((listener) => {
if (
this.#internalEventListeners.every(
(internalListener) => !Object.is(internalListener, listener),
)
) {
// @ts-expect-error this listener comes from SafeEventEmitter itself, though
// its type differs between `.listeners()` and `.removeListener()`
this.removeListener(eventName, listener);
}
}),
);
}

isRunning(): boolean {
Expand All @@ -91,9 +107,32 @@ export class SubscribeBlockTracker
return this._currentBlock;
}
// wait for a new latest block
const latestBlock: string = await new Promise((resolve) =>
this.once('latest', resolve),
);
const latestBlock: string = await new Promise((resolve, reject) => {
// eslint-disable-next-line prefer-const
let onLatestBlockUnavailable: InternalListener;
const onLatestBlockAvailable = (value: string | PromiseLike<string>) => {
this.#removeInternalListener(onLatestBlockAvailable);
this.removeListener('error', onLatestBlockUnavailable);
resolve(value);
};
onLatestBlockUnavailable = () => {
// if the block tracker is no longer running, reject
// and remove the listeners
if (!this._isRunning) {
this.#removeInternalListener(onLatestBlockAvailable);
this.#removeInternalListener(onLatestBlockUnavailable);
this.removeListener('latest', onLatestBlockAvailable);
this.removeListener('error', onLatestBlockUnavailable);
reject(
new Error('Block tracker ended before latest block was available'),
);
}
};
this.#addInternalListener(onLatestBlockAvailable);
this.#addInternalListener(onLatestBlockUnavailable);
this.once('latest', onLatestBlockAvailable);
this.on('error', onLatestBlockUnavailable);
});
// return newly set current block
return latestBlock;
}
Expand Down Expand Up @@ -162,9 +201,17 @@ export class SubscribeBlockTracker
}

private _getBlockTrackerEventCount(): number {
return blockTrackerEvents
.map((eventName) => this.listenerCount(eventName))
.reduce(calculateSum);
return (
blockTrackerEvents
.map((eventName) => this.listeners(eventName))
.flat()
// internal listeners are not included in the count
.filter((listener) =>
this.#internalEventListeners.every(
(internalListener) => !Object.is(internalListener, listener),
),
).length
);
}

private _shouldUseNewBlock(newBlock: string) {
Expand Down Expand Up @@ -271,6 +318,17 @@ export class SubscribeBlockTracker
this._newPotentialLatest(response.params.result.number);
}
}

#addInternalListener(listener: InternalListener) {
this.#internalEventListeners.push(listener);
}

#removeInternalListener(listener: InternalListener) {
this.#internalEventListeners.splice(
this.#internalEventListeners.indexOf(listener),
1,
);
}
}

/**
Expand Down
Loading