Skip to content

Commit

Permalink
fix: private properties
Browse files Browse the repository at this point in the history
  • Loading branch information
DIY0R committed Nov 12, 2024
1 parent 8611073 commit d7547fe
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 14 deletions.
11 changes: 8 additions & 3 deletions lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const { EventEmitter } = require('stream');

class Connection extends EventEmitter {
#connections = new Map();

constructor(port, targetNode) {
super();
this.#start(port, targetNode);
Expand All @@ -22,12 +23,11 @@ class Connection extends EventEmitter {
socket.on('close', () => this.#deleteConnection(connectionId));
socket.on('end', () => console.log('end'));
socket.on('timeout', () => console.log('timeout'));
socket.on('error', () => this.#errorHandler());
socket.on('error', error => this.#errorHandler(error));
socket
.pipe(collectMessage())
.on('data', message => this.#onData(connectionId, message));
this.#connections.set(connectionId, socket);
this._newConnection(connectionId);
this.#newConnection(connectionId, socket);
}

_publish(connectionId, message) {
Expand All @@ -48,6 +48,11 @@ class Connection extends EventEmitter {
this._deleteConnection(connectionId);
}

#newConnection(connectionId, socket) {
this.#connections.set(connectionId, socket);
this._newConnection(connectionId);
}

#onData(connectionId, message) {
this._onMessage(connectionId, message);
}
Expand Down
38 changes: 27 additions & 11 deletions lib/messaging.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,17 @@ const {
} = require('./constants');

class Messaging extends Connection {
NODE_ID = uuid();
#NODE_ID = uuid();
#seenMessages = new Set();
neighbors = new Map();
seenMessages = new Set();

constructor(port, nodeId, targetNode) {
constructor(port, nodePrefix, targetNode) {
super(port, targetNode);
if (nodeId) this.NODE_ID = nodeId;
this.#cleanSeenMessages();
this.#init(nodePrefix);
}

_newConnection(connectionId) {
this._publish(connectionId, createData(HANDSHAKE, this.NODE_ID));
this._publish(connectionId, createData(HANDSHAKE, this.#NODE_ID));
}

_onMessage(connectionId, messageObject) {
Expand All @@ -34,7 +33,7 @@ class Messaging extends Connection {

_deleteConnection(connectionId) {
const nodeId = this.#findNodeId(connectionId);
if (nodeId) this.neighbors.delete(connectionId);
if (nodeId) this.neighbors.delete(nodeId);
}

async publish(nodeId, message) {
Expand All @@ -46,7 +45,7 @@ class Messaging extends Connection {
const { messageId = uuid(), ttl = TTL } = param || {};
const connectionId = this.neighbors.get(nodeId);
const targets = connectionId ? [connectionId] : this.neighbors;
this.seenMessages.add(messageId);
this.#addSeenMessage(messageId);
targets.forEach(target =>
this._publish(
target,
Expand All @@ -72,12 +71,20 @@ class Messaging extends Connection {

[DM](_, messageObject) {
const { messageId, data, to, ttl } = messageObject;
if (to === this.NODE_ID) return void this.emit(DM, data);
const isSeenMessage = this.seenMessages.has(messageId);
const isSeenMessage = this.#seenMessages.has(messageId);
if (ttl < 1 || isSeenMessage) return;
if (to === this.#NODE_ID) {
this.#addSeenMessage(messageId);
this.emit(DM, data);
return;
}
this.#broadcast(to, data, { messageId, ttl: ttl - 1 });
}

get nodeId() {
return this.#NODE_ID;
}

async #neighborCheck() {
return new Promise((resolve, reject) => {
if (this.neighbors.size > 0) return resolve();
Expand All @@ -95,8 +102,17 @@ class Messaging extends Connection {
});
}

#init(nodePrefix) {
if (nodePrefix) this.#NODE_ID = nodePrefix + '-' + uuid();
this.#cleanSeenMessages();
}

#addSeenMessage(messageId) {
this.#seenMessages.add(messageId);
}

#cleanSeenMessages() {
setInterval(() => this.seenMessages.clear(), TIME_CLEAN_MSG);
setInterval(() => this.#seenMessages.clear(), TIME_CLEAN_MSG);
}
}

Expand Down

0 comments on commit d7547fe

Please sign in to comment.