Skip to content

Commit

Permalink
docs(cqrs): Added configurable event bus example
Browse files Browse the repository at this point in the history
  • Loading branch information
bradsheppard committed Aug 4, 2021
1 parent 0f1873c commit 827645d
Showing 1 changed file with 245 additions and 0 deletions.
245 changes: 245 additions & 0 deletions content/recipes/cqrs.md
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,251 @@ export class HeroesGameModule {}

`CommandBus`, `QueryBus` and `EventBus` are **Observables**. This means that you can easily subscribe to the whole stream and enrich your application with **Event Sourcing**.

#### Configurable Event Bus

The Nest `CqrsModule` by default uses an internal (local) event bus. For large applications that span multiple microservices, it is sometimes necesary to use an external event bus (such as Kafka, ActiveMQ or RabbitMQ).

Nest provides two interfaces which allow us to implement a custom event bus. To implement an event publisher we implement the `IEventPublisher` interface and the `publish` function. As an example we have written an event publisher for Kafka which leverages the `kafkajs` node module.

```typescript
@@filename(KafkaPublisher)
class KafkaPublisher implements IEventPublisher {

private readonly kafkaProducer: Producer;

constructor(@Inject('KAFKA_BROKER') broker: string) {
const kafka = new Kafka({
clientId: 'my-app',
brokers: [broker]
})

this.kafkaProducer = kafka.producer();
}

async connect(): Promise<void> {
await this.kafkaProducer.connect();
}

publish<T>(event: T): any {
this.kafkaProducer.send({
topic: event.constructor.name,
messages: [
{ value: JSON.stringify(event) }
]
})
}
}
@@switch
class KafkaPublisher {

constructor(@Inject('KAFKA_BROKER') broker) {
const kafka = new Kafka({
clientId: 'my-app',
brokers: [broker]
})

this.kafkaProducer = kafka.producer();
}

async connect() {
await this.kafkaProducer.connect();
}

publish(event) {
this.kafkaProducer.send({
topic: event.constructor.name,
messages: [
{ value: JSON.stringify(event) }
]
})
}
}
```

Similarly we implement the `IMessageSource` interface and the `bridgeEventsTo` function to setup an event listener.

```typescript
@@filename(KafkaSubscriber)
class KafkaSubscriber implements IMessageSource {

private readonly kafkaConsumer: Consumer
private bridge: Subject<any>
private readonly events: Array<any>;

constructor(@Inject('KAFKA_BROKER') broker: string,
@Inject('EVENTS') events: Array<any>) {
const kafka = new Kafka({
clientId: 'my-app',
brokers: [broker]
})

this.events = events;
this.kafkaConsumer = kafka.consumer({groupId: 'test-group'});
}

async connect(): Promise<void> {
await this.kafkaConsumer.connect();
for(const event of this.events) {
await this.kafkaConsumer.subscribe({ topic: event.name, fromBeginning: false })
}

await this.kafkaConsumer.run({
eachMessage: async ({topic, partition, message}) => {
if(this.bridge) {
for(const event of this.events) {
if(event.name === topic) {
const parsedJson = JSON.parse(message.value.toString());
const receivedEvent = new event(parsedJson)
this.bridge.next(receivedEvent)
}
}
}
}
})
}

bridgeEventsTo<T extends IEvent>(subject: Subject<T>): any {
this.bridge = subject
}
}
@@switch
class KafkaSubscriber {

constructor(@Inject('KAFKA_BROKER') broker,
@Inject('EVENTS') events) {
const kafka = new Kafka({
clientId: 'my-app',
brokers: [broker]
})

this.events = events;
this.kafkaConsumer = kafka.consumer({groupId: 'test-group'});
}

async connect() {
await this.kafkaConsumer.connect();
for(const event of this.events) {
await this.kafkaConsumer.subscribe({ topic: event.name, fromBeginning: false })
}

await this.kafkaConsumer.run({
eachMessage: async ({topic, partition, message}) => {
if(this.bridge) {
for(const event of this.events) {
if(event.name === topic) {
const parsedJson = JSON.parse(message.value.toString());
const receivedEvent = new event(parsedJson)
this.bridge.next(receivedEvent)
}
}
}
}
})
}

bridgeEventsTo(subject) {
this.bridge = subject
}
}
```

We now include these in our `HeroModule` and configure them upon module initialization.

```typescript
@@filename(heroes-game.module)
export const CommandHandlers = [KillDragonHandler, DropAncientItemHandler];
export const EventHandlers = [HeroKilledDragonHandler, HeroFoundItemHandler];
export const Events = [HeroKilledDragonEvent, HeroFoundItemEvent]

@Module({
imports: [CqrsModule, ConfigModule.forRoot()],
controllers: [HeroesGameController],
providers: [
HeroRepository,
...CommandHandlers,
...EventHandlers,
...QueryHandlers,
{
provide: 'EVENTS',
useValue: Events,
},
{
provide: 'KAFKA_BROKER',
useFactory: (configService: ConfigService) => {
return configService.get('KAFKA_BROKER')
},
inject: [ConfigService]
},
HeroesGameSagas,
KafkaPublisher,
KafkaSubscriber
],
})
export class HeroesGameModule implements OnModuleInit {
constructor(
private readonly event$: EventBus,
private readonly kafkaPublisher: KafkaPublisher,
private readonly kafkaSubscriber: KafkaSubscriber,
) {}

async onModuleInit(): Promise<any> {
await this.kafkaSubscriber.connect();
this.kafkaSubscriber.bridgeEventsTo(this.event$.subject$);

await this.kafkaPublisher.connect();
this.event$.publisher = this.kafkaPublisher;
}
}
@@switch
export const CommandHandlers = [KillDragonHandler, DropAncientItemHandler];
export const EventHandlers = [HeroKilledDragonHandler, HeroFoundItemHandler];
export const Events = [HeroKilledDragonEvent, HeroFoundItemEvent]

@Module({
imports: [CqrsModule, ConfigModule.forRoot()],
controllers: [HeroesGameController],
providers: [
HeroRepository,
...CommandHandlers,
...EventHandlers,
...QueryHandlers,
{
provide: 'EVENTS',
useValue: Events,
},
{
provide: 'KAFKA_BROKER',
useFactory: (configService) => {
return configService.get('KAFKA_BROKER')
},
inject: [ConfigService]
},
HeroesGameSagas,
KafkaPublisher,
KafkaSubscriber
],
})
export class HeroesGameModule {
constructor(
event$,
kafkaPublisher,
kafkaSubscriber,
) {
this.kafkaPublisher = kafkaPublisher;
this.kafkaSubscriber = kafkaSubscriber;
this.event$ = event$;
}

async onModuleInit() {
await this.kafkaSubscriber.connect();
this.kafkaSubscriber.bridgeEventsTo(this.event$.subject$);

await this.kafkaPublisher.connect();
this.event$.publisher = this.kafkaPublisher;
}
}
```

#### Example

A working example is available [here](https://github.com/kamilmysliwiec/nest-cqrs-example).

0 comments on commit 827645d

Please sign in to comment.