diff --git a/README.md b/README.md index 1f73017..5c3516c 100644 --- a/README.md +++ b/README.md @@ -15,8 +15,14 @@ When sending a message indicating the recipients, the message sent to their queu ### Subscribe to broadcast messages by message type ```javascript + const logger = { + info: console.log, + warn: console.log, + error: console.error, + } const client = createClient({ serviceName: 'news', + logger, connectOptions: { username: 'test', password: '123', diff --git a/src/connection.spec.ts b/src/connection.spec.ts index 95af886..73bd491 100644 --- a/src/connection.spec.ts +++ b/src/connection.spec.ts @@ -37,16 +37,4 @@ Array [ ] `); }); - - it('should fail if logger is empty', () => { - const optionsMock = { - connectOptions: { - username: 'stub', - password: 'test' - }, - serviceName: 'test' - }; - - expect(() => connect(optionsMock)).toThrowError('logger is required.'); - }) }); diff --git a/src/connection.ts b/src/connection.ts index 3146ab3..851c0d0 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -12,10 +12,6 @@ export enum ConnectionStatus { const connect = (options: CreateServiceOptions): { service: ServiceConnection; connection: Promise } => { const { connectOptions, serviceName } = options; - if (!options.logger) { - throw new Error('logger is required.'); - } - return connectServiceQueues(getAMQPNodeAdapter(), connectOptions, serviceName, options.logger); }; diff --git a/src/index.ts b/src/index.ts index 1537175..e64c00b 100644 --- a/src/index.ts +++ b/src/index.ts @@ -6,7 +6,7 @@ import { MessageHandlerOptions, MessageHandler } from './message'; export interface CreateServiceOptions { appId?: string; serviceName: string; - logger?: Logger; + logger: Logger; connectOptions: AMQPOptions; } diff --git a/src/service.spec.ts b/src/service.spec.ts index 008a64e..44e4206 100644 --- a/src/service.spec.ts +++ b/src/service.spec.ts @@ -1,11 +1,7 @@ import connectService, { ServiceConnection } from './service'; import { AMQPConnection } from './adapters/amqp-node'; import { ConnectionStatus } from './connection'; -import { - amqpConnectError, - amqpConnectGracefullyStopped, - ConnectionNotInitialized -} from './errors'; +import { amqpConnectError, amqpConnectGracefullyStopped, ConnectionNotInitialized } from './errors'; const testAdapter = { connect: jest.fn() }; const optionsMock = { @@ -171,7 +167,8 @@ describe('#getConnection', () => { }); describe('#handleConnectionClose', () => { - it('reconnects and rebinds handlers', async () => { + it('reconnects if status is CONNECTED', async () => { + serviceConnection.status = ConnectionStatus.CONNECTED; serviceConnection.unsubscribe = jest.fn().mockResolvedValue({}); serviceConnection.connect = jest.fn(); serviceConnection.initQueue = jest.fn(); @@ -184,6 +181,7 @@ describe('#handleConnectionClose', () => { }); it('reconnects and rebinds handlers', async () => { + serviceConnection.status = ConnectionStatus.CONNECTED; serviceConnection.unsubscribe = jest.fn().mockResolvedValue({}); const connection = { queueBind: jest.fn() }; serviceConnection.connect = jest.fn().mockResolvedValue(connection); @@ -199,10 +197,47 @@ describe('#handleConnectionClose', () => { expect(serviceConnection.initQueue).toBeCalled(); expect(connection.queueBind).toBeCalledWith('dispatcher', 'dispatcher', '*.handler1'); }); + + it('reconnects and rebinds handlers only once', async () => { + serviceConnection.status = ConnectionStatus.CONNECTED; + serviceConnection.unsubscribe = jest.fn().mockResolvedValue({}); + const connection = { queueBind: jest.fn() }; + serviceConnection.connect = jest.fn().mockResolvedValue(connection); + serviceConnection.initQueue = jest.fn(); + + const handlerMock = async () => undefined; + serviceConnection.setActionHandler('handler1', handlerMock); + serviceConnection.setActionHandler('handler1', handlerMock); + + await Promise.all([ + serviceConnection.handleConnectionClose({} as unknown as Error), + serviceConnection.handleConnectionClose({} as unknown as Error), + serviceConnection.handleConnectionClose({} as unknown as Error) + ]); + + expect(serviceConnection.unsubscribe).toBeCalledTimes(1); + expect(serviceConnection.connect).toBeCalledTimes(1); + expect(serviceConnection.initQueue).toBeCalledTimes(1); + expect(connection.queueBind).toBeCalledTimes(1); + }); + + it.each([ConnectionStatus.DISCONNECTING, ConnectionStatus.CONNECTING])('does not reconnect if status is %s', async (status) => { + serviceConnection.status = status; + serviceConnection.unsubscribe = jest.fn().mockResolvedValue({}); + serviceConnection.connect = jest.fn(); + serviceConnection.initQueue = jest.fn(); + + await serviceConnection.handleConnectionClose({} as unknown as Error); + + expect(serviceConnection.unsubscribe).not.toBeCalled(); + expect(serviceConnection.connect).not.toBeCalled(); + expect(serviceConnection.initQueue).not.toBeCalled(); + }); }); describe('#handleConnectionError', () => { it('logs error to console', () => { + serviceConnection.status = ConnectionStatus.CONNECTED; serviceConnection.unsubscribe = jest.fn(); serviceConnection.connect = jest.fn(); const message = { content: 'message' }; diff --git a/src/service.ts b/src/service.ts index 59b6c1f..5d9eed9 100644 --- a/src/service.ts +++ b/src/service.ts @@ -4,15 +4,15 @@ import EventEmitter from 'events'; import timeout from './timeout'; import randomPickConnectionString from './random-pick'; import { - EmptyMessageError, - AmqpConnectGracefullyStopped, - emptyMessageError, amqpConnectError, + AmqpConnectGracefullyStopped, amqpConnectGracefullyStopped, ConnectionNotInitialized, + emptyMessageError, + EmptyMessageError, unexpectedNonStringAction } from './errors'; -import { MessageOptions, MessageHandlerOptions, MessageHandler } from './message'; +import { MessageHandler, MessageHandlerOptions, MessageOptions } from './message'; import { ConnectionStatus } from './connection'; import { AMQPAdapter, @@ -211,24 +211,29 @@ export class ServiceConnection extends EventEmitter { * Handle connection close */ async handleConnectionClose(error: Error): Promise { + if (this.status === ConnectionStatus.DISCONNECTING || this.status === ConnectionStatus.CONNECTING) { + return; + } + + this.setConnectionStatus(ConnectionStatus.DISCONNECTED); + // set the "connecting" status in order to avoid concurrent connection in case + // when the handler is called several times in the short period of time + this.status = ConnectionStatus.CONNECTING; + const { password, ...restOptions } = this.options; this.log.error('[amqp-connection] Connection closed.', error, restOptions); - this.emit(ConnectionStatus.DISCONNECTED); - await this.unsubscribe(); - if (this.status !== ConnectionStatus.DISCONNECTING) { - const connection = await this.connect(); + const connection = await this.connect(); - const handlers = Object.keys(this.handlers).filter(name => name !== DEFAULT_ACTION); + const handlers = Object.keys(this.handlers).filter(name => name !== DEFAULT_ACTION); - if (handlers.length > 0) { - await this.initQueue(this.name); + if (handlers.length > 0) { + await this.initQueue(this.name); - for (const handler of handlers) { - await connection.queueBind(this.name, ServiceConnection.getTopicExchange(this.options.exchange), `*.${handler}`); - } + for (const handler of handlers) { + await connection.queueBind(this.name, ServiceConnection.getTopicExchange(this.options.exchange), `*.${handler}`); } } }