Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Server Side Events endpoint #75

Merged
merged 5 commits into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
905 changes: 487 additions & 418 deletions package-lock.json

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
"amqplib": "^0.10.3",
"axios": "^1.4.0",
"cache-manager": "^5.2.1",
"ethers": "^6.7.1",
"express-formidable": "^1.2.0",
"express-session": "^1.17.3",
"pg": "^8.11.3",
Expand All @@ -51,6 +52,7 @@
"@nestjs/schematics": "^9.0.0",
"@nestjs/testing": "^9.0.0",
"@types/amqplib": "^0.10.1",
"@types/eventsource": "^1.1.11",
"@types/express": "^4.17.13",
"@types/jest": "^29.5.3",
"@types/node": "18.15.11",
Expand All @@ -60,8 +62,9 @@
"eslint": "^8.45.0",
"eslint-config-prettier": "^8.3.0",
"eslint-plugin-prettier": "^4.0.0",
"eventsource": "^2.0.2",
"husky": "^8.0.3",
"jest": "^29.6.1",
"jest": "^29.7.0",
"prettier": "^2.3.2",
"source-map-support": "^0.5.20",
"supertest": "^6.1.3",
Expand Down
69 changes: 69 additions & 0 deletions src/routes/events/events.controller.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import { Test, TestingModule } from '@nestjs/testing';
import { EventsController } from './events.controller';
import { EventsService } from './events.service';
import { QueueProvider } from '../../datasources/queue/queue.provider';
import { WebhookService } from '../webhook/webhook.service';
import { firstValueFrom } from 'rxjs';
import { TxServiceEvent, TxServiceEventType } from './event.dto';
import { BadRequestException } from '@nestjs/common';

describe('EventsController', () => {
let controller: EventsController;
let service: EventsService;

beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
controllers: [EventsController],
providers: [EventsService, QueueProvider, WebhookService],
})
.overrideProvider(QueueProvider)
.useValue({})
.overrideProvider(WebhookService)
.useValue({})
.compile();

controller = module.get<EventsController>(EventsController);
service = module.get<EventsService>(EventsService);
});

describe('SSE events', () => {
it('should require an EIP55 address', async () => {
const notValidAddress = '0x8618CE407F169ABB1388348A19632AaFA857CCB9';
const expectedError = new BadRequestException('Not valid EIP55 address', {
description: `${notValidAddress} is not a valid EIP55 Safe address`,
});
expect(() => {
controller.sse(notValidAddress);
}).toThrow(expectedError);
});
it('should receive for a Safe', async () => {
const relevantSafeAddress = '0x8618ce407F169ABB1388348A19632AaFA857CCB9';
const notRelevantSafeAddress =
'0x3F6E283068Ded118459B56fC669A27C3a65e587D';
const txServiceEvents: Array<TxServiceEvent> = [
{
chainId: '1',
type: 'SAFE_CREATED' as TxServiceEventType,
hero: 'Saitama',
address: notRelevantSafeAddress,
},
{
chainId: '100',
type: 'MESSAGE_CREATED' as TxServiceEventType,
hero: 'Atomic Samurai',
address: relevantSafeAddress,
},
];
const observable = controller.sse(relevantSafeAddress);
const firstValue = firstValueFrom(observable);
txServiceEvents.map((txServiceEvent) =>
service.pushEventToEventsObservable(txServiceEvent),
);

// Not relevant event must be ignored by Safe filter
const event = await firstValue;
expect(event.data).toEqual(txServiceEvents[1]);
expect(event.type).toEqual('message');
});
});
});
19 changes: 19 additions & 0 deletions src/routes/events/events.controller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { BadRequestException, Controller, Param, Sse } from '@nestjs/common';
import { Observable } from 'rxjs';
import { EventsService } from './events.service';
import { getAddress, isAddress } from 'ethers';

@Controller('events')
export class EventsController {
constructor(private readonly eventsService: EventsService) {}

@Sse('/sse/:safe')
sse(@Param('safe') safe: string): Observable<MessageEvent> {
if (isAddress(safe) && getAddress(safe) === safe)
return this.eventsService.getEventsObservableForSafe(safe);

throw new BadRequestException('Not valid EIP55 address', {
description: `${safe} is not a valid EIP55 Safe address`,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we consider check if the address is a Safe address?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would add too many dependencies for the service, in my opinion

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But it's a good observation, depending on traffic we have to think on how to protect this endpoint, maybe a flag to disable the feature in the future

});
}
}
5 changes: 3 additions & 2 deletions src/routes/events/events.module.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { Module } from '@nestjs/common';
import { EventsController } from './events.controller';
import { EventsService } from './events.service';
import { WebhookModule } from '../webhook/webhook.module';
import { QueueModule } from '../../datasources/queue/queue.module';
import { WebhookModule } from '../webhook/webhook.module';

@Module({
imports: [QueueModule, WebhookModule],
// controllers: [Controller],
controllers: [EventsController],
providers: [EventsService],
})
export class EventsModule {}
7 changes: 7 additions & 0 deletions src/routes/events/events.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ describe('EventsService', () => {
describe('processEvent', () => {
it('should post webhooks', async () => {
const postEveryWebhook = jest.spyOn(webhookService, 'postEveryWebhook');
const pushEventToEventsObservable = jest.spyOn(
eventsService,
'pushEventToEventsObservable',
);
const msg = {
chainId: '1',
type: 'SAFE_CREATED' as TxServiceEventType,
Expand All @@ -56,12 +60,15 @@ describe('EventsService', () => {
await eventsService.processEvent(JSON.stringify(msg));
expect(postEveryWebhook).toBeCalledTimes(1);
expect(postEveryWebhook).toBeCalledWith(msg);
expect(pushEventToEventsObservable).toBeCalledTimes(1);
expect(pushEventToEventsObservable).toBeCalledWith(msg);
});
});

describe('processMessageEvents', () => {
it('should post webhooks', async () => {
const postEveryWebhook = jest.spyOn(webhookService, 'postEveryWebhook');

const messageCreated = {
chainId: '1',
type: 'MESSAGE_CREATED' as TxServiceEventType,
Expand Down
32 changes: 32 additions & 0 deletions src/routes/events/events.service.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Observable, Subject, filter } from 'rxjs';
import { Injectable, Logger, OnApplicationBootstrap } from '@nestjs/common';
import { WebhookService } from '../webhook/webhook.service';
import { QueueProvider } from '../../datasources/queue/queue.provider';
Expand All @@ -7,6 +8,7 @@ import { TxServiceEvent } from './event.dto';
@Injectable()
export class EventsService implements OnApplicationBootstrap {
private readonly logger = new Logger(EventsService.name);
private eventsSubject = new Subject<MessageEvent<TxServiceEvent>>();

constructor(
private readonly queueProvider: QueueProvider,
Expand All @@ -24,6 +26,35 @@ export class EventsService implements OnApplicationBootstrap {
);
}

/**
*
* @param safe
* @returns Events rx.js observable used by the Server Side Events endpoint
*/
getEventsObservableForSafe(
safe: string,
): Observable<MessageEvent<TxServiceEvent>> {
return this.eventsSubject.pipe(filter((ev) => ev.data.address === safe));
}

/**
* Push txServiceEvent to the events observable (used by the Server Side Events endpoint)
* @param txServiceEvent
* @returns Crafted MessageEvent from txServiceEvent
*/
pushEventToEventsObservable(
txServiceEvent: TxServiceEvent,
): MessageEvent<TxServiceEvent> {
const messageEvent: MessageEvent<TxServiceEvent> = new MessageEvent(
'message',
{
data: txServiceEvent,
},
);
this.eventsSubject.next(messageEvent);
return messageEvent;
}

/**
*
* Event must have at least a `chainId` and `type`
Expand Down Expand Up @@ -55,6 +86,7 @@ export class EventsService implements OnApplicationBootstrap {
return Promise.resolve([undefined]);
}

this.pushEventToEventsObservable(txServiceEvent);
return this.webhookService.postEveryWebhook(txServiceEvent);
}
}
58 changes: 58 additions & 0 deletions test/app.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ import { INestApplication } from '@nestjs/common';
import * as request from 'supertest';
import { AppModule } from './../src/app.module';
import { QueueProvider } from '../src/datasources/queue/queue.provider';
import { EventsService } from '../src/routes/events/events.service';
import { Server } from 'tls';
import { TxServiceEventType } from '../src/routes/events/event.dto';
import EventSource = require('eventsource');

/* eslint-disable */
const { version } = require('../package.json');
Expand All @@ -11,12 +15,14 @@ const { version } = require('../package.json');
describe('AppController (e2e)', () => {
let app: INestApplication;
let queueProvider: QueueProvider;
let eventsService: EventsService;

beforeEach(async () => {
const moduleFixture: TestingModule = await Test.createTestingModule({
imports: [AppModule],
}).compile();

eventsService = moduleFixture.get<EventsService>(EventsService);
queueProvider = moduleFixture.get<QueueProvider>(QueueProvider);
app = moduleFixture.createNestApplication();
await app.init();
Expand All @@ -34,4 +40,56 @@ describe('AppController (e2e)', () => {
.expect(200)
.expect(expected);
});

describe('/events/sse/:safe (GET)', () => {
it('should subscribe and receive Server Side Events', () => {
const validSafeAddress = '0x8618ce407F169ABB1388348A19632AaFA857CCB9';
const msg = {
chainId: '1',
type: 'SAFE_CREATED' as TxServiceEventType,
hero: 'Tatsumaki',
address: validSafeAddress,
};

const path = `/events/sse/${validSafeAddress}`;

// Supertest cannot be used, as it does not support EventSource
const server = app.getHttpServer();
server.listen();
const port = server.address().port;
const protocol = server instanceof Server ? 'https' : 'http';
const url = protocol + '://127.1.0.1:' + port + path;

const eventSource = new EventSource(url);
// Use an empty promise so test has to wait for it, and do the cleanup there
const messageReceived = new Promise((resolve) => {
eventSource.onmessage = (event) => {
expect(event.type).toBe('message');
const parsedData = JSON.parse(event.data);
expect(parsedData).toStrictEqual(msg);
// Stop EventSource and server
eventSource.close();
server.close();
resolve(null);
};
});

// Wait a little to send the message
setTimeout(() => {
eventsService.pushEventToEventsObservable(msg);
}, 1000);

return messageReceived;
});
it('should return a 400 if safe address is not EIP55 valid', () => {
const notValidAddress = '0x8618CE407F169ABB1388348A19632AaFA857CCB9';
const url = `/events/sse/${notValidAddress}`;
const expected = {
statusCode: 400,
message: 'Not valid EIP55 address',
error: `${notValidAddress} is not a valid EIP55 Safe address`,
};
return request(app.getHttpServer()).get(url).expect(400).expect(expected);
});
});
});