Skip to content

Commit

Permalink
fix: restore queue bindings on reconnect (#34)
Browse files Browse the repository at this point in the history
* fix: restore queue bindings on reconnect
  • Loading branch information
alexkvak authored Oct 8, 2021
1 parent fa0b471 commit c264b67
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 15 deletions.
4 changes: 2 additions & 2 deletions src/connection.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import getAMQPNodeAdapter from './adapters/amqp-node';
import getAMQPNodeAdapter, { AMQPConnection } from './adapters/amqp-node';
import connectServiceQueues, { ServiceConnection } from './service';
import { CreateServiceOptions } from './index';

Expand All @@ -9,7 +9,7 @@ export enum ConnectionStatus {
DISCONNECTED = 'disconnected'
}

const connect = (options: CreateServiceOptions): { service: ServiceConnection; connection: Promise<void> } => {
const connect = (options: CreateServiceOptions): { service: ServiceConnection; connection: Promise<AMQPConnection> } => {
const { connectOptions, serviceName } = options;

if (!options.logger) {
Expand Down
2 changes: 1 addition & 1 deletion src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { AMQPOptions } from './adapters/amqp-node';
export class ConnectServicesError extends Error {
payload: unknown;

constructor(msg: string, errPayload: unknown = {}) {
constructor(msg: string, errPayload: unknown) {
super(msg);

this.payload = errPayload;
Expand Down
86 changes: 85 additions & 1 deletion src/service.spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import connectService, { ServiceConnection } from './service';
import { AMQPConnection } from './adapters/amqp-node';
import { ConnectionStatus } from './connection';
import { amqpConnectError, amqpConnectGracefullyStopped } from './errors';
import {
amqpConnectError,
amqpConnectGracefullyStopped,
ConnectionNotInitialized
} from './errors';
import { Message } from './message';
import { Logger } from './logger';

Expand Down Expand Up @@ -177,6 +181,39 @@ describe('#getConnection', () => {
});
});

describe('#handleConnectionClose', () => {
const message = { content: 'message' };

it('reconnects and rebinds handlers', async () => {
serviceConnection.unsubscribe = jest.fn().mockResolvedValue({});
serviceConnection.connect = jest.fn();
serviceConnection.initQueue = jest.fn();

await serviceConnection.handleConnectionClose(message as unknown as Message);

expect(serviceConnection.unsubscribe).toBeCalled();
expect(serviceConnection.connect).toBeCalled();
expect(serviceConnection.initQueue).not.toBeCalled();
});

it('reconnects and rebinds handlers', async () => {
serviceConnection.unsubscribe = jest.fn().mockResolvedValue({});
const connection = { bindQueue: jest.fn() };
serviceConnection.connect = jest.fn().mockResolvedValue(connection);
serviceConnection.initQueue = jest.fn();

const handlerMock = async () => undefined;
serviceConnection.setActionHandler('handler1', handlerMock);

await serviceConnection.handleConnectionClose(message as unknown as Message);

expect(serviceConnection.unsubscribe).toBeCalled();
expect(serviceConnection.connect).toBeCalled();
expect(serviceConnection.initQueue).toBeCalled();
expect(connection.bindQueue).toBeCalledWith('dispatcher', 'dispatcher', '*.handler1');
});
});

describe('#connectionEventHandler', () => {
const message = { content: 'message' };

Expand All @@ -187,6 +224,7 @@ describe('#connectionEventHandler', () => {

expect(serviceConnection.handleConnectionClose).lastCalledWith(message);
});

it('an event other than close is logged with the type warn', () => {
const warnStub = jest.fn();
serviceConnection.log = {warn: warnStub} as unknown as Logger;
Expand All @@ -210,6 +248,14 @@ describe('#handleConnectionError', () => {
});

describe('#getConnectionString', () => {
it('throws if connection string is empty', () => {
serviceConnection.getConnectionStringStandalone = jest.fn().mockReturnValue('');
serviceConnection.getConnectionStringFromCluster = jest.fn();
serviceConnection.options.cluster = [];

expect(() => serviceConnection.getConnectionString()).toThrow('Wrong configuration. Either cluster or standalone mode should be enabled');
});

it('if in cluster mode calls getConnectionStringFromCluster', () => {
serviceConnection.getConnectionStringFromCluster = jest.fn().mockReturnValue('connection');
serviceConnection.getConnectionStringStandalone = jest.fn();
Expand Down Expand Up @@ -302,6 +348,11 @@ describe('#postMessage', () => {
jest.spyOn(Date, 'now').mockImplementation(() => now);
});

it('throws error if connection is null', async () => {
serviceConnection.connection = null;
await expect(serviceConnection.postMessage(['news'], { foo: 'bar' }, { messageId: '42' })).rejects.toThrow(ConnectionNotInitialized);
});

it('calls connection "sendToQueue" with default options', async () => {
await serviceConnection.postMessage(['news'], { foo: 'bar' }, { messageId: '42' });
const mockCalls = amqpConnection.sendToQueue.mock.calls;
Expand Down Expand Up @@ -400,6 +451,13 @@ describe('#messageHandler', () => {
}
};

it('throws error if connection is null', async () => {
serviceConnection.connection = null;

await expect(serviceConnection.messageHandler(messageMock)).rejects.toThrow(ConnectionNotInitialized);
});


it('validates message', async () => {
const validateMessage = jest.spyOn(ServiceConnection, 'validateMessage');

Expand Down Expand Up @@ -527,6 +585,11 @@ describe('#subscribe', () => {
});

describe('#subscribeOn', () => {
it('throws error if connection is null', async () => {
serviceConnection.connection = null;
await expect(serviceConnection.subscribeOn('actionAction', jest.fn())).rejects.toThrow(ConnectionNotInitialized);
});

it('sets action handler for action', async () => {
serviceConnection.initQueue = jest.fn().mockResolvedValue(true);
serviceConnection.setActionHandler = jest.fn().mockResolvedValue(true);
Expand Down Expand Up @@ -562,6 +625,12 @@ describe('#initQueue', () => {
});

describe('#unsubscribe', () => {
it('throws error if connection is null', async () => {
serviceConnection.connection = null;

await expect(serviceConnection.unsubscribe()).rejects.toThrow(ConnectionNotInitialized);
});

it('cancels connection with queue consumer tag', async () => {
serviceConnection.queuesConsumerTags.dispatcher = 'ssdSDGHISdfadsg';

Expand All @@ -573,6 +642,11 @@ describe('#unsubscribe', () => {
});

describe('#consumeQueue', () => {
it('throws error if connection is null', async () => {
serviceConnection.connection = null;
await expect(serviceConnection.consumeQueue('dispatcher', jest.fn())).rejects.toThrow(ConnectionNotInitialized);
});

it('consumes queue and saves its consumer tag to hash map', async () => {
const result = await serviceConnection.consumeQueue('dispatcher', () => undefined);

Expand Down Expand Up @@ -623,3 +697,13 @@ describe('#assertTopicExchange', () => {
await expect(serviceConnection.assertTopicExchange()).rejects.toThrow('No connection');
});
});

describe('#hasHandlers', () => {
it('should return false if only default handler set', () => {
expect(serviceConnection.hasHandlers()).toEqual(false);
});
it('should return true if custom handler set', () => {
serviceConnection.setActionHandler('handler1', jest.fn());
expect(serviceConnection.hasHandlers()).toEqual(true);
});
});
32 changes: 21 additions & 11 deletions src/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import defaultRetryStrategy from './retry-strategies/default';

const DEFAULT_HEART_BEAT = 30;

const DEFAULT_ACTION = 'defaultAction';

export class ServiceConnection extends EventEmitter {
/**
* Validate AMQP message against service rules
Expand Down Expand Up @@ -73,7 +75,7 @@ export class ServiceConnection extends EventEmitter {
[queueName: string]: string;
} = {};
handlers: {
defaultAction: MessageHandler;
[DEFAULT_ACTION]: MessageHandler;
[handlerName: string]: MessageHandler;
};
options: AMQPOptions;
Expand All @@ -88,7 +90,7 @@ export class ServiceConnection extends EventEmitter {
this.amqp = adapter;
this.setConnectionStatus(ConnectionStatus.CONNECTING);
this.handlers = {
defaultAction: async ({ message, ack }): Promise<void> => {
[DEFAULT_ACTION]: async ({ message, ack }): Promise<void> => {
ack();
const { fields } = message; // TODO check for {}
log.error('[amqp-connection] No action for message', fields);
Expand All @@ -97,7 +99,7 @@ export class ServiceConnection extends EventEmitter {
}

hasHandlers(): boolean {
return Object.keys(this.handlers).some(name => name !== 'defaultAction');
return Object.keys(this.handlers).some(name => name !== DEFAULT_ACTION);
}

/**
Expand Down Expand Up @@ -143,12 +145,14 @@ export class ServiceConnection extends EventEmitter {
/**
* Connect to AMQP server with service options
*/
async connect(): Promise<void> {
async connect(): Promise<AMQPConnection> {
this.setConnectionStatus(ConnectionStatus.CONNECTING);
this.connection = this.getConnection();
const connection = this.connection = this.getConnection();

await this.assertTopicExchange();
await this.assertServiceQueue();

return connection;
}

/**
Expand Down Expand Up @@ -236,10 +240,16 @@ export class ServiceConnection extends EventEmitter {
await this.unsubscribe();

if (this.status !== ConnectionStatus.DISCONNECTING) {
await this.connect();
const connection = await this.connect();

if (this.hasHandlers()) {
const handlers = Object.keys(this.handlers).filter(name => name !== DEFAULT_ACTION);

if (handlers.length > 0) {
await this.initQueue(this.name);

for (const handler of handlers) {
await connection.bindQueue(this.name, ServiceConnection.getTopicExchange(this.options.exchange), `*.${handler}`);
}
}
}
}
Expand Down Expand Up @@ -361,7 +371,7 @@ export class ServiceConnection extends EventEmitter {

const {
properties: {
headers: { action: messageAction = 'defaultAction' }
headers: { action: messageAction = DEFAULT_ACTION }
}
} = message;
const handler = this.getActionHandler(messageAction);
Expand Down Expand Up @@ -400,7 +410,7 @@ export class ServiceConnection extends EventEmitter {
* Unregister action handler from service
*/
getActionHandler(handlerName: string): (options: MessageHandlerOptions) => Promise<void> {
return this.handlers[handlerName] ?? this.handlers.defaultAction;
return this.handlers[handlerName] ?? this.handlers[DEFAULT_ACTION];
}

/**
Expand All @@ -413,7 +423,7 @@ export class ServiceConnection extends EventEmitter {
* });
*/
subscribe(onConsume: MessageHandler): Promise<void> {
this.setActionHandler('defaultAction', onConsume);
this.setActionHandler(DEFAULT_ACTION, onConsume);

return this.initQueue(this.name);
}
Expand Down Expand Up @@ -543,7 +553,7 @@ const connectService = (
options: AMQPOptions,
serviceName: string,
log: Logger
): { service: ServiceConnection; connection: Promise<void> } => {
): { service: ServiceConnection; connection: Promise<AMQPConnection> } => {
const service = new ServiceConnection(adapter, options, serviceName, log);
const connection = service.connect();

Expand Down

0 comments on commit c264b67

Please sign in to comment.