From 10f73fe07156caae2c88f752842a65eda9388f42 Mon Sep 17 00:00:00 2001 From: Bossslime Date: Mon, 14 Aug 2023 11:30:42 -0400 Subject: [PATCH] Finished chunking and updated readme, also changed syntax to exactly mimic normal tcp --- Examples/ClientExample.js | 44 ++- Examples/ServerExample.js | 35 +- Limitless-TCP.js | 745 +++++++++++++++++++++++++++++++------- README.md | 25 +- 4 files changed, 709 insertions(+), 140 deletions(-) diff --git a/Examples/ClientExample.js b/Examples/ClientExample.js index ae73539..61dc4fa 100644 --- a/Examples/ClientExample.js +++ b/Examples/ClientExample.js @@ -2,16 +2,48 @@ let { TCPClient } = require('../Limitless-TCP'); let tcpClient = new TCPClient('127.0.0.1', 1234); -tcpClient.connect(); +tcpClient.connect(() => {}); + +let str = ""; + + +// for (let i = 0; i < 10000000; i++) { +// str += rand_str_without_O0(); +// } +// console.log(str.length); tcpClient.on('connect', () => { - tcpClient.emit({type: 'test', data: 'This is a test packet 1'}); - tcpClient.emit({type: 'test', data: 'This is a test packet 2'}); - tcpClient.emit('Yo 1'); - tcpClient.emit('Yo 2'); + tcpClient.write({ test: str }) + + // for (let i = 0; i < 1000; i++) { + // tcpClient.write('test') + // } + + // tcpClient.write({ test: str }); + + tcpClient.on('data', (data) => { + console.log(data); + }); }); tcpClient.on('error', (err) => { console.log(err) //Handle error, heartbeat errors are formatted differently -}); \ No newline at end of file +}); + + + + + + + + +function rand_str_without_O0() { + const list = "ABCDEFGHIJKLMNPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789~`!@#$%^&*()_-+=}]{[|\"\\':;?/>.<,"; + var res = ""; + for(var i = 0; i < 12; i++) { + var rnd = Math.floor(Math.random() * list.length); + res = res + list.charAt(rnd); + } + return res; +} \ No newline at end of file diff --git a/Examples/ServerExample.js b/Examples/ServerExample.js index b623d49..f9a59b0 100644 --- a/Examples/ServerExample.js +++ b/Examples/ServerExample.js @@ -2,25 +2,46 @@ let { TCPServer } = require('../Limitless-TCP'); let settings = { useHeartbeat: true, - useCompression: true + useCompression: false, + useChunking: true } let tcpServer = new TCPServer(1234, settings); //The settings here will be applied to any clients that connect to the server -tcpServer.listen(); +let str = ""; + + +// for (let i = 0; i < 10000000; i++) { +// str += rand_str_without_O0(); +// } +// console.log(str.length); + +tcpServer.listen(() => {}); //Set to null because it is listening for a server event -tcpServer.on('connect', null, (socket) => { +tcpServer.on('connection', (socket) => { + socket.write({ test: str }, socket); - tcpServer.on('data', socket, (data) => { + socket.on('data', (data) => { console.log(data) }); - tcpServer.on('error', socket, (err) => { + socket.on('error', (err) => { + console.log(err) //Handle error, heartbeat errors are formatted differently and are per socket }); }); -tcpServer.on('error', null, (err) =>{ +tcpServer.on('error', (err) =>{ //Handle error, heartbeat errors dont appear here -}); \ No newline at end of file +}); + +function rand_str_without_O0() { + const list = "ABCDEFGHIJKLMNPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789~`!@#$%^&*()_-+=}]{[|\"\\':;?/>.<,"; + var res = ""; + for(var i = 0; i < 12; i++) { + var rnd = Math.floor(Math.random() * list.length); + res = res + list.charAt(rnd); + } + return res; +} \ No newline at end of file diff --git a/Limitless-TCP.js b/Limitless-TCP.js index 5e18982..2f322a0 100644 --- a/Limitless-TCP.js +++ b/Limitless-TCP.js @@ -8,6 +8,9 @@ const net = require('net'); const crypto = require('crypto'); +const { Worker } = require('worker_threads'); + +let chunkSize = 14000; class TCPClient { @@ -19,6 +22,8 @@ class TCPClient { this.useCompression = false; this.useChunking = false; + this.worker = null; + this.port = port; this.address = address; @@ -45,6 +50,7 @@ class TCPClient { switch (packet.type) { case 'tcpsjs-heartbeat': this.lastHeartbeat = Date.now(); + this.socket.write(JSON.stringify({ type: 'tcpsjs-heartbeat' }) + ''); break; case 'tcpsjs-connect': @@ -64,6 +70,202 @@ class TCPClient { if (packet.data.useChunking) { this.useChunking = true; } + + //Runs a worker (thread) to compress, chunk, and send the data in the background + this.worker = new Worker(` + const { parentPort } = require('worker_threads'); + + let transactions = []; + let invalidPackets = ""; + + //Garbage collector + setInterval(() => { + let counter = 0; + for (let transaction of transactions) { + if (transaction.lastTransaction + 15000 <= Date.now()) { + transactions.splice(counter, 1); + }else { + counter++; + } + } + }, 5000); + + parentPort.on('message', function(data) { + data = JSON.parse(data); + switch (data.type) { + case 'write_chunk': + let packetData; + if (data.data.useCompression) { + packetData = Buffer.from(require('pako').deflate(data.data.packetData)); + }else { + packetData = Buffer.from(data.data.packetData); + } + + + packetData = splitBuffer(packetData, data.data.chunkSize); + let transactionId = require('crypto').randomUUID(); + + let chunkIndex = 0; + let packetInterval = setInterval(() => { + + if (chunkIndex === packetData.length - 1) { + parentPort.postMessage({ type: 'write_chunk', data: JSON.stringify({ type: 'tcpsjs-chunk', transactionId: transactionId, chunkNum: chunkIndex, chunkAmount: packetData.length, last: true, data: packetData[chunkIndex] }) + ''}); + clearInterval(packetInterval); + }else { + parentPort.postMessage({ type: 'write_chunk', data: JSON.stringify({ type: 'tcpsjs-chunk', transactionId: transactionId, chunkNum: chunkIndex, chunkAmount: packetData.length, last: false, data: packetData[chunkIndex] }) + ''}); + chunkIndex++; + } + }, 1); + break; + + + case 'parse_data': + let parsedData = parseData(data.data.data); + + for (let packet of parsedData.parsedPackets) { + + let packetData; + switch(packet.type) { + case 'tcpsjs-packet': + if (data.data.useCompression) { + packetData = Buffer.from(require('pako').inflate(new Uint8Array(Buffer.from(packet.data.data)))).toString(); + }else { + packetData = packet.data.toString(); + } + + try { //Try to parse, if error it isn't a json + packetData = JSON.parse(packetData) + }catch (e) {} + + //Send a message to emit the data event + parentPort.postMessage({ type: 'emit_data', data: packetData }); + break; + + case 'tcpsjs-chunk': + let transaction; + + //If transaction exists, fetch it, if not, create a new one + if (transactions.some(item => item.id === packet.transactionId)) { + transaction = transactions[transactions.findIndex(item => item.id === packet.transactionId)]; + }else { + transaction = { + id: packet.transactionId, + chunks: [], + lastTransaction: Date.now() + } + transactions.push(transaction); + } + + transaction.chunks.push(Buffer.from(packet.data.data)); + + console.log(packet.chunkNum + " - " + packet.chunkAmount); + + if (packet.last) { + if (data.data.useCompression) { + packetData = Buffer.from(require('pako').inflate(new Uint8Array(Buffer.concat(transaction.chunks)))).toString(); + }else { + packetData = Buffer.concat(transaction.chunks).toString(); + } + + let counter = 0; + for (let item of transactions) { + if (item.id === transaction.transactionId) { + transactions.splice(counter, 1); + }else { + counter++; + } + } + + try { + packetData = JSON.parse(packetData); + }catch (e) {} + + parentPort.postMessage({ type: 'emit_data', data: packetData }); + } + break; + } + } + break; + } + + }); + + function splitBuffer(buffer, chunkSize) { + const chunks = []; + let offset = 0; + + while (offset < buffer.length) { + const chunk = buffer.slice(offset, offset + chunkSize); + chunks.push(chunk); + offset += chunkSize; + } + + return chunks; + } + + function parseData(data) { + let parsedPackets = []; + let splitPackets = data.split(''); + + for (let packet of splitPackets) { + if (packet.length !== 0) { + try { + packet = JSON.parse(packet); + + parsedPackets.push(packet); + + }catch (e) { + try { + if (packet.endsWith('}')) { + invalidPackets += (packet + ''); + }else { + invalidPackets += packet; + } + }catch (err) { + } + + let unstackedPackets = invalidPackets.split(''); + + let index = 0; + for (let unstackedPacket of unstackedPackets) { + if (unstackedPacket.length !== 0) { + try { + unstackedPacket = JSON.parse(unstackedPacket); + + parsedPackets.push(unstackedPacket); + + unstackedPackets.splice(index, 1); + }catch (e) { + index++; + } + } + } + + invalidPackets = ""; + unstackedPackets.forEach((unstackedPacket) => { + invalidPackets += unstackedPacket; + }); + } + + + } + } + + return { parsedPackets: parsedPackets, invalidPackets: invalidPackets }; + } + `, {eval: true}); + + this.worker.on('message', (msg) => { + switch (msg.type) { + case 'write_chunk': + this.socket.write(msg.data); + break; + + case 'emit_data': + this.socket.emit("data", Buffer.from(JSON.stringify({ type: 'tcpsjs-data', data: msg.data }))); + break; + } + }); break; } } catch (e) { @@ -73,6 +275,7 @@ class TCPClient { }); this.socket.on('close', () => { + this.worker.terminate(); this.socket.destroy(); if (this.heartbeatInterval !== null && this.heartbeatInterval !== undefined) { clearInterval(this.heartbeatInterval); @@ -81,6 +284,7 @@ class TCPClient { this.socket.on('end', () => { this.socket.destroy(); + this.worker.terminate(); if (this.heartbeatInterval !== null && this.heartbeatInterval !== undefined) { clearInterval(this.heartbeatInterval); } @@ -90,18 +294,29 @@ class TCPClient { return this; } + write(data) { this.emit(data); } emit(data) { - try { + try { //If err, it isnt a json data = JSON.stringify(data); - }catch (e) {} + }catch (e) { + try { //Try to stringify it anyways + data = data.toString(); + }catch (e) {} + } if (this.isServerAnInstance) { - if (this.useCompression) { - this.socket.write(JSON.stringify({ type: 'tcpsjs-packet', data: Buffer.from(pako.deflate(data)) }) + ''); - }else { - this.socket.write(JSON.stringify({ type: 'tcpsjs-packet', data: data }) + ''); + if (this.useChunking && data.length >= chunkSize) { //If it is less than chunksize, it can be sent in one packet + this.worker.postMessage(JSON.stringify({type: 'write_chunk', data: {packetData: data, chunkSize: chunkSize, useCompression: this.useCompression} })); + + }else { //This is sent in one packet + if (this.useCompression) { + this.socket.write(JSON.stringify({ type: 'tcpsjs-packet', data: Buffer.from(pako.deflate(data)) }) + ''); + }else { + this.socket.write(JSON.stringify({ type: 'tcpsjs-packet', data: data }) + ''); + } } + }else { this.socket.write(data); } @@ -110,7 +325,6 @@ class TCPClient { on(event, callback) { switch(event.toLowerCase()) { case 'close': - console.log('s'); this.socket.end(); this.socket.on('close', callback); break; @@ -127,33 +341,35 @@ class TCPClient { case 'data': //This will attempt to split incoming packets then return them each in a callback this.socket.on('data', (dataBuffer) => { - if (this.isServerAnInstance) { - let splitPackets = dataBuffer.toString().split(''); - - for (let packet of splitPackets) { - if (packet.length !== 0) { - packet = JSON.parse(packet); - if (packet.type === 'tcpsjs-packet') { - let packetData; - - if (this.useCompression) { - packetData = Buffer.from(pako.inflate(new Uint8Array(Buffer.from(packet.data.data)))).toString(); - }else { - packetData = packet.data.toString(); - } - - try { //Try to parse, if error it isn't a json - packetData = JSON.parse(packetData) - }catch (e) {} - - //Returns a packet and a function to reply - callback(packetData); - } + if (this.isServerAnInstance) { + try { + let packet = JSON.parse(dataBuffer.toString()); + if (packet.type === 'tcpsjs-data') { + let packetData = packet.data; + try { + JSON.parse(packetData); + }catch (e) {} + + callback(packetData) } + }catch (e) { + this.worker.postMessage(JSON.stringify({ + type: 'parse_data', + data: { + data: dataBuffer.toString(), + useCompression: this.useCompression + } + })); } }else { - callback(dataBuffer); + let packet = dataBuffer.toString() + + try { + packet = JSON.parse(dataBuffer); + }catch (e) {} + + callback(packet); } }); break; @@ -186,6 +402,7 @@ class TCPClient { this.heartbeatInterval = setInterval(() => { if (this.lastHeartbeat + 8000 <= Date.now()) { this.socket.emit('error', new TCPServiceError(ErrorType.HEARTBEAT, 'This socket has timed out from the server.')); + this.worker.terminate(); this.socket.destroy(); clearInterval(this.heartbeatInterval); } @@ -204,6 +421,11 @@ class TCPServer { * } */ constructor(port, settings) { + + this.chunks = []; //temp + this.invalidPackets = ""; //temp + this.chunkNums = []; //temp + this.port = port; if (settings === null) { @@ -250,15 +472,256 @@ class TCPServer { if (this.useHeartbeat) { this.startHeartbeat(); } + + //Runs a worker (thread) to compress, chunk, and send the data in the background + this.worker = new Worker(` + const { parentPort } = require('worker_threads'); + + + /** + * socket = { + * id: id, + * transactions: [], + * invalidPackets: '', + * lastTransaction: null + * } + */ + let sockets = []; + + //Garbage collector + setInterval(() => { + let socketCounter = 0; + for (let socket of sockets) { + if (socket.lastTransaction + 15000 <= Date.now()) { + sockets.splice(socketCounter, 1); + }else { + socketCounter++; + let transactionCounter = 0; + for (let transaction of socket.transactions) { + if (transaction.lastTransaction + 15000 <= Date.now()) { + socket.transactions.splice(transactionCounter, 1); + }else { + transactionCounter++; + } + } + } + } + }, 5000); + + parentPort.on('message', function(data) { + data = JSON.parse(data); + switch (data.type) { + case 'write_chunk': + let packetData; + if (data.data.useCompression) { + packetData = Buffer.from(require('pako').deflate(data.data.packetData)); + }else { + packetData = Buffer.from(data.data.packetData); + } + + + packetData = splitBuffer(packetData, data.data.chunkSize); + let transactionId = require('crypto').randomUUID(); + + let chunkIndex = 0; + let packetInterval = setInterval(() => { + + if (chunkIndex === packetData.length - 1) { + parentPort.postMessage({ type: 'write_chunk', socketId: data.socketId, data: JSON.stringify({ type: 'tcpsjs-chunk', transactionId: transactionId, chunkNum: chunkIndex, chunkAmount: packetData.length, last: true, data: packetData[chunkIndex] }) + ''}); + clearInterval(packetInterval); + }else { + parentPort.postMessage({ type: 'write_chunk', socketId: data.socketId, data: JSON.stringify({ type: 'tcpsjs-chunk', transactionId: transactionId, chunkNum: chunkIndex, chunkAmount: packetData.length, last: false, data: packetData[chunkIndex] }) + ''}); + chunkIndex++; + } + }, 1); + break; + + + case 'parse_data': + let socket = getSocketById(data.data.socketId);; + let parsedData = parseData(socket, data.data.data); + + for (let packet of parsedData.parsedPackets) { + + let packetData; + switch(packet.type) { + case 'tcpsjs-packet': + if (data.data.useCompression) { + packetData = Buffer.from(require('pako').inflate(new Uint8Array(Buffer.from(packet.data.data)))).toString(); + }else { + packetData = packet.data.toString(); + } + + try { //Try to parse, if error it isn't a json + packetData = JSON.parse(packetData) + }catch (e) {} + + //Send a message to emit the data event + parentPort.postMessage({ type: 'emit_data', socketId: socket.id, data: packetData }); + break; + + case 'tcpsjs-chunk': + let transaction = getTransactionById(socket, packet.transactionId) + + transaction.chunks.push(Buffer.from(packet.data.data)); + + console.log(packet.chunkNum + " - " + packet.chunkAmount); + + if (packet.last) { + if (data.data.useCompression) { + packetData = Buffer.from(require('pako').inflate(new Uint8Array(Buffer.concat(transaction.chunks)))).toString(); + }else { + packetData = Buffer.concat(transaction.chunks).toString(); + } + + let counter = 0; + for (let item of socket.transactions) { + if (item.id === transaction.transactionId) { + socket.transactions.splice(counter, 1); + }else { + counter++; + } + } + + try { + packetData = JSON.parse(packetData); + }catch (e) {} + + parentPort.postMessage({ type: 'emit_data', socketId: socket.id, data: packetData }); + } + break; + } + } + break; + } + + }); + + function getTransactionById(socket, transactionId) { + let transaction = null; + + if (socket.transactions.some(item => item.id === transactionId)) { + transaction = socket.transactions[socket.transactions.findIndex(item => item.id === transactionId)]; + }else { + transaction = { + id: transactionId, + chunks: [], + lastTransaction: Date.now() + } + socket.transactions.push(transaction); + } + + return transaction; + } + + function getSocketById(socketId) { + let socket = null; + + if (sockets.some(item => item.id = socketId)) { + socket = sockets[sockets.findIndex(item => item.id = socketId)]; + + }else { + socket = { + id: socketId, + transactions: [], + invalidPackets: "", + lastTransaction: Date.now() + } + + sockets.push(socket); + } + + return socket; + } + + function splitBuffer(buffer, chunkSize) { + const chunks = []; + let offset = 0; + + while (offset < buffer.length) { + const chunk = buffer.slice(offset, offset + chunkSize); + chunks.push(chunk); + offset += chunkSize; + } + + return chunks; + } + + function parseData(socket, data) { + let parsedPackets = []; + let splitPackets = data.split(''); + + for (let packet of splitPackets) { + if (packet.length !== 0) { + try { + packet = JSON.parse(packet); + + parsedPackets.push(packet); + + }catch (e) { + try { + if (packet.endsWith('}')) { + socket.invalidPackets += (packet + ''); + }else { + socket.invalidPackets += packet; + } + }catch (err) { + } + + let unstackedPackets = socket.invalidPackets.split(''); + + let index = 0; + for (let unstackedPacket of unstackedPackets) { + if (unstackedPacket.length !== 0) { + try { + unstackedPacket = JSON.parse(unstackedPacket); + + parsedPackets.push(unstackedPacket); + + unstackedPackets.splice(index, 1); + }catch (e) { + index++; + } + } + } + + socket.invalidPackets = ""; + unstackedPackets.forEach((unstackedPacket) => { + socket.invalidPackets += unstackedPacket; + }); + } + + + } + } + + return { parsedPackets: parsedPackets, invalidPackets: socket.invalidPackets }; + } + `, {eval: true}); + + this.worker.on('message', (msg) => { + + let socket = this.connectedSockets[this.connectedSockets.findIndex(item => item.id === msg.socketId)]; + switch (msg.type) { + case 'write_chunk': + socket.tcpSocket.write(msg.data); + break; + + case 'emit_data': + socket.tcpSocket.emit("data", Buffer.from(JSON.stringify({ type: 'tcpsjs-data', data: msg.data }))); + break; + } + }); }); this.server.on('connection', (socket) => { - socket.id = crypto.randomUUID(); + + socket = new Socket(this, socket); this.connectedSockets.push(socket); this.allSockets.push(socket); - socket.write(JSON.stringify({ type: 'tcpsjs-connect', data: { useHeartbeat: this.useHeartbeat, useCompression: this.useCompression, useChunking: this.useChunking } })) + socket.tcpSocket.write(JSON.stringify({ type: 'tcpsjs-connect', data: { useHeartbeat: this.useHeartbeat, useCompression: this.useCompression, useChunking: this.useChunking } }) + '') if (this.useHeartbeat) { socket.lastHeartbeat = Date.now(); @@ -266,7 +729,7 @@ class TCPServer { socket.heartbeatReceived = true; //Listed for heartbeats - socket.on('data', (dataBuffer) => { + socket.tcpSocket.on('data', (dataBuffer) => { let splitPackets = dataBuffer.toString().split(''); for (let packet of splitPackets) { @@ -289,6 +752,7 @@ class TCPServer { } this.server.on('close', () => { + this.worker.terminate(); this.server.close(() => { if (this.heartbeatInterval !== null && this.heartbeatInterval !== undefined) { clearInterval(this.heartbeatInterval); @@ -297,6 +761,7 @@ class TCPServer { }); this.server.on('end', () => { + this.worker.terminate(); this.server.close(() => { if (this.heartbeatInterval !== null && this.heartbeatInterval !== undefined) { clearInterval(this.heartbeatInterval); @@ -307,101 +772,34 @@ class TCPServer { return this; } - emit(data, socket) { - try { - data = JSON.stringify(data); - }catch (e) {} - - if (this.useCompression) { - socket.write(JSON.stringify({ type: 'tcpsjs-packet', data: Buffer.from(pako.deflate(data)) }) + ''); - }else { - socket.write(JSON.stringify({ type: 'tcpsjs-packet', data: data }) + ''); - } - } - - on(event, socket, callback) { - if (socket === null) { - switch (event.toLowerCase()) { - case 'connection': - case 'connect': - this.server.on('connection', callback); - break; - - case 'error': - this.server.on('error', callback); - break; - - case 'close': - this.server.on('close', callback); - break; - - default: - console.log("There was an issue listening to the event '" + event + "'"); - - callback(null); - break; - } - }else { - switch(event.toLowerCase()) { - case 'close': - socket.on('close', cb => { - this.removeSocketFromConnectedSockets(socket); - callback(cb); - }); - break; - - case 'data': - //This will attempt to split incoming packets then return them each in a callback - socket.on('data', (dataBuffer) => { - let splitPackets = dataBuffer.toString().split(''); - - for (let packet of splitPackets) { - if (packet.length !== 0) { - packet = JSON.parse(packet); - if (packet.type === 'tcpsjs-packet') { - - let packetData; - - if (this.useCompression) { - packetData = Buffer.from(pako.inflate(new Uint8Array(Buffer.from(packet.data.data)))).toString(); - }else { - packetData = packet.data.toString(); - } - - try { //Try to parse, if error it isn't a json - packetData = JSON.parse(packetData) - }catch (e) {} - - //Returns a packet and a function to reply - callback(packetData); - } + on(event, callback) { + switch (event.toLowerCase()) { + case 'connection': + case 'connect': + this.server.on('connection', (socket) => { + setTimeout(() => { + for (let sock of this.connectedSockets) { + if (sock.tcpSocket.id === socket.id) { + callback(sock); } } - }); - break; - - case 'drain': - socket.on('data', callback); - break; - - case 'end': - socket.on('end', callback); - break; + }, 10); + }); + break; - case 'error': - socket.on('error', callback); - break; + case 'error': + this.server.on('error', callback); + break; - case 'lookup': - socket.on('lookup', callback); - break; + case 'close': + this.server.on('close', callback); + break; - default: - console.log("There was an issue listening to the event '" + event + "'"); + default: + console.log("There was an issue listening to the event '" + event + "'"); - callback(null); - break; - } + callback(null); + break; } } @@ -411,7 +809,7 @@ class TCPServer { this.heartbeatInterval = setInterval(() => { for (let socket of this.connectedSockets) { socket.heartbeatReceived = false; - socket.write(JSON.stringify({ type: 'tcpsjs-heartbeat' }) + ''); + socket.tcpSocket.write(JSON.stringify({ type: 'tcpsjs-heartbeat' }) + ''); setTimeout(() => { if (socket.heartbeatReceived) { @@ -420,7 +818,7 @@ class TCPServer { socket.heartbeatCounter++; if (socket.heartbeatCounter === 8) { - socket.emit('error', new TCPServiceError(ErrorType.HEARTBEAT, 'A client has timed out due to heartbeat', socket)); + socket.tcpSocket.emit('error', new TCPServiceError(ErrorType.HEARTBEAT, 'A client has timed out due to heartbeat', socket)); this.removeSocketFromConnectedSockets(socket); } } @@ -438,6 +836,105 @@ class TCPServer { } } +/** + * This class is used by the server for each socket instance + */ +class Socket { + + constructor(serverInstance, socketInstance) { + this.tcpServer = serverInstance; + this.tcpSocket = socketInstance; + + this.id = crypto.randomUUID(); + + this.tcpSocket.id = crypto.randomUUID(); + } + + + on(event, callback) { + switch(event.toLowerCase()) { + case 'close': + this.tcpSocket.on('close', cb => { + this.tcpServer.removeSocketFromConnectedSockets(this); + callback(cb); + }); + break; + + case 'data': + //This will attempt to split incoming packets then return them each in a callback + this.tcpSocket.on('data', (dataBuffer) => { + try { + let packet = JSON.parse(dataBuffer.toString()); + if (packet.type === 'tcpsjs-data') { + let packetData = packet.data; + try { + JSON.parse(packetData); + }catch (e) {} + + callback(packetData) + } + }catch (e) { + this.tcpServer.worker.postMessage(JSON.stringify({ + type: 'parse_data', + data: { + socketId: this.id, + data: dataBuffer.toString(), + useCompression: this.tcpServer.useCompression + } + })); + } + }); + break; + + case 'drain': + this.tcpSocket.on('data', callback); + break; + + case 'end': + this.tcpSocket.on('end', callback); + break; + + case 'error': + this.tcpSocket.on('error', callback); + break; + + case 'lookup': + this.tcpSocket.on('lookup', callback); + break; + + default: + console.log("There was an issue listening to the event '" + event + "'"); + + callback(null); + break; + } + } + + write(data) { + this.emit(data); + } + emit(data) { + try { //If err, it isnt a json + data = JSON.stringify(data); + }catch (e) { + try { //Try to stringify it anyways + data = data.toString(); + }catch (e) {} + } + + if (this.tcpServer.useChunking && data.length >= chunkSize) { //If it is less than chunksize, it can be sent in one packet + this.tcpServer.worker.postMessage(JSON.stringify({type: 'write_chunk', socketId: socket.id, data: {packetData: data, chunkSize: chunkSize, useCompression: this.useCompression} })); + + }else { //This is sent in one packet + if (this.tcpServer.useCompression) { + this.tcpSocket.write(JSON.stringify({ type: 'tcpsjs-packet', data: Buffer.from(pako.deflate(data)) }) + ''); + }else { + this.tcpSocket.write(JSON.stringify({ type: 'tcpsjs-packet', data: data }) + ''); + } + } + } +} + module.exports = { TCPClient, TCPServer } diff --git a/README.md b/README.md index e96f62e..01551d8 100644 --- a/README.md +++ b/README.md @@ -9,16 +9,24 @@ Main features: * Built in self auto updating connected and all sockets array * No need to stringify or parse JSON's, the data you send is the data you receive, no annoying buffers * No limits from tcp +* The only speed limit is your network, in closed networks, it is able to perfectly handle packets through a for loop * Built in heartbeats with timeout error * Built in packet compression using ZLib * Settings for each feature so you can setup the server YOUR way +Beta Features: +* Built in packet chunking system (This feature is inconsistent, it depends on the computer that it is running on) + A few things to watch out for: * Both the client and the server must have heartbeats set to true for it to work Required Modules: * [Pako](https://github.com/nodeca/pako) (ZLib compression library) +## The Best Part +The best part about this library, is the syntax is the exact same as regular tcp, meaning that you can fully migrate your project +over without any hassle + # Getting started ## Client: @@ -93,17 +101,28 @@ let {TCPServer} = require('Limitless-TCP'); /** * @param settings = { //Any null values will be set to true * useHeartbeat: bool, - * useCompression: bool + * useCompression: bool, + * useChunking: bool (Beta Feature) * } */ let tcpServer = new TCPServer( num: port, obj: settings ) tcpServer.listen(); -tcpServer.on( str: event, null/socket: socket, (callback) => {} ); //If the socket field is null then it listens for tcpServer events instead of socket specific -tcpServer.emit( data, socket: socket ); +tcpServer.on( str: event, (callback) => {} ); ``` +### Writing a message to socket +```javascript +tcpServer.on( 'connect', (socket) => { + socket.emit( any: message ); + socket.write( any: message ); +}); +``` + +This is also how you would achieve chunking, it is built in and automatically detected when a packet gets overs 14000 bytes. +Setting useChunking to false would just attempt to send the packet in one go no matter the size. + ### Connected Sockets and All Sockets: There is a built-in, auto updating array with all the connected sockets and every socket that is and has been connected (In its runtime, a restart would reset this) ```javascript