Skip to content

Commit

Permalink
Merge pull request #1 from ably-forks/ably-connector
Browse files Browse the repository at this point in the history
Ably connector
  • Loading branch information
sacOO7 authored Jul 27, 2022
2 parents a32161d + e2de8ca commit 1d2eb5f
Show file tree
Hide file tree
Showing 24 changed files with 1,425 additions and 10 deletions.
10 changes: 7 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
"types": "dist/echo.d.ts",
"scripts": {
"build": "npm run compile && npm run declarations",
"compile": "./node_modules/.bin/rollup -c",
"declarations": "./node_modules/.bin/tsc --emitDeclarationOnly",
"compile": "rollup -c",
"declarations": "tsc --emitDeclarationOnly",
"lint": "eslint --ext .js,.ts ./src ./tests",
"prepublish": "npm run build",
"release": "npm run test && standard-version && git push --follow-tags && npm publish",
Expand All @@ -41,14 +41,18 @@
"@types/node": "^17.0.21",
"@typescript-eslint/eslint-plugin": "^5.14.0",
"@typescript-eslint/parser": "^5.14.0",
"ably": "^1.2.20",
"eslint": "^8.11.0",
"got": "^11.8.3",
"jest": "^27.5.1",
"jsonwebtoken": "^8.5.1",
"rollup": "^2.70.1",
"rollup-plugin-typescript2": "^0.31.2",
"standard-version": "^9.3.2",
"ts-jest": "^27.1.3",
"tslib": "^2.3.1",
"typescript": "^4.6.2"
"typescript": "^4.6.2",
"wait-for-expect": "^3.0.2"
},
"engines": {
"node": ">=10"
Expand Down
201 changes: 201 additions & 0 deletions src/channel/ably-channel.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
import { AblyRealtime, AblyRealtimeChannel } from '../../typings/ably';
import { EventFormatter } from '../util';
import { Channel } from './channel';

/**
* This class represents an Ably channel.
*/
export class AblyChannel extends Channel {
/**
* The Ably client instance.
*/
ably: AblyRealtime;

/**
* The name of the channel.
*/
name: string;

/**
* Channel options.
*/
options: any;

/**
* The event formatter.
*/
eventFormatter: EventFormatter;

/**
* The subscription of the channel.
*/
channel: AblyRealtimeChannel;

/**
* An array containing all registered subscribed listeners.
*/
subscribedListeners: Function[];

/**
* An array containing all registered error listeners.
*/
errorListeners: Function[];

/**
* Channel event subscribe callbacks, maps callback to modified implementation.
*/
callbacks: Map<Function, Function>;

/**
* Create a new class instance.
*/
constructor(ably: any, name: string, options: any, autoSubscribe = true) {
super();

this.name = name;
this.ably = ably;
this.options = options;
this.eventFormatter = new EventFormatter(this.options.namespace);
this.subscribedListeners = [];
this.errorListeners = [];
this.channel = ably.channels.get(name);
this.callbacks = new Map();

if (autoSubscribe) {
this.subscribe();
}
}

/**
* Subscribe to an Ably channel.
*/
subscribe(): any {
this.channel.on(stateChange => {
const { previous, current, reason } = stateChange;
if (previous !== 'attached' && current == 'attached') {
this.subscribedListeners.forEach(listener => listener());
} else if (reason) {
this._alertErrorListeners(stateChange);
}
});
this.channel.attach(this._alertErrorListeners);
}

/**
* Unsubscribe from an Ably channel, unregister all callbacks and finally detach the channel
*/
unsubscribe(): void {
this.channel.unsubscribe();
this.callbacks.clear();
this.unregisterError();
this.unregisterSubscribed();
this.channel.off();
this.channel.detach();
}

/**
* Listen for an event on the channel instance.
*/
listen(event: string, callback: Function): AblyChannel {
this.callbacks.set(callback, ({ data, ...metaData }) => callback(data, metaData));
this.channel.subscribe(this.eventFormatter.format(event), this.callbacks.get(callback) as any);
return this;
}

/**
* Listen for all events on the channel instance.
*/
listenToAll(callback: Function): AblyChannel {
this.callbacks.set(callback, ({ name, data, ...metaData }) => {
let namespace = this.options.namespace.replace(/\./g, '\\');

let formattedEvent = name.startsWith(namespace) ? name.substring(namespace.length + 1) : '.' + name;

callback(formattedEvent, data, metaData);
});
this.channel.subscribe(this.callbacks.get(callback) as any);
return this;
}

/**
* Stop listening for an event on the channel instance.
*/
stopListening(event: string, callback?: Function): AblyChannel {
if (callback) {
this.channel.unsubscribe(this.eventFormatter.format(event), this.callbacks.get(callback) as any);
this.callbacks.delete(callback);
} else {
this.channel.unsubscribe(this.eventFormatter.format(event));
}

return this;
}

/**
* Stop listening for all events on the channel instance.
*/
stopListeningToAll(callback?: Function): AblyChannel {
if (callback) {
this.channel.unsubscribe(this.callbacks.get(callback) as any);
this.callbacks.delete(callback);
} else {
this.channel.unsubscribe();
}

return this;
}

/**
* Register a callback to be called anytime a subscription succeeds.
*/
subscribed(callback: Function): AblyChannel {
this.subscribedListeners.push(callback);

return this;
}

/**
* Register a callback to be called anytime a subscription error occurs.
*/
error(callback: Function): AblyChannel {
this.errorListeners.push(callback);

return this;
}

/**
* Unregisters given error callback from the listeners.
* @param callback
* @returns AblyChannel
*/
unregisterSubscribed(callback?: Function): AblyChannel {
if (callback) {
this.subscribedListeners = this.subscribedListeners.filter(s => s != callback);
} else {
this.subscribedListeners = [];
}

return this;
}

/**
* Unregisters given error callback from the listeners.
* @param callback
* @returns AblyChannel
*/
unregisterError(callback?: Function): AblyChannel {
if (callback) {
this.errorListeners = this.errorListeners.filter(e => e != callback);
} else {
this.errorListeners = [];
}

return this;
}

_alertErrorListeners = (err: any) => {
if (err) {
this.errorListeners.forEach(listener => listener(err));
}
}
}
102 changes: 102 additions & 0 deletions src/channel/ably-presence-channel.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import { AblyChannel } from './ably-channel';
import { AblyAuth } from './ably/auth';
import { PresenceChannel } from './presence-channel';

/**
* This class represents an Ably presence channel.
*/
export class AblyPresenceChannel extends AblyChannel implements PresenceChannel {

presenceData: any;

constructor(ably: any, name: string, options: any, auth: AblyAuth) {
super(ably, name, options, false);
this.channel.on("failed", auth.onChannelFailed(this));
this.channel.on("attached", () => this.enter(this.presenceData, this._alertErrorListeners));
this.subscribe();
}

unsubscribe(): void {
this.leave(this.presenceData, this._alertErrorListeners);
this.channel.presence.unsubscribe();
super.unsubscribe();
}

/**
* Register a callback to be called anytime the member list changes.
*/
here(callback: Function): AblyPresenceChannel {
this.channel.presence.subscribe(['enter', 'update', 'leave'], () =>
this.channel.presence.get((err, members) => callback(members, err)// returns local sync copy of updated members
));
return this;
}

/**
* Listen for someone joining the channel.
*/
joining(callback: Function): AblyPresenceChannel {
this.channel.presence.subscribe(['enter', 'update'], ({ data, ...metaData }) => {
callback(data, metaData);
});

return this;
}

/**
* Listen for someone leaving the channel.
*/
leaving(callback: Function): AblyPresenceChannel {
this.channel.presence.subscribe('leave', ({ data, ...metaData }) => {
callback(data, metaData);
});

return this;
}

/**
* Enter presence
* @param data - Data to be published while entering the channel
* @param callback - success/error callback (err) => {}
* @returns AblyPresenceChannel
*/
enter(data: any, callback: Function): AblyPresenceChannel {
this.channel.presence.enter(data, callback as any);

return this;
}

/**
* Leave presence
* @param data - Data to be published while leaving the channel
* @param callback - success/error callback (err) => {}
* @returns AblyPresenceChannel
*/
leave(data: any, callback?: Function): AblyPresenceChannel {
this.channel.presence.leave(data, callback as any);

return this;
}

/**
* Update presence
* @param data - Update presence with data
* @param callback - success/error callback (err) => {}
* @returns AblyPresenceChannel
*/
update(data: any, callback: Function): AblyPresenceChannel {
this.channel.presence.update(data, callback as any);

return this;
}

/**
* Trigger client event on the channel.
*/
whisper(eventName: string, data: any): AblyPresenceChannel {
this.channel.publish(`client-${eventName}`, data);

return this;
}

}
19 changes: 19 additions & 0 deletions src/channel/ably-private-channel.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { AblyChannel } from './ably-channel';
import { AblyAuth } from './ably/auth';

export class AblyPrivateChannel extends AblyChannel {

constructor(ably: any, name: string, options: any, auth: AblyAuth) {
super(ably, name, options, false);
this.channel.on("failed", auth.onChannelFailed(this));
this.subscribe();
}
/**
* Trigger client event on the channel.
*/
whisper(eventName: string, data: any): AblyPrivateChannel {
this.channel.publish(`client-${eventName}`, data);

return this;
}
}
37 changes: 37 additions & 0 deletions src/channel/ably/attach.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import { isNullOrUndefined } from './utils';

let channelAttachAuthorized = false;

/**
* Modifies existing channel attach with custom authz implementation
*/
export const beforeChannelAttach = (ablyClient, authorize: Function) => {
const dummyRealtimeChannel = ablyClient.channels.get("dummy");
if (channelAttachAuthorized) { //Only once all ably instance
return;
}
const internalAttach = dummyRealtimeChannel.__proto__._attach; // get parent class method inferred from object, store it in temp. variable
if (isNullOrUndefined(internalAttach)) {
console.warn("channel internal attach function not found, please check for right library version")
return;
}
function customInternalAttach(forceReattach, attachReason, errCallback) {// Define new function that needs to be added
if (this.state === 'attached' || this.authorizing) {
return;
}
this.authorizing = true;
const bindedInternalAttach = internalAttach.bind(this); // bind object instance at runtime
// custom logic before attach
authorize(this, (error) => {
this.authorizing = false;
if (error) {
errCallback(error);
return;
} else {
bindedInternalAttach(forceReattach, attachReason, errCallback);// call internal function here
}
})
}
dummyRealtimeChannel.__proto__._attach = customInternalAttach; // add updated extension method to parent class, auto binded
channelAttachAuthorized = true;
}
Loading

0 comments on commit 1d2eb5f

Please sign in to comment.