Skip to content

Commit

Permalink
Simplify streaming reconnection (#4797)
Browse files Browse the repository at this point in the history
* init commit

* add back public declarations

* use asyncwait, no arrow function
  • Loading branch information
briangregoryholmes authored May 10, 2024
1 parent 3092347 commit 5abee16
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 85 deletions.
56 changes: 20 additions & 36 deletions web-common/src/runtime-client/exponential-backoff-tracker.ts
Original file line number Diff line number Diff line change
@@ -1,52 +1,36 @@
import { asyncWait } from "@rilldata/web-common/lib/waitUtils";
import { asyncWait } from "../lib/waitUtils";

export class ExponentialBackoffTracker {
private curTime: number;
private curRetries = 0;
private trackerPeriod: number;
private currentDelay: number;

public constructor(
private readonly retries: number,
/**
* Time period within which to trigger the tracker.
* Any failure after this will be considered as an intermittent failure.
*/
private readonly trackerTriggerPeriod: number,
// time
private readonly waitPeriod: number,
private readonly maxRetries: number,
private readonly initialDelay: number,
) {
this.trackerPeriod = trackerTriggerPeriod;
this.currentDelay = initialDelay;
}

public static createBasicTracker() {
return new ExponentialBackoffTracker(5, 1000, 250);
return new ExponentialBackoffTracker(5, 1000);
}

public async failed(): Promise<boolean> {
const lastTime = this.curTime;
this.curTime = Date.now();
public async try(fn: () => Promise<void> | void) {
try {
await fn();

// if failed after the tracker period, reset everything
if (this.curTime - lastTime >= this.trackerPeriod) {
this.reset();
await asyncWait(this.waitPeriod);
return true;
}

// if retry count has reached max return false to stop the connection
if (this.curRetries === this.retries) return false;
this.curRetries = 0;
this.currentDelay = this.initialDelay;
} catch (e) {
if (this.curRetries >= this.maxRetries) {
throw e;
}

// increment retry and update the tracker periods.
this.curRetries++;
// A simple function to increase the tracking period.
this.trackerPeriod = this.trackerTriggerPeriod * 2 ** this.curRetries;
// multiply the retires to the wait period as well.
await asyncWait(this.waitPeriod * 2 ** this.curRetries);
return true;
}
this.currentDelay = this.initialDelay * 2 ** this.curRetries;
await asyncWait(this.currentDelay);

private reset() {
this.curRetries = 0;
this.trackerPeriod = this.trackerTriggerPeriod;
this.curRetries++;
return this.try(fn);
}
}
}
74 changes: 25 additions & 49 deletions web-common/src/runtime-client/watch-request-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Callback<T, K extends keyof EventMap<T>> = (
export class WatchRequestClient<Res extends WatchResponse> {
private url: string | undefined;
private controller: AbortController | undefined;
private stream: AsyncGenerator<StreamingFetchResponse<Res>> | undefined;
private tracker = ExponentialBackoffTracker.createBasicTracker();
private outOfFocusThrottler = new Throttler(10000);
private listeners: Listeners<Res> = new Map([
Expand All @@ -50,12 +51,19 @@ export class WatchRequestClient<Res extends WatchResponse> {
public watch(url: string) {
this.cancel();
this.url = url;
this.restart().catch(console.log);
this.init();
this.listen().catch(console.error);
}

public cancel() {
this.controller?.abort();
this.controller = undefined;
this.stream = this.controller = undefined;
}

public init() {
if (!this.url) throw new Error("URL not set");
this.controller = new AbortController();
this.stream = this.getFetchStream(this.url, this.controller);
}

public throttle() {
Expand All @@ -72,55 +80,27 @@ export class WatchRequestClient<Res extends WatchResponse> {
// The stream was not cancelled, so don't reconnect
if (this.controller && !this.controller.signal.aborted) return;

this.restart().catch(console.log);
this.init();
this.listen().catch(console.error);

// Reconnecting, notify listeners
this.emitReconnect();
this.listeners.get("reconnect")?.forEach((cb) => void cb());
}

/**
* (re)starts the stream connection for watch request.
* If there is a disconnect then it reconnects with exponential backoff.
*/
private async restart() {
if (!this.url) return console.error("Unable to reconnect without a URL.");
// abort previous connections before starting a new one
this.controller?.abort();
this.controller = new AbortController();
private async listen() {
if (!this.stream) return;
try {
for await (const res of this.stream) {
if (this.controller?.signal.aborted) break;
if (res.error) throw new Error(res.error.message);

let firstRun = true;
// Maintain the controller here to make sure we check `aborted` for the correct one.
// Checking for `this.controller` might lead to edge cases where it has a newer controller.
let controller = this.controller;
while (!controller.signal.aborted) {
if (!firstRun) {
// Reconnecting, notify listeners
this.emitReconnect();
// safeguard to cancel the request if not already cancelled
controller.abort();
controller = new AbortController();
}
firstRun = false;

this.controller = controller;
try {
const stream = this.getFetchStream(this.url, this.controller);
for await (const res of stream) {
if (controller.signal.aborted) return;
if (res.error) throw new Error(res.error.message);

if (res.result) {
this.listeners
.get("response")
?.forEach((cb) => void cb(res.result));
}
}
} catch (err) {
if (!(await this.tracker.failed())) {
// No point in continuing retry once we have failed enough times
return;
}
if (res.result)
this.listeners.get("response")?.forEach((cb) => void cb(res.result));
}
} catch (err) {
// Stream failed, attempt to reconnect with exponential backoff
this.controller = undefined;
this.tracker.try(() => this.reconnect());
}
}

Expand All @@ -139,8 +119,4 @@ export class WatchRequestClient<Res extends WatchResponse> {
controller.signal,
);
}

private emitReconnect() {
this.listeners.get("reconnect")?.forEach((cb) => void cb());
}
}

1 comment on commit 5abee16

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.