diff --git a/src/common/lib/client/realtimechannel.ts b/src/common/lib/client/realtimechannel.ts index 87decff1b6..497283abc5 100644 --- a/src/common/lib/client/realtimechannel.ts +++ b/src/common/lib/client/realtimechannel.ts @@ -753,9 +753,7 @@ class RealtimeChannel extends EventEmitter { checkPendingState(): void { /* if can't send events, do nothing */ const cmState = this.connectionManager.state; - /* Allow attach messages to queue up when synchronizing, since this will be - * the state we'll be in when upgrade transport.active triggers a checkpendingstate */ - if (!(cmState.sendEvents || cmState.forceQueueEvents)) { + if (!cmState.sendEvents) { Logger.logAction( Logger.LOG_MINOR, 'RealtimeChannel.checkPendingState', diff --git a/src/common/lib/transport/connectionmanager.ts b/src/common/lib/transport/connectionmanager.ts index 4a6855d633..4e9d36b85b 100644 --- a/src/common/lib/transport/connectionmanager.ts +++ b/src/common/lib/transport/connectionmanager.ts @@ -42,13 +42,6 @@ function clearSessionRecoverData() { return haveSessionStorage() && Platform.WebStorage?.removeSession?.(sessionRecoveryName); } -function betterTransportThan(a: Transport, b: Transport) { - return ( - Platform.Defaults.transportPreferenceOrder.indexOf(a.shortName) > - Platform.Defaults.transportPreferenceOrder.indexOf(b.shortName) - ); -} - function bundleWith(dest: ProtocolMessage, src: ProtocolMessage, maxSize: number) { let action; if (dest.channel !== src.channel) { @@ -122,9 +115,6 @@ export class TransportParams { const params = authParams ? Utils.copy(authParams) : {}; const options = this.options; switch (this.mode) { - case 'upgrade': - params.upgrade = this.connectionKey as string; - break; case 'resume': params.resume = this.connectionKey as string; break; @@ -184,7 +174,6 @@ type ConnectionState = { sendEvents?: boolean; failState?: string; retryDelay?: number; - forceQueueEvents?: boolean; retryImmediately?: boolean; error?: IPartialErrorInfo; }; @@ -204,18 +193,18 @@ class ConnectionManager extends EventEmitter { connectionStateTtl: number; maxIdleInterval: number | null; transports: TransportName[]; - baseTransport: TransportName; - upgradeTransports: TransportName[]; + baseTransport?: TransportName; + webSocketTransportAvailable?: true; transportPreference: string | null; httpHosts: string[]; + wsHosts: string[]; activeProtocol: null | Protocol; - proposedTransports: Transport[]; - pendingTransports: Transport[]; + pendingTransport?: Transport; + proposedTransport?: Transport; host: string | null; lastAutoReconnectAttempt: number | null; lastActivity: number | null; forceFallbackHost: boolean; - connectCounter: number; transitionTimer?: number | NodeJS.Timeout | null; suspendTimer?: number | NodeJS.Timeout | null; retryTimer?: number | NodeJS.Timeout | null; @@ -226,6 +215,11 @@ class ConnectionManager extends EventEmitter { // The messages remaining to be processed (excluding any message currently being processed) queue: { message: ProtocolMessage; transport: Transport }[]; } = { isProcessing: false, queue: [] }; + webSocketSlowTimer: NodeJS.Timeout | null; + wsCheckResult: boolean | null; + webSocketGiveUpTimer: NodeJS.Timeout | null; + abandonedWebSocket: boolean; + connectCounter: number; constructor(realtime: BaseRealtime, options: NormalisedClientOptions) { super(); @@ -233,10 +227,10 @@ class ConnectionManager extends EventEmitter { this.initTransports(); this.options = options; const timeouts = options.timeouts; - /* connectingTimeout: leave preferenceConnectTimeout (~6s) to try the - * preference transport, then realtimeRequestTimeout (~10s) to establish + /* connectingTimeout: leave webSocketConnectTimeout (~6s) to try the + * websocket transport, then realtimeRequestTimeout (~10s) to establish * the base transport in case that fails */ - const connectingTimeout = timeouts.preferenceConnectTimeout + timeouts.realtimeRequestTimeout; + const connectingTimeout = timeouts.webSocketConnectTimeout + timeouts.realtimeRequestTimeout; this.states = { initialized: { state: 'initialized', @@ -260,14 +254,6 @@ class ConnectionManager extends EventEmitter { sendEvents: true, failState: 'disconnected', }, - synchronizing: { - state: 'connected', - terminal: false, - queueEvents: true, - sendEvents: false, - forceQueueEvents: true, - failState: 'disconnected', - }, disconnected: { state: 'disconnected', terminal: false, @@ -307,21 +293,29 @@ class ConnectionManager extends EventEmitter { this.maxIdleInterval = null; this.transports = Utils.intersect(options.transports || Defaults.defaultTransports, this.supportedTransports); - /* baseTransports selects the leftmost transport in the Defaults.baseTransportOrder list - * that's both requested and supported. */ - this.baseTransport = Utils.intersect(Defaults.baseTransportOrder, this.transports)[0]; - this.upgradeTransports = Utils.intersect(this.transports, Defaults.upgradeTransports); this.transportPreference = null; + if (this.transports.includes(TransportNames.WebSocket)) { + this.webSocketTransportAvailable = true; + } + if (this.transports.includes(TransportNames.XhrPolling)) { + this.baseTransport = TransportNames.XhrPolling; + } else if (this.transports.includes(TransportNames.Comet)) { + this.baseTransport = TransportNames.Comet; + } + this.httpHosts = Defaults.getHosts(options); + this.wsHosts = Defaults.getHosts(options, true); this.activeProtocol = null; - this.proposedTransports = []; - this.pendingTransports = []; this.host = null; this.lastAutoReconnectAttempt = null; this.lastActivity = null; this.forceFallbackHost = false; this.connectCounter = 0; + this.wsCheckResult = null; + this.webSocketSlowTimer = null; + this.webSocketGiveUpTimer = null; + this.abandonedWebSocket = false; Logger.logAction(Logger.LOG_MINOR, 'Realtime.ConnectionManager()', 'started'); Logger.logAction( @@ -371,10 +365,7 @@ class ConnectionManager extends EventEmitter { this.requestState({ state: 'connecting' }); } else if (this.state == this.states.connecting) { // RTN20c: if 'online' event recieved while CONNECTING, abandon connection attempt and retry - this.pendingTransports.forEach(function (transport) { - // Detach transport listeners to avoid connection state side effects from calling dispose - transport.off(); - }); + this.pendingTransport?.off(); this.disconnectAllTransports(); this.startConnect(); @@ -492,7 +483,7 @@ class ConnectionManager extends EventEmitter { tryATransport(transportParams: TransportParams, candidate: TransportName, callback: Function): void { Logger.logAction(Logger.LOG_MICRO, 'ConnectionManager.tryATransport()', 'trying ' + candidate); - Transport.tryConnect( + this.proposedTransport = Transport.tryConnect( this.supportedTransports[candidate]!, this, this.realtime.auth, @@ -577,33 +568,13 @@ class ConnectionManager extends EventEmitter { 'transport = ' + transport + '; mode = ' + mode ); - Utils.arrDeleteValue(this.proposedTransports, transport); - this.pendingTransports.push(transport); - const optimalTransport = - Platform.Defaults.transportPreferenceOrder[Platform.Defaults.transportPreferenceOrder.length - 1]; - transport.once('connected', (error: ErrorInfo, connectionId: string, connectionDetails: Record) => { - if (mode == 'upgrade' && this.activeProtocol) { - /* if ws and xhrs are connecting in parallel, delay xhrs activation to let ws go ahead */ - if ( - transport.shortName !== optimalTransport && - this.getUpgradePossibilities().includes(optimalTransport) && - this.activeProtocol - ) { - setTimeout(() => { - this.scheduleTransportActivation(error, transport, connectionId, connectionDetails); - }, this.options.timeouts.parallelUpgradeDelay); - } else { - this.scheduleTransportActivation(error, transport, connectionId, connectionDetails); - } - } else { - this.activateTransport(error, transport, connectionId, connectionDetails); + this.pendingTransport = transport; - /* allow connectImpl to start the upgrade process if needed, but allow - * other event handlers, including activating the transport, to run first */ - Platform.Config.nextTick(() => { - this.connectImpl(transportParams); - }); - } + this.cancelWebSocketSlowTimer(); + this.cancelWebSocketGiveUpTimer(); + + transport.once('connected', (error: ErrorInfo, connectionId: string, connectionDetails: Record) => { + this.activateTransport(error, transport, connectionId, connectionDetails); if (mode === 'recover' && this.options.recover) { /* After a successful recovery, we unpersist, as a recovery key cannot @@ -621,166 +592,6 @@ class ConnectionManager extends EventEmitter { this.emit('transport.pending', transport); } - /** - * Called when an upgrade transport is connected, - * to schedule the activation of that transport. - * @param error - * @param transport - * @param connectionId - * @param connectionDetails - */ - scheduleTransportActivation( - error: ErrorInfo, - transport: Transport, - connectionId: string, - connectionDetails: Record - ): void { - const currentTransport = this.activeProtocol && this.activeProtocol.getTransport(), - abandon = () => { - transport.disconnect(); - Utils.arrDeleteValue(this.pendingTransports, transport); - }; - - if (this.state !== this.states.connected && this.state !== this.states.connecting) { - /* This is most likely to happen for the delayed XHRs, when XHRs and ws are scheduled in parallel*/ - Logger.logAction( - Logger.LOG_MINOR, - 'ConnectionManager.scheduleTransportActivation()', - 'Current connection state (' + - this.state.state + - (this.state === this.states.synchronizing ? ', but with an upgrade already in progress' : '') + - ') is not valid to upgrade in; abandoning upgrade to ' + - transport.shortName - ); - abandon(); - return; - } - - if (currentTransport && !betterTransportThan(transport, currentTransport)) { - Logger.logAction( - Logger.LOG_MINOR, - 'ConnectionManager.scheduleTransportActivation()', - 'Proposed transport ' + - transport.shortName + - ' is no better than current active transport ' + - currentTransport.shortName + - ' - abandoning upgrade' - ); - abandon(); - return; - } - - Logger.logAction( - Logger.LOG_MINOR, - 'ConnectionManager.scheduleTransportActivation()', - 'Scheduling transport upgrade; transport = ' + transport - ); - - let oldProtocol: Protocol | null = null; - - if (!transport.isConnected) { - /* This is only possible if the xhr streaming transport was disconnected during the parallelUpgradeDelay */ - Logger.logAction( - Logger.LOG_MINOR, - 'ConnectionManager.scheduleTransportActivation()', - 'Proposed transport ' + transport.shortName + 'is no longer connected; abandoning upgrade' - ); - abandon(); - return; - } - - if (this.state === this.states.connected) { - Logger.logAction( - Logger.LOG_MICRO, - 'ConnectionManager.scheduleTransportActivation()', - 'Currently connected, so temporarily pausing events until the upgrade is complete' - ); - this.state = this.states.synchronizing; - oldProtocol = this.activeProtocol; - } else if (this.state !== this.states.connecting) { - /* Note: upgrading from the connecting state is valid if the old active - * transport was deactivated after the upgrade transport first connected; - * see logic in deactivateTransport */ - Logger.logAction( - Logger.LOG_MINOR, - 'ConnectionManager.scheduleTransportActivation()', - 'Current connection state (' + - this.state.state + - (this.state === this.states.synchronizing ? ', but with an upgrade already in progress' : '') + - ') is not valid to upgrade in; abandoning upgrade to ' + - transport.shortName - ); - abandon(); - return; - } - - Logger.logAction( - Logger.LOG_MINOR, - 'ConnectionManager.scheduleTransportActivation()', - 'Syncing transport; transport = ' + transport - ); - - const finishUpgrade = () => { - Logger.logAction( - Logger.LOG_MINOR, - 'ConnectionManager.scheduleTransportActivation()', - 'Activating transport; transport = ' + transport - ); - - // Send ACTIVATE to tell the server to make this transport the - // active transport, which suspends channels until we re-attach. - transport.send( - protocolMessageFromValues({ - action: actions.ACTIVATE, - }) - ); - - this.activateTransport(error, transport, connectionId, connectionDetails); - /* Restore pre-sync state. If state has changed in the meantime, - * don't touch it -- since the websocket transport waits a tick before - * disposing itself, it's possible for it to have happily synced - * without err while, unknown to it, the connection has closed in the - * meantime and the ws transport is scheduled for death */ - if (this.state === this.states.synchronizing) { - Logger.logAction( - Logger.LOG_MICRO, - 'ConnectionManager.scheduleTransportActivation()', - 'Pre-upgrade protocol idle, sending queued messages on upgraded transport; transport = ' + transport - ); - this.state = this.states.connected; - } else { - Logger.logAction( - Logger.LOG_MINOR, - 'ConnectionManager.scheduleTransportActivation()', - 'Pre-upgrade protocol idle, but state is now ' + this.state.state + ', so leaving unchanged' - ); - } - if (this.state.sendEvents) { - this.sendQueuedMessages(); - } - }; - - /* Wait until sync is done and old transport is idle before activating new transport. This - * guarantees that messages arrive at realtime in the same order they are sent. - * - * If a message times out on the old transport, since it's still the active transport the - * message will be requeued. deactivateTransport will see the pending transport and notify - * the `connecting` state without starting a new connection, so the new transport can take - * over once deactivateTransport clears the old protocol's queue. - * - * If there is no old protocol, that meant that we weren't in the connected state at the - * beginning of the sync - likely the base transport died just before the sync. So can just - * finish the upgrade. If we're actually in closing/failed rather than connecting, that's - * fine, activatetransport will deal with that. */ - if (oldProtocol) { - /* Most of the time this will be already true: the new-transport sync will have given - * enough time for in-flight messages on the old transport to complete. */ - oldProtocol.onceIdle(finishUpgrade); - } else { - finishUpgrade(); - } - } - /** * Called when a transport is connected, and the connectionmanager decides that * it will now be the active transport. Returns whether or not it activated @@ -835,8 +646,7 @@ class ConnectionManager extends EventEmitter { return false; } - /* remove this transport from pending transports */ - Utils.arrDeleteValue(this.pendingTransports, transport); + delete this.pendingTransport; /* if the transport is not connected then don't activate it */ if (!transport.isConnected) { @@ -879,10 +689,7 @@ class ConnectionManager extends EventEmitter { * error). */ if (existingState.state === this.states.connected.state) { if (error) { - /* if upgrading without error, leave any existing errorReason alone */ this.errorReason = this.realtime.connection.errorReason = error; - /* Only bother emitting an upgrade if there's an error; otherwise it's - * just a transport upgrade, so auth details won't have changed */ this.emit('update', new ConnectionStateChange(connectedState, connectedState, null, error)); } } else { @@ -925,38 +732,6 @@ class ConnectionManager extends EventEmitter { } } - // terminate any other pending transport(s), and abort any not-yet-pending transport attempts - // need to use .slice() here, since we intend to mutate the array during .forEach() iteration - this.pendingTransports.slice().forEach((pendingTransport) => { - if (pendingTransport === transport) { - const msg = - 'Assumption violated: activating a transport that is still marked as a pending transport; transport = ' + - transport.shortName + - '; stack = ' + - new Error().stack; - Logger.logAction(Logger.LOG_ERROR, 'ConnectionManager.activateTransport()', msg); - Utils.arrDeleteValue(this.pendingTransports, transport); - } else { - pendingTransport.disconnect(); - } - }); - // need to use .slice() here, since we intend to mutate the array during .forEach() iteration - this.proposedTransports.slice().forEach((proposedTransport: Transport) => { - if (proposedTransport === transport) { - Logger.logAction( - Logger.LOG_ERROR, - 'ConnectionManager.activateTransport()', - 'Assumption violated: activating a transport that is still marked as a proposed transport; transport = ' + - transport.shortName + - '; stack = ' + - new Error().stack - ); - Utils.arrDeleteValue(this.proposedTransports, transport); - } else { - proposedTransport.dispose(); - } - }); - return true; } @@ -968,8 +743,7 @@ class ConnectionManager extends EventEmitter { deactivateTransport(transport: Transport, state: string, error: ErrorInfo): void { const currentProtocol = this.activeProtocol, wasActive = currentProtocol && currentProtocol.getTransport() === transport, - wasPending = Utils.arrDeleteValue(this.pendingTransports, transport), - wasProposed = Utils.arrDeleteValue(this.proposedTransports, transport), + wasPending = transport === this.pendingTransport, noTransportsScheduledForActivation = this.noTransportsScheduledForActivation(); Logger.logAction(Logger.LOG_MINOR, 'ConnectionManager.deactivateTransport()', 'transport = ' + transport); @@ -978,7 +752,7 @@ class ConnectionManager extends EventEmitter { 'ConnectionManager.deactivateTransport()', 'state = ' + state + - (wasActive ? '; was active' : wasPending ? '; was pending' : wasProposed ? '; was proposed' : '') + + (wasActive ? '; was active' : wasPending ? '; was pending' : '') + (noTransportsScheduledForActivation ? '' : '; another transport is scheduled for activation') ); if (error && error.message) @@ -993,13 +767,8 @@ class ConnectionManager extends EventEmitter { ' pending messages' ); this.queuePendingMessages((currentProtocol as Protocol).getPendingMessages()); - /* Clear any messages we requeue to allow the protocol to become idle. - * In case of an upgrade, this will trigger an immediate activation of - * the upgrade transport, so delay a tick so this transport can finish - * deactivating */ - Platform.Config.nextTick(function () { - (currentProtocol as Protocol).clearPendingMessages(); - }); + /* Clear any messages we requeue to allow the protocol to become idle.*/ + (currentProtocol as Protocol).clearPendingMessages(); this.activeProtocol = this.host = null; } @@ -1018,7 +787,7 @@ class ConnectionManager extends EventEmitter { (wasActive && noTransportsScheduledForActivation) || (wasActive && state === 'failed') || state === 'closed' || - (currentProtocol === null && wasPending && this.pendingTransports.length === 0) + (currentProtocol === null && wasPending) ) { /* If we're disconnected with a 5xx we need to try fallback hosts * (RTN14d), but (a) due to how the upgrade sequence works, the @@ -1043,37 +812,13 @@ class ConnectionManager extends EventEmitter { this.notifyState({ state: newConnectionState, error: error }); return; } - - if (wasActive && state === 'disconnected' && this.state !== this.states.synchronizing) { - /* If we were active but there is another transport scheduled for - * activation, go into to the connecting state until that transport - * activates and sets us back to connected. (manually starting the - * transition timers in case that never happens). (If we were in the - * synchronizing state, then that's fine, the old transport just got its - * disconnected before the new one got the sync -- ignore it and keep - * waiting for the sync. If it fails we have a separate sync timer that - * will expire). */ - Logger.logAction( - Logger.LOG_MICRO, - 'ConnectionManager.deactivateTransport()', - 'wasActive but another transport is connected and scheduled for activation, so going into the connecting state until it activates' - ); - this.startSuspendTimer(); - this.startTransitionTimer(this.states.connecting); - this.notifyState({ state: 'connecting', error: error }); - } } /* Helper that returns true if there are no transports which are pending, * have been connected, and are just waiting for onceNoPending to fire before * being activated */ noTransportsScheduledForActivation(): boolean { - return ( - Utils.isEmpty(this.pendingTransports) || - this.pendingTransports.every(function (transport) { - return !transport.isConnected; - }) - ); + return !this.pendingTransport || !this.pendingTransport.isConnected; } setConnection(connectionId: string, connectionDetails: Record, hasConnectionError?: boolean): void { @@ -1293,6 +1038,86 @@ class ConnectionManager extends EventEmitter { } } + startWebSocketSlowTimer() { + this.webSocketSlowTimer = setTimeout(() => { + Logger.logAction(Logger.LOG_MINOR, 'ConnectionManager WebSocket slow timer', 'checking connectivity'); + if (this.wsCheckResult === null) { + this.checkWsConnectivity() + .then(() => { + Logger.logAction( + Logger.LOG_MINOR, + 'ConnectionManager WebSocket slow timer', + 'ws connectivity check succeeded' + ); + this.wsCheckResult = true; + }) + .catch(() => { + Logger.logAction( + Logger.LOG_MAJOR, + 'ConnectionManager WebSocket slow timer', + 'ws connectivity check failed' + ); + this.wsCheckResult = false; + }); + } + if (this.realtime.http.checkConnectivity) { + Utils.whenPromiseSettles(this.realtime.http.checkConnectivity(), (err, connectivity) => { + if (err || !connectivity) { + Logger.logAction( + Logger.LOG_MAJOR, + 'ConnectionManager WebSocket slow timer', + 'http connectivity check failed' + ); + this.cancelWebSocketGiveUpTimer(); + this.notifyState({ + state: 'disconnected', + error: new ErrorInfo("new ErrorInfo('Unable to connect (network unreachable)'", 80003, 404), + }); + } else { + Logger.logAction( + Logger.LOG_MINOR, + 'ConnectionManager WebSocket slow timer', + 'http connectivity check succeeded' + ); + } + }); + } + }, this.options.timeouts.webSocketSlowTimeout); + } + + cancelWebSocketSlowTimer() { + if (this.webSocketSlowTimer) { + clearTimeout(this.webSocketSlowTimer); + this.webSocketSlowTimer = null; + } + } + + startWebSocketGiveUpTimer(transportParams: TransportParams) { + this.webSocketGiveUpTimer = setTimeout(() => { + if (!this.wsCheckResult) { + this.abandonedWebSocket = true; + Logger.logAction( + Logger.LOG_MINOR, + 'ConnectionManager WebSocket give up timer', + 'websocket connection took more than 10s; ' + (this.baseTransport ? 'trying base transport' : '') + ); + // if we don't have a base transport to fallback to, just let the websocket connection attempt time out + if (this.baseTransport) { + this.proposedTransport?.dispose(); + this.pendingTransport?.dispose(); + this.connectBase(transportParams, this.connectCounter++); + } + } + }, this.options.timeouts.webSocketConnectTimeout); + } + + cancelWebSocketGiveUpTimer() { + if (this.webSocketGiveUpTimer) { + clearTimeout(this.webSocketGiveUpTimer); + this.webSocketGiveUpTimer = null; + } + } + notifyState(indicated: ConnectionState): void { const state = indicated.state; @@ -1307,7 +1132,6 @@ class ConnectionManager extends EventEmitter { const retryImmediately = state === 'disconnected' && (this.state === this.states.connected || - this.state === this.states.synchronizing || indicated.retryImmediately || (this.state === this.states.connecting && indicated.error && @@ -1326,6 +1150,8 @@ class ConnectionManager extends EventEmitter { * state), as these are superseded by this notification */ this.cancelTransitionTimer(); this.cancelRetryTimer(); + this.cancelWebSocketSlowTimer(); + this.cancelWebSocketGiveUpTimer(); this.checkSuspendTimer(indicated.state); if (state === 'suspended' || state === 'connected') { @@ -1415,6 +1241,8 @@ class ConnectionManager extends EventEmitter { if (state == this.state.state) return; /* silently do nothing */ /* kill running timers, as this request supersedes them */ + this.cancelWebSocketSlowTimer(); + this.cancelWebSocketGiveUpTimer(); this.cancelTransitionTimer(); this.cancelRetryTimer(); /* for suspend timer check rather than cancel -- eg requesting a connecting @@ -1507,130 +1335,129 @@ class ConnectionManager extends EventEmitter { } } - /** - * There are three stages in connecting: - * - preference: if there is a cached transport preference, we try to connect - * on that. If that fails or times out we abort the attempt, remove the - * preference and fall back to base. If it succeeds, we try upgrading it if - * needed (will only be in the case where the preference is xhrs and the - * browser supports ws). - * - base: we try to connect with the best transport that we think will - * never fail for this platform. If it doesn't work, we try fallback hosts. - * - upgrade: given a connected transport, we see if there are any better - * ones, and if so, try to upgrade to them. + /* + * there are, at most, two transports available with which a connection may + * be attempted: web_socket and/or a base transport (xhr_polling in browsers, + * comet in nodejs). web_socket is always preferred, and the base transport is + * only used in case web_socket connectivity appears to be unavailable. + * + * connectImpl begins the transport selection process by checking which transports + * are available, and if there is a cached preference. It then defers to the + * transport-specific connect methods: connectWs and connectBase. * - * connectImpl works out what stage you're at (which is purely a function of - * the current connection state and whether there are any stored preferences), - * and dispatches accordingly. After a transport has been set pending, - * tryATransport calls connectImpl to see if there's another stage to be done. - * */ - connectImpl(transportParams: TransportParams, connectCount?: number): void { + * It is also responsible for invalidating the cache in the case that a base + * transport preference is stored but web socket connectivity is now available. + * + * handling of the case where we need to failover from web_socket to the base + * transport is implemented in the connectWs method. + */ + connectImpl(transportParams: TransportParams, connectCount: number): void { const state = this.state.state; - - if (state !== this.states.connecting.state && state !== this.states.connected.state) { + if (state !== this.states.connecting.state) { /* Only keep trying as long as in the 'connecting' state (or 'connected' * for upgrading). Any operation can put us into 'disconnected' to cancel * connection attempts and wait before retrying, or 'failed' to fail. */ Logger.logAction( Logger.LOG_MINOR, 'ConnectionManager.connectImpl()', - 'Must be in connecting state to connect (or connected to upgrade), but was ' + state - ); - } else if (this.pendingTransports.length) { - Logger.logAction( - Logger.LOG_MINOR, - 'ConnectionManager.connectImpl()', - 'Transports ' + this.pendingTransports[0].toString() + ' currently pending; taking no action' + 'Must be in connecting state to connect, but was ' + state ); - } else if (state == this.states.connected.state) { - this.upgradeIfNeeded(transportParams); - } else if (this.transports.length > 1 && this.getTransportPreference()) { - this.connectPreference(transportParams, connectCount); - } else { - this.connectBase(transportParams, connectCount); + return; } - } - connectPreference(transportParams: TransportParams, connectCount?: number): void { - const preference = this.getTransportPreference(); - let preferenceTimeoutExpired = false; + const transportPreference = this.getTransportPreference(); - if (!this.transports.includes(preference)) { - this.unpersistTransportPreference(); - this.connectImpl(transportParams, connectCount); + // If transport preference is for a non-ws transport but websocket is now available, unpersist the preference for next time + if (transportPreference === this.baseTransport) { + this.checkWsConnectivity() + .then(() => { + this.unpersistTransportPreference(); + if (this.state === this.states.connecting) { + Logger.logAction( + Logger.LOG_MINOR, + 'ConnectionManager.connectImpl():', + 'web socket connectivity available, cancelling connection attempt with ' + this.baseTransport + ); + if (this.proposedTransport) { + this.proposedTransport.dispose(); + } + this.connectWs(transportParams, this.connectCounter++); + } + }) + .catch(noop); } - Logger.logAction( - Logger.LOG_MINOR, - 'ConnectionManager.connectPreference()', - 'Trying to connect with stored transport preference ' + preference - ); + if (transportPreference === this.baseTransport || (this.baseTransport && !this.webSocketTransportAvailable)) { + this.connectBase(transportParams, connectCount); + } else { + this.connectWs(transportParams, connectCount); + } + } + + /* + * connectWs starts two timers to monitor the success of a web_socket connection attempt: + * - webSocketSlowTimer: if this timer fires before the connection succeeds, + * cm will simultaneously check websocket and http/xhr connectivity. if the http + * connectivity check fails, we give up the connection sequence entirely and + * transition to disconnected. if the websocket connectivity check fails then + * we assume no ws connectivity and failover to base transport. in the case that + * the checks succeed, we continue with websocket and wait for it to try fallback hosts + * and, if unsuccessful, ultimately transition to disconnected. + * - webSocketGiveUpTimer: if this timer fires, and the preceding websocket + * connectivity check is still pending then we assume that there is an issue + * with the transport and fallback to base transport. + */ + connectWs(transportParams: TransportParams, connectCount: number) { + Logger.logAction(Logger.LOG_DEBUG, 'ConnectionManager.connectWs()'); + this.startWebSocketSlowTimer(); + this.startWebSocketGiveUpTimer(transportParams); - const preferenceTimeout = setTimeout(() => { - preferenceTimeoutExpired = true; - if (!(this.state.state === this.states.connected.state)) { - Logger.logAction( - Logger.LOG_MINOR, - 'ConnectionManager.connectPreference()', - 'Shortcircuit connection attempt with ' + preference + ' failed; clearing preference and trying from scratch' - ); - /* Abort all connection attempts. (This also disconnects the active - * protocol, but none exists if we're not in the connected state) */ - this.disconnectAllTransports(); - /* Be quite agressive about clearing the stored preference if ever it doesn't work */ - this.unpersistTransportPreference(); - } - this.connectImpl(transportParams, connectCount); - }, this.options.timeouts.preferenceConnectTimeout); - - /* For connectPreference, just use the main host. If host fallback is needed, do it in connectBase. - * The wstransport it will substitute the httphost for an appropriate wshost */ - transportParams.host = this.httpHosts[0]; - this.tryATransport(transportParams, preference, (fatal: boolean, transport: Transport) => { - clearTimeout(preferenceTimeout); - if (preferenceTimeoutExpired && transport) { - /* Viable, but too late - connectImpl() will already be trying - * connectBase, and we weren't in upgrade mode. Just remove the - * onconnected listener and get rid of it */ - transport.off(); - transport.disconnect(); - Utils.arrDeleteValue(this.pendingTransports, transport); - } else if (!transport && !fatal) { - /* Preference failed in a transport-specific way. Try more */ - this.unpersistTransportPreference(); - this.connectImpl(transportParams, connectCount); - } - /* If suceeded, or failed fatally, nothing to do */ + this.tryTransportWithFallbacks('web_socket', transportParams, true, connectCount, () => { + return this.wsCheckResult !== false && !this.abandonedWebSocket; }); } - /** - * Try to establish a transport on the base transport (the best transport - * such that if it doesn't work, nothing will work) as determined through - * static feature detection, checking for network connectivity and trying - * fallback hosts if applicable. - * @param transportParams - */ - connectBase(transportParams: TransportParams, connectCount?: number): void { + connectBase(transportParams: TransportParams, connectCount: number) { + Logger.logAction(Logger.LOG_DEBUG, 'ConnectionManager.connectBase()'); + if (this.baseTransport) { + this.tryTransportWithFallbacks(this.baseTransport, transportParams, false, connectCount, () => true); + } else { + this.notifyState({ + state: 'disconnected', + error: new ErrorInfo('No transports left to try', 80000, 404), + }); + } + } + + tryTransportWithFallbacks( + transportName: TransportName, + transportParams: TransportParams, + ws: boolean, + connectCount: number, + shouldContinue: () => boolean + ): void { + Logger.logAction(Logger.LOG_DEBUG, 'ConnectionManager.tryTransportWithFallbacks()', transportName); const giveUp = (err: IPartialErrorInfo) => { this.notifyState({ state: this.states.connecting.failState as string, error: err }); }; - const candidateHosts = this.httpHosts.slice(); + + const candidateHosts = ws ? this.wsHosts.slice() : this.httpHosts.slice(); + const hostAttemptCb = (fatal: boolean, transport: Transport) => { if (connectCount !== this.connectCounter) { return; } + if (!shouldContinue()) { + if (transport) { + transport.dispose(); + } + return; + } if (!transport && !fatal) { tryFallbackHosts(); } }; - Logger.logAction( - Logger.LOG_MINOR, - 'ConnectionManager.connectBase()', - 'Trying to connect with base transport ' + this.baseTransport - ); - /* first try to establish a connection with the priority host with http transport */ const host = candidateHosts.shift(); if (!host) { @@ -1659,6 +1486,9 @@ class ConnectionManager extends EventEmitter { if (connectCount !== this.connectCounter) { return; } + if (!shouldContinue()) { + return; + } /* we know err won't happen but handle it here anyway */ if (err) { giveUp(err); @@ -1673,7 +1503,7 @@ class ConnectionManager extends EventEmitter { * its dns. Try the fallback hosts. We could try them simultaneously but * that would potentially cause a huge spike in load on the load balancer */ transportParams.host = Utils.arrPopRandomElement(candidateHosts); - this.tryATransport(transportParams, this.baseTransport, hostAttemptCb); + this.tryATransport(transportParams, transportName, hostAttemptCb); } ); }; @@ -1684,35 +1514,7 @@ class ConnectionManager extends EventEmitter { return; } - this.tryATransport(transportParams, this.baseTransport, hostAttemptCb); - } - - getUpgradePossibilities(): TransportName[] { - /* returns the subset of upgradeTransports to the right of the current - * transport in upgradeTransports (if it's in there - if not, currentSerial - * will be -1, so return upgradeTransports.slice(0) == upgradeTransports */ - const current = (this.activeProtocol as Protocol).getTransport().shortName; - const currentSerial = this.upgradeTransports.indexOf(current); - return this.upgradeTransports.slice(currentSerial + 1); - } - - upgradeIfNeeded(transportParams: Record): void { - const upgradePossibilities = this.getUpgradePossibilities(); - Logger.logAction( - Logger.LOG_MINOR, - 'ConnectionManager.upgradeIfNeeded()', - 'upgrade possibilities: ' + Platform.Config.inspect(upgradePossibilities) - ); - - if (!upgradePossibilities.length) { - return; - } - - upgradePossibilities.forEach((upgradeTransport: TransportName) => { - /* Note: the transport may mutate the params, so give each transport a fresh one */ - const upgradeTransportParams = this.createTransportParams(transportParams.host, 'upgrade'); - this.tryATransport(upgradeTransportParams, upgradeTransport, noop); - }); + this.tryATransport(transportParams, transportName, hostAttemptCb); } closeImpl(): void { @@ -1720,21 +1522,14 @@ class ConnectionManager extends EventEmitter { this.cancelSuspendTimer(); this.startTransitionTimer(this.states.closing); - // need to use .slice() here, since we intend to mutate the array during .forEach() iteration - this.pendingTransports.slice().forEach(function (transport) { - Logger.logAction(Logger.LOG_MICRO, 'ConnectionManager.closeImpl()', 'Closing pending transport: ' + transport); - if (transport) transport.close(); - }); - - // need to use .slice() here, since we intend to mutate the array during .forEach() iteration - this.proposedTransports.slice().forEach(function (transport) { + if (this.pendingTransport) { Logger.logAction( Logger.LOG_MICRO, 'ConnectionManager.closeImpl()', - 'Disposing of proposed transport: ' + transport + 'Closing pending transport: ' + this.pendingTransport ); - if (transport) transport.dispose(); - }); + this.pendingTransport.close(); + } if (this.activeProtocol) { Logger.logAction( @@ -1758,23 +1553,6 @@ class ConnectionManager extends EventEmitter { 'ConnectionManager.onAuthUpdated()', 'Sending AUTH message on active transport' ); - /* If there are any proposed/pending transports (eg an upgrade that - * isn't yet scheduled for activation) that hasn't yet started syncing, - * just to get rid of them & restart the upgrade with the new token, to - * avoid a race condition. (If it has started syncing, the AUTH will be - * queued until the upgrade is complete, so everything's fine) */ - if ( - (this.pendingTransports.length || this.proposedTransports.length) && - this.state !== this.states.synchronizing - ) { - this.disconnectAllTransports(/* exceptActive: */ true); - const transportParams = (this.activeProtocol as Protocol).getTransport().params; - Platform.Config.nextTick(() => { - if (this.state.state === 'connected') { - this.upgradeIfNeeded(transportParams); - } - }); - } /* Do any transport-specific new-token action */ const activeTransport = this.activeProtocol?.getTransport(); @@ -1853,39 +1631,23 @@ class ConnectionManager extends EventEmitter { } } - disconnectAllTransports(exceptActive?: boolean): void { - Logger.logAction( - Logger.LOG_MINOR, - 'ConnectionManager.disconnectAllTransports()', - 'Disconnecting all transports' + (exceptActive ? ' except the active transport' : '') - ); + disconnectAllTransports(): void { + Logger.logAction(Logger.LOG_MINOR, 'ConnectionManager.disconnectAllTransports()', 'Disconnecting all transports'); /* This will prevent any connection procedure in an async part of one of its early stages from continuing */ this.connectCounter++; - // need to use .slice() here, since we intend to mutate the array during .forEach() iteration - this.pendingTransports.slice().forEach(function (transport) { - Logger.logAction( - Logger.LOG_MICRO, - 'ConnectionManager.disconnectAllTransports()', - 'Disconnecting pending transport: ' + transport - ); - if (transport) transport.disconnect(); - }); - this.pendingTransports = []; - - // need to use .slice() here, since we intend to mutate the array during .forEach() iteration - this.proposedTransports.slice().forEach(function (transport) { + if (this.pendingTransport) { Logger.logAction( Logger.LOG_MICRO, 'ConnectionManager.disconnectAllTransports()', - 'Disposing of proposed transport: ' + transport + 'Disconnecting pending transport: ' + this.pendingTransport ); - if (transport) transport.dispose(); - }); - this.proposedTransports = []; + this.pendingTransport.disconnect(); + } + delete this.pendingTransport; - if (this.activeProtocol && !exceptActive) { + if (this.activeProtocol) { Logger.logAction( Logger.LOG_MICRO, 'ConnectionManager.disconnectAllTransports()', @@ -1910,7 +1672,7 @@ class ConnectionManager extends EventEmitter { this.sendImpl(new PendingMessage(msg, callback)); return; } - const shouldQueue = (queueEvent && state.queueEvents) || state.forceQueueEvents; + const shouldQueue = queueEvent && state.queueEvents; if (!shouldQueue) { const err = 'rejecting event, queueEvent was ' + queueEvent + ', state was ' + state.state; Logger.logAction(Logger.LOG_MICRO, 'ConnectionManager.send()', err); @@ -2009,7 +1771,7 @@ class ConnectionManager extends EventEmitter { this.pendingChannelMessagesState.isProcessing = true; const pendingChannelMessage = this.pendingChannelMessagesState.queue.shift()!; - this.processChannelMessage(pendingChannelMessage.message, pendingChannelMessage.transport) + this.processChannelMessage(pendingChannelMessage.message) .catch((err) => { Logger.logAction( Logger.LOG_ERROR, @@ -2024,29 +1786,8 @@ class ConnectionManager extends EventEmitter { } } - private async processChannelMessage(message: ProtocolMessage, transport: Transport) { - const onActiveTransport = this.activeProtocol && transport === this.activeProtocol.getTransport(), - onUpgradeTransport = this.pendingTransports.includes(transport) && this.state == this.states.synchronizing; - - /* As the lib now has a period where the upgrade transport is synced but - * before it's become active (while waiting for the old one to become - * idle), message can validly arrive on it even though it isn't active */ - if (onActiveTransport || onUpgradeTransport) { - await this.realtime.channels.processChannelMessage(message); - } else { - // Message came in on a defunct transport. Allow only acks, nacks, & errors for outstanding - // messages, no new messages (as sync has been sent on new transport so new messages will - // be resent there, or connection has been closed so don't want new messages) - if ([actions.ACK, actions.NACK, actions.ERROR].includes(message.action!)) { - await this.realtime.channels.processChannelMessage(message); - } else { - Logger.logAction( - Logger.LOG_MICRO, - 'ConnectionManager.onChannelMessage()', - 'received message ' + JSON.stringify(message) + 'on defunct transport; discarding' - ); - } - } + private async processChannelMessage(message: ProtocolMessage) { + await this.realtime.channels.processChannelMessage(message); } ping(transport: Transport | null, callback: Function): void { @@ -2115,20 +1856,14 @@ class ConnectionManager extends EventEmitter { (this.activeProtocol as Protocol).getTransport().fail(error); } - registerProposedTransport(transport: Transport): void { - this.proposedTransports.push(transport); - } - getTransportPreference(): TransportName { return this.transportPreference || (haveWebStorage() && Platform.WebStorage?.get?.(transportPreferenceName)); } persistTransportPreference(transport: Transport): void { - if (Defaults.upgradeTransports.includes(transport.shortName)) { - this.transportPreference = transport.shortName; - if (haveWebStorage()) { - Platform.WebStorage?.set?.(transportPreferenceName, transport.shortName); - } + this.transportPreference = transport.shortName; + if (haveWebStorage()) { + Platform.WebStorage?.set?.(transportPreferenceName, transport.shortName); } } @@ -2185,6 +1920,27 @@ class ConnectionManager extends EventEmitter { this.maxIdleInterval = connectionDetails.maxIdleInterval; this.emit('connectiondetails', connectionDetails); } + + checkWsConnectivity() { + const ws = new Platform.Config.WebSocket(Defaults.wsConnectivityUrl); + return new Promise((resolve, reject) => { + let finished = false; + ws.onopen = () => { + if (!finished) { + finished = true; + resolve(); + ws.close(); + } + }; + + ws.onclose = ws.onerror = () => { + if (!finished) { + finished = true; + reject(); + } + }; + }); + } } export default ConnectionManager; diff --git a/src/common/lib/transport/transport.ts b/src/common/lib/transport/transport.ts index b1cb2e17ee..184ad3e1b1 100644 --- a/src/common/lib/transport/transport.ts +++ b/src/common/lib/transport/transport.ts @@ -64,7 +64,6 @@ abstract class Transport extends EventEmitter { params.heartbeats = true; } this.connectionManager = connectionManager; - connectionManager.registerProposedTransport(this); this.auth = auth; this.params = params; this.timeouts = params.options.timeouts; @@ -296,7 +295,7 @@ abstract class Transport extends EventEmitter { auth: Auth, transportParams: TransportParams, callback: TryConnectCallback - ): void { + ): Transport { const transport = new transportCtor(connectionManager, auth, transportParams); let transportAttemptTimer: NodeJS.Timeout | number; @@ -324,6 +323,7 @@ abstract class Transport extends EventEmitter { callback(null, transport); }); transport.connect(); + return transport; } onAuthUpdated?: (tokenDetails: API.TokenDetails) => void; diff --git a/src/common/lib/transport/websockettransport.ts b/src/common/lib/transport/websockettransport.ts index b68456a1c4..7f2a048a52 100644 --- a/src/common/lib/transport/websockettransport.ts +++ b/src/common/lib/transport/websockettransport.ts @@ -29,7 +29,7 @@ class WebSocketTransport extends Transport { super(connectionManager, auth, params); /* If is a browser, can't detect pings, so request protocol heartbeats */ params.heartbeats = Platform.Config.useProtocolHeartbeats; - this.wsHost = Defaults.getHost(params.options, params.host, true); + this.wsHost = params.host as string; } static isAvailable() { @@ -207,6 +207,26 @@ class WebSocketTransport extends Transport { }); } } + + async checkConnectivity() { + const ws = this.createWebSocket('wss://ws-up.ably-realtime-nonprod.com/', {}); + + return new Promise((resolve, reject) => { + let finished = false; + ws.onopen = () => { + if (!finished) { + resolve(); + ws.close(); + } + }; + + ws.onclose = ws.onerror = () => { + if (!finished) { + reject(); + } + }; + }); + } } export default WebSocketTransport; diff --git a/src/common/lib/util/defaults.ts b/src/common/lib/util/defaults.ts index 752cc841ab..46f9b4518e 100644 --- a/src/common/lib/util/defaults.ts +++ b/src/common/lib/util/defaults.ts @@ -27,8 +27,8 @@ type CompleteDefaults = IDefaults & { connectionStateTtl: number; realtimeRequestTimeout: number; recvTimeout: number; - preferenceConnectTimeout: number; - parallelUpgradeDelay: number; + webSocketConnectTimeout: number; + webSocketSlowTimeout: number; }; httpMaxRetryCount: number; maxMessageSize: number; @@ -40,7 +40,7 @@ type CompleteDefaults = IDefaults & { getHttpScheme(options: ClientOptions): string; environmentFallbackHosts(environment: string): string[]; getFallbackHosts(options: NormalisedClientOptions): string[]; - getHosts(options: NormalisedClientOptions): string[]; + getHosts(options: NormalisedClientOptions, ws?: boolean): string[]; checkHost(host: string): void; getRealtimeHost(options: ClientOptions, production: boolean, environment: string): string; objectifyOptions(options: ClientOptions | string): ClientOptions; @@ -74,8 +74,8 @@ const Defaults = { connectionStateTtl: 120000, realtimeRequestTimeout: 10000, recvTimeout: 90000, - preferenceConnectTimeout: 6000, - parallelUpgradeDelay: 6000, + webSocketConnectTimeout: 10000, + webSocketSlowTimeout: 4000, }, httpMaxRetryCount: 3, maxMessageSize: 65536, @@ -130,8 +130,9 @@ export function getFallbackHosts(options: NormalisedClientOptions): string[] { return fallbackHosts ? Utils.arrChooseN(fallbackHosts, httpMaxRetryCount) : []; } -export function getHosts(options: NormalisedClientOptions): string[] { - return [options.restHost].concat(getFallbackHosts(options)); +export function getHosts(options: NormalisedClientOptions, ws?: boolean): string[] { + const hosts = [options.restHost].concat(getFallbackHosts(options)); + return ws ? hosts.map((host) => getHost(options, host, true)) : hosts; } function checkHost(host: string): void { diff --git a/src/common/types/IDefaults.d.ts b/src/common/types/IDefaults.d.ts index 1fd02bf5da..5e2e12d37d 100644 --- a/src/common/types/IDefaults.d.ts +++ b/src/common/types/IDefaults.d.ts @@ -3,9 +3,7 @@ import { RestAgentOptions } from './ClientOptions'; export default interface IDefaults { connectivityCheckUrl: string; + wsConnectivityUrl: string; defaultTransports: TransportName[]; - baseTransportOrder: TransportName[]; - transportPreferenceOrder: TransportName[]; - upgradeTransports: TransportName[]; restAgentOptions?: RestAgentOptions; } diff --git a/src/common/types/IPlatformConfig.d.ts b/src/common/types/IPlatformConfig.d.ts index 65b757b7dc..7ceeddc1c8 100644 --- a/src/common/types/IPlatformConfig.d.ts +++ b/src/common/types/IPlatformConfig.d.ts @@ -30,7 +30,6 @@ export interface ISpecificPlatformConfig { fetchSupported?: boolean; xhrSupported?: boolean; allowComet?: boolean; - streamingSupported?: boolean; ArrayBuffer?: typeof ArrayBuffer | false; atob?: typeof atob | null; TextEncoder?: typeof TextEncoder; diff --git a/src/platform/nativescript/config.js b/src/platform/nativescript/config.js index 842b8af527..db739aaf30 100644 --- a/src/platform/nativescript/config.js +++ b/src/platform/nativescript/config.js @@ -24,7 +24,6 @@ var Config = { WebSocket: WebSocket, xhrSupported: XMLHttpRequest, allowComet: true, - streamingSupported: false, useProtocolHeartbeats: true, supportsBinary: typeof TextDecoder !== 'undefined' && TextDecoder, preferBinary: false, // Motivation as on web; see `preferBinary` comment there. diff --git a/src/platform/nodejs/lib/util/defaults.ts b/src/platform/nodejs/lib/util/defaults.ts index 8b29953cbd..ca5e479ea8 100644 --- a/src/platform/nodejs/lib/util/defaults.ts +++ b/src/platform/nodejs/lib/util/defaults.ts @@ -3,12 +3,10 @@ import { TransportNames } from '../../../../common/constants/TransportName'; const Defaults: IDefaults = { connectivityCheckUrl: 'https://internet-up.ably-realtime.com/is-the-internet-up.txt', + wsConnectivityUrl: 'wss://ws-up.ably-realtime-nonprod.com/', /* Note: order matters here: the base transport is the leftmost one in the * intersection of baseTransportOrder and the transports clientOption that's supported. */ defaultTransports: [TransportNames.WebSocket], - baseTransportOrder: [TransportNames.Comet, TransportNames.WebSocket], - transportPreferenceOrder: [TransportNames.Comet, TransportNames.WebSocket], - upgradeTransports: [TransportNames.WebSocket], restAgentOptions: { maxSockets: 40, keepAlive: true }, }; diff --git a/src/platform/react-native/config.ts b/src/platform/react-native/config.ts index 7e92eaa59c..8a0be88127 100644 --- a/src/platform/react-native/config.ts +++ b/src/platform/react-native/config.ts @@ -13,7 +13,6 @@ export default function (bufferUtils: typeof BufferUtils): IPlatformConfig { WebSocket: WebSocket, xhrSupported: true, allowComet: true, - streamingSupported: true, useProtocolHeartbeats: true, supportsBinary: !!(typeof TextDecoder !== 'undefined' && TextDecoder), preferBinary: false, // Motivation as on web; see `preferBinary` comment there. diff --git a/src/platform/web/config.ts b/src/platform/web/config.ts index 977ae102c0..dfb682364b 100644 --- a/src/platform/web/config.ts +++ b/src/platform/web/config.ts @@ -41,7 +41,6 @@ const Config: IPlatformConfig = { fetchSupported: !!globalObject.fetch, xhrSupported: globalObject.XMLHttpRequest && 'withCredentials' in new XMLHttpRequest(), allowComet: allowComet(), - streamingSupported: true, useProtocolHeartbeats: true, supportsBinary: !!globalObject.TextDecoder, /* Per Paddy (https://ably-real-time.slack.com/archives/CURL4U2FP/p1705674537763479) web intentionally prefers JSON to MessagePack: diff --git a/src/platform/web/lib/util/defaults.ts b/src/platform/web/lib/util/defaults.ts index 2ff50682b2..f4c3471d0f 100644 --- a/src/platform/web/lib/util/defaults.ts +++ b/src/platform/web/lib/util/defaults.ts @@ -3,13 +3,11 @@ import { TransportNames } from 'common/constants/TransportName'; const Defaults: IDefaults = { connectivityCheckUrl: 'https://internet-up.ably-realtime.com/is-the-internet-up.txt', + wsConnectivityUrl: 'wss://ws-up.ably-realtime-nonprod.com/', /* Order matters here: the base transport is the leftmost one in the * intersection of baseTransportOrder and the transports clientOption that's * supported. */ defaultTransports: [TransportNames.XhrPolling, TransportNames.WebSocket], - baseTransportOrder: [TransportNames.XhrPolling, TransportNames.WebSocket], - transportPreferenceOrder: [TransportNames.XhrPolling, TransportNames.WebSocket], - upgradeTransports: [TransportNames.WebSocket], }; export default Defaults; diff --git a/test/browser/connection.test.js b/test/browser/connection.test.js index 036cf06f00..b491a5511b 100644 --- a/test/browser/connection.test.js +++ b/test/browser/connection.test.js @@ -412,7 +412,7 @@ define(['shared_helper', 'chai'], function (helper, chai) { var realtime = helper.AblyRealtime(); try { expect(realtime.connection.connectionManager.baseTransport).to.equal('xhr_polling'); - expect(realtime.connection.connectionManager.upgradeTransports).to.deep.equal(['web_socket']); + expect(realtime.connection.connectionManager.webSocketTransport).to.be.ok; } catch (err) { closeAndFinish(done, realtime, err); return; diff --git a/test/common/modules/shared_helper.js b/test/common/modules/shared_helper.js index 6e48c36ba3..db5d4c1630 100644 --- a/test/common/modules/shared_helper.js +++ b/test/common/modules/shared_helper.js @@ -138,7 +138,7 @@ define([ } /* testFn is assumed to be a function of realtimeOptions that returns a mocha test */ - function testOnAllTransports(name, testFn, excludeUpgrade, skip) { + function testOnAllTransports(name, testFn, skip) { var itFn = skip ? it.skip : it; let transports = availableTransports; transports.forEach(function (transport) { @@ -151,18 +151,16 @@ define([ testFn({ transports: [transport], useBinaryProtocol: false }) ); }); - /* Plus one for no transport specified (ie use upgrade mechanism if + /* Plus one for no transport specified (ie use websocket/base mechanism if * present). (we explicitly specify all transports since node only does - * nodecomet+upgrade if comet is explicitly requested + * websocket+nodecomet if comet is explicitly requested) * */ - if (!excludeUpgrade) { - itFn(name + '_with_binary_transport', testFn({ transports, useBinaryProtocol: true })); - itFn(name + '_with_text_transport', testFn({ transports, useBinaryProtocol: false })); - } + itFn(name + '_with_binary_transport', testFn({ transports, useBinaryProtocol: true })); + itFn(name + '_with_text_transport', testFn({ transports, useBinaryProtocol: false })); } - testOnAllTransports.skip = function (name, testFn, excludeUpgrade) { - testOnAllTransports(name, testFn, excludeUpgrade, true); + testOnAllTransports.skip = function (name, testFn) { + testOnAllTransports(name, testFn, true); }; function restTestOnJsonMsgpack(name, testFn, skip) { @@ -221,6 +219,8 @@ define([ expect(json1 === json2, 'JSON data contents mismatch.').to.be.ok; } + beforeEach(clearTransportPreference); + after(function (done) { this.timeout(10 * 1000); testAppModule.tearDown(function (err) { diff --git a/test/realtime/channel.test.js b/test/realtime/channel.test.js index b02f202f10..117e779216 100644 --- a/test/realtime/channel.test.js +++ b/test/realtime/channel.test.js @@ -280,9 +280,6 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async }, true ); - /* NB upgrade is excluded because realtime now sends an ATTACHED - * post-upgrade, which can race with the DETACHED if the DETACH is only sent - * just after upgrade. Re-include it with 1.1 spec which has IDs in ATTACHs */ /* * Attach with an empty channel and expect a channel error @@ -690,10 +687,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async }, function (cb) { var channelUpdated = false; - // attached or update: in case this is happening in parallel with - // a transport upgrade, we might flip to attaching, meaning it'll come - // through as attached not update - channel._allChannelChanges.on(['attached', 'update'], function () { + channel._allChannelChanges.on(['update'], function () { channelUpdated = true; }); diff --git a/test/realtime/connection.test.js b/test/realtime/connection.test.js index 8d5b9fb163..bb27848dc2 100644 --- a/test/realtime/connection.test.js +++ b/test/realtime/connection.test.js @@ -163,7 +163,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async /* * Check that a message published on one transport that has not yet been * acked will be republished with the same msgSerial on a new transport (eg - * after a resume or an upgrade), before any new messages are send (and + * after a resume), before any new messages are send (and * without being merged with new messages) */ it('connectionQueuing', function (done) { diff --git a/test/realtime/event_emitter.test.js b/test/realtime/event_emitter.test.js index 3c4d9be3ce..435a7e1ac4 100644 --- a/test/realtime/event_emitter.test.js +++ b/test/realtime/event_emitter.test.js @@ -25,10 +25,7 @@ define(['shared_helper', 'chai'], function (helper, chai) { */ it('attachdetach0', function (done) { try { - /* Note: realtime now sends an ATTACHED post-upgrade, which can race with - * the DETACHED if the DETACH is only sent just after upgrade. Remove - * bestTransport with 1.1 spec which has IDs in ATTACHs */ - var realtime = helper.AblyRealtime({ transports: [helper.bestTransport] }), + var realtime = helper.AblyRealtime(), index, expectedConnectionEvents = ['connecting', 'connected', 'closing', 'closed'], expectedChannelEvents = ['attaching', 'attached', 'detaching', 'detached']; diff --git a/test/realtime/failure.test.js b/test/realtime/failure.test.js index a4e9601ad2..2e68a52f08 100644 --- a/test/realtime/failure.test.js +++ b/test/realtime/failure.test.js @@ -60,7 +60,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async .map(function (transport) { return failure_test([transport]); }) - .concat(failure_test(null)), // to test not specifying a transport (so will use upgrade mechanism) + .concat(failure_test(null)), // to test not specifying a transport (so will use websocket/base mechanism) function (err, realtimes) { closeAndFinish(done, realtimes, err); } @@ -98,7 +98,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async .map(function (transport) { return break_test([transport]); }) - .concat(break_test(null)), // to test not specifying a transport (so will use upgrade mechanism) + .concat(break_test(null)), // to test not specifying a transport (so will use websocket/base mechanism) function (err, realtimes) { closeAndFinish(done, realtimes, err); } @@ -127,7 +127,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async * the suspended timeout trips before three connection cycles */ disconnectedRetryTimeout: 1000, realtimeRequestTimeout: 50, - preferenceConnectTimeout: 50, + webSocketConnectTimeout: 50, suspendedRetryTimeout: 1000, connectionStateTtl: 2900, }); @@ -173,7 +173,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async .map(function (transport) { return lifecycleTest([transport]); }) - .concat(lifecycleTest(null)), // to test not specifying a transport (so will use upgrade mechanism) + .concat(lifecycleTest(null)), // to test not specifying a transport (so will use websocket/base mechanism) function (err) { if (err) { done(err); diff --git a/test/realtime/init.test.js b/test/realtime/init.test.js index 7ea8490e56..3ee47d849c 100644 --- a/test/realtime/init.test.js +++ b/test/realtime/init.test.js @@ -31,7 +31,7 @@ define(['ably', 'shared_helper', 'chai'], function (Ably, helper, chai) { realtime.connection.on('connected', function () { /* check api version */ var transport = realtime.connection.connectionManager.activeProtocol.transport; - var connectUri = helper.isWebsocket(transport) ? transport.uri : transport.recvRequest.uri; + var connectUri = helper.isWebsocket(transport) ? transport.uri : transport.recvRequest.recvUri; try { expect(connectUri.indexOf('v=3') > -1, 'Check uri includes v=3').to.be.ok; } catch (err) { @@ -309,14 +309,14 @@ define(['ably', 'shared_helper', 'chai'], function (Ably, helper, chai) { } }); - /* Check base and upgrade transports (nodejs only; browser tests in their own section) */ + /* Check base and websocket transports (nodejs only; browser tests in their own section) */ if (!isBrowser) { it('node_transports', function (done) { var realtime; try { realtime = helper.AblyRealtime({ transports: helper.availableTransports }); expect(realtime.connection.connectionManager.baseTransport).to.equal('comet'); - expect(realtime.connection.connectionManager.upgradeTransports).to.deep.equal(['web_socket']); + expect(realtime.connection.connectionManager.webSocketTransport).to.be.ok; closeAndFinish(done, realtime); } catch (err) { closeAndFinish(done, realtime, err); @@ -331,9 +331,9 @@ define(['ably', 'shared_helper', 'chai'], function (Ably, helper, chai) { var keyStr = helper.getTestApp().keys[0].keyStr; var realtime = helper.AblyRealtime({ key: keyStr, useTokenAuth: true }); realtime.connection.connectionManager.once('transport.pending', function (state) { - var transport = realtime.connection.connectionManager.pendingTransports[0], + var transport = realtime.connection.connectionManager.pendingTransport, originalOnProtocolMessage = transport.onProtocolMessage; - realtime.connection.connectionManager.pendingTransports[0].onProtocolMessage = function (message) { + realtime.connection.connectionManager.pendingTransport.onProtocolMessage = function (message) { try { if (message.action === 4) { expect(message.connectionDetails.connectionKey).to.be.ok; @@ -371,13 +371,14 @@ define(['ably', 'shared_helper', 'chai'], function (Ably, helper, chai) { var realtime = helper.AblyRealtime({ httpMaxRetryCount: 3, fallbackHosts: ['a', 'b', 'c'], + transport: ['web_socket'], }); realtime.connection.once('connected', function () { try { var hosts = new Ably.Rest._Http()._getHosts(realtime); /* restHost rather than realtimeHost as that's what connectionManager * knows about; converted to realtimeHost by the websocketTransport */ - expect(hosts[0]).to.equal(realtime.options.restHost, 'Check connected realtime host is the first option'); + expect(hosts[0]).to.equal(realtime.options.realtimeHost, 'Check connected realtime host is the first option'); expect(hosts.length).to.equal(4, 'Check also have three fallbacks'); } catch (err) { closeAndFinish(done, realtime, err); diff --git a/test/realtime/presence.test.js b/test/realtime/presence.test.js index a0e327f4ea..01591fdc6c 100644 --- a/test/realtime/presence.test.js +++ b/test/realtime/presence.test.js @@ -1708,8 +1708,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async /* Enter ten clients while attaching, finish the attach, check they were all entered correctly */ it('multiple_pending', function (done) { - /* single transport to avoid upgrade stalling due to the stubbed attachImpl */ - var realtime = helper.AblyRealtime({ transports: [helper.bestTransport] }), + var realtime = helper.AblyRealtime(), channel = realtime.channels.get('multiple_pending'), originalAttachImpl = channel.attachImpl; diff --git a/test/realtime/resume.test.js b/test/realtime/resume.test.js index ba1931765b..a3d169d62f 100644 --- a/test/realtime/resume.test.js +++ b/test/realtime/resume.test.js @@ -138,15 +138,11 @@ define(['shared_helper', 'async', 'chai'], function (helper, async, chai) { }); } - testOnAllTransports( - 'resume_inactive', - function (realtimeOpts) { - return function (done) { - resume_inactive(done, 'resume_inactive' + String(Math.random()), {}, realtimeOpts); - }; - }, - /* excludeUpgrade: */ true - ); + testOnAllTransports('resume_inactive', function (realtimeOpts) { + return function (done) { + resume_inactive(done, 'resume_inactive' + String(Math.random()), {}, realtimeOpts); + }; + }); /** * Simple resume case @@ -259,15 +255,11 @@ define(['shared_helper', 'async', 'chai'], function (helper, async, chai) { }); } - testOnAllTransports( - 'resume_active', - function (realtimeOpts) { - return function (done) { - resume_active(done, 'resume_active' + String(Math.random()), {}, realtimeOpts); - }; - }, - /* excludeUpgrade: */ true - ); + testOnAllTransports('resume_active', function (realtimeOpts) { + return function (done) { + resume_active(done, 'resume_active' + String(Math.random()), {}, realtimeOpts); + }; + }); /* RTN15c3 * Resume with loss of continuity @@ -547,8 +539,7 @@ define(['shared_helper', 'async', 'chai'], function (helper, async, chai) { * connection was > connectionStateTtl ago */ it('no_resume_last_activity', function (done) { - /* Specify a best transport so that upgrade activity doesn't reset the last activity timer */ - var realtime = helper.AblyRealtime({ transports: [bestTransport] }), + var realtime = helper.AblyRealtime(), connection = realtime.connection, connectionManager = connection.connectionManager; diff --git a/test/realtime/transports.test.js b/test/realtime/transports.test.js new file mode 100644 index 0000000000..8056a431bd --- /dev/null +++ b/test/realtime/transports.test.js @@ -0,0 +1,230 @@ +'use strict'; + +define(['shared_helper', 'async', 'chai', 'ably'], function (helper, async, chai, Ably) { + const expect = chai.expect; + const closeAndFinish = helper.closeAndFinish; + const monitorConnection = helper.monitorConnection; + const whenPromiseSettles = helper.whenPromiseSettles; + const Defaults = Ably.Rest.Platform.Defaults; + const originialWsCheckUrl = Defaults.wsConnectivityUrl; + const transportPreferenceName = 'ably-transport-preference'; + const localStorageSupported = globalThis.localStorage; + const urlScheme = 'https://'; + const echoServer = 'echo.ably.io'; + const failUrl = urlScheme + echoServer + '/respondwith?status=500'; + const availableTransports = helper.availableTransports; + const defaultTransports = new Ably.Realtime({ key: 'xxx:yyy', autoConnect: false }).connection.connectionManager + .transports; + const baseTransport = new Ably.Realtime({ key: 'xxx:yyy', autoConnect: false }).connection.connectionManager + .baseTransport; + const mixin = helper.Utils.mixin; + + function restoreWsConnectivityUrl() { + Defaults.wsConnectivityUrl = originialWsCheckUrl; + } + + const Config = Ably.Rest.Platform.Config; + const oldWs = Config.WebSocket; + + function restoreWebSocketConstructor() { + Config.WebSocket = oldWs; + } + + // drop in replacement for WebSocket which doesn't emit any events (same behaviour as when WebSockets upgrade headers are removed by a proxy) + class FakeWebSocket { + close() {} + } + + describe('realtime/transports', function () { + this.timeout(60 * 1000); + + before(function (done) { + helper.setupApp(function (err) { + if (err) { + done(err); + return; + } + done(); + }); + }); + + afterEach(restoreWsConnectivityUrl); + afterEach(restoreWebSocketConstructor); + + if (helper.availableTransports.length > 1) { + // ensure comet transport is used for nodejs tests + function options(opts) { + return mixin( + { + transports: helper.availableTransports, + }, + opts || {} + ); + } + + it('websocket_is_default', function (done) { + const realtime = helper.AblyRealtime(options()); + + realtime.connection.on('connected', function () { + try { + expect(realtime.connection.connectionManager.activeProtocol.transport.shortName).to.equal('web_socket'); + } catch (err) { + closeAndFinish(done, realtime, err); + } + closeAndFinish(done, realtime); + }); + + monitorConnection(done, realtime); + }); + + it('no_ws_connectivity', function (done) { + Config.WebSocket = FakeWebSocket; + const realtime = helper.AblyRealtime(options({ webSocketSlowTimeout: 1000, webSocketConnectTimeout: 3000 })); + + realtime.connection.on('connected', function () { + try { + expect(realtime.connection.connectionManager.activeProtocol.transport.shortName).to.equal(baseTransport); + // check that transport preference is set + if (localStorageSupported) { + expect(window.localStorage.getItem(transportPreferenceName)).to.equal( + JSON.stringify({ value: baseTransport }) + ); + } + } catch (err) { + closeAndFinish(done, realtime, err); + } + closeAndFinish(done, realtime); + }); + + monitorConnection(done, realtime); + }); + + it('ws_primary_host_fails', function (done) { + const goodHost = helper.AblyRest().options.realtimeHost; + const realtime = helper.AblyRealtime( + options({ realtimeHost: helper.unroutableAddress, fallbackHosts: [goodHost] }) + ); + + realtime.connection.on('connected', function () { + expect(realtime.connection.connectionManager.activeProtocol.transport.shortName).to.equal('web_socket'); + closeAndFinish(done, realtime); + }); + + monitorConnection(done, realtime); + }); + + it('no_internet_connectivity', function (done) { + Config.WebSocket = FakeWebSocket; + const realtime = helper.AblyRealtime(options({ connectivityCheckUrl: failUrl, webSocketSlowTimeout: 1000 })); + + // expect client to transition to disconnected rather than attempting base transport (which would succeed in this instance) + realtime.connection.on('disconnected', function () { + closeAndFinish(done, realtime); + }); + }); + + it('no_websocket_or_base_transport', function (done) { + Config.WebSocket = FakeWebSocket; + const realtime = helper.AblyRealtime({ + transports: ['web_socket'], + realtimeRequestTimeout: 3000, + webSocketConnectTimeout: 3000, + }); + + realtime.connection.on('disconnected', function () { + closeAndFinish(done, realtime); + }); + }); + + if (localStorageSupported) { + it('base_transport_preference', function (done) { + window.localStorage.setItem(transportPreferenceName, JSON.stringify({ value: baseTransport })); + const realtime = helper.AblyRealtime(options()); + + // make ws connectivity check only resolve after connected with base transport. + // prevents a race condition where the wsConnectivity check succeeds before base transport is activated; + // in this case the base transport would be abandoned in favour of websocket + realtime.connection.connectionManager.checkWsConnectivity = function () { + return new Promise((resolve) => { + realtime.connection.once('connected', () => { + resolve(); + }); + }); + }; + + realtime.connection.on('connected', function () { + try { + expect(realtime.connection.connectionManager.activeProtocol.transport.shortName).to.equal(baseTransport); + } catch (err) { + closeAndFinish(done, realtime, err); + } + closeAndFinish(done, realtime); + }); + monitorConnection(done, realtime); + }); + + it('transport_preference_reset_while_connecting', function (done) { + window.localStorage.setItem(transportPreferenceName, JSON.stringify({ value: baseTransport })); + const realtime = helper.AblyRealtime(options()); + + // make ws connectivity check fast so that it succeeds while base transport is still connecting + realtime.connection.connectionManager.checkWsConnectivity = function () { + return new Promise((resolve) => { + setTimeout(() => resolve(), 1); + }); + }; + + realtime.connection.once('connected', function () { + try { + expect(realtime.connection.connectionManager.activeProtocol.transport.shortName).to.equal('web_socket'); + expect(realtime.connection.connectionManager.getTransportPreference()).to.equal('web_socket'); + } catch (err) { + closeAndFinish(done, realtime, err); + return; + } + closeAndFinish(done, realtime); + }); + monitorConnection(done, realtime); + }); + + it('transport_preference_reset_after_connected', function (done) { + window.localStorage.setItem(transportPreferenceName, JSON.stringify({ value: baseTransport })); + const realtime = helper.AblyRealtime(options()); + + // make ws connectivity check only resolve after connected with base transport + realtime.connection.connectionManager.checkWsConnectivity = function () { + return new Promise((resolve) => { + realtime.connection.once('connected', () => { + try { + expect(realtime.connection.connectionManager.activeProtocol.transport.shortName).to.equal( + baseTransport + ); + resolve(); + } catch (err) { + closeAndFinish(done, realtime, err); + return; + } + }); + }); + }; + + realtime.connection.once('connected', function () { + // the checkWsConnectivity promise won't execute .then callbacks synchronously upon resolution + // so we need to wait one tick before the transport preference is unpersisted + setTimeout(() => { + try { + // ensure base transport preference is erased + expect(realtime.connection.connectionManager.getTransportPreference()).to.equal(null); + } catch (err) { + closeAndFinish(done, realtime, err); + return; + } + closeAndFinish(done, realtime); + }, 0); + }); + monitorConnection(done, realtime); + }); + } + } + }); +}); diff --git a/test/realtime/upgrade.test.js b/test/realtime/upgrade.test.js deleted file mode 100644 index 971eae1f36..0000000000 --- a/test/realtime/upgrade.test.js +++ /dev/null @@ -1,709 +0,0 @@ -'use strict'; - -define(['shared_helper', 'async', 'chai', 'ably'], function (helper, async, chai, Ably) { - var expect = chai.expect; - var rest; - var publishIntervalHelper = function (currentMessageNum, channel, dataFn, onPublish) { - return function (currentMessageNum) { - whenPromiseSettles(channel.publish('event0', dataFn()), function () { - onPublish(); - }); - }; - }; - var publishAtIntervals = function (numMessages, channel, dataFn, onPublish) { - for (var i = numMessages; i > 0; i--) { - setTimeout(publishIntervalHelper(i, channel, dataFn, onPublish), 2 * i); - } - }; - var closeAndFinish = helper.closeAndFinish; - var monitorConnection = helper.monitorConnection; - var bestTransport = helper.bestTransport; - var whenPromiseSettles = helper.whenPromiseSettles; - - if (bestTransport === 'web_socket') { - describe('realtime/upgrade', function () { - this.timeout(60 * 1000); - - before(function (done) { - helper.setupApp(function (err) { - if (err) { - done(err); - return; - } - rest = helper.AblyRest(); - done(); - }); - }); - - afterEach(helper.clearTransportPreference); - - /* - * Publish once with REST, before upgrade, verify message received - */ - it('publishpreupgrade', function (done) { - var transportOpts = { useBinaryProtocol: true, transports: helper.availableTransports }; - try { - var realtime = helper.AblyRealtime(transportOpts); - /* connect and attach */ - realtime.connection.on('connected', function () { - //console.log('publishpreupgrade: connected'); - var testMsg = 'Hello world'; - var rtChannel = realtime.channels.get('publishpreupgrade'); - whenPromiseSettles(rtChannel.attach(), function (err) { - if (err) { - closeAndFinish(done, realtime, err); - return; - } - - /* subscribe to event */ - rtChannel.subscribe('event0', function (msg) { - try { - expect(msg.data).to.equal(testMsg, 'Unexpected msg text received'); - } catch (err) { - closeAndFinish(done, realtime, err); - return; - } - closeAndFinish(done, realtime); - }); - - /* publish event */ - var restChannel = rest.channels.get('publishpreupgrade'); - whenPromiseSettles(restChannel.publish('event0', testMsg), function (err) { - if (err) { - closeAndFinish(done, realtime, err); - } - }); - }); - }); - monitorConnection(done, realtime); - } catch (err) { - closeAndFinish(done, realtime, err); - } - }); - - /* - * Publish once with REST, after upgrade, verify message received on active transport - */ - it('publishpostupgrade0', function (done) { - var transportOpts = { useBinaryProtocol: true, transports: helper.availableTransports }; - try { - var realtime = helper.AblyRealtime(transportOpts); - - /* subscribe to event */ - var rtChannel = realtime.channels.get('publishpostupgrade0'); - rtChannel.subscribe('event0', function (msg) { - try { - expect(msg.data).to.equal(testMsg, 'Unexpected msg text received'); - } catch (err) { - closeAndFinish(done, realtime, err); - return; - } - var closeFn = function () { - closeAndFinish(done, realtime); - }; - if (isBrowser) setTimeout(closeFn, 0); - else process.nextTick(closeFn); - }); - - /* publish event */ - var testMsg = 'Hello world'; - var restChannel = rest.channels.get('publishpostupgrade0'); - var connectionManager = realtime.connection.connectionManager; - connectionManager.on('transport.active', function (transport) { - //console.log('publishpostupgrade0: transport active: transport = ' + transport); - if (transport.toString().match(/wss?\:/)) { - if (rtChannel.state == 'attached') { - //console.log('*** publishpostupgrade0: publishing (channel attached on transport active) ...'); - whenPromiseSettles(restChannel.publish('event0', testMsg), function (err) { - //console.log('publishpostupgrade0: publish returned err = ' + displayError(err)); - if (err) { - closeAndFinish(done, realtime, err); - } - }); - } else { - rtChannel.on('attached', function () { - //console.log('*** publishpostupgrade0: publishing (channel attached after wait) ...'); - whenPromiseSettles(restChannel.publish('event0', testMsg), function (err) { - //console.log('publishpostupgrade0: publish returned err = ' + displayError(err)); - if (err) { - closeAndFinish(done, realtime, err); - } - }); - }); - } - } - }); - monitorConnection(done, realtime); - } catch (err) { - closeAndFinish(done, realtime, err); - } - }); - - /* - * Publish once with REST, after upgrade, verify message not received on inactive transport - */ - it('publishpostupgrade1', function (done) { - var transportOpts = { useBinaryProtocol: true, transports: helper.availableTransports }; - try { - var realtime = helper.AblyRealtime(transportOpts); - - /* subscribe to event */ - var rtChannel = realtime.channels.get('publishpostupgrade1'); - rtChannel.subscribe('event0', function (msg) { - try { - expect(msg.data).to.equal(testMsg, 'Unexpected msg text received'); - } catch (err) { - closeAndFinish(done, realtime, err); - return; - } - var closeFn = function () { - closeAndFinish(done, realtime); - }; - if (isBrowser) setTimeout(closeFn, 0); - else process.nextTick(closeFn); - }); - - /* publish event */ - var testMsg = 'Hello world'; - var restChannel = rest.channels.get('publishpostupgrade1'); - var connectionManager = realtime.connection.connectionManager; - connectionManager.on('transport.active', function (transport) { - if (helper.isComet(transport)) { - /* override the processing of incoming messages on this channel - * so we can see if a message arrived. - * NOTE: this relies on knowledge of the internal implementation - * of the transport */ - - var originalOnProtocolMessage = transport.onProtocolMessage; - transport.onProtocolMessage = function (message) { - if (message.messages) { - closeAndFinish(done, realtime, new Error('Message received on comet transport')); - return; - } - originalOnProtocolMessage.apply(this, arguments); - }; - } - }); - connectionManager.on('transport.active', function (transport) { - if (helper.isWebsocket(transport)) { - if (rtChannel.state == 'attached') { - //console.log('*** publishing (channel attached on transport active) ...'); - restChannel.publish('event0', testMsg); - } else { - rtChannel.on('attached', function () { - //console.log('*** publishing (channel attached after wait) ...'); - restChannel.publish('event0', testMsg); - }); - } - } - }); - - monitorConnection(done, realtime); - } catch (err) { - closeAndFinish(done, realtime, err); - } - }); - - /** - * Publish and subscribe, text protocol - */ - it('upgradepublish0', function (done) { - var count = 10; - var cbCount = 10; - var checkFinish = function () { - if (count <= 0 && cbCount <= 0) { - closeAndFinish(done, realtime); - } - }; - var onPublish = function () { - --cbCount; - checkFinish(); - }; - var transportOpts = { useBinaryProtocol: false, transports: helper.availableTransports }; - var realtime = helper.AblyRealtime(transportOpts); - var channel = realtime.channels.get('upgradepublish0'); - /* subscribe to event */ - whenPromiseSettles( - channel.subscribe('event0', function () { - --count; - checkFinish(); - }), - function () { - var dataFn = function () { - return 'Hello world at: ' + new Date(); - }; - publishAtIntervals(count, channel, dataFn, onPublish); - } - ); - }); - - /** - * Publish and subscribe, binary protocol - */ - it('upgradepublish1', function (done) { - var count = 10; - var cbCount = 10; - var checkFinish = function () { - if (count <= 0 && cbCount <= 0) { - closeAndFinish(done, realtime); - } - }; - var onPublish = function () { - --cbCount; - checkFinish(); - }; - var transportOpts = { useBinaryProtocol: true, transports: helper.availableTransports }; - var realtime = helper.AblyRealtime(transportOpts); - var channel = realtime.channels.get('upgradepublish1'); - /* subscribe to event */ - whenPromiseSettles( - channel.subscribe('event0', function () { - --count; - checkFinish(); - }), - function () { - var dataFn = function () { - return 'Hello world at: ' + new Date(); - }; - publishAtIntervals(count, channel, dataFn, onPublish); - } - ); - }); - - /* - * Base upgrade case - */ - it('upgradebase0', function (done) { - var transportOpts = { useBinaryProtocol: true, transports: helper.availableTransports }; - var cometDeactivated = false; - try { - var realtime = helper.AblyRealtime(transportOpts); - /* check that we see the transport we're interested in get activated, - * and that we see the comet transport deactivated */ - var failTimer = setTimeout(function () { - closeAndFinish(done, realtime, new Error('upgrade heartbeat failed (timer expired)')); - }, 120000); - - var connectionManager = realtime.connection.connectionManager; - connectionManager.once('transport.inactive', function (transport) { - if (transport.toString().indexOf('/comet/') > -1) cometDeactivated = true; - }); - connectionManager.on('transport.active', function (transport) { - if (transport.toString().match(/wss?\:/)) { - clearTimeout(failTimer); - var closeFn = function () { - try { - expect(cometDeactivated).to.be.ok; - } catch (err) { - closeAndFinish(done, realtime, err); - return; - } - closeAndFinish(done, realtime); - }; - if (isBrowser) { - setTimeout(closeFn, 0); - } else { - process.nextTick(closeFn); - } - } - }); - monitorConnection(done, realtime); - } catch (err) { - closeAndFinish(done, realtime, err); - } - }); - - /* - * Check active heartbeat, text protocol - */ - it('upgradeheartbeat0', function (done) { - var transportOpts = { useBinaryProtocol: false, transports: helper.availableTransports }; - try { - var realtime = helper.AblyRealtime(transportOpts); - - /* when we see the transport we're interested in get activated, - * listen for the heartbeat event */ - var failTimer; - var connectionManager = realtime.connection.connectionManager; - connectionManager.on('transport.active', function (transport) { - if (transport.toString().match(/wss?\:/)) - transport.on('heartbeat', function () { - transport.off('heartbeat'); - clearTimeout(failTimer); - closeAndFinish(done, realtime); - }); - transport.ping(); - }); - - realtime.connection.on('connected', function () { - failTimer = setTimeout(function () { - closeAndFinish(done, realtime, new Error('upgrade heartbeat failed (timer expired)')); - }, 120000); - }); - monitorConnection(done, realtime); - } catch (err) { - closeAndFinish(done, realtime, err); - } - }); - - /* - * Check active heartbeat, binary protocol - */ - it('upgradeheartbeat1', function (done) { - var transportOpts = { useBinaryProtocol: true, transports: helper.availableTransports }; - try { - var realtime = helper.AblyRealtime(transportOpts); - - /* when we see the transport we're interested in get activated, - * listen for the heartbeat event */ - var failTimer; - var connectionManager = realtime.connection.connectionManager; - connectionManager.on('transport.active', function (transport) { - if (transport.toString().match(/wss?\:/)) - transport.on('heartbeat', function () { - transport.off('heartbeat'); - clearTimeout(failTimer); - closeAndFinish(done, realtime); - }); - transport.ping(); - }); - - realtime.connection.on('connected', function () { - failTimer = setTimeout(function () { - closeAndFinish(done, realtime, new Error('upgrade heartbeat failed (timer expired)')); - }, 120000); - }); - monitorConnection(done, realtime); - } catch (err) { - closeAndFinish(done, realtime, err); - } - }); - - /* - * Check heartbeat does not fire on inactive transport, text protocol - */ - it('upgradeheartbeat2', function (done) { - var transportOpts = { useBinaryProtocol: false, transports: helper.availableTransports }; - try { - var realtime = helper.AblyRealtime(transportOpts); - - /* when we see the transport we're interested in get activated, - * listen for the heartbeat event */ - var failTimer, cometTransport, wsTransport; - var connectionManager = realtime.connection.connectionManager; - connectionManager.on('transport.active', function (transport) { - var transportDescription = transport.toString(); - //console.log('active transport: ' + transportDescription); - if (transportDescription.indexOf('/comet/') > -1) { - cometTransport = transport; - cometTransport.on('heartbeat', function () { - closeAndFinish(done, realtime, new Error('verify heartbeat does not fire on inactive transport')); - }); - } - if (transportDescription.match(/wss?\:/)) { - wsTransport = transport; - wsTransport.on('heartbeat', function () { - clearTimeout(failTimer); - /* wait a couple of seconds to give it time - * in case it might still fire */ - setTimeout(function () { - closeAndFinish(done, realtime); - }, 2000); - }); - wsTransport.ping(); - } - }); - - realtime.connection.on('connected', function () { - failTimer = setTimeout(function () { - closeAndFinish(done, realtime, new Error('upgrade heartbeat failed (timer expired)')); - }, 120000); - }); - monitorConnection(done, realtime); - } catch (err) { - closeAndFinish(done, realtime, err); - } - }); - - /* - * Check heartbeat does not fire on inactive transport, binary protocol - */ - it('upgradeheartbeat3', function (done) { - var transportOpts = { useBinaryProtocol: true, transports: helper.availableTransports }; - try { - var realtime = helper.AblyRealtime(transportOpts); - - /* when we see the transport we're interested in get activated, - * listen for the heartbeat event */ - var failTimer, cometTransport, wsTransport; - var connectionManager = realtime.connection.connectionManager; - connectionManager.on('transport.active', function (transport) { - var transportDescription = transport.toString(); - //console.log('active transport: ' + transportDescription); - if (transportDescription.indexOf('/comet/') > -1) { - cometTransport = transport; - cometTransport.on('heartbeat', function () { - closeAndFinish(done, realtime, new Error('verify heartbeat does not fire on inactive transport')); - }); - } - if (transportDescription.match(/wss?\:/)) { - wsTransport = transport; - wsTransport.on('heartbeat', function () { - clearTimeout(failTimer); - /* wait a couple of seconds to give it time - * in case it might still fire */ - setTimeout(function () { - closeAndFinish(done, realtime); - }, 2000); - }); - wsTransport.ping(); - } - }); - - realtime.connection.on('connected', function () { - failTimer = setTimeout(function () { - closeAndFinish(done, realtime, new Error('upgrade heartbeat failed (timer expired)')); - }, 120000); - }); - monitorConnection(done, realtime); - } catch (err) { - closeAndFinish(done, realtime, err); - } - }); - - it('unrecoverableUpgrade', function (done) { - var realtime, - fakeConnectionKey = '_____!ablyjs_test_fake-key____', - fakeConnectionId = 'ablyjs_tes'; - - try { - /* on base transport active */ - realtime = helper.AblyRealtime({ transports: helper.availableTransports }); - realtime.connection.connectionManager.once('transport.active', function (transport) { - expect( - transport.toString().indexOf('/comet/') > -1, - 'assert first transport to become active is a comet transport' - ).to.be.ok; - try { - expect(realtime.connection.errorReason).to.equal(null, 'Check connection.errorReason is initially null'); - /* sabotage the upgrade */ - realtime.connection.connectionManager.connectionKey = fakeConnectionKey; - realtime.connection.connectionManager.connectionId = fakeConnectionId; - } catch (err) { - closeAndFinish(done, realtiem, err); - return; - } - - /* on upgrade failure */ - realtime.connection.once('update', function (stateChange) { - try { - expect(stateChange.reason.code).to.equal(80018, 'Check correct (unrecoverable connection) error'); - expect(stateChange.current).to.equal('connected', 'Check current is connected'); - expect(realtime.connection.errorReason.code).to.equal( - 80018, - 'Check error set in connection.errorReason' - ); - expect(realtime.connection.state).to.equal('connected', 'Check still connected'); - } catch (err) { - closeAndFinish(done, realtime, err); - return; - } - - /* Check events not still paused */ - var channel = realtime.channels.get('unrecoverableUpgrade'); - whenPromiseSettles(channel.attach(), function (err) { - if (err) { - closeAndFinish(done, realtime, err); - return; - } - channel.subscribe(function (msg) { - closeAndFinish(done, realtime); - }); - channel.publish('msg', null); - }); - }); - }); - } catch (err) { - closeAndFinish(done, realtime, err); - } - }); - - /* - * Check that a message that fails to publish on a comet transport can be - * seamlessly transferred to the websocket transport and published there - */ - it('message_timeout_stalling_upgrade', function (done) { - var realtime = helper.AblyRealtime({ transports: helper.availableTransports, httpRequestTimeout: 3000 }), - channel = realtime.channels.get('timeout1'), - connectionManager = realtime.connection.connectionManager; - - realtime.connection.once('connected', function () { - /* Sabotage comet sending */ - var transport = connectionManager.activeProtocol.getTransport(); - try { - expect(helper.isComet(transport), 'Check active transport is still comet').to.be.ok; - } catch (err) { - closeAndFinish(done, realtime, err); - return; - } - transport.sendUri = helper.unroutableAddress; - - async.parallel( - [ - function (cb) { - channel.subscribe('event', function () { - cb(); - }); - }, - function (cb) { - whenPromiseSettles(channel.publish('event', null), function (err) { - try { - expect(!err, 'Successfully published message').to.be.ok; - } catch (err) { - cb(err); - return; - } - cb(); - }); - }, - ], - function (err) { - closeAndFinish(done, realtime, err); - } - ); - }); - }); - - /* - * Check that after a successful upgrade, the transport pref is persisted, - * and subsequent connections do not upgrade - */ - it('persist_transport_prefs', function (done) { - var realtime = helper.AblyRealtime({ transports: helper.availableTransports }), - connection = realtime.connection, - connectionManager = connection.connectionManager; - - async.series( - [ - function (cb) { - connectionManager.once('transport.active', function (transport) { - try { - expect(helper.isComet(transport), 'Check first transport to become active is comet').to.be.ok; - } catch (err) { - cb(err); - return; - } - cb(); - }); - }, - function (cb) { - connectionManager.once('transport.active', function (transport) { - try { - expect(helper.isWebsocket(transport), 'Check second transport to become active is ws').to.be.ok; - } catch (err) { - cb(err); - return; - } - cb(); - }); - }, - function (cb) { - connection.once('closed', function () { - cb(); - }); - Ably.Realtime.Platform.Config.nextTick(function () { - connection.close(); - }); - }, - function (cb) { - connectionManager.once('transport.active', function (transport) { - try { - expect( - helper.isWebsocket(transport), - 'Check first transport to become active the second time round is websocket' - ).to.be.ok; - } catch (err) { - cb(err); - return; - } - cb(); - }); - connection.connect(); - }, - ], - function (err) { - closeAndFinish(done, realtime, err); - } - ); - }); - - /* - * Check that upgrades succeed even if the original transport dies before the sync - */ - it('upgrade_original_transport_dies', function (done) { - var realtime = helper.AblyRealtime({ transports: helper.availableTransports }), - connection = realtime.connection, - connectionManager = connection.connectionManager; - - async.series( - [ - function (cb) { - connectionManager.once('transport.active', function (transport) { - try { - expect(helper.isComet(transport), 'Check first transport to become active is comet').to.be.ok; - } catch (err) { - cb(err); - return; - } - cb(); - }); - }, - function (cb) { - connectionManager.on('transport.pending', function (transport) { - connectionManager.off('transport.pending'); - /* Abort comet transport nonfatally */ - var baseTransport = connectionManager.activeProtocol.getTransport(); - try { - expect(helper.isComet(baseTransport), 'Check original transport is still comet').to.be.ok; - } catch (err) { - cb(err); - return; - } - /* Check that if we do get a statechange, it's to connecting, not disconnected. */ - var stateChangeListener = function (stateChange) { - try { - expect(stateChange.current).to.equal( - 'connecting', - 'check that deactivateTransport only drops us to connecting as another transport is ready for activation' - ); - } catch (err) { - cb(err); - } - }; - connection.once(stateChangeListener); - connectionManager.once('connectiondetails', function () { - connection.off(stateChangeListener); - /* Check the upgrade completed */ - var newActiveTransport = connectionManager.activeProtocol.getTransport(); - try { - expect(transport).to.equal(newActiveTransport, 'Check the upgrade transport is now active'); - } catch (err) { - cb(err); - return; - } - cb(); - }); - transport.once('connected', function () { - baseTransport.disconnect({ code: 50000, statusCode: 500, message: 'a non-fatal transport error' }); - }); - }); - }, - ], - function (err) { - closeAndFinish(done, realtime, err); - } - ); - }); - }); - } -}); diff --git a/test/support/browser_file_list.js b/test/support/browser_file_list.js index 748ab836ad..fe8c027820 100644 --- a/test/support/browser_file_list.js +++ b/test/support/browser_file_list.js @@ -42,7 +42,7 @@ window.__testFiles__.files = { 'test/realtime/reauth.test.js': true, 'test/realtime/resume.test.js': true, 'test/realtime/sync.test.js': true, - 'test/realtime/upgrade.test.js': true, + 'test/realtime/transports.test.js': true, 'test/rest/auth.test.js': true, 'test/rest/bufferutils.test.js': true, 'test/rest/capability.test.js': true, diff --git a/test/support/root_hooks.js b/test/support/root_hooks.js index 8c96f0d332..4f40ebd64c 100644 --- a/test/support/root_hooks.js +++ b/test/support/root_hooks.js @@ -9,4 +9,6 @@ define(['shared_helper'], function (helper) { done(); }); }); + + beforeEach(helper.clearTransportPreference); });