From 2b0706a8d3f18610b9d1289cbfe7ad0037857a4f Mon Sep 17 00:00:00 2001 From: Francois Ferrand Date: Thu, 26 Dec 2024 20:01:55 +0100 Subject: [PATCH] Set `doesNotNeedOpogUpdate` when deleting entries If the bucket has no lifecycle or notification configuration, we don't need the oplog update, and can skip it to lower the load on mongo. Issue: BB-590 --- .../mongoProcessor/MongoQueueProcessor.js | 15 ++++- .../ingestion/MongoQueueProcessor.js | 56 +++++++++++++++---- 2 files changed, 59 insertions(+), 12 deletions(-) diff --git a/extensions/mongoProcessor/MongoQueueProcessor.js b/extensions/mongoProcessor/MongoQueueProcessor.js index 593a6291a..24dc291de 100644 --- a/extensions/mongoProcessor/MongoQueueProcessor.js +++ b/extensions/mongoProcessor/MongoQueueProcessor.js @@ -391,10 +391,11 @@ class MongoQueueProcessor { * @param {Logger.newRequestLogger} log - request logger object * @param {DeleteOpQueueEntry} sourceEntry - delete object entry * @param {string} location - zenko storage location name + * @param {BucketInfo} bucketInfo - bucket info object * @param {function} done - callback(error) * @return {undefined} */ - _processDeleteOpQueueEntry(log, sourceEntry, location, done) { + _processDeleteOpQueueEntry(log, sourceEntry, location, bucketInfo, done) { const bucket = sourceEntry.getBucket(); const key = sourceEntry.getObjectKey(); const versionId = extractVersionId(sourceEntry.getObjectVersionedKey()); @@ -425,6 +426,8 @@ class MongoQueueProcessor { return cb(); }, cb => { + const options = {}; + // Calling deleteObject with undefined options to use deleteObjectNoVer which is used for // deleting non versioned objects that only have master keys. // When deleting a versioned object however we supply the version id in the options, which @@ -432,7 +435,15 @@ class MongoQueueProcessor { // have both a master and version keys. This handles the deletion of both the version and the master // keys in the case where no other version is available, or deleting the version and updating the // master key otherwise. - const options = versionId ? { versionId } : undefined; + if (versionId) { + options.versionId = versionId; + } + + // If the bucket has no lifecycle or notification configuration, we don't need the + // oplog update, and can skip it to lower the load on mongo + if (!bucketInfo.lifecycleConfiguration && !bucketInfo.notificationConfiguration) { + options.doesNotNeedOpogUpdate = true; + } return this._mongoClient.deleteObject(bucket, key, options, log, cb); }, diff --git a/tests/functional/ingestion/MongoQueueProcessor.js b/tests/functional/ingestion/MongoQueueProcessor.js index 2c4ec48a3..dbd973fc1 100644 --- a/tests/functional/ingestion/MongoQueueProcessor.js +++ b/tests/functional/ingestion/MongoQueueProcessor.js @@ -16,7 +16,7 @@ const authdata = require('../../../conf/authdata.json'); const ObjectQueueEntry = require('../../../lib/models/ObjectQueueEntry'); const DeleteOpQueueEntry = require('../../../lib/models/DeleteOpQueueEntry'); const fakeLogger = require('../../utils/fakeLogger'); -const { ObjectMDArchive } = require('arsenal/build/lib/models'); +const { ObjectMDArchive, LifecycleConfiguration, NotificationConfiguration } = require('arsenal/build/lib/models'); const kafkaConfig = config.kafka; const mongoProcessorConfig = config.extensions.mongoProcessor; @@ -829,7 +829,31 @@ describe('MongoQueueProcessor', function mqp() { }); describe('::_processDeleteOpQueueEntry', () => { - it('should delete an existing versioned object from mongo', done => { + [ + { + title: '', + patchBucketInfo: (bucketInfo, next) => next(null, bucketInfo), + options: { doesNotNeedOpogUpdate: true, versionId: VERSION_ID }, + }, + { + title: ' with bucket notification', + patchBucketInfo: (bucketInfo, next) => next(null, { + ...bucketInfo, + notificationConfiguration: new NotificationConfiguration(), + }), + options: { versionId: VERSION_ID }, + }, + { + title: ' with lifecycle configuration', + patchBucketInfo: (bucketInfo, next) => next(null, { + ...bucketInfo, + lifecycleConfiguration: new LifecycleConfiguration(null, { replicationEndpoints: [] }), + }), + options: { versionId: VERSION_ID }, + }, + ].forEach(({ + title, patchBucketInfo, options, + }) => it(`should delete an existing versioned object from mongo${title}`, done => { // use existing version id const versionKey = `${KEY}${VID_SEP}${VERSION_ID}`; const objmd = new ObjectMD() @@ -837,11 +861,13 @@ describe('MongoQueueProcessor', function mqp() { .setVersionId(VERSION_ID) .setDataStoreName(LOCATION); const entry = new ObjectQueueEntry(BUCKET, versionKey, objmd); + const deleteObject = sinon.stub(mongoClient, 'deleteObject').callThrough(); async.waterfall([ next => mongoClient.getBucketAttributes(BUCKET, fakeLogger, next), + patchBucketInfo, (bucketInfo, next) => mqp._processDeleteOpQueueEntry(fakeLogger, - entry, LOCATION, next), + entry, LOCATION, bucketInfo, next), ], err => { assert.ifError(err); @@ -849,9 +875,13 @@ describe('MongoQueueProcessor', function mqp() { assert.strictEqual(deleted.length, 1); assert.strictEqual(deleted[0].key, KEY); assert.strictEqual(deleted[0].versionId, VERSION_ID); + + sinon.assert.calledOnce(deleteObject); + assert.deepStrictEqual(deleteObject.getCall(0).args[2], options); + done(); }); - }); + })); it('should delete an existing non versioned object from mongo', done => { const objmd = new ObjectMD() @@ -869,7 +899,7 @@ describe('MongoQueueProcessor', function mqp() { next => mongoClient.getBucketAttributes(BUCKET, fakeLogger, next), (bucketInfo, next) => mqp._processDeleteOpQueueEntry(fakeLogger, - entry, LOCATION, next), + entry, LOCATION, bucketInfo, next), ], err => { assert.ifError(err); @@ -900,7 +930,7 @@ describe('MongoQueueProcessor', function mqp() { next => mongoClient.getBucketAttributes(BUCKET, fakeLogger, next), (bucketInfo, next) => mqp._processDeleteOpQueueEntry(fakeLogger, - entry, LOCATION, next), + entry, LOCATION, bucketInfo, next), ], err => { assert.ifError(err); @@ -922,14 +952,17 @@ describe('MongoQueueProcessor', function mqp() { async.waterfall([ next => mongoClient.getBucketAttributes(BUCKET, fakeLogger, next), (bucketInfo, next) => mqp._processDeleteOpQueueEntry(fakeLogger, - entry, LOCATION, next), + entry, LOCATION, bucketInfo, next), ], err => { assert.ifError(err); assert.ok(deleteObject.calledOnce); assert.strictEqual(deleteObject.getCall(0).args[0], BUCKET); assert.strictEqual(deleteObject.getCall(0).args[1], KEY); - assert.strictEqual(deleteObject.getCall(0).args[2].versionId, VERSION_ID); + assert.deepStrictEqual(deleteObject.getCall(0).args[2], { + doesNotNeedOpogUpdate: true, + versionId: VERSION_ID + }); done(); }); @@ -947,14 +980,17 @@ describe('MongoQueueProcessor', function mqp() { async.waterfall([ next => mongoClient.getBucketAttributes(BUCKET, fakeLogger, next), (bucketInfo, next) => mqp._processDeleteOpQueueEntry(fakeLogger, - entry, LOCATION, next), + entry, LOCATION, bucketInfo, next), ], err => { assert.ok(err?.is?.InternalError); assert.ok(deleteObject.calledOnce); assert.strictEqual(deleteObject.getCall(0).args[0], BUCKET); assert.strictEqual(deleteObject.getCall(0).args[1], KEY); - assert.strictEqual(deleteObject.getCall(0).args[2].versionId, VERSION_ID); + assert.deepStrictEqual(deleteObject.getCall(0).args[2], { + doesNotNeedOpogUpdate: true, + versionId: VERSION_ID, + }); done(); });