Skip to content

Commit

Permalink
Merge pull request #4531 from mahendraHegde/idempotency-impl
Browse files Browse the repository at this point in the history
feat: add idempotency interceptor
  • Loading branch information
LetItRock authored Oct 27, 2023
2 parents 4df730b + d0dc15a commit 460708d
Show file tree
Hide file tree
Showing 12 changed files with 630 additions and 3 deletions.
5 changes: 4 additions & 1 deletion .cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,10 @@
"Kamil",
"Myśliwiec",
"nestframework",
"ryver"
"ryver",
"idempotency",
"IDEMPOTENCY",
"Idempotency"
],
"flagWords": [],
"patterns": [
Expand Down
307 changes: 307 additions & 0 deletions apps/api/e2e/idempotency.e2e.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,307 @@
import { UserSession } from '@novu/testing';
import { CacheService } from '@novu/application-generic';
import { expect } from 'chai';
describe('Idempotency Test', async () => {
let session: UserSession;
const path = '/v1/testing/idempotency';
const HEADER_KEYS = {
IDEMPOTENCY_KEY: 'idempotency-key',
RETRY_AFTER: 'retry-after',
IDEMPOTENCY_REPLAY: 'idempotency-replay',
LINK: 'link',
};
const DOCS_LINK = 'docs.novu.co/idempotency';

let cacheService: CacheService | null = null;

describe('when enabled', () => {
before(async () => {
session = new UserSession();
await session.initialize();
cacheService = session.testServer?.getService(CacheService);
process.env.IS_API_IDEMPOTENCY_ENABLED = 'true';
});

it('should return cached same response for duplicate requests', async () => {
const key = `1`;
const { body, headers } = await session.testAgent
.post(path)
.set(HEADER_KEYS.IDEMPOTENCY_KEY, key)
.set('authorization', `ApiKey ${session.apiKey}`)
.send({ data: 201 })
.expect(201);
const { body: bodyDupe, headers: headerDupe } = await session.testAgent
.post(path)
.set(HEADER_KEYS.IDEMPOTENCY_KEY, key)
.set('authorization', `ApiKey ${session.apiKey}`)
.send({ data: 201 })
.expect(201);
expect(typeof body.data.number === 'number').to.be.true;
expect(body.data.number).to.equal(bodyDupe.data.number);
expect(headers[HEADER_KEYS.IDEMPOTENCY_KEY]).to.eq(key);
expect(headerDupe[HEADER_KEYS.IDEMPOTENCY_KEY]).to.eq(key);
expect(headerDupe[HEADER_KEYS.IDEMPOTENCY_REPLAY]).to.eq('true');
});
it('should return cached and use correct cache key when apiKey is used', async () => {
const key = `2`;
const { body, headers } = await session.testAgent
.post(path)
.set(HEADER_KEYS.IDEMPOTENCY_KEY, key)
.set('authorization', `ApiKey ${session.apiKey}`)
.send({ data: 201 })
.expect(201);
const cacheKey = `test-${session.organization._id}-${key}`;
session.testServer?.getHttpServer();
// eslint-disable-next-line @typescript-eslint/no-non-null-asserted-optional-chain
const cacheVal = JSON.stringify(JSON.parse(await cacheService?.get(cacheKey)!).data);
expect(JSON.stringify(body)).to.eq(cacheVal);
const { body: bodyDupe, headers: headerDupe } = await session.testAgent
.post(path)
.set(HEADER_KEYS.IDEMPOTENCY_KEY, key)
.set('authorization', `ApiKey ${session.apiKey}`)
.send({ data: 201 })
.expect(201);
expect(typeof body.data.number === 'number').to.be.true;
expect(body.data.number).to.equal(bodyDupe.data.number);
expect(headers[HEADER_KEYS.IDEMPOTENCY_KEY]).to.eq(key);
expect(headerDupe[HEADER_KEYS.IDEMPOTENCY_KEY]).to.eq(key);
expect(headerDupe[HEADER_KEYS.IDEMPOTENCY_REPLAY]).to.eq('true');
});
it('should return cached and use correct cache key when authToken and apiKey combination is used', async () => {
const key = `3`;
const { body, headers } = await session.testAgent
.post(path)
.set(HEADER_KEYS.IDEMPOTENCY_KEY, key)
.set('authorization', session.token)
.send({ data: 201 })
.expect(201);
const cacheKey = `test-${session.organization._id}-${key}`;
session.testServer?.getHttpServer();
// eslint-disable-next-line @typescript-eslint/no-non-null-asserted-optional-chain
const cacheVal = JSON.stringify(JSON.parse(await cacheService?.get(cacheKey)!).data);
expect(JSON.stringify(body)).to.eq(cacheVal);
const { body: bodyDupe, headers: headerDupe } = await session.testAgent
.post(path)
.set(HEADER_KEYS.IDEMPOTENCY_KEY, key)
.set('authorization', `ApiKey ${session.apiKey}`)
.send({ data: 201 })
.expect(201);
expect(typeof body.data.number === 'number').to.be.true;
expect(body.data.number).to.equal(bodyDupe.data.number);
expect(headers[HEADER_KEYS.IDEMPOTENCY_KEY]).to.eq(key);
expect(headerDupe[HEADER_KEYS.IDEMPOTENCY_KEY]).to.eq(key);
expect(headerDupe[HEADER_KEYS.IDEMPOTENCY_REPLAY]).to.eq('true');
});
it('should return conflict when concurrent requests are made', async () => {
const key = `4`;
const [{ headers, body, status }, { headers: headerDupe, body: bodyDupe, status: statusDupe }] =
await Promise.all([
session.testAgent.post(path).set(HEADER_KEYS.IDEMPOTENCY_KEY, key).send({ data: 250 }),
session.testAgent.post(path).set(HEADER_KEYS.IDEMPOTENCY_KEY, key).send({ data: 250 }),
]);
const oneSuccess = status === 201 || statusDupe === 201;
const oneConflict = status === 409 || statusDupe === 409;
const conflictBody = status === 201 ? bodyDupe : body;
const retryHeader = headers[HEADER_KEYS.RETRY_AFTER] || headerDupe[HEADER_KEYS.RETRY_AFTER];
expect(oneSuccess).to.be.true;
expect(oneConflict).to.be.true;
expect(headers[HEADER_KEYS.IDEMPOTENCY_KEY]).to.eq(key);
expect(headerDupe[HEADER_KEYS.IDEMPOTENCY_KEY]).to.eq(key);
expect(headerDupe[HEADER_KEYS.LINK]).to.eq(DOCS_LINK);
expect(retryHeader).to.eq(`1`);
expect(JSON.stringify(conflictBody)).to.eq(
JSON.stringify({
message: `Request with key "${key}" is currently being processed. Please retry after 1 second`,
error: 'Conflict',
statusCode: 409,
})
);
});
it('should return conflict when different body is sent for same key', async () => {
const key = '5';
const { headers, body, status } = await session.testAgent
.post(path)
.set(HEADER_KEYS.IDEMPOTENCY_KEY, key)
.set('authorization', `ApiKey ${session.apiKey}`)
.send({ data: 250 });
const {
headers: headerDupe,
body: bodyDupe,
status: statusDupe,
} = await session.testAgent.post(path).set(HEADER_KEYS.IDEMPOTENCY_KEY, key).send({ data: 251 });

const oneSuccess = status === 201 || statusDupe === 201;
const oneConflict = status === 422 || statusDupe === 422;
const conflictBody = status === 201 ? bodyDupe : body;
expect(oneSuccess).to.be.true;
expect(oneConflict).to.be.true;
expect(headers[HEADER_KEYS.IDEMPOTENCY_KEY]).to.eq(key);
expect(headerDupe[HEADER_KEYS.IDEMPOTENCY_KEY]).to.eq(key);
expect(headerDupe[HEADER_KEYS.LINK]).to.eq(DOCS_LINK);
expect(JSON.stringify(conflictBody)).to.eq(
JSON.stringify({
message: `Request with key "${key}" is being reused for a different body`,
error: 'Unprocessable Entity',
statusCode: 422,
})
);
});
it('should return non cached response for unique requests', async () => {
const key = '6';
const key1 = '7';
const { body, headers } = await session.testAgent
.post(path)
.set(HEADER_KEYS.IDEMPOTENCY_KEY, key)
.set('authorization', `ApiKey ${session.apiKey}`)
.send({ data: 201 })
.expect(201);

const { body: bodyDupe, headers: headerDupe } = await session.testAgent
.post(path)
.set(HEADER_KEYS.IDEMPOTENCY_KEY, key1)
.send({ data: 201 })
.expect(201);
expect(typeof body.data.number === 'number').to.be.true;
expect(typeof bodyDupe.data.number === 'number').to.be.true;
expect(body.data.number).not.to.equal(bodyDupe.data.number);
expect(headers[HEADER_KEYS.IDEMPOTENCY_KEY]).to.eq(key);
expect(headerDupe[HEADER_KEYS.IDEMPOTENCY_KEY]).to.eq(key1);
});
it('should return non cached response for GET requests', async () => {
const key = '8';
const { body, headers } = await session.testAgent
.get(path)
.set(HEADER_KEYS.IDEMPOTENCY_KEY, key)
.set('authorization', `ApiKey ${session.apiKey}`)
.send({})
.expect(200);

const { body: bodyDupe } = await session.testAgent
.get(path)
.set(HEADER_KEYS.IDEMPOTENCY_KEY, key)
.set('authorization', `ApiKey ${session.apiKey}`)
.send({})
.expect(200);
expect(typeof body.data.number === 'number').to.be.true;
expect(typeof bodyDupe.data.number === 'number').to.be.true;
expect(body.data.number).not.to.equal(bodyDupe.data.number);
expect(headers[HEADER_KEYS.IDEMPOTENCY_KEY]).to.eq(undefined);
});
it('should return cached error response for duplicate requests', async () => {
const key = '9';
const { body, headers } = await session.testAgent
.post(path)
.set(HEADER_KEYS.IDEMPOTENCY_KEY, key)
.set('authorization', `ApiKey ${session.apiKey}`)
.send({ data: 422 })
.expect(422);

const { body: bodyDupe, headers: headerDupe } = await session.testAgent
.post(path)
.set(HEADER_KEYS.IDEMPOTENCY_KEY, key)
.set('authorization', `ApiKey ${session.apiKey}`)
.send({ data: 422 })
.expect(422);
expect(JSON.stringify(body)).to.equal(JSON.stringify(bodyDupe));

expect(headers[HEADER_KEYS.IDEMPOTENCY_KEY]).to.eq(key);
expect(headerDupe[HEADER_KEYS.IDEMPOTENCY_KEY]).to.eq(key);
});
it('should return 400 when key bigger than allowed limit', async () => {
const key = Array.from({ length: 256 })
.fill(0)
.map((i) => i)
.join('');
const { body } = await session.testAgent
.post(path)
.set(HEADER_KEYS.IDEMPOTENCY_KEY, key)
.set('authorization', `ApiKey ${session.apiKey}`)
.send({ data: 250 })
.expect(400);
expect(JSON.stringify(body)).to.eq(
JSON.stringify({
message: `idempotencyKey "${key}" has exceeded the maximum allowed length of 255 characters`,
error: 'Bad Request',
statusCode: 400,
})
);
});
});

describe('when disabled', () => {
before(async () => {
session = new UserSession();
await session.initialize();
process.env.IS_API_IDEMPOTENCY_ENABLED = 'false';
});

it('should not return cached same response for duplicate requests', async () => {
const key = '10';
const { body } = await session.testAgent
.post(path)
.set(HEADER_KEYS.IDEMPOTENCY_KEY, key)
.set('authorization', `ApiKey ${session.apiKey}`)
.send({ data: 201 })
.expect(201);

const { body: bodyDupe } = await session.testAgent
.post(path)
.set(HEADER_KEYS.IDEMPOTENCY_KEY, key)
.set('authorization', `ApiKey ${session.apiKey}`)
.send({ data: 201 })
.expect(201);
expect(typeof body.data.number === 'number').to.be.true;
expect(body.data.number).not.to.equal(bodyDupe.data.number);
});
it('should return non cached response for unique requests', async () => {
const key = '11';
const key1 = '12';
const { body } = await session.testAgent
.post(path)
.set(HEADER_KEYS.IDEMPOTENCY_KEY, key)
.set('authorization', `ApiKey ${session.apiKey}`)
.send({ data: 201 })
.expect(201);

const { body: bodyDupe } = await session.testAgent
.post(path)
.set(HEADER_KEYS.IDEMPOTENCY_KEY, key1)
.send({ data: 201 })
.expect(201);
expect(typeof body.data.number === 'number').to.be.true;
expect(typeof bodyDupe.data.number === 'number').to.be.true;
expect(body.data.number).not.to.equal(bodyDupe.data.number);
});
it('should return non cached response for GET requests', async () => {
const key = '13';
const { body } = await session.testAgent.get(path).set(HEADER_KEYS.IDEMPOTENCY_KEY, key).send({}).expect(200);

const { body: bodyDupe } = await session.testAgent
.get(path)
.set(HEADER_KEYS.IDEMPOTENCY_KEY, key)
.set('authorization', `ApiKey ${session.apiKey}`)
.send({})
.expect(200);
expect(typeof body.data.number === 'number').to.be.true;
expect(typeof bodyDupe.data.number === 'number').to.be.true;
expect(body.data.number).not.to.equal(bodyDupe.data.number);
});
it('should not return cached error response for duplicate requests', async () => {
const key = '14';
const { body } = await session.testAgent
.post(path)
.set(HEADER_KEYS.IDEMPOTENCY_KEY, key)
.set('authorization', `ApiKey ${session.apiKey}`)
.send({ data: '500' })
.expect(500);

const { body: bodyDupe } = await session.testAgent
.post(path)
.set(HEADER_KEYS.IDEMPOTENCY_KEY, key)
.set('authorization', `ApiKey ${session.apiKey}`)
.send({ data: '500' })
.expect(500);
expect(JSON.stringify(body)).not.to.equal(JSON.stringify(bodyDupe));
});
});
});
2 changes: 2 additions & 0 deletions apps/api/src/.env.development
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,5 @@ NOVU_SMS_INTEGRATION_SENDER=
INTERCOM_IDENTITY_VERIFICATION_SECRET_KEY=

LAUNCH_DARKLY_SDK_KEY=

IS_API_IDEMPOTENCY_ENABLED=false
2 changes: 2 additions & 0 deletions apps/api/src/.env.production
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,5 @@ NOVU_SMS_INTEGRATION_SENDER=
INTERCOM_IDENTITY_VERIFICATION_SECRET_KEY=

LAUNCH_DARKLY_SDK_KEY=

IS_API_IDEMPOTENCY_ENABLED=false
2 changes: 2 additions & 0 deletions apps/api/src/.env.test
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,5 @@ MAX_NOVU_INTEGRATION_SMS_REQUESTS=20
NOVU_SMS_INTEGRATION_ACCOUNT_SID=test
NOVU_SMS_INTEGRATION_TOKEN=test
NOVU_SMS_INTEGRATION_SENDER=1234567890

IS_API_IDEMPOTENCY_ENABLED=true
8 changes: 7 additions & 1 deletion apps/api/src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import { TopicsModule } from './app/topics/topics.module';
import { InboundParseModule } from './app/inbound-parse/inbound-parse.module';
import { BlueprintModule } from './app/blueprint/blueprint.module';
import { TenantModule } from './app/tenant/tenant.module';
import { IdempotencyInterceptor } from './app/shared/framework/idempotency.interceptor';

const enterpriseImports = (): Array<Type | DynamicModule | Promise<DynamicModule> | ForwardReference> => {
const modules: Array<Type | DynamicModule | Promise<DynamicModule> | ForwardReference> = [];
Expand Down Expand Up @@ -78,7 +79,12 @@ const enterpriseModules = enterpriseImports();

const modules = baseModules.concat(enterpriseModules);

const providers: Provider[] = [];
const providers: Provider[] = [
{
provide: APP_INTERCEPTOR,
useClass: IdempotencyInterceptor,
},
];

if (process.env.SENTRY_DSN) {
modules.push(RavenModule);
Expand Down
Loading

0 comments on commit 460708d

Please sign in to comment.