Skip to content

Commit

Permalink
Merge pull request #4906 from novuhq/nv-3202-race-condition-when-read…
Browse files Browse the repository at this point in the history
…ing-the-service-instance-sw-health-indicator

Race condition when reading the service instance - SW Health Indicator
  • Loading branch information
djabarovgeorge authored Dec 4, 2023
2 parents 873e42f + 9c9c2fc commit 5f68810
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 17 deletions.
5 changes: 4 additions & 1 deletion apps/ws/src/health/health.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ import { HealthCheck, HealthCheckResult, HealthCheckService, HttpHealthIndicator
import { DalServiceHealthIndicator, WebSocketsQueueServiceHealthIndicator } from '@novu/application-generic';

import { version } from '../../package.json';
import { WSHealthIndicator } from '../socket/services';

@Controller('v1/health-check')
export class HealthController {
constructor(
private healthCheckService: HealthCheckService,
private dalHealthIndicator: DalServiceHealthIndicator,
private webSocketsQueueHealthIndicator: WebSocketsQueueServiceHealthIndicator
private webSocketsQueueHealthIndicator: WebSocketsQueueServiceHealthIndicator,
private wsHealthIndicator: WSHealthIndicator
) {}

@Get()
Expand All @@ -18,6 +20,7 @@ export class HealthController {
const result = await this.healthCheckService.check([
async () => this.dalHealthIndicator.isHealthy(),
async () => this.webSocketsQueueHealthIndicator.isHealthy(),
async () => this.wsHealthIndicator.isHealthy(),
async () => {
return {
apiVersion: {
Expand Down
5 changes: 5 additions & 0 deletions apps/ws/src/health/health.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,14 @@ import { TerminusModule } from '@nestjs/terminus';

import { HealthController } from './health.controller';
import { SharedModule } from '../shared/shared.module';
import { WSGateway } from '../socket/ws.gateway';
import { WSHealthIndicator } from '../socket/services';

const PROVIDERS = [WSHealthIndicator, WSGateway];

@Module({
imports: [TerminusModule, SharedModule],
providers: PROVIDERS,
controllers: [HealthController],
})
export class HealthModule {}
1 change: 1 addition & 0 deletions apps/ws/src/socket/services/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export { WebSocketWorker } from './web-socket.worker';
export { WSHealthIndicator } from './ws-health-indicator.service';
25 changes: 25 additions & 0 deletions apps/ws/src/socket/services/ws-health-indicator.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { HealthIndicator, HealthIndicatorResult } from '@nestjs/terminus';
import { Injectable } from '@nestjs/common';

import { IHealthIndicator } from '@novu/application-generic';

import { WSGateway } from '../ws.gateway';

@Injectable()
export class WSHealthIndicator extends HealthIndicator implements IHealthIndicator {
private INDICATOR_KEY = 'ws';

constructor(private wsGateway: WSGateway) {
super();
}

async isHealthy(): Promise<HealthIndicatorResult> {
const status = !!this.wsGateway.server;

return this.getStatus(this.INDICATOR_KEY, status);
}

isActive(): Promise<HealthIndicatorResult> {
return this.isHealthy();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,21 @@ export class ExternalServicesRoute {

public async execute(command: ExternalServicesRouteCommand) {
const isOnline = await this.connectionExist(command);
if (isOnline) {
if (command.event === WebSocketEventEnum.RECEIVED) {
await this.processReceivedEvent(command);
}

if (command.event === WebSocketEventEnum.UNSEEN) {
await this.sendUnseenCountChange(command);
}

if (command.event === WebSocketEventEnum.UNREAD) {
await this.sendUnreadCountChange(command);
}

if (!isOnline) {
return;
}

if (command.event === WebSocketEventEnum.RECEIVED) {
await this.processReceivedEvent(command);
}

if (command.event === WebSocketEventEnum.UNSEEN) {
await this.sendUnseenCountChange(command);
}

if (command.event === WebSocketEventEnum.UNREAD) {
await this.sendUnreadCountChange(command);
}
}

Expand Down Expand Up @@ -127,7 +130,13 @@ export class ExternalServicesRoute {
}
}

private async connectionExist(command: ExternalServicesRouteCommand) {
private async connectionExist(command: ExternalServicesRouteCommand): Promise<boolean | undefined> {
if (!this.wsGateway.server) {
Logger.error('No sw server found, unable to check if connection exists', LOG_CONTEXT);

return;
}

return !!(await this.wsGateway.server.sockets.in(command.userId).fetchSockets()).length;
}
}
16 changes: 13 additions & 3 deletions apps/ws/src/socket/ws.gateway.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
const nr = require('newrelic');

import { JwtService } from '@nestjs/jwt';
import { OnGatewayConnection, OnGatewayDisconnect, WebSocketGateway, WebSocketServer } from '@nestjs/websockets';
import { Logger } from '@nestjs/common';
import { Server, Socket } from 'socket.io';
import { JwtService } from '@nestjs/jwt';

import { ISubscriberJwt, ObservabilityBackgroundTransactionEnum } from '@novu/shared';

import { SubscriberOnlineService } from '../shared/subscriber-online';

const LOG_CONTEXT = 'WSGateway';

@WebSocketGateway()
export class WSGateway implements OnGatewayConnection, OnGatewayDisconnect {
constructor(private jwtService: JwtService, private subscriberOnlineService: SubscriberOnlineService) {}

@WebSocketServer()
server: Server;
server: Server | null;

async handleDisconnect(connection: Socket) {
// eslint-disable-next-line @typescript-eslint/no-this-alias
Expand Down Expand Up @@ -112,6 +116,12 @@ export class WSGateway implements OnGatewayConnection, OnGatewayDisconnect {

// eslint-disable-next-line @typescript-eslint/no-explicit-any
async sendMessage(userId: string, event: string, data: any) {
if (!this.server) {
Logger.error('No sw server available to send message', LOG_CONTEXT);

return;
}

this.server.to(userId).emit(event, data);
}

Expand Down
1 change: 1 addition & 0 deletions packages/application-generic/src/health/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ export * from './web-sockets-queue.health-indicator';
export * from './workflow-queue.health-indicator';
export * from './subscriber-process-queue.health-indicator';
export * from './queue-health-indicator.service';
export * from './health-indicator.interface';

0 comments on commit 5f68810

Please sign in to comment.