From 9cd004029317e4ffebe1c40251e75de58f0d6567 Mon Sep 17 00:00:00 2001 From: Bastian Rihm Date: Tue, 12 Nov 2024 17:10:58 +0100 Subject: [PATCH] Fix concurrent autoupdate single requests --- .../app/site/services/auth-check.service.ts | 2 +- .../autoupdate-communication.service.ts | 10 +++++++--- .../services/autoupdate/autoupdate.service.ts | 20 ++++++++++++++----- 3 files changed, 23 insertions(+), 9 deletions(-) diff --git a/client/src/app/site/services/auth-check.service.ts b/client/src/app/site/services/auth-check.service.ts index 36c42aab37..8514bd9ff6 100644 --- a/client/src/app/site/services/auth-check.service.ts +++ b/client/src/app/site/services/auth-check.service.ts @@ -97,7 +97,7 @@ export class AuthCheckService { if (typeof info === `string`) { meetingIdString = this.osRouter.getMeetingId(info); } - if (Number.isNaN(Number(meetingIdString))) { + if (Number.isNaN(Number(meetingIdString)) || +meetingIdString <= 0) { return false; } await this.fetchMeetingIfNotExists(+meetingIdString); diff --git a/client/src/app/site/services/autoupdate/autoupdate-communication.service.ts b/client/src/app/site/services/autoupdate/autoupdate-communication.service.ts index 8d00f2e86f..c0ae037ec0 100644 --- a/client/src/app/site/services/autoupdate/autoupdate-communication.service.ts +++ b/client/src/app/site/services/autoupdate/autoupdate-communication.service.ts @@ -36,7 +36,7 @@ import { SUBSCRIPTION_SUFFIX } from '../model-request.service'; }) export class AutoupdateCommunicationService { private autoupdateDataObservable: Observable; - private openResolvers = new Map) => void>(); + private openResolvers = new Map) => void)[]>(); private endpointName: string; private autoupdateEndpointStatus: 'healthy' | 'unhealthy' = `healthy`; private unhealtyTimeout: any; @@ -130,7 +130,11 @@ export class AutoupdateCommunicationService { public open(streamId: Id | null, description: string, request: ModelRequest, params = {}): Promise { return new Promise((resolve, reject) => { const requestHash = djb2hash(JSON.stringify(request)); - this.openResolvers.set(requestHash, resolve); + if (this.openResolvers.has(requestHash)) { + this.openResolvers.get(requestHash).push(resolve); + } else { + this.openResolvers.set(requestHash, [resolve]); + } this.sharedWorker .sendMessage(`autoupdate`, { action: `open`, @@ -298,7 +302,7 @@ export class AutoupdateCommunicationService { return; } - this.openResolvers.get(data.content.requestHash)(data.content?.streamId); + this.openResolvers.get(data.content.requestHash).forEach(r => r(data.content?.streamId)); this.openResolvers.delete(data.content.requestHash); } diff --git a/client/src/app/site/services/autoupdate/autoupdate.service.ts b/client/src/app/site/services/autoupdate/autoupdate.service.ts index d8644f9c7f..58556c129a 100644 --- a/client/src/app/site/services/autoupdate/autoupdate.service.ts +++ b/client/src/app/site/services/autoupdate/autoupdate.service.ts @@ -71,7 +71,7 @@ export class AutoupdateService { private _activeRequestObjects: AutoupdateSubscriptionMap = {}; private _mutex = new Mutex(); private _currentQueryParams: QueryParams | null = null; - private _resolveDataReceived: ((value: ModelData) => void)[] = []; + private _resolveDataReceived: { [key: number]: ((value: ModelData) => void)[] } = []; public constructor( private httpEndpointService: HttpStreamEndpointService, @@ -224,8 +224,13 @@ export class AutoupdateService { ); let rejectReceivedData: any; + let resolveIdx: number; const receivedData = new Promise((resolve, reject) => { - this._resolveDataReceived[id] = resolve; + if (this._resolveDataReceived[id]) { + resolveIdx = this._resolveDataReceived[id].push(resolve) - 1; + } else { + this._resolveDataReceived[id] = [resolve]; + } rejectReceivedData = reject; }); receivedData.catch((e: Error) => { @@ -238,9 +243,9 @@ export class AutoupdateService { close: (): void => { this.communication.close(id); delete this._activeRequestObjects[id]; - if (this._resolveDataReceived[id]) { + if (this._resolveDataReceived[id] && this._resolveDataReceived[id][resolveIdx]) { rejectReceivedData(new Error(`Connection canceled`)); - delete this._resolveDataReceived[id]; + delete this._resolveDataReceived[id][resolveIdx]; } console.debug(`[autoupdate] stream closed:`, description); @@ -323,7 +328,12 @@ export class AutoupdateService { this.communication.cleanupCollections(requestId, deletedModels); if (this._resolveDataReceived[requestId]) { - this._resolveDataReceived[requestId](modelData); + for (let i = 0; i < this._resolveDataReceived[requestId].length; i++) { + if (this._resolveDataReceived[requestId][i]) { + this._resolveDataReceived[requestId][i](modelData); + delete this._resolveDataReceived[requestId][i]; + } + } delete this._resolveDataReceived[requestId]; } }