Skip to content

Commit

Permalink
feat: replace upgrade with websocket+base mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
owenpearson committed Mar 5, 2024
1 parent 654814c commit 2fb92ee
Show file tree
Hide file tree
Showing 25 changed files with 568 additions and 1,298 deletions.
4 changes: 1 addition & 3 deletions src/common/lib/client/realtimechannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -753,9 +753,7 @@ class RealtimeChannel extends EventEmitter {
checkPendingState(): void {
/* if can't send events, do nothing */
const cmState = this.connectionManager.state;
/* Allow attach messages to queue up when synchronizing, since this will be
* the state we'll be in when upgrade transport.active triggers a checkpendingstate */
if (!(cmState.sendEvents || cmState.forceQueueEvents)) {
if (!cmState.sendEvents) {
Logger.logAction(
Logger.LOG_MINOR,
'RealtimeChannel.checkPendingState',
Expand Down
772 changes: 264 additions & 508 deletions src/common/lib/transport/connectionmanager.ts

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions src/common/lib/transport/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ abstract class Transport extends EventEmitter {
params.heartbeats = true;
}
this.connectionManager = connectionManager;
connectionManager.registerProposedTransport(this);
this.auth = auth;
this.params = params;
this.timeouts = params.options.timeouts;
Expand Down Expand Up @@ -296,7 +295,7 @@ abstract class Transport extends EventEmitter {
auth: Auth,
transportParams: TransportParams,
callback: TryConnectCallback
): void {
): Transport {
const transport = new transportCtor(connectionManager, auth, transportParams);

let transportAttemptTimer: NodeJS.Timeout | number;
Expand Down Expand Up @@ -324,6 +323,7 @@ abstract class Transport extends EventEmitter {
callback(null, transport);
});
transport.connect();
return transport;
}

onAuthUpdated?: (tokenDetails: API.TokenDetails) => void;
Expand Down
22 changes: 21 additions & 1 deletion src/common/lib/transport/websockettransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class WebSocketTransport extends Transport {
super(connectionManager, auth, params);
/* If is a browser, can't detect pings, so request protocol heartbeats */
params.heartbeats = Platform.Config.useProtocolHeartbeats;
this.wsHost = Defaults.getHost(params.options, params.host, true);
this.wsHost = params.host as string;
}

static isAvailable() {
Expand Down Expand Up @@ -207,6 +207,26 @@ class WebSocketTransport extends Transport {
});
}
}

async checkConnectivity() {
const ws = this.createWebSocket('wss://ws-up.ably-realtime-nonprod.com/', {});

return new Promise<void>((resolve, reject) => {
let finished = false;
ws.onopen = () => {
if (!finished) {
resolve();
ws.close();
}
};

ws.onclose = ws.onerror = () => {
if (!finished) {
reject();
}
};
});
}
}

export default WebSocketTransport;
15 changes: 8 additions & 7 deletions src/common/lib/util/defaults.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ type CompleteDefaults = IDefaults & {
connectionStateTtl: number;
realtimeRequestTimeout: number;
recvTimeout: number;
preferenceConnectTimeout: number;
parallelUpgradeDelay: number;
webSocketConnectTimeout: number;
webSocketSlowTimeout: number;
};
httpMaxRetryCount: number;
maxMessageSize: number;
Expand All @@ -40,7 +40,7 @@ type CompleteDefaults = IDefaults & {
getHttpScheme(options: ClientOptions): string;
environmentFallbackHosts(environment: string): string[];
getFallbackHosts(options: NormalisedClientOptions): string[];
getHosts(options: NormalisedClientOptions): string[];
getHosts(options: NormalisedClientOptions, ws?: boolean): string[];
checkHost(host: string): void;
getRealtimeHost(options: ClientOptions, production: boolean, environment: string): string;
objectifyOptions(options: ClientOptions | string): ClientOptions;
Expand Down Expand Up @@ -74,8 +74,8 @@ const Defaults = {
connectionStateTtl: 120000,
realtimeRequestTimeout: 10000,
recvTimeout: 90000,
preferenceConnectTimeout: 6000,
parallelUpgradeDelay: 6000,
webSocketConnectTimeout: 10000,
webSocketSlowTimeout: 4000,
},
httpMaxRetryCount: 3,
maxMessageSize: 65536,
Expand Down Expand Up @@ -130,8 +130,9 @@ export function getFallbackHosts(options: NormalisedClientOptions): string[] {
return fallbackHosts ? Utils.arrChooseN(fallbackHosts, httpMaxRetryCount) : [];
}

export function getHosts(options: NormalisedClientOptions): string[] {
return [options.restHost].concat(getFallbackHosts(options));
export function getHosts(options: NormalisedClientOptions, ws?: boolean): string[] {
const hosts = [options.restHost].concat(getFallbackHosts(options));
return ws ? hosts.map((host) => getHost(options, host, true)) : hosts;
}

function checkHost(host: string): void {
Expand Down
4 changes: 1 addition & 3 deletions src/common/types/IDefaults.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ import { RestAgentOptions } from './ClientOptions';

export default interface IDefaults {
connectivityCheckUrl: string;
wsConnectivityUrl: string;
defaultTransports: TransportName[];
baseTransportOrder: TransportName[];
transportPreferenceOrder: TransportName[];
upgradeTransports: TransportName[];
restAgentOptions?: RestAgentOptions;
}
1 change: 0 additions & 1 deletion src/common/types/IPlatformConfig.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ export interface ISpecificPlatformConfig {
fetchSupported?: boolean;
xhrSupported?: boolean;
allowComet?: boolean;
streamingSupported?: boolean;
ArrayBuffer?: typeof ArrayBuffer | false;
atob?: typeof atob | null;
TextEncoder?: typeof TextEncoder;
Expand Down
1 change: 0 additions & 1 deletion src/platform/nativescript/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ var Config = {
WebSocket: WebSocket,
xhrSupported: XMLHttpRequest,
allowComet: true,
streamingSupported: false,
useProtocolHeartbeats: true,
supportsBinary: typeof TextDecoder !== 'undefined' && TextDecoder,
preferBinary: false, // Motivation as on web; see `preferBinary` comment there.
Expand Down
4 changes: 1 addition & 3 deletions src/platform/nodejs/lib/util/defaults.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@ import { TransportNames } from '../../../../common/constants/TransportName';

const Defaults: IDefaults = {
connectivityCheckUrl: 'https://internet-up.ably-realtime.com/is-the-internet-up.txt',
wsConnectivityUrl: 'wss://ws-up.ably-realtime-nonprod.com/',
/* Note: order matters here: the base transport is the leftmost one in the
* intersection of baseTransportOrder and the transports clientOption that's supported. */
defaultTransports: [TransportNames.WebSocket],
baseTransportOrder: [TransportNames.Comet, TransportNames.WebSocket],
transportPreferenceOrder: [TransportNames.Comet, TransportNames.WebSocket],
upgradeTransports: [TransportNames.WebSocket],
restAgentOptions: { maxSockets: 40, keepAlive: true },
};

Expand Down
1 change: 0 additions & 1 deletion src/platform/react-native/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ export default function (bufferUtils: typeof BufferUtils): IPlatformConfig {
WebSocket: WebSocket,
xhrSupported: true,
allowComet: true,
streamingSupported: true,
useProtocolHeartbeats: true,
supportsBinary: !!(typeof TextDecoder !== 'undefined' && TextDecoder),
preferBinary: false, // Motivation as on web; see `preferBinary` comment there.
Expand Down
1 change: 0 additions & 1 deletion src/platform/web/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ const Config: IPlatformConfig = {
fetchSupported: !!globalObject.fetch,
xhrSupported: globalObject.XMLHttpRequest && 'withCredentials' in new XMLHttpRequest(),
allowComet: allowComet(),
streamingSupported: true,
useProtocolHeartbeats: true,
supportsBinary: !!globalObject.TextDecoder,
/* Per Paddy (https://ably-real-time.slack.com/archives/CURL4U2FP/p1705674537763479) web intentionally prefers JSON to MessagePack:
Expand Down
4 changes: 1 addition & 3 deletions src/platform/web/lib/util/defaults.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,11 @@ import { TransportNames } from 'common/constants/TransportName';

const Defaults: IDefaults = {
connectivityCheckUrl: 'https://internet-up.ably-realtime.com/is-the-internet-up.txt',
wsConnectivityUrl: 'wss://ws-up.ably-realtime-nonprod.com/',
/* Order matters here: the base transport is the leftmost one in the
* intersection of baseTransportOrder and the transports clientOption that's
* supported. */
defaultTransports: [TransportNames.XhrPolling, TransportNames.WebSocket],
baseTransportOrder: [TransportNames.XhrPolling, TransportNames.WebSocket],
transportPreferenceOrder: [TransportNames.XhrPolling, TransportNames.WebSocket],
upgradeTransports: [TransportNames.WebSocket],
};

export default Defaults;
2 changes: 1 addition & 1 deletion test/browser/connection.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ define(['shared_helper', 'chai'], function (helper, chai) {
var realtime = helper.AblyRealtime();
try {
expect(realtime.connection.connectionManager.baseTransport).to.equal('xhr_polling');
expect(realtime.connection.connectionManager.upgradeTransports).to.deep.equal(['web_socket']);
expect(realtime.connection.connectionManager.webSocketTransport).to.be.ok;
} catch (err) {
closeAndFinish(done, realtime, err);
return;
Expand Down
18 changes: 9 additions & 9 deletions test/common/modules/shared_helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ define([
}

/* testFn is assumed to be a function of realtimeOptions that returns a mocha test */
function testOnAllTransports(name, testFn, excludeUpgrade, skip) {
function testOnAllTransports(name, testFn, skip) {
var itFn = skip ? it.skip : it;
let transports = availableTransports;
transports.forEach(function (transport) {
Expand All @@ -151,18 +151,16 @@ define([
testFn({ transports: [transport], useBinaryProtocol: false })
);
});
/* Plus one for no transport specified (ie use upgrade mechanism if
/* Plus one for no transport specified (ie use websocket/base mechanism if
* present). (we explicitly specify all transports since node only does
* nodecomet+upgrade if comet is explicitly requested
* websocket+nodecomet if comet is explicitly requested)
* */
if (!excludeUpgrade) {
itFn(name + '_with_binary_transport', testFn({ transports, useBinaryProtocol: true }));
itFn(name + '_with_text_transport', testFn({ transports, useBinaryProtocol: false }));
}
itFn(name + '_with_binary_transport', testFn({ transports, useBinaryProtocol: true }));
itFn(name + '_with_text_transport', testFn({ transports, useBinaryProtocol: false }));
}

testOnAllTransports.skip = function (name, testFn, excludeUpgrade) {
testOnAllTransports(name, testFn, excludeUpgrade, true);
testOnAllTransports.skip = function (name, testFn) {
testOnAllTransports(name, testFn, true);
};

function restTestOnJsonMsgpack(name, testFn, skip) {
Expand Down Expand Up @@ -221,6 +219,8 @@ define([
expect(json1 === json2, 'JSON data contents mismatch.').to.be.ok;
}

beforeEach(clearTransportPreference);

after(function (done) {
this.timeout(10 * 1000);
testAppModule.tearDown(function (err) {
Expand Down
8 changes: 1 addition & 7 deletions test/realtime/channel.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -280,9 +280,6 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async
},
true
);
/* NB upgrade is excluded because realtime now sends an ATTACHED
* post-upgrade, which can race with the DETACHED if the DETACH is only sent
* just after upgrade. Re-include it with 1.1 spec which has IDs in ATTACHs */

/*
* Attach with an empty channel and expect a channel error
Expand Down Expand Up @@ -690,10 +687,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async
},
function (cb) {
var channelUpdated = false;
// attached or update: in case this is happening in parallel with
// a transport upgrade, we might flip to attaching, meaning it'll come
// through as attached not update
channel._allChannelChanges.on(['attached', 'update'], function () {
channel._allChannelChanges.on(['update'], function () {
channelUpdated = true;
});

Expand Down
2 changes: 1 addition & 1 deletion test/realtime/connection.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async
/*
* Check that a message published on one transport that has not yet been
* acked will be republished with the same msgSerial on a new transport (eg
* after a resume or an upgrade), before any new messages are send (and
* after a resume), before any new messages are send (and
* without being merged with new messages)
*/
it('connectionQueuing', function (done) {
Expand Down
5 changes: 1 addition & 4 deletions test/realtime/event_emitter.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,7 @@ define(['shared_helper', 'chai'], function (helper, chai) {
*/
it('attachdetach0', function (done) {
try {
/* Note: realtime now sends an ATTACHED post-upgrade, which can race with
* the DETACHED if the DETACH is only sent just after upgrade. Remove
* bestTransport with 1.1 spec which has IDs in ATTACHs */
var realtime = helper.AblyRealtime({ transports: [helper.bestTransport] }),
var realtime = helper.AblyRealtime(),
index,
expectedConnectionEvents = ['connecting', 'connected', 'closing', 'closed'],
expectedChannelEvents = ['attaching', 'attached', 'detaching', 'detached'];
Expand Down
8 changes: 4 additions & 4 deletions test/realtime/failure.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async
.map(function (transport) {
return failure_test([transport]);
})
.concat(failure_test(null)), // to test not specifying a transport (so will use upgrade mechanism)
.concat(failure_test(null)), // to test not specifying a transport (so will use websocket/base mechanism)
function (err, realtimes) {
closeAndFinish(done, realtimes, err);
}
Expand Down Expand Up @@ -98,7 +98,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async
.map(function (transport) {
return break_test([transport]);
})
.concat(break_test(null)), // to test not specifying a transport (so will use upgrade mechanism)
.concat(break_test(null)), // to test not specifying a transport (so will use websocket/base mechanism)
function (err, realtimes) {
closeAndFinish(done, realtimes, err);
}
Expand Down Expand Up @@ -127,7 +127,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async
* the suspended timeout trips before three connection cycles */
disconnectedRetryTimeout: 1000,
realtimeRequestTimeout: 50,
preferenceConnectTimeout: 50,
webSocketConnectTimeout: 50,
suspendedRetryTimeout: 1000,
connectionStateTtl: 2900,
});
Expand Down Expand Up @@ -173,7 +173,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async
.map(function (transport) {
return lifecycleTest([transport]);
})
.concat(lifecycleTest(null)), // to test not specifying a transport (so will use upgrade mechanism)
.concat(lifecycleTest(null)), // to test not specifying a transport (so will use websocket/base mechanism)
function (err) {
if (err) {
done(err);
Expand Down
13 changes: 7 additions & 6 deletions test/realtime/init.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ define(['ably', 'shared_helper', 'chai'], function (Ably, helper, chai) {
realtime.connection.on('connected', function () {
/* check api version */
var transport = realtime.connection.connectionManager.activeProtocol.transport;
var connectUri = helper.isWebsocket(transport) ? transport.uri : transport.recvRequest.uri;
var connectUri = helper.isWebsocket(transport) ? transport.uri : transport.recvRequest.recvUri;
try {
expect(connectUri.indexOf('v=3') > -1, 'Check uri includes v=3').to.be.ok;
} catch (err) {
Expand Down Expand Up @@ -309,14 +309,14 @@ define(['ably', 'shared_helper', 'chai'], function (Ably, helper, chai) {
}
});

/* Check base and upgrade transports (nodejs only; browser tests in their own section) */
/* Check base and websocket transports (nodejs only; browser tests in their own section) */
if (!isBrowser) {
it('node_transports', function (done) {
var realtime;
try {
realtime = helper.AblyRealtime({ transports: helper.availableTransports });
expect(realtime.connection.connectionManager.baseTransport).to.equal('comet');
expect(realtime.connection.connectionManager.upgradeTransports).to.deep.equal(['web_socket']);
expect(realtime.connection.connectionManager.webSocketTransport).to.be.ok;
closeAndFinish(done, realtime);
} catch (err) {
closeAndFinish(done, realtime, err);
Expand All @@ -331,9 +331,9 @@ define(['ably', 'shared_helper', 'chai'], function (Ably, helper, chai) {
var keyStr = helper.getTestApp().keys[0].keyStr;
var realtime = helper.AblyRealtime({ key: keyStr, useTokenAuth: true });
realtime.connection.connectionManager.once('transport.pending', function (state) {
var transport = realtime.connection.connectionManager.pendingTransports[0],
var transport = realtime.connection.connectionManager.pendingTransport,
originalOnProtocolMessage = transport.onProtocolMessage;
realtime.connection.connectionManager.pendingTransports[0].onProtocolMessage = function (message) {
realtime.connection.connectionManager.pendingTransport.onProtocolMessage = function (message) {
try {
if (message.action === 4) {
expect(message.connectionDetails.connectionKey).to.be.ok;
Expand Down Expand Up @@ -371,13 +371,14 @@ define(['ably', 'shared_helper', 'chai'], function (Ably, helper, chai) {
var realtime = helper.AblyRealtime({
httpMaxRetryCount: 3,
fallbackHosts: ['a', 'b', 'c'],
transport: ['web_socket'],
});
realtime.connection.once('connected', function () {
try {
var hosts = new Ably.Rest._Http()._getHosts(realtime);
/* restHost rather than realtimeHost as that's what connectionManager
* knows about; converted to realtimeHost by the websocketTransport */
expect(hosts[0]).to.equal(realtime.options.restHost, 'Check connected realtime host is the first option');
expect(hosts[0]).to.equal(realtime.options.realtimeHost, 'Check connected realtime host is the first option');
expect(hosts.length).to.equal(4, 'Check also have three fallbacks');
} catch (err) {
closeAndFinish(done, realtime, err);
Expand Down
3 changes: 1 addition & 2 deletions test/realtime/presence.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1708,8 +1708,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async

/* Enter ten clients while attaching, finish the attach, check they were all entered correctly */
it('multiple_pending', function (done) {
/* single transport to avoid upgrade stalling due to the stubbed attachImpl */
var realtime = helper.AblyRealtime({ transports: [helper.bestTransport] }),
var realtime = helper.AblyRealtime(),
channel = realtime.channels.get('multiple_pending'),
originalAttachImpl = channel.attachImpl;

Expand Down
Loading

0 comments on commit 2fb92ee

Please sign in to comment.