Skip to content

Commit

Permalink
Merge branch 'next' into v0.22.x
Browse files Browse the repository at this point in the history
  • Loading branch information
LetItRock committed Dec 5, 2023
2 parents af48576 + 4de0c45 commit 47f6f33
Show file tree
Hide file tree
Showing 39 changed files with 185 additions and 117 deletions.
1 change: 0 additions & 1 deletion apps/api/src/app/shared/shared.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ const PROVIDERS = [
imports: [
QueuesModule.forRoot([
JobTopicNameEnum.EXECUTION_LOG,
JobTopicNameEnum.WORKFLOW,
JobTopicNameEnum.WEB_SOCKETS,
JobTopicNameEnum.WORKFLOW,
JobTopicNameEnum.INBOUND_PARSE_MAIL,
Expand Down
1 change: 0 additions & 1 deletion apps/api/src/bootstrap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ export async function bootstrap(expressApp?): Promise<INestApplication> {
await app.listen(process.env.PORT);
}

// Starts listening for shutdown hooks
app.enableShutdownHooks();

Logger.log(`Started application in NODE_ENV=${process.env.NODE_ENV} on port ${process.env.PORT}`);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { Inject, Injectable, Logger } from '@nestjs/common';
const nr = require('newrelic');
import {
getExecutionLogWorkerOptions,
INovuWorker,
PinoLogger,
storage,
Store,
Expand All @@ -18,13 +17,12 @@ import { ObservabilityBackgroundTransactionEnum } from '@novu/shared';
const LOG_CONTEXT = 'ExecutionLogWorker';

@Injectable()
export class ExecutionLogWorker extends ExecutionLogWorkerService implements INovuWorker {
export class ExecutionLogWorker extends ExecutionLogWorkerService {
constructor(
private createExecutionDetails: CreateExecutionDetails,
public workflowInMemoryProviderService: WorkflowInMemoryProviderService
) {
super(new BullMqService(workflowInMemoryProviderService));

this.initWorker(this.getWorkerProcessor(), this.getWorkerOptions());
}

Expand Down
3 changes: 1 addition & 2 deletions apps/worker/src/app/workflow/services/standard.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { IJobData, ObservabilityBackgroundTransactionEnum } from '@novu/shared';
import {
BullMqService,
getStandardWorkerOptions,
INovuWorker,
Job,
PinoLogger,
StandardWorkerService,
Expand All @@ -29,7 +28,7 @@ import {
const LOG_CONTEXT = 'StandardWorker';

@Injectable()
export class StandardWorker extends StandardWorkerService implements INovuWorker {
export class StandardWorker extends StandardWorkerService {
constructor(
private handleLastFailedJob: HandleLastFailedJob,
private runJob: RunJob,
Expand Down
5 changes: 2 additions & 3 deletions apps/worker/src/app/workflow/services/workflow.worker.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { Inject, Injectable, Logger } from '@nestjs/common';
import { Injectable, Logger } from '@nestjs/common';
const nr = require('newrelic');
import {
getWorkflowWorkerOptions,
INovuWorker,
PinoLogger,
storage,
Store,
Expand All @@ -19,7 +18,7 @@ import { ObservabilityBackgroundTransactionEnum } from '@novu/shared';
const LOG_CONTEXT = 'WorkflowWorker';

@Injectable()
export class WorkflowWorker extends WorkflowWorkerService implements INovuWorker {
export class WorkflowWorker extends WorkflowWorkerService {
constructor(
private triggerEventUsecase: TriggerEvent,
public workflowInMemoryProviderService: WorkflowInMemoryProviderService
Expand Down
5 changes: 3 additions & 2 deletions apps/worker/src/bootstrap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,19 +66,20 @@ export async function bootstrap(): Promise<INestApplication> {
app.use(bodyParser.json());
app.use(bodyParser.urlencoded({ extended: true }));

// Starts listening for shutdown hooks
app.enableShutdownHooks();

Logger.log('BOOTSTRAPPED SUCCESSFULLY');

await app.listen(process.env.PORT);
await app.init();

try {
await startAppInfra(app);
} catch (e) {
process.exit(1);
}

await app.listen(process.env.PORT);

Logger.log(`Started application in NODE_ENV=${process.env.NODE_ENV} on port ${process.env.PORT}`);

return app;
Expand Down
13 changes: 13 additions & 0 deletions apps/ws/src/bootstrap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import helmet from 'helmet';
import { BullMqService } from '@novu/application-generic';
import { version } from '../package.json';
import { getErrorInterceptor, Logger } from '@novu/application-generic';
import { prepareAppInfra, startAppInfra } from './socket/services';

if (process.env.SENTRY_DSN) {
Sentry.init({
Expand All @@ -27,6 +28,8 @@ export async function bootstrap() {
app.useLogger(app.get(Logger));
app.flushLogs();

await prepareAppInfra(app);

app.useGlobalInterceptors(getErrorInterceptor());

app.setGlobalPrefix(CONTEXT_PATH);
Expand All @@ -42,5 +45,15 @@ export async function bootstrap() {

app.useWebSocketAdapter(redisIoAdapter);

app.enableShutdownHooks();

await app.init();

try {
await startAppInfra(app);
} catch (e) {
process.exit(1);
}

await app.listen(process.env.PORT as string);
}
18 changes: 10 additions & 8 deletions apps/ws/src/health/health.controller.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,28 @@
import { Controller, Get } from '@nestjs/common';
import { HealthCheck, HealthCheckResult, HealthCheckService, HttpHealthIndicator } from '@nestjs/terminus';
import { DalServiceHealthIndicator, WebSocketsQueueServiceHealthIndicator } from '@novu/application-generic';
import { Controller, Get, Inject } from '@nestjs/common';
import { HealthCheck, HealthCheckResult, HealthCheckService } from '@nestjs/terminus';
import { DalServiceHealthIndicator, IHealthIndicator } from '@novu/application-generic';

import { version } from '../../package.json';
import { WSHealthIndicator } from '../socket/services';
import { WSServerHealthIndicator } from '../socket/services';

@Controller('v1/health-check')
export class HealthController {
constructor(
private healthCheckService: HealthCheckService,
private dalHealthIndicator: DalServiceHealthIndicator,
private webSocketsQueueHealthIndicator: WebSocketsQueueServiceHealthIndicator,
private wsHealthIndicator: WSHealthIndicator
private wsServerHealthIndicator: WSServerHealthIndicator,
@Inject('QUEUE_HEALTH_INDICATORS') private indicators: IHealthIndicator[]
) {}

@Get()
@HealthCheck()
async healthCheck(): Promise<HealthCheckResult> {
const indicatorHealthCheckFunctions = this.indicators.map((indicator) => async () => indicator.isHealthy());

const result = await this.healthCheckService.check([
...indicatorHealthCheckFunctions,
async () => this.dalHealthIndicator.isHealthy(),
async () => this.webSocketsQueueHealthIndicator.isHealthy(),
async () => this.wsHealthIndicator.isHealthy(),
async () => this.wsServerHealthIndicator.isHealthy(),
async () => {
return {
apiVersion: {
Expand Down
8 changes: 4 additions & 4 deletions apps/ws/src/health/health.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ import { TerminusModule } from '@nestjs/terminus';

import { HealthController } from './health.controller';
import { SharedModule } from '../shared/shared.module';
import { WSGateway } from '../socket/ws.gateway';
import { WSHealthIndicator } from '../socket/services';
import { WSServerHealthIndicator } from '../socket/services';
import { SocketModule } from '../socket/socket.module';

const PROVIDERS = [WSHealthIndicator, WSGateway];
const PROVIDERS = [WSServerHealthIndicator];

@Module({
imports: [TerminusModule, SharedModule],
imports: [TerminusModule, SharedModule, SocketModule],
providers: PROVIDERS,
controllers: [HealthController],
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ export class SubscriberOnlineService {
async handleDisconnection(subscriber: ISubscriberJwt, activeConnections: number) {
const lastOnlineAt = new Date().toISOString();
let isOnline = false;

if (activeConnections > 1) {
isOnline = true;
}
Expand Down
24 changes: 24 additions & 0 deletions apps/ws/src/socket/services/cold-start.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { INestApplication } from '@nestjs/common';
import { INovuWorker, ReadinessService } from '@novu/application-generic';
import { WebSocketWorker } from './web-socket.worker';

const getWorkers = (app: INestApplication): INovuWorker[] => {
const webSocketWorker = app.get(WebSocketWorker, { strict: false });

const workers: INovuWorker[] = [webSocketWorker];

return workers;
};

export const prepareAppInfra = async (app: INestApplication): Promise<void> => {
const readinessService = app.get(ReadinessService);
const workers = getWorkers(app);

await readinessService.pauseWorkers(workers);
};

export const startAppInfra = async (app: INestApplication): Promise<void> => {
const readinessService = app.get(ReadinessService);
const workers = getWorkers(app);
await readinessService.enableWorkers(workers);
};
3 changes: 2 additions & 1 deletion apps/ws/src/socket/services/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export { WebSocketWorker } from './web-socket.worker';
export { WSHealthIndicator } from './ws-health-indicator.service';
export { WSServerHealthIndicator } from './ws-server-health-indicator.service';
export { prepareAppInfra, startAppInfra } from './cold-start.service';
3 changes: 1 addition & 2 deletions apps/ws/src/socket/services/web-socket.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { Injectable, Logger } from '@nestjs/common';
import {
BullMqService,
getWebSocketWorkerOptions,
INovuWorker,
WebSocketsWorkerService,
WorkerOptions,
WorkflowInMemoryProviderService,
Expand All @@ -16,7 +15,7 @@ import { ObservabilityBackgroundTransactionEnum } from '@novu/shared';
const LOG_CONTEXT = 'WebSocketWorker';

@Injectable()
export class WebSocketWorker extends WebSocketsWorkerService implements INovuWorker {
export class WebSocketWorker extends WebSocketsWorkerService {
constructor(
private externalServicesRoute: ExternalServicesRoute,
private workflowInMemoryProviderService: WorkflowInMemoryProviderService
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import { IHealthIndicator } from '@novu/application-generic';
import { WSGateway } from '../ws.gateway';

@Injectable()
export class WSHealthIndicator extends HealthIndicator implements IHealthIndicator {
private INDICATOR_KEY = 'ws';
export class WSServerHealthIndicator extends HealthIndicator implements IHealthIndicator {
private INDICATOR_KEY = 'ws-server';

constructor(private wsGateway: WSGateway) {
super();
Expand Down
6 changes: 3 additions & 3 deletions apps/ws/src/socket/socket.module.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Inject, Module, OnApplicationShutdown, OnModuleInit, Provider } from '@nestjs/common';
import { WebSocketsWorkerService, WorkflowInMemoryProviderService } from '@novu/application-generic';
import { Module, OnApplicationShutdown, Provider } from '@nestjs/common';
import { WorkflowInMemoryProviderService } from '@novu/application-generic';

import { WSGateway } from './ws.gateway';
import { SharedModule } from '../shared/shared.module';
Expand All @@ -9,7 +9,7 @@ import { WebSocketWorker } from './services';

const USE_CASES: Provider[] = [ExternalServicesRoute];

const PROVIDERS: Provider[] = [WSGateway, WebSocketsWorkerService, WebSocketWorker];
const PROVIDERS: Provider[] = [WSGateway, WebSocketWorker];

const memoryQueueService = {
provide: WorkflowInMemoryProviderService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,6 @@ export class ExternalServicesRoute {
return;
}

const connection = await this.connectionExist(command);

if (!connection) {
return;
}

let unreadCount = this.extractCount(command.payload?.unreadCount);

if (unreadCount === undefined) {
Expand All @@ -91,12 +85,6 @@ export class ExternalServicesRoute {
return;
}

const connection = await this.connectionExist(command);

if (!connection) {
return;
}

let unseenCount = this.extractCount(command.payload?.unseenCount);

if (unseenCount === undefined) {
Expand Down
50 changes: 45 additions & 5 deletions apps/ws/src/socket/ws.gateway.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
const nr = require('newrelic');
import { Server, Socket } from 'socket.io';
import { JwtService } from '@nestjs/jwt';
import { OnGatewayConnection, OnGatewayDisconnect, WebSocketGateway, WebSocketServer } from '@nestjs/websockets';
import { Logger } from '@nestjs/common';
import { Server, Socket } from 'socket.io';

import { ISubscriberJwt, ObservabilityBackgroundTransactionEnum } from '@novu/shared';
import { IDestroy } from '@novu/application-generic';

import { SubscriberOnlineService } from '../shared/subscriber-online';

const LOG_CONTEXT = 'WSGateway';

@WebSocketGateway()
export class WSGateway implements OnGatewayConnection, OnGatewayDisconnect {
export class WSGateway implements OnGatewayConnection, OnGatewayDisconnect, IDestroy {
private isShutdown = false;

constructor(private jwtService: JwtService, private subscriberOnlineService: SubscriberOnlineService) {}

@WebSocketServer()
Expand Down Expand Up @@ -82,7 +85,18 @@ export class WSGateway implements OnGatewayConnection, OnGatewayDisconnect {
}
}

/*
* This method is called when a client disconnects from the server.
* * When a shutdown is in progress, we opt out of updating the subscriber status,
* assuming that when the current instance goes down, another instance will take its place and handle the subscriber status update.
*/
private async processDisconnectionRequest(connection: Socket) {
if (!this.isShutdown) {
await this.handlerSubscriberDisconnection(connection);
}
}

private async handlerSubscriberDisconnection(connection: Socket) {
const token = this.extractToken(connection);

if (!token || token === 'null') {
Expand All @@ -98,6 +112,12 @@ export class WSGateway implements OnGatewayConnection, OnGatewayDisconnect {
await this.subscriberOnlineService.handleDisconnection(subscriber, activeConnections);
}

private async getActiveConnections(socket: Socket, subscriberId: string) {
const activeSockets = await socket.in(subscriberId).fetchSockets();

return activeSockets.length;
}

private async processConnectionRequest(connection: Socket) {
const token = this.extractToken(connection);

Expand Down Expand Up @@ -129,9 +149,29 @@ export class WSGateway implements OnGatewayConnection, OnGatewayDisconnect {
socket.disconnect();
}

private async getActiveConnections(socket: Socket, subscriberId: string) {
const activeSockets = await socket.in(subscriberId).fetchSockets();
async gracefulShutdown(): Promise<void> {
try {
if (!this.server) {
Logger.error('WS server was not initialized while executing shutdown', LOG_CONTEXT);

return activeSockets.length;
return;
}

Logger.log('Closing WS server for incoming new connections', LOG_CONTEXT);
this.server.close();

Logger.log('Disconnecting active sockets connections', LOG_CONTEXT);
this.server.sockets.disconnectSockets();
} catch (e) {
Logger.error(e, 'Unexpected exception was thrown while graceful shut down was executed', LOG_CONTEXT);
throw e;
} finally {
Logger.log(`Graceful shutdown down has finished`, LOG_CONTEXT);
}
}

async onModuleDestroy(): Promise<void> {
this.isShutdown = true;
await this.gracefulShutdown();
}
}
1 change: 0 additions & 1 deletion libs/dal/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
"@aws-sdk/s3-request-presigner": "^3.382.0",
"@faker-js/faker": "^6.0.0",
"@novu/shared": "^0.22.0",
"@sendgrid/mail": "^7.4.2",
"JSONStream": "^1.3.5",
"archiver": "^5.0.0",
"async": "^3.2.0",
Expand Down
Loading

3 comments on commit 47f6f33

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.