Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(deps): upgrade socket.io-adapter #6

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# History

- [0.3.2](#032-2024-01-23) (Jan 2024)
- [0.3.1](#031-2024-01-10) (Jan 2024)
- [0.3.0](#030-2023-02-23) (Feb 2023)
- [0.2.1](#021-2022-05-03) (May 2022)
Expand All @@ -8,6 +9,17 @@

# Release notes

## [0.3.2](https://github.com/socketio/socket.io-mongo-adapter/compare/0.3.1...0.3.2) (2024-01-23)


### Bug Fixes

* add support for AWS DocumentDB ([#21](https://github.com/socketio/socket.io-mongo-adapter/issues/21)) ([0c80f7f](https://github.com/socketio/socket.io-mongo-adapter/commit/0c80f7fd1da772cc54971fd93a1fa93f0c5e47d0))
* ensure CSR works with a capped collection ([d3fa038](https://github.com/socketio/socket.io-mongo-adapter/commit/d3fa03874038ed9ec011d8795ac7dc6d840f4abe))
* exclude offline nodes when calling serverCount() ([e2fb8c2](https://github.com/socketio/socket.io-mongo-adapter/commit/e2fb8c2f9d126e763e4f0c0ffba158f2d0c5c17a))



## [0.3.1](https://github.com/socketio/socket.io-mongo-adapter/compare/0.3.0...0.3.1) (2024-01-10)


Expand Down
98 changes: 68 additions & 30 deletions lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
} from "socket.io-adapter";
import { randomBytes } from "crypto";
import { ObjectId, MongoServerError, WithId, Document } from "mongodb";
import type { Collection, ChangeStream, ResumeToken } from "mongodb";
import type { Collection, ChangeStream, ChangeStreamOptions } from "mongodb";

const randomId = () => randomBytes(8).toString("hex");
const debug = require("debug")("socket.io-mongo-adapter");
Expand Down Expand Up @@ -161,7 +161,7 @@ export function createAdapter(
let isClosed = false;
let adapters = new Map<string, MongoAdapter>();
let changeStream: ChangeStream;
let resumeToken: ResumeToken;
let changeStreamOpts: ChangeStreamOptions = {};

const initChangeStream = () => {
if (isClosed || (changeStream && !changeStream.closed)) {
Expand All @@ -178,14 +178,12 @@ export function createAdapter(
},
},
],
{
resumeAfter: resumeToken,
}
changeStreamOpts
);

changeStream.on("change", (event: any) => {
if (event.operationType === "insert") {
resumeToken = changeStream.resumeToken;
changeStreamOpts.resumeAfter = changeStream.resumeToken;
adapters.get(event.fullDocument?.nsp)?.onEvent(event);
}
});
Expand All @@ -197,7 +195,7 @@ export function createAdapter(
!err.hasErrorLabel("ResumableChangeStreamError")
) {
// the resume token was not found in the oplog
resumeToken = null;
changeStreamOpts = {};
}
});

Expand Down Expand Up @@ -617,6 +615,13 @@ export class MongoAdapter extends Adapter {
}

public serverCount(): Promise<number> {
this.nodesMap.forEach((lastSeen, uid) => {
const nodeSeemsDown = Date.now() - lastSeen > this.heartbeatTimeout;
if (nodeSeemsDown) {
debug("node %s seems down", uid);
this.nodesMap.delete(uid);
}
});
return Promise.resolve(1 + this.nodesMap.size);
}

Expand Down Expand Up @@ -671,20 +676,9 @@ export class MongoAdapter extends Adapter {
}).catch(onPublishError);
}

private getExpectedResponseCount() {
this.nodesMap.forEach((lastSeen, uid) => {
const nodeSeemsDown = Date.now() - lastSeen > this.heartbeatTimeout;
if (nodeSeemsDown) {
debug("node %s seems down", uid);
this.nodesMap.delete(uid);
}
});
return this.nodesMap.size;
}

async fetchSockets(opts: BroadcastOptions): Promise<any[]> {
const localSockets = await super.fetchSockets(opts);
const expectedResponseCount = this.getExpectedResponseCount();
const expectedResponseCount = (await this.serverCount()) - 1;

if (opts.flags?.local || expectedResponseCount === 0) {
return localSockets;
Expand Down Expand Up @@ -745,7 +739,7 @@ export class MongoAdapter extends Adapter {

private async serverSideEmitWithAck(packet: any[]) {
const ack = packet.pop();
const expectedResponseCount = this.getExpectedResponseCount();
const expectedResponseCount = (await this.serverCount()) - 1;

debug(
'waiting for %d responses to "serverSideEmit" request',
Expand Down Expand Up @@ -812,28 +806,22 @@ export class MongoAdapter extends Adapter {
try {
results = await Promise.all([
// could use a sparse index on [data.pid] (only index the documents whose type is EventType.SESSION)
this.mongoCollection.findOneAndDelete({
type: EventType.SESSION,
"data.pid": pid,
}),
this.findSession(pid),
this.mongoCollection.findOne({
type: EventType.BROADCAST,
_id: eventOffset,
}),
]);
} catch (e) {
debug("error while fetching session: %s", (e as Error).message);
return Promise.reject("error while fetching session");
}

const result = (results[0]?.ok
? results[0].value // mongodb@5
: results[0]) as unknown as WithId<Document>; // mongodb@6

if (!result || !results[1]) {
if (!results[0] || !results[1]) {
return Promise.reject("session or offset not found");
}

const session = result.data;
const session = results[0].data;

// could use a sparse index on [_id, nsp, data.opts.rooms, data.opts.except] (only index the documents whose type is EventType.BROADCAST)
/* addition daniel genis 20231026
Expand Down Expand Up @@ -882,4 +870,54 @@ export class MongoAdapter extends Adapter {

return session;
}

private findSession(
pid: PrivateSessionId
): Promise<WithId<Document> | undefined> {
const isCollectionCapped = !this.addCreatedAtField;
if (isCollectionCapped) {
return this.mongoCollection
.findOne(
{
type: EventType.SESSION,
"data.pid": pid,
},
{
sort: {
_id: -1,
},
}
)
.then((result) => {
if (!result) {
debug("session not found");
return;
}

if (result.data.sid) {
debug("session found, adding tombstone");

// since the collection is capped, we cannot remove documents from it, so we add a tombstone to prevent recovering the same session twice
// note: we could also have used two distinct collections, one for the events (capped) and the other for the sessions (not capped, with a TTL)
const TOMBSTONE_SESSION = { pid, tombstone: true };
this.persistSession(TOMBSTONE_SESSION);

return result;
} else {
debug("tombstone session found");
}
});
} else {
return this.mongoCollection
.findOneAndDelete({
type: EventType.SESSION,
"data.pid": pid,
})
.then((result) => {
return result?.ok && result.value
? result.value // mongodb@5
: (result as unknown as WithId<Document>); // mongodb@6
});
}
}
}
98 changes: 49 additions & 49 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,50 +1,50 @@
{
"name": "@gynzy/mongo-adapter",
"version": "0.3.1-gynzy-1",
"description": "The Socket.IO MongoDB adapter, allowing to broadcast events between several Socket.IO servers",
"license": "MIT",
"repository": {
"type": "git",
"url": "[email protected]:socketio/socket.io-mongo-adapter.git"
},
"files": [
"dist/"
],
"main": "./dist/index.js",
"types": "./dist/index.d.ts",
"scripts": {
"test": "npm run format:check && tsc && nyc mocha --require ts-node/register test/index.ts",
"format:check": "prettier --parser typescript --check 'lib/**/*.ts' 'test/**/*.ts'",
"format:fix": "prettier --parser typescript --write 'lib/**/*.ts' 'test/**/*.ts'",
"build": "tsc"
},
"dependencies": {
"debug": "~4.3.1",
"mongodb": "*"
},
"peerDependencies": {
"socket.io-adapter": "^2.5.2"
},
"devDependencies": {
"@types/expect.js": "^0.3.29",
"@types/mocha": "^8.2.1",
"@types/node": "^14.14.7",
"expect.js": "0.3.1",
"mocha": "^10.2.0",
"nyc": "^15.1.0",
"prettier": "^2.1.2",
"socket.io": "^4.6.1",
"socket.io-client": "^4.6.1",
"ts-node": "^10.9.1",
"typescript": "^4.9.4"
},
"engines": {
"node": ">=10.0.0"
},
"keywords": [
"socket.io",
"mongodb",
"mongo",
"adapter"
]
}
"name": "@gynzy/mongo-adapter",
"version": "0.3.2-gynzy",
"description": "The Socket.IO MongoDB adapter, allowing to broadcast events between several Socket.IO servers",
"license": "MIT",
"repository": {
"type": "git",
"url": "[email protected]:socketio/socket.io-mongo-adapter.git"
},
"files": [
"dist/"
],
"main": "./dist/index.js",
"types": "./dist/index.d.ts",
"scripts": {
"test": "npm run format:check && tsc && nyc mocha --require ts-node/register test/index.ts",
"format:check": "prettier --parser typescript --check 'lib/**/*.ts' 'test/**/*.ts'",
"format:fix": "prettier --parser typescript --write 'lib/**/*.ts' 'test/**/*.ts'",
"build": "tsc"
},
"dependencies": {
"debug": "~4.3.1",
"mongodb": "*"
},
"peerDependencies": {
"socket.io-adapter": "^2.5.4"
},
"devDependencies": {
"@types/expect.js": "^0.3.29",
"@types/mocha": "^8.2.1",
"@types/node": "^14.14.7",
"expect.js": "0.3.1",
"mocha": "^10.2.0",
"nyc": "^15.1.0",
"prettier": "^2.1.2",
"socket.io": "^4.6.1",
"socket.io-client": "^4.6.1",
"ts-node": "^10.9.1",
"typescript": "^4.9.4"
},
"engines": {
"node": ">=10.0.0"
},
"keywords": [
"socket.io",
"mongodb",
"mongo",
"adapter"
]
}
Loading
Loading