Skip to content

Commit

Permalink
#58 tweak socket registry
Browse files Browse the repository at this point in the history
- own module
- also clean up correctly when user got "kicked"
  • Loading branch information
[email protected] authored and [email protected] committed Jun 20, 2020
1 parent 81c4a2f commit 29f8575
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 97 deletions.
108 changes: 48 additions & 60 deletions server/src/socketManager.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import {v4 as uuid} from 'uuid';

import getLogger from './getLogger';
import socketRegistryFactory from './socketRegistry';

const LOGGER = getLogger('socketManager');

LOGGER.transports[0].level = 'debug';

export default function socketManagerFactory(commandProcessor, sendEventToRoom) {
const socketToUserIdMap = {};
const socketToRoomMap = {};
const registry = socketRegistryFactory();

return {
handleIncomingCommand,
Expand All @@ -23,35 +25,50 @@ export default function socketManagerFactory(commandProcessor, sendEventToRoom)
return;
}

const joinedRoomEvent = getJoinedRoomEvent(producedEvents);
if (joinedRoomEvent) {
registerUserWithSocket(socket.id, matchingUserId);
registerSocketWithRoom(joinedRoomEvent.roomId, socket);
}

if (hasLeftRoomEvent(producedEvents)) {
unregisterUserWithSocket(socket.id);
unregisterSocketWithRoom(socket.id);
}
updateSocketRegistry(matchingUserId, producedEvents, socket);

sendEvents(producedEvents, producedEvents[0].roomId);
} catch (commandProcessingError) {
handleCommandProcessingError(commandProcessingError, msg, socket);
}
}

function updateSocketRegistry(userId, producedEvents, socket) {
const joinedRoomEvent = getJoinedRoomEvent(producedEvents);
if (joinedRoomEvent) {
registry.registerSocketMapping(socket, userId, joinedRoomEvent.roomId);
}

const leftRoomEvent = getLeftRoomEvent(producedEvents);
if (leftRoomEvent) {
registry.removeSocketMapping(socket.id, leftRoomEvent.userId, leftRoomEvent.roomId);
} else {
const kickedRoomEvent = getKickedRoomEvent(producedEvents);
if (kickedRoomEvent) {
registry.removeSocketMapping(socket.id, kickedRoomEvent.userId, kickedRoomEvent.roomId);
}
}
}

function getJoinedRoomEvent(producedEvents) {
if (!producedEvents) {
return undefined;
}
return producedEvents.find((e) => e.name === 'joinedRoom');
}

function hasLeftRoomEvent(producedEvents) {
function getLeftRoomEvent(producedEvents) {
if (!producedEvents) {
return undefined;
}
return producedEvents.find((e) => e.name === 'leftRoom');
}

function getKickedRoomEvent(producedEvents) {
if (!producedEvents) {
return undefined;
}
return !!producedEvents.find((e) => e.name === 'leftRoom');
return producedEvents.find((e) => e.name === 'kicked');
}

/**
Expand All @@ -65,9 +82,9 @@ export default function socketManagerFactory(commandProcessor, sendEventToRoom)
* @return {string} the userId
*/
function getUserIdForMessage(socketId, msg) {
const matchingUserId = socketToUserIdMap[socketId];
if (matchingUserId) {
return matchingUserId;
const mapping = registry.getMapping(socketId);
if (mapping && mapping.userId) {
return mapping.userId;
}

if (msg.name === 'joinRoom' && msg.payload && msg.payload.userId) {
Expand Down Expand Up @@ -108,52 +125,25 @@ export default function socketManagerFactory(commandProcessor, sendEventToRoom)
socket.emit('event', commandRejectedEvent);
}

/**
* we need do keep the mapping of a socket to userId,
* so that we can retrieve the userId for any message (command) sent on this socket
*/
function registerUserWithSocket(socketId, userId) {
socketToUserIdMap[socketId] = userId;
}

function unregisterUserWithSocket(socketId) {
delete socketToUserIdMap[socketId];
}

/**
* we need do keep the mapping of a socket to room,
* so that we can produce "user left" events on socket disconnect.
*
* also join sockets together in a socket.io "room" , so that we can emit messages to all sockets in that room
*/
function registerSocketWithRoom(roomId, socket) {
if (!roomId) {
throw new Error('Fatal! No roomId after "joinedRoom" to put into socketToRoomMap!');
}
socketToRoomMap[socket.id] = roomId;

socket.join(roomId, () => LOGGER.debug(`Socket ${socket.id} joined room ${roomId}`));
}

function unregisterSocketWithRoom(socketId) {
delete socketToRoomMap[socketId];
}

/**
* if the socket is disconnected (e.g. user closed browser tab), manually produce and handle
* a "leaveRoom" command that will mark the user.
*/
async function onDisconnect(socket) {
const userId = socketToUserIdMap[socket.id];
const roomId = socketToRoomMap[socket.id]; // socket.rooms is at this moment already emptied. so we have to use our own map
// socket.rooms is at this moment already emptied
const mapping = registry.getMapping(socket.id);

if (!userId || !roomId) {
if (!mapping) {
// this happens if user is on landing page, socket is opened, then leaves, never joined a room. perfectly fine.
LOGGER.debug(`Socket disconnected. no mapping to userId (${userId}) or roomId (${roomId})`);
LOGGER.debug(`Socket ${socket.id} disconnected. no mapping...`);
return;
}

if (isLastSocketForUserId(userId)) {
const {userId, roomId} = mapping;

LOGGER.debug(`Socket ${socket.id} disconnected. Mapping to user ${userId} in room ${roomId}`);

if (registry.isLastSocketForUserId(userId)) {
const leaveRoomCommand = {
id: uuid(),
roomId: roomId,
Expand All @@ -163,13 +153,11 @@ export default function socketManagerFactory(commandProcessor, sendEventToRoom)
}
};
await handleIncomingCommand(socket, leaveRoomCommand);
} else {
LOGGER.debug(
`User ${userId} in room ${roomId}, has more open sockets. Removing mapping for socket ${socket.id}`
);
registry.removeSocketMapping(socket.id, userId, roomId);
}
}

function isLastSocketForUserId(userId) {
const socketsOfThatUser = Object.values(socketToUserIdMap).filter(
(socketUserId) => socketUserId === userId
);
return socketsOfThatUser.length === 1;
}
}
79 changes: 79 additions & 0 deletions server/src/socketRegistry.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import getLogger from './getLogger';

const LOGGER = getLogger('socketRegistry');

LOGGER.transports[0].level = 'debug';

/**
* we need do keep the mapping of a socket to userId,
* so that we can retrieve the userId for any message (command) sent on this socket.
*
* we need do keep the mapping of a socket to room,
* so that we can produce "user left" events on socket disconnect.
*
* maps socket IDs to userIds and roomIds
*/
export default function socketRegistryFactory() {
const registry = {};

return {
registerSocketMapping,
removeSocketMapping,
isLastSocketForUserId,
getMapping
};

function getMapping(socketId) {
return registry[socketId];
}

function registerSocketMapping(socket, userId, roomId) {
LOGGER.debug(`Registering socket ${socket.id} : user ${userId} and room ${roomId}`);

if (registry[socket.id]) {
LOGGER.warn(`Overriding old mapping for socket ${socket.id}`);
}
registry[socket.id] = {
userId,
roomId
};

// also join sockets together in a socket.io "room" , so that we can emit messages to all sockets in that room
socket.join(roomId);
}

function removeSocketMapping(socketId, userId, roomId) {
LOGGER.debug(`Removing mapping: socket ${socketId} -> [user ${userId}, room ${roomId}]`);

if (registry[socketId].userId !== userId) {
LOGGER.warn(
`socket to userId mapping mismatch: socket ${socketId} maps to user ${registry[socketId].userId}. expected user ${userId}`
);
}
if (registry[socketId].roomId !== roomId) {
LOGGER.warn(
`socket to roomId mapping mismatch: socket ${socketId} maps to room ${registry[socketId].roomId}. expected room ${roomId}`
);
}

delete registry[socketId];
}

function isLastSocketForUserId(userId) {
const socketsOfThatUser = Object.entries(registry).filter(
(entry) => entry[1].userId === userId
);
if (socketsOfThatUser.length > 1) {
LOGGER.debug(
`User ${userId} has multiple open sockets: ${regEntriesToString(socketsOfThatUser)}`
);
}
return socketsOfThatUser.length === 1;
}

function regEntriesToString(regEntries) {
return regEntries
.map((re) => `${re[0]} -> [user ${re[1].userId}, room ${re[1].roomId}]`)
.join(' ');
}
}
38 changes: 1 addition & 37 deletions server/test/unit/socketManagerTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,32 +45,7 @@ test('registering new socket, new user, new room successfully', async () => {
expect(socket.join.mock.calls[0][0]).toEqual(joinedRoomEvent.roomId);
});

test('should throw if produced joinedRoom event has no roomId', async () => {
const sendEventToRoom = jest.fn();

const joinedRoomEvent = {
/* for some reason, no roomId*/
name: 'joinedRoom'
};

const socketManager = socketManagerFactory(
getMockCmdProcessor([joinedRoomEvent]),
sendEventToRoom
);

const socket = getMockSocketObject();
const dummyCommand = {
id: uuid()
};

await socketManager.handleIncomingCommand(socket, dummyCommand);
expectEmittedCommandRejected(
socket,
'Fatal! No roomId after "joinedRoom" to put into socketToRoomMap!'
);
});

test('should correctly handle joining & leaving multiple rooms in sequence', async () => {
test('should correctly handle joining & leaving multiple rooms in sequence', async () => {
const sendEventToRoom = jest.fn();
const mockCmdProcessor = jest.fn().mockName('mockCommandProcessor');
const socketManager = socketManagerFactory(mockCmdProcessor, sendEventToRoom);
Expand Down Expand Up @@ -147,17 +122,6 @@ test('should handle disconnect', async () => {
await socketManager.onDisconnect(getMockSocketObject());
});

function expectEmittedCommandRejected(socket, reason) {
expect(socket.emit.mock.calls.length).toBe(1);
expect(socket.emit.mock.calls[0][0]).toEqual('event');
expect(socket.emit.mock.calls[0][1]).toMatchObject({
name: 'commandRejected',
payload: {
reason
}
});
}

function getMockCmdProcessor(producedEvents) {
return jest
.fn(() =>
Expand Down

0 comments on commit 29f8575

Please sign in to comment.