diff --git a/README.md b/README.md index 86f9c23..242738a 100644 --- a/README.md +++ b/README.md @@ -69,11 +69,15 @@ app.listen(3000, () => { # Feature - [x] Simple to use -- [x] Auto serve static file and cache. -- [x] WebSocket reply event -- [x] Auto reload SSL when signal(1) -- [x] Auto graceful shutdown -- [x] Auto parse message -- [x] URL params to object -- [x] Can use built-in template engine or custom template engine +- [x] Serve static files +- [x] Simple WebSocket Framework +- [x] Reload SSL when system signal HUP(1) +- [x] Graceful shutdown +- [x] Parse body data +- [x] URL params parser +- [x] Support for template engine - [x] Response from pipe stream +- [x] Support cache +- [ ] Support for Socket.io +- [ ] Support for TypeScript +- [ ] Sub-route likes Express.js diff --git a/client/base.js b/client/base.js index 96f130a..51c7bff 100644 --- a/client/base.js +++ b/client/base.js @@ -1,8 +1,6 @@ const Replicator = require('replicator') const { EventEmitter } = require('events') -const replicator = new Replicator() - const PING = '\x0F' const PONG = '\x0E' const DATA_START = '\x01' @@ -17,6 +15,7 @@ class WSClientBase extends EventEmitter { constructor (options = {}) { super() this.options = options + this.replicator = new Replicator(options.parserOptions) this.connectState = 0 this.internalEvents = ['open', 'close', 'disconnect', 'connect', 'ping', 'pong', 'message', 'binary', 'error'] this.client = null @@ -24,26 +23,26 @@ class WSClientBase extends EventEmitter { this._event_return = {} } - static getPayload (data, type = 'message') { + getPayload (data, type = 'message') { if (type === 'event') { if (data.replyId === undefined || data.replyId === null) { data.replyId = '' } - return EVENT + eventId(data.event) + IDLE + data.replyId + WSClientBase.getPayload(data.data) + return EVENT + eventId(data.event) + IDLE + data.replyId + this.getPayload(data.data) } else if (type === 'ping') { return PING + new Date().valueOf().toString() } else if (type === 'pong') { return PONG + data.toString() } else if (type === 'message') { - return DATA_START + replicator.encode(data) + DATA_END + return DATA_START + this.replicator.encode(data) + DATA_END } else { return '' } } - static parsePayload (payload) { + parsePayload (payload) { if (payload[0] === DATA_START && payload[payload.length - 1] === DATA_END) { - return { type: 'message', data: replicator.decode(payload.slice(1, -1)) } + return { type: 'message', data: this.replicator.decode(payload.slice(1, -1)) } } else if (payload[0] === PING) { return { type: 'ping', data: Number(payload.slice(1)) } } else if (payload[0] === PONG) { @@ -52,11 +51,11 @@ class WSClientBase extends EventEmitter { const splitIndex = payload.indexOf(DATA_START) const id = Number(payload.slice(1, splitIndex)) const data = payload.slice(splitIndex) - return { type: 'returnData', id, data: WSClientBase.parsePayload(data).data } + return { type: 'returnData', id, data: this.parsePayload(data).data } } else if (payload[0] === EVENT) { const splitIndex = payload.indexOf(DATA_START) const data = payload.slice(splitIndex) - return { type: 'event', event: payload.slice(1, splitIndex), data: WSClientBase.parsePayload(data).data } + return { type: 'event', event: payload.slice(1, splitIndex), data: this.parsePayload(data).data } } } @@ -66,17 +65,17 @@ class WSClientBase extends EventEmitter { incomingPacket (payload) { if (payload.constructor.name === 'ArrayBuffer' || payload.constructor.name === 'Blob') { - this.emit('binary', payload) + super.emit('binary', payload) } else { - const incoming = WSClientBase.parsePayload(payload) + const incoming = this.parsePayload(payload) if (incoming.type === 'event') { - this.emit(incoming.event, incoming.data) + super.emit(incoming.event, incoming.data) } else if (incoming.type === 'returnData') { if (this._event_return[incoming.id]) { this._event_return[incoming.id](incoming) } } else { - this.emit(incoming.type, incoming.data) + super.emit(incoming.type, incoming.data) } } } @@ -125,7 +124,10 @@ class WSClientBase extends EventEmitter { this.client.close() } - send (event, data, waitReturn = false) { + emit (event, data, waitReturn = false) { + if (this.internalEvents.includes(event)) { + return super.emit(event, data) + } return new Promise((resolve, reject) => { let replyId if (waitReturn) { @@ -134,7 +136,7 @@ class WSClientBase extends EventEmitter { this._return_id_counter = 0 } } - this.client.send(WSClientBase.getPayload({ event, data, replyId }, 'event')) + this.client.send(this.getPayload({ event, data, replyId }, 'event')) if (waitReturn) { const timeOut = setTimeout(() => { reject(new Error('Response timeout.')) @@ -152,8 +154,8 @@ class WSClientBase extends EventEmitter { }) } - sendMessage (data) { - this.client.send(WSClientBase.getPayload(data)) + send (data) { + this.client.send(this.getPayload(data)) } sendBinary (data) { diff --git a/client/browser.js b/client/browser.js index 9cec9d8..b700dbb 100644 --- a/client/browser.js +++ b/client/browser.js @@ -5,28 +5,28 @@ class WSClient extends Base { super(options) this.client = new WebSocket(endpoint, 'fast-ws') this.client.onerror = error => { - this.emit('error', error) + super.emit('error', error) } this.client.onopen = () => { this.connectState = 1 - this.emit('open') + super.emit('open') this._heartbeat = setInterval(() => { this.ping() }, options.pingInterval || 30000) } this.client.onclose = () => { - this.emit('close') + super.emit('close') this.connectState = -1 clearInterval(this._heartbeat) - this.emit('disconnect') + super.emit('disconnect') } this.client.onmessage = ({ type, data }) => { if (this.connectState !== 2) { if (data === '\x00\x02') { this.connectState = 2 - this.emit('connect') + super.emit('connect') } else { - this.emit('error', new Error('Client version mismatch.')) + super.emit('error', new Error('Client version mismatch.')) } } else { this.incomingPacket(data) diff --git a/client/node.js b/client/node.js index 68c4acf..6bd74c4 100644 --- a/client/node.js +++ b/client/node.js @@ -6,7 +6,7 @@ class WSClient extends Base { super(options) this.client = new WebSocket(endpoint, 'fast-ws', options) this.client.on('error', error => { - this.emit('error', error) + super.emit('error', error) }) this.client.on('open', () => { this.connectState = 1 @@ -17,28 +17,28 @@ class WSClient extends Base { this.client.on('close', () => { this.connectState = -1 clearInterval(this._heartbeat) - this.emit('disconnect') + super.emit('disconnect') }) this.client.on('message', (message) => { if (this.connectState !== 2) { if (message === '\x00\x02') { this.connectState = 2 - this.emit('connect') + super.emit('connect') } else { - this.emit('error', new Error('Client version mismatch.')) + super.emit('error', new Error('Client version mismatch.')) } } else { this.incomingPacket(message) } }) this.client.on('ping', () => { - this.emit('ping') + super.emit('ping') }) this.client.on('pong', (data) => { if (data.length) { - this.emit('pong', new Date().valueOf() - data.toString()) + super.emit('pong', new Date().valueOf() - data.toString()) } else { - this.emit('pong') + super.emit('pong') } }) } diff --git a/docs/Client.md b/docs/Client.md index 2694c17..2b6e70f 100644 --- a/docs/Client.md +++ b/docs/Client.md @@ -11,6 +11,8 @@ const ws = new Client( pingInterval: 30000, // Default (ms) // Reply Timeout (Event reply) replyTimeout: 5000, // Default (ms) + // parser options (same as server) + parserOptions: {}, ...// Others options for package 'ws' } ) @@ -30,11 +32,11 @@ const ws = new Client( > Remove all listeners by event name -### `send(event, data)` +### `emit(event, data)` -> Send custom event to server +> Emit custom event to server -### `sendMessage(data)` +### `send(data)` > Send message to server diff --git a/docs/Server.md b/docs/Server.md index c18cde2..999ea02 100644 --- a/docs/Server.md +++ b/docs/Server.md @@ -17,7 +17,7 @@ app.listen(host, port, () => { ## Directory Structure -> `@` is your execute dir +> `@` is your execute dir (`process.cwd()`) - `@/stctic/*` => Static files - `@/template/*` => Template files @@ -150,6 +150,29 @@ app.ws( protocol: 'fast-ws', // Custom protocol object (must extends `fast-ws/server/ws-protocol/basic`) protocol: Object, + + /*== Protocol Options : basic ==*/ + protocolOptions: { + // Parse message (default) + parser: { + parse: (payload) => payload, + stringify: (payload) => payload + }, + // Parse message using JSON + parser: JSON, + // Or you can create wour own parser + }, + + /*== Protocol Options : fast-ws ==*/ + protocolOptions: { + // parser options, serialize to BSON + parserOptions: { + serialize: (val) => BSON.serialize(val, false, true, false), + deserialize: BSON.deserialize + }, + // Detail see: https://github.com/inikulin/replicator#readme + }, + /*== uWS options ==*/ // Compression compression: 'default', // equal shared diff --git a/docs/WSClient.md b/docs/WSClient.md index 60c8ebf..e4e2db6 100644 --- a/docs/WSClient.md +++ b/docs/WSClient.md @@ -45,13 +45,13 @@ > Send binary to client -### `broadcast(channel, message[, compress=true])` +### `sendToChannel(channel, message[, compress=true])` -> Broadcast message to channel +> Send message to channel -### `broadcastBinary(data[, compress=true])` +### `sendBinaryToChannel(data[, compress=true])` -> Broadcast binary to channel +> Send binary to channel ### `close()` @@ -76,29 +76,13 @@ ws.on('message', (message) => { }) ``` -### `send(event, data[, compress=true])` +### `emit(event, data[, compress=true])` -> Send event to client +> Emit event to client -### `sendMessage(data[, compress=true])` +### `emitToChannel(channel, event, data[, compress=true])` -> Send message to client - -### `sendBinary(data[, compress=true])` - -> Send binary to client - -### `broadcast(channel, event, data[, compress=true])` - -> Broadcast event to channel - -### `broadcastMessage(channel, data[, compress=true])` - -> Broadcast message to channel - -### `broadcastBinary(channel, data[, compress=true])` - -> Broadcast binary to channel +> Emit event to channel ### `WSEvent` diff --git a/server/index.js b/server/index.js index 16f489c..9fa9985 100644 --- a/server/index.js +++ b/server/index.js @@ -48,7 +48,8 @@ class fastWS { verbose = false, cache = false, templateRender = render, - bodySize = '4mb' + bodySize = '4mb', + forceStopTimeout = 5000 } = {}) { if (typeof bodySize === 'string') { // convert string to bytes number @@ -116,13 +117,14 @@ class fastWS { verbose, bodySize, templateRender, - cache + cache, + forceStopTimeout } this._server = null this._socket = null this._routes = {} - process.on('SIGINT', () => this.gracefulStop()) - process.on('SIGTERM', () => this.gracefulStop()) + process.on('SIGINT', () => this.gracefulStop(true)) + process.on('SIGTERM', () => this.gracefulStop(true)) process.on('SIGHUP', () => this.reload()) } @@ -191,7 +193,7 @@ class fastWS { callback(listenSocket) } } - this.gracefulStop() + this.gracefulStop(false) if (host) { this._server.listen(host, port, listenCallback) } else { @@ -203,13 +205,19 @@ class fastWS { if (!this._routes[path]) { this._routes[path] = {} } - let URLParams = path.match(/:\w+/g) - if (URLParams) { - URLParams = URLParams.map(key => key.slice(1)) + if (this._routes[path][method]) { + throw new ServerError({ + code: 'INVALID_DUPLICATE_ROUTER', + message: 'Invalid, duplicated router.' + }) } if (method === 'ws') { this._routes[path][method] = callbacks } else { + let URLParams = path.match(/:\w+/g) + if (URLParams) { + URLParams = URLParams.map(key => key.slice(1)) + } this._routes[path][method] = async (response, request) => { const params = {} if (URLParams) { @@ -264,17 +272,35 @@ class fastWS { } else { options.protocol = require('./ws-protocol/fast-ws') } + if (options.protocolOptions) { + if (typeof options.protocolOptions !== 'object') { + throw new ServerError({ code: 'INVALID_OPTIONS', message: 'Invalid websocket option' }) + } + } else { + options.protocolOptions = {} + } const Protocol = options.protocol + const protocol = new Protocol(options.protocolOptions) + let URLParams = path.match(/:\w+/g) + if (URLParams) { + URLParams = URLParams.map(key => key.slice(1)) + } this.route('ws', path, { compression: options.compression, idleTimeout: options.idleTimeout, maxPayloadLength: options.maxPayloadLength, open: (ws, request) => { - const client = new Protocol(ws, new Request(this.options, request, null)) + const params = {} + if (URLParams) { + URLParams.forEach((key, index) => { + params[key] = decodeURIComponent(request.getParameter(index)) + }) + } + const client = protocol.newClient(ws, request) this.options.verbose && console.log('[open]', client.remoteAddress) ws.client = client try { - callback(client) + callback(client, params) } catch (error) { console.error(error) // disconnect when error @@ -283,15 +309,11 @@ class fastWS { }, message: (ws, message, isBinary) => { try { - // decode message - ws.client.incomingPacket(Buffer.from(message), isBinary) + const buf = Buffer.from(message) + ws.client.incomingPacket(isBinary ? buf : buf.toString(), isBinary) } catch (error) { - if (error.code === 'WS_INVALID_PAYLOAD') { - this.options.verbose && console.log('[error]', 'Invalid message payload') - } else { - console.error(error) - } - // kick user when error + console.error(error) + // disconnect when error ws.close() } }, @@ -361,10 +383,15 @@ class fastWS { } } - gracefulStop () { + gracefulStop (canForceExit = true) { if (this._socket) { + const forceStop = canForceExit && setTimeout(() => { + this.options.verbose && console.log('Force stop') + process.exit(0) + }, this.options.forceStopTimeout) this.options.verbose && console.log('Shutting down...') uWS.us_listen_socket_close(this._socket) + clearTimeout(forceStop) this._socket = null } } diff --git a/server/response.js b/server/response.js index 794b66a..49faf34 100644 --- a/server/response.js +++ b/server/response.js @@ -178,7 +178,13 @@ class Response extends Writable { } } Object.keys(this._headers).forEach(key => { - this.response.writeHeader(key, this._headers[key]) + if (this._headers[key] instanceof Array) { + this._headers[key].forEach(data => { + this.response.writeHeader(key, data) + }) + } else { + this.response.writeHeader(key, this._headers[key]) + } }) } } diff --git a/server/ws-protocol/basic.js b/server/ws-protocol/basic.js index a6ca63b..1efec0b 100644 --- a/server/ws-protocol/basic.js +++ b/server/ws-protocol/basic.js @@ -1,51 +1,65 @@ -const inet = require('../inet') const { EventEmitter } = require('events') +const inet = require('../inet') +const ServerError = require('../errors') + +const nullParser = { + parse (payload) { + return payload + }, + stringify (payload) { + return payload + } +} class WSClient extends EventEmitter { - constructor (session, request) { + constructor (socket, request, { parser = nullParser } = {}) { super() - this.session = session - this.requestHeaders = request.headers + this.socket = socket + this.requestHeaders = {} + request.forEach((k, v) => { + this.requestHeaders[k] = v + }) this.internalEvents = ['message', 'binary', 'drained', 'close', 'ping', 'pong'] + this.parser = parser } incomingPacket (payload, isBinary) { if (isBinary) { - this.emit('binary', payload) + super.emit('binary', payload) } else { - this.emit('message', { data: payload.toString() }) + super.emit('message', this.parser.parse(payload)) } } onClose (code, message) { - this.emit('close', code, Buffer.from(message)) + super.emit('close', code, Buffer.from(message)) } onPing () { - this.emit('ping') + super.emit('ping') } onPong () { - this.emit('pong') + super.emit('pong') } get remoteAddress () { - return inet.ntop(Buffer.from(this.session.getRemoteAddress())) + return inet.ntop(Buffer.from(this.socket.getRemoteAddress())) } drain () { - if (this.session.getBufferedAmount() === 0) { - this.emit('drained') + if (this.socket.getBufferedAmount() === 0) { + super.emit('drained') } } _publish (topic, data, isBinary, compress) { - this.session.publish(topic, data, isBinary, compress) + this.socket.publish(topic, data, isBinary, compress) } async _send (data, isBinary, compress) { - if (this.session.getBufferedAmount() === 0) { - return this.session.send(data, isBinary, compress) + if (this.socket.getBufferedAmount() === 0) { + return this.socket.send(data, isBinary, compress) } else { await new Promise((resolve) => { super.once('drained', async () => { @@ -57,32 +71,57 @@ class WSClient extends EventEmitter { } join (channel) { - return this.session.subscribe(channel) + return this.socket.subscribe(channel) } quit (channel) { - return this.session.unsubscribe(channel) + return this.socket.unsubscribe(channel) } send (data, compress = true) { - return this._send(data, false, compress) + return this._send(this.parser.stringify(data), false, compress) } sendBinary (data, compress = true) { return this._send(data, true, compress) } - broadcast (channel, event, data, compress = true) { - this._publish(channel, data, false, compress) + sendToChannel (channel, data, compress = true) { + this._publish(channel, this.parser.stringify(data), false, compress) } - broadcastBinary (channel, data, compress = true) { + sendBinaryToChannel (channel, data, compress = true) { this._publish(channel, data, true, compress) } close () { - return this.session.close() + return this.socket.close() + } +} + +class WSProtocol { + constructor (options = {}) { + if (options.parser) { + if (typeof options.parser.parse !== 'function') { + throw new ServerError({ + code: 'INVALID_OPTIONS', + message: 'Invalid WebSocket protocol options.' + }) + } + if (typeof options.parser.stringify !== 'function') { + throw new ServerError({ + code: 'INVALID_OPTIONS', + message: 'Invalid WebSocket protocol options.' + }) + } + } + this.options = options + } + + newClient (socket, request) { + return new WSClient(socket, request, this.options) } } -module.exports = WSClient +module.exports = WSProtocol +module.exports.WSClient = WSClient diff --git a/server/ws-protocol/echo.js b/server/ws-protocol/echo.js index e8081cf..4fb30ff 100644 --- a/server/ws-protocol/echo.js +++ b/server/ws-protocol/echo.js @@ -1,12 +1,13 @@ -const basic = require('./basic') +const BasicProtocol = require('./basic') -class WSClient extends basic { - constructor (session, request) { - super(session, request) - this.on('message', ({ data }) => { - this.send(data) +class WSProtocol extends BasicProtocol { + newClient (socket, request) { + const client = new BasicProtocol.WSClient(socket, request) + client.on('message', data => { + client.send(data) }) + return client } } -module.exports = WSClient +module.exports = WSProtocol diff --git a/server/ws-protocol/fast-ws.js b/server/ws-protocol/fast-ws.js index d4f39c3..cd1c41d 100644 --- a/server/ws-protocol/fast-ws.js +++ b/server/ws-protocol/fast-ws.js @@ -1,9 +1,7 @@ const Replicator = require('replicator') -const basic = require('./basic') +const BasicProtocol = require('./basic') const ServerError = require('../errors') -const replicator = new Replicator() - const PING = '\x0F' const PONG = '\x0E' const DATA_START = '\x01' @@ -14,78 +12,84 @@ const IDLE = '\x16' const eventId = (str) => str.split('').reduce((sum, char, index) => sum + char.charCodeAt(0) * (index + 1), 0).toString(16) -function parsePayload (payload) { - if (payload[0] === DATA_START && payload[payload.length - 1] === DATA_END) { - return { type: 'message', data: replicator.decode(payload.slice(1, -1)) } - } else if (payload[0] === PING) { - return { type: 'ping', data: Number(payload.slice(1)) } - } else if (payload[0] === PONG) { - return { type: 'pong', data: new Date() - Number(payload.slice(1)) } - } else if (payload[0] === EVENT) { - const eventSplitIndex = payload.indexOf(IDLE) - const replySplitIndex = payload.indexOf(DATA_START) - if (eventSplitIndex === -1 || replySplitIndex === -1) { - throw new ServerError({ code: 'WS_INVALID_PAYLOAD' }) - } - const event = payload.slice(1, eventSplitIndex) - const replyId = payload.slice(eventSplitIndex + 1, replySplitIndex) - const dataPayload = payload.slice(replySplitIndex) - return { - type: 'event', - name: event, - reply_id: replyId, - data: parsePayload(dataPayload).data +class Parser { + constructor (options) { + this.replicator = new Replicator(options) + } + + parse (payload) { + if (payload[0] === DATA_START && payload[payload.length - 1] === DATA_END) { + return { type: 'message', data: this.replicator.decode(payload.slice(1, -1)) } + } else if (payload[0] === PING) { + return { type: 'ping', data: Number(payload.slice(1)) } + } else if (payload[0] === PONG) { + return { type: 'pong', data: new Date() - Number(payload.slice(1)) } + } else if (payload[0] === EVENT) { + const eventSplitIndex = payload.indexOf(IDLE) + const replySplitIndex = payload.indexOf(DATA_START) + if (eventSplitIndex === -1 || replySplitIndex === -1) { + throw new ServerError({ code: 'WS_INVALID_PAYLOAD' }) + } + const event = payload.slice(1, eventSplitIndex) + const replyId = payload.slice(eventSplitIndex + 1, replySplitIndex) + const dataPayload = payload.slice(replySplitIndex) + return { + type: 'event', + name: event, + reply_id: replyId, + data: this.parse(dataPayload).data + } + } else { + throw new Error('Invalid payload') } - } else { - throw new ServerError({ code: 'WS_INVALID_PAYLOAD' }) } -} -function getPayload (data, type = 'message') { - if (type === 'reply') { - return RESPONSE + data.replyId + getPayload(data.data) - } else if (type === 'event') { - return EVENT + eventId(data.event) + getPayload(data.data) - } else if (type === 'ping') { - return PING + new Date().valueOf().toString() - } else if (type === 'pong') { - return PONG + data.toString() - } else if (type === 'message') { - return DATA_START + replicator.encode(data) + DATA_END - } else { - return '' + stringify (data, type = 'message') { + if (type === 'reply') { + return RESPONSE + data.replyId + this.stringify(data.data) + } else if (type === 'event') { + return EVENT + eventId(data.event) + this.stringify(data.data) + } else if (type === 'ping') { + return PING + new Date().valueOf().toString() + } else if (type === 'pong') { + return PONG + data.toString() + } else if (type === 'message') { + return DATA_START + this.replicator.encode(data) + DATA_END + } else { + throw new Error('Invalid type') + } } } -class WSClient extends basic { - constructor (session, request) { - super(session, request) +class WSClient extends BasicProtocol.WSClient { + constructor (socket, request, parser) { + super(socket, request, { parser }) this._send('\x00\x02', 0, 0) } incomingPacket (payload, isBinary) { if (isBinary) { - this.emit('binary', payload) + super.emit('binary', payload) } else { - const incoming = parsePayload(payload.toString()) + const incoming = this.parser.parse(payload.toString(), isBinary) if (incoming.type === 'event') { incoming.reply = (data) => { if (incoming.reply_id) { - this._send(getPayload({ replyId: incoming.reply_id, data }, 'reply')) + this._send(this.parser.stringify({ replyId: incoming.reply_id, data }, 'reply')) } } - this.emit(incoming.name, incoming) + super.emit(incoming.name, incoming) } else { if (incoming.type === 'ping') { - this._send(getPayload(incoming.data, 'pong')) + this._send(this.parser.stringify(incoming.data, 'pong')) } - this.emit(incoming.type, incoming.data) + super.emit(incoming.type, incoming.data) } } } ping () { - this._send(getPayload(null, 'ping')) + this._send(this.parser.stringify(null, 'ping')) } on (event, listener) { @@ -96,6 +100,14 @@ class WSClient extends basic { } } + once (event, listener) { + if (this.internalEvents.includes(event)) { + super.once(event, listener) + } else { + super.once(eventId(event), listener) + } + } + addListener (event, listener) { if (this.internalEvents.includes(event)) { super.addListener(event, listener) @@ -128,29 +140,24 @@ class WSClient extends basic { } } - send (event, data, compress = true) { - return this._send(getPayload({ event, data }, 'event'), false, compress) + emit (event, data, compress = true) { + return this._send(this.parser.stringify({ event, data }, 'event'), false, compress) } - sendMessage (data, compress = true) { - return this._send(getPayload(data), false, compress) - } - - sendBinary (data, compress = true) { - return this._send(data, true, compress) - } - - broadcast (channel, event, data, compress = true) { - this._publish(channel, getPayload({ event, data }, 'event'), false, compress) + emitToChannel (channel, event, data, compress = true) { + this._publish(channel, this.parser.stringify({ event, data }, 'event'), false, compress) } +} - broadcastMessage (channel, data, compress = true) { - this._publish(channel, getPayload(data), false, compress) +class WSProtocol extends BasicProtocol { + constructor (options = {}) { + super() + this.parser = new Parser(options.parserOptions) } - broadcastBinary (channel, data, compress = true) { - this._publish(channel, data, true, compress) + newClient (socket, request) { + return new WSClient(socket, request, this.parser) } } -module.exports = WSClient +module.exports = WSProtocol diff --git a/test/cases/ws-client.js b/test/cases/ws-client.js index 4398315..9f3a411 100644 --- a/test/cases/ws-client.js +++ b/test/cases/ws-client.js @@ -10,21 +10,21 @@ module.exports = function ({ HTTP_PORT }) { try { { const data = { 'Hello': 'World' } - const res = await ws.send('echo', data, true) + const res = await ws.emit('echo', data, true) if (JSON.stringify(res) !== JSON.stringify(data)) { throw new Error('Response data mismatch (1)') } } { const data = "Test string" - const res = await ws.send('echo', data, true) + const res = await ws.emit('echo', data, true) if (JSON.stringify(res) !== JSON.stringify(data)) { throw new Error('Response data mismatch (2)') } } { const data = 123 - const res = await ws.send('echo', data, true) + const res = await ws.emit('echo', data, true) if (JSON.stringify(res) !== JSON.stringify(data)) { throw new Error('Response data mismatch (3)') } diff --git a/test/cases/wss-client.js b/test/cases/wss-client.js index c73858c..dbd3b24 100644 --- a/test/cases/wss-client.js +++ b/test/cases/wss-client.js @@ -7,9 +7,9 @@ module.exports = function ({ HTTPS_PORT }) { ws.on('error', reject) ws.on('connect', () => { - ws.send('join', 'test') - ws.send('broadcast', { room: 'test', message: 'test-message' }) - ws.on('someone said', (data) => { + ws.emit('join', 'test') + ws.send('test-message') + ws.on('someone said', data => { if (data === 'test-message') { resolve() } else { diff --git a/test/index.js b/test/index.js index 8fe9b48..0ccfec0 100644 --- a/test/index.js +++ b/test/index.js @@ -28,10 +28,17 @@ async function tests() { if (file.endsWith('.js') && file[0] !== '.') { const caseName = file.slice(0, -3).replace(/[-_]/ig, ' ') try { - setTimeout(() => { - throw new Error(`Timeout: ${caseName}`) - }, TIMEOUT) - await require(`./cases/${file}`)(config) + await new Promise(async (resolve, reject) => { + setTimeout(() => { + reject('Timeout') + }, TIMEOUT) + try { + await require(`./cases/${file}`)(config) + resolve() + } catch (error) { + reject(error) + } + }) console.log(success(`[success] ${caseName}`)) } catch (e) { console.log(warning(`[failed] ${caseName}`)) diff --git a/test/prepare/app.js b/test/prepare/app.js index 4ea5b24..5640fe5 100644 --- a/test/prepare/app.js +++ b/test/prepare/app.js @@ -8,18 +8,19 @@ module.exports = function (app) { ws.on('echo', ({ reply, data }) => { reply(data) }) - ws.on('broadcast', ({ data: { room, message } }) => { - ws.broadcast(room, 'someone said', message) + ws.on('join', ({ data: channel }) => { + ws.join(channel.toString()) + ws.channel = channel.toString() }) - ws.on('join', ({ data }) => { - ws.join(data.toString()) + ws.on('message', message => { + if (ws.channel) { + ws.emitToChannel(ws.channel, 'someone said', message) + } }) }) app.ws('/echo', ws => null, { protocol: 'echo' }) - // app.ws('/io', ws => null, { protocol: 'socket.io' }) - app.get('/get', (req, res) => { res.json(req.query) })