From c264b674ac6c231c7961ce87f1bd473af50894e1 Mon Sep 17 00:00:00 2001 From: Alex Kvak Date: Fri, 8 Oct 2021 12:53:04 +0300 Subject: [PATCH] fix: restore queue bindings on reconnect (#34) * fix: restore queue bindings on reconnect --- src/connection.ts | 4 +-- src/errors.ts | 2 +- src/service.spec.ts | 86 ++++++++++++++++++++++++++++++++++++++++++++- src/service.ts | 32 +++++++++++------ 4 files changed, 109 insertions(+), 15 deletions(-) diff --git a/src/connection.ts b/src/connection.ts index 19a153c..3146ab3 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -1,4 +1,4 @@ -import getAMQPNodeAdapter from './adapters/amqp-node'; +import getAMQPNodeAdapter, { AMQPConnection } from './adapters/amqp-node'; import connectServiceQueues, { ServiceConnection } from './service'; import { CreateServiceOptions } from './index'; @@ -9,7 +9,7 @@ export enum ConnectionStatus { DISCONNECTED = 'disconnected' } -const connect = (options: CreateServiceOptions): { service: ServiceConnection; connection: Promise } => { +const connect = (options: CreateServiceOptions): { service: ServiceConnection; connection: Promise } => { const { connectOptions, serviceName } = options; if (!options.logger) { diff --git a/src/errors.ts b/src/errors.ts index 2b41cb4..2f83def 100644 --- a/src/errors.ts +++ b/src/errors.ts @@ -3,7 +3,7 @@ import { AMQPOptions } from './adapters/amqp-node'; export class ConnectServicesError extends Error { payload: unknown; - constructor(msg: string, errPayload: unknown = {}) { + constructor(msg: string, errPayload: unknown) { super(msg); this.payload = errPayload; diff --git a/src/service.spec.ts b/src/service.spec.ts index cc54d45..aee1aba 100644 --- a/src/service.spec.ts +++ b/src/service.spec.ts @@ -1,7 +1,11 @@ import connectService, { ServiceConnection } from './service'; import { AMQPConnection } from './adapters/amqp-node'; import { ConnectionStatus } from './connection'; -import { amqpConnectError, amqpConnectGracefullyStopped } from './errors'; +import { + amqpConnectError, + amqpConnectGracefullyStopped, + ConnectionNotInitialized +} from './errors'; import { Message } from './message'; import { Logger } from './logger'; @@ -177,6 +181,39 @@ describe('#getConnection', () => { }); }); +describe('#handleConnectionClose', () => { + const message = { content: 'message' }; + + it('reconnects and rebinds handlers', async () => { + serviceConnection.unsubscribe = jest.fn().mockResolvedValue({}); + serviceConnection.connect = jest.fn(); + serviceConnection.initQueue = jest.fn(); + + await serviceConnection.handleConnectionClose(message as unknown as Message); + + expect(serviceConnection.unsubscribe).toBeCalled(); + expect(serviceConnection.connect).toBeCalled(); + expect(serviceConnection.initQueue).not.toBeCalled(); + }); + + it('reconnects and rebinds handlers', async () => { + serviceConnection.unsubscribe = jest.fn().mockResolvedValue({}); + const connection = { bindQueue: jest.fn() }; + serviceConnection.connect = jest.fn().mockResolvedValue(connection); + serviceConnection.initQueue = jest.fn(); + + const handlerMock = async () => undefined; + serviceConnection.setActionHandler('handler1', handlerMock); + + await serviceConnection.handleConnectionClose(message as unknown as Message); + + expect(serviceConnection.unsubscribe).toBeCalled(); + expect(serviceConnection.connect).toBeCalled(); + expect(serviceConnection.initQueue).toBeCalled(); + expect(connection.bindQueue).toBeCalledWith('dispatcher', 'dispatcher', '*.handler1'); + }); +}); + describe('#connectionEventHandler', () => { const message = { content: 'message' }; @@ -187,6 +224,7 @@ describe('#connectionEventHandler', () => { expect(serviceConnection.handleConnectionClose).lastCalledWith(message); }); + it('an event other than close is logged with the type warn', () => { const warnStub = jest.fn(); serviceConnection.log = {warn: warnStub} as unknown as Logger; @@ -210,6 +248,14 @@ describe('#handleConnectionError', () => { }); describe('#getConnectionString', () => { + it('throws if connection string is empty', () => { + serviceConnection.getConnectionStringStandalone = jest.fn().mockReturnValue(''); + serviceConnection.getConnectionStringFromCluster = jest.fn(); + serviceConnection.options.cluster = []; + + expect(() => serviceConnection.getConnectionString()).toThrow('Wrong configuration. Either cluster or standalone mode should be enabled'); + }); + it('if in cluster mode calls getConnectionStringFromCluster', () => { serviceConnection.getConnectionStringFromCluster = jest.fn().mockReturnValue('connection'); serviceConnection.getConnectionStringStandalone = jest.fn(); @@ -302,6 +348,11 @@ describe('#postMessage', () => { jest.spyOn(Date, 'now').mockImplementation(() => now); }); + it('throws error if connection is null', async () => { + serviceConnection.connection = null; + await expect(serviceConnection.postMessage(['news'], { foo: 'bar' }, { messageId: '42' })).rejects.toThrow(ConnectionNotInitialized); + }); + it('calls connection "sendToQueue" with default options', async () => { await serviceConnection.postMessage(['news'], { foo: 'bar' }, { messageId: '42' }); const mockCalls = amqpConnection.sendToQueue.mock.calls; @@ -400,6 +451,13 @@ describe('#messageHandler', () => { } }; + it('throws error if connection is null', async () => { + serviceConnection.connection = null; + + await expect(serviceConnection.messageHandler(messageMock)).rejects.toThrow(ConnectionNotInitialized); + }); + + it('validates message', async () => { const validateMessage = jest.spyOn(ServiceConnection, 'validateMessage'); @@ -527,6 +585,11 @@ describe('#subscribe', () => { }); describe('#subscribeOn', () => { + it('throws error if connection is null', async () => { + serviceConnection.connection = null; + await expect(serviceConnection.subscribeOn('actionAction', jest.fn())).rejects.toThrow(ConnectionNotInitialized); + }); + it('sets action handler for action', async () => { serviceConnection.initQueue = jest.fn().mockResolvedValue(true); serviceConnection.setActionHandler = jest.fn().mockResolvedValue(true); @@ -562,6 +625,12 @@ describe('#initQueue', () => { }); describe('#unsubscribe', () => { + it('throws error if connection is null', async () => { + serviceConnection.connection = null; + + await expect(serviceConnection.unsubscribe()).rejects.toThrow(ConnectionNotInitialized); + }); + it('cancels connection with queue consumer tag', async () => { serviceConnection.queuesConsumerTags.dispatcher = 'ssdSDGHISdfadsg'; @@ -573,6 +642,11 @@ describe('#unsubscribe', () => { }); describe('#consumeQueue', () => { + it('throws error if connection is null', async () => { + serviceConnection.connection = null; + await expect(serviceConnection.consumeQueue('dispatcher', jest.fn())).rejects.toThrow(ConnectionNotInitialized); + }); + it('consumes queue and saves its consumer tag to hash map', async () => { const result = await serviceConnection.consumeQueue('dispatcher', () => undefined); @@ -623,3 +697,13 @@ describe('#assertTopicExchange', () => { await expect(serviceConnection.assertTopicExchange()).rejects.toThrow('No connection'); }); }); + +describe('#hasHandlers', () => { + it('should return false if only default handler set', () => { + expect(serviceConnection.hasHandlers()).toEqual(false); + }); + it('should return true if custom handler set', () => { + serviceConnection.setActionHandler('handler1', jest.fn()); + expect(serviceConnection.hasHandlers()).toEqual(true); + }); +}); diff --git a/src/service.ts b/src/service.ts index 8f105ce..eb43564 100644 --- a/src/service.ts +++ b/src/service.ts @@ -24,6 +24,8 @@ import defaultRetryStrategy from './retry-strategies/default'; const DEFAULT_HEART_BEAT = 30; +const DEFAULT_ACTION = 'defaultAction'; + export class ServiceConnection extends EventEmitter { /** * Validate AMQP message against service rules @@ -73,7 +75,7 @@ export class ServiceConnection extends EventEmitter { [queueName: string]: string; } = {}; handlers: { - defaultAction: MessageHandler; + [DEFAULT_ACTION]: MessageHandler; [handlerName: string]: MessageHandler; }; options: AMQPOptions; @@ -88,7 +90,7 @@ export class ServiceConnection extends EventEmitter { this.amqp = adapter; this.setConnectionStatus(ConnectionStatus.CONNECTING); this.handlers = { - defaultAction: async ({ message, ack }): Promise => { + [DEFAULT_ACTION]: async ({ message, ack }): Promise => { ack(); const { fields } = message; // TODO check for {} log.error('[amqp-connection] No action for message', fields); @@ -97,7 +99,7 @@ export class ServiceConnection extends EventEmitter { } hasHandlers(): boolean { - return Object.keys(this.handlers).some(name => name !== 'defaultAction'); + return Object.keys(this.handlers).some(name => name !== DEFAULT_ACTION); } /** @@ -143,12 +145,14 @@ export class ServiceConnection extends EventEmitter { /** * Connect to AMQP server with service options */ - async connect(): Promise { + async connect(): Promise { this.setConnectionStatus(ConnectionStatus.CONNECTING); - this.connection = this.getConnection(); + const connection = this.connection = this.getConnection(); await this.assertTopicExchange(); await this.assertServiceQueue(); + + return connection; } /** @@ -236,10 +240,16 @@ export class ServiceConnection extends EventEmitter { await this.unsubscribe(); if (this.status !== ConnectionStatus.DISCONNECTING) { - await this.connect(); + const connection = await this.connect(); - if (this.hasHandlers()) { + const handlers = Object.keys(this.handlers).filter(name => name !== DEFAULT_ACTION); + + if (handlers.length > 0) { await this.initQueue(this.name); + + for (const handler of handlers) { + await connection.bindQueue(this.name, ServiceConnection.getTopicExchange(this.options.exchange), `*.${handler}`); + } } } } @@ -361,7 +371,7 @@ export class ServiceConnection extends EventEmitter { const { properties: { - headers: { action: messageAction = 'defaultAction' } + headers: { action: messageAction = DEFAULT_ACTION } } } = message; const handler = this.getActionHandler(messageAction); @@ -400,7 +410,7 @@ export class ServiceConnection extends EventEmitter { * Unregister action handler from service */ getActionHandler(handlerName: string): (options: MessageHandlerOptions) => Promise { - return this.handlers[handlerName] ?? this.handlers.defaultAction; + return this.handlers[handlerName] ?? this.handlers[DEFAULT_ACTION]; } /** @@ -413,7 +423,7 @@ export class ServiceConnection extends EventEmitter { * }); */ subscribe(onConsume: MessageHandler): Promise { - this.setActionHandler('defaultAction', onConsume); + this.setActionHandler(DEFAULT_ACTION, onConsume); return this.initQueue(this.name); } @@ -543,7 +553,7 @@ const connectService = ( options: AMQPOptions, serviceName: string, log: Logger -): { service: ServiceConnection; connection: Promise } => { +): { service: ServiceConnection; connection: Promise } => { const service = new ServiceConnection(adapter, options, serviceName, log); const connection = service.connect();