Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs(cqrs): Added configurable event bus example #1995

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
245 changes: 245 additions & 0 deletions content/recipes/cqrs.md
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,251 @@ export class HeroesGameModule {}

`CommandBus`, `QueryBus` and `EventBus` are **Observables**. This means that you can easily subscribe to the whole stream and enrich your application with **Event Sourcing**.

#### Configurable Event Bus

The Nest `CqrsModule` by default uses an internal (local) event bus. For large applications that span multiple microservices, it is sometimes necesary to use an external event bus (such as Kafka, ActiveMQ or RabbitMQ).

Nest provides two interfaces which allow us to implement a custom event bus. To implement an event publisher we implement the `IEventPublisher` interface and the `publish` function. As an example we have written an event publisher for Kafka which leverages the `kafkajs` node module.

```typescript
@@filename(KafkaPublisher)
class KafkaPublisher implements IEventPublisher {

private readonly kafkaProducer: Producer;

constructor(@Inject('KAFKA_BROKER') broker: string) {
const kafka = new Kafka({
clientId: 'my-app',
brokers: [broker]
})

this.kafkaProducer = kafka.producer();
}

async connect(): Promise<void> {
await this.kafkaProducer.connect();
}

publish<T>(event: T): any {
this.kafkaProducer.send({
topic: event.constructor.name,
messages: [
{ value: JSON.stringify(event) }
]
})
}
}
@@switch
class KafkaPublisher {

constructor(@Inject('KAFKA_BROKER') broker) {
const kafka = new Kafka({
clientId: 'my-app',
brokers: [broker]
})

this.kafkaProducer = kafka.producer();
}

async connect() {
await this.kafkaProducer.connect();
}

publish(event) {
this.kafkaProducer.send({
topic: event.constructor.name,
messages: [
{ value: JSON.stringify(event) }
]
})
}
}
```

Similarly we implement the `IMessageSource` interface and the `bridgeEventsTo` function to setup an event listener.

```typescript
@@filename(KafkaSubscriber)
class KafkaSubscriber implements IMessageSource {

private readonly kafkaConsumer: Consumer
private bridge: Subject<any>
private readonly events: Array<any>;

constructor(@Inject('KAFKA_BROKER') broker: string,
@Inject('EVENTS') events: Array<any>) {
const kafka = new Kafka({
clientId: 'my-app',
brokers: [broker]
})

this.events = events;
this.kafkaConsumer = kafka.consumer({groupId: 'test-group'});
}

async connect(): Promise<void> {
await this.kafkaConsumer.connect();
for(const event of this.events) {
await this.kafkaConsumer.subscribe({ topic: event.name, fromBeginning: false })
}

await this.kafkaConsumer.run({
eachMessage: async ({topic, partition, message}) => {
if(this.bridge) {
for(const event of this.events) {
if(event.name === topic) {
const parsedJson = JSON.parse(message.value.toString());
const receivedEvent = new event(parsedJson)
this.bridge.next(receivedEvent)
}
}
}
}
})
}

bridgeEventsTo<T extends IEvent>(subject: Subject<T>): any {
this.bridge = subject
}
}
@@switch
class KafkaSubscriber {

constructor(@Inject('KAFKA_BROKER') broker,
@Inject('EVENTS') events) {
const kafka = new Kafka({
clientId: 'my-app',
brokers: [broker]
})

this.events = events;
this.kafkaConsumer = kafka.consumer({groupId: 'test-group'});
}

async connect() {
await this.kafkaConsumer.connect();
for(const event of this.events) {
await this.kafkaConsumer.subscribe({ topic: event.name, fromBeginning: false })
}

await this.kafkaConsumer.run({
eachMessage: async ({topic, partition, message}) => {
if(this.bridge) {
for(const event of this.events) {
if(event.name === topic) {
const parsedJson = JSON.parse(message.value.toString());
const receivedEvent = new event(parsedJson)
this.bridge.next(receivedEvent)
}
}
}
}
})
}

bridgeEventsTo(subject) {
this.bridge = subject
}
}
```

We now include these in our `HeroModule` and configure them upon module initialization.

```typescript
@@filename(heroes-game.module)
export const CommandHandlers = [KillDragonHandler, DropAncientItemHandler];
export const EventHandlers = [HeroKilledDragonHandler, HeroFoundItemHandler];
export const Events = [HeroKilledDragonEvent, HeroFoundItemEvent]

@Module({
imports: [CqrsModule, ConfigModule.forRoot()],
controllers: [HeroesGameController],
providers: [
HeroRepository,
...CommandHandlers,
...EventHandlers,
...QueryHandlers,
{
provide: 'EVENTS',
useValue: Events,
},
{
provide: 'KAFKA_BROKER',
useFactory: (configService: ConfigService) => {
return configService.get('KAFKA_BROKER')
},
inject: [ConfigService]
},
HeroesGameSagas,
KafkaPublisher,
KafkaSubscriber
],
})
export class HeroesGameModule implements OnModuleInit {
constructor(
private readonly event$: EventBus,
private readonly kafkaPublisher: KafkaPublisher,
private readonly kafkaSubscriber: KafkaSubscriber,
) {}

async onModuleInit(): Promise<any> {
await this.kafkaSubscriber.connect();
this.kafkaSubscriber.bridgeEventsTo(this.event$.subject$);

await this.kafkaPublisher.connect();
this.event$.publisher = this.kafkaPublisher;
}
}
@@switch
export const CommandHandlers = [KillDragonHandler, DropAncientItemHandler];
export const EventHandlers = [HeroKilledDragonHandler, HeroFoundItemHandler];
export const Events = [HeroKilledDragonEvent, HeroFoundItemEvent]

@Module({
imports: [CqrsModule, ConfigModule.forRoot()],
controllers: [HeroesGameController],
providers: [
HeroRepository,
...CommandHandlers,
...EventHandlers,
...QueryHandlers,
{
provide: 'EVENTS',
useValue: Events,
},
{
provide: 'KAFKA_BROKER',
useFactory: (configService) => {
return configService.get('KAFKA_BROKER')
},
inject: [ConfigService]
},
HeroesGameSagas,
KafkaPublisher,
KafkaSubscriber
],
})
export class HeroesGameModule {
constructor(
event$,
kafkaPublisher,
kafkaSubscriber,
) {
this.kafkaPublisher = kafkaPublisher;
this.kafkaSubscriber = kafkaSubscriber;
this.event$ = event$;
}

async onModuleInit() {
await this.kafkaSubscriber.connect();
this.kafkaSubscriber.bridgeEventsTo(this.event$.subject$);

await this.kafkaPublisher.connect();
this.event$.publisher = this.kafkaPublisher;
}
}
```

#### Example

A working example is available [here](https://github.com/kamilmysliwiec/nest-cqrs-example).