Skip to content

Commit

Permalink
feat(app-generic): implement memory db and redo queues and workers
Browse files Browse the repository at this point in the history
  • Loading branch information
p-fernandez committed Sep 4, 2023
1 parent 8223aff commit 58c46d7
Show file tree
Hide file tree
Showing 124 changed files with 10,782 additions and 15,717 deletions.
1 change: 0 additions & 1 deletion apps/api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
"bcrypt": "^5.0.0",
"body-parser": "^1.20.0",
"bull": "^4.2.1",
"bullmq": "^3.10.2",
"class-transformer": "^0.5.1",
"class-validator": "^0.14.0",
"compression": "^1.7.4",
Expand Down
2 changes: 1 addition & 1 deletion apps/api/src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import { BlueprintModule } from './app/blueprint/blueprint.module';
import { TenantModule } from './app/tenant/tenant.module';

const modules: Array<Type | DynamicModule | Promise<DynamicModule> | ForwardReference> = [
InboundParseModule,
OrganizationModule,
SharedModule,
UserModule,
Expand All @@ -59,6 +58,7 @@ const modules: Array<Type | DynamicModule | Promise<DynamicModule> | ForwardRefe
TopicsModule,
BlueprintModule,
TenantModule,
InboundParseModule,
];

const providers: Provider[] = [];
Expand Down
10 changes: 5 additions & 5 deletions apps/api/src/app/events/e2e/delay-events.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {
} from '@novu/dal';
import { UserSession, SubscribersService } from '@novu/testing';
import { StepTypeEnum, DelayTypeEnum, DigestUnitEnum, DigestTypeEnum } from '@novu/shared';
import { QueueService } from '@novu/application-generic';
import { StandardQueueService } from '@novu/application-generic';

const axiosInstance = axios.create();

Expand All @@ -21,7 +21,7 @@ describe('Trigger event - Delay triggered events - /v1/events/trigger (POST)', f
let subscriber: SubscriberEntity;
let subscriberService: SubscribersService;
const jobRepository = new JobRepository();
let queueService: QueueService;
let standardQueueService: StandardQueueService;
const messageRepository = new MessageRepository();

const triggerEvent = async (payload, transactionId?: string, overrides = {}) => {
Expand All @@ -48,7 +48,7 @@ describe('Trigger event - Delay triggered events - /v1/events/trigger (POST)', f
template = await session.createTemplate();
subscriberService = new SubscribersService(session.organization._id, session.environment._id);
subscriber = await subscriberService.createSubscriber();
queueService = session?.testServer?.getService(QueueService);
standardQueueService = session?.testServer?.getService(StandardQueueService);
});

it('should delay event for time interval', async function () {
Expand Down Expand Up @@ -141,7 +141,7 @@ describe('Trigger event - Delay triggered events - /v1/events/trigger (POST)', f
customVar: 'Testing of User Name',
},
id,
{ delay: { amount: 3, unit: DigestUnitEnum.SECONDS } }
{ delay: { amount: 1, unit: DigestUnitEnum.SECONDS } }
);
await session.awaitRunningJobs(template?._id, true, 0);
const messages = await messageRepository.find({
Expand Down Expand Up @@ -190,7 +190,7 @@ describe('Trigger event - Delay triggered events - /v1/events/trigger (POST)', f
const updatedAt = delayedJob?.updatedAt as string;
const diff = differenceInMilliseconds(new Date(delayedJob.payload.sendAt), new Date(updatedAt));

const delay = await queueService.bullMqService.queue.getDelayed();
const delay = await standardQueueService.queue.getDelayed();
expect(delay[0].opts.delay).to.approximately(diff, 1000);
});

Expand Down
3 changes: 0 additions & 3 deletions apps/api/src/app/events/e2e/digest-events.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import {
} from '@novu/dal';
import { StepTypeEnum, DigestTypeEnum, DigestUnitEnum, IDigestRegularMetadata } from '@novu/shared';
import { UserSession, SubscribersService } from '@novu/testing';
import { QueueService } from '@novu/application-generic';

const axiosInstance = axios.create();

Expand All @@ -24,7 +23,6 @@ describe('Trigger event - Digest triggered events - /v1/events/trigger (POST)',
let subscriber: SubscriberEntity;
let subscriberService: SubscribersService;
const jobRepository = new JobRepository();
let queueService: QueueService;
const messageRepository = new MessageRepository();

const triggerEvent = async (payload, transactionId?: string): Promise<void> => {
Expand All @@ -50,7 +48,6 @@ describe('Trigger event - Digest triggered events - /v1/events/trigger (POST)',
template = await session.createTemplate();
subscriberService = new SubscribersService(session.organization._id, session.environment._id);
subscriber = await subscriberService.createSubscriber();
queueService = session.testServer?.getService(QueueService);
});

it('should digest events within time interval', async function () {
Expand Down
19 changes: 16 additions & 3 deletions apps/api/src/app/events/e2e/trigger-event.e2e.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* eslint-disable @typescript-eslint/no-non-null-assertion */
import { expect } from 'chai';
import axios from 'axios';
import axios, { AxiosResponse } from 'axios';
import { v4 as uuid } from 'uuid';
import { differenceInMilliseconds, subDays } from 'date-fns';
import {
Expand Down Expand Up @@ -433,10 +433,12 @@ describe(`Trigger event - ${eventTriggerPath} (POST)`, function () {
});

it('should correctly set expiration date (TTL) for notification and messages', async function () {
const templateName = template.triggers[0].identifier;

const { data: body } = await axiosInstance.post(
`${session.serverUrl}${eventTriggerPath}`,
{
name: template.triggers[0].identifier,
name: templateName,
to: {
subscriberId: subscriber.subscriberId,
},
Expand All @@ -452,8 +454,19 @@ describe(`Trigger event - ${eventTriggerPath} (POST)`, function () {
}
);

expect(body.data).to.have.all.keys('acknowledged', 'status', 'transactionId');
expect(body.data.acknowledged).to.equal(true);
expect(body.data.status).to.equal('processed');
expect(body.data.transaction).to.be.a.string;

await session.awaitRunningJobs(template._id);

const jobs = await jobRepository.find({
_templateId: template._id,
_environmentId: session.environment._id,
});
expect(jobs.length).to.equal(2);

const notifications = await notificationRepository.findBySubscriberId(session.environment._id, subscriber._id);

expect(notifications.length).to.equal(1);
Expand Down Expand Up @@ -1836,7 +1849,7 @@ export async function sendTrigger(
newSubscriberIdInAppNotification: string,
payload: Record<string, unknown> = {},
overrides: Record<string, unknown> = {}
) {
): Promise<AxiosResponse> {
return await axiosInstance.post(
`${session.serverUrl}${eventTriggerPath}`,
{
Expand Down
55 changes: 38 additions & 17 deletions apps/api/src/app/events/events.module.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,28 @@
import { Module } from '@nestjs/common';
import { TerminusModule } from '@nestjs/terminus';

import {
AddDelayJob,
AddDigestJob,
AddJob,
bullMqService,
CreateNotificationJobs,
DigestFilterSteps,
DigestFilterStepsBackoff,
DigestFilterStepsRegular,
DigestFilterStepsTimed,
EventsDistributedLockService,
GetNovuProviderCredentials,
ProcessSubscriber,
QueuesModule,
StorageHelperService,
SendTestEmail,
QueueService,
CalculateDelayService,
GetNovuProviderCredentials,
StoreSubscriberJobs,
TriggerEvent,
} from '@novu/application-generic';

import { EventsController } from './events.controller';
import { TriggerHandlerQueueService } from './services/workflow-queue/trigger-handler-queue.service';
import { EventsWorkflowQueueService } from './services';
import { USE_CASES } from './usecases';

import { SharedModule } from '../shared/shared.module';
Expand All @@ -25,6 +37,26 @@ import { TopicsModule } from '../topics/topics.module';
import { LayoutsModule } from '../layouts/layouts.module';
import { TenantModule } from '../tenant/tenant.module';

const PROVIDERS = [
AddDelayJob,
AddDigestJob,
AddJob,
bullMqService,
CreateNotificationJobs,
DigestFilterSteps,
DigestFilterStepsBackoff,
DigestFilterStepsRegular,
DigestFilterStepsTimed,
EventsWorkflowQueueService,
GetNovuProviderCredentials,
StorageHelperService,
EventsDistributedLockService,
ProcessSubscriber,
SendTestEmail,
StoreSubscriberJobs,
TriggerEvent,
];

@Module({
imports: [
SharedModule,
Expand All @@ -39,20 +71,9 @@ import { TenantModule } from '../tenant/tenant.module';
TopicsModule,
LayoutsModule,
TenantModule,
QueuesModule,
],
controllers: [EventsController],
providers: [
...USE_CASES,
{
provide: QueueService,
useClass: QueueService,
},
StorageHelperService,
TriggerHandlerQueueService,
EventsDistributedLockService,
SendTestEmail,
CalculateDelayService,
GetNovuProviderCredentials,
],
providers: [...PROVIDERS, ...USE_CASES],
})
export class EventsModule {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import { Test } from '@nestjs/testing';
import { expect } from 'chai';

import { EventsWorkflowQueueService } from './events-workflow-queue.service';

let eventsWorkflowQueueService: EventsWorkflowQueueService;

describe('EventsWorkflowQueue service', () => {
beforeEach(async () => {
eventsWorkflowQueueService = new EventsWorkflowQueueService();
await eventsWorkflowQueueService.queue.obliterate();
});

after(async () => {
await eventsWorkflowQueueService.gracefulShutdown();
});

it('should be initialised properly', async () => {
expect(eventsWorkflowQueueService).to.exist;
expect(Object.keys(eventsWorkflowQueueService)).to.include.members([
'name',
'queue',
'DEFAULT_ATTEMPTS',
'instance',
]);
expect(eventsWorkflowQueueService.name).to.equal('trigger-handler');
expect(await eventsWorkflowQueueService.bullMqService.getRunningStatus()).to.deep.equal({
queueIsPaused: false,
queueName: 'trigger-handler',
workerName: undefined,
workerIsRunning: undefined,
});
expect(eventsWorkflowQueueService.queue).to.deep.include({
_events: {},
_eventsCount: 0,
_maxListeners: undefined,
name: 'trigger-handler',
jobsOpts: {
removeOnComplete: true,
},
});
expect(eventsWorkflowQueueService.queue.opts).to.deep.include({
blockingConnection: false,
connection: {
connectTimeout: 50000,
db: 1,
family: 4,
host: 'localhost',
keepAlive: 7200,
keyPrefix: '',
password: undefined,
port: 6379,
tls: undefined,
},
defaultJobOptions: {
removeOnComplete: true,
},
prefix: 'bull',
sharedConnection: false,
});
expect(eventsWorkflowQueueService.queue.opts.connection).to.deep.include({
host: 'localhost',
port: 6379,
});
});

it('should add a job in the queue', async () => {
const jobId = 'events-workflow-job-id';
const _environmentId = 'events-workflow-environment-id';
const _organizationId = 'events-workflow-organization-id';
const _userId = 'events-workflow-user-id';
const jobData = {
_id: jobId,
test: 'events-workflow-job-data',
_environmentId,
_organizationId,
_userId,
};
await eventsWorkflowQueueService.add(jobId, jobData, _organizationId);

expect(await eventsWorkflowQueueService.queue.getActiveCount()).to.equal(0);
expect(await eventsWorkflowQueueService.queue.getWaitingCount()).to.equal(1);

const eventsWorkflowQueueJobs = await eventsWorkflowQueueService.queue.getJobs();
expect(eventsWorkflowQueueJobs.length).to.equal(1);
const [eventsWorkflowQueueJob] = eventsWorkflowQueueJobs;
expect(eventsWorkflowQueueJob).to.deep.include({
id: '1',
name: jobId,
data: jobData,
attemptsMade: 0,
});
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import { Injectable } from '@nestjs/common';
import { WorkflowQueueService } from '@novu/application-generic';

@Injectable()
export class EventsWorkflowQueueService extends WorkflowQueueService {}
1 change: 1 addition & 0 deletions apps/api/src/app/events/services/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export { EventsWorkflowQueueService } from './events-workflow-queue.service';
Loading

0 comments on commit 58c46d7

Please sign in to comment.