Skip to content

Commit

Permalink
Set doesNotNeedOpogUpdate when deleting entries
Browse files Browse the repository at this point in the history
If the bucket has no lifecycle or notification configuration, we don't
need the oplog update, and can skip it to lower the load on mongo.

Issue: BB-590
  • Loading branch information
francoisferrand committed Dec 26, 2024
1 parent 60d5798 commit 2b0706a
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 12 deletions.
15 changes: 13 additions & 2 deletions extensions/mongoProcessor/MongoQueueProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -391,10 +391,11 @@ class MongoQueueProcessor {
* @param {Logger.newRequestLogger} log - request logger object
* @param {DeleteOpQueueEntry} sourceEntry - delete object entry
* @param {string} location - zenko storage location name
* @param {BucketInfo} bucketInfo - bucket info object
* @param {function} done - callback(error)
* @return {undefined}
*/
_processDeleteOpQueueEntry(log, sourceEntry, location, done) {
_processDeleteOpQueueEntry(log, sourceEntry, location, bucketInfo, done) {
const bucket = sourceEntry.getBucket();
const key = sourceEntry.getObjectKey();
const versionId = extractVersionId(sourceEntry.getObjectVersionedKey());
Expand Down Expand Up @@ -425,14 +426,24 @@ class MongoQueueProcessor {
return cb();
},
cb => {
const options = {};

// Calling deleteObject without a version id in the options makes it use deleteObjectNoVer, which is
// used for deleting non versioned objects that only have master keys.
// When deleting a versioned object, however, we supply the version id in the options, which
// causes the function to call deleteObjectVer, which is used to handle objects that have both
// master and version keys. This handles the deletion of both the version and the master
// keys in the case where no other version is available, or deleting the version and updating the
// master key otherwise.
const options = versionId ? { versionId } : undefined;
if (versionId) {
options.versionId = versionId;
}

// If the bucket has no lifecycle or notification configuration, we don't need the
// oplog update, and can skip it to lower the load on mongo
if (!bucketInfo.lifecycleConfiguration && !bucketInfo.notificationConfiguration) {
options.doesNotNeedOpogUpdate = true;
}

return this._mongoClient.deleteObject(bucket, key, options, log, cb);
},
Expand Down
56 changes: 46 additions & 10 deletions tests/functional/ingestion/MongoQueueProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const authdata = require('../../../conf/authdata.json');
const ObjectQueueEntry = require('../../../lib/models/ObjectQueueEntry');
const DeleteOpQueueEntry = require('../../../lib/models/DeleteOpQueueEntry');
const fakeLogger = require('../../utils/fakeLogger');
const { ObjectMDArchive } = require('arsenal/build/lib/models');
const { ObjectMDArchive, LifecycleConfiguration, NotificationConfiguration } = require('arsenal/build/lib/models');

const kafkaConfig = config.kafka;
const mongoProcessorConfig = config.extensions.mongoProcessor;
Expand Down Expand Up @@ -829,29 +829,59 @@ describe('MongoQueueProcessor', function mqp() {
});

describe('::_processDeleteOpQueueEntry', () => {
it('should delete an existing versioned object from mongo', done => {
[
{
title: '',
patchBucketInfo: (bucketInfo, next) => next(null, bucketInfo),
options: { doesNotNeedOpogUpdate: true, versionId: VERSION_ID },
},
{
title: ' with bucket notification',
patchBucketInfo: (bucketInfo, next) => next(null, {
...bucketInfo,
notificationConfiguration: new NotificationConfiguration(),
}),
options: { versionId: VERSION_ID },
},
{
title: ' with lifecycle configuration',
patchBucketInfo: (bucketInfo, next) => next(null, {
...bucketInfo,
lifecycleConfiguration: new LifecycleConfiguration(null, { replicationEndpoints: [] }),
}),
options: { versionId: VERSION_ID },
},
].forEach(({
title, patchBucketInfo, options,
}) => it(`should delete an existing versioned object from mongo${title}`, done => {
// use existing version id
const versionKey = `${KEY}${VID_SEP}${VERSION_ID}`;
const objmd = new ObjectMD()
.setKey(KEY)
.setVersionId(VERSION_ID)
.setDataStoreName(LOCATION);
const entry = new ObjectQueueEntry(BUCKET, versionKey, objmd);
const deleteObject = sinon.stub(mongoClient, 'deleteObject').callThrough();
async.waterfall([
next => mongoClient.getBucketAttributes(BUCKET, fakeLogger,
next),
patchBucketInfo,
(bucketInfo, next) => mqp._processDeleteOpQueueEntry(fakeLogger,
entry, LOCATION, next),
entry, LOCATION, bucketInfo, next),
], err => {
assert.ifError(err);

const deleted = mqp.getDeleted();
assert.strictEqual(deleted.length, 1);
assert.strictEqual(deleted[0].key, KEY);
assert.strictEqual(deleted[0].versionId, VERSION_ID);

sinon.assert.calledOnce(deleteObject);
assert.deepStrictEqual(deleteObject.getCall(0).args[2], options);

done();
});
});
}));

it('should delete an existing non versioned object from mongo', done => {
const objmd = new ObjectMD()
Expand All @@ -869,7 +899,7 @@ describe('MongoQueueProcessor', function mqp() {
next => mongoClient.getBucketAttributes(BUCKET, fakeLogger,
next),
(bucketInfo, next) => mqp._processDeleteOpQueueEntry(fakeLogger,
entry, LOCATION, next),
entry, LOCATION, bucketInfo, next),
], err => {
assert.ifError(err);

Expand Down Expand Up @@ -900,7 +930,7 @@ describe('MongoQueueProcessor', function mqp() {
next => mongoClient.getBucketAttributes(BUCKET, fakeLogger,
next),
(bucketInfo, next) => mqp._processDeleteOpQueueEntry(fakeLogger,
entry, LOCATION, next),
entry, LOCATION, bucketInfo, next),
], err => {
assert.ifError(err);

Expand All @@ -922,14 +952,17 @@ describe('MongoQueueProcessor', function mqp() {
async.waterfall([
next => mongoClient.getBucketAttributes(BUCKET, fakeLogger, next),
(bucketInfo, next) => mqp._processDeleteOpQueueEntry(fakeLogger,
entry, LOCATION, next),
entry, LOCATION, bucketInfo, next),
], err => {
assert.ifError(err);

assert.ok(deleteObject.calledOnce);
assert.strictEqual(deleteObject.getCall(0).args[0], BUCKET);
assert.strictEqual(deleteObject.getCall(0).args[1], KEY);
assert.strictEqual(deleteObject.getCall(0).args[2].versionId, VERSION_ID);
assert.deepStrictEqual(deleteObject.getCall(0).args[2], {
doesNotNeedOpogUpdate: true,
versionId: VERSION_ID
});

done();
});
Expand All @@ -947,14 +980,17 @@ describe('MongoQueueProcessor', function mqp() {
async.waterfall([
next => mongoClient.getBucketAttributes(BUCKET, fakeLogger, next),
(bucketInfo, next) => mqp._processDeleteOpQueueEntry(fakeLogger,
entry, LOCATION, next),
entry, LOCATION, bucketInfo, next),
], err => {
assert.ok(err?.is?.InternalError);

assert.ok(deleteObject.calledOnce);
assert.strictEqual(deleteObject.getCall(0).args[0], BUCKET);
assert.strictEqual(deleteObject.getCall(0).args[1], KEY);
assert.strictEqual(deleteObject.getCall(0).args[2].versionId, VERSION_ID);
assert.deepStrictEqual(deleteObject.getCall(0).args[2], {
doesNotNeedOpogUpdate: true,
versionId: VERSION_ID,
});

done();
});
Expand Down

0 comments on commit 2b0706a

Please sign in to comment.