Skip to content

Commit

Permalink
feat: interceptor on individual modules
Browse files Browse the repository at this point in the history
  • Loading branch information
DIY0R committed Jul 12, 2024
1 parent 4342d24 commit 0eb6802
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 2 deletions.
1 change: 1 addition & 0 deletions lib/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export const TARGET_MODULE = 'TARGET_MODULE';
export const SER_DAS_KEY = 'SER_DAS_KEY';
export const INTERCEPTOR_KEY = 'INTERCEPTOR_KEY';
export const SERDES = 'SERDES';
export const INTERCEPTORS = 'INTERCEPTORS';

export const INITIALIZATION_STEP_DELAY = 400;
export const DEFAULT_TIMEOUT = 40000;
Expand Down
2 changes: 2 additions & 0 deletions lib/interfaces/rmq-options.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Options } from 'amqplib';
import { LoggerService, ModuleMetadata } from '@nestjs/common';
import { RMQIntercepterClass, RMQPipeClass } from '../common';
import { ISerDes } from './serdes.interface';
import { TypeRmqInterceptor } from './interceptor.interface';

export interface IQueue {
queue: string;
Expand Down Expand Up @@ -34,6 +35,7 @@ export interface IMessageBroker {
replyTo?: IQueue;
queue?: IQueue;
serDes?: ISerDes;
interceptor?: TypeRmqInterceptor[];
messageTimeout?: number;
serviceName?: string;
}
Expand Down
8 changes: 7 additions & 1 deletion lib/rmq.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@ import {
IRabbitMQConfig,
IGlobalOptions,
} from './interfaces';
import { MODULE_TOKEN, RMQ_BROKER_OPTIONS, SERDES } from './constants';
import {
INTERCEPTORS,
MODULE_TOKEN,
RMQ_BROKER_OPTIONS,
SERDES,
} from './constants';
import { DiscoveryModule } from '@nestjs/core';
import { MetaTegsScannerService, getUniqId } from './common';
import { RmqNestjsCoreModule } from './rmq-core.module';
Expand Down Expand Up @@ -39,6 +44,7 @@ export class RmqNestjsModule {
providers: [
{ provide: RMQ_BROKER_OPTIONS, useValue: options },
{ provide: SERDES, useValue: options.serDes ?? serDes },
{ provide: INTERCEPTORS, useValue: options.interceptor ?? [] },
{ provide: MODULE_TOKEN, useFactory: getUniqId },
RmqService,
MetaTegsScannerService,
Expand Down
7 changes: 6 additions & 1 deletion lib/rmq.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
ISerDes,
TypeChanel,
TypeQueue,
TypeRmqInterceptor,
} from './interfaces';
import {
IConsumFunction,
Expand All @@ -26,6 +27,7 @@ import {
INDICATE_REPLY_QUEUE,
INITIALIZATION_STEP_DELAY,
INOF_NOT_FULL_OPTIONS,
INTERCEPTORS,
MODULE_TOKEN,
NACKED,
NON_ROUTE,
Expand Down Expand Up @@ -62,6 +64,7 @@ export class RmqService implements OnModuleInit, OnModuleDestroy {
@Inject(RMQ_BROKER_OPTIONS) private readonly options: IMessageBroker,
@Inject(RMQ_APP_OPTIONS) private readonly globalOptions: IGlobalOptions,
@Inject(SERDES) private readonly serDes: ISerDes,
@Inject(INTERCEPTORS) private readonly interceptors: TypeRmqInterceptor[],
@Inject(MODULE_TOKEN) private readonly moduleToken: string,
) {
this.logger = globalOptions.appOptions?.logger
Expand Down Expand Up @@ -238,7 +241,9 @@ export class RmqService implements OnModuleInit, OnModuleDestroy {
: this.serDes.deserialize(content);
}
private getInterceptors(consumer: MetaTegEnpoint) {
return consumer.interceptors.map((interceptor: any) => new interceptor());
return this.interceptors
.concat(consumer.interceptors)
.map((interceptor: any) => new interceptor());
}
private async handleMessage(
handler: IConsumFunction,
Expand Down

0 comments on commit 0eb6802

Please sign in to comment.