Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Interceptor #58

Merged
merged 4 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 36 additions & 5 deletions lib/common/meta-teg.discovery.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
import { Injectable } from '@nestjs/common';
import { ModuleRef, ModulesContainer, Reflector } from '@nestjs/core';
import { ModulesContainer, Reflector } from '@nestjs/core';
import { MetadataScanner } from '@nestjs/core';
import { InstanceWrapper } from '@nestjs/core/injector/instance-wrapper';
import { MESSAGE_ROUTER, MODULE_TOKEN, SER_DAS_KEY } from '../constants';
import { CallbackFunctionVariadic, IMetaTegsMap, ISerDes } from '../interfaces';
import {
INTERCEPTOR_KEY,
MESSAGE_ROUTER,
MODULE_TOKEN,
SER_DAS_KEY,
} from '../constants';
import {
CallbackFunctionVariadic,
IMetaTegsMap,
ISerDes,
TypeRmqInterceptor,
} from '../interfaces';
import { RQMColorLogger } from './logger';
import { Module } from '@nestjs/core/injector/module';

Expand Down Expand Up @@ -35,7 +45,6 @@ export class MetaTegsScannerService {
const { instance } = provider;
if (instance instanceof Object) {
const allMethodNames = this.metadataScanner.getAllMethodNames(instance);

allMethodNames.forEach((name: string) =>
this.lookupMethods(metaTeg, rmqMessagesMap, instance, name),
);
Expand All @@ -58,7 +67,15 @@ export class MetaTegsScannerService {
const boundHandler = instance[methodName].bind(instance);
if (event) {
const serdes = this.getSerDesMetaData(method, instance.constructor);
rmqMessagesMap.set(event, { handler: boundHandler, serdes });
const interceptors = this.getInterceptorMetaData(
method,
instance.constructor,
);
rmqMessagesMap.set(event, {
handler: boundHandler,
serdes,
interceptors,
});
this.logger.log('Mapped ' + event, MESSAGE_ROUTER);
}
}
Expand All @@ -68,6 +85,20 @@ export class MetaTegsScannerService {
this.getMetaData<ISerDes>(SER_DAS_KEY, target)
);
}
private getInterceptorMetaData(
method: CallbackFunctionVariadic,
target: object,
): TypeRmqInterceptor[] {
const methodMeta = this.getMetaData<TypeRmqInterceptor>(
INTERCEPTOR_KEY,
method,
);
const targetMeta = this.getMetaData<TypeRmqInterceptor>(
INTERCEPTOR_KEY,
target,
);
return [targetMeta, methodMeta].filter((meta) => meta !== undefined);
}
private getMetaData<T>(key: string, target: any) {
return this.reflector.get<T>(key, target);
}
Expand Down
2 changes: 2 additions & 0 deletions lib/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ export const RMQ_ROUTES_TRANSFORM = 'RMQ_ROUTES_TRANSFORM';
export const RMQ_APP_OPTIONS = 'RMQ_APP_OPTIONS';
export const TARGET_MODULE = 'TARGET_MODULE';
export const SER_DAS_KEY = 'SER_DAS_KEY';
export const INTERCEPTOR_KEY = 'INTERCEPTOR_KEY';
export const SERDES = 'SERDES';
export const INTERCEPTORS = 'INTERCEPTORS';

export const INITIALIZATION_STEP_DELAY = 400;
export const DEFAULT_TIMEOUT = 40000;
Expand Down
1 change: 1 addition & 0 deletions lib/decorators/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from './rmq-message.decorator';
export * from './transform.decorator';
export * from './serdes.decorator';
export * from './interceptor.decorator';
6 changes: 6 additions & 0 deletions lib/decorators/interceptor.decorator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import { SetMetadata } from '@nestjs/common';
import { INTERCEPTOR_KEY } from '../constants';
import { TypeRmqInterceptor } from 'lib/interfaces';

export const RmqInterceptor = (options: TypeRmqInterceptor) =>
SetMetadata(INTERCEPTOR_KEY, options);
4 changes: 2 additions & 2 deletions lib/decorators/rmq-message.decorator.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { IDescriptor } from 'lib/interfaces';
import { IDescriptorRoute } from 'lib/interfaces';
import { NON_ROUTE, RMQ_MESSAGE_META_TEG } from '../constants';

export const reflectFunction = (event: string) =>
function (
target: any,
propertyKey: string | symbol,
descriptor: IDescriptor,
descriptor: IDescriptorRoute,
) {
Reflect.defineMetadata(RMQ_MESSAGE_META_TEG, event, descriptor.value);
};
Expand Down
1 change: 1 addition & 0 deletions lib/interfaces/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export * from './rmq-options.interface';
export * from './interceptor.interface';
export * from './serdes.interface';
export * from './metategs';
export * from './rmqService';
13 changes: 13 additions & 0 deletions lib/interfaces/interceptor.interface.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { ConsumeMessage } from 'amqplib';

export type ReverseFunction = (
content: any,
message: ConsumeMessage,
) => Promise<void>;
export abstract class IRmqInterceptor {
abstract intercept(
content: any,
message: ConsumeMessage,
): Promise<ReverseFunction>;
}
export type TypeRmqInterceptor = typeof IRmqInterceptor;
4 changes: 3 additions & 1 deletion lib/interfaces/metategs.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { ConsumeMessage } from 'amqplib';
import { ISerDes } from './serdes.interface';
import { TypeRmqInterceptor } from './interceptor.interface';

export type IConsumFunction = (
message?: any,
Expand All @@ -8,9 +9,10 @@ export type IConsumFunction = (
export interface MetaTegEnpoint {
handler: IConsumFunction;
serdes?: ISerDes | undefined;
interceptors?: TypeRmqInterceptor[];
}
export type IMetaTegsMap = Map<string, MetaTegEnpoint>;
export interface IDescriptor {
export interface IDescriptorRoute {
value?: IConsumFunction;
}

Expand Down
2 changes: 2 additions & 0 deletions lib/interfaces/rmq-options.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Options } from 'amqplib';
import { LoggerService, ModuleMetadata } from '@nestjs/common';
import { RMQIntercepterClass, RMQPipeClass } from '../common';
import { ISerDes } from './serdes.interface';
import { TypeRmqInterceptor } from './interceptor.interface';

export interface IQueue {
queue: string;
Expand Down Expand Up @@ -34,6 +35,7 @@ export interface IMessageBroker {
replyTo?: IQueue;
queue?: IQueue;
serDes?: ISerDes;
interceptor?: TypeRmqInterceptor[];
messageTimeout?: number;
serviceName?: string;
}
Expand Down
8 changes: 7 additions & 1 deletion lib/rmq.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@ import {
IRabbitMQConfig,
IGlobalOptions,
} from './interfaces';
import { MODULE_TOKEN, RMQ_BROKER_OPTIONS, SERDES } from './constants';
import {
INTERCEPTORS,
MODULE_TOKEN,
RMQ_BROKER_OPTIONS,
SERDES,
} from './constants';
import { DiscoveryModule } from '@nestjs/core';
import { MetaTegsScannerService, getUniqId } from './common';
import { RmqNestjsCoreModule } from './rmq-core.module';
Expand Down Expand Up @@ -39,6 +44,7 @@ export class RmqNestjsModule {
providers: [
{ provide: RMQ_BROKER_OPTIONS, useValue: options },
{ provide: SERDES, useValue: options.serDes ?? serDes },
{ provide: INTERCEPTORS, useValue: options.interceptor ?? [] },
{ provide: MODULE_TOKEN, useFactory: getUniqId },
RmqService,
MetaTegsScannerService,
Expand Down
36 changes: 31 additions & 5 deletions lib/rmq.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
ISerDes,
TypeChanel,
TypeQueue,
TypeRmqInterceptor,
} from './interfaces';
import {
IConsumFunction,
Expand All @@ -26,6 +27,7 @@ import {
INDICATE_REPLY_QUEUE,
INITIALIZATION_STEP_DELAY,
INOF_NOT_FULL_OPTIONS,
INTERCEPTORS,
MODULE_TOKEN,
NACKED,
NON_ROUTE,
Expand All @@ -43,6 +45,7 @@ import { RmqNestjsConnectService } from './rmq-connect.service';
import { getUniqId } from './common/get-uniqId';
import { EventEmitter } from 'stream';
import { RQMColorLogger } from './common/logger';
import { rejects } from 'assert';

@Injectable()
export class RmqService implements OnModuleInit, OnModuleDestroy {
Expand All @@ -61,6 +64,7 @@ export class RmqService implements OnModuleInit, OnModuleDestroy {
@Inject(RMQ_BROKER_OPTIONS) private readonly options: IMessageBroker,
@Inject(RMQ_APP_OPTIONS) private readonly globalOptions: IGlobalOptions,
@Inject(SERDES) private readonly serDes: ISerDes,
@Inject(INTERCEPTORS) private readonly interceptors: TypeRmqInterceptor[],
@Inject(MODULE_TOKEN) private readonly moduleToken: string,
) {
this.logger = globalOptions.appOptions?.logger
Expand Down Expand Up @@ -184,17 +188,27 @@ export class RmqService implements OnModuleInit, OnModuleDestroy {
try {
if (!message) throw new Error('Received null message');
const route = this.getRouteByTopic(message.fields.routingKey);

const consumer = this.getConsumer(route);
const messageParse = this.deserializeMessage(consumer, message.content);
const interceptors = this.getInterceptors(consumer);
const interceptorsReversed = await this.interceptorsReverse(
interceptors,
message,
messageParse,
);

let result = { error: ERROR_NO_ROUTE };
if (consumer.handler)
result = await this.handleMessage(
consumer.handler,
messageParse,
message,
);

await Promise.all(
interceptorsReversed
.reverse()
.map(async (revers) => await revers(result, message)),
);
if (message.properties.replyTo)
await this.sendReply(
message.properties.replyTo,
Expand All @@ -204,9 +218,18 @@ export class RmqService implements OnModuleInit, OnModuleDestroy {
);
} catch (error) {
this.logger.error('Error processing message', { error, message });

this.rmqNestjsConnectService.nack(message, false, false);
}
}
private async interceptorsReverse(interceptors, message, messageParse) {
const interceptorsReversed: any[] = [];
for (const interceptor of interceptors) {
const fnReversed = await interceptor.intercept(message, messageParse);
interceptorsReversed.push(fnReversed);
}
return interceptorsReversed;
}

private getConsumer(route: string): MetaTegEnpoint {
return this.rmqMessageTegs.get(route) || this.rmqMessageTegs.get(NON_ROUTE);
Expand All @@ -217,7 +240,11 @@ export class RmqService implements OnModuleInit, OnModuleDestroy {
? consumer.serdes.deserialize(content)
: this.serDes.deserialize(content);
}

private getInterceptors(consumer: MetaTegEnpoint) {
return this.interceptors
.concat(consumer.interceptors)
.map((interceptor: any) => new interceptor());
}
private async handleMessage(
handler: IConsumFunction,
messageParse: string,
Expand Down Expand Up @@ -245,9 +272,8 @@ export class RmqService implements OnModuleInit, OnModuleDestroy {
private async listenReplyQueue(
message: ConsumeMessage | null,
): Promise<void> {
if (message.properties.correlationId) {
if (message.properties.correlationId)
this.sendResponseEmitter.emit(message.properties.correlationId, message);
}
}

private async bindQueueExchange() {
Expand Down
41 changes: 41 additions & 0 deletions test/mocks/event.interceptor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { ConsumeMessage } from 'amqplib';
import {
ReverseFunction,
IRmqInterceptor,
} from '../../lib/interfaces/interceptor.interface';

export class EventInterceptorModule implements IRmqInterceptor {
async intercept(
message: ConsumeMessage,
content: any,
): Promise<ReverseFunction> {
if (content?.array) content.array.push(1);
return async (content: any, message: ConsumeMessage) => {
if (content?.array) content.array.push(6);
};
}
}

export class EventInterceptorClass implements IRmqInterceptor {
async intercept(
message: ConsumeMessage,
content: any,
): Promise<ReverseFunction> {
if (content?.array) content.array.push(2);
return async (content: any, message: ConsumeMessage) => {
if (content?.array) content.array.push(5);
};
}
}

export class EventInterceptorEndpoint implements IRmqInterceptor {
async intercept(
message: ConsumeMessage,
content: any,
): Promise<ReverseFunction> {
content.array.push(3);
return async (content: any, message: ConsumeMessage) => {
content.array.push(4);
};
}
}
2 changes: 2 additions & 0 deletions test/mocks/rmq-nestjs.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Module } from '@nestjs/common';
import { RmqNestjsModule } from '../../lib';
import { RmqEvents } from './rmq.event';
import { RmqServieController } from './rmq.controller';
import { EventInterceptorModule } from './event.interceptor';

@Module({
imports: [
Expand All @@ -25,6 +26,7 @@ import { RmqServieController } from './rmq.controller';
options: { exclusive: true },
consumOptions: { noAck: true },
},
interceptor: [EventInterceptorModule],
}),
],
providers: [RmqEvents, RmqServieController],
Expand Down
10 changes: 10 additions & 0 deletions test/mocks/rmq.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,16 @@ export class RmqServieController {
);
return sendhi;
}
async sendMessageWithInterceptor(
obj: Record<string, any>,
topic: string = 'text.interceptor',
) {
const sendhi = await this.rmqServie.send<object, { array: number[] }>(
topic,
obj,
);
return sendhi;
}

async sendGlobalRoute(obj: Record<string, any>) {
const message = await this.rmqGlobalService.send<
Expand Down
Loading
Loading