Skip to content

Commit

Permalink
merge pull request #58 from interceptor
Browse files Browse the repository at this point in the history
Interceptor
  • Loading branch information
DIY0R authored Jul 12, 2024
2 parents 7c18f8d + 0eb6802 commit a8105e7
Show file tree
Hide file tree
Showing 16 changed files with 187 additions and 16 deletions.
41 changes: 36 additions & 5 deletions lib/common/meta-teg.discovery.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
import { Injectable } from '@nestjs/common';
import { ModuleRef, ModulesContainer, Reflector } from '@nestjs/core';
import { ModulesContainer, Reflector } from '@nestjs/core';
import { MetadataScanner } from '@nestjs/core';
import { InstanceWrapper } from '@nestjs/core/injector/instance-wrapper';
import { MESSAGE_ROUTER, MODULE_TOKEN, SER_DAS_KEY } from '../constants';
import { CallbackFunctionVariadic, IMetaTegsMap, ISerDes } from '../interfaces';
import {
INTERCEPTOR_KEY,
MESSAGE_ROUTER,
MODULE_TOKEN,
SER_DAS_KEY,
} from '../constants';
import {
CallbackFunctionVariadic,
IMetaTegsMap,
ISerDes,
TypeRmqInterceptor,
} from '../interfaces';
import { RQMColorLogger } from './logger';
import { Module } from '@nestjs/core/injector/module';

Expand Down Expand Up @@ -35,7 +45,6 @@ export class MetaTegsScannerService {
const { instance } = provider;
if (instance instanceof Object) {
const allMethodNames = this.metadataScanner.getAllMethodNames(instance);

allMethodNames.forEach((name: string) =>
this.lookupMethods(metaTeg, rmqMessagesMap, instance, name),
);
Expand All @@ -58,7 +67,15 @@ export class MetaTegsScannerService {
const boundHandler = instance[methodName].bind(instance);
if (event) {
const serdes = this.getSerDesMetaData(method, instance.constructor);
rmqMessagesMap.set(event, { handler: boundHandler, serdes });
const interceptors = this.getInterceptorMetaData(
method,
instance.constructor,
);
rmqMessagesMap.set(event, {
handler: boundHandler,
serdes,
interceptors,
});
this.logger.log('Mapped ' + event, MESSAGE_ROUTER);
}
}
Expand All @@ -68,6 +85,20 @@ export class MetaTegsScannerService {
this.getMetaData<ISerDes>(SER_DAS_KEY, target)
);
}
private getInterceptorMetaData(
method: CallbackFunctionVariadic,
target: object,
): TypeRmqInterceptor[] {
const methodMeta = this.getMetaData<TypeRmqInterceptor>(
INTERCEPTOR_KEY,
method,
);
const targetMeta = this.getMetaData<TypeRmqInterceptor>(
INTERCEPTOR_KEY,
target,
);
return [targetMeta, methodMeta].filter((meta) => meta !== undefined);
}
private getMetaData<T>(key: string, target: any) {
return this.reflector.get<T>(key, target);
}
Expand Down
2 changes: 2 additions & 0 deletions lib/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ export const RMQ_ROUTES_TRANSFORM = 'RMQ_ROUTES_TRANSFORM';
export const RMQ_APP_OPTIONS = 'RMQ_APP_OPTIONS';
export const TARGET_MODULE = 'TARGET_MODULE';
export const SER_DAS_KEY = 'SER_DAS_KEY';
export const INTERCEPTOR_KEY = 'INTERCEPTOR_KEY';
export const SERDES = 'SERDES';
export const INTERCEPTORS = 'INTERCEPTORS';

export const INITIALIZATION_STEP_DELAY = 400;
export const DEFAULT_TIMEOUT = 40000;
Expand Down
1 change: 1 addition & 0 deletions lib/decorators/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from './rmq-message.decorator';
export * from './transform.decorator';
export * from './serdes.decorator';
export * from './interceptor.decorator';
6 changes: 6 additions & 0 deletions lib/decorators/interceptor.decorator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import { SetMetadata } from '@nestjs/common';
import { INTERCEPTOR_KEY } from '../constants';
import { TypeRmqInterceptor } from 'lib/interfaces';

export const RmqInterceptor = (options: TypeRmqInterceptor) =>
SetMetadata(INTERCEPTOR_KEY, options);
4 changes: 2 additions & 2 deletions lib/decorators/rmq-message.decorator.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { IDescriptor } from 'lib/interfaces';
import { IDescriptorRoute } from 'lib/interfaces';
import { NON_ROUTE, RMQ_MESSAGE_META_TEG } from '../constants';

export const reflectFunction = (event: string) =>
function (
target: any,
propertyKey: string | symbol,
descriptor: IDescriptor,
descriptor: IDescriptorRoute,
) {
Reflect.defineMetadata(RMQ_MESSAGE_META_TEG, event, descriptor.value);
};
Expand Down
1 change: 1 addition & 0 deletions lib/interfaces/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export * from './rmq-options.interface';
export * from './interceptor.interface';
export * from './serdes.interface';
export * from './metategs';
export * from './rmqService';
13 changes: 13 additions & 0 deletions lib/interfaces/interceptor.interface.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { ConsumeMessage } from 'amqplib';

export type ReverseFunction = (
content: any,
message: ConsumeMessage,
) => Promise<void>;
export abstract class IRmqInterceptor {
abstract intercept(
content: any,
message: ConsumeMessage,
): Promise<ReverseFunction>;
}
export type TypeRmqInterceptor = typeof IRmqInterceptor;
4 changes: 3 additions & 1 deletion lib/interfaces/metategs.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { ConsumeMessage } from 'amqplib';
import { ISerDes } from './serdes.interface';
import { TypeRmqInterceptor } from './interceptor.interface';

export type IConsumFunction = (
message?: any,
Expand All @@ -8,9 +9,10 @@ export type IConsumFunction = (
export interface MetaTegEnpoint {
handler: IConsumFunction;
serdes?: ISerDes | undefined;
interceptors?: TypeRmqInterceptor[];
}
export type IMetaTegsMap = Map<string, MetaTegEnpoint>;
export interface IDescriptor {
export interface IDescriptorRoute {
value?: IConsumFunction;
}

Expand Down
2 changes: 2 additions & 0 deletions lib/interfaces/rmq-options.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Options } from 'amqplib';
import { LoggerService, ModuleMetadata } from '@nestjs/common';
import { RMQIntercepterClass, RMQPipeClass } from '../common';
import { ISerDes } from './serdes.interface';
import { TypeRmqInterceptor } from './interceptor.interface';

export interface IQueue {
queue: string;
Expand Down Expand Up @@ -34,6 +35,7 @@ export interface IMessageBroker {
replyTo?: IQueue;
queue?: IQueue;
serDes?: ISerDes;
interceptor?: TypeRmqInterceptor[];
messageTimeout?: number;
serviceName?: string;
}
Expand Down
8 changes: 7 additions & 1 deletion lib/rmq.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@ import {
IRabbitMQConfig,
IGlobalOptions,
} from './interfaces';
import { MODULE_TOKEN, RMQ_BROKER_OPTIONS, SERDES } from './constants';
import {
INTERCEPTORS,
MODULE_TOKEN,
RMQ_BROKER_OPTIONS,
SERDES,
} from './constants';
import { DiscoveryModule } from '@nestjs/core';
import { MetaTegsScannerService, getUniqId } from './common';
import { RmqNestjsCoreModule } from './rmq-core.module';
Expand Down Expand Up @@ -39,6 +44,7 @@ export class RmqNestjsModule {
providers: [
{ provide: RMQ_BROKER_OPTIONS, useValue: options },
{ provide: SERDES, useValue: options.serDes ?? serDes },
{ provide: INTERCEPTORS, useValue: options.interceptor ?? [] },
{ provide: MODULE_TOKEN, useFactory: getUniqId },
RmqService,
MetaTegsScannerService,
Expand Down
36 changes: 31 additions & 5 deletions lib/rmq.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
ISerDes,
TypeChanel,
TypeQueue,
TypeRmqInterceptor,
} from './interfaces';
import {
IConsumFunction,
Expand All @@ -26,6 +27,7 @@ import {
INDICATE_REPLY_QUEUE,
INITIALIZATION_STEP_DELAY,
INOF_NOT_FULL_OPTIONS,
INTERCEPTORS,
MODULE_TOKEN,
NACKED,
NON_ROUTE,
Expand All @@ -43,6 +45,7 @@ import { RmqNestjsConnectService } from './rmq-connect.service';
import { getUniqId } from './common/get-uniqId';
import { EventEmitter } from 'stream';
import { RQMColorLogger } from './common/logger';
import { rejects } from 'assert';

@Injectable()
export class RmqService implements OnModuleInit, OnModuleDestroy {
Expand All @@ -61,6 +64,7 @@ export class RmqService implements OnModuleInit, OnModuleDestroy {
@Inject(RMQ_BROKER_OPTIONS) private readonly options: IMessageBroker,
@Inject(RMQ_APP_OPTIONS) private readonly globalOptions: IGlobalOptions,
@Inject(SERDES) private readonly serDes: ISerDes,
@Inject(INTERCEPTORS) private readonly interceptors: TypeRmqInterceptor[],
@Inject(MODULE_TOKEN) private readonly moduleToken: string,
) {
this.logger = globalOptions.appOptions?.logger
Expand Down Expand Up @@ -184,17 +188,27 @@ export class RmqService implements OnModuleInit, OnModuleDestroy {
try {
if (!message) throw new Error('Received null message');
const route = this.getRouteByTopic(message.fields.routingKey);

const consumer = this.getConsumer(route);
const messageParse = this.deserializeMessage(consumer, message.content);
const interceptors = this.getInterceptors(consumer);
const interceptorsReversed = await this.interceptorsReverse(
interceptors,
message,
messageParse,
);

let result = { error: ERROR_NO_ROUTE };
if (consumer.handler)
result = await this.handleMessage(
consumer.handler,
messageParse,
message,
);

await Promise.all(
interceptorsReversed
.reverse()
.map(async (revers) => await revers(result, message)),
);
if (message.properties.replyTo)
await this.sendReply(
message.properties.replyTo,
Expand All @@ -204,9 +218,18 @@ export class RmqService implements OnModuleInit, OnModuleDestroy {
);
} catch (error) {
this.logger.error('Error processing message', { error, message });

this.rmqNestjsConnectService.nack(message, false, false);
}
}
private async interceptorsReverse(interceptors, message, messageParse) {
const interceptorsReversed: any[] = [];
for (const interceptor of interceptors) {
const fnReversed = await interceptor.intercept(message, messageParse);
interceptorsReversed.push(fnReversed);
}
return interceptorsReversed;
}

private getConsumer(route: string): MetaTegEnpoint {
return this.rmqMessageTegs.get(route) || this.rmqMessageTegs.get(NON_ROUTE);
Expand All @@ -217,7 +240,11 @@ export class RmqService implements OnModuleInit, OnModuleDestroy {
? consumer.serdes.deserialize(content)
: this.serDes.deserialize(content);
}

private getInterceptors(consumer: MetaTegEnpoint) {
return this.interceptors
.concat(consumer.interceptors)
.map((interceptor: any) => new interceptor());
}
private async handleMessage(
handler: IConsumFunction,
messageParse: string,
Expand Down Expand Up @@ -245,9 +272,8 @@ export class RmqService implements OnModuleInit, OnModuleDestroy {
private async listenReplyQueue(
message: ConsumeMessage | null,
): Promise<void> {
if (message.properties.correlationId) {
if (message.properties.correlationId)
this.sendResponseEmitter.emit(message.properties.correlationId, message);
}
}

private async bindQueueExchange() {
Expand Down
41 changes: 41 additions & 0 deletions test/mocks/event.interceptor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { ConsumeMessage } from 'amqplib';
import {
ReverseFunction,
IRmqInterceptor,
} from '../../lib/interfaces/interceptor.interface';

export class EventInterceptorModule implements IRmqInterceptor {
async intercept(
message: ConsumeMessage,
content: any,
): Promise<ReverseFunction> {
if (content?.array) content.array.push(1);
return async (content: any, message: ConsumeMessage) => {
if (content?.array) content.array.push(6);
};
}
}

export class EventInterceptorClass implements IRmqInterceptor {
async intercept(
message: ConsumeMessage,
content: any,
): Promise<ReverseFunction> {
if (content?.array) content.array.push(2);
return async (content: any, message: ConsumeMessage) => {
if (content?.array) content.array.push(5);
};
}
}

export class EventInterceptorEndpoint implements IRmqInterceptor {
async intercept(
message: ConsumeMessage,
content: any,
): Promise<ReverseFunction> {
content.array.push(3);
return async (content: any, message: ConsumeMessage) => {
content.array.push(4);
};
}
}
2 changes: 2 additions & 0 deletions test/mocks/rmq-nestjs.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Module } from '@nestjs/common';
import { RmqNestjsModule } from '../../lib';
import { RmqEvents } from './rmq.event';
import { RmqServieController } from './rmq.controller';
import { EventInterceptorModule } from './event.interceptor';

@Module({
imports: [
Expand All @@ -25,6 +26,7 @@ import { RmqServieController } from './rmq.controller';
options: { exclusive: true },
consumOptions: { noAck: true },
},
interceptor: [EventInterceptorModule],
}),
],
providers: [RmqEvents, RmqServieController],
Expand Down
10 changes: 10 additions & 0 deletions test/mocks/rmq.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,16 @@ export class RmqServieController {
);
return sendhi;
}
async sendMessageWithInterceptor(
obj: Record<string, any>,
topic: string = 'text.interceptor',
) {
const sendhi = await this.rmqServie.send<object, { array: number[] }>(
topic,
obj,
);
return sendhi;
}

async sendGlobalRoute(obj: Record<string, any>) {
const message = await this.rmqGlobalService.send<
Expand Down
Loading

0 comments on commit a8105e7

Please sign in to comment.