Skip to content

Commit

Permalink
Merge pull request #4918 from novuhq/nv-3205-do-not-perform-fetchsock…
Browse files Browse the repository at this point in the history
…ets-when-service-is-shutting-down

refactor: do not perform fetchsockets when service is shutting down
  • Loading branch information
djabarovgeorge authored Dec 5, 2023
2 parents 773dde0 + 1bb87ea commit 4de0c45
Show file tree
Hide file tree
Showing 15 changed files with 70 additions and 51 deletions.
1 change: 0 additions & 1 deletion apps/api/src/bootstrap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ export async function bootstrap(expressApp?): Promise<INestApplication> {
await app.listen(process.env.PORT);
}

// Starts listening for shutdown hooks
app.enableShutdownHooks();

Logger.log(`Started application in NODE_ENV=${process.env.NODE_ENV} on port ${process.env.PORT}`);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { Inject, Injectable, Logger } from '@nestjs/common';
const nr = require('newrelic');
import {
getExecutionLogWorkerOptions,
INovuWorker,
PinoLogger,
storage,
Store,
Expand All @@ -18,13 +17,12 @@ import { ObservabilityBackgroundTransactionEnum } from '@novu/shared';
const LOG_CONTEXT = 'ExecutionLogWorker';

@Injectable()
export class ExecutionLogWorker extends ExecutionLogWorkerService implements INovuWorker {
export class ExecutionLogWorker extends ExecutionLogWorkerService {
constructor(
private createExecutionDetails: CreateExecutionDetails,
public workflowInMemoryProviderService: WorkflowInMemoryProviderService
) {
super(new BullMqService(workflowInMemoryProviderService));

this.initWorker(this.getWorkerProcessor(), this.getWorkerOptions());
}

Expand Down
3 changes: 1 addition & 2 deletions apps/worker/src/app/workflow/services/standard.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { IJobData, ObservabilityBackgroundTransactionEnum } from '@novu/shared';
import {
BullMqService,
getStandardWorkerOptions,
INovuWorker,
Job,
PinoLogger,
StandardWorkerService,
Expand All @@ -29,7 +28,7 @@ import {
const LOG_CONTEXT = 'StandardWorker';

@Injectable()
export class StandardWorker extends StandardWorkerService implements INovuWorker {
export class StandardWorker extends StandardWorkerService {
constructor(
private handleLastFailedJob: HandleLastFailedJob,
private runJob: RunJob,
Expand Down
5 changes: 2 additions & 3 deletions apps/worker/src/app/workflow/services/workflow.worker.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { Inject, Injectable, Logger } from '@nestjs/common';
import { Injectable, Logger } from '@nestjs/common';
const nr = require('newrelic');
import {
getWorkflowWorkerOptions,
INovuWorker,
PinoLogger,
storage,
Store,
Expand All @@ -19,7 +18,7 @@ import { ObservabilityBackgroundTransactionEnum } from '@novu/shared';
const LOG_CONTEXT = 'WorkflowWorker';

@Injectable()
export class WorkflowWorker extends WorkflowWorkerService implements INovuWorker {
export class WorkflowWorker extends WorkflowWorkerService {
constructor(
private triggerEventUsecase: TriggerEvent,
public workflowInMemoryProviderService: WorkflowInMemoryProviderService
Expand Down
2 changes: 2 additions & 0 deletions apps/ws/src/bootstrap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ export async function bootstrap() {

app.useWebSocketAdapter(redisIoAdapter);

app.enableShutdownHooks();

await app.init();

try {
Expand Down
6 changes: 3 additions & 3 deletions apps/ws/src/health/health.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import { TerminusModule } from '@nestjs/terminus';
import { HealthController } from './health.controller';
import { SharedModule } from '../shared/shared.module';
import { WSServerHealthIndicator } from '../socket/services';
import { WSGateway } from '../socket/ws.gateway';
import { SocketModule } from '../socket/socket.module';

const PROVIDERS = [WSServerHealthIndicator, WSGateway];
const PROVIDERS = [WSServerHealthIndicator];

@Module({
imports: [TerminusModule, SharedModule],
imports: [TerminusModule, SharedModule, SocketModule],
providers: PROVIDERS,
controllers: [HealthController],
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ export class SubscriberOnlineService {
async handleDisconnection(subscriber: ISubscriberJwt, activeConnections: number) {
const lastOnlineAt = new Date().toISOString();
let isOnline = false;

if (activeConnections > 1) {
isOnline = true;
}
Expand Down
3 changes: 1 addition & 2 deletions apps/ws/src/socket/services/web-socket.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { Injectable, Logger } from '@nestjs/common';
import {
BullMqService,
getWebSocketWorkerOptions,
INovuWorker,
WebSocketsWorkerService,
WorkerOptions,
WorkflowInMemoryProviderService,
Expand All @@ -16,7 +15,7 @@ import { ObservabilityBackgroundTransactionEnum } from '@novu/shared';
const LOG_CONTEXT = 'WebSocketWorker';

@Injectable()
export class WebSocketWorker extends WebSocketsWorkerService implements INovuWorker {
export class WebSocketWorker extends WebSocketsWorkerService {
constructor(
private externalServicesRoute: ExternalServicesRoute,
private workflowInMemoryProviderService: WorkflowInMemoryProviderService
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,6 @@ export class ExternalServicesRoute {
return;
}

const connection = await this.connectionExist(command);

if (!connection) {
return;
}

let unreadCount = this.extractCount(command.payload?.unreadCount);

if (unreadCount === undefined) {
Expand All @@ -91,12 +85,6 @@ export class ExternalServicesRoute {
return;
}

const connection = await this.connectionExist(command);

if (!connection) {
return;
}

let unseenCount = this.extractCount(command.payload?.unseenCount);

if (unseenCount === undefined) {
Expand Down
50 changes: 45 additions & 5 deletions apps/ws/src/socket/ws.gateway.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
const nr = require('newrelic');
import { Server, Socket } from 'socket.io';
import { JwtService } from '@nestjs/jwt';
import { OnGatewayConnection, OnGatewayDisconnect, WebSocketGateway, WebSocketServer } from '@nestjs/websockets';
import { Logger } from '@nestjs/common';
import { Server, Socket } from 'socket.io';

import { ISubscriberJwt, ObservabilityBackgroundTransactionEnum } from '@novu/shared';
import { IDestroy } from '@novu/application-generic';

import { SubscriberOnlineService } from '../shared/subscriber-online';

const LOG_CONTEXT = 'WSGateway';

@WebSocketGateway()
export class WSGateway implements OnGatewayConnection, OnGatewayDisconnect {
export class WSGateway implements OnGatewayConnection, OnGatewayDisconnect, IDestroy {
private isShutdown = false;

constructor(private jwtService: JwtService, private subscriberOnlineService: SubscriberOnlineService) {}

@WebSocketServer()
Expand Down Expand Up @@ -82,7 +85,18 @@ export class WSGateway implements OnGatewayConnection, OnGatewayDisconnect {
}
}

/*
* This method is called when a client disconnects from the server.
* * When a shutdown is in progress, we opt out of updating the subscriber status,
* assuming that when the current instance goes down, another instance will take its place and handle the subscriber status update.
*/
private async processDisconnectionRequest(connection: Socket) {
if (!this.isShutdown) {
await this.handlerSubscriberDisconnection(connection);
}
}

private async handlerSubscriberDisconnection(connection: Socket) {
const token = this.extractToken(connection);

if (!token || token === 'null') {
Expand All @@ -98,6 +112,12 @@ export class WSGateway implements OnGatewayConnection, OnGatewayDisconnect {
await this.subscriberOnlineService.handleDisconnection(subscriber, activeConnections);
}

private async getActiveConnections(socket: Socket, subscriberId: string) {
const activeSockets = await socket.in(subscriberId).fetchSockets();

return activeSockets.length;
}

private async processConnectionRequest(connection: Socket) {
const token = this.extractToken(connection);

Expand Down Expand Up @@ -129,9 +149,29 @@ export class WSGateway implements OnGatewayConnection, OnGatewayDisconnect {
socket.disconnect();
}

private async getActiveConnections(socket: Socket, subscriberId: string) {
const activeSockets = await socket.in(subscriberId).fetchSockets();
async gracefulShutdown(): Promise<void> {
try {
if (!this.server) {
Logger.error('WS server was not initialized while executing shutdown', LOG_CONTEXT);

return activeSockets.length;
return;
}

Logger.log('Closing WS server for incoming new connections', LOG_CONTEXT);
this.server.close();

Logger.log('Disconnecting active sockets connections', LOG_CONTEXT);
this.server.sockets.disconnectSockets();
} catch (e) {
Logger.error(e, 'Unexpected exception was thrown while graceful shut down was executed', LOG_CONTEXT);
throw e;
} finally {
Logger.log(`Graceful shutdown down has finished`, LOG_CONTEXT);
}
}

async onModuleDestroy(): Promise<void> {
this.isShutdown = true;
await this.gracefulShutdown();
}
}
1 change: 1 addition & 0 deletions packages/application-generic/src/modules/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export { QueuesModule } from './queues.module';
export { MetricsModule } from './metrics.module';
export * from './interfaces';
4 changes: 4 additions & 0 deletions packages/application-generic/src/modules/interfaces.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export interface IDestroy {
gracefulShutdown?: () => Promise<void>;
onModuleDestroy: () => Promise<void>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@ import { setTimeout } from 'timers/promises';
import { Worker } from '../bull-mq';

import { IHealthIndicator } from '../../health';
import { IDestroy } from '../../modules';

export interface INovuWorker {
export interface INovuWorker extends IDestroy {
readonly DEFAULT_ATTEMPTS: number;
gracefulShutdown: () => Promise<void>;
readonly topic: string;
onModuleDestroy: () => Promise<void>;
pause: () => Promise<void>;
resume: () => Promise<void>;
worker: Worker;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
import { JobTopicNameEnum } from '@novu/shared';

import { INovuWorker } from '../readiness';
import { WorkerBaseService } from './worker-base.service';
import { BullMqService } from '../bull-mq';

const LOG_CONTEXT = 'SubscriberProcessWorkerService';

export class SubscriberProcessWorkerService
extends WorkerBaseService
implements INovuWorker
{
export class SubscriberProcessWorkerService extends WorkerBaseService {
constructor(private bullMqService: BullMqService) {
super(JobTopicNameEnum.PROCESS_SUBSCRIBER, bullMqService);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,8 @@
import { IJobData, JobTopicNameEnum } from '@novu/shared';
import { Inject, Injectable, Logger } from '@nestjs/common';

import {
BullMqService,
JobsOptions,
Processor,
Worker,
WorkerOptions,
} from '../bull-mq';
import { WorkflowInMemoryProviderService } from '../in-memory-provider';
import { JobTopicNameEnum } from '@novu/shared';
import { Logger } from '@nestjs/common';

import { BullMqService, Processor, Worker, WorkerOptions } from '../bull-mq';
import { INovuWorker } from '../readiness';

const LOG_CONTEXT = 'WorkerService';

Expand All @@ -19,7 +13,7 @@ export type WorkerProcessor =

export { WorkerOptions };

export class WorkerBaseService {
export class WorkerBaseService implements INovuWorker {
private instance: BullMqService;

public readonly DEFAULT_ATTEMPTS = 3;
Expand Down

0 comments on commit 4de0c45

Please sign in to comment.