diff --git a/packages/api-graphql/package.json b/packages/api-graphql/package.json index 34dfe40461d..6704198405d 100644 --- a/packages/api-graphql/package.json +++ b/packages/api-graphql/package.json @@ -16,19 +16,19 @@ "access": "public" }, "scripts": { - "test": "npm run lint && jest --coverage", - "test:size": "size-limit", - "build-with-test": "npm test && npm run build", - "build:cjs": "node ./build es5 && webpack && webpack --config ./webpack.config.dev.js", - "build:esm": "node ./build es6", - "build:cjs:watch": "node ./build es5 --watch", - "build:esm:watch": "node ./build es6 --watch", - "build": "npm run clean && npm run build:esm && npm run build:cjs", - "clean": "npm run clean:size && rimraf lib-esm lib dist", - "clean:size": "rimraf dual-publish-tmp tmp*", - "format": "echo \"Not implemented\"", - "lint": "tslint 'src/**/*.ts' && npm run ts-coverage", - "ts-coverage": "typescript-coverage-report -p ./tsconfig.build.json -t 75.62" + "test": "npm run lint && jest -w 1 --coverage", + "test:watch": "tslint 'src/**/*.ts' && jest -w 1 --watch", + "build-with-test": "npm run clean && npm test && tsc && webpack", + "build:cjs": "rimraf lib && tsc -m commonjs --outDir lib && webpack && webpack --config ./webpack.config.dev.js", + "build:esm": "rimraf lib-esm && tsc -m esnext --outDir lib-esm", + "build:cjs:watch": "rimraf lib && tsc -m commonjs --outDir lib --watch", + "build:esm:watch": "rimraf lib-esm && tsc -m esnext --outDir lib-esm --watch", + "build": "npm run clean && npm run build:esm && npm run build:cjs", + "clean": "npm run clean:size && rimraf lib-esm lib dist", + "clean:size": "rimraf dual-publish-tmp tmp*", + "format": "echo \"Not implemented\"", + "lint": "tslint 'src/**/*.ts' && npm run ts-coverage", + "ts-coverage": "typescript-coverage-report -p ./tsconfig.json -t 70.0" }, "repository": { "type": "git", diff --git a/packages/api-graphql/src/GraphQLAPI.ts b/packages/api-graphql/src/GraphQLAPI.ts index 2c433703f3b..e4009209e52 100644 --- a/packages/api-graphql/src/GraphQLAPI.ts +++ b/packages/api-graphql/src/GraphQLAPI.ts @@ -40,4 +40,3 @@ export class GraphQLAPIClass extends InternalGraphQLAPIClass { } export const GraphQLAPI = new GraphQLAPIClass(null); -// Amplify.register(GraphQLAPI); diff --git a/packages/api-graphql/src/Providers/AWSAppSyncRealTimeProvider/index.ts b/packages/api-graphql/src/Providers/AWSAppSyncRealTimeProvider/index.ts new file mode 100644 index 00000000000..7a368d3766c --- /dev/null +++ b/packages/api-graphql/src/Providers/AWSAppSyncRealTimeProvider/index.ts @@ -0,0 +1,1003 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +import Observable, { ZenObservable } from 'zen-observable-ts'; +import { GraphQLError } from 'graphql'; +import * as url from 'url'; +import { v4 as uuid } from 'uuid'; +import { Buffer } from 'buffer'; +import { Hub, fetchAuthSession } from '@aws-amplify/core'; + +import { + CONTROL_MSG, + ConnectionState, + PubSubContent, + PubSubContentObserver, +} from '../../types/PubSub'; + +import { signRequest } from '@aws-amplify/core/internals/aws-client-utils'; + +import { + AMPLIFY_SYMBOL, + AWS_APPSYNC_REALTIME_HEADERS, + CONNECTION_INIT_TIMEOUT, + DEFAULT_KEEP_ALIVE_TIMEOUT, + DEFAULT_KEEP_ALIVE_ALERT_TIMEOUT, + MAX_DELAY_MS, + MESSAGE_TYPES, + NON_RETRYABLE_CODES, + SOCKET_STATUS, + START_ACK_TIMEOUT, + SUBSCRIPTION_STATUS, + CONNECTION_STATE_CHANGE, +} from '../constants'; +import { + ConnectionStateMonitor, + CONNECTION_CHANGE, +} from '../../utils/ConnectionStateMonitor'; +import { + ReconnectEvent, + ReconnectionMonitor, +} from '../../utils/ReconnectionMonitor'; +import { GraphQLAuthMode } from '@aws-amplify/core/lib-esm/singleton/API/types'; + +import { + CustomUserAgentDetails, + Logger, + NonRetryableError, + USER_AGENT_HEADER, + getAmplifyUserAgent, + isNonRetryableError, + jitteredExponentialRetry, +} from '@aws-amplify/core/internals/utils'; + +const logger = new Logger('AWSAppSyncRealTimeProvider'); + +const dispatchApiEvent = payload => { + Hub.dispatch('api', payload, 'PubSub', AMPLIFY_SYMBOL); +}; + +export type ObserverQuery = { + observer: PubSubContentObserver; + query: string; + variables: Record; + subscriptionState: SUBSCRIPTION_STATUS; + subscriptionReadyCallback?: Function; + subscriptionFailedCallback?: Function; + startAckTimeoutId?: ReturnType; +}; + +const standardDomainPattern = + /^https:\/\/\w{26}\.appsync\-api\.\w{2}(?:(?:\-\w{2,})+)\-\d\.amazonaws.com(?:\.cn)?\/graphql$/i; + +const customDomainPath = '/realtime'; + +type DataObject = { + data: Record; +}; + +type DataPayload = { + id: string; + payload: DataObject; + type: string; +}; + +type ParsedMessagePayload = { + type: string; + payload: { + connectionTimeoutMs: number; + errors?: [{ errorType: string; errorCode: number }]; + }; +}; + +export interface AWSAppSyncRealTimeProviderOptions { + appSyncGraphqlEndpoint?: string; + authenticationType?: GraphQLAuthMode; + query?: string; + variables?: Record; + apiKey?: string; + region?: string; + graphql_headers?: () => {} | (() => Promise<{}>); + additionalHeaders?: { [key: string]: string }; +} + +type AWSAppSyncRealTimeAuthInput = + Partial & { + canonicalUri: string; + payload: string; + host?: string | undefined; + }; + +export class AWSAppSyncRealTimeProvider { + private awsRealTimeSocket?: WebSocket; + private socketStatus: SOCKET_STATUS = SOCKET_STATUS.CLOSED; + private keepAliveTimeoutId?: ReturnType; + private keepAliveTimeout = DEFAULT_KEEP_ALIVE_TIMEOUT; + private keepAliveAlertTimeoutId?: ReturnType; + private subscriptionObserverMap: Map = new Map(); + private promiseArray: Array<{ res: Function; rej: Function }> = []; + private connectionState: ConnectionState; + private readonly connectionStateMonitor = new ConnectionStateMonitor(); + private readonly reconnectionMonitor = new ReconnectionMonitor(); + private connectionStateMonitorSubscription: ZenObservable.Subscription; + + constructor(options: AWSAppSyncRealTimeProviderOptions = {}) { + // Monitor the connection state and pass changes along to Hub + this.connectionStateMonitorSubscription = + this.connectionStateMonitor.connectionStateObservable.subscribe( + connectionState => { + dispatchApiEvent({ + event: CONNECTION_STATE_CHANGE, + data: { + provider: this, + connectionState, + }, + message: `Connection state is ${connectionState}`, + }); + this.connectionState = connectionState; + + // Trigger START_RECONNECT when the connection is disrupted + if (connectionState === ConnectionState.ConnectionDisrupted) { + this.reconnectionMonitor.record(ReconnectEvent.START_RECONNECT); + } + + // Trigger HALT_RECONNECT to halt reconnection attempts when the state is anything other than + // ConnectionDisrupted or Connecting + if ( + [ + ConnectionState.Connected, + ConnectionState.ConnectedPendingDisconnect, + ConnectionState.ConnectedPendingKeepAlive, + ConnectionState.ConnectedPendingNetwork, + ConnectionState.ConnectionDisruptedPendingNetwork, + ConnectionState.Disconnected, + ].includes(connectionState) + ) { + this.reconnectionMonitor.record(ReconnectEvent.HALT_RECONNECT); + } + } + ); + } + + /** + * Mark the socket closed and release all active listeners + */ + close() { + // Mark the socket closed both in status and the connection monitor + this.socketStatus = SOCKET_STATUS.CLOSED; + this.connectionStateMonitor.record(CONNECTION_CHANGE.CONNECTION_FAILED); + + // Turn off the subscription monitor Hub publishing + this.connectionStateMonitorSubscription.unsubscribe(); + // Complete all reconnect observers + this.reconnectionMonitor.close(); + } + + getNewWebSocket(url: string, protocol: string) { + return new WebSocket(url, protocol); + } + + getProviderName() { + return 'AWSAppSyncRealTimeProvider'; + } + + // Check if url matches standard domain pattern + private isCustomDomain(url: string): boolean { + return url.match(standardDomainPattern) === null; + } + + subscribe( + options?: AWSAppSyncRealTimeProviderOptions, + customUserAgentDetails?: CustomUserAgentDetails + ): Observable> { + const { + appSyncGraphqlEndpoint, + region, + query, + variables, + authenticationType, + } = options; + + return new Observable(observer => { + if (!options || !appSyncGraphqlEndpoint) { + observer.error({ + errors: [ + { + ...new GraphQLError( + `Subscribe only available for AWS AppSync endpoint` + ), + }, + ], + }); + observer.complete(); + } else { + let subscriptionStartActive = false; + const subscriptionId = uuid(); + const startSubscription = () => { + if (!subscriptionStartActive) { + subscriptionStartActive = true; + + const startSubscriptionPromise = + this._startSubscriptionWithAWSAppSyncRealTime({ + options: { + query, + variables, + region, + authenticationType, + appSyncGraphqlEndpoint, + }, + observer, + subscriptionId, + customUserAgentDetails, + }).catch(err => { + logger.debug( + `${CONTROL_MSG.REALTIME_SUBSCRIPTION_INIT_ERROR}: ${err}` + ); + + this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED); + }); + startSubscriptionPromise.finally(() => { + subscriptionStartActive = false; + }); + } + }; + + let reconnectSubscription: ZenObservable.Subscription; + + // Add an observable to the reconnection list to manage reconnection for this subscription + reconnectSubscription = new Observable(observer => { + this.reconnectionMonitor.addObserver(observer); + }).subscribe(() => { + startSubscription(); + }); + + startSubscription(); + + return async () => { + // Cleanup reconnection subscription + reconnectSubscription?.unsubscribe(); + + // Cleanup after unsubscribing or observer.complete was called after _startSubscriptionWithAWSAppSyncRealTime + try { + // Waiting that subscription has been connected before trying to unsubscribe + await this._waitForSubscriptionToBeConnected(subscriptionId); + + const { subscriptionState } = + this.subscriptionObserverMap.get(subscriptionId) || {}; + + if (!subscriptionState) { + // subscription already unsubscribed + return; + } + + if (subscriptionState === SUBSCRIPTION_STATUS.CONNECTED) { + this._sendUnsubscriptionMessage(subscriptionId); + } else { + throw new Error('Subscription never connected'); + } + } catch (err) { + logger.debug(`Error while unsubscribing ${err}`); + } finally { + this._removeSubscriptionObserver(subscriptionId); + } + }; + } + }); + } + + private async _startSubscriptionWithAWSAppSyncRealTime({ + options, + observer, + subscriptionId, + customUserAgentDetails, + }: { + options: AWSAppSyncRealTimeProviderOptions; + observer: PubSubContentObserver; + subscriptionId: string; + customUserAgentDetails: CustomUserAgentDetails; + }) { + const { + appSyncGraphqlEndpoint, + authenticationType, + query, + variables, + apiKey, + region, + graphql_headers = () => ({}), + additionalHeaders = {}, + } = options; + + const subscriptionState: SUBSCRIPTION_STATUS = SUBSCRIPTION_STATUS.PENDING; + const data = { + query, + variables, + }; + // Having a subscription id map will make it simple to forward messages received + this.subscriptionObserverMap.set(subscriptionId, { + observer, + query: query ?? '', + variables: variables ?? {}, + subscriptionState, + startAckTimeoutId: undefined, + }); + + // Preparing payload for subscription message + + const dataString = JSON.stringify(data); + const headerObj = { + ...(await this._awsRealTimeHeaderBasedAuth({ + apiKey, + appSyncGraphqlEndpoint, + authenticationType, + payload: dataString, + canonicalUri: '', + region, + additionalHeaders, + })), + ...(await graphql_headers()), + ...additionalHeaders, + [USER_AGENT_HEADER]: getAmplifyUserAgent(customUserAgentDetails), + }; + + const subscriptionMessage = { + id: subscriptionId, + payload: { + data: dataString, + extensions: { + authorization: { + ...headerObj, + }, + }, + }, + type: MESSAGE_TYPES.GQL_START, + }; + + const stringToAWSRealTime = JSON.stringify(subscriptionMessage); + + try { + this.connectionStateMonitor.record(CONNECTION_CHANGE.OPENING_CONNECTION); + await this._initializeWebSocketConnection({ + apiKey, + appSyncGraphqlEndpoint, + authenticationType, + region, + additionalHeaders, + }); + } catch (err) { + this._logStartSubscriptionError(subscriptionId, observer, err); + return; + } + + // Potential race condition can occur when unsubscribe is called during _initializeWebSocketConnection. + // E.g.unsubscribe gets invoked prior to finishing WebSocket handshake or START_ACK. + // Both subscriptionFailedCallback and subscriptionReadyCallback are used to synchronized this. + + const { subscriptionFailedCallback, subscriptionReadyCallback } = + this.subscriptionObserverMap.get(subscriptionId) ?? {}; + + // This must be done before sending the message in order to be listening immediately + this.subscriptionObserverMap.set(subscriptionId, { + observer, + subscriptionState, + query: query ?? '', + variables: variables ?? {}, + subscriptionReadyCallback, + subscriptionFailedCallback, + startAckTimeoutId: setTimeout(() => { + this._timeoutStartSubscriptionAck.call(this, subscriptionId); + }, START_ACK_TIMEOUT), + }); + if (this.awsRealTimeSocket) { + this.awsRealTimeSocket.send(stringToAWSRealTime); + } + } + + // Log logic for start subscription failures + private _logStartSubscriptionError( + subscriptionId: string, + observer: PubSubContentObserver, + err: { message?: string } + ) { + logger.debug({ err }); + const message = String(err.message ?? ''); + // Resolving to give the state observer time to propogate the update + Promise.resolve( + this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED) + ); + + // Capture the error only when the network didn't cause disruption + if ( + this.connectionState !== ConnectionState.ConnectionDisruptedPendingNetwork + ) { + // When the error is non-retriable, error out the observable + if (isNonRetryableError(err)) { + observer.error({ + errors: [ + { + ...new GraphQLError( + `${CONTROL_MSG.CONNECTION_FAILED}: ${message}` + ), + }, + ], + }); + } else { + logger.debug(`${CONTROL_MSG.CONNECTION_FAILED}: ${message}`); + } + + const { subscriptionFailedCallback } = + this.subscriptionObserverMap.get(subscriptionId) || {}; + + // Notify concurrent unsubscription + if (typeof subscriptionFailedCallback === 'function') { + subscriptionFailedCallback(); + } + } + } + + // Waiting that subscription has been connected before trying to unsubscribe + private async _waitForSubscriptionToBeConnected(subscriptionId: string) { + const subscriptionObserver = + this.subscriptionObserverMap.get(subscriptionId); + if (subscriptionObserver) { + const { subscriptionState } = subscriptionObserver; + // This in case unsubscribe is invoked before sending start subscription message + if (subscriptionState === SUBSCRIPTION_STATUS.PENDING) { + return new Promise((res, rej) => { + const { observer, subscriptionState, variables, query } = + subscriptionObserver; + this.subscriptionObserverMap.set(subscriptionId, { + observer, + subscriptionState, + variables, + query, + subscriptionReadyCallback: res, + subscriptionFailedCallback: rej, + }); + }); + } + } + } + + private _sendUnsubscriptionMessage(subscriptionId: string) { + try { + if ( + this.awsRealTimeSocket && + this.awsRealTimeSocket.readyState === WebSocket.OPEN && + this.socketStatus === SOCKET_STATUS.READY + ) { + // Preparing unsubscribe message to stop receiving messages for that subscription + const unsubscribeMessage = { + id: subscriptionId, + type: MESSAGE_TYPES.GQL_STOP, + }; + const stringToAWSRealTime = JSON.stringify(unsubscribeMessage); + this.awsRealTimeSocket.send(stringToAWSRealTime); + } + } catch (err) { + // If GQL_STOP is not sent because of disconnection issue, then there is nothing the client can do + logger.debug({ err }); + } + } + + private _removeSubscriptionObserver(subscriptionId: string) { + this.subscriptionObserverMap.delete(subscriptionId); + + // Verifying 1000ms after removing subscription in case there are new subscription unmount/mount + setTimeout(this._closeSocketIfRequired.bind(this), 1000); + } + + private _closeSocketIfRequired() { + if (this.subscriptionObserverMap.size > 0) { + // Active subscriptions on the WebSocket + return; + } + + if (!this.awsRealTimeSocket) { + this.socketStatus = SOCKET_STATUS.CLOSED; + return; + } + + this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSING_CONNECTION); + + if (this.awsRealTimeSocket.bufferedAmount > 0) { + // Still data on the WebSocket + setTimeout(this._closeSocketIfRequired.bind(this), 1000); + } else { + logger.debug('closing WebSocket...'); + if (this.keepAliveTimeoutId) { + clearTimeout(this.keepAliveTimeoutId); + } + if (this.keepAliveAlertTimeoutId) { + clearTimeout(this.keepAliveAlertTimeoutId); + } + const tempSocket = this.awsRealTimeSocket; + // Cleaning callbacks to avoid race condition, socket still exists + tempSocket.onclose = null; + tempSocket.onerror = null; + tempSocket.close(1000); + this.awsRealTimeSocket = undefined; + this.socketStatus = SOCKET_STATUS.CLOSED; + this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED); + } + } + + private _handleIncomingSubscriptionMessage(message: MessageEvent) { + if (typeof message.data !== 'string') { + return; + } + logger.debug( + `subscription message from AWS AppSync RealTime: ${message.data}` + ); + const { + id = '', + payload, + type, + }: DataPayload = JSON.parse(String(message.data)); + const { + observer = null, + query = '', + variables = {}, + startAckTimeoutId, + subscriptionReadyCallback, + subscriptionFailedCallback, + } = this.subscriptionObserverMap.get(id) || {}; + + logger.debug({ id, observer, query, variables }); + + if (type === MESSAGE_TYPES.GQL_DATA && payload && payload.data) { + if (observer) { + observer.next(payload); + } else { + logger.debug(`observer not found for id: ${id}`); + } + return; + } + + if (type === MESSAGE_TYPES.GQL_START_ACK) { + logger.debug( + `subscription ready for ${JSON.stringify({ query, variables })}` + ); + if (typeof subscriptionReadyCallback === 'function') { + subscriptionReadyCallback(); + } + if (startAckTimeoutId) clearTimeout(startAckTimeoutId); + // dispatchApiEvent( + // CONTROL_MSG.SUBSCRIPTION_ACK, + // { query, variables }, + // 'Connection established for subscription' + // ); + const subscriptionState = SUBSCRIPTION_STATUS.CONNECTED; + if (observer) { + this.subscriptionObserverMap.set(id, { + observer, + query, + variables, + startAckTimeoutId: undefined, + subscriptionState, + subscriptionReadyCallback, + subscriptionFailedCallback, + }); + } + this.connectionStateMonitor.record( + CONNECTION_CHANGE.CONNECTION_ESTABLISHED + ); + + return; + } + + if (type === MESSAGE_TYPES.GQL_CONNECTION_KEEP_ALIVE) { + if (this.keepAliveTimeoutId) clearTimeout(this.keepAliveTimeoutId); + if (this.keepAliveAlertTimeoutId) + clearTimeout(this.keepAliveAlertTimeoutId); + this.keepAliveTimeoutId = setTimeout( + () => this._errorDisconnect(CONTROL_MSG.TIMEOUT_DISCONNECT), + this.keepAliveTimeout + ); + this.keepAliveAlertTimeoutId = setTimeout(() => { + this.connectionStateMonitor.record(CONNECTION_CHANGE.KEEP_ALIVE_MISSED); + }, DEFAULT_KEEP_ALIVE_ALERT_TIMEOUT); + this.connectionStateMonitor.record(CONNECTION_CHANGE.KEEP_ALIVE); + return; + } + + if (type === MESSAGE_TYPES.GQL_ERROR) { + const subscriptionState = SUBSCRIPTION_STATUS.FAILED; + if (observer) { + this.subscriptionObserverMap.set(id, { + observer, + query, + variables, + startAckTimeoutId, + subscriptionReadyCallback, + subscriptionFailedCallback, + subscriptionState, + }); + + logger.debug( + `${CONTROL_MSG.CONNECTION_FAILED}: ${JSON.stringify(payload)}` + ); + + observer.error({ + errors: [ + { + ...new GraphQLError( + `${CONTROL_MSG.CONNECTION_FAILED}: ${JSON.stringify(payload)}` + ), + }, + ], + }); + + if (startAckTimeoutId) clearTimeout(startAckTimeoutId); + + if (typeof subscriptionFailedCallback === 'function') { + subscriptionFailedCallback(); + } + } + } + } + + private _errorDisconnect(msg: string) { + logger.debug(`Disconnect error: ${msg}`); + + if (this.awsRealTimeSocket) { + this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED); + this.awsRealTimeSocket.close(); + } + + this.socketStatus = SOCKET_STATUS.CLOSED; + } + + private _timeoutStartSubscriptionAck(subscriptionId: string) { + const subscriptionObserver = + this.subscriptionObserverMap.get(subscriptionId); + if (subscriptionObserver) { + const { observer, query, variables } = subscriptionObserver; + if (!observer) { + return; + } + this.subscriptionObserverMap.set(subscriptionId, { + observer, + query, + variables, + subscriptionState: SUBSCRIPTION_STATUS.FAILED, + }); + + this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED); + logger.debug( + 'timeoutStartSubscription', + JSON.stringify({ query, variables }) + ); + } + } + + private _initializeWebSocketConnection({ + appSyncGraphqlEndpoint, + authenticationType, + apiKey, + region, + additionalHeaders, + }: AWSAppSyncRealTimeProviderOptions) { + if (this.socketStatus === SOCKET_STATUS.READY) { + return; + } + return new Promise(async (res, rej) => { + this.promiseArray.push({ res, rej }); + + if (this.socketStatus === SOCKET_STATUS.CLOSED) { + try { + this.socketStatus = SOCKET_STATUS.CONNECTING; + + const payloadString = '{}'; + + const authHeader = await this._awsRealTimeHeaderBasedAuth({ + authenticationType, + payload: payloadString, + canonicalUri: '/connect', + apiKey, + appSyncGraphqlEndpoint, + region, + additionalHeaders, + }); + + const headerString = authHeader ? JSON.stringify(authHeader) : ''; + const headerQs = Buffer.from(headerString).toString('base64'); + + const payloadQs = Buffer.from(payloadString).toString('base64'); + + let discoverableEndpoint = appSyncGraphqlEndpoint ?? ''; + + if (this.isCustomDomain(discoverableEndpoint)) { + discoverableEndpoint = + discoverableEndpoint.concat(customDomainPath); + } else { + discoverableEndpoint = discoverableEndpoint + .replace('appsync-api', 'appsync-realtime-api') + .replace('gogi-beta', 'grt-beta'); + } + + // Creating websocket url with required query strings + const protocol = 'wss://'; + discoverableEndpoint = discoverableEndpoint + .replace('https://', protocol) + .replace('http://', protocol); + + const awsRealTimeUrl = `${discoverableEndpoint}?header=${headerQs}&payload=${payloadQs}`; + + await this._initializeRetryableHandshake(awsRealTimeUrl); + + this.promiseArray.forEach(({ res }) => { + logger.debug('Notifying connection successful'); + res(); + }); + this.socketStatus = SOCKET_STATUS.READY; + this.promiseArray = []; + } catch (err) { + logger.debug('Connection exited with', err); + this.promiseArray.forEach(({ rej }) => rej(err)); + this.promiseArray = []; + if ( + this.awsRealTimeSocket && + this.awsRealTimeSocket.readyState === WebSocket.OPEN + ) { + this.awsRealTimeSocket.close(3001); + } + this.awsRealTimeSocket = undefined; + this.socketStatus = SOCKET_STATUS.CLOSED; + } + } + }); + } + + private async _initializeRetryableHandshake(awsRealTimeUrl: string) { + logger.debug(`Initializaling retryable Handshake`); + await jitteredExponentialRetry( + this._initializeHandshake.bind(this), + [awsRealTimeUrl], + MAX_DELAY_MS + ); + } + + private async _initializeHandshake(awsRealTimeUrl: string) { + logger.debug(`Initializing handshake ${awsRealTimeUrl}`); + // Because connecting the socket is async, is waiting until connection is open + // Step 1: connect websocket + try { + await (() => { + return new Promise((res, rej) => { + const newSocket = this.getNewWebSocket(awsRealTimeUrl, 'graphql-ws'); + newSocket.onerror = () => { + logger.debug(`WebSocket connection error`); + }; + newSocket.onclose = () => { + rej(new Error('Connection handshake error')); + }; + newSocket.onopen = () => { + this.awsRealTimeSocket = newSocket; + return res(); + }; + }); + })(); + // Step 2: wait for ack from AWS AppSyncReaTime after sending init + await (() => { + return new Promise((res, rej) => { + if (this.awsRealTimeSocket) { + let ackOk = false; + this.awsRealTimeSocket.onerror = error => { + logger.debug(`WebSocket error ${JSON.stringify(error)}`); + }; + this.awsRealTimeSocket.onclose = event => { + logger.debug(`WebSocket closed ${event.reason}`); + rej(new Error(JSON.stringify(event))); + }; + + this.awsRealTimeSocket.onmessage = (message: MessageEvent) => { + if (typeof message.data !== 'string') { + return; + } + logger.debug( + `subscription message from AWS AppSyncRealTime: ${message.data} ` + ); + const data = JSON.parse(message.data) as ParsedMessagePayload; + const { + type, + payload: { + connectionTimeoutMs = DEFAULT_KEEP_ALIVE_TIMEOUT, + } = {}, + } = data; + if (type === MESSAGE_TYPES.GQL_CONNECTION_ACK) { + ackOk = true; + if (this.awsRealTimeSocket) { + this.keepAliveTimeout = connectionTimeoutMs; + this.awsRealTimeSocket.onmessage = + this._handleIncomingSubscriptionMessage.bind(this); + this.awsRealTimeSocket.onerror = err => { + logger.debug(err); + this._errorDisconnect(CONTROL_MSG.CONNECTION_CLOSED); + }; + this.awsRealTimeSocket.onclose = event => { + logger.debug(`WebSocket closed ${event.reason}`); + this._errorDisconnect(CONTROL_MSG.CONNECTION_CLOSED); + }; + } + res('Cool, connected to AWS AppSyncRealTime'); + return; + } + + if (type === MESSAGE_TYPES.GQL_CONNECTION_ERROR) { + const { + payload: { + errors: [{ errorType = '', errorCode = 0 } = {}] = [], + } = {}, + } = data; + + rej({ errorType, errorCode }); + } + }; + + const gqlInit = { + type: MESSAGE_TYPES.GQL_CONNECTION_INIT, + }; + this.awsRealTimeSocket.send(JSON.stringify(gqlInit)); + + const checkAckOk = (ackOk: boolean) => { + if (!ackOk) { + this.connectionStateMonitor.record( + CONNECTION_CHANGE.CONNECTION_FAILED + ); + rej( + new Error( + `Connection timeout: ack from AWSAppSyncRealTime was not received after ${CONNECTION_INIT_TIMEOUT} ms` + ) + ); + } + }; + + setTimeout(() => checkAckOk(ackOk), CONNECTION_INIT_TIMEOUT); + } + }); + })(); + } catch (err) { + const { errorType, errorCode } = err as { + errorType: string; + errorCode: number; + }; + + if (NON_RETRYABLE_CODES.includes(errorCode)) { + throw new NonRetryableError(errorType); + } else if (errorType) { + throw new Error(errorType); + } else { + throw err; + } + } + } + + private async _awsRealTimeHeaderBasedAuth({ + authenticationType, + payload, + canonicalUri, + appSyncGraphqlEndpoint, + region, + additionalHeaders, + }: AWSAppSyncRealTimeAuthInput): Promise< + Record | undefined + > { + debugger; + const headerHandler: { + [key: string]: (AWSAppSyncRealTimeAuthInput) => {}; + } = { + apiKey: this._awsRealTimeApiKeyHeader.bind(this), + iam: this._awsRealTimeIAMHeader.bind(this), + jwt: this._awsRealTimeOPENIDHeader.bind(this), + custom: this._customAuthHeader, + }; + + if (!authenticationType || !headerHandler[authenticationType.type]) { + logger.debug(`Authentication type ${authenticationType} not supported`); + return undefined; + } else { + const handler = headerHandler[authenticationType.type]; + + const { host } = url.parse(appSyncGraphqlEndpoint ?? ''); + + logger.debug(`Authenticating with ${authenticationType}`); + let apiKey; + if (authenticationType.type === 'apiKey') { + apiKey = authenticationType.apiKey; + } + const result = await handler({ + payload, + canonicalUri, + appSyncGraphqlEndpoint, + apiKey, + region, + host, + additionalHeaders, + }); + + return result; + } + } + + private async _awsRealTimeCUPHeader({ host }: AWSAppSyncRealTimeAuthInput) { + const session = await fetchAuthSession(); + return { + Authorization: session.tokens.accessToken.toString(), + host, + }; + } + + private async _awsRealTimeOPENIDHeader({ + host, + }: AWSAppSyncRealTimeAuthInput) { + const session = await fetchAuthSession(); + + return { + Authorization: session.tokens.accessToken.toString(), + host, + }; + } + + private async _awsRealTimeApiKeyHeader({ + apiKey, + host, + }: AWSAppSyncRealTimeAuthInput) { + const dt = new Date(); + const dtStr = dt.toISOString().replace(/[:\-]|\.\d{3}/g, ''); + + return { + host, + 'x-amz-date': dtStr, + 'x-api-key': apiKey, + }; + } + + private async _awsRealTimeIAMHeader({ + payload, + canonicalUri, + appSyncGraphqlEndpoint, + region, + }: AWSAppSyncRealTimeAuthInput) { + const endpointInfo = { + region, + service: 'appsync', + }; + + const creds = (await fetchAuthSession()).credentials; + + const request = { + url: `${appSyncGraphqlEndpoint}${canonicalUri}`, + data: payload, + method: 'POST', + headers: { ...AWS_APPSYNC_REALTIME_HEADERS }, + }; + + const signed_params = signRequest( + { + headers: request.headers, + method: request.method, + url: new URL(request.url), + body: request.data, + }, + { + credentials: creds, + signingRegion: endpointInfo.region, + signingService: endpointInfo.service, + } + ); + return signed_params.headers; + } + + private _customAuthHeader({ + host, + additionalHeaders, + }: AWSAppSyncRealTimeAuthInput) { + if (!additionalHeaders || !additionalHeaders['Authorization']) { + throw new Error('No auth token specified'); + } + + return { + Authorization: additionalHeaders.Authorization, + host, + }; + } +} diff --git a/packages/api-graphql/src/Providers/constants.ts b/packages/api-graphql/src/Providers/constants.ts new file mode 100644 index 00000000000..cda958b7f6a --- /dev/null +++ b/packages/api-graphql/src/Providers/constants.ts @@ -0,0 +1,110 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +export { AMPLIFY_SYMBOL } from '@aws-amplify/core/internals/utils'; + +export const MAX_DELAY_MS = 5000; + +export const NON_RETRYABLE_CODES = [400, 401, 403]; + +export const CONNECTION_STATE_CHANGE = 'ConnectionStateChange'; + +export enum MESSAGE_TYPES { + /** + * Client -> Server message. + * This message type is the first message after handshake and this will initialize AWS AppSync RealTime communication + */ + GQL_CONNECTION_INIT = 'connection_init', + /** + * Server -> Client message + * This message type is in case there is an issue with AWS AppSync RealTime when establishing connection + */ + GQL_CONNECTION_ERROR = 'connection_error', + /** + * Server -> Client message. + * This message type is for the ack response from AWS AppSync RealTime for GQL_CONNECTION_INIT message + */ + GQL_CONNECTION_ACK = 'connection_ack', + /** + * Client -> Server message. + * This message type is for register subscriptions with AWS AppSync RealTime + */ + GQL_START = 'start', + /** + * Server -> Client message. + * This message type is for the ack response from AWS AppSync RealTime for GQL_START message + */ + GQL_START_ACK = 'start_ack', + /** + * Server -> Client message. + * This message type is for subscription message from AWS AppSync RealTime + */ + GQL_DATA = 'data', + /** + * Server -> Client message. + * This message type helps the client to know is still receiving messages from AWS AppSync RealTime + */ + GQL_CONNECTION_KEEP_ALIVE = 'ka', + /** + * Client -> Server message. + * This message type is for unregister subscriptions with AWS AppSync RealTime + */ + GQL_STOP = 'stop', + /** + * Server -> Client message. + * This message type is for the ack response from AWS AppSync RealTime for GQL_STOP message + */ + GQL_COMPLETE = 'complete', + /** + * Server -> Client message. + * This message type is for sending error messages from AWS AppSync RealTime to the client + */ + GQL_ERROR = 'error', // Server -> Client +} + +export enum SUBSCRIPTION_STATUS { + PENDING, + CONNECTED, + FAILED, +} + +export enum SOCKET_STATUS { + CLOSED, + READY, + CONNECTING, +} + +export const AWS_APPSYNC_REALTIME_HEADERS = { + accept: 'application/json, text/javascript', + 'content-encoding': 'amz-1.0', + 'content-type': 'application/json; charset=UTF-8', +}; + +/** + * Time in milleseconds to wait for GQL_CONNECTION_INIT message + */ +export const CONNECTION_INIT_TIMEOUT = 15000; + +/** + * Time in milleseconds to wait for GQL_START_ACK message + */ +export const START_ACK_TIMEOUT = 15000; + +/** + * Default Time in milleseconds to wait for GQL_CONNECTION_KEEP_ALIVE message + */ +export const DEFAULT_KEEP_ALIVE_TIMEOUT = 5 * 60 * 1000; + +/** + * Default Time in milleseconds to alert for missed GQL_CONNECTION_KEEP_ALIVE message + */ +export const DEFAULT_KEEP_ALIVE_ALERT_TIMEOUT = 65 * 1000; + +/** + * Default delay time in milleseconds between when reconnect is triggered vs when it is attempted + */ +export const RECONNECT_DELAY = 5 * 1000; + +/** + * Default interval time in milleseconds between when reconnect is re-attempted + */ +export const RECONNECT_INTERVAL = 60 * 1000; diff --git a/packages/api-graphql/src/index.ts b/packages/api-graphql/src/index.ts index e69d50f6dd7..dd2d4acf836 100644 --- a/packages/api-graphql/src/index.ts +++ b/packages/api-graphql/src/index.ts @@ -1,8 +1,10 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 -import { GraphQLAPI } from './GraphQLAPI'; -export { GraphQLResult, GraphQLAuthError, GRAPHQL_AUTH_MODE } from './types'; +// import { graphqlSubscription } from './GraphQLAPI'; + +// export { GraphQLResult, GraphQLAuthError, GRAPHQL_AUTH_MODE } from './types'; export { GraphQLAPI, GraphQLAPIClass, graphqlOperation } from './GraphQLAPI'; -export * from './types'; -export default GraphQLAPI; +// export * from './types'; + +// export { graphqlSubscription }; diff --git a/packages/api-graphql/src/internals/InternalGraphQLAPI.ts b/packages/api-graphql/src/internals/InternalGraphQLAPI.ts index 619f229c1e9..bd35e3f15ef 100644 --- a/packages/api-graphql/src/internals/InternalGraphQLAPI.ts +++ b/packages/api-graphql/src/internals/InternalGraphQLAPI.ts @@ -15,6 +15,7 @@ import { import Observable from 'zen-observable-ts'; // TODO V6 import { + Amplify, // Amplify, // ConsoleLogger as Logger, // Credentials, @@ -43,6 +44,7 @@ import { } from '../types'; // import { RestClient } from '@aws-amplify/api-rest'; import { post } from '@aws-amplify/api-rest'; +import { AWSAppSyncRealTimeProvider } from '../Providers/AWSAppSyncRealTimeProvider'; const USER_AGENT_HEADER = 'x-amz-user-agent'; @@ -69,6 +71,7 @@ export class InternalGraphQLAPIClass { */ private _options; private _api = null; + private appSyncRealTime: AWSAppSyncRealTimeProvider | null; // TODO V6: can be removed // InternalAuth = InternalAuth; @@ -172,10 +175,10 @@ export class InternalGraphQLAPIClass { break; // NOTHING HERE case 'AWS_IAM': - const credentialsOK = await this._ensureCredentials(); - if (!credentialsOK) { - throw new Error(GraphQLAuthError.NO_CREDENTIALS); - } + // const credentialsOK = await this._ensureCredentials(); + // if (!credentialsOK) { + // throw new Error(GraphQLAuthError.NO_CREDENTIALS); + // } break; case 'OPENID_CONNECT': try { @@ -310,12 +313,12 @@ export class InternalGraphQLAPIClass { // cancellableToken // ); return responsePromise; - // case 'subscription': - // return this._graphqlSubscribe( - // { query, variables, authMode }, - // headers, - // customUserAgentDetails - // ); + case 'subscription': + return this._graphqlSubscribe( + { query, variables, authMode }, + headers, + customUserAgentDetails + ); default: throw new Error(`invalid operation type: ${operationType}`); } @@ -453,69 +456,72 @@ export class InternalGraphQLAPIClass { // return this._api.hasCancelToken(request); // } - // private _graphqlSubscribe( - // { - // query, - // variables, - // authMode: defaultAuthenticationType, - // authToken, - // }: GraphQLOptions, - // additionalHeaders = {}, - // customUserAgentDetails?: CustomUserAgentDetails - // ): Observable { - // const { - // aws_appsync_region: region, - // aws_appsync_graphqlEndpoint: appSyncGraphqlEndpoint, - // aws_appsync_authenticationType, - // aws_appsync_apiKey: apiKey, - // graphql_headers = () => ({}), - // } = this._options; - // const authenticationType = - // defaultAuthenticationType || aws_appsync_authenticationType || 'AWS_IAM'; - - // if (InternalPubSub && typeof InternalPubSub.subscribe === 'function') { - // return InternalPubSub.subscribe( - // '', - // { - // provider: INTERNAL_AWS_APPSYNC_REALTIME_PUBSUB_PROVIDER, - // appSyncGraphqlEndpoint, - // authenticationType, - // apiKey, - // query: print(query as DocumentNode), - // region, - // variables, - // graphql_headers, - // additionalHeaders, - // authToken, - // }, - // customUserAgentDetails - // ); - // } else { - // logger.debug('No pubsub module applied for subscription'); - // throw new Error('No pubsub module applied for subscription'); - // } - // } - - /** - * @private - */ - async _ensureCredentials() { - // return this.Credentials.get() - return await fetchAuthSession() - .then(credentials => { - if (!credentials) return false; - // TODO V6 - const cred = this.Credentials.shear(credentials); - logger.debug('set credentials for api', cred); - - return true; - }) - .catch(err => { - logger.warn('ensure credentials error', err); - return false; + private _graphqlSubscribe( + { + query, + variables, + authMode: defaultAuthenticationType, + authToken, + }: GraphQLOptions, + additionalHeaders = {}, + customUserAgentDetails?: CustomUserAgentDetails + ): Observable { + if (!this.appSyncRealTime) { + const { AppSync } = Amplify.getConfig().API ?? {}; + + this.appSyncRealTime = new AWSAppSyncRealTimeProvider(); + + return this.appSyncRealTime.subscribe({ + query: print(query as DocumentNode), + variables, + appSyncGraphqlEndpoint: AppSync.endpoint, + region: AppSync.region, + authenticationType: AppSync.defaultAuthMode, }); + } } + // if (InternalPubSub && typeof InternalPubSub.subscribe === 'function') { + // return InternalPubSub.subscribe( + // '', + // { + // provider: INTERNAL_AWS_APPSYNC_REALTIME_PUBSUB_PROVIDER, + // appSyncGraphqlEndpoint, + // authenticationType, + // apiKey, + // query: print(query as DocumentNode), + // region, + // variables, + // graphql_headers, + // additionalHeaders, + // authToken, + // }, + // customUserAgentDetails + // ); + // } else { + // logger.debug('No pubsub module applied for subscription'); + // throw new Error('No pubsub module applied for subscription'); + // } } +/** + * @private + */ +// async _ensureCredentials() { +// // return this.Credentials.get() +// return await fetchAuthSession() +// .then(credentials => { +// if (!credentials) return false; +// // TODO V6 +// const cred = this.Credentials.shear(credentials); +// logger.debug('set credentials for api', cred); + +// return true; +// }) +// .catch(err => { +// logger.warn('ensure credentials error', err); +// return false; +// }); +// } + export const InternalGraphQLAPI = new InternalGraphQLAPIClass(null); // Amplify.register(InternalGraphQLAPI); diff --git a/packages/pubsub/src/types/PubSub.ts b/packages/api-graphql/src/types/PubSub.ts similarity index 100% rename from packages/pubsub/src/types/PubSub.ts rename to packages/api-graphql/src/types/PubSub.ts diff --git a/packages/api-graphql/src/types/index.ts b/packages/api-graphql/src/types/index.ts index e89efd2c0a2..d56329da876 100644 --- a/packages/api-graphql/src/types/index.ts +++ b/packages/api-graphql/src/types/index.ts @@ -2,19 +2,21 @@ // SPDX-License-Identifier: Apache-2.0 import { Source, DocumentNode, GraphQLError } from 'graphql'; export { OperationTypeNode } from 'graphql'; -import { GRAPHQL_AUTH_MODE } from '@aws-amplify/auth'; -export { GRAPHQL_AUTH_MODE }; +// import { GRAPHQL_AUTH_MODE } from '@aws-amplify/auth'; +// export { GRAPHQL_AUTH_MODE }; import { Observable } from 'zen-observable-ts'; -// PubSub does not currently compile in V6, and will be replaced: +// TODO: remove for now: // import { AWSAppSyncRealTimeProvider } from '@aws-amplify/pubsub'; +type GraphQLAuthMode = 'AWS_IAM' | 'COGNITO_USERPOOLS' | 'API_KEY'; + /** * Loose/Unknown options for raw GraphQLAPICategory `graphql()`. */ export interface GraphQLOptions { query: string | DocumentNode; - variables?: object; - authMode?: keyof typeof GRAPHQL_AUTH_MODE; + variables?: Record; + authMode?: GraphQLAuthMode; authToken?: string; /** * @deprecated This property should not be used @@ -84,6 +86,23 @@ export type GraphqlSubscriptionMessage = { value: { data?: T }; }; +export interface AWSAppSyncRealTimeProviderOptions { + appSyncGraphqlEndpoint?: string; + authenticationType?: GraphQLAuthMode; + query?: string; + variables?: Record; + apiKey?: string; + region?: string; + graphql_headers?: () => {} | (() => Promise<{}>); + additionalHeaders?: { [key: string]: string }; +} + +export type AWSAppSyncRealTimeProvider = { + subscribe( + options?: AWSAppSyncRealTimeProviderOptions + ): Observable>; +}; + export enum GraphQLAuthError { NO_API_KEY = 'No api-key configured', NO_CURRENT_USER = 'No current user', @@ -108,7 +127,7 @@ export interface GraphQLOptionsV6< > { query: TYPED_GQL_STRING | DocumentNode; variables?: GraphQLVariablesV6; - authMode?: keyof typeof GRAPHQL_AUTH_MODE; + authMode?: GraphQLAuthMode; authToken?: string; /** * @deprecated This property should not be used diff --git a/packages/api-graphql/src/utils/ConnectionStateMonitor.ts b/packages/api-graphql/src/utils/ConnectionStateMonitor.ts new file mode 100644 index 00000000000..b506006121d --- /dev/null +++ b/packages/api-graphql/src/utils/ConnectionStateMonitor.ts @@ -0,0 +1,195 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +import Observable, { ZenObservable } from 'zen-observable-ts'; +import { ConnectionState } from '../types/PubSub'; +import { ReachabilityMonitor } from './ReachabilityMonitor'; + +// Internal types for tracking different connection states +type LinkedConnectionState = 'connected' | 'disconnected'; +type LinkedHealthState = 'healthy' | 'unhealthy'; +type LinkedConnectionStates = { + networkState: LinkedConnectionState; + connectionState: LinkedConnectionState | 'connecting'; + intendedConnectionState: LinkedConnectionState; + keepAliveState: LinkedHealthState; +}; + +export const CONNECTION_CHANGE: { + [key in + | 'KEEP_ALIVE_MISSED' + | 'KEEP_ALIVE' + | 'CONNECTION_ESTABLISHED' + | 'CONNECTION_FAILED' + | 'CLOSING_CONNECTION' + | 'OPENING_CONNECTION' + | 'CLOSED' + | 'ONLINE' + | 'OFFLINE']: Partial; +} = { + KEEP_ALIVE_MISSED: { keepAliveState: 'unhealthy' }, + KEEP_ALIVE: { keepAliveState: 'healthy' }, + CONNECTION_ESTABLISHED: { connectionState: 'connected' }, + CONNECTION_FAILED: { + intendedConnectionState: 'disconnected', + connectionState: 'disconnected', + }, + CLOSING_CONNECTION: { intendedConnectionState: 'disconnected' }, + OPENING_CONNECTION: { + intendedConnectionState: 'connected', + connectionState: 'connecting', + }, + CLOSED: { connectionState: 'disconnected' }, + ONLINE: { networkState: 'connected' }, + OFFLINE: { networkState: 'disconnected' }, +}; + +export class ConnectionStateMonitor { + /** + * @private + */ + private _linkedConnectionState: LinkedConnectionStates; + private _linkedConnectionStateObservable: Observable; + private _linkedConnectionStateObserver: ZenObservable.SubscriptionObserver; + private _networkMonitoringSubscription?: ZenObservable.Subscription; + private _initialNetworkStateSubscription?: ZenObservable.Subscription; + + constructor() { + this._networkMonitoringSubscription = undefined; + this._linkedConnectionState = { + networkState: 'connected', + connectionState: 'disconnected', + intendedConnectionState: 'disconnected', + keepAliveState: 'healthy', + }; + + // Attempt to update the state with the current actual network state + this._initialNetworkStateSubscription = ReachabilityMonitor().subscribe( + ({ online }) => { + this.record( + online ? CONNECTION_CHANGE.ONLINE : CONNECTION_CHANGE.OFFLINE + ); + this._initialNetworkStateSubscription?.unsubscribe(); + } + ); + + this._linkedConnectionStateObservable = + new Observable(connectionStateObserver => { + connectionStateObserver.next(this._linkedConnectionState); + this._linkedConnectionStateObserver = connectionStateObserver; + }); + } + + /** + * Turn network state monitoring on if it isn't on already + */ + private enableNetworkMonitoring() { + // If no initial network state was discovered, stop trying + this._initialNetworkStateSubscription?.unsubscribe(); + + // Maintain the network state based on the reachability monitor + if (this._networkMonitoringSubscription === undefined) { + this._networkMonitoringSubscription = ReachabilityMonitor().subscribe( + ({ online }) => { + this.record( + online ? CONNECTION_CHANGE.ONLINE : CONNECTION_CHANGE.OFFLINE + ); + } + ); + } + } + + /** + * Turn network state monitoring off if it isn't off already + */ + private disableNetworkMonitoring() { + this._networkMonitoringSubscription?.unsubscribe(); + this._networkMonitoringSubscription = undefined; + } + + /** + * Get the observable that allows us to monitor the connection state + * + * @returns {Observable} - The observable that emits ConnectionState updates + */ + public get connectionStateObservable(): Observable { + let previous: ConnectionState; + + // The linked state aggregates state changes to any of the network, connection, + // intendedConnection and keepAliveHealth. Some states will change these independent + // states without changing the overall connection state. + + // After translating from linked states to ConnectionState, then remove any duplicates + return this._linkedConnectionStateObservable + .map(value => { + return this.connectionStatesTranslator(value); + }) + .filter(current => { + const toInclude = current !== previous; + previous = current; + return toInclude; + }); + } + + /* + * Updates local connection state and emits the full state to the observer. + */ + record(statusUpdates: Partial) { + // Maintain the network monitor + if (statusUpdates.intendedConnectionState === 'connected') { + this.enableNetworkMonitoring(); + } else if (statusUpdates.intendedConnectionState === 'disconnected') { + this.disableNetworkMonitoring(); + } + + // Maintain the socket state + const newSocketStatus = { + ...this._linkedConnectionState, + ...statusUpdates, + }; + + this._linkedConnectionState = { ...newSocketStatus }; + + this._linkedConnectionStateObserver.next(this._linkedConnectionState); + } + + /* + * Translate the ConnectionState structure into a specific ConnectionState string literal union + */ + private connectionStatesTranslator({ + connectionState, + networkState, + intendedConnectionState, + keepAliveState, + }: LinkedConnectionStates): ConnectionState { + if (connectionState === 'connected' && networkState === 'disconnected') + return ConnectionState.ConnectedPendingNetwork; + + if ( + connectionState === 'connected' && + intendedConnectionState === 'disconnected' + ) + return ConnectionState.ConnectedPendingDisconnect; + + if ( + connectionState === 'disconnected' && + intendedConnectionState === 'connected' && + networkState === 'disconnected' + ) + return ConnectionState.ConnectionDisruptedPendingNetwork; + + if ( + connectionState === 'disconnected' && + intendedConnectionState === 'connected' + ) + return ConnectionState.ConnectionDisrupted; + + if (connectionState === 'connected' && keepAliveState === 'unhealthy') + return ConnectionState.ConnectedPendingKeepAlive; + + // All remaining states directly correspond to the connection state + if (connectionState === 'connecting') return ConnectionState.Connecting; + if (connectionState === 'disconnected') return ConnectionState.Disconnected; + return ConnectionState.Connected; + } +} diff --git a/packages/api-graphql/src/utils/ReachabilityMonitor/index.native.ts b/packages/api-graphql/src/utils/ReachabilityMonitor/index.native.ts new file mode 100644 index 00000000000..540d7da67ec --- /dev/null +++ b/packages/api-graphql/src/utils/ReachabilityMonitor/index.native.ts @@ -0,0 +1,7 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +import { Reachability } from '@aws-amplify/core/internals/utils'; +import { default as NetInfo } from '@react-native-community/netinfo'; + +export const ReachabilityMonitor = () => + new Reachability().networkMonitor(NetInfo); diff --git a/packages/api-graphql/src/utils/ReachabilityMonitor/index.ts b/packages/api-graphql/src/utils/ReachabilityMonitor/index.ts new file mode 100644 index 00000000000..1f17e311524 --- /dev/null +++ b/packages/api-graphql/src/utils/ReachabilityMonitor/index.ts @@ -0,0 +1,4 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +import { Reachability } from '@aws-amplify/core/internals/utils'; +export const ReachabilityMonitor = () => new Reachability().networkMonitor(); diff --git a/packages/api-graphql/src/utils/ReconnectionMonitor.ts b/packages/api-graphql/src/utils/ReconnectionMonitor.ts new file mode 100644 index 00000000000..fd89f51f8c1 --- /dev/null +++ b/packages/api-graphql/src/utils/ReconnectionMonitor.ts @@ -0,0 +1,76 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +import { Observer } from 'zen-observable-ts'; +import { RECONNECT_DELAY, RECONNECT_INTERVAL } from '../Providers/constants'; + +export enum ReconnectEvent { + START_RECONNECT = 'START_RECONNECT', + HALT_RECONNECT = 'HALT_RECONNECT', +} + +/** + * Captures the reconnect event logic used to determine when to reconnect to PubSub providers. + * Reconnnect attempts are delayed by 5 seconds to let the interface settle. + * Attempting to reconnect only once creates unrecoverable states when the network state isn't + * supported by the browser, so this keeps retrying every minute until halted. + */ +export class ReconnectionMonitor { + private reconnectObservers: Observer[] = []; + private reconnectIntervalId?: ReturnType; + private reconnectSetTimeoutId?: ReturnType; + + /** + * Add reconnect observer to the list of observers to alert on reconnect + */ + addObserver(reconnectObserver: Observer) { + this.reconnectObservers.push(reconnectObserver); + } + + /** + * Given a reconnect event, start the appropriate behavior + */ + record(event: ReconnectEvent) { + if (event === ReconnectEvent.START_RECONNECT) { + // If the reconnection hasn't been started + if ( + this.reconnectSetTimeoutId === undefined && + this.reconnectIntervalId === undefined + ) { + this.reconnectSetTimeoutId = setTimeout(() => { + // Reconnect now + this._triggerReconnect(); + // Retry reconnect every periodically until it works + this.reconnectIntervalId = setInterval(() => { + this._triggerReconnect(); + }, RECONNECT_INTERVAL); + }, RECONNECT_DELAY); + } + } + + if (event === ReconnectEvent.HALT_RECONNECT) { + if (this.reconnectIntervalId) { + clearInterval(this.reconnectIntervalId); + this.reconnectIntervalId = undefined; + } + if (this.reconnectSetTimeoutId) { + clearTimeout(this.reconnectSetTimeoutId); + this.reconnectSetTimeoutId = undefined; + } + } + } + + /** + * Complete all reconnect observers + */ + close() { + this.reconnectObservers.forEach(reconnectObserver => { + reconnectObserver.complete?.(); + }); + } + + private _triggerReconnect() { + this.reconnectObservers.forEach(reconnectObserver => { + reconnectObserver.next?.(); + }); + } +} diff --git a/packages/api-graphql/src/utils/errors/validation.ts b/packages/api-graphql/src/utils/errors/validation.ts index 229d952f3b8..37104e09d2e 100644 --- a/packages/api-graphql/src/utils/errors/validation.ts +++ b/packages/api-graphql/src/utils/errors/validation.ts @@ -8,6 +8,7 @@ export enum APIValidationErrorCode { NoAppId = 'NoAppId', NoCredentials = 'NoCredentials', NoRegion = 'NoRegion', + NoDefaultAuthMode = 'NoDefaultAuthMode', } export const validationErrorMap: AmplifyErrorMap = { @@ -20,4 +21,7 @@ export const validationErrorMap: AmplifyErrorMap = { [APIValidationErrorCode.NoRegion]: { message: 'Missing region.', }, + [APIValidationErrorCode.NoDefaultAuthMode]: { + message: 'Missing default auth mode', + }, }; diff --git a/packages/api-graphql/src/utils/resolveConfig.ts b/packages/api-graphql/src/utils/resolveConfig.ts index 12ba796b865..185f786de6f 100644 --- a/packages/api-graphql/src/utils/resolveConfig.ts +++ b/packages/api-graphql/src/utils/resolveConfig.ts @@ -9,9 +9,14 @@ import { APIValidationErrorCode, assertValidationError } from './errors'; */ export const resolveConfig = () => { // TODO V6 - const { appId, region } = Amplify.getConfig().API ?? {}; - assertValidationError(!!appId, APIValidationErrorCode.NoAppId); + const { region, defaultAuthMode, endpoint } = + Amplify.getConfig().API?.AppSync ?? {}; + assertValidationError(!!endpoint, APIValidationErrorCode.NoAppId); assertValidationError(!!region, APIValidationErrorCode.NoRegion); + assertValidationError( + !!defaultAuthMode, + APIValidationErrorCode.NoDefaultAuthMode + ); // TODO V6 - return { appId, region }; + return { endpoint, region, defaultAuthMode }; }; diff --git a/packages/api-graphql/tsconfig.json b/packages/api-graphql/tsconfig.json new file mode 100644 index 00000000000..f3d5ed63841 --- /dev/null +++ b/packages/api-graphql/tsconfig.json @@ -0,0 +1,10 @@ +{ + "extends": "../tsconfig.base.json", + "compilerOptions": { + "importHelpers": true, + "strict": false, + "noImplicitAny": false, + "skipLibCheck": true + }, + "include": ["./src"] +} diff --git a/packages/api-rest/src/API.ts b/packages/api-rest/src/API.ts index bf2304f850e..e60b0cddb9a 100644 --- a/packages/api-rest/src/API.ts +++ b/packages/api-rest/src/API.ts @@ -1,8 +1,15 @@ import { RestClient } from './RestClient'; import { PostOptions } from './types'; +const restClient = new RestClient({ headers: {}, endpoints: [] }); export function post(url: string, options: PostOptions) { - const restClient = new RestClient({ headers: {}, endpoints: [] }); - return restClient.post(url, options); } + +export function cancel(request: Promise, message?: string) { + return restClient.cancel(request, message); +} + +export function isCancel(error: Error) { + return restClient.isCancel(error); +} diff --git a/packages/api-rest/src/RestClient.ts b/packages/api-rest/src/RestClient.ts index b3132bb5e86..a5577c08f19 100644 --- a/packages/api-rest/src/RestClient.ts +++ b/packages/api-rest/src/RestClient.ts @@ -1,14 +1,14 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 -import { fetchAuthSession } from '@aws-amplify/core'; +import { + AWSCredentialsAndIdentityId, + fetchAuthSession, +} from '@aws-amplify/core'; import { apiOptions } from './types'; import axios, { CancelTokenSource } from 'axios'; import { parse, format } from 'url'; -import { - Credentials, - signRequest, -} from '@aws-amplify/core/internals/aws-client-utils'; +import { signRequest } from '@aws-amplify/core/internals/aws-client-utils'; // const logger = new Logger('RestClient'); @@ -45,8 +45,8 @@ export class RestClient { * * For more details, see https://github.com/aws-amplify/amplify-js/pull/3769#issuecomment-552660025 */ - private _cancelTokenMap: WeakMap | null = null; - + private _cancelTokenMap: WeakMap, CancelTokenSource> | null = + null; /** * @param {RestClientOptions} [options] - Instance options */ @@ -73,111 +73,117 @@ export class RestClient { * @param {json} [init] - Request extra params * @return {Promise} - A promise that resolves to an object with response status and JSON data, if successful. */ - async ajax(url: string, method: string, init) { + ajax(url: string, method: string, init) { // logger.debug(method, urlOrApiInfo); - - const parsed_url = new URL(url); - - const region: string = init.region || 'us-east-1'; - const service: string = init.serviceName || 'execute-api'; - - const params = { - method, - url, - host: parsed_url.host, - path: parsed_url.pathname, - headers: {}, - data: JSON.stringify(''), - responseType: 'json', - timeout: 0, - cancelToken: null, - }; - - const libraryHeaders = {}; - const initParams = Object.assign({}, init); - const isAllResponse = initParams.response; - if (initParams.body) { - if ( - typeof FormData === 'function' && - initParams.body instanceof FormData - ) { - libraryHeaders['Content-Type'] = 'multipart/form-data'; - params.data = initParams.body; - } else { - libraryHeaders['Content-Type'] = 'application/json; charset=UTF-8'; - params.data = JSON.stringify(initParams.body); + const source = axios.CancelToken.source(); + const promise = new Promise(async (res, rej) => { + const parsed_url = new URL(url); + + const region: string = init.region || 'us-east-1'; + const service: string = init.serviceName || 'execute-api'; + + const params = { + method, + url, + host: parsed_url.host, + path: parsed_url.pathname, + headers: {}, + data: JSON.stringify(''), + responseType: 'json', + timeout: 0, + }; + + const libraryHeaders = {}; + const initParams = Object.assign({}, init); + const isAllResponse = initParams.response; + if (initParams.body) { + if ( + typeof FormData === 'function' && + initParams.body instanceof FormData + ) { + libraryHeaders['Content-Type'] = 'multipart/form-data'; + params.data = initParams.body; + } else { + libraryHeaders['Content-Type'] = 'application/json; charset=UTF-8'; + params.data = JSON.stringify(initParams.body); + } + } + if (initParams.responseType) { + params.responseType = initParams.responseType; + } + if (initParams.withCredentials) { + params['withCredentials'] = initParams.withCredentials; + } + if (initParams.timeout) { + params.timeout = initParams.timeout; } - } - if (initParams.responseType) { - params.responseType = initParams.responseType; - } - if (initParams.withCredentials) { - params['withCredentials'] = initParams.withCredentials; - } - if (initParams.timeout) { - params.timeout = initParams.timeout; - } - if (initParams.cancellableToken) { - params.cancelToken = initParams.cancellableToken.token; - } - - params['signerServiceInfo'] = initParams.signerServiceInfo; - // custom_header callback + params['signerServiceInfo'] = initParams.signerServiceInfo; - params.headers = { - ...libraryHeaders, - ...initParams.headers, - }; + // custom_header callback - // Intentionally discarding search - const { search, ...parsedUrl } = parse(url, true, true); - params.url = format({ - ...parsedUrl, - query: { - ...parsedUrl.query, - ...(initParams.queryStringParameters || {}), - }, - }); + params.headers = { + ...libraryHeaders, + ...initParams.headers, + }; - // Do not sign the request if client has added 'Authorization' header, - // which means custom authorizer. - if (typeof params.headers['Authorization'] !== 'undefined') { - params.headers = Object.keys(params.headers).reduce((acc, k) => { - if (params.headers[k]) { - acc[k] = params.headers[k]; - } - return acc; - // tslint:disable-next-line:align - }, {}); - return this._request(params, isAllResponse); - } + // Intentionally discarding search + const { search, ...parsedUrl } = parse(url, true, true); + params.url = format({ + ...parsedUrl, + query: { + ...parsedUrl.query, + ...(initParams.queryStringParameters || {}), + }, + }); - let credentials: Credentials; + // Do not sign the request if client has added 'Authorization' header, + // which means custom authorizer. + if (typeof params.headers['Authorization'] !== 'undefined') { + params.headers = Object.keys(params.headers).reduce((acc, k) => { + if (params.headers[k]) { + acc[k] = params.headers[k]; + } + return acc; + // tslint:disable-next-line:align + }, {}); + return this._request(params, isAllResponse); + } - try { - credentials = (await fetchAuthSession()).credentials; - } catch (error) { - // logger.debug('No credentials available, the request will be unsigned'); - return this._request(params, isAllResponse); - } + let credentials: AWSCredentialsAndIdentityId; + + try { + const session = await fetchAuthSession(); + credentials = { + credentials: session.credentials, + identityId: session.identityId, + }; + } catch (error) { + // logger.debug('No credentials available, the request will be unsigned'); + return this._request(params, isAllResponse); + } - let signedParams; - try { + let signedParams; // before signed PARAMS signedParams = this._sign({ ...params }, credentials, { region, service, }); - const response = await axios({ - ...signedParams, - data: signedParams.body, - }); - return isAllResponse ? response : response.data; - } catch (error) { - throw error; - } + try { + res( + await axios({ + ...signedParams, + data: signedParams.body, + cancelToken: source.token, + }) + ); + } catch (error) { + rej(error); + } + }); + this._cancelTokenMap.set(promise, source); + return promise; } /** @@ -340,9 +346,8 @@ export class RestClient { data: BodyInit; responseType: string; timeout: number; - cancelToken: any; }, - credentials: Credentials, + credentialsAndIdentityId: AWSCredentialsAndIdentityId, { service, region } ) { const signed_params = signRequest( @@ -353,7 +358,7 @@ export class RestClient { body: params.data, }, { - credentials, + credentials: credentialsAndIdentityId.credentials, signingRegion: region, signingService: service, } @@ -361,7 +366,7 @@ export class RestClient { // logger.debug('Signed Request: ', signed_params); - // delete signed_params.headers['host']; + delete signed_params.headers['host']; return signed_params; } diff --git a/packages/api-rest/src/index.ts b/packages/api-rest/src/index.ts index f70b0f16d00..a25d1d1ee35 100644 --- a/packages/api-rest/src/index.ts +++ b/packages/api-rest/src/index.ts @@ -1,4 +1,4 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 -export { post } from './API'; +export { post, cancel, isCancel } from './API'; diff --git a/packages/core/src/singleton/API/types.ts b/packages/core/src/singleton/API/types.ts index 8f371875a47..30fc8f6a0fa 100644 --- a/packages/core/src/singleton/API/types.ts +++ b/packages/core/src/singleton/API/types.ts @@ -28,6 +28,22 @@ export type APIGraphQLConfig = { authMode?: string; }; +export type APIConfig = { + AppSync?: { + defaultAuthMode: GraphQLAuthMode; + region: string; + endpoint: string; + }; +}; + +export type GraphQLAuthMode = + | { type: 'apiKey'; apiKey: string } + | { type: 'jwt'; token: 'id' | 'access' } + | { type: 'iam' } + | { type: 'lambda' } + | { type: 'custom' }; +// TODO V6 + // import type { ModelIntrospectionSchema as InternalModelIntrospectionSchema } from '@aws-amplify/appsync-modelgen-plugin'; // import { REGION_SET_PARAM } from '../../clients/middleware/signing/signer/signatureV4/constants'; // export namespace Amplify { diff --git a/packages/core/src/singleton/types.ts b/packages/core/src/singleton/types.ts index 267d1996cbc..c2904852aba 100644 --- a/packages/core/src/singleton/types.ts +++ b/packages/core/src/singleton/types.ts @@ -1,7 +1,11 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 -import { APIGraphQLConfig, LibraryAPIGraphQLOptions } from './API/types'; +import { + APIConfig, + APIGraphQLConfig, + LibraryAPIGraphQLOptions, +} from './API/types'; import { AnalyticsConfig } from './Analytics/types'; import { AuthConfig, @@ -20,7 +24,7 @@ import { // TODO V6: API types?? export type ResourcesConfig = { - API?: APIGraphQLConfig; + API?: APIConfig; Analytics?: AnalyticsConfig; Auth?: AuthConfig; // Cache?: CacheConfig; diff --git a/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts b/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts index 672c06b125d..cfea8a26d43 100644 --- a/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts +++ b/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts @@ -27,7 +27,7 @@ import { ConnectionState, PubSubContent, PubSubContentObserver, -} from '../../types/PubSub'; +} from '../../../../api-graphql/src/types/PubSub'; import { AMPLIFY_SYMBOL, diff --git a/packages/pubsub/src/Providers/PubSubProvider.ts b/packages/pubsub/src/Providers/PubSubProvider.ts index acd787f71af..960ab4ab6c9 100644 --- a/packages/pubsub/src/Providers/PubSubProvider.ts +++ b/packages/pubsub/src/Providers/PubSubProvider.ts @@ -6,7 +6,7 @@ import { CustomUserAgentDetails, ConsoleLogger as Logger, } from '@aws-amplify/core'; -import { PubSubContent } from '../types/PubSub'; +import { PubSubContent } from '../../../api-graphql/src/types/PubSub'; const logger = new Logger('AbstractPubSubProvider'); diff --git a/packages/pubsub/src/types/Provider.ts b/packages/pubsub/src/types/Provider.ts index a7c99a8cbdb..5fcc328f2e2 100644 --- a/packages/pubsub/src/types/Provider.ts +++ b/packages/pubsub/src/types/Provider.ts @@ -1,7 +1,7 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import Observable from 'zen-observable-ts'; -import { PubSubContent } from './PubSub'; +import { PubSubContent } from '../../../api-graphql/src/types/PubSub'; export interface PubSubOptions { [key: string]: any; diff --git a/packages/pubsub/src/utils/ConnectionStateMonitor.ts b/packages/pubsub/src/utils/ConnectionStateMonitor.ts index b506006121d..6003d936e01 100644 --- a/packages/pubsub/src/utils/ConnectionStateMonitor.ts +++ b/packages/pubsub/src/utils/ConnectionStateMonitor.ts @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 import Observable, { ZenObservable } from 'zen-observable-ts'; -import { ConnectionState } from '../types/PubSub'; +import { ConnectionState } from '../../../api-graphql/src/types/PubSub'; import { ReachabilityMonitor } from './ReachabilityMonitor'; // Internal types for tracking different connection states