diff --git a/content/recipes/cqrs.md b/content/recipes/cqrs.md index 9d37f5bed2..ce25287df0 100644 --- a/content/recipes/cqrs.md +++ b/content/recipes/cqrs.md @@ -290,6 +290,251 @@ export class HeroesGameModule {} `CommandBus`, `QueryBus` and `EventBus` are **Observables**. This means that you can easily subscribe to the whole stream and enrich your application with **Event Sourcing**. +#### Configurable Event Bus + +The Nest `CqrsModule` by default uses an internal (local) event bus. For large applications that span multiple microservices, it is sometimes necesary to use an external event bus (such as Kafka, ActiveMQ or RabbitMQ). + +Nest provides two interfaces which allow us to implement a custom event bus. To implement an event publisher we implement the `IEventPublisher` interface and the `publish` function. As an example we have written an event publisher for Kafka which leverages the `kafkajs` node module. + +```typescript +@@filename(KafkaPublisher) +class KafkaPublisher implements IEventPublisher { + + private readonly kafkaProducer: Producer; + + constructor(@Inject('KAFKA_BROKER') broker: string) { + const kafka = new Kafka({ + clientId: 'my-app', + brokers: [broker] + }) + + this.kafkaProducer = kafka.producer(); + } + + async connect(): Promise { + await this.kafkaProducer.connect(); + } + + publish(event: T): any { + this.kafkaProducer.send({ + topic: event.constructor.name, + messages: [ + { value: JSON.stringify(event) } + ] + }) + } +} +@@switch +class KafkaPublisher { + + constructor(@Inject('KAFKA_BROKER') broker) { + const kafka = new Kafka({ + clientId: 'my-app', + brokers: [broker] + }) + + this.kafkaProducer = kafka.producer(); + } + + async connect() { + await this.kafkaProducer.connect(); + } + + publish(event) { + this.kafkaProducer.send({ + topic: event.constructor.name, + messages: [ + { value: JSON.stringify(event) } + ] + }) + } +} +``` + +Similarly we implement the `IMessageSource` interface and the `bridgeEventsTo` function to setup an event listener. + +```typescript +@@filename(KafkaSubscriber) +class KafkaSubscriber implements IMessageSource { + + private readonly kafkaConsumer: Consumer + private bridge: Subject + private readonly events: Array; + + constructor(@Inject('KAFKA_BROKER') broker: string, + @Inject('EVENTS') events: Array) { + const kafka = new Kafka({ + clientId: 'my-app', + brokers: [broker] + }) + + this.events = events; + this.kafkaConsumer = kafka.consumer({groupId: 'test-group'}); + } + + async connect(): Promise { + await this.kafkaConsumer.connect(); + for(const event of this.events) { + await this.kafkaConsumer.subscribe({ topic: event.name, fromBeginning: false }) + } + + await this.kafkaConsumer.run({ + eachMessage: async ({topic, partition, message}) => { + if(this.bridge) { + for(const event of this.events) { + if(event.name === topic) { + const parsedJson = JSON.parse(message.value.toString()); + const receivedEvent = new event(parsedJson) + this.bridge.next(receivedEvent) + } + } + } + } + }) + } + + bridgeEventsTo(subject: Subject): any { + this.bridge = subject + } +} +@@switch +class KafkaSubscriber { + + constructor(@Inject('KAFKA_BROKER') broker, + @Inject('EVENTS') events) { + const kafka = new Kafka({ + clientId: 'my-app', + brokers: [broker] + }) + + this.events = events; + this.kafkaConsumer = kafka.consumer({groupId: 'test-group'}); + } + + async connect() { + await this.kafkaConsumer.connect(); + for(const event of this.events) { + await this.kafkaConsumer.subscribe({ topic: event.name, fromBeginning: false }) + } + + await this.kafkaConsumer.run({ + eachMessage: async ({topic, partition, message}) => { + if(this.bridge) { + for(const event of this.events) { + if(event.name === topic) { + const parsedJson = JSON.parse(message.value.toString()); + const receivedEvent = new event(parsedJson) + this.bridge.next(receivedEvent) + } + } + } + } + }) + } + + bridgeEventsTo(subject) { + this.bridge = subject + } +} +``` + +We now include these in our `HeroModule` and configure them upon module initialization. + +```typescript +@@filename(heroes-game.module) +export const CommandHandlers = [KillDragonHandler, DropAncientItemHandler]; +export const EventHandlers = [HeroKilledDragonHandler, HeroFoundItemHandler]; +export const Events = [HeroKilledDragonEvent, HeroFoundItemEvent] + +@Module({ + imports: [CqrsModule, ConfigModule.forRoot()], + controllers: [HeroesGameController], + providers: [ + HeroRepository, + ...CommandHandlers, + ...EventHandlers, + ...QueryHandlers, + { + provide: 'EVENTS', + useValue: Events, + }, + { + provide: 'KAFKA_BROKER', + useFactory: (configService: ConfigService) => { + return configService.get('KAFKA_BROKER') + }, + inject: [ConfigService] + }, + HeroesGameSagas, + KafkaPublisher, + KafkaSubscriber + ], +}) +export class HeroesGameModule implements OnModuleInit { + constructor( + private readonly event$: EventBus, + private readonly kafkaPublisher: KafkaPublisher, + private readonly kafkaSubscriber: KafkaSubscriber, + ) {} + + async onModuleInit(): Promise { + await this.kafkaSubscriber.connect(); + this.kafkaSubscriber.bridgeEventsTo(this.event$.subject$); + + await this.kafkaPublisher.connect(); + this.event$.publisher = this.kafkaPublisher; + } +} +@@switch +export const CommandHandlers = [KillDragonHandler, DropAncientItemHandler]; +export const EventHandlers = [HeroKilledDragonHandler, HeroFoundItemHandler]; +export const Events = [HeroKilledDragonEvent, HeroFoundItemEvent] + +@Module({ + imports: [CqrsModule, ConfigModule.forRoot()], + controllers: [HeroesGameController], + providers: [ + HeroRepository, + ...CommandHandlers, + ...EventHandlers, + ...QueryHandlers, + { + provide: 'EVENTS', + useValue: Events, + }, + { + provide: 'KAFKA_BROKER', + useFactory: (configService) => { + return configService.get('KAFKA_BROKER') + }, + inject: [ConfigService] + }, + HeroesGameSagas, + KafkaPublisher, + KafkaSubscriber + ], +}) +export class HeroesGameModule { + constructor( + event$, + kafkaPublisher, + kafkaSubscriber, + ) { + this.kafkaPublisher = kafkaPublisher; + this.kafkaSubscriber = kafkaSubscriber; + this.event$ = event$; + } + + async onModuleInit() { + await this.kafkaSubscriber.connect(); + this.kafkaSubscriber.bridgeEventsTo(this.event$.subject$); + + await this.kafkaPublisher.connect(); + this.event$.publisher = this.kafkaPublisher; + } +} +``` + #### Example A working example is available [here](https://github.com/kamilmysliwiec/nest-cqrs-example).