diff --git a/lib/common/meta-teg.discovery.ts b/lib/common/meta-teg.discovery.ts index 7406b26..e28c40c 100644 --- a/lib/common/meta-teg.discovery.ts +++ b/lib/common/meta-teg.discovery.ts @@ -1,9 +1,19 @@ import { Injectable } from '@nestjs/common'; -import { ModuleRef, ModulesContainer, Reflector } from '@nestjs/core'; +import { ModulesContainer, Reflector } from '@nestjs/core'; import { MetadataScanner } from '@nestjs/core'; import { InstanceWrapper } from '@nestjs/core/injector/instance-wrapper'; -import { MESSAGE_ROUTER, MODULE_TOKEN, SER_DAS_KEY } from '../constants'; -import { CallbackFunctionVariadic, IMetaTegsMap, ISerDes } from '../interfaces'; +import { + INTERCEPTOR_KEY, + MESSAGE_ROUTER, + MODULE_TOKEN, + SER_DAS_KEY, +} from '../constants'; +import { + CallbackFunctionVariadic, + IMetaTegsMap, + ISerDes, + TypeRmqInterceptor, +} from '../interfaces'; import { RQMColorLogger } from './logger'; import { Module } from '@nestjs/core/injector/module'; @@ -35,7 +45,6 @@ export class MetaTegsScannerService { const { instance } = provider; if (instance instanceof Object) { const allMethodNames = this.metadataScanner.getAllMethodNames(instance); - allMethodNames.forEach((name: string) => this.lookupMethods(metaTeg, rmqMessagesMap, instance, name), ); @@ -58,7 +67,15 @@ export class MetaTegsScannerService { const boundHandler = instance[methodName].bind(instance); if (event) { const serdes = this.getSerDesMetaData(method, instance.constructor); - rmqMessagesMap.set(event, { handler: boundHandler, serdes }); + const interceptors = this.getInterceptorMetaData( + method, + instance.constructor, + ); + rmqMessagesMap.set(event, { + handler: boundHandler, + serdes, + interceptors, + }); this.logger.log('Mapped ' + event, MESSAGE_ROUTER); } } @@ -68,6 +85,20 @@ export class MetaTegsScannerService { this.getMetaData(SER_DAS_KEY, target) ); } + private getInterceptorMetaData( + method: CallbackFunctionVariadic, + target: object, + ): TypeRmqInterceptor[] { + const methodMeta = this.getMetaData( + INTERCEPTOR_KEY, + method, + ); + const targetMeta = this.getMetaData( + INTERCEPTOR_KEY, + target, + ); + return [targetMeta, methodMeta].filter((meta) => meta !== undefined); + } private getMetaData(key: string, target: any) { return this.reflector.get(key, target); } diff --git a/lib/constants.ts b/lib/constants.ts index 66c773f..8130d74 100644 --- a/lib/constants.ts +++ b/lib/constants.ts @@ -5,7 +5,9 @@ export const RMQ_ROUTES_TRANSFORM = 'RMQ_ROUTES_TRANSFORM'; export const RMQ_APP_OPTIONS = 'RMQ_APP_OPTIONS'; export const TARGET_MODULE = 'TARGET_MODULE'; export const SER_DAS_KEY = 'SER_DAS_KEY'; +export const INTERCEPTOR_KEY = 'INTERCEPTOR_KEY'; export const SERDES = 'SERDES'; +export const INTERCEPTORS = 'INTERCEPTORS'; export const INITIALIZATION_STEP_DELAY = 400; export const DEFAULT_TIMEOUT = 40000; diff --git a/lib/decorators/index.ts b/lib/decorators/index.ts index d992dae..7d99f53 100644 --- a/lib/decorators/index.ts +++ b/lib/decorators/index.ts @@ -1,3 +1,4 @@ export * from './rmq-message.decorator'; export * from './transform.decorator'; export * from './serdes.decorator'; +export * from './interceptor.decorator'; diff --git a/lib/decorators/interceptor.decorator.ts b/lib/decorators/interceptor.decorator.ts new file mode 100644 index 0000000..ad5bffc --- /dev/null +++ b/lib/decorators/interceptor.decorator.ts @@ -0,0 +1,6 @@ +import { SetMetadata } from '@nestjs/common'; +import { INTERCEPTOR_KEY } from '../constants'; +import { TypeRmqInterceptor } from 'lib/interfaces'; + +export const RmqInterceptor = (options: TypeRmqInterceptor) => + SetMetadata(INTERCEPTOR_KEY, options); diff --git a/lib/decorators/rmq-message.decorator.ts b/lib/decorators/rmq-message.decorator.ts index 513cd7f..f9f7b24 100644 --- a/lib/decorators/rmq-message.decorator.ts +++ b/lib/decorators/rmq-message.decorator.ts @@ -1,11 +1,11 @@ -import { IDescriptor } from 'lib/interfaces'; +import { IDescriptorRoute } from 'lib/interfaces'; import { NON_ROUTE, RMQ_MESSAGE_META_TEG } from '../constants'; export const reflectFunction = (event: string) => function ( target: any, propertyKey: string | symbol, - descriptor: IDescriptor, + descriptor: IDescriptorRoute, ) { Reflect.defineMetadata(RMQ_MESSAGE_META_TEG, event, descriptor.value); }; diff --git a/lib/interfaces/index.ts b/lib/interfaces/index.ts index 86a10b7..6aaa7bd 100644 --- a/lib/interfaces/index.ts +++ b/lib/interfaces/index.ts @@ -1,4 +1,5 @@ export * from './rmq-options.interface'; +export * from './interceptor.interface'; export * from './serdes.interface'; export * from './metategs'; export * from './rmqService'; diff --git a/lib/interfaces/interceptor.interface.ts b/lib/interfaces/interceptor.interface.ts new file mode 100644 index 0000000..27958bd --- /dev/null +++ b/lib/interfaces/interceptor.interface.ts @@ -0,0 +1,13 @@ +import { ConsumeMessage } from 'amqplib'; + +export type ReverseFunction = ( + content: any, + message: ConsumeMessage, +) => Promise; +export abstract class IRmqInterceptor { + abstract intercept( + content: any, + message: ConsumeMessage, + ): Promise; +} +export type TypeRmqInterceptor = typeof IRmqInterceptor; diff --git a/lib/interfaces/metategs.ts b/lib/interfaces/metategs.ts index 8ce6cfc..5a07309 100644 --- a/lib/interfaces/metategs.ts +++ b/lib/interfaces/metategs.ts @@ -1,5 +1,6 @@ import { ConsumeMessage } from 'amqplib'; import { ISerDes } from './serdes.interface'; +import { TypeRmqInterceptor } from './interceptor.interface'; export type IConsumFunction = ( message?: any, @@ -8,9 +9,10 @@ export type IConsumFunction = ( export interface MetaTegEnpoint { handler: IConsumFunction; serdes?: ISerDes | undefined; + interceptors?: TypeRmqInterceptor[]; } export type IMetaTegsMap = Map; -export interface IDescriptor { +export interface IDescriptorRoute { value?: IConsumFunction; } diff --git a/lib/interfaces/rmq-options.interface.ts b/lib/interfaces/rmq-options.interface.ts index 8d0e06c..b84dd93 100644 --- a/lib/interfaces/rmq-options.interface.ts +++ b/lib/interfaces/rmq-options.interface.ts @@ -2,6 +2,7 @@ import { Options } from 'amqplib'; import { LoggerService, ModuleMetadata } from '@nestjs/common'; import { RMQIntercepterClass, RMQPipeClass } from '../common'; import { ISerDes } from './serdes.interface'; +import { TypeRmqInterceptor } from './interceptor.interface'; export interface IQueue { queue: string; @@ -34,6 +35,7 @@ export interface IMessageBroker { replyTo?: IQueue; queue?: IQueue; serDes?: ISerDes; + interceptor?: TypeRmqInterceptor[]; messageTimeout?: number; serviceName?: string; } diff --git a/lib/rmq.module.ts b/lib/rmq.module.ts index 5213f68..3988db2 100644 --- a/lib/rmq.module.ts +++ b/lib/rmq.module.ts @@ -6,7 +6,12 @@ import { IRabbitMQConfig, IGlobalOptions, } from './interfaces'; -import { MODULE_TOKEN, RMQ_BROKER_OPTIONS, SERDES } from './constants'; +import { + INTERCEPTORS, + MODULE_TOKEN, + RMQ_BROKER_OPTIONS, + SERDES, +} from './constants'; import { DiscoveryModule } from '@nestjs/core'; import { MetaTegsScannerService, getUniqId } from './common'; import { RmqNestjsCoreModule } from './rmq-core.module'; @@ -39,6 +44,7 @@ export class RmqNestjsModule { providers: [ { provide: RMQ_BROKER_OPTIONS, useValue: options }, { provide: SERDES, useValue: options.serDes ?? serDes }, + { provide: INTERCEPTORS, useValue: options.interceptor ?? [] }, { provide: MODULE_TOKEN, useFactory: getUniqId }, RmqService, MetaTegsScannerService, diff --git a/lib/rmq.service.ts b/lib/rmq.service.ts index 70f1c06..18aa34d 100644 --- a/lib/rmq.service.ts +++ b/lib/rmq.service.ts @@ -13,6 +13,7 @@ import { ISerDes, TypeChanel, TypeQueue, + TypeRmqInterceptor, } from './interfaces'; import { IConsumFunction, @@ -26,6 +27,7 @@ import { INDICATE_REPLY_QUEUE, INITIALIZATION_STEP_DELAY, INOF_NOT_FULL_OPTIONS, + INTERCEPTORS, MODULE_TOKEN, NACKED, NON_ROUTE, @@ -43,6 +45,7 @@ import { RmqNestjsConnectService } from './rmq-connect.service'; import { getUniqId } from './common/get-uniqId'; import { EventEmitter } from 'stream'; import { RQMColorLogger } from './common/logger'; +import { rejects } from 'assert'; @Injectable() export class RmqService implements OnModuleInit, OnModuleDestroy { @@ -61,6 +64,7 @@ export class RmqService implements OnModuleInit, OnModuleDestroy { @Inject(RMQ_BROKER_OPTIONS) private readonly options: IMessageBroker, @Inject(RMQ_APP_OPTIONS) private readonly globalOptions: IGlobalOptions, @Inject(SERDES) private readonly serDes: ISerDes, + @Inject(INTERCEPTORS) private readonly interceptors: TypeRmqInterceptor[], @Inject(MODULE_TOKEN) private readonly moduleToken: string, ) { this.logger = globalOptions.appOptions?.logger @@ -184,9 +188,15 @@ export class RmqService implements OnModuleInit, OnModuleDestroy { try { if (!message) throw new Error('Received null message'); const route = this.getRouteByTopic(message.fields.routingKey); - const consumer = this.getConsumer(route); const messageParse = this.deserializeMessage(consumer, message.content); + const interceptors = this.getInterceptors(consumer); + const interceptorsReversed = await this.interceptorsReverse( + interceptors, + message, + messageParse, + ); + let result = { error: ERROR_NO_ROUTE }; if (consumer.handler) result = await this.handleMessage( @@ -194,7 +204,11 @@ export class RmqService implements OnModuleInit, OnModuleDestroy { messageParse, message, ); - + await Promise.all( + interceptorsReversed + .reverse() + .map(async (revers) => await revers(result, message)), + ); if (message.properties.replyTo) await this.sendReply( message.properties.replyTo, @@ -204,9 +218,18 @@ export class RmqService implements OnModuleInit, OnModuleDestroy { ); } catch (error) { this.logger.error('Error processing message', { error, message }); + this.rmqNestjsConnectService.nack(message, false, false); } } + private async interceptorsReverse(interceptors, message, messageParse) { + const interceptorsReversed: any[] = []; + for (const interceptor of interceptors) { + const fnReversed = await interceptor.intercept(message, messageParse); + interceptorsReversed.push(fnReversed); + } + return interceptorsReversed; + } private getConsumer(route: string): MetaTegEnpoint { return this.rmqMessageTegs.get(route) || this.rmqMessageTegs.get(NON_ROUTE); @@ -217,7 +240,11 @@ export class RmqService implements OnModuleInit, OnModuleDestroy { ? consumer.serdes.deserialize(content) : this.serDes.deserialize(content); } - + private getInterceptors(consumer: MetaTegEnpoint) { + return this.interceptors + .concat(consumer.interceptors) + .map((interceptor: any) => new interceptor()); + } private async handleMessage( handler: IConsumFunction, messageParse: string, @@ -245,9 +272,8 @@ export class RmqService implements OnModuleInit, OnModuleDestroy { private async listenReplyQueue( message: ConsumeMessage | null, ): Promise { - if (message.properties.correlationId) { + if (message.properties.correlationId) this.sendResponseEmitter.emit(message.properties.correlationId, message); - } } private async bindQueueExchange() { diff --git a/test/mocks/event.interceptor.ts b/test/mocks/event.interceptor.ts new file mode 100644 index 0000000..2d4ebc3 --- /dev/null +++ b/test/mocks/event.interceptor.ts @@ -0,0 +1,41 @@ +import { ConsumeMessage } from 'amqplib'; +import { + ReverseFunction, + IRmqInterceptor, +} from '../../lib/interfaces/interceptor.interface'; + +export class EventInterceptorModule implements IRmqInterceptor { + async intercept( + message: ConsumeMessage, + content: any, + ): Promise { + if (content?.array) content.array.push(1); + return async (content: any, message: ConsumeMessage) => { + if (content?.array) content.array.push(6); + }; + } +} + +export class EventInterceptorClass implements IRmqInterceptor { + async intercept( + message: ConsumeMessage, + content: any, + ): Promise { + if (content?.array) content.array.push(2); + return async (content: any, message: ConsumeMessage) => { + if (content?.array) content.array.push(5); + }; + } +} + +export class EventInterceptorEndpoint implements IRmqInterceptor { + async intercept( + message: ConsumeMessage, + content: any, + ): Promise { + content.array.push(3); + return async (content: any, message: ConsumeMessage) => { + content.array.push(4); + }; + } +} diff --git a/test/mocks/rmq-nestjs.module.ts b/test/mocks/rmq-nestjs.module.ts index 950d3ee..91717ce 100644 --- a/test/mocks/rmq-nestjs.module.ts +++ b/test/mocks/rmq-nestjs.module.ts @@ -2,6 +2,7 @@ import { Module } from '@nestjs/common'; import { RmqNestjsModule } from '../../lib'; import { RmqEvents } from './rmq.event'; import { RmqServieController } from './rmq.controller'; +import { EventInterceptorModule } from './event.interceptor'; @Module({ imports: [ @@ -25,6 +26,7 @@ import { RmqServieController } from './rmq.controller'; options: { exclusive: true }, consumOptions: { noAck: true }, }, + interceptor: [EventInterceptorModule], }), ], providers: [RmqEvents, RmqServieController], diff --git a/test/mocks/rmq.controller.ts b/test/mocks/rmq.controller.ts index 62c6df9..ed04a93 100644 --- a/test/mocks/rmq.controller.ts +++ b/test/mocks/rmq.controller.ts @@ -15,6 +15,16 @@ export class RmqServieController { ); return sendhi; } + async sendMessageWithInterceptor( + obj: Record, + topic: string = 'text.interceptor', + ) { + const sendhi = await this.rmqServie.send( + topic, + obj, + ); + return sendhi; + } async sendGlobalRoute(obj: Record) { const message = await this.rmqGlobalService.send< diff --git a/test/mocks/rmq.event.ts b/test/mocks/rmq.event.ts index a734f59..0a6b938 100644 --- a/test/mocks/rmq.event.ts +++ b/test/mocks/rmq.event.ts @@ -1,13 +1,23 @@ import { Injectable, Logger } from '@nestjs/common'; -import { MessageNonRoute, MessageRoute, SerDes } from '../../lib/decorators/'; +import { + MessageNonRoute, + MessageRoute, + RmqInterceptor, + SerDes, +} from '../../lib/decorators/'; import { RmqService } from '../../lib'; import { ConsumeMessage } from 'amqplib'; +import { + EventInterceptorClass, + EventInterceptorEndpoint, +} from './event.interceptor'; @Injectable() @SerDes({ deserialize: (message: Buffer): any => JSON.parse(message.toString()), serializer: (message: any): Buffer => Buffer.from(JSON.stringify(message)), }) +@RmqInterceptor(EventInterceptorClass) export class RmqEvents { constructor(private readonly rmqServie: RmqService) {} @MessageRoute('text.text') @@ -40,6 +50,7 @@ export class RmqEvents { this.rmqServie.ack(consumeMessage); return { message: obj }; } + @MessageRoute('rpc.#') recivedTopicPattern(obj: any, consumeMessage: ConsumeMessage) { this.rmqServie.ack(consumeMessage); @@ -50,7 +61,12 @@ export class RmqEvents { this.rmqServie.ack(consumeMessage); Logger.log(obj); } - + @MessageRoute('text.interceptor') + @RmqInterceptor(EventInterceptorEndpoint) + recivedMessage(obj: any, consumeMessage: ConsumeMessage) { + this.rmqServie.ack(consumeMessage); + return obj; + } @MessageNonRoute() recivedNonRoute(obj: any, consumeMessage: ConsumeMessage) { this.rmqServie.ack(consumeMessage); diff --git a/test/rmq-nestjs.spec.ts b/test/rmq-nestjs.spec.ts index a32a033..a05fa78 100644 --- a/test/rmq-nestjs.spec.ts +++ b/test/rmq-nestjs.spec.ts @@ -117,6 +117,18 @@ describe('RMQe2e', () => { expect(status).toBeTruthy(); }); }); + + describe('send message with interceptors', () => { + it('send with interceptors', async () => { + const obj = { array: [0] }; + const topic = 'text.interceptor'; + const message = await rmqServieController.sendMessageWithInterceptor( + obj, + topic, + ); + expect(message.array).toEqual([0, 1, 2, 3, 4, 5, 6]); + }); + }); afterAll(async () => { await delay(500); await api.close();