From 4e591884c213e0d888aa0e0453019dbf13bd261c Mon Sep 17 00:00:00 2001 From: w1zart <56317956+w1zart@users.noreply.github.com> Date: Wed, 7 Jun 2023 18:32:39 +0300 Subject: [PATCH] feat: add single active consumer argument support (#160) * feat: add single active consumer argument support * feat: update README.md * feat: refactor queue options * feat: update README.md * feat: remove unused interface --------- Co-authored-by: w1zart Co-authored-by: w1zart --- README.md | 7 ++++++- src/adapters/amqp-node.ts | 2 +- src/connection.spec.ts | 3 +++ src/connection.ts | 8 +++++--- src/index.ts | 2 ++ src/service.spec.ts | 23 +++++++++++++---------- src/service.ts | 24 ++++++++++++++++++++---- 7 files changed, 50 insertions(+), 19 deletions(-) diff --git a/README.md b/README.md index a5fd122..c3f8f7a 100644 --- a/README.md +++ b/README.md @@ -31,10 +31,15 @@ const client = createClient({ amqps: true, frameMax: 8192, }, + queueOptions: { + singleActiveConsumer: true, + } }); ``` -See [AMQPOptions](https://github.com/Tinkoff/mbclient/blob/master/src/adapters/amqp-node.ts#L3) interface to get all available options. +See [AMQPOptions](https://github.com/Tinkoff/mbclient/blob/master/src/adapters/amqp-node.ts#L3) interface to get all available options. + +See [QueueOptions](https://github.com/Tinkoff/mbclient/blob/master/src/service.ts#L28) interface to get all available queue options. ## Subscribing diff --git a/src/adapters/amqp-node.ts b/src/adapters/amqp-node.ts index c87a9c4..3185c13 100644 --- a/src/adapters/amqp-node.ts +++ b/src/adapters/amqp-node.ts @@ -87,7 +87,7 @@ interface AMQPMessageProps { userId?: string; } interface AMQPQueueParams { durable: true; } -export interface AMQPQueueArgs { 'ha-mode'?: 'all'; } +export interface AMQPQueueArgs { 'ha-mode'?: 'all'; 'x-single-active-consumer'?: boolean; } // `reexport` type, enumerating only used subset export type AMQPConnection = { diff --git a/src/connection.spec.ts b/src/connection.spec.ts index 73bd491..0dd6065 100644 --- a/src/connection.spec.ts +++ b/src/connection.spec.ts @@ -28,6 +28,9 @@ Array [ "password": "test", "username": "stub", }, + Object { + "singleActiveConsumer": false, + }, "test", Object { "error": [MockFunction], diff --git a/src/connection.ts b/src/connection.ts index 851c0d0..f6e4141 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -1,5 +1,5 @@ import getAMQPNodeAdapter, { AMQPConnection } from './adapters/amqp-node'; -import connectServiceQueues, { ServiceConnection } from './service'; +import connectServiceQueues, { QueueOptions, ServiceConnection } from './service'; import { CreateServiceOptions } from './index'; export enum ConnectionStatus { @@ -9,10 +9,12 @@ export enum ConnectionStatus { DISCONNECTED = 'disconnected' } +const queueOptionsDefault: QueueOptions = { singleActiveConsumer: false }; + const connect = (options: CreateServiceOptions): { service: ServiceConnection; connection: Promise } => { - const { connectOptions, serviceName } = options; + const { connectOptions, logger, serviceName, queueOptions = queueOptionsDefault } = options; - return connectServiceQueues(getAMQPNodeAdapter(), connectOptions, serviceName, options.logger); + return connectServiceQueues(getAMQPNodeAdapter(), connectOptions, queueOptions, serviceName, logger); }; export default connect; diff --git a/src/index.ts b/src/index.ts index 6486c03..dacc8b4 100644 --- a/src/index.ts +++ b/src/index.ts @@ -2,11 +2,13 @@ import connect from './connection'; import { AMQPOptions } from './adapters/amqp-node'; import { Logger } from './logger'; import { MessageHandler } from './message'; +import { QueueOptions } from './service'; export interface CreateServiceOptions { serviceName: string; logger: Logger; connectOptions: AMQPOptions; + queueOptions?: QueueOptions; } export interface ClientSendMessage { diff --git a/src/service.spec.ts b/src/service.spec.ts index 44e4206..27b3d35 100644 --- a/src/service.spec.ts +++ b/src/service.spec.ts @@ -9,6 +9,9 @@ const optionsMock = { password: 'test123', host: 'localhost' }; +const queueOptionsMock = { + singleActiveConsumer: true, +}; const logger = { info: jest.fn(), error: jest.fn(), @@ -29,7 +32,7 @@ const amqpConnection = { let serviceConnection: ServiceConnection; beforeEach(() => { - serviceConnection = new ServiceConnection(testAdapter, optionsMock, 'dispatcher', logger); + serviceConnection = new ServiceConnection(testAdapter, optionsMock, queueOptionsMock, 'dispatcher', logger); serviceConnection.connection = Promise.resolve(amqpConnection); logger.error.mockClear(); }); @@ -73,14 +76,14 @@ describe('#constructor', () => { }); describe('#getQueueArgs', () => { - it('return empty object if configured in standalone mode', () => { - expect(serviceConnection.getQueueArgs()).toEqual({}); + it('return options args object if configured in standalone mode', () => { + expect(serviceConnection.getQueueArgs()).toEqual({ "x-single-active-consumer": true }); }); - it('return ha-mode=all if configured in cluster mode', () => { + it('return options args object with ha-mode=all if configured in cluster mode', () => { serviceConnection.options.cluster = ['a', 'b', 'c']; - expect(serviceConnection.getQueueArgs()).toEqual({ 'ha-mode': 'all' }); + expect(serviceConnection.getQueueArgs()).toEqual({ 'ha-mode': 'all', "x-single-active-consumer": true }); }); }); @@ -122,11 +125,11 @@ describe('#assertServiceQueue', () => { it('asserts and await assertion of service queue', async () => { await serviceConnection.assertServiceQueue(); - expect(amqpConnection.queue).lastCalledWith('dispatcher', { durable: true }, {}); + expect(amqpConnection.queue).lastCalledWith('dispatcher', { durable: true }, { "x-single-active-consumer": true }); }); it('should throw if connection not initialized', async () => { - const notInitializedServiceConnection = new ServiceConnection(testAdapter, optionsMock, 'dispatcher', logger); + const notInitializedServiceConnection = new ServiceConnection(testAdapter, optionsMock, queueOptionsMock, 'dispatcher', logger); await expect(notInitializedServiceConnection.assertServiceQueue()).rejects.toThrowError('Connection was not initialized with connect() method.'); }); @@ -134,7 +137,7 @@ describe('#assertServiceQueue', () => { describe('#getConnection', () => { it('sets retry strategy from options and retries on errors', async () => { - const serviceConn = new ServiceConnection(testAdapter, optionsMock, 'dispatcher', logger); + const serviceConn = new ServiceConnection(testAdapter, optionsMock, queueOptionsMock, 'dispatcher', logger); const retryStrategy = jest.fn().mockReturnValue(5); @@ -160,7 +163,7 @@ describe('#getConnection', () => { }); it('generate error when status is disconnecting', async () => { - const serviceConn = new ServiceConnection(testAdapter, optionsMock, 'dispatcher', logger); + const serviceConn = new ServiceConnection(testAdapter, optionsMock, queueOptionsMock, 'dispatcher', logger); serviceConn.status = ConnectionStatus.DISCONNECTING; await expect(serviceConn.getConnection()).rejects.toThrow(amqpConnectGracefullyStopped()); }); @@ -773,7 +776,7 @@ describe('#consumeQueue', () => { describe('connectService', () => { it('return object with service and connection', () => { - const result = connectService(testAdapter, optionsMock, 'dispatcher', logger); + const result = connectService(testAdapter, optionsMock, queueOptionsMock, 'dispatcher', logger); result.connection.catch(() => { // Do nothing }); diff --git a/src/service.ts b/src/service.ts index 474e9e9..17f47c1 100644 --- a/src/service.ts +++ b/src/service.ts @@ -25,6 +25,10 @@ import { import { Logger } from './logger'; import defaultRetryStrategy from './retry-strategies/default'; +export interface QueueOptions { + singleActiveConsumer: boolean; +} + const DEFAULT_HEART_BEAT = 30; const DEFAULT_FRAME_MAX = 4096; @@ -51,7 +55,7 @@ export class ServiceConnection extends EventEmitter { * Validates AMQP message againts service rules and returns parsed result */ static getContentFromMessage(message: AMQPMessage): unknown | never { - if(message.body === null) { + if (message.body === null) { throw emptyMessageError(); } @@ -84,12 +88,14 @@ export class ServiceConnection extends EventEmitter { [handlerName: string]: MessageHandler; }; options: AMQPOptions; + queueOptions: QueueOptions; connection: Promise | null = null; - constructor(adapter: AMQPAdapter, options: AMQPOptions, serviceName: string, log: Logger) { + constructor(adapter: AMQPAdapter, options: AMQPOptions, queueOptions: QueueOptions, serviceName: string, log: Logger) { super(); this.options = options; + this.queueOptions = queueOptions; this.name = serviceName; this.log = log; this.amqp = adapter; @@ -116,7 +122,16 @@ export class ServiceConnection extends EventEmitter { * ha-mode property should be set to 'all' to force queue replication */ getQueueArgs(): AMQPQueueArgs { - return this.isClusterConnection() ? { 'ha-mode': 'all' } : {}; + const args: AMQPQueueArgs = {}; + + if (this.isClusterConnection()) { + args["ha-mode"] = "all"; + } + if (this.queueOptions.singleActiveConsumer) { + args["x-single-active-consumer"] = true; + } + + return args; } /** @@ -546,10 +561,11 @@ export class ServiceConnection extends EventEmitter { const connectService = ( adapter: AMQPAdapter, options: AMQPOptions, + queueOptions: QueueOptions, serviceName: string, log: Logger ): { service: ServiceConnection; connection: Promise } => { - const service = new ServiceConnection(adapter, options, serviceName, log); + const service = new ServiceConnection(adapter, options, queueOptions, serviceName, log); const connection = service.connect(); return { service, connection };