Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/next' into add-otel
Browse files Browse the repository at this point in the history
# Conflicts:
#	packages/application-generic/package.json
  • Loading branch information
AliaksandrRyzhou committed Jan 16, 2024
2 parents e9a15fe + e7206eb commit a541b5a
Show file tree
Hide file tree
Showing 34 changed files with 527 additions and 119 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
<div align="center">
👨‍💻 <a href="https://novuhq.notion.site/Careers-page-00bffbc69d8b44b790badfeadb7e44c5?utm_source=github" target="_blank">
We are hiring in Europe!</a> 👩‍💻
</div>
<br/>
<br/>

<div align="center">
<a href="https://novu.co?utm_source=github" target="_blank">
<picture>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import { UserSession } from '@novu/testing';
import { CacheService } from '@novu/application-generic';
import { expect } from 'chai';
import { HttpResponseHeaderKeysEnum } from '../src/app/shared/framework/types';
import { HttpResponseHeaderKeysEnum } from './types';
import { DOCS_LINK } from './idempotency.interceptor';

process.env.LAUNCH_DARKLY_SDK_KEY = ''; // disable Launch Darkly to allow test to define FF state

describe('Idempotency Test', async () => {
let session: UserSession;
const path = '/v1/testing/idempotency';
const DOCS_LINK = 'https://docs.novu.co/additional-resources/idempotency';

let cacheService: CacheService | null = null;

describe('when enabled', () => {
Expand All @@ -33,9 +35,9 @@ describe('Idempotency Test', async () => {
.expect(201);
expect(typeof body.data.number === 'number').to.be.true;
expect(body.data.number).to.equal(bodyDupe.data.number);
expect(headers[HttpResponseHeaderKeysEnum.IDEMPOTENCY_KEY]).to.eq(key);
expect(headerDupe[HttpResponseHeaderKeysEnum.IDEMPOTENCY_KEY]).to.eq(key);
expect(headerDupe[HttpResponseHeaderKeysEnum.IDEMPOTENCY_REPLAY]).to.eq('true');
expect(headers[HttpResponseHeaderKeysEnum.IDEMPOTENCY_KEY.toLowerCase()]).to.eq(key);
expect(headerDupe[HttpResponseHeaderKeysEnum.IDEMPOTENCY_KEY.toLowerCase()]).to.eq(key);
expect(headerDupe[HttpResponseHeaderKeysEnum.IDEMPOTENCY_REPLAY.toLowerCase()]).to.eq('true');
});
it('should return cached and use correct cache key when apiKey is used', async () => {
const key = `2`;
Expand All @@ -58,9 +60,9 @@ describe('Idempotency Test', async () => {
.expect(201);
expect(typeof body.data.number === 'number').to.be.true;
expect(body.data.number).to.equal(bodyDupe.data.number);
expect(headers[HttpResponseHeaderKeysEnum.IDEMPOTENCY_KEY]).to.eq(key);
expect(headerDupe[HttpResponseHeaderKeysEnum.IDEMPOTENCY_KEY]).to.eq(key);
expect(headerDupe[HttpResponseHeaderKeysEnum.IDEMPOTENCY_REPLAY]).to.eq('true');
expect(headers[HttpResponseHeaderKeysEnum.IDEMPOTENCY_KEY.toLowerCase()]).to.eq(key);
expect(headerDupe[HttpResponseHeaderKeysEnum.IDEMPOTENCY_KEY.toLowerCase()]).to.eq(key);
expect(headerDupe[HttpResponseHeaderKeysEnum.IDEMPOTENCY_REPLAY.toLowerCase()]).to.eq('true');
});
it('should return cached and use correct cache key when authToken and apiKey combination is used', async () => {
const key = `3`;
Expand All @@ -83,9 +85,9 @@ describe('Idempotency Test', async () => {
.expect(201);
expect(typeof body.data.number === 'number').to.be.true;
expect(body.data.number).to.equal(bodyDupe.data.number);
expect(headers[HttpResponseHeaderKeysEnum.IDEMPOTENCY_KEY]).to.eq(key);
expect(headerDupe[HttpResponseHeaderKeysEnum.IDEMPOTENCY_KEY]).to.eq(key);
expect(headerDupe[HttpResponseHeaderKeysEnum.IDEMPOTENCY_REPLAY]).to.eq('true');
expect(headers[HttpResponseHeaderKeysEnum.IDEMPOTENCY_KEY.toLowerCase()]).to.eq(key);
expect(headerDupe[HttpResponseHeaderKeysEnum.IDEMPOTENCY_KEY.toLowerCase()]).to.eq(key);
expect(headerDupe[HttpResponseHeaderKeysEnum.IDEMPOTENCY_REPLAY.toLowerCase()]).to.eq('true');
});
it('should return conflict when concurrent requests are made', async () => {
const key = `4`;
Expand All @@ -98,12 +100,13 @@ describe('Idempotency Test', async () => {
const oneConflict = status === 409 || statusDupe === 409;
const conflictBody = status === 201 ? bodyDupe : body;
const retryHeader =
headers[HttpResponseHeaderKeysEnum.RETRY_AFTER] || headerDupe[HttpResponseHeaderKeysEnum.RETRY_AFTER];
headers[HttpResponseHeaderKeysEnum.RETRY_AFTER.toLowerCase()] ||
headerDupe[HttpResponseHeaderKeysEnum.RETRY_AFTER.toLowerCase()];
expect(oneSuccess).to.be.true;
expect(oneConflict).to.be.true;
expect(headers[HttpResponseHeaderKeysEnum.IDEMPOTENCY_KEY]).to.eq(key);
expect(headerDupe[HttpResponseHeaderKeysEnum.IDEMPOTENCY_KEY]).to.eq(key);
expect(headerDupe[HttpResponseHeaderKeysEnum.LINK]).to.eq(DOCS_LINK);
expect(headers[HttpResponseHeaderKeysEnum.IDEMPOTENCY_KEY.toLowerCase()]).to.eq(key);
expect(headerDupe[HttpResponseHeaderKeysEnum.IDEMPOTENCY_KEY.toLowerCase()]).to.eq(key);
expect(headerDupe[HttpResponseHeaderKeysEnum.LINK.toLowerCase()]).to.eq(DOCS_LINK);
expect(retryHeader).to.eq(`1`);
expect(JSON.stringify(conflictBody)).to.eq(
JSON.stringify({
Expand Down Expand Up @@ -131,9 +134,9 @@ describe('Idempotency Test', async () => {
const conflictBody = status === 201 ? bodyDupe : body;
expect(oneSuccess).to.be.true;
expect(oneConflict).to.be.true;
expect(headers[HttpResponseHeaderKeysEnum.IDEMPOTENCY_KEY]).to.eq(key);
expect(headerDupe[HttpResponseHeaderKeysEnum.IDEMPOTENCY_KEY]).to.eq(key);
expect(headerDupe[HttpResponseHeaderKeysEnum.LINK]).to.eq(DOCS_LINK);
expect(headers[HttpResponseHeaderKeysEnum.IDEMPOTENCY_KEY.toLowerCase()]).to.eq(key);
expect(headerDupe[HttpResponseHeaderKeysEnum.IDEMPOTENCY_KEY.toLowerCase()]).to.eq(key);
expect(headerDupe[HttpResponseHeaderKeysEnum.LINK.toLowerCase()]).to.eq(DOCS_LINK);
expect(JSON.stringify(conflictBody)).to.eq(
JSON.stringify({
message: `Request with key "${key}" is being reused for a different body`,
Expand All @@ -160,8 +163,8 @@ describe('Idempotency Test', async () => {
expect(typeof body.data.number === 'number').to.be.true;
expect(typeof bodyDupe.data.number === 'number').to.be.true;
expect(body.data.number).not.to.equal(bodyDupe.data.number);
expect(headers[HttpResponseHeaderKeysEnum.IDEMPOTENCY_KEY]).to.eq(key);
expect(headerDupe[HttpResponseHeaderKeysEnum.IDEMPOTENCY_KEY]).to.eq(key1);
expect(headers[HttpResponseHeaderKeysEnum.IDEMPOTENCY_KEY.toLowerCase()]).to.eq(key);
expect(headerDupe[HttpResponseHeaderKeysEnum.IDEMPOTENCY_KEY.toLowerCase()]).to.eq(key1);
});
it('should return non cached response for GET requests', async () => {
const key = '8';
Expand All @@ -181,7 +184,7 @@ describe('Idempotency Test', async () => {
expect(typeof body.data.number === 'number').to.be.true;
expect(typeof bodyDupe.data.number === 'number').to.be.true;
expect(body.data.number).not.to.equal(bodyDupe.data.number);
expect(headers[HttpResponseHeaderKeysEnum.IDEMPOTENCY_KEY]).to.eq(undefined);
expect(headers[HttpResponseHeaderKeysEnum.IDEMPOTENCY_KEY.toLowerCase()]).to.eq(undefined);
});
it('should return cached error response for duplicate requests', async () => {
const key = '9';
Expand All @@ -200,8 +203,8 @@ describe('Idempotency Test', async () => {
.expect(422);
expect(JSON.stringify(body)).to.equal(JSON.stringify(bodyDupe));

expect(headers[HttpResponseHeaderKeysEnum.IDEMPOTENCY_KEY]).to.eq(key);
expect(headerDupe[HttpResponseHeaderKeysEnum.IDEMPOTENCY_KEY]).to.eq(key);
expect(headers[HttpResponseHeaderKeysEnum.IDEMPOTENCY_KEY.toLowerCase()]).to.eq(key);
expect(headerDupe[HttpResponseHeaderKeysEnum.IDEMPOTENCY_KEY.toLowerCase()]).to.eq(key);
});
it('should return 400 when key bigger than allowed limit', async () => {
const key = Array.from({ length: 256 })
Expand All @@ -222,6 +225,41 @@ describe('Idempotency Test', async () => {
})
);
});

describe('Allowed Authentication Security Schemes', () => {
it('should set Idempotency-Key header when ApiKey security scheme is used to authenticate', async () => {
const key = '10';
const { headers } = await session.testAgent
.post(path)
.set(HttpResponseHeaderKeysEnum.IDEMPOTENCY_KEY, key)
.set('authorization', `ApiKey ${session.apiKey}`)
.send({ data: 201 })
.expect(201);
expect(headers[HttpResponseHeaderKeysEnum.IDEMPOTENCY_KEY.toLowerCase()]).to.exist;
});

it('should set rate limit headers when a Bearer security scheme is used to authenticate', async () => {
const key = '10';
const { headers } = await session.testAgent
.post(path)
.set(HttpResponseHeaderKeysEnum.IDEMPOTENCY_KEY, key)
.set('authorization', session.token)
.send({ data: 201 })
.expect(201);
expect(headers[HttpResponseHeaderKeysEnum.IDEMPOTENCY_KEY.toLowerCase()]).to.exist;
});

it('should NOT set rate limit headers when NO authorization header is present', async () => {
const key = '10';
const { headers } = await session.testAgent
.post(path)
.set(HttpResponseHeaderKeysEnum.IDEMPOTENCY_KEY, key)
.set('authorization', '')
.send({ data: 201 })
.expect(401);
expect(headers[HttpResponseHeaderKeysEnum.IDEMPOTENCY_KEY.toLowerCase()]).not.to.exist;
});
});
});

describe('when disabled', () => {
Expand Down
58 changes: 46 additions & 12 deletions apps/api/src/app/shared/framework/idempotency.interceptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,37 +11,64 @@ import {
BadRequestException,
ConflictException,
} from '@nestjs/common';
import { CacheService, Instrument } from '@novu/application-generic';
import { CacheService, FeatureFlagCommand, GetIsApiIdempotencyEnabled, Instrument } from '@novu/application-generic';
import { Observable, of, throwError } from 'rxjs';
import { catchError, map } from 'rxjs/operators';
import { createHash } from 'crypto';
import { IJwtPayload } from '@novu/shared';
import { ApiAuthSchemeEnum, IJwtPayload } from '@novu/shared';
import { HttpResponseHeaderKeysEnum } from './types';

const LOG_CONTEXT = 'IdempotencyInterceptor';
const IDEMPOTENCY_CACHE_TTL = 60 * 60 * 24; //24h
const IDEMPOTENCY_PROGRESS_TTL = 60 * 5; //5min

const DOCS_LINK = 'https://docs.novu.co/additional-resources/idempotency';

enum ReqStatusEnum {
PROGRESS = 'in-progress',
SUCCESS = 'success',
ERROR = 'error',
}

export const DOCS_LINK = 'https://docs.novu.co/additional-resources/idempotency';
export const ALLOWED_AUTH_SCHEMES = [ApiAuthSchemeEnum.API_KEY];
const ALLOWED_METHODS = ['post', 'patch'];

@Injectable()
export class IdempotencyInterceptor implements NestInterceptor {
constructor(private readonly cacheService: CacheService) {}
constructor(
private readonly cacheService: CacheService,
private getIsApiIdempotencyEnabled: GetIsApiIdempotencyEnabled
) {}

protected async isEnabled(context: ExecutionContext): Promise<boolean> {
const isAllowedAuthScheme = this.isAllowedAuthScheme(context);
if (!isAllowedAuthScheme) {
return true;
}

const user = this.getReqUser(context);
const { organizationId, environmentId, _id } = user;

const isEnabled = await this.getIsApiIdempotencyEnabled.execute(
FeatureFlagCommand.create({
environmentId,
organizationId,
userId: _id,
})
);

return isEnabled;
}

@Instrument()
async intercept(context: ExecutionContext, next: CallHandler): Promise<Observable<any>> {
const request = context.switchToHttp().getRequest();
const isAllowedMethod = ALLOWED_METHODS.includes(request.method.toLowerCase());
const idempotencyKey = this.getIdempotencyKey(context);
const isEnabled = process.env.IS_API_IDEMPOTENCY_ENABLED == 'true';
if (!isEnabled || !idempotencyKey || !['post', 'patch'].includes(request.method.toLowerCase())) {
const isEnabled = await this.isEnabled(context);
if (!idempotencyKey || !isAllowedMethod || !isEnabled) {
return next.handle();
}

if (idempotencyKey?.length > 255) {
return throwError(
() =>
Expand Down Expand Up @@ -93,6 +120,13 @@ export class IdempotencyInterceptor implements NestInterceptor {
return req.user;
}

private isAllowedAuthScheme(context: ExecutionContext): boolean {
const req = context.switchToHttp().getRequest();
const authScheme = req.authScheme;

return ALLOWED_AUTH_SCHEMES.some((scheme) => authScheme === scheme);
}

private getCacheKey(context: ExecutionContext): string {
const user = this.getReqUser(context);
if (user === undefined) {
Expand Down Expand Up @@ -158,7 +192,7 @@ export class IdempotencyInterceptor implements NestInterceptor {
const parsed = JSON.parse(data);
if (parsed.status === ReqStatusEnum.PROGRESS) {
// api call is in progress, so client need to handle this case
Logger.error(`previous api call in progress rejecting the request. key:${idempotencyKey}`, LOG_CONTEXT);
Logger.verbose(`previous api call in progress rejecting the request. key: "${idempotencyKey}"`, LOG_CONTEXT);
this.setHeaders(context.switchToHttp().getResponse(), {
[HttpResponseHeaderKeysEnum.RETRY_AFTER]: `1`,
[HttpResponseHeaderKeysEnum.LINK]: DOCS_LINK,
Expand All @@ -170,7 +204,7 @@ export class IdempotencyInterceptor implements NestInterceptor {
}
if (bodyHash !== parsed.bodyHash) {
//different body sent than before
Logger.error(`idempotency key is being reused for different bodies. key:${idempotencyKey}`, LOG_CONTEXT);
Logger.verbose(`idempotency key is being reused for different bodies. key: "${idempotencyKey}"`, LOG_CONTEXT);
this.setHeaders(context.switchToHttp().getResponse(), {
[HttpResponseHeaderKeysEnum.LINK]: DOCS_LINK,
});
Expand All @@ -183,7 +217,7 @@ export class IdempotencyInterceptor implements NestInterceptor {

//already seen the request return cached response
if (parsed.status === ReqStatusEnum.ERROR) {
Logger.error(`returning cached error response. key:${idempotencyKey}`, LOG_CONTEXT);
Logger.verbose(`returning cached error response. key: "${idempotencyKey}"`, LOG_CONTEXT);

throw this.buildError(parsed.data);
}
Expand All @@ -210,7 +244,7 @@ export class IdempotencyInterceptor implements NestInterceptor {
{ status: ReqStatusEnum.SUCCESS, bodyHash, statusCode: statusCode, data: response },
IDEMPOTENCY_CACHE_TTL
);
Logger.verbose(`cached the success response for idempotency key:${idempotencyKey}`, LOG_CONTEXT);
Logger.verbose(`cached the success response for idempotency key: "${idempotencyKey}"`, LOG_CONTEXT);
this.setHeaders(httpResponse, { [HttpResponseHeaderKeysEnum.IDEMPOTENCY_KEY]: idempotencyKey });

return response;
Expand All @@ -229,7 +263,7 @@ export class IdempotencyInterceptor implements NestInterceptor {
},
IDEMPOTENCY_CACHE_TTL
).catch(() => {});
Logger.verbose(`cached the error response for idempotency key:${idempotencyKey}`, LOG_CONTEXT);
Logger.verbose(`cached the error response for idempotency key: "${idempotencyKey}"`, LOG_CONTEXT);
this.setHeaders(context.switchToHttp().getResponse(), {
[HttpResponseHeaderKeysEnum.IDEMPOTENCY_KEY]: idempotencyKey,
});
Expand Down
2 changes: 2 additions & 0 deletions apps/api/src/app/shared/shared.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import {
LoggerModule,
QueuesModule,
storageService,
getIsApiIdempotencyEnabled,
} from '@novu/application-generic';

import * as packageJson from '../../../package.json';
Expand Down Expand Up @@ -90,6 +91,7 @@ const PROVIDERS = [
featureFlagsService,
getIsTopicNotificationEnabled,
getIsApiRateLimitingEnabled,
getIsApiIdempotencyEnabled,
InvalidateCacheService,
storageService,
...DAL_MODELS,
Expand Down
6 changes: 6 additions & 0 deletions apps/web/public/static/images/providers/dark/rocket-chat.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading

0 comments on commit a541b5a

Please sign in to comment.