diff --git a/src/common/lib/client/baseclient.ts b/src/common/lib/client/baseclient.ts index 27d375cb1c..8d8b201036 100644 --- a/src/common/lib/client/baseclient.ts +++ b/src/common/lib/client/baseclient.ts @@ -30,6 +30,7 @@ class BaseClient { private readonly _rest: Rest | null; readonly _Crypto: IUntypedCryptoStatic | null; + readonly _MsgPack = Platform.Config.msgpack; constructor(options: ClientOptions | string, modules: ModulesMap) { if (!options) { diff --git a/src/common/lib/client/channel.ts b/src/common/lib/client/channel.ts index 370c32993e..ad4be4b767 100644 --- a/src/common/lib/client/channel.ts +++ b/src/common/lib/client/channel.ts @@ -99,7 +99,7 @@ class Channel extends EventEmitter { headers: Record, unpacked?: boolean ) { - return await Message.fromResponseBody(body, options, unpacked ? undefined : format); + return await Message.fromResponseBody(body, options, client._MsgPack, unpacked ? undefined : format); }).get(params as Record, callback); } @@ -177,7 +177,7 @@ class Channel extends EventEmitter { return; } - this._publish(Message.serialize(messages, format), headers, params, callback); + this._publish(Message.serialize(messages, client._MsgPack, format), headers, params, callback); }); } diff --git a/src/common/lib/client/presence.ts b/src/common/lib/client/presence.ts index ea64ad3cd7..ac756ef9b8 100644 --- a/src/common/lib/client/presence.ts +++ b/src/common/lib/client/presence.ts @@ -43,7 +43,12 @@ class Presence extends EventEmitter { headers: Record, unpacked?: boolean ) { - return await PresenceMessage.fromResponseBody(body, options as CipherOptions, unpacked ? undefined : format); + return await PresenceMessage.fromResponseBody( + body, + options as CipherOptions, + client._MsgPack, + unpacked ? undefined : format + ); }).get(params, callback); } @@ -82,7 +87,12 @@ class Presence extends EventEmitter { headers: Record, unpacked?: boolean ) { - return await PresenceMessage.fromResponseBody(body, options as CipherOptions, unpacked ? undefined : format); + return await PresenceMessage.fromResponseBody( + body, + options as CipherOptions, + client._MsgPack, + unpacked ? undefined : format + ); }).get(params, callback); } } diff --git a/src/common/lib/client/push.ts b/src/common/lib/client/push.ts index 2cf3e01d19..33054fbd0c 100644 --- a/src/common/lib/client/push.ts +++ b/src/common/lib/client/push.ts @@ -44,7 +44,7 @@ class Admin { if (client.options.pushFullWait) Utils.mixin(params, { fullWait: 'true' }); - const requestBody = Utils.encodeBody(body, format); + const requestBody = Utils.encodeBody(body, client._MsgPack, format); Resource.post(client, '/push/publish', requestBody, headers, params, null, (err) => callback(err)); } } @@ -71,7 +71,7 @@ class DeviceRegistrations { if (client.options.pushFullWait) Utils.mixin(params, { fullWait: 'true' }); - const requestBody = Utils.encodeBody(body, format); + const requestBody = Utils.encodeBody(body, client._MsgPack, format); Resource.put( client, '/push/deviceRegistrations/' + encodeURIComponent(device.id), @@ -85,6 +85,7 @@ class DeviceRegistrations { !err ? (DeviceDetails.fromResponseBody( body as Record, + client._MsgPack, unpacked ? undefined : format ) as DeviceDetails) : undefined @@ -128,6 +129,7 @@ class DeviceRegistrations { !err ? (DeviceDetails.fromResponseBody( body as Record, + client._MsgPack, unpacked ? undefined : format ) as DeviceDetails) : undefined @@ -153,7 +155,7 @@ class DeviceRegistrations { headers: Record, unpacked?: boolean ) { - return DeviceDetails.fromResponseBody(body, unpacked ? undefined : format); + return DeviceDetails.fromResponseBody(body, client._MsgPack, unpacked ? undefined : format); }).get(params, callback); } @@ -232,7 +234,7 @@ class ChannelSubscriptions { if (client.options.pushFullWait) Utils.mixin(params, { fullWait: 'true' }); - const requestBody = Utils.encodeBody(body, format); + const requestBody = Utils.encodeBody(body, client._MsgPack, format); Resource.post( client, '/push/channelSubscriptions', @@ -243,7 +245,12 @@ class ChannelSubscriptions { function (err, body, headers, unpacked) { callback( err, - !err && PushChannelSubscription.fromResponseBody(body as Record, unpacked ? undefined : format) + !err && + PushChannelSubscription.fromResponseBody( + body as Record, + client._MsgPack, + unpacked ? undefined : format + ) ); } ); @@ -266,7 +273,7 @@ class ChannelSubscriptions { headers: Record, unpacked?: boolean ) { - return PushChannelSubscription.fromResponseBody(body, unpacked ? undefined : format); + return PushChannelSubscription.fromResponseBody(body, client._MsgPack, unpacked ? undefined : format); }).get(params, callback); } @@ -308,7 +315,9 @@ class ChannelSubscriptions { headers: Record, unpacked?: boolean ) { - const parsedBody = (!unpacked && format ? Utils.decodeBody(body, format) : body) as Array; + const parsedBody = ( + !unpacked && format ? Utils.decodeBody(body, client._MsgPack, format) : body + ) as Array; for (let i = 0; i < parsedBody.length; i++) { parsedBody[i] = String(parsedBody[i]); diff --git a/src/common/lib/client/resource.ts b/src/common/lib/client/resource.ts index 125069f214..8a1b855369 100644 --- a/src/common/lib/client/resource.ts +++ b/src/common/lib/client/resource.ts @@ -6,6 +6,7 @@ import HttpMethods from '../../constants/HttpMethods'; import ErrorInfo, { IPartialErrorInfo, PartialErrorInfo } from '../types/errorinfo'; import BaseClient from './baseclient'; import { ErrnoException } from '../../types/http'; +import { MsgPack } from 'common/types/msgpack'; function withAuthDetails( client: BaseClient, @@ -27,7 +28,11 @@ function withAuthDetails( } } -function unenvelope(callback: ResourceCallback, format: Utils.Format | null): ResourceCallback { +function unenvelope( + callback: ResourceCallback, + MsgPack: MsgPack, + format: Utils.Format | null +): ResourceCallback { return (err, body, outerHeaders, unpacked, outerStatusCode) => { if (err && !body) { callback(err); @@ -36,7 +41,7 @@ function unenvelope(callback: ResourceCallback, format: Utils.Format | nul if (!unpacked) { try { - body = Utils.decodeBody(body, format); + body = Utils.decodeBody(body, MsgPack, format); } catch (e) { if (Utils.isErrorInfoOrPartialErrorInfo(e)) { callback(e); @@ -204,7 +209,7 @@ class Resource { } if (envelope) { - callback = callback && unenvelope(callback, envelope); + callback = callback && unenvelope(callback, client._MsgPack, envelope); (params = params || {})['envelope'] = envelope; } @@ -221,7 +226,7 @@ class Resource { let decodedBody = body; if (headers['content-type']?.indexOf('msgpack') > 0) { try { - decodedBody = Platform.Config.msgpack.decode(body as Buffer); + decodedBody = client._MsgPack.decode(body as Buffer); } catch (decodeErr) { Logger.logAction( Logger.LOG_MICRO, diff --git a/src/common/lib/client/rest.ts b/src/common/lib/client/rest.ts index 505bc05edc..b4ab2e3124 100644 --- a/src/common/lib/client/rest.ts +++ b/src/common/lib/client/rest.ts @@ -114,8 +114,8 @@ export class Rest { callback: StandardCallback> ): Promise> | void { const useBinary = this.client.options.useBinaryProtocol, - encoder = useBinary ? Platform.Config.msgpack.encode : JSON.stringify, - decoder = useBinary ? Platform.Config.msgpack.decode : JSON.parse, + encoder = useBinary ? this.client._MsgPack.encode : JSON.stringify, + decoder = useBinary ? this.client._MsgPack.decode : JSON.parse, format = useBinary ? Utils.Format.msgpack : Utils.Format.json, envelope = this.client.http.supportsLinkHeaders ? undefined : format; params = params || {}; diff --git a/src/common/lib/transport/websockettransport.ts b/src/common/lib/transport/websockettransport.ts index fc12dddf27..b352d68f48 100644 --- a/src/common/lib/transport/websockettransport.ts +++ b/src/common/lib/transport/websockettransport.ts @@ -102,7 +102,9 @@ class WebSocketTransport extends Transport { return; } try { - wsConnection.send(ProtocolMessage.serialize(message, this.params.format)); + wsConnection.send( + ProtocolMessage.serialize(message, this.connectionManager.realtime._MsgPack, this.params.format) + ); } catch (e) { const msg = 'Exception from ws connection when trying to send: ' + Utils.inspectError(e); Logger.logAction(Logger.LOG_ERROR, 'WebSocketTransport.send()', msg); @@ -119,7 +121,7 @@ class WebSocketTransport extends Transport { 'data received; length = ' + data.length + '; type = ' + typeof data ); try { - this.onProtocolMessage(ProtocolMessage.deserialize(data, this.format)); + this.onProtocolMessage(ProtocolMessage.deserialize(data, this.connectionManager.realtime._MsgPack, this.format)); } catch (e) { Logger.logAction( Logger.LOG_ERROR, diff --git a/src/common/lib/types/devicedetails.ts b/src/common/lib/types/devicedetails.ts index 59cc80b514..a8ba6c953c 100644 --- a/src/common/lib/types/devicedetails.ts +++ b/src/common/lib/types/devicedetails.ts @@ -1,3 +1,4 @@ +import { MsgPack } from 'common/types/msgpack'; import * as Utils from '../util/utils'; import ErrorInfo, { IConvertibleToErrorInfo } from './errorinfo'; @@ -74,10 +75,11 @@ class DeviceDetails { static fromResponseBody( body: Array> | Record, + MsgPack: MsgPack, format?: Utils.Format ): DeviceDetails | DeviceDetails[] { if (format) { - body = Utils.decodeBody(body, format); + body = Utils.decodeBody(body, MsgPack, format); } if (Utils.isArray(body)) { diff --git a/src/common/lib/types/message.ts b/src/common/lib/types/message.ts index 692c5e069f..7a994d867d 100644 --- a/src/common/lib/types/message.ts +++ b/src/common/lib/types/message.ts @@ -7,6 +7,7 @@ import * as Utils from '../util/utils'; import { Bufferlike as BrowserBufferlike } from '../../../platform/web/lib/util/bufferutils'; import * as API from '../../../../ably'; import { IUntypedCryptoStatic } from 'common/types/ICryptoStatic'; +import { MsgPack } from 'common/types/msgpack'; export type CipherOptions = { channelCipher: { @@ -335,10 +336,11 @@ class Message { static async fromResponseBody( body: Array, options: ChannelOptions | EncodingDecodingContext, + MsgPack: MsgPack, format?: Utils.Format ): Promise { if (format) { - body = Utils.decodeBody(body, format); + body = Utils.decodeBody(body, MsgPack, format); } for (let i = 0; i < body.length; i++) { diff --git a/src/common/lib/types/presencemessage.ts b/src/common/lib/types/presencemessage.ts index de2abfa256..01e9e2f786 100644 --- a/src/common/lib/types/presencemessage.ts +++ b/src/common/lib/types/presencemessage.ts @@ -3,6 +3,7 @@ import Platform from 'common/platform'; import Message, { CipherOptions } from './message'; import * as Utils from '../util/utils'; import * as API from '../../../../ably'; +import { MsgPack } from 'common/types/msgpack'; function toActionValue(actionString: string) { return PresenceMessage.Actions.indexOf(actionString); @@ -105,11 +106,12 @@ class PresenceMessage { static async fromResponseBody( body: Record[], options: CipherOptions, + MsgPack: MsgPack, format?: Utils.Format ): Promise { const messages: PresenceMessage[] = []; if (format) { - body = Utils.decodeBody(body, format); + body = Utils.decodeBody(body, MsgPack, format); } for (let i = 0; i < body.length; i++) { diff --git a/src/common/lib/types/protocolmessage.ts b/src/common/lib/types/protocolmessage.ts index 20acbdf380..46641b5c1c 100644 --- a/src/common/lib/types/protocolmessage.ts +++ b/src/common/lib/types/protocolmessage.ts @@ -1,3 +1,4 @@ +import { MsgPack } from 'common/types/msgpack'; import { Types } from '../../../../ably'; import * as Utils from '../util/utils'; import ErrorInfo from './errorinfo'; @@ -109,8 +110,8 @@ class ProtocolMessage { static serialize = Utils.encodeBody; - static deserialize = function (serialized: unknown, format?: Utils.Format): ProtocolMessage { - const deserialized = Utils.decodeBody>(serialized, format); + static deserialize = function (serialized: unknown, MsgPack: MsgPack, format?: Utils.Format): ProtocolMessage { + const deserialized = Utils.decodeBody>(serialized, MsgPack, format); return ProtocolMessage.fromDeserialized(deserialized); }; diff --git a/src/common/lib/types/pushchannelsubscription.ts b/src/common/lib/types/pushchannelsubscription.ts index b4056c006c..16a150b1d6 100644 --- a/src/common/lib/types/pushchannelsubscription.ts +++ b/src/common/lib/types/pushchannelsubscription.ts @@ -1,3 +1,4 @@ +import { MsgPack } from 'common/types/msgpack'; import * as Utils from '../util/utils'; type PushChannelSubscriptionObject = { @@ -36,10 +37,11 @@ class PushChannelSubscription { static fromResponseBody( body: Array> | Record, + MsgPack: MsgPack, format?: Utils.Format ): PushChannelSubscription | PushChannelSubscription[] { if (format) { - body = Utils.decodeBody(body, format) as Record; + body = Utils.decodeBody(body, MsgPack, format) as Record; } if (Utils.isArray(body)) { diff --git a/src/common/lib/util/utils.ts b/src/common/lib/util/utils.ts index 280a27cb1c..b5d9181e7e 100644 --- a/src/common/lib/util/utils.ts +++ b/src/common/lib/util/utils.ts @@ -1,5 +1,6 @@ import Platform from 'common/platform'; import ErrorInfo, { PartialErrorInfo } from 'common/lib/types/errorinfo'; +import { MsgPack } from 'common/types/msgpack'; function randomPosn(arrOrStr: Array | string) { return Math.floor(Math.random() * arrOrStr.length); @@ -450,12 +451,12 @@ export function promisify(ob: Record, fnName: string, args: IArg }); } -export function decodeBody(body: unknown, format?: Format | null): T { - return format == 'msgpack' ? Platform.Config.msgpack.decode(body as Buffer) : JSON.parse(String(body)); +export function decodeBody(body: unknown, MsgPack: MsgPack, format?: Format | null): T { + return format == 'msgpack' ? MsgPack.decode(body as Buffer) : JSON.parse(String(body)); } -export function encodeBody(body: unknown, format?: Format): string | Buffer { - return format == 'msgpack' ? (Platform.Config.msgpack.encode(body, true) as Buffer) : JSON.stringify(body); +export function encodeBody(body: unknown, MsgPack: MsgPack, format?: Format): string | Buffer { + return format == 'msgpack' ? (MsgPack.encode(body, true) as Buffer) : JSON.stringify(body); } export function allToLowerCase(arr: Array): Array { diff --git a/src/platform/nodejs/lib/util/http.ts b/src/platform/nodejs/lib/util/http.ts index 645d50435f..ff5b2868e9 100644 --- a/src/platform/nodejs/lib/util/http.ts +++ b/src/platform/nodejs/lib/util/http.ts @@ -28,7 +28,7 @@ import { shallowEquals } from 'common/lib/util/utils'; const globalAgentPool: Array<{ options: RestAgentOptions; agents: Agents }> = []; -const handler = function (uri: string, params: unknown, callback?: RequestCallback) { +const handler = function (uri: string, params: unknown, client: BaseClient | null, callback?: RequestCallback) { return function (err: ErrnoException | null, response?: Response, body?: unknown) { if (err) { callback?.(err); @@ -42,7 +42,10 @@ const handler = function (uri: string, params: unknown, callback?: RequestCallba body = JSON.parse(body as string); break; case 'application/x-msgpack': - body = Platform.Config.msgpack.decode(body as Buffer); + if (!client) { + throw new ErrorInfo('Cannot use MessagePack without a client', 400, 40000); + } + body = client._MsgPack.decode(body as Buffer); } const error = (body as { error: ErrorInfo }).error ? ErrorInfo.fromValues((body as { error: ErrorInfo }).error) @@ -230,14 +233,14 @@ const Http: typeof IHttp = class { (got[method](doOptions) as CancelableRequest) .then((res: Response) => { - handler(uri, params, callback)(null, res, res.body); + handler(uri, params, client, callback)(null, res, res.body); }) .catch((err: ErrnoException) => { if (err instanceof got.HTTPError) { - handler(uri, params, callback)(null, err.response, err.response.body); + handler(uri, params, client, callback)(null, err.response, err.response.body); return; } - handler(uri, params, callback)(err); + handler(uri, params, client, callback)(err); }); }