Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ECO-4977] Fix broadcast to others #42

Merged
merged 12 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 31 additions & 5 deletions src/channel/ably/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ export const parseJwt = (jwtToken: string): { header: any; payload: any } => {
// Get Token Header
const base64HeaderUrl = jwtToken.split('.')[0];
const base64Header = base64HeaderUrl.replace('-', '+').replace('_', '/');
const header = JSON.parse(toText(base64Header));
const header = JSON.parse(fromBase64UrlEncoded(base64Header));
// Get Token payload
const base64Url = jwtToken.split('.')[1];
const base64 = base64Url.replace('-', '+').replace('_', '/');
const payload = JSON.parse(toText(base64));
const payload = JSON.parse(fromBase64UrlEncoded(base64));
return { header, payload };
};

Expand All @@ -33,11 +33,37 @@ export const toTokenDetails = (jwtToken: string): TokenDetails | any => {

const isBrowser = typeof window === 'object';

const toText = (base64: string) => {
/**
* Helper method to decode base64 url encoded string
* https://stackoverflow.com/a/78178053
* @param base64 base64 url encoded string
* @returns decoded text string
*/
export const fromBase64UrlEncoded = (base64: string): string => {
const base64Encoded = base64.replace(/-/g, '+').replace(/_/g, '/');
const padding = base64.length % 4 === 0 ? '' : '='.repeat(4 - (base64.length % 4));
const base64WithPadding = base64Encoded + padding;

if (isBrowser) {
return atob(base64WithPadding);
}
return Buffer.from(base64WithPadding, 'base64').toString();
};

/**
* Helper method to encode text into base64 url encoded string
* https://stackoverflow.com/a/78178053
* @param base64 text
* @returns base64 url encoded string
*/
export const toBase64UrlEncoded = (text: string): string => {
let encoded = ''
if (isBrowser) {
return atob(base64);
encoded = btoa(text);
} else {
encoded = Buffer.from(text).toString('base64');
}
return Buffer.from(base64, 'base64').toString('binary');
return encoded.replace(/\+/g, '-').replace(/\//g, '_').replace(/=+$/, '');
Dismissed Show dismissed Hide dismissed
};

const isAbsoluteUrl = (url: string) => (url && url.indexOf('http://') === 0) || url.indexOf('https://') === 0;
Expand Down
8 changes: 7 additions & 1 deletion src/connector/ably-connector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Connector } from './connector';

import { AblyChannel, AblyPrivateChannel, AblyPresenceChannel, AblyAuth } from './../channel';
import { AblyRealtime, TokenDetails } from '../../typings/ably';
import { toBase64UrlEncoded } from '../channel/ably/utils';

/**
* This class creates a connector to Ably.
Expand Down Expand Up @@ -118,9 +119,14 @@ export class AblyConnector extends Connector {

/**
* Get the socket ID for the connection.
* For ably, returns base64 url encoded json with keys {connectionKey, clientId}
*/
socketId(): string {
return this.ably.connection.key;
let socketIdObject = {
connectionKey : this.ably.connection.key,
clientId : this.ably.auth.clientId ?? null,
}
return toBase64UrlEncoded(JSON.stringify(socketIdObject));
}

/**
Expand Down
1 change: 1 addition & 0 deletions src/echo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ export default class Echo {

/**
* Get the Socket ID for the connection.
* For ably, returns base64 url encoded json with keys {connectionKey, clientId}
*/
socketId(): string {
return this.connector.socketId();
Expand Down
4 changes: 2 additions & 2 deletions tests/ably/ably-user-login.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ describe('AblyUserLogin', () => {
});
});

test('user logs in without previous (guest) channels', async () => {
test('user logs in without previous (public) channels', async () => {
let connectionStates : Array<any>= []
// Initial clientId is null
expect(mockAuthServer.clientId).toBeNull();
Expand Down Expand Up @@ -70,7 +70,7 @@ describe('AblyUserLogin', () => {
expect(echo.connector.ablyAuth.existingToken().clientId).toBe('[email protected]');
});

test('user logs in with previous (guest) channels', async () => {
test('user logs in with previous (public) channels', async () => {
let connectionStates : Array<any>= []
let publicChannelStates : Array<any>= []

Expand Down
167 changes: 167 additions & 0 deletions tests/ably/ably-user-rest-publishing.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
import { setup, tearDown } from './setup/sandbox';
import Echo from '../../src/echo';
import { MockAuthServer } from './setup/mock-auth-server';
import { AblyChannel, AblyPrivateChannel } from '../../src/channel';
import * as Ably from 'ably';
import waitForExpect from 'wait-for-expect';
import { fromBase64UrlEncoded } from '../../src/channel/ably/utils';

jest.setTimeout(30000);
describe('AblyUserRestPublishing', () => {
let testApp: any;
let mockAuthServer: MockAuthServer;
let echoInstances: Array<Echo>;

beforeAll(async () => {
global.Ably = Ably;
testApp = await setup();
mockAuthServer = new MockAuthServer(testApp.keys[0].keyStr);
});

afterAll(async () => {
return await tearDown(testApp);
});

beforeEach(() => {
echoInstances = [];
})

afterEach((done) => {
let promises: Array<Promise<boolean>> = []
for (const echo of echoInstances) {
echo.disconnect();
const promise = new Promise<boolean>(res => {
echo.connector.ably.connection.once('closed', () => {
res(true);
});
})
promises.push(promise);
}
Promise.all(promises).then(_ => {
done();
})
});

async function getGuestUserChannel(channelName: string) {
mockAuthServer.clientId = null;
const guestUser = new Echo({
broadcaster: 'ably',
useTls: true,
environment: 'sandbox',
requestTokenFn: mockAuthServer.getSignedToken
});
echoInstances.push(guestUser);
const publicChannel = guestUser.channel(channelName) as AblyChannel;
await new Promise((resolve) => publicChannel.subscribed(resolve));
expect(guestUser.connector.ably.auth.clientId).toBeFalsy();
expect(guestUser.connector.ablyAuth.existingToken().clientId).toBeNull();
return publicChannel;
}

async function getLoggedInUserChannel(channelName: string) {
mockAuthServer.clientId = '[email protected]';
const loggedInUser = new Echo({
broadcaster: 'ably',
useTls: true,
environment: 'sandbox',
requestTokenFn: mockAuthServer.getSignedToken
});
echoInstances.push(loggedInUser);
const privateChannel = loggedInUser.private(channelName) as AblyPrivateChannel;
await new Promise((resolve) => privateChannel.subscribed(resolve));
expect(loggedInUser.connector.ably.auth.clientId).toBe("[email protected]");
expect(loggedInUser.connector.ablyAuth.existingToken().clientId).toBe("[email protected]")
mockAuthServer.clientId = null;
return privateChannel;
}

test('Guest user return socketId as base64 encoded connectionkey and null clientId', async () => {
await getGuestUserChannel("dummyChannel");
const guestUser = echoInstances[0];
const socketIdObj = JSON.parse(fromBase64UrlEncoded(guestUser.socketId()));

const expectedConnectionKey = guestUser.connector.ably.connection.key;

expect(socketIdObj.connectionKey).toBe(expectedConnectionKey);
expect(socketIdObj.connectionKey).toBeTruthy();

expect(socketIdObj.clientId).toBeNull();
});

test('Guest user publishes message via rest API', async () => {
let messagesReceived: Array<string> = []
let channelName = "testChannel";

const publicChannel1 = await getGuestUserChannel(channelName);
publicChannel1.listenToAll((eventName, data) => {
messagesReceived.push(eventName);
});

const publicChannel2 = await getGuestUserChannel(channelName);
publicChannel2.listenToAll((eventName, data) => {
messagesReceived.push(eventName);
})

// Publish message to all clients
await mockAuthServer.broadcast(`public:${channelName}`, "testEvent", "mydata")
await waitForExpect(() => {
expect(messagesReceived.length).toBe(2);
expect(messagesReceived.filter(m => m == ".testEvent").length).toBe(2)
});

// Publish message to other client
messagesReceived = []
const firstClientSocketId = echoInstances[0].socketId();
await mockAuthServer.broadcastToOthers(firstClientSocketId,
{ channelName: `public:${channelName}`, eventName: "toOthers", payload: "data" })
await waitForExpect(() => {
expect(messagesReceived.length).toBe(1);
expect(messagesReceived.filter(m => m == ".toOthers").length).toBe(1);
});
});

test('Logged in user return socketId as base64 encoded connectionkey and clientId', async () => {
await getLoggedInUserChannel("dummyChannel");
const loggedInUser = echoInstances[0];
const socketIdObj = JSON.parse(fromBase64UrlEncoded(loggedInUser.socketId()));

const expectedConnectionKey = loggedInUser.connector.ably.connection.key;

expect(socketIdObj.connectionKey).toBe(expectedConnectionKey);
expect(socketIdObj.connectionKey).toBeTruthy();

expect(socketIdObj.clientId).toBe("[email protected]");
});

test('Logged in user publishes message via rest API', async () => {
let messagesReceived: Array<string> = []
let channelName = "testChannel";

const privateChannel1 = await getLoggedInUserChannel(channelName);
privateChannel1.listenToAll((eventName, data) => {
messagesReceived.push(eventName);
});

const privateChannel2 = await getLoggedInUserChannel(channelName);
privateChannel2.listenToAll((eventName, data) => {
messagesReceived.push(eventName);
})

// Publish message to all clients
await mockAuthServer.broadcast(`private:${channelName}`, "testEvent", "mydata")
await waitForExpect(() => {
expect(messagesReceived.length).toBe(2);
expect(messagesReceived.filter(m => m == ".testEvent").length).toBe(2);
});

// Publish message to other client
messagesReceived = []
const firstClientSocketId = echoInstances[0].socketId();
await mockAuthServer.broadcastToOthers(firstClientSocketId,
{ channelName: `private:${channelName}`, eventName: "toOthers", payload: "data" });
await waitForExpect(() => {
expect(messagesReceived.length).toBe(1);
expect(messagesReceived.filter(m => m == ".toOthers").length).toBe(1);
});
});
});
27 changes: 24 additions & 3 deletions tests/ably/setup/mock-auth-server.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,49 @@
import { isNullOrUndefinedOrEmpty, parseJwt } from '../../../src/channel/ably/utils';
import { isNullOrUndefinedOrEmpty, parseJwt, fromBase64UrlEncoded } from '../../../src/channel/ably/utils';
import * as Ably from 'ably/promises';
import * as jwt from 'jsonwebtoken';

type channels = Array<string>;

/**
* MockAuthServer mimicks {@link https://github.com/ably/laravel-broadcaster/blob/main/src/AblyBroadcaster.php AblyBroadcaster.php}.
* Aim is to keep implementation and behaviour in sync with AblyBroadcaster.php, so that it can be tested
* without running actual PHP server.
*/
export class MockAuthServer {
keyName: string;
keySecret: string;
ablyClient: Ably.Rest;
clientId: string | null = '[email protected]';
userInfo = { id: '[email protected]', name: 'sacOO7' };

shortLived: channels;
banned: channels;

userInfo = { id: '[email protected]', name: 'sacOO7' }; // Used for presence

constructor(apiKey: string, environment = 'sandbox') {
const keys = apiKey.split(':');
this.keyName = keys[0];
this.keySecret = keys[1];
this.ablyClient = new Ably.Rest({ key: apiKey, environment });
}

/**
* Broadcast to all clients subscribed to given channel.
*/
broadcast = async (channelName: string, eventName: string, message: string) => {
await this.ablyClient.channels.get(channelName).publish(eventName, message);
await this.broadcastToOthers("", {channelName, eventName, payload: message});
};

/**
* Broadcast on behalf of a given realtime client.
*/
broadcastToOthers = async (socketId: string, {channelName, eventName, payload}) => {
let protoMsg = {name: eventName, data: payload};
if (!isNullOrUndefinedOrEmpty(socketId)) {
const socketIdObj = JSON.parse(fromBase64UrlEncoded(socketId));
protoMsg = {...protoMsg, ...socketIdObj}
}
await this.ablyClient.channels.get(channelName).publish(protoMsg);
};

tokenInvalidOrExpired = (serverTime, token) => {
Expand Down
4 changes: 2 additions & 2 deletions tests/ably/setup/sandbox.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { httpDeleteAsync, httpPostAsync, toBase64 } from './utils';
import { httpDeleteAsync, httpPostAsync, toBase64UrlEncoded } from './utils';

let sandboxUrl = 'https://sandbox-rest.ably.io/apps';

Expand All @@ -16,7 +16,7 @@ const creatNewApp = async () => {

const deleteApp = async (app) => {
let authKey = app.keys[0].keyStr;
const headers = { Authorization: 'Basic ' + toBase64(authKey) };
const headers = { Authorization: 'Basic ' + toBase64UrlEncoded(authKey) };
return await httpDeleteAsync(`${sandboxUrl}/${app.appId}`, headers);
};

Expand Down
5 changes: 2 additions & 3 deletions tests/ably/setup/utils.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { httpRequestAsync } from '../../../src/channel/ably/utils';
import { httpRequestAsync} from '../../../src/channel/ably/utils';
export { toBase64UrlEncoded } from '../../../src/channel/ably/utils';

const safeAssert = (assertions: Function, done: Function, finalAssertion = false) => {
try {
Expand All @@ -17,8 +18,6 @@ export const execute = (fn: Function, times: number) => {
}
};

export const toBase64 = (text: string) => Buffer.from(text, 'binary').toString('base64');

export const httpPostAsync = async (url: string, postData: any) => {
postData = JSON.stringify(postData);
let postOptions = {
Expand Down
Loading
Loading